Compare commits
No commits in common. "e1f128a79cd7d4487987ee1d50c96d47cabcbdc5" and "bc468d9140edf29f6c286e12a393b52c6477f057" have entirely different histories.
e1f128a79c
...
bc468d9140
|
@ -52,14 +52,9 @@ async def assert_state(value: bool):
|
||||||
assert _state == value
|
assert _state == value
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
def test_simple_contex():
|
||||||
'error_parent',
|
|
||||||
[False, True],
|
|
||||||
)
|
|
||||||
def test_simple_context(error_parent):
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
|
|
||||||
portal = await n.start_actor(
|
portal = await n.start_actor(
|
||||||
|
@ -79,18 +74,9 @@ def test_simple_context(error_parent):
|
||||||
# after cancellation
|
# after cancellation
|
||||||
await portal.run(assert_state, value=False)
|
await portal.run(assert_state, value=False)
|
||||||
|
|
||||||
if error_parent:
|
|
||||||
raise ValueError
|
|
||||||
|
|
||||||
# shut down daemon
|
# shut down daemon
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
if error_parent:
|
|
||||||
try:
|
|
||||||
trio.run(main)
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -140,81 +140,3 @@ def test_dynamic_pub_sub():
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
except trio.TooSlowError:
|
except trio.TooSlowError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
|
||||||
async def one_task_streams_and_one_handles_reqresp(
|
|
||||||
|
|
||||||
ctx: tractor.Context,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
|
|
||||||
await ctx.started()
|
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
|
||||||
|
|
||||||
async def pingpong():
|
|
||||||
'''Run a simple req/response service.
|
|
||||||
|
|
||||||
'''
|
|
||||||
async for msg in stream:
|
|
||||||
print('rpc server ping')
|
|
||||||
assert msg == 'ping'
|
|
||||||
print('rpc server pong')
|
|
||||||
await stream.send('pong')
|
|
||||||
|
|
||||||
async with trio.open_nursery() as n:
|
|
||||||
n.start_soon(pingpong)
|
|
||||||
|
|
||||||
for _ in itertools.count():
|
|
||||||
await stream.send('yo')
|
|
||||||
await trio.sleep(0.01)
|
|
||||||
|
|
||||||
|
|
||||||
def test_reqresp_ontopof_streaming():
|
|
||||||
'''Test a subactor that both streams with one task and
|
|
||||||
spawns another which handles a small requests-response
|
|
||||||
dialogue over the same bidir-stream.
|
|
||||||
|
|
||||||
'''
|
|
||||||
async def main():
|
|
||||||
|
|
||||||
with trio.move_on_after(2):
|
|
||||||
async with tractor.open_nursery() as n:
|
|
||||||
|
|
||||||
# name of this actor will be same as target func
|
|
||||||
portal = await n.start_actor(
|
|
||||||
'dual_tasks',
|
|
||||||
enable_modules=[__name__]
|
|
||||||
)
|
|
||||||
|
|
||||||
# flat to make sure we get at least one pong
|
|
||||||
got_pong: bool = False
|
|
||||||
|
|
||||||
async with portal.open_context(
|
|
||||||
one_task_streams_and_one_handles_reqresp,
|
|
||||||
|
|
||||||
) as (ctx, first):
|
|
||||||
|
|
||||||
assert first is None
|
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
|
||||||
|
|
||||||
await stream.send('ping')
|
|
||||||
|
|
||||||
async for msg in stream:
|
|
||||||
print(f'client received: {msg}')
|
|
||||||
|
|
||||||
assert msg in {'pong', 'yo'}
|
|
||||||
|
|
||||||
if msg == 'pong':
|
|
||||||
got_pong = True
|
|
||||||
await stream.send('ping')
|
|
||||||
print('client sent ping')
|
|
||||||
|
|
||||||
assert got_pong
|
|
||||||
|
|
||||||
try:
|
|
||||||
trio.run(main)
|
|
||||||
except trio.TooSlowError:
|
|
||||||
pass
|
|
||||||
|
|
|
@ -479,7 +479,7 @@ class Actor:
|
||||||
try:
|
try:
|
||||||
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
send_chan, recv_chan = trio.open_memory_channel(2*10)
|
send_chan, recv_chan = trio.open_memory_channel(1000)
|
||||||
send_chan.cid = cid # type: ignore
|
send_chan.cid = cid # type: ignore
|
||||||
recv_chan.cid = cid # type: ignore
|
recv_chan.cid = cid # type: ignore
|
||||||
self._cids2qs[(actorid, cid)] = send_chan, recv_chan
|
self._cids2qs[(actorid, cid)] = send_chan, recv_chan
|
||||||
|
@ -528,14 +528,11 @@ class Actor:
|
||||||
task_status.started(loop_cs)
|
task_status.started(loop_cs)
|
||||||
async for msg in chan:
|
async for msg in chan:
|
||||||
if msg is None: # loop terminate sentinel
|
if msg is None: # loop terminate sentinel
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Cancelling all tasks for {chan} from {chan.uid}")
|
f"Cancelling all tasks for {chan} from {chan.uid}")
|
||||||
|
for (channel, cid) in self._rpc_tasks:
|
||||||
for (channel, cid) in self._rpc_tasks.copy():
|
|
||||||
if channel is chan:
|
if channel is chan:
|
||||||
await self._cancel_task(cid, channel)
|
await self._cancel_task(cid, channel)
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Msg loop signalled to terminate for"
|
f"Msg loop signalled to terminate for"
|
||||||
f" {chan} from {chan.uid}")
|
f" {chan} from {chan.uid}")
|
||||||
|
|
|
@ -359,7 +359,6 @@ class Portal:
|
||||||
fn_mod_path, fn_name = func_deats(func)
|
fn_mod_path, fn_name = func_deats(func)
|
||||||
|
|
||||||
|
|
||||||
recv_chan: trio.ReceiveMemoryChannel = None
|
|
||||||
try:
|
try:
|
||||||
cid, recv_chan, functype, first_msg = await self._submit(
|
cid, recv_chan, functype, first_msg = await self._submit(
|
||||||
fn_mod_path, fn_name, kwargs)
|
fn_mod_path, fn_name, kwargs)
|
||||||
|
@ -391,7 +390,6 @@ class Portal:
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if recv_chan is not None:
|
|
||||||
await recv_chan.aclose()
|
await recv_chan.aclose()
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|
|
@ -339,7 +339,6 @@ async def new_proc(
|
||||||
bind_addr=bind_addr,
|
bind_addr=bind_addr,
|
||||||
parent_addr=parent_addr,
|
parent_addr=parent_addr,
|
||||||
_runtime_vars=_runtime_vars,
|
_runtime_vars=_runtime_vars,
|
||||||
infect_asyncio=infect_asyncio,
|
|
||||||
task_status=task_status,
|
task_status=task_status,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -355,7 +354,6 @@ async def mp_new_proc(
|
||||||
parent_addr: Tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||||
*,
|
*,
|
||||||
infect_asyncio: bool = False,
|
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
|
@ -12,7 +12,7 @@ import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
|
||||||
from . import _debug
|
from . import _debug
|
||||||
from ._state import current_actor, is_main_process, is_root_process
|
from ._state import current_actor, is_main_process
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
from ._actor import Actor
|
from ._actor import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
|
@ -263,26 +263,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
"to complete"
|
"to complete"
|
||||||
)
|
)
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
|
|
||||||
if is_root_process() and (
|
|
||||||
type(err) in {
|
|
||||||
Exception, trio.MultiError, trio.Cancelled
|
|
||||||
}
|
|
||||||
):
|
|
||||||
# if we error in the root but the debugger is
|
|
||||||
# engaged we don't want to prematurely kill (and
|
|
||||||
# thus clobber access to) the local tty streams.
|
|
||||||
# instead try to wait for pdb to be released before
|
|
||||||
# tearing down.
|
|
||||||
debug_complete = _debug._pdb_complete
|
|
||||||
if debug_complete and not debug_complete.is_set():
|
|
||||||
log.warning(
|
|
||||||
"Root has errored but pdb is active..waiting "
|
|
||||||
"on debug lock")
|
|
||||||
await _debug._pdb_complete.wait()
|
|
||||||
|
|
||||||
# raise
|
|
||||||
|
|
||||||
# if the caller's scope errored then we activate our
|
# if the caller's scope errored then we activate our
|
||||||
# one-cancels-all supervisor strategy (don't
|
# one-cancels-all supervisor strategy (don't
|
||||||
# worry more are coming).
|
# worry more are coming).
|
||||||
|
@ -397,12 +377,27 @@ async def open_nursery(
|
||||||
async with open_root_actor(**kwargs) as actor:
|
async with open_root_actor(**kwargs) as actor:
|
||||||
assert actor is current_actor()
|
assert actor is current_actor()
|
||||||
|
|
||||||
# try:
|
try:
|
||||||
async with _open_and_supervise_one_cancels_all_nursery(
|
async with _open_and_supervise_one_cancels_all_nursery(
|
||||||
actor
|
actor
|
||||||
) as anursery:
|
) as anursery:
|
||||||
yield anursery
|
yield anursery
|
||||||
|
|
||||||
|
except (Exception, trio.MultiError, trio.Cancelled):
|
||||||
|
# if we error in the root but the debugger is
|
||||||
|
# engaged we don't want to prematurely kill (and
|
||||||
|
# thus clobber access to) the local tty streams.
|
||||||
|
# instead try to wait for pdb to be released before
|
||||||
|
# tearing down.
|
||||||
|
if not _debug._pdb_complete.is_set():
|
||||||
|
log.warning(
|
||||||
|
"Root has errored but pdb is active..waiting "
|
||||||
|
"on debug lock")
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await _debug._pdb_complete.wait()
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
else: # sub-nursery case
|
else: # sub-nursery case
|
||||||
|
|
||||||
async with _open_and_supervise_one_cancels_all_nursery(
|
async with _open_and_supervise_one_cancels_all_nursery(
|
||||||
|
|
|
@ -101,16 +101,14 @@ def _run_asyncio_task(
|
||||||
"""Cancel the calling ``trio`` task on error.
|
"""Cancel the calling ``trio`` task on error.
|
||||||
"""
|
"""
|
||||||
nonlocal aio_err
|
nonlocal aio_err
|
||||||
try:
|
|
||||||
aio_err = task.exception()
|
aio_err = task.exception()
|
||||||
except asyncio.CancelledError as cerr:
|
|
||||||
aio_err = cerr
|
|
||||||
|
|
||||||
if aio_err:
|
if aio_err:
|
||||||
log.exception(f"asyncio task errorred:\n{aio_err}")
|
log.exception(f"asyncio task errorred:\n{aio_err}")
|
||||||
|
|
||||||
# cancel_scope.cancel()
|
# cancel_scope.cancel()
|
||||||
from_aio._err = aio_err
|
from_aio._err = aio_err
|
||||||
|
to_trio.close()
|
||||||
|
|
||||||
task.add_done_callback(cancel_trio)
|
task.add_done_callback(cancel_trio)
|
||||||
|
|
||||||
|
@ -235,26 +233,18 @@ async def run_task(
|
||||||
# raise aio_err
|
# raise aio_err
|
||||||
|
|
||||||
# Do we need this?
|
# Do we need this?
|
||||||
except Exception as err:
|
except BaseException as err:
|
||||||
# await tractor.breakpoint()
|
# await tractor.breakpoint()
|
||||||
aio_err = from_aio._err
|
aio_err = from_aio._err
|
||||||
|
|
||||||
# try:
|
|
||||||
if aio_err is not None:
|
if aio_err is not None:
|
||||||
# always raise from any captured asyncio error
|
# always raise from any captured asyncio error
|
||||||
raise err from aio_err
|
raise err from aio_err
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
# finally:
|
|
||||||
# if not task.done():
|
|
||||||
# task.cancel()
|
|
||||||
|
|
||||||
except trio.Cancelled:
|
finally:
|
||||||
if not task.done():
|
|
||||||
task.cancel()
|
task.cancel()
|
||||||
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
# async def stream_from_task
|
# async def stream_from_task
|
||||||
# pass
|
# pass
|
||||||
|
|
Loading…
Reference in New Issue