Be extra sure to re-raise EoCs from translator

That is whenever `trio.EndOfChannel` is raised (presumably from the
`._to_trio.receive()` call inside `LinkedTaskChannel.receive()`) we need
to be extra certain that we let it bubble upward transparently DESPITE
special exc-as-signal handling that is normally suppressed from the aio
side; REPEAT we want to ALWAYS bubble any `trio_err ==
trio.EndOfChannel` in the `finally:` handler of `translate_aio_errors()`
despite `chan._trio_to_raise == AsyncioTaskExited` such that the
caller's iterable machinery will operate as normal when the inter-task
stream is stopped (again, presumably by the aio side task terminating
the inter-task stream).

Main impl deats for this,
- in the EoC handler block ensure we assign both `chan._trio_err` and
  the local `trio_err` as well as continue to re-raise.
- add a case to the match block in the `finally:` handler which FOR SURE
  re-raises any `type(trio_err) is EndOfChannel`!

Additionally fix a bad bug,
- a ref bug where we were NOT using the
  `except BaseException as _trio_err` to assign to `chan._trio_err` (by
  accident was missing the leading `_`..)

Unrelated impl tweak,
- move all `maybe_raise_aio_side_err()` content back to inline with its
  parent func - makes it easier to use `tractor.pause()` mostly Bp
- go back to trying to use `aio_task.set_exception(aio_taskc)` for now
  even though i'm pretty sure we're going to move to a try-fute-first
  style helper for this in the future.

Adjust some tests to match/mk-them-green,
- break from `aio_echo_server()` recv loop on
  `to_asyncio.TrioTaskExited` much like how you'd expect to (implicitly
  with a `for`) with a `trio.EndOfChannel`.
- toss in a masked `value is None` pause point i needed for debugging
  inf looping caused by not re-raising EoCs per the main patch
  description.
- add a debug-mode sized delay to root-infected test.
py313_support
Tyler Goodlet 2025-03-03 21:50:51 -05:00
parent 986ada2ce9
commit b93e0cb846
3 changed files with 109 additions and 113 deletions

View File

@ -491,7 +491,13 @@ async def stream_from_aio(
], ],
): ):
async for value in chan: async for value in chan:
print(f'trio received {value}') print(f'trio received: {value!r}')
# XXX, debugging EoC not being handled correctly
# in `transate_aio_errors()`..
# if value is None:
# await tractor.pause(shield=True)
pulled.append(value) pulled.append(value)
if value == 50: if value == 50:
@ -733,7 +739,13 @@ async def aio_echo_server(
to_trio.send_nowait('start') to_trio.send_nowait('start')
while True: while True:
try:
msg = await from_trio.get() msg = await from_trio.get()
except to_asyncio.TrioTaskExited:
print(
'breaking aio echo loop due to `trio` exit!'
)
break
# echo the msg back # echo the msg back
to_trio.send_nowait(msg) to_trio.send_nowait(msg)

View File

@ -39,7 +39,7 @@ def test_infected_root_actor(
''' '''
async def _trio_main(): async def _trio_main():
with trio.fail_after(2): with trio.fail_after(2 if not debug_mode else 999):
first: str first: str
chan: to_asyncio.LinkedTaskChannel chan: to_asyncio.LinkedTaskChannel
async with ( async with (
@ -59,7 +59,11 @@ def test_infected_root_actor(
assert out == i assert out == i
print(f'asyncio echoing {i}') print(f'asyncio echoing {i}')
if raise_error_mid_stream and i == 500: if (
raise_error_mid_stream
and
i == 500
):
raise raise_error_mid_stream raise raise_error_mid_stream
if out is None: if out is None:

View File

@ -428,8 +428,7 @@ def _run_asyncio_task(
not chan._aio_err not chan._aio_err
): ):
chan._trio_to_raise = AsyncioTaskExited( chan._trio_to_raise = AsyncioTaskExited(
f'Task existed with final result\n' f'Task exited with final result: {result!r}\n'
f'{result!r}\n'
) )
# only close the sender side which will relay # only close the sender side which will relay
@ -741,7 +740,6 @@ async def translate_aio_errors(
aio_done_before_trio: bool = aio_task.done() aio_done_before_trio: bool = aio_task.done()
assert aio_task assert aio_task
trio_err: BaseException|None = None trio_err: BaseException|None = None
to_raise_trio: BaseException|None = None
try: try:
yield # back to one of the cross-loop apis yield # back to one of the cross-loop apis
except trio.Cancelled as taskc: except trio.Cancelled as taskc:
@ -777,8 +775,9 @@ async def translate_aio_errors(
# called from `LinkedTaskChannel.receive()` which we want # called from `LinkedTaskChannel.receive()` which we want
# passthrough and further we have no special meaning for it in # passthrough and further we have no special meaning for it in
# terms of relaying errors or signals from the aio side! # terms of relaying errors or signals from the aio side!
except trio.EndOfChannel: except trio.EndOfChannel as eoc:
raise trio_err = chan._trio_err = eoc
raise eoc
# NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
# task-done-callback. # task-done-callback.
@ -824,7 +823,7 @@ async def translate_aio_errors(
raise cre raise cre
except BaseException as _trio_err: except BaseException as _trio_err:
trio_err = chan._trio_err = trio_err trio_err = chan._trio_err = _trio_err
# await tractor.pause(shield=True) # workx! # await tractor.pause(shield=True) # workx!
entered: bool = await _debug._maybe_enter_pm( entered: bool = await _debug._maybe_enter_pm(
trio_err, trio_err,
@ -865,25 +864,27 @@ async def translate_aio_errors(
f'The `trio`-side task crashed!\n' f'The `trio`-side task crashed!\n'
f'{trio_err}' f'{trio_err}'
) )
aio_task.set_exception(aio_taskc) # ??TODO? move this into the func that tries to use
wait_on_aio_task = False # `Task._fut_waiter: Future` instead??
# try: #
# aio_task.set_exception(aio_taskc) # aio_task.set_exception(aio_taskc)
# except (
# asyncio.InvalidStateError,
# RuntimeError,
# # ^XXX, uhh bc apparently we can't use `.set_exception()`
# # any more XD .. ??
# ):
# wait_on_aio_task = False # wait_on_aio_task = False
try:
aio_task.set_exception(aio_taskc)
except (
asyncio.InvalidStateError,
RuntimeError,
# ^XXX, uhh bc apparently we can't use `.set_exception()`
# any more XD .. ??
):
wait_on_aio_task = False
finally: finally:
# record wtv `trio`-side error transpired # record wtv `trio`-side error transpired
if trio_err: if trio_err:
if chan._trio_err is not trio_err: assert chan._trio_err is trio_err
await tractor.pause(shield=True) # if chan._trio_err is not trio_err:
# await tractor.pause(shield=True)
# assert chan._trio_err is trio_err
ya_trio_exited: bool = chan._trio_exited ya_trio_exited: bool = chan._trio_exited
graceful_trio_exit: bool = ( graceful_trio_exit: bool = (
@ -1031,15 +1032,8 @@ async def translate_aio_errors(
'asyncio-task is done and unblocked trio-side!\n' 'asyncio-task is done and unblocked trio-side!\n'
) )
# TODO? # NOTE, was a `maybe_raise_aio_side_err()` closure that
# -[ ] make this a channel method, OR # i moved inline BP
# -[ ] just put back inline below?
#
# await tractor.pause(shield=True)
# TODO, go back to inlining this..
def maybe_raise_aio_side_err(
trio_err: Exception,
) -> None:
''' '''
Raise any `trio`-side-caused cancellation or legit task Raise any `trio`-side-caused cancellation or legit task
error normally propagated from the caller of either, error normally propagated from the caller of either,
@ -1058,20 +1052,26 @@ async def translate_aio_errors(
raise trio_to_raise from (aio_err or trio_err) raise trio_to_raise from (aio_err or trio_err)
if trio_to_raise: if trio_to_raise:
# import pdbp; pdbp.set_trace()
match ( match (
trio_to_raise, trio_to_raise,
trio_err, trio_err,
): ):
case ( case (
AsyncioTaskExited(), AsyncioTaskExited(),
trio.Cancelled()|None, trio.Cancelled()|
None,
): ):
log.info( log.info(
'Ignoring aio exit signal since trio also exited!' 'Ignoring aio exit signal since trio also exited!'
) )
return return
case (
AsyncioTaskExited(),
trio.EndOfChannel(),
):
raise trio_err
case ( case (
AsyncioCancelled(), AsyncioCancelled(),
trio.Cancelled(), trio.Cancelled(),
@ -1090,24 +1090,6 @@ async def translate_aio_errors(
aio_err is not None aio_err is not None
and and
type(aio_err) is not AsyncioCancelled type(aio_err) is not AsyncioCancelled
# and (
# type(aio_err) is not AsyncioTaskExited
# and
# not ya_trio_exited
# and
# not trio_err
# )
# TODO, case where trio_err is not None and
# aio_err is AsyncioTaskExited => raise eg!
# -[ ] maybe use a match bc this get's real
# complex fast XD
#
# or
# type(aio_err) is not AsyncioTaskExited
# and
# trio_err
# )
): ):
# always raise from any captured asyncio error # always raise from any captured asyncio error
if trio_err: if trio_err:
@ -1122,13 +1104,11 @@ async def translate_aio_errors(
if trio_err: if trio_err:
raise trio_err raise trio_err
# await tractor.pause() # ^^TODO?? case where trio_err is not None and
# NOTE: if any ``asyncio`` error was caught, raise it here inline # aio_err is AsyncioTaskExited => raise eg!
# here in the ``trio`` task # -[x] maybe use a match bc this get's real
# if trio_err: # complex fast XD
maybe_raise_aio_side_err( # => i did this above for silent exit cases ya?
trio_err=to_raise_trio or trio_err
)
async def run_task( async def run_task(