TOAMMEND: Add exit per-side task exit and cancellation signals

Tyler Goodlet 2025-03-01 21:25:05 -05:00
parent 71562e0af7
commit 4c82b6e94f
3 changed files with 681 additions and 231 deletions

View File

@ -66,13 +66,15 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
'''
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
trio_cancels_single_aio_task,
infect_asyncio=True,
)
# with trio.fail_after(1):
with trio.fail_after(999):
async with tractor.open_nursery(
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
trio_cancels_single_aio_task,
infect_asyncio=True,
)
trio.run(main)
@ -250,6 +252,7 @@ def test_context_spawns_aio_task_that_errors(
'''
async def main():
with trio.fail_after(2):
# with trio.fail_after(999):
async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_daemon',
@ -342,7 +345,8 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
# NOTE: normally the `an.__aexit__()` waits on the
# portal's result but we do it explicitly here
# to avoid indent levels.
with trio.fail_after(1):
# with trio.fail_after(1):
with trio.fail_after(999):
await p.wait_for_result()
with pytest.raises(
@ -353,11 +357,10 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
# might get multiple `trio.Cancelled`s as well inside an inception
err: RemoteActorError|ExceptionGroup = excinfo.value
if isinstance(err, ExceptionGroup):
err = next(itertools.dropwhile(
lambda exc: not isinstance(exc, tractor.RemoteActorError),
err.exceptions
))
assert err
excs = err.exceptions
assert len(excs) == 1
final_exc = excs[0]
assert isinstance(final_exc, tractor.RemoteActorError)
# relayed boxed error should be our `trio`-task's
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
@ -370,15 +373,18 @@ async def no_to_trio_in_args():
async def push_from_aio_task(
sequence: Iterable,
to_trio: trio.abc.SendChannel,
expect_cancel: False,
fail_early: bool,
exit_early: bool,
) -> None:
try:
# print('trying breakpoint')
# breakpoint()
# sync caller ctx manager
to_trio.send_nowait(True)
@ -387,10 +393,26 @@ async def push_from_aio_task(
to_trio.send_nowait(i)
await asyncio.sleep(0.001)
if i == 50 and fail_early:
raise Exception
if (
i == 50
):
if fail_early:
raise Exception
print('asyncio streamer complete!')
if exit_early:
# TODO? really you could enforce the same
# SC-proto we use for actors here with asyncio
# such that a Return[None] msg would be
# implicitly delivered to the trio side?
#
# XXX => this might be the end-all soln for
# converting any-inter-task system (regardless
# of maybe-remote runtime or language) to be
# SC-compat no?
print(f'asyncio breaking early @ {i!r}')
break
print('asyncio streaming complete!')
except asyncio.CancelledError:
if not expect_cancel:
@ -402,9 +424,10 @@ async def push_from_aio_task(
async def stream_from_aio(
exit_early: bool = False,
trio_exit_early: bool = False,
raise_err: bool = False,
aio_raise_err: bool = False,
aio_exit_early: bool = False,
fan_out: bool = False,
) -> None:
@ -417,8 +440,9 @@ async def stream_from_aio(
async with to_asyncio.open_channel_from(
push_from_aio_task,
sequence=seq,
expect_cancel=raise_err or exit_early,
expect_cancel=raise_err or trio_exit_early,
fail_early=aio_raise_err,
exit_early=aio_exit_early,
) as (first, chan):
@ -437,7 +461,7 @@ async def stream_from_aio(
if value == 50:
if raise_err:
raise Exception
elif exit_early:
elif trio_exit_early:
print('`consume()` breaking early!\n')
break
@ -472,9 +496,13 @@ async def stream_from_aio(
finally:
if (
not raise_err and
not exit_early and
not raise_err
and
not trio_exit_early
and
not aio_raise_err
and
not aio_exit_early
):
if fan_out:
# we get double the pulled values in the
@ -484,6 +512,7 @@ async def stream_from_aio(
assert list(sorted(pulled)) == expect
else:
# await tractor.pause()
assert pulled == expect
else:
assert not fan_out
@ -530,37 +559,103 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
excinfo.value.boxed_type is Exception
def test_trio_closes_early_and_channel_exits(
def test_trio_closes_early_causes_aio_checkpoint_raise(
reg_addr: tuple[str, int],
):
'''
Check that if the `trio`-task "exits early" on `async for`ing the
inter-task-channel (via a `break`) we exit silently from the
`open_channel_from()` block and get a final `Return[None]` msg.
Check that if the `trio`-task "exits early and silently" (in this
case during `async for`-ing the inter-task-channel via
a `break`-from-loop), we raise `TrioTaskExited` on the
`asyncio`-side which also then bubbles up through the
`open_channel_from()` block indicating that the `asyncio.Task`
hit a ran another checkpoint despite the `trio.Task` exit.
'''
async def main():
with trio.fail_after(2):
# with trio.fail_after(999):
async with tractor.open_nursery(
# debug_mode=True,
# enable_stack_on_sig=True,
) as n:
portal = await n.run_in_actor(
stream_from_aio,
exit_early=True,
trio_exit_early=True,
infect_asyncio=True,
)
# should raise RAE diectly
print('waiting on final infected subactor result..')
res: None = await portal.wait_for_result()
assert res is None
print('infected subactor returned result: {res!r}\n')
print(f'infected subactor returned result: {res!r}\n')
# should be a quiet exit on a simple channel exit
trio.run(
main,
# strict_exception_groups=False,
)
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
# ensure remote error is an explicit `AsyncioCancelled` sub-type
# which indicates to the aio task that the trio side exited
# silently WITHOUT raising a `trio.Cancelled` (which would
# normally be raised instead as a `AsyncioCancelled`).
excinfo.value.boxed_type is to_asyncio.TrioTaskExited
def test_aio_exits_early_relays_AsyncioTaskExited(
# TODO, parametrize the 3 possible trio side conditions:
# - trio blocking on receive, aio exits early
# - trio cancelled AND aio exits early on its next tick
# - trio errors AND aio exits early on its next tick
reg_addr: tuple[str, int],
):
'''
Check that if the `asyncio`-task "exits early and silently" (in this
case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel`
it `break`s from the loop), we raise `AsyncioTaskExited` on the
`trio`-side which then DOES NOT BUBBLE up through the
`open_channel_from()` block UNLESS,
- the trio.Task also errored/cancelled, in which case we wrap
both errors in an eg
- the trio.Task was blocking on rxing a value from the
`InterLoopTaskChannel`.
'''
async def main():
with trio.fail_after(2):
# with trio.fail_after(999):
async with tractor.open_nursery(
# debug_mode=True,
# enable_stack_on_sig=True,
) as an:
portal = await an.run_in_actor(
stream_from_aio,
infect_asyncio=True,
trio_exit_early=False,
aio_exit_early=True,
)
# should raise RAE diectly
print('waiting on final infected subactor result..')
res: None = await portal.wait_for_result()
assert res is None
print(f'infected subactor returned result: {res!r}\n')
# should be a quiet exit on a simple channel exit
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
exc = excinfo.value
# TODO, wow bug!
# -[ ] bp handler not replaced!?!?
# breakpoint()
# import pdbp; pdbp.set_trace()
# ensure remote error is an explicit `AsyncioCancelled` sub-type
# which indicates to the aio task that the trio side exited
# silently WITHOUT raising a `trio.Cancelled` (which would
# normally be raised instead as a `AsyncioCancelled`).
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):

View File

@ -82,6 +82,39 @@ class InternalError(RuntimeError):
'''
class AsyncioCancelled(Exception):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
NOTE: this should NOT inherit from `asyncio.CancelledError` or
tests should break!
'''
class AsyncioTaskExited(Exception):
'''
asyncio.Task "exited" translation error for use with the
`to_asyncio` APIs to be raised in the `trio` side task indicating
on `.run_task()`/`.open_channel_from()` exit that the aio side
exited early/silently.
'''
class TrioTaskExited(AsyncioCancelled):
'''
The `trio`-side task exited without explicitly cancelling the
`asyncio.Task` peer.
This is very similar to how `trio.ClosedResource` acts as
a "clean shutdown" signal to the consumer side of a mem-chan,
https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
'''
# NOTE: more or less should be close to these:
# 'boxed_type',
@ -127,8 +160,8 @@ _body_fields: list[str] = list(
def get_err_type(type_name: str) -> BaseException|None:
'''
Look up an exception type by name from the set of locally
known namespaces:
Look up an exception type by name from the set of locally known
namespaces:
- `builtins`
- `tractor._exceptions`
@ -358,6 +391,13 @@ class RemoteActorError(Exception):
self._ipc_msg.src_type_str
)
if not self._src_type:
raise TypeError(
f'Failed to lookup src error type with '
f'`tractor._exceptions.get_err_type()` :\n'
f'{self.src_type_str}'
)
return self._src_type
@property
@ -652,16 +692,10 @@ class RemoteActorError(Exception):
failing actor's remote env.
'''
src_type_ref: Type[BaseException] = self.src_type
if not src_type_ref:
raise TypeError(
'Failed to lookup src error type:\n'
f'{self.src_type_str}'
)
# TODO: better tb insertion and all the fancier dunder
# metadata stuff as per `.__context__` etc. and friends:
# https://github.com/python-trio/trio/issues/611
src_type_ref: Type[BaseException] = self.src_type
return src_type_ref(self.tb_str)
# TODO: local recontruction of nested inception for a given

File diff suppressed because it is too large Load Diff