From 9b77b8c9eea7ff4c3dfa957dd5d3884cfe87d9cf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Feb 2022 13:43:11 -0500 Subject: [PATCH] Add more explicit `asyncio` task error logging When an `asyncio` side task errors or is cancelled we now explicitly report the traceback and task name if possible as well as the source reason for the error (some come from the `trio` side). Further, properly set any `trio` side exception (after unwrapping it from the `outcome.Error`) on the future that runs the `trio` guest run. --- tractor/to_asyncio.py | 75 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 65 insertions(+), 10 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 9b18a87..371a936 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -23,6 +23,7 @@ from asyncio.exceptions import CancelledError from contextlib import asynccontextmanager as acm from dataclasses import dataclass import inspect +import traceback from typing import ( Any, Callable, @@ -32,6 +33,7 @@ from typing import ( ) import trio +from outcome import Error from .log import get_logger from ._state import current_actor @@ -67,6 +69,14 @@ class LinkedTaskChannel(trio.abc.Channel): async def receive(self) -> Any: async with translate_aio_errors(self): + + # TODO: do we need this to guarantee asyncio code get's + # cancelled in the case where the trio side somehow creates + # a state where the asyncio cycle-task isn't getting the + # cancel request sent by (in theory) the last checkpoint + # cycle on the trio side? + # await trio.lowlevel.checkpoint() + return await self._from_aio.receive() async def wait_ayncio_complete(self) -> None: @@ -202,20 +212,20 @@ def _run_asyncio_task( ''' nonlocal chan aio_err = chan._aio_err + task_err: Optional[BaseException] = None # only to avoid ``asyncio`` complaining about uncaptured # task exceptions try: task.exception() except BaseException as terr: + task_err = terr + log.exception(f'`asyncio` task: {task.get_name()} errored') assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' if aio_err is not None: - if type(aio_err) is CancelledError: - log.cancel("infected task was cancelled") - else: - aio_err.with_traceback(aio_err.__traceback__) - log.exception("infected task errorred:") + # XXX: uhh is this true? + # assert task_err, f'Asyncio task {task.get_name()} discrepancy!?' # NOTE: currently mem chan closure may act as a form # of error relay (at least in the ``asyncio.CancelledError`` @@ -224,8 +234,25 @@ def _run_asyncio_task( # We might want to change this in the future though. from_aio.close() - task.add_done_callback(cancel_trio) + if type(aio_err) is CancelledError: + log.cancel("infected task was cancelled") + # TODO: show that the cancellation originated + # from the ``trio`` side? right? + # if cancel_scope.cancelled: + # raise aio_err from err + + elif task_err is None: + aio_err.with_traceback(aio_err.__traceback__) + msg = ''.join(traceback.format_exception(aio_err)) + log.error( + f'infected task errorred:\n{msg}' + ) + + # raise any ``asyncio`` side error. + raise aio_err + + task.add_done_callback(cancel_trio) return chan @@ -240,6 +267,8 @@ async def translate_aio_errors( appropriately translates errors and cancels into ``trio`` land. ''' + trio_task = trio.lowlevel.current_task() + aio_err: Optional[BaseException] = None def maybe_raise_aio_err( @@ -260,10 +289,21 @@ async def translate_aio_errors( assert task try: yield + + except ( + trio.Cancelled, + ): + # relay cancel through to called ``asyncio`` task + chan._aio_task.cancel( + msg=f'the `trio` caller task was cancelled:\n{trio_task.name}' + ) + raise + except ( # NOTE: see the note in the ``cancel_trio()`` asyncio task # termination callback trio.ClosedResourceError, + # trio.BrokenResourceError, ): aio_err = chan._aio_err if ( @@ -277,6 +317,7 @@ async def translate_aio_errors( else: raise + finally: # always cancel the ``asyncio`` task if we've made it this far # and it's not done. @@ -289,6 +330,7 @@ async def translate_aio_errors( maybe_raise_aio_err() + async def run_task( func: Callable, *, @@ -309,7 +351,6 @@ async def run_task( **kwargs, ) with chan._from_aio: - # try: async with translate_aio_errors(chan): # return single value that is the output from the # ``asyncio`` function-as-task. Expect the mem chan api to @@ -343,7 +384,7 @@ async def open_channel_from( # ``asyncio`` task. first = await chan.receive() - # stream values upward + # deliver stream handle upward yield first, chan @@ -380,9 +421,22 @@ def run_as_asyncio_guest( trio_done_fut = asyncio.Future() def trio_done_callback(main_outcome): + actor = current_actor() - print(f"trio_main finished: {main_outcome!r}") - trio_done_fut.set_result(main_outcome) + if isinstance(main_outcome, Error): + error = main_outcome.error + trio_done_fut.set_exception(error) + + # TODO: explicit asyncio tb? + # traceback.print_exception(error) + + # XXX: do we need this? + # actor.cancel_soon() + + main_outcome.unwrap() + else: + trio_done_fut.set_result(main_outcome) + print(f"trio_main finished: {main_outcome!r}") # start the infection: run trio on the asyncio loop in "guest mode" log.info(f"Infecting asyncio process with {trio_main}") @@ -392,6 +446,7 @@ def run_as_asyncio_guest( run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, ) + # ``.unwrap()`` will raise here on error return (await trio_done_fut).unwrap() # might as well if it's installed.