Compare commits
8 Commits
fc5e80fea5
...
fc2e298a29
| Author | SHA1 | Date |
|---|---|---|
|
|
fc2e298a29 | |
|
|
48523358cf | |
|
|
e2b790a70d | |
|
|
61d4525137 | |
|
|
0996a83655 | |
|
|
1cdc7fb302 | |
|
|
486249d74f | |
|
|
8bc304f094 |
|
|
@ -49,9 +49,11 @@ async def main(
|
|||
tractor.open_nursery(
|
||||
debug_mode=True,
|
||||
enable_stack_on_sig=True,
|
||||
# maybe_enable_greenback=False,
|
||||
loglevel='devx',
|
||||
loglevel='devx', # XXX REQUIRED log level!
|
||||
enable_transports=[tpt],
|
||||
# maybe_enable_greenback=True,
|
||||
# ^TODO? maybe a "smarter" way todo all this is how
|
||||
# `modden` does with a rtv serialized through the osenv?
|
||||
) as an,
|
||||
):
|
||||
ptl: tractor.Portal = await an.start_actor(
|
||||
|
|
@ -63,7 +65,9 @@ async def main(
|
|||
start_n_shield_hang,
|
||||
) as (ctx, cpid):
|
||||
|
||||
_, proc, _ = an._children[ptl.chan.uid]
|
||||
_, proc, _ = an._children[
|
||||
ptl.chan.aid.uid
|
||||
]
|
||||
assert cpid == proc.pid
|
||||
|
||||
print(
|
||||
|
|
|
|||
|
|
@ -1,9 +1,22 @@
|
|||
from functools import partial
|
||||
import os
|
||||
import time
|
||||
|
||||
# ?TODO? how to make `pdbp` enforce this?
|
||||
# os.environ['PYTHON_COLORS'] = '0'
|
||||
# os.environ['NO_COLOR'] = '1'
|
||||
|
||||
import trio
|
||||
import tractor
|
||||
|
||||
# disable `pdbp` prompt colors
# for prompt matching in test.
def disable_pdbp_color():
    '''
    Switch off `pygments` highlighting on the `tractor` REPL
    config when the `PYTHON_COLORS` env var equals `'0'`.

    A no-op when the var is unset or holds any other value, so
    the script keeps working when launched without that var in
    its environment.

    '''
    # XXX use `.get()`: a plain `os.environ[...]` lookup raises
    # `KeyError` whenever the var was never exported.
    if os.environ.get('PYTHON_COLORS') == '0':
        # lazy import so the debug machinery is only touched
        # when color-disabling was actually requested.
        from tractor.devx.debug import _repl
        _repl.TractorConfig.use_pygments = False
|
||||
|
||||
|
||||
# TODO: only import these when not running from test harness?
|
||||
# can we detect `pexpect` usage maybe?
|
||||
# from tractor.devx.debug import (
|
||||
|
|
@ -42,6 +55,7 @@ async def start_n_sync_pause(
|
|||
ctx: tractor.Context,
|
||||
):
|
||||
actor: tractor.Actor = tractor.current_actor()
|
||||
disable_pdbp_color()
|
||||
|
||||
# sync to parent-side task
|
||||
await ctx.started()
|
||||
|
|
@ -52,13 +66,15 @@ async def start_n_sync_pause(
|
|||
|
||||
|
||||
async def main() -> None:
|
||||
disable_pdbp_color()
|
||||
async with (
|
||||
tractor.open_nursery(
|
||||
debug_mode=True,
|
||||
maybe_enable_greenback=True,
|
||||
enable_stack_on_sig=True,
|
||||
# loglevel='warning',
|
||||
# loglevel='devx',
|
||||
|
||||
# XXX flags required for test pattern matching.
|
||||
loglevel='pdb',
|
||||
# enable_stack_on_sig=True,
|
||||
) as an,
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
|
|
@ -68,8 +84,8 @@ async def main() -> None:
|
|||
p: tractor.Portal = await an.start_actor(
|
||||
'subactor',
|
||||
enable_modules=[__name__],
|
||||
# infect_asyncio=True,
|
||||
debug_mode=True,
|
||||
# infect_asyncio=True,
|
||||
)
|
||||
|
||||
# TODO: 3 sub-actor usage cases:
|
||||
|
|
|
|||
|
|
@ -240,38 +240,27 @@ testpaths = [
|
|||
addopts = [
|
||||
# TODO: figure out why this isn't working..
|
||||
'--rootdir=./tests',
|
||||
|
||||
'--import-mode=importlib',
|
||||
# don't show frickin captured logs AGAIN in the report..
|
||||
'--show-capture=no',
|
||||
|
||||
# sys-level capture. REQUIRED for fork-based spawn
|
||||
# backends (e.g. `main_thread_forkserver`): default
|
||||
# `--capture=fd` redirects fd 1,2 to temp files, and fork
|
||||
# children inherit those fds — opaque deadlocks happen in
|
||||
# the pytest-capture-machinery ↔ fork-child stdio
|
||||
# interaction. `--capture=sys` only redirects Python-level
|
||||
# `sys.stdout`/`sys.stderr`, leaving fd 1,2 alone.
|
||||
#
|
||||
# Trade-off (vs. `--capture=fd`):
|
||||
# - LOST: per-test attribution of subactor *raw-fd* output
|
||||
# (C-ext writes, `os.write(2, ...)`, subproc stdout). Not
|
||||
# zero — those go to the terminal, captured by CI's
|
||||
# terminal-level capture, just not per-test-scoped in the
|
||||
# pytest failure report.
|
||||
# - KEPT: Python-level `print()` + `logging` capture per-
|
||||
# test (tractor's logger uses `sys.stderr`, so tractor
|
||||
# log output IS still attributed per-test).
|
||||
# - KEPT: user `pytest -s` for debugging (unaffected).
|
||||
#
|
||||
# Full post-mortem in
|
||||
# `ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`.
|
||||
'--capture=sys',
|
||||
# load builtin plugin since we need a bootstrapping hook,
|
||||
# `pytest_load_initial_conftests()` for `--capture=` per:
|
||||
# https://docs.pytest.org/en/stable/reference/reference.html#bootstrapping-hooks
|
||||
'-p tractor._testing.pytest',
|
||||
|
||||
# disable `xonsh` plugin
|
||||
# https://docs.pytest.org/en/stable/how-to/plugins.html#disabling-plugins-from-autoloading
|
||||
# https://docs.pytest.org/en/stable/how-to/plugins.html#deactivating-unregistering-a-plugin-by-name
|
||||
'-p no:xonsh'
|
||||
'-p no:xonsh',
|
||||
|
||||
# XXX default on non-forking spawners
|
||||
'--capture=fd',
|
||||
# '--capture=sys',
|
||||
# ^XXX NOTE^ ALWAYS SET THIS for `*_forkserver` spawner
|
||||
# backends! see details @
|
||||
# `tractor._testing.pytest.pytest_load_initial_conftests()`
|
||||
|
||||
]
|
||||
log_cli = false
|
||||
# TODO: maybe some of these layout choices?
|
||||
|
|
|
|||
|
|
@ -23,6 +23,14 @@ Two cleanup phases (run in order when both are enabled):
|
|||
hard-crashing actor leaves leaked segments that
|
||||
nothing else GCs.
|
||||
|
||||
3. **UDS sweep** (`--uds` / `--uds-only`) — unlinks
|
||||
`${XDG_RUNTIME_DIR}/tractor/<name>@<pid>.sock` files
|
||||
whose binder pid is dead (or the `1616` registry
|
||||
sentinel). Needed because the IPC server's
|
||||
`os.unlink()` cleanup lives in a `finally:` block
|
||||
that doesn't always run on hard exits (SIGKILL,
|
||||
escaped `KeyboardInterrupt`, etc.) — see issue #452.
|
||||
|
||||
Process-reap detection modes (auto-selected):
|
||||
|
||||
--parent <pid> : descendant-mode — kill procs whose
|
||||
|
|
@ -50,12 +58,18 @@ Usage:
|
|||
# only the shm sweep, skip process reap
|
||||
scripts/tractor-reap --shm-only
|
||||
|
||||
# process reap + shm + UDS sweep (the works)
|
||||
scripts/tractor-reap --shm --uds
|
||||
|
||||
# only UDS sweep
|
||||
scripts/tractor-reap --uds-only
|
||||
|
||||
# from inside a still-live supervisor
|
||||
scripts/tractor-reap --parent 12345
|
||||
|
||||
# dry-run: list what would be reaped, don't act
|
||||
scripts/tractor-reap -n
|
||||
scripts/tractor-reap --shm -n
|
||||
scripts/tractor-reap --shm --uds -n
|
||||
|
||||
'''
|
||||
import argparse
|
||||
|
|
@ -118,7 +132,28 @@ def main() -> int:
|
|||
action='store_true',
|
||||
help='skip process reap; only do the shm sweep',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--uds',
|
||||
action='store_true',
|
||||
help=(
|
||||
'after process reap, also unlink orphaned '
|
||||
'${XDG_RUNTIME_DIR}/tractor/*.sock files '
|
||||
'whose binder pid is dead (or the 1616 '
|
||||
'registry sentinel). See issue #452.'
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
'--uds-only',
|
||||
action='store_true',
|
||||
help='skip process reap + shm; only do the UDS sweep',
|
||||
)
|
||||
args = parser.parse_args()
|
||||
# any *-only flag also skips the process reap phase
|
||||
skip_proc_reap: bool = (
|
||||
args.shm_only
|
||||
or
|
||||
args.uds_only
|
||||
)
|
||||
|
||||
# import lazily so `--help` doesn't require the tractor
|
||||
# package to be importable (e.g. when running from a
|
||||
|
|
@ -129,14 +164,16 @@ def main() -> int:
|
|||
find_descendants,
|
||||
find_orphans,
|
||||
find_orphaned_shm,
|
||||
find_orphaned_uds,
|
||||
reap,
|
||||
reap_shm,
|
||||
reap_uds,
|
||||
)
|
||||
|
||||
rc: int = 0
|
||||
|
||||
# --- phase 1: process reap (skipped under --shm-only) ---
|
||||
if not args.shm_only:
|
||||
# --- phase 1: process reap (skipped under --*-only) ---
|
||||
if not skip_proc_reap:
|
||||
if args.parent is not None:
|
||||
pids: list[int] = find_descendants(args.parent)
|
||||
mode: str = f'descendants of PPid={args.parent}'
|
||||
|
|
@ -173,6 +210,24 @@ def main() -> int:
|
|||
if errors:
|
||||
rc = 1
|
||||
|
||||
# --- phase 3: UDS sweep (opt-in) ---
|
||||
if args.uds or args.uds_only:
|
||||
leaked_uds: list[str] = find_orphaned_uds()
|
||||
if not leaked_uds:
|
||||
print(
|
||||
'[tractor-reap] no orphaned UDS sock-files '
|
||||
'to sweep'
|
||||
)
|
||||
elif args.dry_run:
|
||||
print(
|
||||
f'[tractor-reap] dry-run — {len(leaked_uds)} '
|
||||
f'orphaned UDS sock-file(s):\n {leaked_uds}'
|
||||
)
|
||||
else:
|
||||
_, errors = reap_uds(leaked_uds)
|
||||
if errors:
|
||||
rc = 1
|
||||
|
||||
# exit 0 if everything cleaned cleanly, else 1 — useful
|
||||
# for CI health-check chaining.
|
||||
return rc
|
||||
|
|
|
|||
|
|
@ -22,7 +22,8 @@ from tractor._testing import (
|
|||
|
||||
pytest_plugins: list[str] = [
|
||||
'pytester',
|
||||
'tractor._testing.pytest',
|
||||
# NOTE, now loaded in `pytest-ini` section of `pyproject.toml`
|
||||
# 'tractor._testing.pytest',
|
||||
]
|
||||
|
||||
_ci_env: bool = os.environ.get('CI', False)
|
||||
|
|
|
|||
|
|
@ -95,8 +95,12 @@ def spawn(
|
|||
os.environ['PYTHON_COLORS'] = '0'
|
||||
# disable all ANSI color output
|
||||
# os.environ['NO_COLOR'] = '1'
|
||||
# ?TODO, doesn't seem to disable prompt color
|
||||
# for `pdbp`?
|
||||
|
||||
def set_spawn_method():
|
||||
def set_spawn_method(
|
||||
start_method: str,
|
||||
):
|
||||
'''
|
||||
Drive the actor-spawn backend inside the spawned
|
||||
`examples/debugging/<script>.py` subproc via env-var
|
||||
|
|
@ -106,7 +110,9 @@ def spawn(
|
|||
'''
|
||||
os.environ['TRACTOR_SPAWN_METHOD'] = start_method
|
||||
|
||||
def set_loglevel():
|
||||
def set_loglevel(
|
||||
loglevel: str|None,
|
||||
):
|
||||
'''
|
||||
Forward the test-suite parametrized `loglevel` into the
|
||||
spawned `examples/debugging/<script>.py` subproc via
|
||||
|
|
@ -125,12 +131,24 @@ def spawn(
|
|||
def _spawn(
|
||||
cmd: str,
|
||||
expect_timeout: float = 4,
|
||||
start_method: str = start_method,
|
||||
loglevel: str|None = None,
|
||||
**mkcmd_kwargs,
|
||||
) -> pty_spawn.spawn:
|
||||
'''
|
||||
Inner closure handed to consumer tests to invoke
|
||||
`pytest.Pytester.spawn`
|
||||
|
||||
'''
|
||||
nonlocal spawned
|
||||
unset_colors()
|
||||
set_spawn_method()
|
||||
set_loglevel()
|
||||
set_spawn_method(start_method=start_method)
|
||||
set_loglevel(
|
||||
loglevel=loglevel,
|
||||
# ?TODO^ when should this be set by `--ll <level>` ?
|
||||
# by default we apply 'error' but there should be a diff
|
||||
# vs. when the flag IS NOT passed?
|
||||
)
|
||||
spawned = testdir.spawn(
|
||||
cmd=mk_cmd(
|
||||
cmd,
|
||||
|
|
@ -322,10 +340,13 @@ def in_prompt_msg(
|
|||
def assert_before(
|
||||
child: SpawnBase,
|
||||
patts: list[str],
|
||||
|
||||
**kwargs,
|
||||
) -> str:
|
||||
'''
|
||||
Assert a pattern is in `child.before.decode() -> str`,
|
||||
return the full `.before` output on success.
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
assert in_prompt_msg(
|
||||
|
|
|
|||
|
|
@ -66,19 +66,28 @@ def test_pause_from_sync(
|
|||
# XXX required for `breakpoint()` overload and
|
||||
# thus`tractor.devx.pause_from_sync()`.
|
||||
pytest.importorskip('greenback')
|
||||
child = spawn('sync_bp')
|
||||
child = spawn(
|
||||
'sync_bp',
|
||||
loglevel='pdb', # XXX pattern matching
|
||||
)
|
||||
|
||||
# first `sync_pause()` after nurseries open
|
||||
child.expect(PROMPT)
|
||||
assert_before(
|
||||
_before: str = assert_before(
|
||||
child,
|
||||
[
|
||||
# pre-prompt line
|
||||
_pause_msg,
|
||||
"<Task '__main__.main'",
|
||||
# devx-loglevel
|
||||
# "imported <module 'greenback' from",
|
||||
# "successfully scheduled `._pause()` in `trio` thread on behalf of <Task",
|
||||
|
||||
_pause_msg, # pre-prompt line
|
||||
"('root'",
|
||||
"<Task '__main__.main'",
|
||||
"tractor.pause_from_sync()",
|
||||
]
|
||||
)
|
||||
# XXX `enable_stack_on_sig=False` in script
|
||||
assert 'stackscope' not in _before
|
||||
if ctlc:
|
||||
do_ctlc(child)
|
||||
# ^NOTE^ subactor not spawned yet; don't need extra delay.
|
||||
|
|
@ -88,18 +97,18 @@ def test_pause_from_sync(
|
|||
# first `await tractor.pause()` inside `p.open_context()` body
|
||||
child.expect(PROMPT)
|
||||
|
||||
# XXX shouldn't see gb loaded message with PDB loglevel!
|
||||
# assert not in_prompt_msg(
|
||||
# child,
|
||||
# ['`greenback` portal opened!'],
|
||||
# )
|
||||
# should be same root task
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
# XXX should see gb loaded with devx-loglevel.
|
||||
# "`greenback` portal opened!",
|
||||
# "Activated `greenback` for `tractor.pause_from_sync()` support!",
|
||||
|
||||
_pause_msg,
|
||||
"<Task '__main__.main'",
|
||||
"('root'",
|
||||
"<Task '__main__.main'",
|
||||
"tractor.pause()",
|
||||
]
|
||||
)
|
||||
|
||||
|
|
@ -130,17 +139,17 @@ def test_pause_from_sync(
|
|||
# `Lock.acquire()`-ed
|
||||
# (NOT both, which will result in REPL clobbering!)
|
||||
attach_patts: dict[str, list[str]] = {
|
||||
'subactor': [
|
||||
"'start_n_sync_pause'",
|
||||
"('subactor'",
|
||||
"|_<Task 'start_n_sync_pause'": [
|
||||
"|_('subactor'",
|
||||
"tractor.pause_from_sync()",
|
||||
],
|
||||
'inline_root_bg_thread': [
|
||||
"<Thread(inline_root_bg_thread",
|
||||
"|_<Thread(inline_root_bg_thread": [
|
||||
"('root'",
|
||||
"breakpoint(hide_tb=hide_tb)",
|
||||
],
|
||||
'start_soon_root_bg_thread': [
|
||||
"<Thread(start_soon_root_bg_thread",
|
||||
"('root'",
|
||||
"|_<Thread(start_soon_root_bg_thread": [
|
||||
"|_('root'",
|
||||
"tractor.pause_from_sync()",
|
||||
],
|
||||
}
|
||||
conts: int = 0 # for debugging below matching logic on failure
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import os
|
|||
import signal
|
||||
import time
|
||||
from typing import (
|
||||
Callable,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
|
|
@ -47,7 +48,10 @@ if TYPE_CHECKING:
|
|||
|
||||
@no_macos
|
||||
def test_shield_pause(
|
||||
spawn: PexpectSpawner,
|
||||
spawn: Callable[
|
||||
...,
|
||||
PexpectSpawner,
|
||||
],
|
||||
):
|
||||
'''
|
||||
Verify the `tractor.pause()/.post_mortem()` API works inside an
|
||||
|
|
@ -55,8 +59,10 @@ def test_shield_pause(
|
|||
next checkpoint wherein the cancelled will get raised.
|
||||
|
||||
'''
|
||||
child = spawn(
|
||||
'shield_hang_in_sub'
|
||||
child: PexpectSpawner = spawn(
|
||||
'shield_hang_in_sub',
|
||||
loglevel='devx',
|
||||
# ^XXX REQUIRED for below patt matching!
|
||||
)
|
||||
expect(
|
||||
child,
|
||||
|
|
@ -86,31 +92,54 @@ def test_shield_pause(
|
|||
# end-of-tree delimiter
|
||||
"end-of-\('root'",
|
||||
)
|
||||
assert_before(
|
||||
_before: str = assert_before(
|
||||
child,
|
||||
[
|
||||
# 'Srying to dump `stackscope` tree..',
|
||||
# 'Dumping `stackscope` tree for actor',
|
||||
"('root'", # uid line
|
||||
|
||||
# TODO!? this used to show?
|
||||
# TODO!? this in-task-code used to show??
|
||||
# -[ ] mk reproducible for @oremanj?
|
||||
# => SOLVED? by our `trio_token.run_sync_soon()`
|
||||
# approach?
|
||||
#
|
||||
# parent block point (non-shielded)
|
||||
# 'await trio.sleep_forever() # in root',
|
||||
]
|
||||
)
|
||||
|
||||
# NOTE, hierarchical-ordering invariant restored by
|
||||
# `_dump_then_relay` (co-scheduled dump+relay on the
|
||||
# trio loop, see `tractor.devx._stackscope`): the
|
||||
# parent's full task-tree prints BEFORE the 'Relaying
|
||||
# `SIGUSR1`' log msg, which prints BEFORE any sub-
|
||||
# actor receives the signal and dumps its own tree.
|
||||
# So the relay log appears BETWEEN `end-of-('root'`
|
||||
# (above) and `end-of-('hanger'` (below).
|
||||
handle_out_of_order: bool = False
|
||||
|
||||
if (
|
||||
handle_out_of_order
|
||||
and
|
||||
"end-of-('hanger'" in _before
|
||||
):
|
||||
assert "('hanger'" in _before
|
||||
assert 'Relaying `SIGUSR1`[10] to sub-actor' in _before
|
||||
|
||||
else:
|
||||
expect(
|
||||
child,
|
||||
# end-of-tree delimiter
|
||||
'Relaying `SIGUSR1`\\[10\\] to sub-actor',
|
||||
)
|
||||
expect(
|
||||
child,
|
||||
# end-of-subactor's-tree delimiter
|
||||
"end-of-\('hanger'",
|
||||
)
|
||||
assert_before(
|
||||
_before: str = assert_before(
|
||||
child,
|
||||
[
|
||||
# relay to the sub should be reported
|
||||
'Relaying `SIGUSR1`[10] to sub-actor',
|
||||
|
||||
"('hanger'", # uid line
|
||||
|
||||
# TODO!? SEE ABOVE
|
||||
|
|
@ -119,6 +148,7 @@ def test_shield_pause(
|
|||
]
|
||||
)
|
||||
|
||||
|
||||
# simulate the user sending a ctl-c to the hanging program.
|
||||
# this should result in the terminator kicking in since
|
||||
# the sub is shield blocking and can't respond to SIGINT.
|
||||
|
|
|
|||
|
|
@ -93,6 +93,7 @@ from __future__ import annotations
|
|||
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import signal
|
||||
import stat
|
||||
import sys
|
||||
|
|
@ -106,6 +107,25 @@ _SHM_PLATFORM_OK: bool = sys.platform.startswith(
|
|||
)
|
||||
SHM_DIR: str = '/dev/shm'
|
||||
|
||||
# UDS-socket leak sweep — see `find_orphaned_uds()` /
|
||||
# `reap_uds()` below. Tractor's UDS transport
|
||||
# (`tractor.ipc._uds`) creates sock files under
|
||||
# `${XDG_RUNTIME_DIR}/tractor/<name>@<pid>.sock`; a
|
||||
# crash / SIGKILL / mid-cancel teardown can leave the
|
||||
# file behind because `os.unlink()` lives in the
|
||||
# `_serve_ipc_eps` `finally:` block which doesn't always
|
||||
# get to run on hard exits. The reaper here is best-effort
|
||||
# cleanup for the test harness + the `tractor-reap` CLI.
|
||||
_UDS_SUBDIR: str = 'tractor'
|
||||
# `<actor-name>@<pid>.sock` — pid is the binder's pid at
|
||||
# creation time. Special sentinel: `registry@1616.sock`
|
||||
# uses the magic `1616` not a real pid (the root
|
||||
# registrar's known address; see `UDSAddress.get_root`).
|
||||
_UDS_NAME_RE: re.Pattern = re.compile(
|
||||
r'^(?P<name>.+)@(?P<pid>\d+)\.sock$'
|
||||
)
|
||||
_UDS_REGISTRY_SENTINEL_PID: int = 1616
|
||||
|
||||
|
||||
def _ensure_shm_supported() -> None:
|
||||
'''
|
||||
|
|
@ -460,3 +480,301 @@ def reap_shm(
|
|||
f'{exc!r}'
|
||||
)
|
||||
return (unlinked, errors)
|
||||
|
||||
|
||||
def get_uds_dir() -> str|None:
|
||||
'''
|
||||
Path of tractor's per-user UDS sock-file dir
|
||||
(`${XDG_RUNTIME_DIR}/tractor/`).
|
||||
|
||||
Returns `None` when `XDG_RUNTIME_DIR` is unset (e.g.
|
||||
non-systemd hosts, or inside a container without the
|
||||
var plumbed through). Caller should treat that as
|
||||
"no UDS leaks possible to detect — skip".
|
||||
|
||||
'''
|
||||
xdg: str|None = os.environ.get('XDG_RUNTIME_DIR')
|
||||
if not xdg:
|
||||
return None
|
||||
return os.path.join(xdg, _UDS_SUBDIR)
|
||||
|
||||
|
||||
def _parse_uds_name(filename: str) -> tuple[str, int]|None:
    '''
    Split a tractor UDS sock filename into its
    `(actor_name, pid)` pair.

    The name is matched against `_UDS_NAME_RE`; anything that
    does not match yields `None`.

    '''
    match = _UDS_NAME_RE.match(filename)
    if match is None:
        return None

    name, pid = match.group('name', 'pid')
    return (name, int(pid))
|
||||
|
||||
|
||||
def find_orphaned_uds(
|
||||
*,
|
||||
uds_dir: str|None = None,
|
||||
) -> list[str]:
|
||||
'''
|
||||
`<uds_dir>/*.sock` paths whose binder pid is no
|
||||
longer alive (orphaned). Includes the
|
||||
`registry@1616.sock` sentinel — `1616` is a magic
|
||||
sentinel pid (not a real one) so the file's
|
||||
presence alone signals a leak from a dead session.
|
||||
|
||||
Returns `[]` on platforms without `XDG_RUNTIME_DIR`
|
||||
or when the dir doesn't exist. Files whose name
|
||||
doesn't match the `<name>@<pid>.sock` pattern are
|
||||
skipped (we don't unlink things we don't recognize).
|
||||
|
||||
'''
|
||||
dir_path: str = uds_dir or get_uds_dir()
|
||||
if not dir_path:
|
||||
return []
|
||||
|
||||
try:
|
||||
entries: list[str] = os.listdir(dir_path)
|
||||
except OSError:
|
||||
return []
|
||||
|
||||
leaked: list[str] = []
|
||||
prefix: str = dir_path.rstrip('/') + '/'
|
||||
for entry in entries:
|
||||
path: str = prefix + entry
|
||||
if not entry.endswith('.sock'):
|
||||
continue
|
||||
try:
|
||||
st: os.stat_result = os.stat(path)
|
||||
except OSError:
|
||||
continue
|
||||
# only sockets; skip stray regular files / subdirs
|
||||
if not stat.S_ISSOCK(st.st_mode):
|
||||
continue
|
||||
parsed = _parse_uds_name(entry)
|
||||
if parsed is None:
|
||||
# unknown naming — skip rather than risk
|
||||
# unlinking something we don't own
|
||||
continue
|
||||
_name, pid = parsed
|
||||
if pid == _UDS_REGISTRY_SENTINEL_PID:
|
||||
# sentinel — never a real pid; if the file
|
||||
# exists nobody live is "owning" it via
|
||||
# /proc lookup, so always orphaned
|
||||
leaked.append(path)
|
||||
continue
|
||||
if not _is_alive(pid):
|
||||
leaked.append(path)
|
||||
return leaked
|
||||
|
||||
|
||||
def reap_uds(
    paths: list[str],
    *,
    log=print,
) -> tuple[list[str], list[tuple[str, OSError]]]:
    '''
    `os.unlink()` every UDS sock-file path in `paths`.

    Delivers a `(unlinked, errors)` pair where `errors` holds
    `(path, exc)` entries for each failed unlink. A path that
    is already gone (`FileNotFoundError`) is counted as
    unlinked. A summary line plus one line per failure is
    emitted through `log`.

    '''
    removed: list[str] = []
    failures: list[tuple[str, OSError]] = []
    for sock_path in paths:
        try:
            os.unlink(sock_path)
        except FileNotFoundError:
            # lost a race with another cleaner; same end result
            pass
        except OSError as err:
            failures.append((sock_path, err))
            continue

        removed.append(sock_path)

    if removed:
        log(
            f'[tractor-reap] unlinked {len(removed)} '
            f'orphaned UDS sock-file(s): {removed}'
        )

    for sock_path, err in failures:
        log(
            f'[tractor-reap] could not unlink {sock_path}: '
            f'{err!r}'
        )

    return (removed, failures)
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# Pytest fixtures — sub-plugin surface
|
||||
# ----------------------------------------------------------
|
||||
# Loaded as a pytest plugin via the `pytest_plugins` line in
|
||||
# `tractor._testing.pytest`. Keeps the reaping infra (helpers
|
||||
# above + fixtures below) co-located so adding a new reap
|
||||
# target is a single-file change. Sibling-module
|
||||
# (`tractor._testing.pytest`) keeps its core
|
||||
# tractor-tooling surface (option/marker/parametrize hooks,
|
||||
# `tractor_test` deco, transport / spawn-method fixtures)
|
||||
# uncluttered.
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(
    scope='session',
    autouse=True,
)
def _reap_orphaned_subactors():
    '''
    Session-wide autouse teardown: once the whole test session
    is done, look up every process still descended from this
    `pytest` process and hand them to `reap()` with a 3s grace
    window, then sweep any orphaned UDS sock-files.

    Why: with fork-based spawn backends (notably
    `main_thread_forkserver`) a test that times out or bails
    mid-teardown can leave subactor forks alive; left alone
    they linger across sessions, compete for ports and inherit
    pytest's capture-pipe fds, which makes later tests flaky.

    Companion CLI for the pytest-died-mid-session case:
    `scripts/tractor-reap`.

    '''
    # capture our pid up front; everything after the `yield`
    # below runs at session teardown.
    pytest_pid: int = os.getpid()
    yield

    survivors: list[int] = find_descendants(pytest_pid)
    if survivors:
        reap(survivors, grace=3.0)

    # NOTE, the sock-file sweep must come AFTER the process
    # reap: a just-killed actor's bind path only counts as
    # "orphaned" once its owning pid is gone.
    stale_socks: list[str] = find_orphaned_uds()
    if stale_socks:
        reap_uds(stale_socks)
|
||||
|
||||
|
||||
@pytest.fixture(
    scope='function',
    autouse=True,
)
def _track_orphaned_uds_per_test():
    '''
    Function-scoped autouse detector + reaper for leaked UDS
    sock-files.

    The `*.sock` entries of the `get_uds_dir()` dir are listed
    before and after each test. Any entry that is new AND is
    reported by `find_orphaned_uds()` triggers a loud warning
    and is unlinked via `reap_uds()`, so the next test starts
    from a clean dir.

    Why per-test and not only at session end: under
    `--tpt-proto=uds` a single hard-killed subactor leaves a
    sock file which a sibling test's `wait_for_actor` /
    `find_actor` discovery probes can hit by accident (see
    issue #452). Catching the leak in the test that caused it
    makes blame obvious and prevents cascading flakiness.

    Cost is 2x `os.listdir` plus a few `os.stat`s per test;
    silently skipped when `get_uds_dir()` returns nothing.

    '''
    def _list_socks(dir_path: str) -> set[str]:
        # names (not full paths) of all `*.sock` dir entries
        return {
            fname for fname in os.listdir(dir_path)
            if fname.endswith('.sock')
        }

    sock_dir: str|None = get_uds_dir()

    # remember what was already there so this test is only
    # blamed for files it added itself.
    pre_existing: set[str] = set()
    if sock_dir:
        try:
            pre_existing = _list_socks(sock_dir)
        except OSError:
            pass

    yield

    if not sock_dir:
        return

    try:
        created: set[str] = _list_socks(sock_dir) - pre_existing
    except OSError:
        return

    if not created:
        return

    # only blame files that are also flagged as orphaned; a
    # sock whose owner is still alive is left for the
    # session-end sweep to judge.
    orphans: list[str] = find_orphaned_uds(uds_dir=sock_dir)
    new_orphans: list[str] = [
        full_path
        for full_path in (
            os.path.join(sock_dir, fname) for fname in created
        )
        if full_path in orphans
    ]
    if not new_orphans:
        return

    import warnings
    warnings.warn(
        f'UDS sock-file LEAK detected from test '
        f'(reaping):\n '
        + '\n '.join(new_orphans),
        stacklevel=1,
    )
    reap_uds(new_orphans)
|
||||
|
||||
|
||||
@pytest.fixture
def reap_subactors_per_test() -> int:
    '''
    **Opt-in** (NOT autouse) function-scoped zombie-subactor
    reaper; yields the `pytest` process pid to the test.

    After the test finishes, every process still descended
    from `pytest` is handed to `reap()` with a 3s grace window.

    Motivation: when a test's teardown can't fully cancel its
    actor tree (e.g. an asyncio cancel-cascade timing out under
    `main_thread_forkserver`) the leftover subactor stays a
    child of `pytest` and keeps squatting on whichever
    registrar port / UDS path / shm segment it had bound, so
    later tests needing that resource fail. With a
    session-shared `reg_addr` that cascades into EVERY
    following test, and the session-scoped sibling
    (`_reap_orphaned_subactors`) runs too late to help.

    Enable for a whole (known problematic) module via:

    ```python
    pytestmark = pytest.mark.usefixtures(
        'reap_subactors_per_test',
    )
    ```

    or put the same `usefixtures` mark on a single test func.
    Deliberately not autouse: seeing the fixture on a module
    flags "teardown here is leaky enough to contaminate
    siblings", which keeps root causes visible instead of
    burying them under blanket cleanup.

    '''
    pytest_pid: int = os.getpid()
    yield pytest_pid

    # teardown: anything still parented to us is a leftover.
    leftovers: list[int] = find_descendants(pytest_pid)
    if leftovers:
        reap(leftovers, grace=3.0)
|
||||
|
|
|
|||
|
|
@ -24,10 +24,12 @@ from functools import (
|
|||
wraps,
|
||||
)
|
||||
import inspect
|
||||
import os
|
||||
import platform
|
||||
from typing import (
|
||||
Callable,
|
||||
get_args,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import pytest
|
||||
|
|
@ -35,6 +37,78 @@ import tractor
|
|||
from tractor.spawn._spawn import SpawnMethodKey
|
||||
import trio
|
||||
|
||||
# Sub-plugin: zombie-subactor + UDS sock-file + shm
|
||||
# reaping fixtures live in `tractor._testing._reap`
|
||||
# alongside the underlying detection/cleanup helpers.
|
||||
# Loading `_reap` as a sub-plugin here keeps reaping
|
||||
# concerns co-located + this module focused on tractor-
|
||||
# tooling-specific hooks (option/marker/parametrize,
|
||||
# `tractor_test` deco, transport / spawn-method
|
||||
# fixtures).
|
||||
pytest_plugins: tuple[str, ...] = (
|
||||
'tractor._testing._reap',
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from argparse import Namespace
|
||||
|
||||
# XXX REQUIRED in order to enforce `--capture=` flag
|
||||
# pre test session.
|
||||
# https://docs.pytest.org/en/stable/reference/reference.html#bootstrapping-hooks
|
||||
def pytest_load_initial_conftests(
    early_config: pytest.Config,
    parser: pytest.Parser,
    args: list[str],
):
    '''
    Bootstrapping hook which rewrites the `--capture=` mode
    before the test session starts.

    When the parsed `spawn_backend` option names a fork-based
    spawner and capture is still `'fd'`, capture is flipped to
    `'sys'` (with a notice printed). The parsed options are
    always printed at the end.

    '''
    opts: Namespace = early_config.option
    opts_w_args: Namespace = parser.parse_known_args(args)

    # XXX, fork based spawners which MUST run with capsys:
    # * main_thread_forkserver
    # * (TODO) subint_forkserver
    #
    # Why: `--capture=fd` redirects fd 1,2 to temp files and
    # fork children inherit those fds, leading to opaque
    # deadlocks in the pytest-capture-machinery <-> fork-child
    # stdio interaction. `--capture=sys` only swaps the
    # Python-level `sys.stdout`/`sys.stderr` and leaves fd 1,2
    # untouched.
    #
    # Trade-off (vs. `--capture=fd`):
    # - LOST: per-test attribution of subactor *raw-fd* output
    #   (C-ext writes, `os.write(2, ...)`, subproc stdout); it
    #   still reaches the terminal (and CI's terminal-level
    #   capture), just not per-test-scoped in the failure
    #   report.
    # - KEPT: per-test capture of Python-level `print()` +
    #   `logging` (tractor's logger writes to `sys.stderr`).
    # - KEPT: user `pytest -s` for debugging (unaffected).
    #
    # Full post-mortem in
    # `ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`.
    spawner = opts_w_args.spawn_backend
    needs_capsys: bool = spawner in [
        'main_thread_forkserver',
    ]
    if needs_capsys and opts.capture == 'fd':
        print(
            f'XXX SETTING CAPSYS due to spawning backend XXX\n'
            f'--spawn-backend={spawner!r}\n'
        )
        opts.capture = 'sys'

    # TODO, set various `$TRACTOR_X*` osenv vars here!
    print(
        f'Applying `tractor`-specific `pytest` config,\n'
        f'{opts_w_args!r}\n'
    )
|
||||
|
||||
|
||||
def tractor_test(
|
||||
wrapped: Callable|None = None,
|
||||
|
|
@ -216,8 +290,8 @@ def pytest_addoption(
|
|||
parser.addoption(
|
||||
"--enable-stackscope",
|
||||
action="store_true",
|
||||
dest='tractor_enable_stackscope',
|
||||
default=False,
|
||||
dest='enable_stackscope',
|
||||
# default=False,
|
||||
help=(
|
||||
'Install `stackscope` SIGUSR1 handler in pytest + '
|
||||
'every spawned subactor for live trio task-tree '
|
||||
|
|
@ -274,9 +348,10 @@ def pytest_configure(
|
|||
# gate honors. Lighter than `--tpdb` (no pdb machinery) —
|
||||
# purely for hang-investigation stack visibility.
|
||||
if getattr(
|
||||
config.option, 'tractor_enable_stackscope', False
|
||||
config.option,
|
||||
'enable_stackscope',
|
||||
False
|
||||
):
|
||||
import os
|
||||
# Env var inherited via fork → subactor's runtime
|
||||
# picks it up at `Actor.async_main` startup. See the
|
||||
# gate in `tractor.runtime._runtime` matching this
|
||||
|
|
@ -298,6 +373,8 @@ def pytest_configure(
|
|||
'--enable-stackscope is a no-op. '
|
||||
'Install via the `devx` dep group.'
|
||||
)
|
||||
else:
|
||||
os.environ.pop('TRACTOR_ENABLE_STACKSCOPE', None)
|
||||
|
||||
|
||||
def pytest_collection_modifyitems(
|
||||
|
|
@ -337,91 +414,6 @@ def pytest_collection_modifyitems(
|
|||
break
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope='session',
|
||||
autouse=True,
|
||||
)
|
||||
def _reap_orphaned_subactors():
|
||||
'''
|
||||
Session-scoped autouse fixture: after the whole test
|
||||
session finishes, SIGINT any subactor processes still
|
||||
parented to this `pytest` process, wait a bounded
|
||||
grace window, then SIGKILL survivors.
|
||||
|
||||
Rationale: under fork-based spawn backends (notably
|
||||
`main_thread_forkserver`), a test that times out or bails
|
||||
mid-teardown can leave subactor forks alive. Without
|
||||
this reap, they linger across sessions and compete
|
||||
for ports / inherit pytest's capture-pipe fds — which
|
||||
flakifies later tests. SC-polite discipline: SIGINT
|
||||
first to let the subactor's trio cancel shield + IPC
|
||||
teardown paths run before we escalate.
|
||||
|
||||
Matching companion CLI: `scripts/tractor-reap` for
|
||||
the pytest-died-mid-session case.
|
||||
|
||||
'''
|
||||
import os
|
||||
parent_pid: int = os.getpid()
|
||||
yield
|
||||
from tractor._testing._reap import (
|
||||
find_descendants,
|
||||
reap,
|
||||
)
|
||||
pids: list[int] = find_descendants(parent_pid)
|
||||
if pids:
|
||||
reap(pids, grace=3.0)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def reap_subactors_per_test() -> int:
|
||||
'''
|
||||
Per-test (function-scoped) zombie-subactor reaper —
|
||||
**opt-in**, NOT autouse.
|
||||
|
||||
When a test's teardown fails to fully cancel its actor
|
||||
tree (e.g. an asyncio cancel-cascade times out under
|
||||
`main_thread_forkserver`, pytest hits its 200s wall-
|
||||
clock and abandons), the leftover subactor lingers as a
|
||||
direct child of `pytest` and squats on whatever
|
||||
registrar port / UDS path / shm segment it had bound.
|
||||
Subsequent tests trying to allocate the same resource
|
||||
fail — and with backends that bind a session-shared
|
||||
`reg_addr`, that means EVERY following test in the
|
||||
suite cascades. The session-scoped sibling
|
||||
(`_reap_orphaned_subactors`) only kicks in at session
|
||||
end which is too late to save the cascade.
|
||||
|
||||
Apply at module-level on the topically-problematic
|
||||
test files via:
|
||||
|
||||
```python
|
||||
pytestmark = pytest.mark.usefixtures(
|
||||
'reap_subactors_per_test',
|
||||
)
|
||||
```
|
||||
|
||||
Or per-test via the same `usefixtures` mark on a
|
||||
specific function. Intentionally NOT autouse so the
|
||||
fixture's presence on a module signals "this module's
|
||||
teardown is known-leaky enough to contaminate
|
||||
siblings"; the visibility helps future-us track down
|
||||
root causes rather than burying them under blanket
|
||||
cleanup.
|
||||
|
||||
'''
|
||||
import os
|
||||
parent_pid: int = os.getpid()
|
||||
yield parent_pid
|
||||
from tractor._testing._reap import (
|
||||
find_descendants,
|
||||
reap,
|
||||
)
|
||||
pids: list[int] = find_descendants(parent_pid)
|
||||
if pids:
|
||||
reap(pids, grace=3.0)
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def debug_mode(
|
||||
request: pytest.FixtureRequest,
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ disjoint, parallel executing tasks in separate actors.
|
|||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
# from functools import partial
|
||||
from functools import partial
|
||||
from threading import (
|
||||
current_thread,
|
||||
Thread,
|
||||
|
|
@ -63,7 +63,10 @@ if TYPE_CHECKING:
|
|||
|
||||
|
||||
@trio.lowlevel.disable_ki_protection
|
||||
def dump_task_tree() -> None:
|
||||
def dump_task_tree(
|
||||
write_file: bool = False,
|
||||
write_tty: bool = False,
|
||||
) -> None:
|
||||
'''
|
||||
Do a classic `stackscope.extract()` task-tree dump to console at
|
||||
`.devx()` level.
|
||||
|
|
@ -112,16 +115,16 @@ def dump_task_tree() -> None:
|
|||
# |_[Storage/Memory/IPC-Stream/Data-Struct
|
||||
|
||||
fpath: str = f'/tmp/tractor-stackscope-{os.getpid()}.log'
|
||||
from . import _pformat
|
||||
actor_repr: str = _pformat.nest_from_op(
|
||||
from . import pformat
|
||||
actor_repr: str = pformat.nest_from_op(
|
||||
input_op='|_',
|
||||
text=f'{actor}',
|
||||
nest_prefilx='|_',
|
||||
nest_prefix='|_',
|
||||
nest_indent=3,
|
||||
)
|
||||
full_dump: str = (
|
||||
f'Dumping `stackscope` tree for actor\n'
|
||||
f'(>: {actor.uid!r}\n'
|
||||
f'(>: {actor.aid.uid!r}\n'
|
||||
f' |_{mp.current_process()}\n'
|
||||
f' |_{thr}\n'
|
||||
# TODO, use the nest_from_op
|
||||
|
|
@ -134,11 +137,11 @@ def dump_task_tree() -> None:
|
|||
f'capture-bypass tee: {fpath}\n'
|
||||
f'(`tail -f {fpath}` to follow across signals)\n'
|
||||
f'\n'
|
||||
f'------ start-of-{actor.uid!r} ------\n'
|
||||
f'------ start-of-{actor.aid.uid!r} ------\n'
|
||||
f'|\n'
|
||||
f'{tree_str}'
|
||||
f'|\n'
|
||||
f'|_____ end-of-{actor.uid!r} ______\n'
|
||||
f'|_____ end-of-{actor.aid.uid!r} ______\n'
|
||||
)
|
||||
log.devx(full_dump)
|
||||
|
||||
|
|
@ -146,6 +149,7 @@ def dump_task_tree() -> None:
|
|||
# `--capture=fd` swallows `log.devx()` above; the
|
||||
# following two writes guarantee the dump reaches the
|
||||
# human even when stdio is captured.
|
||||
if write_file:
|
||||
try:
|
||||
with open(fpath, 'a') as f:
|
||||
f.write(full_dump + '\n')
|
||||
|
|
@ -154,6 +158,7 @@ def dump_task_tree() -> None:
|
|||
f'Failed to tee stackscope dump to {fpath!r}'
|
||||
)
|
||||
|
||||
if write_tty:
|
||||
try:
|
||||
with open('/dev/tty', 'w') as tty:
|
||||
tty.write(full_dump + '\n')
|
||||
|
|
@ -167,7 +172,7 @@ _tree_dumped: bool = False
|
|||
|
||||
# Captured at `enable_stack_on_sig()` time when running
|
||||
# inside a trio task. `dump_tree_on_sig` uses this to
|
||||
# schedule `dump_task_tree` ON the trio loop via
|
||||
# schedule `dump_task_tree()` ON the trio loop via
|
||||
# `token.run_sync_soon` so stackscope sees a real current
|
||||
# task and can recurse into nursery children. Without
|
||||
# it (signal handler running in a non-trio stack frame),
|
||||
|
|
@ -176,13 +181,70 @@ _tree_dumped: bool = False
|
|||
_trio_token: trio.lowlevel.TrioToken|None = None
|
||||
|
||||
|
||||
def _safe_dump_task_tree() -> None:
|
||||
def _relay_sig_to_subactors(sig: int) -> None:
|
||||
'''
|
||||
`run_sync_soon`-friendly wrapper that swallows any
|
||||
exception from `dump_task_tree`. Trio prints
|
||||
+ crashes on uncaught exceptions in scheduled
|
||||
callbacks; we'd rather log + keep the test running so
|
||||
the user can re-trigger the dump.
|
||||
Forward `sig` to every live sub-actor's underlying
|
||||
process so each runs its own `dump_tree_on_sig`
|
||||
handler.
|
||||
|
||||
Factored out of `dump_tree_on_sig` so the
|
||||
`run_sync_soon`-deferred path can call it AFTER
|
||||
the parent's `dump_task_tree()` completes — see
|
||||
`_dump_then_relay` below for why ordering matters.
|
||||
|
||||
'''
|
||||
an: ActorNursery
|
||||
for an in _state.current_actor()._actoruid2nursery.values():
|
||||
subproc: ProcessType
|
||||
subactor: Actor
|
||||
for (
|
||||
subactor,
|
||||
subproc,
|
||||
_,
|
||||
) in an._children.values():
|
||||
log.warning(
|
||||
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
|
||||
f'{subactor}\n'
|
||||
f' |_{subproc}\n'
|
||||
)
|
||||
# bc of course stdlib can't have a std API.. XD
|
||||
match subproc:
|
||||
case trio.Process():
|
||||
subproc.send_signal(sig)
|
||||
|
||||
case mp.Process():
|
||||
subproc._send_signal(sig)
|
||||
|
||||
|
||||
def _dump_then_relay(
|
||||
sig: int|None,
|
||||
) -> None:
|
||||
'''
|
||||
`run_sync_soon`-friendly callback: dump THIS actor's
|
||||
task tree first, THEN relay `sig` to subactors so
|
||||
their dumps can't race ahead of ours.
|
||||
|
||||
Hierarchical-ordering preservation: the legacy
|
||||
direct-call path (pre-`run_sync_soon`) ran the dump
|
||||
synchronously inside the signal handler, then
|
||||
relayed — guaranteeing parent-output-before-child
|
||||
in the multiplexed pty stream. The pure-deferred
|
||||
path (schedule dump only, relay sync from handler)
|
||||
inverts that: relay fires while the parent's
|
||||
dump is still queued, subs receive SIGUSR1 and
|
||||
schedule their own dumps, all dumps then race in
|
||||
arbitrary order through stdio.
|
||||
|
||||
Co-scheduling fixes that: by chaining relay AFTER
|
||||
`dump_task_tree()` inside the same trio-loop
|
||||
callback, parent output flushes before any sub
|
||||
receives the signal, restoring the
|
||||
parent → relay-log → sub-dump ordering humans
|
||||
expect when reading hang-investigation traces.
|
||||
|
||||
Trio prints + crashes on uncaught exceptions in
|
||||
scheduled callbacks; we swallow + log so the test
|
||||
keeps running and the user can re-trigger.
|
||||
|
||||
'''
|
||||
try:
|
||||
|
|
@ -193,6 +255,17 @@ def _safe_dump_task_tree() -> None:
|
|||
'`run_sync_soon`); continuing.\n'
|
||||
)
|
||||
|
||||
if sig is None:
|
||||
return
|
||||
|
||||
try:
|
||||
_relay_sig_to_subactors(sig)
|
||||
except BaseException:
|
||||
log.exception(
|
||||
f'`_relay_sig_to_subactors({sig})` raised '
|
||||
f'(scheduled via `run_sync_soon`); continuing.\n'
|
||||
)
|
||||
|
||||
|
||||
def dump_tree_on_sig(
|
||||
sig: int,
|
||||
|
|
@ -223,8 +296,23 @@ def dump_tree_on_sig(
|
|||
# only the `<init>` task. Falls back to a direct
|
||||
# call when no token was captured (e.g. signal
|
||||
# delivered outside a trio.run).
|
||||
#
|
||||
# Co-schedule the relay-to-subs in the SAME
|
||||
# callback so parent's dump prints BEFORE any
|
||||
# sub receives SIGUSR1 — see `_dump_then_relay`
|
||||
# for the full hierarchical-ordering rationale.
|
||||
if _trio_token is not None:
|
||||
_trio_token.run_sync_soon(_safe_dump_task_tree)
|
||||
_trio_token.run_sync_soon(
|
||||
partial(
|
||||
_dump_then_relay,
|
||||
sig=sig if relay_to_subs else None,
|
||||
)
|
||||
)
|
||||
# NOTE, `_dump_then_relay` handles the relay
|
||||
# internally; bail out before the
|
||||
# direct-path relay below.
|
||||
return
|
||||
|
||||
else:
|
||||
dump_task_tree()
|
||||
|
||||
|
|
@ -246,27 +334,15 @@ def dump_tree_on_sig(
|
|||
# 'Supposedly we dumped just fine..?'
|
||||
# )
|
||||
|
||||
# Direct-path relay (only reached when `_trio_token`
|
||||
# was None — the run_sync_soon path returned above
|
||||
# to let `_dump_then_relay` handle the relay
|
||||
# in-callback).
|
||||
if not relay_to_subs:
|
||||
log.devx(f'Skipping {sig!r} relay to subactors..')
|
||||
return
|
||||
|
||||
an: ActorNursery
|
||||
for an in _state.current_actor()._actoruid2nursery.values():
|
||||
subproc: ProcessType
|
||||
subactor: Actor
|
||||
for subactor, subproc, _ in an._children.values():
|
||||
log.warning(
|
||||
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
|
||||
f'{subactor}\n'
|
||||
f' |_{subproc}\n'
|
||||
)
|
||||
|
||||
# bc of course stdlib can't have a std API.. XD
|
||||
match subproc:
|
||||
case trio.Process():
|
||||
subproc.send_signal(sig)
|
||||
|
||||
case mp.Process():
|
||||
subproc._send_signal(sig)
|
||||
_relay_sig_to_subactors(sig)
|
||||
|
||||
|
||||
def enable_stack_on_sig(
|
||||
|
|
@ -305,11 +381,13 @@ def enable_stack_on_sig(
|
|||
message=r"coroutine method '(asend|athrow)' .* was never awaited",
|
||||
)
|
||||
import stackscope
|
||||
_state._runtime_vars['use_stackscope'] = True
|
||||
except ImportError:
|
||||
log.warning(
|
||||
'The `stackscope` lib is not installed!\n'
|
||||
'`Ignoring enable_stack_on_sig() call!\n'
|
||||
)
|
||||
assert not _state._runtime_vars['use_stackscope']
|
||||
return None
|
||||
|
||||
# Capture the trio token if we're inside `trio.run`
|
||||
|
|
|
|||
|
|
@ -181,7 +181,7 @@ class Lock:
|
|||
return (
|
||||
f'<{cls.__name__}(\n'
|
||||
f'{body}'
|
||||
')>\n\n'
|
||||
')>\n'
|
||||
)
|
||||
|
||||
@classmethod
|
||||
|
|
@ -282,7 +282,7 @@ class Lock:
|
|||
):
|
||||
message += (
|
||||
'-> No new task holds the TTY lock!\n\n'
|
||||
f'{Lock.repr()}\n'
|
||||
f'{Lock.repr()}'
|
||||
)
|
||||
|
||||
elif (
|
||||
|
|
|
|||
|
|
@ -940,16 +940,13 @@ class Actor:
|
|||
# `tractor._testing.pytest`'s `--enable-stackscope`
|
||||
# CLI flag — env var propagates via fork-inherited
|
||||
# environ).
|
||||
import os
|
||||
if rvs['_debug_mode']:
|
||||
if (
|
||||
rvs['_debug_mode']
|
||||
rvs.get('use_stackscope')
|
||||
or
|
||||
os.environ.get('TRACTOR_ENABLE_STACKSCOPE')
|
||||
):
|
||||
from ..devx import (
|
||||
enable_stack_on_sig,
|
||||
maybe_init_greenback,
|
||||
)
|
||||
from ..devx import enable_stack_on_sig
|
||||
try:
|
||||
# TODO: maybe return some status msgs upward
|
||||
# so that we can emit them in `con_status`
|
||||
|
|
@ -966,6 +963,7 @@ class Actor:
|
|||
)
|
||||
|
||||
if rvs.get('use_greenback', False):
|
||||
from ..devx import maybe_init_greenback
|
||||
maybe_mod: ModuleType|None = await maybe_init_greenback()
|
||||
if maybe_mod:
|
||||
log.devx(
|
||||
|
|
|
|||
|
|
@ -93,6 +93,7 @@ class RuntimeVars(Struct):
|
|||
repl_fixture: bool|Callable = False # |AbstractContextManager[bool]
|
||||
# for `tractor.pause_from_sync()` & `breakpoint()` support
|
||||
use_greenback: bool = False
|
||||
use_stackscope: bool = False
|
||||
|
||||
# infected-`asyncio`-mode: `trio` running as guest.
|
||||
_is_infected_aio: bool = False
|
||||
|
|
@ -139,8 +140,9 @@ _RUNTIME_VARS_DEFAULTS: dict[str, Any] = {
|
|||
# `debug_mode: bool` settings
|
||||
'_debug_mode': False, # bool
|
||||
'repl_fixture': False, # |AbstractContextManager[bool]
|
||||
# for `tractor.pause_from_sync()` & `breakpoint()` support
|
||||
'use_greenback': False,
|
||||
|
||||
'use_greenback': False, # `.pause_from_sync()`/`breakpoint()`
|
||||
'use_stackscope': False, # trio-task-stack dumps on SIGUSR1
|
||||
|
||||
# infected-`asyncio`-mode: `trio` running as guest.
|
||||
'_is_infected_aio': False,
|
||||
|
|
|
|||
Loading…
Reference in New Issue