From d6a0c515ec7a811087cdd7bd61ce5ec34ffd9856 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Mar 2025 21:25:05 -0500 Subject: [PATCH] Add per-side graceful-exit/cancel excs-as-signals Such that any combination of task terminations/exits can be explicitly handled and "dual side independent" crash cases re-raised in egs. The main error-or-exit impl changes include, - use of new per-side "signaling exceptions": - TrioTaskExited|TrioCancelled for signalling aio. - AsyncioTaskExited|AsyncioCancelled for signalling trio. - NOT overloading the `LinkedTaskChannel._trio/aio_err` fields for err-as-signal relay and instead add a new pair of `._trio/aio_to_raise` maybe-exc-attrs which allow each side's task to specify what it would want the other side to raise to signal its/a termination outcome: - `._trio_to_raise: AsyncioTaskExited|AsyncioCancelled` to signal, |_ the aio task having returned while the trio side was still reading from the `asyncio.Queue` or is just not `.done()`. |_ the aio task being self or trio-request cancelled where a `asyncio.CancelledError` is raised and caught but NOT relayed as is back to trio; instead signal a "more explicit" exc type. - `._aio_to_raise: TrioTaskExited|TrioCancelled` to signal, |_ the trio task having returned while the aio side was still reading from the mem chan and indicating that the trio side might not care any more about future streamed values (like the `Stop/EndOfChannel` equivs for ipc `Context`s). |_ when the trio task canceld we do a `asyncio.Future.set_exception(TrioTaskExited())` to indicate to the aio side verbosely that it should cancel due to the trio parent. - `_aio/trio_err` are now left to only capturing the **actual** per-side task excs for introspection / other side's handling logic. - supporting "graceful exits" depending on API in use from `translate_aio_errors()` such that if either side exits but the other side isn't expect to consume the final `return`ed value, we just exit silently, which required: - adding a `suppress_graceful_exits: bool` flag. - adjusting the `maybe_raise_aio_side_err()` logic to use that flag and suppress only on certain combos of `._trio_to_raise/._trio_err`. - prefer to raise `._trio_to_raise` when the aio-side is the src and vice versa. - filling out pedantic logging for cancellation cases indicating which side is the cause. - add a `LinkedTaskChannel._aio_result` modelled after our `Context._result` a a similar `.wait_for_result()` interface which allows maybe accessing the aio task's final return value if desired when using the `open_channel_from()` API. - rename `cancel_trio()` done handler -> `signal_trio_when_done()` Also some fairly major test suite updates, - add a `delay: int` producing fixture which delivers a much larger timeout whenever `debug_mode` is set so that the REPL can be used without a surrounding cancel firing. - add a new `test_aio_exits_early_relays_AsyncioTaskExited` including a paired `exit_early: bool` flag to `push_from_aio_task()`. - adjust `test_trio_closes_early_causes_aio_checkpoint_raise` to expect a `to_asyncio.TrioTaskExited`. --- tests/test_infected_asyncio.py | 237 ++++++++--- tractor/_exceptions.py | 52 ++- tractor/to_asyncio.py | 740 ++++++++++++++++++++++++--------- 3 files changed, 775 insertions(+), 254 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index d462f59..7787756 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -32,6 +32,17 @@ from tractor.trionics import BroadcastReceiver from tractor._testing import expect_ctxc +@pytest.fixture( + scope='module', + # autouse=True, +) +def delay(debug_mode: bool) -> int: + if debug_mode: + return 999 + else: + return 1 + + async def sleep_and_err( sleep_for: float = 0.1, @@ -59,20 +70,24 @@ async def trio_cancels_single_aio_task(): await tractor.to_asyncio.run_task(aio_sleep_forever) -def test_trio_cancels_aio_on_actor_side(reg_addr): +def test_trio_cancels_aio_on_actor_side( + reg_addr: tuple[str, int], + delay: int, +): ''' Spawn an infected actor that is cancelled by the ``trio`` side task using std cancel scope apis. ''' async def main(): - async with tractor.open_nursery( - registry_addrs=[reg_addr] - ) as n: - await n.run_in_actor( - trio_cancels_single_aio_task, - infect_asyncio=True, - ) + with trio.fail_after(1 + delay): + async with tractor.open_nursery( + registry_addrs=[reg_addr] + ) as n: + await n.run_in_actor( + trio_cancels_single_aio_task, + infect_asyncio=True, + ) trio.run(main) @@ -116,7 +131,9 @@ async def asyncio_actor( raise -def test_aio_simple_error(reg_addr): +def test_aio_simple_error( + reg_addr: tuple[str, int], +): ''' Verify a simple remote asyncio error propagates back through trio to the parent actor. @@ -153,7 +170,9 @@ def test_aio_simple_error(reg_addr): assert err.boxed_type is AssertionError -def test_tractor_cancels_aio(reg_addr): +def test_tractor_cancels_aio( + reg_addr: tuple[str, int], +): ''' Verify we can cancel a spawned asyncio task gracefully. @@ -172,7 +191,9 @@ def test_tractor_cancels_aio(reg_addr): trio.run(main) -def test_trio_cancels_aio(reg_addr): +def test_trio_cancels_aio( + reg_addr: tuple[str, int], +): ''' Much like the above test with ``tractor.Portal.cancel_actor()`` except we just use a standard ``trio`` cancellation api. @@ -203,7 +224,8 @@ async def trio_ctx( # this will block until the ``asyncio`` task sends a "first" # message. - with trio.fail_after(2): + delay: int = 999 if tractor.debug_mode() else 1 + with trio.fail_after(1 + delay): try: async with ( trio.open_nursery( @@ -239,7 +261,8 @@ async def trio_ctx( ids='parent_actor_cancels_child={}'.format ) def test_context_spawns_aio_task_that_errors( - reg_addr, + reg_addr: tuple[str, int], + delay: int, parent_cancels: bool, ): ''' @@ -249,7 +272,7 @@ def test_context_spawns_aio_task_that_errors( ''' async def main(): - with trio.fail_after(2): + with trio.fail_after(1 + delay): async with tractor.open_nursery() as n: p = await n.start_actor( 'aio_daemon', @@ -322,11 +345,12 @@ async def aio_cancel(): def test_aio_cancelled_from_aio_causes_trio_cancelled( reg_addr: tuple, + delay: int, ): ''' - When the `asyncio.Task` cancels itself the `trio` side cshould + When the `asyncio.Task` cancels itself the `trio` side should also cancel and teardown and relay the cancellation cross-process - to the caller (parent). + to the parent caller. ''' async def main(): @@ -342,7 +366,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled( # NOTE: normally the `an.__aexit__()` waits on the # portal's result but we do it explicitly here # to avoid indent levels. - with trio.fail_after(1): + with trio.fail_after(1 + delay): await p.wait_for_result() with pytest.raises( @@ -353,11 +377,10 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled( # might get multiple `trio.Cancelled`s as well inside an inception err: RemoteActorError|ExceptionGroup = excinfo.value if isinstance(err, ExceptionGroup): - err = next(itertools.dropwhile( - lambda exc: not isinstance(exc, tractor.RemoteActorError), - err.exceptions - )) - assert err + excs = err.exceptions + assert len(excs) == 1 + final_exc = excs[0] + assert isinstance(final_exc, tractor.RemoteActorError) # relayed boxed error should be our `trio`-task's # cancel-signal-proxy-equivalent of `asyncio.CancelledError`. @@ -370,15 +393,18 @@ async def no_to_trio_in_args(): async def push_from_aio_task( - sequence: Iterable, to_trio: trio.abc.SendChannel, expect_cancel: False, fail_early: bool, + exit_early: bool, ) -> None: try: + # print('trying breakpoint') + # breakpoint() + # sync caller ctx manager to_trio.send_nowait(True) @@ -387,10 +413,27 @@ async def push_from_aio_task( to_trio.send_nowait(i) await asyncio.sleep(0.001) - if i == 50 and fail_early: - raise Exception + if ( + i == 50 + ): + if fail_early: + print('Raising exc from aio side!') + raise Exception - print('asyncio streamer complete!') + if exit_early: + # TODO? really you could enforce the same + # SC-proto we use for actors here with asyncio + # such that a Return[None] msg would be + # implicitly delivered to the trio side? + # + # XXX => this might be the end-all soln for + # converting any-inter-task system (regardless + # of maybe-remote runtime or language) to be + # SC-compat no? + print(f'asyncio breaking early @ {i!r}') + break + + print('asyncio streaming complete!') except asyncio.CancelledError: if not expect_cancel: @@ -402,9 +445,10 @@ async def push_from_aio_task( async def stream_from_aio( - exit_early: bool = False, - raise_err: bool = False, + trio_exit_early: bool = False, + trio_raise_err: bool = False, aio_raise_err: bool = False, + aio_exit_early: bool = False, fan_out: bool = False, ) -> None: @@ -417,8 +461,17 @@ async def stream_from_aio( async with to_asyncio.open_channel_from( push_from_aio_task, sequence=seq, - expect_cancel=raise_err or exit_early, + expect_cancel=trio_raise_err or trio_exit_early, fail_early=aio_raise_err, + exit_early=aio_exit_early, + + # such that we can test exit early cases + # for each side explicitly. + suppress_graceful_exits=(not( + aio_exit_early + or + trio_exit_early + )) ) as (first, chan): @@ -435,9 +488,9 @@ async def stream_from_aio( pulled.append(value) if value == 50: - if raise_err: + if trio_raise_err: raise Exception - elif exit_early: + elif trio_exit_early: print('`consume()` breaking early!\n') break @@ -471,10 +524,14 @@ async def stream_from_aio( finally: - if ( - not raise_err and - not exit_early and - not aio_raise_err + if not ( + trio_raise_err + or + trio_exit_early + or + aio_raise_err + or + aio_exit_early ): if fan_out: # we get double the pulled values in the @@ -484,6 +541,7 @@ async def stream_from_aio( assert list(sorted(pulled)) == expect else: + # await tractor.pause() assert pulled == expect else: assert not fan_out @@ -497,7 +555,10 @@ async def stream_from_aio( 'fan_out', [False, True], ids='fan_out_w_chan_subscribe={}'.format ) -def test_basic_interloop_channel_stream(reg_addr, fan_out): +def test_basic_interloop_channel_stream( + reg_addr: tuple[str, int], + fan_out: bool, +): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -517,7 +578,7 @@ def test_trio_error_cancels_intertask_chan(reg_addr): async with tractor.open_nursery() as n: portal = await n.run_in_actor( stream_from_aio, - raise_err=True, + trio_raise_err=True, infect_asyncio=True, ) # should trigger remote actor error @@ -530,42 +591,114 @@ def test_trio_error_cancels_intertask_chan(reg_addr): excinfo.value.boxed_type is Exception -def test_trio_closes_early_and_channel_exits( +def test_trio_closes_early_causes_aio_checkpoint_raise( reg_addr: tuple[str, int], + delay: int, ): ''' - Check that if the `trio`-task "exits early" on `async for`ing the - inter-task-channel (via a `break`) we exit silently from the - `open_channel_from()` block and get a final `Return[None]` msg. + Check that if the `trio`-task "exits early and silently" (in this + case during `async for`-ing the inter-task-channel via + a `break`-from-loop), we raise `TrioTaskExited` on the + `asyncio`-side which also then bubbles up through the + `open_channel_from()` block indicating that the `asyncio.Task` + hit a ran another checkpoint despite the `trio.Task` exit. ''' async def main(): - with trio.fail_after(2): + with trio.fail_after(1 + delay): async with tractor.open_nursery( # debug_mode=True, # enable_stack_on_sig=True, ) as n: portal = await n.run_in_actor( stream_from_aio, - exit_early=True, + trio_exit_early=True, infect_asyncio=True, ) # should raise RAE diectly print('waiting on final infected subactor result..') res: None = await portal.wait_for_result() assert res is None - print('infected subactor returned result: {res!r}\n') + print(f'infected subactor returned result: {res!r}\n') # should be a quiet exit on a simple channel exit - trio.run( - main, - # strict_exception_groups=False, - ) + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # ensure remote error is an explicit `AsyncioCancelled` sub-type + # which indicates to the aio task that the trio side exited + # silently WITHOUT raising a `trio.Cancelled` (which would + # normally be raised instead as a `AsyncioCancelled`). + excinfo.value.boxed_type is to_asyncio.TrioTaskExited -def test_aio_errors_and_channel_propagates_and_closes(reg_addr): +def test_aio_exits_early_relays_AsyncioTaskExited( + # TODO, parametrize the 3 possible trio side conditions: + # - trio blocking on receive, aio exits early + # - trio cancelled AND aio exits early on its next tick + # - trio errors AND aio exits early on its next tick + reg_addr: tuple[str, int], + debug_mode: bool, + delay: int, +): + ''' + Check that if the `asyncio`-task "exits early and silently" (in this + case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel` + it `break`s from the loop), we raise `AsyncioTaskExited` on the + `trio`-side which then DOES NOT BUBBLE up through the + `open_channel_from()` block UNLESS, + + - the trio.Task also errored/cancelled, in which case we wrap + both errors in an eg + - the trio.Task was blocking on rxing a value from the + `InterLoopTaskChannel`. + + ''' async def main(): - async with tractor.open_nursery() as n: + with trio.fail_after(1 + delay): + async with tractor.open_nursery( + debug_mode=debug_mode, + # enable_stack_on_sig=True, + ) as an: + portal = await an.run_in_actor( + stream_from_aio, + infect_asyncio=True, + trio_exit_early=False, + aio_exit_early=True, + ) + # should raise RAE diectly + print('waiting on final infected subactor result..') + res: None = await portal.wait_for_result() + assert res is None + print(f'infected subactor returned result: {res!r}\n') + + # should be a quiet exit on a simple channel exit + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + exc = excinfo.value + + # TODO, wow bug! + # -[ ] bp handler not replaced!?!? + # breakpoint() + + # import pdbp; pdbp.set_trace() + + # ensure remote error is an explicit `AsyncioCancelled` sub-type + # which indicates to the aio task that the trio side exited + # silently WITHOUT raising a `trio.Cancelled` (which would + # normally be raised instead as a `AsyncioCancelled`). + assert exc.boxed_type is to_asyncio.AsyncioTaskExited + + +def test_aio_errors_and_channel_propagates_and_closes( + reg_addr: tuple[str, int], + debug_mode: bool, +): + async def main(): + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as n: portal = await n.run_in_actor( stream_from_aio, aio_raise_err=True, @@ -852,6 +985,8 @@ def test_sigint_closes_lifetime_stack( ''' async def main(): + + delay = 999 if tractor.debug_mode() else 1 try: an: tractor.ActorNursery async with tractor.open_nursery( @@ -902,7 +1037,7 @@ def test_sigint_closes_lifetime_stack( if wait_for_ctx: print('waiting for ctx outcome in parent..') try: - with trio.fail_after(1): + with trio.fail_after(1 + delay): await ctx.wait_for_result() except tractor.ContextCancelled as ctxc: assert ctxc.canceller == ctx.chan.uid diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index b4386db..3382be1 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -82,6 +82,39 @@ class InternalError(RuntimeError): ''' +class AsyncioCancelled(Exception): + ''' + Asyncio cancelled translation (non-base) error + for use with the ``to_asyncio`` module + to be raised in the ``trio`` side task + + NOTE: this should NOT inherit from `asyncio.CancelledError` or + tests should break! + + ''' + + +class AsyncioTaskExited(Exception): + ''' + asyncio.Task "exited" translation error for use with the + `to_asyncio` APIs to be raised in the `trio` side task indicating + on `.run_task()`/`.open_channel_from()` exit that the aio side + exited early/silently. + + ''' + +class TrioTaskExited(AsyncioCancelled): + ''' + The `trio`-side task exited without explicitly cancelling the + `asyncio.Task` peer. + + This is very similar to how `trio.ClosedResource` acts as + a "clean shutdown" signal to the consumer side of a mem-chan, + + https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels + + ''' + # NOTE: more or less should be close to these: # 'boxed_type', @@ -127,8 +160,8 @@ _body_fields: list[str] = list( def get_err_type(type_name: str) -> BaseException|None: ''' - Look up an exception type by name from the set of locally - known namespaces: + Look up an exception type by name from the set of locally known + namespaces: - `builtins` - `tractor._exceptions` @@ -358,6 +391,13 @@ class RemoteActorError(Exception): self._ipc_msg.src_type_str ) + if not self._src_type: + raise TypeError( + f'Failed to lookup src error type with ' + f'`tractor._exceptions.get_err_type()` :\n' + f'{self.src_type_str}' + ) + return self._src_type @property @@ -652,16 +692,10 @@ class RemoteActorError(Exception): failing actor's remote env. ''' - src_type_ref: Type[BaseException] = self.src_type - if not src_type_ref: - raise TypeError( - 'Failed to lookup src error type:\n' - f'{self.src_type_str}' - ) - # TODO: better tb insertion and all the fancier dunder # metadata stuff as per `.__context__` etc. and friends: # https://github.com/python-trio/trio/issues/611 + src_type_ref: Type[BaseException] = self.src_type return src_type_ref(self.tb_str) # TODO: local recontruction of nested inception for a given diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 75dfb5c..f65cc7e 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -20,7 +20,12 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. ''' from __future__ import annotations import asyncio -from asyncio.exceptions import CancelledError +from asyncio.exceptions import ( + CancelledError, +) +from asyncio import ( + QueueShutDown, +) from contextlib import asynccontextmanager as acm from dataclasses import dataclass import inspect @@ -34,12 +39,18 @@ from typing import ( import tractor from tractor._exceptions import ( + InternalError, is_multi_cancelled, + TrioTaskExited, + TrioCancelled, + AsyncioTaskExited, + AsyncioCancelled, ) from tractor._state import ( debug_mode, _runtime_vars, ) +from tractor._context import Unresolved from tractor.devx import _debug from tractor.log import ( get_logger, @@ -69,6 +80,21 @@ __all__ = [ ] +# TODO, generally speaking we can generalize this abstraction, a "SC linked +# parent->child task pair", as the same "supervision scope primitive" +# **that is** our `._context.Context` with the only difference being +# in how the tasks conduct msg-passing comms. +# +# For `LinkedTaskChannel` we are passing the equivalent of (once you +# include all the recently added `._trio/aio_to_raise` +# exd-as-signals) our SC-dialog-proto over each asyncIO framework's +# mem-chan impl, +# +# verus in `Context` +# +# We are doing the same thing but msg-passing comms happens over an +# IPC transport between tasks in different memory domains. + @dataclass class LinkedTaskChannel( trio.abc.Channel, @@ -84,18 +110,85 @@ class LinkedTaskChannel( ''' _to_aio: asyncio.Queue _from_aio: trio.MemoryReceiveChannel + _to_trio: trio.MemorySendChannel _trio_cs: trio.CancelScope _trio_task: trio.Task _aio_task_complete: trio.Event + _suppress_graceful_exits: bool = True + _trio_err: BaseException|None = None + _trio_to_raise: ( + AsyncioTaskExited| # aio task exits while trio ongoing + AsyncioCancelled| # aio task is (self-)cancelled + BaseException| + None + ) = None _trio_exited: bool = False - # set after ``asyncio.create_task()`` - # _aio_first: Any|None = None + # set after `asyncio.create_task()` _aio_task: asyncio.Task|None = None _aio_err: BaseException|None = None + _aio_to_raise: ( + TrioTaskExited| # trio task exits while aio ongoing + BaseException| + None + ) = None + # _aio_first: Any|None = None # TODO? + _aio_result: Any|Unresolved = Unresolved + + def _final_result_is_set(self) -> bool: + return self._aio_result is not Unresolved + + # TODO? equiv from `Context`? + # @property + # def has_outcome(self) -> bool: + # return ( + # bool(self.maybe_error) + # or + # self._final_result_is_set() + # ) + + async def wait_for_result( + self, + hide_tb: bool = True, + + ) -> Any: + ''' + Wait for the `asyncio.Task.result()` from `trio` + + ''' + __tracebackhide__: bool = hide_tb + assert self._portal, ( + '`Context.wait_for_result()` can not be called from callee side!' + ) + if self._final_result_is_set(): + return self._aio_result + + async with translate_aio_errors( + chan=self, + wait_aio_task=False, + ): + await self._aio_task_complete.wait() + + if ( + not self._final_result_is_set() + ): + if (trio_to_raise := self._trio_to_raise): + raise trio_to_raise from self._aio_err + + elif aio_err := self._aio_err: + raise aio_err + + else: + raise InternalError( + f'Asyncio-task has no result or error set !?\n' + f'{self._aio_task}' + ) + + return self._aio_result + _broadcaster: BroadcastReceiver|None = None async def aclose(self) -> None: @@ -137,7 +230,9 @@ class LinkedTaskChannel( return await self._from_aio.receive() except BaseException as err: async with translate_aio_errors( - self, + chan=self, + # NOTE, determined by `open_channel_from()` input arg + suppress_graceful_exits=self._suppress_graceful_exits, # XXX: obviously this will deadlock if an on-going stream is # being procesed. @@ -154,8 +249,9 @@ class LinkedTaskChannel( ''' self._to_aio.put_nowait(item) - async def wait_aio_complete(self) -> None: - await self._aio_task_complete.wait() + # TODO? needed? + # async def wait_aio_complete(self) -> None: + # await self._aio_task_complete.wait() def cancel_asyncio_task( self, @@ -208,6 +304,7 @@ def _run_asyncio_task( *, qsize: int = 1, provide_channels: bool = False, + suppress_graceful_exits: bool = True, hide_tb: bool = False, **kwargs, @@ -260,6 +357,7 @@ def _run_asyncio_task( _trio_cs=trio_cs, _trio_task=trio_task, _aio_task_complete=aio_task_complete, + _suppress_graceful_exits=suppress_graceful_exits, ) async def wait_on_coro_final_result( @@ -269,17 +367,16 @@ def _run_asyncio_task( ) -> None: ''' - Await `coro` and relay result back to `trio`. - - This can only be run as an `asyncio.Task`! + Await input `coro` as/in an `asyncio.Task` and deliver final + `return`-ed result back to `trio`. ''' - nonlocal aio_err nonlocal chan orig = result = id(coro) try: - result = await coro + result: Any = await coro + chan._aio_result = result except BaseException as aio_err: chan._aio_err = aio_err if isinstance(aio_err, CancelledError): @@ -291,7 +388,6 @@ def _run_asyncio_task( '`asyncio` task errored\n' ) raise - else: if ( result != orig @@ -306,22 +402,46 @@ def _run_asyncio_task( to_trio.send_nowait(result) finally: - # breakpoint() - # import pdbp; pdbp.set_trace() - # if the task was spawned using `open_channel_from()` # then we close the channels on exit. if provide_channels: + # breakpoint() # TODO! why no work!? + # import pdbp; pdbp.set_trace() + + # IFF there is a blocked trio waiter, we set the + # aio-side error to be an explicit "exited early" + # (much like a `Return` in our SC IPC proto) for the + # `.open_channel_from()` case where the parent trio + # task might not wait directly for a final returned + # result (i.e. the trio side might be waiting on + # a streamed value) - this is a signal that the + # asyncio.Task has returned early! + # + # TODO, solve other cases where trio side might, + # - raise Cancelled but aio side exits on next tick. + # - raise error but aio side exits on next tick. + # - raise error and aio side errors "independently" + # on next tick (SEE draft HANDLER BELOW). + stats: trio.MemoryChannelStatistics = to_trio.statistics() + if ( + stats.tasks_waiting_receive + and + not chan._aio_err + ): + chan._trio_to_raise = AsyncioTaskExited( + f'Task existed with final result\n' + f'{result!r}\n' + ) + # only close the sender side which will relay - # a ``trio.EndOfChannel`` to the trio (consumer) side. + # a `trio.EndOfChannel` to the trio (consumer) side. to_trio.close() aio_task_complete.set() - # await asyncio.sleep(0.1) - log.info( - f'`asyncio` task terminated\n' - f'x)>\n' - f' |_{task}\n' + log.runtime( + f'`asyncio` task completed\n' + f')>\n' + f' |_{task}\n' ) # start the asyncio task we submitted from trio @@ -331,6 +451,7 @@ def _run_asyncio_task( f'{coro!r}' ) + # schedule the (bg) `asyncio.Task` task: asyncio.Task = asyncio.create_task( wait_on_coro_final_result( to_trio, @@ -359,16 +480,36 @@ def _run_asyncio_task( ) greenback.bestow_portal(task) - def cancel_trio( + def signal_trio_when_done( task: asyncio.Task, ) -> None: ''' - Cancel the parent `trio` task on any error raised by the - `asyncio` side. + Maybe-cancel, relay-and-raise an error to, OR pack a final + `return`-value for the parent (in SC terms) `trio.Task` on + completion of the `asyncio.Task`. + + Note for certain "edge" scheduling-race-conditions we allow + the aio side to dictate dedicated `tractor`-defined excs to + be raised in the `trio` parent task; the intention is to + indicate those races in a VERY pedantic manner! ''' nonlocal chan - relayed_aio_err: BaseException|None = chan._aio_err + trio_err: BaseException|None = chan._trio_err + + # XXX, since the original error we read from the asyncio.Task + # might change between BEFORE and AFTER we here call + # `asyncio.Task.result()` + # + # -> THIS is DUE TO US in `translate_aio_errors()`! + # + # => for example we might set a special exc + # (`AsyncioCancelled|AsyncioTaskExited`) meant to be raised + # in trio (and maybe absorbed depending on the called API) + # BEFORE this done-callback is invoked by `asyncio`'s + # runtime. + trio_to_raise: BaseException|None = chan._trio_to_raise + orig_aio_err: BaseException|None = chan._aio_err aio_err: BaseException|None = None # only to avoid `asyncio` complaining about uncaptured @@ -376,24 +517,45 @@ def _run_asyncio_task( try: res: Any = task.result() log.info( - '`trio` received final result from {task}\n' - f'|_{res}\n' + f'`trio` received final result from `asyncio` task,\n' + f')> {res}\n' + f' |_{task}\n' ) + if not chan._aio_result: + chan._aio_result = res + + # ?TODO, should we also raise `AsyncioTaskExited[res]` + # in any case where trio is NOT blocking on the + # `._to_trio` chan? + # + # -> ?NO RIGHT? since the + # `open_channel_from().__aexit__()` should detect this + # and then set any final `res` from above as a field + # that can optionally be read by the trio-paren-task as + # needed (just like in our + # `Context.wait_for_result()/.result` API yah? + # + # if provide_channels: + except BaseException as _aio_err: aio_err: BaseException = _aio_err - # read again AFTER the `asyncio` side errors in case + + # READ AGAIN, AFTER the `asyncio` side errors, in case # it was cancelled due to an error from `trio` (or # some other out of band exc) and then set to something # else? - relayed_aio_err: BaseException|None = chan._aio_err + curr_aio_err: BaseException|None = chan._aio_err # always true right? assert ( - type(_aio_err) is type(relayed_aio_err) + type(aio_err) + is type(orig_aio_err) + is type(curr_aio_err) ), ( f'`asyncio`-side task errors mismatch?!?\n\n' f'(caught) aio_err: {aio_err}\n' - f'chan._aio_err: {relayed_aio_err}\n' + f'ORIG chan._aio_err: {orig_aio_err}\n' + f'chan._aio_err: {curr_aio_err}\n' ) msg: str = ( @@ -401,7 +563,7 @@ def _run_asyncio_task( '{etype_str}\n' # ^NOTE filled in below ) - if isinstance(_aio_err, CancelledError): + if isinstance(aio_err, CancelledError): msg += ( f'c)>\n' f' |_{task}\n' @@ -409,6 +571,28 @@ def _run_asyncio_task( log.cancel( msg.format(etype_str='cancelled') ) + + # XXX when the asyncio.Task exits early (before the trio + # side) we relay through an exc-as-signal which is + # normally suppressed unless the trio.Task also errors + # + # ?TODO, is this even needed (does it happen) now? + elif isinstance(aio_err, QueueShutDown): + # import pdbp; pdbp.set_trace() + trio_err = AsyncioTaskExited( + 'Task exited before `trio` side' + ) + if not chan._trio_err: + chan._trio_err = trio_err + + msg += ( + f')>\n' + f' |_{task}\n' + ) + log.info( + msg.format(etype_str='exited') + ) + else: msg += ( f'x)>\n' @@ -418,13 +602,20 @@ def _run_asyncio_task( msg.format(etype_str='errored') ) + # is trio the src of the aio task's exc-as-outcome? trio_err: BaseException|None = chan._trio_err + curr_aio_err: BaseException|None = chan._aio_err if ( - relayed_aio_err + curr_aio_err or trio_err + or + trio_to_raise ): - # import pdbp; pdbp.set_trace() + # XXX, if not already, ALWAYs cancel the trio-side on an + # aio-side error or early return. In the case where the trio task is + # blocking on a checkpoint or `asyncio.Queue.get()`. + # NOTE: currently mem chan closure may act as a form # of error relay (at least in the `asyncio.CancelledError` # case) since we have no way to directly trigger a `trio` @@ -432,30 +623,18 @@ def _run_asyncio_task( # We might want to change this in the future though. from_aio.close() - # wait, wut? - # aio_err.with_traceback(aio_err.__traceback__) - - # TODO: show when cancellation originated - # from each side more pedantically in log-msg? - # elif ( - # type(aio_err) is CancelledError - # and # trio was the cause? - # trio_cs.cancel_called - # ): - # log.cancel( - # 'infected task was cancelled by `trio`-side' - # ) - # raise aio_err from task_err - - # XXX: if not already, alway cancel the scope on a task - # error in case the trio task is blocking on - # a checkpoint. if ( not trio_cs.cancelled_caught or not trio_cs.cancel_called ): - # import pdbp; pdbp.set_trace() + log.cancel( + f'Cancelling `trio` side due to aio-side src exc\n' + f'{curr_aio_err}\n' + f'\n' + f'(c>\n' + f' |_{trio_task}\n' + ) trio_cs.cancel() # maybe the `trio` task errored independent from the @@ -478,28 +657,36 @@ def _run_asyncio_task( # for reproducing detailed edge cases as per the above # cases. # + trio_to_raise: AsyncioCancelled|AsyncioTaskExited = chan._trio_to_raise + aio_to_raise: TrioTaskExited|TrioCancelled = chan._aio_to_raise if ( + not chan._aio_result + and not trio_cs.cancelled_caught - and - (trio_err := chan._trio_err) - and - type(trio_err) not in { - trio.Cancelled, - } and ( - aio_err - and - type(aio_err) not in { + (aio_err and type(aio_err) not in { asyncio.CancelledError - } + }) + or + aio_to_raise + ) + and ( + ((trio_err := chan._trio_err) and type(trio_err) not in { + trio.Cancelled, + }) + or + trio_to_raise ) ): eg = ExceptionGroup( 'Both the `trio` and `asyncio` tasks errored independently!!\n', - (trio_err, aio_err), + ( + trio_to_raise or trio_err, + aio_to_raise or aio_err, + ), ) - chan._trio_err = eg - chan._aio_err = eg + # chan._trio_err = eg + # chan._aio_err = eg raise eg elif aio_err: @@ -507,45 +694,34 @@ def _run_asyncio_task( # match the one we just caught from the task above! # (that would indicate something weird/very-wrong # going on?) - if aio_err is not relayed_aio_err: - raise aio_err from relayed_aio_err + if ( + aio_err is not trio_to_raise + and ( + not suppress_graceful_exits + and ( + chan._aio_result is not Unresolved + and + isinstance(trio_to_raise, AsyncioTaskExited) + ) + ) + ): + # raise aio_err from relayed_aio_err + raise trio_to_raise from curr_aio_err raise aio_err - task.add_done_callback(cancel_trio) + task.add_done_callback(signal_trio_when_done) return chan -class AsyncioCancelled(Exception): - ''' - Asyncio cancelled translation (non-base) error - for use with the ``to_asyncio`` module - to be raised in the ``trio`` side task - - NOTE: this should NOT inherit from `asyncio.CancelledError` or - tests should break! - - ''' - - -class TrioTaskExited(AsyncioCancelled): - ''' - The `trio`-side task exited without explicitly cancelling the - `asyncio.Task` peer. - - This is very similar to how `trio.ClosedResource` acts as - a "clean shutdown" signal to the consumer side of a mem-chan, - - https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels - - ''' - - @acm async def translate_aio_errors( chan: LinkedTaskChannel, wait_on_aio_task: bool = False, cancel_aio_task_on_trio_exit: bool = True, + suppress_graceful_exits: bool = True, + + hide_tb: bool = True, ) -> AsyncIterator[None]: ''' @@ -558,17 +734,20 @@ async def translate_aio_errors( appropriately translates errors and cancels into ``trio`` land. ''' + __tracebackhide__: bool = hide_tb + trio_task = trio.lowlevel.current_task() - - aio_err: BaseException|None = None - + aio_err: BaseException|None = chan._aio_err aio_task: asyncio.Task = chan._aio_task + aio_done_before_trio: bool = aio_task.done() assert aio_task trio_err: BaseException|None = None + to_raise_trio: BaseException|None = None try: yield # back to one of the cross-loop apis except trio.Cancelled as taskc: trio_err = taskc + chan._trio_err = trio_err # should NEVER be the case that `trio` is cancel-handling # BEFORE the other side's task-ref was set!? @@ -577,12 +756,12 @@ async def translate_aio_errors( # import pdbp; pdbp.set_trace() # lolevel-debug # relay cancel through to called `asyncio` task - chan._aio_err = AsyncioCancelled( + chan._aio_to_raise = TrioCancelled( f'trio`-side cancelled the `asyncio`-side,\n' f'c)>\n' - f' |_{trio_task}\n\n' - - f'{trio_err!r}\n' + f' |_{trio_task}\n' + f'\n' + f'trio src exc: {trio_err!r}\n' ) # XXX NOTE XXX seems like we can get all sorts of unreliable @@ -595,22 +774,32 @@ async def translate_aio_errors( # ) # raise + # XXX always passthrough EoC since this translator is often + # called from `LinkedTaskChannel.receive()` which we want + # passthrough and further we have no special meaning for it in + # terms of relaying errors or signals from the aio side! + except trio.EndOfChannel: + raise + # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio # task-done-callback. + # + # when the aio side is (possibly self-)cancelled it will close + # the `chan._to_trio` and thus trigger the trio side to raise + # a dedicated `AsyncioCancelled` except ( trio.ClosedResourceError, - # trio.BrokenResourceError, ) as cre: - trio_err = cre + chan._trio_err = cre aio_err = chan._aio_err - # import pdbp; pdbp.set_trace() # XXX if an underlying `asyncio.CancelledError` triggered # this channel close, raise our (non-`BaseException`) wrapper # exception (`AsyncioCancelled`) from that source error. if ( # aio-side is cancelled? - aio_task.cancelled() # not set until it terminates?? + # |_ first not set until it terminates?? + aio_task.cancelled() and type(aio_err) is CancelledError @@ -618,32 +807,26 @@ async def translate_aio_errors( # silent-exit-by-`trio` case? # -[ ] the parent task can also just catch it though? # -[ ] OR, offer a `signal_aio_side_on_exit=True` ?? - # - # or - # aio_err is None - # and - # chan._trio_exited - ): - raise AsyncioCancelled( + # await tractor.pause(shield=True) + chan._trio_to_raise = AsyncioCancelled( f'asyncio`-side cancelled the `trio`-side,\n' f'c(>\n' f' |_{aio_task}\n\n' - f'{trio_err!r}\n' - ) from aio_err + f'(triggered on the `trio`-side by a {cre!r})\n' + ) + # TODO?? needed or does this just get reraised in the + # `finally:` block below? + # raise to_raise_trio from aio_err # maybe the chan-closure is due to something else? else: - raise + raise cre except BaseException as _trio_err: - # await tractor.pause(shield=True) - trio_err = _trio_err - log.exception( - '`trio`-side task errored?' - ) - + trio_err = chan._trio_err = trio_err + # await tractor.pause(shield=True) # workx! entered: bool = await _debug._maybe_enter_pm( trio_err, api_frame=inspect.currentframe(), @@ -653,89 +836,177 @@ async def translate_aio_errors( and not is_multi_cancelled(trio_err) ): - log.exception('actor crashed\n') + log.exception( + '`trio`-side task errored?' + ) + # __tracebackhide__: bool = False - aio_taskc = AsyncioCancelled( - f'`trio`-side task errored!\n' - f'{trio_err}' - ) #from trio_err + # TODO, just a log msg here indicating the scope closed + # and that the trio-side expects that and what the final + # result from the aio side was? + # + # if isinstance(chan._aio_err, AsyncioTaskExited): + # await tractor.pause(shield=True) - try: - aio_task.set_exception(aio_taskc) - except ( - asyncio.InvalidStateError, - RuntimeError, - # ^XXX, uhh bc apparently we can't use `.set_exception()` - # any more XD .. ?? + # if aio side is still active cancel it due to the trio-side + # error! + # ?TODO, mk `AsyncioCancelled[typeof(trio_err)]` embed the + # current exc? + if ( + # not aio_task.cancelled() + # and + not aio_task.done() # TODO? only need this one? + + # XXX LOL, so if it's not set it's an error !? + # yet another good jerb by `ascyncio`.. + # and + # not aio_task.exception() ): + aio_taskc = TrioCancelled( + f'The `trio`-side task crashed!\n' + f'{trio_err}' + ) + aio_task.set_exception(aio_taskc) wait_on_aio_task = False - - # import pdbp; pdbp.set_trace() - # raise aio_taskc from trio_err + # try: + # aio_task.set_exception(aio_taskc) + # except ( + # asyncio.InvalidStateError, + # RuntimeError, + # # ^XXX, uhh bc apparently we can't use `.set_exception()` + # # any more XD .. ?? + # ): + # wait_on_aio_task = False finally: # record wtv `trio`-side error transpired - chan._trio_err = trio_err - ya_trio_exited: bool = chan._trio_exited + if trio_err: + if chan._trio_err is not trio_err: + await tractor.pause(shield=True) - # NOTE! by default always cancel the `asyncio` task if + # assert chan._trio_err is trio_err + + ya_trio_exited: bool = chan._trio_exited + graceful_trio_exit: bool = ( + ya_trio_exited + and + not chan._trio_err # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man.. + ) + + # XXX NOTE! XXX by default always cancel the `asyncio` task if # we've made it this far and it's not done. # TODO, how to detect if there's an out-of-band error that # caused the exit? if ( - cancel_aio_task_on_trio_exit - and not aio_task.done() - and - aio_err + and ( + cancel_aio_task_on_trio_exit + # and + # chan._aio_err # TODO, if it's not .done() is this possible? - # or the trio side has exited it's surrounding cancel scope - # indicating the lifetime of the ``asyncio``-side task - # should also be terminated. - or ( - ya_trio_exited - and - not chan._trio_err # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man.. + # did the `.open_channel_from()` parent caller already + # (gracefully) exit scope before this translator was + # invoked? + # => since we couple the lifetime of the `asyncio.Task` + # to the `trio` parent task, it should should also be + # terminated via either, + # + # 1. raising an explicit `TrioTaskExited|TrioCancelled` + # in task via `asyncio.Task._fut_waiter.set_exception()` + # + # 2. or (worst case) by cancelling the aio task using + # the std-but-never-working `asyncio.Task.cancel()` + # (which i can't figure out why that nor + # `Task.set_exception()` seem to never ever do the + # rignt thing! XD). + or + graceful_trio_exit ) ): report: str = ( 'trio-side exited silently!' ) - assert not aio_err, 'WTF how did asyncio do this?!' + assert not chan._aio_err, ( + 'WTF why duz asyncio have err but not dun?!' + ) - # if the `trio.Task` already exited the `open_channel_from()` - # block we ensure the asyncio-side gets signalled via an - # explicit exception and its `Queue` is shutdown. + # if the `trio.Task` terminated without raising + # `trio.Cancelled` (curently handled above) there's + # 2 posibilities, + # + # i. it raised a `trio_err` + # ii. it did a "silent exit" where the + # `open_channel_from().__aexit__()` phase ran without + # any raise or taskc (task cancel) and no final result + # was collected (yet) from the aio side. + # + # SO, ensure the asyncio-side is notified and terminated + # by a dedicated exc-as-signal which distinguishes + # various aio-task-state at termination cases. + # + # Consequently if the aio task doesn't absorb said + # exc-as-signal, the trio side should then see the same exc + # propagate up through the .open_channel_from() call to + # the parent task. + # + # if the `trio.Task` already exited (only can happen for + # the `open_channel_from()` use case) block due to to + # either plain ol' graceful `__aexit__()` or due to taskc + # or an error, we ensure the aio-side gets signalled via + # an explicit exception and its `Queue` is shutdown. if ya_trio_exited: + # raise `QueueShutDown` on next `Queue.get()` call on + # aio side. chan._to_aio.shutdown() - # pump the other side's task? needed? + # pump this event-loop (well `Runner` but ya) + # + # TODO? is this actually needed? + # -[ ] theory is this let's the aio side error on + # next tick and then we sync task states from + # here onward? await trio.lowlevel.checkpoint() - # from tractor._state import is_root_process - # if is_root_process(): - # breakpoint() - + # TODO? factor the next 2 branches into a func like + # `try_terminate_aio_task()` and use it for the taskc + # case above as well? + fut: asyncio.Future|None = aio_task._fut_waiter if ( - not chan._trio_err + fut and - (fut := aio_task._fut_waiter) + not fut.done() ): - # await trio.lowlevel.checkpoint() - # import pdbp; pdbp.set_trace() - fut.set_exception( - TrioTaskExited( - f'The peer `asyncio` task is still blocking/running?\n' - f'>>\n' - f'|_{aio_task!r}\n' + # await tractor.pause() + if graceful_trio_exit: + fut.set_exception( + TrioTaskExited( + f'the `trio.Task` gracefully exited but ' + f'its `asyncio` peer is not done?\n' + f')>\n' + f' |_{trio_task}\n' + f'\n' + f'>>\n' + f' |_{aio_task!r}\n' + ) + ) + + # TODO? should this need to exist given the equiv + # `TrioCancelled` equivalent in the be handler + # above?? + else: + fut.set_exception( + TrioTaskExited( + f'The `trio`-side task crashed!\n' + f'{trio_err}' + ) ) - ) else: aio_taskc_warn: str = ( f'\n' f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n' f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n' ) + # await tractor.pause() report += aio_taskc_warn # TODO XXX, figure out the case where calling this makes the # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits` @@ -745,19 +1016,17 @@ async def translate_aio_errors( log.warning(report) - # Required to sync with the far end `asyncio`-task to ensure - # any error is captured (via monkeypatching the - # `channel._aio_err`) before calling ``maybe_raise_aio_err()`` - # below! + # sync with the `asyncio.Task`'s completion to ensure any + # error is captured and relayed (via + # `channel._aio_err/._trio_to_raise`) BEFORE calling + # `maybe_raise_aio_side_err()` below! # - # XXX NOTE XXX the `task.set_exception(aio_taskc)` call above - # MUST NOT EXCEPT or this WILL HANG!! - # - # so if you get a hang maybe step through and figure out why - # it erroed out up there! + # XXX WARNING NOTE + # the `task.set_exception(aio_taskc)` call above MUST NOT + # EXCEPT or this WILL HANG!! SO, if you get a hang maybe step + # through and figure out why it erroed out up there! # if wait_on_aio_task: - # await chan.wait_aio_complete() await chan._aio_task_complete.wait() log.info( 'asyncio-task is done and unblocked trio-side!\n' @@ -767,6 +1036,8 @@ async def translate_aio_errors( # -[ ] make this a channel method, OR # -[ ] just put back inline below? # + # await tractor.pause(shield=True) + # TODO, go back to inlining this.. def maybe_raise_aio_side_err( trio_err: Exception, ) -> None: @@ -778,31 +1049,86 @@ async def translate_aio_errors( ''' aio_err: BaseException|None = chan._aio_err + trio_to_raise: ( + AsyncioCancelled| + AsyncioTaskExited| + None + ) = chan._trio_to_raise + + if not suppress_graceful_exits: + raise trio_to_raise from (aio_err or trio_err) + + if trio_to_raise: + # import pdbp; pdbp.set_trace() + match ( + trio_to_raise, + trio_err, + ): + case ( + AsyncioTaskExited(), + trio.Cancelled()|None, + ): + log.info( + 'Ignoring aio exit signal since trio also exited!' + ) + return + + case ( + AsyncioCancelled(), + trio.Cancelled(), + ): + if not aio_done_before_trio: + log.info( + 'Ignoring aio cancelled signal since trio was also cancelled!' + ) + return + case _: + raise trio_to_raise from (aio_err or trio_err) # Check if the asyncio-side is the cause of the trio-side # error. - if ( + elif ( aio_err is not None and type(aio_err) is not AsyncioCancelled + # and ( + # type(aio_err) is not AsyncioTaskExited + # and + # not ya_trio_exited + # and + # not trio_err + # ) - # not isinstance(aio_err, CancelledError) - # type(aio_err) is not CancelledError + # TODO, case where trio_err is not None and + # aio_err is AsyncioTaskExited => raise eg! + # -[ ] maybe use a match bc this get's real + # complex fast XD + # + # or + # type(aio_err) is not AsyncioTaskExited + # and + # trio_err + # ) ): # always raise from any captured asyncio error if trio_err: raise trio_err from aio_err + # XXX NOTE! above in the `trio.ClosedResourceError` + # handler we specifically set the + # `aio_err = AsyncioCancelled` such that it is raised + # as that special exc here! raise aio_err if trio_err: raise trio_err + # await tractor.pause() # NOTE: if any ``asyncio`` error was caught, raise it here inline # here in the ``trio`` task # if trio_err: maybe_raise_aio_side_err( - trio_err=trio_err + trio_err=to_raise_trio or trio_err ) @@ -829,20 +1155,24 @@ async def run_task( async with translate_aio_errors( chan, wait_on_aio_task=True, + suppress_graceful_exits=chan._suppress_graceful_exits, ): # return single value that is the output from the - # ``asyncio`` function-as-task. Expect the mem chan api to - # do the job of handling cross-framework cancellations + # ``asyncio`` function-as-task. Expect the mem chan api + # to do the job of handling cross-framework cancellations # / errors via closure and translation in the - # ``translate_aio_errors()`` in the above ctx mngr. - return await chan.receive() + # `translate_aio_errors()` in the above ctx mngr. + + return await chan._from_aio.receive() + # return await chan.receive() @acm async def open_channel_from( target: Callable[..., Any], - **kwargs, + suppress_graceful_exits: bool = True, + **target_kwargs, ) -> AsyncIterator[Any]: ''' @@ -854,13 +1184,15 @@ async def open_channel_from( target, qsize=2**8, provide_channels=True, - **kwargs, + suppress_graceful_exits=suppress_graceful_exits, + **target_kwargs, ) # TODO, tuple form here? async with chan._from_aio: async with translate_aio_errors( chan, wait_on_aio_task=True, + suppress_graceful_exits=suppress_graceful_exits, ): # sync to a "started()"-like first delivered value from the # ``asyncio`` task. @@ -873,17 +1205,37 @@ async def open_channel_from( except trio.Cancelled as taskc: # await tractor.pause(shield=True) # ya it worx ;) if cs.cancel_called: - log.cancel( - f'trio-side was manually cancelled by aio side\n' - f'|_c>}}{cs!r}?\n' - ) + if isinstance(chan._trio_to_raise, AsyncioCancelled): + log.cancel( + f'trio-side was manually cancelled by aio side\n' + f'|_c>}}{cs!r}?\n' + ) # TODO, maybe a special `TrioCancelled`??? raise taskc finally: chan._trio_exited = True - chan._to_trio.close() + + # when the aio side is still ongoing but trio exits + # early we signal with a special exc (kinda like + # a `Return`-msg for IPC ctxs) + aio_task: asyncio.Task = chan._aio_task + if not aio_task.done(): + fut: asyncio.Future|None = aio_task._fut_waiter + if fut: + fut.set_exception( + TrioTaskExited( + f'but the child `asyncio` task is still running?\n' + f'>>\n' + f' |_{aio_task!r}\n' + ) + ) + else: + # XXX SHOULD NEVER HAPPEN! + await tractor.pause() + else: + chan._to_trio.close() class AsyncioRuntimeTranslationError(RuntimeError):