diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index d5f78ca..3f8d20d 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -342,20 +342,29 @@ def _run_asyncio_task( '`trio` received final result from {task}\n' f'|_{res}\n' ) - except BaseException as terr: - task_err: BaseException = terr + except BaseException as _aio_err: + task_err: BaseException = _aio_err # read again AFTER the `asyncio` side errors in case # it was cancelled due to an error from `trio` (or # some other out of band exc). aio_err: BaseException|None = chan._aio_err + # always true right? + assert ( + type(_aio_err) is type(aio_err) + ), ( + f'`asyncio`-side task errors mismatch?!?\n\n' + f'caught: {_aio_err}\n' + f'chan._aio_err: {aio_err}\n' + ) + msg: str = ( '`trio`-side reports that the `asyncio`-side ' '{etype_str}\n' # ^NOTE filled in below ) - if isinstance(terr, CancelledError): + if isinstance(_aio_err, CancelledError): msg += ( f'c)>\n' f' |_{task}\n' @@ -372,9 +381,6 @@ def _run_asyncio_task( msg.format(etype_str='errored') ) - assert ( - type(terr) is type(aio_err) - ), '`asyncio` task error mismatch?!?' if aio_err is not None: # import pdbp; pdbp.set_trace() @@ -394,7 +400,7 @@ def _run_asyncio_task( # aio_err.with_traceback(aio_err.__traceback__) # TODO: show when cancellation originated - # from each side more pedantically? + # from each side more pedantically in log-msg? # elif ( # type(aio_err) is CancelledError # and # trio was the cause? @@ -429,6 +435,19 @@ def _run_asyncio_task( return chan +class TrioTaskExited(AsyncioCancelled): + ''' + The `trio`-side task exited without explicitly cancelling the + `asyncio.Task` peer. + + This is very similar to how `trio.ClosedResource` acts as + a "clean shutdown" signal to the consumer side of a mem-chan, + + https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels + + ''' + + @acm async def translate_aio_errors( chan: LinkedTaskChannel, @@ -455,10 +474,11 @@ async def translate_aio_errors( trio_err: BaseException|None = None try: yield # back to one of the cross-loop apis - except ( - trio.Cancelled, - ) as _trio_err: - trio_err = _trio_err + except trio.Cancelled as taskc: + trio_err = taskc + + # should NEVER be the case that `trio` is cancel-handling + # BEFORE the other side's task-ref was set!? assert chan._aio_task # import pdbp; pdbp.set_trace() # lolevel-debug @@ -483,14 +503,13 @@ async def translate_aio_errors( # ) # raise + # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio + # task-done-callback. except ( - # NOTE: also see note in the `cancel_trio()` asyncio task - # termination callback trio.ClosedResourceError, # trio.BrokenResourceError, - - ) as _trio_err: - trio_err = _trio_err + ) as cre: + trio_err = cre aio_err = chan._aio_err # import pdbp; pdbp.set_trace() @@ -498,10 +517,21 @@ async def translate_aio_errors( # this channel close, raise our (non-`BaseException`) wrapper # exception (`AsyncioCancelled`) from that source error. if ( - # NOTE, not until it terminates? - aio_task.cancelled() + # aio-side is cancelled? + aio_task.cancelled() # not set until it terminates?? and type(aio_err) is CancelledError + + # TODO, if we want suppression of the + # silent-exit-by-`trio` case? + # -[ ] the parent task can also just catch it though? + # -[ ] OR, offer a `signal_aio_side_on_exit=True` ?? + # + # or + # aio_err is None + # and + # chan._trio_exited + ): raise AsyncioCancelled( f'asyncio`-side cancelled the `trio`-side,\n' @@ -511,6 +541,7 @@ async def translate_aio_errors( f'{trio_err!r}\n' ) from aio_err + # maybe the chan-closure is due to something else? else: raise @@ -552,6 +583,7 @@ async def translate_aio_errors( finally: # record wtv `trio`-side error transpired chan._trio_err = trio_err + ya_trio_exited: bool = chan._trio_exited # NOTE! by default always cancel the `asyncio` task if # we've made it this far and it's not done. @@ -568,26 +600,56 @@ async def translate_aio_errors( # indicating the lifetime of the ``asyncio``-side task # should also be terminated. or ( - chan._trio_exited + ya_trio_exited and not chan._trio_err # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man.. ) ): - # pass - msg: str = ( - f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n' - f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n' - - f'trio-side exited silently!' + report: str = ( + 'trio-side exited silently!' ) - # TODO XXX, figure out the case where calling this makes the - # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits` - # hang and then don't call it in that case! - # - aio_task.cancel(msg=msg) - log.warning(msg) - # assert not aio_err, 'WTF how did asyncio do this?!' - # import pdbp; pdbp.set_trace() + assert not aio_err, 'WTF how did asyncio do this?!' + + # if the `trio.Task` already exited the `open_channel_from()` + # block we ensure the asyncio-side gets signalled via an + # explicit exception and its `Queue` is shutdown. + if ya_trio_exited: + chan._to_aio.shutdown() + + # pump the other side's task? needed? + await trio.lowlevel.checkpoint() + + if ( + not chan._trio_err + and + (fut := aio_task._fut_waiter) + ): + fut.set_exception( + TrioTaskExited( + f'The peer `asyncio` task is still blocking/running?\n' + f'>>\n' + f'|_{aio_task!r}\n' + ) + ) + else: + # from tractor._state import is_root_process + # if is_root_process(): + # breakpoint() + # import pdbp; pdbp.set_trace() + + aio_taskc_warn: str = ( + f'\n' + f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n' + f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n' + ) + report += aio_taskc_warn + # TODO XXX, figure out the case where calling this makes the + # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits` + # hang and then don't call it in that case! + # + aio_task.cancel(msg=aio_taskc_warn) + + log.warning(report) # Required to sync with the far end `asyncio`-task to ensure # any error is captured (via monkeypatching the @@ -1077,6 +1139,8 @@ def run_as_asyncio_guest( except ( asyncio.InvalidStateError, # asyncio.CancelledError, + # ^^XXX `.shield()` call above prevents this?? + )as state_err: # XXX be super dupere noisy about abandonment issues!