diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index d462f59..7787756 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -32,6 +32,17 @@ from tractor.trionics import BroadcastReceiver from tractor._testing import expect_ctxc +@pytest.fixture( + scope='module', + # autouse=True, +) +def delay(debug_mode: bool) -> int: + if debug_mode: + return 999 + else: + return 1 + + async def sleep_and_err( sleep_for: float = 0.1, @@ -59,20 +70,24 @@ async def trio_cancels_single_aio_task(): await tractor.to_asyncio.run_task(aio_sleep_forever) -def test_trio_cancels_aio_on_actor_side(reg_addr): +def test_trio_cancels_aio_on_actor_side( + reg_addr: tuple[str, int], + delay: int, +): ''' Spawn an infected actor that is cancelled by the ``trio`` side task using std cancel scope apis. ''' async def main(): - async with tractor.open_nursery( - registry_addrs=[reg_addr] - ) as n: - await n.run_in_actor( - trio_cancels_single_aio_task, - infect_asyncio=True, - ) + with trio.fail_after(1 + delay): + async with tractor.open_nursery( + registry_addrs=[reg_addr] + ) as n: + await n.run_in_actor( + trio_cancels_single_aio_task, + infect_asyncio=True, + ) trio.run(main) @@ -116,7 +131,9 @@ async def asyncio_actor( raise -def test_aio_simple_error(reg_addr): +def test_aio_simple_error( + reg_addr: tuple[str, int], +): ''' Verify a simple remote asyncio error propagates back through trio to the parent actor. @@ -153,7 +170,9 @@ def test_aio_simple_error(reg_addr): assert err.boxed_type is AssertionError -def test_tractor_cancels_aio(reg_addr): +def test_tractor_cancels_aio( + reg_addr: tuple[str, int], +): ''' Verify we can cancel a spawned asyncio task gracefully. @@ -172,7 +191,9 @@ def test_tractor_cancels_aio(reg_addr): trio.run(main) -def test_trio_cancels_aio(reg_addr): +def test_trio_cancels_aio( + reg_addr: tuple[str, int], +): ''' Much like the above test with ``tractor.Portal.cancel_actor()`` except we just use a standard ``trio`` cancellation api. @@ -203,7 +224,8 @@ async def trio_ctx( # this will block until the ``asyncio`` task sends a "first" # message. - with trio.fail_after(2): + delay: int = 999 if tractor.debug_mode() else 1 + with trio.fail_after(1 + delay): try: async with ( trio.open_nursery( @@ -239,7 +261,8 @@ async def trio_ctx( ids='parent_actor_cancels_child={}'.format ) def test_context_spawns_aio_task_that_errors( - reg_addr, + reg_addr: tuple[str, int], + delay: int, parent_cancels: bool, ): ''' @@ -249,7 +272,7 @@ def test_context_spawns_aio_task_that_errors( ''' async def main(): - with trio.fail_after(2): + with trio.fail_after(1 + delay): async with tractor.open_nursery() as n: p = await n.start_actor( 'aio_daemon', @@ -322,11 +345,12 @@ async def aio_cancel(): def test_aio_cancelled_from_aio_causes_trio_cancelled( reg_addr: tuple, + delay: int, ): ''' - When the `asyncio.Task` cancels itself the `trio` side cshould + When the `asyncio.Task` cancels itself the `trio` side should also cancel and teardown and relay the cancellation cross-process - to the caller (parent). + to the parent caller. ''' async def main(): @@ -342,7 +366,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled( # NOTE: normally the `an.__aexit__()` waits on the # portal's result but we do it explicitly here # to avoid indent levels. - with trio.fail_after(1): + with trio.fail_after(1 + delay): await p.wait_for_result() with pytest.raises( @@ -353,11 +377,10 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled( # might get multiple `trio.Cancelled`s as well inside an inception err: RemoteActorError|ExceptionGroup = excinfo.value if isinstance(err, ExceptionGroup): - err = next(itertools.dropwhile( - lambda exc: not isinstance(exc, tractor.RemoteActorError), - err.exceptions - )) - assert err + excs = err.exceptions + assert len(excs) == 1 + final_exc = excs[0] + assert isinstance(final_exc, tractor.RemoteActorError) # relayed boxed error should be our `trio`-task's # cancel-signal-proxy-equivalent of `asyncio.CancelledError`. @@ -370,15 +393,18 @@ async def no_to_trio_in_args(): async def push_from_aio_task( - sequence: Iterable, to_trio: trio.abc.SendChannel, expect_cancel: False, fail_early: bool, + exit_early: bool, ) -> None: try: + # print('trying breakpoint') + # breakpoint() + # sync caller ctx manager to_trio.send_nowait(True) @@ -387,10 +413,27 @@ async def push_from_aio_task( to_trio.send_nowait(i) await asyncio.sleep(0.001) - if i == 50 and fail_early: - raise Exception + if ( + i == 50 + ): + if fail_early: + print('Raising exc from aio side!') + raise Exception - print('asyncio streamer complete!') + if exit_early: + # TODO? really you could enforce the same + # SC-proto we use for actors here with asyncio + # such that a Return[None] msg would be + # implicitly delivered to the trio side? + # + # XXX => this might be the end-all soln for + # converting any-inter-task system (regardless + # of maybe-remote runtime or language) to be + # SC-compat no? + print(f'asyncio breaking early @ {i!r}') + break + + print('asyncio streaming complete!') except asyncio.CancelledError: if not expect_cancel: @@ -402,9 +445,10 @@ async def push_from_aio_task( async def stream_from_aio( - exit_early: bool = False, - raise_err: bool = False, + trio_exit_early: bool = False, + trio_raise_err: bool = False, aio_raise_err: bool = False, + aio_exit_early: bool = False, fan_out: bool = False, ) -> None: @@ -417,8 +461,17 @@ async def stream_from_aio( async with to_asyncio.open_channel_from( push_from_aio_task, sequence=seq, - expect_cancel=raise_err or exit_early, + expect_cancel=trio_raise_err or trio_exit_early, fail_early=aio_raise_err, + exit_early=aio_exit_early, + + # such that we can test exit early cases + # for each side explicitly. + suppress_graceful_exits=(not( + aio_exit_early + or + trio_exit_early + )) ) as (first, chan): @@ -435,9 +488,9 @@ async def stream_from_aio( pulled.append(value) if value == 50: - if raise_err: + if trio_raise_err: raise Exception - elif exit_early: + elif trio_exit_early: print('`consume()` breaking early!\n') break @@ -471,10 +524,14 @@ async def stream_from_aio( finally: - if ( - not raise_err and - not exit_early and - not aio_raise_err + if not ( + trio_raise_err + or + trio_exit_early + or + aio_raise_err + or + aio_exit_early ): if fan_out: # we get double the pulled values in the @@ -484,6 +541,7 @@ async def stream_from_aio( assert list(sorted(pulled)) == expect else: + # await tractor.pause() assert pulled == expect else: assert not fan_out @@ -497,7 +555,10 @@ async def stream_from_aio( 'fan_out', [False, True], ids='fan_out_w_chan_subscribe={}'.format ) -def test_basic_interloop_channel_stream(reg_addr, fan_out): +def test_basic_interloop_channel_stream( + reg_addr: tuple[str, int], + fan_out: bool, +): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -517,7 +578,7 @@ def test_trio_error_cancels_intertask_chan(reg_addr): async with tractor.open_nursery() as n: portal = await n.run_in_actor( stream_from_aio, - raise_err=True, + trio_raise_err=True, infect_asyncio=True, ) # should trigger remote actor error @@ -530,42 +591,114 @@ def test_trio_error_cancels_intertask_chan(reg_addr): excinfo.value.boxed_type is Exception -def test_trio_closes_early_and_channel_exits( +def test_trio_closes_early_causes_aio_checkpoint_raise( reg_addr: tuple[str, int], + delay: int, ): ''' - Check that if the `trio`-task "exits early" on `async for`ing the - inter-task-channel (via a `break`) we exit silently from the - `open_channel_from()` block and get a final `Return[None]` msg. + Check that if the `trio`-task "exits early and silently" (in this + case during `async for`-ing the inter-task-channel via + a `break`-from-loop), we raise `TrioTaskExited` on the + `asyncio`-side which also then bubbles up through the + `open_channel_from()` block indicating that the `asyncio.Task` + hit a ran another checkpoint despite the `trio.Task` exit. ''' async def main(): - with trio.fail_after(2): + with trio.fail_after(1 + delay): async with tractor.open_nursery( # debug_mode=True, # enable_stack_on_sig=True, ) as n: portal = await n.run_in_actor( stream_from_aio, - exit_early=True, + trio_exit_early=True, infect_asyncio=True, ) # should raise RAE diectly print('waiting on final infected subactor result..') res: None = await portal.wait_for_result() assert res is None - print('infected subactor returned result: {res!r}\n') + print(f'infected subactor returned result: {res!r}\n') # should be a quiet exit on a simple channel exit - trio.run( - main, - # strict_exception_groups=False, - ) + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # ensure remote error is an explicit `AsyncioCancelled` sub-type + # which indicates to the aio task that the trio side exited + # silently WITHOUT raising a `trio.Cancelled` (which would + # normally be raised instead as a `AsyncioCancelled`). + excinfo.value.boxed_type is to_asyncio.TrioTaskExited -def test_aio_errors_and_channel_propagates_and_closes(reg_addr): +def test_aio_exits_early_relays_AsyncioTaskExited( + # TODO, parametrize the 3 possible trio side conditions: + # - trio blocking on receive, aio exits early + # - trio cancelled AND aio exits early on its next tick + # - trio errors AND aio exits early on its next tick + reg_addr: tuple[str, int], + debug_mode: bool, + delay: int, +): + ''' + Check that if the `asyncio`-task "exits early and silently" (in this + case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel` + it `break`s from the loop), we raise `AsyncioTaskExited` on the + `trio`-side which then DOES NOT BUBBLE up through the + `open_channel_from()` block UNLESS, + + - the trio.Task also errored/cancelled, in which case we wrap + both errors in an eg + - the trio.Task was blocking on rxing a value from the + `InterLoopTaskChannel`. + + ''' async def main(): - async with tractor.open_nursery() as n: + with trio.fail_after(1 + delay): + async with tractor.open_nursery( + debug_mode=debug_mode, + # enable_stack_on_sig=True, + ) as an: + portal = await an.run_in_actor( + stream_from_aio, + infect_asyncio=True, + trio_exit_early=False, + aio_exit_early=True, + ) + # should raise RAE diectly + print('waiting on final infected subactor result..') + res: None = await portal.wait_for_result() + assert res is None + print(f'infected subactor returned result: {res!r}\n') + + # should be a quiet exit on a simple channel exit + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + exc = excinfo.value + + # TODO, wow bug! + # -[ ] bp handler not replaced!?!? + # breakpoint() + + # import pdbp; pdbp.set_trace() + + # ensure remote error is an explicit `AsyncioCancelled` sub-type + # which indicates to the aio task that the trio side exited + # silently WITHOUT raising a `trio.Cancelled` (which would + # normally be raised instead as a `AsyncioCancelled`). + assert exc.boxed_type is to_asyncio.AsyncioTaskExited + + +def test_aio_errors_and_channel_propagates_and_closes( + reg_addr: tuple[str, int], + debug_mode: bool, +): + async def main(): + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as n: portal = await n.run_in_actor( stream_from_aio, aio_raise_err=True, @@ -852,6 +985,8 @@ def test_sigint_closes_lifetime_stack( ''' async def main(): + + delay = 999 if tractor.debug_mode() else 1 try: an: tractor.ActorNursery async with tractor.open_nursery( @@ -902,7 +1037,7 @@ def test_sigint_closes_lifetime_stack( if wait_for_ctx: print('waiting for ctx outcome in parent..') try: - with trio.fail_after(1): + with trio.fail_after(1 + delay): await ctx.wait_for_result() except tractor.ContextCancelled as ctxc: assert ctxc.canceller == ctx.chan.uid diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index b4386db..3382be1 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -82,6 +82,39 @@ class InternalError(RuntimeError): ''' +class AsyncioCancelled(Exception): + ''' + Asyncio cancelled translation (non-base) error + for use with the ``to_asyncio`` module + to be raised in the ``trio`` side task + + NOTE: this should NOT inherit from `asyncio.CancelledError` or + tests should break! + + ''' + + +class AsyncioTaskExited(Exception): + ''' + asyncio.Task "exited" translation error for use with the + `to_asyncio` APIs to be raised in the `trio` side task indicating + on `.run_task()`/`.open_channel_from()` exit that the aio side + exited early/silently. + + ''' + +class TrioTaskExited(AsyncioCancelled): + ''' + The `trio`-side task exited without explicitly cancelling the + `asyncio.Task` peer. + + This is very similar to how `trio.ClosedResource` acts as + a "clean shutdown" signal to the consumer side of a mem-chan, + + https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels + + ''' + # NOTE: more or less should be close to these: # 'boxed_type', @@ -127,8 +160,8 @@ _body_fields: list[str] = list( def get_err_type(type_name: str) -> BaseException|None: ''' - Look up an exception type by name from the set of locally - known namespaces: + Look up an exception type by name from the set of locally known + namespaces: - `builtins` - `tractor._exceptions` @@ -358,6 +391,13 @@ class RemoteActorError(Exception): self._ipc_msg.src_type_str ) + if not self._src_type: + raise TypeError( + f'Failed to lookup src error type with ' + f'`tractor._exceptions.get_err_type()` :\n' + f'{self.src_type_str}' + ) + return self._src_type @property @@ -652,16 +692,10 @@ class RemoteActorError(Exception): failing actor's remote env. ''' - src_type_ref: Type[BaseException] = self.src_type - if not src_type_ref: - raise TypeError( - 'Failed to lookup src error type:\n' - f'{self.src_type_str}' - ) - # TODO: better tb insertion and all the fancier dunder # metadata stuff as per `.__context__` etc. and friends: # https://github.com/python-trio/trio/issues/611 + src_type_ref: Type[BaseException] = self.src_type return src_type_ref(self.tb_str) # TODO: local recontruction of nested inception for a given diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 75dfb5c..f65cc7e 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -20,7 +20,12 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. ''' from __future__ import annotations import asyncio -from asyncio.exceptions import CancelledError +from asyncio.exceptions import ( + CancelledError, +) +from asyncio import ( + QueueShutDown, +) from contextlib import asynccontextmanager as acm from dataclasses import dataclass import inspect @@ -34,12 +39,18 @@ from typing import ( import tractor from tractor._exceptions import ( + InternalError, is_multi_cancelled, + TrioTaskExited, + TrioCancelled, + AsyncioTaskExited, + AsyncioCancelled, ) from tractor._state import ( debug_mode, _runtime_vars, ) +from tractor._context import Unresolved from tractor.devx import _debug from tractor.log import ( get_logger, @@ -69,6 +80,21 @@ __all__ = [ ] +# TODO, generally speaking we can generalize this abstraction, a "SC linked +# parent->child task pair", as the same "supervision scope primitive" +# **that is** our `._context.Context` with the only difference being +# in how the tasks conduct msg-passing comms. +# +# For `LinkedTaskChannel` we are passing the equivalent of (once you +# include all the recently added `._trio/aio_to_raise` +# exd-as-signals) our SC-dialog-proto over each asyncIO framework's +# mem-chan impl, +# +# verus in `Context` +# +# We are doing the same thing but msg-passing comms happens over an +# IPC transport between tasks in different memory domains. + @dataclass class LinkedTaskChannel( trio.abc.Channel, @@ -84,18 +110,85 @@ class LinkedTaskChannel( ''' _to_aio: asyncio.Queue _from_aio: trio.MemoryReceiveChannel + _to_trio: trio.MemorySendChannel _trio_cs: trio.CancelScope _trio_task: trio.Task _aio_task_complete: trio.Event + _suppress_graceful_exits: bool = True + _trio_err: BaseException|None = None + _trio_to_raise: ( + AsyncioTaskExited| # aio task exits while trio ongoing + AsyncioCancelled| # aio task is (self-)cancelled + BaseException| + None + ) = None _trio_exited: bool = False - # set after ``asyncio.create_task()`` - # _aio_first: Any|None = None + # set after `asyncio.create_task()` _aio_task: asyncio.Task|None = None _aio_err: BaseException|None = None + _aio_to_raise: ( + TrioTaskExited| # trio task exits while aio ongoing + BaseException| + None + ) = None + # _aio_first: Any|None = None # TODO? + _aio_result: Any|Unresolved = Unresolved + + def _final_result_is_set(self) -> bool: + return self._aio_result is not Unresolved + + # TODO? equiv from `Context`? + # @property + # def has_outcome(self) -> bool: + # return ( + # bool(self.maybe_error) + # or + # self._final_result_is_set() + # ) + + async def wait_for_result( + self, + hide_tb: bool = True, + + ) -> Any: + ''' + Wait for the `asyncio.Task.result()` from `trio` + + ''' + __tracebackhide__: bool = hide_tb + assert self._portal, ( + '`Context.wait_for_result()` can not be called from callee side!' + ) + if self._final_result_is_set(): + return self._aio_result + + async with translate_aio_errors( + chan=self, + wait_aio_task=False, + ): + await self._aio_task_complete.wait() + + if ( + not self._final_result_is_set() + ): + if (trio_to_raise := self._trio_to_raise): + raise trio_to_raise from self._aio_err + + elif aio_err := self._aio_err: + raise aio_err + + else: + raise InternalError( + f'Asyncio-task has no result or error set !?\n' + f'{self._aio_task}' + ) + + return self._aio_result + _broadcaster: BroadcastReceiver|None = None async def aclose(self) -> None: @@ -137,7 +230,9 @@ class LinkedTaskChannel( return await self._from_aio.receive() except BaseException as err: async with translate_aio_errors( - self, + chan=self, + # NOTE, determined by `open_channel_from()` input arg + suppress_graceful_exits=self._suppress_graceful_exits, # XXX: obviously this will deadlock if an on-going stream is # being procesed. @@ -154,8 +249,9 @@ class LinkedTaskChannel( ''' self._to_aio.put_nowait(item) - async def wait_aio_complete(self) -> None: - await self._aio_task_complete.wait() + # TODO? needed? + # async def wait_aio_complete(self) -> None: + # await self._aio_task_complete.wait() def cancel_asyncio_task( self, @@ -208,6 +304,7 @@ def _run_asyncio_task( *, qsize: int = 1, provide_channels: bool = False, + suppress_graceful_exits: bool = True, hide_tb: bool = False, **kwargs, @@ -260,6 +357,7 @@ def _run_asyncio_task( _trio_cs=trio_cs, _trio_task=trio_task, _aio_task_complete=aio_task_complete, + _suppress_graceful_exits=suppress_graceful_exits, ) async def wait_on_coro_final_result( @@ -269,17 +367,16 @@ def _run_asyncio_task( ) -> None: ''' - Await `coro` and relay result back to `trio`. - - This can only be run as an `asyncio.Task`! + Await input `coro` as/in an `asyncio.Task` and deliver final + `return`-ed result back to `trio`. ''' - nonlocal aio_err nonlocal chan orig = result = id(coro) try: - result = await coro + result: Any = await coro + chan._aio_result = result except BaseException as aio_err: chan._aio_err = aio_err if isinstance(aio_err, CancelledError): @@ -291,7 +388,6 @@ def _run_asyncio_task( '`asyncio` task errored\n' ) raise - else: if ( result != orig @@ -306,22 +402,46 @@ def _run_asyncio_task( to_trio.send_nowait(result) finally: - # breakpoint() - # import pdbp; pdbp.set_trace() - # if the task was spawned using `open_channel_from()` # then we close the channels on exit. if provide_channels: + # breakpoint() # TODO! why no work!? + # import pdbp; pdbp.set_trace() + + # IFF there is a blocked trio waiter, we set the + # aio-side error to be an explicit "exited early" + # (much like a `Return` in our SC IPC proto) for the + # `.open_channel_from()` case where the parent trio + # task might not wait directly for a final returned + # result (i.e. the trio side might be waiting on + # a streamed value) - this is a signal that the + # asyncio.Task has returned early! + # + # TODO, solve other cases where trio side might, + # - raise Cancelled but aio side exits on next tick. + # - raise error but aio side exits on next tick. + # - raise error and aio side errors "independently" + # on next tick (SEE draft HANDLER BELOW). + stats: trio.MemoryChannelStatistics = to_trio.statistics() + if ( + stats.tasks_waiting_receive + and + not chan._aio_err + ): + chan._trio_to_raise = AsyncioTaskExited( + f'Task existed with final result\n' + f'{result!r}\n' + ) + # only close the sender side which will relay - # a ``trio.EndOfChannel`` to the trio (consumer) side. + # a `trio.EndOfChannel` to the trio (consumer) side. to_trio.close() aio_task_complete.set() - # await asyncio.sleep(0.1) - log.info( - f'`asyncio` task terminated\n' - f'x)>\n' - f' |_{task}\n' + log.runtime( + f'`asyncio` task completed\n' + f')>\n' + f' |_{task}\n' ) # start the asyncio task we submitted from trio @@ -331,6 +451,7 @@ def _run_asyncio_task( f'{coro!r}' ) + # schedule the (bg) `asyncio.Task` task: asyncio.Task = asyncio.create_task( wait_on_coro_final_result( to_trio, @@ -359,16 +480,36 @@ def _run_asyncio_task( ) greenback.bestow_portal(task) - def cancel_trio( + def signal_trio_when_done( task: asyncio.Task, ) -> None: ''' - Cancel the parent `trio` task on any error raised by the - `asyncio` side. + Maybe-cancel, relay-and-raise an error to, OR pack a final + `return`-value for the parent (in SC terms) `trio.Task` on + completion of the `asyncio.Task`. + + Note for certain "edge" scheduling-race-conditions we allow + the aio side to dictate dedicated `tractor`-defined excs to + be raised in the `trio` parent task; the intention is to + indicate those races in a VERY pedantic manner! ''' nonlocal chan - relayed_aio_err: BaseException|None = chan._aio_err + trio_err: BaseException|None = chan._trio_err + + # XXX, since the original error we read from the asyncio.Task + # might change between BEFORE and AFTER we here call + # `asyncio.Task.result()` + # + # -> THIS is DUE TO US in `translate_aio_errors()`! + # + # => for example we might set a special exc + # (`AsyncioCancelled|AsyncioTaskExited`) meant to be raised + # in trio (and maybe absorbed depending on the called API) + # BEFORE this done-callback is invoked by `asyncio`'s + # runtime. + trio_to_raise: BaseException|None = chan._trio_to_raise + orig_aio_err: BaseException|None = chan._aio_err aio_err: BaseException|None = None # only to avoid `asyncio` complaining about uncaptured @@ -376,24 +517,45 @@ def _run_asyncio_task( try: res: Any = task.result() log.info( - '`trio` received final result from {task}\n' - f'|_{res}\n' + f'`trio` received final result from `asyncio` task,\n' + f')> {res}\n' + f' |_{task}\n' ) + if not chan._aio_result: + chan._aio_result = res + + # ?TODO, should we also raise `AsyncioTaskExited[res]` + # in any case where trio is NOT blocking on the + # `._to_trio` chan? + # + # -> ?NO RIGHT? since the + # `open_channel_from().__aexit__()` should detect this + # and then set any final `res` from above as a field + # that can optionally be read by the trio-paren-task as + # needed (just like in our + # `Context.wait_for_result()/.result` API yah? + # + # if provide_channels: + except BaseException as _aio_err: aio_err: BaseException = _aio_err - # read again AFTER the `asyncio` side errors in case + + # READ AGAIN, AFTER the `asyncio` side errors, in case # it was cancelled due to an error from `trio` (or # some other out of band exc) and then set to something # else? - relayed_aio_err: BaseException|None = chan._aio_err + curr_aio_err: BaseException|None = chan._aio_err # always true right? assert ( - type(_aio_err) is type(relayed_aio_err) + type(aio_err) + is type(orig_aio_err) + is type(curr_aio_err) ), ( f'`asyncio`-side task errors mismatch?!?\n\n' f'(caught) aio_err: {aio_err}\n' - f'chan._aio_err: {relayed_aio_err}\n' + f'ORIG chan._aio_err: {orig_aio_err}\n' + f'chan._aio_err: {curr_aio_err}\n' ) msg: str = ( @@ -401,7 +563,7 @@ def _run_asyncio_task( '{etype_str}\n' # ^NOTE filled in below ) - if isinstance(_aio_err, CancelledError): + if isinstance(aio_err, CancelledError): msg += ( f'c)>\n' f' |_{task}\n' @@ -409,6 +571,28 @@ def _run_asyncio_task( log.cancel( msg.format(etype_str='cancelled') ) + + # XXX when the asyncio.Task exits early (before the trio + # side) we relay through an exc-as-signal which is + # normally suppressed unless the trio.Task also errors + # + # ?TODO, is this even needed (does it happen) now? + elif isinstance(aio_err, QueueShutDown): + # import pdbp; pdbp.set_trace() + trio_err = AsyncioTaskExited( + 'Task exited before `trio` side' + ) + if not chan._trio_err: + chan._trio_err = trio_err + + msg += ( + f')>\n' + f' |_{task}\n' + ) + log.info( + msg.format(etype_str='exited') + ) + else: msg += ( f'x)>\n' @@ -418,13 +602,20 @@ def _run_asyncio_task( msg.format(etype_str='errored') ) + # is trio the src of the aio task's exc-as-outcome? trio_err: BaseException|None = chan._trio_err + curr_aio_err: BaseException|None = chan._aio_err if ( - relayed_aio_err + curr_aio_err or trio_err + or + trio_to_raise ): - # import pdbp; pdbp.set_trace() + # XXX, if not already, ALWAYs cancel the trio-side on an + # aio-side error or early return. In the case where the trio task is + # blocking on a checkpoint or `asyncio.Queue.get()`. + # NOTE: currently mem chan closure may act as a form # of error relay (at least in the `asyncio.CancelledError` # case) since we have no way to directly trigger a `trio` @@ -432,30 +623,18 @@ def _run_asyncio_task( # We might want to change this in the future though. from_aio.close() - # wait, wut? - # aio_err.with_traceback(aio_err.__traceback__) - - # TODO: show when cancellation originated - # from each side more pedantically in log-msg? - # elif ( - # type(aio_err) is CancelledError - # and # trio was the cause? - # trio_cs.cancel_called - # ): - # log.cancel( - # 'infected task was cancelled by `trio`-side' - # ) - # raise aio_err from task_err - - # XXX: if not already, alway cancel the scope on a task - # error in case the trio task is blocking on - # a checkpoint. if ( not trio_cs.cancelled_caught or not trio_cs.cancel_called ): - # import pdbp; pdbp.set_trace() + log.cancel( + f'Cancelling `trio` side due to aio-side src exc\n' + f'{curr_aio_err}\n' + f'\n' + f'(c>\n' + f' |_{trio_task}\n' + ) trio_cs.cancel() # maybe the `trio` task errored independent from the @@ -478,28 +657,36 @@ def _run_asyncio_task( # for reproducing detailed edge cases as per the above # cases. # + trio_to_raise: AsyncioCancelled|AsyncioTaskExited = chan._trio_to_raise + aio_to_raise: TrioTaskExited|TrioCancelled = chan._aio_to_raise if ( + not chan._aio_result + and not trio_cs.cancelled_caught - and - (trio_err := chan._trio_err) - and - type(trio_err) not in { - trio.Cancelled, - } and ( - aio_err - and - type(aio_err) not in { + (aio_err and type(aio_err) not in { asyncio.CancelledError - } + }) + or + aio_to_raise + ) + and ( + ((trio_err := chan._trio_err) and type(trio_err) not in { + trio.Cancelled, + }) + or + trio_to_raise ) ): eg = ExceptionGroup( 'Both the `trio` and `asyncio` tasks errored independently!!\n', - (trio_err, aio_err), + ( + trio_to_raise or trio_err, + aio_to_raise or aio_err, + ), ) - chan._trio_err = eg - chan._aio_err = eg + # chan._trio_err = eg + # chan._aio_err = eg raise eg elif aio_err: @@ -507,45 +694,34 @@ def _run_asyncio_task( # match the one we just caught from the task above! # (that would indicate something weird/very-wrong # going on?) - if aio_err is not relayed_aio_err: - raise aio_err from relayed_aio_err + if ( + aio_err is not trio_to_raise + and ( + not suppress_graceful_exits + and ( + chan._aio_result is not Unresolved + and + isinstance(trio_to_raise, AsyncioTaskExited) + ) + ) + ): + # raise aio_err from relayed_aio_err + raise trio_to_raise from curr_aio_err raise aio_err - task.add_done_callback(cancel_trio) + task.add_done_callback(signal_trio_when_done) return chan -class AsyncioCancelled(Exception): - ''' - Asyncio cancelled translation (non-base) error - for use with the ``to_asyncio`` module - to be raised in the ``trio`` side task - - NOTE: this should NOT inherit from `asyncio.CancelledError` or - tests should break! - - ''' - - -class TrioTaskExited(AsyncioCancelled): - ''' - The `trio`-side task exited without explicitly cancelling the - `asyncio.Task` peer. - - This is very similar to how `trio.ClosedResource` acts as - a "clean shutdown" signal to the consumer side of a mem-chan, - - https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels - - ''' - - @acm async def translate_aio_errors( chan: LinkedTaskChannel, wait_on_aio_task: bool = False, cancel_aio_task_on_trio_exit: bool = True, + suppress_graceful_exits: bool = True, + + hide_tb: bool = True, ) -> AsyncIterator[None]: ''' @@ -558,17 +734,20 @@ async def translate_aio_errors( appropriately translates errors and cancels into ``trio`` land. ''' + __tracebackhide__: bool = hide_tb + trio_task = trio.lowlevel.current_task() - - aio_err: BaseException|None = None - + aio_err: BaseException|None = chan._aio_err aio_task: asyncio.Task = chan._aio_task + aio_done_before_trio: bool = aio_task.done() assert aio_task trio_err: BaseException|None = None + to_raise_trio: BaseException|None = None try: yield # back to one of the cross-loop apis except trio.Cancelled as taskc: trio_err = taskc + chan._trio_err = trio_err # should NEVER be the case that `trio` is cancel-handling # BEFORE the other side's task-ref was set!? @@ -577,12 +756,12 @@ async def translate_aio_errors( # import pdbp; pdbp.set_trace() # lolevel-debug # relay cancel through to called `asyncio` task - chan._aio_err = AsyncioCancelled( + chan._aio_to_raise = TrioCancelled( f'trio`-side cancelled the `asyncio`-side,\n' f'c)>\n' - f' |_{trio_task}\n\n' - - f'{trio_err!r}\n' + f' |_{trio_task}\n' + f'\n' + f'trio src exc: {trio_err!r}\n' ) # XXX NOTE XXX seems like we can get all sorts of unreliable @@ -595,22 +774,32 @@ async def translate_aio_errors( # ) # raise + # XXX always passthrough EoC since this translator is often + # called from `LinkedTaskChannel.receive()` which we want + # passthrough and further we have no special meaning for it in + # terms of relaying errors or signals from the aio side! + except trio.EndOfChannel: + raise + # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio # task-done-callback. + # + # when the aio side is (possibly self-)cancelled it will close + # the `chan._to_trio` and thus trigger the trio side to raise + # a dedicated `AsyncioCancelled` except ( trio.ClosedResourceError, - # trio.BrokenResourceError, ) as cre: - trio_err = cre + chan._trio_err = cre aio_err = chan._aio_err - # import pdbp; pdbp.set_trace() # XXX if an underlying `asyncio.CancelledError` triggered # this channel close, raise our (non-`BaseException`) wrapper # exception (`AsyncioCancelled`) from that source error. if ( # aio-side is cancelled? - aio_task.cancelled() # not set until it terminates?? + # |_ first not set until it terminates?? + aio_task.cancelled() and type(aio_err) is CancelledError @@ -618,32 +807,26 @@ async def translate_aio_errors( # silent-exit-by-`trio` case? # -[ ] the parent task can also just catch it though? # -[ ] OR, offer a `signal_aio_side_on_exit=True` ?? - # - # or - # aio_err is None - # and - # chan._trio_exited - ): - raise AsyncioCancelled( + # await tractor.pause(shield=True) + chan._trio_to_raise = AsyncioCancelled( f'asyncio`-side cancelled the `trio`-side,\n' f'c(>\n' f' |_{aio_task}\n\n' - f'{trio_err!r}\n' - ) from aio_err + f'(triggered on the `trio`-side by a {cre!r})\n' + ) + # TODO?? needed or does this just get reraised in the + # `finally:` block below? + # raise to_raise_trio from aio_err # maybe the chan-closure is due to something else? else: - raise + raise cre except BaseException as _trio_err: - # await tractor.pause(shield=True) - trio_err = _trio_err - log.exception( - '`trio`-side task errored?' - ) - + trio_err = chan._trio_err = trio_err + # await tractor.pause(shield=True) # workx! entered: bool = await _debug._maybe_enter_pm( trio_err, api_frame=inspect.currentframe(), @@ -653,89 +836,177 @@ async def translate_aio_errors( and not is_multi_cancelled(trio_err) ): - log.exception('actor crashed\n') + log.exception( + '`trio`-side task errored?' + ) + # __tracebackhide__: bool = False - aio_taskc = AsyncioCancelled( - f'`trio`-side task errored!\n' - f'{trio_err}' - ) #from trio_err + # TODO, just a log msg here indicating the scope closed + # and that the trio-side expects that and what the final + # result from the aio side was? + # + # if isinstance(chan._aio_err, AsyncioTaskExited): + # await tractor.pause(shield=True) - try: - aio_task.set_exception(aio_taskc) - except ( - asyncio.InvalidStateError, - RuntimeError, - # ^XXX, uhh bc apparently we can't use `.set_exception()` - # any more XD .. ?? + # if aio side is still active cancel it due to the trio-side + # error! + # ?TODO, mk `AsyncioCancelled[typeof(trio_err)]` embed the + # current exc? + if ( + # not aio_task.cancelled() + # and + not aio_task.done() # TODO? only need this one? + + # XXX LOL, so if it's not set it's an error !? + # yet another good jerb by `ascyncio`.. + # and + # not aio_task.exception() ): + aio_taskc = TrioCancelled( + f'The `trio`-side task crashed!\n' + f'{trio_err}' + ) + aio_task.set_exception(aio_taskc) wait_on_aio_task = False - - # import pdbp; pdbp.set_trace() - # raise aio_taskc from trio_err + # try: + # aio_task.set_exception(aio_taskc) + # except ( + # asyncio.InvalidStateError, + # RuntimeError, + # # ^XXX, uhh bc apparently we can't use `.set_exception()` + # # any more XD .. ?? + # ): + # wait_on_aio_task = False finally: # record wtv `trio`-side error transpired - chan._trio_err = trio_err - ya_trio_exited: bool = chan._trio_exited + if trio_err: + if chan._trio_err is not trio_err: + await tractor.pause(shield=True) - # NOTE! by default always cancel the `asyncio` task if + # assert chan._trio_err is trio_err + + ya_trio_exited: bool = chan._trio_exited + graceful_trio_exit: bool = ( + ya_trio_exited + and + not chan._trio_err # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man.. + ) + + # XXX NOTE! XXX by default always cancel the `asyncio` task if # we've made it this far and it's not done. # TODO, how to detect if there's an out-of-band error that # caused the exit? if ( - cancel_aio_task_on_trio_exit - and not aio_task.done() - and - aio_err + and ( + cancel_aio_task_on_trio_exit + # and + # chan._aio_err # TODO, if it's not .done() is this possible? - # or the trio side has exited it's surrounding cancel scope - # indicating the lifetime of the ``asyncio``-side task - # should also be terminated. - or ( - ya_trio_exited - and - not chan._trio_err # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man.. + # did the `.open_channel_from()` parent caller already + # (gracefully) exit scope before this translator was + # invoked? + # => since we couple the lifetime of the `asyncio.Task` + # to the `trio` parent task, it should should also be + # terminated via either, + # + # 1. raising an explicit `TrioTaskExited|TrioCancelled` + # in task via `asyncio.Task._fut_waiter.set_exception()` + # + # 2. or (worst case) by cancelling the aio task using + # the std-but-never-working `asyncio.Task.cancel()` + # (which i can't figure out why that nor + # `Task.set_exception()` seem to never ever do the + # rignt thing! XD). + or + graceful_trio_exit ) ): report: str = ( 'trio-side exited silently!' ) - assert not aio_err, 'WTF how did asyncio do this?!' + assert not chan._aio_err, ( + 'WTF why duz asyncio have err but not dun?!' + ) - # if the `trio.Task` already exited the `open_channel_from()` - # block we ensure the asyncio-side gets signalled via an - # explicit exception and its `Queue` is shutdown. + # if the `trio.Task` terminated without raising + # `trio.Cancelled` (curently handled above) there's + # 2 posibilities, + # + # i. it raised a `trio_err` + # ii. it did a "silent exit" where the + # `open_channel_from().__aexit__()` phase ran without + # any raise or taskc (task cancel) and no final result + # was collected (yet) from the aio side. + # + # SO, ensure the asyncio-side is notified and terminated + # by a dedicated exc-as-signal which distinguishes + # various aio-task-state at termination cases. + # + # Consequently if the aio task doesn't absorb said + # exc-as-signal, the trio side should then see the same exc + # propagate up through the .open_channel_from() call to + # the parent task. + # + # if the `trio.Task` already exited (only can happen for + # the `open_channel_from()` use case) block due to to + # either plain ol' graceful `__aexit__()` or due to taskc + # or an error, we ensure the aio-side gets signalled via + # an explicit exception and its `Queue` is shutdown. if ya_trio_exited: + # raise `QueueShutDown` on next `Queue.get()` call on + # aio side. chan._to_aio.shutdown() - # pump the other side's task? needed? + # pump this event-loop (well `Runner` but ya) + # + # TODO? is this actually needed? + # -[ ] theory is this let's the aio side error on + # next tick and then we sync task states from + # here onward? await trio.lowlevel.checkpoint() - # from tractor._state import is_root_process - # if is_root_process(): - # breakpoint() - + # TODO? factor the next 2 branches into a func like + # `try_terminate_aio_task()` and use it for the taskc + # case above as well? + fut: asyncio.Future|None = aio_task._fut_waiter if ( - not chan._trio_err + fut and - (fut := aio_task._fut_waiter) + not fut.done() ): - # await trio.lowlevel.checkpoint() - # import pdbp; pdbp.set_trace() - fut.set_exception( - TrioTaskExited( - f'The peer `asyncio` task is still blocking/running?\n' - f'>>\n' - f'|_{aio_task!r}\n' + # await tractor.pause() + if graceful_trio_exit: + fut.set_exception( + TrioTaskExited( + f'the `trio.Task` gracefully exited but ' + f'its `asyncio` peer is not done?\n' + f')>\n' + f' |_{trio_task}\n' + f'\n' + f'>>\n' + f' |_{aio_task!r}\n' + ) + ) + + # TODO? should this need to exist given the equiv + # `TrioCancelled` equivalent in the be handler + # above?? + else: + fut.set_exception( + TrioTaskExited( + f'The `trio`-side task crashed!\n' + f'{trio_err}' + ) ) - ) else: aio_taskc_warn: str = ( f'\n' f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n' f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n' ) + # await tractor.pause() report += aio_taskc_warn # TODO XXX, figure out the case where calling this makes the # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits` @@ -745,19 +1016,17 @@ async def translate_aio_errors( log.warning(report) - # Required to sync with the far end `asyncio`-task to ensure - # any error is captured (via monkeypatching the - # `channel._aio_err`) before calling ``maybe_raise_aio_err()`` - # below! + # sync with the `asyncio.Task`'s completion to ensure any + # error is captured and relayed (via + # `channel._aio_err/._trio_to_raise`) BEFORE calling + # `maybe_raise_aio_side_err()` below! # - # XXX NOTE XXX the `task.set_exception(aio_taskc)` call above - # MUST NOT EXCEPT or this WILL HANG!! - # - # so if you get a hang maybe step through and figure out why - # it erroed out up there! + # XXX WARNING NOTE + # the `task.set_exception(aio_taskc)` call above MUST NOT + # EXCEPT or this WILL HANG!! SO, if you get a hang maybe step + # through and figure out why it erroed out up there! # if wait_on_aio_task: - # await chan.wait_aio_complete() await chan._aio_task_complete.wait() log.info( 'asyncio-task is done and unblocked trio-side!\n' @@ -767,6 +1036,8 @@ async def translate_aio_errors( # -[ ] make this a channel method, OR # -[ ] just put back inline below? # + # await tractor.pause(shield=True) + # TODO, go back to inlining this.. def maybe_raise_aio_side_err( trio_err: Exception, ) -> None: @@ -778,31 +1049,86 @@ async def translate_aio_errors( ''' aio_err: BaseException|None = chan._aio_err + trio_to_raise: ( + AsyncioCancelled| + AsyncioTaskExited| + None + ) = chan._trio_to_raise + + if not suppress_graceful_exits: + raise trio_to_raise from (aio_err or trio_err) + + if trio_to_raise: + # import pdbp; pdbp.set_trace() + match ( + trio_to_raise, + trio_err, + ): + case ( + AsyncioTaskExited(), + trio.Cancelled()|None, + ): + log.info( + 'Ignoring aio exit signal since trio also exited!' + ) + return + + case ( + AsyncioCancelled(), + trio.Cancelled(), + ): + if not aio_done_before_trio: + log.info( + 'Ignoring aio cancelled signal since trio was also cancelled!' + ) + return + case _: + raise trio_to_raise from (aio_err or trio_err) # Check if the asyncio-side is the cause of the trio-side # error. - if ( + elif ( aio_err is not None and type(aio_err) is not AsyncioCancelled + # and ( + # type(aio_err) is not AsyncioTaskExited + # and + # not ya_trio_exited + # and + # not trio_err + # ) - # not isinstance(aio_err, CancelledError) - # type(aio_err) is not CancelledError + # TODO, case where trio_err is not None and + # aio_err is AsyncioTaskExited => raise eg! + # -[ ] maybe use a match bc this get's real + # complex fast XD + # + # or + # type(aio_err) is not AsyncioTaskExited + # and + # trio_err + # ) ): # always raise from any captured asyncio error if trio_err: raise trio_err from aio_err + # XXX NOTE! above in the `trio.ClosedResourceError` + # handler we specifically set the + # `aio_err = AsyncioCancelled` such that it is raised + # as that special exc here! raise aio_err if trio_err: raise trio_err + # await tractor.pause() # NOTE: if any ``asyncio`` error was caught, raise it here inline # here in the ``trio`` task # if trio_err: maybe_raise_aio_side_err( - trio_err=trio_err + trio_err=to_raise_trio or trio_err ) @@ -829,20 +1155,24 @@ async def run_task( async with translate_aio_errors( chan, wait_on_aio_task=True, + suppress_graceful_exits=chan._suppress_graceful_exits, ): # return single value that is the output from the - # ``asyncio`` function-as-task. Expect the mem chan api to - # do the job of handling cross-framework cancellations + # ``asyncio`` function-as-task. Expect the mem chan api + # to do the job of handling cross-framework cancellations # / errors via closure and translation in the - # ``translate_aio_errors()`` in the above ctx mngr. - return await chan.receive() + # `translate_aio_errors()` in the above ctx mngr. + + return await chan._from_aio.receive() + # return await chan.receive() @acm async def open_channel_from( target: Callable[..., Any], - **kwargs, + suppress_graceful_exits: bool = True, + **target_kwargs, ) -> AsyncIterator[Any]: ''' @@ -854,13 +1184,15 @@ async def open_channel_from( target, qsize=2**8, provide_channels=True, - **kwargs, + suppress_graceful_exits=suppress_graceful_exits, + **target_kwargs, ) # TODO, tuple form here? async with chan._from_aio: async with translate_aio_errors( chan, wait_on_aio_task=True, + suppress_graceful_exits=suppress_graceful_exits, ): # sync to a "started()"-like first delivered value from the # ``asyncio`` task. @@ -873,17 +1205,37 @@ async def open_channel_from( except trio.Cancelled as taskc: # await tractor.pause(shield=True) # ya it worx ;) if cs.cancel_called: - log.cancel( - f'trio-side was manually cancelled by aio side\n' - f'|_c>}}{cs!r}?\n' - ) + if isinstance(chan._trio_to_raise, AsyncioCancelled): + log.cancel( + f'trio-side was manually cancelled by aio side\n' + f'|_c>}}{cs!r}?\n' + ) # TODO, maybe a special `TrioCancelled`??? raise taskc finally: chan._trio_exited = True - chan._to_trio.close() + + # when the aio side is still ongoing but trio exits + # early we signal with a special exc (kinda like + # a `Return`-msg for IPC ctxs) + aio_task: asyncio.Task = chan._aio_task + if not aio_task.done(): + fut: asyncio.Future|None = aio_task._fut_waiter + if fut: + fut.set_exception( + TrioTaskExited( + f'but the child `asyncio` task is still running?\n' + f'>>\n' + f' |_{aio_task!r}\n' + ) + ) + else: + # XXX SHOULD NEVER HAPPEN! + await tractor.pause() + else: + chan._to_trio.close() class AsyncioRuntimeTranslationError(RuntimeError):