Relay `asyncio` errors via EoC and raise from rent

Makes the newly added `test_aio_side_raises_before_started` test pass by
ensuring errors raised by any `.to_asyncio.open_channel_from()` spawned
child-`asyncio.Task` are relayed by any caught `trio.EndOfChannel` by
checking for a new `LinkedTaskChannel._closed_by_aio_task: bool`.

Impl deats,
- obvi add `LinkedTaskChannel._closed_by_aio_task: bool = False`
- in `translate_aio_errors()` always check for the new flag on EOC
  conditions and in such cases set `chan._trio_to_raise = aio_err` such
  that the `trio`-parent-task always raises the child's exception
  directly, OW keep original EoC passthrough in place.
- include *very* detailed per-case comments around the extended handler.
- adjust re-raising logic with a new `raise_from` where we only give the
  `aio_err` priority if it's not already set as to `trio_to_raise`.

Also,
- hide the `_run_asyncio_task()` frame by def.
to_asyncio_eoc_signal
Tyler Goodlet 2025-07-29 14:30:42 -04:00
parent 808dd9d73c
commit 466dce8aed
1 changed files with 68 additions and 16 deletions

View File

@ -130,6 +130,7 @@ class LinkedTaskChannel(
_trio_task: trio.Task _trio_task: trio.Task
_aio_task_complete: trio.Event _aio_task_complete: trio.Event
_closed_by_aio_task: bool = False
_suppress_graceful_exits: bool = True _suppress_graceful_exits: bool = True
_trio_err: BaseException|None = None _trio_err: BaseException|None = None
@ -242,6 +243,7 @@ class LinkedTaskChannel(
# cycle on the trio side? # cycle on the trio side?
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
return await self._from_aio.receive() return await self._from_aio.receive()
except BaseException as err: except BaseException as err:
async with translate_aio_errors( async with translate_aio_errors(
chan=self, chan=self,
@ -319,7 +321,7 @@ def _run_asyncio_task(
qsize: int = 1, qsize: int = 1,
provide_channels: bool = False, provide_channels: bool = False,
suppress_graceful_exits: bool = True, suppress_graceful_exits: bool = True,
hide_tb: bool = False, hide_tb: bool = True,
**kwargs, **kwargs,
) -> LinkedTaskChannel: ) -> LinkedTaskChannel:
@ -445,9 +447,15 @@ def _run_asyncio_task(
f'Task exited with final result: {result!r}\n' f'Task exited with final result: {result!r}\n'
) )
# only close the sender side which will relay # only close the aio (child) side which will relay
# a `trio.EndOfChannel` to the trio (consumer) side. # a `trio.EndOfChannel` to the trio (parent) side.
#
# XXX NOTE, that trio-side MUST then in such cases
# check for a `chan._aio_err` and raise it!!
to_trio.close() to_trio.close()
# specially mark the closure as due to the
# asyncio.Task terminating!
chan._closed_by_aio_task = True
aio_task_complete.set() aio_task_complete.set()
log.runtime( log.runtime(
@ -645,8 +653,9 @@ def _run_asyncio_task(
not trio_cs.cancel_called not trio_cs.cancel_called
): ):
log.cancel( log.cancel(
f'Cancelling `trio` side due to aio-side src exc\n' f'Cancelling trio-side due to aio-side src exc\n'
f'{curr_aio_err}\n' f'\n'
f'{curr_aio_err!r}\n'
f'\n' f'\n'
f'(c>\n' f'(c>\n'
f' |_{trio_task}\n' f' |_{trio_task}\n'
@ -758,6 +767,7 @@ async def translate_aio_errors(
aio_done_before_trio: bool = aio_task.done() aio_done_before_trio: bool = aio_task.done()
assert aio_task assert aio_task
trio_err: BaseException|None = None trio_err: BaseException|None = None
eoc: trio.EndOfChannel|None = None
try: try:
yield # back to one of the cross-loop apis yield # back to one of the cross-loop apis
except trio.Cancelled as taskc: except trio.Cancelled as taskc:
@ -789,12 +799,50 @@ async def translate_aio_errors(
# ) # )
# raise # raise
# XXX always passthrough EoC since this translator is often # XXX EoC is a special SIGNAL from the aio-side here!
# called from `LinkedTaskChannel.receive()` which we want # There are 2 cases to handle:
# passthrough and further we have no special meaning for it in # 1. the "EoC passthrough" case.
# terms of relaying errors or signals from the aio side! # - the aio-task actually closed the channel "gracefully" and
except trio.EndOfChannel as eoc: # the trio-task should unwind any ongoing channel
trio_err = chan._trio_err = eoc # iteration/receiving,
# |_this exc-translator wraps calls to `LinkedTaskChannel.receive()`
# in which case we want to relay the actual "end-of-chan" for
# iteration purposes.
#
# 2. relaying the "asyncio.Task termination" case.
# - if the aio-task terminates, maybe with an error, AND the
# `open_channel_from()` API was used, it will always signal
# that termination.
# |_`wait_on_coro_final_result()` always calls
# `to_trio.close()` when `provide_channels=True` so we need to
# always check if there is an aio-side exc which needs to be
# relayed to the parent trio side!
# |_in this case the special `chan._closed_by_aio_task` is
# ALWAYS set.
#
except trio.EndOfChannel as _eoc:
eoc = _eoc
if (
chan._closed_by_aio_task
and
aio_err
):
log.cancel(
f'The asyncio-child task terminated due to error\n'
f'{aio_err!r}\n'
)
chan._trio_to_raise = aio_err
trio_err = chan._trio_err = eoc
#
# await tractor.pause(shield=True)
#
# ?TODO?, raise something like a,
# chan._trio_to_raise = AsyncioErrored()
# BUT, with the tb rewritten to reflect the underlying
# call stack?
else:
trio_err = chan._trio_err = eoc
raise eoc raise eoc
# NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
@ -1047,7 +1095,7 @@ async def translate_aio_errors(
# #
if wait_on_aio_task: if wait_on_aio_task:
await chan._aio_task_complete.wait() await chan._aio_task_complete.wait()
log.info( log.debug(
'asyncio-task is done and unblocked trio-side!\n' 'asyncio-task is done and unblocked trio-side!\n'
) )
@ -1064,11 +1112,17 @@ async def translate_aio_errors(
trio_to_raise: ( trio_to_raise: (
AsyncioCancelled| AsyncioCancelled|
AsyncioTaskExited| AsyncioTaskExited|
Exception| # relayed from aio-task
None None
) = chan._trio_to_raise ) = chan._trio_to_raise
raise_from: Exception = (
trio_err if (aio_err is trio_to_raise)
else aio_err
)
if not suppress_graceful_exits: if not suppress_graceful_exits:
raise trio_to_raise from (aio_err or trio_err) raise trio_to_raise from raise_from
if trio_to_raise: if trio_to_raise:
match ( match (
@ -1101,7 +1155,7 @@ async def translate_aio_errors(
) )
return return
case _: case _:
raise trio_to_raise from (aio_err or trio_err) raise trio_to_raise from raise_from
# Check if the asyncio-side is the cause of the trio-side # Check if the asyncio-side is the cause of the trio-side
# error. # error.
@ -1167,7 +1221,6 @@ async def run_task(
@acm @acm
async def open_channel_from( async def open_channel_from(
target: Callable[..., Any], target: Callable[..., Any],
suppress_graceful_exits: bool = True, suppress_graceful_exits: bool = True,
**target_kwargs, **target_kwargs,
@ -1201,7 +1254,6 @@ async def open_channel_from(
# deliver stream handle upward # deliver stream handle upward
yield first, chan yield first, chan
except trio.Cancelled as taskc: except trio.Cancelled as taskc:
# await tractor.pause(shield=True) # ya it worx ;)
if cs.cancel_called: if cs.cancel_called:
if isinstance(chan._trio_to_raise, AsyncioCancelled): if isinstance(chan._trio_to_raise, AsyncioCancelled):
log.cancel( log.cancel(