Compare commits

..

No commits in common. "547b957bbf4b3346442352f54ed51a20ff29f79a" and "9be821a5cfa4bf4bb7edd44ec1a14da35fd15bd5" have entirely different histories.

22 changed files with 441 additions and 930 deletions

View File

@ -4,13 +4,6 @@ import time
import trio
import tractor
# TODO: only import these when not running from test harness?
# can we detect `pexpect` usage maybe?
# from tractor.devx._debug import (
# get_lock,
# get_debug_req,
# )
def sync_pause(
use_builtin: bool = False,
@ -25,13 +18,7 @@ def sync_pause(
breakpoint(hide_tb=hide_tb)
else:
# TODO: maybe for testing some kind of cm style interface
# where the `._set_trace()` call doesn't happen until block
# exit?
# assert get_lock().ctx_in_debug is None
# assert get_debug_req().repl is None
tractor.pause_from_sync()
# assert get_debug_req().repl is None
if error:
raise RuntimeError('yoyo sync code error')
@ -54,11 +41,10 @@ async def start_n_sync_pause(
async def main() -> None:
async with (
tractor.open_nursery(
debug_mode=True,
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
enable_stack_on_sig=True,
# loglevel='warning',
# loglevel='devx',
debug_mode=True,
# loglevel='cancel',
) as an,
trio.open_nursery() as tn,
):
@ -152,9 +138,7 @@ async def main() -> None:
# the case 2. from above still exists!
use_builtin=True,
),
# TODO: with this `False` we can hang!??!
# abandon_on_cancel=False,
abandon_on_cancel=True,
abandon_on_cancel=False,
thread_name='inline_root_bg_thread',
)

View File

@ -9,7 +9,7 @@ async def main(service_name):
async with tractor.open_nursery() as an:
await an.start_actor(service_name)
async with tractor.get_registry('127.0.0.1', 1616) as portal:
async with tractor.get_arbiter('127.0.0.1', 1616) as portal:
print(f"Arbiter is listening on {portal.channel}")
async with tractor.wait_for_actor(service_name) as sockaddr:

View File

View File

@ -1,167 +0,0 @@
'''
`tractor.devx.*` tooling sub-pkg test space.
'''
from typing import (
Callable,
)
import pytest
from pexpect.exceptions import (
TIMEOUT,
)
from tractor._testing import (
mk_cmd,
)
@pytest.fixture
def spawn(
start_method,
testdir: pytest.Testdir,
reg_addr: tuple[str, int],
) -> Callable[[str], None]:
'''
Use the `pexpect` module shipped via `testdir.spawn()` to
run an `./examples/..` script by name.
'''
if start_method != 'trio':
pytest.skip(
'`pexpect` based tests only supported on `trio` backend'
)
def _spawn(
cmd: str,
**mkcmd_kwargs,
):
return testdir.spawn(
cmd=mk_cmd(
cmd,
**mkcmd_kwargs,
),
expect_timeout=3,
)
# such that test-dep can pass input script name.
return _spawn
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
def expect(
child,
# normally a `pdb` prompt by default
patt: str,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
err_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
if err_on_false:
raise ValueError(
f'Could not find pattern: {part!r} in `before` output?'
)
return False
return True
# TODO: todo support terminal color-chars stripping so we can match
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
# since this is an "assert" helper ;)
err_on_false=True,
**kwargs
)

View File

@ -1,120 +0,0 @@
'''
That "native" runtime-hackin toolset better be dang useful!
Verify the funtion of a variety of "developer-experience" tools we
offer from the `.devx` sub-pkg:
- use of the lovely `stackscope` for dumping actor `trio`-task trees
during operation and hangs.
TODO:
- demonstration of `CallerInfo` call stack frame filtering such that
for logging and REPL purposes a user sees exactly the layers needed
when debugging a problem inside the stack vs. in their app.
'''
import os
import signal
from .conftest import (
expect,
assert_before,
# in_prompt_msg,
)
def test_shield_pause(
spawn,
):
'''
Verify the `tractor.pause()/.post_mortem()` API works inside an
already cancelled `trio.CancelScope` and that you can step to the
next checkpoint wherein the cancelled will get raised.
'''
child = spawn(
'shield_hang_in_sub'
)
expect(
child,
'Yo my child hanging..?',
)
assert_before(
child,
[
'Entering shield sleep..',
'Enabling trace-trees on `SIGUSR1` since `stackscope` is installed @',
]
)
print(
'Sending SIGUSR1 to see a tree-trace!',
)
os.kill(
child.pid,
signal.SIGUSR1,
)
expect(
child,
# end-of-tree delimiter
"------ \('root', ",
)
assert_before(
child,
[
'Trying to dump `stackscope` tree..',
'Dumping `stackscope` tree for actor',
"('root'", # uid line
# parent block point (non-shielded)
'await trio.sleep_forever() # in root',
]
)
# expect(
# child,
# # relay to the sub should be reported
# 'Relaying `SIGUSR1`[10] to sub-actor',
# )
expect(
child,
# end-of-tree delimiter
"------ \('hanger', ",
)
assert_before(
child,
[
# relay to the sub should be reported
'Relaying `SIGUSR1`[10] to sub-actor',
"('hanger'", # uid line
# hanger LOC where it's shield-halted
'await trio.sleep_forever() # in subactor',
]
)
# breakpoint()
# simulate the user sending a ctl-c to the hanging program.
# this should result in the terminator kicking in since
# the sub is shield blocking and can't respond to SIGINT.
os.kill(
child.pid,
signal.SIGINT,
)
expect(
child,
'Shutting down actor runtime',
timeout=6,
)
assert_before(
child,
[
'raise KeyboardInterrupt',
# 'Shutting down actor runtime',
'#T-800 deployed to collect zombie B0',
"'--uid', \"('hanger',",
]
)

View File

@ -91,8 +91,7 @@ def test_ipc_channel_break_during_stream(
# non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree.
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
expect_final_exc = trio.ClosedResourceError
mod: ModuleType = import_path(
examples_dir() / 'advanced_faults'
@ -158,7 +157,7 @@ def test_ipc_channel_break_during_stream(
if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt
# NOTE when the parent IPC side dies (even if the child does as well
# NOTE when the parent IPC side dies (even if the child's does as well
# but the child fails BEFORE the parent) we always expect the
# IPC layer to raise a closed-resource, NEVER do we expect
# a stop msg since the parent-side ctx apis will error out
@ -170,8 +169,7 @@ def test_ipc_channel_break_during_stream(
and
ipc_break['break_child_ipc_after'] is False
):
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
expect_final_exc = trio.ClosedResourceError
# BOTH but, PARENT breaks FIRST
elif (
@ -182,8 +180,7 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_parent_ipc_after']
)
):
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
expect_final_exc = trio.ClosedResourceError
with pytest.raises(
expected_exception=(
@ -202,8 +199,8 @@ def test_ipc_channel_break_during_stream(
**ipc_break,
)
)
except KeyboardInterrupt as _kbi:
kbi = _kbi
except KeyboardInterrupt as kbi:
_err = kbi
if expect_final_exc is not KeyboardInterrupt:
pytest.fail(
'Rxed unexpected KBI !?\n'
@ -212,21 +209,6 @@ def test_ipc_channel_break_during_stream(
raise
except tractor.TransportClosed as _tc:
tc = _tc
if expect_final_exc is KeyboardInterrupt:
pytest.fail(
'Unexpected transport failure !?\n'
f'{repr(tc)}'
)
cause: Exception = tc.__cause__
assert (
type(cause) is trio.ClosedResourceError
and
cause.args[0] == 'another task closed this fd'
)
raise
# get raw instance from pytest wrapper
value = excinfo.value
if isinstance(value, ExceptionGroup):

View File

@ -13,9 +13,11 @@ TODO:
from functools import partial
import itertools
import platform
import pathlib
import time
import pytest
import pexpect
from pexpect.exceptions import (
TIMEOUT,
EOF,
@ -26,14 +28,12 @@ from tractor.devx._debug import (
_crash_msg,
_repl_fail_msg,
)
from tractor._testing import (
examples_dir,
)
from conftest import (
_ci_env,
)
from .conftest import (
expect,
in_prompt_msg,
assert_before,
)
# TODO: The next great debugger audit could be done by you!
# - recurrent entry to breakpoint() from single actor *after* and an
@ -52,6 +52,15 @@ if platform.system() == 'Windows':
)
def mk_cmd(ex_name: str) -> str:
'''
Generate a command suitable to pass to ``pexpect.spawn()``.
'''
script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py'
return ' '.join(['python', str(script_path)])
# TODO: was trying to this xfail style but some weird bug i see in CI
# that's happening at collect time.. pretty soon gonna dump actions i'm
# thinkin...
@ -70,9 +79,142 @@ has_nested_actors = pytest.mark.has_nested_actors
# )
@pytest.fixture
def spawn(
start_method,
testdir,
reg_addr,
) -> 'pexpect.spawn':
if start_method != 'trio':
pytest.skip(
"Debugger tests are only supported on the trio backend"
)
def _spawn(cmd):
return testdir.spawn(
cmd=mk_cmd(cmd),
expect_timeout=3,
)
return _spawn
PROMPT = r"\(Pdb\+\)"
def expect(
child,
# prompt by default
patt: str = PROMPT,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
return False
return True
# TODO: todo support terminal color-chars stripping so we can match
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
**kwargs
)
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
@pytest.mark.parametrize(
'user_in_out',
[
@ -137,7 +279,7 @@ def test_root_actor_bp(spawn, user_in_out):
child.expect('\r\n')
# process should exit
child.expect(EOF)
child.expect(pexpect.EOF)
if expect_err_str is None:
assert 'Error' not in str(child.before)
@ -157,9 +299,7 @@ def do_ctlc(
# needs some further investigation potentially...
expect_prompt: bool = not _ci_env,
) -> str|None:
before: str|None = None
) -> None:
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
@ -169,18 +309,15 @@ def do_ctlc(
# TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine..
if expect_prompt:
before = str(child.before.decode())
time.sleep(delay)
child.expect(PROMPT)
before = str(child.before.decode())
time.sleep(delay)
if patt:
# should see the last line on console
assert patt in before
# return the console content up to the final prompt
return before
def test_root_actor_bp_forever(
spawn,
@ -221,7 +358,7 @@ def test_root_actor_bp_forever(
# quit out of the loop
child.sendline('q')
child.expect(EOF)
child.expect(pexpect.EOF)
@pytest.mark.parametrize(
@ -286,7 +423,7 @@ def test_subactor_error(
child.expect('\r\n')
# process should exit
child.expect(EOF)
child.expect(pexpect.EOF)
def test_subactor_breakpoint(
@ -349,7 +486,7 @@ def test_subactor_breakpoint(
child.sendline('c')
# process should exit
child.expect(EOF)
child.expect(pexpect.EOF)
before = str(child.before.decode())
assert in_prompt_msg(
@ -492,7 +629,7 @@ def test_multi_subactors(
# process should exit
child.sendline('c')
child.expect(EOF)
child.expect(pexpect.EOF)
# repeat of previous multierror for final output
assert_before(child, [
@ -632,7 +769,7 @@ def test_multi_daemon_subactors(
)
child.sendline('c')
child.expect(EOF)
child.expect(pexpect.EOF)
@has_nested_actors
@ -708,7 +845,7 @@ def test_multi_subactors_root_errors(
])
child.sendline('c')
child.expect(EOF)
child.expect(pexpect.EOF)
assert_before(child, [
# "Attaching to pdb in crashed actor: ('root'",
@ -838,7 +975,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(3):
try:
child.expect(EOF, timeout=0.5)
child.expect(pexpect.EOF, timeout=0.5)
break
except TIMEOUT:
child.sendline('c')
@ -880,7 +1017,7 @@ def test_root_cancels_child_context_during_startup(
do_ctlc(child)
child.sendline('c')
child.expect(EOF)
child.expect(pexpect.EOF)
def test_different_debug_mode_per_actor(
@ -901,7 +1038,7 @@ def test_different_debug_mode_per_actor(
do_ctlc(child)
child.sendline('c')
child.expect(EOF)
child.expect(pexpect.EOF)
before = str(child.before.decode())
@ -948,10 +1085,10 @@ def test_pause_from_sync(
)
if ctlc:
do_ctlc(child)
# ^NOTE^ subactor not spawned yet; don't need extra delay.
child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT)
@ -972,27 +1109,7 @@ def test_pause_from_sync(
)
if ctlc:
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
# XXX, fwiw without a brief sleep here the SIGINT might actually
# trigger "subactor" cancellation by its parent before the
# shield-handler is engaged.
#
# => similar to the `delay` input to `do_ctlc()` below, setting
# this too low can cause the test to fail since the `subactor`
# suffers a race where the root/parent sends an actor-cancel
# prior to the context task hitting its pause point (and thus
# engaging the `sigint_shield()` handler in time); this value
# seems be good enuf?
time.sleep(0.6)
do_ctlc(child)
# one of the bg thread or subactor should have
# `Lock.acquire()`-ed
@ -1011,48 +1128,32 @@ def test_pause_from_sync(
"('root'",
],
}
conts: int = 0 # for debugging below matching logic on failure
while attach_patts:
child.sendline('c')
conts += 1
child.expect(PROMPT)
before = str(child.before.decode())
for key in attach_patts:
for key in attach_patts.copy():
if key in before:
attach_key: str = key
expected_patts: str = attach_patts.pop(key)
assert_before(
child,
[_pause_msg]
+
expected_patts
[_pause_msg] + expected_patts
)
break
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
for key, other_patts in attach_patts.items():
assert not in_prompt_msg(
before,
other_patts,
)
if ctlc:
do_ctlc(
child,
patt=attach_key,
# NOTE same as comment above
delay=0.4,
)
do_ctlc(child)
child.sendline('c')
child.expect(EOF)
child.expect(pexpect.EOF)
def test_post_mortem_api(
@ -1157,7 +1258,7 @@ def test_post_mortem_api(
# )
child.sendline('c')
child.expect(EOF)
child.expect(pexpect.EOF)
def test_shield_pause(
@ -1232,7 +1333,7 @@ def test_shield_pause(
]
)
child.sendline('c')
child.expect(EOF)
child.expect(pexpect.EOF)
# TODO: better error for "non-ideal" usage from the root actor.

View File

@ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr):
portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid
async with tractor.get_registry(*reg_addr) as aportal:
async with tractor.get_arbiter(*reg_addr) as aportal:
# this local actor should be the arbiter
assert actor is aportal.actor
@ -160,7 +160,7 @@ async def spawn_and_check_registry(
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
async with tractor.get_registry(*reg_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
# runtime needs to be up to call this
actor = tractor.current_actor()
@ -298,7 +298,7 @@ async def close_chans_before_nursery(
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
async with tractor.get_registry(*reg_addr) as aportal:
async with tractor.get_arbiter(*reg_addr) as aportal:
try:
get_reg = partial(unpack_reg, aportal)

View File

@ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr):
"Verify waiting on the arbiter to register itself using a local portal."
actor = tractor.current_actor()
assert actor.is_arbiter
async with tractor.get_registry(*reg_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal)
with trio.fail_after(0.2):

View File

@ -32,7 +32,7 @@ def test_abort_on_sigint(daemon):
@tractor_test
async def test_cancel_remote_arbiter(daemon, reg_addr):
assert not tractor.current_actor().is_arbiter
async with tractor.get_registry(*reg_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
await portal.cancel_actor()
time.sleep(0.1)
@ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
# no arbiter socket should exist
with pytest.raises(OSError):
async with tractor.get_registry(*reg_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
pass

View File

@ -30,7 +30,7 @@ from ._streaming import (
stream as stream,
)
from ._discovery import (
get_registry as get_registry,
get_arbiter as get_arbiter,
find_actor as find_actor,
wait_for_actor as wait_for_actor,
query_actor as query_actor,

View File

@ -2376,9 +2376,8 @@ async def open_context_from_portal(
and ctx.cancel_acked
):
log.cancel(
f'Context cancelled by local {ctx.side!r}-side task\n'
f'c)>\n'
f' |_{ctx._task}\n\n'
f'Context cancelled by {ctx.side!r}-side task\n'
f'|_{ctx._task}\n\n'
f'{repr(scope_err)}\n'
)
@ -2394,10 +2393,8 @@ async def open_context_from_portal(
# type_only=True,
)
log.cancel(
f'Context terminated due to {ctx.side!r}-side\n\n'
# TODO: do an x)> on err and c)> only for ctxc?
f'c)> {outcome_str}\n'
f' |_{ctx.repr_rpc}\n'
f'Context terminated due to local {ctx.side!r}-side error:\n\n'
f'{ctx.chan.uid} => {outcome_str}\n'
)
# FINALLY, remove the context from runtime tracking and

View File

@ -26,8 +26,8 @@ from typing import (
TYPE_CHECKING,
)
from contextlib import asynccontextmanager as acm
import warnings
from tractor.log import get_logger
from .trionics import gather_contexts
from ._ipc import _connect_chan, Channel
from ._portal import (
@ -40,13 +40,11 @@ from ._state import (
_runtime_vars,
)
if TYPE_CHECKING:
from ._runtime import Actor
log = get_logger(__name__)
@acm
async def get_registry(
host: str,
@ -58,12 +56,14 @@ async def get_registry(
]:
'''
Return a portal instance connected to a local or remote
registry-service actor; if a connection already exists re-use it
(presumably to call a `.register_actor()` registry runtime RPC
ep).
arbiter.
'''
actor: Actor = current_actor()
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_registrar:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
@ -72,8 +72,6 @@ async def get_registry(
Channel((host, port))
)
else:
# TODO: try to look pre-existing connection from
# `Actor._peers` and use it instead?
async with (
_connect_chan(host, port) as chan,
open_portal(chan) as regstr_ptl,
@ -82,6 +80,19 @@ async def get_registry(
# TODO: deprecate and this remove _arbiter form!
@acm
async def get_arbiter(*args, **kwargs):
warnings.warn(
'`tractor.get_arbiter()` is now deprecated!\n'
'Use `.get_registry()` instead!',
DeprecationWarning,
stacklevel=2,
)
async with get_registry(*args, **kwargs) as to_yield:
yield to_yield
@acm
async def get_root(
**kwargs,
@ -99,53 +110,22 @@ async def get_root(
yield portal
def get_peer_by_name(
name: str,
# uuid: str|None = None,
) -> list[Channel]|None: # at least 1
'''
Scan for an existing connection (set) to a named actor
and return any channels from `Actor._peers`.
This is an optimization method over querying the registrar for
the same info.
'''
actor: Actor = current_actor()
to_scan: dict[tuple, list[Channel]] = actor._peers.copy()
pchan: Channel|None = actor._parent_chan
if pchan:
to_scan[pchan.uid].append(pchan)
for aid, chans in to_scan.items():
_, peer_name = aid
if name == peer_name:
if not chans:
log.warning(
'No IPC chans for matching peer {peer_name}\n'
)
continue
return chans
return None
@acm
async def query_actor(
name: str,
regaddr: tuple[str, int]|None = None,
arbiter_sockaddr: tuple[str, int] | None = None,
regaddr: tuple[str, int] | None = None,
) -> AsyncGenerator[
tuple[str, int]|None,
tuple[str, int] | None,
None,
]:
'''
Lookup a transport address (by actor name) via querying a registrar
listening @ `regaddr`.
Make a transport address lookup for an actor name to a specific
registrar.
Returns the transport protocol (socket) address or `None` if no
entry under that name exists.
Returns the (socket) address or ``None`` if no entry under that
name exists for the given registrar listening @ `regaddr`.
'''
actor: Actor = current_actor()
@ -157,10 +137,14 @@ async def query_actor(
'The current actor IS the registry!?'
)
maybe_peers: list[Channel]|None = get_peer_by_name(name)
if maybe_peers:
yield maybe_peers[0].raddr
return
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.query_actor(regaddr=<blah>)` is deprecated.\n'
'Use `registry_addrs: list[tuple]` instead!',
DeprecationWarning,
stacklevel=2,
)
regaddr: list[tuple[str, int]] = arbiter_sockaddr
reg_portal: Portal
regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
@ -175,28 +159,10 @@ async def query_actor(
yield sockaddr
@acm
async def maybe_open_portal(
addr: tuple[str, int],
name: str,
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
pass
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
@acm
async def find_actor(
name: str,
arbiter_sockaddr: tuple[str, int]|None = None,
registry_addrs: list[tuple[str, int]]|None = None,
only_first: bool = True,
@ -213,12 +179,29 @@ async def find_actor(
known to the arbiter.
'''
# optimization path, use any pre-existing peer channel
maybe_peers: list[Channel]|None = get_peer_by_name(name)
if maybe_peers and only_first:
async with open_portal(maybe_peers[0]) as peer_portal:
yield peer_portal
return
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n'
'Use `registry_addrs: list[tuple]` instead!',
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]
@acm
async def maybe_open_portal_from_reg_addr(
addr: tuple[str, int],
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
if not registry_addrs:
# XXX NOTE: make sure to dynamically read the value on
@ -234,13 +217,10 @@ async def find_actor(
maybe_portals: list[
AsyncContextManager[tuple[str, int]]
] = list(
maybe_open_portal(
addr=addr,
name=name,
)
maybe_open_portal_from_reg_addr(addr)
for addr in registry_addrs
)
portals: list[Portal]
async with gather_contexts(
mngrs=maybe_portals,
) as portals:
@ -274,31 +254,31 @@ async def find_actor(
@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
registry_addr: tuple[str, int] | None = None,
) -> AsyncGenerator[Portal, None]:
'''
Wait on at least one peer actor to register `name` with the
registrar, yield a `Portal to the first registree.
Wait on an actor to register with the arbiter.
A portal to the first registered actor is returned.
'''
actor: Actor = current_actor()
# optimization path, use any pre-existing peer channel
maybe_peers: list[Channel]|None = get_peer_by_name(name)
if maybe_peers:
async with open_portal(maybe_peers[0]) as peer_portal:
yield peer_portal
return
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n'
'Use `registry_addr: tuple` instead!',
DeprecationWarning,
stacklevel=2,
)
registry_addr: tuple[str, int] = arbiter_sockaddr
regaddr: tuple[str, int] = (
registry_addr
or
actor.reg_addrs[0]
)
# TODO: use `.trionics.gather_contexts()` like
# above in `find_actor()` as well?
reg_portal: Portal
regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0]
async with get_registry(*regaddr) as reg_portal:
sockaddrs = await reg_portal.run_from_ns(
'self',

View File

@ -243,7 +243,6 @@ def _trio_main(
nest_from_op(
input_op=')>', # like a "closed-to-play"-icon from super perspective
tree_str=actor_info,
back_from_op=1,
)
)
try:

View File

@ -263,11 +263,11 @@ class Portal:
return False
reminfo: str = (
f'c)=> {self.channel.uid}\n'
f' |_{chan}\n'
f'Portal.cancel_actor() => {self.channel.uid}\n'
f'|_{chan}\n'
)
log.cancel(
f'Requesting actor-runtime cancel for peer\n\n'
f'Requesting runtime cancel for peer\n\n'
f'{reminfo}'
)

View File

@ -80,7 +80,7 @@ async def open_root_actor(
# enables the multi-process debugger support
debug_mode: bool = False,
maybe_enable_greenback: bool = True, # `.pause_from_sync()/breakpoint()` support
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
enable_stack_on_sig: bool = False,
# internal logging
@ -233,8 +233,14 @@ async def open_root_actor(
and
enable_stack_on_sig
):
from .devx._stackscope import enable_stack_on_sig
enable_stack_on_sig()
try:
logger.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
logger.warning(
'`stackscope` not installed for use in debug mode!'
)
# closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = []

View File

@ -115,26 +115,25 @@ class Actor:
'''
The fundamental "runtime" concurrency primitive.
An "actor" is the combination of a regular Python process
executing a `trio.run()` task tree, communicating with other
"actors" through "memory boundary portals": `Portal`, which
provide a high-level async API around IPC "channels" (`Channel`)
which themselves encapsulate various (swappable) network
transport protocols for sending msgs between said memory domains
(processes, hosts, non-GIL threads).
An *actor* is the combination of a regular Python process executing
a ``trio`` task tree, communicating with other actors through
"memory boundary portals" - which provide a native async API around
IPC transport "channels" which themselves encapsulate various
(swappable) network protocols.
Each "actor" is `trio.run()` scheduled "runtime" composed of many
concurrent tasks in a single thread. The "runtime" tasks conduct
a slew of low(er) level functions to make it possible for message
passing between actors as well as the ability to create new
actors (aka new "runtimes" in new processes which are supervised
via an "actor-nursery" construct). Each task which sends messages
to a task in a "peer" actor (not necessarily a parent-child,
Each "actor" is ``trio.run()`` scheduled "runtime" composed of
many concurrent tasks in a single thread. The "runtime" tasks
conduct a slew of low(er) level functions to make it possible
for message passing between actors as well as the ability to
create new actors (aka new "runtimes" in new processes which
are supervised via a nursery construct). Each task which sends
messages to a task in a "peer" (not necessarily a parent-child,
depth hierarchy) is able to do so via an "address", which maps
IPC connections across memory boundaries, and a task request id
which allows for per-actor tasks to send and receive messages to
specific peer-actor tasks with which there is an ongoing RPC/IPC
dialog.
which allows for per-actor tasks to send and receive messages
to specific peer-actor tasks with which there is an ongoing
RPC/IPC dialog.
'''
# ugh, we need to get rid of this and replace with a "registry" sys
@ -231,20 +230,17 @@ class Actor:
# by the user (currently called the "arbiter")
self._spawn_method: str = spawn_method
self._peers: defaultdict[
str, # uaid
list[Channel], # IPC conns from peer
] = defaultdict(list)
self._peers: defaultdict = defaultdict(list)
self._peer_connected: dict[tuple[str, str], trio.Event] = {}
self._no_more_peers = trio.Event()
self._no_more_peers.set()
# RPC state
self._ongoing_rpc_tasks = trio.Event()
self._ongoing_rpc_tasks.set()
# (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[
tuple[Channel, str], # (chan, cid)
tuple[Context, Callable, trio.Event] # (ctx=>, fn(), done?)
tuple[Channel, str],
tuple[Context, Callable, trio.Event]
] = {}
# map {actor uids -> Context}
@ -321,10 +317,7 @@ class Actor:
event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait()
log.debug(f'{uid!r} successfully connected back to us')
return (
event,
self._peers[uid][-1],
)
return event, self._peers[uid][-1]
def load_modules(
self,
@ -415,11 +408,26 @@ class Actor:
'''
self._no_more_peers = trio.Event() # unset by making new
chan = Channel.from_stream(stream)
con_status: str = (
'New inbound IPC connection <=\n'
f'|_{chan}\n'
)
their_uid: tuple[str, str]|None = chan.uid
con_status: str = ''
# TODO: remove this branch since can never happen?
# NOTE: `.uid` is only set after first contact
if their_uid:
con_status = (
'IPC Re-connection from already known peer?\n'
)
else:
con_status = (
'New inbound IPC connection <=\n'
)
con_status += (
f'|_{chan}\n'
# f' |_@{chan.raddr}\n\n'
# ^-TODO-^ remove since alfready in chan.__repr__()?
)
# send/receive initial handshake response
try:
uid: tuple|None = await self._do_handshake(chan)
@ -431,10 +439,10 @@ class Actor:
TransportClosed,
):
# XXX: This may propagate up from `Channel._aiter_recv()`
# and `MsgpackStream._inter_packets()` on a read from the
# XXX: This may propagate up from ``Channel._aiter_recv()``
# and ``MsgpackStream._inter_packets()`` on a read from the
# stream particularly when the runtime is first starting up
# inside `open_root_actor()` where there is a check for
# inside ``open_root_actor()`` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.runtime(
@ -444,22 +452,9 @@ class Actor:
)
return
familiar: str = 'new-peer'
if _pre_chan := self._peers.get(uid):
familiar: str = 'pre-existing-peer'
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
con_status += (
f' -> Handshake with {familiar} `{uid_short}` complete\n'
f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n'
)
if _pre_chan:
log.warning(
# con_status += (
# ^TODO^ swap once we minimize conn duplication
f' -> Wait, we already have IPC with `{uid_short}`??\n'
f' |_{_pre_chan}\n'
)
# IPC connection tracking for both peers and new children:
# - if this is a new channel to a locally spawned
# sub-actor there will be a spawn wait even registered
@ -512,9 +507,8 @@ class Actor:
)
except trio.Cancelled:
log.cancel(
'IPC transport msg loop was cancelled\n'
f'c)>\n'
f' |_{chan}\n'
'IPC transport msg loop was cancelled for \n'
f'|_{chan}\n'
)
raise
@ -551,9 +545,9 @@ class Actor:
):
log.cancel(
'Waiting on cancel request to peer..\n'
'Waiting on cancel request to peer\n'
f'c)=>\n'
f' |_{chan.uid}\n'
f' |_{chan.uid}\n'
)
# XXX: this is a soft wait on the channel (and its
@ -652,14 +646,10 @@ class Actor:
):
report: str = (
'Timed out waiting on local actor-nursery to exit?\n'
f'c)>\n'
f' |_{local_nursery}\n'
f'{local_nursery}\n'
)
if children := local_nursery._children:
# indent from above local-nurse repr
report += (
f' |_{pformat(children)}\n'
)
report += f' |_{pformat(children)}\n'
log.warning(report)
@ -1246,9 +1236,8 @@ class Actor:
# TODO: just use the new `Context.repr_rpc: str` (and
# other) repr fields instead of doing this all manual..
msg: str = (
f'Actor-runtime cancel request from {requester_type}\n\n'
f'<=c) {requesting_uid}\n'
f' |_{self}\n'
f'Runtime cancel request from {requester_type}:\n\n'
f'<= .cancel(): {requesting_uid}\n\n'
)
# TODO: what happens here when we self-cancel tho?
@ -1358,7 +1347,7 @@ class Actor:
log.cancel(
'Rxed cancel request for RPC task\n'
f'<=c) {requesting_uid}\n'
f' |_{ctx._task}\n'
f' |_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
# f'=> {ctx._task}\n'
# f' >> Actor._cancel_task() => {ctx._task}\n'
@ -1476,17 +1465,17 @@ class Actor:
"IPC channel's "
)
rent_chan_repr: str = (
f' |_{parent_chan}\n\n'
f' |_{parent_chan}\n\n'
if parent_chan
else ''
)
log.cancel(
f'Cancelling {descr} RPC tasks\n\n'
f'<=c) {req_uid} [canceller]\n'
f'<= canceller: {req_uid}\n'
f'{rent_chan_repr}'
f'c)=> {self.uid} [cancellee]\n'
f' |_{self} [with {len(tasks)} tasks]\n'
# f' |_tasks: {len(tasks)}\n'
f'=> cancellee: {self.uid}\n'
f' |_{self}.cancel_rpc_tasks()\n'
f' |_tasks: {len(tasks)}\n'
# f'{tasks_str}'
)
for (
@ -1555,7 +1544,7 @@ class Actor:
def accept_addr(self) -> tuple[str, int]:
'''
Primary address to which the IPC transport server is
bound and listening for new connections.
bound.
'''
# throws OSError on failure
@ -1572,7 +1561,6 @@ class Actor:
def get_chans(
self,
uid: tuple[str, str],
) -> list[Channel]:
'''
Return all IPC channels to the actor with provided `uid`.
@ -1944,15 +1932,9 @@ async def async_main(
with CancelScope(shield=True):
await actor._no_more_peers.wait()
teardown_report += (
'-> All peer channels are complete\n'
)
teardown_report += ('-> All peer channels are complete\n')
teardown_report += (
'Actor runtime exiting\n'
f'>)\n'
f'|_{actor}\n'
)
teardown_report += ('Actor runtime exited')
log.info(teardown_report)

View File

@ -54,25 +54,6 @@ def examples_dir() -> pathlib.Path:
return repodir() / 'examples'
def mk_cmd(
ex_name: str,
exs_subpath: str = 'debugging',
) -> str:
'''
Generate a shell command suitable to pass to ``pexpect.spawn()``.
'''
script_path: pathlib.Path = (
examples_dir()
/ exs_subpath
/ f'{ex_name}.py'
)
return ' '.join([
'python',
str(script_path)
])
@acm
async def expect_ctxc(
yay: bool,

View File

@ -26,7 +26,7 @@ from ._debug import (
breakpoint as breakpoint,
pause as pause,
pause_from_sync as pause_from_sync,
sigint_shield as sigint_shield,
shield_sigint_handler as shield_sigint_handler,
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
maybe_init_greenback as maybe_init_greenback,

View File

@ -409,9 +409,9 @@ class Lock:
repl_task
)
message += (
f'A non-caller task still owns this lock on behalf of '
f'`{behalf_of_task}`\n'
f'lock owner task: {lock_stats.owner}\n'
f'\nA non-caller task still owns this lock on behalf of '
f'{behalf_of_task}\n'
f'|_{lock_stats.owner}\n'
)
if (
@ -523,10 +523,6 @@ class Lock:
)
def get_lock() -> Lock:
return Lock
@tractor.context(
# enable the locking msgspec
pld_spec=__pld_spec__,
@ -792,13 +788,13 @@ class DebugStatus:
cls._orig_sigint_handler: Callable = trio.from_thread.run_sync(
signal.signal,
signal.SIGINT,
sigint_shield,
shield_sigint_handler,
)
else:
cls._orig_sigint_handler = signal.signal(
signal.SIGINT,
sigint_shield,
shield_sigint_handler,
)
@classmethod
@ -904,30 +900,12 @@ class DebugStatus:
# actor-local state, irrelevant for non-root.
cls.repl_task = None
# XXX WARNING needs very special caughtion, and we should
# prolly make a more explicit `@property` API?
#
# - if unset in root multi-threaded case can cause
# issues with detecting that some root thread is
# using a REPL,
#
# - what benefit is there to unsetting, it's always
# set again for the next task in some actor..
# only thing would be to avoid in the sigint-handler
# logging when we don't need to?
cls.repl = None
# restore original sigint handler
cls.unshield_sigint()
# TODO: use the new `@lowlevel.singleton` for this!
def get_debug_req() -> DebugStatus|None:
return DebugStatus
class TractorConfig(pdbp.DefaultConfig):
'''
Custom `pdbp` config which tries to use the best tradeoff
@ -1333,7 +1311,7 @@ def any_connected_locker_child() -> bool:
return False
def sigint_shield(
def shield_sigint_handler(
signum: int,
frame: 'frame', # type: ignore # noqa
*args,
@ -1373,17 +1351,13 @@ def sigint_shield(
# root actor branch that reports whether or not a child
# has locked debugger.
if is_root_process():
# log.warning(
log.devx(
'Handling SIGINT in root actor\n'
f'{Lock.repr()}'
f'{DebugStatus.repr()}\n'
)
# try to see if the supposed (sub)actor in debug still
# has an active connection to *this* actor, and if not
# it's likely they aren't using the TTY lock / debugger
# and we should propagate SIGINT normally.
any_connected: bool = any_connected_locker_child()
# if not any_connected:
# return do_cancel()
problem = (
f'root {actor.uid} handling SIGINT\n'
@ -1432,25 +1406,19 @@ def sigint_shield(
# an actor using the `Lock` (a bug state) ??
# => so immediately cancel any stale lock cs and revert
# the handler!
if not DebugStatus.repl:
if not repl:
# TODO: WHEN should we revert back to ``trio``
# handler if this one is stale?
# -[ ] maybe after a counts work of ctl-c mashes?
# -[ ] use a state var like `stale_handler: bool`?
problem += (
'\n'
'No subactor is using a `pdb` REPL according `Lock.ctx_in_debug`?\n'
'BUT, the root should be using it, WHY this handler ??\n\n'
'So either..\n'
'- some root-thread is using it but has no `.repl` set?, OR\n'
'- something else weird is going on outside the runtime!?\n'
'BUT, the root should be using it, WHY this handler ??\n'
)
else:
# NOTE: since we emit this msg on ctl-c, we should
# also always re-print the prompt the tail block!
log.pdb(
'Ignoring SIGINT while pdb REPL in use by root actor..\n'
f'{DebugStatus.repl_task}\n'
f' |_{repl}\n'
)
problem = None
@ -1500,6 +1468,7 @@ def sigint_shield(
'Allowing SIGINT propagation..'
)
DebugStatus.unshield_sigint()
# do_cancel()
repl_task: str|None = DebugStatus.repl_task
req_task: str|None = DebugStatus.req_task
@ -1514,15 +1483,10 @@ def sigint_shield(
f' |_{repl}\n'
)
elif req_task:
log.debug(
'Ignoring SIGINT while debug request task is open but either,\n'
'- someone else is already REPL-in and has the `Lock`, or\n'
'- some other local task already is replin?\n'
log.pdb(
f'Ignoring SIGINT while debug request task is open\n'
f'|_{req_task}\n'
)
# TODO can we remove this now?
# -[ ] does this path ever get hit any more?
else:
msg: str = (
'SIGINT shield handler still active BUT, \n\n'
@ -1558,53 +1522,37 @@ def sigint_shield(
# https://github.com/goodboy/tractor/issues/320
# elif debug_mode():
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it looks to be that the last command that was run (eg. ll)
# will be repeated by default.
# maybe redraw/print last REPL output to console since
# we want to alert the user that more input is expect since
# nothing has been done dur to ignoring sigint.
if (
DebugStatus.repl # only when current actor has a REPL engaged
repl # only when current actor has a REPL engaged
):
flush_status: str = (
'Flushing stdout to ensure new prompt line!\n'
)
# XXX: yah, mega hack, but how else do we catch this madness XD
if (
repl.shname == 'xonsh'
):
flush_status += (
'-> ALSO re-flushing due to `xonsh`..\n'
)
if repl.shname == 'xonsh':
repl.stdout.write(repl.prompt)
# log.warning(
log.devx(
flush_status
)
repl.stdout.flush()
# TODO: better console UX to match the current "mode":
# -[ ] for example if in sticky mode where if there is output
# detected as written to the tty we redraw this part underneath
# and erase the past draw of this same bit above?
# TODO: make this work like sticky mode where if there is output
# detected as written to the tty we redraw this part underneath
# and erase the past draw of this same bit above?
# repl.sticky = True
# repl._print_if_sticky()
# also see these links for an approach from `ptk`:
# also see these links for an approach from ``ptk``:
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
else:
log.devx(
# log.warning(
'Not flushing stdout since not needed?\n'
f'|_{repl}\n'
)
# XXX only for tracing this handler
log.devx('exiting SIGINT')
_pause_msg: str = 'Opening a pdb REPL in paused actor'
_pause_msg: str = 'Attaching to pdb REPL in actor'
class DebugRequestError(RuntimeError):
@ -1669,7 +1617,7 @@ async def _pause(
# 'directly (infected) `asyncio` tasks!'
# ) from rte
raise rte
raise
if debug_func is not None:
debug_func = partial(debug_func)
@ -1677,13 +1625,9 @@ async def _pause(
# XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug
# request from a subactor BEFORE the REPL is entered by that
# process.
if (
not repl
and
debug_func
):
repl: PdbREPL = mk_pdb()
if not repl:
DebugStatus.shield_sigint()
repl: PdbREPL = repl or mk_pdb()
# TODO: move this into a `open_debug_request()` @acm?
# -[ ] prolly makes the most sense to do the request
@ -1718,13 +1662,7 @@ async def _pause(
# recurrent entries/requests from the same
# actor-local task.
DebugStatus.repl_task = task
if repl:
DebugStatus.repl = repl
else:
log.error(
'No REPl instance set before entering `debug_func`?\n'
f'{debug_func}\n'
)
DebugStatus.repl = repl
# invoke the low-level REPL activation routine which itself
# should call into a `Pdb.set_trace()` of some sort.
@ -2063,7 +2001,7 @@ async def _pause(
DebugStatus.release(cancel_req_task=True)
# sanity checks for ^ on request/status teardown
# assert DebugStatus.repl is None # XXX no more bc bg thread cases?
assert DebugStatus.repl is None
assert DebugStatus.repl_task is None
# sanity, for when hackin on all this?
@ -2112,8 +2050,9 @@ def _set_trace(
# root here? Bo
log.pdb(
f'{_pause_msg}\n'
# '|\n'
f'>(\n'
f'|_ {task} @ {actor.uid}\n'
f' |_ {task} @ {actor.uid}\n'
# ^-TODO-^ more compact pformating?
# -[ ] make an `Actor.__repr()__`
# -[ ] should we use `log.pformat_task_uid()`?
@ -2302,12 +2241,7 @@ async def _pause_from_bg_root_thread(
'Trying to acquire `Lock` on behalf of bg thread\n'
f'|_{behalf_of_thread}\n'
)
# NOTE: this is already a task inside the main-`trio`-thread, so
# we don't need to worry about calling it another time from the
# bg thread on which who's behalf this task is operating.
DebugStatus.shield_sigint()
# DebugStatus.repl_task = behalf_of_thread
out = await _pause(
debug_func=None,
repl=repl,
@ -2316,8 +2250,6 @@ async def _pause_from_bg_root_thread(
called_from_bg_thread=True,
**_pause_kwargs
)
DebugStatus.repl_task = behalf_of_thread
lock: trio.FIFOLock = Lock._debug_lock
stats: trio.LockStatistics= lock.statistics()
assert stats.owner is task
@ -2351,6 +2283,7 @@ async def _pause_from_bg_root_thread(
f'|_{behalf_of_thread}\n'
)
task_status.started(out)
DebugStatus.shield_sigint()
# wait for bg thread to exit REPL sesh.
try:
@ -2391,7 +2324,7 @@ def pause_from_sync(
err_on_no_runtime=False,
)
message: str = (
f'{actor.uid} task called `tractor.pause_from_sync()`\n'
f'{actor.uid} task called `tractor.pause_from_sync()`\n\n'
)
if not actor:
raise RuntimeError(
@ -2415,6 +2348,7 @@ def pause_from_sync(
'for infected `asyncio` mode!'
)
DebugStatus.shield_sigint()
repl: PdbREPL = mk_pdb()
# message += f'-> created local REPL {repl}\n'
@ -2432,10 +2366,6 @@ def pause_from_sync(
# thread which will call `._pause()` manually with special
# handling for root-actor caller usage.
if not DebugStatus.is_main_trio_thread():
# TODO: `threading.Lock()` this so we don't get races in
# multi-thr cases where they're acquiring/releasing the
# REPL and setting request/`Lock` state, etc..
thread: threading.Thread = threading.current_thread()
repl_owner = thread
@ -2443,16 +2373,9 @@ def pause_from_sync(
if is_root:
message += (
f'-> called from a root-actor bg {thread}\n'
f'-> scheduling `._pause_from_bg_root_thread()`..\n'
f'-> scheduling `._pause_from_sync_thread()`..\n'
)
# XXX SUBTLE BADNESS XXX that should really change!
# don't over-write the `repl` here since when
# this behalf-of-bg_thread-task calls pause it will
# pass `debug_func=None` which will result in it
# returing a `repl==None` output and that get's also
# `.started(out)` back here! So instead just ignore
# that output and assign the `repl` created above!
bg_task, _ = trio.from_thread.run(
bg_task, repl = trio.from_thread.run(
afn=partial(
actor._service_n.start,
partial(
@ -2464,9 +2387,8 @@ def pause_from_sync(
),
)
)
DebugStatus.shield_sigint()
message += (
f'-> `._pause_from_bg_root_thread()` started bg task {bg_task}\n'
f'-> `._pause_from_sync_thread()` started bg task {bg_task}\n'
)
else:
message += f'-> called from a bg {thread}\n'
@ -2475,7 +2397,7 @@ def pause_from_sync(
# `request_root_stdio_lock()` and we don't need to
# worry about all the special considerations as with
# the root-actor per above.
bg_task, _ = trio.from_thread.run(
bg_task, repl = trio.from_thread.run(
afn=partial(
_pause,
debug_func=None,
@ -2490,9 +2412,6 @@ def pause_from_sync(
**_pause_kwargs
),
)
# ?TODO? XXX where do we NEED to call this in the
# subactor-bg-thread case?
DebugStatus.shield_sigint()
assert bg_task is not DebugStatus.repl_task
else: # we are presumably the `trio.run()` + main thread
@ -2505,11 +2424,6 @@ def pause_from_sync(
# greenback: ModuleType = await maybe_init_greenback()
message += f'-> imported {greenback}\n'
# NOTE XXX seems to need to be set BEFORE the `_pause()`
# invoke using gb below?
DebugStatus.shield_sigint()
repl_owner: Task = current_task()
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
try:
@ -2535,12 +2449,9 @@ def pause_from_sync(
raise
if out:
bg_task, _ = out
else:
bg_task: Task = current_task()
# assert repl is repl
assert bg_task is repl_owner
bg_task, repl = out
assert repl is repl
assert bg_task is repl_owner
# NOTE: normally set inside `_enter_repl_sync()`
DebugStatus.repl_task: str = repl_owner
@ -2554,10 +2465,7 @@ def pause_from_sync(
)
log.devx(message)
# NOTE set as late as possible to avoid state clobbering
# in the multi-threaded case!
DebugStatus.repl = repl
_set_trace(
api_frame=api_frame or inspect.currentframe(),
repl=repl,
@ -2615,7 +2523,7 @@ async def breakpoint(
_crash_msg: str = (
'Opening a pdb REPL in crashed actor'
'Attaching to pdb REPL in crashed actor'
)
@ -2643,9 +2551,11 @@ def _post_mortem(
# here! Bo
log.pdb(
f'{_crash_msg}\n'
# '|\n'
f'x>(\n'
f' |_ {current_task()} @ {actor.uid}\n'
f' |_ {current_task()} @ {actor.uid}\n'
# f'|_ @{actor.uid}\n'
# TODO: make an `Actor.__repr()__`
# f'|_ {current_task()} @ {actor.name}\n'
)
@ -2758,8 +2668,7 @@ async def acquire_debug_lock(
tuple,
]:
'''
Request to acquire the TTY `Lock` in the root actor, release on
exit.
Request to acquire the TTY `Lock` in the root actor, release on exit.
This helper is for actor's who don't actually need to acquired
the debugger but want to wait until the lock is free in the
@ -2771,14 +2680,10 @@ async def acquire_debug_lock(
yield None
return
task: Task = current_task()
async with trio.open_nursery() as n:
ctx: Context = await n.start(
partial(
request_root_stdio_lock,
actor_uid=subactor_uid,
task_uid=(task.name, id(task)),
)
request_root_stdio_lock,
subactor_uid,
)
yield ctx
ctx.cancel()

View File

@ -24,24 +24,13 @@ disjoint, parallel executing tasks in separate actors.
'''
from __future__ import annotations
# from functools import partial
from threading import (
current_thread,
Thread,
RLock,
)
import multiprocessing as mp
from signal import (
signal,
getsignal,
SIGUSR1,
)
# import traceback
from types import ModuleType
from typing import (
Callable,
TYPE_CHECKING,
)
import traceback
from typing import TYPE_CHECKING
import trio
from tractor import (
@ -62,45 +51,26 @@ if TYPE_CHECKING:
@trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None:
'''
Do a classic `stackscope.extract()` task-tree dump to console at
`.devx()` level.
'''
import stackscope
from tractor.log import get_console_log
tree_str: str = str(
stackscope.extract(
trio.lowlevel.current_root_task(),
recurse_child_tasks=True
)
)
log = get_console_log(
name=__name__,
level='cancel',
)
actor: Actor = _state.current_actor()
thr: Thread = current_thread()
log.devx(
f'Dumping `stackscope` tree for actor\n'
f'{actor.uid}:\n'
f'|_{mp.current_process()}\n'
f' |_{thr}\n'
f' |_{actor}\n\n'
# start-of-trace-tree delimiter (mostly for testing)
'------ - ------\n'
'\n'
+
f'{actor.name}: {actor}\n'
f' |_{mp.current_process()}\n\n'
f'{tree_str}\n'
+
# end-of-trace-tree delimiter (mostly for testing)
f'\n'
f'------ {actor.uid!r} ------\n'
)
# TODO: can remove this right?
# -[ ] was original code from author
#
# print(
# 'DUMPING FROM PRINT\n'
# +
# content
# )
# import logging
# try:
# with open("/dev/tty", "w") as tty:
@ -110,130 +80,58 @@ def dump_task_tree() -> None:
# "task_tree"
# ).exception("Error printing task tree")
_handler_lock = RLock()
_tree_dumped: bool = False
def dump_tree_on_sig(
def signal_handler(
sig: int,
frame: object,
relay_to_subs: bool = True,
) -> None:
global _tree_dumped, _handler_lock
with _handler_lock:
if _tree_dumped:
log.warning(
'Already dumped for this actor...??'
)
return
_tree_dumped = True
# actor: Actor = _state.current_actor()
log.devx(
'Trying to dump `stackscope` tree..\n'
)
try:
dump_task_tree()
# await actor._service_n.start_soon(
# partial(
# trio.to_thread.run_sync,
# dump_task_tree,
# )
# )
# trio.lowlevel.current_trio_token().run_sync_soon(
# dump_task_tree
# )
except RuntimeError:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
# not in async context -- print a normal traceback
# traceback.print_stack()
raise
except BaseException:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
raise
log.devx(
'Supposedly we dumped just fine..?'
)
try:
trio.lowlevel.current_trio_token(
).run_sync_soon(dump_task_tree)
except RuntimeError:
# not in async context -- print a normal traceback
traceback.print_stack()
if not relay_to_subs:
return
an: ActorNursery
for an in _state.current_actor()._actoruid2nursery.values():
subproc: ProcessType
subactor: Actor
for subactor, subproc, _ in an._children.values():
log.warning(
log.devx(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n'
f' |_{subproc}\n'
)
# bc of course stdlib can't have a std API.. XD
match subproc:
case trio.Process():
subproc.send_signal(sig)
if isinstance(subproc, trio.Process):
subproc.send_signal(sig)
case mp.Process():
subproc._send_signal(sig)
elif isinstance(subproc, mp.Process):
subproc._send_signal(sig)
def enable_stack_on_sig(
sig: int = SIGUSR1,
) -> ModuleType:
sig: int = SIGUSR1
) -> None:
'''
Enable `stackscope` tracing on reception of a signal; by
default this is SIGUSR1.
HOT TIP: a task/ctx-tree dump can be triggered from a shell with
fancy cmds.
For ex. from `bash` using `pgrep` and cmd-sustitution
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
you could use:
>> kill -SIGUSR1 $(pgrep -f '<cmd>')
Or with with `xonsh` (which has diff capture-from-subproc syntax)
>> kill -SIGUSR1 @$(pgrep -f '<cmd>')
'''
try:
import stackscope
except ImportError:
log.warning(
'`stackscope` not installed for use in debug mode!'
)
return None
handler: Callable|int = getsignal(sig)
if handler is dump_tree_on_sig:
log.devx(
'A `SIGUSR1` handler already exists?\n'
f'|_ {handler!r}\n'
)
return
signal(
sig,
dump_tree_on_sig,
signal_handler,
)
log.devx(
'Enabling trace-trees on `SIGUSR1` '
'since `stackscope` is installed @ \n'
f'{stackscope!r}\n\n'
f'With `SIGUSR1` handler\n'
f'|_{dump_tree_on_sig}\n'
)
return stackscope
# NOTE: not the above can be triggered from
# a (xonsh) shell using:
# kill -SIGUSR1 @$(pgrep -f '<cmd>')
#
# for example if you were looking to trace a `pytest` run
# kill -SIGUSR1 @$(pgrep -f 'pytest')

View File

@ -374,7 +374,7 @@ class PldRx(Struct):
case _:
src_err = InternalError(
'Invalid IPC msg ??\n\n'
'Unknown IPC msg ??\n\n'
f'{msg}\n'
)
@ -499,7 +499,7 @@ async def maybe_limit_plds(
yield None
return
# sanity check on IPC scoping
# sanity on scoping
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
@ -510,8 +510,6 @@ async def maybe_limit_plds(
) as msgdec:
yield msgdec
# when the applied spec is unwound/removed, the same IPC-ctx
# should still be in scope.
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
@ -527,26 +525,16 @@ async def drain_to_final_msg(
list[MsgType]
]:
'''
Drain IPC msgs delivered to the underlying IPC context's
rx-mem-chan (i.e. from `Context._rx_chan`) in search for a final
`Return` or `Error` msg.
Drain IPC msgs delivered to the underlying IPC primitive's
rx-mem-chan (eg. `Context._rx_chan`) from the runtime in
search for a final result or error.
Deliver the `Return` + preceding drained msgs (`list[MsgType]`)
as a pair unless an `Error` is found, in which unpack and raise
it.
The motivation here is to always capture any remote error relayed
by the remote peer task during a ctxc condition.
For eg. a ctxc-request may be sent to the peer as part of the
local task's (request for) cancellation but then that same task
**also errors** before executing the teardown in the
`Portal.open_context().__aexit__()` block. In such error-on-exit
cases we want to always capture and raise any delivered remote
error (like an expected ctxc-ACK) as part of the final
`ctx.wait_for_result()` teardown sequence such that the
`Context.outcome` related state always reflect what transpired
even after ctx closure and the `.open_context()` block exit.
The motivation here is to ideally capture errors during ctxc
conditions where a canc-request/or local error is sent but the
local task also excepts and enters the
`Portal.open_context().__aexit__()` block wherein we prefer to
capture and raise any remote error or ctxc-ack as part of the
`ctx.result()` cleanup and teardown sequence.
'''
__tracebackhide__: bool = hide_tb
@ -584,6 +572,7 @@ async def drain_to_final_msg(
# |_from tractor.devx._debug import pause
# await pause()
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
# 1. we requested the cancellation and thus
@ -591,13 +580,13 @@ async def drain_to_final_msg(
# 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE!
except trio.Cancelled as taskc:
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
# the source cause of this local task's
# cancellation.
ctx.maybe_raise(
hide_tb=hide_tb,
# TODO: when use this?
# TODO: when use this/
# from_src_exc=taskc,
)
@ -670,7 +659,7 @@ async def drain_to_final_msg(
# Stop()
case Stop():
pre_result_drained.append(msg)
log.runtime( # normal/expected shutdown transaction
log.cancel(
'Remote stream terminated due to "stop" msg:\n\n'
f'{pretty_struct.pformat(msg)}\n'
)
@ -730,19 +719,13 @@ async def drain_to_final_msg(
pre_result_drained.append(msg)
# It's definitely an internal error if any other
# msg type without a`'cid'` field arrives here!
report: str = (
f'Invalid or unknown msg type {type(msg)!r}!?\n'
)
if not msg.cid:
report += (
'\nWhich also has no `.cid` field?\n'
raise InternalError(
'Unexpected cid-missing msg?\n\n'
f'{msg}\n'
)
raise MessagingError(
report
+
f'\n{msg}\n'
)
raise RuntimeError('Unknown msg type: {msg}')
else:
log.cancel(