diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 8ad2a026..e2b51e17 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -27,6 +27,7 @@ from contextlib import asynccontextmanager as acm from dataclasses import dataclass import inspect import platform +import sys import traceback from typing import ( Any, @@ -810,6 +811,151 @@ def _run_asyncio_task( return chan +def maybe_signal_aio_task( + aio_task: asyncio.Task, + exc: BaseException, + *, + cause: BaseException|None = None, + pre_captured_fut: asyncio.Future|None = None, + allow_cancel_fallback: bool = False, + +) -> tuple[bool, str]: + ''' + Best-effort delivery of `exc` to a still-running `aio_task` + via its `_fut_waiter` (the `asyncio.Future` the task is + currently `await`-ing on). + + Returns `(delivered, report)` where `delivered=True` iff + either, + - `fut.set_exception(exc)` was successfully called on an + un-`done()` `_fut_waiter`, OR + - the cancel-fallback path fired (only when the caller + opted-in via `allow_cancel_fallback=True`). + + Why `_fut_waiter.set_exception(exc)` and NOT + `aio_task.set_exception(exc)`: + + On py3.13+ `asyncio.Task.set_exception()` ALWAYS raises + `RuntimeError("Task does not support set_exception + operation")` — so calling it as a relay mechanism is dead + code. The `_fut_waiter` is a plain `asyncio.Future` and + its `set_exception()` works on all Python versions; the + task's `_wakeup` callback then propagates the exc into + the coro on its next tick. + + Why we PREFER NOT to call `aio_task.cancel()`: + + `Task.cancel()` injects a `CancelledError` that races + any in-flight exception already queued on `_fut_waiter` + (e.g. via a prior `set_exception()` from a sibling + teardown path). The race can mask BOTH the original + trio-side error and any asyncio-side error the task was + mid-raising. See the + `test_trio_closes_early_and_channel_exits` hang TODO + around the `translate_aio_errors` finally for the + historical artifact. + + However a caller may have NO OTHER way to terminate the + task — when `_fut_waiter is None` AND the task is busy + looping / runnable, neither `set_exception` nor a chan + close can poke it. In that narrow case `cancel()` is the + only available termination signal; opt-in via + `allow_cancel_fallback=True`. The fallback NEVER runs + when `_fut_waiter` carries an in-flight exc (the + `fut.done()` branch); only when there's truly no + `_fut_waiter` ref to poke. + + Pre-checkpoint capture: + + `asyncio.Task._wakeup` clears `_fut_waiter = None` as + part of the wakeup sequence. If the caller crosses a + trio checkpoint between fut-capture and this call, + re-reading `aio_task._fut_waiter` will see `None` even + though the exc is still in flight on the (now-`done()`) + original fut. Pass `pre_captured_fut` to use the + already-captured reference. + + Causal chaining via `cause`: + + Pass the underlying trio-side exc (the *reason* we're + poking the aio side) via `cause` and the helper sets + `exc.__cause__ = cause`. The chain travels with `exc` + through `_fut_waiter.set_exception()` → `Task._wakeup` + → coro raise → `wait_on_coro_final_result`'s except → + `signal_trio_when_done`'s `task.result()`-`raise + aio_err`. The final traceback then renders as + " -> (direct cause of) -> " + instead of an opaque, root-cause-detached relay. + + See the "cross-loop cause-chain matrix" comment in + `translate_aio_errors()`'s final-raise block for how this + `cause` interacts with every `raise X [from Y]` exit path + (esp. the relay-echo guard which prevents a cause CYCLE). + + ''' + if cause is not None and exc.__cause__ is None: + exc.__cause__ = cause + + if aio_task.done(): + return False, ( + f'aio-task already done; nothing to signal\n' + f' |_{aio_task!r}\n' + ) + + fut: asyncio.Future|None = ( + pre_captured_fut + if pre_captured_fut is not None + else aio_task._fut_waiter + ) + + if fut and not fut.done(): + fut.set_exception(exc) + return True, ( + f'signalled aio-task via `_fut_waiter.set_exception()`\n' + f'exc: {exc!r}\n' + f' |_{aio_task!r}\n' + ) + + if fut and fut.done(): + # NEVER cancel here even when `allow_cancel_fallback=True` + # — the in-flight exc on `fut` will terminate the task + # on its next tick; injecting `CancelledError` on top + # would race and mask the real exc. + return False, ( + f'`_fut_waiter` already signalled with,\n' + f' |_{fut.exception()!r}\n' + f'aio-task will exit on next tick via the in-flight exc;\n' + f'SKIPPING re-signal (would race in-flight delivery).\n' + f' |_{aio_task!r}\n' + ) + + # fut is None — task is runnable (sitting in asyncio's + # ready queue), not parked on a future we can poke. + if allow_cancel_fallback: + cancel_msg: str = ( + f'\n' + f'MANUALLY Cancelling `asyncio`-task: ' + f'{aio_task.get_name()}!\n\n' + f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n' + ) + aio_task.cancel(msg=cancel_msg) + return True, ( + f'aio-task has no `_fut_waiter`; FALLBACK cancel issued\n' + f'(caller opted-in via `allow_cancel_fallback=True`).\n' + f'{cancel_msg}' + f' |_{aio_task!r}\n' + ) + + return False, ( + f'aio-task has no `_fut_waiter`; cannot signal without\n' + f'`aio_task.cancel()` which can mask errors.\n' + f'LEAVING AS-IS (caller did NOT opt-in to cancel fallback);\n' + f'task should exit via chan close / aio-loop teardown\n' + f'already in flight.\n' + f' |_{aio_task!r}\n' + ) + + @acm async def translate_aio_errors( chan: LinkedTaskChannel, @@ -985,38 +1131,25 @@ async def translate_aio_errors( # if isinstance(chan._aio_err, AsyncioTaskExited): # await tractor.pause(shield=True) - # if aio side is still active cancel it due to the trio-side - # error! + # if aio side is still active relay the trio-side error + # to it via `_fut_waiter.set_exception()`. # ?TODO, mk `AsyncioCancelled[typeof(trio_err)]` embed the # current exc? - if ( - # not aio_task.cancelled() - # and - not aio_task.done() # TODO? only need this one? - - # XXX LOL, so if it's not set it's an error !? - # yet another good jerb by `ascyncio`.. - # and - # not aio_task.exception() - ): - aio_taskc = TrioCancelled( - f'The `trio`-side task crashed!\n' - f'{trio_err}' - ) - # ??TODO? move this into the func that tries to use - # `Task._fut_waiter: Future` instead?? - # - # aio_task.set_exception(aio_taskc) - # wait_on_aio_task = False - try: - aio_task.set_exception(aio_taskc) - except ( - asyncio.InvalidStateError, - RuntimeError, - # ^XXX, uhh bc apparently we can't use `.set_exception()` - # any more XD .. ?? - ): - wait_on_aio_task = False + aio_taskc = TrioCancelled( + f'The `trio`-side task crashed!\n' + f'{trio_err}' + ) + delivered, report = maybe_signal_aio_task( + aio_task, + aio_taskc, + # so the relay carries a " -> caused -> + # TrioCancelled" chain when it eventually re-raises + # on the aio side. + cause=trio_err, + ) + if not delivered: + wait_on_aio_task = False + log.cancel(report) finally: # record wtv `trio`-side error transpired @@ -1099,60 +1232,54 @@ async def translate_aio_errors( if _py_313: chan._to_aio.shutdown() + # XXX CRITICAL ordering: capture `_fut_waiter` + # BEFORE the checkpoint. `asyncio.Task._wakeup` + # clears `_fut_waiter = None` as part of wakeup, + # so re-reading after the checkpoint loses the + # ref even though the exc is still in-flight on + # the (now-`done()`) original fut. The helper + # uses `pre_captured_fut` to recover that. + pre_cp_fut: asyncio.Future|None = aio_task._fut_waiter + # pump this event-loop (well `Runner` but ya) - # - # TODO? is this actually needed? - # -[ ] theory is this let's the aio side error on - # next tick and then we sync task states from - # here onward? + # so the aio side can error on next tick and we + # sync task states from here onward. await trio.lowlevel.checkpoint() - # TODO? factor the next 2 branches into a func like - # `try_terminate_aio_task()` and use it for the taskc - # case above as well? - fut: asyncio.Future|None = aio_task._fut_waiter - if ( - fut - and - not fut.done() - ): - # await tractor.pause() - if graceful_trio_exit: - fut.set_exception( - TrioTaskExited( - f'the `trio.Task` gracefully exited but ' - f'its `asyncio` peer is not done?\n' - f')>\n' - f' |_{trio_task}\n' - f'\n' - f'>>\n' - f' |_{aio_task!r}\n' - ) - ) - - # TODO? should this need to exist given the equiv - # `TrioCancelled` equivalent in the be handler - # above?? - else: - fut.set_exception( - TrioTaskExited( - f'The `trio`-side task crashed!\n' - f'{trio_err}' - ) - ) - else: - aio_taskc_warn: str = ( + if graceful_trio_exit: + relay_exc = TrioTaskExited( + f'the `trio.Task` gracefully exited but ' + f'its `asyncio` peer is not done?\n' + f')>\n' + f' |_{trio_task}\n' f'\n' - f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n' - f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n' + f'>>\n' + f' |_{aio_task!r}\n' ) - # await tractor.pause() - report += aio_taskc_warn - # TODO XXX, figure out the case where calling this makes the - # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits` - # hang and then don't call it in that case! - # - aio_task.cancel(msg=aio_taskc_warn) + else: + relay_exc = TrioTaskExited( + f'The `trio`-side task crashed!\n' + f'{trio_err}' + ) + + delivered, signal_report = maybe_signal_aio_task( + aio_task, + relay_exc, + pre_captured_fut=pre_cp_fut, + # XXX historically this branch called + # `aio_task.cancel()` when `_fut_waiter` + # was None — required to actually terminate + # aio tasks that aren't parked on a poke-able + # future (e.g. the `aio_echo_server` loop in + # `test_echoserver_detailed_mechanics`). Opt + # into the fallback so we don't regress. + allow_cancel_fallback=True, + # carry the trio-side exc (if any) as the + # cause so the aio-side relay shows the + # real root-cause chain when re-raised. + cause=trio_err, + ) + report += signal_report log.warning(report) @@ -1161,10 +1288,11 @@ async def translate_aio_errors( # `channel._aio_err/._trio_to_raise`) BEFORE calling # `maybe_raise_aio_side_err()` below! # - # XXX WARNING NOTE - # the `task.set_exception(aio_taskc)` call above MUST NOT - # EXCEPT or this WILL HANG!! SO, if you get a hang maybe step - # through and figure out why it erroed out up there! + # NOTE, `wait_on_aio_task` may have been flipped to `False` + # by `maybe_signal_aio_task()` above when delivery + # failed (e.g. `_fut_waiter is None`) — in that case we + # skip the wait since the aio task won't process our + # relay exc and `_aio_task_complete` may never set. # if wait_on_aio_task: await chan._aio_task_complete.wait() @@ -1181,6 +1309,47 @@ async def translate_aio_errors( - `run_task()` ''' + # ===== cross-loop cause-chain matrix ===== + # How `(trio_err, aio_err, trio_to_raise)` resolve into ONE + # terminal `raise X [from Y]` (or an early `return`). + # + # legend (the possible `X` / `Y` operands): + # - trio_err : `chan._trio_err`, the trio-side exc. + # - aio_err : `chan._aio_err`, the aio-side exc. + # - trio_to_raise : `chan._trio_to_raise`, a tractor-chosen + # relay exc (`AsyncioCancelled`/`AsyncioTaskExited`). + # - raise_from : `trio_err if (aio_err is trio_to_raise) + # else aio_err` (the chosen `__cause__`). + # - relay-echo : an `aio_err` that is one of OUR OWN + # `TrioTaskExited|TrioCancelled` signals, + # synth'd + delivered to the aio-side by + # `maybe_signal_aio_task()`; its `__cause__` + # is ALREADY `trio_err`. + # - "(bare)" : raised with NO explicit `from` clause. + # + # this block (final-raise in `translate_aio_errors`): + # condition => raises from + # ----------------------------------- ------------- ----------- + # not suppress_graceful_exits => trio_to_raise raise_from + # AsyncioTaskExited + trio Cancelled/None => return (aio-exit ignored) + # AsyncioTaskExited + trio EoC => trio_err (bare) + # AsyncioCancelled + trio Cancelled => return (co-cancel ignored) + # trio_to_raise match catch-all => trio_to_raise raise_from + # aio_err is relay-echo ◄── the GUARD => trio_err (bare) + # aio_err independent (real aio fail) => trio_err aio_err + # aio_err independent, no trio_err => aio_err (bare) + # only trio_err => trio_err (bare) + # + # sibling block (`signal_trio_when_done()`, the aio done-cb): + # AsyncioTaskExited relay-out => trio_to_raise aio_err + # plain aio_err re-raise => aio_err (__cause__ preset) + # + # INVARIANT: a relay-echo must NEVER become `trio_err.__cause__` + # (it's ALREADY caused-BY `trio_err`) → doing so would CYCLE + # (`trio_err ◄─► relay`). So the guard raises the root + # `trio_err` bare; the relay still keeps its own correct + # "relay ◄ trio_err" chain for any aio-side inspection. + # ===== / cross-loop cause-chain matrix ===== aio_err: BaseException|None = chan._aio_err trio_to_raise: ( AsyncioCancelled| @@ -1237,6 +1406,32 @@ async def translate_aio_errors( and type(aio_err) is not AsyncioCancelled ): + # XXX, if `aio_err` is one of OUR OWN relay-signals + # (`TrioTaskExited`/`TrioCancelled`) that we delivered + # to the aio-side via `maybe_signal_aio_task()`, AND + # its `__cause__` already points back at `trio_err`, + # then it's just a derivative ECHO of the trio-side + # error, NOT an independent asyncio failure. + # + # Raising `trio_err from aio_err` here would invert + # (and cyclically tangle) the cause chain since the + # relay was itself caused-by `trio_err`: + # + # trio_err.__cause__ = aio_err (from `raise .. from`) + # aio_err.__cause__ = trio_err (set in `maybe_signal_aio_task`) + # + # So raise the REAL root `trio_err` alone; the relay's + # own `__cause__` chain still correctly reads + # "TrioTaskExited <- trio_err" for aio-side inspection. + if ( + trio_err is not None + and + isinstance(aio_err, (TrioTaskExited, TrioCancelled)) + and + aio_err.__cause__ is trio_err + ): + raise trio_err + # always raise from any captured asyncio error if trio_err: raise trio_err from aio_err @@ -1353,19 +1548,22 @@ async def open_channel_from( # a `Return`-msg for IPC ctxs) aio_task: asyncio.Task = chan._aio_task if not aio_task.done(): - fut: asyncio.Future|None = aio_task._fut_waiter - if fut: - fut.set_exception( - TrioTaskExited( - f'but the child `asyncio` task is still running?\n' - f'>>\n' - f' |_{aio_task!r}\n' - ) - ) - else: - # XXX SHOULD NEVER HAPPEN! - log.error("SHOULD NEVER GET HERE !?!?") - await tractor.pause(shield=True) + # capture the in-flight trio-side exc (if any) + # so the relay's `__cause__` chain shows the + # real root cause when the aio task re-raises. + # `sys.exc_info()[1]` is non-`None` only when + # the `try` body raised (graceful exit -> None). + trio_exc: BaseException|None = sys.exc_info()[1] + _, report = maybe_signal_aio_task( + aio_task, + TrioTaskExited( + f'but the child `asyncio` task is still running?\n' + f'>>\n' + f' |_{aio_task!r}\n' + ), + cause=trio_exc, + ) + log.cancel(report) else: chan._to_trio.close() @@ -1602,6 +1800,7 @@ def run_as_asyncio_guest( fute_err: BaseException|None = None try: out: Outcome = await asyncio.shield(trio_done_fute) + # out: Outcome = await trio_done_fute # ^TODO still don't really understand why the `.shield()` # is required ... ?? # https://docs.python.org/3/library/asyncio-task.html#asyncio.shield