diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 63c20bc1..a9096f7f 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -130,6 +130,7 @@ class LinkedTaskChannel( _trio_task: trio.Task _aio_task_complete: trio.Event + _closed_by_aio_task: bool = False _suppress_graceful_exits: bool = True _trio_err: BaseException|None = None @@ -242,6 +243,7 @@ class LinkedTaskChannel( # cycle on the trio side? # await trio.lowlevel.checkpoint() return await self._from_aio.receive() + except BaseException as err: async with translate_aio_errors( chan=self, @@ -319,7 +321,7 @@ def _run_asyncio_task( qsize: int = 1, provide_channels: bool = False, suppress_graceful_exits: bool = True, - hide_tb: bool = False, + hide_tb: bool = True, **kwargs, ) -> LinkedTaskChannel: @@ -445,9 +447,15 @@ def _run_asyncio_task( f'Task exited with final result: {result!r}\n' ) - # only close the sender side which will relay - # a `trio.EndOfChannel` to the trio (consumer) side. + # only close the aio (child) side which will relay + # a `trio.EndOfChannel` to the trio (parent) side. + # + # XXX NOTE, that trio-side MUST then in such cases + # check for a `chan._aio_err` and raise it!! to_trio.close() + # specially mark the closure as due to the + # asyncio.Task terminating! + chan._closed_by_aio_task = True aio_task_complete.set() log.runtime( @@ -645,8 +653,9 @@ def _run_asyncio_task( not trio_cs.cancel_called ): log.cancel( - f'Cancelling `trio` side due to aio-side src exc\n' - f'{curr_aio_err}\n' + f'Cancelling trio-side due to aio-side src exc\n' + f'\n' + f'{curr_aio_err!r}\n' f'\n' f'(c>\n' f' |_{trio_task}\n' @@ -758,6 +767,7 @@ async def translate_aio_errors( aio_done_before_trio: bool = aio_task.done() assert aio_task trio_err: BaseException|None = None + eoc: trio.EndOfChannel|None = None try: yield # back to one of the cross-loop apis except trio.Cancelled as taskc: @@ -789,12 +799,50 @@ async def translate_aio_errors( # ) # raise - # XXX always passthrough EoC since this translator is often - # called from `LinkedTaskChannel.receive()` which we want - # passthrough and further we have no special meaning for it in - # terms of relaying errors or signals from the aio side! - except trio.EndOfChannel as eoc: - trio_err = chan._trio_err = eoc + # XXX EoC is a special SIGNAL from the aio-side here! + # There are 2 cases to handle: + # 1. the "EoC passthrough" case. + # - the aio-task actually closed the channel "gracefully" and + # the trio-task should unwind any ongoing channel + # iteration/receiving, + # |_this exc-translator wraps calls to `LinkedTaskChannel.receive()` + # in which case we want to relay the actual "end-of-chan" for + # iteration purposes. + # + # 2. relaying the "asyncio.Task termination" case. + # - if the aio-task terminates, maybe with an error, AND the + # `open_channel_from()` API was used, it will always signal + # that termination. + # |_`wait_on_coro_final_result()` always calls + # `to_trio.close()` when `provide_channels=True` so we need to + # always check if there is an aio-side exc which needs to be + # relayed to the parent trio side! + # |_in this case the special `chan._closed_by_aio_task` is + # ALWAYS set. + # + except trio.EndOfChannel as _eoc: + eoc = _eoc + if ( + chan._closed_by_aio_task + and + aio_err + ): + log.cancel( + f'The asyncio-child task terminated due to error\n' + f'{aio_err!r}\n' + ) + chan._trio_to_raise = aio_err + trio_err = chan._trio_err = eoc + # + # await tractor.pause(shield=True) + # + # ?TODO?, raise something like a, + # chan._trio_to_raise = AsyncioErrored() + # BUT, with the tb rewritten to reflect the underlying + # call stack? + else: + trio_err = chan._trio_err = eoc + raise eoc # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio @@ -1047,7 +1095,7 @@ async def translate_aio_errors( # if wait_on_aio_task: await chan._aio_task_complete.wait() - log.info( + log.debug( 'asyncio-task is done and unblocked trio-side!\n' ) @@ -1064,11 +1112,17 @@ async def translate_aio_errors( trio_to_raise: ( AsyncioCancelled| AsyncioTaskExited| + Exception| # relayed from aio-task None ) = chan._trio_to_raise + raise_from: Exception = ( + trio_err if (aio_err is trio_to_raise) + else aio_err + ) + if not suppress_graceful_exits: - raise trio_to_raise from (aio_err or trio_err) + raise trio_to_raise from raise_from if trio_to_raise: match ( @@ -1101,7 +1155,7 @@ async def translate_aio_errors( ) return case _: - raise trio_to_raise from (aio_err or trio_err) + raise trio_to_raise from raise_from # Check if the asyncio-side is the cause of the trio-side # error. @@ -1167,7 +1221,6 @@ async def run_task( @acm async def open_channel_from( - target: Callable[..., Any], suppress_graceful_exits: bool = True, **target_kwargs, @@ -1201,7 +1254,6 @@ async def open_channel_from( # deliver stream handle upward yield first, chan except trio.Cancelled as taskc: - # await tractor.pause(shield=True) # ya it worx ;) if cs.cancel_called: if isinstance(chan._trio_to_raise, AsyncioCancelled): log.cancel(