Much more limited `asyncio.Task.cancel()` use

Since it can not only cause the guest-mode run to abandon but also in
some edge cases prevent `trio`-errors from propagating (at least on
py3.12-13?) as discovered as part of supporting this mode officially
in the *root actor*.

As such try to avoid that method as much as possible instead opting to
pass the `trio`-side error via the iter-task channel ref.

Deats,
- add a `LinkedTaskChannel._trio_err: BaseException|None` which gets set
  whenver the `trio.Task` error is caught; ONLY set `AsyncioCancelled`
  when the `trio` task was for sure the cause, whether itself cancelled
  or errored.
- always check for this error when exiting the `asyncio` side (even when
  terminated via a call to `asyncio.Task.cancel()` or during any other
  `CancelledError` handling such that the `asyncio`-task can expect to
  handle `AsyncioCancelled` due to the above^^ cases.
- never `cs.cancel()` the `trio` side unless that cancel scope has not
  yet been `.cancel_called` whatsoever; it's a noop anyway.
- only raise any exc from `asyncio.Task.result()` when `chan._aio_err`
  does not already match it since the existence of the pre-existing
  `task_err` means `asyncio` prolly intends (or has already) raised and
  interrupted the task elsewhere.

Various supporting tweaks,
- don't bother maybe-init-ing `greenback` from the actor entrypoint
  since we already need to (and do) bestow the portals to each `asyncio`
  task spawned using the `run_task()`/`open_channel_from()` API; further
  the init-ing should be done already by client code that enables
  infected mode (even in the root actor).
 |_we should prolly also codify it from any
   `run_daemon(infected_aio=True, debug_mode=True)` usage we offer.
- pass all the `_<field>`s to `Linked TaskChannel` explicitly in named
  kwarg style.
- better sclang-style log reports throughout, particularly on teardowns.
- generally more/better comments and docs around (not well understood)
  edge cases.
- prep to just inline `maybe_raise_aio_side_err()` closure..
hilevel_serman
Tyler Goodlet 2024-12-31 18:10:09 -05:00
parent c63b94f61f
commit 7b8a8dcc7c
1 changed files with 331 additions and 140 deletions

View File

@ -33,13 +33,19 @@ from typing import (
)
import tractor
from tractor._exceptions import AsyncioCancelled
from tractor._exceptions import (
AsyncioCancelled,
is_multi_cancelled,
)
from tractor._state import (
debug_mode,
_runtime_vars,
)
from tractor.devx import _debug
from tractor.log import get_logger
from tractor.log import (
get_logger,
StackLevelAdapter,
)
from tractor.trionics._broadcast import (
broadcast_receiver,
BroadcastReceiver,
@ -50,7 +56,7 @@ from outcome import (
Outcome,
)
log = get_logger(__name__)
log: StackLevelAdapter = get_logger(__name__)
__all__ = [
@ -70,9 +76,10 @@ class LinkedTaskChannel(trio.abc.Channel):
_to_aio: asyncio.Queue
_from_aio: trio.MemoryReceiveChannel
_to_trio: trio.MemorySendChannel
_trio_cs: trio.CancelScope
_aio_task_complete: trio.Event
_trio_err: BaseException|None = None
_trio_exited: bool = False
# set after ``asyncio.create_task()``
@ -84,28 +91,40 @@ class LinkedTaskChannel(trio.abc.Channel):
await self._from_aio.aclose()
async def receive(self) -> Any:
async with translate_aio_errors(
self,
# XXX: obviously this will deadlock if an on-going stream is
# being procesed.
# wait_on_aio_task=False,
):
'''
Receive a value from the paired `asyncio.Task` with
exception/cancel handling to teardown both sides on any
unexpected error.
'''
try:
# TODO: do we need this to guarantee asyncio code get's
# cancelled in the case where the trio side somehow creates
# a state where the asyncio cycle-task isn't getting the
# cancel request sent by (in theory) the last checkpoint
# cycle on the trio side?
# await trio.lowlevel.checkpoint()
return await self._from_aio.receive()
except BaseException as err:
async with translate_aio_errors(
self,
# XXX: obviously this will deadlock if an on-going stream is
# being procesed.
# wait_on_aio_task=False,
):
raise err
async def wait_asyncio_complete(self) -> None:
await self._aio_task_complete.wait()
# def cancel_asyncio_task(self) -> None:
# self._aio_task.cancel()
def cancel_asyncio_task(
self,
msg: str = '',
) -> None:
self._aio_task.cancel(
msg=msg,
)
async def send(self, item: Any) -> None:
'''
@ -155,7 +174,6 @@ class LinkedTaskChannel(trio.abc.Channel):
def _run_asyncio_task(
func: Callable,
*,
qsize: int = 1,
@ -165,8 +183,9 @@ def _run_asyncio_task(
) -> LinkedTaskChannel:
'''
Run an ``asyncio`` async function or generator in a task, return
or stream the result back to the caller `trio.lowleve.Task`.
Run an `asyncio`-compat async function or generator in a task,
return or stream the result back to the caller
`trio.lowleve.Task`.
'''
__tracebackhide__: bool = hide_tb
@ -204,23 +223,23 @@ def _run_asyncio_task(
aio_err: BaseException|None = None
chan = LinkedTaskChannel(
aio_q, # asyncio.Queue
from_aio, # recv chan
to_trio, # send chan
cancel_scope,
aio_task_complete,
_to_aio=aio_q, # asyncio.Queue
_from_aio=from_aio, # recv chan
_to_trio=to_trio, # send chan
_trio_cs=cancel_scope,
_aio_task_complete=aio_task_complete,
)
async def wait_on_coro_final_result(
to_trio: trio.MemorySendChannel,
coro: Awaitable,
aio_task_complete: trio.Event,
) -> None:
'''
Await ``coro`` and relay result back to ``trio``.
Await `coro` and relay result back to `trio`.
This can only be run as an `asyncio.Task`!
'''
nonlocal aio_err
@ -243,8 +262,10 @@ def _run_asyncio_task(
else:
if (
result != orig and
aio_err is None and
result != orig
and
aio_err is None
and
# in the `open_channel_from()` case we don't
# relay through the "return value".
@ -260,12 +281,21 @@ def _run_asyncio_task(
# a ``trio.EndOfChannel`` to the trio (consumer) side.
to_trio.close()
# import pdbp; pdbp.set_trace()
aio_task_complete.set()
log.runtime(f'`asyncio` task: {task.get_name()} is complete')
# await asyncio.sleep(0.1)
log.info(
f'`asyncio` task terminated\n'
f'x)>\n'
f' |_{task}\n'
)
# start the asyncio task we submitted from trio
if not inspect.isawaitable(coro):
raise TypeError(f"No support for invoking {coro}")
raise TypeError(
f'Pass the async-fn NOT a coroutine\n'
f'{coro!r}'
)
task: asyncio.Task = asyncio.create_task(
wait_on_coro_final_result(
@ -289,6 +319,10 @@ def _run_asyncio_task(
raise_not_found=False,
))
):
log.info(
f'Bestowing `greenback` portal for `asyncio`-task\n'
f'{task}\n'
)
greenback.bestow_portal(task)
def cancel_trio(task: asyncio.Task) -> None:
@ -304,11 +338,22 @@ def _run_asyncio_task(
# task exceptions
try:
res: Any = task.result()
log.info(
'`trio` received final result from {task}\n'
f'|_{res}\n'
)
except BaseException as terr:
task_err: BaseException = terr
# read again AFTER the `asyncio` side errors in case
# it was cancelled due to an error from `trio` (or
# some other out of band exc).
aio_err: BaseException|None = chan._aio_err
msg: str = (
'Infected `asyncio` task {etype_str}\n'
'`trio`-side reports that the `asyncio`-side '
'{etype_str}\n'
# ^NOTE filled in below
)
if isinstance(terr, CancelledError):
msg += (
@ -327,17 +372,18 @@ def _run_asyncio_task(
msg.format(etype_str='errored')
)
assert type(terr) is type(aio_err), (
'`asyncio` task error mismatch?!?'
)
assert (
type(terr) is type(aio_err)
), '`asyncio` task error mismatch?!?'
if aio_err is not None:
# import pdbp; pdbp.set_trace()
# XXX: uhh is this true?
# assert task_err, f'Asyncio task {task.get_name()} discrepancy!?'
# NOTE: currently mem chan closure may act as a form
# of error relay (at least in the ``asyncio.CancelledError``
# case) since we have no way to directly trigger a ``trio``
# of error relay (at least in the `asyncio.CancelledError`
# case) since we have no way to directly trigger a `trio`
# task error without creating a nursery to throw one.
# We might want to change this in the future though.
from_aio.close()
@ -359,29 +405,25 @@ def _run_asyncio_task(
# )
# raise aio_err from task_err
# XXX: if not already, alway cancel the scope
# on a task error in case the trio task is blocking on
# XXX: if not already, alway cancel the scope on a task
# error in case the trio task is blocking on
# a checkpoint.
cancel_scope.cancel()
if (
task_err
and
aio_err is not task_err
not cancel_scope.cancelled_caught
or
not cancel_scope.cancel_called
):
raise aio_err from task_err
# import pdbp; pdbp.set_trace()
cancel_scope.cancel()
# raise any `asyncio` side error.
raise aio_err
log.info(
'`trio` received final result from {task}\n'
f'|_{res}\n'
)
# TODO: do we need this?
# if task_err:
# cancel_scope.cancel()
# raise task_err
if task_err:
# XXX raise any `asyncio` side error IFF it doesn't
# match the one we just caught from the task above!
# (that would indicate something weird/very-wrong
# going on?)
if aio_err is not task_err:
# import pdbp; pdbp.set_trace()
raise aio_err from task_err
task.add_done_callback(cancel_trio)
return chan
@ -389,13 +431,18 @@ def _run_asyncio_task(
@acm
async def translate_aio_errors(
chan: LinkedTaskChannel,
wait_on_aio_task: bool = False,
cancel_aio_task_on_trio_exit: bool = True,
) -> AsyncIterator[None]:
'''
Error handling context around ``asyncio`` task spawns which
An error handling to cross-loop propagation context around
`asyncio.Task` spawns via one of this module's APIs:
- `open_channel_from()`
- `run_task()`
appropriately translates errors and cancels into ``trio`` land.
'''
@ -403,88 +450,204 @@ async def translate_aio_errors(
aio_err: BaseException|None = None
# TODO: make thisi a channel method?
def maybe_raise_aio_err(
err: Exception|None = None
) -> None:
aio_err = chan._aio_err
if (
aio_err is not None
and
# not isinstance(aio_err, CancelledError)
type(aio_err) != CancelledError
):
# always raise from any captured asyncio error
if err:
raise aio_err from err
else:
raise aio_err
task = chan._aio_task
assert task
aio_task: asyncio.Task = chan._aio_task
assert aio_task
trio_err: BaseException|None = None
try:
yield
yield # back to one of the cross-loop apis
except (
trio.Cancelled,
):
# relay cancel through to called ``asyncio`` task
) as _trio_err:
trio_err = _trio_err
assert chan._aio_task
chan._aio_task.cancel(
msg=f'the `trio` caller task was cancelled: {trio_task.name}'
# import pdbp; pdbp.set_trace() # lolevel-debug
# relay cancel through to called ``asyncio`` task
chan._aio_err = AsyncioCancelled(
f'trio`-side cancelled the `asyncio`-side,\n'
f'c)>\n'
f' |_{trio_task}\n\n'
f'{trio_err!r}\n'
)
raise
# XXX NOTE XXX seems like we can get all sorts of unreliable
# behaviour from `asyncio` under various cancellation
# conditions (like SIGINT/kbi) when this is used..
# SO FOR NOW, try to avoid it at most costs!
#
# aio_task.cancel(
# msg=f'the `trio` parent task was cancelled: {trio_task.name}'
# )
# raise
except (
# NOTE: see the note in the ``cancel_trio()`` asyncio task
# NOTE: also see note in the `cancel_trio()` asyncio task
# termination callback
trio.ClosedResourceError,
# trio.BrokenResourceError,
):
) as _trio_err:
trio_err = _trio_err
aio_err = chan._aio_err
# import pdbp; pdbp.set_trace()
# XXX if an underlying `asyncio.CancelledError` triggered
# this channel close, raise our (non-`BaseException`) wrapper
# exception (`AsyncioCancelled`) from that source error.
if (
task.cancelled()
# NOTE, not until it terminates?
aio_task.cancelled()
and
type(aio_err) is CancelledError
):
# if an underlying `asyncio.CancelledError` triggered this
# channel close, raise our (non-``BaseException``) wrapper
# error: ``AsyncioCancelled`` from that source error.
raise AsyncioCancelled(
f'Task cancelled\n'
f'|_{task}\n'
f'asyncio`-side cancelled the `trio`-side,\n'
f'c(>\n'
f' |_{aio_task}\n\n'
f'{trio_err!r}\n'
) from aio_err
else:
raise
finally:
except BaseException as _trio_err:
trio_err = _trio_err
log.exception(
'`trio`-side task errored?'
)
entered: bool = await _debug._maybe_enter_pm(
trio_err,
api_frame=inspect.currentframe(),
)
if (
# NOTE: always cancel the ``asyncio`` task if we've made it
# this far and it's not done.
not task.done() and aio_err
not entered
and
not is_multi_cancelled(trio_err)
):
log.exception('actor crashed\n')
aio_taskc = AsyncioCancelled(
f'`trio`-side task errored!\n'
f'{trio_err}'
) #from trio_err
try:
aio_task.set_exception(aio_taskc)
except (
asyncio.InvalidStateError,
RuntimeError,
# ^XXX, uhh bc apparently we can't use `.set_exception()`
# any more XD .. ??
):
wait_on_aio_task = False
# import pdbp; pdbp.set_trace()
# raise aio_taskc from trio_err
finally:
# record wtv `trio`-side error transpired
chan._trio_err = trio_err
# NOTE! by default always cancel the `asyncio` task if
# we've made it this far and it's not done.
# TODO, how to detect if there's an out-of-band error that
# caused the exit?
if (
cancel_aio_task_on_trio_exit
and
not aio_task.done()
and
aio_err
# or the trio side has exited it's surrounding cancel scope
# indicating the lifetime of the ``asyncio``-side task
# should also be terminated.
or chan._trio_exited
):
log.runtime(
f'Cancelling `asyncio`-task: {task.get_name()}'
or (
chan._trio_exited
and
not chan._trio_err # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man..
)
# assert not aio_err, 'WTF how did asyncio do this?!'
task.cancel()
):
# pass
msg: str = (
f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n'
f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n'
# Required to sync with the far end ``asyncio``-task to ensure
f'trio-side exited silently!'
)
# TODO XXX, figure out the case where calling this makes the
# `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits`
# hang and then don't call it in that case!
#
aio_task.cancel(msg=msg)
log.warning(msg)
# assert not aio_err, 'WTF how did asyncio do this?!'
# import pdbp; pdbp.set_trace()
# Required to sync with the far end `asyncio`-task to ensure
# any error is captured (via monkeypatching the
# ``channel._aio_err``) before calling ``maybe_raise_aio_err()``
# `channel._aio_err`) before calling ``maybe_raise_aio_err()``
# below!
#
# XXX NOTE XXX the `task.set_exception(aio_taskc)` call above
# MUST NOT EXCEPT or this WILL HANG!!
#
# so if you get a hang maybe step through and figure out why
# it erroed out up there!
#
if wait_on_aio_task:
# await chan.wait_asyncio_complete()
await chan._aio_task_complete.wait()
log.info(
'asyncio-task is done and unblocked trio-side!\n'
)
# TODO?
# -[ ] make this a channel method, OR
# -[ ] just put back inline below?
#
def maybe_raise_aio_side_err(
trio_err: Exception,
) -> None:
'''
Raise any `trio`-side-caused cancellation or legit task
error normally propagated from the caller of either,
- `open_channel_from()`
- `run_task()`
'''
aio_err: BaseException|None = chan._aio_err
# Check if the asyncio-side is the cause of the trio-side
# error.
if (
aio_err is not None
and
type(aio_err) is not AsyncioCancelled
# not isinstance(aio_err, CancelledError)
# type(aio_err) is not CancelledError
):
# always raise from any captured asyncio error
if trio_err:
raise trio_err from aio_err
raise aio_err
if trio_err:
raise trio_err
# NOTE: if any ``asyncio`` error was caught, raise it here inline
# here in the ``trio`` task
maybe_raise_aio_err()
# if trio_err:
maybe_raise_aio_side_err(
trio_err=trio_err
)
async def run_task(
@ -496,8 +659,8 @@ async def run_task(
) -> Any:
'''
Run an `asyncio` async function or generator in a task, return
or stream the result back to `trio`.
Run an `asyncio`-compat async function or generator in a task,
return or stream the result back to `trio`.
'''
# simple async func
@ -537,6 +700,7 @@ async def open_channel_from(
provide_channels=True,
**kwargs,
)
# TODO, tuple form here?
async with chan._from_aio:
async with translate_aio_errors(
chan,
@ -685,18 +849,21 @@ def run_as_asyncio_guest(
# Uh, oh.
#
# :o
# It looks like your event loop has caught a case of the ``trio``s.
# :()
# Don't worry, we've heard you'll barely notice. You might
# hallucinate a few more propagating errors and feel like your
# digestion has slowed but if anything get's too bad your parents
# will know about it.
#
# looks like your stdlib event loop has caught a case of "the trios" !
#
# :O
#
# Don't worry, we've heard you'll barely notice.
#
# :)
#
# You might hallucinate a few more propagating errors and feel
# like your digestion has slowed, but if anything get's too bad
# your parents will know about it.
#
# B)
#
async def aio_main(trio_main):
'''
Main `asyncio.Task` which calls
@ -713,16 +880,20 @@ def run_as_asyncio_guest(
'-> built a `trio`-done future\n'
)
# TODO: shoudn't this be done in the guest-run trio task?
# if debug_mode():
# # XXX make it obvi we know this isn't supported yet!
# log.error(
# 'Attempting to enter unsupported `greenback` init '
# 'from `asyncio` task..'
# )
# await _debug.maybe_init_greenback(
# force_reload=True,
# )
# TODO: is this evern run or needed?
# -[ ] pretty sure it never gets run for root-infected-aio
# since this main task is always the parent of any
# eventual `open_root_actor()` call?
if debug_mode():
log.error(
'Attempting to enter non-required `greenback` init '
'from `asyncio` task ???'
)
# XXX make it obvi we know this isn't supported yet!
assert 0
# await _debug.maybe_init_greenback(
# force_reload=True,
# )
def trio_done_callback(main_outcome):
log.runtime(
@ -732,6 +903,7 @@ def run_as_asyncio_guest(
)
if isinstance(main_outcome, Error):
# import pdbp; pdbp.set_trace()
error: BaseException = main_outcome.error
# show an dedicated `asyncio`-side tb from the error
@ -751,7 +923,7 @@ def run_as_asyncio_guest(
trio_done_fute.set_result(main_outcome)
log.info(
f'`trio` guest-run finished with outcome\n'
f'`trio` guest-run finished with,\n'
f')>\n'
f'|_{trio_done_fute}\n'
)
@ -777,9 +949,20 @@ def run_as_asyncio_guest(
done_callback=trio_done_callback,
)
fute_err: BaseException|None = None
try:
out: Outcome = await asyncio.shield(trio_done_fute)
# ^TODO still don't really understand why the `.shield()`
# is required ... ??
# https://docs.python.org/3/library/asyncio-task.html#asyncio.shield
# ^ seems as though in combo with the try/except here
# we're BOLDLY INGORING cancel of the trio fute?
#
# I guess it makes sense bc we don't want `asyncio` to
# cancel trio just because they can't handle SIGINT
# sanely? XD .. kk
# XXX, sin-shield causes guest-run abandons on SIGINT..
# out: Outcome = await trio_done_fute
# NOTE will raise (via `Error.unwrap()`) from any
# exception packed into the guest-run's `main_outcome`.
@ -802,27 +985,32 @@ def run_as_asyncio_guest(
fute_err = _fute_err
err_message: str = (
'main `asyncio` task '
'was cancelled!\n'
)
if isinstance(fute_err, asyncio.CancelledError):
err_message += 'was cancelled!\n'
else:
err_message += f'errored with {out.error!r}\n'
# TODO, handle possible edge cases with
# `open_root_actor()` closing before this is run!
#
actor: tractor.Actor = tractor.current_actor()
log.exception(
err_message
+
'Cancelling `trio`-side `tractor`-runtime..\n'
f'c)>\n'
f'c(>\n'
f' |_{actor}.cancel_soon()\n'
)
# XXX WARNING XXX the next LOCs are super important, since
# without them, we can get guest-run abandonment cases
# where `asyncio` will not schedule or wait on the `trio`
# guest-run task before final shutdown! This is
# particularly true if the `trio` side has tasks doing
# shielded work when a SIGINT condition occurs.
# XXX WARNING XXX the next LOCs are super important!
#
# SINCE without them, we can get guest-run ABANDONMENT
# cases where `asyncio` will not schedule or wait on the
# guest-run `trio.Task` nor invoke its registered
# `trio_done_callback()` before final shutdown!
#
# This is particularly true if the `trio` side has tasks
# in shielded sections when an OC-cancel (SIGINT)
# condition occurs!
#
# We now have the
# `test_infected_asyncio.test_sigint_closes_lifetime_stack()`
@ -886,7 +1074,10 @@ def run_as_asyncio_guest(
try:
return trio_done_fute.result()
except asyncio.exceptions.InvalidStateError as state_err:
except (
asyncio.InvalidStateError,
# asyncio.CancelledError,
)as state_err:
# XXX be super dupere noisy about abandonment issues!
aio_task: asyncio.Task = asyncio.current_task()