forked from goodboy/tractor
1
0
Fork 0

Add more explicit `asyncio` task error logging

When an `asyncio` side task errors or is cancelled we now explicitly
report the traceback and task name if possible as well as the source
reason for the error (some come from the `trio` side).

Further, properly set any `trio` side exception (after unwrapping it
from the `outcome.Error`) on the future that runs the `trio` guest run.
aio_explicit_task_cancels
Tyler Goodlet 2022-02-24 13:43:11 -05:00
parent 13c8300226
commit 9b77b8c9ee
1 changed files with 65 additions and 10 deletions

View File

@ -23,6 +23,7 @@ from asyncio.exceptions import CancelledError
from contextlib import asynccontextmanager as acm
from dataclasses import dataclass
import inspect
import traceback
from typing import (
Any,
Callable,
@ -32,6 +33,7 @@ from typing import (
)
import trio
from outcome import Error
from .log import get_logger
from ._state import current_actor
@ -67,6 +69,14 @@ class LinkedTaskChannel(trio.abc.Channel):
async def receive(self) -> Any:
async with translate_aio_errors(self):
# TODO: do we need this to guarantee asyncio code get's
# cancelled in the case where the trio side somehow creates
# a state where the asyncio cycle-task isn't getting the
# cancel request sent by (in theory) the last checkpoint
# cycle on the trio side?
# await trio.lowlevel.checkpoint()
return await self._from_aio.receive()
async def wait_ayncio_complete(self) -> None:
@ -202,20 +212,20 @@ def _run_asyncio_task(
'''
nonlocal chan
aio_err = chan._aio_err
task_err: Optional[BaseException] = None
# only to avoid ``asyncio`` complaining about uncaptured
# task exceptions
try:
task.exception()
except BaseException as terr:
task_err = terr
log.exception(f'`asyncio` task: {task.get_name()} errored')
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
if aio_err is not None:
if type(aio_err) is CancelledError:
log.cancel("infected task was cancelled")
else:
aio_err.with_traceback(aio_err.__traceback__)
log.exception("infected task errorred:")
# XXX: uhh is this true?
# assert task_err, f'Asyncio task {task.get_name()} discrepancy!?'
# NOTE: currently mem chan closure may act as a form
# of error relay (at least in the ``asyncio.CancelledError``
@ -224,8 +234,25 @@ def _run_asyncio_task(
# We might want to change this in the future though.
from_aio.close()
task.add_done_callback(cancel_trio)
if type(aio_err) is CancelledError:
log.cancel("infected task was cancelled")
# TODO: show that the cancellation originated
# from the ``trio`` side? right?
# if cancel_scope.cancelled:
# raise aio_err from err
elif task_err is None:
aio_err.with_traceback(aio_err.__traceback__)
msg = ''.join(traceback.format_exception(aio_err))
log.error(
f'infected task errorred:\n{msg}'
)
# raise any ``asyncio`` side error.
raise aio_err
task.add_done_callback(cancel_trio)
return chan
@ -240,6 +267,8 @@ async def translate_aio_errors(
appropriately translates errors and cancels into ``trio`` land.
'''
trio_task = trio.lowlevel.current_task()
aio_err: Optional[BaseException] = None
def maybe_raise_aio_err(
@ -260,10 +289,21 @@ async def translate_aio_errors(
assert task
try:
yield
except (
trio.Cancelled,
):
# relay cancel through to called ``asyncio`` task
chan._aio_task.cancel(
msg=f'the `trio` caller task was cancelled:\n{trio_task.name}'
)
raise
except (
# NOTE: see the note in the ``cancel_trio()`` asyncio task
# termination callback
trio.ClosedResourceError,
# trio.BrokenResourceError,
):
aio_err = chan._aio_err
if (
@ -277,6 +317,7 @@ async def translate_aio_errors(
else:
raise
finally:
# always cancel the ``asyncio`` task if we've made it this far
# and it's not done.
@ -289,6 +330,7 @@ async def translate_aio_errors(
maybe_raise_aio_err()
async def run_task(
func: Callable,
*,
@ -309,7 +351,6 @@ async def run_task(
**kwargs,
)
with chan._from_aio:
# try:
async with translate_aio_errors(chan):
# return single value that is the output from the
# ``asyncio`` function-as-task. Expect the mem chan api to
@ -343,7 +384,7 @@ async def open_channel_from(
# ``asyncio`` task.
first = await chan.receive()
# stream values upward
# deliver stream handle upward
yield first, chan
@ -380,9 +421,22 @@ def run_as_asyncio_guest(
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
actor = current_actor()
print(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
if isinstance(main_outcome, Error):
error = main_outcome.error
trio_done_fut.set_exception(error)
# TODO: explicit asyncio tb?
# traceback.print_exception(error)
# XXX: do we need this?
# actor.cancel_soon()
main_outcome.unwrap()
else:
trio_done_fut.set_result(main_outcome)
print(f"trio_main finished: {main_outcome!r}")
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
@ -392,6 +446,7 @@ def run_as_asyncio_guest(
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
# ``.unwrap()`` will raise here on error
return (await trio_done_fut).unwrap()
# might as well if it's installed.