Compare commits

..

9 Commits

Author SHA1 Message Date
Tyler Goodlet 547b957bbf Officially test proto-ed `stackscope` integration
By re-purposing our `pexpect`-based console matching with a new
`debugging/shield_hang_in_sub.py` example, this tests a few "hanging
actor" conditions more formally:

- that despite a hanging actor's task we can dump
  a `stackscope.extract()` tree on relay of `SIGUSR1`.
- the actor tree will terminate despite a shielded forever-sleep by our
  "T-800" zombie reaper machinery activating and hard killing the
  underlying subprocess.

Some test deats:
- simulates the expect actions of a real user by manually using
  `os.kill()` to send both signals to the actor-tree program.
- `pexpect`-matches against `log.devx()` emissions under normal
  `debug_mode == True` usage.
- ensure we get the actual "T-800 deployed" `log.error()` msg and
  that the actor tree eventually terminates!

Surrounding (re-org/impl/test-suite) changes:
- allow disabling usage via a `maybe_enable_greenback: bool` to
  `open_root_actor()` but enable by def.
- pretty up the actual `.devx()` content from `.devx._stackscope`
  including be extra pedantic about the conc-primitives for each signal
  event.
- try to avoid double handles of `SIGUSR1` even though it seems the
  original (what i thought was a) problem was actually just double
  logging in the handler..
  |_ avoid double applying the handler func via `signal.signal()`,
  |_ use a global to avoid double handle func calls and,
  |_ a `threading.RLock` around handling.
- move common fixtures and helper routines from `test_debugger` to
  `tests/devx/conftest.py` and import them for use in both test mods.
2024-07-10 18:17:42 -04:00
Tyler Goodlet d216068713 Start a new `tests/devx/` tooling-subsuite-pkg 2024-07-10 15:52:38 -04:00
Tyler Goodlet 131e3e8157 Move `mk_cmd()` to `._testing`
Since we're going to need it more generally for `.devx` sub-sys tooling
tests.

Also, up the sync-pause ctl-c delay another 10ms..
2024-07-10 15:40:44 -04:00
Tyler Goodlet fc95c6719f Get multi-threaded sync-pausing fully workin!
The final issue was making sure we do the same thing on ctl-c/SIGINT
from the user. That is, if there's already a bg-thread in REPL, we
`log.pdb()` about SIGINT shielding and re-draw the prompt; the same UX
as normal actor-runtime-task behaviour.

Reasons this wasn't workin.. and the fix:
- `.pause_from_sync()` was overriding the local `repl` var with `None`
  delivered by (transitive) calls to `_pause(debug_func=None)`.. so
  remove all that and only assign it OAOO prior to thread-type case
  branching.
- always call `DebugStatus.shield_sigint()` as needed from all requesting
  threads/tasks:
  - in `_pause_from_bg_root_thread()` BEFORE calling `._pause()` AND BEFORE
    yielding back to the bg-thread via `.started(out)` to ensure we're
    definitely overriding the handler in the `trio`-main-thread task
    before unblocking the requesting bg-thread.
  - from any requesting bg-thread in the root actor such that both its
    main-`trio`-thread scheduled task (as per above bullet) AND it are
    SIGINT shielded.
  - always call `.shield_sigint()` BEFORE any `greenback._await()` case
    don't entirely grok why yet, but it works)?
  - for `greenback._await()` case always set `bg_task` to the current one..
- tweaks to the `SIGINT` handler, now renamed `sigint_shield()` so as
  not to name-collide with the methods when editor-searching:
  - always try to `repr()` the REPL thread/task "owner" as well as the
    active `PdbREPL` instance.
  - add `.devx()` notes around the prompt flushing deats and comments
    for any root-actor-bg-thread edge cases.

Related/supporting refinements:
- add `get_lock()`/`get_debug_req()` factory funcs since the plan is to
  eventually implement both as `@singleton` instances per actor.
- fix `acquire_debug_lock()`'s call-sig-bug for scheduling
  `request_root_stdio_lock()`..
- in `._pause()` only call `mk_pdb()` when `debug_func != None`.
- add some todo/warning notes around the `cls.repl = None` in
  `DebugStatus.release()`

`test_pause_from_sync()` tweaks:
- don't use a `attach_patts.copy()`, since we always `break` on match.
- do `pytest.fail()` on that ^ loop's fallthrough..
- pass `do_ctlc(child, patt=attach_key)` such that we always match the
  the current thread's name with the ctl-c triggered `.pdb()` emission.
- oh yeah, return the last `before: str` from `do_ctlc()`.
- in the script, flip `abandon_on_cancel=True` since when `False` it
  seems to cause `trio.run()` to hang on exit from the last bg-thread
  case?!?
2024-07-10 12:29:05 -04:00
Tyler Goodlet bef3dd9e97 Another tweak to REPL entry `.pdb()` headers 2024-07-05 13:32:03 -04:00
Tyler Goodlet e6ccfce751 Adjusts advanced fault tests to match new `TransportClosed` semantics 2024-07-05 13:31:29 -04:00
Tyler Goodlet 31207f92ee Finally implement peer-lookup optimization..
There's a been a todo for soo long for this XD

Since all `Actor`'s store a set of `._peers` we can try a lookup on that
table as a shortcut before pinging the registry Bo

Impl deats:
- add a new `._discovery.get_peer_by_name()` routine which attempts the
  `._peers` lookup by combining a copy of that `dict` + an entry added
  for `Actor._parent_chan` (since all subs have a parent and often the
  desired contact is just that connection).
- change `.find_actor()` (for the `only_first == True` case),
  `.query_actor()` and `.wait_for_actor()` to call the new helper and
  deliver appropriate outputs if possible.

Other,
- deprecate `get_arbiter()` def and all usage in tests and examples.
- drop lingering use of `arbiter_sockaddr` arg to various routines.
- tweak the `Actor` doc str as well as some code fmting and a tweak to
  the `._stream_handler()`'s initial `con_status: str` logging value
  since the way it was could never be reached.. oh and `.warning()` on
  any new connections which already have a `_pre_chan: Channel` entry in
  `._peers` so we can start minimizing IPC duplications.
2024-07-04 19:40:11 -04:00
Tyler Goodlet 5f8f8e98ba More-n-more scops annots in logging 2024-07-04 15:06:15 -04:00
Tyler Goodlet b56352b0e4 Quieter `Stop` handling on ctx result capture
In the `drain_to_final_msg()` impl, since a stream terminating
gracefully requires this msg, there's really no reason to `log.cancel()`
about it; go `.runtime()` level instead since we're trying de-noise
under "normal operation".

Also,
- passthrough `hide_tb` to taskc-handler's `ctx.maybe_raise()` call.
- raise `MessagingError` for the `MsgType` unmatched `case _:`.
- detail the doc string motivation a little more.
2024-07-03 22:42:32 -04:00
22 changed files with 929 additions and 440 deletions

View File

@ -4,6 +4,13 @@ import time
import trio import trio
import tractor import tractor
# TODO: only import these when not running from test harness?
# can we detect `pexpect` usage maybe?
# from tractor.devx._debug import (
# get_lock,
# get_debug_req,
# )
def sync_pause( def sync_pause(
use_builtin: bool = False, use_builtin: bool = False,
@ -18,7 +25,13 @@ def sync_pause(
breakpoint(hide_tb=hide_tb) breakpoint(hide_tb=hide_tb)
else: else:
# TODO: maybe for testing some kind of cm style interface
# where the `._set_trace()` call doesn't happen until block
# exit?
# assert get_lock().ctx_in_debug is None
# assert get_debug_req().repl is None
tractor.pause_from_sync() tractor.pause_from_sync()
# assert get_debug_req().repl is None
if error: if error:
raise RuntimeError('yoyo sync code error') raise RuntimeError('yoyo sync code error')
@ -41,10 +54,11 @@ async def start_n_sync_pause(
async def main() -> None: async def main() -> None:
async with ( async with (
tractor.open_nursery( tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True, debug_mode=True,
# loglevel='cancel', maybe_enable_greenback=True,
enable_stack_on_sig=True,
# loglevel='warning',
# loglevel='devx',
) as an, ) as an,
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
@ -138,7 +152,9 @@ async def main() -> None:
# the case 2. from above still exists! # the case 2. from above still exists!
use_builtin=True, use_builtin=True,
), ),
abandon_on_cancel=False, # TODO: with this `False` we can hang!??!
# abandon_on_cancel=False,
abandon_on_cancel=True,
thread_name='inline_root_bg_thread', thread_name='inline_root_bg_thread',
) )

View File

@ -9,7 +9,7 @@ async def main(service_name):
async with tractor.open_nursery() as an: async with tractor.open_nursery() as an:
await an.start_actor(service_name) await an.start_actor(service_name)
async with tractor.get_arbiter('127.0.0.1', 1616) as portal: async with tractor.get_registry('127.0.0.1', 1616) as portal:
print(f"Arbiter is listening on {portal.channel}") print(f"Arbiter is listening on {portal.channel}")
async with tractor.wait_for_actor(service_name) as sockaddr: async with tractor.wait_for_actor(service_name) as sockaddr:

View File

View File

@ -0,0 +1,167 @@
'''
`tractor.devx.*` tooling sub-pkg test space.
'''
from typing import (
Callable,
)
import pytest
from pexpect.exceptions import (
TIMEOUT,
)
from tractor._testing import (
mk_cmd,
)
@pytest.fixture
def spawn(
start_method,
testdir: pytest.Testdir,
reg_addr: tuple[str, int],
) -> Callable[[str], None]:
'''
Use the `pexpect` module shipped via `testdir.spawn()` to
run an `./examples/..` script by name.
'''
if start_method != 'trio':
pytest.skip(
'`pexpect` based tests only supported on `trio` backend'
)
def _spawn(
cmd: str,
**mkcmd_kwargs,
):
return testdir.spawn(
cmd=mk_cmd(
cmd,
**mkcmd_kwargs,
),
expect_timeout=3,
)
# such that test-dep can pass input script name.
return _spawn
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
def expect(
child,
# normally a `pdb` prompt by default
patt: str,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
err_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
if err_on_false:
raise ValueError(
f'Could not find pattern: {part!r} in `before` output?'
)
return False
return True
# TODO: todo support terminal color-chars stripping so we can match
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
# since this is an "assert" helper ;)
err_on_false=True,
**kwargs
)

View File

@ -13,11 +13,9 @@ TODO:
from functools import partial from functools import partial
import itertools import itertools
import platform import platform
import pathlib
import time import time
import pytest import pytest
import pexpect
from pexpect.exceptions import ( from pexpect.exceptions import (
TIMEOUT, TIMEOUT,
EOF, EOF,
@ -28,12 +26,14 @@ from tractor.devx._debug import (
_crash_msg, _crash_msg,
_repl_fail_msg, _repl_fail_msg,
) )
from tractor._testing import (
examples_dir,
)
from conftest import ( from conftest import (
_ci_env, _ci_env,
) )
from .conftest import (
expect,
in_prompt_msg,
assert_before,
)
# TODO: The next great debugger audit could be done by you! # TODO: The next great debugger audit could be done by you!
# - recurrent entry to breakpoint() from single actor *after* and an # - recurrent entry to breakpoint() from single actor *after* and an
@ -52,15 +52,6 @@ if platform.system() == 'Windows':
) )
def mk_cmd(ex_name: str) -> str:
'''
Generate a command suitable to pass to ``pexpect.spawn()``.
'''
script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py'
return ' '.join(['python', str(script_path)])
# TODO: was trying to this xfail style but some weird bug i see in CI # TODO: was trying to this xfail style but some weird bug i see in CI
# that's happening at collect time.. pretty soon gonna dump actions i'm # that's happening at collect time.. pretty soon gonna dump actions i'm
# thinkin... # thinkin...
@ -79,142 +70,9 @@ has_nested_actors = pytest.mark.has_nested_actors
# ) # )
@pytest.fixture
def spawn(
start_method,
testdir,
reg_addr,
) -> 'pexpect.spawn':
if start_method != 'trio':
pytest.skip(
"Debugger tests are only supported on the trio backend"
)
def _spawn(cmd):
return testdir.spawn(
cmd=mk_cmd(cmd),
expect_timeout=3,
)
return _spawn
PROMPT = r"\(Pdb\+\)" PROMPT = r"\(Pdb\+\)"
def expect(
child,
# prompt by default
patt: str = PROMPT,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
return False
return True
# TODO: todo support terminal color-chars stripping so we can match
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
**kwargs
)
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
@pytest.mark.parametrize( @pytest.mark.parametrize(
'user_in_out', 'user_in_out',
[ [
@ -279,7 +137,7 @@ def test_root_actor_bp(spawn, user_in_out):
child.expect('\r\n') child.expect('\r\n')
# process should exit # process should exit
child.expect(pexpect.EOF) child.expect(EOF)
if expect_err_str is None: if expect_err_str is None:
assert 'Error' not in str(child.before) assert 'Error' not in str(child.before)
@ -299,7 +157,9 @@ def do_ctlc(
# needs some further investigation potentially... # needs some further investigation potentially...
expect_prompt: bool = not _ci_env, expect_prompt: bool = not _ci_env,
) -> None: ) -> str|None:
before: str|None = None
# make sure ctl-c sends don't do anything but repeat output # make sure ctl-c sends don't do anything but repeat output
for _ in range(count): for _ in range(count):
@ -309,15 +169,18 @@ def do_ctlc(
# TODO: figure out why this makes CI fail.. # TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine.. # if you run this test manually it works just fine..
if expect_prompt: if expect_prompt:
before = str(child.before.decode())
time.sleep(delay) time.sleep(delay)
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode())
time.sleep(delay) time.sleep(delay)
if patt: if patt:
# should see the last line on console # should see the last line on console
assert patt in before assert patt in before
# return the console content up to the final prompt
return before
def test_root_actor_bp_forever( def test_root_actor_bp_forever(
spawn, spawn,
@ -358,7 +221,7 @@ def test_root_actor_bp_forever(
# quit out of the loop # quit out of the loop
child.sendline('q') child.sendline('q')
child.expect(pexpect.EOF) child.expect(EOF)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -423,7 +286,7 @@ def test_subactor_error(
child.expect('\r\n') child.expect('\r\n')
# process should exit # process should exit
child.expect(pexpect.EOF) child.expect(EOF)
def test_subactor_breakpoint( def test_subactor_breakpoint(
@ -486,7 +349,7 @@ def test_subactor_breakpoint(
child.sendline('c') child.sendline('c')
# process should exit # process should exit
child.expect(pexpect.EOF) child.expect(EOF)
before = str(child.before.decode()) before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
@ -629,7 +492,7 @@ def test_multi_subactors(
# process should exit # process should exit
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
# repeat of previous multierror for final output # repeat of previous multierror for final output
assert_before(child, [ assert_before(child, [
@ -769,7 +632,7 @@ def test_multi_daemon_subactors(
) )
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
@has_nested_actors @has_nested_actors
@ -845,7 +708,7 @@ def test_multi_subactors_root_errors(
]) ])
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
assert_before(child, [ assert_before(child, [
# "Attaching to pdb in crashed actor: ('root'", # "Attaching to pdb in crashed actor: ('root'",
@ -975,7 +838,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(3): for i in range(3):
try: try:
child.expect(pexpect.EOF, timeout=0.5) child.expect(EOF, timeout=0.5)
break break
except TIMEOUT: except TIMEOUT:
child.sendline('c') child.sendline('c')
@ -1017,7 +880,7 @@ def test_root_cancels_child_context_during_startup(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
def test_different_debug_mode_per_actor( def test_different_debug_mode_per_actor(
@ -1038,7 +901,7 @@ def test_different_debug_mode_per_actor(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
before = str(child.before.decode()) before = str(child.before.decode())
@ -1085,10 +948,10 @@ def test_pause_from_sync(
) )
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
# ^NOTE^ subactor not spawned yet; don't need extra delay.
child.sendline('c') child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body # first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT) child.expect(PROMPT)
@ -1109,7 +972,27 @@ def test_pause_from_sync(
) )
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
# XXX, fwiw without a brief sleep here the SIGINT might actually
# trigger "subactor" cancellation by its parent before the
# shield-handler is engaged.
#
# => similar to the `delay` input to `do_ctlc()` below, setting
# this too low can cause the test to fail since the `subactor`
# suffers a race where the root/parent sends an actor-cancel
# prior to the context task hitting its pause point (and thus
# engaging the `sigint_shield()` handler in time); this value
# seems be good enuf?
time.sleep(0.6)
# one of the bg thread or subactor should have # one of the bg thread or subactor should have
# `Lock.acquire()`-ed # `Lock.acquire()`-ed
@ -1128,32 +1011,48 @@ def test_pause_from_sync(
"('root'", "('root'",
], ],
} }
conts: int = 0 # for debugging below matching logic on failure
while attach_patts: while attach_patts:
child.sendline('c') child.sendline('c')
conts += 1
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
for key in attach_patts.copy(): for key in attach_patts:
if key in before: if key in before:
attach_key: str = key
expected_patts: str = attach_patts.pop(key) expected_patts: str = attach_patts.pop(key)
assert_before( assert_before(
child, child,
[_pause_msg] + expected_patts [_pause_msg]
+
expected_patts
) )
break break
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL # ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above. # at the same time as the one that was detected above.
for key, other_patts in attach_patts.items(): for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg( assert not in_prompt_msg(
before, before,
other_patts, other_patts,
) )
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(
child,
patt=attach_key,
# NOTE same as comment above
delay=0.4,
)
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
def test_post_mortem_api( def test_post_mortem_api(
@ -1258,7 +1157,7 @@ def test_post_mortem_api(
# ) # )
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
def test_shield_pause( def test_shield_pause(
@ -1333,7 +1232,7 @@ def test_shield_pause(
] ]
) )
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
# TODO: better error for "non-ideal" usage from the root actor. # TODO: better error for "non-ideal" usage from the root actor.

View File

@ -0,0 +1,120 @@
'''
That "native" runtime-hackin toolset better be dang useful!
Verify the funtion of a variety of "developer-experience" tools we
offer from the `.devx` sub-pkg:
- use of the lovely `stackscope` for dumping actor `trio`-task trees
during operation and hangs.
TODO:
- demonstration of `CallerInfo` call stack frame filtering such that
for logging and REPL purposes a user sees exactly the layers needed
when debugging a problem inside the stack vs. in their app.
'''
import os
import signal
from .conftest import (
expect,
assert_before,
# in_prompt_msg,
)
def test_shield_pause(
spawn,
):
'''
Verify the `tractor.pause()/.post_mortem()` API works inside an
already cancelled `trio.CancelScope` and that you can step to the
next checkpoint wherein the cancelled will get raised.
'''
child = spawn(
'shield_hang_in_sub'
)
expect(
child,
'Yo my child hanging..?',
)
assert_before(
child,
[
'Entering shield sleep..',
'Enabling trace-trees on `SIGUSR1` since `stackscope` is installed @',
]
)
print(
'Sending SIGUSR1 to see a tree-trace!',
)
os.kill(
child.pid,
signal.SIGUSR1,
)
expect(
child,
# end-of-tree delimiter
"------ \('root', ",
)
assert_before(
child,
[
'Trying to dump `stackscope` tree..',
'Dumping `stackscope` tree for actor',
"('root'", # uid line
# parent block point (non-shielded)
'await trio.sleep_forever() # in root',
]
)
# expect(
# child,
# # relay to the sub should be reported
# 'Relaying `SIGUSR1`[10] to sub-actor',
# )
expect(
child,
# end-of-tree delimiter
"------ \('hanger', ",
)
assert_before(
child,
[
# relay to the sub should be reported
'Relaying `SIGUSR1`[10] to sub-actor',
"('hanger'", # uid line
# hanger LOC where it's shield-halted
'await trio.sleep_forever() # in subactor',
]
)
# breakpoint()
# simulate the user sending a ctl-c to the hanging program.
# this should result in the terminator kicking in since
# the sub is shield blocking and can't respond to SIGINT.
os.kill(
child.pid,
signal.SIGINT,
)
expect(
child,
'Shutting down actor runtime',
timeout=6,
)
assert_before(
child,
[
'raise KeyboardInterrupt',
# 'Shutting down actor runtime',
'#T-800 deployed to collect zombie B0',
"'--uid', \"('hanger',",
]
)

View File

@ -91,7 +91,8 @@ def test_ipc_channel_break_during_stream(
# non-`trio` spawners should never hit the hang condition that # non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree. # requires the user to do ctl-c to cancel the actor tree.
expect_final_exc = trio.ClosedResourceError # expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
mod: ModuleType = import_path( mod: ModuleType = import_path(
examples_dir() / 'advanced_faults' examples_dir() / 'advanced_faults'
@ -157,7 +158,7 @@ def test_ipc_channel_break_during_stream(
if pre_aclose_msgstream: if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt expect_final_exc = KeyboardInterrupt
# NOTE when the parent IPC side dies (even if the child's does as well # NOTE when the parent IPC side dies (even if the child does as well
# but the child fails BEFORE the parent) we always expect the # but the child fails BEFORE the parent) we always expect the
# IPC layer to raise a closed-resource, NEVER do we expect # IPC layer to raise a closed-resource, NEVER do we expect
# a stop msg since the parent-side ctx apis will error out # a stop msg since the parent-side ctx apis will error out
@ -169,7 +170,8 @@ def test_ipc_channel_break_during_stream(
and and
ipc_break['break_child_ipc_after'] is False ipc_break['break_child_ipc_after'] is False
): ):
expect_final_exc = trio.ClosedResourceError # expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
# BOTH but, PARENT breaks FIRST # BOTH but, PARENT breaks FIRST
elif ( elif (
@ -180,7 +182,8 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_parent_ipc_after'] ipc_break['break_parent_ipc_after']
) )
): ):
expect_final_exc = trio.ClosedResourceError # expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
with pytest.raises( with pytest.raises(
expected_exception=( expected_exception=(
@ -199,8 +202,8 @@ def test_ipc_channel_break_during_stream(
**ipc_break, **ipc_break,
) )
) )
except KeyboardInterrupt as kbi: except KeyboardInterrupt as _kbi:
_err = kbi kbi = _kbi
if expect_final_exc is not KeyboardInterrupt: if expect_final_exc is not KeyboardInterrupt:
pytest.fail( pytest.fail(
'Rxed unexpected KBI !?\n' 'Rxed unexpected KBI !?\n'
@ -209,6 +212,21 @@ def test_ipc_channel_break_during_stream(
raise raise
except tractor.TransportClosed as _tc:
tc = _tc
if expect_final_exc is KeyboardInterrupt:
pytest.fail(
'Unexpected transport failure !?\n'
f'{repr(tc)}'
)
cause: Exception = tc.__cause__
assert (
type(cause) is trio.ClosedResourceError
and
cause.args[0] == 'another task closed this fd'
)
raise
# get raw instance from pytest wrapper # get raw instance from pytest wrapper
value = excinfo.value value = excinfo.value
if isinstance(value, ExceptionGroup): if isinstance(value, ExceptionGroup):

View File

@ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr):
portal = await n.start_actor('actor', enable_modules=[__name__]) portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid uid = portal.channel.uid
async with tractor.get_arbiter(*reg_addr) as aportal: async with tractor.get_registry(*reg_addr) as aportal:
# this local actor should be the arbiter # this local actor should be the arbiter
assert actor is aportal.actor assert actor is aportal.actor
@ -160,7 +160,7 @@ async def spawn_and_check_registry(
async with tractor.open_root_actor( async with tractor.open_root_actor(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
): ):
async with tractor.get_arbiter(*reg_addr) as portal: async with tractor.get_registry(*reg_addr) as portal:
# runtime needs to be up to call this # runtime needs to be up to call this
actor = tractor.current_actor() actor = tractor.current_actor()
@ -298,7 +298,7 @@ async def close_chans_before_nursery(
async with tractor.open_root_actor( async with tractor.open_root_actor(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
): ):
async with tractor.get_arbiter(*reg_addr) as aportal: async with tractor.get_registry(*reg_addr) as aportal:
try: try:
get_reg = partial(unpack_reg, aportal) get_reg = partial(unpack_reg, aportal)

View File

@ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr):
"Verify waiting on the arbiter to register itself using a local portal." "Verify waiting on the arbiter to register itself using a local portal."
actor = tractor.current_actor() actor = tractor.current_actor()
assert actor.is_arbiter assert actor.is_arbiter
async with tractor.get_arbiter(*reg_addr) as portal: async with tractor.get_registry(*reg_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal) assert isinstance(portal, tractor._portal.LocalPortal)
with trio.fail_after(0.2): with trio.fail_after(0.2):

View File

@ -32,7 +32,7 @@ def test_abort_on_sigint(daemon):
@tractor_test @tractor_test
async def test_cancel_remote_arbiter(daemon, reg_addr): async def test_cancel_remote_arbiter(daemon, reg_addr):
assert not tractor.current_actor().is_arbiter assert not tractor.current_actor().is_arbiter
async with tractor.get_arbiter(*reg_addr) as portal: async with tractor.get_registry(*reg_addr) as portal:
await portal.cancel_actor() await portal.cancel_actor()
time.sleep(0.1) time.sleep(0.1)
@ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
# no arbiter socket should exist # no arbiter socket should exist
with pytest.raises(OSError): with pytest.raises(OSError):
async with tractor.get_arbiter(*reg_addr) as portal: async with tractor.get_registry(*reg_addr) as portal:
pass pass

View File

@ -30,7 +30,7 @@ from ._streaming import (
stream as stream, stream as stream,
) )
from ._discovery import ( from ._discovery import (
get_arbiter as get_arbiter, get_registry as get_registry,
find_actor as find_actor, find_actor as find_actor,
wait_for_actor as wait_for_actor, wait_for_actor as wait_for_actor,
query_actor as query_actor, query_actor as query_actor,

View File

@ -2376,8 +2376,9 @@ async def open_context_from_portal(
and ctx.cancel_acked and ctx.cancel_acked
): ):
log.cancel( log.cancel(
f'Context cancelled by {ctx.side!r}-side task\n' f'Context cancelled by local {ctx.side!r}-side task\n'
f'|_{ctx._task}\n\n' f'c)>\n'
f' |_{ctx._task}\n\n'
f'{repr(scope_err)}\n' f'{repr(scope_err)}\n'
) )
@ -2393,8 +2394,10 @@ async def open_context_from_portal(
# type_only=True, # type_only=True,
) )
log.cancel( log.cancel(
f'Context terminated due to local {ctx.side!r}-side error:\n\n' f'Context terminated due to {ctx.side!r}-side\n\n'
f'{ctx.chan.uid} => {outcome_str}\n' # TODO: do an x)> on err and c)> only for ctxc?
f'c)> {outcome_str}\n'
f' |_{ctx.repr_rpc}\n'
) )
# FINALLY, remove the context from runtime tracking and # FINALLY, remove the context from runtime tracking and

View File

@ -26,8 +26,8 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
import warnings
from tractor.log import get_logger
from .trionics import gather_contexts from .trionics import gather_contexts
from ._ipc import _connect_chan, Channel from ._ipc import _connect_chan, Channel
from ._portal import ( from ._portal import (
@ -40,11 +40,13 @@ from ._state import (
_runtime_vars, _runtime_vars,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
log = get_logger(__name__)
@acm @acm
async def get_registry( async def get_registry(
host: str, host: str,
@ -56,14 +58,12 @@ async def get_registry(
]: ]:
''' '''
Return a portal instance connected to a local or remote Return a portal instance connected to a local or remote
arbiter. registry-service actor; if a connection already exists re-use it
(presumably to call a `.register_actor()` registry runtime RPC
ep).
''' '''
actor = current_actor() actor: Actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_registrar: if actor.is_registrar:
# we're already the arbiter # we're already the arbiter
# (likely a re-entrant call from the arbiter actor) # (likely a re-entrant call from the arbiter actor)
@ -72,6 +72,8 @@ async def get_registry(
Channel((host, port)) Channel((host, port))
) )
else: else:
# TODO: try to look pre-existing connection from
# `Actor._peers` and use it instead?
async with ( async with (
_connect_chan(host, port) as chan, _connect_chan(host, port) as chan,
open_portal(chan) as regstr_ptl, open_portal(chan) as regstr_ptl,
@ -80,19 +82,6 @@ async def get_registry(
# TODO: deprecate and this remove _arbiter form!
@acm
async def get_arbiter(*args, **kwargs):
warnings.warn(
'`tractor.get_arbiter()` is now deprecated!\n'
'Use `.get_registry()` instead!',
DeprecationWarning,
stacklevel=2,
)
async with get_registry(*args, **kwargs) as to_yield:
yield to_yield
@acm @acm
async def get_root( async def get_root(
**kwargs, **kwargs,
@ -110,22 +99,53 @@ async def get_root(
yield portal yield portal
def get_peer_by_name(
name: str,
# uuid: str|None = None,
) -> list[Channel]|None: # at least 1
'''
Scan for an existing connection (set) to a named actor
and return any channels from `Actor._peers`.
This is an optimization method over querying the registrar for
the same info.
'''
actor: Actor = current_actor()
to_scan: dict[tuple, list[Channel]] = actor._peers.copy()
pchan: Channel|None = actor._parent_chan
if pchan:
to_scan[pchan.uid].append(pchan)
for aid, chans in to_scan.items():
_, peer_name = aid
if name == peer_name:
if not chans:
log.warning(
'No IPC chans for matching peer {peer_name}\n'
)
continue
return chans
return None
@acm @acm
async def query_actor( async def query_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int] | None = None, regaddr: tuple[str, int]|None = None,
regaddr: tuple[str, int] | None = None,
) -> AsyncGenerator[ ) -> AsyncGenerator[
tuple[str, int] | None, tuple[str, int]|None,
None, None,
]: ]:
''' '''
Make a transport address lookup for an actor name to a specific Lookup a transport address (by actor name) via querying a registrar
registrar. listening @ `regaddr`.
Returns the (socket) address or ``None`` if no entry under that Returns the transport protocol (socket) address or `None` if no
name exists for the given registrar listening @ `regaddr`. entry under that name exists.
''' '''
actor: Actor = current_actor() actor: Actor = current_actor()
@ -137,14 +157,10 @@ async def query_actor(
'The current actor IS the registry!?' 'The current actor IS the registry!?'
) )
if arbiter_sockaddr is not None: maybe_peers: list[Channel]|None = get_peer_by_name(name)
warnings.warn( if maybe_peers:
'`tractor.query_actor(regaddr=<blah>)` is deprecated.\n' yield maybe_peers[0].raddr
'Use `registry_addrs: list[tuple]` instead!', return
DeprecationWarning,
stacklevel=2,
)
regaddr: list[tuple[str, int]] = arbiter_sockaddr
reg_portal: Portal reg_portal: Portal
regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0] regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
@ -159,10 +175,28 @@ async def query_actor(
yield sockaddr yield sockaddr
@acm
async def maybe_open_portal(
addr: tuple[str, int],
name: str,
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
pass
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
@acm @acm
async def find_actor( async def find_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int]|None = None,
registry_addrs: list[tuple[str, int]]|None = None, registry_addrs: list[tuple[str, int]]|None = None,
only_first: bool = True, only_first: bool = True,
@ -179,29 +213,12 @@ async def find_actor(
known to the arbiter. known to the arbiter.
''' '''
if arbiter_sockaddr is not None: # optimization path, use any pre-existing peer channel
warnings.warn( maybe_peers: list[Channel]|None = get_peer_by_name(name)
'`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n' if maybe_peers and only_first:
'Use `registry_addrs: list[tuple]` instead!', async with open_portal(maybe_peers[0]) as peer_portal:
DeprecationWarning, yield peer_portal
stacklevel=2, return
)
registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]
@acm
async def maybe_open_portal_from_reg_addr(
addr: tuple[str, int],
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
if not registry_addrs: if not registry_addrs:
# XXX NOTE: make sure to dynamically read the value on # XXX NOTE: make sure to dynamically read the value on
@ -217,10 +234,13 @@ async def find_actor(
maybe_portals: list[ maybe_portals: list[
AsyncContextManager[tuple[str, int]] AsyncContextManager[tuple[str, int]]
] = list( ] = list(
maybe_open_portal_from_reg_addr(addr) maybe_open_portal(
addr=addr,
name=name,
)
for addr in registry_addrs for addr in registry_addrs
) )
portals: list[Portal]
async with gather_contexts( async with gather_contexts(
mngrs=maybe_portals, mngrs=maybe_portals,
) as portals: ) as portals:
@ -254,31 +274,31 @@ async def find_actor(
@acm @acm
async def wait_for_actor( async def wait_for_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
registry_addr: tuple[str, int] | None = None, registry_addr: tuple[str, int] | None = None,
) -> AsyncGenerator[Portal, None]: ) -> AsyncGenerator[Portal, None]:
''' '''
Wait on an actor to register with the arbiter. Wait on at least one peer actor to register `name` with the
registrar, yield a `Portal to the first registree.
A portal to the first registered actor is returned.
''' '''
actor: Actor = current_actor() actor: Actor = current_actor()
if arbiter_sockaddr is not None: # optimization path, use any pre-existing peer channel
warnings.warn( maybe_peers: list[Channel]|None = get_peer_by_name(name)
'`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n' if maybe_peers:
'Use `registry_addr: tuple` instead!', async with open_portal(maybe_peers[0]) as peer_portal:
DeprecationWarning, yield peer_portal
stacklevel=2, return
)
registry_addr: tuple[str, int] = arbiter_sockaddr
regaddr: tuple[str, int] = (
registry_addr
or
actor.reg_addrs[0]
)
# TODO: use `.trionics.gather_contexts()` like # TODO: use `.trionics.gather_contexts()` like
# above in `find_actor()` as well? # above in `find_actor()` as well?
reg_portal: Portal reg_portal: Portal
regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0]
async with get_registry(*regaddr) as reg_portal: async with get_registry(*regaddr) as reg_portal:
sockaddrs = await reg_portal.run_from_ns( sockaddrs = await reg_portal.run_from_ns(
'self', 'self',

View File

@ -243,6 +243,7 @@ def _trio_main(
nest_from_op( nest_from_op(
input_op=')>', # like a "closed-to-play"-icon from super perspective input_op=')>', # like a "closed-to-play"-icon from super perspective
tree_str=actor_info, tree_str=actor_info,
back_from_op=1,
) )
) )
try: try:

View File

@ -263,11 +263,11 @@ class Portal:
return False return False
reminfo: str = ( reminfo: str = (
f'Portal.cancel_actor() => {self.channel.uid}\n' f'c)=> {self.channel.uid}\n'
f'|_{chan}\n' f' |_{chan}\n'
) )
log.cancel( log.cancel(
f'Requesting runtime cancel for peer\n\n' f'Requesting actor-runtime cancel for peer\n\n'
f'{reminfo}' f'{reminfo}'
) )

View File

@ -80,7 +80,7 @@ async def open_root_actor(
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support maybe_enable_greenback: bool = True, # `.pause_from_sync()/breakpoint()` support
enable_stack_on_sig: bool = False, enable_stack_on_sig: bool = False,
# internal logging # internal logging
@ -233,14 +233,8 @@ async def open_root_actor(
and and
enable_stack_on_sig enable_stack_on_sig
): ):
try: from .devx._stackscope import enable_stack_on_sig
logger.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig
enable_stack_on_sig() enable_stack_on_sig()
except ImportError:
logger.warning(
'`stackscope` not installed for use in debug mode!'
)
# closed into below ping task-func # closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = [] ponged_addrs: list[tuple[str, int]] = []

View File

@ -115,25 +115,26 @@ class Actor:
''' '''
The fundamental "runtime" concurrency primitive. The fundamental "runtime" concurrency primitive.
An *actor* is the combination of a regular Python process executing An "actor" is the combination of a regular Python process
a ``trio`` task tree, communicating with other actors through executing a `trio.run()` task tree, communicating with other
"memory boundary portals" - which provide a native async API around "actors" through "memory boundary portals": `Portal`, which
IPC transport "channels" which themselves encapsulate various provide a high-level async API around IPC "channels" (`Channel`)
(swappable) network protocols. which themselves encapsulate various (swappable) network
transport protocols for sending msgs between said memory domains
(processes, hosts, non-GIL threads).
Each "actor" is `trio.run()` scheduled "runtime" composed of many
Each "actor" is ``trio.run()`` scheduled "runtime" composed of concurrent tasks in a single thread. The "runtime" tasks conduct
many concurrent tasks in a single thread. The "runtime" tasks a slew of low(er) level functions to make it possible for message
conduct a slew of low(er) level functions to make it possible passing between actors as well as the ability to create new
for message passing between actors as well as the ability to actors (aka new "runtimes" in new processes which are supervised
create new actors (aka new "runtimes" in new processes which via an "actor-nursery" construct). Each task which sends messages
are supervised via a nursery construct). Each task which sends to a task in a "peer" actor (not necessarily a parent-child,
messages to a task in a "peer" (not necessarily a parent-child,
depth hierarchy) is able to do so via an "address", which maps depth hierarchy) is able to do so via an "address", which maps
IPC connections across memory boundaries, and a task request id IPC connections across memory boundaries, and a task request id
which allows for per-actor tasks to send and receive messages which allows for per-actor tasks to send and receive messages to
to specific peer-actor tasks with which there is an ongoing specific peer-actor tasks with which there is an ongoing RPC/IPC
RPC/IPC dialog. dialog.
''' '''
# ugh, we need to get rid of this and replace with a "registry" sys # ugh, we need to get rid of this and replace with a "registry" sys
@ -230,17 +231,20 @@ class Actor:
# by the user (currently called the "arbiter") # by the user (currently called the "arbiter")
self._spawn_method: str = spawn_method self._spawn_method: str = spawn_method
self._peers: defaultdict = defaultdict(list) self._peers: defaultdict[
str, # uaid
list[Channel], # IPC conns from peer
] = defaultdict(list)
self._peer_connected: dict[tuple[str, str], trio.Event] = {} self._peer_connected: dict[tuple[str, str], trio.Event] = {}
self._no_more_peers = trio.Event() self._no_more_peers = trio.Event()
self._no_more_peers.set() self._no_more_peers.set()
# RPC state
self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks = trio.Event()
self._ongoing_rpc_tasks.set() self._ongoing_rpc_tasks.set()
# (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[ self._rpc_tasks: dict[
tuple[Channel, str], tuple[Channel, str], # (chan, cid)
tuple[Context, Callable, trio.Event] tuple[Context, Callable, trio.Event] # (ctx=>, fn(), done?)
] = {} ] = {}
# map {actor uids -> Context} # map {actor uids -> Context}
@ -317,7 +321,10 @@ class Actor:
event = self._peer_connected.setdefault(uid, trio.Event()) event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait() await event.wait()
log.debug(f'{uid!r} successfully connected back to us') log.debug(f'{uid!r} successfully connected back to us')
return event, self._peers[uid][-1] return (
event,
self._peers[uid][-1],
)
def load_modules( def load_modules(
self, self,
@ -408,26 +415,11 @@ class Actor:
''' '''
self._no_more_peers = trio.Event() # unset by making new self._no_more_peers = trio.Event() # unset by making new
chan = Channel.from_stream(stream) chan = Channel.from_stream(stream)
their_uid: tuple[str, str]|None = chan.uid con_status: str = (
con_status: str = ''
# TODO: remove this branch since can never happen?
# NOTE: `.uid` is only set after first contact
if their_uid:
con_status = (
'IPC Re-connection from already known peer?\n'
)
else:
con_status = (
'New inbound IPC connection <=\n' 'New inbound IPC connection <=\n'
f'|_{chan}\n'
) )
con_status += (
f'|_{chan}\n'
# f' |_@{chan.raddr}\n\n'
# ^-TODO-^ remove since alfready in chan.__repr__()?
)
# send/receive initial handshake response # send/receive initial handshake response
try: try:
uid: tuple|None = await self._do_handshake(chan) uid: tuple|None = await self._do_handshake(chan)
@ -439,10 +431,10 @@ class Actor:
TransportClosed, TransportClosed,
): ):
# XXX: This may propagate up from ``Channel._aiter_recv()`` # XXX: This may propagate up from `Channel._aiter_recv()`
# and ``MsgpackStream._inter_packets()`` on a read from the # and `MsgpackStream._inter_packets()` on a read from the
# stream particularly when the runtime is first starting up # stream particularly when the runtime is first starting up
# inside ``open_root_actor()`` where there is a check for # inside `open_root_actor()` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be # a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place. # because the handshake was never meant took place.
log.runtime( log.runtime(
@ -452,9 +444,22 @@ class Actor:
) )
return return
familiar: str = 'new-peer'
if _pre_chan := self._peers.get(uid):
familiar: str = 'pre-existing-peer'
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
con_status += ( con_status += (
f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n' f' -> Handshake with {familiar} `{uid_short}` complete\n'
) )
if _pre_chan:
log.warning(
# con_status += (
# ^TODO^ swap once we minimize conn duplication
f' -> Wait, we already have IPC with `{uid_short}`??\n'
f' |_{_pre_chan}\n'
)
# IPC connection tracking for both peers and new children: # IPC connection tracking for both peers and new children:
# - if this is a new channel to a locally spawned # - if this is a new channel to a locally spawned
# sub-actor there will be a spawn wait even registered # sub-actor there will be a spawn wait even registered
@ -507,8 +512,9 @@ class Actor:
) )
except trio.Cancelled: except trio.Cancelled:
log.cancel( log.cancel(
'IPC transport msg loop was cancelled for \n' 'IPC transport msg loop was cancelled\n'
f'|_{chan}\n' f'c)>\n'
f' |_{chan}\n'
) )
raise raise
@ -545,7 +551,7 @@ class Actor:
): ):
log.cancel( log.cancel(
'Waiting on cancel request to peer\n' 'Waiting on cancel request to peer..\n'
f'c)=>\n' f'c)=>\n'
f' |_{chan.uid}\n' f' |_{chan.uid}\n'
) )
@ -646,10 +652,14 @@ class Actor:
): ):
report: str = ( report: str = (
'Timed out waiting on local actor-nursery to exit?\n' 'Timed out waiting on local actor-nursery to exit?\n'
f'{local_nursery}\n' f'c)>\n'
f' |_{local_nursery}\n'
) )
if children := local_nursery._children: if children := local_nursery._children:
report += f' |_{pformat(children)}\n' # indent from above local-nurse repr
report += (
f' |_{pformat(children)}\n'
)
log.warning(report) log.warning(report)
@ -1236,8 +1246,9 @@ class Actor:
# TODO: just use the new `Context.repr_rpc: str` (and # TODO: just use the new `Context.repr_rpc: str` (and
# other) repr fields instead of doing this all manual.. # other) repr fields instead of doing this all manual..
msg: str = ( msg: str = (
f'Runtime cancel request from {requester_type}:\n\n' f'Actor-runtime cancel request from {requester_type}\n\n'
f'<= .cancel(): {requesting_uid}\n\n' f'<=c) {requesting_uid}\n'
f' |_{self}\n'
) )
# TODO: what happens here when we self-cancel tho? # TODO: what happens here when we self-cancel tho?
@ -1471,11 +1482,11 @@ class Actor:
) )
log.cancel( log.cancel(
f'Cancelling {descr} RPC tasks\n\n' f'Cancelling {descr} RPC tasks\n\n'
f'<= canceller: {req_uid}\n' f'<=c) {req_uid} [canceller]\n'
f'{rent_chan_repr}' f'{rent_chan_repr}'
f'=> cancellee: {self.uid}\n' f'c)=> {self.uid} [cancellee]\n'
f' |_{self}.cancel_rpc_tasks()\n' f' |_{self} [with {len(tasks)} tasks]\n'
f' |_tasks: {len(tasks)}\n' # f' |_tasks: {len(tasks)}\n'
# f'{tasks_str}' # f'{tasks_str}'
) )
for ( for (
@ -1544,7 +1555,7 @@ class Actor:
def accept_addr(self) -> tuple[str, int]: def accept_addr(self) -> tuple[str, int]:
''' '''
Primary address to which the IPC transport server is Primary address to which the IPC transport server is
bound. bound and listening for new connections.
''' '''
# throws OSError on failure # throws OSError on failure
@ -1561,6 +1572,7 @@ class Actor:
def get_chans( def get_chans(
self, self,
uid: tuple[str, str], uid: tuple[str, str],
) -> list[Channel]: ) -> list[Channel]:
''' '''
Return all IPC channels to the actor with provided `uid`. Return all IPC channels to the actor with provided `uid`.
@ -1932,9 +1944,15 @@ async def async_main(
with CancelScope(shield=True): with CancelScope(shield=True):
await actor._no_more_peers.wait() await actor._no_more_peers.wait()
teardown_report += ('-> All peer channels are complete\n') teardown_report += (
'-> All peer channels are complete\n'
)
teardown_report += ('Actor runtime exited') teardown_report += (
'Actor runtime exiting\n'
f'>)\n'
f'|_{actor}\n'
)
log.info(teardown_report) log.info(teardown_report)

View File

@ -54,6 +54,25 @@ def examples_dir() -> pathlib.Path:
return repodir() / 'examples' return repodir() / 'examples'
def mk_cmd(
ex_name: str,
exs_subpath: str = 'debugging',
) -> str:
'''
Generate a shell command suitable to pass to ``pexpect.spawn()``.
'''
script_path: pathlib.Path = (
examples_dir()
/ exs_subpath
/ f'{ex_name}.py'
)
return ' '.join([
'python',
str(script_path)
])
@acm @acm
async def expect_ctxc( async def expect_ctxc(
yay: bool, yay: bool,

View File

@ -26,7 +26,7 @@ from ._debug import (
breakpoint as breakpoint, breakpoint as breakpoint,
pause as pause, pause as pause,
pause_from_sync as pause_from_sync, pause_from_sync as pause_from_sync,
shield_sigint_handler as shield_sigint_handler, sigint_shield as sigint_shield,
open_crash_handler as open_crash_handler, open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler, maybe_open_crash_handler as maybe_open_crash_handler,
maybe_init_greenback as maybe_init_greenback, maybe_init_greenback as maybe_init_greenback,

View File

@ -409,9 +409,9 @@ class Lock:
repl_task repl_task
) )
message += ( message += (
f'\nA non-caller task still owns this lock on behalf of ' f'A non-caller task still owns this lock on behalf of '
f'{behalf_of_task}\n' f'`{behalf_of_task}`\n'
f'|_{lock_stats.owner}\n' f'lock owner task: {lock_stats.owner}\n'
) )
if ( if (
@ -523,6 +523,10 @@ class Lock:
) )
def get_lock() -> Lock:
return Lock
@tractor.context( @tractor.context(
# enable the locking msgspec # enable the locking msgspec
pld_spec=__pld_spec__, pld_spec=__pld_spec__,
@ -788,13 +792,13 @@ class DebugStatus:
cls._orig_sigint_handler: Callable = trio.from_thread.run_sync( cls._orig_sigint_handler: Callable = trio.from_thread.run_sync(
signal.signal, signal.signal,
signal.SIGINT, signal.SIGINT,
shield_sigint_handler, sigint_shield,
) )
else: else:
cls._orig_sigint_handler = signal.signal( cls._orig_sigint_handler = signal.signal(
signal.SIGINT, signal.SIGINT,
shield_sigint_handler, sigint_shield,
) )
@classmethod @classmethod
@ -900,12 +904,30 @@ class DebugStatus:
# actor-local state, irrelevant for non-root. # actor-local state, irrelevant for non-root.
cls.repl_task = None cls.repl_task = None
# XXX WARNING needs very special caughtion, and we should
# prolly make a more explicit `@property` API?
#
# - if unset in root multi-threaded case can cause
# issues with detecting that some root thread is
# using a REPL,
#
# - what benefit is there to unsetting, it's always
# set again for the next task in some actor..
# only thing would be to avoid in the sigint-handler
# logging when we don't need to?
cls.repl = None cls.repl = None
# restore original sigint handler # restore original sigint handler
cls.unshield_sigint() cls.unshield_sigint()
# TODO: use the new `@lowlevel.singleton` for this!
def get_debug_req() -> DebugStatus|None:
return DebugStatus
class TractorConfig(pdbp.DefaultConfig): class TractorConfig(pdbp.DefaultConfig):
''' '''
Custom `pdbp` config which tries to use the best tradeoff Custom `pdbp` config which tries to use the best tradeoff
@ -1311,7 +1333,7 @@ def any_connected_locker_child() -> bool:
return False return False
def shield_sigint_handler( def sigint_shield(
signum: int, signum: int,
frame: 'frame', # type: ignore # noqa frame: 'frame', # type: ignore # noqa
*args, *args,
@ -1351,13 +1373,17 @@ def shield_sigint_handler(
# root actor branch that reports whether or not a child # root actor branch that reports whether or not a child
# has locked debugger. # has locked debugger.
if is_root_process(): if is_root_process():
# log.warning(
log.devx(
'Handling SIGINT in root actor\n'
f'{Lock.repr()}'
f'{DebugStatus.repr()}\n'
)
# try to see if the supposed (sub)actor in debug still # try to see if the supposed (sub)actor in debug still
# has an active connection to *this* actor, and if not # has an active connection to *this* actor, and if not
# it's likely they aren't using the TTY lock / debugger # it's likely they aren't using the TTY lock / debugger
# and we should propagate SIGINT normally. # and we should propagate SIGINT normally.
any_connected: bool = any_connected_locker_child() any_connected: bool = any_connected_locker_child()
# if not any_connected:
# return do_cancel()
problem = ( problem = (
f'root {actor.uid} handling SIGINT\n' f'root {actor.uid} handling SIGINT\n'
@ -1406,19 +1432,25 @@ def shield_sigint_handler(
# an actor using the `Lock` (a bug state) ?? # an actor using the `Lock` (a bug state) ??
# => so immediately cancel any stale lock cs and revert # => so immediately cancel any stale lock cs and revert
# the handler! # the handler!
if not repl: if not DebugStatus.repl:
# TODO: WHEN should we revert back to ``trio`` # TODO: WHEN should we revert back to ``trio``
# handler if this one is stale? # handler if this one is stale?
# -[ ] maybe after a counts work of ctl-c mashes? # -[ ] maybe after a counts work of ctl-c mashes?
# -[ ] use a state var like `stale_handler: bool`? # -[ ] use a state var like `stale_handler: bool`?
problem += ( problem += (
'\n'
'No subactor is using a `pdb` REPL according `Lock.ctx_in_debug`?\n' 'No subactor is using a `pdb` REPL according `Lock.ctx_in_debug`?\n'
'BUT, the root should be using it, WHY this handler ??\n' 'BUT, the root should be using it, WHY this handler ??\n\n'
'So either..\n'
'- some root-thread is using it but has no `.repl` set?, OR\n'
'- something else weird is going on outside the runtime!?\n'
) )
else: else:
# NOTE: since we emit this msg on ctl-c, we should
# also always re-print the prompt the tail block!
log.pdb( log.pdb(
'Ignoring SIGINT while pdb REPL in use by root actor..\n' 'Ignoring SIGINT while pdb REPL in use by root actor..\n'
f'{DebugStatus.repl_task}\n'
f' |_{repl}\n'
) )
problem = None problem = None
@ -1468,7 +1500,6 @@ def shield_sigint_handler(
'Allowing SIGINT propagation..' 'Allowing SIGINT propagation..'
) )
DebugStatus.unshield_sigint() DebugStatus.unshield_sigint()
# do_cancel()
repl_task: str|None = DebugStatus.repl_task repl_task: str|None = DebugStatus.repl_task
req_task: str|None = DebugStatus.req_task req_task: str|None = DebugStatus.req_task
@ -1483,10 +1514,15 @@ def shield_sigint_handler(
f' |_{repl}\n' f' |_{repl}\n'
) )
elif req_task: elif req_task:
log.pdb( log.debug(
f'Ignoring SIGINT while debug request task is open\n' 'Ignoring SIGINT while debug request task is open but either,\n'
'- someone else is already REPL-in and has the `Lock`, or\n'
'- some other local task already is replin?\n'
f'|_{req_task}\n' f'|_{req_task}\n'
) )
# TODO can we remove this now?
# -[ ] does this path ever get hit any more?
else: else:
msg: str = ( msg: str = (
'SIGINT shield handler still active BUT, \n\n' 'SIGINT shield handler still active BUT, \n\n'
@ -1522,37 +1558,53 @@ def shield_sigint_handler(
# https://github.com/goodboy/tractor/issues/320 # https://github.com/goodboy/tractor/issues/320
# elif debug_mode(): # elif debug_mode():
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it looks to be that the last command that was run (eg. ll)
# will be repeated by default.
# maybe redraw/print last REPL output to console since # maybe redraw/print last REPL output to console since
# we want to alert the user that more input is expect since # we want to alert the user that more input is expect since
# nothing has been done dur to ignoring sigint. # nothing has been done dur to ignoring sigint.
if ( if (
repl # only when current actor has a REPL engaged DebugStatus.repl # only when current actor has a REPL engaged
): ):
flush_status: str = (
'Flushing stdout to ensure new prompt line!\n'
)
# XXX: yah, mega hack, but how else do we catch this madness XD # XXX: yah, mega hack, but how else do we catch this madness XD
if repl.shname == 'xonsh': if (
repl.shname == 'xonsh'
):
flush_status += (
'-> ALSO re-flushing due to `xonsh`..\n'
)
repl.stdout.write(repl.prompt) repl.stdout.write(repl.prompt)
# log.warning(
log.devx(
flush_status
)
repl.stdout.flush() repl.stdout.flush()
# TODO: make this work like sticky mode where if there is output # TODO: better console UX to match the current "mode":
# -[ ] for example if in sticky mode where if there is output
# detected as written to the tty we redraw this part underneath # detected as written to the tty we redraw this part underneath
# and erase the past draw of this same bit above? # and erase the past draw of this same bit above?
# repl.sticky = True # repl.sticky = True
# repl._print_if_sticky() # repl._print_if_sticky()
# also see these links for an approach from ``ptk``: # also see these links for an approach from `ptk`:
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
else:
log.devx(
# log.warning(
'Not flushing stdout since not needed?\n'
f'|_{repl}\n'
)
# XXX only for tracing this handler # XXX only for tracing this handler
log.devx('exiting SIGINT') log.devx('exiting SIGINT')
_pause_msg: str = 'Attaching to pdb REPL in actor' _pause_msg: str = 'Opening a pdb REPL in paused actor'
class DebugRequestError(RuntimeError): class DebugRequestError(RuntimeError):
@ -1617,7 +1669,7 @@ async def _pause(
# 'directly (infected) `asyncio` tasks!' # 'directly (infected) `asyncio` tasks!'
# ) from rte # ) from rte
raise raise rte
if debug_func is not None: if debug_func is not None:
debug_func = partial(debug_func) debug_func = partial(debug_func)
@ -1625,9 +1677,13 @@ async def _pause(
# XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug # XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug
# request from a subactor BEFORE the REPL is entered by that # request from a subactor BEFORE the REPL is entered by that
# process. # process.
if not repl: if (
not repl
and
debug_func
):
repl: PdbREPL = mk_pdb()
DebugStatus.shield_sigint() DebugStatus.shield_sigint()
repl: PdbREPL = repl or mk_pdb()
# TODO: move this into a `open_debug_request()` @acm? # TODO: move this into a `open_debug_request()` @acm?
# -[ ] prolly makes the most sense to do the request # -[ ] prolly makes the most sense to do the request
@ -1662,7 +1718,13 @@ async def _pause(
# recurrent entries/requests from the same # recurrent entries/requests from the same
# actor-local task. # actor-local task.
DebugStatus.repl_task = task DebugStatus.repl_task = task
if repl:
DebugStatus.repl = repl DebugStatus.repl = repl
else:
log.error(
'No REPl instance set before entering `debug_func`?\n'
f'{debug_func}\n'
)
# invoke the low-level REPL activation routine which itself # invoke the low-level REPL activation routine which itself
# should call into a `Pdb.set_trace()` of some sort. # should call into a `Pdb.set_trace()` of some sort.
@ -2001,7 +2063,7 @@ async def _pause(
DebugStatus.release(cancel_req_task=True) DebugStatus.release(cancel_req_task=True)
# sanity checks for ^ on request/status teardown # sanity checks for ^ on request/status teardown
assert DebugStatus.repl is None # assert DebugStatus.repl is None # XXX no more bc bg thread cases?
assert DebugStatus.repl_task is None assert DebugStatus.repl_task is None
# sanity, for when hackin on all this? # sanity, for when hackin on all this?
@ -2050,9 +2112,8 @@ def _set_trace(
# root here? Bo # root here? Bo
log.pdb( log.pdb(
f'{_pause_msg}\n' f'{_pause_msg}\n'
# '|\n'
f'>(\n' f'>(\n'
f' |_ {task} @ {actor.uid}\n' f'|_ {task} @ {actor.uid}\n'
# ^-TODO-^ more compact pformating? # ^-TODO-^ more compact pformating?
# -[ ] make an `Actor.__repr()__` # -[ ] make an `Actor.__repr()__`
# -[ ] should we use `log.pformat_task_uid()`? # -[ ] should we use `log.pformat_task_uid()`?
@ -2241,7 +2302,12 @@ async def _pause_from_bg_root_thread(
'Trying to acquire `Lock` on behalf of bg thread\n' 'Trying to acquire `Lock` on behalf of bg thread\n'
f'|_{behalf_of_thread}\n' f'|_{behalf_of_thread}\n'
) )
# DebugStatus.repl_task = behalf_of_thread
# NOTE: this is already a task inside the main-`trio`-thread, so
# we don't need to worry about calling it another time from the
# bg thread on which who's behalf this task is operating.
DebugStatus.shield_sigint()
out = await _pause( out = await _pause(
debug_func=None, debug_func=None,
repl=repl, repl=repl,
@ -2250,6 +2316,8 @@ async def _pause_from_bg_root_thread(
called_from_bg_thread=True, called_from_bg_thread=True,
**_pause_kwargs **_pause_kwargs
) )
DebugStatus.repl_task = behalf_of_thread
lock: trio.FIFOLock = Lock._debug_lock lock: trio.FIFOLock = Lock._debug_lock
stats: trio.LockStatistics= lock.statistics() stats: trio.LockStatistics= lock.statistics()
assert stats.owner is task assert stats.owner is task
@ -2283,7 +2351,6 @@ async def _pause_from_bg_root_thread(
f'|_{behalf_of_thread}\n' f'|_{behalf_of_thread}\n'
) )
task_status.started(out) task_status.started(out)
DebugStatus.shield_sigint()
# wait for bg thread to exit REPL sesh. # wait for bg thread to exit REPL sesh.
try: try:
@ -2324,7 +2391,7 @@ def pause_from_sync(
err_on_no_runtime=False, err_on_no_runtime=False,
) )
message: str = ( message: str = (
f'{actor.uid} task called `tractor.pause_from_sync()`\n\n' f'{actor.uid} task called `tractor.pause_from_sync()`\n'
) )
if not actor: if not actor:
raise RuntimeError( raise RuntimeError(
@ -2348,7 +2415,6 @@ def pause_from_sync(
'for infected `asyncio` mode!' 'for infected `asyncio` mode!'
) )
DebugStatus.shield_sigint()
repl: PdbREPL = mk_pdb() repl: PdbREPL = mk_pdb()
# message += f'-> created local REPL {repl}\n' # message += f'-> created local REPL {repl}\n'
@ -2366,6 +2432,10 @@ def pause_from_sync(
# thread which will call `._pause()` manually with special # thread which will call `._pause()` manually with special
# handling for root-actor caller usage. # handling for root-actor caller usage.
if not DebugStatus.is_main_trio_thread(): if not DebugStatus.is_main_trio_thread():
# TODO: `threading.Lock()` this so we don't get races in
# multi-thr cases where they're acquiring/releasing the
# REPL and setting request/`Lock` state, etc..
thread: threading.Thread = threading.current_thread() thread: threading.Thread = threading.current_thread()
repl_owner = thread repl_owner = thread
@ -2373,9 +2443,16 @@ def pause_from_sync(
if is_root: if is_root:
message += ( message += (
f'-> called from a root-actor bg {thread}\n' f'-> called from a root-actor bg {thread}\n'
f'-> scheduling `._pause_from_sync_thread()`..\n' f'-> scheduling `._pause_from_bg_root_thread()`..\n'
) )
bg_task, repl = trio.from_thread.run( # XXX SUBTLE BADNESS XXX that should really change!
# don't over-write the `repl` here since when
# this behalf-of-bg_thread-task calls pause it will
# pass `debug_func=None` which will result in it
# returing a `repl==None` output and that get's also
# `.started(out)` back here! So instead just ignore
# that output and assign the `repl` created above!
bg_task, _ = trio.from_thread.run(
afn=partial( afn=partial(
actor._service_n.start, actor._service_n.start,
partial( partial(
@ -2387,8 +2464,9 @@ def pause_from_sync(
), ),
) )
) )
DebugStatus.shield_sigint()
message += ( message += (
f'-> `._pause_from_sync_thread()` started bg task {bg_task}\n' f'-> `._pause_from_bg_root_thread()` started bg task {bg_task}\n'
) )
else: else:
message += f'-> called from a bg {thread}\n' message += f'-> called from a bg {thread}\n'
@ -2397,7 +2475,7 @@ def pause_from_sync(
# `request_root_stdio_lock()` and we don't need to # `request_root_stdio_lock()` and we don't need to
# worry about all the special considerations as with # worry about all the special considerations as with
# the root-actor per above. # the root-actor per above.
bg_task, repl = trio.from_thread.run( bg_task, _ = trio.from_thread.run(
afn=partial( afn=partial(
_pause, _pause,
debug_func=None, debug_func=None,
@ -2412,6 +2490,9 @@ def pause_from_sync(
**_pause_kwargs **_pause_kwargs
), ),
) )
# ?TODO? XXX where do we NEED to call this in the
# subactor-bg-thread case?
DebugStatus.shield_sigint()
assert bg_task is not DebugStatus.repl_task assert bg_task is not DebugStatus.repl_task
else: # we are presumably the `trio.run()` + main thread else: # we are presumably the `trio.run()` + main thread
@ -2424,6 +2505,11 @@ def pause_from_sync(
# greenback: ModuleType = await maybe_init_greenback() # greenback: ModuleType = await maybe_init_greenback()
message += f'-> imported {greenback}\n' message += f'-> imported {greenback}\n'
# NOTE XXX seems to need to be set BEFORE the `_pause()`
# invoke using gb below?
DebugStatus.shield_sigint()
repl_owner: Task = current_task() repl_owner: Task = current_task()
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
try: try:
@ -2449,8 +2535,11 @@ def pause_from_sync(
raise raise
if out: if out:
bg_task, repl = out bg_task, _ = out
assert repl is repl else:
bg_task: Task = current_task()
# assert repl is repl
assert bg_task is repl_owner assert bg_task is repl_owner
# NOTE: normally set inside `_enter_repl_sync()` # NOTE: normally set inside `_enter_repl_sync()`
@ -2465,7 +2554,10 @@ def pause_from_sync(
) )
log.devx(message) log.devx(message)
# NOTE set as late as possible to avoid state clobbering
# in the multi-threaded case!
DebugStatus.repl = repl DebugStatus.repl = repl
_set_trace( _set_trace(
api_frame=api_frame or inspect.currentframe(), api_frame=api_frame or inspect.currentframe(),
repl=repl, repl=repl,
@ -2523,7 +2615,7 @@ async def breakpoint(
_crash_msg: str = ( _crash_msg: str = (
'Attaching to pdb REPL in crashed actor' 'Opening a pdb REPL in crashed actor'
) )
@ -2551,11 +2643,9 @@ def _post_mortem(
# here! Bo # here! Bo
log.pdb( log.pdb(
f'{_crash_msg}\n' f'{_crash_msg}\n'
# '|\n'
f'x>(\n' f'x>(\n'
f' |_ {current_task()} @ {actor.uid}\n' f' |_ {current_task()} @ {actor.uid}\n'
# f'|_ @{actor.uid}\n'
# TODO: make an `Actor.__repr()__` # TODO: make an `Actor.__repr()__`
# f'|_ {current_task()} @ {actor.name}\n' # f'|_ {current_task()} @ {actor.name}\n'
) )
@ -2668,7 +2758,8 @@ async def acquire_debug_lock(
tuple, tuple,
]: ]:
''' '''
Request to acquire the TTY `Lock` in the root actor, release on exit. Request to acquire the TTY `Lock` in the root actor, release on
exit.
This helper is for actor's who don't actually need to acquired This helper is for actor's who don't actually need to acquired
the debugger but want to wait until the lock is free in the the debugger but want to wait until the lock is free in the
@ -2680,10 +2771,14 @@ async def acquire_debug_lock(
yield None yield None
return return
task: Task = current_task()
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
ctx: Context = await n.start( ctx: Context = await n.start(
partial(
request_root_stdio_lock, request_root_stdio_lock,
subactor_uid, actor_uid=subactor_uid,
task_uid=(task.name, id(task)),
)
) )
yield ctx yield ctx
ctx.cancel() ctx.cancel()

View File

@ -24,13 +24,24 @@ disjoint, parallel executing tasks in separate actors.
''' '''
from __future__ import annotations from __future__ import annotations
# from functools import partial
from threading import (
current_thread,
Thread,
RLock,
)
import multiprocessing as mp import multiprocessing as mp
from signal import ( from signal import (
signal, signal,
getsignal,
SIGUSR1, SIGUSR1,
) )
import traceback # import traceback
from typing import TYPE_CHECKING from types import ModuleType
from typing import (
Callable,
TYPE_CHECKING,
)
import trio import trio
from tractor import ( from tractor import (
@ -51,26 +62,45 @@ if TYPE_CHECKING:
@trio.lowlevel.disable_ki_protection @trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None: def dump_task_tree() -> None:
import stackscope '''
from tractor.log import get_console_log Do a classic `stackscope.extract()` task-tree dump to console at
`.devx()` level.
'''
import stackscope
tree_str: str = str( tree_str: str = str(
stackscope.extract( stackscope.extract(
trio.lowlevel.current_root_task(), trio.lowlevel.current_root_task(),
recurse_child_tasks=True recurse_child_tasks=True
) )
) )
log = get_console_log(
name=__name__,
level='cancel',
)
actor: Actor = _state.current_actor() actor: Actor = _state.current_actor()
thr: Thread = current_thread()
log.devx( log.devx(
f'Dumping `stackscope` tree for actor\n' f'Dumping `stackscope` tree for actor\n'
f'{actor.name}: {actor}\n' f'{actor.uid}:\n'
f' |_{mp.current_process()}\n\n' f'|_{mp.current_process()}\n'
f' |_{thr}\n'
f' |_{actor}\n\n'
# start-of-trace-tree delimiter (mostly for testing)
'------ - ------\n'
'\n'
+
f'{tree_str}\n' f'{tree_str}\n'
+
# end-of-trace-tree delimiter (mostly for testing)
f'\n'
f'------ {actor.uid!r} ------\n'
) )
# TODO: can remove this right?
# -[ ] was original code from author
#
# print(
# 'DUMPING FROM PRINT\n'
# +
# content
# )
# import logging # import logging
# try: # try:
# with open("/dev/tty", "w") as tty: # with open("/dev/tty", "w") as tty:
@ -80,58 +110,130 @@ def dump_task_tree() -> None:
# "task_tree" # "task_tree"
# ).exception("Error printing task tree") # ).exception("Error printing task tree")
_handler_lock = RLock()
_tree_dumped: bool = False
def signal_handler(
def dump_tree_on_sig(
sig: int, sig: int,
frame: object, frame: object,
relay_to_subs: bool = True, relay_to_subs: bool = True,
) -> None: ) -> None:
global _tree_dumped, _handler_lock
with _handler_lock:
if _tree_dumped:
log.warning(
'Already dumped for this actor...??'
)
return
_tree_dumped = True
# actor: Actor = _state.current_actor()
log.devx(
'Trying to dump `stackscope` tree..\n'
)
try: try:
trio.lowlevel.current_trio_token( dump_task_tree()
).run_sync_soon(dump_task_tree) # await actor._service_n.start_soon(
# partial(
# trio.to_thread.run_sync,
# dump_task_tree,
# )
# )
# trio.lowlevel.current_trio_token().run_sync_soon(
# dump_task_tree
# )
except RuntimeError: except RuntimeError:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
# not in async context -- print a normal traceback # not in async context -- print a normal traceback
traceback.print_stack() # traceback.print_stack()
raise
except BaseException:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
raise
log.devx(
'Supposedly we dumped just fine..?'
)
if not relay_to_subs: if not relay_to_subs:
return return
an: ActorNursery an: ActorNursery
for an in _state.current_actor()._actoruid2nursery.values(): for an in _state.current_actor()._actoruid2nursery.values():
subproc: ProcessType subproc: ProcessType
subactor: Actor subactor: Actor
for subactor, subproc, _ in an._children.values(): for subactor, subproc, _ in an._children.values():
log.devx( log.warning(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n' f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n' f'{subactor}\n'
f' |_{subproc}\n' f' |_{subproc}\n'
) )
if isinstance(subproc, trio.Process): # bc of course stdlib can't have a std API.. XD
match subproc:
case trio.Process():
subproc.send_signal(sig) subproc.send_signal(sig)
elif isinstance(subproc, mp.Process): case mp.Process():
subproc._send_signal(sig) subproc._send_signal(sig)
def enable_stack_on_sig( def enable_stack_on_sig(
sig: int = SIGUSR1 sig: int = SIGUSR1,
) -> None: ) -> ModuleType:
''' '''
Enable `stackscope` tracing on reception of a signal; by Enable `stackscope` tracing on reception of a signal; by
default this is SIGUSR1. default this is SIGUSR1.
HOT TIP: a task/ctx-tree dump can be triggered from a shell with
fancy cmds.
For ex. from `bash` using `pgrep` and cmd-sustitution
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
you could use:
>> kill -SIGUSR1 $(pgrep -f '<cmd>')
Or with with `xonsh` (which has diff capture-from-subproc syntax)
>> kill -SIGUSR1 @$(pgrep -f '<cmd>')
''' '''
try:
import stackscope
except ImportError:
log.warning(
'`stackscope` not installed for use in debug mode!'
)
return None
handler: Callable|int = getsignal(sig)
if handler is dump_tree_on_sig:
log.devx(
'A `SIGUSR1` handler already exists?\n'
f'|_ {handler!r}\n'
)
return
signal( signal(
sig, sig,
signal_handler, dump_tree_on_sig,
) )
# NOTE: not the above can be triggered from log.devx(
# a (xonsh) shell using: 'Enabling trace-trees on `SIGUSR1` '
# kill -SIGUSR1 @$(pgrep -f '<cmd>') 'since `stackscope` is installed @ \n'
# f'{stackscope!r}\n\n'
# for example if you were looking to trace a `pytest` run f'With `SIGUSR1` handler\n'
# kill -SIGUSR1 @$(pgrep -f 'pytest') f'|_{dump_tree_on_sig}\n'
)
return stackscope

View File

@ -374,7 +374,7 @@ class PldRx(Struct):
case _: case _:
src_err = InternalError( src_err = InternalError(
'Unknown IPC msg ??\n\n' 'Invalid IPC msg ??\n\n'
f'{msg}\n' f'{msg}\n'
) )
@ -499,7 +499,7 @@ async def maybe_limit_plds(
yield None yield None
return return
# sanity on scoping # sanity check on IPC scoping
curr_ctx: Context = current_ipc_ctx() curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx assert ctx is curr_ctx
@ -510,6 +510,8 @@ async def maybe_limit_plds(
) as msgdec: ) as msgdec:
yield msgdec yield msgdec
# when the applied spec is unwound/removed, the same IPC-ctx
# should still be in scope.
curr_ctx: Context = current_ipc_ctx() curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx assert ctx is curr_ctx
@ -525,16 +527,26 @@ async def drain_to_final_msg(
list[MsgType] list[MsgType]
]: ]:
''' '''
Drain IPC msgs delivered to the underlying IPC primitive's Drain IPC msgs delivered to the underlying IPC context's
rx-mem-chan (eg. `Context._rx_chan`) from the runtime in rx-mem-chan (i.e. from `Context._rx_chan`) in search for a final
search for a final result or error. `Return` or `Error` msg.
The motivation here is to ideally capture errors during ctxc Deliver the `Return` + preceding drained msgs (`list[MsgType]`)
conditions where a canc-request/or local error is sent but the as a pair unless an `Error` is found, in which unpack and raise
local task also excepts and enters the it.
`Portal.open_context().__aexit__()` block wherein we prefer to
capture and raise any remote error or ctxc-ack as part of the The motivation here is to always capture any remote error relayed
`ctx.result()` cleanup and teardown sequence. by the remote peer task during a ctxc condition.
For eg. a ctxc-request may be sent to the peer as part of the
local task's (request for) cancellation but then that same task
**also errors** before executing the teardown in the
`Portal.open_context().__aexit__()` block. In such error-on-exit
cases we want to always capture and raise any delivered remote
error (like an expected ctxc-ACK) as part of the final
`ctx.wait_for_result()` teardown sequence such that the
`Context.outcome` related state always reflect what transpired
even after ctx closure and the `.open_context()` block exit.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
@ -572,7 +584,6 @@ async def drain_to_final_msg(
# |_from tractor.devx._debug import pause # |_from tractor.devx._debug import pause
# await pause() # await pause()
# NOTE: we get here if the far end was # NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases: # `ContextCancelled` in 2 cases:
# 1. we requested the cancellation and thus # 1. we requested the cancellation and thus
@ -580,13 +591,13 @@ async def drain_to_final_msg(
# 2. WE DID NOT REQUEST that cancel and thus # 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE! # SHOULD RAISE HERE!
except trio.Cancelled as taskc: except trio.Cancelled as taskc:
# CASE 2: mask the local cancelled-error(s) # CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is # only when we are sure the remote error is
# the source cause of this local task's # the source cause of this local task's
# cancellation. # cancellation.
ctx.maybe_raise( ctx.maybe_raise(
# TODO: when use this/ hide_tb=hide_tb,
# TODO: when use this?
# from_src_exc=taskc, # from_src_exc=taskc,
) )
@ -659,7 +670,7 @@ async def drain_to_final_msg(
# Stop() # Stop()
case Stop(): case Stop():
pre_result_drained.append(msg) pre_result_drained.append(msg)
log.cancel( log.runtime( # normal/expected shutdown transaction
'Remote stream terminated due to "stop" msg:\n\n' 'Remote stream terminated due to "stop" msg:\n\n'
f'{pretty_struct.pformat(msg)}\n' f'{pretty_struct.pformat(msg)}\n'
) )
@ -719,13 +730,19 @@ async def drain_to_final_msg(
pre_result_drained.append(msg) pre_result_drained.append(msg)
# It's definitely an internal error if any other # It's definitely an internal error if any other
# msg type without a`'cid'` field arrives here! # msg type without a`'cid'` field arrives here!
report: str = (
f'Invalid or unknown msg type {type(msg)!r}!?\n'
)
if not msg.cid: if not msg.cid:
raise InternalError( report += (
'Unexpected cid-missing msg?\n\n' '\nWhich also has no `.cid` field?\n'
f'{msg}\n'
) )
raise RuntimeError('Unknown msg type: {msg}') raise MessagingError(
report
+
f'\n{msg}\n'
)
else: else:
log.cancel( log.cancel(