Add `maybe_signal_aio_task()` + cause-chain guard

Factor the "deliver an exc to a running aio task" pattern out of
`translate_aio_errors()` + `open_channel_from()` into a shared
`maybe_signal_aio_task()` helper. Add a cause-chain matrix comment
+ relay-echo guard so the final-raise block can't cycle
  `trio_err.__cause__` back onto its own derivative relay.

`maybe_signal_aio_task()`,
- Delivers `exc` via `aio_task._fut_waiter.set_exception()` — NOT
  `aio_task.set_exception()` which on py3.13+ ALWAYS raises
  `RuntimeError("Task does not support set_exception")` (dead code as
  a relay mechanism).
- Returns `(delivered: bool, report: str)`. Caller uses `delivered` to
  flip `wait_on_aio_task` when delivery failed (avoids hanging on
  `_aio_task_complete.wait()`).
- `pre_captured_fut=`: required when the caller crosses a trio
  checkpoint between capturing `_fut_waiter` and invoking the helper.
  `Task._wakeup` clears `_fut_waiter = None` so re-reading
  post-checkpoint loses the ref even though the exc is still in-flight
  on the (now-`done()`) original fut.
- `cause=`: sets `exc.__cause__ = cause` so the relay carries
  a "trio_err -> caused -> relay" chain through `set_exception()`
  → `Task._wakeup` → coro raise → `wait_on_coro_final_result`
  → `signal_trio_when_done` → `task.result()`-raise.
- `allow_cancel_fallback=True`: opt-in `aio_task.cancel()` for the
  narrow case where `_fut_waiter is None` AND task is runnable (sitting
  in asyncio's ready queue, not parked on a poke-able future). NEVER
  cancels when `_fut_waiter` carries an in-flight exc — that would race
  + mask the real terminating exc.

`translate_aio_errors()`,
- Replace the two ad-hoc `_fut_waiter.set_exception()`
  / `aio_task.set_exception()` call sites w/ the helper.
- Capture `pre_cp_fut = aio_task._fut_waiter` BEFORE the post-shutdown
  `trio.lowlevel.checkpoint()` (critical: `_wakeup` clears the ref).
- New "cross-loop cause-chain matrix" comment block on the final-raise
  — tabulates every `(trio_err, aio_err, trio_to_raise)` combo into
  exactly one terminal `raise X [from Y]` or early `return`. Covers the
  sibling `signal_trio_when_done()` resolution + the relay-echo
  INVARIANT.
- New relay-echo guard: if `aio_err` is one of OUR OWN signals
  (`TrioTaskExited`/`TrioCancelled`) AND `aio_err.__cause__ is
  trio_err`, raise the bare `trio_err` instead of `trio_err from
  aio_err` (which would CYCLE the cause chain since the relay was itself
  caused-by `trio_err`).
- Drop the stale "the `task.set_exception(aio_taskc)` call MUST NOT
  EXCEPT or this WILL HANG" warning — the helper handles the failure
  path explicitly via `delivered=False` → `wait_on_aio_task = False`.
- Carry `cause=trio_err` on both the cancel-relay (`TrioCancelled`) and
  the graceful-exit relay (`TrioTaskExited`) so the aio-side traceback
  shows the real root.

`open_channel_from()`,
- Adopt the same helper; drop the dead "SHOULD NEVER GET HERE !?!?"
  + `tractor.pause(shield=True)` panic branch.
- Capture in-flight trio-side exc via `sys.exc_info()[1]` and pass as
  `cause=` — non-`None` only when the `try` body raised (graceful exit
  → None).

Other,
- Top-level import: `sys` (for `sys.exc_info()`).
- `run_as_asyncio_guest()`: add commented-out alt `out: Outcome = await
  trio_done_fute` next to the shielded version — exploratory note for
  the longstanding "why is `.shield()` needed?" TODO.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
trionics.start_or_cancel
Gud Boi 2026-05-29 16:41:37 -04:00
parent 3d9c75b6ed
commit acd1cbeec4
1 changed files with 295 additions and 96 deletions

View File

@ -27,6 +27,7 @@ from contextlib import asynccontextmanager as acm
from dataclasses import dataclass from dataclasses import dataclass
import inspect import inspect
import platform import platform
import sys
import traceback import traceback
from typing import ( from typing import (
Any, Any,
@ -810,6 +811,151 @@ def _run_asyncio_task(
return chan return chan
def maybe_signal_aio_task(
aio_task: asyncio.Task,
exc: BaseException,
*,
cause: BaseException|None = None,
pre_captured_fut: asyncio.Future|None = None,
allow_cancel_fallback: bool = False,
) -> tuple[bool, str]:
'''
Best-effort delivery of `exc` to a still-running `aio_task`
via its `_fut_waiter` (the `asyncio.Future` the task is
currently `await`-ing on).
Returns `(delivered, report)` where `delivered=True` iff
either,
- `fut.set_exception(exc)` was successfully called on an
un-`done()` `_fut_waiter`, OR
- the cancel-fallback path fired (only when the caller
opted-in via `allow_cancel_fallback=True`).
Why `_fut_waiter.set_exception(exc)` and NOT
`aio_task.set_exception(exc)`:
On py3.13+ `asyncio.Task.set_exception()` ALWAYS raises
`RuntimeError("Task does not support set_exception
operation")` — so calling it as a relay mechanism is dead
code. The `_fut_waiter` is a plain `asyncio.Future` and
its `set_exception()` works on all Python versions; the
task's `_wakeup` callback then propagates the exc into
the coro on its next tick.
Why we PREFER NOT to call `aio_task.cancel()`:
`Task.cancel()` injects a `CancelledError` that races
any in-flight exception already queued on `_fut_waiter`
(e.g. via a prior `set_exception()` from a sibling
teardown path). The race can mask BOTH the original
trio-side error and any asyncio-side error the task was
mid-raising. See the
`test_trio_closes_early_and_channel_exits` hang TODO
around the `translate_aio_errors` finally for the
historical artifact.
However a caller may have NO OTHER way to terminate the
task when `_fut_waiter is None` AND the task is busy
looping / runnable, neither `set_exception` nor a chan
close can poke it. In that narrow case `cancel()` is the
only available termination signal; opt-in via
`allow_cancel_fallback=True`. The fallback NEVER runs
when `_fut_waiter` carries an in-flight exc (the
`fut.done()` branch); only when there's truly no
`_fut_waiter` ref to poke.
Pre-checkpoint capture:
`asyncio.Task._wakeup` clears `_fut_waiter = None` as
part of the wakeup sequence. If the caller crosses a
trio checkpoint between fut-capture and this call,
re-reading `aio_task._fut_waiter` will see `None` even
though the exc is still in flight on the (now-`done()`)
original fut. Pass `pre_captured_fut` to use the
already-captured reference.
Causal chaining via `cause`:
Pass the underlying trio-side exc (the *reason* we're
poking the aio side) via `cause` and the helper sets
`exc.__cause__ = cause`. The chain travels with `exc`
through `_fut_waiter.set_exception()` `Task._wakeup`
coro raise `wait_on_coro_final_result`'s except →
`signal_trio_when_done`'s `task.result()`-`raise
aio_err`. The final traceback then renders as
"<trio-side exc> -> (direct cause of) -> <relay exc>"
instead of an opaque, root-cause-detached relay.
See the "cross-loop cause-chain matrix" comment in
`translate_aio_errors()`'s final-raise block for how this
`cause` interacts with every `raise X [from Y]` exit path
(esp. the relay-echo guard which prevents a cause CYCLE).
'''
if cause is not None and exc.__cause__ is None:
exc.__cause__ = cause
if aio_task.done():
return False, (
f'aio-task already done; nothing to signal\n'
f' |_{aio_task!r}\n'
)
fut: asyncio.Future|None = (
pre_captured_fut
if pre_captured_fut is not None
else aio_task._fut_waiter
)
if fut and not fut.done():
fut.set_exception(exc)
return True, (
f'signalled aio-task via `_fut_waiter.set_exception()`\n'
f'exc: {exc!r}\n'
f' |_{aio_task!r}\n'
)
if fut and fut.done():
# NEVER cancel here even when `allow_cancel_fallback=True`
# — the in-flight exc on `fut` will terminate the task
# on its next tick; injecting `CancelledError` on top
# would race and mask the real exc.
return False, (
f'`_fut_waiter` already signalled with,\n'
f' |_{fut.exception()!r}\n'
f'aio-task will exit on next tick via the in-flight exc;\n'
f'SKIPPING re-signal (would race in-flight delivery).\n'
f' |_{aio_task!r}\n'
)
# fut is None — task is runnable (sitting in asyncio's
# ready queue), not parked on a future we can poke.
if allow_cancel_fallback:
cancel_msg: str = (
f'\n'
f'MANUALLY Cancelling `asyncio`-task: '
f'{aio_task.get_name()}!\n\n'
f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n'
)
aio_task.cancel(msg=cancel_msg)
return True, (
f'aio-task has no `_fut_waiter`; FALLBACK cancel issued\n'
f'(caller opted-in via `allow_cancel_fallback=True`).\n'
f'{cancel_msg}'
f' |_{aio_task!r}\n'
)
return False, (
f'aio-task has no `_fut_waiter`; cannot signal without\n'
f'`aio_task.cancel()` which can mask errors.\n'
f'LEAVING AS-IS (caller did NOT opt-in to cancel fallback);\n'
f'task should exit via chan close / aio-loop teardown\n'
f'already in flight.\n'
f' |_{aio_task!r}\n'
)
@acm @acm
async def translate_aio_errors( async def translate_aio_errors(
chan: LinkedTaskChannel, chan: LinkedTaskChannel,
@ -985,38 +1131,25 @@ async def translate_aio_errors(
# if isinstance(chan._aio_err, AsyncioTaskExited): # if isinstance(chan._aio_err, AsyncioTaskExited):
# await tractor.pause(shield=True) # await tractor.pause(shield=True)
# if aio side is still active cancel it due to the trio-side # if aio side is still active relay the trio-side error
# error! # to it via `_fut_waiter.set_exception()`.
# ?TODO, mk `AsyncioCancelled[typeof(trio_err)]` embed the # ?TODO, mk `AsyncioCancelled[typeof(trio_err)]` embed the
# current exc? # current exc?
if (
# not aio_task.cancelled()
# and
not aio_task.done() # TODO? only need this one?
# XXX LOL, so if it's not set it's an error !?
# yet another good jerb by `ascyncio`..
# and
# not aio_task.exception()
):
aio_taskc = TrioCancelled( aio_taskc = TrioCancelled(
f'The `trio`-side task crashed!\n' f'The `trio`-side task crashed!\n'
f'{trio_err}' f'{trio_err}'
) )
# ??TODO? move this into the func that tries to use delivered, report = maybe_signal_aio_task(
# `Task._fut_waiter: Future` instead?? aio_task,
# aio_taskc,
# aio_task.set_exception(aio_taskc) # so the relay carries a "<trio_err> -> caused ->
# wait_on_aio_task = False # TrioCancelled" chain when it eventually re-raises
try: # on the aio side.
aio_task.set_exception(aio_taskc) cause=trio_err,
except ( )
asyncio.InvalidStateError, if not delivered:
RuntimeError,
# ^XXX, uhh bc apparently we can't use `.set_exception()`
# any more XD .. ??
):
wait_on_aio_task = False wait_on_aio_task = False
log.cancel(report)
finally: finally:
# record wtv `trio`-side error transpired # record wtv `trio`-side error transpired
@ -1099,27 +1232,22 @@ async def translate_aio_errors(
if _py_313: if _py_313:
chan._to_aio.shutdown() chan._to_aio.shutdown()
# XXX CRITICAL ordering: capture `_fut_waiter`
# BEFORE the checkpoint. `asyncio.Task._wakeup`
# clears `_fut_waiter = None` as part of wakeup,
# so re-reading after the checkpoint loses the
# ref even though the exc is still in-flight on
# the (now-`done()`) original fut. The helper
# uses `pre_captured_fut` to recover that.
pre_cp_fut: asyncio.Future|None = aio_task._fut_waiter
# pump this event-loop (well `Runner` but ya) # pump this event-loop (well `Runner` but ya)
# # so the aio side can error on next tick and we
# TODO? is this actually needed? # sync task states from here onward.
# -[ ] theory is this let's the aio side error on
# next tick and then we sync task states from
# here onward?
await trio.lowlevel.checkpoint() await trio.lowlevel.checkpoint()
# TODO? factor the next 2 branches into a func like
# `try_terminate_aio_task()` and use it for the taskc
# case above as well?
fut: asyncio.Future|None = aio_task._fut_waiter
if (
fut
and
not fut.done()
):
# await tractor.pause()
if graceful_trio_exit: if graceful_trio_exit:
fut.set_exception( relay_exc = TrioTaskExited(
TrioTaskExited(
f'the `trio.Task` gracefully exited but ' f'the `trio.Task` gracefully exited but '
f'its `asyncio` peer is not done?\n' f'its `asyncio` peer is not done?\n'
f')>\n' f')>\n'
@ -1128,31 +1256,30 @@ async def translate_aio_errors(
f'>>\n' f'>>\n'
f' |_{aio_task!r}\n' f' |_{aio_task!r}\n'
) )
)
# TODO? should this need to exist given the equiv
# `TrioCancelled` equivalent in the be handler
# above??
else: else:
fut.set_exception( relay_exc = TrioTaskExited(
TrioTaskExited(
f'The `trio`-side task crashed!\n' f'The `trio`-side task crashed!\n'
f'{trio_err}' f'{trio_err}'
) )
delivered, signal_report = maybe_signal_aio_task(
aio_task,
relay_exc,
pre_captured_fut=pre_cp_fut,
# XXX historically this branch called
# `aio_task.cancel()` when `_fut_waiter`
# was None — required to actually terminate
# aio tasks that aren't parked on a poke-able
# future (e.g. the `aio_echo_server` loop in
# `test_echoserver_detailed_mechanics`). Opt
# into the fallback so we don't regress.
allow_cancel_fallback=True,
# carry the trio-side exc (if any) as the
# cause so the aio-side relay shows the
# real root-cause chain when re-raised.
cause=trio_err,
) )
else: report += signal_report
aio_taskc_warn: str = (
f'\n'
f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n'
f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n'
)
# await tractor.pause()
report += aio_taskc_warn
# TODO XXX, figure out the case where calling this makes the
# `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits`
# hang and then don't call it in that case!
#
aio_task.cancel(msg=aio_taskc_warn)
log.warning(report) log.warning(report)
@ -1161,10 +1288,11 @@ async def translate_aio_errors(
# `channel._aio_err/._trio_to_raise`) BEFORE calling # `channel._aio_err/._trio_to_raise`) BEFORE calling
# `maybe_raise_aio_side_err()` below! # `maybe_raise_aio_side_err()` below!
# #
# XXX WARNING NOTE # NOTE, `wait_on_aio_task` may have been flipped to `False`
# the `task.set_exception(aio_taskc)` call above MUST NOT # by `maybe_signal_aio_task()` above when delivery
# EXCEPT or this WILL HANG!! SO, if you get a hang maybe step # failed (e.g. `_fut_waiter is None`) — in that case we
# through and figure out why it erroed out up there! # skip the wait since the aio task won't process our
# relay exc and `_aio_task_complete` may never set.
# #
if wait_on_aio_task: if wait_on_aio_task:
await chan._aio_task_complete.wait() await chan._aio_task_complete.wait()
@ -1181,6 +1309,47 @@ async def translate_aio_errors(
- `run_task()` - `run_task()`
''' '''
# ===== cross-loop cause-chain matrix =====
# How `(trio_err, aio_err, trio_to_raise)` resolve into ONE
# terminal `raise X [from Y]` (or an early `return`).
#
# legend (the possible `X` / `Y` operands):
# - trio_err : `chan._trio_err`, the trio-side exc.
# - aio_err : `chan._aio_err`, the aio-side exc.
# - trio_to_raise : `chan._trio_to_raise`, a tractor-chosen
# relay exc (`AsyncioCancelled`/`AsyncioTaskExited`).
# - raise_from : `trio_err if (aio_err is trio_to_raise)
# else aio_err` (the chosen `__cause__`).
# - relay-echo : an `aio_err` that is one of OUR OWN
# `TrioTaskExited|TrioCancelled` signals,
# synth'd + delivered to the aio-side by
# `maybe_signal_aio_task()`; its `__cause__`
# is ALREADY `trio_err`.
# - "(bare)" : raised with NO explicit `from` clause.
#
# this block (final-raise in `translate_aio_errors`):
# condition => raises from
# ----------------------------------- ------------- -----------
# not suppress_graceful_exits => trio_to_raise raise_from
# AsyncioTaskExited + trio Cancelled/None => return (aio-exit ignored)
# AsyncioTaskExited + trio EoC => trio_err (bare)
# AsyncioCancelled + trio Cancelled => return (co-cancel ignored)
# trio_to_raise match catch-all => trio_to_raise raise_from
# aio_err is relay-echo ◄── the GUARD => trio_err (bare)
# aio_err independent (real aio fail) => trio_err aio_err
# aio_err independent, no trio_err => aio_err (bare)
# only trio_err => trio_err (bare)
#
# sibling block (`signal_trio_when_done()`, the aio done-cb):
# AsyncioTaskExited relay-out => trio_to_raise aio_err
# plain aio_err re-raise => aio_err (__cause__ preset)
#
# INVARIANT: a relay-echo must NEVER become `trio_err.__cause__`
# (it's ALREADY caused-BY `trio_err`) → doing so would CYCLE
# (`trio_err ◄─► relay`). So the guard raises the root
# `trio_err` bare; the relay still keeps its own correct
# "relay ◄ trio_err" chain for any aio-side inspection.
# ===== / cross-loop cause-chain matrix =====
aio_err: BaseException|None = chan._aio_err aio_err: BaseException|None = chan._aio_err
trio_to_raise: ( trio_to_raise: (
AsyncioCancelled| AsyncioCancelled|
@ -1237,6 +1406,32 @@ async def translate_aio_errors(
and and
type(aio_err) is not AsyncioCancelled type(aio_err) is not AsyncioCancelled
): ):
# XXX, if `aio_err` is one of OUR OWN relay-signals
# (`TrioTaskExited`/`TrioCancelled`) that we delivered
# to the aio-side via `maybe_signal_aio_task()`, AND
# its `__cause__` already points back at `trio_err`,
# then it's just a derivative ECHO of the trio-side
# error, NOT an independent asyncio failure.
#
# Raising `trio_err from aio_err` here would invert
# (and cyclically tangle) the cause chain since the
# relay was itself caused-by `trio_err`:
#
# trio_err.__cause__ = aio_err (from `raise .. from`)
# aio_err.__cause__ = trio_err (set in `maybe_signal_aio_task`)
#
# So raise the REAL root `trio_err` alone; the relay's
# own `__cause__` chain still correctly reads
# "TrioTaskExited <- trio_err" for aio-side inspection.
if (
trio_err is not None
and
isinstance(aio_err, (TrioTaskExited, TrioCancelled))
and
aio_err.__cause__ is trio_err
):
raise trio_err
# always raise from any captured asyncio error # always raise from any captured asyncio error
if trio_err: if trio_err:
raise trio_err from aio_err raise trio_err from aio_err
@ -1353,19 +1548,22 @@ async def open_channel_from(
# a `Return`-msg for IPC ctxs) # a `Return`-msg for IPC ctxs)
aio_task: asyncio.Task = chan._aio_task aio_task: asyncio.Task = chan._aio_task
if not aio_task.done(): if not aio_task.done():
fut: asyncio.Future|None = aio_task._fut_waiter # capture the in-flight trio-side exc (if any)
if fut: # so the relay's `__cause__` chain shows the
fut.set_exception( # real root cause when the aio task re-raises.
# `sys.exc_info()[1]` is non-`None` only when
# the `try` body raised (graceful exit -> None).
trio_exc: BaseException|None = sys.exc_info()[1]
_, report = maybe_signal_aio_task(
aio_task,
TrioTaskExited( TrioTaskExited(
f'but the child `asyncio` task is still running?\n' f'but the child `asyncio` task is still running?\n'
f'>>\n' f'>>\n'
f' |_{aio_task!r}\n' f' |_{aio_task!r}\n'
),
cause=trio_exc,
) )
) log.cancel(report)
else:
# XXX SHOULD NEVER HAPPEN!
log.error("SHOULD NEVER GET HERE !?!?")
await tractor.pause(shield=True)
else: else:
chan._to_trio.close() chan._to_trio.close()
@ -1602,6 +1800,7 @@ def run_as_asyncio_guest(
fute_err: BaseException|None = None fute_err: BaseException|None = None
try: try:
out: Outcome = await asyncio.shield(trio_done_fute) out: Outcome = await asyncio.shield(trio_done_fute)
# out: Outcome = await trio_done_fute
# ^TODO still don't really understand why the `.shield()` # ^TODO still don't really understand why the `.shield()`
# is required ... ?? # is required ... ??
# https://docs.python.org/3/library/asyncio-task.html#asyncio.shield # https://docs.python.org/3/library/asyncio-task.html#asyncio.shield