From 4c82b6e94fe004f30f43b9018c74d9145a434bc0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Mar 2025 21:25:05 -0500 Subject: [PATCH] TOAMMEND: Add exit per-side task exit and cancellation signals --- tests/test_infected_asyncio.py | 159 ++++++-- tractor/_exceptions.py | 52 ++- tractor/to_asyncio.py | 701 ++++++++++++++++++++++++--------- 3 files changed, 681 insertions(+), 231 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index d462f59d..f41a0efe 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -66,13 +66,15 @@ def test_trio_cancels_aio_on_actor_side(reg_addr): ''' async def main(): - async with tractor.open_nursery( - registry_addrs=[reg_addr] - ) as n: - await n.run_in_actor( - trio_cancels_single_aio_task, - infect_asyncio=True, - ) + # with trio.fail_after(1): + with trio.fail_after(999): + async with tractor.open_nursery( + registry_addrs=[reg_addr] + ) as n: + await n.run_in_actor( + trio_cancels_single_aio_task, + infect_asyncio=True, + ) trio.run(main) @@ -250,6 +252,7 @@ def test_context_spawns_aio_task_that_errors( ''' async def main(): with trio.fail_after(2): + # with trio.fail_after(999): async with tractor.open_nursery() as n: p = await n.start_actor( 'aio_daemon', @@ -342,7 +345,8 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled( # NOTE: normally the `an.__aexit__()` waits on the # portal's result but we do it explicitly here # to avoid indent levels. - with trio.fail_after(1): + # with trio.fail_after(1): + with trio.fail_after(999): await p.wait_for_result() with pytest.raises( @@ -353,11 +357,10 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled( # might get multiple `trio.Cancelled`s as well inside an inception err: RemoteActorError|ExceptionGroup = excinfo.value if isinstance(err, ExceptionGroup): - err = next(itertools.dropwhile( - lambda exc: not isinstance(exc, tractor.RemoteActorError), - err.exceptions - )) - assert err + excs = err.exceptions + assert len(excs) == 1 + final_exc = excs[0] + assert isinstance(final_exc, tractor.RemoteActorError) # relayed boxed error should be our `trio`-task's # cancel-signal-proxy-equivalent of `asyncio.CancelledError`. @@ -370,15 +373,18 @@ async def no_to_trio_in_args(): async def push_from_aio_task( - sequence: Iterable, to_trio: trio.abc.SendChannel, expect_cancel: False, fail_early: bool, + exit_early: bool, ) -> None: try: + # print('trying breakpoint') + # breakpoint() + # sync caller ctx manager to_trio.send_nowait(True) @@ -387,10 +393,26 @@ async def push_from_aio_task( to_trio.send_nowait(i) await asyncio.sleep(0.001) - if i == 50 and fail_early: - raise Exception + if ( + i == 50 + ): + if fail_early: + raise Exception - print('asyncio streamer complete!') + if exit_early: + # TODO? really you could enforce the same + # SC-proto we use for actors here with asyncio + # such that a Return[None] msg would be + # implicitly delivered to the trio side? + # + # XXX => this might be the end-all soln for + # converting any-inter-task system (regardless + # of maybe-remote runtime or language) to be + # SC-compat no? + print(f'asyncio breaking early @ {i!r}') + break + + print('asyncio streaming complete!') except asyncio.CancelledError: if not expect_cancel: @@ -402,9 +424,10 @@ async def push_from_aio_task( async def stream_from_aio( - exit_early: bool = False, + trio_exit_early: bool = False, raise_err: bool = False, aio_raise_err: bool = False, + aio_exit_early: bool = False, fan_out: bool = False, ) -> None: @@ -417,8 +440,9 @@ async def stream_from_aio( async with to_asyncio.open_channel_from( push_from_aio_task, sequence=seq, - expect_cancel=raise_err or exit_early, + expect_cancel=raise_err or trio_exit_early, fail_early=aio_raise_err, + exit_early=aio_exit_early, ) as (first, chan): @@ -437,7 +461,7 @@ async def stream_from_aio( if value == 50: if raise_err: raise Exception - elif exit_early: + elif trio_exit_early: print('`consume()` breaking early!\n') break @@ -472,9 +496,13 @@ async def stream_from_aio( finally: if ( - not raise_err and - not exit_early and + not raise_err + and + not trio_exit_early + and not aio_raise_err + and + not aio_exit_early ): if fan_out: # we get double the pulled values in the @@ -484,6 +512,7 @@ async def stream_from_aio( assert list(sorted(pulled)) == expect else: + # await tractor.pause() assert pulled == expect else: assert not fan_out @@ -530,37 +559,103 @@ def test_trio_error_cancels_intertask_chan(reg_addr): excinfo.value.boxed_type is Exception -def test_trio_closes_early_and_channel_exits( +def test_trio_closes_early_causes_aio_checkpoint_raise( reg_addr: tuple[str, int], ): ''' - Check that if the `trio`-task "exits early" on `async for`ing the - inter-task-channel (via a `break`) we exit silently from the - `open_channel_from()` block and get a final `Return[None]` msg. + Check that if the `trio`-task "exits early and silently" (in this + case during `async for`-ing the inter-task-channel via + a `break`-from-loop), we raise `TrioTaskExited` on the + `asyncio`-side which also then bubbles up through the + `open_channel_from()` block indicating that the `asyncio.Task` + hit a ran another checkpoint despite the `trio.Task` exit. ''' async def main(): with trio.fail_after(2): + # with trio.fail_after(999): async with tractor.open_nursery( # debug_mode=True, # enable_stack_on_sig=True, ) as n: portal = await n.run_in_actor( stream_from_aio, - exit_early=True, + trio_exit_early=True, infect_asyncio=True, ) # should raise RAE diectly print('waiting on final infected subactor result..') res: None = await portal.wait_for_result() assert res is None - print('infected subactor returned result: {res!r}\n') + print(f'infected subactor returned result: {res!r}\n') # should be a quiet exit on a simple channel exit - trio.run( - main, - # strict_exception_groups=False, - ) + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # ensure remote error is an explicit `AsyncioCancelled` sub-type + # which indicates to the aio task that the trio side exited + # silently WITHOUT raising a `trio.Cancelled` (which would + # normally be raised instead as a `AsyncioCancelled`). + excinfo.value.boxed_type is to_asyncio.TrioTaskExited + + +def test_aio_exits_early_relays_AsyncioTaskExited( + # TODO, parametrize the 3 possible trio side conditions: + # - trio blocking on receive, aio exits early + # - trio cancelled AND aio exits early on its next tick + # - trio errors AND aio exits early on its next tick + reg_addr: tuple[str, int], +): + ''' + Check that if the `asyncio`-task "exits early and silently" (in this + case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel` + it `break`s from the loop), we raise `AsyncioTaskExited` on the + `trio`-side which then DOES NOT BUBBLE up through the + `open_channel_from()` block UNLESS, + + - the trio.Task also errored/cancelled, in which case we wrap + both errors in an eg + - the trio.Task was blocking on rxing a value from the + `InterLoopTaskChannel`. + + ''' + async def main(): + with trio.fail_after(2): + # with trio.fail_after(999): + async with tractor.open_nursery( + # debug_mode=True, + # enable_stack_on_sig=True, + ) as an: + portal = await an.run_in_actor( + stream_from_aio, + infect_asyncio=True, + trio_exit_early=False, + aio_exit_early=True, + ) + # should raise RAE diectly + print('waiting on final infected subactor result..') + res: None = await portal.wait_for_result() + assert res is None + print(f'infected subactor returned result: {res!r}\n') + + # should be a quiet exit on a simple channel exit + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + exc = excinfo.value + + # TODO, wow bug! + # -[ ] bp handler not replaced!?!? + # breakpoint() + + # import pdbp; pdbp.set_trace() + + # ensure remote error is an explicit `AsyncioCancelled` sub-type + # which indicates to the aio task that the trio side exited + # silently WITHOUT raising a `trio.Cancelled` (which would + # normally be raised instead as a `AsyncioCancelled`). + assert exc.boxed_type is to_asyncio.AsyncioTaskExited def test_aio_errors_and_channel_propagates_and_closes(reg_addr): diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index b4386db0..3382be10 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -82,6 +82,39 @@ class InternalError(RuntimeError): ''' +class AsyncioCancelled(Exception): + ''' + Asyncio cancelled translation (non-base) error + for use with the ``to_asyncio`` module + to be raised in the ``trio`` side task + + NOTE: this should NOT inherit from `asyncio.CancelledError` or + tests should break! + + ''' + + +class AsyncioTaskExited(Exception): + ''' + asyncio.Task "exited" translation error for use with the + `to_asyncio` APIs to be raised in the `trio` side task indicating + on `.run_task()`/`.open_channel_from()` exit that the aio side + exited early/silently. + + ''' + +class TrioTaskExited(AsyncioCancelled): + ''' + The `trio`-side task exited without explicitly cancelling the + `asyncio.Task` peer. + + This is very similar to how `trio.ClosedResource` acts as + a "clean shutdown" signal to the consumer side of a mem-chan, + + https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels + + ''' + # NOTE: more or less should be close to these: # 'boxed_type', @@ -127,8 +160,8 @@ _body_fields: list[str] = list( def get_err_type(type_name: str) -> BaseException|None: ''' - Look up an exception type by name from the set of locally - known namespaces: + Look up an exception type by name from the set of locally known + namespaces: - `builtins` - `tractor._exceptions` @@ -358,6 +391,13 @@ class RemoteActorError(Exception): self._ipc_msg.src_type_str ) + if not self._src_type: + raise TypeError( + f'Failed to lookup src error type with ' + f'`tractor._exceptions.get_err_type()` :\n' + f'{self.src_type_str}' + ) + return self._src_type @property @@ -652,16 +692,10 @@ class RemoteActorError(Exception): failing actor's remote env. ''' - src_type_ref: Type[BaseException] = self.src_type - if not src_type_ref: - raise TypeError( - 'Failed to lookup src error type:\n' - f'{self.src_type_str}' - ) - # TODO: better tb insertion and all the fancier dunder # metadata stuff as per `.__context__` etc. and friends: # https://github.com/python-trio/trio/issues/611 + src_type_ref: Type[BaseException] = self.src_type return src_type_ref(self.tb_str) # TODO: local recontruction of nested inception for a given diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 75dfb5cb..b7e39c36 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -20,7 +20,12 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. ''' from __future__ import annotations import asyncio -from asyncio.exceptions import CancelledError +from asyncio.exceptions import ( + CancelledError, +) +from asyncio import ( + QueueShutDown, +) from contextlib import asynccontextmanager as acm from dataclasses import dataclass import inspect @@ -34,12 +39,18 @@ from typing import ( import tractor from tractor._exceptions import ( + InternalError, is_multi_cancelled, + TrioTaskExited, + TrioCancelled, + AsyncioTaskExited, + AsyncioCancelled, ) from tractor._state import ( debug_mode, _runtime_vars, ) +from tractor._context import Unresolved from tractor.devx import _debug from tractor.log import ( get_logger, @@ -69,6 +80,21 @@ __all__ = [ ] +# TODO, generally speaking we can generalize this abstraction, a "SC linked +# parent->child task pair", as the same "supervision scope primitive" +# **that is** our `._context.Context` with the only difference being +# in how the tasks conduct msg-passing comms. +# +# For `LinkedTaskChannel` we are passing the equivalent of (once you +# include all the recently added `._trio/aio_to_raise` +# exd-as-signals) our SC-dialog-proto over each asyncIO framework's +# mem-chan impl, +# +# verus in `Context` +# +# We are doing the same thing but msg-passing comms happens over an +# IPC transport between tasks in different memory domains. + @dataclass class LinkedTaskChannel( trio.abc.Channel, @@ -84,18 +110,83 @@ class LinkedTaskChannel( ''' _to_aio: asyncio.Queue _from_aio: trio.MemoryReceiveChannel + _to_trio: trio.MemorySendChannel _trio_cs: trio.CancelScope _trio_task: trio.Task _aio_task_complete: trio.Event _trio_err: BaseException|None = None + _trio_to_raise: ( + AsyncioTaskExited| # aio task exits while trio ongoing + AsyncioCancelled| # aio task is (self-)cancelled + BaseException| + None + ) = None _trio_exited: bool = False - # set after ``asyncio.create_task()`` - # _aio_first: Any|None = None + # set after `asyncio.create_task()` _aio_task: asyncio.Task|None = None _aio_err: BaseException|None = None + _aio_to_raise: ( + TrioTaskExited| # trio task exits while aio ongoing + BaseException| + None + ) = None + # _aio_first: Any|None = None # TODO? + _aio_result: Any|Unresolved = Unresolved + + def _final_result_is_set(self) -> bool: + return self._aio_result is not Unresolved + + # TODO? equiv from `Context`? + # @property + # def has_outcome(self) -> bool: + # return ( + # bool(self.maybe_error) + # or + # self._final_result_is_set() + # ) + + async def wait_for_result( + self, + hide_tb: bool = True, + + ) -> Any: + ''' + Wait for the `asyncio.Task.result()` from `trio` + + ''' + __tracebackhide__: bool = hide_tb + assert self._portal, ( + '`Context.wait_for_result()` can not be called from callee side!' + ) + if self._final_result_is_set(): + return self._aio_result + + async with translate_aio_errors( + chan=self, + wait_aio_task=False, + ): + await self._aio_task_complete.wait() + + if ( + not self._final_result_is_set() + ): + if (trio_to_raise := self._trio_to_raise): + raise trio_to_raise from self._aio_err + + elif aio_err := self._aio_err: + raise aio_err + + else: + raise InternalError( + f'Asyncio-task has no result or error set !?\n' + f'{self._aio_task}' + ) + + return self._aio_result + _broadcaster: BroadcastReceiver|None = None async def aclose(self) -> None: @@ -137,7 +228,7 @@ class LinkedTaskChannel( return await self._from_aio.receive() except BaseException as err: async with translate_aio_errors( - self, + chan=self, # XXX: obviously this will deadlock if an on-going stream is # being procesed. @@ -154,8 +245,9 @@ class LinkedTaskChannel( ''' self._to_aio.put_nowait(item) - async def wait_aio_complete(self) -> None: - await self._aio_task_complete.wait() + # TODO? needed? + # async def wait_aio_complete(self) -> None: + # await self._aio_task_complete.wait() def cancel_asyncio_task( self, @@ -269,9 +361,8 @@ def _run_asyncio_task( ) -> None: ''' - Await `coro` and relay result back to `trio`. - - This can only be run as an `asyncio.Task`! + Await input `coro` as/in an `asyncio.Task` and deliver final + `return`-ed result back to `trio`. ''' nonlocal aio_err @@ -279,7 +370,8 @@ def _run_asyncio_task( orig = result = id(coro) try: - result = await coro + result: Any = await coro + chan._aio_result = result except BaseException as aio_err: chan._aio_err = aio_err if isinstance(aio_err, CancelledError): @@ -306,22 +398,42 @@ def _run_asyncio_task( to_trio.send_nowait(result) finally: - # breakpoint() - # import pdbp; pdbp.set_trace() - # if the task was spawned using `open_channel_from()` # then we close the channels on exit. if provide_channels: + # breakpoint() # TODO! why no work!? + # import pdbp; pdbp.set_trace() + + # IFF there is a blocked trio waiter, we set the + # aio-side error to be an explicit "exited early" + # (much like a `Return` in our SC IPC proto) for the + # `.open_channel_from()` case where the parent trio + # task might not wait directly for a final returned + # result (i.e. the trio side might be waiting on + # a streamed value) - this is a signal that the + # asyncio.Task has returned early! + # + # TODO, solve other cases where trio side might, + # - raise Cancelled but aio side exits on next tick. + # - raise error but aio side exits on next tick. + # - raise error and aio side errors "independently" + # on next tick (SEE draft HANDLER BELOW). + stats: trio.MemoryChannelStatistics = to_trio.statistics() + if stats.tasks_waiting_receive: + chan._trio_to_raise = AsyncioTaskExited( + f'Task existed with final result\n' + f'{result!r}\n' + ) + # only close the sender side which will relay - # a ``trio.EndOfChannel`` to the trio (consumer) side. + # a `trio.EndOfChannel` to the trio (consumer) side. to_trio.close() aio_task_complete.set() - # await asyncio.sleep(0.1) - log.info( - f'`asyncio` task terminated\n' - f'x)>\n' - f' |_{task}\n' + log.runtime( + f'`asyncio` task completed\n' + f')>\n' + f' |_{task}\n' ) # start the asyncio task we submitted from trio @@ -331,6 +443,7 @@ def _run_asyncio_task( f'{coro!r}' ) + # schedule the (bg) `asyncio.Task` task: asyncio.Task = asyncio.create_task( wait_on_coro_final_result( to_trio, @@ -359,16 +472,36 @@ def _run_asyncio_task( ) greenback.bestow_portal(task) - def cancel_trio( + def signal_trio_when_done( task: asyncio.Task, ) -> None: ''' - Cancel the parent `trio` task on any error raised by the - `asyncio` side. + Maybe-cancel, relay-and-raise an error to, OR pack a final + `return`-value for the parent (in SC terms) `trio.Task` on + completion of the `asyncio.Task`. + + Note for certain "edge" scheduling-race-conditions we allow + the aio side to dictate dedicated `tractor`-defined excs to + be raised in the `trio` parent task; the intention is to + indicate those races in a VERY pedantic manner! ''' nonlocal chan - relayed_aio_err: BaseException|None = chan._aio_err + trio_err: BaseException|None = chan._trio_err + + # XXX, since the original error we read from the asyncio.Task + # might change between BEFORE and AFTER we here call + # `asyncio.Task.result()` + # + # -> THIS is DUE TO US in `translate_aio_errors()`! + # + # => for example we might set a special exc + # (`AsyncioCancelled|AsyncioTaskExited`) meant to be raised + # in trio (and maybe absorbed depending on the called API) + # BEFORE this done-callback is invoked by `asyncio`'s + # runtime. + trio_to_raise: BaseException|None = chan._trio_to_raise + orig_aio_err: BaseException|None = chan._aio_err aio_err: BaseException|None = None # only to avoid `asyncio` complaining about uncaptured @@ -376,24 +509,42 @@ def _run_asyncio_task( try: res: Any = task.result() log.info( - '`trio` received final result from {task}\n' - f'|_{res}\n' + f'`trio` received final result from `asyncio` task,\n' + f')> {res}\n' + f' |_{task}\n' ) + # ?TODO, should we also raise `AsyncioTaskExited[res]` + # in any case where trio is NOT blocking on the + # `._to_trio` chan? + # + # -> ?NO RIGHT? since the + # `open_channel_from().__aexit__()` should detect this + # and then set any final `res` from above as a field + # that can optionally be read by the trio-paren-task as + # needed (just like in our + # `Context.wait_for_result()/.result` API yah? + # + # if provide_channels: + except BaseException as _aio_err: aio_err: BaseException = _aio_err - # read again AFTER the `asyncio` side errors in case + + # READ AGAIN, AFTER the `asyncio` side errors, in case # it was cancelled due to an error from `trio` (or # some other out of band exc) and then set to something # else? - relayed_aio_err: BaseException|None = chan._aio_err + curr_aio_err: BaseException|None = chan._aio_err # always true right? assert ( - type(_aio_err) is type(relayed_aio_err) + type(aio_err) + is type(orig_aio_err) + is type(curr_aio_err) ), ( f'`asyncio`-side task errors mismatch?!?\n\n' f'(caught) aio_err: {aio_err}\n' - f'chan._aio_err: {relayed_aio_err}\n' + f'ORIG chan._aio_err: {orig_aio_err}\n' + f'chan._aio_err: {curr_aio_err}\n' ) msg: str = ( @@ -401,7 +552,7 @@ def _run_asyncio_task( '{etype_str}\n' # ^NOTE filled in below ) - if isinstance(_aio_err, CancelledError): + if isinstance(aio_err, CancelledError): msg += ( f'c)>\n' f' |_{task}\n' @@ -409,6 +560,28 @@ def _run_asyncio_task( log.cancel( msg.format(etype_str='cancelled') ) + + # XXX when the asyncio.Task exits early (before the trio + # side) we relay through an exc-as-signal which is + # normally suppressed unless the trio.Task also errors + # + # ?TODO, is this even needed (does it happen) now? + elif isinstance(aio_err, QueueShutDown): + # import pdbp; pdbp.set_trace() + trio_err = AsyncioTaskExited( + 'Task exited before `trio` side' + ) + if not chan._trio_err: + chan._trio_err = trio_err + + msg += ( + f')>\n' + f' |_{task}\n' + ) + log.info( + msg.format(etype_str='exited') + ) + else: msg += ( f'x)>\n' @@ -418,13 +591,19 @@ def _run_asyncio_task( msg.format(etype_str='errored') ) + # is trio the src of the aio task's exc-as-outcome? trio_err: BaseException|None = chan._trio_err if ( - relayed_aio_err + curr_aio_err or trio_err + or + trio_to_raise ): - # import pdbp; pdbp.set_trace() + # XXX, if not already, ALWAYs cancel the trio-side on an + # aio-side error or early return. In the case where the trio task is + # blocking on a checkpoint or `asyncio.Queue.get()`. + # NOTE: currently mem chan closure may act as a form # of error relay (at least in the `asyncio.CancelledError` # case) since we have no way to directly trigger a `trio` @@ -432,30 +611,18 @@ def _run_asyncio_task( # We might want to change this in the future though. from_aio.close() - # wait, wut? - # aio_err.with_traceback(aio_err.__traceback__) - - # TODO: show when cancellation originated - # from each side more pedantically in log-msg? - # elif ( - # type(aio_err) is CancelledError - # and # trio was the cause? - # trio_cs.cancel_called - # ): - # log.cancel( - # 'infected task was cancelled by `trio`-side' - # ) - # raise aio_err from task_err - - # XXX: if not already, alway cancel the scope on a task - # error in case the trio task is blocking on - # a checkpoint. if ( not trio_cs.cancelled_caught or not trio_cs.cancel_called ): - # import pdbp; pdbp.set_trace() + log.cancel( + f'Cancelling `trio` side due to aio-side src exc\n' + f'{curr_aio_err}\n' + f'\n' + f'(c>\n' + f' |_{trio_task}\n' + ) trio_cs.cancel() # maybe the `trio` task errored independent from the @@ -478,28 +645,34 @@ def _run_asyncio_task( # for reproducing detailed edge cases as per the above # cases. # + trio_to_raise: AsyncioCancelled|AsyncioTaskExited = chan._trio_to_raise + aio_to_raise: TrioTaskExited|TrioCancelled = chan._aio_to_raise if ( not trio_cs.cancelled_caught - and - (trio_err := chan._trio_err) - and - type(trio_err) not in { - trio.Cancelled, - } and ( - aio_err - and - type(aio_err) not in { + (aio_err and type(aio_err) not in { asyncio.CancelledError - } + }) + or + aio_to_raise + ) + and ( + ((trio_err := chan._trio_err) and type(trio_err) not in { + trio.Cancelled, + }) + or + trio_to_raise ) ): eg = ExceptionGroup( 'Both the `trio` and `asyncio` tasks errored independently!!\n', - (trio_err, aio_err), + ( + trio_to_raise or trio_err, + aio_to_raise or aio_err, + ), ) - chan._trio_err = eg - chan._aio_err = eg + # chan._trio_err = eg + # chan._aio_err = eg raise eg elif aio_err: @@ -507,45 +680,24 @@ def _run_asyncio_task( # match the one we just caught from the task above! # (that would indicate something weird/very-wrong # going on?) - if aio_err is not relayed_aio_err: - raise aio_err from relayed_aio_err + if aio_err is not trio_to_raise: + # raise aio_err from relayed_aio_err + raise trio_to_raise from curr_aio_err raise aio_err - task.add_done_callback(cancel_trio) + task.add_done_callback(signal_trio_when_done) return chan -class AsyncioCancelled(Exception): - ''' - Asyncio cancelled translation (non-base) error - for use with the ``to_asyncio`` module - to be raised in the ``trio`` side task - - NOTE: this should NOT inherit from `asyncio.CancelledError` or - tests should break! - - ''' - - -class TrioTaskExited(AsyncioCancelled): - ''' - The `trio`-side task exited without explicitly cancelling the - `asyncio.Task` peer. - - This is very similar to how `trio.ClosedResource` acts as - a "clean shutdown" signal to the consumer side of a mem-chan, - - https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels - - ''' - - @acm async def translate_aio_errors( chan: LinkedTaskChannel, wait_on_aio_task: bool = False, cancel_aio_task_on_trio_exit: bool = True, + suppress_graceful_exits: bool = True, + + hide_tb: bool = True, ) -> AsyncIterator[None]: ''' @@ -558,17 +710,19 @@ async def translate_aio_errors( appropriately translates errors and cancels into ``trio`` land. ''' + __tracebackhide__: bool = hide_tb + trio_task = trio.lowlevel.current_task() - - aio_err: BaseException|None = None - + aio_err: BaseException|None = chan._aio_err aio_task: asyncio.Task = chan._aio_task assert aio_task trio_err: BaseException|None = None + to_raise_trio: BaseException|None = None try: yield # back to one of the cross-loop apis except trio.Cancelled as taskc: trio_err = taskc + chan._trio_err = trio_err # should NEVER be the case that `trio` is cancel-handling # BEFORE the other side's task-ref was set!? @@ -577,12 +731,12 @@ async def translate_aio_errors( # import pdbp; pdbp.set_trace() # lolevel-debug # relay cancel through to called `asyncio` task - chan._aio_err = AsyncioCancelled( + chan._aio_to_raise = TrioCancelled( f'trio`-side cancelled the `asyncio`-side,\n' f'c)>\n' - f' |_{trio_task}\n\n' - - f'{trio_err!r}\n' + f' |_{trio_task}\n' + f'\n' + f'trio src exc: {trio_err!r}\n' ) # XXX NOTE XXX seems like we can get all sorts of unreliable @@ -595,22 +749,32 @@ async def translate_aio_errors( # ) # raise + # XXX always passthrough EoC since this translator is often + # called from `LinkedTaskChannel.receive()` which we want + # passthrough and further we have no special meaning for it in + # terms of relaying errors or signals from the aio side! + except trio.EndOfChannel: + raise + # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio # task-done-callback. + # + # when the aio side is (possibly self-)cancelled it will close + # the `chan._to_trio` and thus trigger the trio side to raise + # a dedicated `AsyncioCancelled` except ( trio.ClosedResourceError, - # trio.BrokenResourceError, ) as cre: - trio_err = cre + chan._trio_err = cre aio_err = chan._aio_err - # import pdbp; pdbp.set_trace() # XXX if an underlying `asyncio.CancelledError` triggered # this channel close, raise our (non-`BaseException`) wrapper # exception (`AsyncioCancelled`) from that source error. if ( # aio-side is cancelled? - aio_task.cancelled() # not set until it terminates?? + # |_ first not set until it terminates?? + aio_task.cancelled() and type(aio_err) is CancelledError @@ -618,32 +782,26 @@ async def translate_aio_errors( # silent-exit-by-`trio` case? # -[ ] the parent task can also just catch it though? # -[ ] OR, offer a `signal_aio_side_on_exit=True` ?? - # - # or - # aio_err is None - # and - # chan._trio_exited - ): - raise AsyncioCancelled( + # await tractor.pause(shield=True) + chan._trio_to_raise = AsyncioCancelled( f'asyncio`-side cancelled the `trio`-side,\n' f'c(>\n' f' |_{aio_task}\n\n' - f'{trio_err!r}\n' - ) from aio_err + f'(triggered on the `trio`-side by a {cre!r})\n' + ) + # TODO?? needed or does this just get reraised in the + # `finally:` block below? + # raise to_raise_trio from aio_err # maybe the chan-closure is due to something else? else: - raise + raise cre except BaseException as _trio_err: - # await tractor.pause(shield=True) - trio_err = _trio_err - log.exception( - '`trio`-side task errored?' - ) - + trio_err = chan._trio_err = trio_err + # await tractor.pause(shield=True) # workx! entered: bool = await _debug._maybe_enter_pm( trio_err, api_frame=inspect.currentframe(), @@ -653,89 +811,177 @@ async def translate_aio_errors( and not is_multi_cancelled(trio_err) ): - log.exception('actor crashed\n') + log.exception( + '`trio`-side task errored?' + ) + # __tracebackhide__: bool = False - aio_taskc = AsyncioCancelled( - f'`trio`-side task errored!\n' - f'{trio_err}' - ) #from trio_err + # TODO, just a log msg here indicating the scope closed + # and that the trio-side expects that and what the final + # result from the aio side was? + # + # if isinstance(chan._aio_err, AsyncioTaskExited): + # await tractor.pause(shield=True) - try: - aio_task.set_exception(aio_taskc) - except ( - asyncio.InvalidStateError, - RuntimeError, - # ^XXX, uhh bc apparently we can't use `.set_exception()` - # any more XD .. ?? + # if aio side is still active cancel it due to the trio-side + # error! + # ?TODO, mk `AsyncioCancelled[typeof(trio_err)]` embed the + # current exc? + if ( + # not aio_task.cancelled() + # and + not aio_task.done() # TODO? only need this one? + + # XXX LOL, so if it's not set it's an error !? + # yet another good jerb by `ascyncio`.. + # and + # not aio_task.exception() ): + aio_taskc = TrioCancelled( + f'The `trio`-side task crashed!\n' + f'{trio_err}' + ) + aio_task.set_exception(aio_taskc) wait_on_aio_task = False - - # import pdbp; pdbp.set_trace() - # raise aio_taskc from trio_err + # try: + # aio_task.set_exception(aio_taskc) + # except ( + # asyncio.InvalidStateError, + # RuntimeError, + # # ^XXX, uhh bc apparently we can't use `.set_exception()` + # # any more XD .. ?? + # ): + # wait_on_aio_task = False finally: # record wtv `trio`-side error transpired - chan._trio_err = trio_err - ya_trio_exited: bool = chan._trio_exited + if trio_err: + if chan._trio_err is not trio_err: + await tractor.pause(shield=True) - # NOTE! by default always cancel the `asyncio` task if + # assert chan._trio_err is trio_err + + ya_trio_exited: bool = chan._trio_exited + graceful_trio_exit: bool = ( + ya_trio_exited + and + not chan._trio_err # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man.. + ) + + # XXX NOTE! XXX by default always cancel the `asyncio` task if # we've made it this far and it's not done. # TODO, how to detect if there's an out-of-band error that # caused the exit? if ( - cancel_aio_task_on_trio_exit - and not aio_task.done() - and - aio_err + and ( + cancel_aio_task_on_trio_exit + # and + # chan._aio_err # TODO, if it's not .done() is this possible? - # or the trio side has exited it's surrounding cancel scope - # indicating the lifetime of the ``asyncio``-side task - # should also be terminated. - or ( - ya_trio_exited - and - not chan._trio_err # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man.. + # did the `.open_channel_from()` parent caller already + # (gracefully) exit scope before this translator was + # invoked? + # => since we couple the lifetime of the `asyncio.Task` + # to the `trio` parent task, it should should also be + # terminated via either, + # + # 1. raising an explicit `TrioTaskExited|TrioCancelled` + # in task via `asyncio.Task._fut_waiter.set_exception()` + # + # 2. or (worst case) by cancelling the aio task using + # the std-but-never-working `asyncio.Task.cancel()` + # (which i can't figure out why that nor + # `Task.set_exception()` seem to never ever do the + # rignt thing! XD). + or + graceful_trio_exit ) ): report: str = ( 'trio-side exited silently!' ) - assert not aio_err, 'WTF how did asyncio do this?!' + assert not chan._aio_err, ( + 'WTF why duz asyncio have err but not dun?!' + ) - # if the `trio.Task` already exited the `open_channel_from()` - # block we ensure the asyncio-side gets signalled via an - # explicit exception and its `Queue` is shutdown. + # if the `trio.Task` terminated without raising + # `trio.Cancelled` (curently handled above) there's + # 2 posibilities, + # + # i. it raised a `trio_err` + # ii. it did a "silent exit" where the + # `open_channel_from().__aexit__()` phase ran without + # any raise or taskc (task cancel) and no final result + # was collected (yet) from the aio side. + # + # SO, ensure the asyncio-side is notified and terminated + # by a dedicated exc-as-signal which distinguishes + # various aio-task-state at termination cases. + # + # Consequently if the aio task doesn't absorb said + # exc-as-signal, the trio side should then see the same exc + # propagate up through the .open_channel_from() call to + # the parent task. + # + # if the `trio.Task` already exited (only can happen for + # the `open_channel_from()` use case) block due to to + # either plain ol' graceful `__aexit__()` or due to taskc + # or an error, we ensure the aio-side gets signalled via + # an explicit exception and its `Queue` is shutdown. if ya_trio_exited: + # raise `QueueShutDown` on next `Queue.get()` call on + # aio side. chan._to_aio.shutdown() - # pump the other side's task? needed? + # pump this event-loop (well `Runner` but ya) + # + # TODO? is this actually needed? + # -[ ] theory is this let's the aio side error on + # next tick and then we sync task states from + # here onward? await trio.lowlevel.checkpoint() - # from tractor._state import is_root_process - # if is_root_process(): - # breakpoint() - + # TODO? factor the next 2 branches into a func like + # `try_terminate_aio_task()` and use it for the taskc + # case above as well? + fut: asyncio.Future|None = aio_task._fut_waiter if ( - not chan._trio_err + fut and - (fut := aio_task._fut_waiter) + not fut.done() ): - # await trio.lowlevel.checkpoint() - # import pdbp; pdbp.set_trace() - fut.set_exception( - TrioTaskExited( - f'The peer `asyncio` task is still blocking/running?\n' - f'>>\n' - f'|_{aio_task!r}\n' + # await tractor.pause() + if graceful_trio_exit: + fut.set_exception( + TrioTaskExited( + f'the `trio.Task` gracefully exited but ' + f'its `asyncio` peer is not done?\n' + f')>\n' + f' |_{trio_task}\n' + f'\n' + f'>>\n' + f' |_{aio_task!r}\n' + ) + ) + + # TODO? should this need to exist given the equiv + # `TrioCancelled` equivalent in the be handler + # above?? + else: + fut.set_exception( + TrioTaskExited( + f'The `trio`-side task crashed!\n' + f'{trio_err}' + ) ) - ) else: aio_taskc_warn: str = ( f'\n' f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n' f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n' ) + # await tractor.pause() report += aio_taskc_warn # TODO XXX, figure out the case where calling this makes the # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits` @@ -745,19 +991,17 @@ async def translate_aio_errors( log.warning(report) - # Required to sync with the far end `asyncio`-task to ensure - # any error is captured (via monkeypatching the - # `channel._aio_err`) before calling ``maybe_raise_aio_err()`` - # below! + # sync with the `asyncio.Task`'s completion to ensure any + # error is captured and relayed (via + # `channel._aio_err/._trio_to_raise`) BEFORE calling + # `maybe_raise_aio_side_err()` below! # - # XXX NOTE XXX the `task.set_exception(aio_taskc)` call above - # MUST NOT EXCEPT or this WILL HANG!! - # - # so if you get a hang maybe step through and figure out why - # it erroed out up there! + # XXX WARNING NOTE + # the `task.set_exception(aio_taskc)` call above MUST NOT + # EXCEPT or this WILL HANG!! SO, if you get a hang maybe step + # through and figure out why it erroed out up there! # if wait_on_aio_task: - # await chan.wait_aio_complete() await chan._aio_task_complete.wait() log.info( 'asyncio-task is done and unblocked trio-side!\n' @@ -778,31 +1022,86 @@ async def translate_aio_errors( ''' aio_err: BaseException|None = chan._aio_err + trio_to_raise: ( + AsyncioCancelled| + AsyncioTaskExited| + None + ) = chan._trio_to_raise + + if ( + trio_to_raise + and + suppress_graceful_exits + ): + # import pdbp; pdbp.set_trace() + match ( + trio_to_raise, + trio_err, + ): + case ( + AsyncioTaskExited(), + None|trio.Cancelled(), + ): + log.info( + 'Ignoring aio exit signal since trio also exited!' + ) + return + + case ( + AsyncioCancelled(), + trio.Cancelled(), + ): + log.info( + 'Ignoring aio cancelled signal since trio was also cancelled!' + ) + return + + raise trio_to_raise from (aio_err or trio_err) # Check if the asyncio-side is the cause of the trio-side # error. - if ( + elif ( aio_err is not None and type(aio_err) is not AsyncioCancelled + # and ( + # type(aio_err) is not AsyncioTaskExited + # and + # not ya_trio_exited + # and + # not trio_err + # ) - # not isinstance(aio_err, CancelledError) - # type(aio_err) is not CancelledError + # TODO, case where trio_err is not None and + # aio_err is AsyncioTaskExited => raise eg! + # -[ ] maybe use a match bc this get's real + # complex fast XD + # + # or + # type(aio_err) is not AsyncioTaskExited + # and + # trio_err + # ) ): # always raise from any captured asyncio error if trio_err: raise trio_err from aio_err + # XXX NOTE! above in the `trio.ClosedResourceError` + # handler we specifically set the + # `aio_err = AsyncioCancelled` such that it is raised + # as that special exc here! raise aio_err if trio_err: raise trio_err + # await tractor.pause() # NOTE: if any ``asyncio`` error was caught, raise it here inline # here in the ``trio`` task # if trio_err: maybe_raise_aio_side_err( - trio_err=trio_err + trio_err=to_raise_trio or trio_err ) @@ -831,11 +1130,13 @@ async def run_task( wait_on_aio_task=True, ): # return single value that is the output from the - # ``asyncio`` function-as-task. Expect the mem chan api to - # do the job of handling cross-framework cancellations + # ``asyncio`` function-as-task. Expect the mem chan api + # to do the job of handling cross-framework cancellations # / errors via closure and translation in the - # ``translate_aio_errors()`` in the above ctx mngr. - return await chan.receive() + # `translate_aio_errors()` in the above ctx mngr. + + return await chan._from_aio.receive() + # return await chan.receive() @acm @@ -873,17 +1174,37 @@ async def open_channel_from( except trio.Cancelled as taskc: # await tractor.pause(shield=True) # ya it worx ;) if cs.cancel_called: - log.cancel( - f'trio-side was manually cancelled by aio side\n' - f'|_c>}}{cs!r}?\n' - ) + if isinstance(chan._trio_to_raise, AsyncioCancelled): + log.cancel( + f'trio-side was manually cancelled by aio side\n' + f'|_c>}}{cs!r}?\n' + ) # TODO, maybe a special `TrioCancelled`??? raise taskc finally: chan._trio_exited = True - chan._to_trio.close() + + # when the aio side is still ongoing but trio exits + # early we signal with a special exc (kinda like + # a `Return`-msg for IPC ctxs) + aio_task: asyncio.Task = chan._aio_task + if not aio_task.done(): + fut: asyncio.Future|None = aio_task._fut_waiter + if fut: + fut.set_exception( + TrioTaskExited( + f'but the child `asyncio` task is still running?\n' + f'>>\n' + f' |_{aio_task!r}\n' + ) + ) + else: + # XXX SHOULD NEVER HAPPEN! + await tractor.pause() + else: + chan._to_trio.close() class AsyncioRuntimeTranslationError(RuntimeError):