Compare commits


No commits in common. "547b957bbf4b3346442352f54ed51a20ff29f79a" and "9be821a5cfa4bf4bb7edd44ec1a14da35fd15bd5" have entirely different histories.

22 changed files with 441 additions and 930 deletions

View File

@@ -4,13 +4,6 @@ import time
 import trio
 import tractor

-# TODO: only import these when not running from test harness?
-# can we detect `pexpect` usage maybe?
-# from tractor.devx._debug import (
-#     get_lock,
-#     get_debug_req,
-# )

 def sync_pause(
     use_builtin: bool = False,
@@ -25,13 +18,7 @@ def sync_pause(
         breakpoint(hide_tb=hide_tb)

     else:
-        # TODO: maybe for testing some kind of cm style interface
-        # where the `._set_trace()` call doesn't happen until block
-        # exit?
-        # assert get_lock().ctx_in_debug is None
-        # assert get_debug_req().repl is None
         tractor.pause_from_sync()
-        # assert get_debug_req().repl is None

     if error:
         raise RuntimeError('yoyo sync code error')
@@ -54,11 +41,10 @@ async def start_n_sync_pause(
 async def main() -> None:
     async with (
         tractor.open_nursery(
-            debug_mode=True,
+            # NOTE: required for pausing from sync funcs
             maybe_enable_greenback=True,
-            enable_stack_on_sig=True,
-            # loglevel='warning',
-            # loglevel='devx',
+            debug_mode=True,
+            # loglevel='cancel',
         ) as an,
         trio.open_nursery() as tn,
     ):
@@ -152,9 +138,7 @@ async def main() -> None:
                 # the case 2. from above still exists!
                 use_builtin=True,
             ),
-            # TODO: with this `False` we can hang!??!
-            # abandon_on_cancel=False,
-            abandon_on_cancel=True,
+            abandon_on_cancel=False,
             thread_name='inline_root_bg_thread',
         )

View File

@@ -9,7 +9,7 @@ async def main(service_name):
     async with tractor.open_nursery() as an:
         await an.start_actor(service_name)

-        async with tractor.get_registry('127.0.0.1', 1616) as portal:
+        async with tractor.get_arbiter('127.0.0.1', 1616) as portal:
             print(f"Arbiter is listening on {portal.channel}")

         async with tractor.wait_for_actor(service_name) as sockaddr:
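
For orientation, a minimal sketch of the discovery example this hunk touches, assuming the surrounding boilerplate (the `trio.run()` entrypoint and the service name) matches the snippet above; on the target side of this compare the registry portal is obtained via `get_arbiter()` rather than `get_registry()`:

    import trio
    import tractor

    async def main(service_name):
        async with tractor.open_nursery() as an:
            await an.start_actor(service_name)

            # the "arbiter"/registry portal lookup shown in the diff above
            async with tractor.get_registry('127.0.0.1', 1616) as portal:
                print(f"Arbiter is listening on {portal.channel}")

            async with tractor.wait_for_actor(service_name) as sockaddr:
                print(f"my_service is found at {sockaddr}")

            await an.cancel()

    if __name__ == '__main__':
        trio.run(main, 'my_service')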

View File

View File

@ -1,167 +0,0 @@
'''
`tractor.devx.*` tooling sub-pkg test space.
'''
from typing import (
Callable,
)
import pytest
from pexpect.exceptions import (
TIMEOUT,
)
from tractor._testing import (
mk_cmd,
)
@pytest.fixture
def spawn(
start_method,
testdir: pytest.Testdir,
reg_addr: tuple[str, int],
) -> Callable[[str], None]:
'''
Use the `pexpect` module shipped via `testdir.spawn()` to
run an `./examples/..` script by name.
'''
if start_method != 'trio':
pytest.skip(
'`pexpect` based tests only supported on `trio` backend'
)
def _spawn(
cmd: str,
**mkcmd_kwargs,
):
return testdir.spawn(
cmd=mk_cmd(
cmd,
**mkcmd_kwargs,
),
expect_timeout=3,
)
# such that test-dep can pass input script name.
return _spawn
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
def expect(
child,
# normally a `pdb` prompt by default
patt: str,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
err_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
if err_on_false:
raise ValueError(
f'Could not find pattern: {part!r} in `before` output?'
)
return False
return True
# TODO: todo support terminal color-chars stripping so we can match
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
# since this is an "assert" helper ;)
err_on_false=True,
**kwargs
)
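
A quick sketch of how these `expect()`/`assert_before()` helpers combine with the `spawn` fixture above in practice; the calls are lifted from the `test_shield_pause` test in the next file, only the wrapping is illustrative:

    def test_shield_pause(spawn):
        # run the example script under `pexpect` via the fixture above
        child = spawn('shield_hang_in_sub')

        # block until the expected console output shows up (or print
        # whatever we did see and re-raise on timeout)
        expect(child, 'Yo my child hanging..?')

        # bulk-match several expected lines in the pre-prompt output
        assert_before(
            child,
            ['Entering shield sleep..'],
        )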

View File

@ -1,120 +0,0 @@
'''
That "native" runtime-hackin toolset better be dang useful!
Verify the funtion of a variety of "developer-experience" tools we
offer from the `.devx` sub-pkg:
- use of the lovely `stackscope` for dumping actor `trio`-task trees
during operation and hangs.
TODO:
- demonstration of `CallerInfo` call stack frame filtering such that
for logging and REPL purposes a user sees exactly the layers needed
when debugging a problem inside the stack vs. in their app.
'''
import os
import signal
from .conftest import (
expect,
assert_before,
# in_prompt_msg,
)
def test_shield_pause(
spawn,
):
'''
Verify the `tractor.pause()/.post_mortem()` API works inside an
already cancelled `trio.CancelScope` and that you can step to the
next checkpoint wherein the cancelled will get raised.
'''
child = spawn(
'shield_hang_in_sub'
)
expect(
child,
'Yo my child hanging..?',
)
assert_before(
child,
[
'Entering shield sleep..',
'Enabling trace-trees on `SIGUSR1` since `stackscope` is installed @',
]
)
print(
'Sending SIGUSR1 to see a tree-trace!',
)
os.kill(
child.pid,
signal.SIGUSR1,
)
expect(
child,
# end-of-tree delimiter
"------ \('root', ",
)
assert_before(
child,
[
'Trying to dump `stackscope` tree..',
'Dumping `stackscope` tree for actor',
"('root'", # uid line
# parent block point (non-shielded)
'await trio.sleep_forever() # in root',
]
)
# expect(
# child,
# # relay to the sub should be reported
# 'Relaying `SIGUSR1`[10] to sub-actor',
# )
expect(
child,
# end-of-tree delimiter
"------ \('hanger', ",
)
assert_before(
child,
[
# relay to the sub should be reported
'Relaying `SIGUSR1`[10] to sub-actor',
"('hanger'", # uid line
# hanger LOC where it's shield-halted
'await trio.sleep_forever() # in subactor',
]
)
# breakpoint()
# simulate the user sending a ctl-c to the hanging program.
# this should result in the terminator kicking in since
# the sub is shield blocking and can't respond to SIGINT.
os.kill(
child.pid,
signal.SIGINT,
)
expect(
child,
'Shutting down actor runtime',
timeout=6,
)
assert_before(
child,
[
'raise KeyboardInterrupt',
# 'Shutting down actor runtime',
'#T-800 deployed to collect zombie B0',
"'--uid', \"('hanger',",
]
)

View File

@@ -91,8 +91,7 @@ def test_ipc_channel_break_during_stream(
         # non-`trio` spawners should never hit the hang condition that
         # requires the user to do ctl-c to cancel the actor tree.
-        # expect_final_exc = trio.ClosedResourceError
-        expect_final_exc = tractor.TransportClosed
+        expect_final_exc = trio.ClosedResourceError

     mod: ModuleType = import_path(
         examples_dir() / 'advanced_faults'
@@ -158,7 +157,7 @@ def test_ipc_channel_break_during_stream(
         if pre_aclose_msgstream:
             expect_final_exc = KeyboardInterrupt

-    # NOTE when the parent IPC side dies (even if the child does as well
+    # NOTE when the parent IPC side dies (even if the child's does as well
     # but the child fails BEFORE the parent) we always expect the
     # IPC layer to raise a closed-resource, NEVER do we expect
     # a stop msg since the parent-side ctx apis will error out
@@ -170,8 +169,7 @@ def test_ipc_channel_break_during_stream(
         and
         ipc_break['break_child_ipc_after'] is False
     ):
-        # expect_final_exc = trio.ClosedResourceError
-        expect_final_exc = tractor.TransportClosed
+        expect_final_exc = trio.ClosedResourceError

     # BOTH but, PARENT breaks FIRST
     elif (
@@ -182,8 +180,7 @@ def test_ipc_channel_break_during_stream(
             ipc_break['break_parent_ipc_after']
         )
     ):
-        # expect_final_exc = trio.ClosedResourceError
-        expect_final_exc = tractor.TransportClosed
+        expect_final_exc = trio.ClosedResourceError

     with pytest.raises(
         expected_exception=(
@@ -202,8 +199,8 @@ def test_ipc_channel_break_during_stream(
                     **ipc_break,
                 )
            )
-        except KeyboardInterrupt as _kbi:
-            kbi = _kbi
+        except KeyboardInterrupt as kbi:
+            _err = kbi
             if expect_final_exc is not KeyboardInterrupt:
                 pytest.fail(
                     'Rxed unexpected KBI !?\n'
@@ -212,21 +209,6 @@ def test_ipc_channel_break_during_stream(
             raise

-        except tractor.TransportClosed as _tc:
-            tc = _tc
-            if expect_final_exc is KeyboardInterrupt:
-                pytest.fail(
-                    'Unexpected transport failure !?\n'
-                    f'{repr(tc)}'
-                )
-            cause: Exception = tc.__cause__
-            assert (
-                type(cause) is trio.ClosedResourceError
-                and
-                cause.args[0] == 'another task closed this fd'
-            )
-            raise

     # get raw instance from pytest wrapper
     value = excinfo.value
     if isinstance(value, ExceptionGroup):

View File

@@ -13,9 +13,11 @@ TODO:
 from functools import partial
 import itertools
 import platform
+import pathlib
 import time

 import pytest
+import pexpect
 from pexpect.exceptions import (
     TIMEOUT,
     EOF,
@@ -26,14 +28,12 @@ from tractor.devx._debug import (
     _crash_msg,
     _repl_fail_msg,
 )
-from tractor._testing import (
-    examples_dir,
-)
 from conftest import (
     _ci_env,
 )
-from .conftest import (
-    expect,
-    in_prompt_msg,
-    assert_before,
-)

 # TODO: The next great debugger audit could be done by you!
 # - recurrent entry to breakpoint() from single actor *after* and an
@@ -52,6 +52,15 @@ if platform.system() == 'Windows':
     )

+def mk_cmd(ex_name: str) -> str:
+    '''
+    Generate a command suitable to pass to ``pexpect.spawn()``.
+    '''
+    script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py'
+    return ' '.join(['python', str(script_path)])

 # TODO: was trying to this xfail style but some weird bug i see in CI
 # that's happening at collect time.. pretty soon gonna dump actions i'm
 # thinkin...
@@ -70,9 +79,142 @@ has_nested_actors = pytest.mark.has_nested_actors
 # )

+@pytest.fixture
+def spawn(
+    start_method,
+    testdir,
+    reg_addr,
+) -> 'pexpect.spawn':
+    if start_method != 'trio':
+        pytest.skip(
+            "Debugger tests are only supported on the trio backend"
+        )
+
+    def _spawn(cmd):
+        return testdir.spawn(
+            cmd=mk_cmd(cmd),
+            expect_timeout=3,
+        )
+
+    return _spawn
PROMPT = r"\(Pdb\+\)" PROMPT = r"\(Pdb\+\)"
def expect(
child,
# prompt by default
patt: str = PROMPT,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
return False
return True
# TODO: todo support terminal color-chars stripping so we can match
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
**kwargs
)
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
@pytest.mark.parametrize( @pytest.mark.parametrize(
'user_in_out', 'user_in_out',
[ [
@ -137,7 +279,7 @@ def test_root_actor_bp(spawn, user_in_out):
child.expect('\r\n') child.expect('\r\n')
# process should exit # process should exit
child.expect(EOF) child.expect(pexpect.EOF)
if expect_err_str is None: if expect_err_str is None:
assert 'Error' not in str(child.before) assert 'Error' not in str(child.before)
@ -157,9 +299,7 @@ def do_ctlc(
# needs some further investigation potentially... # needs some further investigation potentially...
expect_prompt: bool = not _ci_env, expect_prompt: bool = not _ci_env,
) -> str|None: ) -> None:
before: str|None = None
# make sure ctl-c sends don't do anything but repeat output # make sure ctl-c sends don't do anything but repeat output
for _ in range(count): for _ in range(count):
@ -169,18 +309,15 @@ def do_ctlc(
# TODO: figure out why this makes CI fail.. # TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine.. # if you run this test manually it works just fine..
if expect_prompt: if expect_prompt:
before = str(child.before.decode())
time.sleep(delay) time.sleep(delay)
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode())
time.sleep(delay) time.sleep(delay)
if patt: if patt:
# should see the last line on console # should see the last line on console
assert patt in before assert patt in before
# return the console content up to the final prompt
return before
def test_root_actor_bp_forever( def test_root_actor_bp_forever(
spawn, spawn,
@ -221,7 +358,7 @@ def test_root_actor_bp_forever(
# quit out of the loop # quit out of the loop
child.sendline('q') child.sendline('q')
child.expect(EOF) child.expect(pexpect.EOF)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -286,7 +423,7 @@ def test_subactor_error(
child.expect('\r\n') child.expect('\r\n')
# process should exit # process should exit
child.expect(EOF) child.expect(pexpect.EOF)
def test_subactor_breakpoint( def test_subactor_breakpoint(
@ -349,7 +486,7 @@ def test_subactor_breakpoint(
child.sendline('c') child.sendline('c')
# process should exit # process should exit
child.expect(EOF) child.expect(pexpect.EOF)
before = str(child.before.decode()) before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
@ -492,7 +629,7 @@ def test_multi_subactors(
# process should exit # process should exit
child.sendline('c') child.sendline('c')
child.expect(EOF) child.expect(pexpect.EOF)
# repeat of previous multierror for final output # repeat of previous multierror for final output
assert_before(child, [ assert_before(child, [
@ -632,7 +769,7 @@ def test_multi_daemon_subactors(
) )
child.sendline('c') child.sendline('c')
child.expect(EOF) child.expect(pexpect.EOF)
@has_nested_actors @has_nested_actors
@ -708,7 +845,7 @@ def test_multi_subactors_root_errors(
]) ])
child.sendline('c') child.sendline('c')
child.expect(EOF) child.expect(pexpect.EOF)
assert_before(child, [ assert_before(child, [
# "Attaching to pdb in crashed actor: ('root'", # "Attaching to pdb in crashed actor: ('root'",
@ -838,7 +975,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(3): for i in range(3):
try: try:
child.expect(EOF, timeout=0.5) child.expect(pexpect.EOF, timeout=0.5)
break break
except TIMEOUT: except TIMEOUT:
child.sendline('c') child.sendline('c')
@ -880,7 +1017,7 @@ def test_root_cancels_child_context_during_startup(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(EOF) child.expect(pexpect.EOF)
def test_different_debug_mode_per_actor( def test_different_debug_mode_per_actor(
@ -901,7 +1038,7 @@ def test_different_debug_mode_per_actor(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(EOF) child.expect(pexpect.EOF)
before = str(child.before.decode()) before = str(child.before.decode())
@ -948,10 +1085,10 @@ def test_pause_from_sync(
) )
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
# ^NOTE^ subactor not spawned yet; don't need extra delay.
child.sendline('c') child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body # first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT) child.expect(PROMPT)
@ -972,27 +1109,7 @@ def test_pause_from_sync(
) )
if ctlc: if ctlc:
do_ctlc( do_ctlc(child)
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
# XXX, fwiw without a brief sleep here the SIGINT might actually
# trigger "subactor" cancellation by its parent before the
# shield-handler is engaged.
#
# => similar to the `delay` input to `do_ctlc()` below, setting
# this too low can cause the test to fail since the `subactor`
# suffers a race where the root/parent sends an actor-cancel
# prior to the context task hitting its pause point (and thus
# engaging the `sigint_shield()` handler in time); this value
# seems be good enuf?
time.sleep(0.6)
# one of the bg thread or subactor should have # one of the bg thread or subactor should have
# `Lock.acquire()`-ed # `Lock.acquire()`-ed
@ -1011,48 +1128,32 @@ def test_pause_from_sync(
"('root'", "('root'",
], ],
} }
conts: int = 0 # for debugging below matching logic on failure
while attach_patts: while attach_patts:
child.sendline('c') child.sendline('c')
conts += 1
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
for key in attach_patts: for key in attach_patts.copy():
if key in before: if key in before:
attach_key: str = key
expected_patts: str = attach_patts.pop(key) expected_patts: str = attach_patts.pop(key)
assert_before( assert_before(
child, child,
[_pause_msg] [_pause_msg] + expected_patts
+
expected_patts
) )
break break
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL # ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above. # at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items(): for key, other_patts in attach_patts.items():
assert not in_prompt_msg( assert not in_prompt_msg(
before, before,
other_patts, other_patts,
) )
if ctlc: if ctlc:
do_ctlc( do_ctlc(child)
child,
patt=attach_key,
# NOTE same as comment above
delay=0.4,
)
child.sendline('c') child.sendline('c')
child.expect(EOF) child.expect(pexpect.EOF)
def test_post_mortem_api( def test_post_mortem_api(
@ -1157,7 +1258,7 @@ def test_post_mortem_api(
# ) # )
child.sendline('c') child.sendline('c')
child.expect(EOF) child.expect(pexpect.EOF)
def test_shield_pause( def test_shield_pause(
@ -1232,7 +1333,7 @@ def test_shield_pause(
] ]
) )
child.sendline('c') child.sendline('c')
child.expect(EOF) child.expect(pexpect.EOF)
# TODO: better error for "non-ideal" usage from the root actor. # TODO: better error for "non-ideal" usage from the root actor.
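
For reference, the `spawn` fixture (re)defined near the top of this file is consumed by these tests roughly like so; a sketch only, with the example-script name being illustrative of the `examples/debugging/` scripts driven here:

    def test_root_actor_error(spawn):
        child = spawn('root_actor_error')   # pexpect-backed python process
        child.expect(PROMPT)                # wait for the `pdbp` REPL prompt
        child.sendline('c')                 # continue past the crash point
        child.expect(pexpect.EOF)           # process should then exit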

View File

@@ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr):
         portal = await n.start_actor('actor', enable_modules=[__name__])
         uid = portal.channel.uid

-        async with tractor.get_registry(*reg_addr) as aportal:
+        async with tractor.get_arbiter(*reg_addr) as aportal:
             # this local actor should be the arbiter
             assert actor is aportal.actor
@@ -160,7 +160,7 @@ async def spawn_and_check_registry(
     async with tractor.open_root_actor(
         registry_addrs=[reg_addr],
     ):
-        async with tractor.get_registry(*reg_addr) as portal:
+        async with tractor.get_arbiter(*reg_addr) as portal:
             # runtime needs to be up to call this
             actor = tractor.current_actor()
@@ -298,7 +298,7 @@ async def close_chans_before_nursery(
     async with tractor.open_root_actor(
         registry_addrs=[reg_addr],
     ):
-        async with tractor.get_registry(*reg_addr) as aportal:
+        async with tractor.get_arbiter(*reg_addr) as aportal:
             try:
                 get_reg = partial(unpack_reg, aportal)

View File

@@ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr):
     "Verify waiting on the arbiter to register itself using a local portal."
     actor = tractor.current_actor()
     assert actor.is_arbiter
-    async with tractor.get_registry(*reg_addr) as portal:
+    async with tractor.get_arbiter(*reg_addr) as portal:
         assert isinstance(portal, tractor._portal.LocalPortal)

         with trio.fail_after(0.2):

View File

@@ -32,7 +32,7 @@ def test_abort_on_sigint(daemon):
 @tractor_test
 async def test_cancel_remote_arbiter(daemon, reg_addr):
     assert not tractor.current_actor().is_arbiter
-    async with tractor.get_registry(*reg_addr) as portal:
+    async with tractor.get_arbiter(*reg_addr) as portal:
         await portal.cancel_actor()

     time.sleep(0.1)
@@ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
     # no arbiter socket should exist
     with pytest.raises(OSError):
-        async with tractor.get_registry(*reg_addr) as portal:
+        async with tractor.get_arbiter(*reg_addr) as portal:
             pass

View File

@@ -30,7 +30,7 @@ from ._streaming import (
     stream as stream,
 )
 from ._discovery import (
-    get_registry as get_registry,
+    get_arbiter as get_arbiter,
     find_actor as find_actor,
     wait_for_actor as wait_for_actor,
     query_actor as query_actor,

View File

@@ -2376,9 +2376,8 @@ async def open_context_from_portal(
                 and ctx.cancel_acked
             ):
                 log.cancel(
-                    f'Context cancelled by local {ctx.side!r}-side task\n'
-                    f'c)>\n'
-                    f' |_{ctx._task}\n\n'
+                    f'Context cancelled by {ctx.side!r}-side task\n'
+                    f'|_{ctx._task}\n\n'
                     f'{repr(scope_err)}\n'
                 )
@@ -2394,10 +2393,8 @@ async def open_context_from_portal(
                 # type_only=True,
             )
             log.cancel(
-                f'Context terminated due to {ctx.side!r}-side\n\n'
-                # TODO: do an x)> on err and c)> only for ctxc?
-                f'c)> {outcome_str}\n'
-                f' |_{ctx.repr_rpc}\n'
+                f'Context terminated due to local {ctx.side!r}-side error:\n\n'
+                f'{ctx.chan.uid} => {outcome_str}\n'
             )

         # FINALLY, remove the context from runtime tracking and

View File

@@ -26,8 +26,8 @@ from typing import (
     TYPE_CHECKING,
 )
 from contextlib import asynccontextmanager as acm
-from tractor.log import get_logger
+import warnings

 from .trionics import gather_contexts
 from ._ipc import _connect_chan, Channel
 from ._portal import (
@@ -40,13 +40,11 @@ from ._state import (
     _runtime_vars,
 )

 if TYPE_CHECKING:
     from ._runtime import Actor

-log = get_logger(__name__)

 @acm
 async def get_registry(
     host: str,
@@ -58,12 +56,14 @@ async def get_registry(
 ]:
     '''
     Return a portal instance connected to a local or remote
-    registry-service actor; if a connection already exists re-use it
-    (presumably to call a `.register_actor()` registry runtime RPC
-    ep).
+    arbiter.

     '''
-    actor: Actor = current_actor()
+    actor = current_actor()
+    if not actor:
+        raise RuntimeError("No actor instance has been defined yet?")

     if actor.is_registrar:
         # we're already the arbiter
         # (likely a re-entrant call from the arbiter actor)
@@ -72,8 +72,6 @@ async def get_registry(
             Channel((host, port))
         )
     else:
-        # TODO: try to look pre-existing connection from
-        # `Actor._peers` and use it instead?
         async with (
             _connect_chan(host, port) as chan,
             open_portal(chan) as regstr_ptl,
@@ -82,6 +80,19 @@ async def get_registry(

+# TODO: deprecate and this remove _arbiter form!
+@acm
+async def get_arbiter(*args, **kwargs):
+    warnings.warn(
+        '`tractor.get_arbiter()` is now deprecated!\n'
+        'Use `.get_registry()` instead!',
+        DeprecationWarning,
+        stacklevel=2,
+    )
+    async with get_registry(*args, **kwargs) as to_yield:
+        yield to_yield

 @acm
 async def get_root(
     **kwargs,
@@ -99,53 +110,22 @@ async def get_root(
     yield portal
def get_peer_by_name(
name: str,
# uuid: str|None = None,
) -> list[Channel]|None: # at least 1
'''
Scan for an existing connection (set) to a named actor
and return any channels from `Actor._peers`.
This is an optimization method over querying the registrar for
the same info.
'''
actor: Actor = current_actor()
to_scan: dict[tuple, list[Channel]] = actor._peers.copy()
pchan: Channel|None = actor._parent_chan
if pchan:
to_scan[pchan.uid].append(pchan)
for aid, chans in to_scan.items():
_, peer_name = aid
if name == peer_name:
if not chans:
log.warning(
'No IPC chans for matching peer {peer_name}\n'
)
continue
return chans
return None
@acm @acm
async def query_actor( async def query_actor(
name: str, name: str,
regaddr: tuple[str, int]|None = None, arbiter_sockaddr: tuple[str, int] | None = None,
regaddr: tuple[str, int] | None = None,
) -> AsyncGenerator[ ) -> AsyncGenerator[
tuple[str, int]|None, tuple[str, int] | None,
None, None,
]: ]:
''' '''
Lookup a transport address (by actor name) via querying a registrar Make a transport address lookup for an actor name to a specific
listening @ `regaddr`. registrar.
Returns the transport protocol (socket) address or `None` if no Returns the (socket) address or ``None`` if no entry under that
entry under that name exists. name exists for the given registrar listening @ `regaddr`.
''' '''
actor: Actor = current_actor() actor: Actor = current_actor()
@ -157,10 +137,14 @@ async def query_actor(
'The current actor IS the registry!?' 'The current actor IS the registry!?'
) )
maybe_peers: list[Channel]|None = get_peer_by_name(name) if arbiter_sockaddr is not None:
if maybe_peers: warnings.warn(
yield maybe_peers[0].raddr '`tractor.query_actor(regaddr=<blah>)` is deprecated.\n'
return 'Use `registry_addrs: list[tuple]` instead!',
DeprecationWarning,
stacklevel=2,
)
regaddr: list[tuple[str, int]] = arbiter_sockaddr
reg_portal: Portal reg_portal: Portal
regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0] regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
@ -175,28 +159,10 @@ async def query_actor(
yield sockaddr yield sockaddr
@acm
async def maybe_open_portal(
addr: tuple[str, int],
name: str,
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
pass
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
@acm @acm
async def find_actor( async def find_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int]|None = None,
registry_addrs: list[tuple[str, int]]|None = None, registry_addrs: list[tuple[str, int]]|None = None,
only_first: bool = True, only_first: bool = True,
@ -213,12 +179,29 @@ async def find_actor(
known to the arbiter. known to the arbiter.
''' '''
# optimization path, use any pre-existing peer channel if arbiter_sockaddr is not None:
maybe_peers: list[Channel]|None = get_peer_by_name(name) warnings.warn(
if maybe_peers and only_first: '`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n'
async with open_portal(maybe_peers[0]) as peer_portal: 'Use `registry_addrs: list[tuple]` instead!',
yield peer_portal DeprecationWarning,
return stacklevel=2,
)
registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]
@acm
async def maybe_open_portal_from_reg_addr(
addr: tuple[str, int],
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
if not registry_addrs: if not registry_addrs:
# XXX NOTE: make sure to dynamically read the value on # XXX NOTE: make sure to dynamically read the value on
@ -234,13 +217,10 @@ async def find_actor(
maybe_portals: list[ maybe_portals: list[
AsyncContextManager[tuple[str, int]] AsyncContextManager[tuple[str, int]]
] = list( ] = list(
maybe_open_portal( maybe_open_portal_from_reg_addr(addr)
addr=addr,
name=name,
)
for addr in registry_addrs for addr in registry_addrs
) )
portals: list[Portal]
async with gather_contexts( async with gather_contexts(
mngrs=maybe_portals, mngrs=maybe_portals,
) as portals: ) as portals:
@ -274,31 +254,31 @@ async def find_actor(
@acm @acm
async def wait_for_actor( async def wait_for_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
registry_addr: tuple[str, int] | None = None, registry_addr: tuple[str, int] | None = None,
) -> AsyncGenerator[Portal, None]: ) -> AsyncGenerator[Portal, None]:
''' '''
Wait on at least one peer actor to register `name` with the Wait on an actor to register with the arbiter.
registrar, yield a `Portal to the first registree.
A portal to the first registered actor is returned.
''' '''
actor: Actor = current_actor() actor: Actor = current_actor()
# optimization path, use any pre-existing peer channel if arbiter_sockaddr is not None:
maybe_peers: list[Channel]|None = get_peer_by_name(name) warnings.warn(
if maybe_peers: '`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n'
async with open_portal(maybe_peers[0]) as peer_portal: 'Use `registry_addr: tuple` instead!',
yield peer_portal DeprecationWarning,
return stacklevel=2,
)
registry_addr: tuple[str, int] = arbiter_sockaddr
regaddr: tuple[str, int] = (
registry_addr
or
actor.reg_addrs[0]
)
# TODO: use `.trionics.gather_contexts()` like # TODO: use `.trionics.gather_contexts()` like
# above in `find_actor()` as well? # above in `find_actor()` as well?
reg_portal: Portal reg_portal: Portal
regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0]
async with get_registry(*regaddr) as reg_portal: async with get_registry(*regaddr) as reg_portal:
sockaddrs = await reg_portal.run_from_ns( sockaddrs = await reg_portal.run_from_ns(
'self', 'self',
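
Worth noting: on the target side of this compare `get_arbiter()` survives only as a deprecated alias that forwards to `get_registry()`. A minimal sketch of the caller-visible behaviour (the warning-capture scaffolding is illustrative, not part of the diff):

    import warnings
    import tractor

    async def lookup(reg_addr: tuple[str, int]):
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter('always')
            # deprecated alias: emits a `DeprecationWarning` pointing at
            # `get_registry()` and then simply delegates to it
            async with tractor.get_arbiter(*reg_addr) as portal:
                print(portal.channel)

        assert any(
            issubclass(w.category, DeprecationWarning)
            for w in caught
        )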

View File

@@ -243,7 +243,6 @@ def _trio_main(
         nest_from_op(
             input_op=')>', # like a "closed-to-play"-icon from super perspective
             tree_str=actor_info,
-            back_from_op=1,
         )
     )
     try:

View File

@@ -263,11 +263,11 @@ class Portal:
             return False

         reminfo: str = (
-            f'c)=> {self.channel.uid}\n'
-            f' |_{chan}\n'
+            f'Portal.cancel_actor() => {self.channel.uid}\n'
+            f'|_{chan}\n'
         )
         log.cancel(
-            f'Requesting actor-runtime cancel for peer\n\n'
+            f'Requesting runtime cancel for peer\n\n'
             f'{reminfo}'
         )

View File

@@ -80,7 +80,7 @@ async def open_root_actor(
     # enables the multi-process debugger support
     debug_mode: bool = False,
-    maybe_enable_greenback: bool = True, # `.pause_from_sync()/breakpoint()` support
+    maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
     enable_stack_on_sig: bool = False,

     # internal logging
@@ -233,8 +233,14 @@ async def open_root_actor(
         and
         enable_stack_on_sig
     ):
-        from .devx._stackscope import enable_stack_on_sig
-        enable_stack_on_sig()
+        try:
+            logger.info('Enabling `stackscope` traces on SIGUSR1')
+            from .devx import enable_stack_on_sig
+            enable_stack_on_sig()
+        except ImportError:
+            logger.warning(
+                '`stackscope` not installed for use in debug mode!'
+            )

     # closed into below ping task-func
     ponged_addrs: list[tuple[str, int]] = []
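
As context for the default flip above, a sketch of how these kwargs are passed when starting a root actor; the values shown are only to illustrate the knobs this hunk touches:

    import trio
    import tractor

    async def main():
        async with tractor.open_root_actor(
            debug_mode=True,               # multi-process debugger support
            maybe_enable_greenback=True,   # needed for `.pause_from_sync()`
            enable_stack_on_sig=True,      # SIGUSR1 `stackscope` tree dumps
        ):
            await trio.sleep(1)

    trio.run(main)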

View File

@@ -115,26 +115,25 @@ class Actor:
     '''
     The fundamental "runtime" concurrency primitive.

-    An "actor" is the combination of a regular Python process
-    executing a `trio.run()` task tree, communicating with other
-    "actors" through "memory boundary portals": `Portal`, which
-    provide a high-level async API around IPC "channels" (`Channel`)
-    which themselves encapsulate various (swappable) network
-    transport protocols for sending msgs between said memory domains
-    (processes, hosts, non-GIL threads).
-    Each "actor" is `trio.run()` scheduled "runtime" composed of many
-    concurrent tasks in a single thread. The "runtime" tasks conduct
-    a slew of low(er) level functions to make it possible for message
-    passing between actors as well as the ability to create new
-    actors (aka new "runtimes" in new processes which are supervised
-    via an "actor-nursery" construct). Each task which sends messages
-    to a task in a "peer" actor (not necessarily a parent-child,
+    An *actor* is the combination of a regular Python process executing
+    a ``trio`` task tree, communicating with other actors through
+    "memory boundary portals" - which provide a native async API around
+    IPC transport "channels" which themselves encapsulate various
+    (swappable) network protocols.
+    Each "actor" is ``trio.run()`` scheduled "runtime" composed of
+    many concurrent tasks in a single thread. The "runtime" tasks
+    conduct a slew of low(er) level functions to make it possible
+    for message passing between actors as well as the ability to
+    create new actors (aka new "runtimes" in new processes which
+    are supervised via a nursery construct). Each task which sends
+    messages to a task in a "peer" (not necessarily a parent-child,
     depth hierarchy) is able to do so via an "address", which maps
     IPC connections across memory boundaries, and a task request id
-    which allows for per-actor tasks to send and receive messages to
-    specific peer-actor tasks with which there is an ongoing RPC/IPC
-    dialog.
+    which allows for per-actor tasks to send and receive messages
+    to specific peer-actor tasks with which there is an ongoing
+    RPC/IPC dialog.

     '''
# ugh, we need to get rid of this and replace with a "registry" sys # ugh, we need to get rid of this and replace with a "registry" sys
@ -231,20 +230,17 @@ class Actor:
# by the user (currently called the "arbiter") # by the user (currently called the "arbiter")
self._spawn_method: str = spawn_method self._spawn_method: str = spawn_method
self._peers: defaultdict[ self._peers: defaultdict = defaultdict(list)
str, # uaid
list[Channel], # IPC conns from peer
] = defaultdict(list)
self._peer_connected: dict[tuple[str, str], trio.Event] = {} self._peer_connected: dict[tuple[str, str], trio.Event] = {}
self._no_more_peers = trio.Event() self._no_more_peers = trio.Event()
self._no_more_peers.set() self._no_more_peers.set()
# RPC state
self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks = trio.Event()
self._ongoing_rpc_tasks.set() self._ongoing_rpc_tasks.set()
# (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[ self._rpc_tasks: dict[
tuple[Channel, str], # (chan, cid) tuple[Channel, str],
tuple[Context, Callable, trio.Event] # (ctx=>, fn(), done?) tuple[Context, Callable, trio.Event]
] = {} ] = {}
# map {actor uids -> Context} # map {actor uids -> Context}
@ -321,10 +317,7 @@ class Actor:
event = self._peer_connected.setdefault(uid, trio.Event()) event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait() await event.wait()
log.debug(f'{uid!r} successfully connected back to us') log.debug(f'{uid!r} successfully connected back to us')
return ( return event, self._peers[uid][-1]
event,
self._peers[uid][-1],
)
def load_modules( def load_modules(
self, self,
@ -415,11 +408,26 @@ class Actor:
''' '''
self._no_more_peers = trio.Event() # unset by making new self._no_more_peers = trio.Event() # unset by making new
chan = Channel.from_stream(stream) chan = Channel.from_stream(stream)
con_status: str = ( their_uid: tuple[str, str]|None = chan.uid
'New inbound IPC connection <=\n'
f'|_{chan}\n'
)
con_status: str = ''
# TODO: remove this branch since can never happen?
# NOTE: `.uid` is only set after first contact
if their_uid:
con_status = (
'IPC Re-connection from already known peer?\n'
)
else:
con_status = (
'New inbound IPC connection <=\n'
)
con_status += (
f'|_{chan}\n'
# f' |_@{chan.raddr}\n\n'
# ^-TODO-^ remove since alfready in chan.__repr__()?
)
# send/receive initial handshake response # send/receive initial handshake response
try: try:
uid: tuple|None = await self._do_handshake(chan) uid: tuple|None = await self._do_handshake(chan)
@ -431,10 +439,10 @@ class Actor:
TransportClosed, TransportClosed,
): ):
# XXX: This may propagate up from `Channel._aiter_recv()` # XXX: This may propagate up from ``Channel._aiter_recv()``
# and `MsgpackStream._inter_packets()` on a read from the # and ``MsgpackStream._inter_packets()`` on a read from the
# stream particularly when the runtime is first starting up # stream particularly when the runtime is first starting up
# inside `open_root_actor()` where there is a check for # inside ``open_root_actor()`` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be # a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place. # because the handshake was never meant took place.
log.runtime( log.runtime(
@ -444,22 +452,9 @@ class Actor:
) )
return return
familiar: str = 'new-peer'
if _pre_chan := self._peers.get(uid):
familiar: str = 'pre-existing-peer'
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
con_status += ( con_status += (
f' -> Handshake with {familiar} `{uid_short}` complete\n' f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n'
) )
if _pre_chan:
log.warning(
# con_status += (
# ^TODO^ swap once we minimize conn duplication
f' -> Wait, we already have IPC with `{uid_short}`??\n'
f' |_{_pre_chan}\n'
)
# IPC connection tracking for both peers and new children: # IPC connection tracking for both peers and new children:
# - if this is a new channel to a locally spawned # - if this is a new channel to a locally spawned
# sub-actor there will be a spawn wait even registered # sub-actor there will be a spawn wait even registered
@ -512,9 +507,8 @@ class Actor:
) )
except trio.Cancelled: except trio.Cancelled:
log.cancel( log.cancel(
'IPC transport msg loop was cancelled\n' 'IPC transport msg loop was cancelled for \n'
f'c)>\n' f'|_{chan}\n'
f' |_{chan}\n'
) )
raise raise
@ -551,9 +545,9 @@ class Actor:
): ):
log.cancel( log.cancel(
'Waiting on cancel request to peer..\n' 'Waiting on cancel request to peer\n'
f'c)=>\n' f'c)=>\n'
f' |_{chan.uid}\n' f' |_{chan.uid}\n'
) )
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
@ -652,14 +646,10 @@ class Actor:
): ):
report: str = ( report: str = (
'Timed out waiting on local actor-nursery to exit?\n' 'Timed out waiting on local actor-nursery to exit?\n'
f'c)>\n' f'{local_nursery}\n'
f' |_{local_nursery}\n'
) )
if children := local_nursery._children: if children := local_nursery._children:
# indent from above local-nurse repr report += f' |_{pformat(children)}\n'
report += (
f' |_{pformat(children)}\n'
)
log.warning(report) log.warning(report)
@ -1246,9 +1236,8 @@ class Actor:
# TODO: just use the new `Context.repr_rpc: str` (and # TODO: just use the new `Context.repr_rpc: str` (and
# other) repr fields instead of doing this all manual.. # other) repr fields instead of doing this all manual..
msg: str = ( msg: str = (
f'Actor-runtime cancel request from {requester_type}\n\n' f'Runtime cancel request from {requester_type}:\n\n'
f'<=c) {requesting_uid}\n' f'<= .cancel(): {requesting_uid}\n\n'
f' |_{self}\n'
) )
# TODO: what happens here when we self-cancel tho? # TODO: what happens here when we self-cancel tho?
@ -1358,7 +1347,7 @@ class Actor:
log.cancel( log.cancel(
'Rxed cancel request for RPC task\n' 'Rxed cancel request for RPC task\n'
f'<=c) {requesting_uid}\n' f'<=c) {requesting_uid}\n'
f' |_{ctx._task}\n' f' |_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n' f' >> {ctx.repr_rpc}\n'
# f'=> {ctx._task}\n' # f'=> {ctx._task}\n'
# f' >> Actor._cancel_task() => {ctx._task}\n' # f' >> Actor._cancel_task() => {ctx._task}\n'
@ -1476,17 +1465,17 @@ class Actor:
"IPC channel's " "IPC channel's "
) )
rent_chan_repr: str = ( rent_chan_repr: str = (
f' |_{parent_chan}\n\n' f' |_{parent_chan}\n\n'
if parent_chan if parent_chan
else '' else ''
) )
log.cancel( log.cancel(
f'Cancelling {descr} RPC tasks\n\n' f'Cancelling {descr} RPC tasks\n\n'
f'<=c) {req_uid} [canceller]\n' f'<= canceller: {req_uid}\n'
f'{rent_chan_repr}' f'{rent_chan_repr}'
f'c)=> {self.uid} [cancellee]\n' f'=> cancellee: {self.uid}\n'
f' |_{self} [with {len(tasks)} tasks]\n' f' |_{self}.cancel_rpc_tasks()\n'
# f' |_tasks: {len(tasks)}\n' f' |_tasks: {len(tasks)}\n'
# f'{tasks_str}' # f'{tasks_str}'
) )
for ( for (
@ -1555,7 +1544,7 @@ class Actor:
def accept_addr(self) -> tuple[str, int]: def accept_addr(self) -> tuple[str, int]:
''' '''
Primary address to which the IPC transport server is Primary address to which the IPC transport server is
bound and listening for new connections. bound.
''' '''
# throws OSError on failure # throws OSError on failure
@ -1572,7 +1561,6 @@ class Actor:
def get_chans( def get_chans(
self, self,
uid: tuple[str, str], uid: tuple[str, str],
) -> list[Channel]: ) -> list[Channel]:
''' '''
Return all IPC channels to the actor with provided `uid`. Return all IPC channels to the actor with provided `uid`.
@ -1944,15 +1932,9 @@ async def async_main(
with CancelScope(shield=True): with CancelScope(shield=True):
await actor._no_more_peers.wait() await actor._no_more_peers.wait()
teardown_report += ( teardown_report += ('-> All peer channels are complete\n')
'-> All peer channels are complete\n'
)
teardown_report += ( teardown_report += ('Actor runtime exited')
'Actor runtime exiting\n'
f'>)\n'
f'|_{actor}\n'
)
log.info(teardown_report) log.info(teardown_report)

View File

@@ -54,25 +54,6 @@ def examples_dir() -> pathlib.Path:
     return repodir() / 'examples'

-def mk_cmd(
-    ex_name: str,
-    exs_subpath: str = 'debugging',
-) -> str:
-    '''
-    Generate a shell command suitable to pass to ``pexpect.spawn()``.
-    '''
-    script_path: pathlib.Path = (
-        examples_dir()
-        / exs_subpath
-        / f'{ex_name}.py'
-    )
-    return ' '.join([
-        'python',
-        str(script_path)
-    ])

 @acm
 async def expect_ctxc(
     yay: bool,

View File

@@ -26,7 +26,7 @@ from ._debug import (
     breakpoint as breakpoint,
     pause as pause,
     pause_from_sync as pause_from_sync,
-    sigint_shield as sigint_shield,
+    shield_sigint_handler as shield_sigint_handler,
     open_crash_handler as open_crash_handler,
     maybe_open_crash_handler as maybe_open_crash_handler,
     maybe_init_greenback as maybe_init_greenback,

View File

@ -409,9 +409,9 @@ class Lock:
repl_task repl_task
) )
message += ( message += (
f'A non-caller task still owns this lock on behalf of ' f'\nA non-caller task still owns this lock on behalf of '
f'`{behalf_of_task}`\n' f'{behalf_of_task}\n'
f'lock owner task: {lock_stats.owner}\n' f'|_{lock_stats.owner}\n'
) )
if ( if (
@ -523,10 +523,6 @@ class Lock:
) )
def get_lock() -> Lock:
return Lock
@tractor.context( @tractor.context(
# enable the locking msgspec # enable the locking msgspec
pld_spec=__pld_spec__, pld_spec=__pld_spec__,
@ -792,13 +788,13 @@ class DebugStatus:
cls._orig_sigint_handler: Callable = trio.from_thread.run_sync( cls._orig_sigint_handler: Callable = trio.from_thread.run_sync(
signal.signal, signal.signal,
signal.SIGINT, signal.SIGINT,
sigint_shield, shield_sigint_handler,
) )
else: else:
cls._orig_sigint_handler = signal.signal( cls._orig_sigint_handler = signal.signal(
signal.SIGINT, signal.SIGINT,
sigint_shield, shield_sigint_handler,
) )
@classmethod @classmethod
@ -904,30 +900,12 @@ class DebugStatus:
# actor-local state, irrelevant for non-root. # actor-local state, irrelevant for non-root.
cls.repl_task = None cls.repl_task = None
# XXX WARNING needs very special caughtion, and we should
# prolly make a more explicit `@property` API?
#
# - if unset in root multi-threaded case can cause
# issues with detecting that some root thread is
# using a REPL,
#
# - what benefit is there to unsetting, it's always
# set again for the next task in some actor..
# only thing would be to avoid in the sigint-handler
# logging when we don't need to?
cls.repl = None cls.repl = None
# restore original sigint handler # restore original sigint handler
cls.unshield_sigint() cls.unshield_sigint()
# TODO: use the new `@lowlevel.singleton` for this!
def get_debug_req() -> DebugStatus|None:
return DebugStatus
class TractorConfig(pdbp.DefaultConfig): class TractorConfig(pdbp.DefaultConfig):
''' '''
Custom `pdbp` config which tries to use the best tradeoff Custom `pdbp` config which tries to use the best tradeoff
@ -1333,7 +1311,7 @@ def any_connected_locker_child() -> bool:
return False return False
def sigint_shield( def shield_sigint_handler(
signum: int, signum: int,
frame: 'frame', # type: ignore # noqa frame: 'frame', # type: ignore # noqa
*args, *args,
@ -1373,17 +1351,13 @@ def sigint_shield(
# root actor branch that reports whether or not a child # root actor branch that reports whether or not a child
# has locked debugger. # has locked debugger.
if is_root_process(): if is_root_process():
# log.warning(
log.devx(
'Handling SIGINT in root actor\n'
f'{Lock.repr()}'
f'{DebugStatus.repr()}\n'
)
# try to see if the supposed (sub)actor in debug still # try to see if the supposed (sub)actor in debug still
# has an active connection to *this* actor, and if not # has an active connection to *this* actor, and if not
# it's likely they aren't using the TTY lock / debugger # it's likely they aren't using the TTY lock / debugger
# and we should propagate SIGINT normally. # and we should propagate SIGINT normally.
any_connected: bool = any_connected_locker_child() any_connected: bool = any_connected_locker_child()
# if not any_connected:
# return do_cancel()
problem = ( problem = (
f'root {actor.uid} handling SIGINT\n' f'root {actor.uid} handling SIGINT\n'
@ -1432,25 +1406,19 @@ def sigint_shield(
# an actor using the `Lock` (a bug state) ?? # an actor using the `Lock` (a bug state) ??
# => so immediately cancel any stale lock cs and revert # => so immediately cancel any stale lock cs and revert
# the handler! # the handler!
if not DebugStatus.repl: if not repl:
# TODO: WHEN should we revert back to ``trio`` # TODO: WHEN should we revert back to ``trio``
# handler if this one is stale? # handler if this one is stale?
# -[ ] maybe after a counts work of ctl-c mashes? # -[ ] maybe after a counts work of ctl-c mashes?
# -[ ] use a state var like `stale_handler: bool`? # -[ ] use a state var like `stale_handler: bool`?
problem += ( problem += (
'\n'
'No subactor is using a `pdb` REPL according `Lock.ctx_in_debug`?\n' 'No subactor is using a `pdb` REPL according `Lock.ctx_in_debug`?\n'
'BUT, the root should be using it, WHY this handler ??\n\n' 'BUT, the root should be using it, WHY this handler ??\n'
'So either..\n'
'- some root-thread is using it but has no `.repl` set?, OR\n'
'- something else weird is going on outside the runtime!?\n'
) )
else: else:
# NOTE: since we emit this msg on ctl-c, we should
# also always re-print the prompt the tail block!
log.pdb( log.pdb(
'Ignoring SIGINT while pdb REPL in use by root actor..\n' 'Ignoring SIGINT while pdb REPL in use by root actor..\n'
f'{DebugStatus.repl_task}\n'
f' |_{repl}\n'
) )
problem = None problem = None
@ -1500,6 +1468,7 @@ def sigint_shield(
'Allowing SIGINT propagation..' 'Allowing SIGINT propagation..'
) )
DebugStatus.unshield_sigint() DebugStatus.unshield_sigint()
# do_cancel()
repl_task: str|None = DebugStatus.repl_task repl_task: str|None = DebugStatus.repl_task
req_task: str|None = DebugStatus.req_task req_task: str|None = DebugStatus.req_task
@ -1514,15 +1483,10 @@ def sigint_shield(
f' |_{repl}\n' f' |_{repl}\n'
) )
elif req_task: elif req_task:
log.debug( log.pdb(
'Ignoring SIGINT while debug request task is open but either,\n' f'Ignoring SIGINT while debug request task is open\n'
'- someone else is already REPL-in and has the `Lock`, or\n'
'- some other local task already is replin?\n'
f'|_{req_task}\n' f'|_{req_task}\n'
) )
# TODO can we remove this now?
# -[ ] does this path ever get hit any more?
else: else:
msg: str = ( msg: str = (
'SIGINT shield handler still active BUT, \n\n' 'SIGINT shield handler still active BUT, \n\n'
@ -1558,53 +1522,37 @@ def sigint_shield(
# https://github.com/goodboy/tractor/issues/320 # https://github.com/goodboy/tractor/issues/320
# elif debug_mode(): # elif debug_mode():
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it looks to be that the last command that was run (eg. ll)
# will be repeated by default.
# maybe redraw/print last REPL output to console since # maybe redraw/print last REPL output to console since
# we want to alert the user that more input is expect since # we want to alert the user that more input is expect since
# nothing has been done dur to ignoring sigint. # nothing has been done dur to ignoring sigint.
if ( if (
DebugStatus.repl # only when current actor has a REPL engaged repl # only when current actor has a REPL engaged
): ):
flush_status: str = (
'Flushing stdout to ensure new prompt line!\n'
)
# XXX: yah, mega hack, but how else do we catch this madness XD # XXX: yah, mega hack, but how else do we catch this madness XD
if ( if repl.shname == 'xonsh':
repl.shname == 'xonsh'
):
flush_status += (
'-> ALSO re-flushing due to `xonsh`..\n'
)
repl.stdout.write(repl.prompt) repl.stdout.write(repl.prompt)
# log.warning(
log.devx(
flush_status
)
repl.stdout.flush() repl.stdout.flush()
# TODO: better console UX to match the current "mode": # TODO: make this work like sticky mode where if there is output
# -[ ] for example if in sticky mode where if there is output # detected as written to the tty we redraw this part underneath
# detected as written to the tty we redraw this part underneath # and erase the past draw of this same bit above?
# and erase the past draw of this same bit above?
# repl.sticky = True # repl.sticky = True
# repl._print_if_sticky() # repl._print_if_sticky()
# also see these links for an approach from `ptk`: # also see these links for an approach from ``ptk``:
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
else:
log.devx(
# log.warning(
'Not flushing stdout since not needed?\n'
f'|_{repl}\n'
)
# XXX only for tracing this handler # XXX only for tracing this handler
log.devx('exiting SIGINT') log.devx('exiting SIGINT')
_pause_msg: str = 'Opening a pdb REPL in paused actor' _pause_msg: str = 'Attaching to pdb REPL in actor'
class DebugRequestError(RuntimeError): class DebugRequestError(RuntimeError):
@ -1669,7 +1617,7 @@ async def _pause(
# 'directly (infected) `asyncio` tasks!' # 'directly (infected) `asyncio` tasks!'
# ) from rte # ) from rte
raise rte raise
if debug_func is not None: if debug_func is not None:
debug_func = partial(debug_func) debug_func = partial(debug_func)
@ -1677,13 +1625,9 @@ async def _pause(
# XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug # XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug
# request from a subactor BEFORE the REPL is entered by that # request from a subactor BEFORE the REPL is entered by that
# process. # process.
if ( if not repl:
not repl
and
debug_func
):
repl: PdbREPL = mk_pdb()
DebugStatus.shield_sigint() DebugStatus.shield_sigint()
repl: PdbREPL = repl or mk_pdb()
# TODO: move this into a `open_debug_request()` @acm? # TODO: move this into a `open_debug_request()` @acm?
# -[ ] prolly makes the most sense to do the request # -[ ] prolly makes the most sense to do the request
@ -1718,13 +1662,7 @@ async def _pause(
# recurrent entries/requests from the same # recurrent entries/requests from the same
# actor-local task. # actor-local task.
DebugStatus.repl_task = task DebugStatus.repl_task = task
if repl: DebugStatus.repl = repl
DebugStatus.repl = repl
else:
log.error(
'No REPl instance set before entering `debug_func`?\n'
f'{debug_func}\n'
)
# invoke the low-level REPL activation routine which itself # invoke the low-level REPL activation routine which itself
# should call into a `Pdb.set_trace()` of some sort. # should call into a `Pdb.set_trace()` of some sort.
@ -2063,7 +2001,7 @@ async def _pause(
DebugStatus.release(cancel_req_task=True) DebugStatus.release(cancel_req_task=True)
# sanity checks for ^ on request/status teardown # sanity checks for ^ on request/status teardown
# assert DebugStatus.repl is None # XXX no more bc bg thread cases? assert DebugStatus.repl is None
assert DebugStatus.repl_task is None assert DebugStatus.repl_task is None
# sanity, for when hackin on all this? # sanity, for when hackin on all this?
@ -2112,8 +2050,9 @@ def _set_trace(
# root here? Bo # root here? Bo
log.pdb( log.pdb(
f'{_pause_msg}\n' f'{_pause_msg}\n'
# '|\n'
f'>(\n' f'>(\n'
f'|_ {task} @ {actor.uid}\n' f' |_ {task} @ {actor.uid}\n'
# ^-TODO-^ more compact pformating? # ^-TODO-^ more compact pformating?
# -[ ] make an `Actor.__repr()__` # -[ ] make an `Actor.__repr()__`
# -[ ] should we use `log.pformat_task_uid()`? # -[ ] should we use `log.pformat_task_uid()`?
@ -2302,12 +2241,7 @@ async def _pause_from_bg_root_thread(
'Trying to acquire `Lock` on behalf of bg thread\n' 'Trying to acquire `Lock` on behalf of bg thread\n'
f'|_{behalf_of_thread}\n' f'|_{behalf_of_thread}\n'
) )
# DebugStatus.repl_task = behalf_of_thread
# NOTE: this is already a task inside the main-`trio`-thread, so
# we don't need to worry about calling it another time from the
# bg thread on which who's behalf this task is operating.
DebugStatus.shield_sigint()
out = await _pause( out = await _pause(
debug_func=None, debug_func=None,
repl=repl, repl=repl,
@ -2316,8 +2250,6 @@ async def _pause_from_bg_root_thread(
called_from_bg_thread=True, called_from_bg_thread=True,
**_pause_kwargs **_pause_kwargs
) )
DebugStatus.repl_task = behalf_of_thread
lock: trio.FIFOLock = Lock._debug_lock lock: trio.FIFOLock = Lock._debug_lock
stats: trio.LockStatistics= lock.statistics() stats: trio.LockStatistics= lock.statistics()
assert stats.owner is task assert stats.owner is task
@ -2351,6 +2283,7 @@ async def _pause_from_bg_root_thread(
f'|_{behalf_of_thread}\n' f'|_{behalf_of_thread}\n'
) )
task_status.started(out) task_status.started(out)
DebugStatus.shield_sigint()
# wait for bg thread to exit REPL sesh. # wait for bg thread to exit REPL sesh.
try: try:
@ -2391,7 +2324,7 @@ def pause_from_sync(
err_on_no_runtime=False, err_on_no_runtime=False,
) )
message: str = ( message: str = (
f'{actor.uid} task called `tractor.pause_from_sync()`\n' f'{actor.uid} task called `tractor.pause_from_sync()`\n\n'
) )
if not actor: if not actor:
raise RuntimeError( raise RuntimeError(
@ -2415,6 +2348,7 @@ def pause_from_sync(
'for infected `asyncio` mode!' 'for infected `asyncio` mode!'
) )
DebugStatus.shield_sigint()
repl: PdbREPL = mk_pdb() repl: PdbREPL = mk_pdb()
# message += f'-> created local REPL {repl}\n' # message += f'-> created local REPL {repl}\n'
@ -2432,10 +2366,6 @@ def pause_from_sync(
# thread which will call `._pause()` manually with special # thread which will call `._pause()` manually with special
# handling for root-actor caller usage. # handling for root-actor caller usage.
if not DebugStatus.is_main_trio_thread(): if not DebugStatus.is_main_trio_thread():
# TODO: `threading.Lock()` this so we don't get races in
# multi-thr cases where they're acquiring/releasing the
# REPL and setting request/`Lock` state, etc..
thread: threading.Thread = threading.current_thread() thread: threading.Thread = threading.current_thread()
repl_owner = thread repl_owner = thread
@ -2443,16 +2373,9 @@ def pause_from_sync(
if is_root: if is_root:
message += ( message += (
f'-> called from a root-actor bg {thread}\n' f'-> called from a root-actor bg {thread}\n'
f'-> scheduling `._pause_from_bg_root_thread()`..\n' f'-> scheduling `._pause_from_sync_thread()`..\n'
) )
# XXX SUBTLE BADNESS XXX that should really change! bg_task, repl = trio.from_thread.run(
# don't over-write the `repl` here since when
# this behalf-of-bg_thread-task calls pause it will
# pass `debug_func=None` which will result in it
# returing a `repl==None` output and that get's also
# `.started(out)` back here! So instead just ignore
# that output and assign the `repl` created above!
bg_task, _ = trio.from_thread.run(
afn=partial( afn=partial(
actor._service_n.start, actor._service_n.start,
partial( partial(
@ -2464,9 +2387,8 @@ def pause_from_sync(
), ),
) )
) )
DebugStatus.shield_sigint()
message += ( message += (
f'-> `._pause_from_bg_root_thread()` started bg task {bg_task}\n' f'-> `._pause_from_sync_thread()` started bg task {bg_task}\n'
) )
else: else:
message += f'-> called from a bg {thread}\n' message += f'-> called from a bg {thread}\n'
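The bg-thread branches above all hinge on the same `trio.from_thread.run()` round trip back onto the main `trio` thread; a stripped-down sketch of just that pattern (plain `trio`, none of the `tractor` lock/REPL bookkeeping) could be:

import threading

import trio


async def serviced_on_main_thread(msg: str) -> str:
    # runs back on the `trio.run()` thread, so awaiting is fine here
    await trio.sleep(0.1)
    return f'handled {msg!r} on {threading.current_thread().name}'


def sync_worker() -> str:
    # runs in a worker thread spawned by `trio.to_thread.run_sync()`
    return trio.from_thread.run(serviced_on_main_thread, 'pause request')


async def main() -> None:
    result: str = await trio.to_thread.run_sync(sync_worker)
    print(result)


if __name__ == '__main__':
    trio.run(main)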
@ -2475,7 +2397,7 @@ def pause_from_sync(
# `request_root_stdio_lock()` and we don't need to # `request_root_stdio_lock()` and we don't need to
# worry about all the special considerations as with # worry about all the special considerations as with
# the root-actor per above. # the root-actor per above.
bg_task, _ = trio.from_thread.run( bg_task, repl = trio.from_thread.run(
afn=partial( afn=partial(
_pause, _pause,
debug_func=None, debug_func=None,
@ -2490,9 +2412,6 @@ def pause_from_sync(
**_pause_kwargs **_pause_kwargs
), ),
) )
# ?TODO? XXX where do we NEED to call this in the
# subactor-bg-thread case?
DebugStatus.shield_sigint()
assert bg_task is not DebugStatus.repl_task assert bg_task is not DebugStatus.repl_task
else: # we are presumably the `trio.run()` + main thread else: # we are presumably the `trio.run()` + main thread
@ -2505,11 +2424,6 @@ def pause_from_sync(
# greenback: ModuleType = await maybe_init_greenback() # greenback: ModuleType = await maybe_init_greenback()
message += f'-> imported {greenback}\n' message += f'-> imported {greenback}\n'
# NOTE XXX seems to need to be set BEFORE the `_pause()`
# invoke using gb below?
DebugStatus.shield_sigint()
repl_owner: Task = current_task() repl_owner: Task = current_task()
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
try: try:
@ -2535,12 +2449,9 @@ def pause_from_sync(
raise raise
if out: if out:
bg_task, _ = out bg_task, repl = out
else: assert repl is repl
bg_task: Task = current_task() assert bg_task is repl_owner
# assert repl is repl
assert bg_task is repl_owner
# NOTE: normally set inside `_enter_repl_sync()` # NOTE: normally set inside `_enter_repl_sync()`
DebugStatus.repl_task: str = repl_owner DebugStatus.repl_task: str = repl_owner
@ -2554,10 +2465,7 @@ def pause_from_sync(
) )
log.devx(message) log.devx(message)
# NOTE set as late as possible to avoid state clobbering
# in the multi-threaded case!
DebugStatus.repl = repl DebugStatus.repl = repl
_set_trace( _set_trace(
api_frame=api_frame or inspect.currentframe(), api_frame=api_frame or inspect.currentframe(),
repl=repl, repl=repl,
@ -2615,7 +2523,7 @@ async def breakpoint(
_crash_msg: str = ( _crash_msg: str = (
'Opening a pdb REPL in crashed actor' 'Attaching to pdb REPL in crashed actor'
) )
@ -2643,9 +2551,11 @@ def _post_mortem(
# here! Bo # here! Bo
log.pdb( log.pdb(
f'{_crash_msg}\n' f'{_crash_msg}\n'
# '|\n'
f'x>(\n' f'x>(\n'
f' |_ {current_task()} @ {actor.uid}\n' f' |_ {current_task()} @ {actor.uid}\n'
# f'|_ @{actor.uid}\n'
# TODO: make an `Actor.__repr()__` # TODO: make an `Actor.__repr()__`
# f'|_ {current_task()} @ {actor.name}\n' # f'|_ {current_task()} @ {actor.name}\n'
) )
@ -2758,8 +2668,7 @@ async def acquire_debug_lock(
tuple, tuple,
]: ]:
''' '''
Request to acquire the TTY `Lock` in the root actor, release on Request to acquire the TTY `Lock` in the root actor, release on exit.
exit.
This helper is for actors who don't actually need to acquire This helper is for actors who don't actually need to acquire
the debugger but want to wait until the lock is free in the the debugger but want to wait until the lock is free in the
@ -2771,14 +2680,10 @@ async def acquire_debug_lock(
yield None yield None
return return
task: Task = current_task()
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
ctx: Context = await n.start( ctx: Context = await n.start(
partial( request_root_stdio_lock,
request_root_stdio_lock, subactor_uid,
actor_uid=subactor_uid,
task_uid=(task.name, id(task)),
)
) )
yield ctx yield ctx
ctx.cancel() ctx.cancel()
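The `acquire_debug_lock()` body above is an instance of a common `trio` shape: start a background task via `nursery.start()`, yield its `.started()` value to the caller, then tear it down on exit. A generic sketch of that shape (hypothetical helper names, a plain `trio.Lock` standing in for the root TTY lock):

from contextlib import asynccontextmanager

import trio


async def hold_lock(
    lock: trio.Lock,
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    async with lock:
        task_status.started(lock)
        await trio.sleep_forever()  # held until the acm exit cancels us


@asynccontextmanager
async def bg_acquire_lock(lock: trio.Lock):
    async with trio.open_nursery() as n:
        held: trio.Lock = await n.start(hold_lock, lock)
        try:
            yield held
        finally:
            n.cancel_scope.cancel()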

View File

@ -24,24 +24,13 @@ disjoint, parallel executing tasks in separate actors.
''' '''
from __future__ import annotations from __future__ import annotations
# from functools import partial
from threading import (
current_thread,
Thread,
RLock,
)
import multiprocessing as mp import multiprocessing as mp
from signal import ( from signal import (
signal, signal,
getsignal,
SIGUSR1, SIGUSR1,
) )
# import traceback import traceback
from types import ModuleType from typing import TYPE_CHECKING
from typing import (
Callable,
TYPE_CHECKING,
)
import trio import trio
from tractor import ( from tractor import (
@ -62,45 +51,26 @@ if TYPE_CHECKING:
@trio.lowlevel.disable_ki_protection @trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None: def dump_task_tree() -> None:
'''
Do a classic `stackscope.extract()` task-tree dump to console at
`.devx()` level.
'''
import stackscope import stackscope
from tractor.log import get_console_log
tree_str: str = str( tree_str: str = str(
stackscope.extract( stackscope.extract(
trio.lowlevel.current_root_task(), trio.lowlevel.current_root_task(),
recurse_child_tasks=True recurse_child_tasks=True
) )
) )
log = get_console_log(
name=__name__,
level='cancel',
)
actor: Actor = _state.current_actor() actor: Actor = _state.current_actor()
thr: Thread = current_thread()
log.devx( log.devx(
f'Dumping `stackscope` tree for actor\n' f'Dumping `stackscope` tree for actor\n'
f'{actor.uid}:\n' f'{actor.name}: {actor}\n'
f'|_{mp.current_process()}\n' f' |_{mp.current_process()}\n\n'
f' |_{thr}\n'
f' |_{actor}\n\n'
# start-of-trace-tree delimiter (mostly for testing)
'------ - ------\n'
'\n'
+
f'{tree_str}\n' f'{tree_str}\n'
+
# end-of-trace-tree delimiter (mostly for testing)
f'\n'
f'------ {actor.uid!r} ------\n'
) )
# TODO: can remove this right?
# -[ ] was original code from author
#
# print(
# 'DUMPING FROM PRINT\n'
# +
# content
# )
# import logging # import logging
# try: # try:
# with open("/dev/tty", "w") as tty: # with open("/dev/tty", "w") as tty:
@ -110,130 +80,58 @@ def dump_task_tree() -> None:
# "task_tree" # "task_tree"
# ).exception("Error printing task tree") # ).exception("Error printing task tree")
_handler_lock = RLock()
_tree_dumped: bool = False
def signal_handler(
def dump_tree_on_sig(
sig: int, sig: int,
frame: object, frame: object,
relay_to_subs: bool = True, relay_to_subs: bool = True,
) -> None: ) -> None:
global _tree_dumped, _handler_lock try:
with _handler_lock: trio.lowlevel.current_trio_token(
if _tree_dumped: ).run_sync_soon(dump_task_tree)
log.warning( except RuntimeError:
'Already dumped for this actor...??' # not in async context -- print a normal traceback
) traceback.print_stack()
return
_tree_dumped = True
# actor: Actor = _state.current_actor()
log.devx(
'Trying to dump `stackscope` tree..\n'
)
try:
dump_task_tree()
# await actor._service_n.start_soon(
# partial(
# trio.to_thread.run_sync,
# dump_task_tree,
# )
# )
# trio.lowlevel.current_trio_token().run_sync_soon(
# dump_task_tree
# )
except RuntimeError:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
# not in async context -- print a normal traceback
# traceback.print_stack()
raise
except BaseException:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
raise
log.devx(
'Supposedly we dumped just fine..?'
)
if not relay_to_subs: if not relay_to_subs:
return return
an: ActorNursery an: ActorNursery
for an in _state.current_actor()._actoruid2nursery.values(): for an in _state.current_actor()._actoruid2nursery.values():
subproc: ProcessType subproc: ProcessType
subactor: Actor subactor: Actor
for subactor, subproc, _ in an._children.values(): for subactor, subproc, _ in an._children.values():
log.warning( log.devx(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n' f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n' f'{subactor}\n'
f' |_{subproc}\n' f' |_{subproc}\n'
) )
# bc of course stdlib can't have a std API.. XD if isinstance(subproc, trio.Process):
match subproc: subproc.send_signal(sig)
case trio.Process():
subproc.send_signal(sig)
case mp.Process(): elif isinstance(subproc, mp.Process):
subproc._send_signal(sig) subproc._send_signal(sig)
def enable_stack_on_sig( def enable_stack_on_sig(
sig: int = SIGUSR1, sig: int = SIGUSR1
) -> ModuleType: ) -> None:
''' '''
Enable `stackscope` tracing on reception of a signal; by Enable `stackscope` tracing on reception of a signal; by
default this is SIGUSR1. default this is SIGUSR1.
HOT TIP: a task/ctx-tree dump can be triggered from a shell with
fancy cmds.
For ex. from `bash` using `pgrep` and cmd-substitution
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
you could use:
>> kill -SIGUSR1 $(pgrep -f '<cmd>')
Or with `xonsh` (which has diff capture-from-subproc syntax)
>> kill -SIGUSR1 @$(pgrep -f '<cmd>')
''' '''
try:
import stackscope
except ImportError:
log.warning(
'`stackscope` not installed for use in debug mode!'
)
return None
handler: Callable|int = getsignal(sig)
if handler is dump_tree_on_sig:
log.devx(
'A `SIGUSR1` handler already exists?\n'
f'|_ {handler!r}\n'
)
return
signal( signal(
sig, sig,
dump_tree_on_sig, signal_handler,
) )
log.devx( # NOTE: not the above can be triggered from
'Enabling trace-trees on `SIGUSR1` ' # a (xonsh) shell using:
'since `stackscope` is installed @ \n' # kill -SIGUSR1 @$(pgrep -f '<cmd>')
f'{stackscope!r}\n\n' #
f'With `SIGUSR1` handler\n' # for example if you were looking to trace a `pytest` run
f'|_{dump_tree_on_sig}\n' # kill -SIGUSR1 @$(pgrep -f 'pytest')
)
return stackscope
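As a self-contained illustration of the `SIGUSR1` hook described in the docstring above (stdlib only, no `stackscope`/`tractor`; the script name below is hypothetical): install a handler, leave the process running, then poke it from a shell with `kill -SIGUSR1 $(pgrep -f sig_dump_demo.py)`.

import os
import signal
import time
import traceback


def dump_stack_on_sig(sig: int, frame) -> None:
    # crude stand-in for `dump_task_tree()`: print the sync call stack
    print(f'caught signal {sig}, dumping current stack:')
    traceback.print_stack(frame)


if __name__ == '__main__':
    signal.signal(signal.SIGUSR1, dump_stack_on_sig)
    print(f'pid={os.getpid()}; send SIGUSR1 to dump the stack..')
    while True:
        time.sleep(1)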

View File

@ -374,7 +374,7 @@ class PldRx(Struct):
case _: case _:
src_err = InternalError( src_err = InternalError(
'Invalid IPC msg ??\n\n' 'Unknown IPC msg ??\n\n'
f'{msg}\n' f'{msg}\n'
) )
@ -499,7 +499,7 @@ async def maybe_limit_plds(
yield None yield None
return return
# sanity check on IPC scoping # sanity on scoping
curr_ctx: Context = current_ipc_ctx() curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx assert ctx is curr_ctx
@ -510,8 +510,6 @@ async def maybe_limit_plds(
) as msgdec: ) as msgdec:
yield msgdec yield msgdec
# when the applied spec is unwound/removed, the same IPC-ctx
# should still be in scope.
curr_ctx: Context = current_ipc_ctx() curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx assert ctx is curr_ctx
@ -527,26 +525,16 @@ async def drain_to_final_msg(
list[MsgType] list[MsgType]
]: ]:
''' '''
Drain IPC msgs delivered to the underlying IPC context's Drain IPC msgs delivered to the underlying IPC primitive's
rx-mem-chan (i.e. from `Context._rx_chan`) in search for a final rx-mem-chan (eg. `Context._rx_chan`) from the runtime in
`Return` or `Error` msg. search for a final result or error.
Deliver the `Return` + preceding drained msgs (`list[MsgType]`) The motivation here is to ideally capture errors during ctxc
as a pair unless an `Error` is found, in which case unpack and raise conditions where a canc-request/or local error is sent but the
it. local task also excepts and enters the
`Portal.open_context().__aexit__()` block wherein we prefer to
The motivation here is to always capture any remote error relayed capture and raise any remote error or ctxc-ack as part of the
by the remote peer task during a ctxc condition. `ctx.result()` cleanup and teardown sequence.
For eg. a ctxc-request may be sent to the peer as part of the
local task's (request for) cancellation but then that same task
**also errors** before executing the teardown in the
`Portal.open_context().__aexit__()` block. In such error-on-exit
cases we want to always capture and raise any delivered remote
error (like an expected ctxc-ACK) as part of the final
`ctx.wait_for_result()` teardown sequence such that the
`Context.outcome` related state always reflect what transpired
even after ctx closure and the `.open_context()` block exit.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
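The drain semantics described in the (rewritten) docstring above boil down to a simple loop; a toy sketch with made-up msg types and a plain `trio` memory channel standing in for `Context._rx_chan`, purely to show the "collect everything until a `Return`, raise on `Error`" flow:

from dataclasses import dataclass

import trio


@dataclass
class Yield:
    value: object

@dataclass
class Return:
    value: object

@dataclass
class Error:
    message: str


async def drain_to_final(
    rx: trio.MemoryReceiveChannel,
) -> tuple[Return, list]:
    drained: list = []
    async for msg in rx:
        match msg:
            case Return():
                return msg, drained
            case Error():
                raise RuntimeError(msg.message)
            case _:
                drained.append(msg)
    raise RuntimeError('channel closed before a final msg arrived')


async def main() -> None:
    tx, rx = trio.open_memory_channel(8)
    for msg in (Yield(1), Yield(2), Return('done')):
        await tx.send(msg)
    final, drained = await drain_to_final(rx)
    print(final, drained)


if __name__ == '__main__':
    trio.run(main)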
@ -584,6 +572,7 @@ async def drain_to_final_msg(
# |_from tractor.devx._debug import pause # |_from tractor.devx._debug import pause
# await pause() # await pause()
# NOTE: we get here if the far end was # NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases: # `ContextCancelled` in 2 cases:
# 1. we requested the cancellation and thus # 1. we requested the cancellation and thus
@ -591,13 +580,13 @@ async def drain_to_final_msg(
# 2. WE DID NOT REQUEST that cancel and thus # 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE! # SHOULD RAISE HERE!
except trio.Cancelled as taskc: except trio.Cancelled as taskc:
# CASE 2: mask the local cancelled-error(s) # CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is # only when we are sure the remote error is
# the source cause of this local task's # the source cause of this local task's
# cancellation. # cancellation.
ctx.maybe_raise( ctx.maybe_raise(
hide_tb=hide_tb, # TODO: when use this/
# TODO: when use this?
# from_src_exc=taskc, # from_src_exc=taskc,
) )
@ -670,7 +659,7 @@ async def drain_to_final_msg(
# Stop() # Stop()
case Stop(): case Stop():
pre_result_drained.append(msg) pre_result_drained.append(msg)
log.runtime( # normal/expected shutdown transaction log.cancel(
'Remote stream terminated due to "stop" msg:\n\n' 'Remote stream terminated due to "stop" msg:\n\n'
f'{pretty_struct.pformat(msg)}\n' f'{pretty_struct.pformat(msg)}\n'
) )
@ -730,19 +719,13 @@ async def drain_to_final_msg(
pre_result_drained.append(msg) pre_result_drained.append(msg)
# It's definitely an internal error if any other # It's definitely an internal error if any other
# msg type without a `'cid'` field arrives here! # msg type without a `'cid'` field arrives here!
report: str = (
f'Invalid or unknown msg type {type(msg)!r}!?\n'
)
if not msg.cid: if not msg.cid:
report += ( raise InternalError(
'\nWhich also has no `.cid` field?\n' 'Unexpected cid-missing msg?\n\n'
f'{msg}\n'
) )
raise MessagingError( raise RuntimeError('Unknown msg type: {msg}')
report
+
f'\n{msg}\n'
)
else: else:
log.cancel( log.cancel(