diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 8726ad8..465decc 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -491,7 +491,13 @@ async def stream_from_aio( ], ): async for value in chan: - print(f'trio received {value}') + print(f'trio received: {value!r}') + + # XXX, debugging EoC not being handled correctly + # in `transate_aio_errors()`.. + # if value is None: + # await tractor.pause(shield=True) + pulled.append(value) if value == 50: @@ -733,7 +739,13 @@ async def aio_echo_server( to_trio.send_nowait('start') while True: - msg = await from_trio.get() + try: + msg = await from_trio.get() + except to_asyncio.TrioTaskExited: + print( + 'breaking aio echo loop due to `trio` exit!' + ) + break # echo the msg back to_trio.send_nowait(msg) diff --git a/tests/test_root_infect_asyncio.py b/tests/test_root_infect_asyncio.py index 331b631..93deba1 100644 --- a/tests/test_root_infect_asyncio.py +++ b/tests/test_root_infect_asyncio.py @@ -39,7 +39,7 @@ def test_infected_root_actor( ''' async def _trio_main(): - with trio.fail_after(2): + with trio.fail_after(2 if not debug_mode else 999): first: str chan: to_asyncio.LinkedTaskChannel async with ( @@ -59,7 +59,11 @@ def test_infected_root_actor( assert out == i print(f'asyncio echoing {i}') - if raise_error_mid_stream and i == 500: + if ( + raise_error_mid_stream + and + i == 500 + ): raise raise_error_mid_stream if out is None: diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index baef981..7b87be0 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -428,8 +428,7 @@ def _run_asyncio_task( not chan._aio_err ): chan._trio_to_raise = AsyncioTaskExited( - f'Task existed with final result\n' - f'{result!r}\n' + f'Task exited with final result: {result!r}\n' ) # only close the sender side which will relay @@ -741,7 +740,6 @@ async def translate_aio_errors( aio_done_before_trio: bool = aio_task.done() assert aio_task trio_err: BaseException|None = None - to_raise_trio: BaseException|None = None try: yield # back to one of the cross-loop apis except trio.Cancelled as taskc: @@ -777,8 +775,9 @@ async def translate_aio_errors( # called from `LinkedTaskChannel.receive()` which we want # passthrough and further we have no special meaning for it in # terms of relaying errors or signals from the aio side! - except trio.EndOfChannel: - raise + except trio.EndOfChannel as eoc: + trio_err = chan._trio_err = eoc + raise eoc # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio # task-done-callback. @@ -824,7 +823,7 @@ async def translate_aio_errors( raise cre except BaseException as _trio_err: - trio_err = chan._trio_err = trio_err + trio_err = chan._trio_err = _trio_err # await tractor.pause(shield=True) # workx! entered: bool = await _debug._maybe_enter_pm( trio_err, @@ -865,25 +864,27 @@ async def translate_aio_errors( f'The `trio`-side task crashed!\n' f'{trio_err}' ) - aio_task.set_exception(aio_taskc) - wait_on_aio_task = False - # try: - # aio_task.set_exception(aio_taskc) - # except ( - # asyncio.InvalidStateError, - # RuntimeError, - # # ^XXX, uhh bc apparently we can't use `.set_exception()` - # # any more XD .. ?? - # ): - # wait_on_aio_task = False + # ??TODO? move this into the func that tries to use + # `Task._fut_waiter: Future` instead?? + # + # aio_task.set_exception(aio_taskc) + # wait_on_aio_task = False + try: + aio_task.set_exception(aio_taskc) + except ( + asyncio.InvalidStateError, + RuntimeError, + # ^XXX, uhh bc apparently we can't use `.set_exception()` + # any more XD .. ?? + ): + wait_on_aio_task = False finally: # record wtv `trio`-side error transpired if trio_err: - if chan._trio_err is not trio_err: - await tractor.pause(shield=True) - - # assert chan._trio_err is trio_err + assert chan._trio_err is trio_err + # if chan._trio_err is not trio_err: + # await tractor.pause(shield=True) ya_trio_exited: bool = chan._trio_exited graceful_trio_exit: bool = ( @@ -1031,104 +1032,83 @@ async def translate_aio_errors( 'asyncio-task is done and unblocked trio-side!\n' ) - # TODO? - # -[ ] make this a channel method, OR - # -[ ] just put back inline below? - # - # await tractor.pause(shield=True) - # TODO, go back to inlining this.. - def maybe_raise_aio_side_err( - trio_err: Exception, - ) -> None: - ''' - Raise any `trio`-side-caused cancellation or legit task - error normally propagated from the caller of either, - - `open_channel_from()` - - `run_task()` + # NOTE, was a `maybe_raise_aio_side_err()` closure that + # i moved inline BP + ''' + Raise any `trio`-side-caused cancellation or legit task + error normally propagated from the caller of either, + - `open_channel_from()` + - `run_task()` - ''' - aio_err: BaseException|None = chan._aio_err - trio_to_raise: ( - AsyncioCancelled| - AsyncioTaskExited| - None - ) = chan._trio_to_raise + ''' + aio_err: BaseException|None = chan._aio_err + trio_to_raise: ( + AsyncioCancelled| + AsyncioTaskExited| + None + ) = chan._trio_to_raise - if not suppress_graceful_exits: - raise trio_to_raise from (aio_err or trio_err) + if not suppress_graceful_exits: + raise trio_to_raise from (aio_err or trio_err) - if trio_to_raise: - # import pdbp; pdbp.set_trace() - match ( - trio_to_raise, - trio_err, + if trio_to_raise: + match ( + trio_to_raise, + trio_err, + ): + case ( + AsyncioTaskExited(), + trio.Cancelled()| + None, ): - case ( - AsyncioTaskExited(), - trio.Cancelled()|None, - ): + log.info( + 'Ignoring aio exit signal since trio also exited!' + ) + return + + case ( + AsyncioTaskExited(), + trio.EndOfChannel(), + ): + raise trio_err + + case ( + AsyncioCancelled(), + trio.Cancelled(), + ): + if not aio_done_before_trio: log.info( - 'Ignoring aio exit signal since trio also exited!' + 'Ignoring aio cancelled signal since trio was also cancelled!' ) return + case _: + raise trio_to_raise from (aio_err or trio_err) - case ( - AsyncioCancelled(), - trio.Cancelled(), - ): - if not aio_done_before_trio: - log.info( - 'Ignoring aio cancelled signal since trio was also cancelled!' - ) - return - case _: - raise trio_to_raise from (aio_err or trio_err) - - # Check if the asyncio-side is the cause of the trio-side - # error. - elif ( - aio_err is not None - and - type(aio_err) is not AsyncioCancelled - # and ( - # type(aio_err) is not AsyncioTaskExited - # and - # not ya_trio_exited - # and - # not trio_err - # ) - - # TODO, case where trio_err is not None and - # aio_err is AsyncioTaskExited => raise eg! - # -[ ] maybe use a match bc this get's real - # complex fast XD - # - # or - # type(aio_err) is not AsyncioTaskExited - # and - # trio_err - # ) - ): - # always raise from any captured asyncio error - if trio_err: - raise trio_err from aio_err - - # XXX NOTE! above in the `trio.ClosedResourceError` - # handler we specifically set the - # `aio_err = AsyncioCancelled` such that it is raised - # as that special exc here! - raise aio_err - + # Check if the asyncio-side is the cause of the trio-side + # error. + elif ( + aio_err is not None + and + type(aio_err) is not AsyncioCancelled + ): + # always raise from any captured asyncio error if trio_err: - raise trio_err + raise trio_err from aio_err - # await tractor.pause() - # NOTE: if any ``asyncio`` error was caught, raise it here inline - # here in the ``trio`` task - # if trio_err: - maybe_raise_aio_side_err( - trio_err=to_raise_trio or trio_err - ) + # XXX NOTE! above in the `trio.ClosedResourceError` + # handler we specifically set the + # `aio_err = AsyncioCancelled` such that it is raised + # as that special exc here! + raise aio_err + + if trio_err: + raise trio_err + + # ^^TODO?? case where trio_err is not None and + # aio_err is AsyncioTaskExited => raise eg! + # -[x] maybe use a match bc this get's real + # complex fast XD + # => i did this above for silent exit cases ya? async def run_task(