diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 24f1ace..d5f78ca 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -33,13 +33,19 @@ from typing import ( ) import tractor -from tractor._exceptions import AsyncioCancelled +from tractor._exceptions import ( + AsyncioCancelled, + is_multi_cancelled, +) from tractor._state import ( debug_mode, _runtime_vars, ) from tractor.devx import _debug -from tractor.log import get_logger +from tractor.log import ( + get_logger, + StackLevelAdapter, +) from tractor.trionics._broadcast import ( broadcast_receiver, BroadcastReceiver, @@ -50,7 +56,7 @@ from outcome import ( Outcome, ) -log = get_logger(__name__) +log: StackLevelAdapter = get_logger(__name__) __all__ = [ @@ -70,9 +76,10 @@ class LinkedTaskChannel(trio.abc.Channel): _to_aio: asyncio.Queue _from_aio: trio.MemoryReceiveChannel _to_trio: trio.MemorySendChannel - _trio_cs: trio.CancelScope _aio_task_complete: trio.Event + + _trio_err: BaseException|None = None _trio_exited: bool = False # set after ``asyncio.create_task()`` @@ -84,28 +91,40 @@ class LinkedTaskChannel(trio.abc.Channel): await self._from_aio.aclose() async def receive(self) -> Any: - async with translate_aio_errors( - self, - - # XXX: obviously this will deadlock if an on-going stream is - # being procesed. - # wait_on_aio_task=False, - ): + ''' + Receive a value from the paired `asyncio.Task` with + exception/cancel handling to teardown both sides on any + unexpected error. + ''' + try: # TODO: do we need this to guarantee asyncio code get's # cancelled in the case where the trio side somehow creates # a state where the asyncio cycle-task isn't getting the # cancel request sent by (in theory) the last checkpoint # cycle on the trio side? # await trio.lowlevel.checkpoint() - return await self._from_aio.receive() + except BaseException as err: + async with translate_aio_errors( + self, + + # XXX: obviously this will deadlock if an on-going stream is + # being procesed. + # wait_on_aio_task=False, + ): + raise err async def wait_asyncio_complete(self) -> None: await self._aio_task_complete.wait() - # def cancel_asyncio_task(self) -> None: - # self._aio_task.cancel() + def cancel_asyncio_task( + self, + msg: str = '', + ) -> None: + self._aio_task.cancel( + msg=msg, + ) async def send(self, item: Any) -> None: ''' @@ -155,7 +174,6 @@ class LinkedTaskChannel(trio.abc.Channel): def _run_asyncio_task( - func: Callable, *, qsize: int = 1, @@ -165,8 +183,9 @@ def _run_asyncio_task( ) -> LinkedTaskChannel: ''' - Run an ``asyncio`` async function or generator in a task, return - or stream the result back to the caller `trio.lowleve.Task`. + Run an `asyncio`-compat async function or generator in a task, + return or stream the result back to the caller + `trio.lowleve.Task`. ''' __tracebackhide__: bool = hide_tb @@ -204,23 +223,23 @@ def _run_asyncio_task( aio_err: BaseException|None = None chan = LinkedTaskChannel( - aio_q, # asyncio.Queue - from_aio, # recv chan - to_trio, # send chan - - cancel_scope, - aio_task_complete, + _to_aio=aio_q, # asyncio.Queue + _from_aio=from_aio, # recv chan + _to_trio=to_trio, # send chan + _trio_cs=cancel_scope, + _aio_task_complete=aio_task_complete, ) async def wait_on_coro_final_result( - to_trio: trio.MemorySendChannel, coro: Awaitable, aio_task_complete: trio.Event, ) -> None: ''' - Await ``coro`` and relay result back to ``trio``. + Await `coro` and relay result back to `trio`. + + This can only be run as an `asyncio.Task`! ''' nonlocal aio_err @@ -243,8 +262,10 @@ def _run_asyncio_task( else: if ( - result != orig and - aio_err is None and + result != orig + and + aio_err is None + and # in the `open_channel_from()` case we don't # relay through the "return value". @@ -260,12 +281,21 @@ def _run_asyncio_task( # a ``trio.EndOfChannel`` to the trio (consumer) side. to_trio.close() + # import pdbp; pdbp.set_trace() aio_task_complete.set() - log.runtime(f'`asyncio` task: {task.get_name()} is complete') + # await asyncio.sleep(0.1) + log.info( + f'`asyncio` task terminated\n' + f'x)>\n' + f' |_{task}\n' + ) # start the asyncio task we submitted from trio if not inspect.isawaitable(coro): - raise TypeError(f"No support for invoking {coro}") + raise TypeError( + f'Pass the async-fn NOT a coroutine\n' + f'{coro!r}' + ) task: asyncio.Task = asyncio.create_task( wait_on_coro_final_result( @@ -289,6 +319,10 @@ def _run_asyncio_task( raise_not_found=False, )) ): + log.info( + f'Bestowing `greenback` portal for `asyncio`-task\n' + f'{task}\n' + ) greenback.bestow_portal(task) def cancel_trio(task: asyncio.Task) -> None: @@ -304,11 +338,22 @@ def _run_asyncio_task( # task exceptions try: res: Any = task.result() + log.info( + '`trio` received final result from {task}\n' + f'|_{res}\n' + ) except BaseException as terr: task_err: BaseException = terr + # read again AFTER the `asyncio` side errors in case + # it was cancelled due to an error from `trio` (or + # some other out of band exc). + aio_err: BaseException|None = chan._aio_err + msg: str = ( - 'Infected `asyncio` task {etype_str}\n' + '`trio`-side reports that the `asyncio`-side ' + '{etype_str}\n' + # ^NOTE filled in below ) if isinstance(terr, CancelledError): msg += ( @@ -327,17 +372,18 @@ def _run_asyncio_task( msg.format(etype_str='errored') ) - assert type(terr) is type(aio_err), ( - '`asyncio` task error mismatch?!?' - ) + assert ( + type(terr) is type(aio_err) + ), '`asyncio` task error mismatch?!?' if aio_err is not None: + # import pdbp; pdbp.set_trace() # XXX: uhh is this true? # assert task_err, f'Asyncio task {task.get_name()} discrepancy!?' # NOTE: currently mem chan closure may act as a form - # of error relay (at least in the ``asyncio.CancelledError`` - # case) since we have no way to directly trigger a ``trio`` + # of error relay (at least in the `asyncio.CancelledError` + # case) since we have no way to directly trigger a `trio` # task error without creating a nursery to throw one. # We might want to change this in the future though. from_aio.close() @@ -359,29 +405,25 @@ def _run_asyncio_task( # ) # raise aio_err from task_err - # XXX: if not already, alway cancel the scope - # on a task error in case the trio task is blocking on + # XXX: if not already, alway cancel the scope on a task + # error in case the trio task is blocking on # a checkpoint. - cancel_scope.cancel() - if ( - task_err - and - aio_err is not task_err + not cancel_scope.cancelled_caught + or + not cancel_scope.cancel_called ): - raise aio_err from task_err + # import pdbp; pdbp.set_trace() + cancel_scope.cancel() - # raise any `asyncio` side error. - raise aio_err - - log.info( - '`trio` received final result from {task}\n' - f'|_{res}\n' - ) - # TODO: do we need this? - # if task_err: - # cancel_scope.cancel() - # raise task_err + if task_err: + # XXX raise any `asyncio` side error IFF it doesn't + # match the one we just caught from the task above! + # (that would indicate something weird/very-wrong + # going on?) + if aio_err is not task_err: + # import pdbp; pdbp.set_trace() + raise aio_err from task_err task.add_done_callback(cancel_trio) return chan @@ -389,13 +431,18 @@ def _run_asyncio_task( @acm async def translate_aio_errors( - chan: LinkedTaskChannel, wait_on_aio_task: bool = False, + cancel_aio_task_on_trio_exit: bool = True, ) -> AsyncIterator[None]: ''' - Error handling context around ``asyncio`` task spawns which + An error handling to cross-loop propagation context around + `asyncio.Task` spawns via one of this module's APIs: + + - `open_channel_from()` + - `run_task()` + appropriately translates errors and cancels into ``trio`` land. ''' @@ -403,88 +450,204 @@ async def translate_aio_errors( aio_err: BaseException|None = None - # TODO: make thisi a channel method? - def maybe_raise_aio_err( - err: Exception|None = None - ) -> None: - aio_err = chan._aio_err - if ( - aio_err is not None - and - # not isinstance(aio_err, CancelledError) - type(aio_err) != CancelledError - ): - # always raise from any captured asyncio error - if err: - raise aio_err from err - else: - raise aio_err - - task = chan._aio_task - assert task + aio_task: asyncio.Task = chan._aio_task + assert aio_task + trio_err: BaseException|None = None try: - yield - + yield # back to one of the cross-loop apis except ( trio.Cancelled, - ): - # relay cancel through to called ``asyncio`` task + ) as _trio_err: + trio_err = _trio_err assert chan._aio_task - chan._aio_task.cancel( - msg=f'the `trio` caller task was cancelled: {trio_task.name}' + + # import pdbp; pdbp.set_trace() # lolevel-debug + + # relay cancel through to called ``asyncio`` task + chan._aio_err = AsyncioCancelled( + f'trio`-side cancelled the `asyncio`-side,\n' + f'c)>\n' + f' |_{trio_task}\n\n' + + + f'{trio_err!r}\n' ) - raise + + # XXX NOTE XXX seems like we can get all sorts of unreliable + # behaviour from `asyncio` under various cancellation + # conditions (like SIGINT/kbi) when this is used.. + # SO FOR NOW, try to avoid it at most costs! + # + # aio_task.cancel( + # msg=f'the `trio` parent task was cancelled: {trio_task.name}' + # ) + # raise except ( - # NOTE: see the note in the ``cancel_trio()`` asyncio task + # NOTE: also see note in the `cancel_trio()` asyncio task # termination callback trio.ClosedResourceError, # trio.BrokenResourceError, - ): + + ) as _trio_err: + trio_err = _trio_err aio_err = chan._aio_err + # import pdbp; pdbp.set_trace() + + # XXX if an underlying `asyncio.CancelledError` triggered + # this channel close, raise our (non-`BaseException`) wrapper + # exception (`AsyncioCancelled`) from that source error. if ( - task.cancelled() + # NOTE, not until it terminates? + aio_task.cancelled() and type(aio_err) is CancelledError ): - # if an underlying `asyncio.CancelledError` triggered this - # channel close, raise our (non-``BaseException``) wrapper - # error: ``AsyncioCancelled`` from that source error. raise AsyncioCancelled( - f'Task cancelled\n' - f'|_{task}\n' + f'asyncio`-side cancelled the `trio`-side,\n' + f'c(>\n' + f' |_{aio_task}\n\n' + + f'{trio_err!r}\n' ) from aio_err else: raise - finally: + except BaseException as _trio_err: + trio_err = _trio_err + log.exception( + '`trio`-side task errored?' + ) + + entered: bool = await _debug._maybe_enter_pm( + trio_err, + api_frame=inspect.currentframe(), + ) if ( - # NOTE: always cancel the ``asyncio`` task if we've made it - # this far and it's not done. - not task.done() and aio_err + not entered + and + not is_multi_cancelled(trio_err) + ): + log.exception('actor crashed\n') + + aio_taskc = AsyncioCancelled( + f'`trio`-side task errored!\n' + f'{trio_err}' + ) #from trio_err + + try: + aio_task.set_exception(aio_taskc) + except ( + asyncio.InvalidStateError, + RuntimeError, + # ^XXX, uhh bc apparently we can't use `.set_exception()` + # any more XD .. ?? + ): + wait_on_aio_task = False + + # import pdbp; pdbp.set_trace() + # raise aio_taskc from trio_err + + finally: + # record wtv `trio`-side error transpired + chan._trio_err = trio_err + + # NOTE! by default always cancel the `asyncio` task if + # we've made it this far and it's not done. + # TODO, how to detect if there's an out-of-band error that + # caused the exit? + if ( + cancel_aio_task_on_trio_exit + and + not aio_task.done() + and + aio_err # or the trio side has exited it's surrounding cancel scope # indicating the lifetime of the ``asyncio``-side task # should also be terminated. - or chan._trio_exited - ): - log.runtime( - f'Cancelling `asyncio`-task: {task.get_name()}' + or ( + chan._trio_exited + and + not chan._trio_err # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man.. ) - # assert not aio_err, 'WTF how did asyncio do this?!' - task.cancel() + ): + # pass + msg: str = ( + f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n' + f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n' - # Required to sync with the far end ``asyncio``-task to ensure + f'trio-side exited silently!' + ) + # TODO XXX, figure out the case where calling this makes the + # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits` + # hang and then don't call it in that case! + # + aio_task.cancel(msg=msg) + log.warning(msg) + # assert not aio_err, 'WTF how did asyncio do this?!' + # import pdbp; pdbp.set_trace() + + # Required to sync with the far end `asyncio`-task to ensure # any error is captured (via monkeypatching the - # ``channel._aio_err``) before calling ``maybe_raise_aio_err()`` + # `channel._aio_err`) before calling ``maybe_raise_aio_err()`` # below! + # + # XXX NOTE XXX the `task.set_exception(aio_taskc)` call above + # MUST NOT EXCEPT or this WILL HANG!! + # + # so if you get a hang maybe step through and figure out why + # it erroed out up there! + # if wait_on_aio_task: + # await chan.wait_asyncio_complete() await chan._aio_task_complete.wait() + log.info( + 'asyncio-task is done and unblocked trio-side!\n' + ) + + # TODO? + # -[ ] make this a channel method, OR + # -[ ] just put back inline below? + # + def maybe_raise_aio_side_err( + trio_err: Exception, + ) -> None: + ''' + Raise any `trio`-side-caused cancellation or legit task + error normally propagated from the caller of either, + - `open_channel_from()` + - `run_task()` + + ''' + aio_err: BaseException|None = chan._aio_err + + # Check if the asyncio-side is the cause of the trio-side + # error. + if ( + aio_err is not None + and + type(aio_err) is not AsyncioCancelled + + # not isinstance(aio_err, CancelledError) + # type(aio_err) is not CancelledError + ): + # always raise from any captured asyncio error + if trio_err: + raise trio_err from aio_err + + raise aio_err + + if trio_err: + raise trio_err # NOTE: if any ``asyncio`` error was caught, raise it here inline # here in the ``trio`` task - maybe_raise_aio_err() + # if trio_err: + maybe_raise_aio_side_err( + trio_err=trio_err + ) async def run_task( @@ -496,8 +659,8 @@ async def run_task( ) -> Any: ''' - Run an `asyncio` async function or generator in a task, return - or stream the result back to `trio`. + Run an `asyncio`-compat async function or generator in a task, + return or stream the result back to `trio`. ''' # simple async func @@ -537,6 +700,7 @@ async def open_channel_from( provide_channels=True, **kwargs, ) + # TODO, tuple form here? async with chan._from_aio: async with translate_aio_errors( chan, @@ -685,18 +849,21 @@ def run_as_asyncio_guest( # Uh, oh. # # :o - - # It looks like your event loop has caught a case of the ``trio``s. - - # :() - - # Don't worry, we've heard you'll barely notice. You might - # hallucinate a few more propagating errors and feel like your - # digestion has slowed but if anything get's too bad your parents - # will know about it. - + # + # looks like your stdlib event loop has caught a case of "the trios" ! + # + # :O + # + # Don't worry, we've heard you'll barely notice. + # # :) - + # + # You might hallucinate a few more propagating errors and feel + # like your digestion has slowed, but if anything get's too bad + # your parents will know about it. + # + # B) + # async def aio_main(trio_main): ''' Main `asyncio.Task` which calls @@ -713,16 +880,20 @@ def run_as_asyncio_guest( '-> built a `trio`-done future\n' ) - # TODO: shoudn't this be done in the guest-run trio task? - # if debug_mode(): - # # XXX make it obvi we know this isn't supported yet! - # log.error( - # 'Attempting to enter unsupported `greenback` init ' - # 'from `asyncio` task..' - # ) - # await _debug.maybe_init_greenback( - # force_reload=True, - # ) + # TODO: is this evern run or needed? + # -[ ] pretty sure it never gets run for root-infected-aio + # since this main task is always the parent of any + # eventual `open_root_actor()` call? + if debug_mode(): + log.error( + 'Attempting to enter non-required `greenback` init ' + 'from `asyncio` task ???' + ) + # XXX make it obvi we know this isn't supported yet! + assert 0 + # await _debug.maybe_init_greenback( + # force_reload=True, + # ) def trio_done_callback(main_outcome): log.runtime( @@ -732,6 +903,7 @@ def run_as_asyncio_guest( ) if isinstance(main_outcome, Error): + # import pdbp; pdbp.set_trace() error: BaseException = main_outcome.error # show an dedicated `asyncio`-side tb from the error @@ -751,7 +923,7 @@ def run_as_asyncio_guest( trio_done_fute.set_result(main_outcome) log.info( - f'`trio` guest-run finished with outcome\n' + f'`trio` guest-run finished with,\n' f')>\n' f'|_{trio_done_fute}\n' ) @@ -777,9 +949,20 @@ def run_as_asyncio_guest( done_callback=trio_done_callback, ) fute_err: BaseException|None = None - try: out: Outcome = await asyncio.shield(trio_done_fute) + # ^TODO still don't really understand why the `.shield()` + # is required ... ?? + # https://docs.python.org/3/library/asyncio-task.html#asyncio.shield + # ^ seems as though in combo with the try/except here + # we're BOLDLY INGORING cancel of the trio fute? + # + # I guess it makes sense bc we don't want `asyncio` to + # cancel trio just because they can't handle SIGINT + # sanely? XD .. kk + + # XXX, sin-shield causes guest-run abandons on SIGINT.. + # out: Outcome = await trio_done_fute # NOTE will raise (via `Error.unwrap()`) from any # exception packed into the guest-run's `main_outcome`. @@ -802,27 +985,32 @@ def run_as_asyncio_guest( fute_err = _fute_err err_message: str = ( 'main `asyncio` task ' + 'was cancelled!\n' ) - if isinstance(fute_err, asyncio.CancelledError): - err_message += 'was cancelled!\n' - else: - err_message += f'errored with {out.error!r}\n' + # TODO, handle possible edge cases with + # `open_root_actor()` closing before this is run! + # actor: tractor.Actor = tractor.current_actor() + log.exception( err_message + 'Cancelling `trio`-side `tractor`-runtime..\n' - f'c)>\n' + f'c(>\n' f' |_{actor}.cancel_soon()\n' ) - # XXX WARNING XXX the next LOCs are super important, since - # without them, we can get guest-run abandonment cases - # where `asyncio` will not schedule or wait on the `trio` - # guest-run task before final shutdown! This is - # particularly true if the `trio` side has tasks doing - # shielded work when a SIGINT condition occurs. + # XXX WARNING XXX the next LOCs are super important! + # + # SINCE without them, we can get guest-run ABANDONMENT + # cases where `asyncio` will not schedule or wait on the + # guest-run `trio.Task` nor invoke its registered + # `trio_done_callback()` before final shutdown! + # + # This is particularly true if the `trio` side has tasks + # in shielded sections when an OC-cancel (SIGINT) + # condition occurs! # # We now have the # `test_infected_asyncio.test_sigint_closes_lifetime_stack()` @@ -886,7 +1074,10 @@ def run_as_asyncio_guest( try: return trio_done_fute.result() - except asyncio.exceptions.InvalidStateError as state_err: + except ( + asyncio.InvalidStateError, + # asyncio.CancelledError, + )as state_err: # XXX be super dupere noisy about abandonment issues! aio_task: asyncio.Task = asyncio.current_task()