Compare commits
9 Commits
9be821a5cf
...
547b957bbf
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 547b957bbf | |
Tyler Goodlet | d216068713 | |
Tyler Goodlet | 131e3e8157 | |
Tyler Goodlet | fc95c6719f | |
Tyler Goodlet | bef3dd9e97 | |
Tyler Goodlet | e6ccfce751 | |
Tyler Goodlet | 31207f92ee | |
Tyler Goodlet | 5f8f8e98ba | |
Tyler Goodlet | b56352b0e4 |
|
@ -4,6 +4,13 @@ import time
|
|||
import trio
|
||||
import tractor
|
||||
|
||||
# TODO: only import these when not running from test harness?
|
||||
# can we detect `pexpect` usage maybe?
|
||||
# from tractor.devx._debug import (
|
||||
# get_lock,
|
||||
# get_debug_req,
|
||||
# )
|
||||
|
||||
|
||||
def sync_pause(
|
||||
use_builtin: bool = False,
|
||||
|
@ -18,7 +25,13 @@ def sync_pause(
|
|||
breakpoint(hide_tb=hide_tb)
|
||||
|
||||
else:
|
||||
# TODO: maybe for testing some kind of cm style interface
|
||||
# where the `._set_trace()` call doesn't happen until block
|
||||
# exit?
|
||||
# assert get_lock().ctx_in_debug is None
|
||||
# assert get_debug_req().repl is None
|
||||
tractor.pause_from_sync()
|
||||
# assert get_debug_req().repl is None
|
||||
|
||||
if error:
|
||||
raise RuntimeError('yoyo sync code error')
|
||||
|
@ -41,10 +54,11 @@ async def start_n_sync_pause(
|
|||
async def main() -> None:
|
||||
async with (
|
||||
tractor.open_nursery(
|
||||
# NOTE: required for pausing from sync funcs
|
||||
maybe_enable_greenback=True,
|
||||
debug_mode=True,
|
||||
# loglevel='cancel',
|
||||
maybe_enable_greenback=True,
|
||||
enable_stack_on_sig=True,
|
||||
# loglevel='warning',
|
||||
# loglevel='devx',
|
||||
) as an,
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
|
@ -138,7 +152,9 @@ async def main() -> None:
|
|||
# the case 2. from above still exists!
|
||||
use_builtin=True,
|
||||
),
|
||||
abandon_on_cancel=False,
|
||||
# TODO: with this `False` we can hang!??!
|
||||
# abandon_on_cancel=False,
|
||||
abandon_on_cancel=True,
|
||||
thread_name='inline_root_bg_thread',
|
||||
)
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@ async def main(service_name):
|
|||
async with tractor.open_nursery() as an:
|
||||
await an.start_actor(service_name)
|
||||
|
||||
async with tractor.get_arbiter('127.0.0.1', 1616) as portal:
|
||||
async with tractor.get_registry('127.0.0.1', 1616) as portal:
|
||||
print(f"Arbiter is listening on {portal.channel}")
|
||||
|
||||
async with tractor.wait_for_actor(service_name) as sockaddr:
|
||||
|
|
|
@ -0,0 +1,167 @@
|
|||
'''
|
||||
`tractor.devx.*` tooling sub-pkg test space.
|
||||
|
||||
'''
|
||||
from typing import (
|
||||
Callable,
|
||||
)
|
||||
|
||||
import pytest
|
||||
from pexpect.exceptions import (
|
||||
TIMEOUT,
|
||||
)
|
||||
from tractor._testing import (
|
||||
mk_cmd,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def spawn(
|
||||
start_method,
|
||||
testdir: pytest.Testdir,
|
||||
reg_addr: tuple[str, int],
|
||||
|
||||
) -> Callable[[str], None]:
|
||||
'''
|
||||
Use the `pexpect` module shipped via `testdir.spawn()` to
|
||||
run an `./examples/..` script by name.
|
||||
|
||||
'''
|
||||
if start_method != 'trio':
|
||||
pytest.skip(
|
||||
'`pexpect` based tests only supported on `trio` backend'
|
||||
)
|
||||
|
||||
def _spawn(
|
||||
cmd: str,
|
||||
**mkcmd_kwargs,
|
||||
):
|
||||
return testdir.spawn(
|
||||
cmd=mk_cmd(
|
||||
cmd,
|
||||
**mkcmd_kwargs,
|
||||
),
|
||||
expect_timeout=3,
|
||||
)
|
||||
|
||||
# such that test-dep can pass input script name.
|
||||
return _spawn
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
params=[False, True],
|
||||
ids='ctl-c={}'.format,
|
||||
)
|
||||
def ctlc(
|
||||
request,
|
||||
ci_env: bool,
|
||||
|
||||
) -> bool:
|
||||
|
||||
use_ctlc = request.param
|
||||
|
||||
node = request.node
|
||||
markers = node.own_markers
|
||||
for mark in markers:
|
||||
if mark.name == 'has_nested_actors':
|
||||
pytest.skip(
|
||||
f'Test {node} has nested actors and fails with Ctrl-C.\n'
|
||||
f'The test can sometimes run fine locally but until'
|
||||
' we solve' 'this issue this CI test will be xfail:\n'
|
||||
'https://github.com/goodboy/tractor/issues/320'
|
||||
)
|
||||
|
||||
if use_ctlc:
|
||||
# XXX: disable pygments highlighting for auto-tests
|
||||
# since some envs (like actions CI) will struggle
|
||||
# the the added color-char encoding..
|
||||
from tractor.devx._debug import TractorConfig
|
||||
TractorConfig.use_pygements = False
|
||||
|
||||
yield use_ctlc
|
||||
|
||||
|
||||
def expect(
|
||||
child,
|
||||
|
||||
# normally a `pdb` prompt by default
|
||||
patt: str,
|
||||
|
||||
**kwargs,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Expect wrapper that prints last seen console
|
||||
data before failing.
|
||||
|
||||
'''
|
||||
try:
|
||||
child.expect(
|
||||
patt,
|
||||
**kwargs,
|
||||
)
|
||||
except TIMEOUT:
|
||||
before = str(child.before.decode())
|
||||
print(before)
|
||||
raise
|
||||
|
||||
|
||||
def in_prompt_msg(
|
||||
prompt: str,
|
||||
parts: list[str],
|
||||
|
||||
pause_on_false: bool = False,
|
||||
err_on_false: bool = False,
|
||||
print_prompt_on_false: bool = True,
|
||||
|
||||
) -> bool:
|
||||
'''
|
||||
Predicate check if (the prompt's) std-streams output has all
|
||||
`str`-parts in it.
|
||||
|
||||
Can be used in test asserts for bulk matching expected
|
||||
log/REPL output for a given `pdb` interact point.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
for part in parts:
|
||||
if part not in prompt:
|
||||
if pause_on_false:
|
||||
import pdbp
|
||||
pdbp.set_trace()
|
||||
|
||||
if print_prompt_on_false:
|
||||
print(prompt)
|
||||
|
||||
if err_on_false:
|
||||
raise ValueError(
|
||||
f'Could not find pattern: {part!r} in `before` output?'
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# TODO: todo support terminal color-chars stripping so we can match
|
||||
# against call stack frame output from the the 'll' command the like!
|
||||
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
|
||||
def assert_before(
|
||||
child,
|
||||
patts: list[str],
|
||||
|
||||
**kwargs,
|
||||
|
||||
) -> None:
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
# as in before the prompt end
|
||||
before: str = str(child.before.decode())
|
||||
assert in_prompt_msg(
|
||||
prompt=before,
|
||||
parts=patts,
|
||||
|
||||
# since this is an "assert" helper ;)
|
||||
err_on_false=True,
|
||||
**kwargs
|
||||
)
|
|
@ -13,11 +13,9 @@ TODO:
|
|||
from functools import partial
|
||||
import itertools
|
||||
import platform
|
||||
import pathlib
|
||||
import time
|
||||
|
||||
import pytest
|
||||
import pexpect
|
||||
from pexpect.exceptions import (
|
||||
TIMEOUT,
|
||||
EOF,
|
||||
|
@ -28,12 +26,14 @@ from tractor.devx._debug import (
|
|||
_crash_msg,
|
||||
_repl_fail_msg,
|
||||
)
|
||||
from tractor._testing import (
|
||||
examples_dir,
|
||||
)
|
||||
from conftest import (
|
||||
_ci_env,
|
||||
)
|
||||
from .conftest import (
|
||||
expect,
|
||||
in_prompt_msg,
|
||||
assert_before,
|
||||
)
|
||||
|
||||
# TODO: The next great debugger audit could be done by you!
|
||||
# - recurrent entry to breakpoint() from single actor *after* and an
|
||||
|
@ -52,15 +52,6 @@ if platform.system() == 'Windows':
|
|||
)
|
||||
|
||||
|
||||
def mk_cmd(ex_name: str) -> str:
|
||||
'''
|
||||
Generate a command suitable to pass to ``pexpect.spawn()``.
|
||||
|
||||
'''
|
||||
script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py'
|
||||
return ' '.join(['python', str(script_path)])
|
||||
|
||||
|
||||
# TODO: was trying to this xfail style but some weird bug i see in CI
|
||||
# that's happening at collect time.. pretty soon gonna dump actions i'm
|
||||
# thinkin...
|
||||
|
@ -79,142 +70,9 @@ has_nested_actors = pytest.mark.has_nested_actors
|
|||
# )
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def spawn(
|
||||
start_method,
|
||||
testdir,
|
||||
reg_addr,
|
||||
) -> 'pexpect.spawn':
|
||||
|
||||
if start_method != 'trio':
|
||||
pytest.skip(
|
||||
"Debugger tests are only supported on the trio backend"
|
||||
)
|
||||
|
||||
def _spawn(cmd):
|
||||
return testdir.spawn(
|
||||
cmd=mk_cmd(cmd),
|
||||
expect_timeout=3,
|
||||
)
|
||||
|
||||
return _spawn
|
||||
|
||||
|
||||
PROMPT = r"\(Pdb\+\)"
|
||||
|
||||
|
||||
def expect(
|
||||
child,
|
||||
|
||||
# prompt by default
|
||||
patt: str = PROMPT,
|
||||
|
||||
**kwargs,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Expect wrapper that prints last seen console
|
||||
data before failing.
|
||||
|
||||
'''
|
||||
try:
|
||||
child.expect(
|
||||
patt,
|
||||
**kwargs,
|
||||
)
|
||||
except TIMEOUT:
|
||||
before = str(child.before.decode())
|
||||
print(before)
|
||||
raise
|
||||
|
||||
|
||||
def in_prompt_msg(
|
||||
prompt: str,
|
||||
parts: list[str],
|
||||
|
||||
pause_on_false: bool = False,
|
||||
print_prompt_on_false: bool = True,
|
||||
|
||||
) -> bool:
|
||||
'''
|
||||
Predicate check if (the prompt's) std-streams output has all
|
||||
`str`-parts in it.
|
||||
|
||||
Can be used in test asserts for bulk matching expected
|
||||
log/REPL output for a given `pdb` interact point.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
for part in parts:
|
||||
if part not in prompt:
|
||||
if pause_on_false:
|
||||
import pdbp
|
||||
pdbp.set_trace()
|
||||
|
||||
if print_prompt_on_false:
|
||||
print(prompt)
|
||||
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# TODO: todo support terminal color-chars stripping so we can match
|
||||
# against call stack frame output from the the 'll' command the like!
|
||||
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
|
||||
def assert_before(
|
||||
child,
|
||||
patts: list[str],
|
||||
|
||||
**kwargs,
|
||||
|
||||
) -> None:
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
# as in before the prompt end
|
||||
before: str = str(child.before.decode())
|
||||
assert in_prompt_msg(
|
||||
prompt=before,
|
||||
parts=patts,
|
||||
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
params=[False, True],
|
||||
ids='ctl-c={}'.format,
|
||||
)
|
||||
def ctlc(
|
||||
request,
|
||||
ci_env: bool,
|
||||
|
||||
) -> bool:
|
||||
|
||||
use_ctlc = request.param
|
||||
|
||||
node = request.node
|
||||
markers = node.own_markers
|
||||
for mark in markers:
|
||||
if mark.name == 'has_nested_actors':
|
||||
pytest.skip(
|
||||
f'Test {node} has nested actors and fails with Ctrl-C.\n'
|
||||
f'The test can sometimes run fine locally but until'
|
||||
' we solve' 'this issue this CI test will be xfail:\n'
|
||||
'https://github.com/goodboy/tractor/issues/320'
|
||||
)
|
||||
|
||||
if use_ctlc:
|
||||
# XXX: disable pygments highlighting for auto-tests
|
||||
# since some envs (like actions CI) will struggle
|
||||
# the the added color-char encoding..
|
||||
from tractor.devx._debug import TractorConfig
|
||||
TractorConfig.use_pygements = False
|
||||
|
||||
yield use_ctlc
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'user_in_out',
|
||||
[
|
||||
|
@ -279,7 +137,7 @@ def test_root_actor_bp(spawn, user_in_out):
|
|||
child.expect('\r\n')
|
||||
|
||||
# process should exit
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
if expect_err_str is None:
|
||||
assert 'Error' not in str(child.before)
|
||||
|
@ -299,7 +157,9 @@ def do_ctlc(
|
|||
# needs some further investigation potentially...
|
||||
expect_prompt: bool = not _ci_env,
|
||||
|
||||
) -> None:
|
||||
) -> str|None:
|
||||
|
||||
before: str|None = None
|
||||
|
||||
# make sure ctl-c sends don't do anything but repeat output
|
||||
for _ in range(count):
|
||||
|
@ -309,15 +169,18 @@ def do_ctlc(
|
|||
# TODO: figure out why this makes CI fail..
|
||||
# if you run this test manually it works just fine..
|
||||
if expect_prompt:
|
||||
before = str(child.before.decode())
|
||||
time.sleep(delay)
|
||||
child.expect(PROMPT)
|
||||
before = str(child.before.decode())
|
||||
time.sleep(delay)
|
||||
|
||||
if patt:
|
||||
# should see the last line on console
|
||||
assert patt in before
|
||||
|
||||
# return the console content up to the final prompt
|
||||
return before
|
||||
|
||||
|
||||
def test_root_actor_bp_forever(
|
||||
spawn,
|
||||
|
@ -358,7 +221,7 @@ def test_root_actor_bp_forever(
|
|||
|
||||
# quit out of the loop
|
||||
child.sendline('q')
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -423,7 +286,7 @@ def test_subactor_error(
|
|||
child.expect('\r\n')
|
||||
|
||||
# process should exit
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
|
||||
def test_subactor_breakpoint(
|
||||
|
@ -486,7 +349,7 @@ def test_subactor_breakpoint(
|
|||
child.sendline('c')
|
||||
|
||||
# process should exit
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
before = str(child.before.decode())
|
||||
assert in_prompt_msg(
|
||||
|
@ -629,7 +492,7 @@ def test_multi_subactors(
|
|||
|
||||
# process should exit
|
||||
child.sendline('c')
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
# repeat of previous multierror for final output
|
||||
assert_before(child, [
|
||||
|
@ -769,7 +632,7 @@ def test_multi_daemon_subactors(
|
|||
)
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
|
||||
@has_nested_actors
|
||||
|
@ -845,7 +708,7 @@ def test_multi_subactors_root_errors(
|
|||
])
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
assert_before(child, [
|
||||
# "Attaching to pdb in crashed actor: ('root'",
|
||||
|
@ -975,7 +838,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
|
|||
|
||||
for i in range(3):
|
||||
try:
|
||||
child.expect(pexpect.EOF, timeout=0.5)
|
||||
child.expect(EOF, timeout=0.5)
|
||||
break
|
||||
except TIMEOUT:
|
||||
child.sendline('c')
|
||||
|
@ -1017,7 +880,7 @@ def test_root_cancels_child_context_during_startup(
|
|||
do_ctlc(child)
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
|
||||
def test_different_debug_mode_per_actor(
|
||||
|
@ -1038,7 +901,7 @@ def test_different_debug_mode_per_actor(
|
|||
do_ctlc(child)
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
before = str(child.before.decode())
|
||||
|
||||
|
@ -1085,10 +948,10 @@ def test_pause_from_sync(
|
|||
)
|
||||
if ctlc:
|
||||
do_ctlc(child)
|
||||
# ^NOTE^ subactor not spawned yet; don't need extra delay.
|
||||
|
||||
child.sendline('c')
|
||||
|
||||
|
||||
# first `await tractor.pause()` inside `p.open_context()` body
|
||||
child.expect(PROMPT)
|
||||
|
||||
|
@ -1109,7 +972,27 @@ def test_pause_from_sync(
|
|||
)
|
||||
|
||||
if ctlc:
|
||||
do_ctlc(child)
|
||||
do_ctlc(
|
||||
child,
|
||||
# NOTE: setting this to 0 (or some other sufficient
|
||||
# small val) can cause the test to fail since the
|
||||
# `subactor` suffers a race where the root/parent
|
||||
# sends an actor-cancel prior to it hitting its pause
|
||||
# point; by def the value is 0.1
|
||||
delay=0.4,
|
||||
)
|
||||
|
||||
# XXX, fwiw without a brief sleep here the SIGINT might actually
|
||||
# trigger "subactor" cancellation by its parent before the
|
||||
# shield-handler is engaged.
|
||||
#
|
||||
# => similar to the `delay` input to `do_ctlc()` below, setting
|
||||
# this too low can cause the test to fail since the `subactor`
|
||||
# suffers a race where the root/parent sends an actor-cancel
|
||||
# prior to the context task hitting its pause point (and thus
|
||||
# engaging the `sigint_shield()` handler in time); this value
|
||||
# seems be good enuf?
|
||||
time.sleep(0.6)
|
||||
|
||||
# one of the bg thread or subactor should have
|
||||
# `Lock.acquire()`-ed
|
||||
|
@ -1128,32 +1011,48 @@ def test_pause_from_sync(
|
|||
"('root'",
|
||||
],
|
||||
}
|
||||
conts: int = 0 # for debugging below matching logic on failure
|
||||
while attach_patts:
|
||||
child.sendline('c')
|
||||
conts += 1
|
||||
child.expect(PROMPT)
|
||||
before = str(child.before.decode())
|
||||
for key in attach_patts.copy():
|
||||
for key in attach_patts:
|
||||
if key in before:
|
||||
attach_key: str = key
|
||||
expected_patts: str = attach_patts.pop(key)
|
||||
assert_before(
|
||||
child,
|
||||
[_pause_msg] + expected_patts
|
||||
[_pause_msg]
|
||||
+
|
||||
expected_patts
|
||||
)
|
||||
break
|
||||
else:
|
||||
pytest.fail(
|
||||
f'No keys found?\n\n'
|
||||
f'{attach_patts.keys()}\n\n'
|
||||
f'{before}\n'
|
||||
)
|
||||
|
||||
# ensure no other task/threads engaged a REPL
|
||||
# at the same time as the one that was detected above.
|
||||
for key, other_patts in attach_patts.items():
|
||||
for key, other_patts in attach_patts.copy().items():
|
||||
assert not in_prompt_msg(
|
||||
before,
|
||||
other_patts,
|
||||
)
|
||||
|
||||
if ctlc:
|
||||
do_ctlc(child)
|
||||
do_ctlc(
|
||||
child,
|
||||
patt=attach_key,
|
||||
# NOTE same as comment above
|
||||
delay=0.4,
|
||||
)
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
|
||||
def test_post_mortem_api(
|
||||
|
@ -1258,7 +1157,7 @@ def test_post_mortem_api(
|
|||
# )
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
|
||||
def test_shield_pause(
|
||||
|
@ -1333,7 +1232,7 @@ def test_shield_pause(
|
|||
]
|
||||
)
|
||||
child.sendline('c')
|
||||
child.expect(pexpect.EOF)
|
||||
child.expect(EOF)
|
||||
|
||||
|
||||
# TODO: better error for "non-ideal" usage from the root actor.
|
|
@ -0,0 +1,120 @@
|
|||
'''
|
||||
That "native" runtime-hackin toolset better be dang useful!
|
||||
|
||||
Verify the funtion of a variety of "developer-experience" tools we
|
||||
offer from the `.devx` sub-pkg:
|
||||
|
||||
- use of the lovely `stackscope` for dumping actor `trio`-task trees
|
||||
during operation and hangs.
|
||||
|
||||
TODO:
|
||||
- demonstration of `CallerInfo` call stack frame filtering such that
|
||||
for logging and REPL purposes a user sees exactly the layers needed
|
||||
when debugging a problem inside the stack vs. in their app.
|
||||
|
||||
'''
|
||||
import os
|
||||
import signal
|
||||
|
||||
from .conftest import (
|
||||
expect,
|
||||
assert_before,
|
||||
# in_prompt_msg,
|
||||
)
|
||||
|
||||
|
||||
def test_shield_pause(
|
||||
spawn,
|
||||
):
|
||||
'''
|
||||
Verify the `tractor.pause()/.post_mortem()` API works inside an
|
||||
already cancelled `trio.CancelScope` and that you can step to the
|
||||
next checkpoint wherein the cancelled will get raised.
|
||||
|
||||
'''
|
||||
child = spawn(
|
||||
'shield_hang_in_sub'
|
||||
)
|
||||
expect(
|
||||
child,
|
||||
'Yo my child hanging..?',
|
||||
)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
'Entering shield sleep..',
|
||||
'Enabling trace-trees on `SIGUSR1` since `stackscope` is installed @',
|
||||
]
|
||||
)
|
||||
|
||||
print(
|
||||
'Sending SIGUSR1 to see a tree-trace!',
|
||||
)
|
||||
os.kill(
|
||||
child.pid,
|
||||
signal.SIGUSR1,
|
||||
)
|
||||
expect(
|
||||
child,
|
||||
# end-of-tree delimiter
|
||||
"------ \('root', ",
|
||||
)
|
||||
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
'Trying to dump `stackscope` tree..',
|
||||
'Dumping `stackscope` tree for actor',
|
||||
"('root'", # uid line
|
||||
|
||||
# parent block point (non-shielded)
|
||||
'await trio.sleep_forever() # in root',
|
||||
]
|
||||
)
|
||||
|
||||
# expect(
|
||||
# child,
|
||||
# # relay to the sub should be reported
|
||||
# 'Relaying `SIGUSR1`[10] to sub-actor',
|
||||
# )
|
||||
|
||||
expect(
|
||||
child,
|
||||
# end-of-tree delimiter
|
||||
"------ \('hanger', ",
|
||||
)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
# relay to the sub should be reported
|
||||
'Relaying `SIGUSR1`[10] to sub-actor',
|
||||
|
||||
"('hanger'", # uid line
|
||||
|
||||
# hanger LOC where it's shield-halted
|
||||
'await trio.sleep_forever() # in subactor',
|
||||
]
|
||||
)
|
||||
# breakpoint()
|
||||
|
||||
# simulate the user sending a ctl-c to the hanging program.
|
||||
# this should result in the terminator kicking in since
|
||||
# the sub is shield blocking and can't respond to SIGINT.
|
||||
os.kill(
|
||||
child.pid,
|
||||
signal.SIGINT,
|
||||
)
|
||||
expect(
|
||||
child,
|
||||
'Shutting down actor runtime',
|
||||
timeout=6,
|
||||
)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
'raise KeyboardInterrupt',
|
||||
# 'Shutting down actor runtime',
|
||||
'#T-800 deployed to collect zombie B0',
|
||||
"'--uid', \"('hanger',",
|
||||
]
|
||||
)
|
|
@ -91,7 +91,8 @@ def test_ipc_channel_break_during_stream(
|
|||
|
||||
# non-`trio` spawners should never hit the hang condition that
|
||||
# requires the user to do ctl-c to cancel the actor tree.
|
||||
expect_final_exc = trio.ClosedResourceError
|
||||
# expect_final_exc = trio.ClosedResourceError
|
||||
expect_final_exc = tractor.TransportClosed
|
||||
|
||||
mod: ModuleType = import_path(
|
||||
examples_dir() / 'advanced_faults'
|
||||
|
@ -157,7 +158,7 @@ def test_ipc_channel_break_during_stream(
|
|||
if pre_aclose_msgstream:
|
||||
expect_final_exc = KeyboardInterrupt
|
||||
|
||||
# NOTE when the parent IPC side dies (even if the child's does as well
|
||||
# NOTE when the parent IPC side dies (even if the child does as well
|
||||
# but the child fails BEFORE the parent) we always expect the
|
||||
# IPC layer to raise a closed-resource, NEVER do we expect
|
||||
# a stop msg since the parent-side ctx apis will error out
|
||||
|
@ -169,7 +170,8 @@ def test_ipc_channel_break_during_stream(
|
|||
and
|
||||
ipc_break['break_child_ipc_after'] is False
|
||||
):
|
||||
expect_final_exc = trio.ClosedResourceError
|
||||
# expect_final_exc = trio.ClosedResourceError
|
||||
expect_final_exc = tractor.TransportClosed
|
||||
|
||||
# BOTH but, PARENT breaks FIRST
|
||||
elif (
|
||||
|
@ -180,7 +182,8 @@ def test_ipc_channel_break_during_stream(
|
|||
ipc_break['break_parent_ipc_after']
|
||||
)
|
||||
):
|
||||
expect_final_exc = trio.ClosedResourceError
|
||||
# expect_final_exc = trio.ClosedResourceError
|
||||
expect_final_exc = tractor.TransportClosed
|
||||
|
||||
with pytest.raises(
|
||||
expected_exception=(
|
||||
|
@ -199,8 +202,8 @@ def test_ipc_channel_break_during_stream(
|
|||
**ipc_break,
|
||||
)
|
||||
)
|
||||
except KeyboardInterrupt as kbi:
|
||||
_err = kbi
|
||||
except KeyboardInterrupt as _kbi:
|
||||
kbi = _kbi
|
||||
if expect_final_exc is not KeyboardInterrupt:
|
||||
pytest.fail(
|
||||
'Rxed unexpected KBI !?\n'
|
||||
|
@ -209,6 +212,21 @@ def test_ipc_channel_break_during_stream(
|
|||
|
||||
raise
|
||||
|
||||
except tractor.TransportClosed as _tc:
|
||||
tc = _tc
|
||||
if expect_final_exc is KeyboardInterrupt:
|
||||
pytest.fail(
|
||||
'Unexpected transport failure !?\n'
|
||||
f'{repr(tc)}'
|
||||
)
|
||||
cause: Exception = tc.__cause__
|
||||
assert (
|
||||
type(cause) is trio.ClosedResourceError
|
||||
and
|
||||
cause.args[0] == 'another task closed this fd'
|
||||
)
|
||||
raise
|
||||
|
||||
# get raw instance from pytest wrapper
|
||||
value = excinfo.value
|
||||
if isinstance(value, ExceptionGroup):
|
||||
|
|
|
@ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr):
|
|||
portal = await n.start_actor('actor', enable_modules=[__name__])
|
||||
uid = portal.channel.uid
|
||||
|
||||
async with tractor.get_arbiter(*reg_addr) as aportal:
|
||||
async with tractor.get_registry(*reg_addr) as aportal:
|
||||
# this local actor should be the arbiter
|
||||
assert actor is aportal.actor
|
||||
|
||||
|
@ -160,7 +160,7 @@ async def spawn_and_check_registry(
|
|||
async with tractor.open_root_actor(
|
||||
registry_addrs=[reg_addr],
|
||||
):
|
||||
async with tractor.get_arbiter(*reg_addr) as portal:
|
||||
async with tractor.get_registry(*reg_addr) as portal:
|
||||
# runtime needs to be up to call this
|
||||
actor = tractor.current_actor()
|
||||
|
||||
|
@ -298,7 +298,7 @@ async def close_chans_before_nursery(
|
|||
async with tractor.open_root_actor(
|
||||
registry_addrs=[reg_addr],
|
||||
):
|
||||
async with tractor.get_arbiter(*reg_addr) as aportal:
|
||||
async with tractor.get_registry(*reg_addr) as aportal:
|
||||
try:
|
||||
get_reg = partial(unpack_reg, aportal)
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr):
|
|||
"Verify waiting on the arbiter to register itself using a local portal."
|
||||
actor = tractor.current_actor()
|
||||
assert actor.is_arbiter
|
||||
async with tractor.get_arbiter(*reg_addr) as portal:
|
||||
async with tractor.get_registry(*reg_addr) as portal:
|
||||
assert isinstance(portal, tractor._portal.LocalPortal)
|
||||
|
||||
with trio.fail_after(0.2):
|
||||
|
|
|
@ -32,7 +32,7 @@ def test_abort_on_sigint(daemon):
|
|||
@tractor_test
|
||||
async def test_cancel_remote_arbiter(daemon, reg_addr):
|
||||
assert not tractor.current_actor().is_arbiter
|
||||
async with tractor.get_arbiter(*reg_addr) as portal:
|
||||
async with tractor.get_registry(*reg_addr) as portal:
|
||||
await portal.cancel_actor()
|
||||
|
||||
time.sleep(0.1)
|
||||
|
@ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
|
|||
|
||||
# no arbiter socket should exist
|
||||
with pytest.raises(OSError):
|
||||
async with tractor.get_arbiter(*reg_addr) as portal:
|
||||
async with tractor.get_registry(*reg_addr) as portal:
|
||||
pass
|
||||
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ from ._streaming import (
|
|||
stream as stream,
|
||||
)
|
||||
from ._discovery import (
|
||||
get_arbiter as get_arbiter,
|
||||
get_registry as get_registry,
|
||||
find_actor as find_actor,
|
||||
wait_for_actor as wait_for_actor,
|
||||
query_actor as query_actor,
|
||||
|
|
|
@ -2376,8 +2376,9 @@ async def open_context_from_portal(
|
|||
and ctx.cancel_acked
|
||||
):
|
||||
log.cancel(
|
||||
f'Context cancelled by {ctx.side!r}-side task\n'
|
||||
f'|_{ctx._task}\n\n'
|
||||
f'Context cancelled by local {ctx.side!r}-side task\n'
|
||||
f'c)>\n'
|
||||
f' |_{ctx._task}\n\n'
|
||||
f'{repr(scope_err)}\n'
|
||||
)
|
||||
|
||||
|
@ -2393,8 +2394,10 @@ async def open_context_from_portal(
|
|||
# type_only=True,
|
||||
)
|
||||
log.cancel(
|
||||
f'Context terminated due to local {ctx.side!r}-side error:\n\n'
|
||||
f'{ctx.chan.uid} => {outcome_str}\n'
|
||||
f'Context terminated due to {ctx.side!r}-side\n\n'
|
||||
# TODO: do an x)> on err and c)> only for ctxc?
|
||||
f'c)> {outcome_str}\n'
|
||||
f' |_{ctx.repr_rpc}\n'
|
||||
)
|
||||
|
||||
# FINALLY, remove the context from runtime tracking and
|
||||
|
|
|
@ -26,8 +26,8 @@ from typing import (
|
|||
TYPE_CHECKING,
|
||||
)
|
||||
from contextlib import asynccontextmanager as acm
|
||||
import warnings
|
||||
|
||||
from tractor.log import get_logger
|
||||
from .trionics import gather_contexts
|
||||
from ._ipc import _connect_chan, Channel
|
||||
from ._portal import (
|
||||
|
@ -40,11 +40,13 @@ from ._state import (
|
|||
_runtime_vars,
|
||||
)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
@acm
|
||||
async def get_registry(
|
||||
host: str,
|
||||
|
@ -56,14 +58,12 @@ async def get_registry(
|
|||
]:
|
||||
'''
|
||||
Return a portal instance connected to a local or remote
|
||||
arbiter.
|
||||
registry-service actor; if a connection already exists re-use it
|
||||
(presumably to call a `.register_actor()` registry runtime RPC
|
||||
ep).
|
||||
|
||||
'''
|
||||
actor = current_actor()
|
||||
|
||||
if not actor:
|
||||
raise RuntimeError("No actor instance has been defined yet?")
|
||||
|
||||
actor: Actor = current_actor()
|
||||
if actor.is_registrar:
|
||||
# we're already the arbiter
|
||||
# (likely a re-entrant call from the arbiter actor)
|
||||
|
@ -72,6 +72,8 @@ async def get_registry(
|
|||
Channel((host, port))
|
||||
)
|
||||
else:
|
||||
# TODO: try to look pre-existing connection from
|
||||
# `Actor._peers` and use it instead?
|
||||
async with (
|
||||
_connect_chan(host, port) as chan,
|
||||
open_portal(chan) as regstr_ptl,
|
||||
|
@ -80,19 +82,6 @@ async def get_registry(
|
|||
|
||||
|
||||
|
||||
# TODO: deprecate and this remove _arbiter form!
|
||||
@acm
|
||||
async def get_arbiter(*args, **kwargs):
|
||||
warnings.warn(
|
||||
'`tractor.get_arbiter()` is now deprecated!\n'
|
||||
'Use `.get_registry()` instead!',
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
async with get_registry(*args, **kwargs) as to_yield:
|
||||
yield to_yield
|
||||
|
||||
|
||||
@acm
|
||||
async def get_root(
|
||||
**kwargs,
|
||||
|
@ -110,22 +99,53 @@ async def get_root(
|
|||
yield portal
|
||||
|
||||
|
||||
def get_peer_by_name(
|
||||
name: str,
|
||||
# uuid: str|None = None,
|
||||
|
||||
) -> list[Channel]|None: # at least 1
|
||||
'''
|
||||
Scan for an existing connection (set) to a named actor
|
||||
and return any channels from `Actor._peers`.
|
||||
|
||||
This is an optimization method over querying the registrar for
|
||||
the same info.
|
||||
|
||||
'''
|
||||
actor: Actor = current_actor()
|
||||
to_scan: dict[tuple, list[Channel]] = actor._peers.copy()
|
||||
pchan: Channel|None = actor._parent_chan
|
||||
if pchan:
|
||||
to_scan[pchan.uid].append(pchan)
|
||||
|
||||
for aid, chans in to_scan.items():
|
||||
_, peer_name = aid
|
||||
if name == peer_name:
|
||||
if not chans:
|
||||
log.warning(
|
||||
'No IPC chans for matching peer {peer_name}\n'
|
||||
)
|
||||
continue
|
||||
return chans
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@acm
|
||||
async def query_actor(
|
||||
name: str,
|
||||
arbiter_sockaddr: tuple[str, int] | None = None,
|
||||
regaddr: tuple[str, int] | None = None,
|
||||
regaddr: tuple[str, int]|None = None,
|
||||
|
||||
) -> AsyncGenerator[
|
||||
tuple[str, int] | None,
|
||||
tuple[str, int]|None,
|
||||
None,
|
||||
]:
|
||||
'''
|
||||
Make a transport address lookup for an actor name to a specific
|
||||
registrar.
|
||||
Lookup a transport address (by actor name) via querying a registrar
|
||||
listening @ `regaddr`.
|
||||
|
||||
Returns the (socket) address or ``None`` if no entry under that
|
||||
name exists for the given registrar listening @ `regaddr`.
|
||||
Returns the transport protocol (socket) address or `None` if no
|
||||
entry under that name exists.
|
||||
|
||||
'''
|
||||
actor: Actor = current_actor()
|
||||
|
@ -137,14 +157,10 @@ async def query_actor(
|
|||
'The current actor IS the registry!?'
|
||||
)
|
||||
|
||||
if arbiter_sockaddr is not None:
|
||||
warnings.warn(
|
||||
'`tractor.query_actor(regaddr=<blah>)` is deprecated.\n'
|
||||
'Use `registry_addrs: list[tuple]` instead!',
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
regaddr: list[tuple[str, int]] = arbiter_sockaddr
|
||||
maybe_peers: list[Channel]|None = get_peer_by_name(name)
|
||||
if maybe_peers:
|
||||
yield maybe_peers[0].raddr
|
||||
return
|
||||
|
||||
reg_portal: Portal
|
||||
regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
|
||||
|
@ -159,10 +175,28 @@ async def query_actor(
|
|||
yield sockaddr
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_portal(
|
||||
addr: tuple[str, int],
|
||||
name: str,
|
||||
):
|
||||
async with query_actor(
|
||||
name=name,
|
||||
regaddr=addr,
|
||||
) as sockaddr:
|
||||
pass
|
||||
|
||||
if sockaddr:
|
||||
async with _connect_chan(*sockaddr) as chan:
|
||||
async with open_portal(chan) as portal:
|
||||
yield portal
|
||||
else:
|
||||
yield None
|
||||
|
||||
|
||||
@acm
|
||||
async def find_actor(
|
||||
name: str,
|
||||
arbiter_sockaddr: tuple[str, int]|None = None,
|
||||
registry_addrs: list[tuple[str, int]]|None = None,
|
||||
|
||||
only_first: bool = True,
|
||||
|
@ -179,29 +213,12 @@ async def find_actor(
|
|||
known to the arbiter.
|
||||
|
||||
'''
|
||||
if arbiter_sockaddr is not None:
|
||||
warnings.warn(
|
||||
'`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n'
|
||||
'Use `registry_addrs: list[tuple]` instead!',
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]
|
||||
|
||||
@acm
|
||||
async def maybe_open_portal_from_reg_addr(
|
||||
addr: tuple[str, int],
|
||||
):
|
||||
async with query_actor(
|
||||
name=name,
|
||||
regaddr=addr,
|
||||
) as sockaddr:
|
||||
if sockaddr:
|
||||
async with _connect_chan(*sockaddr) as chan:
|
||||
async with open_portal(chan) as portal:
|
||||
yield portal
|
||||
else:
|
||||
yield None
|
||||
# optimization path, use any pre-existing peer channel
|
||||
maybe_peers: list[Channel]|None = get_peer_by_name(name)
|
||||
if maybe_peers and only_first:
|
||||
async with open_portal(maybe_peers[0]) as peer_portal:
|
||||
yield peer_portal
|
||||
return
|
||||
|
||||
if not registry_addrs:
|
||||
# XXX NOTE: make sure to dynamically read the value on
|
||||
|
@ -217,10 +234,13 @@ async def find_actor(
|
|||
maybe_portals: list[
|
||||
AsyncContextManager[tuple[str, int]]
|
||||
] = list(
|
||||
maybe_open_portal_from_reg_addr(addr)
|
||||
maybe_open_portal(
|
||||
addr=addr,
|
||||
name=name,
|
||||
)
|
||||
for addr in registry_addrs
|
||||
)
|
||||
|
||||
portals: list[Portal]
|
||||
async with gather_contexts(
|
||||
mngrs=maybe_portals,
|
||||
) as portals:
|
||||
|
@ -254,31 +274,31 @@ async def find_actor(
|
|||
@acm
|
||||
async def wait_for_actor(
|
||||
name: str,
|
||||
arbiter_sockaddr: tuple[str, int] | None = None,
|
||||
registry_addr: tuple[str, int] | None = None,
|
||||
|
||||
) -> AsyncGenerator[Portal, None]:
|
||||
'''
|
||||
Wait on an actor to register with the arbiter.
|
||||
|
||||
A portal to the first registered actor is returned.
|
||||
Wait on at least one peer actor to register `name` with the
|
||||
registrar, yield a `Portal to the first registree.
|
||||
|
||||
'''
|
||||
actor: Actor = current_actor()
|
||||
|
||||
if arbiter_sockaddr is not None:
|
||||
warnings.warn(
|
||||
'`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n'
|
||||
'Use `registry_addr: tuple` instead!',
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
registry_addr: tuple[str, int] = arbiter_sockaddr
|
||||
# optimization path, use any pre-existing peer channel
|
||||
maybe_peers: list[Channel]|None = get_peer_by_name(name)
|
||||
if maybe_peers:
|
||||
async with open_portal(maybe_peers[0]) as peer_portal:
|
||||
yield peer_portal
|
||||
return
|
||||
|
||||
regaddr: tuple[str, int] = (
|
||||
registry_addr
|
||||
or
|
||||
actor.reg_addrs[0]
|
||||
)
|
||||
# TODO: use `.trionics.gather_contexts()` like
|
||||
# above in `find_actor()` as well?
|
||||
reg_portal: Portal
|
||||
regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0]
|
||||
async with get_registry(*regaddr) as reg_portal:
|
||||
sockaddrs = await reg_portal.run_from_ns(
|
||||
'self',
|
||||
|
|
|
@ -243,6 +243,7 @@ def _trio_main(
|
|||
nest_from_op(
|
||||
input_op=')>', # like a "closed-to-play"-icon from super perspective
|
||||
tree_str=actor_info,
|
||||
back_from_op=1,
|
||||
)
|
||||
)
|
||||
try:
|
||||
|
|
|
@ -263,11 +263,11 @@ class Portal:
|
|||
return False
|
||||
|
||||
reminfo: str = (
|
||||
f'Portal.cancel_actor() => {self.channel.uid}\n'
|
||||
f'|_{chan}\n'
|
||||
f'c)=> {self.channel.uid}\n'
|
||||
f' |_{chan}\n'
|
||||
)
|
||||
log.cancel(
|
||||
f'Requesting runtime cancel for peer\n\n'
|
||||
f'Requesting actor-runtime cancel for peer\n\n'
|
||||
f'{reminfo}'
|
||||
)
|
||||
|
||||
|
|
|
@ -80,7 +80,7 @@ async def open_root_actor(
|
|||
|
||||
# enables the multi-process debugger support
|
||||
debug_mode: bool = False,
|
||||
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
|
||||
maybe_enable_greenback: bool = True, # `.pause_from_sync()/breakpoint()` support
|
||||
enable_stack_on_sig: bool = False,
|
||||
|
||||
# internal logging
|
||||
|
@ -233,14 +233,8 @@ async def open_root_actor(
|
|||
and
|
||||
enable_stack_on_sig
|
||||
):
|
||||
try:
|
||||
logger.info('Enabling `stackscope` traces on SIGUSR1')
|
||||
from .devx import enable_stack_on_sig
|
||||
enable_stack_on_sig()
|
||||
except ImportError:
|
||||
logger.warning(
|
||||
'`stackscope` not installed for use in debug mode!'
|
||||
)
|
||||
from .devx._stackscope import enable_stack_on_sig
|
||||
enable_stack_on_sig()
|
||||
|
||||
# closed into below ping task-func
|
||||
ponged_addrs: list[tuple[str, int]] = []
|
||||
|
|
|
@ -115,25 +115,26 @@ class Actor:
|
|||
'''
|
||||
The fundamental "runtime" concurrency primitive.
|
||||
|
||||
An *actor* is the combination of a regular Python process executing
|
||||
a ``trio`` task tree, communicating with other actors through
|
||||
"memory boundary portals" - which provide a native async API around
|
||||
IPC transport "channels" which themselves encapsulate various
|
||||
(swappable) network protocols.
|
||||
An "actor" is the combination of a regular Python process
|
||||
executing a `trio.run()` task tree, communicating with other
|
||||
"actors" through "memory boundary portals": `Portal`, which
|
||||
provide a high-level async API around IPC "channels" (`Channel`)
|
||||
which themselves encapsulate various (swappable) network
|
||||
transport protocols for sending msgs between said memory domains
|
||||
(processes, hosts, non-GIL threads).
|
||||
|
||||
|
||||
Each "actor" is ``trio.run()`` scheduled "runtime" composed of
|
||||
many concurrent tasks in a single thread. The "runtime" tasks
|
||||
conduct a slew of low(er) level functions to make it possible
|
||||
for message passing between actors as well as the ability to
|
||||
create new actors (aka new "runtimes" in new processes which
|
||||
are supervised via a nursery construct). Each task which sends
|
||||
messages to a task in a "peer" (not necessarily a parent-child,
|
||||
Each "actor" is `trio.run()` scheduled "runtime" composed of many
|
||||
concurrent tasks in a single thread. The "runtime" tasks conduct
|
||||
a slew of low(er) level functions to make it possible for message
|
||||
passing between actors as well as the ability to create new
|
||||
actors (aka new "runtimes" in new processes which are supervised
|
||||
via an "actor-nursery" construct). Each task which sends messages
|
||||
to a task in a "peer" actor (not necessarily a parent-child,
|
||||
depth hierarchy) is able to do so via an "address", which maps
|
||||
IPC connections across memory boundaries, and a task request id
|
||||
which allows for per-actor tasks to send and receive messages
|
||||
to specific peer-actor tasks with which there is an ongoing
|
||||
RPC/IPC dialog.
|
||||
which allows for per-actor tasks to send and receive messages to
|
||||
specific peer-actor tasks with which there is an ongoing RPC/IPC
|
||||
dialog.
|
||||
|
||||
'''
|
||||
# ugh, we need to get rid of this and replace with a "registry" sys
|
||||
|
@ -230,17 +231,20 @@ class Actor:
|
|||
# by the user (currently called the "arbiter")
|
||||
self._spawn_method: str = spawn_method
|
||||
|
||||
self._peers: defaultdict = defaultdict(list)
|
||||
self._peers: defaultdict[
|
||||
str, # uaid
|
||||
list[Channel], # IPC conns from peer
|
||||
] = defaultdict(list)
|
||||
self._peer_connected: dict[tuple[str, str], trio.Event] = {}
|
||||
self._no_more_peers = trio.Event()
|
||||
self._no_more_peers.set()
|
||||
|
||||
# RPC state
|
||||
self._ongoing_rpc_tasks = trio.Event()
|
||||
self._ongoing_rpc_tasks.set()
|
||||
|
||||
# (chan, cid) -> (cancel_scope, func)
|
||||
self._rpc_tasks: dict[
|
||||
tuple[Channel, str],
|
||||
tuple[Context, Callable, trio.Event]
|
||||
tuple[Channel, str], # (chan, cid)
|
||||
tuple[Context, Callable, trio.Event] # (ctx=>, fn(), done?)
|
||||
] = {}
|
||||
|
||||
# map {actor uids -> Context}
|
||||
|
@ -317,7 +321,10 @@ class Actor:
|
|||
event = self._peer_connected.setdefault(uid, trio.Event())
|
||||
await event.wait()
|
||||
log.debug(f'{uid!r} successfully connected back to us')
|
||||
return event, self._peers[uid][-1]
|
||||
return (
|
||||
event,
|
||||
self._peers[uid][-1],
|
||||
)
|
||||
|
||||
def load_modules(
|
||||
self,
|
||||
|
@ -408,26 +415,11 @@ class Actor:
|
|||
'''
|
||||
self._no_more_peers = trio.Event() # unset by making new
|
||||
chan = Channel.from_stream(stream)
|
||||
their_uid: tuple[str, str]|None = chan.uid
|
||||
|
||||
con_status: str = ''
|
||||
|
||||
# TODO: remove this branch since can never happen?
|
||||
# NOTE: `.uid` is only set after first contact
|
||||
if their_uid:
|
||||
con_status = (
|
||||
'IPC Re-connection from already known peer?\n'
|
||||
)
|
||||
else:
|
||||
con_status = (
|
||||
'New inbound IPC connection <=\n'
|
||||
)
|
||||
|
||||
con_status += (
|
||||
con_status: str = (
|
||||
'New inbound IPC connection <=\n'
|
||||
f'|_{chan}\n'
|
||||
# f' |_@{chan.raddr}\n\n'
|
||||
# ^-TODO-^ remove since alfready in chan.__repr__()?
|
||||
)
|
||||
|
||||
# send/receive initial handshake response
|
||||
try:
|
||||
uid: tuple|None = await self._do_handshake(chan)
|
||||
|
@ -439,10 +431,10 @@ class Actor:
|
|||
|
||||
TransportClosed,
|
||||
):
|
||||
# XXX: This may propagate up from ``Channel._aiter_recv()``
|
||||
# and ``MsgpackStream._inter_packets()`` on a read from the
|
||||
# XXX: This may propagate up from `Channel._aiter_recv()`
|
||||
# and `MsgpackStream._inter_packets()` on a read from the
|
||||
# stream particularly when the runtime is first starting up
|
||||
# inside ``open_root_actor()`` where there is a check for
|
||||
# inside `open_root_actor()` where there is a check for
|
||||
# a bound listener on the "arbiter" addr. the reset will be
|
||||
# because the handshake was never meant took place.
|
||||
log.runtime(
|
||||
|
@ -452,9 +444,22 @@ class Actor:
|
|||
)
|
||||
return
|
||||
|
||||
familiar: str = 'new-peer'
|
||||
if _pre_chan := self._peers.get(uid):
|
||||
familiar: str = 'pre-existing-peer'
|
||||
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
|
||||
con_status += (
|
||||
f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n'
|
||||
f' -> Handshake with {familiar} `{uid_short}` complete\n'
|
||||
)
|
||||
|
||||
if _pre_chan:
|
||||
log.warning(
|
||||
# con_status += (
|
||||
# ^TODO^ swap once we minimize conn duplication
|
||||
f' -> Wait, we already have IPC with `{uid_short}`??\n'
|
||||
f' |_{_pre_chan}\n'
|
||||
)
|
||||
|
||||
# IPC connection tracking for both peers and new children:
|
||||
# - if this is a new channel to a locally spawned
|
||||
# sub-actor there will be a spawn wait even registered
|
||||
|
@ -507,8 +512,9 @@ class Actor:
|
|||
)
|
||||
except trio.Cancelled:
|
||||
log.cancel(
|
||||
'IPC transport msg loop was cancelled for \n'
|
||||
f'|_{chan}\n'
|
||||
'IPC transport msg loop was cancelled\n'
|
||||
f'c)>\n'
|
||||
f' |_{chan}\n'
|
||||
)
|
||||
raise
|
||||
|
||||
|
@ -545,9 +551,9 @@ class Actor:
|
|||
|
||||
):
|
||||
log.cancel(
|
||||
'Waiting on cancel request to peer\n'
|
||||
'Waiting on cancel request to peer..\n'
|
||||
f'c)=>\n'
|
||||
f' |_{chan.uid}\n'
|
||||
f' |_{chan.uid}\n'
|
||||
)
|
||||
|
||||
# XXX: this is a soft wait on the channel (and its
|
||||
|
@ -646,10 +652,14 @@ class Actor:
|
|||
):
|
||||
report: str = (
|
||||
'Timed out waiting on local actor-nursery to exit?\n'
|
||||
f'{local_nursery}\n'
|
||||
f'c)>\n'
|
||||
f' |_{local_nursery}\n'
|
||||
)
|
||||
if children := local_nursery._children:
|
||||
report += f' |_{pformat(children)}\n'
|
||||
# indent from above local-nurse repr
|
||||
report += (
|
||||
f' |_{pformat(children)}\n'
|
||||
)
|
||||
|
||||
log.warning(report)
|
||||
|
||||
|
@ -1236,8 +1246,9 @@ class Actor:
|
|||
# TODO: just use the new `Context.repr_rpc: str` (and
|
||||
# other) repr fields instead of doing this all manual..
|
||||
msg: str = (
|
||||
f'Runtime cancel request from {requester_type}:\n\n'
|
||||
f'<= .cancel(): {requesting_uid}\n\n'
|
||||
f'Actor-runtime cancel request from {requester_type}\n\n'
|
||||
f'<=c) {requesting_uid}\n'
|
||||
f' |_{self}\n'
|
||||
)
|
||||
|
||||
# TODO: what happens here when we self-cancel tho?
|
||||
|
@ -1347,7 +1358,7 @@ class Actor:
|
|||
log.cancel(
|
||||
'Rxed cancel request for RPC task\n'
|
||||
f'<=c) {requesting_uid}\n'
|
||||
f' |_{ctx._task}\n'
|
||||
f' |_{ctx._task}\n'
|
||||
f' >> {ctx.repr_rpc}\n'
|
||||
# f'=> {ctx._task}\n'
|
||||
# f' >> Actor._cancel_task() => {ctx._task}\n'
|
||||
|
@ -1465,17 +1476,17 @@ class Actor:
|
|||
"IPC channel's "
|
||||
)
|
||||
rent_chan_repr: str = (
|
||||
f' |_{parent_chan}\n\n'
|
||||
f' |_{parent_chan}\n\n'
|
||||
if parent_chan
|
||||
else ''
|
||||
)
|
||||
log.cancel(
|
||||
f'Cancelling {descr} RPC tasks\n\n'
|
||||
f'<= canceller: {req_uid}\n'
|
||||
f'<=c) {req_uid} [canceller]\n'
|
||||
f'{rent_chan_repr}'
|
||||
f'=> cancellee: {self.uid}\n'
|
||||
f' |_{self}.cancel_rpc_tasks()\n'
|
||||
f' |_tasks: {len(tasks)}\n'
|
||||
f'c)=> {self.uid} [cancellee]\n'
|
||||
f' |_{self} [with {len(tasks)} tasks]\n'
|
||||
# f' |_tasks: {len(tasks)}\n'
|
||||
# f'{tasks_str}'
|
||||
)
|
||||
for (
|
||||
|
@ -1544,7 +1555,7 @@ class Actor:
|
|||
def accept_addr(self) -> tuple[str, int]:
|
||||
'''
|
||||
Primary address to which the IPC transport server is
|
||||
bound.
|
||||
bound and listening for new connections.
|
||||
|
||||
'''
|
||||
# throws OSError on failure
|
||||
|
@ -1561,6 +1572,7 @@ class Actor:
|
|||
def get_chans(
|
||||
self,
|
||||
uid: tuple[str, str],
|
||||
|
||||
) -> list[Channel]:
|
||||
'''
|
||||
Return all IPC channels to the actor with provided `uid`.
|
||||
|
@ -1932,9 +1944,15 @@ async def async_main(
|
|||
with CancelScope(shield=True):
|
||||
await actor._no_more_peers.wait()
|
||||
|
||||
teardown_report += ('-> All peer channels are complete\n')
|
||||
teardown_report += (
|
||||
'-> All peer channels are complete\n'
|
||||
)
|
||||
|
||||
teardown_report += ('Actor runtime exited')
|
||||
teardown_report += (
|
||||
'Actor runtime exiting\n'
|
||||
f'>)\n'
|
||||
f'|_{actor}\n'
|
||||
)
|
||||
log.info(teardown_report)
|
||||
|
||||
|
||||
|
|
|
@ -54,6 +54,25 @@ def examples_dir() -> pathlib.Path:
|
|||
return repodir() / 'examples'
|
||||
|
||||
|
||||
def mk_cmd(
|
||||
ex_name: str,
|
||||
exs_subpath: str = 'debugging',
|
||||
) -> str:
|
||||
'''
|
||||
Generate a shell command suitable to pass to ``pexpect.spawn()``.
|
||||
|
||||
'''
|
||||
script_path: pathlib.Path = (
|
||||
examples_dir()
|
||||
/ exs_subpath
|
||||
/ f'{ex_name}.py'
|
||||
)
|
||||
return ' '.join([
|
||||
'python',
|
||||
str(script_path)
|
||||
])
|
||||
|
||||
|
||||
@acm
|
||||
async def expect_ctxc(
|
||||
yay: bool,
|
||||
|
|
|
@ -26,7 +26,7 @@ from ._debug import (
|
|||
breakpoint as breakpoint,
|
||||
pause as pause,
|
||||
pause_from_sync as pause_from_sync,
|
||||
shield_sigint_handler as shield_sigint_handler,
|
||||
sigint_shield as sigint_shield,
|
||||
open_crash_handler as open_crash_handler,
|
||||
maybe_open_crash_handler as maybe_open_crash_handler,
|
||||
maybe_init_greenback as maybe_init_greenback,
|
||||
|
|
|
@ -409,9 +409,9 @@ class Lock:
|
|||
repl_task
|
||||
)
|
||||
message += (
|
||||
f'\nA non-caller task still owns this lock on behalf of '
|
||||
f'{behalf_of_task}\n'
|
||||
f'|_{lock_stats.owner}\n'
|
||||
f'A non-caller task still owns this lock on behalf of '
|
||||
f'`{behalf_of_task}`\n'
|
||||
f'lock owner task: {lock_stats.owner}\n'
|
||||
)
|
||||
|
||||
if (
|
||||
|
@ -523,6 +523,10 @@ class Lock:
|
|||
)
|
||||
|
||||
|
||||
def get_lock() -> Lock:
|
||||
return Lock
|
||||
|
||||
|
||||
@tractor.context(
|
||||
# enable the locking msgspec
|
||||
pld_spec=__pld_spec__,
|
||||
|
@ -788,13 +792,13 @@ class DebugStatus:
|
|||
cls._orig_sigint_handler: Callable = trio.from_thread.run_sync(
|
||||
signal.signal,
|
||||
signal.SIGINT,
|
||||
shield_sigint_handler,
|
||||
sigint_shield,
|
||||
)
|
||||
|
||||
else:
|
||||
cls._orig_sigint_handler = signal.signal(
|
||||
signal.SIGINT,
|
||||
shield_sigint_handler,
|
||||
sigint_shield,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
|
@ -900,12 +904,30 @@ class DebugStatus:
|
|||
|
||||
# actor-local state, irrelevant for non-root.
|
||||
cls.repl_task = None
|
||||
|
||||
# XXX WARNING needs very special caughtion, and we should
|
||||
# prolly make a more explicit `@property` API?
|
||||
#
|
||||
# - if unset in root multi-threaded case can cause
|
||||
# issues with detecting that some root thread is
|
||||
# using a REPL,
|
||||
#
|
||||
# - what benefit is there to unsetting, it's always
|
||||
# set again for the next task in some actor..
|
||||
# only thing would be to avoid in the sigint-handler
|
||||
# logging when we don't need to?
|
||||
cls.repl = None
|
||||
|
||||
# restore original sigint handler
|
||||
cls.unshield_sigint()
|
||||
|
||||
|
||||
|
||||
# TODO: use the new `@lowlevel.singleton` for this!
|
||||
def get_debug_req() -> DebugStatus|None:
|
||||
return DebugStatus
|
||||
|
||||
|
||||
class TractorConfig(pdbp.DefaultConfig):
|
||||
'''
|
||||
Custom `pdbp` config which tries to use the best tradeoff
|
||||
|
@ -1311,7 +1333,7 @@ def any_connected_locker_child() -> bool:
|
|||
return False
|
||||
|
||||
|
||||
def shield_sigint_handler(
|
||||
def sigint_shield(
|
||||
signum: int,
|
||||
frame: 'frame', # type: ignore # noqa
|
||||
*args,
|
||||
|
@ -1351,13 +1373,17 @@ def shield_sigint_handler(
|
|||
# root actor branch that reports whether or not a child
|
||||
# has locked debugger.
|
||||
if is_root_process():
|
||||
# log.warning(
|
||||
log.devx(
|
||||
'Handling SIGINT in root actor\n'
|
||||
f'{Lock.repr()}'
|
||||
f'{DebugStatus.repr()}\n'
|
||||
)
|
||||
# try to see if the supposed (sub)actor in debug still
|
||||
# has an active connection to *this* actor, and if not
|
||||
# it's likely they aren't using the TTY lock / debugger
|
||||
# and we should propagate SIGINT normally.
|
||||
any_connected: bool = any_connected_locker_child()
|
||||
# if not any_connected:
|
||||
# return do_cancel()
|
||||
|
||||
problem = (
|
||||
f'root {actor.uid} handling SIGINT\n'
|
||||
|
@ -1406,19 +1432,25 @@ def shield_sigint_handler(
|
|||
# an actor using the `Lock` (a bug state) ??
|
||||
# => so immediately cancel any stale lock cs and revert
|
||||
# the handler!
|
||||
if not repl:
|
||||
if not DebugStatus.repl:
|
||||
# TODO: WHEN should we revert back to ``trio``
|
||||
# handler if this one is stale?
|
||||
# -[ ] maybe after a counts work of ctl-c mashes?
|
||||
# -[ ] use a state var like `stale_handler: bool`?
|
||||
problem += (
|
||||
'\n'
|
||||
'No subactor is using a `pdb` REPL according `Lock.ctx_in_debug`?\n'
|
||||
'BUT, the root should be using it, WHY this handler ??\n'
|
||||
'BUT, the root should be using it, WHY this handler ??\n\n'
|
||||
'So either..\n'
|
||||
'- some root-thread is using it but has no `.repl` set?, OR\n'
|
||||
'- something else weird is going on outside the runtime!?\n'
|
||||
)
|
||||
else:
|
||||
# NOTE: since we emit this msg on ctl-c, we should
|
||||
# also always re-print the prompt the tail block!
|
||||
log.pdb(
|
||||
'Ignoring SIGINT while pdb REPL in use by root actor..\n'
|
||||
f'{DebugStatus.repl_task}\n'
|
||||
f' |_{repl}\n'
|
||||
)
|
||||
problem = None
|
||||
|
||||
|
@ -1468,7 +1500,6 @@ def shield_sigint_handler(
|
|||
'Allowing SIGINT propagation..'
|
||||
)
|
||||
DebugStatus.unshield_sigint()
|
||||
# do_cancel()
|
||||
|
||||
repl_task: str|None = DebugStatus.repl_task
|
||||
req_task: str|None = DebugStatus.req_task
|
||||
|
@ -1483,10 +1514,15 @@ def shield_sigint_handler(
|
|||
f' |_{repl}\n'
|
||||
)
|
||||
elif req_task:
|
||||
log.pdb(
|
||||
f'Ignoring SIGINT while debug request task is open\n'
|
||||
log.debug(
|
||||
'Ignoring SIGINT while debug request task is open but either,\n'
|
||||
'- someone else is already REPL-in and has the `Lock`, or\n'
|
||||
'- some other local task already is replin?\n'
|
||||
f'|_{req_task}\n'
|
||||
)
|
||||
|
||||
# TODO can we remove this now?
|
||||
# -[ ] does this path ever get hit any more?
|
||||
else:
|
||||
msg: str = (
|
||||
'SIGINT shield handler still active BUT, \n\n'
|
||||
|
@ -1522,37 +1558,53 @@ def shield_sigint_handler(
|
|||
# https://github.com/goodboy/tractor/issues/320
|
||||
# elif debug_mode():
|
||||
|
||||
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
|
||||
# it looks to be that the last command that was run (eg. ll)
|
||||
# will be repeated by default.
|
||||
|
||||
# maybe redraw/print last REPL output to console since
|
||||
# we want to alert the user that more input is expect since
|
||||
# nothing has been done dur to ignoring sigint.
|
||||
if (
|
||||
repl # only when current actor has a REPL engaged
|
||||
DebugStatus.repl # only when current actor has a REPL engaged
|
||||
):
|
||||
flush_status: str = (
|
||||
'Flushing stdout to ensure new prompt line!\n'
|
||||
)
|
||||
|
||||
# XXX: yah, mega hack, but how else do we catch this madness XD
|
||||
if repl.shname == 'xonsh':
|
||||
if (
|
||||
repl.shname == 'xonsh'
|
||||
):
|
||||
flush_status += (
|
||||
'-> ALSO re-flushing due to `xonsh`..\n'
|
||||
)
|
||||
repl.stdout.write(repl.prompt)
|
||||
|
||||
# log.warning(
|
||||
log.devx(
|
||||
flush_status
|
||||
)
|
||||
repl.stdout.flush()
|
||||
|
||||
# TODO: make this work like sticky mode where if there is output
|
||||
# detected as written to the tty we redraw this part underneath
|
||||
# and erase the past draw of this same bit above?
|
||||
# TODO: better console UX to match the current "mode":
|
||||
# -[ ] for example if in sticky mode where if there is output
|
||||
# detected as written to the tty we redraw this part underneath
|
||||
# and erase the past draw of this same bit above?
|
||||
# repl.sticky = True
|
||||
# repl._print_if_sticky()
|
||||
|
||||
# also see these links for an approach from ``ptk``:
|
||||
# also see these links for an approach from `ptk`:
|
||||
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
|
||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
|
||||
else:
|
||||
log.devx(
|
||||
# log.warning(
|
||||
'Not flushing stdout since not needed?\n'
|
||||
f'|_{repl}\n'
|
||||
)
|
||||
|
||||
# XXX only for tracing this handler
|
||||
log.devx('exiting SIGINT')
|
||||
|
||||
|
||||
_pause_msg: str = 'Attaching to pdb REPL in actor'
|
||||
_pause_msg: str = 'Opening a pdb REPL in paused actor'
|
||||
|
||||
|
||||
class DebugRequestError(RuntimeError):
|
||||
|
@ -1617,7 +1669,7 @@ async def _pause(
|
|||
# 'directly (infected) `asyncio` tasks!'
|
||||
# ) from rte
|
||||
|
||||
raise
|
||||
raise rte
|
||||
|
||||
if debug_func is not None:
|
||||
debug_func = partial(debug_func)
|
||||
|
@ -1625,9 +1677,13 @@ async def _pause(
|
|||
# XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug
|
||||
# request from a subactor BEFORE the REPL is entered by that
|
||||
# process.
|
||||
if not repl:
|
||||
if (
|
||||
not repl
|
||||
and
|
||||
debug_func
|
||||
):
|
||||
repl: PdbREPL = mk_pdb()
|
||||
DebugStatus.shield_sigint()
|
||||
repl: PdbREPL = repl or mk_pdb()
|
||||
|
||||
# TODO: move this into a `open_debug_request()` @acm?
|
||||
# -[ ] prolly makes the most sense to do the request
|
||||
|
@ -1662,7 +1718,13 @@ async def _pause(
|
|||
# recurrent entries/requests from the same
|
||||
# actor-local task.
|
||||
DebugStatus.repl_task = task
|
||||
DebugStatus.repl = repl
|
||||
if repl:
|
||||
DebugStatus.repl = repl
|
||||
else:
|
||||
log.error(
|
||||
'No REPl instance set before entering `debug_func`?\n'
|
||||
f'{debug_func}\n'
|
||||
)
|
||||
|
||||
# invoke the low-level REPL activation routine which itself
|
||||
# should call into a `Pdb.set_trace()` of some sort.
|
||||
|
@ -2001,7 +2063,7 @@ async def _pause(
|
|||
DebugStatus.release(cancel_req_task=True)
|
||||
|
||||
# sanity checks for ^ on request/status teardown
|
||||
assert DebugStatus.repl is None
|
||||
# assert DebugStatus.repl is None # XXX no more bc bg thread cases?
|
||||
assert DebugStatus.repl_task is None
|
||||
|
||||
# sanity, for when hackin on all this?
|
||||
|
@ -2050,9 +2112,8 @@ def _set_trace(
|
|||
# root here? Bo
|
||||
log.pdb(
|
||||
f'{_pause_msg}\n'
|
||||
# '|\n'
|
||||
f'>(\n'
|
||||
f' |_ {task} @ {actor.uid}\n'
|
||||
f'|_ {task} @ {actor.uid}\n'
|
||||
# ^-TODO-^ more compact pformating?
|
||||
# -[ ] make an `Actor.__repr()__`
|
||||
# -[ ] should we use `log.pformat_task_uid()`?
|
||||
|
@ -2241,7 +2302,12 @@ async def _pause_from_bg_root_thread(
|
|||
'Trying to acquire `Lock` on behalf of bg thread\n'
|
||||
f'|_{behalf_of_thread}\n'
|
||||
)
|
||||
# DebugStatus.repl_task = behalf_of_thread
|
||||
|
||||
# NOTE: this is already a task inside the main-`trio`-thread, so
|
||||
# we don't need to worry about calling it another time from the
|
||||
# bg thread on which who's behalf this task is operating.
|
||||
DebugStatus.shield_sigint()
|
||||
|
||||
out = await _pause(
|
||||
debug_func=None,
|
||||
repl=repl,
|
||||
|
@ -2250,6 +2316,8 @@ async def _pause_from_bg_root_thread(
|
|||
called_from_bg_thread=True,
|
||||
**_pause_kwargs
|
||||
)
|
||||
DebugStatus.repl_task = behalf_of_thread
|
||||
|
||||
lock: trio.FIFOLock = Lock._debug_lock
|
||||
stats: trio.LockStatistics= lock.statistics()
|
||||
assert stats.owner is task
|
||||
|
@ -2283,7 +2351,6 @@ async def _pause_from_bg_root_thread(
|
|||
f'|_{behalf_of_thread}\n'
|
||||
)
|
||||
task_status.started(out)
|
||||
DebugStatus.shield_sigint()
|
||||
|
||||
# wait for bg thread to exit REPL sesh.
|
||||
try:
|
||||
|
@ -2324,7 +2391,7 @@ def pause_from_sync(
|
|||
err_on_no_runtime=False,
|
||||
)
|
||||
message: str = (
|
||||
f'{actor.uid} task called `tractor.pause_from_sync()`\n\n'
|
||||
f'{actor.uid} task called `tractor.pause_from_sync()`\n'
|
||||
)
|
||||
if not actor:
|
||||
raise RuntimeError(
|
||||
|
@ -2348,7 +2415,6 @@ def pause_from_sync(
|
|||
'for infected `asyncio` mode!'
|
||||
)
|
||||
|
||||
DebugStatus.shield_sigint()
|
||||
repl: PdbREPL = mk_pdb()
|
||||
|
||||
# message += f'-> created local REPL {repl}\n'
|
||||
|
@ -2366,6 +2432,10 @@ def pause_from_sync(
|
|||
# thread which will call `._pause()` manually with special
|
||||
# handling for root-actor caller usage.
|
||||
if not DebugStatus.is_main_trio_thread():
|
||||
|
||||
# TODO: `threading.Lock()` this so we don't get races in
|
||||
# multi-thr cases where they're acquiring/releasing the
|
||||
# REPL and setting request/`Lock` state, etc..
|
||||
thread: threading.Thread = threading.current_thread()
|
||||
repl_owner = thread
|
||||
|
||||
|
@ -2373,9 +2443,16 @@ def pause_from_sync(
|
|||
if is_root:
|
||||
message += (
|
||||
f'-> called from a root-actor bg {thread}\n'
|
||||
f'-> scheduling `._pause_from_sync_thread()`..\n'
|
||||
f'-> scheduling `._pause_from_bg_root_thread()`..\n'
|
||||
)
|
||||
bg_task, repl = trio.from_thread.run(
|
||||
# XXX SUBTLE BADNESS XXX that should really change!
|
||||
# don't over-write the `repl` here since when
|
||||
# this behalf-of-bg_thread-task calls pause it will
|
||||
# pass `debug_func=None` which will result in it
|
||||
# returing a `repl==None` output and that get's also
|
||||
# `.started(out)` back here! So instead just ignore
|
||||
# that output and assign the `repl` created above!
|
||||
bg_task, _ = trio.from_thread.run(
|
||||
afn=partial(
|
||||
actor._service_n.start,
|
||||
partial(
|
||||
|
@ -2387,8 +2464,9 @@ def pause_from_sync(
|
|||
),
|
||||
)
|
||||
)
|
||||
DebugStatus.shield_sigint()
|
||||
message += (
|
||||
f'-> `._pause_from_sync_thread()` started bg task {bg_task}\n'
|
||||
f'-> `._pause_from_bg_root_thread()` started bg task {bg_task}\n'
|
||||
)
|
||||
else:
|
||||
message += f'-> called from a bg {thread}\n'
|
||||
|
@ -2397,7 +2475,7 @@ def pause_from_sync(
|
|||
# `request_root_stdio_lock()` and we don't need to
|
||||
# worry about all the special considerations as with
|
||||
# the root-actor per above.
|
||||
bg_task, repl = trio.from_thread.run(
|
||||
bg_task, _ = trio.from_thread.run(
|
||||
afn=partial(
|
||||
_pause,
|
||||
debug_func=None,
|
||||
|
@ -2412,6 +2490,9 @@ def pause_from_sync(
|
|||
**_pause_kwargs
|
||||
),
|
||||
)
|
||||
# ?TODO? XXX where do we NEED to call this in the
|
||||
# subactor-bg-thread case?
|
||||
DebugStatus.shield_sigint()
|
||||
assert bg_task is not DebugStatus.repl_task
|
||||
|
||||
else: # we are presumably the `trio.run()` + main thread
|
||||
|
@ -2424,6 +2505,11 @@ def pause_from_sync(
|
|||
# greenback: ModuleType = await maybe_init_greenback()
|
||||
|
||||
message += f'-> imported {greenback}\n'
|
||||
|
||||
# NOTE XXX seems to need to be set BEFORE the `_pause()`
|
||||
# invoke using gb below?
|
||||
DebugStatus.shield_sigint()
|
||||
|
||||
repl_owner: Task = current_task()
|
||||
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
|
||||
try:
|
||||
|
@ -2449,9 +2535,12 @@ def pause_from_sync(
|
|||
raise
|
||||
|
||||
if out:
|
||||
bg_task, repl = out
|
||||
assert repl is repl
|
||||
assert bg_task is repl_owner
|
||||
bg_task, _ = out
|
||||
else:
|
||||
bg_task: Task = current_task()
|
||||
|
||||
# assert repl is repl
|
||||
assert bg_task is repl_owner
|
||||
|
||||
# NOTE: normally set inside `_enter_repl_sync()`
|
||||
DebugStatus.repl_task: str = repl_owner
|
||||
|
@ -2465,7 +2554,10 @@ def pause_from_sync(
|
|||
)
|
||||
log.devx(message)
|
||||
|
||||
# NOTE set as late as possible to avoid state clobbering
|
||||
# in the multi-threaded case!
|
||||
DebugStatus.repl = repl
|
||||
|
||||
_set_trace(
|
||||
api_frame=api_frame or inspect.currentframe(),
|
||||
repl=repl,
|
||||
|
@ -2523,7 +2615,7 @@ async def breakpoint(
|
|||
|
||||
|
||||
_crash_msg: str = (
|
||||
'Attaching to pdb REPL in crashed actor'
|
||||
'Opening a pdb REPL in crashed actor'
|
||||
)
|
||||
|
||||
|
||||
|
@ -2551,11 +2643,9 @@ def _post_mortem(
|
|||
# here! Bo
|
||||
log.pdb(
|
||||
f'{_crash_msg}\n'
|
||||
# '|\n'
|
||||
f'x>(\n'
|
||||
f' |_ {current_task()} @ {actor.uid}\n'
|
||||
f' |_ {current_task()} @ {actor.uid}\n'
|
||||
|
||||
# f'|_ @{actor.uid}\n'
|
||||
# TODO: make an `Actor.__repr()__`
|
||||
# f'|_ {current_task()} @ {actor.name}\n'
|
||||
)
|
||||
|
@ -2668,7 +2758,8 @@ async def acquire_debug_lock(
|
|||
tuple,
|
||||
]:
|
||||
'''
|
||||
Request to acquire the TTY `Lock` in the root actor, release on exit.
|
||||
Request to acquire the TTY `Lock` in the root actor, release on
|
||||
exit.
|
||||
|
||||
This helper is for actor's who don't actually need to acquired
|
||||
the debugger but want to wait until the lock is free in the
|
||||
|
@ -2680,10 +2771,14 @@ async def acquire_debug_lock(
|
|||
yield None
|
||||
return
|
||||
|
||||
task: Task = current_task()
|
||||
async with trio.open_nursery() as n:
|
||||
ctx: Context = await n.start(
|
||||
request_root_stdio_lock,
|
||||
subactor_uid,
|
||||
partial(
|
||||
request_root_stdio_lock,
|
||||
actor_uid=subactor_uid,
|
||||
task_uid=(task.name, id(task)),
|
||||
)
|
||||
)
|
||||
yield ctx
|
||||
ctx.cancel()
|
||||
|
|
|
@ -24,13 +24,24 @@ disjoint, parallel executing tasks in separate actors.
|
|||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
# from functools import partial
|
||||
from threading import (
|
||||
current_thread,
|
||||
Thread,
|
||||
RLock,
|
||||
)
|
||||
import multiprocessing as mp
|
||||
from signal import (
|
||||
signal,
|
||||
getsignal,
|
||||
SIGUSR1,
|
||||
)
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING
|
||||
# import traceback
|
||||
from types import ModuleType
|
||||
from typing import (
|
||||
Callable,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import trio
|
||||
from tractor import (
|
||||
|
@ -51,26 +62,45 @@ if TYPE_CHECKING:
|
|||
|
||||
@trio.lowlevel.disable_ki_protection
|
||||
def dump_task_tree() -> None:
|
||||
import stackscope
|
||||
from tractor.log import get_console_log
|
||||
'''
|
||||
Do a classic `stackscope.extract()` task-tree dump to console at
|
||||
`.devx()` level.
|
||||
|
||||
'''
|
||||
import stackscope
|
||||
tree_str: str = str(
|
||||
stackscope.extract(
|
||||
trio.lowlevel.current_root_task(),
|
||||
recurse_child_tasks=True
|
||||
)
|
||||
)
|
||||
log = get_console_log(
|
||||
name=__name__,
|
||||
level='cancel',
|
||||
)
|
||||
actor: Actor = _state.current_actor()
|
||||
thr: Thread = current_thread()
|
||||
log.devx(
|
||||
f'Dumping `stackscope` tree for actor\n'
|
||||
f'{actor.name}: {actor}\n'
|
||||
f' |_{mp.current_process()}\n\n'
|
||||
f'{actor.uid}:\n'
|
||||
f'|_{mp.current_process()}\n'
|
||||
f' |_{thr}\n'
|
||||
f' |_{actor}\n\n'
|
||||
|
||||
# start-of-trace-tree delimiter (mostly for testing)
|
||||
'------ - ------\n'
|
||||
'\n'
|
||||
+
|
||||
f'{tree_str}\n'
|
||||
+
|
||||
# end-of-trace-tree delimiter (mostly for testing)
|
||||
f'\n'
|
||||
f'------ {actor.uid!r} ------\n'
|
||||
)
|
||||
# TODO: can remove this right?
|
||||
# -[ ] was original code from author
|
||||
#
|
||||
# print(
|
||||
# 'DUMPING FROM PRINT\n'
|
||||
# +
|
||||
# content
|
||||
# )
|
||||
# import logging
|
||||
# try:
|
||||
# with open("/dev/tty", "w") as tty:
|
||||
|
@ -80,58 +110,130 @@ def dump_task_tree() -> None:
|
|||
# "task_tree"
|
||||
# ).exception("Error printing task tree")
|
||||
|
||||
_handler_lock = RLock()
|
||||
_tree_dumped: bool = False
|
||||
|
||||
def signal_handler(
|
||||
|
||||
def dump_tree_on_sig(
|
||||
sig: int,
|
||||
frame: object,
|
||||
|
||||
relay_to_subs: bool = True,
|
||||
|
||||
) -> None:
|
||||
try:
|
||||
trio.lowlevel.current_trio_token(
|
||||
).run_sync_soon(dump_task_tree)
|
||||
except RuntimeError:
|
||||
# not in async context -- print a normal traceback
|
||||
traceback.print_stack()
|
||||
global _tree_dumped, _handler_lock
|
||||
with _handler_lock:
|
||||
if _tree_dumped:
|
||||
log.warning(
|
||||
'Already dumped for this actor...??'
|
||||
)
|
||||
return
|
||||
|
||||
_tree_dumped = True
|
||||
|
||||
# actor: Actor = _state.current_actor()
|
||||
log.devx(
|
||||
'Trying to dump `stackscope` tree..\n'
|
||||
)
|
||||
try:
|
||||
dump_task_tree()
|
||||
# await actor._service_n.start_soon(
|
||||
# partial(
|
||||
# trio.to_thread.run_sync,
|
||||
# dump_task_tree,
|
||||
# )
|
||||
# )
|
||||
# trio.lowlevel.current_trio_token().run_sync_soon(
|
||||
# dump_task_tree
|
||||
# )
|
||||
|
||||
except RuntimeError:
|
||||
log.exception(
|
||||
'Failed to dump `stackscope` tree..\n'
|
||||
)
|
||||
# not in async context -- print a normal traceback
|
||||
# traceback.print_stack()
|
||||
raise
|
||||
|
||||
except BaseException:
|
||||
log.exception(
|
||||
'Failed to dump `stackscope` tree..\n'
|
||||
)
|
||||
raise
|
||||
|
||||
log.devx(
|
||||
'Supposedly we dumped just fine..?'
|
||||
)
|
||||
|
||||
if not relay_to_subs:
|
||||
return
|
||||
|
||||
an: ActorNursery
|
||||
for an in _state.current_actor()._actoruid2nursery.values():
|
||||
|
||||
subproc: ProcessType
|
||||
subactor: Actor
|
||||
for subactor, subproc, _ in an._children.values():
|
||||
log.devx(
|
||||
log.warning(
|
||||
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
|
||||
f'{subactor}\n'
|
||||
f' |_{subproc}\n'
|
||||
)
|
||||
|
||||
if isinstance(subproc, trio.Process):
|
||||
subproc.send_signal(sig)
|
||||
# bc of course stdlib can't have a std API.. XD
|
||||
match subproc:
|
||||
case trio.Process():
|
||||
subproc.send_signal(sig)
|
||||
|
||||
elif isinstance(subproc, mp.Process):
|
||||
subproc._send_signal(sig)
|
||||
case mp.Process():
|
||||
subproc._send_signal(sig)
|
||||
|
||||
|
||||
def enable_stack_on_sig(
|
||||
sig: int = SIGUSR1
|
||||
) -> None:
|
||||
sig: int = SIGUSR1,
|
||||
) -> ModuleType:
|
||||
'''
|
||||
Enable `stackscope` tracing on reception of a signal; by
|
||||
default this is SIGUSR1.
|
||||
|
||||
HOT TIP: a task/ctx-tree dump can be triggered from a shell with
|
||||
fancy cmds.
|
||||
|
||||
For ex. from `bash` using `pgrep` and cmd-sustitution
|
||||
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
|
||||
you could use:
|
||||
|
||||
>> kill -SIGUSR1 $(pgrep -f '<cmd>')
|
||||
|
||||
Or with with `xonsh` (which has diff capture-from-subproc syntax)
|
||||
|
||||
>> kill -SIGUSR1 @$(pgrep -f '<cmd>')
|
||||
|
||||
'''
|
||||
try:
|
||||
import stackscope
|
||||
except ImportError:
|
||||
log.warning(
|
||||
'`stackscope` not installed for use in debug mode!'
|
||||
)
|
||||
return None
|
||||
|
||||
handler: Callable|int = getsignal(sig)
|
||||
if handler is dump_tree_on_sig:
|
||||
log.devx(
|
||||
'A `SIGUSR1` handler already exists?\n'
|
||||
f'|_ {handler!r}\n'
|
||||
)
|
||||
return
|
||||
|
||||
signal(
|
||||
sig,
|
||||
signal_handler,
|
||||
dump_tree_on_sig,
|
||||
)
|
||||
# NOTE: not the above can be triggered from
|
||||
# a (xonsh) shell using:
|
||||
# kill -SIGUSR1 @$(pgrep -f '<cmd>')
|
||||
#
|
||||
# for example if you were looking to trace a `pytest` run
|
||||
# kill -SIGUSR1 @$(pgrep -f 'pytest')
|
||||
log.devx(
|
||||
'Enabling trace-trees on `SIGUSR1` '
|
||||
'since `stackscope` is installed @ \n'
|
||||
f'{stackscope!r}\n\n'
|
||||
f'With `SIGUSR1` handler\n'
|
||||
f'|_{dump_tree_on_sig}\n'
|
||||
)
|
||||
return stackscope
|
||||
|
|
|
@ -374,7 +374,7 @@ class PldRx(Struct):
|
|||
|
||||
case _:
|
||||
src_err = InternalError(
|
||||
'Unknown IPC msg ??\n\n'
|
||||
'Invalid IPC msg ??\n\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
|
||||
|
@ -499,7 +499,7 @@ async def maybe_limit_plds(
|
|||
yield None
|
||||
return
|
||||
|
||||
# sanity on scoping
|
||||
# sanity check on IPC scoping
|
||||
curr_ctx: Context = current_ipc_ctx()
|
||||
assert ctx is curr_ctx
|
||||
|
||||
|
@ -510,6 +510,8 @@ async def maybe_limit_plds(
|
|||
) as msgdec:
|
||||
yield msgdec
|
||||
|
||||
# when the applied spec is unwound/removed, the same IPC-ctx
|
||||
# should still be in scope.
|
||||
curr_ctx: Context = current_ipc_ctx()
|
||||
assert ctx is curr_ctx
|
||||
|
||||
|
@ -525,16 +527,26 @@ async def drain_to_final_msg(
|
|||
list[MsgType]
|
||||
]:
|
||||
'''
|
||||
Drain IPC msgs delivered to the underlying IPC primitive's
|
||||
rx-mem-chan (eg. `Context._rx_chan`) from the runtime in
|
||||
search for a final result or error.
|
||||
Drain IPC msgs delivered to the underlying IPC context's
|
||||
rx-mem-chan (i.e. from `Context._rx_chan`) in search for a final
|
||||
`Return` or `Error` msg.
|
||||
|
||||
The motivation here is to ideally capture errors during ctxc
|
||||
conditions where a canc-request/or local error is sent but the
|
||||
local task also excepts and enters the
|
||||
`Portal.open_context().__aexit__()` block wherein we prefer to
|
||||
capture and raise any remote error or ctxc-ack as part of the
|
||||
`ctx.result()` cleanup and teardown sequence.
|
||||
Deliver the `Return` + preceding drained msgs (`list[MsgType]`)
|
||||
as a pair unless an `Error` is found, in which unpack and raise
|
||||
it.
|
||||
|
||||
The motivation here is to always capture any remote error relayed
|
||||
by the remote peer task during a ctxc condition.
|
||||
|
||||
For eg. a ctxc-request may be sent to the peer as part of the
|
||||
local task's (request for) cancellation but then that same task
|
||||
**also errors** before executing the teardown in the
|
||||
`Portal.open_context().__aexit__()` block. In such error-on-exit
|
||||
cases we want to always capture and raise any delivered remote
|
||||
error (like an expected ctxc-ACK) as part of the final
|
||||
`ctx.wait_for_result()` teardown sequence such that the
|
||||
`Context.outcome` related state always reflect what transpired
|
||||
even after ctx closure and the `.open_context()` block exit.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
@ -572,7 +584,6 @@ async def drain_to_final_msg(
|
|||
# |_from tractor.devx._debug import pause
|
||||
# await pause()
|
||||
|
||||
|
||||
# NOTE: we get here if the far end was
|
||||
# `ContextCancelled` in 2 cases:
|
||||
# 1. we requested the cancellation and thus
|
||||
|
@ -580,13 +591,13 @@ async def drain_to_final_msg(
|
|||
# 2. WE DID NOT REQUEST that cancel and thus
|
||||
# SHOULD RAISE HERE!
|
||||
except trio.Cancelled as taskc:
|
||||
|
||||
# CASE 2: mask the local cancelled-error(s)
|
||||
# only when we are sure the remote error is
|
||||
# the source cause of this local task's
|
||||
# cancellation.
|
||||
ctx.maybe_raise(
|
||||
# TODO: when use this/
|
||||
hide_tb=hide_tb,
|
||||
# TODO: when use this?
|
||||
# from_src_exc=taskc,
|
||||
)
|
||||
|
||||
|
@ -659,7 +670,7 @@ async def drain_to_final_msg(
|
|||
# Stop()
|
||||
case Stop():
|
||||
pre_result_drained.append(msg)
|
||||
log.cancel(
|
||||
log.runtime( # normal/expected shutdown transaction
|
||||
'Remote stream terminated due to "stop" msg:\n\n'
|
||||
f'{pretty_struct.pformat(msg)}\n'
|
||||
)
|
||||
|
@ -719,13 +730,19 @@ async def drain_to_final_msg(
|
|||
pre_result_drained.append(msg)
|
||||
# It's definitely an internal error if any other
|
||||
# msg type without a`'cid'` field arrives here!
|
||||
report: str = (
|
||||
f'Invalid or unknown msg type {type(msg)!r}!?\n'
|
||||
)
|
||||
if not msg.cid:
|
||||
raise InternalError(
|
||||
'Unexpected cid-missing msg?\n\n'
|
||||
f'{msg}\n'
|
||||
report += (
|
||||
'\nWhich also has no `.cid` field?\n'
|
||||
)
|
||||
|
||||
raise RuntimeError('Unknown msg type: {msg}')
|
||||
raise MessagingError(
|
||||
report
|
||||
+
|
||||
f'\n{msg}\n'
|
||||
)
|
||||
|
||||
else:
|
||||
log.cancel(
|
||||
|
|
Loading…
Reference in New Issue