Compare commits
9 Commits
e8111e40f9
...
e646ce5c0d
Author | SHA1 | Date |
---|---|---|
|
e646ce5c0d | |
|
b6d800954a | |
|
beb7097ab4 | |
|
724c22d266 | |
|
ecd61226d8 | |
|
69fd46e1ce | |
|
af660c1019 | |
|
34e9e529d2 | |
|
816b82f9fe |
|
@ -39,7 +39,6 @@ async def main(
|
|||
loglevel='devx',
|
||||
) as an,
|
||||
):
|
||||
|
||||
ptl: tractor.Portal = await an.start_actor(
|
||||
'hanger',
|
||||
enable_modules=[__name__],
|
||||
|
@ -54,13 +53,16 @@ async def main(
|
|||
|
||||
print(
|
||||
'Yo my child hanging..?\n'
|
||||
'Sending SIGUSR1 to see a tree-trace!\n'
|
||||
# "i'm a user who wants to see a `stackscope` tree!\n"
|
||||
)
|
||||
|
||||
# XXX simulate the wrapping test's "user actions"
|
||||
# (i.e. if a human didn't run this manually but wants to
|
||||
# know what they should do to reproduce test behaviour)
|
||||
if from_test:
|
||||
print(
|
||||
f'Sending SIGUSR1 to {cpid!r}!\n'
|
||||
)
|
||||
os.kill(
|
||||
cpid,
|
||||
signal.SIGUSR1,
|
||||
|
|
|
@ -30,7 +30,7 @@ from ..conftest import (
|
|||
@pytest.fixture
|
||||
def spawn(
|
||||
start_method,
|
||||
testdir: pytest.Testdir,
|
||||
testdir: pytest.Pytester,
|
||||
reg_addr: tuple[str, int],
|
||||
|
||||
) -> Callable[[str], None]:
|
||||
|
@ -44,16 +44,32 @@ def spawn(
|
|||
'`pexpect` based tests only supported on `trio` backend'
|
||||
)
|
||||
|
||||
def unset_colors():
|
||||
'''
|
||||
Python 3.13 introduced colored tracebacks that break patt
|
||||
matching,
|
||||
|
||||
https://docs.python.org/3/using/cmdline.html#envvar-PYTHON_COLORS
|
||||
https://docs.python.org/3/using/cmdline.html#using-on-controlling-color
|
||||
|
||||
'''
|
||||
import os
|
||||
os.environ['PYTHON_COLORS'] = '0'
|
||||
|
||||
def _spawn(
|
||||
cmd: str,
|
||||
**mkcmd_kwargs,
|
||||
):
|
||||
unset_colors()
|
||||
return testdir.spawn(
|
||||
cmd=mk_cmd(
|
||||
cmd,
|
||||
**mkcmd_kwargs,
|
||||
),
|
||||
expect_timeout=3,
|
||||
# preexec_fn=unset_colors,
|
||||
# ^TODO? get `pytest` core to expose underlying
|
||||
# `pexpect.spawn()` stuff?
|
||||
)
|
||||
|
||||
# such that test-dep can pass input script name.
|
||||
|
@ -83,6 +99,14 @@ def ctlc(
|
|||
'https://github.com/goodboy/tractor/issues/320'
|
||||
)
|
||||
|
||||
if mark.name == 'ctlcs_bish':
|
||||
pytest.skip(
|
||||
f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n'
|
||||
f'The test and/or underlying example script can *sometimes* run fine '
|
||||
f'locally but more then likely until the cpython peeps get their sh#$ together, '
|
||||
f'this test will definitely not behave like `trio` under SIGINT..\n'
|
||||
)
|
||||
|
||||
if use_ctlc:
|
||||
# XXX: disable pygments highlighting for auto-tests
|
||||
# since some envs (like actions CI) will struggle
|
||||
|
|
|
@ -6,6 +6,9 @@ All these tests can be understood (somewhat) by running the
|
|||
equivalent `examples/debugging/` scripts manually.
|
||||
|
||||
'''
|
||||
from contextlib import (
|
||||
contextmanager as cm,
|
||||
)
|
||||
# from functools import partial
|
||||
# import itertools
|
||||
import time
|
||||
|
@ -15,7 +18,7 @@ import time
|
|||
|
||||
import pytest
|
||||
from pexpect.exceptions import (
|
||||
# TIMEOUT,
|
||||
TIMEOUT,
|
||||
EOF,
|
||||
)
|
||||
|
||||
|
@ -32,7 +35,23 @@ from .conftest import (
|
|||
# _repl_fail_msg,
|
||||
)
|
||||
|
||||
@cm
|
||||
def maybe_expect_timeout(
|
||||
ctlc: bool = False,
|
||||
) -> None:
|
||||
try:
|
||||
yield
|
||||
except TIMEOUT:
|
||||
# breakpoint()
|
||||
if ctlc:
|
||||
pytest.xfail(
|
||||
'Some kinda redic threading SIGINT bug i think?\n'
|
||||
'See the notes in `examples/debugging/sync_bp.py`..\n'
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
@pytest.mark.ctlcs_bish
|
||||
def test_pause_from_sync(
|
||||
spawn,
|
||||
ctlc: bool,
|
||||
|
@ -67,10 +86,10 @@ def test_pause_from_sync(
|
|||
child.expect(PROMPT)
|
||||
|
||||
# XXX shouldn't see gb loaded message with PDB loglevel!
|
||||
assert not in_prompt_msg(
|
||||
child,
|
||||
['`greenback` portal opened!'],
|
||||
)
|
||||
# assert not in_prompt_msg(
|
||||
# child,
|
||||
# ['`greenback` portal opened!'],
|
||||
# )
|
||||
# should be same root task
|
||||
assert_before(
|
||||
child,
|
||||
|
@ -162,7 +181,14 @@ def test_pause_from_sync(
|
|||
)
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(EOF)
|
||||
|
||||
# XXX TODO, weird threading bug it seems despite the
|
||||
# `abandon_on_cancel: bool` setting to
|
||||
# `trio.to_thread.run_sync()`..
|
||||
with maybe_expect_timeout(
|
||||
ctlc=ctlc,
|
||||
):
|
||||
child.expect(EOF)
|
||||
|
||||
|
||||
def expect_any_of(
|
||||
|
@ -220,8 +246,10 @@ def expect_any_of(
|
|||
return expected_patts
|
||||
|
||||
|
||||
@pytest.mark.ctlcs_bish
|
||||
def test_sync_pause_from_aio_task(
|
||||
spawn,
|
||||
|
||||
ctlc: bool
|
||||
# ^TODO, fix for `asyncio`!!
|
||||
):
|
||||
|
@ -270,10 +298,12 @@ def test_sync_pause_from_aio_task(
|
|||
# error raised in `asyncio.Task`
|
||||
"raise ValueError('asyncio side error!')": [
|
||||
_crash_msg,
|
||||
'return await chan.receive()', # `.to_asyncio` impl internals in tb
|
||||
"<Task 'trio_ctx'",
|
||||
"@ ('aio_daemon'",
|
||||
"ValueError: asyncio side error!",
|
||||
|
||||
# XXX, we no longer show this frame by default!
|
||||
# 'return await chan.receive()', # `.to_asyncio` impl internals in tb
|
||||
],
|
||||
|
||||
# parent-side propagation via actor-nursery/portal
|
||||
|
@ -325,6 +355,7 @@ def test_sync_pause_from_aio_task(
|
|||
)
|
||||
|
||||
child.sendline('c')
|
||||
# with maybe_expect_timeout():
|
||||
child.expect(EOF)
|
||||
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ TODO:
|
|||
'''
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
|
||||
from .conftest import (
|
||||
expect,
|
||||
|
@ -53,41 +54,39 @@ def test_shield_pause(
|
|||
]
|
||||
)
|
||||
|
||||
script_pid: int = child.pid
|
||||
print(
|
||||
'Sending SIGUSR1 to see a tree-trace!',
|
||||
f'Sending SIGUSR1 to {script_pid}\n'
|
||||
f'(kill -s SIGUSR1 {script_pid})\n'
|
||||
)
|
||||
os.kill(
|
||||
child.pid,
|
||||
script_pid,
|
||||
signal.SIGUSR1,
|
||||
)
|
||||
time.sleep(0.2)
|
||||
expect(
|
||||
child,
|
||||
# end-of-tree delimiter
|
||||
"------ \('root', ",
|
||||
"end-of-\('root'",
|
||||
)
|
||||
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
'Trying to dump `stackscope` tree..',
|
||||
'Dumping `stackscope` tree for actor',
|
||||
# 'Srying to dump `stackscope` tree..',
|
||||
# 'Dumping `stackscope` tree for actor',
|
||||
"('root'", # uid line
|
||||
|
||||
# TODO!? this used to show?
|
||||
# -[ ] mk reproducable for @oremanj?
|
||||
#
|
||||
# parent block point (non-shielded)
|
||||
'await trio.sleep_forever() # in root',
|
||||
# 'await trio.sleep_forever() # in root',
|
||||
]
|
||||
)
|
||||
|
||||
# expect(
|
||||
# child,
|
||||
# # relay to the sub should be reported
|
||||
# 'Relaying `SIGUSR1`[10] to sub-actor',
|
||||
# )
|
||||
|
||||
expect(
|
||||
child,
|
||||
# end-of-tree delimiter
|
||||
"------ \('hanger', ",
|
||||
"end-of-\('hanger'",
|
||||
)
|
||||
assert_before(
|
||||
child,
|
||||
|
@ -97,11 +96,11 @@ def test_shield_pause(
|
|||
|
||||
"('hanger'", # uid line
|
||||
|
||||
# TODO!? SEE ABOVE
|
||||
# hanger LOC where it's shield-halted
|
||||
'await trio.sleep_forever() # in subactor',
|
||||
# 'await trio.sleep_forever() # in subactor',
|
||||
]
|
||||
)
|
||||
# breakpoint()
|
||||
|
||||
# simulate the user sending a ctl-c to the hanging program.
|
||||
# this should result in the terminator kicking in since
|
||||
|
|
|
@ -130,7 +130,7 @@ def test_multierror(
|
|||
try:
|
||||
await portal2.result()
|
||||
except tractor.RemoteActorError as err:
|
||||
assert err.boxed_type == AssertionError
|
||||
assert err.boxed_type is AssertionError
|
||||
print("Look Maa that first actor failed hard, hehh")
|
||||
raise
|
||||
|
||||
|
@ -182,7 +182,7 @@ def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
|
|||
|
||||
for exc in exceptions:
|
||||
assert isinstance(exc, tractor.RemoteActorError)
|
||||
assert exc.boxed_type == AssertionError
|
||||
assert exc.boxed_type is AssertionError
|
||||
|
||||
|
||||
async def do_nothing():
|
||||
|
@ -504,7 +504,9 @@ def test_cancel_via_SIGINT_other_task(
|
|||
if is_win(): # smh
|
||||
timeout += 1
|
||||
|
||||
async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED):
|
||||
async def spawn_and_sleep_forever(
|
||||
task_status=trio.TASK_STATUS_IGNORED
|
||||
):
|
||||
async with tractor.open_nursery() as tn:
|
||||
for i in range(3):
|
||||
await tn.run_in_actor(
|
||||
|
|
|
@ -32,6 +32,16 @@ from tractor.trionics import BroadcastReceiver
|
|||
from tractor._testing import expect_ctxc
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope='module',
|
||||
)
|
||||
def delay(debug_mode: bool) -> int:
|
||||
if debug_mode:
|
||||
return 999
|
||||
else:
|
||||
return 1
|
||||
|
||||
|
||||
async def sleep_and_err(
|
||||
sleep_for: float = 0.1,
|
||||
|
||||
|
@ -59,20 +69,26 @@ async def trio_cancels_single_aio_task():
|
|||
await tractor.to_asyncio.run_task(aio_sleep_forever)
|
||||
|
||||
|
||||
def test_trio_cancels_aio_on_actor_side(reg_addr):
|
||||
def test_trio_cancels_aio_on_actor_side(
|
||||
reg_addr: tuple[str, int],
|
||||
delay: int,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Spawn an infected actor that is cancelled by the ``trio`` side
|
||||
task using std cancel scope apis.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr]
|
||||
) as n:
|
||||
await n.run_in_actor(
|
||||
trio_cancels_single_aio_task,
|
||||
infect_asyncio=True,
|
||||
)
|
||||
with trio.fail_after(1 + delay):
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
await an.run_in_actor(
|
||||
trio_cancels_single_aio_task,
|
||||
infect_asyncio=True,
|
||||
)
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
@ -116,7 +132,10 @@ async def asyncio_actor(
|
|||
raise
|
||||
|
||||
|
||||
def test_aio_simple_error(reg_addr):
|
||||
def test_aio_simple_error(
|
||||
reg_addr: tuple[str, int],
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify a simple remote asyncio error propagates back through trio
|
||||
to the parent actor.
|
||||
|
@ -125,9 +144,10 @@ def test_aio_simple_error(reg_addr):
|
|||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr]
|
||||
) as n:
|
||||
await n.run_in_actor(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
await an.run_in_actor(
|
||||
asyncio_actor,
|
||||
target='sleep_and_err',
|
||||
expect_err='AssertionError',
|
||||
|
@ -153,14 +173,19 @@ def test_aio_simple_error(reg_addr):
|
|||
assert err.boxed_type is AssertionError
|
||||
|
||||
|
||||
def test_tractor_cancels_aio(reg_addr):
|
||||
def test_tractor_cancels_aio(
|
||||
reg_addr: tuple[str, int],
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify we can cancel a spawned asyncio task gracefully.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.run_in_actor(
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.run_in_actor(
|
||||
asyncio_actor,
|
||||
target='aio_sleep_forever',
|
||||
expect_err='trio.Cancelled',
|
||||
|
@ -172,7 +197,9 @@ def test_tractor_cancels_aio(reg_addr):
|
|||
trio.run(main)
|
||||
|
||||
|
||||
def test_trio_cancels_aio(reg_addr):
|
||||
def test_trio_cancels_aio(
|
||||
reg_addr: tuple[str, int],
|
||||
):
|
||||
'''
|
||||
Much like the above test with ``tractor.Portal.cancel_actor()``
|
||||
except we just use a standard ``trio`` cancellation api.
|
||||
|
@ -203,7 +230,8 @@ async def trio_ctx(
|
|||
|
||||
# this will block until the ``asyncio`` task sends a "first"
|
||||
# message.
|
||||
with trio.fail_after(2):
|
||||
delay: int = 999 if tractor.debug_mode() else 1
|
||||
with trio.fail_after(1 + delay):
|
||||
try:
|
||||
async with (
|
||||
trio.open_nursery(
|
||||
|
@ -239,8 +267,10 @@ async def trio_ctx(
|
|||
ids='parent_actor_cancels_child={}'.format
|
||||
)
|
||||
def test_context_spawns_aio_task_that_errors(
|
||||
reg_addr,
|
||||
reg_addr: tuple[str, int],
|
||||
delay: int,
|
||||
parent_cancels: bool,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify that spawning a task via an intertask channel ctx mngr that
|
||||
|
@ -249,13 +279,13 @@ def test_context_spawns_aio_task_that_errors(
|
|||
|
||||
'''
|
||||
async def main():
|
||||
with trio.fail_after(2):
|
||||
async with tractor.open_nursery() as n:
|
||||
p = await n.start_actor(
|
||||
with trio.fail_after(1 + delay):
|
||||
async with tractor.open_nursery() as an:
|
||||
p = await an.start_actor(
|
||||
'aio_daemon',
|
||||
enable_modules=[__name__],
|
||||
infect_asyncio=True,
|
||||
# debug_mode=True,
|
||||
debug_mode=debug_mode,
|
||||
loglevel='cancel',
|
||||
)
|
||||
async with (
|
||||
|
@ -322,11 +352,12 @@ async def aio_cancel():
|
|||
|
||||
def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||
reg_addr: tuple,
|
||||
delay: int,
|
||||
):
|
||||
'''
|
||||
When the `asyncio.Task` cancels itself the `trio` side cshould
|
||||
When the `asyncio.Task` cancels itself the `trio` side should
|
||||
also cancel and teardown and relay the cancellation cross-process
|
||||
to the caller (parent).
|
||||
to the parent caller.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
|
@ -342,7 +373,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
|||
# NOTE: normally the `an.__aexit__()` waits on the
|
||||
# portal's result but we do it explicitly here
|
||||
# to avoid indent levels.
|
||||
with trio.fail_after(1):
|
||||
with trio.fail_after(1 + delay):
|
||||
await p.wait_for_result()
|
||||
|
||||
with pytest.raises(
|
||||
|
@ -353,11 +384,10 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
|||
# might get multiple `trio.Cancelled`s as well inside an inception
|
||||
err: RemoteActorError|ExceptionGroup = excinfo.value
|
||||
if isinstance(err, ExceptionGroup):
|
||||
err = next(itertools.dropwhile(
|
||||
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
||||
err.exceptions
|
||||
))
|
||||
assert err
|
||||
excs = err.exceptions
|
||||
assert len(excs) == 1
|
||||
final_exc = excs[0]
|
||||
assert isinstance(final_exc, tractor.RemoteActorError)
|
||||
|
||||
# relayed boxed error should be our `trio`-task's
|
||||
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
|
||||
|
@ -370,15 +400,18 @@ async def no_to_trio_in_args():
|
|||
|
||||
|
||||
async def push_from_aio_task(
|
||||
|
||||
sequence: Iterable,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
expect_cancel: False,
|
||||
fail_early: bool,
|
||||
exit_early: bool,
|
||||
|
||||
) -> None:
|
||||
|
||||
try:
|
||||
# print('trying breakpoint')
|
||||
# breakpoint()
|
||||
|
||||
# sync caller ctx manager
|
||||
to_trio.send_nowait(True)
|
||||
|
||||
|
@ -387,10 +420,27 @@ async def push_from_aio_task(
|
|||
to_trio.send_nowait(i)
|
||||
await asyncio.sleep(0.001)
|
||||
|
||||
if i == 50 and fail_early:
|
||||
raise Exception
|
||||
if (
|
||||
i == 50
|
||||
):
|
||||
if fail_early:
|
||||
print('Raising exc from aio side!')
|
||||
raise Exception
|
||||
|
||||
print('asyncio streamer complete!')
|
||||
if exit_early:
|
||||
# TODO? really you could enforce the same
|
||||
# SC-proto we use for actors here with asyncio
|
||||
# such that a Return[None] msg would be
|
||||
# implicitly delivered to the trio side?
|
||||
#
|
||||
# XXX => this might be the end-all soln for
|
||||
# converting any-inter-task system (regardless
|
||||
# of maybe-remote runtime or language) to be
|
||||
# SC-compat no?
|
||||
print(f'asyncio breaking early @ {i!r}')
|
||||
break
|
||||
|
||||
print('asyncio streaming complete!')
|
||||
|
||||
except asyncio.CancelledError:
|
||||
if not expect_cancel:
|
||||
|
@ -402,9 +452,10 @@ async def push_from_aio_task(
|
|||
|
||||
|
||||
async def stream_from_aio(
|
||||
exit_early: bool = False,
|
||||
raise_err: bool = False,
|
||||
trio_exit_early: bool = False,
|
||||
trio_raise_err: bool = False,
|
||||
aio_raise_err: bool = False,
|
||||
aio_exit_early: bool = False,
|
||||
fan_out: bool = False,
|
||||
|
||||
) -> None:
|
||||
|
@ -417,8 +468,17 @@ async def stream_from_aio(
|
|||
async with to_asyncio.open_channel_from(
|
||||
push_from_aio_task,
|
||||
sequence=seq,
|
||||
expect_cancel=raise_err or exit_early,
|
||||
expect_cancel=trio_raise_err or trio_exit_early,
|
||||
fail_early=aio_raise_err,
|
||||
exit_early=aio_exit_early,
|
||||
|
||||
# such that we can test exit early cases
|
||||
# for each side explicitly.
|
||||
suppress_graceful_exits=(not(
|
||||
aio_exit_early
|
||||
or
|
||||
trio_exit_early
|
||||
))
|
||||
|
||||
) as (first, chan):
|
||||
|
||||
|
@ -431,13 +491,19 @@ async def stream_from_aio(
|
|||
],
|
||||
):
|
||||
async for value in chan:
|
||||
print(f'trio received {value}')
|
||||
print(f'trio received: {value!r}')
|
||||
|
||||
# XXX, debugging EoC not being handled correctly
|
||||
# in `transate_aio_errors()`..
|
||||
# if value is None:
|
||||
# await tractor.pause(shield=True)
|
||||
|
||||
pulled.append(value)
|
||||
|
||||
if value == 50:
|
||||
if raise_err:
|
||||
if trio_raise_err:
|
||||
raise Exception
|
||||
elif exit_early:
|
||||
elif trio_exit_early:
|
||||
print('`consume()` breaking early!\n')
|
||||
break
|
||||
|
||||
|
@ -454,11 +520,11 @@ async def stream_from_aio(
|
|||
# tasks are joined..
|
||||
chan.subscribe() as br,
|
||||
|
||||
trio.open_nursery() as n,
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
# start 2nd task that get's broadcast the same
|
||||
# value set.
|
||||
n.start_soon(consume, br)
|
||||
tn.start_soon(consume, br)
|
||||
await consume(chan)
|
||||
|
||||
else:
|
||||
|
@ -471,10 +537,14 @@ async def stream_from_aio(
|
|||
|
||||
finally:
|
||||
|
||||
if (
|
||||
not raise_err and
|
||||
not exit_early and
|
||||
not aio_raise_err
|
||||
if not (
|
||||
trio_raise_err
|
||||
or
|
||||
trio_exit_early
|
||||
or
|
||||
aio_raise_err
|
||||
or
|
||||
aio_exit_early
|
||||
):
|
||||
if fan_out:
|
||||
# we get double the pulled values in the
|
||||
|
@ -484,6 +554,7 @@ async def stream_from_aio(
|
|||
assert list(sorted(pulled)) == expect
|
||||
|
||||
else:
|
||||
# await tractor.pause()
|
||||
assert pulled == expect
|
||||
else:
|
||||
assert not fan_out
|
||||
|
@ -497,10 +568,13 @@ async def stream_from_aio(
|
|||
'fan_out', [False, True],
|
||||
ids='fan_out_w_chan_subscribe={}'.format
|
||||
)
|
||||
def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
||||
def test_basic_interloop_channel_stream(
|
||||
reg_addr: tuple[str, int],
|
||||
fan_out: bool,
|
||||
):
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.run_in_actor(
|
||||
async with tractor.open_nursery() as an:
|
||||
portal = await an.run_in_actor(
|
||||
stream_from_aio,
|
||||
infect_asyncio=True,
|
||||
fan_out=fan_out,
|
||||
|
@ -514,10 +588,10 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
|||
# TODO: parametrize the above test and avoid the duplication here?
|
||||
def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.run_in_actor(
|
||||
async with tractor.open_nursery() as an:
|
||||
portal = await an.run_in_actor(
|
||||
stream_from_aio,
|
||||
raise_err=True,
|
||||
trio_raise_err=True,
|
||||
infect_asyncio=True,
|
||||
)
|
||||
# should trigger remote actor error
|
||||
|
@ -530,43 +604,116 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
|
|||
excinfo.value.boxed_type is Exception
|
||||
|
||||
|
||||
def test_trio_closes_early_and_channel_exits(
|
||||
def test_trio_closes_early_causes_aio_checkpoint_raise(
|
||||
reg_addr: tuple[str, int],
|
||||
delay: int,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Check that if the `trio`-task "exits early" on `async for`ing the
|
||||
inter-task-channel (via a `break`) we exit silently from the
|
||||
`open_channel_from()` block and get a final `Return[None]` msg.
|
||||
Check that if the `trio`-task "exits early and silently" (in this
|
||||
case during `async for`-ing the inter-task-channel via
|
||||
a `break`-from-loop), we raise `TrioTaskExited` on the
|
||||
`asyncio`-side which also then bubbles up through the
|
||||
`open_channel_from()` block indicating that the `asyncio.Task`
|
||||
hit a ran another checkpoint despite the `trio.Task` exit.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
with trio.fail_after(2):
|
||||
with trio.fail_after(1 + delay):
|
||||
async with tractor.open_nursery(
|
||||
# debug_mode=True,
|
||||
debug_mode=debug_mode,
|
||||
# enable_stack_on_sig=True,
|
||||
) as n:
|
||||
portal = await n.run_in_actor(
|
||||
) as an:
|
||||
portal = await an.run_in_actor(
|
||||
stream_from_aio,
|
||||
exit_early=True,
|
||||
trio_exit_early=True,
|
||||
infect_asyncio=True,
|
||||
)
|
||||
# should raise RAE diectly
|
||||
print('waiting on final infected subactor result..')
|
||||
res: None = await portal.wait_for_result()
|
||||
assert res is None
|
||||
print('infected subactor returned result: {res!r}\n')
|
||||
print(f'infected subactor returned result: {res!r}\n')
|
||||
|
||||
# should be a quiet exit on a simple channel exit
|
||||
trio.run(
|
||||
main,
|
||||
# strict_exception_groups=False,
|
||||
)
|
||||
with pytest.raises(RemoteActorError) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
# ensure remote error is an explicit `AsyncioCancelled` sub-type
|
||||
# which indicates to the aio task that the trio side exited
|
||||
# silently WITHOUT raising a `trio.Cancelled` (which would
|
||||
# normally be raised instead as a `AsyncioCancelled`).
|
||||
excinfo.value.boxed_type is to_asyncio.TrioTaskExited
|
||||
|
||||
|
||||
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
||||
def test_aio_exits_early_relays_AsyncioTaskExited(
|
||||
# TODO, parametrize the 3 possible trio side conditions:
|
||||
# - trio blocking on receive, aio exits early
|
||||
# - trio cancelled AND aio exits early on its next tick
|
||||
# - trio errors AND aio exits early on its next tick
|
||||
reg_addr: tuple[str, int],
|
||||
debug_mode: bool,
|
||||
delay: int,
|
||||
):
|
||||
'''
|
||||
Check that if the `asyncio`-task "exits early and silently" (in this
|
||||
case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel`
|
||||
it `break`s from the loop), we raise `AsyncioTaskExited` on the
|
||||
`trio`-side which then DOES NOT BUBBLE up through the
|
||||
`open_channel_from()` block UNLESS,
|
||||
|
||||
- the trio.Task also errored/cancelled, in which case we wrap
|
||||
both errors in an eg
|
||||
- the trio.Task was blocking on rxing a value from the
|
||||
`InterLoopTaskChannel`.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.run_in_actor(
|
||||
with trio.fail_after(1 + delay):
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
# enable_stack_on_sig=True,
|
||||
) as an:
|
||||
portal = await an.run_in_actor(
|
||||
stream_from_aio,
|
||||
infect_asyncio=True,
|
||||
trio_exit_early=False,
|
||||
aio_exit_early=True,
|
||||
)
|
||||
# should raise RAE diectly
|
||||
print('waiting on final infected subactor result..')
|
||||
res: None = await portal.wait_for_result()
|
||||
assert res is None
|
||||
print(f'infected subactor returned result: {res!r}\n')
|
||||
|
||||
# should be a quiet exit on a simple channel exit
|
||||
with pytest.raises(RemoteActorError) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
exc = excinfo.value
|
||||
|
||||
# TODO, wow bug!
|
||||
# -[ ] bp handler not replaced!?!?
|
||||
# breakpoint()
|
||||
|
||||
# import pdbp; pdbp.set_trace()
|
||||
|
||||
# ensure remote error is an explicit `AsyncioCancelled` sub-type
|
||||
# which indicates to the aio task that the trio side exited
|
||||
# silently WITHOUT raising a `trio.Cancelled` (which would
|
||||
# normally be raised instead as a `AsyncioCancelled`).
|
||||
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
|
||||
|
||||
|
||||
def test_aio_errors_and_channel_propagates_and_closes(
|
||||
reg_addr: tuple[str, int],
|
||||
debug_mode: bool,
|
||||
):
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.run_in_actor(
|
||||
stream_from_aio,
|
||||
aio_raise_err=True,
|
||||
infect_asyncio=True,
|
||||
|
@ -592,7 +739,13 @@ async def aio_echo_server(
|
|||
to_trio.send_nowait('start')
|
||||
|
||||
while True:
|
||||
msg = await from_trio.get()
|
||||
try:
|
||||
msg = await from_trio.get()
|
||||
except to_asyncio.TrioTaskExited:
|
||||
print(
|
||||
'breaking aio echo loop due to `trio` exit!'
|
||||
)
|
||||
break
|
||||
|
||||
# echo the msg back
|
||||
to_trio.send_nowait(msg)
|
||||
|
@ -641,13 +794,15 @@ async def trio_to_aio_echo_server(
|
|||
ids='raise_error={}'.format,
|
||||
)
|
||||
def test_echoserver_detailed_mechanics(
|
||||
reg_addr,
|
||||
reg_addr: tuple[str, int],
|
||||
debug_mode: bool,
|
||||
raise_error_mid_stream,
|
||||
):
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
p = await n.start_actor(
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
p = await an.start_actor(
|
||||
'aio_server',
|
||||
enable_modules=[__name__],
|
||||
infect_asyncio=True,
|
||||
|
@ -852,6 +1007,8 @@ def test_sigint_closes_lifetime_stack(
|
|||
|
||||
'''
|
||||
async def main():
|
||||
|
||||
delay = 999 if tractor.debug_mode() else 1
|
||||
try:
|
||||
an: tractor.ActorNursery
|
||||
async with tractor.open_nursery(
|
||||
|
@ -902,7 +1059,7 @@ def test_sigint_closes_lifetime_stack(
|
|||
if wait_for_ctx:
|
||||
print('waiting for ctx outcome in parent..')
|
||||
try:
|
||||
with trio.fail_after(1):
|
||||
with trio.fail_after(1 + delay):
|
||||
await ctx.wait_for_result()
|
||||
except tractor.ContextCancelled as ctxc:
|
||||
assert ctxc.canceller == ctx.chan.uid
|
||||
|
|
|
@ -170,7 +170,7 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
|
|||
trio.run(main)
|
||||
|
||||
rae = excinfo.value
|
||||
assert rae.boxed_type == TypeError
|
||||
assert rae.boxed_type is TypeError
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
|
|
@ -39,7 +39,7 @@ def test_infected_root_actor(
|
|||
|
||||
'''
|
||||
async def _trio_main():
|
||||
with trio.fail_after(2):
|
||||
with trio.fail_after(2 if not debug_mode else 999):
|
||||
first: str
|
||||
chan: to_asyncio.LinkedTaskChannel
|
||||
async with (
|
||||
|
@ -59,7 +59,11 @@ def test_infected_root_actor(
|
|||
assert out == i
|
||||
print(f'asyncio echoing {i}')
|
||||
|
||||
if raise_error_mid_stream and i == 500:
|
||||
if (
|
||||
raise_error_mid_stream
|
||||
and
|
||||
i == 500
|
||||
):
|
||||
raise raise_error_mid_stream
|
||||
|
||||
if out is None:
|
||||
|
|
|
@ -82,6 +82,39 @@ class InternalError(RuntimeError):
|
|||
|
||||
'''
|
||||
|
||||
class AsyncioCancelled(Exception):
|
||||
'''
|
||||
Asyncio cancelled translation (non-base) error
|
||||
for use with the ``to_asyncio`` module
|
||||
to be raised in the ``trio`` side task
|
||||
|
||||
NOTE: this should NOT inherit from `asyncio.CancelledError` or
|
||||
tests should break!
|
||||
|
||||
'''
|
||||
|
||||
|
||||
class AsyncioTaskExited(Exception):
|
||||
'''
|
||||
asyncio.Task "exited" translation error for use with the
|
||||
`to_asyncio` APIs to be raised in the `trio` side task indicating
|
||||
on `.run_task()`/`.open_channel_from()` exit that the aio side
|
||||
exited early/silently.
|
||||
|
||||
'''
|
||||
|
||||
class TrioTaskExited(AsyncioCancelled):
|
||||
'''
|
||||
The `trio`-side task exited without explicitly cancelling the
|
||||
`asyncio.Task` peer.
|
||||
|
||||
This is very similar to how `trio.ClosedResource` acts as
|
||||
a "clean shutdown" signal to the consumer side of a mem-chan,
|
||||
|
||||
https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
|
||||
|
||||
'''
|
||||
|
||||
|
||||
# NOTE: more or less should be close to these:
|
||||
# 'boxed_type',
|
||||
|
@ -127,8 +160,8 @@ _body_fields: list[str] = list(
|
|||
|
||||
def get_err_type(type_name: str) -> BaseException|None:
|
||||
'''
|
||||
Look up an exception type by name from the set of locally
|
||||
known namespaces:
|
||||
Look up an exception type by name from the set of locally known
|
||||
namespaces:
|
||||
|
||||
- `builtins`
|
||||
- `tractor._exceptions`
|
||||
|
@ -358,6 +391,13 @@ class RemoteActorError(Exception):
|
|||
self._ipc_msg.src_type_str
|
||||
)
|
||||
|
||||
if not self._src_type:
|
||||
raise TypeError(
|
||||
f'Failed to lookup src error type with '
|
||||
f'`tractor._exceptions.get_err_type()` :\n'
|
||||
f'{self.src_type_str}'
|
||||
)
|
||||
|
||||
return self._src_type
|
||||
|
||||
@property
|
||||
|
@ -652,16 +692,10 @@ class RemoteActorError(Exception):
|
|||
failing actor's remote env.
|
||||
|
||||
'''
|
||||
src_type_ref: Type[BaseException] = self.src_type
|
||||
if not src_type_ref:
|
||||
raise TypeError(
|
||||
'Failed to lookup src error type:\n'
|
||||
f'{self.src_type_str}'
|
||||
)
|
||||
|
||||
# TODO: better tb insertion and all the fancier dunder
|
||||
# metadata stuff as per `.__context__` etc. and friends:
|
||||
# https://github.com/python-trio/trio/issues/611
|
||||
src_type_ref: Type[BaseException] = self.src_type
|
||||
return src_type_ref(self.tb_str)
|
||||
|
||||
# TODO: local recontruction of nested inception for a given
|
||||
|
|
|
@ -35,6 +35,7 @@ from signal import (
|
|||
signal,
|
||||
getsignal,
|
||||
SIGUSR1,
|
||||
SIGINT,
|
||||
)
|
||||
# import traceback
|
||||
from types import ModuleType
|
||||
|
@ -48,6 +49,7 @@ from tractor import (
|
|||
_state,
|
||||
log as logmod,
|
||||
)
|
||||
from tractor.devx import _debug
|
||||
|
||||
log = logmod.get_logger(__name__)
|
||||
|
||||
|
@ -76,22 +78,45 @@ def dump_task_tree() -> None:
|
|||
)
|
||||
actor: Actor = _state.current_actor()
|
||||
thr: Thread = current_thread()
|
||||
current_sigint_handler: Callable = getsignal(SIGINT)
|
||||
if (
|
||||
current_sigint_handler
|
||||
is not
|
||||
_debug.DebugStatus._trio_handler
|
||||
):
|
||||
sigint_handler_report: str = (
|
||||
'The default `trio` SIGINT handler was replaced?!'
|
||||
)
|
||||
else:
|
||||
sigint_handler_report: str = (
|
||||
'The default `trio` SIGINT handler is in use?!'
|
||||
)
|
||||
|
||||
# sclang symbology
|
||||
# |_<object>
|
||||
# |_(Task/Thread/Process/Actor
|
||||
# |_{Supervisor/Scope
|
||||
# |_[Storage/Memory/IPC-Stream/Data-Struct
|
||||
|
||||
log.devx(
|
||||
f'Dumping `stackscope` tree for actor\n'
|
||||
f'{actor.uid}:\n'
|
||||
f'|_{mp.current_process()}\n'
|
||||
f' |_{thr}\n'
|
||||
f' |_{actor}\n\n'
|
||||
|
||||
# start-of-trace-tree delimiter (mostly for testing)
|
||||
'------ - ------\n'
|
||||
'\n'
|
||||
+
|
||||
f'{tree_str}\n'
|
||||
+
|
||||
# end-of-trace-tree delimiter (mostly for testing)
|
||||
f'(>: {actor.uid!r}\n'
|
||||
f' |_{mp.current_process()}\n'
|
||||
f' |_{thr}\n'
|
||||
f' |_{actor}\n'
|
||||
f'\n'
|
||||
f'------ {actor.uid!r} ------\n'
|
||||
f'{sigint_handler_report}\n'
|
||||
f'signal.getsignal(SIGINT) -> {current_sigint_handler!r}\n'
|
||||
# f'\n'
|
||||
# start-of-trace-tree delimiter (mostly for testing)
|
||||
# f'------ {actor.uid!r} ------\n'
|
||||
f'\n'
|
||||
f'------ start-of-{actor.uid!r} ------\n'
|
||||
f'|\n'
|
||||
f'{tree_str}'
|
||||
# end-of-trace-tree delimiter (mostly for testing)
|
||||
f'|\n'
|
||||
f'|_____ end-of-{actor.uid!r} ______\n'
|
||||
)
|
||||
# TODO: can remove this right?
|
||||
# -[ ] was original code from author
|
||||
|
@ -123,11 +148,11 @@ def dump_tree_on_sig(
|
|||
) -> None:
|
||||
global _tree_dumped, _handler_lock
|
||||
with _handler_lock:
|
||||
if _tree_dumped:
|
||||
log.warning(
|
||||
'Already dumped for this actor...??'
|
||||
)
|
||||
return
|
||||
# if _tree_dumped:
|
||||
# log.warning(
|
||||
# 'Already dumped for this actor...??'
|
||||
# )
|
||||
# return
|
||||
|
||||
_tree_dumped = True
|
||||
|
||||
|
@ -161,9 +186,9 @@ def dump_tree_on_sig(
|
|||
)
|
||||
raise
|
||||
|
||||
log.devx(
|
||||
'Supposedly we dumped just fine..?'
|
||||
)
|
||||
# log.devx(
|
||||
# 'Supposedly we dumped just fine..?'
|
||||
# )
|
||||
|
||||
if not relay_to_subs:
|
||||
return
|
||||
|
@ -202,11 +227,11 @@ def enable_stack_on_sig(
|
|||
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
|
||||
you could use:
|
||||
|
||||
>> kill -SIGUSR1 $(pgrep -f '<cmd>')
|
||||
>> kill -SIGUSR1 $(pgrep -f <part-of-cmd: str>)
|
||||
|
||||
Or with with `xonsh` (which has diff capture-from-subproc syntax)
|
||||
OR without a sub-shell,
|
||||
|
||||
>> kill -SIGUSR1 @$(pgrep -f '<cmd>')
|
||||
>> pkill --signal SIGUSR1 -f <part-of-cmd: str>
|
||||
|
||||
'''
|
||||
try:
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue