Solve our abandonment issues..
To make the recent set of tests pass this (hopefully) finally solves all `asyncio` embedded `trio` guest-run abandonment by ensuring we "pump the event loop" until the guest-run future is fully complete. Accomplished via simple poll loop of the form `while not trio_done_fut.done(): await asyncio.sleep(.1)` in the `aio_main()` task's exception teardown sequence. The loop does a naive 10ms "pump-via-sleep & poll" for the `trio` side to complete before finally exiting (and presumably raising) from the SIGINT cancellation. Other related cleanups and refinements: - use `asyncio.Task.result()` inside `cancel_trio()` since it also inline-raises any exception outcome and we can also log-report the result in non-error cases. - comment out buncha not-sure-we-need-it stuff in `cancel_trio()`. - remove the botched `AsyncioCancelled(CancelledError):` idea obvi XD - comment `greenback` init for now in `aio_main()` since (pretty sure) we don't ever want to actually REPL in that specific func-as-task? - always capture any `fute_err: BaseException` from the `main_outcome: Outcome` delivered by the `trio` side guest-run task. - add and raise a new super noisy `AsyncioRuntimeTranslationError` whenever we detect that the guest-run `trio_done_fut` has not completed before task exit; should avoid abandonment issues ever happening again without knowing!aio_abandons
parent
268bd0d8ec
commit
9133f42b07
|
@ -33,11 +33,12 @@ from typing import (
|
|||
)
|
||||
|
||||
import tractor
|
||||
from tractor._exceptions import AsyncioCancelled
|
||||
from tractor._state import (
|
||||
debug_mode,
|
||||
)
|
||||
from tractor.log import get_logger
|
||||
from tractor.devx import _debug
|
||||
from tractor.log import get_logger
|
||||
from tractor.trionics._broadcast import (
|
||||
broadcast_receiver,
|
||||
BroadcastReceiver,
|
||||
|
@ -51,7 +52,10 @@ from outcome import (
|
|||
log = get_logger(__name__)
|
||||
|
||||
|
||||
__all__ = ['run_task', 'run_as_asyncio_guest']
|
||||
__all__ = [
|
||||
'run_task',
|
||||
'run_as_asyncio_guest',
|
||||
]
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -155,15 +159,16 @@ def _run_asyncio_task(
|
|||
*,
|
||||
qsize: int = 1,
|
||||
provide_channels: bool = False,
|
||||
hide_tb: bool = False,
|
||||
**kwargs,
|
||||
|
||||
) -> LinkedTaskChannel:
|
||||
'''
|
||||
Run an ``asyncio`` async function or generator in a task, return
|
||||
or stream the result back to ``trio``.
|
||||
or stream the result back to the caller `trio.lowleve.Task`.
|
||||
|
||||
'''
|
||||
__tracebackhide__ = True
|
||||
__tracebackhide__: bool = hide_tb
|
||||
if not tractor.current_actor().is_infected_aio():
|
||||
raise RuntimeError(
|
||||
"`infect_asyncio` mode is not enabled!?"
|
||||
|
@ -224,6 +229,7 @@ def _run_asyncio_task(
|
|||
try:
|
||||
result = await coro
|
||||
except BaseException as aio_err:
|
||||
chan._aio_err = aio_err
|
||||
if isinstance(aio_err, CancelledError):
|
||||
log.runtime(
|
||||
'`asyncio` task was cancelled..\n'
|
||||
|
@ -232,7 +238,6 @@ def _run_asyncio_task(
|
|||
log.exception(
|
||||
'`asyncio` task errored\n'
|
||||
)
|
||||
chan._aio_err = aio_err
|
||||
raise
|
||||
|
||||
else:
|
||||
|
@ -268,7 +273,7 @@ def _run_asyncio_task(
|
|||
aio_task_complete
|
||||
)
|
||||
)
|
||||
chan._aio_task = task
|
||||
chan._aio_task: asyncio.Task = task
|
||||
|
||||
# XXX TODO XXX get this actually workin.. XD
|
||||
# maybe setup `greenback` for `asyncio`-side task REPLing
|
||||
|
@ -284,19 +289,19 @@ def _run_asyncio_task(
|
|||
|
||||
def cancel_trio(task: asyncio.Task) -> None:
|
||||
'''
|
||||
Cancel the calling ``trio`` task on error.
|
||||
Cancel the calling `trio` task on error.
|
||||
|
||||
'''
|
||||
nonlocal chan
|
||||
aio_err = chan._aio_err
|
||||
aio_err: BaseException|None = chan._aio_err
|
||||
task_err: BaseException|None = None
|
||||
|
||||
# only to avoid ``asyncio`` complaining about uncaptured
|
||||
# only to avoid `asyncio` complaining about uncaptured
|
||||
# task exceptions
|
||||
try:
|
||||
task.exception()
|
||||
res: Any = task.result()
|
||||
except BaseException as terr:
|
||||
task_err = terr
|
||||
task_err: BaseException = terr
|
||||
|
||||
msg: str = (
|
||||
'Infected `asyncio` task {etype_str}\n'
|
||||
|
@ -328,42 +333,49 @@ def _run_asyncio_task(
|
|||
|
||||
if task_err is None:
|
||||
assert aio_err
|
||||
aio_err.with_traceback(aio_err.__traceback__)
|
||||
# log.error(
|
||||
# 'infected task errorred'
|
||||
# )
|
||||
# wait, wut?
|
||||
# aio_err.with_traceback(aio_err.__traceback__)
|
||||
|
||||
# TODO: show that the cancellation originated
|
||||
# from the ``trio`` side? right?
|
||||
# elif type(aio_err) is CancelledError:
|
||||
# TODO: show when cancellation originated
|
||||
# from each side more pedantically?
|
||||
# elif (
|
||||
# type(aio_err) is CancelledError
|
||||
# and # trio was the cause?
|
||||
# cancel_scope.cancel_called
|
||||
# ):
|
||||
# log.cancel(
|
||||
# 'infected task was cancelled'
|
||||
# 'infected task was cancelled by `trio`-side'
|
||||
# )
|
||||
# raise aio_err from task_err
|
||||
|
||||
# if cancel_scope.cancelled:
|
||||
# raise aio_err from err
|
||||
|
||||
# XXX: alway cancel the scope on error
|
||||
# in case the trio task is blocking
|
||||
# on a checkpoint.
|
||||
# XXX: if not already, alway cancel the scope
|
||||
# on a task error in case the trio task is blocking on
|
||||
# a checkpoint.
|
||||
cancel_scope.cancel()
|
||||
|
||||
if (
|
||||
task_err
|
||||
and
|
||||
aio_err is not task_err
|
||||
):
|
||||
raise aio_err from task_err
|
||||
|
||||
# raise any `asyncio` side error.
|
||||
raise aio_err
|
||||
|
||||
log.info(
|
||||
'`trio` received final result from {task}\n'
|
||||
f'|_{res}\n'
|
||||
)
|
||||
# TODO: do we need this?
|
||||
# if task_err:
|
||||
# cancel_scope.cancel()
|
||||
# raise task_err
|
||||
|
||||
task.add_done_callback(cancel_trio)
|
||||
return chan
|
||||
|
||||
|
||||
class AsyncioCancelled(CancelledError):
|
||||
'''
|
||||
Asyncio cancelled translation (non-base) error
|
||||
for use with the ``to_asyncio`` module
|
||||
to be raised in the ``trio`` side task
|
||||
|
||||
'''
|
||||
|
||||
|
||||
@acm
|
||||
async def translate_aio_errors(
|
||||
|
||||
|
@ -386,7 +398,9 @@ async def translate_aio_errors(
|
|||
) -> None:
|
||||
aio_err = chan._aio_err
|
||||
if (
|
||||
aio_err is not None and
|
||||
aio_err is not None
|
||||
and
|
||||
# not isinstance(aio_err, CancelledError)
|
||||
type(aio_err) != CancelledError
|
||||
):
|
||||
# always raise from any captured asyncio error
|
||||
|
@ -418,13 +432,17 @@ async def translate_aio_errors(
|
|||
):
|
||||
aio_err = chan._aio_err
|
||||
if (
|
||||
task.cancelled() and
|
||||
task.cancelled()
|
||||
and
|
||||
type(aio_err) is CancelledError
|
||||
):
|
||||
# if an underlying ``asyncio.CancelledError`` triggered this
|
||||
# if an underlying `asyncio.CancelledError` triggered this
|
||||
# channel close, raise our (non-``BaseException``) wrapper
|
||||
# error: ``AsyncioCancelled`` from that source error.
|
||||
raise AsyncioCancelled from aio_err
|
||||
raise AsyncioCancelled(
|
||||
f'Task cancelled\n'
|
||||
f'|_{task}\n'
|
||||
) from aio_err
|
||||
|
||||
else:
|
||||
raise
|
||||
|
@ -467,8 +485,8 @@ async def run_task(
|
|||
|
||||
) -> Any:
|
||||
'''
|
||||
Run an ``asyncio`` async function or generator in a task, return
|
||||
or stream the result back to ``trio``.
|
||||
Run an `asyncio` async function or generator in a task, return
|
||||
or stream the result back to `trio`.
|
||||
|
||||
'''
|
||||
# simple async func
|
||||
|
@ -526,10 +544,27 @@ async def open_channel_from(
|
|||
chan._to_trio.close()
|
||||
|
||||
|
||||
class AsyncioRuntimeTranslationError(RuntimeError):
|
||||
'''
|
||||
We failed to correctly relay runtime semantics and/or maintain SC
|
||||
supervision rules cross-event-loop.
|
||||
|
||||
'''
|
||||
|
||||
|
||||
def run_as_asyncio_guest(
|
||||
trio_main: Callable,
|
||||
# ^-NOTE-^ when spawned with `infected_aio=True` this func is
|
||||
# normally `Actor._async_main()` as is passed by some boostrap
|
||||
# entrypoint like `._entry._trio_main()`.
|
||||
|
||||
) -> None:
|
||||
# ^-TODO-^ technically whatever `trio_main` returns.. we should
|
||||
# try to use func-typevar-params at leaast by 3.13!
|
||||
# -[ ] https://typing.readthedocs.io/en/latest/spec/callables.html#callback-protocols
|
||||
# -[ ] https://peps.python.org/pep-0646/#using-type-variable-tuples-in-functions
|
||||
# -[ ] https://typing.readthedocs.io/en/latest/spec/callables.html#unpack-for-keyword-arguments
|
||||
# -[ ] https://peps.python.org/pep-0718/
|
||||
'''
|
||||
Entry for an "infected ``asyncio`` actor".
|
||||
|
||||
|
@ -555,7 +590,13 @@ def run_as_asyncio_guest(
|
|||
# :)
|
||||
|
||||
async def aio_main(trio_main):
|
||||
'''
|
||||
Main `asyncio.Task` which calls
|
||||
`trio.lowlevel.start_guest_run()` to "infect" the `asyncio`
|
||||
event-loop by embedding the `trio` scheduler allowing us to
|
||||
boot the `tractor` runtime and connect back to our parent.
|
||||
|
||||
'''
|
||||
loop = asyncio.get_running_loop()
|
||||
trio_done_fut = asyncio.Future()
|
||||
startup_msg: str = (
|
||||
|
@ -564,17 +605,22 @@ def run_as_asyncio_guest(
|
|||
'-> built a `trio`-done future\n'
|
||||
)
|
||||
|
||||
if debug_mode():
|
||||
# XXX make it obvi we know this isn't supported yet!
|
||||
log.error(
|
||||
'Attempting to enter unsupported `greenback` init '
|
||||
'from `asyncio` task..'
|
||||
)
|
||||
await _debug.maybe_init_greenback(
|
||||
force_reload=True,
|
||||
)
|
||||
# TODO: shoudn't this be done in the guest-run trio task?
|
||||
# if debug_mode():
|
||||
# # XXX make it obvi we know this isn't supported yet!
|
||||
# log.error(
|
||||
# 'Attempting to enter unsupported `greenback` init '
|
||||
# 'from `asyncio` task..'
|
||||
# )
|
||||
# await _debug.maybe_init_greenback(
|
||||
# force_reload=True,
|
||||
# )
|
||||
|
||||
def trio_done_callback(main_outcome):
|
||||
log.info(
|
||||
f'trio_main finished with\n'
|
||||
f'|_{main_outcome!r}'
|
||||
)
|
||||
|
||||
if isinstance(main_outcome, Error):
|
||||
error: BaseException = main_outcome.error
|
||||
|
@ -594,7 +640,6 @@ def run_as_asyncio_guest(
|
|||
|
||||
else:
|
||||
trio_done_fut.set_result(main_outcome)
|
||||
log.runtime(f'trio_main finished: {main_outcome!r}')
|
||||
|
||||
startup_msg += (
|
||||
f'-> created {trio_done_callback!r}\n'
|
||||
|
@ -613,26 +658,48 @@ def run_as_asyncio_guest(
|
|||
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
||||
done_callback=trio_done_callback,
|
||||
)
|
||||
fute_err: BaseException|None = None
|
||||
try:
|
||||
# TODO: better SIGINT handling since shielding seems to
|
||||
# make NO DIFFERENCE XD
|
||||
# -[ ] maybe this is due to 3.11's recent SIGINT handling
|
||||
# changes and we can better work with/around it?
|
||||
# https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption
|
||||
out: Outcome = await asyncio.shield(trio_done_fut)
|
||||
# NOTE `Error.unwrap()` will raise
|
||||
|
||||
# NOTE will raise (via `Error.unwrap()`) from any
|
||||
# exception packed into the guest-run's `main_outcome`.
|
||||
return out.unwrap()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
except (
|
||||
# XXX special SIGINT-handling is required since
|
||||
# `asyncio.shield()`-ing seems to NOT handle that case as
|
||||
# per recent changes in 3.11:
|
||||
# https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption
|
||||
#
|
||||
# NOTE: further, apparently ONLY need to handle this
|
||||
# special SIGINT case since all other `asyncio`-side
|
||||
# errors can be processed via our `chan._aio_err`
|
||||
# relaying (right?); SIGINT seems to be totally diff
|
||||
# error path in `asyncio`'s runtime..?
|
||||
asyncio.CancelledError,
|
||||
|
||||
) as fute_err:
|
||||
err_message: str = (
|
||||
'main `asyncio` task '
|
||||
)
|
||||
if isinstance(fute_err, asyncio.CancelledError):
|
||||
err_message += 'was cancelled!\n'
|
||||
else:
|
||||
err_message += f'errored with {out.error!r}\n'
|
||||
|
||||
actor: tractor.Actor = tractor.current_actor()
|
||||
log.exception(
|
||||
'`asyncio`-side main task was cancelled!\n'
|
||||
'Cancelling actor-runtime..\n'
|
||||
err_message
|
||||
+
|
||||
'Cancelling `trio`-side `tractor`-runtime..\n'
|
||||
f'c)>\n'
|
||||
f' |_{actor}.cancel_soon()\n'
|
||||
|
||||
)
|
||||
|
||||
# TODO: reduce this comment bloc since abandon issues are
|
||||
# now solved?
|
||||
#
|
||||
# XXX NOTE XXX the next LOC is super important!!!
|
||||
# => without it, we can get a guest-run abandonment case
|
||||
# where asyncio will not trigger `trio` in a final event
|
||||
|
@ -681,16 +748,55 @@ def run_as_asyncio_guest(
|
|||
# is apparently a working fix!
|
||||
actor.cancel_soon()
|
||||
|
||||
# XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to
|
||||
# `trio`-guest-run to complete and teardown !!
|
||||
# XXX NOTE XXX pump the `asyncio` event-loop to allow
|
||||
# `trio`-side to `trio`-guest-run to complete and
|
||||
# teardown !!
|
||||
#
|
||||
# XXX WITHOUT THIS the guest-run gets race-conditionally
|
||||
# abandoned by `asyncio`!!
|
||||
# XD XD XD
|
||||
await asyncio.shield(
|
||||
asyncio.sleep(.1) # NOPE! it can't be 0 either XD
|
||||
# *WITHOUT THIS* the guest-run can get race-conditionally abandoned!!
|
||||
# XD
|
||||
#
|
||||
await asyncio.sleep(.1) # `delay` can't be 0 either XD
|
||||
while not trio_done_fut.done():
|
||||
log.runtime(
|
||||
'Waiting on main guest-run `asyncio` task to complete..\n'
|
||||
f'|_trio_done_fut: {trio_done_fut}\n'
|
||||
)
|
||||
raise
|
||||
await asyncio.sleep(.1)
|
||||
|
||||
# XXX: don't actually need the shield.. seems to
|
||||
# make no difference (??) and we know it spawns an
|
||||
# internal task..
|
||||
# await asyncio.shield(asyncio.sleep(.1))
|
||||
|
||||
# XXX alt approach but can block indefinitely..
|
||||
# so don't use?
|
||||
# loop._run_once()
|
||||
|
||||
try:
|
||||
return trio_done_fut.result()
|
||||
except asyncio.exceptions.InvalidStateError as state_err:
|
||||
|
||||
# XXX be super dupere noisy about abandonment issues!
|
||||
aio_task: asyncio.Task = asyncio.current_task()
|
||||
message: str = (
|
||||
'The `asyncio`-side task likely exited before the '
|
||||
'`trio`-side guest-run completed!\n\n'
|
||||
)
|
||||
if fute_err:
|
||||
message += (
|
||||
f'The main {aio_task}\n'
|
||||
f'STOPPED due to {type(fute_err)}\n\n'
|
||||
)
|
||||
|
||||
message += (
|
||||
f'Likely something inside our guest-run-as-task impl is '
|
||||
f'not effectively waiting on the `trio`-side to complete ?!\n'
|
||||
f'This code -> {aio_main!r}\n\n'
|
||||
|
||||
'Below you will likely see a '
|
||||
'"RuntimeWarning: Trio guest run got abandoned.." !!\n'
|
||||
)
|
||||
raise AsyncioRuntimeTranslationError(message) from state_err
|
||||
|
||||
# might as well if it's installed.
|
||||
try:
|
||||
|
@ -698,7 +804,7 @@ def run_as_asyncio_guest(
|
|||
loop = uvloop.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
except ImportError:
|
||||
pass
|
||||
log.runtime('`uvloop` not available..')
|
||||
|
||||
return asyncio.run(
|
||||
aio_main(trio_main),
|
||||
|
|
Loading…
Reference in New Issue