Hack `asyncio` to not abandon a guest-mode run?

Took me a while to figure out what the heck was going on but, turns out
`asyncio` changed their SIGINT handling in 3.11 as per:

https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption

I'm not entirely sure if it's the 3.11 changes or possibly wtv further
updates were made in 3.12  but more or less due to the way
our current main task was written the `trio` guest-run was getting
abandoned on SIGINTs sent from the OS to the infected child proc..

Note that much of the bug and soln cases are layed out in very detailed
comment-notes both in the new test and `run_as_asyncio_guest()`, right
above the final "fix" lines.

Add new `test_infected_aio.test_sigint_closes_lifetime_stack()` test suite
which reliably triggers all abandonment issues with multiple cases
of different parent behaviour post-sending-SIGINT-to-child:
 1. briefly sleep then raise a KBI in the parent which was originally
    demonstrating the file leak not being cleaned up by `Actor.lifetime_stack.close()`
    and simulates a ctl-c from the console (relayed in tandem by
    the OS to the parent and child processes).
 2. do `Context.wait_for_result()` on the child context which would
    hang and timeout since the actor runtime would never complete and
    thus never relay a `ContextCancelled`.
 3. both with and without running a `asyncio` task in the `manage_file`
    child actor; originally it seemed that with an aio task scheduled in
    the child actor the guest-run abandonment always was the "loud" case
    where there seemed to be some actor teardown but with tbs from
    python failing to gracefully exit the `trio` runtime..

The (seemingly working) "fix" required 2 lines of code to be run inside
a `asyncio.CancelledError` handler around the call to `await trio_done_fut`:
- `Actor.cancel_soon()` which schedules the actor runtime to cancel on
  the next `trio` runner cycle and results in a "self cancellation" of
  the actor.
- "pumping the `asyncio` event loop with a non-0 `.sleep(0.1)` XD
 |_ seems that a "shielded" pump with some actual `delay: float >= 0`
   did the trick to get `asyncio` to allow the `trio` runner/loop to
   fully complete its guest-run without abandonment.

Other supporting changes:
- move `._exceptions.AsyncioCancelled`, our renamed
  `asyncio.CancelledError` error-sub-type-wrapper, to `.to_asyncio` and make
  it derive from `CancelledError` so as to be sure when raised by our
  `asyncio` x-> `trio` exception relay machinery that `asyncio` is
  getting the specific type it expects during cancellation.
- do "summary status" style logging in `run_as_asyncio_guest()` wherein
  we compile the eventual `startup_msg: str` emitted just before waiting
  on the `trio_done_fut`.
- shield-wait with `out: Outcome = await asyncio.shield(trio_done_fut)`
  even though it seems to do nothing in the SIGINT handling case..(I
  presume it might help avoid abandonment in a `asyncio.Task.cancel()`
  case maybe?)
Tyler Goodlet 2024-06-24 13:52:19 -04:00
parent 3d12a7e005
commit 5ed30dec40
3 changed files with 344 additions and 47 deletions

View File

@ -2,16 +2,25 @@
The hipster way to force SC onto the stdlib's "async": 'infection mode'. The hipster way to force SC onto the stdlib's "async": 'infection mode'.
''' '''
from typing import Optional, Iterable, Union
import asyncio import asyncio
import builtins import builtins
from contextlib import ExitStack
import itertools import itertools
import importlib import importlib
import os
from pathlib import Path
import signal
from typing import (
Callable,
Iterable,
Union,
)
import pytest import pytest
import trio import trio
import tractor import tractor
from tractor import ( from tractor import (
current_actor,
to_asyncio, to_asyncio,
RemoteActorError, RemoteActorError,
ContextCancelled, ContextCancelled,
@ -25,8 +34,8 @@ async def sleep_and_err(
# just signature placeholders for compat with # just signature placeholders for compat with
# ``to_asyncio.open_channel_from()`` # ``to_asyncio.open_channel_from()``
to_trio: Optional[trio.MemorySendChannel] = None, to_trio: trio.MemorySendChannel|None = None,
from_trio: Optional[asyncio.Queue] = None, from_trio: asyncio.Queue|None = None,
): ):
if to_trio: if to_trio:
@ -36,7 +45,7 @@ async def sleep_and_err(
assert 0 assert 0
async def sleep_forever(): async def aio_sleep_forever():
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@ -44,7 +53,7 @@ async def trio_cancels_single_aio_task():
# spawn an ``asyncio`` task to run a func and return result # spawn an ``asyncio`` task to run a func and return result
with trio.move_on_after(.2): with trio.move_on_after(.2):
await tractor.to_asyncio.run_task(sleep_forever) await tractor.to_asyncio.run_task(aio_sleep_forever)
def test_trio_cancels_aio_on_actor_side(reg_addr): def test_trio_cancels_aio_on_actor_side(reg_addr):
@ -66,14 +75,13 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
async def asyncio_actor( async def asyncio_actor(
target: str, target: str,
expect_err: Exception|None = None expect_err: Exception|None = None
) -> None: ) -> None:
assert tractor.current_actor().is_infected_aio() assert tractor.current_actor().is_infected_aio()
target = globals()[target] target: Callable = globals()[target]
if '.' in expect_err: if '.' in expect_err:
modpath, _, name = expect_err.rpartition('.') modpath, _, name = expect_err.rpartition('.')
@ -140,7 +148,7 @@ def test_tractor_cancels_aio(reg_addr):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.run_in_actor( portal = await n.run_in_actor(
asyncio_actor, asyncio_actor,
target='sleep_forever', target='aio_sleep_forever',
expect_err='trio.Cancelled', expect_err='trio.Cancelled',
infect_asyncio=True, infect_asyncio=True,
) )
@ -164,7 +172,7 @@ def test_trio_cancels_aio(reg_addr):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
await n.run_in_actor( await n.run_in_actor(
asyncio_actor, asyncio_actor,
target='sleep_forever', target='aio_sleep_forever',
expect_err='trio.Cancelled', expect_err='trio.Cancelled',
infect_asyncio=True, infect_asyncio=True,
) )
@ -195,7 +203,7 @@ async def trio_ctx(
# spawn another asyncio task for the cuck of it. # spawn another asyncio task for the cuck of it.
n.start_soon( n.start_soon(
tractor.to_asyncio.run_task, tractor.to_asyncio.run_task,
sleep_forever, aio_sleep_forever,
) )
await trio.sleep_forever() await trio.sleep_forever()
@ -285,7 +293,7 @@ async def aio_cancel():
# cancel and enter sleep # cancel and enter sleep
task.cancel() task.cancel()
await sleep_forever() await aio_sleep_forever()
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
@ -355,7 +363,6 @@ async def push_from_aio_task(
async def stream_from_aio( async def stream_from_aio(
exit_early: bool = False, exit_early: bool = False,
raise_err: bool = False, raise_err: bool = False,
aio_raise_err: bool = False, aio_raise_err: bool = False,
@ -618,6 +625,200 @@ def test_echoserver_detailed_mechanics(
trio.run(main) trio.run(main)
@tractor.context
async def manage_file(
ctx: tractor.Context,
tmp_path_str: str,
bg_aio_task: bool = False,
):
'''
Start an `asyncio` task that just sleeps after registering a context
with `Actor.lifetime_stack`. Trigger a SIGINT to kill the actor tree
and ensure the stack is closed in the infected mode child.
To verify the teardown state just write a tmpfile to the `testdir`
and delete it on actor close.
'''
tmp_path: Path = Path(tmp_path_str)
tmp_file: Path = tmp_path / f'{" ".join(ctx._actor.uid)}.file'
# create a the tmp file and tell the parent where it's at
assert not tmp_file.is_file()
tmp_file.touch()
stack: ExitStack = current_actor().lifetime_stack
stack.callback(tmp_file.unlink)
await ctx.started((
str(tmp_file),
os.getpid(),
))
# expect to be cancelled from here!
try:
# NOTE: turns out you don't even need to sched an aio task
# since the original issue, even though seemingly was due to
# the guest-run being abandoned + a `._debug.pause()` inside
# `._runtime._async_main()` (which was originally trying to
# debug the `.lifetime_stack` not closing), IS NOT actually
# the core issue?
#
# further notes:
#
# - `trio` only issues the " RuntimeWarning: Trio guest run
# got abandoned without properly finishing... weird stuff
# might happen" IFF you DO run a asyncio task here, BUT
# - the original issue of the `.lifetime_stack` not closing
# will still happen even if you don't run an `asyncio` task
# here even though the "abandon" messgage won't be shown..
#
# => ????? honestly i'm lost but it seems to be some issue
# with `asyncio` and SIGINT..
#
# XXX NOTE XXX SO, if this LINE IS UNCOMMENTED and
# `run_as_asyncio_guest()` is written WITHOUT THE
# `.cancel_soon()` soln, both of these tests will pass ??
# so maybe it has something to do with `asyncio` loop init
# state?!?
# honestly, this REALLY reminds me why i haven't used
# `asyncio` by choice in years.. XD
#
# await tractor.to_asyncio.run_task(aio_sleep_forever)
if bg_aio_task:
async with trio.open_nursery() as tn:
tn.start_soon(
tractor.to_asyncio.run_task,
aio_sleep_forever,
)
await trio.sleep_forever()
# signalled manually at the OS level (aka KBI) by the parent actor.
except KeyboardInterrupt:
print('child raised KBI..')
assert tmp_file.exists()
raise
else:
raise RuntimeError('shoulda received a KBI?')
@pytest.mark.parametrize(
'bg_aio_task',
[
False,
# NOTE: (and see notes in `manage_file()` above as well) if
# we FOR SURE SPAWN AN AIO TASK in the child it seems the
# "silent-abandon" case (as is described in detail in
# `to_asyncio.run_as_asyncio_guest()`) does not happen and
# `asyncio`'s loop will at least abandon the `trio` side
# loudly? .. prolly the state-spot to start looking for
# a soln that results in NO ABANDONMENT.. XD
True,
],
ids=[
'bg_aio_task',
'just_trio_slee',
],
)
@pytest.mark.parametrize(
'wait_for_ctx',
[
False,
True,
],
ids=[
'raise_KBI_in_rent',
'wait_for_ctx',
],
)
def test_sigint_closes_lifetime_stack(
tmp_path: Path,
wait_for_ctx: bool,
bg_aio_task: bool,
):
'''
Ensure that an infected child can use the `Actor.lifetime_stack`
to make a file on boot and it's automatically cleaned up by the
actor-lifetime-linked exit stack closure.
'''
async def main():
try:
async with tractor.open_nursery() as n:
p = await n.start_actor(
'file_mngr',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
manage_file,
tmp_path_str=str(tmp_path),
bg_aio_task=bg_aio_task,
) as (ctx, first):
path_str, cpid = first
tmp_file: Path = Path(path_str)
assert tmp_file.exists()
# XXX originally to simulate what (hopefully)
# the below now triggers.. had to manually
# trigger a SIGINT from a ctl-c in the root.
# await trio.sleep_forever()
# XXX NOTE XXX signal infected-`asyncio` child to
# OS-cancel with SIGINT; this should trigger the
# bad `asyncio` cancel behaviour that can cause
# a guest-run abandon as was seen causing
# shm-buffer leaks in `piker`'s live quote stream
# susbys!
#
# await trio.sleep(.5)
await trio.sleep(.2)
os.kill(
cpid,
signal.SIGINT,
)
# XXX CASE 1: without the bug fixed, in
# the non-KBI-raised-in-parent case, this
# timeout should trigger!
if wait_for_ctx:
print('waiting for ctx outcome in parent..')
try:
with trio.fail_after(.7):
await ctx.wait_for_result()
except tractor.ContextCancelled as ctxc:
assert ctxc.canceller == ctx.chan.uid
raise
# XXX CASE 2: this seems to be the source of the
# original issue which exhibited BEFORE we put
# a `Actor.cancel_soon()` inside
# `run_as_asyncio_guest()`..
else:
raise KeyboardInterrupt
pytest.fail('should have raised some kinda error?!?')
except (
KeyboardInterrupt,
ContextCancelled,
):
# XXX CASE 2: without the bug fixed, in the
# KBI-raised-in-parent case, the actor teardown should
# never get run (silently abaondoned by `asyncio`..) and
# thus the file should leak!
assert not tmp_file.exists()
assert ctx.maybe_error
trio.run(main)
# TODO: debug_mode tests once we get support for `asyncio`! # TODO: debug_mode tests once we get support for `asyncio`!
# #
# -[ ] need tests to wrap both scripts: # -[ ] need tests to wrap both scripts:

View File

@ -922,15 +922,6 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet" "The root actor has not been initialized yet"
class AsyncioCancelled(Exception):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
'''
class MessagingError(Exception): class MessagingError(Exception):
''' '''
IPC related msg (typing), transaction (ordering) or dialog IPC related msg (typing), transaction (ordering) or dialog
@ -1324,7 +1315,9 @@ def _mk_recv_mte(
any_pld: Any = msgpack.decode(msg.pld) any_pld: Any = msgpack.decode(msg.pld)
message: str = ( message: str = (
f'invalid `{msg_type.__qualname__}` msg payload\n\n' f'invalid `{msg_type.__qualname__}` msg payload\n\n'
f'value: `{any_pld!r}` does not match type-spec: ' f'{any_pld!r}\n\n'
f'has type {type(any_pld)!r}\n\n'
f'and does not match type-spec '
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
) )
bad_msg = msg bad_msg = msg

View File

@ -18,11 +18,13 @@
Infection apis for ``asyncio`` loops running ``trio`` using guest mode. Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
''' '''
from __future__ import annotations
import asyncio import asyncio
from asyncio.exceptions import CancelledError from asyncio.exceptions import CancelledError
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from dataclasses import dataclass from dataclasses import dataclass
import inspect import inspect
import traceback
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -30,20 +32,21 @@ from typing import (
Awaitable, Awaitable,
) )
import trio import tractor
from outcome import Error
from tractor.log import get_logger
from tractor._state import ( from tractor._state import (
current_actor,
debug_mode, debug_mode,
) )
from tractor.log import get_logger
from tractor.devx import _debug from tractor.devx import _debug
from tractor._exceptions import AsyncioCancelled
from tractor.trionics._broadcast import ( from tractor.trionics._broadcast import (
broadcast_receiver, broadcast_receiver,
BroadcastReceiver, BroadcastReceiver,
) )
import trio
from outcome import (
Error,
Outcome,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -161,7 +164,7 @@ def _run_asyncio_task(
''' '''
__tracebackhide__ = True __tracebackhide__ = True
if not current_actor().is_infected_aio(): if not tractor.current_actor().is_infected_aio():
raise RuntimeError( raise RuntimeError(
"`infect_asyncio` mode is not enabled!?" "`infect_asyncio` mode is not enabled!?"
) )
@ -172,7 +175,6 @@ def _run_asyncio_task(
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
args = tuple(inspect.getfullargspec(func).args) args = tuple(inspect.getfullargspec(func).args)
if getattr(func, '_tractor_steam_function', None): if getattr(func, '_tractor_steam_function', None):
# the assumption is that the target async routine accepts the # the assumption is that the target async routine accepts the
# send channel then it intends to yield more then one return # send channel then it intends to yield more then one return
@ -346,13 +348,22 @@ def _run_asyncio_task(
# on a checkpoint. # on a checkpoint.
cancel_scope.cancel() cancel_scope.cancel()
# raise any ``asyncio`` side error. # raise any `asyncio` side error.
raise aio_err raise aio_err
task.add_done_callback(cancel_trio) task.add_done_callback(cancel_trio)
return chan return chan
class AsyncioCancelled(CancelledError):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
'''
@acm @acm
async def translate_aio_errors( async def translate_aio_errors(
@ -516,7 +527,6 @@ async def open_channel_from(
def run_as_asyncio_guest( def run_as_asyncio_guest(
trio_main: Callable, trio_main: Callable,
) -> None: ) -> None:
@ -548,6 +558,11 @@ def run_as_asyncio_guest(
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future() trio_done_fut = asyncio.Future()
startup_msg: str = (
'Starting `asyncio` guest-loop-run\n'
'-> got running loop\n'
'-> built a `trio`-done future\n'
)
if debug_mode(): if debug_mode():
# XXX make it obvi we know this isn't supported yet! # XXX make it obvi we know this isn't supported yet!
@ -562,34 +577,120 @@ def run_as_asyncio_guest(
def trio_done_callback(main_outcome): def trio_done_callback(main_outcome):
if isinstance(main_outcome, Error): if isinstance(main_outcome, Error):
error = main_outcome.error error: BaseException = main_outcome.error
# show an dedicated `asyncio`-side tb from the error
tb_str: str = ''.join(traceback.format_exception(error))
log.exception(
'Guest-run errored!?\n\n'
f'{main_outcome}\n'
f'{error}\n\n'
f'{tb_str}\n'
)
trio_done_fut.set_exception(error) trio_done_fut.set_exception(error)
# TODO: explicit asyncio tb? # raise inline
# traceback.print_exception(error)
# XXX: do we need this?
# actor.cancel_soon()
main_outcome.unwrap() main_outcome.unwrap()
else: else:
trio_done_fut.set_result(main_outcome) trio_done_fut.set_result(main_outcome)
log.runtime(f"trio_main finished: {main_outcome!r}") log.runtime(f'trio_main finished: {main_outcome!r}')
startup_msg += (
f'-> created {trio_done_callback!r}\n'
f'-> scheduling `trio_main`: {trio_main!r}\n'
)
# start the infection: run trio on the asyncio loop in "guest mode" # start the infection: run trio on the asyncio loop in "guest mode"
log.runtime( log.runtime(
'Infecting `asyncio`-process with a `trio` guest-run of\n\n' f'{startup_msg}\n\n'
f'{trio_main!r}\n\n' +
'Infecting `asyncio`-process with a `trio` guest-run!\n'
f'{trio_done_callback}\n'
) )
trio.lowlevel.start_guest_run( trio.lowlevel.start_guest_run(
trio_main, trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe, run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback, done_callback=trio_done_callback,
) )
# NOTE `.unwrap()` will raise on error try:
return (await trio_done_fut).unwrap() # TODO: better SIGINT handling since shielding seems to
# make NO DIFFERENCE XD
# -[ ] maybe this is due to 3.11's recent SIGINT handling
# changes and we can better work with/around it?
# https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption
out: Outcome = await asyncio.shield(trio_done_fut)
# NOTE `Error.unwrap()` will raise
return out.unwrap()
except asyncio.CancelledError:
actor: tractor.Actor = tractor.current_actor()
log.exception(
'`asyncio`-side main task was cancelled!\n'
'Cancelling actor-runtime..\n'
f'c)>\n'
f' |_{actor}.cancel_soon()\n'
)
# XXX NOTE XXX the next LOC is super important!!!
# => without it, we can get a guest-run abandonment case
# where asyncio will not trigger `trio` in a final event
# loop cycle!
#
# our test,
# `test_infected_asyncio.test_sigint_closes_lifetime_stack()`
# demonstrates how if when we raise a SIGINT-signal in an infected
# child we get a variable race condition outcome where
# either of the following can indeterminately happen,
#
# - "silent-abandon": `asyncio` abandons the `trio`
# guest-run task silently and no `trio`-guest-run or
# `tractor`-actor-runtime teardown happens whatsoever..
# this is the WORST (race) case outcome.
#
# - OR, "loud-abandon": the guest run get's abaondoned "loudly" with
# `trio` reporting a console traceback and further tbs of all
# the failed shutdown routines also show on console..
#
# our test can thus fail and (has been parametrized for)
# the 2 cases:
#
# - when the parent raises a KBI just after
# signalling the child,
# |_silent-abandon => the `Actor.lifetime_stack` will
# never be closed thus leaking a resource!
# -> FAIL!
# |_loud-abandon => despite the abandonment at least the
# stack will be closed out..
# -> PASS
#
# - when the parent instead simply waits on `ctx.wait_for_result()`
# (i.e. DOES not raise a KBI itself),
# |_silent-abandon => test will just hang and thus the ctx
# and actor will never be closed/cancelled/shutdown
# resulting in leaking a (file) resource since the
# `trio`/`tractor` runtime never relays a ctxc back to
# the parent; the test's timeout will trigger..
# -> FAIL!
# |_loud-abandon => this case seems to never happen??
#
# XXX FIRST PART XXX, SO, this is a fix to the
# "silent-abandon" case, NOT the `trio`-guest-run
# abandonment issue in general, for which the NEXT LOC
# is apparently a working fix!
actor.cancel_soon()
# XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to
# `trio`-guest-run to complete and teardown !!
#
# XXX WITHOUT THIS the guest-run gets race-conditionally
# abandoned by `asyncio`!!
# XD XD XD
await asyncio.shield(
asyncio.sleep(.1) # NOPE! it can't be 0 either XD
)
raise
# might as well if it's installed. # might as well if it's installed.
try: try:
@ -599,4 +700,6 @@ def run_as_asyncio_guest(
except ImportError: except ImportError:
pass pass
return asyncio.run(aio_main(trio_main)) return asyncio.run(
aio_main(trio_main),
)