From a870df68c0c91d510b0ca52c8cc98ca3d3a9b30b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 24 Jun 2024 16:10:23 -0400 Subject: [PATCH] Hack `asyncio` to not abandon a guest-mode run? Took me a while to figure out what the heck was going on but, turns out `asyncio` changed their SIGINT handling in 3.11 as per: https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption I'm not entirely sure if it's the 3.11 changes or possibly wtv further updates were made in 3.12 but more or less due to the way our current main task was written the `trio` guest-run was getting abandoned on SIGINTs sent from the OS to the infected child proc.. Note that much of the bug and soln cases are layed out in very detailed comment-notes both in the new test and `run_as_asyncio_guest()`, right above the final "fix" lines. Add new `test_infected_aio.test_sigint_closes_lifetime_stack()` test suite which reliably triggers all abandonment issues with multiple cases of different parent behaviour post-sending-SIGINT-to-child: 1. briefly sleep then raise a KBI in the parent which was originally demonstrating the file leak not being cleaned up by `Actor.lifetime_stack.close()` and simulates a ctl-c from the console (relayed in tandem by the OS to the parent and child processes). 2. do `Context.wait_for_result()` on the child context which would hang and timeout since the actor runtime would never complete and thus never relay a `ContextCancelled`. 3. both with and without running a `asyncio` task in the `manage_file` child actor; originally it seemed that with an aio task scheduled in the child actor the guest-run abandonment always was the "loud" case where there seemed to be some actor teardown but with tbs from python failing to gracefully exit the `trio` runtime.. The (seemingly working) "fix" required 2 lines of code to be run inside a `asyncio.CancelledError` handler around the call to `await trio_done_fut`: - `Actor.cancel_soon()` which schedules the actor runtime to cancel on the next `trio` runner cycle and results in a "self cancellation" of the actor. - "pumping the `asyncio` event loop" with a non-0 `.sleep(0.1)` XD |_ seems that a "shielded" pump with some actual `delay: float >= 0` did the trick to get `asyncio` to allow the `trio` runner/loop to fully complete its guest-run without abandonment. Other supporting changes: - move `._exceptions.AsyncioCancelled`, our renamed `asyncio.CancelledError` error-sub-type-wrapper, to `.to_asyncio` and make it derive from `CancelledError` so as to be sure when raised by our `asyncio` x-> `trio` exception relay machinery that `asyncio` is getting the specific type it expects during cancellation. - do "summary status" style logging in `run_as_asyncio_guest()` wherein we compile the eventual `startup_msg: str` emitted just before waiting on the `trio_done_fut`. - shield-wait with `out: Outcome = await asyncio.shield(trio_done_fut)` even though it seems to do nothing in the SIGINT handling case..(I presume it might help avoid abandonment in a `asyncio.Task.cancel()` case maybe?) --- tests/test_infected_asyncio.py | 225 +++++++++++++++++++++++++++++++-- tractor/_exceptions.py | 13 +- tractor/to_asyncio.py | 153 ++++++++++++++++++---- 3 files changed, 344 insertions(+), 47 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 45722a6..8d4697f 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -2,16 +2,25 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'. ''' -from typing import Optional, Iterable, Union import asyncio import builtins +from contextlib import ExitStack import itertools import importlib +import os +from pathlib import Path +import signal +from typing import ( + Callable, + Iterable, + Union, +) import pytest import trio import tractor from tractor import ( + current_actor, to_asyncio, RemoteActorError, ContextCancelled, @@ -25,8 +34,8 @@ async def sleep_and_err( # just signature placeholders for compat with # ``to_asyncio.open_channel_from()`` - to_trio: Optional[trio.MemorySendChannel] = None, - from_trio: Optional[asyncio.Queue] = None, + to_trio: trio.MemorySendChannel|None = None, + from_trio: asyncio.Queue|None = None, ): if to_trio: @@ -36,7 +45,7 @@ async def sleep_and_err( assert 0 -async def sleep_forever(): +async def aio_sleep_forever(): await asyncio.sleep(float('inf')) @@ -44,7 +53,7 @@ async def trio_cancels_single_aio_task(): # spawn an ``asyncio`` task to run a func and return result with trio.move_on_after(.2): - await tractor.to_asyncio.run_task(sleep_forever) + await tractor.to_asyncio.run_task(aio_sleep_forever) def test_trio_cancels_aio_on_actor_side(reg_addr): @@ -66,14 +75,13 @@ def test_trio_cancels_aio_on_actor_side(reg_addr): async def asyncio_actor( - target: str, expect_err: Exception|None = None ) -> None: assert tractor.current_actor().is_infected_aio() - target = globals()[target] + target: Callable = globals()[target] if '.' in expect_err: modpath, _, name = expect_err.rpartition('.') @@ -140,7 +148,7 @@ def test_tractor_cancels_aio(reg_addr): async with tractor.open_nursery() as n: portal = await n.run_in_actor( asyncio_actor, - target='sleep_forever', + target='aio_sleep_forever', expect_err='trio.Cancelled', infect_asyncio=True, ) @@ -164,7 +172,7 @@ def test_trio_cancels_aio(reg_addr): async with tractor.open_nursery() as n: await n.run_in_actor( asyncio_actor, - target='sleep_forever', + target='aio_sleep_forever', expect_err='trio.Cancelled', infect_asyncio=True, ) @@ -195,7 +203,7 @@ async def trio_ctx( # spawn another asyncio task for the cuck of it. n.start_soon( tractor.to_asyncio.run_task, - sleep_forever, + aio_sleep_forever, ) await trio.sleep_forever() @@ -285,7 +293,7 @@ async def aio_cancel(): # cancel and enter sleep task.cancel() - await sleep_forever() + await aio_sleep_forever() def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): @@ -355,7 +363,6 @@ async def push_from_aio_task( async def stream_from_aio( - exit_early: bool = False, raise_err: bool = False, aio_raise_err: bool = False, @@ -618,6 +625,200 @@ def test_echoserver_detailed_mechanics( trio.run(main) + +@tractor.context +async def manage_file( + ctx: tractor.Context, + tmp_path_str: str, + bg_aio_task: bool = False, +): + ''' + Start an `asyncio` task that just sleeps after registering a context + with `Actor.lifetime_stack`. Trigger a SIGINT to kill the actor tree + and ensure the stack is closed in the infected mode child. + + To verify the teardown state just write a tmpfile to the `testdir` + and delete it on actor close. + + ''' + + tmp_path: Path = Path(tmp_path_str) + tmp_file: Path = tmp_path / f'{" ".join(ctx._actor.uid)}.file' + + # create a the tmp file and tell the parent where it's at + assert not tmp_file.is_file() + tmp_file.touch() + + stack: ExitStack = current_actor().lifetime_stack + stack.callback(tmp_file.unlink) + + await ctx.started(( + str(tmp_file), + os.getpid(), + )) + + # expect to be cancelled from here! + try: + + # NOTE: turns out you don't even need to sched an aio task + # since the original issue, even though seemingly was due to + # the guest-run being abandoned + a `._debug.pause()` inside + # `._runtime._async_main()` (which was originally trying to + # debug the `.lifetime_stack` not closing), IS NOT actually + # the core issue? + # + # further notes: + # + # - `trio` only issues the " RuntimeWarning: Trio guest run + # got abandoned without properly finishing... weird stuff + # might happen" IFF you DO run a asyncio task here, BUT + # - the original issue of the `.lifetime_stack` not closing + # will still happen even if you don't run an `asyncio` task + # here even though the "abandon" messgage won't be shown.. + # + # => ????? honestly i'm lost but it seems to be some issue + # with `asyncio` and SIGINT.. + # + # XXX NOTE XXX SO, if this LINE IS UNCOMMENTED and + # `run_as_asyncio_guest()` is written WITHOUT THE + # `.cancel_soon()` soln, both of these tests will pass ?? + # so maybe it has something to do with `asyncio` loop init + # state?!? + # honestly, this REALLY reminds me why i haven't used + # `asyncio` by choice in years.. XD + # + # await tractor.to_asyncio.run_task(aio_sleep_forever) + if bg_aio_task: + async with trio.open_nursery() as tn: + tn.start_soon( + tractor.to_asyncio.run_task, + aio_sleep_forever, + ) + + await trio.sleep_forever() + + # signalled manually at the OS level (aka KBI) by the parent actor. + except KeyboardInterrupt: + print('child raised KBI..') + assert tmp_file.exists() + raise + else: + raise RuntimeError('shoulda received a KBI?') + + +@pytest.mark.parametrize( + 'bg_aio_task', + [ + False, + + # NOTE: (and see notes in `manage_file()` above as well) if + # we FOR SURE SPAWN AN AIO TASK in the child it seems the + # "silent-abandon" case (as is described in detail in + # `to_asyncio.run_as_asyncio_guest()`) does not happen and + # `asyncio`'s loop will at least abandon the `trio` side + # loudly? .. prolly the state-spot to start looking for + # a soln that results in NO ABANDONMENT.. XD + True, + ], + ids=[ + 'bg_aio_task', + 'just_trio_slee', + ], +) +@pytest.mark.parametrize( + 'wait_for_ctx', + [ + False, + True, + ], + ids=[ + 'raise_KBI_in_rent', + 'wait_for_ctx', + ], +) +def test_sigint_closes_lifetime_stack( + tmp_path: Path, + wait_for_ctx: bool, + bg_aio_task: bool, +): + ''' + Ensure that an infected child can use the `Actor.lifetime_stack` + to make a file on boot and it's automatically cleaned up by the + actor-lifetime-linked exit stack closure. + + ''' + async def main(): + try: + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'file_mngr', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + manage_file, + tmp_path_str=str(tmp_path), + bg_aio_task=bg_aio_task, + ) as (ctx, first): + + path_str, cpid = first + tmp_file: Path = Path(path_str) + assert tmp_file.exists() + + # XXX originally to simulate what (hopefully) + # the below now triggers.. had to manually + # trigger a SIGINT from a ctl-c in the root. + # await trio.sleep_forever() + + # XXX NOTE XXX signal infected-`asyncio` child to + # OS-cancel with SIGINT; this should trigger the + # bad `asyncio` cancel behaviour that can cause + # a guest-run abandon as was seen causing + # shm-buffer leaks in `piker`'s live quote stream + # susbys! + # + # await trio.sleep(.5) + await trio.sleep(.2) + os.kill( + cpid, + signal.SIGINT, + ) + + # XXX CASE 1: without the bug fixed, in + # the non-KBI-raised-in-parent case, this + # timeout should trigger! + if wait_for_ctx: + print('waiting for ctx outcome in parent..') + try: + with trio.fail_after(.7): + await ctx.wait_for_result() + except tractor.ContextCancelled as ctxc: + assert ctxc.canceller == ctx.chan.uid + raise + + # XXX CASE 2: this seems to be the source of the + # original issue which exhibited BEFORE we put + # a `Actor.cancel_soon()` inside + # `run_as_asyncio_guest()`.. + else: + raise KeyboardInterrupt + + pytest.fail('should have raised some kinda error?!?') + + except ( + KeyboardInterrupt, + ContextCancelled, + ): + # XXX CASE 2: without the bug fixed, in the + # KBI-raised-in-parent case, the actor teardown should + # never get run (silently abaondoned by `asyncio`..) and + # thus the file should leak! + assert not tmp_file.exists() + assert ctx.maybe_error + + trio.run(main) + + # TODO: debug_mode tests once we get support for `asyncio`! # # -[ ] need tests to wrap both scripts: diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 7164d6a..46b64c1 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -922,15 +922,6 @@ class NoRuntime(RuntimeError): "The root actor has not been initialized yet" - -class AsyncioCancelled(Exception): - ''' - Asyncio cancelled translation (non-base) error - for use with the ``to_asyncio`` module - to be raised in the ``trio`` side task - - ''' - class MessagingError(Exception): ''' IPC related msg (typing), transaction (ordering) or dialog @@ -1324,7 +1315,9 @@ def _mk_recv_mte( any_pld: Any = msgpack.decode(msg.pld) message: str = ( f'invalid `{msg_type.__qualname__}` msg payload\n\n' - f'value: `{any_pld!r}` does not match type-spec: ' + f'{any_pld!r}\n\n' + f'has type {type(any_pld)!r}\n\n' + f'and does not match type-spec ' f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' ) bad_msg = msg diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index d1451b4..e041721 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -18,11 +18,13 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. ''' +from __future__ import annotations import asyncio from asyncio.exceptions import CancelledError from contextlib import asynccontextmanager as acm from dataclasses import dataclass import inspect +import traceback from typing import ( Any, Callable, @@ -30,20 +32,21 @@ from typing import ( Awaitable, ) -import trio -from outcome import Error - -from tractor.log import get_logger +import tractor from tractor._state import ( - current_actor, debug_mode, ) +from tractor.log import get_logger from tractor.devx import _debug -from tractor._exceptions import AsyncioCancelled from tractor.trionics._broadcast import ( broadcast_receiver, BroadcastReceiver, ) +import trio +from outcome import ( + Error, + Outcome, +) log = get_logger(__name__) @@ -161,7 +164,7 @@ def _run_asyncio_task( ''' __tracebackhide__ = True - if not current_actor().is_infected_aio(): + if not tractor.current_actor().is_infected_aio(): raise RuntimeError( "`infect_asyncio` mode is not enabled!?" ) @@ -172,7 +175,6 @@ def _run_asyncio_task( to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore args = tuple(inspect.getfullargspec(func).args) - if getattr(func, '_tractor_steam_function', None): # the assumption is that the target async routine accepts the # send channel then it intends to yield more then one return @@ -346,13 +348,22 @@ def _run_asyncio_task( # on a checkpoint. cancel_scope.cancel() - # raise any ``asyncio`` side error. + # raise any `asyncio` side error. raise aio_err task.add_done_callback(cancel_trio) return chan +class AsyncioCancelled(CancelledError): + ''' + Asyncio cancelled translation (non-base) error + for use with the ``to_asyncio`` module + to be raised in the ``trio`` side task + + ''' + + @acm async def translate_aio_errors( @@ -516,7 +527,6 @@ async def open_channel_from( def run_as_asyncio_guest( - trio_main: Callable, ) -> None: @@ -548,6 +558,11 @@ def run_as_asyncio_guest( loop = asyncio.get_running_loop() trio_done_fut = asyncio.Future() + startup_msg: str = ( + 'Starting `asyncio` guest-loop-run\n' + '-> got running loop\n' + '-> built a `trio`-done future\n' + ) if debug_mode(): # XXX make it obvi we know this isn't supported yet! @@ -562,34 +577,120 @@ def run_as_asyncio_guest( def trio_done_callback(main_outcome): if isinstance(main_outcome, Error): - error = main_outcome.error + error: BaseException = main_outcome.error + + # show an dedicated `asyncio`-side tb from the error + tb_str: str = ''.join(traceback.format_exception(error)) + log.exception( + 'Guest-run errored!?\n\n' + f'{main_outcome}\n' + f'{error}\n\n' + f'{tb_str}\n' + ) trio_done_fut.set_exception(error) - # TODO: explicit asyncio tb? - # traceback.print_exception(error) - - # XXX: do we need this? - # actor.cancel_soon() - + # raise inline main_outcome.unwrap() + else: trio_done_fut.set_result(main_outcome) - log.runtime(f"trio_main finished: {main_outcome!r}") + log.runtime(f'trio_main finished: {main_outcome!r}') + + startup_msg += ( + f'-> created {trio_done_callback!r}\n' + f'-> scheduling `trio_main`: {trio_main!r}\n' + ) # start the infection: run trio on the asyncio loop in "guest mode" log.runtime( - 'Infecting `asyncio`-process with a `trio` guest-run of\n\n' - f'{trio_main!r}\n\n' - - f'{trio_done_callback}\n' + f'{startup_msg}\n\n' + + + 'Infecting `asyncio`-process with a `trio` guest-run!\n' ) + trio.lowlevel.start_guest_run( trio_main, run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, ) - # NOTE `.unwrap()` will raise on error - return (await trio_done_fut).unwrap() + try: + # TODO: better SIGINT handling since shielding seems to + # make NO DIFFERENCE XD + # -[ ] maybe this is due to 3.11's recent SIGINT handling + # changes and we can better work with/around it? + # https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption + out: Outcome = await asyncio.shield(trio_done_fut) + # NOTE `Error.unwrap()` will raise + return out.unwrap() + + except asyncio.CancelledError: + actor: tractor.Actor = tractor.current_actor() + log.exception( + '`asyncio`-side main task was cancelled!\n' + 'Cancelling actor-runtime..\n' + f'c)>\n' + f' |_{actor}.cancel_soon()\n' + + ) + + # XXX NOTE XXX the next LOC is super important!!! + # => without it, we can get a guest-run abandonment case + # where asyncio will not trigger `trio` in a final event + # loop cycle! + # + # our test, + # `test_infected_asyncio.test_sigint_closes_lifetime_stack()` + # demonstrates how if when we raise a SIGINT-signal in an infected + # child we get a variable race condition outcome where + # either of the following can indeterminately happen, + # + # - "silent-abandon": `asyncio` abandons the `trio` + # guest-run task silently and no `trio`-guest-run or + # `tractor`-actor-runtime teardown happens whatsoever.. + # this is the WORST (race) case outcome. + # + # - OR, "loud-abandon": the guest run get's abaondoned "loudly" with + # `trio` reporting a console traceback and further tbs of all + # the failed shutdown routines also show on console.. + # + # our test can thus fail and (has been parametrized for) + # the 2 cases: + # + # - when the parent raises a KBI just after + # signalling the child, + # |_silent-abandon => the `Actor.lifetime_stack` will + # never be closed thus leaking a resource! + # -> FAIL! + # |_loud-abandon => despite the abandonment at least the + # stack will be closed out.. + # -> PASS + # + # - when the parent instead simply waits on `ctx.wait_for_result()` + # (i.e. DOES not raise a KBI itself), + # |_silent-abandon => test will just hang and thus the ctx + # and actor will never be closed/cancelled/shutdown + # resulting in leaking a (file) resource since the + # `trio`/`tractor` runtime never relays a ctxc back to + # the parent; the test's timeout will trigger.. + # -> FAIL! + # |_loud-abandon => this case seems to never happen?? + # + # XXX FIRST PART XXX, SO, this is a fix to the + # "silent-abandon" case, NOT the `trio`-guest-run + # abandonment issue in general, for which the NEXT LOC + # is apparently a working fix! + actor.cancel_soon() + + # XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to + # `trio`-guest-run to complete and teardown !! + # + # XXX WITHOUT THIS the guest-run gets race-conditionally + # abandoned by `asyncio`!! + # XD XD XD + await asyncio.shield( + asyncio.sleep(.1) # NOPE! it can't be 0 either XD + ) + raise # might as well if it's installed. try: @@ -599,4 +700,6 @@ def run_as_asyncio_guest( except ImportError: pass - return asyncio.run(aio_main(trio_main)) + return asyncio.run( + aio_main(trio_main), + )