Compare commits
9 Commits
9be821a5cf
...
547b957bbf
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 547b957bbf | |
Tyler Goodlet | d216068713 | |
Tyler Goodlet | 131e3e8157 | |
Tyler Goodlet | fc95c6719f | |
Tyler Goodlet | bef3dd9e97 | |
Tyler Goodlet | e6ccfce751 | |
Tyler Goodlet | 31207f92ee | |
Tyler Goodlet | 5f8f8e98ba | |
Tyler Goodlet | b56352b0e4 |
|
@ -4,6 +4,13 @@ import time
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
|
# TODO: only import these when not running from test harness?
|
||||||
|
# can we detect `pexpect` usage maybe?
|
||||||
|
# from tractor.devx._debug import (
|
||||||
|
# get_lock,
|
||||||
|
# get_debug_req,
|
||||||
|
# )
|
||||||
|
|
||||||
|
|
||||||
def sync_pause(
|
def sync_pause(
|
||||||
use_builtin: bool = False,
|
use_builtin: bool = False,
|
||||||
|
@ -18,7 +25,13 @@ def sync_pause(
|
||||||
breakpoint(hide_tb=hide_tb)
|
breakpoint(hide_tb=hide_tb)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
# TODO: maybe for testing some kind of cm style interface
|
||||||
|
# where the `._set_trace()` call doesn't happen until block
|
||||||
|
# exit?
|
||||||
|
# assert get_lock().ctx_in_debug is None
|
||||||
|
# assert get_debug_req().repl is None
|
||||||
tractor.pause_from_sync()
|
tractor.pause_from_sync()
|
||||||
|
# assert get_debug_req().repl is None
|
||||||
|
|
||||||
if error:
|
if error:
|
||||||
raise RuntimeError('yoyo sync code error')
|
raise RuntimeError('yoyo sync code error')
|
||||||
|
@ -41,10 +54,11 @@ async def start_n_sync_pause(
|
||||||
async def main() -> None:
|
async def main() -> None:
|
||||||
async with (
|
async with (
|
||||||
tractor.open_nursery(
|
tractor.open_nursery(
|
||||||
# NOTE: required for pausing from sync funcs
|
|
||||||
maybe_enable_greenback=True,
|
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
# loglevel='cancel',
|
maybe_enable_greenback=True,
|
||||||
|
enable_stack_on_sig=True,
|
||||||
|
# loglevel='warning',
|
||||||
|
# loglevel='devx',
|
||||||
) as an,
|
) as an,
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
|
@ -138,7 +152,9 @@ async def main() -> None:
|
||||||
# the case 2. from above still exists!
|
# the case 2. from above still exists!
|
||||||
use_builtin=True,
|
use_builtin=True,
|
||||||
),
|
),
|
||||||
abandon_on_cancel=False,
|
# TODO: with this `False` we can hang!??!
|
||||||
|
# abandon_on_cancel=False,
|
||||||
|
abandon_on_cancel=True,
|
||||||
thread_name='inline_root_bg_thread',
|
thread_name='inline_root_bg_thread',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,7 @@ async def main(service_name):
|
||||||
async with tractor.open_nursery() as an:
|
async with tractor.open_nursery() as an:
|
||||||
await an.start_actor(service_name)
|
await an.start_actor(service_name)
|
||||||
|
|
||||||
async with tractor.get_arbiter('127.0.0.1', 1616) as portal:
|
async with tractor.get_registry('127.0.0.1', 1616) as portal:
|
||||||
print(f"Arbiter is listening on {portal.channel}")
|
print(f"Arbiter is listening on {portal.channel}")
|
||||||
|
|
||||||
async with tractor.wait_for_actor(service_name) as sockaddr:
|
async with tractor.wait_for_actor(service_name) as sockaddr:
|
||||||
|
|
|
@ -0,0 +1,167 @@
|
||||||
|
'''
|
||||||
|
`tractor.devx.*` tooling sub-pkg test space.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from typing import (
|
||||||
|
Callable,
|
||||||
|
)
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from pexpect.exceptions import (
|
||||||
|
TIMEOUT,
|
||||||
|
)
|
||||||
|
from tractor._testing import (
|
||||||
|
mk_cmd,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def spawn(
|
||||||
|
start_method,
|
||||||
|
testdir: pytest.Testdir,
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
|
||||||
|
) -> Callable[[str], None]:
|
||||||
|
'''
|
||||||
|
Use the `pexpect` module shipped via `testdir.spawn()` to
|
||||||
|
run an `./examples/..` script by name.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if start_method != 'trio':
|
||||||
|
pytest.skip(
|
||||||
|
'`pexpect` based tests only supported on `trio` backend'
|
||||||
|
)
|
||||||
|
|
||||||
|
def _spawn(
|
||||||
|
cmd: str,
|
||||||
|
**mkcmd_kwargs,
|
||||||
|
):
|
||||||
|
return testdir.spawn(
|
||||||
|
cmd=mk_cmd(
|
||||||
|
cmd,
|
||||||
|
**mkcmd_kwargs,
|
||||||
|
),
|
||||||
|
expect_timeout=3,
|
||||||
|
)
|
||||||
|
|
||||||
|
# such that test-dep can pass input script name.
|
||||||
|
return _spawn
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(
|
||||||
|
params=[False, True],
|
||||||
|
ids='ctl-c={}'.format,
|
||||||
|
)
|
||||||
|
def ctlc(
|
||||||
|
request,
|
||||||
|
ci_env: bool,
|
||||||
|
|
||||||
|
) -> bool:
|
||||||
|
|
||||||
|
use_ctlc = request.param
|
||||||
|
|
||||||
|
node = request.node
|
||||||
|
markers = node.own_markers
|
||||||
|
for mark in markers:
|
||||||
|
if mark.name == 'has_nested_actors':
|
||||||
|
pytest.skip(
|
||||||
|
f'Test {node} has nested actors and fails with Ctrl-C.\n'
|
||||||
|
f'The test can sometimes run fine locally but until'
|
||||||
|
' we solve' 'this issue this CI test will be xfail:\n'
|
||||||
|
'https://github.com/goodboy/tractor/issues/320'
|
||||||
|
)
|
||||||
|
|
||||||
|
if use_ctlc:
|
||||||
|
# XXX: disable pygments highlighting for auto-tests
|
||||||
|
# since some envs (like actions CI) will struggle
|
||||||
|
# the the added color-char encoding..
|
||||||
|
from tractor.devx._debug import TractorConfig
|
||||||
|
TractorConfig.use_pygements = False
|
||||||
|
|
||||||
|
yield use_ctlc
|
||||||
|
|
||||||
|
|
||||||
|
def expect(
|
||||||
|
child,
|
||||||
|
|
||||||
|
# normally a `pdb` prompt by default
|
||||||
|
patt: str,
|
||||||
|
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Expect wrapper that prints last seen console
|
||||||
|
data before failing.
|
||||||
|
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
child.expect(
|
||||||
|
patt,
|
||||||
|
**kwargs,
|
||||||
|
)
|
||||||
|
except TIMEOUT:
|
||||||
|
before = str(child.before.decode())
|
||||||
|
print(before)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def in_prompt_msg(
|
||||||
|
prompt: str,
|
||||||
|
parts: list[str],
|
||||||
|
|
||||||
|
pause_on_false: bool = False,
|
||||||
|
err_on_false: bool = False,
|
||||||
|
print_prompt_on_false: bool = True,
|
||||||
|
|
||||||
|
) -> bool:
|
||||||
|
'''
|
||||||
|
Predicate check if (the prompt's) std-streams output has all
|
||||||
|
`str`-parts in it.
|
||||||
|
|
||||||
|
Can be used in test asserts for bulk matching expected
|
||||||
|
log/REPL output for a given `pdb` interact point.
|
||||||
|
|
||||||
|
'''
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
|
for part in parts:
|
||||||
|
if part not in prompt:
|
||||||
|
if pause_on_false:
|
||||||
|
import pdbp
|
||||||
|
pdbp.set_trace()
|
||||||
|
|
||||||
|
if print_prompt_on_false:
|
||||||
|
print(prompt)
|
||||||
|
|
||||||
|
if err_on_false:
|
||||||
|
raise ValueError(
|
||||||
|
f'Could not find pattern: {part!r} in `before` output?'
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: todo support terminal color-chars stripping so we can match
|
||||||
|
# against call stack frame output from the the 'll' command the like!
|
||||||
|
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
|
||||||
|
def assert_before(
|
||||||
|
child,
|
||||||
|
patts: list[str],
|
||||||
|
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
|
# as in before the prompt end
|
||||||
|
before: str = str(child.before.decode())
|
||||||
|
assert in_prompt_msg(
|
||||||
|
prompt=before,
|
||||||
|
parts=patts,
|
||||||
|
|
||||||
|
# since this is an "assert" helper ;)
|
||||||
|
err_on_false=True,
|
||||||
|
**kwargs
|
||||||
|
)
|
|
@ -13,11 +13,9 @@ TODO:
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import itertools
|
import itertools
|
||||||
import platform
|
import platform
|
||||||
import pathlib
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import pexpect
|
|
||||||
from pexpect.exceptions import (
|
from pexpect.exceptions import (
|
||||||
TIMEOUT,
|
TIMEOUT,
|
||||||
EOF,
|
EOF,
|
||||||
|
@ -28,12 +26,14 @@ from tractor.devx._debug import (
|
||||||
_crash_msg,
|
_crash_msg,
|
||||||
_repl_fail_msg,
|
_repl_fail_msg,
|
||||||
)
|
)
|
||||||
from tractor._testing import (
|
|
||||||
examples_dir,
|
|
||||||
)
|
|
||||||
from conftest import (
|
from conftest import (
|
||||||
_ci_env,
|
_ci_env,
|
||||||
)
|
)
|
||||||
|
from .conftest import (
|
||||||
|
expect,
|
||||||
|
in_prompt_msg,
|
||||||
|
assert_before,
|
||||||
|
)
|
||||||
|
|
||||||
# TODO: The next great debugger audit could be done by you!
|
# TODO: The next great debugger audit could be done by you!
|
||||||
# - recurrent entry to breakpoint() from single actor *after* and an
|
# - recurrent entry to breakpoint() from single actor *after* and an
|
||||||
|
@ -52,15 +52,6 @@ if platform.system() == 'Windows':
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def mk_cmd(ex_name: str) -> str:
|
|
||||||
'''
|
|
||||||
Generate a command suitable to pass to ``pexpect.spawn()``.
|
|
||||||
|
|
||||||
'''
|
|
||||||
script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py'
|
|
||||||
return ' '.join(['python', str(script_path)])
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: was trying to this xfail style but some weird bug i see in CI
|
# TODO: was trying to this xfail style but some weird bug i see in CI
|
||||||
# that's happening at collect time.. pretty soon gonna dump actions i'm
|
# that's happening at collect time.. pretty soon gonna dump actions i'm
|
||||||
# thinkin...
|
# thinkin...
|
||||||
|
@ -79,142 +70,9 @@ has_nested_actors = pytest.mark.has_nested_actors
|
||||||
# )
|
# )
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def spawn(
|
|
||||||
start_method,
|
|
||||||
testdir,
|
|
||||||
reg_addr,
|
|
||||||
) -> 'pexpect.spawn':
|
|
||||||
|
|
||||||
if start_method != 'trio':
|
|
||||||
pytest.skip(
|
|
||||||
"Debugger tests are only supported on the trio backend"
|
|
||||||
)
|
|
||||||
|
|
||||||
def _spawn(cmd):
|
|
||||||
return testdir.spawn(
|
|
||||||
cmd=mk_cmd(cmd),
|
|
||||||
expect_timeout=3,
|
|
||||||
)
|
|
||||||
|
|
||||||
return _spawn
|
|
||||||
|
|
||||||
|
|
||||||
PROMPT = r"\(Pdb\+\)"
|
PROMPT = r"\(Pdb\+\)"
|
||||||
|
|
||||||
|
|
||||||
def expect(
|
|
||||||
child,
|
|
||||||
|
|
||||||
# prompt by default
|
|
||||||
patt: str = PROMPT,
|
|
||||||
|
|
||||||
**kwargs,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
'''
|
|
||||||
Expect wrapper that prints last seen console
|
|
||||||
data before failing.
|
|
||||||
|
|
||||||
'''
|
|
||||||
try:
|
|
||||||
child.expect(
|
|
||||||
patt,
|
|
||||||
**kwargs,
|
|
||||||
)
|
|
||||||
except TIMEOUT:
|
|
||||||
before = str(child.before.decode())
|
|
||||||
print(before)
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def in_prompt_msg(
|
|
||||||
prompt: str,
|
|
||||||
parts: list[str],
|
|
||||||
|
|
||||||
pause_on_false: bool = False,
|
|
||||||
print_prompt_on_false: bool = True,
|
|
||||||
|
|
||||||
) -> bool:
|
|
||||||
'''
|
|
||||||
Predicate check if (the prompt's) std-streams output has all
|
|
||||||
`str`-parts in it.
|
|
||||||
|
|
||||||
Can be used in test asserts for bulk matching expected
|
|
||||||
log/REPL output for a given `pdb` interact point.
|
|
||||||
|
|
||||||
'''
|
|
||||||
__tracebackhide__: bool = False
|
|
||||||
|
|
||||||
for part in parts:
|
|
||||||
if part not in prompt:
|
|
||||||
if pause_on_false:
|
|
||||||
import pdbp
|
|
||||||
pdbp.set_trace()
|
|
||||||
|
|
||||||
if print_prompt_on_false:
|
|
||||||
print(prompt)
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: todo support terminal color-chars stripping so we can match
|
|
||||||
# against call stack frame output from the the 'll' command the like!
|
|
||||||
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
|
|
||||||
def assert_before(
|
|
||||||
child,
|
|
||||||
patts: list[str],
|
|
||||||
|
|
||||||
**kwargs,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
__tracebackhide__: bool = False
|
|
||||||
|
|
||||||
# as in before the prompt end
|
|
||||||
before: str = str(child.before.decode())
|
|
||||||
assert in_prompt_msg(
|
|
||||||
prompt=before,
|
|
||||||
parts=patts,
|
|
||||||
|
|
||||||
**kwargs
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(
|
|
||||||
params=[False, True],
|
|
||||||
ids='ctl-c={}'.format,
|
|
||||||
)
|
|
||||||
def ctlc(
|
|
||||||
request,
|
|
||||||
ci_env: bool,
|
|
||||||
|
|
||||||
) -> bool:
|
|
||||||
|
|
||||||
use_ctlc = request.param
|
|
||||||
|
|
||||||
node = request.node
|
|
||||||
markers = node.own_markers
|
|
||||||
for mark in markers:
|
|
||||||
if mark.name == 'has_nested_actors':
|
|
||||||
pytest.skip(
|
|
||||||
f'Test {node} has nested actors and fails with Ctrl-C.\n'
|
|
||||||
f'The test can sometimes run fine locally but until'
|
|
||||||
' we solve' 'this issue this CI test will be xfail:\n'
|
|
||||||
'https://github.com/goodboy/tractor/issues/320'
|
|
||||||
)
|
|
||||||
|
|
||||||
if use_ctlc:
|
|
||||||
# XXX: disable pygments highlighting for auto-tests
|
|
||||||
# since some envs (like actions CI) will struggle
|
|
||||||
# the the added color-char encoding..
|
|
||||||
from tractor.devx._debug import TractorConfig
|
|
||||||
TractorConfig.use_pygements = False
|
|
||||||
|
|
||||||
yield use_ctlc
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'user_in_out',
|
'user_in_out',
|
||||||
[
|
[
|
||||||
|
@ -279,7 +137,7 @@ def test_root_actor_bp(spawn, user_in_out):
|
||||||
child.expect('\r\n')
|
child.expect('\r\n')
|
||||||
|
|
||||||
# process should exit
|
# process should exit
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
if expect_err_str is None:
|
if expect_err_str is None:
|
||||||
assert 'Error' not in str(child.before)
|
assert 'Error' not in str(child.before)
|
||||||
|
@ -299,7 +157,9 @@ def do_ctlc(
|
||||||
# needs some further investigation potentially...
|
# needs some further investigation potentially...
|
||||||
expect_prompt: bool = not _ci_env,
|
expect_prompt: bool = not _ci_env,
|
||||||
|
|
||||||
) -> None:
|
) -> str|None:
|
||||||
|
|
||||||
|
before: str|None = None
|
||||||
|
|
||||||
# make sure ctl-c sends don't do anything but repeat output
|
# make sure ctl-c sends don't do anything but repeat output
|
||||||
for _ in range(count):
|
for _ in range(count):
|
||||||
|
@ -309,15 +169,18 @@ def do_ctlc(
|
||||||
# TODO: figure out why this makes CI fail..
|
# TODO: figure out why this makes CI fail..
|
||||||
# if you run this test manually it works just fine..
|
# if you run this test manually it works just fine..
|
||||||
if expect_prompt:
|
if expect_prompt:
|
||||||
before = str(child.before.decode())
|
|
||||||
time.sleep(delay)
|
time.sleep(delay)
|
||||||
child.expect(PROMPT)
|
child.expect(PROMPT)
|
||||||
|
before = str(child.before.decode())
|
||||||
time.sleep(delay)
|
time.sleep(delay)
|
||||||
|
|
||||||
if patt:
|
if patt:
|
||||||
# should see the last line on console
|
# should see the last line on console
|
||||||
assert patt in before
|
assert patt in before
|
||||||
|
|
||||||
|
# return the console content up to the final prompt
|
||||||
|
return before
|
||||||
|
|
||||||
|
|
||||||
def test_root_actor_bp_forever(
|
def test_root_actor_bp_forever(
|
||||||
spawn,
|
spawn,
|
||||||
|
@ -358,7 +221,7 @@ def test_root_actor_bp_forever(
|
||||||
|
|
||||||
# quit out of the loop
|
# quit out of the loop
|
||||||
child.sendline('q')
|
child.sendline('q')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
@ -423,7 +286,7 @@ def test_subactor_error(
|
||||||
child.expect('\r\n')
|
child.expect('\r\n')
|
||||||
|
|
||||||
# process should exit
|
# process should exit
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
|
|
||||||
def test_subactor_breakpoint(
|
def test_subactor_breakpoint(
|
||||||
|
@ -486,7 +349,7 @@ def test_subactor_breakpoint(
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
|
||||||
# process should exit
|
# process should exit
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
assert in_prompt_msg(
|
assert in_prompt_msg(
|
||||||
|
@ -629,7 +492,7 @@ def test_multi_subactors(
|
||||||
|
|
||||||
# process should exit
|
# process should exit
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
# repeat of previous multierror for final output
|
# repeat of previous multierror for final output
|
||||||
assert_before(child, [
|
assert_before(child, [
|
||||||
|
@ -769,7 +632,7 @@ def test_multi_daemon_subactors(
|
||||||
)
|
)
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
|
|
||||||
@has_nested_actors
|
@has_nested_actors
|
||||||
|
@ -845,7 +708,7 @@ def test_multi_subactors_root_errors(
|
||||||
])
|
])
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
assert_before(child, [
|
assert_before(child, [
|
||||||
# "Attaching to pdb in crashed actor: ('root'",
|
# "Attaching to pdb in crashed actor: ('root'",
|
||||||
|
@ -975,7 +838,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
|
||||||
|
|
||||||
for i in range(3):
|
for i in range(3):
|
||||||
try:
|
try:
|
||||||
child.expect(pexpect.EOF, timeout=0.5)
|
child.expect(EOF, timeout=0.5)
|
||||||
break
|
break
|
||||||
except TIMEOUT:
|
except TIMEOUT:
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
@ -1017,7 +880,7 @@ def test_root_cancels_child_context_during_startup(
|
||||||
do_ctlc(child)
|
do_ctlc(child)
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
|
|
||||||
def test_different_debug_mode_per_actor(
|
def test_different_debug_mode_per_actor(
|
||||||
|
@ -1038,7 +901,7 @@ def test_different_debug_mode_per_actor(
|
||||||
do_ctlc(child)
|
do_ctlc(child)
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
|
||||||
|
@ -1085,10 +948,10 @@ def test_pause_from_sync(
|
||||||
)
|
)
|
||||||
if ctlc:
|
if ctlc:
|
||||||
do_ctlc(child)
|
do_ctlc(child)
|
||||||
|
# ^NOTE^ subactor not spawned yet; don't need extra delay.
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
|
||||||
|
|
||||||
# first `await tractor.pause()` inside `p.open_context()` body
|
# first `await tractor.pause()` inside `p.open_context()` body
|
||||||
child.expect(PROMPT)
|
child.expect(PROMPT)
|
||||||
|
|
||||||
|
@ -1109,7 +972,27 @@ def test_pause_from_sync(
|
||||||
)
|
)
|
||||||
|
|
||||||
if ctlc:
|
if ctlc:
|
||||||
do_ctlc(child)
|
do_ctlc(
|
||||||
|
child,
|
||||||
|
# NOTE: setting this to 0 (or some other sufficient
|
||||||
|
# small val) can cause the test to fail since the
|
||||||
|
# `subactor` suffers a race where the root/parent
|
||||||
|
# sends an actor-cancel prior to it hitting its pause
|
||||||
|
# point; by def the value is 0.1
|
||||||
|
delay=0.4,
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX, fwiw without a brief sleep here the SIGINT might actually
|
||||||
|
# trigger "subactor" cancellation by its parent before the
|
||||||
|
# shield-handler is engaged.
|
||||||
|
#
|
||||||
|
# => similar to the `delay` input to `do_ctlc()` below, setting
|
||||||
|
# this too low can cause the test to fail since the `subactor`
|
||||||
|
# suffers a race where the root/parent sends an actor-cancel
|
||||||
|
# prior to the context task hitting its pause point (and thus
|
||||||
|
# engaging the `sigint_shield()` handler in time); this value
|
||||||
|
# seems be good enuf?
|
||||||
|
time.sleep(0.6)
|
||||||
|
|
||||||
# one of the bg thread or subactor should have
|
# one of the bg thread or subactor should have
|
||||||
# `Lock.acquire()`-ed
|
# `Lock.acquire()`-ed
|
||||||
|
@ -1128,32 +1011,48 @@ def test_pause_from_sync(
|
||||||
"('root'",
|
"('root'",
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
conts: int = 0 # for debugging below matching logic on failure
|
||||||
while attach_patts:
|
while attach_patts:
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
conts += 1
|
||||||
child.expect(PROMPT)
|
child.expect(PROMPT)
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
for key in attach_patts.copy():
|
for key in attach_patts:
|
||||||
if key in before:
|
if key in before:
|
||||||
|
attach_key: str = key
|
||||||
expected_patts: str = attach_patts.pop(key)
|
expected_patts: str = attach_patts.pop(key)
|
||||||
assert_before(
|
assert_before(
|
||||||
child,
|
child,
|
||||||
[_pause_msg] + expected_patts
|
[_pause_msg]
|
||||||
|
+
|
||||||
|
expected_patts
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
|
else:
|
||||||
|
pytest.fail(
|
||||||
|
f'No keys found?\n\n'
|
||||||
|
f'{attach_patts.keys()}\n\n'
|
||||||
|
f'{before}\n'
|
||||||
|
)
|
||||||
|
|
||||||
# ensure no other task/threads engaged a REPL
|
# ensure no other task/threads engaged a REPL
|
||||||
# at the same time as the one that was detected above.
|
# at the same time as the one that was detected above.
|
||||||
for key, other_patts in attach_patts.items():
|
for key, other_patts in attach_patts.copy().items():
|
||||||
assert not in_prompt_msg(
|
assert not in_prompt_msg(
|
||||||
before,
|
before,
|
||||||
other_patts,
|
other_patts,
|
||||||
)
|
)
|
||||||
|
|
||||||
if ctlc:
|
if ctlc:
|
||||||
do_ctlc(child)
|
do_ctlc(
|
||||||
|
child,
|
||||||
|
patt=attach_key,
|
||||||
|
# NOTE same as comment above
|
||||||
|
delay=0.4,
|
||||||
|
)
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
|
|
||||||
def test_post_mortem_api(
|
def test_post_mortem_api(
|
||||||
|
@ -1258,7 +1157,7 @@ def test_post_mortem_api(
|
||||||
# )
|
# )
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
|
|
||||||
def test_shield_pause(
|
def test_shield_pause(
|
||||||
|
@ -1333,7 +1232,7 @@ def test_shield_pause(
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(EOF)
|
||||||
|
|
||||||
|
|
||||||
# TODO: better error for "non-ideal" usage from the root actor.
|
# TODO: better error for "non-ideal" usage from the root actor.
|
|
@ -0,0 +1,120 @@
|
||||||
|
'''
|
||||||
|
That "native" runtime-hackin toolset better be dang useful!
|
||||||
|
|
||||||
|
Verify the funtion of a variety of "developer-experience" tools we
|
||||||
|
offer from the `.devx` sub-pkg:
|
||||||
|
|
||||||
|
- use of the lovely `stackscope` for dumping actor `trio`-task trees
|
||||||
|
during operation and hangs.
|
||||||
|
|
||||||
|
TODO:
|
||||||
|
- demonstration of `CallerInfo` call stack frame filtering such that
|
||||||
|
for logging and REPL purposes a user sees exactly the layers needed
|
||||||
|
when debugging a problem inside the stack vs. in their app.
|
||||||
|
|
||||||
|
'''
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
|
||||||
|
from .conftest import (
|
||||||
|
expect,
|
||||||
|
assert_before,
|
||||||
|
# in_prompt_msg,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_shield_pause(
|
||||||
|
spawn,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify the `tractor.pause()/.post_mortem()` API works inside an
|
||||||
|
already cancelled `trio.CancelScope` and that you can step to the
|
||||||
|
next checkpoint wherein the cancelled will get raised.
|
||||||
|
|
||||||
|
'''
|
||||||
|
child = spawn(
|
||||||
|
'shield_hang_in_sub'
|
||||||
|
)
|
||||||
|
expect(
|
||||||
|
child,
|
||||||
|
'Yo my child hanging..?',
|
||||||
|
)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
'Entering shield sleep..',
|
||||||
|
'Enabling trace-trees on `SIGUSR1` since `stackscope` is installed @',
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
print(
|
||||||
|
'Sending SIGUSR1 to see a tree-trace!',
|
||||||
|
)
|
||||||
|
os.kill(
|
||||||
|
child.pid,
|
||||||
|
signal.SIGUSR1,
|
||||||
|
)
|
||||||
|
expect(
|
||||||
|
child,
|
||||||
|
# end-of-tree delimiter
|
||||||
|
"------ \('root', ",
|
||||||
|
)
|
||||||
|
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
'Trying to dump `stackscope` tree..',
|
||||||
|
'Dumping `stackscope` tree for actor',
|
||||||
|
"('root'", # uid line
|
||||||
|
|
||||||
|
# parent block point (non-shielded)
|
||||||
|
'await trio.sleep_forever() # in root',
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
# expect(
|
||||||
|
# child,
|
||||||
|
# # relay to the sub should be reported
|
||||||
|
# 'Relaying `SIGUSR1`[10] to sub-actor',
|
||||||
|
# )
|
||||||
|
|
||||||
|
expect(
|
||||||
|
child,
|
||||||
|
# end-of-tree delimiter
|
||||||
|
"------ \('hanger', ",
|
||||||
|
)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
# relay to the sub should be reported
|
||||||
|
'Relaying `SIGUSR1`[10] to sub-actor',
|
||||||
|
|
||||||
|
"('hanger'", # uid line
|
||||||
|
|
||||||
|
# hanger LOC where it's shield-halted
|
||||||
|
'await trio.sleep_forever() # in subactor',
|
||||||
|
]
|
||||||
|
)
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
|
# simulate the user sending a ctl-c to the hanging program.
|
||||||
|
# this should result in the terminator kicking in since
|
||||||
|
# the sub is shield blocking and can't respond to SIGINT.
|
||||||
|
os.kill(
|
||||||
|
child.pid,
|
||||||
|
signal.SIGINT,
|
||||||
|
)
|
||||||
|
expect(
|
||||||
|
child,
|
||||||
|
'Shutting down actor runtime',
|
||||||
|
timeout=6,
|
||||||
|
)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
'raise KeyboardInterrupt',
|
||||||
|
# 'Shutting down actor runtime',
|
||||||
|
'#T-800 deployed to collect zombie B0',
|
||||||
|
"'--uid', \"('hanger',",
|
||||||
|
]
|
||||||
|
)
|
|
@ -91,7 +91,8 @@ def test_ipc_channel_break_during_stream(
|
||||||
|
|
||||||
# non-`trio` spawners should never hit the hang condition that
|
# non-`trio` spawners should never hit the hang condition that
|
||||||
# requires the user to do ctl-c to cancel the actor tree.
|
# requires the user to do ctl-c to cancel the actor tree.
|
||||||
expect_final_exc = trio.ClosedResourceError
|
# expect_final_exc = trio.ClosedResourceError
|
||||||
|
expect_final_exc = tractor.TransportClosed
|
||||||
|
|
||||||
mod: ModuleType = import_path(
|
mod: ModuleType = import_path(
|
||||||
examples_dir() / 'advanced_faults'
|
examples_dir() / 'advanced_faults'
|
||||||
|
@ -157,7 +158,7 @@ def test_ipc_channel_break_during_stream(
|
||||||
if pre_aclose_msgstream:
|
if pre_aclose_msgstream:
|
||||||
expect_final_exc = KeyboardInterrupt
|
expect_final_exc = KeyboardInterrupt
|
||||||
|
|
||||||
# NOTE when the parent IPC side dies (even if the child's does as well
|
# NOTE when the parent IPC side dies (even if the child does as well
|
||||||
# but the child fails BEFORE the parent) we always expect the
|
# but the child fails BEFORE the parent) we always expect the
|
||||||
# IPC layer to raise a closed-resource, NEVER do we expect
|
# IPC layer to raise a closed-resource, NEVER do we expect
|
||||||
# a stop msg since the parent-side ctx apis will error out
|
# a stop msg since the parent-side ctx apis will error out
|
||||||
|
@ -169,7 +170,8 @@ def test_ipc_channel_break_during_stream(
|
||||||
and
|
and
|
||||||
ipc_break['break_child_ipc_after'] is False
|
ipc_break['break_child_ipc_after'] is False
|
||||||
):
|
):
|
||||||
expect_final_exc = trio.ClosedResourceError
|
# expect_final_exc = trio.ClosedResourceError
|
||||||
|
expect_final_exc = tractor.TransportClosed
|
||||||
|
|
||||||
# BOTH but, PARENT breaks FIRST
|
# BOTH but, PARENT breaks FIRST
|
||||||
elif (
|
elif (
|
||||||
|
@ -180,7 +182,8 @@ def test_ipc_channel_break_during_stream(
|
||||||
ipc_break['break_parent_ipc_after']
|
ipc_break['break_parent_ipc_after']
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
expect_final_exc = trio.ClosedResourceError
|
# expect_final_exc = trio.ClosedResourceError
|
||||||
|
expect_final_exc = tractor.TransportClosed
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(
|
||||||
expected_exception=(
|
expected_exception=(
|
||||||
|
@ -199,8 +202,8 @@ def test_ipc_channel_break_during_stream(
|
||||||
**ipc_break,
|
**ipc_break,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
except KeyboardInterrupt as kbi:
|
except KeyboardInterrupt as _kbi:
|
||||||
_err = kbi
|
kbi = _kbi
|
||||||
if expect_final_exc is not KeyboardInterrupt:
|
if expect_final_exc is not KeyboardInterrupt:
|
||||||
pytest.fail(
|
pytest.fail(
|
||||||
'Rxed unexpected KBI !?\n'
|
'Rxed unexpected KBI !?\n'
|
||||||
|
@ -209,6 +212,21 @@ def test_ipc_channel_break_during_stream(
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
except tractor.TransportClosed as _tc:
|
||||||
|
tc = _tc
|
||||||
|
if expect_final_exc is KeyboardInterrupt:
|
||||||
|
pytest.fail(
|
||||||
|
'Unexpected transport failure !?\n'
|
||||||
|
f'{repr(tc)}'
|
||||||
|
)
|
||||||
|
cause: Exception = tc.__cause__
|
||||||
|
assert (
|
||||||
|
type(cause) is trio.ClosedResourceError
|
||||||
|
and
|
||||||
|
cause.args[0] == 'another task closed this fd'
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
# get raw instance from pytest wrapper
|
# get raw instance from pytest wrapper
|
||||||
value = excinfo.value
|
value = excinfo.value
|
||||||
if isinstance(value, ExceptionGroup):
|
if isinstance(value, ExceptionGroup):
|
||||||
|
|
|
@ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr):
|
||||||
portal = await n.start_actor('actor', enable_modules=[__name__])
|
portal = await n.start_actor('actor', enable_modules=[__name__])
|
||||||
uid = portal.channel.uid
|
uid = portal.channel.uid
|
||||||
|
|
||||||
async with tractor.get_arbiter(*reg_addr) as aportal:
|
async with tractor.get_registry(*reg_addr) as aportal:
|
||||||
# this local actor should be the arbiter
|
# this local actor should be the arbiter
|
||||||
assert actor is aportal.actor
|
assert actor is aportal.actor
|
||||||
|
|
||||||
|
@ -160,7 +160,7 @@ async def spawn_and_check_registry(
|
||||||
async with tractor.open_root_actor(
|
async with tractor.open_root_actor(
|
||||||
registry_addrs=[reg_addr],
|
registry_addrs=[reg_addr],
|
||||||
):
|
):
|
||||||
async with tractor.get_arbiter(*reg_addr) as portal:
|
async with tractor.get_registry(*reg_addr) as portal:
|
||||||
# runtime needs to be up to call this
|
# runtime needs to be up to call this
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
|
|
||||||
|
@ -298,7 +298,7 @@ async def close_chans_before_nursery(
|
||||||
async with tractor.open_root_actor(
|
async with tractor.open_root_actor(
|
||||||
registry_addrs=[reg_addr],
|
registry_addrs=[reg_addr],
|
||||||
):
|
):
|
||||||
async with tractor.get_arbiter(*reg_addr) as aportal:
|
async with tractor.get_registry(*reg_addr) as aportal:
|
||||||
try:
|
try:
|
||||||
get_reg = partial(unpack_reg, aportal)
|
get_reg = partial(unpack_reg, aportal)
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr):
|
||||||
"Verify waiting on the arbiter to register itself using a local portal."
|
"Verify waiting on the arbiter to register itself using a local portal."
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
assert actor.is_arbiter
|
assert actor.is_arbiter
|
||||||
async with tractor.get_arbiter(*reg_addr) as portal:
|
async with tractor.get_registry(*reg_addr) as portal:
|
||||||
assert isinstance(portal, tractor._portal.LocalPortal)
|
assert isinstance(portal, tractor._portal.LocalPortal)
|
||||||
|
|
||||||
with trio.fail_after(0.2):
|
with trio.fail_after(0.2):
|
||||||
|
|
|
@ -32,7 +32,7 @@ def test_abort_on_sigint(daemon):
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_cancel_remote_arbiter(daemon, reg_addr):
|
async def test_cancel_remote_arbiter(daemon, reg_addr):
|
||||||
assert not tractor.current_actor().is_arbiter
|
assert not tractor.current_actor().is_arbiter
|
||||||
async with tractor.get_arbiter(*reg_addr) as portal:
|
async with tractor.get_registry(*reg_addr) as portal:
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
@ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
|
||||||
|
|
||||||
# no arbiter socket should exist
|
# no arbiter socket should exist
|
||||||
with pytest.raises(OSError):
|
with pytest.raises(OSError):
|
||||||
async with tractor.get_arbiter(*reg_addr) as portal:
|
async with tractor.get_registry(*reg_addr) as portal:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ from ._streaming import (
|
||||||
stream as stream,
|
stream as stream,
|
||||||
)
|
)
|
||||||
from ._discovery import (
|
from ._discovery import (
|
||||||
get_arbiter as get_arbiter,
|
get_registry as get_registry,
|
||||||
find_actor as find_actor,
|
find_actor as find_actor,
|
||||||
wait_for_actor as wait_for_actor,
|
wait_for_actor as wait_for_actor,
|
||||||
query_actor as query_actor,
|
query_actor as query_actor,
|
||||||
|
|
|
@ -2376,8 +2376,9 @@ async def open_context_from_portal(
|
||||||
and ctx.cancel_acked
|
and ctx.cancel_acked
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Context cancelled by {ctx.side!r}-side task\n'
|
f'Context cancelled by local {ctx.side!r}-side task\n'
|
||||||
f'|_{ctx._task}\n\n'
|
f'c)>\n'
|
||||||
|
f' |_{ctx._task}\n\n'
|
||||||
f'{repr(scope_err)}\n'
|
f'{repr(scope_err)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -2393,8 +2394,10 @@ async def open_context_from_portal(
|
||||||
# type_only=True,
|
# type_only=True,
|
||||||
)
|
)
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Context terminated due to local {ctx.side!r}-side error:\n\n'
|
f'Context terminated due to {ctx.side!r}-side\n\n'
|
||||||
f'{ctx.chan.uid} => {outcome_str}\n'
|
# TODO: do an x)> on err and c)> only for ctxc?
|
||||||
|
f'c)> {outcome_str}\n'
|
||||||
|
f' |_{ctx.repr_rpc}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# FINALLY, remove the context from runtime tracking and
|
# FINALLY, remove the context from runtime tracking and
|
||||||
|
|
|
@ -26,8 +26,8 @@ from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
import warnings
|
|
||||||
|
|
||||||
|
from tractor.log import get_logger
|
||||||
from .trionics import gather_contexts
|
from .trionics import gather_contexts
|
||||||
from ._ipc import _connect_chan, Channel
|
from ._ipc import _connect_chan, Channel
|
||||||
from ._portal import (
|
from ._portal import (
|
||||||
|
@ -40,11 +40,13 @@ from ._state import (
|
||||||
_runtime_vars,
|
_runtime_vars,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
|
|
||||||
|
|
||||||
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def get_registry(
|
async def get_registry(
|
||||||
host: str,
|
host: str,
|
||||||
|
@ -56,14 +58,12 @@ async def get_registry(
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Return a portal instance connected to a local or remote
|
Return a portal instance connected to a local or remote
|
||||||
arbiter.
|
registry-service actor; if a connection already exists re-use it
|
||||||
|
(presumably to call a `.register_actor()` registry runtime RPC
|
||||||
|
ep).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
|
|
||||||
if not actor:
|
|
||||||
raise RuntimeError("No actor instance has been defined yet?")
|
|
||||||
|
|
||||||
if actor.is_registrar:
|
if actor.is_registrar:
|
||||||
# we're already the arbiter
|
# we're already the arbiter
|
||||||
# (likely a re-entrant call from the arbiter actor)
|
# (likely a re-entrant call from the arbiter actor)
|
||||||
|
@ -72,6 +72,8 @@ async def get_registry(
|
||||||
Channel((host, port))
|
Channel((host, port))
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
# TODO: try to look pre-existing connection from
|
||||||
|
# `Actor._peers` and use it instead?
|
||||||
async with (
|
async with (
|
||||||
_connect_chan(host, port) as chan,
|
_connect_chan(host, port) as chan,
|
||||||
open_portal(chan) as regstr_ptl,
|
open_portal(chan) as regstr_ptl,
|
||||||
|
@ -80,19 +82,6 @@ async def get_registry(
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: deprecate and this remove _arbiter form!
|
|
||||||
@acm
|
|
||||||
async def get_arbiter(*args, **kwargs):
|
|
||||||
warnings.warn(
|
|
||||||
'`tractor.get_arbiter()` is now deprecated!\n'
|
|
||||||
'Use `.get_registry()` instead!',
|
|
||||||
DeprecationWarning,
|
|
||||||
stacklevel=2,
|
|
||||||
)
|
|
||||||
async with get_registry(*args, **kwargs) as to_yield:
|
|
||||||
yield to_yield
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def get_root(
|
async def get_root(
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
@ -110,22 +99,53 @@ async def get_root(
|
||||||
yield portal
|
yield portal
|
||||||
|
|
||||||
|
|
||||||
|
def get_peer_by_name(
|
||||||
|
name: str,
|
||||||
|
# uuid: str|None = None,
|
||||||
|
|
||||||
|
) -> list[Channel]|None: # at least 1
|
||||||
|
'''
|
||||||
|
Scan for an existing connection (set) to a named actor
|
||||||
|
and return any channels from `Actor._peers`.
|
||||||
|
|
||||||
|
This is an optimization method over querying the registrar for
|
||||||
|
the same info.
|
||||||
|
|
||||||
|
'''
|
||||||
|
actor: Actor = current_actor()
|
||||||
|
to_scan: dict[tuple, list[Channel]] = actor._peers.copy()
|
||||||
|
pchan: Channel|None = actor._parent_chan
|
||||||
|
if pchan:
|
||||||
|
to_scan[pchan.uid].append(pchan)
|
||||||
|
|
||||||
|
for aid, chans in to_scan.items():
|
||||||
|
_, peer_name = aid
|
||||||
|
if name == peer_name:
|
||||||
|
if not chans:
|
||||||
|
log.warning(
|
||||||
|
'No IPC chans for matching peer {peer_name}\n'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
return chans
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def query_actor(
|
async def query_actor(
|
||||||
name: str,
|
name: str,
|
||||||
arbiter_sockaddr: tuple[str, int] | None = None,
|
regaddr: tuple[str, int]|None = None,
|
||||||
regaddr: tuple[str, int] | None = None,
|
|
||||||
|
|
||||||
) -> AsyncGenerator[
|
) -> AsyncGenerator[
|
||||||
tuple[str, int] | None,
|
tuple[str, int]|None,
|
||||||
None,
|
None,
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Make a transport address lookup for an actor name to a specific
|
Lookup a transport address (by actor name) via querying a registrar
|
||||||
registrar.
|
listening @ `regaddr`.
|
||||||
|
|
||||||
Returns the (socket) address or ``None`` if no entry under that
|
Returns the transport protocol (socket) address or `None` if no
|
||||||
name exists for the given registrar listening @ `regaddr`.
|
entry under that name exists.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
actor: Actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
|
@ -137,14 +157,10 @@ async def query_actor(
|
||||||
'The current actor IS the registry!?'
|
'The current actor IS the registry!?'
|
||||||
)
|
)
|
||||||
|
|
||||||
if arbiter_sockaddr is not None:
|
maybe_peers: list[Channel]|None = get_peer_by_name(name)
|
||||||
warnings.warn(
|
if maybe_peers:
|
||||||
'`tractor.query_actor(regaddr=<blah>)` is deprecated.\n'
|
yield maybe_peers[0].raddr
|
||||||
'Use `registry_addrs: list[tuple]` instead!',
|
return
|
||||||
DeprecationWarning,
|
|
||||||
stacklevel=2,
|
|
||||||
)
|
|
||||||
regaddr: list[tuple[str, int]] = arbiter_sockaddr
|
|
||||||
|
|
||||||
reg_portal: Portal
|
reg_portal: Portal
|
||||||
regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
|
regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
|
||||||
|
@ -159,10 +175,28 @@ async def query_actor(
|
||||||
yield sockaddr
|
yield sockaddr
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def maybe_open_portal(
|
||||||
|
addr: tuple[str, int],
|
||||||
|
name: str,
|
||||||
|
):
|
||||||
|
async with query_actor(
|
||||||
|
name=name,
|
||||||
|
regaddr=addr,
|
||||||
|
) as sockaddr:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if sockaddr:
|
||||||
|
async with _connect_chan(*sockaddr) as chan:
|
||||||
|
async with open_portal(chan) as portal:
|
||||||
|
yield portal
|
||||||
|
else:
|
||||||
|
yield None
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def find_actor(
|
async def find_actor(
|
||||||
name: str,
|
name: str,
|
||||||
arbiter_sockaddr: tuple[str, int]|None = None,
|
|
||||||
registry_addrs: list[tuple[str, int]]|None = None,
|
registry_addrs: list[tuple[str, int]]|None = None,
|
||||||
|
|
||||||
only_first: bool = True,
|
only_first: bool = True,
|
||||||
|
@ -179,29 +213,12 @@ async def find_actor(
|
||||||
known to the arbiter.
|
known to the arbiter.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if arbiter_sockaddr is not None:
|
# optimization path, use any pre-existing peer channel
|
||||||
warnings.warn(
|
maybe_peers: list[Channel]|None = get_peer_by_name(name)
|
||||||
'`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n'
|
if maybe_peers and only_first:
|
||||||
'Use `registry_addrs: list[tuple]` instead!',
|
async with open_portal(maybe_peers[0]) as peer_portal:
|
||||||
DeprecationWarning,
|
yield peer_portal
|
||||||
stacklevel=2,
|
return
|
||||||
)
|
|
||||||
registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]
|
|
||||||
|
|
||||||
@acm
|
|
||||||
async def maybe_open_portal_from_reg_addr(
|
|
||||||
addr: tuple[str, int],
|
|
||||||
):
|
|
||||||
async with query_actor(
|
|
||||||
name=name,
|
|
||||||
regaddr=addr,
|
|
||||||
) as sockaddr:
|
|
||||||
if sockaddr:
|
|
||||||
async with _connect_chan(*sockaddr) as chan:
|
|
||||||
async with open_portal(chan) as portal:
|
|
||||||
yield portal
|
|
||||||
else:
|
|
||||||
yield None
|
|
||||||
|
|
||||||
if not registry_addrs:
|
if not registry_addrs:
|
||||||
# XXX NOTE: make sure to dynamically read the value on
|
# XXX NOTE: make sure to dynamically read the value on
|
||||||
|
@ -217,10 +234,13 @@ async def find_actor(
|
||||||
maybe_portals: list[
|
maybe_portals: list[
|
||||||
AsyncContextManager[tuple[str, int]]
|
AsyncContextManager[tuple[str, int]]
|
||||||
] = list(
|
] = list(
|
||||||
maybe_open_portal_from_reg_addr(addr)
|
maybe_open_portal(
|
||||||
|
addr=addr,
|
||||||
|
name=name,
|
||||||
|
)
|
||||||
for addr in registry_addrs
|
for addr in registry_addrs
|
||||||
)
|
)
|
||||||
|
portals: list[Portal]
|
||||||
async with gather_contexts(
|
async with gather_contexts(
|
||||||
mngrs=maybe_portals,
|
mngrs=maybe_portals,
|
||||||
) as portals:
|
) as portals:
|
||||||
|
@ -254,31 +274,31 @@ async def find_actor(
|
||||||
@acm
|
@acm
|
||||||
async def wait_for_actor(
|
async def wait_for_actor(
|
||||||
name: str,
|
name: str,
|
||||||
arbiter_sockaddr: tuple[str, int] | None = None,
|
|
||||||
registry_addr: tuple[str, int] | None = None,
|
registry_addr: tuple[str, int] | None = None,
|
||||||
|
|
||||||
) -> AsyncGenerator[Portal, None]:
|
) -> AsyncGenerator[Portal, None]:
|
||||||
'''
|
'''
|
||||||
Wait on an actor to register with the arbiter.
|
Wait on at least one peer actor to register `name` with the
|
||||||
|
registrar, yield a `Portal to the first registree.
|
||||||
A portal to the first registered actor is returned.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
actor: Actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
|
|
||||||
if arbiter_sockaddr is not None:
|
# optimization path, use any pre-existing peer channel
|
||||||
warnings.warn(
|
maybe_peers: list[Channel]|None = get_peer_by_name(name)
|
||||||
'`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n'
|
if maybe_peers:
|
||||||
'Use `registry_addr: tuple` instead!',
|
async with open_portal(maybe_peers[0]) as peer_portal:
|
||||||
DeprecationWarning,
|
yield peer_portal
|
||||||
stacklevel=2,
|
return
|
||||||
)
|
|
||||||
registry_addr: tuple[str, int] = arbiter_sockaddr
|
|
||||||
|
|
||||||
|
regaddr: tuple[str, int] = (
|
||||||
|
registry_addr
|
||||||
|
or
|
||||||
|
actor.reg_addrs[0]
|
||||||
|
)
|
||||||
# TODO: use `.trionics.gather_contexts()` like
|
# TODO: use `.trionics.gather_contexts()` like
|
||||||
# above in `find_actor()` as well?
|
# above in `find_actor()` as well?
|
||||||
reg_portal: Portal
|
reg_portal: Portal
|
||||||
regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0]
|
|
||||||
async with get_registry(*regaddr) as reg_portal:
|
async with get_registry(*regaddr) as reg_portal:
|
||||||
sockaddrs = await reg_portal.run_from_ns(
|
sockaddrs = await reg_portal.run_from_ns(
|
||||||
'self',
|
'self',
|
||||||
|
|
|
@ -243,6 +243,7 @@ def _trio_main(
|
||||||
nest_from_op(
|
nest_from_op(
|
||||||
input_op=')>', # like a "closed-to-play"-icon from super perspective
|
input_op=')>', # like a "closed-to-play"-icon from super perspective
|
||||||
tree_str=actor_info,
|
tree_str=actor_info,
|
||||||
|
back_from_op=1,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -263,11 +263,11 @@ class Portal:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
reminfo: str = (
|
reminfo: str = (
|
||||||
f'Portal.cancel_actor() => {self.channel.uid}\n'
|
f'c)=> {self.channel.uid}\n'
|
||||||
f'|_{chan}\n'
|
f' |_{chan}\n'
|
||||||
)
|
)
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Requesting runtime cancel for peer\n\n'
|
f'Requesting actor-runtime cancel for peer\n\n'
|
||||||
f'{reminfo}'
|
f'{reminfo}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -80,7 +80,7 @@ async def open_root_actor(
|
||||||
|
|
||||||
# enables the multi-process debugger support
|
# enables the multi-process debugger support
|
||||||
debug_mode: bool = False,
|
debug_mode: bool = False,
|
||||||
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
|
maybe_enable_greenback: bool = True, # `.pause_from_sync()/breakpoint()` support
|
||||||
enable_stack_on_sig: bool = False,
|
enable_stack_on_sig: bool = False,
|
||||||
|
|
||||||
# internal logging
|
# internal logging
|
||||||
|
@ -233,14 +233,8 @@ async def open_root_actor(
|
||||||
and
|
and
|
||||||
enable_stack_on_sig
|
enable_stack_on_sig
|
||||||
):
|
):
|
||||||
try:
|
from .devx._stackscope import enable_stack_on_sig
|
||||||
logger.info('Enabling `stackscope` traces on SIGUSR1')
|
enable_stack_on_sig()
|
||||||
from .devx import enable_stack_on_sig
|
|
||||||
enable_stack_on_sig()
|
|
||||||
except ImportError:
|
|
||||||
logger.warning(
|
|
||||||
'`stackscope` not installed for use in debug mode!'
|
|
||||||
)
|
|
||||||
|
|
||||||
# closed into below ping task-func
|
# closed into below ping task-func
|
||||||
ponged_addrs: list[tuple[str, int]] = []
|
ponged_addrs: list[tuple[str, int]] = []
|
||||||
|
|
|
@ -115,25 +115,26 @@ class Actor:
|
||||||
'''
|
'''
|
||||||
The fundamental "runtime" concurrency primitive.
|
The fundamental "runtime" concurrency primitive.
|
||||||
|
|
||||||
An *actor* is the combination of a regular Python process executing
|
An "actor" is the combination of a regular Python process
|
||||||
a ``trio`` task tree, communicating with other actors through
|
executing a `trio.run()` task tree, communicating with other
|
||||||
"memory boundary portals" - which provide a native async API around
|
"actors" through "memory boundary portals": `Portal`, which
|
||||||
IPC transport "channels" which themselves encapsulate various
|
provide a high-level async API around IPC "channels" (`Channel`)
|
||||||
(swappable) network protocols.
|
which themselves encapsulate various (swappable) network
|
||||||
|
transport protocols for sending msgs between said memory domains
|
||||||
|
(processes, hosts, non-GIL threads).
|
||||||
|
|
||||||
|
Each "actor" is `trio.run()` scheduled "runtime" composed of many
|
||||||
Each "actor" is ``trio.run()`` scheduled "runtime" composed of
|
concurrent tasks in a single thread. The "runtime" tasks conduct
|
||||||
many concurrent tasks in a single thread. The "runtime" tasks
|
a slew of low(er) level functions to make it possible for message
|
||||||
conduct a slew of low(er) level functions to make it possible
|
passing between actors as well as the ability to create new
|
||||||
for message passing between actors as well as the ability to
|
actors (aka new "runtimes" in new processes which are supervised
|
||||||
create new actors (aka new "runtimes" in new processes which
|
via an "actor-nursery" construct). Each task which sends messages
|
||||||
are supervised via a nursery construct). Each task which sends
|
to a task in a "peer" actor (not necessarily a parent-child,
|
||||||
messages to a task in a "peer" (not necessarily a parent-child,
|
|
||||||
depth hierarchy) is able to do so via an "address", which maps
|
depth hierarchy) is able to do so via an "address", which maps
|
||||||
IPC connections across memory boundaries, and a task request id
|
IPC connections across memory boundaries, and a task request id
|
||||||
which allows for per-actor tasks to send and receive messages
|
which allows for per-actor tasks to send and receive messages to
|
||||||
to specific peer-actor tasks with which there is an ongoing
|
specific peer-actor tasks with which there is an ongoing RPC/IPC
|
||||||
RPC/IPC dialog.
|
dialog.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# ugh, we need to get rid of this and replace with a "registry" sys
|
# ugh, we need to get rid of this and replace with a "registry" sys
|
||||||
|
@ -230,17 +231,20 @@ class Actor:
|
||||||
# by the user (currently called the "arbiter")
|
# by the user (currently called the "arbiter")
|
||||||
self._spawn_method: str = spawn_method
|
self._spawn_method: str = spawn_method
|
||||||
|
|
||||||
self._peers: defaultdict = defaultdict(list)
|
self._peers: defaultdict[
|
||||||
|
str, # uaid
|
||||||
|
list[Channel], # IPC conns from peer
|
||||||
|
] = defaultdict(list)
|
||||||
self._peer_connected: dict[tuple[str, str], trio.Event] = {}
|
self._peer_connected: dict[tuple[str, str], trio.Event] = {}
|
||||||
self._no_more_peers = trio.Event()
|
self._no_more_peers = trio.Event()
|
||||||
self._no_more_peers.set()
|
self._no_more_peers.set()
|
||||||
|
|
||||||
|
# RPC state
|
||||||
self._ongoing_rpc_tasks = trio.Event()
|
self._ongoing_rpc_tasks = trio.Event()
|
||||||
self._ongoing_rpc_tasks.set()
|
self._ongoing_rpc_tasks.set()
|
||||||
|
|
||||||
# (chan, cid) -> (cancel_scope, func)
|
|
||||||
self._rpc_tasks: dict[
|
self._rpc_tasks: dict[
|
||||||
tuple[Channel, str],
|
tuple[Channel, str], # (chan, cid)
|
||||||
tuple[Context, Callable, trio.Event]
|
tuple[Context, Callable, trio.Event] # (ctx=>, fn(), done?)
|
||||||
] = {}
|
] = {}
|
||||||
|
|
||||||
# map {actor uids -> Context}
|
# map {actor uids -> Context}
|
||||||
|
@ -317,7 +321,10 @@ class Actor:
|
||||||
event = self._peer_connected.setdefault(uid, trio.Event())
|
event = self._peer_connected.setdefault(uid, trio.Event())
|
||||||
await event.wait()
|
await event.wait()
|
||||||
log.debug(f'{uid!r} successfully connected back to us')
|
log.debug(f'{uid!r} successfully connected back to us')
|
||||||
return event, self._peers[uid][-1]
|
return (
|
||||||
|
event,
|
||||||
|
self._peers[uid][-1],
|
||||||
|
)
|
||||||
|
|
||||||
def load_modules(
|
def load_modules(
|
||||||
self,
|
self,
|
||||||
|
@ -408,26 +415,11 @@ class Actor:
|
||||||
'''
|
'''
|
||||||
self._no_more_peers = trio.Event() # unset by making new
|
self._no_more_peers = trio.Event() # unset by making new
|
||||||
chan = Channel.from_stream(stream)
|
chan = Channel.from_stream(stream)
|
||||||
their_uid: tuple[str, str]|None = chan.uid
|
con_status: str = (
|
||||||
|
'New inbound IPC connection <=\n'
|
||||||
con_status: str = ''
|
|
||||||
|
|
||||||
# TODO: remove this branch since can never happen?
|
|
||||||
# NOTE: `.uid` is only set after first contact
|
|
||||||
if their_uid:
|
|
||||||
con_status = (
|
|
||||||
'IPC Re-connection from already known peer?\n'
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
con_status = (
|
|
||||||
'New inbound IPC connection <=\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
con_status += (
|
|
||||||
f'|_{chan}\n'
|
f'|_{chan}\n'
|
||||||
# f' |_@{chan.raddr}\n\n'
|
|
||||||
# ^-TODO-^ remove since alfready in chan.__repr__()?
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# send/receive initial handshake response
|
# send/receive initial handshake response
|
||||||
try:
|
try:
|
||||||
uid: tuple|None = await self._do_handshake(chan)
|
uid: tuple|None = await self._do_handshake(chan)
|
||||||
|
@ -439,10 +431,10 @@ class Actor:
|
||||||
|
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
):
|
):
|
||||||
# XXX: This may propagate up from ``Channel._aiter_recv()``
|
# XXX: This may propagate up from `Channel._aiter_recv()`
|
||||||
# and ``MsgpackStream._inter_packets()`` on a read from the
|
# and `MsgpackStream._inter_packets()` on a read from the
|
||||||
# stream particularly when the runtime is first starting up
|
# stream particularly when the runtime is first starting up
|
||||||
# inside ``open_root_actor()`` where there is a check for
|
# inside `open_root_actor()` where there is a check for
|
||||||
# a bound listener on the "arbiter" addr. the reset will be
|
# a bound listener on the "arbiter" addr. the reset will be
|
||||||
# because the handshake was never meant took place.
|
# because the handshake was never meant took place.
|
||||||
log.runtime(
|
log.runtime(
|
||||||
|
@ -452,9 +444,22 @@ class Actor:
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
familiar: str = 'new-peer'
|
||||||
|
if _pre_chan := self._peers.get(uid):
|
||||||
|
familiar: str = 'pre-existing-peer'
|
||||||
|
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
|
||||||
con_status += (
|
con_status += (
|
||||||
f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n'
|
f' -> Handshake with {familiar} `{uid_short}` complete\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if _pre_chan:
|
||||||
|
log.warning(
|
||||||
|
# con_status += (
|
||||||
|
# ^TODO^ swap once we minimize conn duplication
|
||||||
|
f' -> Wait, we already have IPC with `{uid_short}`??\n'
|
||||||
|
f' |_{_pre_chan}\n'
|
||||||
|
)
|
||||||
|
|
||||||
# IPC connection tracking for both peers and new children:
|
# IPC connection tracking for both peers and new children:
|
||||||
# - if this is a new channel to a locally spawned
|
# - if this is a new channel to a locally spawned
|
||||||
# sub-actor there will be a spawn wait even registered
|
# sub-actor there will be a spawn wait even registered
|
||||||
|
@ -507,8 +512,9 @@ class Actor:
|
||||||
)
|
)
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'IPC transport msg loop was cancelled for \n'
|
'IPC transport msg loop was cancelled\n'
|
||||||
f'|_{chan}\n'
|
f'c)>\n'
|
||||||
|
f' |_{chan}\n'
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -545,9 +551,9 @@ class Actor:
|
||||||
|
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Waiting on cancel request to peer\n'
|
'Waiting on cancel request to peer..\n'
|
||||||
f'c)=>\n'
|
f'c)=>\n'
|
||||||
f' |_{chan.uid}\n'
|
f' |_{chan.uid}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX: this is a soft wait on the channel (and its
|
# XXX: this is a soft wait on the channel (and its
|
||||||
|
@ -646,10 +652,14 @@ class Actor:
|
||||||
):
|
):
|
||||||
report: str = (
|
report: str = (
|
||||||
'Timed out waiting on local actor-nursery to exit?\n'
|
'Timed out waiting on local actor-nursery to exit?\n'
|
||||||
f'{local_nursery}\n'
|
f'c)>\n'
|
||||||
|
f' |_{local_nursery}\n'
|
||||||
)
|
)
|
||||||
if children := local_nursery._children:
|
if children := local_nursery._children:
|
||||||
report += f' |_{pformat(children)}\n'
|
# indent from above local-nurse repr
|
||||||
|
report += (
|
||||||
|
f' |_{pformat(children)}\n'
|
||||||
|
)
|
||||||
|
|
||||||
log.warning(report)
|
log.warning(report)
|
||||||
|
|
||||||
|
@ -1236,8 +1246,9 @@ class Actor:
|
||||||
# TODO: just use the new `Context.repr_rpc: str` (and
|
# TODO: just use the new `Context.repr_rpc: str` (and
|
||||||
# other) repr fields instead of doing this all manual..
|
# other) repr fields instead of doing this all manual..
|
||||||
msg: str = (
|
msg: str = (
|
||||||
f'Runtime cancel request from {requester_type}:\n\n'
|
f'Actor-runtime cancel request from {requester_type}\n\n'
|
||||||
f'<= .cancel(): {requesting_uid}\n\n'
|
f'<=c) {requesting_uid}\n'
|
||||||
|
f' |_{self}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: what happens here when we self-cancel tho?
|
# TODO: what happens here when we self-cancel tho?
|
||||||
|
@ -1347,7 +1358,7 @@ class Actor:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Rxed cancel request for RPC task\n'
|
'Rxed cancel request for RPC task\n'
|
||||||
f'<=c) {requesting_uid}\n'
|
f'<=c) {requesting_uid}\n'
|
||||||
f' |_{ctx._task}\n'
|
f' |_{ctx._task}\n'
|
||||||
f' >> {ctx.repr_rpc}\n'
|
f' >> {ctx.repr_rpc}\n'
|
||||||
# f'=> {ctx._task}\n'
|
# f'=> {ctx._task}\n'
|
||||||
# f' >> Actor._cancel_task() => {ctx._task}\n'
|
# f' >> Actor._cancel_task() => {ctx._task}\n'
|
||||||
|
@ -1465,17 +1476,17 @@ class Actor:
|
||||||
"IPC channel's "
|
"IPC channel's "
|
||||||
)
|
)
|
||||||
rent_chan_repr: str = (
|
rent_chan_repr: str = (
|
||||||
f' |_{parent_chan}\n\n'
|
f' |_{parent_chan}\n\n'
|
||||||
if parent_chan
|
if parent_chan
|
||||||
else ''
|
else ''
|
||||||
)
|
)
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Cancelling {descr} RPC tasks\n\n'
|
f'Cancelling {descr} RPC tasks\n\n'
|
||||||
f'<= canceller: {req_uid}\n'
|
f'<=c) {req_uid} [canceller]\n'
|
||||||
f'{rent_chan_repr}'
|
f'{rent_chan_repr}'
|
||||||
f'=> cancellee: {self.uid}\n'
|
f'c)=> {self.uid} [cancellee]\n'
|
||||||
f' |_{self}.cancel_rpc_tasks()\n'
|
f' |_{self} [with {len(tasks)} tasks]\n'
|
||||||
f' |_tasks: {len(tasks)}\n'
|
# f' |_tasks: {len(tasks)}\n'
|
||||||
# f'{tasks_str}'
|
# f'{tasks_str}'
|
||||||
)
|
)
|
||||||
for (
|
for (
|
||||||
|
@ -1544,7 +1555,7 @@ class Actor:
|
||||||
def accept_addr(self) -> tuple[str, int]:
|
def accept_addr(self) -> tuple[str, int]:
|
||||||
'''
|
'''
|
||||||
Primary address to which the IPC transport server is
|
Primary address to which the IPC transport server is
|
||||||
bound.
|
bound and listening for new connections.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# throws OSError on failure
|
# throws OSError on failure
|
||||||
|
@ -1561,6 +1572,7 @@ class Actor:
|
||||||
def get_chans(
|
def get_chans(
|
||||||
self,
|
self,
|
||||||
uid: tuple[str, str],
|
uid: tuple[str, str],
|
||||||
|
|
||||||
) -> list[Channel]:
|
) -> list[Channel]:
|
||||||
'''
|
'''
|
||||||
Return all IPC channels to the actor with provided `uid`.
|
Return all IPC channels to the actor with provided `uid`.
|
||||||
|
@ -1932,9 +1944,15 @@ async def async_main(
|
||||||
with CancelScope(shield=True):
|
with CancelScope(shield=True):
|
||||||
await actor._no_more_peers.wait()
|
await actor._no_more_peers.wait()
|
||||||
|
|
||||||
teardown_report += ('-> All peer channels are complete\n')
|
teardown_report += (
|
||||||
|
'-> All peer channels are complete\n'
|
||||||
|
)
|
||||||
|
|
||||||
teardown_report += ('Actor runtime exited')
|
teardown_report += (
|
||||||
|
'Actor runtime exiting\n'
|
||||||
|
f'>)\n'
|
||||||
|
f'|_{actor}\n'
|
||||||
|
)
|
||||||
log.info(teardown_report)
|
log.info(teardown_report)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,25 @@ def examples_dir() -> pathlib.Path:
|
||||||
return repodir() / 'examples'
|
return repodir() / 'examples'
|
||||||
|
|
||||||
|
|
||||||
|
def mk_cmd(
|
||||||
|
ex_name: str,
|
||||||
|
exs_subpath: str = 'debugging',
|
||||||
|
) -> str:
|
||||||
|
'''
|
||||||
|
Generate a shell command suitable to pass to ``pexpect.spawn()``.
|
||||||
|
|
||||||
|
'''
|
||||||
|
script_path: pathlib.Path = (
|
||||||
|
examples_dir()
|
||||||
|
/ exs_subpath
|
||||||
|
/ f'{ex_name}.py'
|
||||||
|
)
|
||||||
|
return ' '.join([
|
||||||
|
'python',
|
||||||
|
str(script_path)
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def expect_ctxc(
|
async def expect_ctxc(
|
||||||
yay: bool,
|
yay: bool,
|
||||||
|
|
|
@ -26,7 +26,7 @@ from ._debug import (
|
||||||
breakpoint as breakpoint,
|
breakpoint as breakpoint,
|
||||||
pause as pause,
|
pause as pause,
|
||||||
pause_from_sync as pause_from_sync,
|
pause_from_sync as pause_from_sync,
|
||||||
shield_sigint_handler as shield_sigint_handler,
|
sigint_shield as sigint_shield,
|
||||||
open_crash_handler as open_crash_handler,
|
open_crash_handler as open_crash_handler,
|
||||||
maybe_open_crash_handler as maybe_open_crash_handler,
|
maybe_open_crash_handler as maybe_open_crash_handler,
|
||||||
maybe_init_greenback as maybe_init_greenback,
|
maybe_init_greenback as maybe_init_greenback,
|
||||||
|
|
|
@ -409,9 +409,9 @@ class Lock:
|
||||||
repl_task
|
repl_task
|
||||||
)
|
)
|
||||||
message += (
|
message += (
|
||||||
f'\nA non-caller task still owns this lock on behalf of '
|
f'A non-caller task still owns this lock on behalf of '
|
||||||
f'{behalf_of_task}\n'
|
f'`{behalf_of_task}`\n'
|
||||||
f'|_{lock_stats.owner}\n'
|
f'lock owner task: {lock_stats.owner}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
|
@ -523,6 +523,10 @@ class Lock:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_lock() -> Lock:
|
||||||
|
return Lock
|
||||||
|
|
||||||
|
|
||||||
@tractor.context(
|
@tractor.context(
|
||||||
# enable the locking msgspec
|
# enable the locking msgspec
|
||||||
pld_spec=__pld_spec__,
|
pld_spec=__pld_spec__,
|
||||||
|
@ -788,13 +792,13 @@ class DebugStatus:
|
||||||
cls._orig_sigint_handler: Callable = trio.from_thread.run_sync(
|
cls._orig_sigint_handler: Callable = trio.from_thread.run_sync(
|
||||||
signal.signal,
|
signal.signal,
|
||||||
signal.SIGINT,
|
signal.SIGINT,
|
||||||
shield_sigint_handler,
|
sigint_shield,
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
cls._orig_sigint_handler = signal.signal(
|
cls._orig_sigint_handler = signal.signal(
|
||||||
signal.SIGINT,
|
signal.SIGINT,
|
||||||
shield_sigint_handler,
|
sigint_shield,
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -900,12 +904,30 @@ class DebugStatus:
|
||||||
|
|
||||||
# actor-local state, irrelevant for non-root.
|
# actor-local state, irrelevant for non-root.
|
||||||
cls.repl_task = None
|
cls.repl_task = None
|
||||||
|
|
||||||
|
# XXX WARNING needs very special caughtion, and we should
|
||||||
|
# prolly make a more explicit `@property` API?
|
||||||
|
#
|
||||||
|
# - if unset in root multi-threaded case can cause
|
||||||
|
# issues with detecting that some root thread is
|
||||||
|
# using a REPL,
|
||||||
|
#
|
||||||
|
# - what benefit is there to unsetting, it's always
|
||||||
|
# set again for the next task in some actor..
|
||||||
|
# only thing would be to avoid in the sigint-handler
|
||||||
|
# logging when we don't need to?
|
||||||
cls.repl = None
|
cls.repl = None
|
||||||
|
|
||||||
# restore original sigint handler
|
# restore original sigint handler
|
||||||
cls.unshield_sigint()
|
cls.unshield_sigint()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: use the new `@lowlevel.singleton` for this!
|
||||||
|
def get_debug_req() -> DebugStatus|None:
|
||||||
|
return DebugStatus
|
||||||
|
|
||||||
|
|
||||||
class TractorConfig(pdbp.DefaultConfig):
|
class TractorConfig(pdbp.DefaultConfig):
|
||||||
'''
|
'''
|
||||||
Custom `pdbp` config which tries to use the best tradeoff
|
Custom `pdbp` config which tries to use the best tradeoff
|
||||||
|
@ -1311,7 +1333,7 @@ def any_connected_locker_child() -> bool:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def shield_sigint_handler(
|
def sigint_shield(
|
||||||
signum: int,
|
signum: int,
|
||||||
frame: 'frame', # type: ignore # noqa
|
frame: 'frame', # type: ignore # noqa
|
||||||
*args,
|
*args,
|
||||||
|
@ -1351,13 +1373,17 @@ def shield_sigint_handler(
|
||||||
# root actor branch that reports whether or not a child
|
# root actor branch that reports whether or not a child
|
||||||
# has locked debugger.
|
# has locked debugger.
|
||||||
if is_root_process():
|
if is_root_process():
|
||||||
|
# log.warning(
|
||||||
|
log.devx(
|
||||||
|
'Handling SIGINT in root actor\n'
|
||||||
|
f'{Lock.repr()}'
|
||||||
|
f'{DebugStatus.repr()}\n'
|
||||||
|
)
|
||||||
# try to see if the supposed (sub)actor in debug still
|
# try to see if the supposed (sub)actor in debug still
|
||||||
# has an active connection to *this* actor, and if not
|
# has an active connection to *this* actor, and if not
|
||||||
# it's likely they aren't using the TTY lock / debugger
|
# it's likely they aren't using the TTY lock / debugger
|
||||||
# and we should propagate SIGINT normally.
|
# and we should propagate SIGINT normally.
|
||||||
any_connected: bool = any_connected_locker_child()
|
any_connected: bool = any_connected_locker_child()
|
||||||
# if not any_connected:
|
|
||||||
# return do_cancel()
|
|
||||||
|
|
||||||
problem = (
|
problem = (
|
||||||
f'root {actor.uid} handling SIGINT\n'
|
f'root {actor.uid} handling SIGINT\n'
|
||||||
|
@ -1406,19 +1432,25 @@ def shield_sigint_handler(
|
||||||
# an actor using the `Lock` (a bug state) ??
|
# an actor using the `Lock` (a bug state) ??
|
||||||
# => so immediately cancel any stale lock cs and revert
|
# => so immediately cancel any stale lock cs and revert
|
||||||
# the handler!
|
# the handler!
|
||||||
if not repl:
|
if not DebugStatus.repl:
|
||||||
# TODO: WHEN should we revert back to ``trio``
|
# TODO: WHEN should we revert back to ``trio``
|
||||||
# handler if this one is stale?
|
# handler if this one is stale?
|
||||||
# -[ ] maybe after a counts work of ctl-c mashes?
|
# -[ ] maybe after a counts work of ctl-c mashes?
|
||||||
# -[ ] use a state var like `stale_handler: bool`?
|
# -[ ] use a state var like `stale_handler: bool`?
|
||||||
problem += (
|
problem += (
|
||||||
'\n'
|
|
||||||
'No subactor is using a `pdb` REPL according `Lock.ctx_in_debug`?\n'
|
'No subactor is using a `pdb` REPL according `Lock.ctx_in_debug`?\n'
|
||||||
'BUT, the root should be using it, WHY this handler ??\n'
|
'BUT, the root should be using it, WHY this handler ??\n\n'
|
||||||
|
'So either..\n'
|
||||||
|
'- some root-thread is using it but has no `.repl` set?, OR\n'
|
||||||
|
'- something else weird is going on outside the runtime!?\n'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
# NOTE: since we emit this msg on ctl-c, we should
|
||||||
|
# also always re-print the prompt the tail block!
|
||||||
log.pdb(
|
log.pdb(
|
||||||
'Ignoring SIGINT while pdb REPL in use by root actor..\n'
|
'Ignoring SIGINT while pdb REPL in use by root actor..\n'
|
||||||
|
f'{DebugStatus.repl_task}\n'
|
||||||
|
f' |_{repl}\n'
|
||||||
)
|
)
|
||||||
problem = None
|
problem = None
|
||||||
|
|
||||||
|
@ -1468,7 +1500,6 @@ def shield_sigint_handler(
|
||||||
'Allowing SIGINT propagation..'
|
'Allowing SIGINT propagation..'
|
||||||
)
|
)
|
||||||
DebugStatus.unshield_sigint()
|
DebugStatus.unshield_sigint()
|
||||||
# do_cancel()
|
|
||||||
|
|
||||||
repl_task: str|None = DebugStatus.repl_task
|
repl_task: str|None = DebugStatus.repl_task
|
||||||
req_task: str|None = DebugStatus.req_task
|
req_task: str|None = DebugStatus.req_task
|
||||||
|
@ -1483,10 +1514,15 @@ def shield_sigint_handler(
|
||||||
f' |_{repl}\n'
|
f' |_{repl}\n'
|
||||||
)
|
)
|
||||||
elif req_task:
|
elif req_task:
|
||||||
log.pdb(
|
log.debug(
|
||||||
f'Ignoring SIGINT while debug request task is open\n'
|
'Ignoring SIGINT while debug request task is open but either,\n'
|
||||||
|
'- someone else is already REPL-in and has the `Lock`, or\n'
|
||||||
|
'- some other local task already is replin?\n'
|
||||||
f'|_{req_task}\n'
|
f'|_{req_task}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO can we remove this now?
|
||||||
|
# -[ ] does this path ever get hit any more?
|
||||||
else:
|
else:
|
||||||
msg: str = (
|
msg: str = (
|
||||||
'SIGINT shield handler still active BUT, \n\n'
|
'SIGINT shield handler still active BUT, \n\n'
|
||||||
|
@ -1522,37 +1558,53 @@ def shield_sigint_handler(
|
||||||
# https://github.com/goodboy/tractor/issues/320
|
# https://github.com/goodboy/tractor/issues/320
|
||||||
# elif debug_mode():
|
# elif debug_mode():
|
||||||
|
|
||||||
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
|
|
||||||
# it looks to be that the last command that was run (eg. ll)
|
|
||||||
# will be repeated by default.
|
|
||||||
|
|
||||||
# maybe redraw/print last REPL output to console since
|
# maybe redraw/print last REPL output to console since
|
||||||
# we want to alert the user that more input is expect since
|
# we want to alert the user that more input is expect since
|
||||||
# nothing has been done dur to ignoring sigint.
|
# nothing has been done dur to ignoring sigint.
|
||||||
if (
|
if (
|
||||||
repl # only when current actor has a REPL engaged
|
DebugStatus.repl # only when current actor has a REPL engaged
|
||||||
):
|
):
|
||||||
|
flush_status: str = (
|
||||||
|
'Flushing stdout to ensure new prompt line!\n'
|
||||||
|
)
|
||||||
|
|
||||||
# XXX: yah, mega hack, but how else do we catch this madness XD
|
# XXX: yah, mega hack, but how else do we catch this madness XD
|
||||||
if repl.shname == 'xonsh':
|
if (
|
||||||
|
repl.shname == 'xonsh'
|
||||||
|
):
|
||||||
|
flush_status += (
|
||||||
|
'-> ALSO re-flushing due to `xonsh`..\n'
|
||||||
|
)
|
||||||
repl.stdout.write(repl.prompt)
|
repl.stdout.write(repl.prompt)
|
||||||
|
|
||||||
|
# log.warning(
|
||||||
|
log.devx(
|
||||||
|
flush_status
|
||||||
|
)
|
||||||
repl.stdout.flush()
|
repl.stdout.flush()
|
||||||
|
|
||||||
# TODO: make this work like sticky mode where if there is output
|
# TODO: better console UX to match the current "mode":
|
||||||
# detected as written to the tty we redraw this part underneath
|
# -[ ] for example if in sticky mode where if there is output
|
||||||
# and erase the past draw of this same bit above?
|
# detected as written to the tty we redraw this part underneath
|
||||||
|
# and erase the past draw of this same bit above?
|
||||||
# repl.sticky = True
|
# repl.sticky = True
|
||||||
# repl._print_if_sticky()
|
# repl._print_if_sticky()
|
||||||
|
|
||||||
# also see these links for an approach from ``ptk``:
|
# also see these links for an approach from `ptk`:
|
||||||
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
|
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
|
||||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
|
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
|
||||||
|
else:
|
||||||
|
log.devx(
|
||||||
|
# log.warning(
|
||||||
|
'Not flushing stdout since not needed?\n'
|
||||||
|
f'|_{repl}\n'
|
||||||
|
)
|
||||||
|
|
||||||
# XXX only for tracing this handler
|
# XXX only for tracing this handler
|
||||||
log.devx('exiting SIGINT')
|
log.devx('exiting SIGINT')
|
||||||
|
|
||||||
|
|
||||||
_pause_msg: str = 'Attaching to pdb REPL in actor'
|
_pause_msg: str = 'Opening a pdb REPL in paused actor'
|
||||||
|
|
||||||
|
|
||||||
class DebugRequestError(RuntimeError):
|
class DebugRequestError(RuntimeError):
|
||||||
|
@ -1617,7 +1669,7 @@ async def _pause(
|
||||||
# 'directly (infected) `asyncio` tasks!'
|
# 'directly (infected) `asyncio` tasks!'
|
||||||
# ) from rte
|
# ) from rte
|
||||||
|
|
||||||
raise
|
raise rte
|
||||||
|
|
||||||
if debug_func is not None:
|
if debug_func is not None:
|
||||||
debug_func = partial(debug_func)
|
debug_func = partial(debug_func)
|
||||||
|
@ -1625,9 +1677,13 @@ async def _pause(
|
||||||
# XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug
|
# XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug
|
||||||
# request from a subactor BEFORE the REPL is entered by that
|
# request from a subactor BEFORE the REPL is entered by that
|
||||||
# process.
|
# process.
|
||||||
if not repl:
|
if (
|
||||||
|
not repl
|
||||||
|
and
|
||||||
|
debug_func
|
||||||
|
):
|
||||||
|
repl: PdbREPL = mk_pdb()
|
||||||
DebugStatus.shield_sigint()
|
DebugStatus.shield_sigint()
|
||||||
repl: PdbREPL = repl or mk_pdb()
|
|
||||||
|
|
||||||
# TODO: move this into a `open_debug_request()` @acm?
|
# TODO: move this into a `open_debug_request()` @acm?
|
||||||
# -[ ] prolly makes the most sense to do the request
|
# -[ ] prolly makes the most sense to do the request
|
||||||
|
@ -1662,7 +1718,13 @@ async def _pause(
|
||||||
# recurrent entries/requests from the same
|
# recurrent entries/requests from the same
|
||||||
# actor-local task.
|
# actor-local task.
|
||||||
DebugStatus.repl_task = task
|
DebugStatus.repl_task = task
|
||||||
DebugStatus.repl = repl
|
if repl:
|
||||||
|
DebugStatus.repl = repl
|
||||||
|
else:
|
||||||
|
log.error(
|
||||||
|
'No REPl instance set before entering `debug_func`?\n'
|
||||||
|
f'{debug_func}\n'
|
||||||
|
)
|
||||||
|
|
||||||
# invoke the low-level REPL activation routine which itself
|
# invoke the low-level REPL activation routine which itself
|
||||||
# should call into a `Pdb.set_trace()` of some sort.
|
# should call into a `Pdb.set_trace()` of some sort.
|
||||||
|
@ -2001,7 +2063,7 @@ async def _pause(
|
||||||
DebugStatus.release(cancel_req_task=True)
|
DebugStatus.release(cancel_req_task=True)
|
||||||
|
|
||||||
# sanity checks for ^ on request/status teardown
|
# sanity checks for ^ on request/status teardown
|
||||||
assert DebugStatus.repl is None
|
# assert DebugStatus.repl is None # XXX no more bc bg thread cases?
|
||||||
assert DebugStatus.repl_task is None
|
assert DebugStatus.repl_task is None
|
||||||
|
|
||||||
# sanity, for when hackin on all this?
|
# sanity, for when hackin on all this?
|
||||||
|
@ -2050,9 +2112,8 @@ def _set_trace(
|
||||||
# root here? Bo
|
# root here? Bo
|
||||||
log.pdb(
|
log.pdb(
|
||||||
f'{_pause_msg}\n'
|
f'{_pause_msg}\n'
|
||||||
# '|\n'
|
|
||||||
f'>(\n'
|
f'>(\n'
|
||||||
f' |_ {task} @ {actor.uid}\n'
|
f'|_ {task} @ {actor.uid}\n'
|
||||||
# ^-TODO-^ more compact pformating?
|
# ^-TODO-^ more compact pformating?
|
||||||
# -[ ] make an `Actor.__repr()__`
|
# -[ ] make an `Actor.__repr()__`
|
||||||
# -[ ] should we use `log.pformat_task_uid()`?
|
# -[ ] should we use `log.pformat_task_uid()`?
|
||||||
|
@ -2241,7 +2302,12 @@ async def _pause_from_bg_root_thread(
|
||||||
'Trying to acquire `Lock` on behalf of bg thread\n'
|
'Trying to acquire `Lock` on behalf of bg thread\n'
|
||||||
f'|_{behalf_of_thread}\n'
|
f'|_{behalf_of_thread}\n'
|
||||||
)
|
)
|
||||||
# DebugStatus.repl_task = behalf_of_thread
|
|
||||||
|
# NOTE: this is already a task inside the main-`trio`-thread, so
|
||||||
|
# we don't need to worry about calling it another time from the
|
||||||
|
# bg thread on which who's behalf this task is operating.
|
||||||
|
DebugStatus.shield_sigint()
|
||||||
|
|
||||||
out = await _pause(
|
out = await _pause(
|
||||||
debug_func=None,
|
debug_func=None,
|
||||||
repl=repl,
|
repl=repl,
|
||||||
|
@ -2250,6 +2316,8 @@ async def _pause_from_bg_root_thread(
|
||||||
called_from_bg_thread=True,
|
called_from_bg_thread=True,
|
||||||
**_pause_kwargs
|
**_pause_kwargs
|
||||||
)
|
)
|
||||||
|
DebugStatus.repl_task = behalf_of_thread
|
||||||
|
|
||||||
lock: trio.FIFOLock = Lock._debug_lock
|
lock: trio.FIFOLock = Lock._debug_lock
|
||||||
stats: trio.LockStatistics= lock.statistics()
|
stats: trio.LockStatistics= lock.statistics()
|
||||||
assert stats.owner is task
|
assert stats.owner is task
|
||||||
|
@ -2283,7 +2351,6 @@ async def _pause_from_bg_root_thread(
|
||||||
f'|_{behalf_of_thread}\n'
|
f'|_{behalf_of_thread}\n'
|
||||||
)
|
)
|
||||||
task_status.started(out)
|
task_status.started(out)
|
||||||
DebugStatus.shield_sigint()
|
|
||||||
|
|
||||||
# wait for bg thread to exit REPL sesh.
|
# wait for bg thread to exit REPL sesh.
|
||||||
try:
|
try:
|
||||||
|
@ -2324,7 +2391,7 @@ def pause_from_sync(
|
||||||
err_on_no_runtime=False,
|
err_on_no_runtime=False,
|
||||||
)
|
)
|
||||||
message: str = (
|
message: str = (
|
||||||
f'{actor.uid} task called `tractor.pause_from_sync()`\n\n'
|
f'{actor.uid} task called `tractor.pause_from_sync()`\n'
|
||||||
)
|
)
|
||||||
if not actor:
|
if not actor:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
|
@ -2348,7 +2415,6 @@ def pause_from_sync(
|
||||||
'for infected `asyncio` mode!'
|
'for infected `asyncio` mode!'
|
||||||
)
|
)
|
||||||
|
|
||||||
DebugStatus.shield_sigint()
|
|
||||||
repl: PdbREPL = mk_pdb()
|
repl: PdbREPL = mk_pdb()
|
||||||
|
|
||||||
# message += f'-> created local REPL {repl}\n'
|
# message += f'-> created local REPL {repl}\n'
|
||||||
|
@ -2366,6 +2432,10 @@ def pause_from_sync(
|
||||||
# thread which will call `._pause()` manually with special
|
# thread which will call `._pause()` manually with special
|
||||||
# handling for root-actor caller usage.
|
# handling for root-actor caller usage.
|
||||||
if not DebugStatus.is_main_trio_thread():
|
if not DebugStatus.is_main_trio_thread():
|
||||||
|
|
||||||
|
# TODO: `threading.Lock()` this so we don't get races in
|
||||||
|
# multi-thr cases where they're acquiring/releasing the
|
||||||
|
# REPL and setting request/`Lock` state, etc..
|
||||||
thread: threading.Thread = threading.current_thread()
|
thread: threading.Thread = threading.current_thread()
|
||||||
repl_owner = thread
|
repl_owner = thread
|
||||||
|
|
||||||
|
@ -2373,9 +2443,16 @@ def pause_from_sync(
|
||||||
if is_root:
|
if is_root:
|
||||||
message += (
|
message += (
|
||||||
f'-> called from a root-actor bg {thread}\n'
|
f'-> called from a root-actor bg {thread}\n'
|
||||||
f'-> scheduling `._pause_from_sync_thread()`..\n'
|
f'-> scheduling `._pause_from_bg_root_thread()`..\n'
|
||||||
)
|
)
|
||||||
bg_task, repl = trio.from_thread.run(
|
# XXX SUBTLE BADNESS XXX that should really change!
|
||||||
|
# don't over-write the `repl` here since when
|
||||||
|
# this behalf-of-bg_thread-task calls pause it will
|
||||||
|
# pass `debug_func=None` which will result in it
|
||||||
|
# returing a `repl==None` output and that get's also
|
||||||
|
# `.started(out)` back here! So instead just ignore
|
||||||
|
# that output and assign the `repl` created above!
|
||||||
|
bg_task, _ = trio.from_thread.run(
|
||||||
afn=partial(
|
afn=partial(
|
||||||
actor._service_n.start,
|
actor._service_n.start,
|
||||||
partial(
|
partial(
|
||||||
|
@ -2387,8 +2464,9 @@ def pause_from_sync(
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
DebugStatus.shield_sigint()
|
||||||
message += (
|
message += (
|
||||||
f'-> `._pause_from_sync_thread()` started bg task {bg_task}\n'
|
f'-> `._pause_from_bg_root_thread()` started bg task {bg_task}\n'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
message += f'-> called from a bg {thread}\n'
|
message += f'-> called from a bg {thread}\n'
|
||||||
|
@ -2397,7 +2475,7 @@ def pause_from_sync(
|
||||||
# `request_root_stdio_lock()` and we don't need to
|
# `request_root_stdio_lock()` and we don't need to
|
||||||
# worry about all the special considerations as with
|
# worry about all the special considerations as with
|
||||||
# the root-actor per above.
|
# the root-actor per above.
|
||||||
bg_task, repl = trio.from_thread.run(
|
bg_task, _ = trio.from_thread.run(
|
||||||
afn=partial(
|
afn=partial(
|
||||||
_pause,
|
_pause,
|
||||||
debug_func=None,
|
debug_func=None,
|
||||||
|
@ -2412,6 +2490,9 @@ def pause_from_sync(
|
||||||
**_pause_kwargs
|
**_pause_kwargs
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
# ?TODO? XXX where do we NEED to call this in the
|
||||||
|
# subactor-bg-thread case?
|
||||||
|
DebugStatus.shield_sigint()
|
||||||
assert bg_task is not DebugStatus.repl_task
|
assert bg_task is not DebugStatus.repl_task
|
||||||
|
|
||||||
else: # we are presumably the `trio.run()` + main thread
|
else: # we are presumably the `trio.run()` + main thread
|
||||||
|
@ -2424,6 +2505,11 @@ def pause_from_sync(
|
||||||
# greenback: ModuleType = await maybe_init_greenback()
|
# greenback: ModuleType = await maybe_init_greenback()
|
||||||
|
|
||||||
message += f'-> imported {greenback}\n'
|
message += f'-> imported {greenback}\n'
|
||||||
|
|
||||||
|
# NOTE XXX seems to need to be set BEFORE the `_pause()`
|
||||||
|
# invoke using gb below?
|
||||||
|
DebugStatus.shield_sigint()
|
||||||
|
|
||||||
repl_owner: Task = current_task()
|
repl_owner: Task = current_task()
|
||||||
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
|
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
|
||||||
try:
|
try:
|
||||||
|
@ -2449,9 +2535,12 @@ def pause_from_sync(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
if out:
|
if out:
|
||||||
bg_task, repl = out
|
bg_task, _ = out
|
||||||
assert repl is repl
|
else:
|
||||||
assert bg_task is repl_owner
|
bg_task: Task = current_task()
|
||||||
|
|
||||||
|
# assert repl is repl
|
||||||
|
assert bg_task is repl_owner
|
||||||
|
|
||||||
# NOTE: normally set inside `_enter_repl_sync()`
|
# NOTE: normally set inside `_enter_repl_sync()`
|
||||||
DebugStatus.repl_task: str = repl_owner
|
DebugStatus.repl_task: str = repl_owner
|
||||||
|
@ -2465,7 +2554,10 @@ def pause_from_sync(
|
||||||
)
|
)
|
||||||
log.devx(message)
|
log.devx(message)
|
||||||
|
|
||||||
|
# NOTE set as late as possible to avoid state clobbering
|
||||||
|
# in the multi-threaded case!
|
||||||
DebugStatus.repl = repl
|
DebugStatus.repl = repl
|
||||||
|
|
||||||
_set_trace(
|
_set_trace(
|
||||||
api_frame=api_frame or inspect.currentframe(),
|
api_frame=api_frame or inspect.currentframe(),
|
||||||
repl=repl,
|
repl=repl,
|
||||||
|
@ -2523,7 +2615,7 @@ async def breakpoint(
|
||||||
|
|
||||||
|
|
||||||
_crash_msg: str = (
|
_crash_msg: str = (
|
||||||
'Attaching to pdb REPL in crashed actor'
|
'Opening a pdb REPL in crashed actor'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -2551,11 +2643,9 @@ def _post_mortem(
|
||||||
# here! Bo
|
# here! Bo
|
||||||
log.pdb(
|
log.pdb(
|
||||||
f'{_crash_msg}\n'
|
f'{_crash_msg}\n'
|
||||||
# '|\n'
|
|
||||||
f'x>(\n'
|
f'x>(\n'
|
||||||
f' |_ {current_task()} @ {actor.uid}\n'
|
f' |_ {current_task()} @ {actor.uid}\n'
|
||||||
|
|
||||||
# f'|_ @{actor.uid}\n'
|
|
||||||
# TODO: make an `Actor.__repr()__`
|
# TODO: make an `Actor.__repr()__`
|
||||||
# f'|_ {current_task()} @ {actor.name}\n'
|
# f'|_ {current_task()} @ {actor.name}\n'
|
||||||
)
|
)
|
||||||
|
@ -2668,7 +2758,8 @@ async def acquire_debug_lock(
|
||||||
tuple,
|
tuple,
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Request to acquire the TTY `Lock` in the root actor, release on exit.
|
Request to acquire the TTY `Lock` in the root actor, release on
|
||||||
|
exit.
|
||||||
|
|
||||||
This helper is for actor's who don't actually need to acquired
|
This helper is for actor's who don't actually need to acquired
|
||||||
the debugger but want to wait until the lock is free in the
|
the debugger but want to wait until the lock is free in the
|
||||||
|
@ -2680,10 +2771,14 @@ async def acquire_debug_lock(
|
||||||
yield None
|
yield None
|
||||||
return
|
return
|
||||||
|
|
||||||
|
task: Task = current_task()
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
ctx: Context = await n.start(
|
ctx: Context = await n.start(
|
||||||
request_root_stdio_lock,
|
partial(
|
||||||
subactor_uid,
|
request_root_stdio_lock,
|
||||||
|
actor_uid=subactor_uid,
|
||||||
|
task_uid=(task.name, id(task)),
|
||||||
|
)
|
||||||
)
|
)
|
||||||
yield ctx
|
yield ctx
|
||||||
ctx.cancel()
|
ctx.cancel()
|
||||||
|
|
|
@ -24,13 +24,24 @@ disjoint, parallel executing tasks in separate actors.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
# from functools import partial
|
||||||
|
from threading import (
|
||||||
|
current_thread,
|
||||||
|
Thread,
|
||||||
|
RLock,
|
||||||
|
)
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
from signal import (
|
from signal import (
|
||||||
signal,
|
signal,
|
||||||
|
getsignal,
|
||||||
SIGUSR1,
|
SIGUSR1,
|
||||||
)
|
)
|
||||||
import traceback
|
# import traceback
|
||||||
from typing import TYPE_CHECKING
|
from types import ModuleType
|
||||||
|
from typing import (
|
||||||
|
Callable,
|
||||||
|
TYPE_CHECKING,
|
||||||
|
)
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from tractor import (
|
from tractor import (
|
||||||
|
@ -51,26 +62,45 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
@trio.lowlevel.disable_ki_protection
|
@trio.lowlevel.disable_ki_protection
|
||||||
def dump_task_tree() -> None:
|
def dump_task_tree() -> None:
|
||||||
import stackscope
|
'''
|
||||||
from tractor.log import get_console_log
|
Do a classic `stackscope.extract()` task-tree dump to console at
|
||||||
|
`.devx()` level.
|
||||||
|
|
||||||
|
'''
|
||||||
|
import stackscope
|
||||||
tree_str: str = str(
|
tree_str: str = str(
|
||||||
stackscope.extract(
|
stackscope.extract(
|
||||||
trio.lowlevel.current_root_task(),
|
trio.lowlevel.current_root_task(),
|
||||||
recurse_child_tasks=True
|
recurse_child_tasks=True
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
log = get_console_log(
|
|
||||||
name=__name__,
|
|
||||||
level='cancel',
|
|
||||||
)
|
|
||||||
actor: Actor = _state.current_actor()
|
actor: Actor = _state.current_actor()
|
||||||
|
thr: Thread = current_thread()
|
||||||
log.devx(
|
log.devx(
|
||||||
f'Dumping `stackscope` tree for actor\n'
|
f'Dumping `stackscope` tree for actor\n'
|
||||||
f'{actor.name}: {actor}\n'
|
f'{actor.uid}:\n'
|
||||||
f' |_{mp.current_process()}\n\n'
|
f'|_{mp.current_process()}\n'
|
||||||
|
f' |_{thr}\n'
|
||||||
|
f' |_{actor}\n\n'
|
||||||
|
|
||||||
|
# start-of-trace-tree delimiter (mostly for testing)
|
||||||
|
'------ - ------\n'
|
||||||
|
'\n'
|
||||||
|
+
|
||||||
f'{tree_str}\n'
|
f'{tree_str}\n'
|
||||||
|
+
|
||||||
|
# end-of-trace-tree delimiter (mostly for testing)
|
||||||
|
f'\n'
|
||||||
|
f'------ {actor.uid!r} ------\n'
|
||||||
)
|
)
|
||||||
|
# TODO: can remove this right?
|
||||||
|
# -[ ] was original code from author
|
||||||
|
#
|
||||||
|
# print(
|
||||||
|
# 'DUMPING FROM PRINT\n'
|
||||||
|
# +
|
||||||
|
# content
|
||||||
|
# )
|
||||||
# import logging
|
# import logging
|
||||||
# try:
|
# try:
|
||||||
# with open("/dev/tty", "w") as tty:
|
# with open("/dev/tty", "w") as tty:
|
||||||
|
@ -80,58 +110,130 @@ def dump_task_tree() -> None:
|
||||||
# "task_tree"
|
# "task_tree"
|
||||||
# ).exception("Error printing task tree")
|
# ).exception("Error printing task tree")
|
||||||
|
|
||||||
|
_handler_lock = RLock()
|
||||||
|
_tree_dumped: bool = False
|
||||||
|
|
||||||
def signal_handler(
|
|
||||||
|
def dump_tree_on_sig(
|
||||||
sig: int,
|
sig: int,
|
||||||
frame: object,
|
frame: object,
|
||||||
|
|
||||||
relay_to_subs: bool = True,
|
relay_to_subs: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
try:
|
global _tree_dumped, _handler_lock
|
||||||
trio.lowlevel.current_trio_token(
|
with _handler_lock:
|
||||||
).run_sync_soon(dump_task_tree)
|
if _tree_dumped:
|
||||||
except RuntimeError:
|
log.warning(
|
||||||
# not in async context -- print a normal traceback
|
'Already dumped for this actor...??'
|
||||||
traceback.print_stack()
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
_tree_dumped = True
|
||||||
|
|
||||||
|
# actor: Actor = _state.current_actor()
|
||||||
|
log.devx(
|
||||||
|
'Trying to dump `stackscope` tree..\n'
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
dump_task_tree()
|
||||||
|
# await actor._service_n.start_soon(
|
||||||
|
# partial(
|
||||||
|
# trio.to_thread.run_sync,
|
||||||
|
# dump_task_tree,
|
||||||
|
# )
|
||||||
|
# )
|
||||||
|
# trio.lowlevel.current_trio_token().run_sync_soon(
|
||||||
|
# dump_task_tree
|
||||||
|
# )
|
||||||
|
|
||||||
|
except RuntimeError:
|
||||||
|
log.exception(
|
||||||
|
'Failed to dump `stackscope` tree..\n'
|
||||||
|
)
|
||||||
|
# not in async context -- print a normal traceback
|
||||||
|
# traceback.print_stack()
|
||||||
|
raise
|
||||||
|
|
||||||
|
except BaseException:
|
||||||
|
log.exception(
|
||||||
|
'Failed to dump `stackscope` tree..\n'
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
log.devx(
|
||||||
|
'Supposedly we dumped just fine..?'
|
||||||
|
)
|
||||||
|
|
||||||
if not relay_to_subs:
|
if not relay_to_subs:
|
||||||
return
|
return
|
||||||
|
|
||||||
an: ActorNursery
|
an: ActorNursery
|
||||||
for an in _state.current_actor()._actoruid2nursery.values():
|
for an in _state.current_actor()._actoruid2nursery.values():
|
||||||
|
|
||||||
subproc: ProcessType
|
subproc: ProcessType
|
||||||
subactor: Actor
|
subactor: Actor
|
||||||
for subactor, subproc, _ in an._children.values():
|
for subactor, subproc, _ in an._children.values():
|
||||||
log.devx(
|
log.warning(
|
||||||
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
|
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
|
||||||
f'{subactor}\n'
|
f'{subactor}\n'
|
||||||
f' |_{subproc}\n'
|
f' |_{subproc}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
if isinstance(subproc, trio.Process):
|
# bc of course stdlib can't have a std API.. XD
|
||||||
subproc.send_signal(sig)
|
match subproc:
|
||||||
|
case trio.Process():
|
||||||
|
subproc.send_signal(sig)
|
||||||
|
|
||||||
elif isinstance(subproc, mp.Process):
|
case mp.Process():
|
||||||
subproc._send_signal(sig)
|
subproc._send_signal(sig)
|
||||||
|
|
||||||
|
|
||||||
def enable_stack_on_sig(
|
def enable_stack_on_sig(
|
||||||
sig: int = SIGUSR1
|
sig: int = SIGUSR1,
|
||||||
) -> None:
|
) -> ModuleType:
|
||||||
'''
|
'''
|
||||||
Enable `stackscope` tracing on reception of a signal; by
|
Enable `stackscope` tracing on reception of a signal; by
|
||||||
default this is SIGUSR1.
|
default this is SIGUSR1.
|
||||||
|
|
||||||
|
HOT TIP: a task/ctx-tree dump can be triggered from a shell with
|
||||||
|
fancy cmds.
|
||||||
|
|
||||||
|
For ex. from `bash` using `pgrep` and cmd-sustitution
|
||||||
|
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
|
||||||
|
you could use:
|
||||||
|
|
||||||
|
>> kill -SIGUSR1 $(pgrep -f '<cmd>')
|
||||||
|
|
||||||
|
Or with with `xonsh` (which has diff capture-from-subproc syntax)
|
||||||
|
|
||||||
|
>> kill -SIGUSR1 @$(pgrep -f '<cmd>')
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
try:
|
||||||
|
import stackscope
|
||||||
|
except ImportError:
|
||||||
|
log.warning(
|
||||||
|
'`stackscope` not installed for use in debug mode!'
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
handler: Callable|int = getsignal(sig)
|
||||||
|
if handler is dump_tree_on_sig:
|
||||||
|
log.devx(
|
||||||
|
'A `SIGUSR1` handler already exists?\n'
|
||||||
|
f'|_ {handler!r}\n'
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
signal(
|
signal(
|
||||||
sig,
|
sig,
|
||||||
signal_handler,
|
dump_tree_on_sig,
|
||||||
)
|
)
|
||||||
# NOTE: not the above can be triggered from
|
log.devx(
|
||||||
# a (xonsh) shell using:
|
'Enabling trace-trees on `SIGUSR1` '
|
||||||
# kill -SIGUSR1 @$(pgrep -f '<cmd>')
|
'since `stackscope` is installed @ \n'
|
||||||
#
|
f'{stackscope!r}\n\n'
|
||||||
# for example if you were looking to trace a `pytest` run
|
f'With `SIGUSR1` handler\n'
|
||||||
# kill -SIGUSR1 @$(pgrep -f 'pytest')
|
f'|_{dump_tree_on_sig}\n'
|
||||||
|
)
|
||||||
|
return stackscope
|
||||||
|
|
|
@ -374,7 +374,7 @@ class PldRx(Struct):
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
src_err = InternalError(
|
src_err = InternalError(
|
||||||
'Unknown IPC msg ??\n\n'
|
'Invalid IPC msg ??\n\n'
|
||||||
f'{msg}\n'
|
f'{msg}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -499,7 +499,7 @@ async def maybe_limit_plds(
|
||||||
yield None
|
yield None
|
||||||
return
|
return
|
||||||
|
|
||||||
# sanity on scoping
|
# sanity check on IPC scoping
|
||||||
curr_ctx: Context = current_ipc_ctx()
|
curr_ctx: Context = current_ipc_ctx()
|
||||||
assert ctx is curr_ctx
|
assert ctx is curr_ctx
|
||||||
|
|
||||||
|
@ -510,6 +510,8 @@ async def maybe_limit_plds(
|
||||||
) as msgdec:
|
) as msgdec:
|
||||||
yield msgdec
|
yield msgdec
|
||||||
|
|
||||||
|
# when the applied spec is unwound/removed, the same IPC-ctx
|
||||||
|
# should still be in scope.
|
||||||
curr_ctx: Context = current_ipc_ctx()
|
curr_ctx: Context = current_ipc_ctx()
|
||||||
assert ctx is curr_ctx
|
assert ctx is curr_ctx
|
||||||
|
|
||||||
|
@ -525,16 +527,26 @@ async def drain_to_final_msg(
|
||||||
list[MsgType]
|
list[MsgType]
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Drain IPC msgs delivered to the underlying IPC primitive's
|
Drain IPC msgs delivered to the underlying IPC context's
|
||||||
rx-mem-chan (eg. `Context._rx_chan`) from the runtime in
|
rx-mem-chan (i.e. from `Context._rx_chan`) in search for a final
|
||||||
search for a final result or error.
|
`Return` or `Error` msg.
|
||||||
|
|
||||||
The motivation here is to ideally capture errors during ctxc
|
Deliver the `Return` + preceding drained msgs (`list[MsgType]`)
|
||||||
conditions where a canc-request/or local error is sent but the
|
as a pair unless an `Error` is found, in which unpack and raise
|
||||||
local task also excepts and enters the
|
it.
|
||||||
`Portal.open_context().__aexit__()` block wherein we prefer to
|
|
||||||
capture and raise any remote error or ctxc-ack as part of the
|
The motivation here is to always capture any remote error relayed
|
||||||
`ctx.result()` cleanup and teardown sequence.
|
by the remote peer task during a ctxc condition.
|
||||||
|
|
||||||
|
For eg. a ctxc-request may be sent to the peer as part of the
|
||||||
|
local task's (request for) cancellation but then that same task
|
||||||
|
**also errors** before executing the teardown in the
|
||||||
|
`Portal.open_context().__aexit__()` block. In such error-on-exit
|
||||||
|
cases we want to always capture and raise any delivered remote
|
||||||
|
error (like an expected ctxc-ACK) as part of the final
|
||||||
|
`ctx.wait_for_result()` teardown sequence such that the
|
||||||
|
`Context.outcome` related state always reflect what transpired
|
||||||
|
even after ctx closure and the `.open_context()` block exit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
@ -572,7 +584,6 @@ async def drain_to_final_msg(
|
||||||
# |_from tractor.devx._debug import pause
|
# |_from tractor.devx._debug import pause
|
||||||
# await pause()
|
# await pause()
|
||||||
|
|
||||||
|
|
||||||
# NOTE: we get here if the far end was
|
# NOTE: we get here if the far end was
|
||||||
# `ContextCancelled` in 2 cases:
|
# `ContextCancelled` in 2 cases:
|
||||||
# 1. we requested the cancellation and thus
|
# 1. we requested the cancellation and thus
|
||||||
|
@ -580,13 +591,13 @@ async def drain_to_final_msg(
|
||||||
# 2. WE DID NOT REQUEST that cancel and thus
|
# 2. WE DID NOT REQUEST that cancel and thus
|
||||||
# SHOULD RAISE HERE!
|
# SHOULD RAISE HERE!
|
||||||
except trio.Cancelled as taskc:
|
except trio.Cancelled as taskc:
|
||||||
|
|
||||||
# CASE 2: mask the local cancelled-error(s)
|
# CASE 2: mask the local cancelled-error(s)
|
||||||
# only when we are sure the remote error is
|
# only when we are sure the remote error is
|
||||||
# the source cause of this local task's
|
# the source cause of this local task's
|
||||||
# cancellation.
|
# cancellation.
|
||||||
ctx.maybe_raise(
|
ctx.maybe_raise(
|
||||||
# TODO: when use this/
|
hide_tb=hide_tb,
|
||||||
|
# TODO: when use this?
|
||||||
# from_src_exc=taskc,
|
# from_src_exc=taskc,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -659,7 +670,7 @@ async def drain_to_final_msg(
|
||||||
# Stop()
|
# Stop()
|
||||||
case Stop():
|
case Stop():
|
||||||
pre_result_drained.append(msg)
|
pre_result_drained.append(msg)
|
||||||
log.cancel(
|
log.runtime( # normal/expected shutdown transaction
|
||||||
'Remote stream terminated due to "stop" msg:\n\n'
|
'Remote stream terminated due to "stop" msg:\n\n'
|
||||||
f'{pretty_struct.pformat(msg)}\n'
|
f'{pretty_struct.pformat(msg)}\n'
|
||||||
)
|
)
|
||||||
|
@ -719,13 +730,19 @@ async def drain_to_final_msg(
|
||||||
pre_result_drained.append(msg)
|
pre_result_drained.append(msg)
|
||||||
# It's definitely an internal error if any other
|
# It's definitely an internal error if any other
|
||||||
# msg type without a`'cid'` field arrives here!
|
# msg type without a`'cid'` field arrives here!
|
||||||
|
report: str = (
|
||||||
|
f'Invalid or unknown msg type {type(msg)!r}!?\n'
|
||||||
|
)
|
||||||
if not msg.cid:
|
if not msg.cid:
|
||||||
raise InternalError(
|
report += (
|
||||||
'Unexpected cid-missing msg?\n\n'
|
'\nWhich also has no `.cid` field?\n'
|
||||||
f'{msg}\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
raise RuntimeError('Unknown msg type: {msg}')
|
raise MessagingError(
|
||||||
|
report
|
||||||
|
+
|
||||||
|
f'\n{msg}\n'
|
||||||
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
|
|
Loading…
Reference in New Issue