Hm, `asyncio.Task._fut_waiter.set_exception()`?

Since we can't use it to `Task.set_exception()` (since that task method never
seems to work.. XD) and setting the private/internal always seems to do
the desired raising in the task? I realize it's an internal `asyncio`
runtime field but i'd rather take the risk of it breaking then having to
rely on our own equivalent hack..

Also, it seems like the case where the task's associated (and internal)
future-waiter field is null, we won't run into the (same?) prior hanging
issues (maybe since there's nothing for `asyncio` internals to use to
wait XD ??) when `Task.cancel()` is used..??

Main deats,
- add and `Future.set_exception()` a new signal-exception
  `class TrioTaskExited(AsyncioCancelled):` whenever the trio-task exits
  gracefully and the asyncio-side task is still doing blocking work (of
  some sort) which *seem to* be predicated by a check that
  `._fut_waiter is not None`.
- always call `asyncio.Queue.shutdown()` for the same^ as well as
  whenever we decide to call `Task.cancel()`; in that case the shutdown
  relays correctly?

Some further refinements,
- only warn about `Task.cancel()` usage when actually used ;)
- more local scope vars setting in the exit phase of
  `translate_aio_errors()`.
- also in ^ use explicit caught-exc var names for each error-type.
hilevel_serman
Tyler Goodlet 2025-01-02 15:35:36 -05:00
parent 7b8a8dcc7c
commit 89fc072ca0
1 changed files with 97 additions and 33 deletions

View File

@ -342,20 +342,29 @@ def _run_asyncio_task(
'`trio` received final result from {task}\n' '`trio` received final result from {task}\n'
f'|_{res}\n' f'|_{res}\n'
) )
except BaseException as terr: except BaseException as _aio_err:
task_err: BaseException = terr task_err: BaseException = _aio_err
# read again AFTER the `asyncio` side errors in case # read again AFTER the `asyncio` side errors in case
# it was cancelled due to an error from `trio` (or # it was cancelled due to an error from `trio` (or
# some other out of band exc). # some other out of band exc).
aio_err: BaseException|None = chan._aio_err aio_err: BaseException|None = chan._aio_err
# always true right?
assert (
type(_aio_err) is type(aio_err)
), (
f'`asyncio`-side task errors mismatch?!?\n\n'
f'caught: {_aio_err}\n'
f'chan._aio_err: {aio_err}\n'
)
msg: str = ( msg: str = (
'`trio`-side reports that the `asyncio`-side ' '`trio`-side reports that the `asyncio`-side '
'{etype_str}\n' '{etype_str}\n'
# ^NOTE filled in below # ^NOTE filled in below
) )
if isinstance(terr, CancelledError): if isinstance(_aio_err, CancelledError):
msg += ( msg += (
f'c)>\n' f'c)>\n'
f' |_{task}\n' f' |_{task}\n'
@ -372,9 +381,6 @@ def _run_asyncio_task(
msg.format(etype_str='errored') msg.format(etype_str='errored')
) )
assert (
type(terr) is type(aio_err)
), '`asyncio` task error mismatch?!?'
if aio_err is not None: if aio_err is not None:
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
@ -394,7 +400,7 @@ def _run_asyncio_task(
# aio_err.with_traceback(aio_err.__traceback__) # aio_err.with_traceback(aio_err.__traceback__)
# TODO: show when cancellation originated # TODO: show when cancellation originated
# from each side more pedantically? # from each side more pedantically in log-msg?
# elif ( # elif (
# type(aio_err) is CancelledError # type(aio_err) is CancelledError
# and # trio was the cause? # and # trio was the cause?
@ -429,6 +435,19 @@ def _run_asyncio_task(
return chan return chan
class TrioTaskExited(AsyncioCancelled):
'''
The `trio`-side task exited without explicitly cancelling the
`asyncio.Task` peer.
This is very similar to how `trio.ClosedResource` acts as
a "clean shutdown" signal to the consumer side of a mem-chan,
https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
'''
@acm @acm
async def translate_aio_errors( async def translate_aio_errors(
chan: LinkedTaskChannel, chan: LinkedTaskChannel,
@ -455,10 +474,11 @@ async def translate_aio_errors(
trio_err: BaseException|None = None trio_err: BaseException|None = None
try: try:
yield # back to one of the cross-loop apis yield # back to one of the cross-loop apis
except ( except trio.Cancelled as taskc:
trio.Cancelled, trio_err = taskc
) as _trio_err:
trio_err = _trio_err # should NEVER be the case that `trio` is cancel-handling
# BEFORE the other side's task-ref was set!?
assert chan._aio_task assert chan._aio_task
# import pdbp; pdbp.set_trace() # lolevel-debug # import pdbp; pdbp.set_trace() # lolevel-debug
@ -483,14 +503,13 @@ async def translate_aio_errors(
# ) # )
# raise # raise
# NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
# task-done-callback.
except ( except (
# NOTE: also see note in the `cancel_trio()` asyncio task
# termination callback
trio.ClosedResourceError, trio.ClosedResourceError,
# trio.BrokenResourceError, # trio.BrokenResourceError,
) as cre:
) as _trio_err: trio_err = cre
trio_err = _trio_err
aio_err = chan._aio_err aio_err = chan._aio_err
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
@ -498,10 +517,21 @@ async def translate_aio_errors(
# this channel close, raise our (non-`BaseException`) wrapper # this channel close, raise our (non-`BaseException`) wrapper
# exception (`AsyncioCancelled`) from that source error. # exception (`AsyncioCancelled`) from that source error.
if ( if (
# NOTE, not until it terminates? # aio-side is cancelled?
aio_task.cancelled() aio_task.cancelled() # not set until it terminates??
and and
type(aio_err) is CancelledError type(aio_err) is CancelledError
# TODO, if we want suppression of the
# silent-exit-by-`trio` case?
# -[ ] the parent task can also just catch it though?
# -[ ] OR, offer a `signal_aio_side_on_exit=True` ??
#
# or
# aio_err is None
# and
# chan._trio_exited
): ):
raise AsyncioCancelled( raise AsyncioCancelled(
f'asyncio`-side cancelled the `trio`-side,\n' f'asyncio`-side cancelled the `trio`-side,\n'
@ -511,6 +541,7 @@ async def translate_aio_errors(
f'{trio_err!r}\n' f'{trio_err!r}\n'
) from aio_err ) from aio_err
# maybe the chan-closure is due to something else?
else: else:
raise raise
@ -552,6 +583,7 @@ async def translate_aio_errors(
finally: finally:
# record wtv `trio`-side error transpired # record wtv `trio`-side error transpired
chan._trio_err = trio_err chan._trio_err = trio_err
ya_trio_exited: bool = chan._trio_exited
# NOTE! by default always cancel the `asyncio` task if # NOTE! by default always cancel the `asyncio` task if
# we've made it this far and it's not done. # we've made it this far and it's not done.
@ -568,26 +600,56 @@ async def translate_aio_errors(
# indicating the lifetime of the ``asyncio``-side task # indicating the lifetime of the ``asyncio``-side task
# should also be terminated. # should also be terminated.
or ( or (
chan._trio_exited ya_trio_exited
and and
not chan._trio_err # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man.. not chan._trio_err # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man..
) )
): ):
# pass report: str = (
msg: str = ( 'trio-side exited silently!'
)
assert not aio_err, 'WTF how did asyncio do this?!'
# if the `trio.Task` already exited the `open_channel_from()`
# block we ensure the asyncio-side gets signalled via an
# explicit exception and its `Queue` is shutdown.
if ya_trio_exited:
chan._to_aio.shutdown()
# pump the other side's task? needed?
await trio.lowlevel.checkpoint()
if (
not chan._trio_err
and
(fut := aio_task._fut_waiter)
):
fut.set_exception(
TrioTaskExited(
f'The peer `asyncio` task is still blocking/running?\n'
f'>>\n'
f'|_{aio_task!r}\n'
)
)
else:
# from tractor._state import is_root_process
# if is_root_process():
# breakpoint()
# import pdbp; pdbp.set_trace()
aio_taskc_warn: str = (
f'\n'
f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n' f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n'
f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n' f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n'
f'trio-side exited silently!'
) )
report += aio_taskc_warn
# TODO XXX, figure out the case where calling this makes the # TODO XXX, figure out the case where calling this makes the
# `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits` # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits`
# hang and then don't call it in that case! # hang and then don't call it in that case!
# #
aio_task.cancel(msg=msg) aio_task.cancel(msg=aio_taskc_warn)
log.warning(msg)
# assert not aio_err, 'WTF how did asyncio do this?!' log.warning(report)
# import pdbp; pdbp.set_trace()
# Required to sync with the far end `asyncio`-task to ensure # Required to sync with the far end `asyncio`-task to ensure
# any error is captured (via monkeypatching the # any error is captured (via monkeypatching the
@ -1077,6 +1139,8 @@ def run_as_asyncio_guest(
except ( except (
asyncio.InvalidStateError, asyncio.InvalidStateError,
# asyncio.CancelledError, # asyncio.CancelledError,
# ^^XXX `.shield()` call above prevents this??
)as state_err: )as state_err:
# XXX be super dupere noisy about abandonment issues! # XXX be super dupere noisy about abandonment issues!