Compare commits
1 Commits
284fa0340e
...
a870df68c0
Author | SHA1 | Date |
---|---|---|
|
a870df68c0 |
|
@ -2,16 +2,25 @@
|
|||
The hipster way to force SC onto the stdlib's "async": 'infection mode'.
|
||||
|
||||
'''
|
||||
from typing import Optional, Iterable, Union
|
||||
import asyncio
|
||||
import builtins
|
||||
from contextlib import ExitStack
|
||||
import itertools
|
||||
import importlib
|
||||
import os
|
||||
from pathlib import Path
|
||||
import signal
|
||||
from typing import (
|
||||
Callable,
|
||||
Iterable,
|
||||
Union,
|
||||
)
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
import tractor
|
||||
from tractor import (
|
||||
current_actor,
|
||||
to_asyncio,
|
||||
RemoteActorError,
|
||||
ContextCancelled,
|
||||
|
@ -25,8 +34,8 @@ async def sleep_and_err(
|
|||
|
||||
# just signature placeholders for compat with
|
||||
# ``to_asyncio.open_channel_from()``
|
||||
to_trio: Optional[trio.MemorySendChannel] = None,
|
||||
from_trio: Optional[asyncio.Queue] = None,
|
||||
to_trio: trio.MemorySendChannel|None = None,
|
||||
from_trio: asyncio.Queue|None = None,
|
||||
|
||||
):
|
||||
if to_trio:
|
||||
|
@ -36,7 +45,7 @@ async def sleep_and_err(
|
|||
assert 0
|
||||
|
||||
|
||||
async def sleep_forever():
|
||||
async def aio_sleep_forever():
|
||||
await asyncio.sleep(float('inf'))
|
||||
|
||||
|
||||
|
@ -44,7 +53,7 @@ async def trio_cancels_single_aio_task():
|
|||
|
||||
# spawn an ``asyncio`` task to run a func and return result
|
||||
with trio.move_on_after(.2):
|
||||
await tractor.to_asyncio.run_task(sleep_forever)
|
||||
await tractor.to_asyncio.run_task(aio_sleep_forever)
|
||||
|
||||
|
||||
def test_trio_cancels_aio_on_actor_side(reg_addr):
|
||||
|
@ -66,14 +75,13 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
|
|||
|
||||
|
||||
async def asyncio_actor(
|
||||
|
||||
target: str,
|
||||
expect_err: Exception|None = None
|
||||
|
||||
) -> None:
|
||||
|
||||
assert tractor.current_actor().is_infected_aio()
|
||||
target = globals()[target]
|
||||
target: Callable = globals()[target]
|
||||
|
||||
if '.' in expect_err:
|
||||
modpath, _, name = expect_err.rpartition('.')
|
||||
|
@ -140,7 +148,7 @@ def test_tractor_cancels_aio(reg_addr):
|
|||
async with tractor.open_nursery() as n:
|
||||
portal = await n.run_in_actor(
|
||||
asyncio_actor,
|
||||
target='sleep_forever',
|
||||
target='aio_sleep_forever',
|
||||
expect_err='trio.Cancelled',
|
||||
infect_asyncio=True,
|
||||
)
|
||||
|
@ -164,7 +172,7 @@ def test_trio_cancels_aio(reg_addr):
|
|||
async with tractor.open_nursery() as n:
|
||||
await n.run_in_actor(
|
||||
asyncio_actor,
|
||||
target='sleep_forever',
|
||||
target='aio_sleep_forever',
|
||||
expect_err='trio.Cancelled',
|
||||
infect_asyncio=True,
|
||||
)
|
||||
|
@ -195,7 +203,7 @@ async def trio_ctx(
|
|||
# spawn another asyncio task for the cuck of it.
|
||||
n.start_soon(
|
||||
tractor.to_asyncio.run_task,
|
||||
sleep_forever,
|
||||
aio_sleep_forever,
|
||||
)
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
@ -285,7 +293,7 @@ async def aio_cancel():
|
|||
|
||||
# cancel and enter sleep
|
||||
task.cancel()
|
||||
await sleep_forever()
|
||||
await aio_sleep_forever()
|
||||
|
||||
|
||||
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
||||
|
@ -355,7 +363,6 @@ async def push_from_aio_task(
|
|||
|
||||
|
||||
async def stream_from_aio(
|
||||
|
||||
exit_early: bool = False,
|
||||
raise_err: bool = False,
|
||||
aio_raise_err: bool = False,
|
||||
|
@ -618,6 +625,200 @@ def test_echoserver_detailed_mechanics(
|
|||
trio.run(main)
|
||||
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def manage_file(
|
||||
ctx: tractor.Context,
|
||||
tmp_path_str: str,
|
||||
bg_aio_task: bool = False,
|
||||
):
|
||||
'''
|
||||
Start an `asyncio` task that just sleeps after registering a context
|
||||
with `Actor.lifetime_stack`. Trigger a SIGINT to kill the actor tree
|
||||
and ensure the stack is closed in the infected mode child.
|
||||
|
||||
To verify the teardown state just write a tmpfile to the `testdir`
|
||||
and delete it on actor close.
|
||||
|
||||
'''
|
||||
|
||||
tmp_path: Path = Path(tmp_path_str)
|
||||
tmp_file: Path = tmp_path / f'{" ".join(ctx._actor.uid)}.file'
|
||||
|
||||
# create a the tmp file and tell the parent where it's at
|
||||
assert not tmp_file.is_file()
|
||||
tmp_file.touch()
|
||||
|
||||
stack: ExitStack = current_actor().lifetime_stack
|
||||
stack.callback(tmp_file.unlink)
|
||||
|
||||
await ctx.started((
|
||||
str(tmp_file),
|
||||
os.getpid(),
|
||||
))
|
||||
|
||||
# expect to be cancelled from here!
|
||||
try:
|
||||
|
||||
# NOTE: turns out you don't even need to sched an aio task
|
||||
# since the original issue, even though seemingly was due to
|
||||
# the guest-run being abandoned + a `._debug.pause()` inside
|
||||
# `._runtime._async_main()` (which was originally trying to
|
||||
# debug the `.lifetime_stack` not closing), IS NOT actually
|
||||
# the core issue?
|
||||
#
|
||||
# further notes:
|
||||
#
|
||||
# - `trio` only issues the " RuntimeWarning: Trio guest run
|
||||
# got abandoned without properly finishing... weird stuff
|
||||
# might happen" IFF you DO run a asyncio task here, BUT
|
||||
# - the original issue of the `.lifetime_stack` not closing
|
||||
# will still happen even if you don't run an `asyncio` task
|
||||
# here even though the "abandon" messgage won't be shown..
|
||||
#
|
||||
# => ????? honestly i'm lost but it seems to be some issue
|
||||
# with `asyncio` and SIGINT..
|
||||
#
|
||||
# XXX NOTE XXX SO, if this LINE IS UNCOMMENTED and
|
||||
# `run_as_asyncio_guest()` is written WITHOUT THE
|
||||
# `.cancel_soon()` soln, both of these tests will pass ??
|
||||
# so maybe it has something to do with `asyncio` loop init
|
||||
# state?!?
|
||||
# honestly, this REALLY reminds me why i haven't used
|
||||
# `asyncio` by choice in years.. XD
|
||||
#
|
||||
# await tractor.to_asyncio.run_task(aio_sleep_forever)
|
||||
if bg_aio_task:
|
||||
async with trio.open_nursery() as tn:
|
||||
tn.start_soon(
|
||||
tractor.to_asyncio.run_task,
|
||||
aio_sleep_forever,
|
||||
)
|
||||
|
||||
await trio.sleep_forever()
|
||||
|
||||
# signalled manually at the OS level (aka KBI) by the parent actor.
|
||||
except KeyboardInterrupt:
|
||||
print('child raised KBI..')
|
||||
assert tmp_file.exists()
|
||||
raise
|
||||
else:
|
||||
raise RuntimeError('shoulda received a KBI?')
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'bg_aio_task',
|
||||
[
|
||||
False,
|
||||
|
||||
# NOTE: (and see notes in `manage_file()` above as well) if
|
||||
# we FOR SURE SPAWN AN AIO TASK in the child it seems the
|
||||
# "silent-abandon" case (as is described in detail in
|
||||
# `to_asyncio.run_as_asyncio_guest()`) does not happen and
|
||||
# `asyncio`'s loop will at least abandon the `trio` side
|
||||
# loudly? .. prolly the state-spot to start looking for
|
||||
# a soln that results in NO ABANDONMENT.. XD
|
||||
True,
|
||||
],
|
||||
ids=[
|
||||
'bg_aio_task',
|
||||
'just_trio_slee',
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
'wait_for_ctx',
|
||||
[
|
||||
False,
|
||||
True,
|
||||
],
|
||||
ids=[
|
||||
'raise_KBI_in_rent',
|
||||
'wait_for_ctx',
|
||||
],
|
||||
)
|
||||
def test_sigint_closes_lifetime_stack(
|
||||
tmp_path: Path,
|
||||
wait_for_ctx: bool,
|
||||
bg_aio_task: bool,
|
||||
):
|
||||
'''
|
||||
Ensure that an infected child can use the `Actor.lifetime_stack`
|
||||
to make a file on boot and it's automatically cleaned up by the
|
||||
actor-lifetime-linked exit stack closure.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
try:
|
||||
async with tractor.open_nursery() as n:
|
||||
p = await n.start_actor(
|
||||
'file_mngr',
|
||||
enable_modules=[__name__],
|
||||
infect_asyncio=True,
|
||||
)
|
||||
async with p.open_context(
|
||||
manage_file,
|
||||
tmp_path_str=str(tmp_path),
|
||||
bg_aio_task=bg_aio_task,
|
||||
) as (ctx, first):
|
||||
|
||||
path_str, cpid = first
|
||||
tmp_file: Path = Path(path_str)
|
||||
assert tmp_file.exists()
|
||||
|
||||
# XXX originally to simulate what (hopefully)
|
||||
# the below now triggers.. had to manually
|
||||
# trigger a SIGINT from a ctl-c in the root.
|
||||
# await trio.sleep_forever()
|
||||
|
||||
# XXX NOTE XXX signal infected-`asyncio` child to
|
||||
# OS-cancel with SIGINT; this should trigger the
|
||||
# bad `asyncio` cancel behaviour that can cause
|
||||
# a guest-run abandon as was seen causing
|
||||
# shm-buffer leaks in `piker`'s live quote stream
|
||||
# susbys!
|
||||
#
|
||||
# await trio.sleep(.5)
|
||||
await trio.sleep(.2)
|
||||
os.kill(
|
||||
cpid,
|
||||
signal.SIGINT,
|
||||
)
|
||||
|
||||
# XXX CASE 1: without the bug fixed, in
|
||||
# the non-KBI-raised-in-parent case, this
|
||||
# timeout should trigger!
|
||||
if wait_for_ctx:
|
||||
print('waiting for ctx outcome in parent..')
|
||||
try:
|
||||
with trio.fail_after(.7):
|
||||
await ctx.wait_for_result()
|
||||
except tractor.ContextCancelled as ctxc:
|
||||
assert ctxc.canceller == ctx.chan.uid
|
||||
raise
|
||||
|
||||
# XXX CASE 2: this seems to be the source of the
|
||||
# original issue which exhibited BEFORE we put
|
||||
# a `Actor.cancel_soon()` inside
|
||||
# `run_as_asyncio_guest()`..
|
||||
else:
|
||||
raise KeyboardInterrupt
|
||||
|
||||
pytest.fail('should have raised some kinda error?!?')
|
||||
|
||||
except (
|
||||
KeyboardInterrupt,
|
||||
ContextCancelled,
|
||||
):
|
||||
# XXX CASE 2: without the bug fixed, in the
|
||||
# KBI-raised-in-parent case, the actor teardown should
|
||||
# never get run (silently abaondoned by `asyncio`..) and
|
||||
# thus the file should leak!
|
||||
assert not tmp_file.exists()
|
||||
assert ctx.maybe_error
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
||||
# TODO: debug_mode tests once we get support for `asyncio`!
|
||||
#
|
||||
# -[ ] need tests to wrap both scripts:
|
||||
|
|
|
@ -922,15 +922,6 @@ class NoRuntime(RuntimeError):
|
|||
"The root actor has not been initialized yet"
|
||||
|
||||
|
||||
|
||||
class AsyncioCancelled(Exception):
|
||||
'''
|
||||
Asyncio cancelled translation (non-base) error
|
||||
for use with the ``to_asyncio`` module
|
||||
to be raised in the ``trio`` side task
|
||||
|
||||
'''
|
||||
|
||||
class MessagingError(Exception):
|
||||
'''
|
||||
IPC related msg (typing), transaction (ordering) or dialog
|
||||
|
@ -1324,7 +1315,9 @@ def _mk_recv_mte(
|
|||
any_pld: Any = msgpack.decode(msg.pld)
|
||||
message: str = (
|
||||
f'invalid `{msg_type.__qualname__}` msg payload\n\n'
|
||||
f'value: `{any_pld!r}` does not match type-spec: '
|
||||
f'{any_pld!r}\n\n'
|
||||
f'has type {type(any_pld)!r}\n\n'
|
||||
f'and does not match type-spec '
|
||||
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
|
||||
)
|
||||
bad_msg = msg
|
||||
|
|
|
@ -18,11 +18,13 @@
|
|||
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
from asyncio.exceptions import CancelledError
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from dataclasses import dataclass
|
||||
import inspect
|
||||
import traceback
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
|
@ -30,20 +32,21 @@ from typing import (
|
|||
Awaitable,
|
||||
)
|
||||
|
||||
import trio
|
||||
from outcome import Error
|
||||
|
||||
from tractor.log import get_logger
|
||||
import tractor
|
||||
from tractor._state import (
|
||||
current_actor,
|
||||
debug_mode,
|
||||
)
|
||||
from tractor.log import get_logger
|
||||
from tractor.devx import _debug
|
||||
from tractor._exceptions import AsyncioCancelled
|
||||
from tractor.trionics._broadcast import (
|
||||
broadcast_receiver,
|
||||
BroadcastReceiver,
|
||||
)
|
||||
import trio
|
||||
from outcome import (
|
||||
Error,
|
||||
Outcome,
|
||||
)
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
@ -161,7 +164,7 @@ def _run_asyncio_task(
|
|||
|
||||
'''
|
||||
__tracebackhide__ = True
|
||||
if not current_actor().is_infected_aio():
|
||||
if not tractor.current_actor().is_infected_aio():
|
||||
raise RuntimeError(
|
||||
"`infect_asyncio` mode is not enabled!?"
|
||||
)
|
||||
|
@ -172,7 +175,6 @@ def _run_asyncio_task(
|
|||
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
|
||||
|
||||
args = tuple(inspect.getfullargspec(func).args)
|
||||
|
||||
if getattr(func, '_tractor_steam_function', None):
|
||||
# the assumption is that the target async routine accepts the
|
||||
# send channel then it intends to yield more then one return
|
||||
|
@ -346,13 +348,22 @@ def _run_asyncio_task(
|
|||
# on a checkpoint.
|
||||
cancel_scope.cancel()
|
||||
|
||||
# raise any ``asyncio`` side error.
|
||||
# raise any `asyncio` side error.
|
||||
raise aio_err
|
||||
|
||||
task.add_done_callback(cancel_trio)
|
||||
return chan
|
||||
|
||||
|
||||
class AsyncioCancelled(CancelledError):
|
||||
'''
|
||||
Asyncio cancelled translation (non-base) error
|
||||
for use with the ``to_asyncio`` module
|
||||
to be raised in the ``trio`` side task
|
||||
|
||||
'''
|
||||
|
||||
|
||||
@acm
|
||||
async def translate_aio_errors(
|
||||
|
||||
|
@ -516,7 +527,6 @@ async def open_channel_from(
|
|||
|
||||
|
||||
def run_as_asyncio_guest(
|
||||
|
||||
trio_main: Callable,
|
||||
|
||||
) -> None:
|
||||
|
@ -548,6 +558,11 @@ def run_as_asyncio_guest(
|
|||
|
||||
loop = asyncio.get_running_loop()
|
||||
trio_done_fut = asyncio.Future()
|
||||
startup_msg: str = (
|
||||
'Starting `asyncio` guest-loop-run\n'
|
||||
'-> got running loop\n'
|
||||
'-> built a `trio`-done future\n'
|
||||
)
|
||||
|
||||
if debug_mode():
|
||||
# XXX make it obvi we know this isn't supported yet!
|
||||
|
@ -562,34 +577,120 @@ def run_as_asyncio_guest(
|
|||
def trio_done_callback(main_outcome):
|
||||
|
||||
if isinstance(main_outcome, Error):
|
||||
error = main_outcome.error
|
||||
error: BaseException = main_outcome.error
|
||||
|
||||
# show an dedicated `asyncio`-side tb from the error
|
||||
tb_str: str = ''.join(traceback.format_exception(error))
|
||||
log.exception(
|
||||
'Guest-run errored!?\n\n'
|
||||
f'{main_outcome}\n'
|
||||
f'{error}\n\n'
|
||||
f'{tb_str}\n'
|
||||
)
|
||||
trio_done_fut.set_exception(error)
|
||||
|
||||
# TODO: explicit asyncio tb?
|
||||
# traceback.print_exception(error)
|
||||
|
||||
# XXX: do we need this?
|
||||
# actor.cancel_soon()
|
||||
|
||||
# raise inline
|
||||
main_outcome.unwrap()
|
||||
|
||||
else:
|
||||
trio_done_fut.set_result(main_outcome)
|
||||
log.runtime(f"trio_main finished: {main_outcome!r}")
|
||||
log.runtime(f'trio_main finished: {main_outcome!r}')
|
||||
|
||||
startup_msg += (
|
||||
f'-> created {trio_done_callback!r}\n'
|
||||
f'-> scheduling `trio_main`: {trio_main!r}\n'
|
||||
)
|
||||
|
||||
# start the infection: run trio on the asyncio loop in "guest mode"
|
||||
log.runtime(
|
||||
'Infecting `asyncio`-process with a `trio` guest-run of\n\n'
|
||||
f'{trio_main!r}\n\n'
|
||||
|
||||
f'{trio_done_callback}\n'
|
||||
f'{startup_msg}\n\n'
|
||||
+
|
||||
'Infecting `asyncio`-process with a `trio` guest-run!\n'
|
||||
)
|
||||
|
||||
trio.lowlevel.start_guest_run(
|
||||
trio_main,
|
||||
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
||||
done_callback=trio_done_callback,
|
||||
)
|
||||
# NOTE `.unwrap()` will raise on error
|
||||
return (await trio_done_fut).unwrap()
|
||||
try:
|
||||
# TODO: better SIGINT handling since shielding seems to
|
||||
# make NO DIFFERENCE XD
|
||||
# -[ ] maybe this is due to 3.11's recent SIGINT handling
|
||||
# changes and we can better work with/around it?
|
||||
# https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption
|
||||
out: Outcome = await asyncio.shield(trio_done_fut)
|
||||
# NOTE `Error.unwrap()` will raise
|
||||
return out.unwrap()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
actor: tractor.Actor = tractor.current_actor()
|
||||
log.exception(
|
||||
'`asyncio`-side main task was cancelled!\n'
|
||||
'Cancelling actor-runtime..\n'
|
||||
f'c)>\n'
|
||||
f' |_{actor}.cancel_soon()\n'
|
||||
|
||||
)
|
||||
|
||||
# XXX NOTE XXX the next LOC is super important!!!
|
||||
# => without it, we can get a guest-run abandonment case
|
||||
# where asyncio will not trigger `trio` in a final event
|
||||
# loop cycle!
|
||||
#
|
||||
# our test,
|
||||
# `test_infected_asyncio.test_sigint_closes_lifetime_stack()`
|
||||
# demonstrates how if when we raise a SIGINT-signal in an infected
|
||||
# child we get a variable race condition outcome where
|
||||
# either of the following can indeterminately happen,
|
||||
#
|
||||
# - "silent-abandon": `asyncio` abandons the `trio`
|
||||
# guest-run task silently and no `trio`-guest-run or
|
||||
# `tractor`-actor-runtime teardown happens whatsoever..
|
||||
# this is the WORST (race) case outcome.
|
||||
#
|
||||
# - OR, "loud-abandon": the guest run get's abaondoned "loudly" with
|
||||
# `trio` reporting a console traceback and further tbs of all
|
||||
# the failed shutdown routines also show on console..
|
||||
#
|
||||
# our test can thus fail and (has been parametrized for)
|
||||
# the 2 cases:
|
||||
#
|
||||
# - when the parent raises a KBI just after
|
||||
# signalling the child,
|
||||
# |_silent-abandon => the `Actor.lifetime_stack` will
|
||||
# never be closed thus leaking a resource!
|
||||
# -> FAIL!
|
||||
# |_loud-abandon => despite the abandonment at least the
|
||||
# stack will be closed out..
|
||||
# -> PASS
|
||||
#
|
||||
# - when the parent instead simply waits on `ctx.wait_for_result()`
|
||||
# (i.e. DOES not raise a KBI itself),
|
||||
# |_silent-abandon => test will just hang and thus the ctx
|
||||
# and actor will never be closed/cancelled/shutdown
|
||||
# resulting in leaking a (file) resource since the
|
||||
# `trio`/`tractor` runtime never relays a ctxc back to
|
||||
# the parent; the test's timeout will trigger..
|
||||
# -> FAIL!
|
||||
# |_loud-abandon => this case seems to never happen??
|
||||
#
|
||||
# XXX FIRST PART XXX, SO, this is a fix to the
|
||||
# "silent-abandon" case, NOT the `trio`-guest-run
|
||||
# abandonment issue in general, for which the NEXT LOC
|
||||
# is apparently a working fix!
|
||||
actor.cancel_soon()
|
||||
|
||||
# XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to
|
||||
# `trio`-guest-run to complete and teardown !!
|
||||
#
|
||||
# XXX WITHOUT THIS the guest-run gets race-conditionally
|
||||
# abandoned by `asyncio`!!
|
||||
# XD XD XD
|
||||
await asyncio.shield(
|
||||
asyncio.sleep(.1) # NOPE! it can't be 0 either XD
|
||||
)
|
||||
raise
|
||||
|
||||
# might as well if it's installed.
|
||||
try:
|
||||
|
@ -599,4 +700,6 @@ def run_as_asyncio_guest(
|
|||
except ImportError:
|
||||
pass
|
||||
|
||||
return asyncio.run(aio_main(trio_main))
|
||||
return asyncio.run(
|
||||
aio_main(trio_main),
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue