Compare commits
1 Commits
284fa0340e
...
a870df68c0
Author | SHA1 | Date |
---|---|---|
|
a870df68c0 |
|
@ -2,16 +2,25 @@
|
||||||
The hipster way to force SC onto the stdlib's "async": 'infection mode'.
|
The hipster way to force SC onto the stdlib's "async": 'infection mode'.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from typing import Optional, Iterable, Union
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import builtins
|
import builtins
|
||||||
|
from contextlib import ExitStack
|
||||||
import itertools
|
import itertools
|
||||||
import importlib
|
import importlib
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
import signal
|
||||||
|
from typing import (
|
||||||
|
Callable,
|
||||||
|
Iterable,
|
||||||
|
Union,
|
||||||
|
)
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor import (
|
from tractor import (
|
||||||
|
current_actor,
|
||||||
to_asyncio,
|
to_asyncio,
|
||||||
RemoteActorError,
|
RemoteActorError,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
|
@ -25,8 +34,8 @@ async def sleep_and_err(
|
||||||
|
|
||||||
# just signature placeholders for compat with
|
# just signature placeholders for compat with
|
||||||
# ``to_asyncio.open_channel_from()``
|
# ``to_asyncio.open_channel_from()``
|
||||||
to_trio: Optional[trio.MemorySendChannel] = None,
|
to_trio: trio.MemorySendChannel|None = None,
|
||||||
from_trio: Optional[asyncio.Queue] = None,
|
from_trio: asyncio.Queue|None = None,
|
||||||
|
|
||||||
):
|
):
|
||||||
if to_trio:
|
if to_trio:
|
||||||
|
@ -36,7 +45,7 @@ async def sleep_and_err(
|
||||||
assert 0
|
assert 0
|
||||||
|
|
||||||
|
|
||||||
async def sleep_forever():
|
async def aio_sleep_forever():
|
||||||
await asyncio.sleep(float('inf'))
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
|
|
||||||
|
@ -44,7 +53,7 @@ async def trio_cancels_single_aio_task():
|
||||||
|
|
||||||
# spawn an ``asyncio`` task to run a func and return result
|
# spawn an ``asyncio`` task to run a func and return result
|
||||||
with trio.move_on_after(.2):
|
with trio.move_on_after(.2):
|
||||||
await tractor.to_asyncio.run_task(sleep_forever)
|
await tractor.to_asyncio.run_task(aio_sleep_forever)
|
||||||
|
|
||||||
|
|
||||||
def test_trio_cancels_aio_on_actor_side(reg_addr):
|
def test_trio_cancels_aio_on_actor_side(reg_addr):
|
||||||
|
@ -66,14 +75,13 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
|
||||||
|
|
||||||
|
|
||||||
async def asyncio_actor(
|
async def asyncio_actor(
|
||||||
|
|
||||||
target: str,
|
target: str,
|
||||||
expect_err: Exception|None = None
|
expect_err: Exception|None = None
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
assert tractor.current_actor().is_infected_aio()
|
assert tractor.current_actor().is_infected_aio()
|
||||||
target = globals()[target]
|
target: Callable = globals()[target]
|
||||||
|
|
||||||
if '.' in expect_err:
|
if '.' in expect_err:
|
||||||
modpath, _, name = expect_err.rpartition('.')
|
modpath, _, name = expect_err.rpartition('.')
|
||||||
|
@ -140,7 +148,7 @@ def test_tractor_cancels_aio(reg_addr):
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
portal = await n.run_in_actor(
|
portal = await n.run_in_actor(
|
||||||
asyncio_actor,
|
asyncio_actor,
|
||||||
target='sleep_forever',
|
target='aio_sleep_forever',
|
||||||
expect_err='trio.Cancelled',
|
expect_err='trio.Cancelled',
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
|
@ -164,7 +172,7 @@ def test_trio_cancels_aio(reg_addr):
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
await n.run_in_actor(
|
await n.run_in_actor(
|
||||||
asyncio_actor,
|
asyncio_actor,
|
||||||
target='sleep_forever',
|
target='aio_sleep_forever',
|
||||||
expect_err='trio.Cancelled',
|
expect_err='trio.Cancelled',
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
|
@ -195,7 +203,7 @@ async def trio_ctx(
|
||||||
# spawn another asyncio task for the cuck of it.
|
# spawn another asyncio task for the cuck of it.
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
tractor.to_asyncio.run_task,
|
tractor.to_asyncio.run_task,
|
||||||
sleep_forever,
|
aio_sleep_forever,
|
||||||
)
|
)
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
@ -285,7 +293,7 @@ async def aio_cancel():
|
||||||
|
|
||||||
# cancel and enter sleep
|
# cancel and enter sleep
|
||||||
task.cancel()
|
task.cancel()
|
||||||
await sleep_forever()
|
await aio_sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
||||||
|
@ -355,7 +363,6 @@ async def push_from_aio_task(
|
||||||
|
|
||||||
|
|
||||||
async def stream_from_aio(
|
async def stream_from_aio(
|
||||||
|
|
||||||
exit_early: bool = False,
|
exit_early: bool = False,
|
||||||
raise_err: bool = False,
|
raise_err: bool = False,
|
||||||
aio_raise_err: bool = False,
|
aio_raise_err: bool = False,
|
||||||
|
@ -618,6 +625,200 @@ def test_echoserver_detailed_mechanics(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def manage_file(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
tmp_path_str: str,
|
||||||
|
bg_aio_task: bool = False,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Start an `asyncio` task that just sleeps after registering a context
|
||||||
|
with `Actor.lifetime_stack`. Trigger a SIGINT to kill the actor tree
|
||||||
|
and ensure the stack is closed in the infected mode child.
|
||||||
|
|
||||||
|
To verify the teardown state just write a tmpfile to the `testdir`
|
||||||
|
and delete it on actor close.
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
tmp_path: Path = Path(tmp_path_str)
|
||||||
|
tmp_file: Path = tmp_path / f'{" ".join(ctx._actor.uid)}.file'
|
||||||
|
|
||||||
|
# create a the tmp file and tell the parent where it's at
|
||||||
|
assert not tmp_file.is_file()
|
||||||
|
tmp_file.touch()
|
||||||
|
|
||||||
|
stack: ExitStack = current_actor().lifetime_stack
|
||||||
|
stack.callback(tmp_file.unlink)
|
||||||
|
|
||||||
|
await ctx.started((
|
||||||
|
str(tmp_file),
|
||||||
|
os.getpid(),
|
||||||
|
))
|
||||||
|
|
||||||
|
# expect to be cancelled from here!
|
||||||
|
try:
|
||||||
|
|
||||||
|
# NOTE: turns out you don't even need to sched an aio task
|
||||||
|
# since the original issue, even though seemingly was due to
|
||||||
|
# the guest-run being abandoned + a `._debug.pause()` inside
|
||||||
|
# `._runtime._async_main()` (which was originally trying to
|
||||||
|
# debug the `.lifetime_stack` not closing), IS NOT actually
|
||||||
|
# the core issue?
|
||||||
|
#
|
||||||
|
# further notes:
|
||||||
|
#
|
||||||
|
# - `trio` only issues the " RuntimeWarning: Trio guest run
|
||||||
|
# got abandoned without properly finishing... weird stuff
|
||||||
|
# might happen" IFF you DO run a asyncio task here, BUT
|
||||||
|
# - the original issue of the `.lifetime_stack` not closing
|
||||||
|
# will still happen even if you don't run an `asyncio` task
|
||||||
|
# here even though the "abandon" messgage won't be shown..
|
||||||
|
#
|
||||||
|
# => ????? honestly i'm lost but it seems to be some issue
|
||||||
|
# with `asyncio` and SIGINT..
|
||||||
|
#
|
||||||
|
# XXX NOTE XXX SO, if this LINE IS UNCOMMENTED and
|
||||||
|
# `run_as_asyncio_guest()` is written WITHOUT THE
|
||||||
|
# `.cancel_soon()` soln, both of these tests will pass ??
|
||||||
|
# so maybe it has something to do with `asyncio` loop init
|
||||||
|
# state?!?
|
||||||
|
# honestly, this REALLY reminds me why i haven't used
|
||||||
|
# `asyncio` by choice in years.. XD
|
||||||
|
#
|
||||||
|
# await tractor.to_asyncio.run_task(aio_sleep_forever)
|
||||||
|
if bg_aio_task:
|
||||||
|
async with trio.open_nursery() as tn:
|
||||||
|
tn.start_soon(
|
||||||
|
tractor.to_asyncio.run_task,
|
||||||
|
aio_sleep_forever,
|
||||||
|
)
|
||||||
|
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
# signalled manually at the OS level (aka KBI) by the parent actor.
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print('child raised KBI..')
|
||||||
|
assert tmp_file.exists()
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
raise RuntimeError('shoulda received a KBI?')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'bg_aio_task',
|
||||||
|
[
|
||||||
|
False,
|
||||||
|
|
||||||
|
# NOTE: (and see notes in `manage_file()` above as well) if
|
||||||
|
# we FOR SURE SPAWN AN AIO TASK in the child it seems the
|
||||||
|
# "silent-abandon" case (as is described in detail in
|
||||||
|
# `to_asyncio.run_as_asyncio_guest()`) does not happen and
|
||||||
|
# `asyncio`'s loop will at least abandon the `trio` side
|
||||||
|
# loudly? .. prolly the state-spot to start looking for
|
||||||
|
# a soln that results in NO ABANDONMENT.. XD
|
||||||
|
True,
|
||||||
|
],
|
||||||
|
ids=[
|
||||||
|
'bg_aio_task',
|
||||||
|
'just_trio_slee',
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'wait_for_ctx',
|
||||||
|
[
|
||||||
|
False,
|
||||||
|
True,
|
||||||
|
],
|
||||||
|
ids=[
|
||||||
|
'raise_KBI_in_rent',
|
||||||
|
'wait_for_ctx',
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_sigint_closes_lifetime_stack(
|
||||||
|
tmp_path: Path,
|
||||||
|
wait_for_ctx: bool,
|
||||||
|
bg_aio_task: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Ensure that an infected child can use the `Actor.lifetime_stack`
|
||||||
|
to make a file on boot and it's automatically cleaned up by the
|
||||||
|
actor-lifetime-linked exit stack closure.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
try:
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
p = await n.start_actor(
|
||||||
|
'file_mngr',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
infect_asyncio=True,
|
||||||
|
)
|
||||||
|
async with p.open_context(
|
||||||
|
manage_file,
|
||||||
|
tmp_path_str=str(tmp_path),
|
||||||
|
bg_aio_task=bg_aio_task,
|
||||||
|
) as (ctx, first):
|
||||||
|
|
||||||
|
path_str, cpid = first
|
||||||
|
tmp_file: Path = Path(path_str)
|
||||||
|
assert tmp_file.exists()
|
||||||
|
|
||||||
|
# XXX originally to simulate what (hopefully)
|
||||||
|
# the below now triggers.. had to manually
|
||||||
|
# trigger a SIGINT from a ctl-c in the root.
|
||||||
|
# await trio.sleep_forever()
|
||||||
|
|
||||||
|
# XXX NOTE XXX signal infected-`asyncio` child to
|
||||||
|
# OS-cancel with SIGINT; this should trigger the
|
||||||
|
# bad `asyncio` cancel behaviour that can cause
|
||||||
|
# a guest-run abandon as was seen causing
|
||||||
|
# shm-buffer leaks in `piker`'s live quote stream
|
||||||
|
# susbys!
|
||||||
|
#
|
||||||
|
# await trio.sleep(.5)
|
||||||
|
await trio.sleep(.2)
|
||||||
|
os.kill(
|
||||||
|
cpid,
|
||||||
|
signal.SIGINT,
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX CASE 1: without the bug fixed, in
|
||||||
|
# the non-KBI-raised-in-parent case, this
|
||||||
|
# timeout should trigger!
|
||||||
|
if wait_for_ctx:
|
||||||
|
print('waiting for ctx outcome in parent..')
|
||||||
|
try:
|
||||||
|
with trio.fail_after(.7):
|
||||||
|
await ctx.wait_for_result()
|
||||||
|
except tractor.ContextCancelled as ctxc:
|
||||||
|
assert ctxc.canceller == ctx.chan.uid
|
||||||
|
raise
|
||||||
|
|
||||||
|
# XXX CASE 2: this seems to be the source of the
|
||||||
|
# original issue which exhibited BEFORE we put
|
||||||
|
# a `Actor.cancel_soon()` inside
|
||||||
|
# `run_as_asyncio_guest()`..
|
||||||
|
else:
|
||||||
|
raise KeyboardInterrupt
|
||||||
|
|
||||||
|
pytest.fail('should have raised some kinda error?!?')
|
||||||
|
|
||||||
|
except (
|
||||||
|
KeyboardInterrupt,
|
||||||
|
ContextCancelled,
|
||||||
|
):
|
||||||
|
# XXX CASE 2: without the bug fixed, in the
|
||||||
|
# KBI-raised-in-parent case, the actor teardown should
|
||||||
|
# never get run (silently abaondoned by `asyncio`..) and
|
||||||
|
# thus the file should leak!
|
||||||
|
assert not tmp_file.exists()
|
||||||
|
assert ctx.maybe_error
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
# TODO: debug_mode tests once we get support for `asyncio`!
|
# TODO: debug_mode tests once we get support for `asyncio`!
|
||||||
#
|
#
|
||||||
# -[ ] need tests to wrap both scripts:
|
# -[ ] need tests to wrap both scripts:
|
||||||
|
|
|
@ -922,15 +922,6 @@ class NoRuntime(RuntimeError):
|
||||||
"The root actor has not been initialized yet"
|
"The root actor has not been initialized yet"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncioCancelled(Exception):
|
|
||||||
'''
|
|
||||||
Asyncio cancelled translation (non-base) error
|
|
||||||
for use with the ``to_asyncio`` module
|
|
||||||
to be raised in the ``trio`` side task
|
|
||||||
|
|
||||||
'''
|
|
||||||
|
|
||||||
class MessagingError(Exception):
|
class MessagingError(Exception):
|
||||||
'''
|
'''
|
||||||
IPC related msg (typing), transaction (ordering) or dialog
|
IPC related msg (typing), transaction (ordering) or dialog
|
||||||
|
@ -1324,7 +1315,9 @@ def _mk_recv_mte(
|
||||||
any_pld: Any = msgpack.decode(msg.pld)
|
any_pld: Any = msgpack.decode(msg.pld)
|
||||||
message: str = (
|
message: str = (
|
||||||
f'invalid `{msg_type.__qualname__}` msg payload\n\n'
|
f'invalid `{msg_type.__qualname__}` msg payload\n\n'
|
||||||
f'value: `{any_pld!r}` does not match type-spec: '
|
f'{any_pld!r}\n\n'
|
||||||
|
f'has type {type(any_pld)!r}\n\n'
|
||||||
|
f'and does not match type-spec '
|
||||||
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
|
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
|
||||||
)
|
)
|
||||||
bad_msg = msg
|
bad_msg = msg
|
||||||
|
|
|
@ -18,11 +18,13 @@
|
||||||
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
|
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
import asyncio
|
import asyncio
|
||||||
from asyncio.exceptions import CancelledError
|
from asyncio.exceptions import CancelledError
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
import inspect
|
import inspect
|
||||||
|
import traceback
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
|
@ -30,20 +32,21 @@ from typing import (
|
||||||
Awaitable,
|
Awaitable,
|
||||||
)
|
)
|
||||||
|
|
||||||
import trio
|
import tractor
|
||||||
from outcome import Error
|
|
||||||
|
|
||||||
from tractor.log import get_logger
|
|
||||||
from tractor._state import (
|
from tractor._state import (
|
||||||
current_actor,
|
|
||||||
debug_mode,
|
debug_mode,
|
||||||
)
|
)
|
||||||
|
from tractor.log import get_logger
|
||||||
from tractor.devx import _debug
|
from tractor.devx import _debug
|
||||||
from tractor._exceptions import AsyncioCancelled
|
|
||||||
from tractor.trionics._broadcast import (
|
from tractor.trionics._broadcast import (
|
||||||
broadcast_receiver,
|
broadcast_receiver,
|
||||||
BroadcastReceiver,
|
BroadcastReceiver,
|
||||||
)
|
)
|
||||||
|
import trio
|
||||||
|
from outcome import (
|
||||||
|
Error,
|
||||||
|
Outcome,
|
||||||
|
)
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -161,7 +164,7 @@ def _run_asyncio_task(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__ = True
|
__tracebackhide__ = True
|
||||||
if not current_actor().is_infected_aio():
|
if not tractor.current_actor().is_infected_aio():
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"`infect_asyncio` mode is not enabled!?"
|
"`infect_asyncio` mode is not enabled!?"
|
||||||
)
|
)
|
||||||
|
@ -172,7 +175,6 @@ def _run_asyncio_task(
|
||||||
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
|
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
|
||||||
|
|
||||||
args = tuple(inspect.getfullargspec(func).args)
|
args = tuple(inspect.getfullargspec(func).args)
|
||||||
|
|
||||||
if getattr(func, '_tractor_steam_function', None):
|
if getattr(func, '_tractor_steam_function', None):
|
||||||
# the assumption is that the target async routine accepts the
|
# the assumption is that the target async routine accepts the
|
||||||
# send channel then it intends to yield more then one return
|
# send channel then it intends to yield more then one return
|
||||||
|
@ -346,13 +348,22 @@ def _run_asyncio_task(
|
||||||
# on a checkpoint.
|
# on a checkpoint.
|
||||||
cancel_scope.cancel()
|
cancel_scope.cancel()
|
||||||
|
|
||||||
# raise any ``asyncio`` side error.
|
# raise any `asyncio` side error.
|
||||||
raise aio_err
|
raise aio_err
|
||||||
|
|
||||||
task.add_done_callback(cancel_trio)
|
task.add_done_callback(cancel_trio)
|
||||||
return chan
|
return chan
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncioCancelled(CancelledError):
|
||||||
|
'''
|
||||||
|
Asyncio cancelled translation (non-base) error
|
||||||
|
for use with the ``to_asyncio`` module
|
||||||
|
to be raised in the ``trio`` side task
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def translate_aio_errors(
|
async def translate_aio_errors(
|
||||||
|
|
||||||
|
@ -516,7 +527,6 @@ async def open_channel_from(
|
||||||
|
|
||||||
|
|
||||||
def run_as_asyncio_guest(
|
def run_as_asyncio_guest(
|
||||||
|
|
||||||
trio_main: Callable,
|
trio_main: Callable,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -548,6 +558,11 @@ def run_as_asyncio_guest(
|
||||||
|
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
trio_done_fut = asyncio.Future()
|
trio_done_fut = asyncio.Future()
|
||||||
|
startup_msg: str = (
|
||||||
|
'Starting `asyncio` guest-loop-run\n'
|
||||||
|
'-> got running loop\n'
|
||||||
|
'-> built a `trio`-done future\n'
|
||||||
|
)
|
||||||
|
|
||||||
if debug_mode():
|
if debug_mode():
|
||||||
# XXX make it obvi we know this isn't supported yet!
|
# XXX make it obvi we know this isn't supported yet!
|
||||||
|
@ -562,34 +577,120 @@ def run_as_asyncio_guest(
|
||||||
def trio_done_callback(main_outcome):
|
def trio_done_callback(main_outcome):
|
||||||
|
|
||||||
if isinstance(main_outcome, Error):
|
if isinstance(main_outcome, Error):
|
||||||
error = main_outcome.error
|
error: BaseException = main_outcome.error
|
||||||
|
|
||||||
|
# show an dedicated `asyncio`-side tb from the error
|
||||||
|
tb_str: str = ''.join(traceback.format_exception(error))
|
||||||
|
log.exception(
|
||||||
|
'Guest-run errored!?\n\n'
|
||||||
|
f'{main_outcome}\n'
|
||||||
|
f'{error}\n\n'
|
||||||
|
f'{tb_str}\n'
|
||||||
|
)
|
||||||
trio_done_fut.set_exception(error)
|
trio_done_fut.set_exception(error)
|
||||||
|
|
||||||
# TODO: explicit asyncio tb?
|
# raise inline
|
||||||
# traceback.print_exception(error)
|
|
||||||
|
|
||||||
# XXX: do we need this?
|
|
||||||
# actor.cancel_soon()
|
|
||||||
|
|
||||||
main_outcome.unwrap()
|
main_outcome.unwrap()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
trio_done_fut.set_result(main_outcome)
|
trio_done_fut.set_result(main_outcome)
|
||||||
log.runtime(f"trio_main finished: {main_outcome!r}")
|
log.runtime(f'trio_main finished: {main_outcome!r}')
|
||||||
|
|
||||||
|
startup_msg += (
|
||||||
|
f'-> created {trio_done_callback!r}\n'
|
||||||
|
f'-> scheduling `trio_main`: {trio_main!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
# start the infection: run trio on the asyncio loop in "guest mode"
|
# start the infection: run trio on the asyncio loop in "guest mode"
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Infecting `asyncio`-process with a `trio` guest-run of\n\n'
|
f'{startup_msg}\n\n'
|
||||||
f'{trio_main!r}\n\n'
|
+
|
||||||
|
'Infecting `asyncio`-process with a `trio` guest-run!\n'
|
||||||
f'{trio_done_callback}\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
trio.lowlevel.start_guest_run(
|
trio.lowlevel.start_guest_run(
|
||||||
trio_main,
|
trio_main,
|
||||||
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
||||||
done_callback=trio_done_callback,
|
done_callback=trio_done_callback,
|
||||||
)
|
)
|
||||||
# NOTE `.unwrap()` will raise on error
|
try:
|
||||||
return (await trio_done_fut).unwrap()
|
# TODO: better SIGINT handling since shielding seems to
|
||||||
|
# make NO DIFFERENCE XD
|
||||||
|
# -[ ] maybe this is due to 3.11's recent SIGINT handling
|
||||||
|
# changes and we can better work with/around it?
|
||||||
|
# https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption
|
||||||
|
out: Outcome = await asyncio.shield(trio_done_fut)
|
||||||
|
# NOTE `Error.unwrap()` will raise
|
||||||
|
return out.unwrap()
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
actor: tractor.Actor = tractor.current_actor()
|
||||||
|
log.exception(
|
||||||
|
'`asyncio`-side main task was cancelled!\n'
|
||||||
|
'Cancelling actor-runtime..\n'
|
||||||
|
f'c)>\n'
|
||||||
|
f' |_{actor}.cancel_soon()\n'
|
||||||
|
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX NOTE XXX the next LOC is super important!!!
|
||||||
|
# => without it, we can get a guest-run abandonment case
|
||||||
|
# where asyncio will not trigger `trio` in a final event
|
||||||
|
# loop cycle!
|
||||||
|
#
|
||||||
|
# our test,
|
||||||
|
# `test_infected_asyncio.test_sigint_closes_lifetime_stack()`
|
||||||
|
# demonstrates how if when we raise a SIGINT-signal in an infected
|
||||||
|
# child we get a variable race condition outcome where
|
||||||
|
# either of the following can indeterminately happen,
|
||||||
|
#
|
||||||
|
# - "silent-abandon": `asyncio` abandons the `trio`
|
||||||
|
# guest-run task silently and no `trio`-guest-run or
|
||||||
|
# `tractor`-actor-runtime teardown happens whatsoever..
|
||||||
|
# this is the WORST (race) case outcome.
|
||||||
|
#
|
||||||
|
# - OR, "loud-abandon": the guest run get's abaondoned "loudly" with
|
||||||
|
# `trio` reporting a console traceback and further tbs of all
|
||||||
|
# the failed shutdown routines also show on console..
|
||||||
|
#
|
||||||
|
# our test can thus fail and (has been parametrized for)
|
||||||
|
# the 2 cases:
|
||||||
|
#
|
||||||
|
# - when the parent raises a KBI just after
|
||||||
|
# signalling the child,
|
||||||
|
# |_silent-abandon => the `Actor.lifetime_stack` will
|
||||||
|
# never be closed thus leaking a resource!
|
||||||
|
# -> FAIL!
|
||||||
|
# |_loud-abandon => despite the abandonment at least the
|
||||||
|
# stack will be closed out..
|
||||||
|
# -> PASS
|
||||||
|
#
|
||||||
|
# - when the parent instead simply waits on `ctx.wait_for_result()`
|
||||||
|
# (i.e. DOES not raise a KBI itself),
|
||||||
|
# |_silent-abandon => test will just hang and thus the ctx
|
||||||
|
# and actor will never be closed/cancelled/shutdown
|
||||||
|
# resulting in leaking a (file) resource since the
|
||||||
|
# `trio`/`tractor` runtime never relays a ctxc back to
|
||||||
|
# the parent; the test's timeout will trigger..
|
||||||
|
# -> FAIL!
|
||||||
|
# |_loud-abandon => this case seems to never happen??
|
||||||
|
#
|
||||||
|
# XXX FIRST PART XXX, SO, this is a fix to the
|
||||||
|
# "silent-abandon" case, NOT the `trio`-guest-run
|
||||||
|
# abandonment issue in general, for which the NEXT LOC
|
||||||
|
# is apparently a working fix!
|
||||||
|
actor.cancel_soon()
|
||||||
|
|
||||||
|
# XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to
|
||||||
|
# `trio`-guest-run to complete and teardown !!
|
||||||
|
#
|
||||||
|
# XXX WITHOUT THIS the guest-run gets race-conditionally
|
||||||
|
# abandoned by `asyncio`!!
|
||||||
|
# XD XD XD
|
||||||
|
await asyncio.shield(
|
||||||
|
asyncio.sleep(.1) # NOPE! it can't be 0 either XD
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
# might as well if it's installed.
|
# might as well if it's installed.
|
||||||
try:
|
try:
|
||||||
|
@ -599,4 +700,6 @@ def run_as_asyncio_guest(
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
return asyncio.run(aio_main(trio_main))
|
return asyncio.run(
|
||||||
|
aio_main(trio_main),
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue