From 9133f42b07e69cbae2d8c5da077397b946cf4525 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 26 Jun 2024 13:48:36 -0400 Subject: [PATCH] Solve our abandonment issues.. To make the recent set of tests pass this (hopefully) finally solves all `asyncio` embedded `trio` guest-run abandonment by ensuring we "pump the event loop" until the guest-run future is fully complete. Accomplished via simple poll loop of the form `while not trio_done_fut.done(): await asyncio.sleep(.1)` in the `aio_main()` task's exception teardown sequence. The loop does a naive 10ms "pump-via-sleep & poll" for the `trio` side to complete before finally exiting (and presumably raising) from the SIGINT cancellation. Other related cleanups and refinements: - use `asyncio.Task.result()` inside `cancel_trio()` since it also inline-raises any exception outcome and we can also log-report the result in non-error cases. - comment out buncha not-sure-we-need-it stuff in `cancel_trio()`. - remove the botched `AsyncioCancelled(CancelledError):` idea obvi XD - comment `greenback` init for now in `aio_main()` since (pretty sure) we don't ever want to actually REPL in that specific func-as-task? - always capture any `fute_err: BaseException` from the `main_outcome: Outcome` delivered by the `trio` side guest-run task. - add and raise a new super noisy `AsyncioRuntimeTranslationError` whenever we detect that the guest-run `trio_done_fut` has not completed before task exit; should avoid abandonment issues ever happening again without knowing! --- tractor/to_asyncio.py | 246 ++++++++++++++++++++++++++++++------------ 1 file changed, 176 insertions(+), 70 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index e041721..fb18ba8 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -33,11 +33,12 @@ from typing import ( ) import tractor +from tractor._exceptions import AsyncioCancelled from tractor._state import ( debug_mode, ) -from tractor.log import get_logger from tractor.devx import _debug +from tractor.log import get_logger from tractor.trionics._broadcast import ( broadcast_receiver, BroadcastReceiver, @@ -51,7 +52,10 @@ from outcome import ( log = get_logger(__name__) -__all__ = ['run_task', 'run_as_asyncio_guest'] +__all__ = [ + 'run_task', + 'run_as_asyncio_guest', +] @dataclass @@ -155,15 +159,16 @@ def _run_asyncio_task( *, qsize: int = 1, provide_channels: bool = False, + hide_tb: bool = False, **kwargs, ) -> LinkedTaskChannel: ''' Run an ``asyncio`` async function or generator in a task, return - or stream the result back to ``trio``. + or stream the result back to the caller `trio.lowleve.Task`. ''' - __tracebackhide__ = True + __tracebackhide__: bool = hide_tb if not tractor.current_actor().is_infected_aio(): raise RuntimeError( "`infect_asyncio` mode is not enabled!?" @@ -224,6 +229,7 @@ def _run_asyncio_task( try: result = await coro except BaseException as aio_err: + chan._aio_err = aio_err if isinstance(aio_err, CancelledError): log.runtime( '`asyncio` task was cancelled..\n' @@ -232,7 +238,6 @@ def _run_asyncio_task( log.exception( '`asyncio` task errored\n' ) - chan._aio_err = aio_err raise else: @@ -268,7 +273,7 @@ def _run_asyncio_task( aio_task_complete ) ) - chan._aio_task = task + chan._aio_task: asyncio.Task = task # XXX TODO XXX get this actually workin.. XD # maybe setup `greenback` for `asyncio`-side task REPLing @@ -284,19 +289,19 @@ def _run_asyncio_task( def cancel_trio(task: asyncio.Task) -> None: ''' - Cancel the calling ``trio`` task on error. + Cancel the calling `trio` task on error. ''' nonlocal chan - aio_err = chan._aio_err + aio_err: BaseException|None = chan._aio_err task_err: BaseException|None = None - # only to avoid ``asyncio`` complaining about uncaptured + # only to avoid `asyncio` complaining about uncaptured # task exceptions try: - task.exception() + res: Any = task.result() except BaseException as terr: - task_err = terr + task_err: BaseException = terr msg: str = ( 'Infected `asyncio` task {etype_str}\n' @@ -328,42 +333,49 @@ def _run_asyncio_task( if task_err is None: assert aio_err - aio_err.with_traceback(aio_err.__traceback__) - # log.error( - # 'infected task errorred' - # ) + # wait, wut? + # aio_err.with_traceback(aio_err.__traceback__) - # TODO: show that the cancellation originated - # from the ``trio`` side? right? - # elif type(aio_err) is CancelledError: + # TODO: show when cancellation originated + # from each side more pedantically? + # elif ( + # type(aio_err) is CancelledError + # and # trio was the cause? + # cancel_scope.cancel_called + # ): # log.cancel( - # 'infected task was cancelled' + # 'infected task was cancelled by `trio`-side' # ) + # raise aio_err from task_err - # if cancel_scope.cancelled: - # raise aio_err from err - - # XXX: alway cancel the scope on error - # in case the trio task is blocking - # on a checkpoint. + # XXX: if not already, alway cancel the scope + # on a task error in case the trio task is blocking on + # a checkpoint. cancel_scope.cancel() + if ( + task_err + and + aio_err is not task_err + ): + raise aio_err from task_err + # raise any `asyncio` side error. raise aio_err + log.info( + '`trio` received final result from {task}\n' + f'|_{res}\n' + ) + # TODO: do we need this? + # if task_err: + # cancel_scope.cancel() + # raise task_err + task.add_done_callback(cancel_trio) return chan -class AsyncioCancelled(CancelledError): - ''' - Asyncio cancelled translation (non-base) error - for use with the ``to_asyncio`` module - to be raised in the ``trio`` side task - - ''' - - @acm async def translate_aio_errors( @@ -386,7 +398,9 @@ async def translate_aio_errors( ) -> None: aio_err = chan._aio_err if ( - aio_err is not None and + aio_err is not None + and + # not isinstance(aio_err, CancelledError) type(aio_err) != CancelledError ): # always raise from any captured asyncio error @@ -418,13 +432,17 @@ async def translate_aio_errors( ): aio_err = chan._aio_err if ( - task.cancelled() and + task.cancelled() + and type(aio_err) is CancelledError ): - # if an underlying ``asyncio.CancelledError`` triggered this + # if an underlying `asyncio.CancelledError` triggered this # channel close, raise our (non-``BaseException``) wrapper # error: ``AsyncioCancelled`` from that source error. - raise AsyncioCancelled from aio_err + raise AsyncioCancelled( + f'Task cancelled\n' + f'|_{task}\n' + ) from aio_err else: raise @@ -467,8 +485,8 @@ async def run_task( ) -> Any: ''' - Run an ``asyncio`` async function or generator in a task, return - or stream the result back to ``trio``. + Run an `asyncio` async function or generator in a task, return + or stream the result back to `trio`. ''' # simple async func @@ -526,10 +544,27 @@ async def open_channel_from( chan._to_trio.close() +class AsyncioRuntimeTranslationError(RuntimeError): + ''' + We failed to correctly relay runtime semantics and/or maintain SC + supervision rules cross-event-loop. + + ''' + + def run_as_asyncio_guest( trio_main: Callable, + # ^-NOTE-^ when spawned with `infected_aio=True` this func is + # normally `Actor._async_main()` as is passed by some boostrap + # entrypoint like `._entry._trio_main()`. ) -> None: +# ^-TODO-^ technically whatever `trio_main` returns.. we should +# try to use func-typevar-params at leaast by 3.13! +# -[ ] https://typing.readthedocs.io/en/latest/spec/callables.html#callback-protocols +# -[ ] https://peps.python.org/pep-0646/#using-type-variable-tuples-in-functions +# -[ ] https://typing.readthedocs.io/en/latest/spec/callables.html#unpack-for-keyword-arguments +# -[ ] https://peps.python.org/pep-0718/ ''' Entry for an "infected ``asyncio`` actor". @@ -555,7 +590,13 @@ def run_as_asyncio_guest( # :) async def aio_main(trio_main): + ''' + Main `asyncio.Task` which calls + `trio.lowlevel.start_guest_run()` to "infect" the `asyncio` + event-loop by embedding the `trio` scheduler allowing us to + boot the `tractor` runtime and connect back to our parent. + ''' loop = asyncio.get_running_loop() trio_done_fut = asyncio.Future() startup_msg: str = ( @@ -564,17 +605,22 @@ def run_as_asyncio_guest( '-> built a `trio`-done future\n' ) - if debug_mode(): - # XXX make it obvi we know this isn't supported yet! - log.error( - 'Attempting to enter unsupported `greenback` init ' - 'from `asyncio` task..' - ) - await _debug.maybe_init_greenback( - force_reload=True, - ) + # TODO: shoudn't this be done in the guest-run trio task? + # if debug_mode(): + # # XXX make it obvi we know this isn't supported yet! + # log.error( + # 'Attempting to enter unsupported `greenback` init ' + # 'from `asyncio` task..' + # ) + # await _debug.maybe_init_greenback( + # force_reload=True, + # ) def trio_done_callback(main_outcome): + log.info( + f'trio_main finished with\n' + f'|_{main_outcome!r}' + ) if isinstance(main_outcome, Error): error: BaseException = main_outcome.error @@ -594,7 +640,6 @@ def run_as_asyncio_guest( else: trio_done_fut.set_result(main_outcome) - log.runtime(f'trio_main finished: {main_outcome!r}') startup_msg += ( f'-> created {trio_done_callback!r}\n' @@ -613,26 +658,48 @@ def run_as_asyncio_guest( run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, ) + fute_err: BaseException|None = None try: - # TODO: better SIGINT handling since shielding seems to - # make NO DIFFERENCE XD - # -[ ] maybe this is due to 3.11's recent SIGINT handling - # changes and we can better work with/around it? - # https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption out: Outcome = await asyncio.shield(trio_done_fut) - # NOTE `Error.unwrap()` will raise + + # NOTE will raise (via `Error.unwrap()`) from any + # exception packed into the guest-run's `main_outcome`. return out.unwrap() - except asyncio.CancelledError: + except ( + # XXX special SIGINT-handling is required since + # `asyncio.shield()`-ing seems to NOT handle that case as + # per recent changes in 3.11: + # https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption + # + # NOTE: further, apparently ONLY need to handle this + # special SIGINT case since all other `asyncio`-side + # errors can be processed via our `chan._aio_err` + # relaying (right?); SIGINT seems to be totally diff + # error path in `asyncio`'s runtime..? + asyncio.CancelledError, + + ) as fute_err: + err_message: str = ( + 'main `asyncio` task ' + ) + if isinstance(fute_err, asyncio.CancelledError): + err_message += 'was cancelled!\n' + else: + err_message += f'errored with {out.error!r}\n' + actor: tractor.Actor = tractor.current_actor() log.exception( - '`asyncio`-side main task was cancelled!\n' - 'Cancelling actor-runtime..\n' + err_message + + + 'Cancelling `trio`-side `tractor`-runtime..\n' f'c)>\n' f' |_{actor}.cancel_soon()\n' - ) + # TODO: reduce this comment bloc since abandon issues are + # now solved? + # # XXX NOTE XXX the next LOC is super important!!! # => without it, we can get a guest-run abandonment case # where asyncio will not trigger `trio` in a final event @@ -681,16 +748,55 @@ def run_as_asyncio_guest( # is apparently a working fix! actor.cancel_soon() - # XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to - # `trio`-guest-run to complete and teardown !! + # XXX NOTE XXX pump the `asyncio` event-loop to allow + # `trio`-side to `trio`-guest-run to complete and + # teardown !! # - # XXX WITHOUT THIS the guest-run gets race-conditionally - # abandoned by `asyncio`!! - # XD XD XD - await asyncio.shield( - asyncio.sleep(.1) # NOPE! it can't be 0 either XD - ) - raise + # *WITHOUT THIS* the guest-run can get race-conditionally abandoned!! + # XD + # + await asyncio.sleep(.1) # `delay` can't be 0 either XD + while not trio_done_fut.done(): + log.runtime( + 'Waiting on main guest-run `asyncio` task to complete..\n' + f'|_trio_done_fut: {trio_done_fut}\n' + ) + await asyncio.sleep(.1) + + # XXX: don't actually need the shield.. seems to + # make no difference (??) and we know it spawns an + # internal task.. + # await asyncio.shield(asyncio.sleep(.1)) + + # XXX alt approach but can block indefinitely.. + # so don't use? + # loop._run_once() + + try: + return trio_done_fut.result() + except asyncio.exceptions.InvalidStateError as state_err: + + # XXX be super dupere noisy about abandonment issues! + aio_task: asyncio.Task = asyncio.current_task() + message: str = ( + 'The `asyncio`-side task likely exited before the ' + '`trio`-side guest-run completed!\n\n' + ) + if fute_err: + message += ( + f'The main {aio_task}\n' + f'STOPPED due to {type(fute_err)}\n\n' + ) + + message += ( + f'Likely something inside our guest-run-as-task impl is ' + f'not effectively waiting on the `trio`-side to complete ?!\n' + f'This code -> {aio_main!r}\n\n' + + 'Below you will likely see a ' + '"RuntimeWarning: Trio guest run got abandoned.." !!\n' + ) + raise AsyncioRuntimeTranslationError(message) from state_err # might as well if it's installed. try: @@ -698,7 +804,7 @@ def run_as_asyncio_guest( loop = uvloop.new_event_loop() asyncio.set_event_loop(loop) except ImportError: - pass + log.runtime('`uvloop` not available..') return asyncio.run( aio_main(trio_main),