Compare commits
9 Commits
e8111e40f9
...
e646ce5c0d
Author | SHA1 | Date |
---|---|---|
|
e646ce5c0d | |
|
b6d800954a | |
|
beb7097ab4 | |
|
724c22d266 | |
|
ecd61226d8 | |
|
69fd46e1ce | |
|
af660c1019 | |
|
34e9e529d2 | |
|
816b82f9fe |
|
@ -39,7 +39,6 @@ async def main(
|
||||||
loglevel='devx',
|
loglevel='devx',
|
||||||
) as an,
|
) as an,
|
||||||
):
|
):
|
||||||
|
|
||||||
ptl: tractor.Portal = await an.start_actor(
|
ptl: tractor.Portal = await an.start_actor(
|
||||||
'hanger',
|
'hanger',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
|
@ -54,13 +53,16 @@ async def main(
|
||||||
|
|
||||||
print(
|
print(
|
||||||
'Yo my child hanging..?\n'
|
'Yo my child hanging..?\n'
|
||||||
'Sending SIGUSR1 to see a tree-trace!\n'
|
# "i'm a user who wants to see a `stackscope` tree!\n"
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX simulate the wrapping test's "user actions"
|
# XXX simulate the wrapping test's "user actions"
|
||||||
# (i.e. if a human didn't run this manually but wants to
|
# (i.e. if a human didn't run this manually but wants to
|
||||||
# know what they should do to reproduce test behaviour)
|
# know what they should do to reproduce test behaviour)
|
||||||
if from_test:
|
if from_test:
|
||||||
|
print(
|
||||||
|
f'Sending SIGUSR1 to {cpid!r}!\n'
|
||||||
|
)
|
||||||
os.kill(
|
os.kill(
|
||||||
cpid,
|
cpid,
|
||||||
signal.SIGUSR1,
|
signal.SIGUSR1,
|
||||||
|
|
|
@ -30,7 +30,7 @@ from ..conftest import (
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def spawn(
|
def spawn(
|
||||||
start_method,
|
start_method,
|
||||||
testdir: pytest.Testdir,
|
testdir: pytest.Pytester,
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
|
|
||||||
) -> Callable[[str], None]:
|
) -> Callable[[str], None]:
|
||||||
|
@ -44,16 +44,32 @@ def spawn(
|
||||||
'`pexpect` based tests only supported on `trio` backend'
|
'`pexpect` based tests only supported on `trio` backend'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def unset_colors():
|
||||||
|
'''
|
||||||
|
Python 3.13 introduced colored tracebacks that break patt
|
||||||
|
matching,
|
||||||
|
|
||||||
|
https://docs.python.org/3/using/cmdline.html#envvar-PYTHON_COLORS
|
||||||
|
https://docs.python.org/3/using/cmdline.html#using-on-controlling-color
|
||||||
|
|
||||||
|
'''
|
||||||
|
import os
|
||||||
|
os.environ['PYTHON_COLORS'] = '0'
|
||||||
|
|
||||||
def _spawn(
|
def _spawn(
|
||||||
cmd: str,
|
cmd: str,
|
||||||
**mkcmd_kwargs,
|
**mkcmd_kwargs,
|
||||||
):
|
):
|
||||||
|
unset_colors()
|
||||||
return testdir.spawn(
|
return testdir.spawn(
|
||||||
cmd=mk_cmd(
|
cmd=mk_cmd(
|
||||||
cmd,
|
cmd,
|
||||||
**mkcmd_kwargs,
|
**mkcmd_kwargs,
|
||||||
),
|
),
|
||||||
expect_timeout=3,
|
expect_timeout=3,
|
||||||
|
# preexec_fn=unset_colors,
|
||||||
|
# ^TODO? get `pytest` core to expose underlying
|
||||||
|
# `pexpect.spawn()` stuff?
|
||||||
)
|
)
|
||||||
|
|
||||||
# such that test-dep can pass input script name.
|
# such that test-dep can pass input script name.
|
||||||
|
@ -83,6 +99,14 @@ def ctlc(
|
||||||
'https://github.com/goodboy/tractor/issues/320'
|
'https://github.com/goodboy/tractor/issues/320'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if mark.name == 'ctlcs_bish':
|
||||||
|
pytest.skip(
|
||||||
|
f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n'
|
||||||
|
f'The test and/or underlying example script can *sometimes* run fine '
|
||||||
|
f'locally but more then likely until the cpython peeps get their sh#$ together, '
|
||||||
|
f'this test will definitely not behave like `trio` under SIGINT..\n'
|
||||||
|
)
|
||||||
|
|
||||||
if use_ctlc:
|
if use_ctlc:
|
||||||
# XXX: disable pygments highlighting for auto-tests
|
# XXX: disable pygments highlighting for auto-tests
|
||||||
# since some envs (like actions CI) will struggle
|
# since some envs (like actions CI) will struggle
|
||||||
|
|
|
@ -6,6 +6,9 @@ All these tests can be understood (somewhat) by running the
|
||||||
equivalent `examples/debugging/` scripts manually.
|
equivalent `examples/debugging/` scripts manually.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
from contextlib import (
|
||||||
|
contextmanager as cm,
|
||||||
|
)
|
||||||
# from functools import partial
|
# from functools import partial
|
||||||
# import itertools
|
# import itertools
|
||||||
import time
|
import time
|
||||||
|
@ -15,7 +18,7 @@ import time
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from pexpect.exceptions import (
|
from pexpect.exceptions import (
|
||||||
# TIMEOUT,
|
TIMEOUT,
|
||||||
EOF,
|
EOF,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -32,7 +35,23 @@ from .conftest import (
|
||||||
# _repl_fail_msg,
|
# _repl_fail_msg,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@cm
|
||||||
|
def maybe_expect_timeout(
|
||||||
|
ctlc: bool = False,
|
||||||
|
) -> None:
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
except TIMEOUT:
|
||||||
|
# breakpoint()
|
||||||
|
if ctlc:
|
||||||
|
pytest.xfail(
|
||||||
|
'Some kinda redic threading SIGINT bug i think?\n'
|
||||||
|
'See the notes in `examples/debugging/sync_bp.py`..\n'
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.ctlcs_bish
|
||||||
def test_pause_from_sync(
|
def test_pause_from_sync(
|
||||||
spawn,
|
spawn,
|
||||||
ctlc: bool,
|
ctlc: bool,
|
||||||
|
@ -67,10 +86,10 @@ def test_pause_from_sync(
|
||||||
child.expect(PROMPT)
|
child.expect(PROMPT)
|
||||||
|
|
||||||
# XXX shouldn't see gb loaded message with PDB loglevel!
|
# XXX shouldn't see gb loaded message with PDB loglevel!
|
||||||
assert not in_prompt_msg(
|
# assert not in_prompt_msg(
|
||||||
child,
|
# child,
|
||||||
['`greenback` portal opened!'],
|
# ['`greenback` portal opened!'],
|
||||||
)
|
# )
|
||||||
# should be same root task
|
# should be same root task
|
||||||
assert_before(
|
assert_before(
|
||||||
child,
|
child,
|
||||||
|
@ -162,7 +181,14 @@ def test_pause_from_sync(
|
||||||
)
|
)
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(EOF)
|
|
||||||
|
# XXX TODO, weird threading bug it seems despite the
|
||||||
|
# `abandon_on_cancel: bool` setting to
|
||||||
|
# `trio.to_thread.run_sync()`..
|
||||||
|
with maybe_expect_timeout(
|
||||||
|
ctlc=ctlc,
|
||||||
|
):
|
||||||
|
child.expect(EOF)
|
||||||
|
|
||||||
|
|
||||||
def expect_any_of(
|
def expect_any_of(
|
||||||
|
@ -220,8 +246,10 @@ def expect_any_of(
|
||||||
return expected_patts
|
return expected_patts
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.ctlcs_bish
|
||||||
def test_sync_pause_from_aio_task(
|
def test_sync_pause_from_aio_task(
|
||||||
spawn,
|
spawn,
|
||||||
|
|
||||||
ctlc: bool
|
ctlc: bool
|
||||||
# ^TODO, fix for `asyncio`!!
|
# ^TODO, fix for `asyncio`!!
|
||||||
):
|
):
|
||||||
|
@ -270,10 +298,12 @@ def test_sync_pause_from_aio_task(
|
||||||
# error raised in `asyncio.Task`
|
# error raised in `asyncio.Task`
|
||||||
"raise ValueError('asyncio side error!')": [
|
"raise ValueError('asyncio side error!')": [
|
||||||
_crash_msg,
|
_crash_msg,
|
||||||
'return await chan.receive()', # `.to_asyncio` impl internals in tb
|
|
||||||
"<Task 'trio_ctx'",
|
"<Task 'trio_ctx'",
|
||||||
"@ ('aio_daemon'",
|
"@ ('aio_daemon'",
|
||||||
"ValueError: asyncio side error!",
|
"ValueError: asyncio side error!",
|
||||||
|
|
||||||
|
# XXX, we no longer show this frame by default!
|
||||||
|
# 'return await chan.receive()', # `.to_asyncio` impl internals in tb
|
||||||
],
|
],
|
||||||
|
|
||||||
# parent-side propagation via actor-nursery/portal
|
# parent-side propagation via actor-nursery/portal
|
||||||
|
@ -325,6 +355,7 @@ def test_sync_pause_from_aio_task(
|
||||||
)
|
)
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
# with maybe_expect_timeout():
|
||||||
child.expect(EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@ TODO:
|
||||||
'''
|
'''
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
|
import time
|
||||||
|
|
||||||
from .conftest import (
|
from .conftest import (
|
||||||
expect,
|
expect,
|
||||||
|
@ -53,41 +54,39 @@ def test_shield_pause(
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
script_pid: int = child.pid
|
||||||
print(
|
print(
|
||||||
'Sending SIGUSR1 to see a tree-trace!',
|
f'Sending SIGUSR1 to {script_pid}\n'
|
||||||
|
f'(kill -s SIGUSR1 {script_pid})\n'
|
||||||
)
|
)
|
||||||
os.kill(
|
os.kill(
|
||||||
child.pid,
|
script_pid,
|
||||||
signal.SIGUSR1,
|
signal.SIGUSR1,
|
||||||
)
|
)
|
||||||
|
time.sleep(0.2)
|
||||||
expect(
|
expect(
|
||||||
child,
|
child,
|
||||||
# end-of-tree delimiter
|
# end-of-tree delimiter
|
||||||
"------ \('root', ",
|
"end-of-\('root'",
|
||||||
)
|
)
|
||||||
|
|
||||||
assert_before(
|
assert_before(
|
||||||
child,
|
child,
|
||||||
[
|
[
|
||||||
'Trying to dump `stackscope` tree..',
|
# 'Srying to dump `stackscope` tree..',
|
||||||
'Dumping `stackscope` tree for actor',
|
# 'Dumping `stackscope` tree for actor',
|
||||||
"('root'", # uid line
|
"('root'", # uid line
|
||||||
|
|
||||||
|
# TODO!? this used to show?
|
||||||
|
# -[ ] mk reproducable for @oremanj?
|
||||||
|
#
|
||||||
# parent block point (non-shielded)
|
# parent block point (non-shielded)
|
||||||
'await trio.sleep_forever() # in root',
|
# 'await trio.sleep_forever() # in root',
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
# expect(
|
|
||||||
# child,
|
|
||||||
# # relay to the sub should be reported
|
|
||||||
# 'Relaying `SIGUSR1`[10] to sub-actor',
|
|
||||||
# )
|
|
||||||
|
|
||||||
expect(
|
expect(
|
||||||
child,
|
child,
|
||||||
# end-of-tree delimiter
|
# end-of-tree delimiter
|
||||||
"------ \('hanger', ",
|
"end-of-\('hanger'",
|
||||||
)
|
)
|
||||||
assert_before(
|
assert_before(
|
||||||
child,
|
child,
|
||||||
|
@ -97,11 +96,11 @@ def test_shield_pause(
|
||||||
|
|
||||||
"('hanger'", # uid line
|
"('hanger'", # uid line
|
||||||
|
|
||||||
|
# TODO!? SEE ABOVE
|
||||||
# hanger LOC where it's shield-halted
|
# hanger LOC where it's shield-halted
|
||||||
'await trio.sleep_forever() # in subactor',
|
# 'await trio.sleep_forever() # in subactor',
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
# breakpoint()
|
|
||||||
|
|
||||||
# simulate the user sending a ctl-c to the hanging program.
|
# simulate the user sending a ctl-c to the hanging program.
|
||||||
# this should result in the terminator kicking in since
|
# this should result in the terminator kicking in since
|
||||||
|
|
|
@ -130,7 +130,7 @@ def test_multierror(
|
||||||
try:
|
try:
|
||||||
await portal2.result()
|
await portal2.result()
|
||||||
except tractor.RemoteActorError as err:
|
except tractor.RemoteActorError as err:
|
||||||
assert err.boxed_type == AssertionError
|
assert err.boxed_type is AssertionError
|
||||||
print("Look Maa that first actor failed hard, hehh")
|
print("Look Maa that first actor failed hard, hehh")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -182,7 +182,7 @@ def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
|
||||||
|
|
||||||
for exc in exceptions:
|
for exc in exceptions:
|
||||||
assert isinstance(exc, tractor.RemoteActorError)
|
assert isinstance(exc, tractor.RemoteActorError)
|
||||||
assert exc.boxed_type == AssertionError
|
assert exc.boxed_type is AssertionError
|
||||||
|
|
||||||
|
|
||||||
async def do_nothing():
|
async def do_nothing():
|
||||||
|
@ -504,7 +504,9 @@ def test_cancel_via_SIGINT_other_task(
|
||||||
if is_win(): # smh
|
if is_win(): # smh
|
||||||
timeout += 1
|
timeout += 1
|
||||||
|
|
||||||
async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED):
|
async def spawn_and_sleep_forever(
|
||||||
|
task_status=trio.TASK_STATUS_IGNORED
|
||||||
|
):
|
||||||
async with tractor.open_nursery() as tn:
|
async with tractor.open_nursery() as tn:
|
||||||
for i in range(3):
|
for i in range(3):
|
||||||
await tn.run_in_actor(
|
await tn.run_in_actor(
|
||||||
|
|
|
@ -32,6 +32,16 @@ from tractor.trionics import BroadcastReceiver
|
||||||
from tractor._testing import expect_ctxc
|
from tractor._testing import expect_ctxc
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(
|
||||||
|
scope='module',
|
||||||
|
)
|
||||||
|
def delay(debug_mode: bool) -> int:
|
||||||
|
if debug_mode:
|
||||||
|
return 999
|
||||||
|
else:
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
async def sleep_and_err(
|
async def sleep_and_err(
|
||||||
sleep_for: float = 0.1,
|
sleep_for: float = 0.1,
|
||||||
|
|
||||||
|
@ -59,20 +69,26 @@ async def trio_cancels_single_aio_task():
|
||||||
await tractor.to_asyncio.run_task(aio_sleep_forever)
|
await tractor.to_asyncio.run_task(aio_sleep_forever)
|
||||||
|
|
||||||
|
|
||||||
def test_trio_cancels_aio_on_actor_side(reg_addr):
|
def test_trio_cancels_aio_on_actor_side(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
delay: int,
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Spawn an infected actor that is cancelled by the ``trio`` side
|
Spawn an infected actor that is cancelled by the ``trio`` side
|
||||||
task using std cancel scope apis.
|
task using std cancel scope apis.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
with trio.fail_after(1 + delay):
|
||||||
registry_addrs=[reg_addr]
|
async with tractor.open_nursery(
|
||||||
) as n:
|
registry_addrs=[reg_addr],
|
||||||
await n.run_in_actor(
|
debug_mode=debug_mode,
|
||||||
trio_cancels_single_aio_task,
|
) as an:
|
||||||
infect_asyncio=True,
|
await an.run_in_actor(
|
||||||
)
|
trio_cancels_single_aio_task,
|
||||||
|
infect_asyncio=True,
|
||||||
|
)
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
@ -116,7 +132,10 @@ async def asyncio_actor(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
def test_aio_simple_error(reg_addr):
|
def test_aio_simple_error(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Verify a simple remote asyncio error propagates back through trio
|
Verify a simple remote asyncio error propagates back through trio
|
||||||
to the parent actor.
|
to the parent actor.
|
||||||
|
@ -125,9 +144,10 @@ def test_aio_simple_error(reg_addr):
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
registry_addrs=[reg_addr]
|
registry_addrs=[reg_addr],
|
||||||
) as n:
|
debug_mode=debug_mode,
|
||||||
await n.run_in_actor(
|
) as an:
|
||||||
|
await an.run_in_actor(
|
||||||
asyncio_actor,
|
asyncio_actor,
|
||||||
target='sleep_and_err',
|
target='sleep_and_err',
|
||||||
expect_err='AssertionError',
|
expect_err='AssertionError',
|
||||||
|
@ -153,14 +173,19 @@ def test_aio_simple_error(reg_addr):
|
||||||
assert err.boxed_type is AssertionError
|
assert err.boxed_type is AssertionError
|
||||||
|
|
||||||
|
|
||||||
def test_tractor_cancels_aio(reg_addr):
|
def test_tractor_cancels_aio(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Verify we can cancel a spawned asyncio task gracefully.
|
Verify we can cancel a spawned asyncio task gracefully.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery(
|
||||||
portal = await n.run_in_actor(
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
|
portal = await an.run_in_actor(
|
||||||
asyncio_actor,
|
asyncio_actor,
|
||||||
target='aio_sleep_forever',
|
target='aio_sleep_forever',
|
||||||
expect_err='trio.Cancelled',
|
expect_err='trio.Cancelled',
|
||||||
|
@ -172,7 +197,9 @@ def test_tractor_cancels_aio(reg_addr):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
def test_trio_cancels_aio(reg_addr):
|
def test_trio_cancels_aio(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Much like the above test with ``tractor.Portal.cancel_actor()``
|
Much like the above test with ``tractor.Portal.cancel_actor()``
|
||||||
except we just use a standard ``trio`` cancellation api.
|
except we just use a standard ``trio`` cancellation api.
|
||||||
|
@ -203,7 +230,8 @@ async def trio_ctx(
|
||||||
|
|
||||||
# this will block until the ``asyncio`` task sends a "first"
|
# this will block until the ``asyncio`` task sends a "first"
|
||||||
# message.
|
# message.
|
||||||
with trio.fail_after(2):
|
delay: int = 999 if tractor.debug_mode() else 1
|
||||||
|
with trio.fail_after(1 + delay):
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery(
|
trio.open_nursery(
|
||||||
|
@ -239,8 +267,10 @@ async def trio_ctx(
|
||||||
ids='parent_actor_cancels_child={}'.format
|
ids='parent_actor_cancels_child={}'.format
|
||||||
)
|
)
|
||||||
def test_context_spawns_aio_task_that_errors(
|
def test_context_spawns_aio_task_that_errors(
|
||||||
reg_addr,
|
reg_addr: tuple[str, int],
|
||||||
|
delay: int,
|
||||||
parent_cancels: bool,
|
parent_cancels: bool,
|
||||||
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Verify that spawning a task via an intertask channel ctx mngr that
|
Verify that spawning a task via an intertask channel ctx mngr that
|
||||||
|
@ -249,13 +279,13 @@ def test_context_spawns_aio_task_that_errors(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(1 + delay):
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
p = await n.start_actor(
|
p = await an.start_actor(
|
||||||
'aio_daemon',
|
'aio_daemon',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
# debug_mode=True,
|
debug_mode=debug_mode,
|
||||||
loglevel='cancel',
|
loglevel='cancel',
|
||||||
)
|
)
|
||||||
async with (
|
async with (
|
||||||
|
@ -322,11 +352,12 @@ async def aio_cancel():
|
||||||
|
|
||||||
def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||||
reg_addr: tuple,
|
reg_addr: tuple,
|
||||||
|
delay: int,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
When the `asyncio.Task` cancels itself the `trio` side cshould
|
When the `asyncio.Task` cancels itself the `trio` side should
|
||||||
also cancel and teardown and relay the cancellation cross-process
|
also cancel and teardown and relay the cancellation cross-process
|
||||||
to the caller (parent).
|
to the parent caller.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
@ -342,7 +373,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||||
# NOTE: normally the `an.__aexit__()` waits on the
|
# NOTE: normally the `an.__aexit__()` waits on the
|
||||||
# portal's result but we do it explicitly here
|
# portal's result but we do it explicitly here
|
||||||
# to avoid indent levels.
|
# to avoid indent levels.
|
||||||
with trio.fail_after(1):
|
with trio.fail_after(1 + delay):
|
||||||
await p.wait_for_result()
|
await p.wait_for_result()
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(
|
||||||
|
@ -353,11 +384,10 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||||
# might get multiple `trio.Cancelled`s as well inside an inception
|
# might get multiple `trio.Cancelled`s as well inside an inception
|
||||||
err: RemoteActorError|ExceptionGroup = excinfo.value
|
err: RemoteActorError|ExceptionGroup = excinfo.value
|
||||||
if isinstance(err, ExceptionGroup):
|
if isinstance(err, ExceptionGroup):
|
||||||
err = next(itertools.dropwhile(
|
excs = err.exceptions
|
||||||
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
assert len(excs) == 1
|
||||||
err.exceptions
|
final_exc = excs[0]
|
||||||
))
|
assert isinstance(final_exc, tractor.RemoteActorError)
|
||||||
assert err
|
|
||||||
|
|
||||||
# relayed boxed error should be our `trio`-task's
|
# relayed boxed error should be our `trio`-task's
|
||||||
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
|
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
|
||||||
|
@ -370,15 +400,18 @@ async def no_to_trio_in_args():
|
||||||
|
|
||||||
|
|
||||||
async def push_from_aio_task(
|
async def push_from_aio_task(
|
||||||
|
|
||||||
sequence: Iterable,
|
sequence: Iterable,
|
||||||
to_trio: trio.abc.SendChannel,
|
to_trio: trio.abc.SendChannel,
|
||||||
expect_cancel: False,
|
expect_cancel: False,
|
||||||
fail_early: bool,
|
fail_early: bool,
|
||||||
|
exit_early: bool,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# print('trying breakpoint')
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
# sync caller ctx manager
|
# sync caller ctx manager
|
||||||
to_trio.send_nowait(True)
|
to_trio.send_nowait(True)
|
||||||
|
|
||||||
|
@ -387,10 +420,27 @@ async def push_from_aio_task(
|
||||||
to_trio.send_nowait(i)
|
to_trio.send_nowait(i)
|
||||||
await asyncio.sleep(0.001)
|
await asyncio.sleep(0.001)
|
||||||
|
|
||||||
if i == 50 and fail_early:
|
if (
|
||||||
raise Exception
|
i == 50
|
||||||
|
):
|
||||||
|
if fail_early:
|
||||||
|
print('Raising exc from aio side!')
|
||||||
|
raise Exception
|
||||||
|
|
||||||
print('asyncio streamer complete!')
|
if exit_early:
|
||||||
|
# TODO? really you could enforce the same
|
||||||
|
# SC-proto we use for actors here with asyncio
|
||||||
|
# such that a Return[None] msg would be
|
||||||
|
# implicitly delivered to the trio side?
|
||||||
|
#
|
||||||
|
# XXX => this might be the end-all soln for
|
||||||
|
# converting any-inter-task system (regardless
|
||||||
|
# of maybe-remote runtime or language) to be
|
||||||
|
# SC-compat no?
|
||||||
|
print(f'asyncio breaking early @ {i!r}')
|
||||||
|
break
|
||||||
|
|
||||||
|
print('asyncio streaming complete!')
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
if not expect_cancel:
|
if not expect_cancel:
|
||||||
|
@ -402,9 +452,10 @@ async def push_from_aio_task(
|
||||||
|
|
||||||
|
|
||||||
async def stream_from_aio(
|
async def stream_from_aio(
|
||||||
exit_early: bool = False,
|
trio_exit_early: bool = False,
|
||||||
raise_err: bool = False,
|
trio_raise_err: bool = False,
|
||||||
aio_raise_err: bool = False,
|
aio_raise_err: bool = False,
|
||||||
|
aio_exit_early: bool = False,
|
||||||
fan_out: bool = False,
|
fan_out: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -417,8 +468,17 @@ async def stream_from_aio(
|
||||||
async with to_asyncio.open_channel_from(
|
async with to_asyncio.open_channel_from(
|
||||||
push_from_aio_task,
|
push_from_aio_task,
|
||||||
sequence=seq,
|
sequence=seq,
|
||||||
expect_cancel=raise_err or exit_early,
|
expect_cancel=trio_raise_err or trio_exit_early,
|
||||||
fail_early=aio_raise_err,
|
fail_early=aio_raise_err,
|
||||||
|
exit_early=aio_exit_early,
|
||||||
|
|
||||||
|
# such that we can test exit early cases
|
||||||
|
# for each side explicitly.
|
||||||
|
suppress_graceful_exits=(not(
|
||||||
|
aio_exit_early
|
||||||
|
or
|
||||||
|
trio_exit_early
|
||||||
|
))
|
||||||
|
|
||||||
) as (first, chan):
|
) as (first, chan):
|
||||||
|
|
||||||
|
@ -431,13 +491,19 @@ async def stream_from_aio(
|
||||||
],
|
],
|
||||||
):
|
):
|
||||||
async for value in chan:
|
async for value in chan:
|
||||||
print(f'trio received {value}')
|
print(f'trio received: {value!r}')
|
||||||
|
|
||||||
|
# XXX, debugging EoC not being handled correctly
|
||||||
|
# in `transate_aio_errors()`..
|
||||||
|
# if value is None:
|
||||||
|
# await tractor.pause(shield=True)
|
||||||
|
|
||||||
pulled.append(value)
|
pulled.append(value)
|
||||||
|
|
||||||
if value == 50:
|
if value == 50:
|
||||||
if raise_err:
|
if trio_raise_err:
|
||||||
raise Exception
|
raise Exception
|
||||||
elif exit_early:
|
elif trio_exit_early:
|
||||||
print('`consume()` breaking early!\n')
|
print('`consume()` breaking early!\n')
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -454,11 +520,11 @@ async def stream_from_aio(
|
||||||
# tasks are joined..
|
# tasks are joined..
|
||||||
chan.subscribe() as br,
|
chan.subscribe() as br,
|
||||||
|
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
# start 2nd task that get's broadcast the same
|
# start 2nd task that get's broadcast the same
|
||||||
# value set.
|
# value set.
|
||||||
n.start_soon(consume, br)
|
tn.start_soon(consume, br)
|
||||||
await consume(chan)
|
await consume(chan)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -471,10 +537,14 @@ async def stream_from_aio(
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
|
||||||
if (
|
if not (
|
||||||
not raise_err and
|
trio_raise_err
|
||||||
not exit_early and
|
or
|
||||||
not aio_raise_err
|
trio_exit_early
|
||||||
|
or
|
||||||
|
aio_raise_err
|
||||||
|
or
|
||||||
|
aio_exit_early
|
||||||
):
|
):
|
||||||
if fan_out:
|
if fan_out:
|
||||||
# we get double the pulled values in the
|
# we get double the pulled values in the
|
||||||
|
@ -484,6 +554,7 @@ async def stream_from_aio(
|
||||||
assert list(sorted(pulled)) == expect
|
assert list(sorted(pulled)) == expect
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
# await tractor.pause()
|
||||||
assert pulled == expect
|
assert pulled == expect
|
||||||
else:
|
else:
|
||||||
assert not fan_out
|
assert not fan_out
|
||||||
|
@ -497,10 +568,13 @@ async def stream_from_aio(
|
||||||
'fan_out', [False, True],
|
'fan_out', [False, True],
|
||||||
ids='fan_out_w_chan_subscribe={}'.format
|
ids='fan_out_w_chan_subscribe={}'.format
|
||||||
)
|
)
|
||||||
def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
def test_basic_interloop_channel_stream(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
fan_out: bool,
|
||||||
|
):
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
portal = await n.run_in_actor(
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
fan_out=fan_out,
|
fan_out=fan_out,
|
||||||
|
@ -514,10 +588,10 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
||||||
# TODO: parametrize the above test and avoid the duplication here?
|
# TODO: parametrize the above test and avoid the duplication here?
|
||||||
def test_trio_error_cancels_intertask_chan(reg_addr):
|
def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
portal = await n.run_in_actor(
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
raise_err=True,
|
trio_raise_err=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should trigger remote actor error
|
# should trigger remote actor error
|
||||||
|
@ -530,43 +604,116 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||||
excinfo.value.boxed_type is Exception
|
excinfo.value.boxed_type is Exception
|
||||||
|
|
||||||
|
|
||||||
def test_trio_closes_early_and_channel_exits(
|
def test_trio_closes_early_causes_aio_checkpoint_raise(
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
|
delay: int,
|
||||||
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Check that if the `trio`-task "exits early" on `async for`ing the
|
Check that if the `trio`-task "exits early and silently" (in this
|
||||||
inter-task-channel (via a `break`) we exit silently from the
|
case during `async for`-ing the inter-task-channel via
|
||||||
`open_channel_from()` block and get a final `Return[None]` msg.
|
a `break`-from-loop), we raise `TrioTaskExited` on the
|
||||||
|
`asyncio`-side which also then bubbles up through the
|
||||||
|
`open_channel_from()` block indicating that the `asyncio.Task`
|
||||||
|
hit a ran another checkpoint despite the `trio.Task` exit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(1 + delay):
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
# debug_mode=True,
|
debug_mode=debug_mode,
|
||||||
# enable_stack_on_sig=True,
|
# enable_stack_on_sig=True,
|
||||||
) as n:
|
) as an:
|
||||||
portal = await n.run_in_actor(
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
exit_early=True,
|
trio_exit_early=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should raise RAE diectly
|
# should raise RAE diectly
|
||||||
print('waiting on final infected subactor result..')
|
print('waiting on final infected subactor result..')
|
||||||
res: None = await portal.wait_for_result()
|
res: None = await portal.wait_for_result()
|
||||||
assert res is None
|
assert res is None
|
||||||
print('infected subactor returned result: {res!r}\n')
|
print(f'infected subactor returned result: {res!r}\n')
|
||||||
|
|
||||||
# should be a quiet exit on a simple channel exit
|
# should be a quiet exit on a simple channel exit
|
||||||
trio.run(
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
main,
|
trio.run(main)
|
||||||
# strict_exception_groups=False,
|
|
||||||
)
|
# ensure remote error is an explicit `AsyncioCancelled` sub-type
|
||||||
|
# which indicates to the aio task that the trio side exited
|
||||||
|
# silently WITHOUT raising a `trio.Cancelled` (which would
|
||||||
|
# normally be raised instead as a `AsyncioCancelled`).
|
||||||
|
excinfo.value.boxed_type is to_asyncio.TrioTaskExited
|
||||||
|
|
||||||
|
|
||||||
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
def test_aio_exits_early_relays_AsyncioTaskExited(
|
||||||
|
# TODO, parametrize the 3 possible trio side conditions:
|
||||||
|
# - trio blocking on receive, aio exits early
|
||||||
|
# - trio cancelled AND aio exits early on its next tick
|
||||||
|
# - trio errors AND aio exits early on its next tick
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
delay: int,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Check that if the `asyncio`-task "exits early and silently" (in this
|
||||||
|
case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel`
|
||||||
|
it `break`s from the loop), we raise `AsyncioTaskExited` on the
|
||||||
|
`trio`-side which then DOES NOT BUBBLE up through the
|
||||||
|
`open_channel_from()` block UNLESS,
|
||||||
|
|
||||||
|
- the trio.Task also errored/cancelled, in which case we wrap
|
||||||
|
both errors in an eg
|
||||||
|
- the trio.Task was blocking on rxing a value from the
|
||||||
|
`InterLoopTaskChannel`.
|
||||||
|
|
||||||
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
with trio.fail_after(1 + delay):
|
||||||
portal = await n.run_in_actor(
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
# enable_stack_on_sig=True,
|
||||||
|
) as an:
|
||||||
|
portal = await an.run_in_actor(
|
||||||
|
stream_from_aio,
|
||||||
|
infect_asyncio=True,
|
||||||
|
trio_exit_early=False,
|
||||||
|
aio_exit_early=True,
|
||||||
|
)
|
||||||
|
# should raise RAE diectly
|
||||||
|
print('waiting on final infected subactor result..')
|
||||||
|
res: None = await portal.wait_for_result()
|
||||||
|
assert res is None
|
||||||
|
print(f'infected subactor returned result: {res!r}\n')
|
||||||
|
|
||||||
|
# should be a quiet exit on a simple channel exit
|
||||||
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
exc = excinfo.value
|
||||||
|
|
||||||
|
# TODO, wow bug!
|
||||||
|
# -[ ] bp handler not replaced!?!?
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
|
# import pdbp; pdbp.set_trace()
|
||||||
|
|
||||||
|
# ensure remote error is an explicit `AsyncioCancelled` sub-type
|
||||||
|
# which indicates to the aio task that the trio side exited
|
||||||
|
# silently WITHOUT raising a `trio.Cancelled` (which would
|
||||||
|
# normally be raised instead as a `AsyncioCancelled`).
|
||||||
|
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
|
||||||
|
|
||||||
|
|
||||||
|
def test_aio_errors_and_channel_propagates_and_closes(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
aio_raise_err=True,
|
aio_raise_err=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
|
@ -592,7 +739,13 @@ async def aio_echo_server(
|
||||||
to_trio.send_nowait('start')
|
to_trio.send_nowait('start')
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
msg = await from_trio.get()
|
try:
|
||||||
|
msg = await from_trio.get()
|
||||||
|
except to_asyncio.TrioTaskExited:
|
||||||
|
print(
|
||||||
|
'breaking aio echo loop due to `trio` exit!'
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
# echo the msg back
|
# echo the msg back
|
||||||
to_trio.send_nowait(msg)
|
to_trio.send_nowait(msg)
|
||||||
|
@ -641,13 +794,15 @@ async def trio_to_aio_echo_server(
|
||||||
ids='raise_error={}'.format,
|
ids='raise_error={}'.format,
|
||||||
)
|
)
|
||||||
def test_echoserver_detailed_mechanics(
|
def test_echoserver_detailed_mechanics(
|
||||||
reg_addr,
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
raise_error_mid_stream,
|
raise_error_mid_stream,
|
||||||
):
|
):
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery(
|
||||||
p = await n.start_actor(
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
|
p = await an.start_actor(
|
||||||
'aio_server',
|
'aio_server',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
|
@ -852,6 +1007,8 @@ def test_sigint_closes_lifetime_stack(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
|
delay = 999 if tractor.debug_mode() else 1
|
||||||
try:
|
try:
|
||||||
an: tractor.ActorNursery
|
an: tractor.ActorNursery
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
|
@ -902,7 +1059,7 @@ def test_sigint_closes_lifetime_stack(
|
||||||
if wait_for_ctx:
|
if wait_for_ctx:
|
||||||
print('waiting for ctx outcome in parent..')
|
print('waiting for ctx outcome in parent..')
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(1):
|
with trio.fail_after(1 + delay):
|
||||||
await ctx.wait_for_result()
|
await ctx.wait_for_result()
|
||||||
except tractor.ContextCancelled as ctxc:
|
except tractor.ContextCancelled as ctxc:
|
||||||
assert ctxc.canceller == ctx.chan.uid
|
assert ctxc.canceller == ctx.chan.uid
|
||||||
|
|
|
@ -170,7 +170,7 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
rae = excinfo.value
|
rae = excinfo.value
|
||||||
assert rae.boxed_type == TypeError
|
assert rae.boxed_type is TypeError
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
|
@ -39,7 +39,7 @@ def test_infected_root_actor(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def _trio_main():
|
async def _trio_main():
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(2 if not debug_mode else 999):
|
||||||
first: str
|
first: str
|
||||||
chan: to_asyncio.LinkedTaskChannel
|
chan: to_asyncio.LinkedTaskChannel
|
||||||
async with (
|
async with (
|
||||||
|
@ -59,7 +59,11 @@ def test_infected_root_actor(
|
||||||
assert out == i
|
assert out == i
|
||||||
print(f'asyncio echoing {i}')
|
print(f'asyncio echoing {i}')
|
||||||
|
|
||||||
if raise_error_mid_stream and i == 500:
|
if (
|
||||||
|
raise_error_mid_stream
|
||||||
|
and
|
||||||
|
i == 500
|
||||||
|
):
|
||||||
raise raise_error_mid_stream
|
raise raise_error_mid_stream
|
||||||
|
|
||||||
if out is None:
|
if out is None:
|
||||||
|
|
|
@ -82,6 +82,39 @@ class InternalError(RuntimeError):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
class AsyncioCancelled(Exception):
|
||||||
|
'''
|
||||||
|
Asyncio cancelled translation (non-base) error
|
||||||
|
for use with the ``to_asyncio`` module
|
||||||
|
to be raised in the ``trio`` side task
|
||||||
|
|
||||||
|
NOTE: this should NOT inherit from `asyncio.CancelledError` or
|
||||||
|
tests should break!
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncioTaskExited(Exception):
|
||||||
|
'''
|
||||||
|
asyncio.Task "exited" translation error for use with the
|
||||||
|
`to_asyncio` APIs to be raised in the `trio` side task indicating
|
||||||
|
on `.run_task()`/`.open_channel_from()` exit that the aio side
|
||||||
|
exited early/silently.
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
class TrioTaskExited(AsyncioCancelled):
|
||||||
|
'''
|
||||||
|
The `trio`-side task exited without explicitly cancelling the
|
||||||
|
`asyncio.Task` peer.
|
||||||
|
|
||||||
|
This is very similar to how `trio.ClosedResource` acts as
|
||||||
|
a "clean shutdown" signal to the consumer side of a mem-chan,
|
||||||
|
|
||||||
|
https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
# NOTE: more or less should be close to these:
|
# NOTE: more or less should be close to these:
|
||||||
# 'boxed_type',
|
# 'boxed_type',
|
||||||
|
@ -127,8 +160,8 @@ _body_fields: list[str] = list(
|
||||||
|
|
||||||
def get_err_type(type_name: str) -> BaseException|None:
|
def get_err_type(type_name: str) -> BaseException|None:
|
||||||
'''
|
'''
|
||||||
Look up an exception type by name from the set of locally
|
Look up an exception type by name from the set of locally known
|
||||||
known namespaces:
|
namespaces:
|
||||||
|
|
||||||
- `builtins`
|
- `builtins`
|
||||||
- `tractor._exceptions`
|
- `tractor._exceptions`
|
||||||
|
@ -358,6 +391,13 @@ class RemoteActorError(Exception):
|
||||||
self._ipc_msg.src_type_str
|
self._ipc_msg.src_type_str
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if not self._src_type:
|
||||||
|
raise TypeError(
|
||||||
|
f'Failed to lookup src error type with '
|
||||||
|
f'`tractor._exceptions.get_err_type()` :\n'
|
||||||
|
f'{self.src_type_str}'
|
||||||
|
)
|
||||||
|
|
||||||
return self._src_type
|
return self._src_type
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -652,16 +692,10 @@ class RemoteActorError(Exception):
|
||||||
failing actor's remote env.
|
failing actor's remote env.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
src_type_ref: Type[BaseException] = self.src_type
|
|
||||||
if not src_type_ref:
|
|
||||||
raise TypeError(
|
|
||||||
'Failed to lookup src error type:\n'
|
|
||||||
f'{self.src_type_str}'
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: better tb insertion and all the fancier dunder
|
# TODO: better tb insertion and all the fancier dunder
|
||||||
# metadata stuff as per `.__context__` etc. and friends:
|
# metadata stuff as per `.__context__` etc. and friends:
|
||||||
# https://github.com/python-trio/trio/issues/611
|
# https://github.com/python-trio/trio/issues/611
|
||||||
|
src_type_ref: Type[BaseException] = self.src_type
|
||||||
return src_type_ref(self.tb_str)
|
return src_type_ref(self.tb_str)
|
||||||
|
|
||||||
# TODO: local recontruction of nested inception for a given
|
# TODO: local recontruction of nested inception for a given
|
||||||
|
|
|
@ -35,6 +35,7 @@ from signal import (
|
||||||
signal,
|
signal,
|
||||||
getsignal,
|
getsignal,
|
||||||
SIGUSR1,
|
SIGUSR1,
|
||||||
|
SIGINT,
|
||||||
)
|
)
|
||||||
# import traceback
|
# import traceback
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
@ -48,6 +49,7 @@ from tractor import (
|
||||||
_state,
|
_state,
|
||||||
log as logmod,
|
log as logmod,
|
||||||
)
|
)
|
||||||
|
from tractor.devx import _debug
|
||||||
|
|
||||||
log = logmod.get_logger(__name__)
|
log = logmod.get_logger(__name__)
|
||||||
|
|
||||||
|
@ -76,22 +78,45 @@ def dump_task_tree() -> None:
|
||||||
)
|
)
|
||||||
actor: Actor = _state.current_actor()
|
actor: Actor = _state.current_actor()
|
||||||
thr: Thread = current_thread()
|
thr: Thread = current_thread()
|
||||||
|
current_sigint_handler: Callable = getsignal(SIGINT)
|
||||||
|
if (
|
||||||
|
current_sigint_handler
|
||||||
|
is not
|
||||||
|
_debug.DebugStatus._trio_handler
|
||||||
|
):
|
||||||
|
sigint_handler_report: str = (
|
||||||
|
'The default `trio` SIGINT handler was replaced?!'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
sigint_handler_report: str = (
|
||||||
|
'The default `trio` SIGINT handler is in use?!'
|
||||||
|
)
|
||||||
|
|
||||||
|
# sclang symbology
|
||||||
|
# |_<object>
|
||||||
|
# |_(Task/Thread/Process/Actor
|
||||||
|
# |_{Supervisor/Scope
|
||||||
|
# |_[Storage/Memory/IPC-Stream/Data-Struct
|
||||||
|
|
||||||
log.devx(
|
log.devx(
|
||||||
f'Dumping `stackscope` tree for actor\n'
|
f'Dumping `stackscope` tree for actor\n'
|
||||||
f'{actor.uid}:\n'
|
f'(>: {actor.uid!r}\n'
|
||||||
f'|_{mp.current_process()}\n'
|
f' |_{mp.current_process()}\n'
|
||||||
f' |_{thr}\n'
|
f' |_{thr}\n'
|
||||||
f' |_{actor}\n\n'
|
f' |_{actor}\n'
|
||||||
|
|
||||||
# start-of-trace-tree delimiter (mostly for testing)
|
|
||||||
'------ - ------\n'
|
|
||||||
'\n'
|
|
||||||
+
|
|
||||||
f'{tree_str}\n'
|
|
||||||
+
|
|
||||||
# end-of-trace-tree delimiter (mostly for testing)
|
|
||||||
f'\n'
|
f'\n'
|
||||||
f'------ {actor.uid!r} ------\n'
|
f'{sigint_handler_report}\n'
|
||||||
|
f'signal.getsignal(SIGINT) -> {current_sigint_handler!r}\n'
|
||||||
|
# f'\n'
|
||||||
|
# start-of-trace-tree delimiter (mostly for testing)
|
||||||
|
# f'------ {actor.uid!r} ------\n'
|
||||||
|
f'\n'
|
||||||
|
f'------ start-of-{actor.uid!r} ------\n'
|
||||||
|
f'|\n'
|
||||||
|
f'{tree_str}'
|
||||||
|
# end-of-trace-tree delimiter (mostly for testing)
|
||||||
|
f'|\n'
|
||||||
|
f'|_____ end-of-{actor.uid!r} ______\n'
|
||||||
)
|
)
|
||||||
# TODO: can remove this right?
|
# TODO: can remove this right?
|
||||||
# -[ ] was original code from author
|
# -[ ] was original code from author
|
||||||
|
@ -123,11 +148,11 @@ def dump_tree_on_sig(
|
||||||
) -> None:
|
) -> None:
|
||||||
global _tree_dumped, _handler_lock
|
global _tree_dumped, _handler_lock
|
||||||
with _handler_lock:
|
with _handler_lock:
|
||||||
if _tree_dumped:
|
# if _tree_dumped:
|
||||||
log.warning(
|
# log.warning(
|
||||||
'Already dumped for this actor...??'
|
# 'Already dumped for this actor...??'
|
||||||
)
|
# )
|
||||||
return
|
# return
|
||||||
|
|
||||||
_tree_dumped = True
|
_tree_dumped = True
|
||||||
|
|
||||||
|
@ -161,9 +186,9 @@ def dump_tree_on_sig(
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
log.devx(
|
# log.devx(
|
||||||
'Supposedly we dumped just fine..?'
|
# 'Supposedly we dumped just fine..?'
|
||||||
)
|
# )
|
||||||
|
|
||||||
if not relay_to_subs:
|
if not relay_to_subs:
|
||||||
return
|
return
|
||||||
|
@ -202,11 +227,11 @@ def enable_stack_on_sig(
|
||||||
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
|
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
|
||||||
you could use:
|
you could use:
|
||||||
|
|
||||||
>> kill -SIGUSR1 $(pgrep -f '<cmd>')
|
>> kill -SIGUSR1 $(pgrep -f <part-of-cmd: str>)
|
||||||
|
|
||||||
Or with with `xonsh` (which has diff capture-from-subproc syntax)
|
OR without a sub-shell,
|
||||||
|
|
||||||
>> kill -SIGUSR1 @$(pgrep -f '<cmd>')
|
>> pkill --signal SIGUSR1 -f <part-of-cmd: str>
|
||||||
|
|
||||||
'''
|
'''
|
||||||
try:
|
try:
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue