Compare commits

5 Commits

Author SHA1 Message Date
Gud Boi 2ee44a6fdd Fix shutdown deadlock on UDS unlink race
Wrap `os.unlink()` in `close_listener()` with a `FileNotFoundError`
guard — under concurrent pytest sessions the sock-file can already be
reaped. Without this the raise aborts `_serve_ipc_eps`'s finally before
`_shutdown.set()`, deadlocking `wait_for_shutdown()` on
`actor.cancel()`.

Also,
- close each endpoint independently in the finally so one raise doesn't
  strand the rest.
- always signal `_shutdown.set()` regardless of remaining ep count.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-06 14:11:51 -04:00
Gud Boi 7b14fdcd96 Add `tractor_diag`(nosis) xontrib with aliases
Xonsh xontrib providing three diagnostic commands
for tractor development / hang investigation:

- `pytree <pid|pat>` — psutil-backed proc tree with severity-bucketed
  output (zombies > orphans > live), tree-depth markers, zombie-safe
  rendering.
- `hung-dump <pid|pat>` — kernel `wchan`/`stack` + `py-spy dump
  --locals` per descendant, sudo-cred caching upfront, pgrep fallback
  when psutil absent.
- `bindspace-scan [<dir>]` — scan UDS bindspace for orphaned
  `<name>@<pid>.sock` files whose binder pid is dead, emit `rm`
  one-liner for cleanup.
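
The orphan check behind `bindspace-scan` can be sketched roughly as follows, assuming a POSIX system where `os.kill(pid, 0)` probes for process existence. The names `SOCK_RE`, `pid_is_alive`, and `find_orphans`, and the injectable `is_alive` parameter, are hypothetical and exist only to keep the example self-contained.

```python
import os
import re
import tempfile
from pathlib import Path
from typing import Callable

# Same naming scheme the commit message describes: `<name>@<pid>.sock`.
SOCK_RE = re.compile(r'^(?P<name>.+)@(?P<pid>\d+)\.sock$')


def pid_is_alive(pid: int) -> bool:
    '''Signal 0 checks for existence without delivering a signal (POSIX).'''
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The pid exists but belongs to another user.
        return True
    return True


def find_orphans(
    bindspace: Path,
    is_alive: Callable[[int], bool] = pid_is_alive,
) -> list[Path]:
    '''Return sock files whose embedded pid has no live process.'''
    orphans: list[Path] = []
    for sock in sorted(bindspace.glob('*.sock')):
        m = SOCK_RE.match(sock.name)
        if m and not is_alive(int(m['pid'])):
            orphans.append(sock)
    return orphans


# Demo with a fake liveness check: only pid 111 is "alive".
demo_dir = Path(tempfile.mkdtemp())
for fname in ('registrar@111.sock', 'worker@222.sock', 'notes.sock'):
    (demo_dir / fname).touch()
found = find_orphans(demo_dir, is_alive=lambda pid: pid == 111)
```

File names that do not match the pattern (here `notes.sock`) are ignored rather than reported as orphans.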

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-06 14:07:24 -04:00
Gud Boi e4953851de Mk per-test reap fixtures opt-in
Rename `_track_orphaned_uds_per_test` and
`_detect_runaway_subactors_per_test` to public names (drop `_` prefix),
drop `autouse=True`. Tests that need per-test reap blame now opt in via
`pytestmark = pytest.mark.usefixtures(...)`.

Also,
- reduce `sample_interval` from 0.5 -> 0.05s so the CPU probe is cheaper
  per pid.
- add empty-`only_pids` fast-path in `find_runaway_subactors` to skip
  psutil import when no descendants were spawned.
- extract `new_pids` intermediate var for clarity.
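
The empty-`only_pids` fast path can be illustrated with a simplified sketch. The function name `find_runaways` and its reduced return type are assumptions for the example; only the documented `psutil` calls `Process`, `children(recursive=True)`, and `cpu_percent(interval=...)` are used.

```python
from __future__ import annotations


def find_runaways(
    parent_pid: int,
    *,
    only_pids: set[int] | None = None,
    cpu_threshold: float = 95.0,
    sample_interval: float = 0.05,
) -> list[tuple[int, float]]:
    # Fast path: an explicitly empty set means "nothing to sample",
    # which is different from None ("check every descendant").
    if only_pids is not None and not only_pids:
        return []

    try:
        import psutil  # deferred: only paid for when there is work to do
    except ImportError:
        return []

    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return []

    runaways: list[tuple[int, float]] = []
    for proc in parent.children(recursive=True):
        if only_pids is not None and proc.pid not in only_pids:
            continue
        try:
            # Blocks for `sample_interval` seconds per process.
            cpu = proc.cpu_percent(interval=sample_interval)
        except psutil.NoSuchProcess:
            continue
        if cpu >= cpu_threshold:
            runaways.append((proc.pid, cpu))
    return runaways


# An empty set short-circuits before `psutil` is ever imported.
empty_result = find_runaways(1, only_pids=set())
```

Because each `cpu_percent(interval=...)` call blocks for the whole interval, the per-test cost grows linearly with the number of sampled pids, which is why a shorter interval makes the probe cheaper.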

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-06 13:29:49 -04:00
Gud Boi c4082be876 Mv `daemon` + `test_multi_program` to `discovery/`
All `daemon` fixture consumers are discovery-protocol
tests now living under `tests/discovery/`. Move the
fixture, its `_wait_for_daemon_ready` helper, and
`test_multi_program.py` into that subdir so scope
matches usage.

Also,
- add `pytestmark` for `track_orphaned_uds_per_test`
  + `detect_runaway_subactors_per_test` to `test_multi_program` as
  a regression net.
- drop now-unused `_PROC_SPAWN_WAIT` + `socket` import from root
  conftest.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-06 13:23:42 -04:00
Gud Boi ec8c4659c4 Replace sleep with active poll in `daemon` fixture
First draft at resolving,
https://github.com/goodboy/tractor/issues/424

`tests.conftest.py.daemon()` previously used a blind
`time.sleep(_PROC_SPAWN_WAIT + uds_bonus + ci_bonus)` to "wait for the
daemon to come up" before yielding the proc to the test.

Two problems:

1. **Racy under load** — sleep is fixed at design time; loaded boxes
   / cold starts / fork-spawn cost spikes blow past it, leading to
   `ConnectionRefusedError` / `OSError: connect failed` flakes in
   `test_register_duplicate_name`.

2. **Wasteful when daemon comes up fast** — happy-path pays the FULL
   sleep regardless. ~3s of dead time per fixture invocation, ~10-20s
   per full suite run.

Replace with `_wait_for_daemon_ready()` — active poll via stdlib
`socket.create_connection` (TCP) or `socket.connect` (UDS) on the
daemon's bind addr, with 50ms backoff and a 10s/15s deadline (CI gets
extra headroom). Daemon-died-during-startup early-exit catches the case
where `_PROC_SPAWN_WAIT` was silently masking daemon startup crashes.
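
A rough, TCP-only sketch of the poll-until-accepting idea follows. It is not the fixture's helper: `wait_until_accepting` and `late_listen` are hypothetical names, and the late-listening socket merely stands in for a slow-booting daemon.

```python
from __future__ import annotations

import socket
import threading
import time


def wait_until_accepting(
    addr: tuple[str, int],
    *,
    deadline: float = 5.0,
    poll_interval: float = 0.05,
) -> float:
    '''
    Poll `addr` until a TCP connect succeeds; return elapsed seconds.
    Raises `TimeoutError` if nothing accepts before `deadline`.
    '''
    start = time.monotonic()
    last_exc: Exception | None = None
    while time.monotonic() - start < deadline:
        try:
            with socket.create_connection(addr, timeout=poll_interval):
                return time.monotonic() - start
        except OSError as exc:  # covers refused + timed-out connects
            last_exc = exc
            time.sleep(poll_interval)
    raise TimeoutError(f'nothing accepted on {addr!r}: {last_exc!r}')


# Demo: a socket that binds right away but only starts listening
# after a short delay, standing in for a slow-booting daemon.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
addr = server.getsockname()


def late_listen() -> None:
    time.sleep(0.2)
    server.listen()


threading.Thread(target=late_listen, daemon=True).start()
elapsed = wait_until_accepting(addr)
server.close()
```

The caller returns as soon as the listener is up instead of paying a fixed sleep, and a listener that never appears surfaces as a `TimeoutError` carrying the last connect error.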

Why stdlib `socket` (Option 2 from the conc-anal doc) instead of
`tractor`'s own `_root.ping_tpt_socket` closure or trio?

- `tractor.run_daemon()` doesn't return from bootstrap until the runtime
  is fully ready to handle IPC, so probing listen-side acceptance is
  sufficient.
- no need to do the full IPC handshake just to validate readiness.
  Sidesteps the `trio.run()` bootstrap cost (~50ms) per fixture too.

`claude`'s verification: 10/10 runs of `tests/test_multi_program.py`
pass on both `--tpt-proto=tcp` and `--tpt-proto=uds`. Per-test wall-time
`test_register_duplicate_name`: 4.31s → 1.10s. Full file: ~12s → 3.27s
per transport.

Doc-tracked at:
`ai/conc-anal/test_register_duplicate_name_daemon_connect_race_issue.md`

Future work — session-scoped trio runtime in a bg thread to share
fixture-side trio operations across many fixtures (currently overkill
for the one fixture that needs it).

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-04 20:03:41 -04:00
7 changed files with 808 additions and 138 deletions

View File

@ -34,15 +34,10 @@ if platform.system() == 'Windows':
_KILL_SIGNAL = signal.CTRL_BREAK_EVENT _KILL_SIGNAL = signal.CTRL_BREAK_EVENT
_INT_SIGNAL = signal.CTRL_C_EVENT _INT_SIGNAL = signal.CTRL_C_EVENT
_INT_RETURN_CODE = 3221225786 _INT_RETURN_CODE = 3221225786
_PROC_SPAWN_WAIT = 2
else: else:
_KILL_SIGNAL = signal.SIGKILL _KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT _INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = (
2 if _ci_env
else 1
)
no_windows = pytest.mark.skipif( no_windows = pytest.mark.skipif(
@ -244,112 +239,14 @@ def sig_prog(
assert ret assert ret
# TODO: factor into @cm and move to `._testing`? # NOTE, the `daemon` fixture (+ its `_wait_for_daemon_ready`
@pytest.fixture # helper + the post-yield teardown drain logic) has been
def daemon( # moved to `tests/discovery/conftest.py` since 100% of its
debug_mode: bool, # consumers are discovery-protocol tests now living under
loglevel: str, # that subdir. See:
testdir: pytest.Pytester, # - `tests/discovery/test_multi_program.py`
reg_addr: tuple[str, int], # - `tests/discovery/test_registrar.py`
tpt_proto: str, # - `tests/discovery/test_tpt_bind_addrs.py`
ci_env: bool,
test_log: tractor.log.StackLevelAdapter,
# set_fork_aware_capture,
) -> subprocess.Popen:
'''
Run a daemon root actor as a separate actor-process tree and
"remote registrar" for discovery-protocol related tests.
'''
# XXX: too much logging will lock up the subproc (smh)
if loglevel in ('trace', 'debug'):
test_log.warning(
f'Test harness log level is too verbose: {loglevel!r}\n'
f'Reducing to INFO level..'
)
loglevel: str = 'info'
code: str = (
"import tractor; "
"tractor.run_daemon([], "
"registry_addrs={reg_addrs}, "
"enable_transports={enable_tpts}, "
"debug_mode={debug_mode}, "
"loglevel={ll})"
).format(
reg_addrs=str([reg_addr]),
enable_tpts=str([tpt_proto]),
ll="'{}'".format(loglevel) if loglevel else None,
debug_mode=debug_mode,
)
cmd: list[str] = [
sys.executable,
'-c', code,
]
# breakpoint()
kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc: subprocess.Popen = testdir.popen(
cmd,
**kwargs,
)
# TODO! we should poll for the registry socket-bind to take place
# and only once that's done yield to the requester!
# -[ ] TCP: use the `._root.open_root_actor()`::`ping_tpt_socket()`
# closure!
# -[ ] UDS: can we do something similar for 'pinging" the
# file-socket?
#
bg_daemon_spawn_delay: float = _PROC_SPAWN_WAIT
# UDS sockets are **really** fast to bind()/listen()/connect()
# so it's often required that we delay a bit more starting
# the first actor-tree..
if tpt_proto == 'uds':
bg_daemon_spawn_delay += 1.6
if _non_linux and ci_env:
bg_daemon_spawn_delay += 1
# XXX, allow time for the sub-py-proc to boot up.
# !TODO, see ping-polling ideas above!
time.sleep(bg_daemon_spawn_delay)
assert not proc.returncode
yield proc
sig_prog(proc, _INT_SIGNAL)
# XXX! yeah.. just be reaaal careful with this bc sometimes it
# can lock up on the `_io.BufferedReader` and hang..
stderr: str = proc.stderr.read().decode()
stdout: str = proc.stdout.read().decode()
if (
stderr
or
stdout
):
print(
f'Daemon actor tree produced output:\n'
f'{proc.args}\n'
f'\n'
f'stderr: {stderr!r}\n'
f'stdout: {stdout!r}\n'
)
if (rc := proc.returncode) != -2:
msg: str = (
f'Daemon actor tree was not cancelled !?\n'
f'proc.args: {proc.args!r}\n'
f'proc.returncode: {rc!r}\n'
)
if rc < 0:
raise RuntimeError(msg)
test_log.error(msg)
# @pytest.fixture(autouse=True) # @pytest.fixture(autouse=True)

View File

@ -0,0 +1,223 @@
'''
Discovery-suite fixtures, including the `daemon`
remote-registrar subprocess used by the multi-program
discovery tests.
Lives here (vs. the parent `tests/conftest.py`)
because `daemon` is a discovery-protocol primitive: it
boots a separate `tractor.run_daemon()` process whose
sole purpose is to serve as a registrar peer for
discovery-roundtrip tests. Pytest fixtures inherit
DOWNWARD through conftest hierarchy, so anything
under `tests/discovery/` automatically picks this up.
'''
from __future__ import annotations
import os
import platform
import socket
import subprocess
import sys
import time
import pytest
import tractor
from ..conftest import (
sig_prog,
_INT_SIGNAL,
_non_linux,
)
def _wait_for_daemon_ready(
reg_addr: tuple,
tpt_proto: str,
*,
deadline: float = 10.0,
poll_interval: float = 0.05,
proc: subprocess.Popen|None = None,
) -> None:
'''
Active-poll the daemon's bind address until it
accepts a connection (proving it has called
`bind() + listen()` and is ready to handle IPC).
Replaces the historical blind `time.sleep()` in the
`daemon` fixture, which was racy under load; see
`ai/conc-anal/test_register_duplicate_name_daemon_connect_race_issue.md`.
Uses stdlib `socket` directly (no trio runtime
bootstrap cost); this is sufficient because
`tractor.run_daemon()` doesn't return from
bootstrap until the runtime is fully ready to
accept IPC.
Raises `TimeoutError` on `deadline` exceeded. If
`proc` is given, ALSO raises `RuntimeError` early if
the daemon process exits (with any return code)
before the deadline (catches the daemon-startup-crash
that the blind sleep used to silently mask).
'''
end: float = time.monotonic() + deadline
last_exc: Exception|None = None
while time.monotonic() < end:
# Daemon-died-during-startup early-exit. Without
# this, a crashed-on-import daemon would just
# eat the full deadline before raising opaque
# TimeoutError.
if proc is not None and proc.poll() is not None:
raise RuntimeError(
f'Daemon proc exited (rc={proc.returncode}) '
f'before becoming ready to accept on '
f'{reg_addr!r}'
)
try:
if tpt_proto == 'tcp':
# `socket.create_connection` does the
# `socket() + connect()` dance with a
# builtin timeout — perfect primitive
# for a one-shot probe.
with socket.create_connection(
reg_addr,
timeout=poll_interval,
):
return
else:
# UDS — `reg_addr` is a `(filedir, sockname)`
# tuple per `tractor.ipc._uds.UDSAddress.unwrap`.
sockpath: str = os.path.join(*reg_addr)
sock = socket.socket(socket.AF_UNIX)
try:
sock.settimeout(poll_interval)
sock.connect(sockpath)
return
finally:
sock.close()
except (
ConnectionRefusedError,
FileNotFoundError,
OSError,
socket.timeout,
) as exc:
last_exc = exc
time.sleep(poll_interval)
raise TimeoutError(
f'Daemon never accepted on {reg_addr!r} within '
f'{deadline}s (last connect-attempt exc: '
f'{last_exc!r})'
)
# TODO: factor into @cm and move to `._testing`?
@pytest.fixture
def daemon(
debug_mode: bool,
loglevel: str,
testdir: pytest.Pytester,
reg_addr: tuple[str, int],
tpt_proto: str,
ci_env: bool,
test_log: tractor.log.StackLevelAdapter,
) -> subprocess.Popen:
'''
Run a daemon root actor as a separate actor-process
tree and "remote registrar" for discovery-protocol
related tests.
'''
# XXX: too much logging will lock up the subproc (smh)
if loglevel in ('trace', 'debug'):
test_log.warning(
f'Test harness log level is too verbose: {loglevel!r}\n'
f'Reducing to INFO level..'
)
loglevel: str = 'info'
code: str = (
"import tractor; "
"tractor.run_daemon([], "
"registry_addrs={reg_addrs}, "
"enable_transports={enable_tpts}, "
"debug_mode={debug_mode}, "
"loglevel={ll})"
).format(
reg_addrs=str([reg_addr]),
enable_tpts=str([tpt_proto]),
ll="'{}'".format(loglevel) if loglevel else None,
debug_mode=debug_mode,
)
cmd: list[str] = [
sys.executable,
'-c', code,
]
kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc: subprocess.Popen = testdir.popen(
cmd,
**kwargs,
)
# Active-poll the daemon's bind address until it's
# ready to accept connections — replaces the legacy
# blind `time.sleep(2.2)` which was racy under load
# (see
# `ai/conc-anal/test_register_duplicate_name_daemon_connect_race_issue.md`).
#
# Per-test deadline scales with platform: non-Linux
# CI runs get extra headroom; Linux dev boxes need
# very little.
deadline: float = (
15.0 if (_non_linux and ci_env)
else 10.0
)
_wait_for_daemon_ready(
reg_addr=reg_addr,
tpt_proto=tpt_proto,
deadline=deadline,
proc=proc,
)
assert not proc.returncode
yield proc
sig_prog(proc, _INT_SIGNAL)
# XXX! yeah.. just be reaaal careful with this bc
# sometimes it can lock up on the `_io.BufferedReader`
# and hang..
#
# NB, drain happens at TEARDOWN (post-yield), so the
# test body has its chance to read `proc.stderr`
# FIRST. Reading here AFTER would silently swallow
# the daemon's stderr output and break tests that
# assert on it (e.g. `test_abort_on_sigint`).
stderr: str = proc.stderr.read().decode()
stdout: str = proc.stdout.read().decode()
if (
stderr
or
stdout
):
print(
f'Daemon actor tree produced output:\n'
f'{proc.args}\n'
f'\n'
f'stderr: {stderr!r}\n'
f'stdout: {stdout!r}\n'
)
if (rc := proc.returncode) != -2:
msg: str = (
f'Daemon actor tree was not cancelled !?\n'
f'proc.args: {proc.args!r}\n'
f'proc.returncode: {rc!r}\n'
)
if rc < 0:
raise RuntimeError(msg)
test_log.error(msg)

View File

@ -22,7 +22,7 @@ from tractor import (
Portal, Portal,
) )
from tractor.runtime import _state from tractor.runtime import _state
from .conftest import ( from ..conftest import (
sig_prog, sig_prog,
_INT_SIGNAL, _INT_SIGNAL,
_INT_RETURN_CODE, _INT_RETURN_CODE,
@ -38,6 +38,17 @@ if TYPE_CHECKING:
_non_linux: bool = platform.system() != 'Linux' _non_linux: bool = platform.system() != 'Linux'
# NOTE, multi-program tests historically triggered both
# UDS sock-file leaks (daemon-subproc SIGKILL paths) AND
# trio `WakeupSocketpair.drain()` busy-loops
# (`test_register_duplicate_name`). Track + detect
# per-test as a regression net.
pytestmark = pytest.mark.usefixtures(
'track_orphaned_uds_per_test',
'detect_runaway_subactors_per_test',
)
def test_abort_on_sigint( def test_abort_on_sigint(
daemon: subprocess.Popen, daemon: subprocess.Popen,
): ):

View File

@ -222,7 +222,7 @@ def find_runaway_subactors(
parent_pid: int, parent_pid: int,
*, *,
cpu_threshold: float = 95.0, cpu_threshold: float = 95.0,
sample_interval: float = 0.5, sample_interval: float = 0.05,
only_pids: set[int]|None = None, only_pids: set[int]|None = None,
) -> list[tuple[int, float, str]]: ) -> list[tuple[int, float, str]]:
''' '''
@ -237,16 +237,30 @@ def find_runaway_subactors(
`cpu_percent(interval=sample_interval)` is the `cpu_percent(interval=sample_interval)` is the
canonical psutil API for a "what %CPU is this proc canonical psutil API for a "what %CPU is this proc
using NOW" answer — it samples twice with a delta to using NOW" answer — it samples twice with a delta to
compute true utilization. compute true utilization. Default `sample_interval`
of 50ms is enough to register a sustained C-level
tight-loop at ~100% but cheap enough to run as part
of a per-test fixture without dominating
suite wall-clock. Sub-50ms transient bursts are NOT
the bug class we're hunting (those are normal Python
work) so the lost sensitivity is fine.
`only_pids` filters to a specific pre-snapshotted set `only_pids` filters to a specific pre-snapshotted set
(e.g. "pids spawned during this test only"); when (e.g. "pids spawned during this test only"); when
`None`, all live descendants are checked. `None`, all live descendants are checked. Empty
`only_pids` returns `[]` IMMEDIATELY: a fast path for
tests that didn't spawn anything new.
Returns `[]` when `psutil` isn't installed or no Returns `[]` when `psutil` isn't installed or no
descendants match. descendants match.
''' '''
# Fast-path: caller passed empty `only_pids` —
# nothing to sample. Avoids the psutil import + /proc
# walk for tests that didn't spawn descendants.
if only_pids is not None and not only_pids:
return []
try: try:
import psutil import psutil
except ImportError: except ImportError:
@ -718,12 +732,25 @@ def _reap_orphaned_subactors():
@pytest.fixture( @pytest.fixture(
scope='function', scope='function',
autouse=True,
) )
def _track_orphaned_uds_per_test(): def track_orphaned_uds_per_test():
''' '''
Per-test (function-scoped) autouse UDS sock-file leak Per-test (function-scoped) UDS sock-file leak
detector + reaper. detector + reaper. **Opt-in**, NOT autouse.
Apply at module level on UDS-heavy test files via:
pytestmark = pytest.mark.usefixtures(
'track_orphaned_uds_per_test',
)
The session-end `_reap_orphaned_subactors` fixture
is the always-on safety net that catches leaks at
suite teardown; this per-test fixture is for the
smaller set of modules where blame attribution per
test matters (i.e. modules with a HISTORY of leaky
teardown that flakifies sibling tests via
sock-file rebind races).
Snapshots `${XDG_RUNTIME_DIR}/tractor/` before and Snapshots `${XDG_RUNTIME_DIR}/tractor/` before and
after each test; any `<name>@<pid>.sock` files after each test; any `<name>@<pid>.sock` files
@ -797,12 +824,18 @@ def _track_orphaned_uds_per_test():
@pytest.fixture( @pytest.fixture(
scope='function', scope='function',
autouse=True,
) )
def _detect_runaway_subactors_per_test(): def detect_runaway_subactors_per_test():
''' '''
Per-test (function-scoped) autouse runaway-subactor Per-test (function-scoped) runaway-subactor detector.
detector. **Opt-in**, NOT autouse.
Apply at module level on cancellation-cascade-heavy
test files via:
pytestmark = pytest.mark.usefixtures(
'detect_runaway_subactors_per_test',
)
Snapshots descendant pids before+after each test; Snapshots descendant pids before+after each test;
for any pid spawned during the test that's still for any pid spawned during the test that's still
@ -826,7 +859,7 @@ def _detect_runaway_subactors_per_test():
as needed. as needed.
Cost: one extra `os.listdir('/proc')` snapshot Cost: one extra `os.listdir('/proc')` snapshot
pre-test, one snapshot + N×`psutil.cpu_percent(0.5)` pre-test, one snapshot + N×`psutil.cpu_percent(0.05)`
post-test (only when there ARE new descendants post-test (only when there ARE new descendants
most tests don't trigger any sampling). Skips most tests don't trigger any sampling). Skips
silently when `psutil` isn't installed. silently when `psutil` isn't installed.
@ -876,6 +909,11 @@ def _detect_runaway_subactors_per_test():
# subactor is still burning CPU when the next test # subactor is still burning CPU when the next test
# starts. The warning comes ONE TEST LATE which is # starts. The warning comes ONE TEST LATE which is
# imperfect but better than silence. # imperfect but better than silence.
#
# NB, in the typical clean case `pre_existing` is
# empty (no test descendants leftover) and the
# `find_runaway_subactors` call short-circuits
# without even loading `psutil`.
pre_existing: set[int] = set(find_descendants(parent_pid)) pre_existing: set[int] = set(find_descendants(parent_pid))
pre_runaways: list[tuple[int, float, str]] = ( pre_runaways: list[tuple[int, float, str]] = (
find_runaway_subactors( find_runaway_subactors(
@ -899,12 +937,17 @@ def _detect_runaway_subactors_per_test():
# `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md` # `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`
# for the canonical fork-spawn forkserver-worker # for the canonical fork-spawn forkserver-worker
# post-fork-close gap). # post-fork-close gap).
#
# `new_pids` is typically empty for tests that
# cleanly tore down their subactor tree; the call
# short-circuits before any `psutil` work.
new_pids: set[int] = (
set(find_descendants(parent_pid)) - pre_existing
)
post_runaways: list[tuple[int, float, str]] = ( post_runaways: list[tuple[int, float, str]] = (
find_runaway_subactors( find_runaway_subactors(
parent_pid, parent_pid,
only_pids=set( only_pids=new_pids,
find_descendants(parent_pid)
) - pre_existing,
) )
) )
if post_runaways: if post_runaways:

View File

@ -1122,20 +1122,32 @@ async def _serve_ipc_eps(
) )
finally: finally:
# close every endpoint INDEPENDENTLY: a close raising
# mid-iter (e.g. UDS `os.unlink` racing concurrent reap) must
# not strand the rest of the eps + must not skip the
# `_shutdown.set()` below.
if eps: if eps:
addr: Address addr: Address
ep: Endpoint ep: Endpoint
for addr, ep in server.epsdict().items(): for addr, ep in list(server.epsdict().items()):
ep.close_listener() try:
server._endpoints.remove(ep) ep.close_listener()
except Exception as ep_close_err:
log.exception(
f'Endpoint close raised, continuing teardown\n'
f' |_{ep!r}\n'
f' |_{ep_close_err!r}\n'
)
finally:
try:
server._endpoints.remove(ep)
except ValueError:
pass
# actor = _state.current_actor() # always signal "shutdown" so `actor.cancel()` →
# if actor.is_arbiter: # `ipc_server.wait_for_shutdown()` doesn't deadlock when an
# import pdbp; pdbp.set_trace() # endpoint close raised above.
if server._shutdown is not None:
# signal the server is "shutdown"/"terminated"
# since no more active endpoints are active.
if not server._endpoints:
server._shutdown.set() server._shutdown.set()
@acm @acm

View File

@ -344,7 +344,18 @@ def close_listener(
''' '''
lstnr.socket.close() lstnr.socket.close()
os.unlink(addr.sockpath) # tolerate the sock-file being already gone — under concurrent
# pytest sessions sharing the bindspace dir, another session's
# reap path can unlink it first; raising here aborts the
# `_serve_ipc_eps` finally before `_shutdown.set()`, deadlocking
# `wait_for_shutdown()` on `actor.cancel()`.
try:
os.unlink(addr.sockpath)
except FileNotFoundError:
log.warning(
f'UDS sock-file already unlinked, skipping\n'
f' |_{addr.sockpath}\n'
)
async def open_unix_socket_w_passcred( async def open_unix_socket_w_passcred(

View File

@ -0,0 +1,473 @@
"""
`xontrib_tractor_diag`: pytest/tractor diagnostic aliases.
Provides:
- `pytree <pid|pgrep-pat>`: psutil-backed proc tree,
  live + zombies split.
- `hung-dump <pid|pat> [...]`: kernel `wchan`/`stack` +
  `py-spy dump` (incl `--locals`) for each pid in tree.
- `bindspace-scan [<dir>]`: find orphaned tractor UDS
  sock files (no live owner pid).
  Default: `$XDG_RUNTIME_DIR/tractor`.
Loading from repo root:
xontrib load -p ./xontrib tractor_diag
Or source directly:
source ./xontrib/tractor_diag.xsh
Pipe-to-paste idiom (xonsh):
hung-dump pytest |t /tmp/hung.log
Requires `psutil` for full functionality (`pytree` and the
`hung-dump` tree-walk). Falls back to `pgrep -P` recursion
if missing.
"""
import os
import re
import subprocess as sp
from pathlib import Path
try:
import psutil
except ImportError:
psutil = None
print(
'[tractor-diag] `psutil` missing — '
'pytree disabled, hung-dump uses pgrep fallback. '
'`uv pip install psutil` for full functionality.'
)
# matches tractor's UDS sock naming: `<actor_name>@<pid>.sock`
_UDS_SOCK_RE = re.compile(
r'^(?P<name>.+)@(?P<pid>\d+)\.sock$'
)
# --- helpers --------------------------------------------------
def _resolve_pids(arg: str) -> list:
'''Resolve a numeric pid OR a `pgrep -f` pattern.'''
if arg.isdigit():
return [int(arg)]
try:
out = sp.check_output(
['pgrep', '-f', arg],
text=True,
)
except sp.CalledProcessError:
return []
return [int(p) for p in out.split() if p]
def _walk_tree_psutil(pid: int) -> list:
'''Flat list `[Process, *descendants]` via psutil.'''
try:
p = psutil.Process(pid)
except psutil.NoSuchProcess:
return []
return [p] + p.children(recursive=True)
def _walk_tree_with_depth(pid: int):
'''
Yield `(proc, depth)` pairs walking `pid`'s tree. `depth==0`
is the root; `depth==1` are direct children, etc. Used by
`pytree` to render parent/child relationships visually.
'''
try:
root = psutil.Process(pid)
except psutil.NoSuchProcess:
return
yield root, 0
stack: list = [(root, 0)]
seen: set = {pid}
while stack:
parent, d = stack.pop()
try:
kids = parent.children()
except psutil.NoSuchProcess:
continue
for k in kids:
if k.pid in seen:
continue
seen.add(k.pid)
yield k, d + 1
stack.append((k, d + 1))
def _walk_tree_pgrep(pid: int) -> list:
'''psutil-less fallback — recursive `pgrep -P`.'''
out = [pid]
try:
kids = sp.check_output(
['pgrep', '-P', str(pid)],
text=True,
).split()
except sp.CalledProcessError:
return out
for k in kids:
out.extend(_walk_tree_pgrep(int(k)))
return out
def _ensure_sudo_cached() -> bool:
'''
Ensure `sudo` credentials are cached so subsequent
`sudo -n` calls succeed without prompting.
Returns True if cached (or successfully refreshed),
False if user cancelled or sudo is unavailable.
Tries `sudo -n true` first as a no-op probe; if that
fails, runs `sudo -v` which prompts interactively to
validate/refresh the credential timestamp.
'''
# probe — already cached?
cached = sp.run(
['sudo', '-n', 'true'],
capture_output=True,
).returncode == 0
if cached:
return True
print(
'[tractor-diag] needs `sudo` for /proc/<pid>/stack '
'and `py-spy dump`; caching creds via `sudo -v`...'
)
try:
rc = sp.run(['sudo', '-v']).returncode
except KeyboardInterrupt:
print(' cancelled — proceeding without sudo')
return False
except FileNotFoundError:
print(' sudo not on PATH — proceeding without sudo')
return False
return rc == 0
# --- pytree ---------------------------------------------------
def _pytree(args):
'''
psutil-backed proc tree; per-proc classification into
severity-ordered buckets so leaked / defunct procs
don't hide in the noise of normal `live` rows.
usage: pytree <pid|pgrep-pattern> [...]
classification (per-proc, not per-tree):
- zombies: `status in (Z, X)` — defunct, parent
hasn't reaped (or kernel-marked dead).
- orphans: `ppid == 1` — original parent exited;
has been reparented to init. Includes
the *root* of an abandoned tree AND
any descendant that ended up reparented
to init mid-flight.
- live: real parent (`ppid > 1`), non-defunct.
Trees of orphan roots are still walked — their
descendants show as `live` if they themselves still
have a real (non-init) parent (the orphan root), but
the orphan root itself appears in `orphans`.
'''
if not args:
print('usage: pytree <pid|pgrep-pattern> [...]')
return 1
if psutil is None:
print('pytree requires psutil; install via `uv pip install psutil`')
return 1
roots: list = []
for a in args:
roots.extend(_resolve_pids(a))
roots = sorted(set(roots))
if not roots:
print(f'(no procs match: {args})')
return 1
# statuses considered "defunct" — STATUS_ZOMBIE is the
# common case (`Z`); STATUS_DEAD (`X`) is rarer but kernel-
# reported and equally not-coming-back.
defunct_statuses: set = {
psutil.STATUS_ZOMBIE,
getattr(psutil, 'STATUS_DEAD', 'dead'),
}
seen: set = set()
live: list = [] # [(proc, depth)]
orphans: list = []
zombies: list = []
gone: list = []
for r in roots:
for (p, depth) in _walk_tree_with_depth(r):
if p.pid in seen:
continue
seen.add(p.pid)
try:
status: str = p.status()
ppid: int = p.ppid()
except psutil.NoSuchProcess:
gone.append(p.pid)
continue
entry = (p, depth)
# severity order: zombie > orphan > live.
if status in defunct_statuses:
zombies.append(entry)
elif ppid == 1:
orphans.append(entry)
else:
live.append(entry)
total: int = len(live) + len(orphans) + len(zombies)
print(f'# pytree: {total} procs across roots {roots}')
hdr = ' ' + 'PID'.rjust(7) + ' ' + 'PPID'.rjust(7) + ' '
hdr += 'STATUS'.ljust(10) + ' CMD'
def _row(entry):
'''
Render `(proc, depth)` as an aligned row. Tree depth is
rendered as a `└─` marker on the CMD column so PID/PPID/
STATUS stay column-aligned.
'''
p, depth = entry
tree_pfx = (' ' * depth) + ('└─ ' if depth > 0 else '')
# NOTE: `psutil.ZombieProcess` is a *subclass* of
# `psutil.NoSuchProcess`, but the proc is NOT gone —
# it's a zombie whose `/proc/<pid>/cmdline` is empty/
# unreadable. Catch it FIRST so we still render a
# row (using fields that DO work on zombies: pid,
# ppid, status, name).
try:
cmd = ' '.join(p.cmdline())[:140] or '[' + p.name() + ']'
r = ' ' + str(p.pid).rjust(7)
r += ' ' + str(p.ppid()).rjust(7)
r += ' ' + p.status().ljust(10)
r += ' ' + tree_pfx + cmd
return r
except psutil.ZombieProcess:
try:
ppid = str(p.ppid())
name = p.name()
except psutil.NoSuchProcess:
ppid, name = '?', '?'
r = ' ' + str(p.pid).rjust(7)
r += ' ' + ppid.rjust(7)
r += ' ' + 'zombie'.ljust(10)
r += ' ' + tree_pfx + '[' + name + ' <defunct>]'
return r
except psutil.NoSuchProcess:
return ' ' + str(p.pid).rjust(7) + ' (gone mid-walk)'
def _section(title: str, procs: list, hint: str = ''):
print(f'\n## {title} ({len(procs)})' + (f' — {hint}' if hint else ''))
if not procs:
print(' (none)')
return
print(hdr)
for p in procs:
print(_row(p))
# severity-ordered: most concerning first.
_section(
'zombies', zombies,
'status `Z`/`X`, parent has not reaped',
)
_section(
'orphans', orphans,
'`ppid==1`, reparented to init (leaked / parent gone)',
)
_section('live', live)
if gone:
print(f'\n## gone-during-walk ({len(gone)}): {gone}')
# --- hung-dump ------------------------------------------------
def _hung_dump(args):
'''
kernel + python state for a hung pytest/tractor tree.
walks all descendants of each `<pid|pgrep-pat>` arg.
usage: hung-dump <pid|pgrep-pattern> [...]
note: `/proc/<pid>/stack` and `py-spy dump` typically
require CAP_SYS_PTRACE, so they are invoked via `sudo -n`;
creds are cached upfront via `sudo -v` if not already cached.
'''
if not args:
print('usage: hung-dump <pid|pgrep-pattern> [...]')
return 1
# cache sudo creds upfront so per-pid `sudo -n` calls
# for `cat /proc/<pid>/stack` and `py-spy dump` don't
# each prompt (or silently fail).
have_sudo: bool = _ensure_sudo_cached()
roots: list = []
for a in args:
roots.extend(_resolve_pids(a))
roots = sorted(set(roots))
if not roots:
print(f'(no procs match: {args})')
return 1
pids: list = []
seen: set = set()
for r in roots:
if psutil is not None:
walk = [p.pid for p in _walk_tree_psutil(r)]
else:
walk = _walk_tree_pgrep(r)
for pid in walk:
if pid not in seen:
seen.add(pid)
pids.append(pid)
print(f'# tree: {pids}')
print('\n## ps forest')
$[ps -o pid,ppid,pgid,stat,cmd -p @(','.join(map(str, pids)))]
for pid in pids:
print(f'\n## pid {pid}')
for f in ('wchan', 'stack'):
path = Path(f'/proc/{pid}/{f}')
try:
txt = path.read_text().rstrip()
print(f'-- /proc/{pid}/{f} --\n{txt}')
except PermissionError:
if not have_sudo:
print(
f'-- /proc/{pid}/{f}: '
'PermissionError (no sudo) --'
)
continue
try:
txt = sp.check_output(
['sudo', '-n', 'cat', str(path)],
text=True,
stderr=sp.DEVNULL,
).rstrip()
print(f'-- /proc/{pid}/{f} (sudo) --\n{txt}')
except sp.CalledProcessError:
print(
f'-- /proc/{pid}/{f}: '
'sudo cred expired? rerun --'
)
except FileNotFoundError:
print(f'-- /proc/{pid}/{f}: proc gone --')
print(f'-- py-spy {pid} --')
if not have_sudo:
print(' (skipped — no sudo)')
continue
try:
$[sudo -n py-spy dump --pid @(pid) --locals]
except Exception as e:
print(f' (py-spy failed: {e})')
# --- bindspace-scan -------------------------------------------
def _bindspace_scan(args):
'''
Scan a tractor UDS bindspace dir for orphan sock files
(those whose embedded `<pid>` no longer corresponds to
a live process).
usage: bindspace-scan [<dir>]
default: `$XDG_RUNTIME_DIR/tractor`
(or `/run/user/<uid>/tractor`)
'''
if args:
bs_dir = Path(args[0])
else:
runtime = os.environ.get(
'XDG_RUNTIME_DIR',
f'/run/user/{os.getuid()}',
)
bs_dir = Path(runtime) / 'tractor'
if not bs_dir.exists():
print(f'(no bindspace at {bs_dir})')
return 1
socks = sorted(bs_dir.glob('*.sock'))
print(f'## bindspace {bs_dir} ({len(socks)} sock file(s))')
live: list = []
orphans: list = []
bogus: list = []
for s in socks:
m = _UDS_SOCK_RE.match(s.name)
if not m:
bogus.append(s)
continue
pid = int(m['pid'])
name = m['name']
try:
os.kill(pid, 0)
live.append((s, pid, name))
except ProcessLookupError:
orphans.append((s, pid, name))
except PermissionError:
# exists but owned by another user
live.append((s, pid, name))
print(f'\n## live ({len(live)})')
if not live:
print(' (none)')
for s, pid, name in live:
row = ' ' + str(pid).rjust(7)
row += ' ' + name.ljust(32)
row += ' ' + s.name
print(row)
print(f'\n## orphaned ({len(orphans)})')
if not orphans:
print(' (none)')
for s, pid, name in orphans:
row = ' ' + str(pid).rjust(7)
row += ' ' + name.ljust(32)
row += ' ' + s.name + ' (no live proc)'
print(row)
if bogus:
print(f'\n## unparseable ({len(bogus)})')
for s in bogus:
print(f' {s.name}')
if orphans:
unlink_cmd = ' '.join(str(o[0]) for o in orphans)
print(f'\nto unlink orphans:\n rm {unlink_cmd}')
# --- registration ---------------------------------------------
aliases['pytree'] = _pytree
aliases['hung-dump'] = _hung_dump
aliases['bindspace-scan'] = _bindspace_scan
# xontrib protocol hooks (for `xontrib load tractor_diag`).
# also harmless when sourced directly.
def _load_xontrib_(xsh, **_):
return {}
def _unload_xontrib_(xsh, **_):
for name in ('pytree', 'hung-dump', 'bindspace-scan'):
aliases.pop(name, None)
return {}