Compare commits

...

1 Commits

Author SHA1 Message Date
Tyler Goodlet 4c82b6e94f TOAMMEND: Add exit per-side task exit and cancellation signals 2025-03-01 21:25:05 -05:00
3 changed files with 681 additions and 231 deletions

View File

@ -66,6 +66,8 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
''' '''
async def main(): async def main():
# with trio.fail_after(1):
with trio.fail_after(999):
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr] registry_addrs=[reg_addr]
) as n: ) as n:
@ -250,6 +252,7 @@ def test_context_spawns_aio_task_that_errors(
''' '''
async def main(): async def main():
with trio.fail_after(2): with trio.fail_after(2):
# with trio.fail_after(999):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
p = await n.start_actor( p = await n.start_actor(
'aio_daemon', 'aio_daemon',
@ -342,7 +345,8 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
# NOTE: normally the `an.__aexit__()` waits on the # NOTE: normally the `an.__aexit__()` waits on the
# portal's result but we do it explicitly here # portal's result but we do it explicitly here
# to avoid indent levels. # to avoid indent levels.
with trio.fail_after(1): # with trio.fail_after(1):
with trio.fail_after(999):
await p.wait_for_result() await p.wait_for_result()
with pytest.raises( with pytest.raises(
@ -353,11 +357,10 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
# might get multiple `trio.Cancelled`s as well inside an inception # might get multiple `trio.Cancelled`s as well inside an inception
err: RemoteActorError|ExceptionGroup = excinfo.value err: RemoteActorError|ExceptionGroup = excinfo.value
if isinstance(err, ExceptionGroup): if isinstance(err, ExceptionGroup):
err = next(itertools.dropwhile( excs = err.exceptions
lambda exc: not isinstance(exc, tractor.RemoteActorError), assert len(excs) == 1
err.exceptions final_exc = excs[0]
)) assert isinstance(final_exc, tractor.RemoteActorError)
assert err
# relayed boxed error should be our `trio`-task's # relayed boxed error should be our `trio`-task's
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`. # cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
@ -370,15 +373,18 @@ async def no_to_trio_in_args():
async def push_from_aio_task( async def push_from_aio_task(
sequence: Iterable, sequence: Iterable,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
expect_cancel: False, expect_cancel: False,
fail_early: bool, fail_early: bool,
exit_early: bool,
) -> None: ) -> None:
try: try:
# print('trying breakpoint')
# breakpoint()
# sync caller ctx manager # sync caller ctx manager
to_trio.send_nowait(True) to_trio.send_nowait(True)
@ -387,10 +393,26 @@ async def push_from_aio_task(
to_trio.send_nowait(i) to_trio.send_nowait(i)
await asyncio.sleep(0.001) await asyncio.sleep(0.001)
if i == 50 and fail_early: if (
i == 50
):
if fail_early:
raise Exception raise Exception
print('asyncio streamer complete!') if exit_early:
# TODO? really you could enforce the same
# SC-proto we use for actors here with asyncio
# such that a Return[None] msg would be
# implicitly delivered to the trio side?
#
# XXX => this might be the end-all soln for
# converting any-inter-task system (regardless
# of maybe-remote runtime or language) to be
# SC-compat no?
print(f'asyncio breaking early @ {i!r}')
break
print('asyncio streaming complete!')
except asyncio.CancelledError: except asyncio.CancelledError:
if not expect_cancel: if not expect_cancel:
@ -402,9 +424,10 @@ async def push_from_aio_task(
async def stream_from_aio( async def stream_from_aio(
exit_early: bool = False, trio_exit_early: bool = False,
raise_err: bool = False, raise_err: bool = False,
aio_raise_err: bool = False, aio_raise_err: bool = False,
aio_exit_early: bool = False,
fan_out: bool = False, fan_out: bool = False,
) -> None: ) -> None:
@ -417,8 +440,9 @@ async def stream_from_aio(
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
push_from_aio_task, push_from_aio_task,
sequence=seq, sequence=seq,
expect_cancel=raise_err or exit_early, expect_cancel=raise_err or trio_exit_early,
fail_early=aio_raise_err, fail_early=aio_raise_err,
exit_early=aio_exit_early,
) as (first, chan): ) as (first, chan):
@ -437,7 +461,7 @@ async def stream_from_aio(
if value == 50: if value == 50:
if raise_err: if raise_err:
raise Exception raise Exception
elif exit_early: elif trio_exit_early:
print('`consume()` breaking early!\n') print('`consume()` breaking early!\n')
break break
@ -472,9 +496,13 @@ async def stream_from_aio(
finally: finally:
if ( if (
not raise_err and not raise_err
not exit_early and and
not trio_exit_early
and
not aio_raise_err not aio_raise_err
and
not aio_exit_early
): ):
if fan_out: if fan_out:
# we get double the pulled values in the # we get double the pulled values in the
@ -484,6 +512,7 @@ async def stream_from_aio(
assert list(sorted(pulled)) == expect assert list(sorted(pulled)) == expect
else: else:
# await tractor.pause()
assert pulled == expect assert pulled == expect
else: else:
assert not fan_out assert not fan_out
@ -530,37 +559,103 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
excinfo.value.boxed_type is Exception excinfo.value.boxed_type is Exception
def test_trio_closes_early_and_channel_exits( def test_trio_closes_early_causes_aio_checkpoint_raise(
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
): ):
''' '''
Check that if the `trio`-task "exits early" on `async for`ing the Check that if the `trio`-task "exits early and silently" (in this
inter-task-channel (via a `break`) we exit silently from the case during `async for`-ing the inter-task-channel via
`open_channel_from()` block and get a final `Return[None]` msg. a `break`-from-loop), we raise `TrioTaskExited` on the
`asyncio`-side which also then bubbles up through the
`open_channel_from()` block indicating that the `asyncio.Task`
hit a ran another checkpoint despite the `trio.Task` exit.
''' '''
async def main(): async def main():
with trio.fail_after(2): with trio.fail_after(2):
# with trio.fail_after(999):
async with tractor.open_nursery( async with tractor.open_nursery(
# debug_mode=True, # debug_mode=True,
# enable_stack_on_sig=True, # enable_stack_on_sig=True,
) as n: ) as n:
portal = await n.run_in_actor( portal = await n.run_in_actor(
stream_from_aio, stream_from_aio,
exit_early=True, trio_exit_early=True,
infect_asyncio=True, infect_asyncio=True,
) )
# should raise RAE diectly # should raise RAE diectly
print('waiting on final infected subactor result..') print('waiting on final infected subactor result..')
res: None = await portal.wait_for_result() res: None = await portal.wait_for_result()
assert res is None assert res is None
print('infected subactor returned result: {res!r}\n') print(f'infected subactor returned result: {res!r}\n')
# should be a quiet exit on a simple channel exit # should be a quiet exit on a simple channel exit
trio.run( with pytest.raises(RemoteActorError) as excinfo:
main, trio.run(main)
# strict_exception_groups=False,
# ensure remote error is an explicit `AsyncioCancelled` sub-type
# which indicates to the aio task that the trio side exited
# silently WITHOUT raising a `trio.Cancelled` (which would
# normally be raised instead as a `AsyncioCancelled`).
excinfo.value.boxed_type is to_asyncio.TrioTaskExited
def test_aio_exits_early_relays_AsyncioTaskExited(
# TODO, parametrize the 3 possible trio side conditions:
# - trio blocking on receive, aio exits early
# - trio cancelled AND aio exits early on its next tick
# - trio errors AND aio exits early on its next tick
reg_addr: tuple[str, int],
):
'''
Check that if the `asyncio`-task "exits early and silently" (in this
case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel`
it `break`s from the loop), we raise `AsyncioTaskExited` on the
`trio`-side which then DOES NOT BUBBLE up through the
`open_channel_from()` block UNLESS,
- the trio.Task also errored/cancelled, in which case we wrap
both errors in an eg
- the trio.Task was blocking on rxing a value from the
`InterLoopTaskChannel`.
'''
async def main():
with trio.fail_after(2):
# with trio.fail_after(999):
async with tractor.open_nursery(
# debug_mode=True,
# enable_stack_on_sig=True,
) as an:
portal = await an.run_in_actor(
stream_from_aio,
infect_asyncio=True,
trio_exit_early=False,
aio_exit_early=True,
) )
# should raise RAE diectly
print('waiting on final infected subactor result..')
res: None = await portal.wait_for_result()
assert res is None
print(f'infected subactor returned result: {res!r}\n')
# should be a quiet exit on a simple channel exit
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
exc = excinfo.value
# TODO, wow bug!
# -[ ] bp handler not replaced!?!?
# breakpoint()
# import pdbp; pdbp.set_trace()
# ensure remote error is an explicit `AsyncioCancelled` sub-type
# which indicates to the aio task that the trio side exited
# silently WITHOUT raising a `trio.Cancelled` (which would
# normally be raised instead as a `AsyncioCancelled`).
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
def test_aio_errors_and_channel_propagates_and_closes(reg_addr): def test_aio_errors_and_channel_propagates_and_closes(reg_addr):

View File

@ -82,6 +82,39 @@ class InternalError(RuntimeError):
''' '''
class AsyncioCancelled(Exception):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
NOTE: this should NOT inherit from `asyncio.CancelledError` or
tests should break!
'''
class AsyncioTaskExited(Exception):
'''
asyncio.Task "exited" translation error for use with the
`to_asyncio` APIs to be raised in the `trio` side task indicating
on `.run_task()`/`.open_channel_from()` exit that the aio side
exited early/silently.
'''
class TrioTaskExited(AsyncioCancelled):
'''
The `trio`-side task exited without explicitly cancelling the
`asyncio.Task` peer.
This is very similar to how `trio.ClosedResource` acts as
a "clean shutdown" signal to the consumer side of a mem-chan,
https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
'''
# NOTE: more or less should be close to these: # NOTE: more or less should be close to these:
# 'boxed_type', # 'boxed_type',
@ -127,8 +160,8 @@ _body_fields: list[str] = list(
def get_err_type(type_name: str) -> BaseException|None: def get_err_type(type_name: str) -> BaseException|None:
''' '''
Look up an exception type by name from the set of locally Look up an exception type by name from the set of locally known
known namespaces: namespaces:
- `builtins` - `builtins`
- `tractor._exceptions` - `tractor._exceptions`
@ -358,6 +391,13 @@ class RemoteActorError(Exception):
self._ipc_msg.src_type_str self._ipc_msg.src_type_str
) )
if not self._src_type:
raise TypeError(
f'Failed to lookup src error type with '
f'`tractor._exceptions.get_err_type()` :\n'
f'{self.src_type_str}'
)
return self._src_type return self._src_type
@property @property
@ -652,16 +692,10 @@ class RemoteActorError(Exception):
failing actor's remote env. failing actor's remote env.
''' '''
src_type_ref: Type[BaseException] = self.src_type
if not src_type_ref:
raise TypeError(
'Failed to lookup src error type:\n'
f'{self.src_type_str}'
)
# TODO: better tb insertion and all the fancier dunder # TODO: better tb insertion and all the fancier dunder
# metadata stuff as per `.__context__` etc. and friends: # metadata stuff as per `.__context__` etc. and friends:
# https://github.com/python-trio/trio/issues/611 # https://github.com/python-trio/trio/issues/611
src_type_ref: Type[BaseException] = self.src_type
return src_type_ref(self.tb_str) return src_type_ref(self.tb_str)
# TODO: local recontruction of nested inception for a given # TODO: local recontruction of nested inception for a given

File diff suppressed because it is too large Load Diff