Compare commits
10 Commits
bc468d9140
...
e1f128a79c
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | e1f128a79c | |
Tyler Goodlet | d85f4fda57 | |
Tyler Goodlet | 52f135d85d | |
Tyler Goodlet | b29d2f7053 | |
Tyler Goodlet | 5960330413 | |
Tyler Goodlet | 727a2084d6 | |
Tyler Goodlet | f943ea0119 | |
Tyler Goodlet | cb2d2ed9d5 | |
Tyler Goodlet | 7b2543512c | |
Tyler Goodlet | 7d41492f53 |
|
@ -52,9 +52,14 @@ async def assert_state(value: bool):
|
||||||
assert _state == value
|
assert _state == value
|
||||||
|
|
||||||
|
|
||||||
def test_simple_contex():
|
@pytest.mark.parametrize(
|
||||||
|
'error_parent',
|
||||||
|
[False, True],
|
||||||
|
)
|
||||||
|
def test_simple_context(error_parent):
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
|
|
||||||
portal = await n.start_actor(
|
portal = await n.start_actor(
|
||||||
|
@ -74,10 +79,19 @@ def test_simple_contex():
|
||||||
# after cancellation
|
# after cancellation
|
||||||
await portal.run(assert_state, value=False)
|
await portal.run(assert_state, value=False)
|
||||||
|
|
||||||
|
if error_parent:
|
||||||
|
raise ValueError
|
||||||
|
|
||||||
# shut down daemon
|
# shut down daemon
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
trio.run(main)
|
if error_parent:
|
||||||
|
try:
|
||||||
|
trio.run(main)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
|
@ -140,3 +140,81 @@ def test_dynamic_pub_sub():
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
except trio.TooSlowError:
|
except trio.TooSlowError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def one_task_streams_and_one_handles_reqresp(
|
||||||
|
|
||||||
|
ctx: tractor.Context,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
await ctx.started()
|
||||||
|
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
|
async def pingpong():
|
||||||
|
'''Run a simple req/response service.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async for msg in stream:
|
||||||
|
print('rpc server ping')
|
||||||
|
assert msg == 'ping'
|
||||||
|
print('rpc server pong')
|
||||||
|
await stream.send('pong')
|
||||||
|
|
||||||
|
async with trio.open_nursery() as n:
|
||||||
|
n.start_soon(pingpong)
|
||||||
|
|
||||||
|
for _ in itertools.count():
|
||||||
|
await stream.send('yo')
|
||||||
|
await trio.sleep(0.01)
|
||||||
|
|
||||||
|
|
||||||
|
def test_reqresp_ontopof_streaming():
|
||||||
|
'''Test a subactor that both streams with one task and
|
||||||
|
spawns another which handles a small requests-response
|
||||||
|
dialogue over the same bidir-stream.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
|
||||||
|
with trio.move_on_after(2):
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
|
||||||
|
# name of this actor will be same as target func
|
||||||
|
portal = await n.start_actor(
|
||||||
|
'dual_tasks',
|
||||||
|
enable_modules=[__name__]
|
||||||
|
)
|
||||||
|
|
||||||
|
# flat to make sure we get at least one pong
|
||||||
|
got_pong: bool = False
|
||||||
|
|
||||||
|
async with portal.open_context(
|
||||||
|
one_task_streams_and_one_handles_reqresp,
|
||||||
|
|
||||||
|
) as (ctx, first):
|
||||||
|
|
||||||
|
assert first is None
|
||||||
|
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
|
await stream.send('ping')
|
||||||
|
|
||||||
|
async for msg in stream:
|
||||||
|
print(f'client received: {msg}')
|
||||||
|
|
||||||
|
assert msg in {'pong', 'yo'}
|
||||||
|
|
||||||
|
if msg == 'pong':
|
||||||
|
got_pong = True
|
||||||
|
await stream.send('ping')
|
||||||
|
print('client sent ping')
|
||||||
|
|
||||||
|
assert got_pong
|
||||||
|
|
||||||
|
try:
|
||||||
|
trio.run(main)
|
||||||
|
except trio.TooSlowError:
|
||||||
|
pass
|
||||||
|
|
|
@ -479,7 +479,7 @@ class Actor:
|
||||||
try:
|
try:
|
||||||
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
send_chan, recv_chan = trio.open_memory_channel(1000)
|
send_chan, recv_chan = trio.open_memory_channel(2*10)
|
||||||
send_chan.cid = cid # type: ignore
|
send_chan.cid = cid # type: ignore
|
||||||
recv_chan.cid = cid # type: ignore
|
recv_chan.cid = cid # type: ignore
|
||||||
self._cids2qs[(actorid, cid)] = send_chan, recv_chan
|
self._cids2qs[(actorid, cid)] = send_chan, recv_chan
|
||||||
|
@ -528,11 +528,14 @@ class Actor:
|
||||||
task_status.started(loop_cs)
|
task_status.started(loop_cs)
|
||||||
async for msg in chan:
|
async for msg in chan:
|
||||||
if msg is None: # loop terminate sentinel
|
if msg is None: # loop terminate sentinel
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Cancelling all tasks for {chan} from {chan.uid}")
|
f"Cancelling all tasks for {chan} from {chan.uid}")
|
||||||
for (channel, cid) in self._rpc_tasks:
|
|
||||||
|
for (channel, cid) in self._rpc_tasks.copy():
|
||||||
if channel is chan:
|
if channel is chan:
|
||||||
await self._cancel_task(cid, channel)
|
await self._cancel_task(cid, channel)
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Msg loop signalled to terminate for"
|
f"Msg loop signalled to terminate for"
|
||||||
f" {chan} from {chan.uid}")
|
f" {chan} from {chan.uid}")
|
||||||
|
|
|
@ -359,6 +359,7 @@ class Portal:
|
||||||
fn_mod_path, fn_name = func_deats(func)
|
fn_mod_path, fn_name = func_deats(func)
|
||||||
|
|
||||||
|
|
||||||
|
recv_chan: trio.ReceiveMemoryChannel = None
|
||||||
try:
|
try:
|
||||||
cid, recv_chan, functype, first_msg = await self._submit(
|
cid, recv_chan, functype, first_msg = await self._submit(
|
||||||
fn_mod_path, fn_name, kwargs)
|
fn_mod_path, fn_name, kwargs)
|
||||||
|
@ -390,7 +391,8 @@ class Portal:
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
await recv_chan.aclose()
|
if recv_chan is not None:
|
||||||
|
await recv_chan.aclose()
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class LocalPortal:
|
class LocalPortal:
|
||||||
|
|
|
@ -339,6 +339,7 @@ async def new_proc(
|
||||||
bind_addr=bind_addr,
|
bind_addr=bind_addr,
|
||||||
parent_addr=parent_addr,
|
parent_addr=parent_addr,
|
||||||
_runtime_vars=_runtime_vars,
|
_runtime_vars=_runtime_vars,
|
||||||
|
infect_asyncio=infect_asyncio,
|
||||||
task_status=task_status,
|
task_status=task_status,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -354,6 +355,7 @@ async def mp_new_proc(
|
||||||
parent_addr: Tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||||
*,
|
*,
|
||||||
|
infect_asyncio: bool = False,
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
|
@ -12,7 +12,7 @@ import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
|
||||||
from . import _debug
|
from . import _debug
|
||||||
from ._state import current_actor, is_main_process
|
from ._state import current_actor, is_main_process, is_root_process
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
from ._actor import Actor
|
from ._actor import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
|
@ -263,6 +263,26 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
"to complete"
|
"to complete"
|
||||||
)
|
)
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
|
|
||||||
|
if is_root_process() and (
|
||||||
|
type(err) in {
|
||||||
|
Exception, trio.MultiError, trio.Cancelled
|
||||||
|
}
|
||||||
|
):
|
||||||
|
# if we error in the root but the debugger is
|
||||||
|
# engaged we don't want to prematurely kill (and
|
||||||
|
# thus clobber access to) the local tty streams.
|
||||||
|
# instead try to wait for pdb to be released before
|
||||||
|
# tearing down.
|
||||||
|
debug_complete = _debug._pdb_complete
|
||||||
|
if debug_complete and not debug_complete.is_set():
|
||||||
|
log.warning(
|
||||||
|
"Root has errored but pdb is active..waiting "
|
||||||
|
"on debug lock")
|
||||||
|
await _debug._pdb_complete.wait()
|
||||||
|
|
||||||
|
# raise
|
||||||
|
|
||||||
# if the caller's scope errored then we activate our
|
# if the caller's scope errored then we activate our
|
||||||
# one-cancels-all supervisor strategy (don't
|
# one-cancels-all supervisor strategy (don't
|
||||||
# worry more are coming).
|
# worry more are coming).
|
||||||
|
@ -377,26 +397,11 @@ async def open_nursery(
|
||||||
async with open_root_actor(**kwargs) as actor:
|
async with open_root_actor(**kwargs) as actor:
|
||||||
assert actor is current_actor()
|
assert actor is current_actor()
|
||||||
|
|
||||||
try:
|
# try:
|
||||||
async with _open_and_supervise_one_cancels_all_nursery(
|
async with _open_and_supervise_one_cancels_all_nursery(
|
||||||
actor
|
actor
|
||||||
) as anursery:
|
) as anursery:
|
||||||
yield anursery
|
yield anursery
|
||||||
|
|
||||||
except (Exception, trio.MultiError, trio.Cancelled):
|
|
||||||
# if we error in the root but the debugger is
|
|
||||||
# engaged we don't want to prematurely kill (and
|
|
||||||
# thus clobber access to) the local tty streams.
|
|
||||||
# instead try to wait for pdb to be released before
|
|
||||||
# tearing down.
|
|
||||||
if not _debug._pdb_complete.is_set():
|
|
||||||
log.warning(
|
|
||||||
"Root has errored but pdb is active..waiting "
|
|
||||||
"on debug lock")
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await _debug._pdb_complete.wait()
|
|
||||||
|
|
||||||
raise
|
|
||||||
|
|
||||||
else: # sub-nursery case
|
else: # sub-nursery case
|
||||||
|
|
||||||
|
|
|
@ -101,14 +101,16 @@ def _run_asyncio_task(
|
||||||
"""Cancel the calling ``trio`` task on error.
|
"""Cancel the calling ``trio`` task on error.
|
||||||
"""
|
"""
|
||||||
nonlocal aio_err
|
nonlocal aio_err
|
||||||
aio_err = task.exception()
|
try:
|
||||||
|
aio_err = task.exception()
|
||||||
|
except asyncio.CancelledError as cerr:
|
||||||
|
aio_err = cerr
|
||||||
|
|
||||||
if aio_err:
|
if aio_err:
|
||||||
log.exception(f"asyncio task errorred:\n{aio_err}")
|
log.exception(f"asyncio task errorred:\n{aio_err}")
|
||||||
|
|
||||||
# cancel_scope.cancel()
|
# cancel_scope.cancel()
|
||||||
from_aio._err = aio_err
|
from_aio._err = aio_err
|
||||||
to_trio.close()
|
|
||||||
|
|
||||||
task.add_done_callback(cancel_trio)
|
task.add_done_callback(cancel_trio)
|
||||||
|
|
||||||
|
@ -233,17 +235,25 @@ async def run_task(
|
||||||
# raise aio_err
|
# raise aio_err
|
||||||
|
|
||||||
# Do we need this?
|
# Do we need this?
|
||||||
except BaseException as err:
|
except Exception as err:
|
||||||
# await tractor.breakpoint()
|
# await tractor.breakpoint()
|
||||||
aio_err = from_aio._err
|
aio_err = from_aio._err
|
||||||
|
|
||||||
|
# try:
|
||||||
if aio_err is not None:
|
if aio_err is not None:
|
||||||
# always raise from any captured asyncio error
|
# always raise from any captured asyncio error
|
||||||
raise err from aio_err
|
raise err from aio_err
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
# finally:
|
||||||
|
# if not task.done():
|
||||||
|
# task.cancel()
|
||||||
|
|
||||||
finally:
|
except trio.Cancelled:
|
||||||
task.cancel()
|
if not task.done():
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
# async def stream_from_task
|
# async def stream_from_task
|
||||||
|
|
Loading…
Reference in New Issue