Compare commits
No commits in common. "caebf60f4ebf78d0f3c7069d92fb5cff5d9c5c40" and "2ee44a6fdd11b1750ae1363489b59e7bab4c47ba" have entirely different histories.
caebf60f4e
...
2ee44a6fdd
|
|
@ -1,125 +0,0 @@
# `RuntimeVars` env-var lift: design plan

Status: **draft, awaiting user edits**

## Goal

Consolidate the sprawl of pytest CLI flags + ad-hoc env vars +
hardcoded fixture defaults into a *single* env-var-encoded
runtime-vars envelope, with a typed in-memory representation
(`tractor.runtime._state.RuntimeVars`) as the sole source of
truth.

## Why now

- `--tpt-proto`, `--spawn-backend`, `--diag-on-hang`,
  `--diag-capture-delay` and (soon) `TRACTOR_REG_ADDR` etc. are
  proliferating. Each adds a parsing seam.
- `tests/devx/test_debugger.py` invokes example scripts as
  separate subprocesses; they currently can't see the
  fixture-allocated `reg_addr` at all (root cause of why
  parametrizing devx scripts on `reg_addr` is on your TODO).
- Concurrent pytest sessions on the same host collide on
  shared defaults (the `registry@1616` race we just fixed is
  one symptom; per-session unique addr is the structural
  fix).
- `tractor.runtime._state.RuntimeVars: Struct` is already
  defined and **unused**; its docstring even says it
  "should be utilized as possible for future calls."

## Design

### Module: `tractor/_testing/_rtvars.py`

Lifted from `modden.runtime.env`, ~50 LOC, no new deps.

```python
import psutil

from tractor.runtime._state import RuntimeVars

_TRACTOR_RT_VARS_OSENV: str = '_TRACTOR_RT_VARS'

def dump_rtvars(rtvars: RuntimeVars|dict) -> tuple[str, str]:
    '''str-serialize via `str(dict)`; ast.literal_eval-able'''

def load_rtvars(env: dict) -> RuntimeVars:
    '''ast.literal_eval the env-var value, hydrate to struct'''

def get_rtvars(proc: psutil.Process|None = None) -> RuntimeVars:
    '''read the var from a target proc's env (or current)'''

def update_rtvars(
    rtvars: RuntimeVars|dict|None = None,
    update_osenv: bool|dict = True,
) -> tuple[str, str]:
    '''mutate + re-encode + (optionally) write to os.environ'''
```

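
A minimal sketch of how the dump/load pair could behave on a plain `dict`, assuming the envelope is just `str(dict)` stored under the `_TRACTOR_RT_VARS` key. The function bodies, the "unset var means empty dict" rule and the example field values are assumptions for illustration, not the `modden.runtime.env` code; hydration into the `RuntimeVars` struct is left out.

```python
import ast

_TRACTOR_RT_VARS_OSENV: str = '_TRACTOR_RT_VARS'


def dump_rtvars(rtvars: dict) -> tuple[str, str]:
    # encode as a Python literal so `ast.literal_eval()` can
    # decode it again; returns the (env-key, env-value) pair.
    return (_TRACTOR_RT_VARS_OSENV, str(rtvars))


def load_rtvars(env: dict) -> dict:
    # decode the envelope from an `os.environ`-like mapping;
    # an unset var is treated as "no runtime vars" (assumption).
    raw = env.get(_TRACTOR_RT_VARS_OSENV)
    if raw is None:
        return {}
    return ast.literal_eval(raw)


# round-trip through a fake env mapping (values are made up)
key, val = dump_rtvars({
    '_registry_addrs': [('127.0.0.1', 1616)],
    '_debug_mode': False,
})
fake_env: dict[str, str] = {key: val}
rtvars: dict = load_rtvars(fake_env)
```

Passing the env as a mapping argument (rather than reading `os.environ` directly) keeps the pair testable without touching the real process environment.
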
### Encoding choice: `str(dict)` + `ast.literal_eval`

Pros:
- stdlib only
- handles all the types tractor's tests need: `str`, `int`,
  `float`, `bool`, `None`, `list`, `tuple`, `dict`
- human-readable in the env (greppable, inspectable via
  `cat /proc/<pid>/environ | tr '\0' '\n'`)

Cons:
- non-stdlib types (msgspec Structs, `Path`, custom classes)
  must be lowered first, which is fine for the test fixture set
- not stable across Python versions for esoteric repr cases
  (we don't hit any)

Alternatives considered:
- **msgpack**: adds a dep + binary form is ungreppable
- **json**: doesn't preserve tuples (they become lists), and
  tuples are a common type for `reg_addr`
- **toml/yaml**: heavier deps, no real benefit

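
The tuple-preservation point and the "must be lowered first" caveat can be checked with the stdlib alone. This is a small illustration; the address, the `bindspace` key and the path are made-up values.

```python
import ast
import json
from pathlib import Path

reg_addr: tuple[str, int] = ('127.0.0.1', 1616)
rtvars: dict = {'_registry_addrs': [reg_addr]}

# `str(dict)` + `ast.literal_eval()` keeps the tuple type
via_literal: dict = ast.literal_eval(str(rtvars))
assert isinstance(via_literal['_registry_addrs'][0], tuple)

# a JSON round-trip lowers the tuple to a list
via_json: dict = json.loads(json.dumps(rtvars))
assert isinstance(via_json['_registry_addrs'][0], list)

# non-literal types do NOT survive: the repr of a `Path` is a
# call expression, which `ast.literal_eval()` rejects.
needs_lowering: bool = False
try:
    ast.literal_eval(str({'bindspace': Path('/tmp')}))
except ValueError:
    needs_lowering = True

# lowering to `str` first makes the value round-trippable
lowered: dict = ast.literal_eval(str({'bindspace': str(Path('/tmp'))}))
```
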
### `RuntimeVars` becomes the single source of truth

The legacy `_runtime_vars: dict[str, Any]` global in
`runtime/_state.py` becomes a *cached view* of a
`RuntimeVars` singleton instance:

- `get_runtime_vars()` returns either the struct or a
  `.to_dict()` view depending on caller's preference
- `set_runtime_vars(...)` validates against the struct schema
- spawn-time `SpawnSpec` sends the struct (already does
  conceptually, just gets typed)
- `__setattr__` `breakpoint()` debug instrumentation gets
  removed (unrelated cleanup, mentioned in conversation)

### Migration path

**Phase 0** *(prep)*: strip the stray `breakpoint()` from
`RuntimeVars.__setattr__`.

**Phase 1**: land `_rtvars.py` as a leaf module, used only by
test infra. Subprocess-spawned scripts in `tests/devx/`
read `_TRACTOR_RT_VARS` on startup → reconstruct
`RuntimeVars` → call `tractor.open_root_actor(**rtvars.as_kwargs())`.
Concurrent runs become deterministically isolated because each
session writes a unique `_registry_addrs` into the env.

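
The Phase 1 hand-off can be sketched end to end with a throwaway child process. Here the child is an inline `-c` script standing in for a `tests/devx/` example, the port is made up, and the child only prints the address it decoded rather than calling `tractor.open_root_actor()`; this is an illustration of the env-var plumbing, not the planned fixture code.

```python
import ast
import os
import subprocess
import sys

_TRACTOR_RT_VARS_OSENV: str = '_TRACTOR_RT_VARS'

# stand-in for an example script: decode the envelope at
# startup and print the registry addr it was handed.
child_src: str = (
    'import ast, os\n'
    f'rtvars = ast.literal_eval(os.environ[{_TRACTOR_RT_VARS_OSENV!r}])\n'
    "print(rtvars['_registry_addrs'][0])\n"
)

# parent (fixture) side: encode a per-session addr into the
# child's env instead of threading it through CLI flags.
session_addr: tuple[str, int] = ('127.0.0.1', 54321)  # made-up port
env: dict[str, str] = dict(os.environ)
env[_TRACTOR_RT_VARS_OSENV] = str({'_registry_addrs': [session_addr]})

proc = subprocess.run(
    [sys.executable, '-c', child_src],
    env=env,
    capture_output=True,
    text=True,
    check=True,
)

# the child saw exactly the addr the parent allocated
child_addr: tuple = ast.literal_eval(proc.stdout.strip())
```
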
**Phase 2**: migrate runtime callers (`_state.get_runtime_vars`,
spawn `SpawnSpec`, `Actor.async_main`) to operate on the
struct directly, with the dict as a compat view that gets
deprecated.

**Phase 3** *(structural)*: per-session bindspace subdir
`/run/user/<uid>/tractor/<session_uuid>/`, encoded in the
rt-vars envelope, picked up by every subactor automatically.
Obsoletes the entire bindspace-leak warning class.

## Open design questions (user input wanted)

- (placeholder for your edits)
- (placeholder)
- (placeholder)

## Out-of-scope for this lift

- Anything in `modden.runtime.env` related to `Spawn`,
  `WmCtl`, `Wks`: that's a workspace orchestration layer,
  not an env-var helper. We only lift the four utility
  functions + the var name constant.
- Switching to msgpack/json, explicitly chosen against
  above.
|
|
@ -86,182 +86,26 @@ def test_register_duplicate_name(
|
|||
daemon: subprocess.Popen,
|
||||
reg_addr: UnwrappedAddress,
|
||||
):
|
||||
# bug-class-3 breadcrumbs: the *last* `[CANCEL]` line that
|
||||
# appears under `--ll cancel`/`TRACTOR_LOG_FILE=...` names the
|
||||
# cancel-cascade boundary that's parked. Pair with
|
||||
# `_trio_main` entry/exit breadcrumbs in
|
||||
# `tractor/spawn/_entry.py` to triangulate the swallow point.
|
||||
log = tractor.log.get_logger('tractor.tests.test_multi_program')
|
||||
|
||||
async def main():
|
||||
log.cancel('test_register_duplicate_name: enter `main()`')
|
||||
try:
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
) as an:
|
||||
log.cancel(
|
||||
'test_register_duplicate_name: '
|
||||
'actor nursery opened'
|
||||
)
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
) as an:
|
||||
|
||||
assert not current_actor().is_registrar
|
||||
assert not current_actor().is_registrar
|
||||
|
||||
p1 = await an.start_actor('doggy')
|
||||
log.cancel(
|
||||
'test_register_duplicate_name: '
|
||||
'spawned doggy #1'
|
||||
)
|
||||
p2 = await an.start_actor('doggy')
|
||||
log.cancel(
|
||||
'test_register_duplicate_name: '
|
||||
'spawned doggy #2'
|
||||
)
|
||||
p1 = await an.start_actor('doggy')
|
||||
p2 = await an.start_actor('doggy')
|
||||
|
||||
async with tractor.wait_for_actor('doggy') as portal:
|
||||
log.cancel(
|
||||
'test_register_duplicate_name: '
|
||||
'`wait_for_actor` returned'
|
||||
)
|
||||
assert portal.channel.uid in (p2.channel.uid, p1.channel.uid)
|
||||
async with tractor.wait_for_actor('doggy') as portal:
|
||||
assert portal.channel.uid in (p2.channel.uid, p1.channel.uid)
|
||||
|
||||
log.cancel(
|
||||
'test_register_duplicate_name: '
|
||||
'ABOUT TO CALL `an.cancel()`'
|
||||
)
|
||||
await an.cancel()
|
||||
log.cancel(
|
||||
'test_register_duplicate_name: '
|
||||
'`an.cancel()` returned'
|
||||
)
|
||||
finally:
|
||||
log.cancel(
|
||||
'test_register_duplicate_name: '
|
||||
'`open_nursery.__aexit__` returned, leaving `main()`'
|
||||
)
|
||||
await an.cancel()
|
||||
|
||||
# XXX, run manually since we want to start this root **after**
|
||||
# the other "daemon" program with its own root.
|
||||
trio.run(main)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'n_dups',
|
||||
[
|
||||
2,
|
||||
# `n_dups=4` exposes a SEPARATE pre-existing race: under
|
||||
# rapid same-name spawning against a forkserver +
|
||||
# registrar, ONE of the spawned doggies (typically the
|
||||
# 3rd) `sys.exit(2)`s during boot before completing
|
||||
# parent-handshake. Surfaces now (post the spawn-time
|
||||
# `wait_for_peer_or_proc_death` fix) as `ActorFailure
|
||||
# rc=2`; previously it was silently masked by the
|
||||
# handshake-wait parking forever.
|
||||
#
|
||||
# Tracked separately in,
|
||||
# https://github.com/goodboy/tractor/issues/456
|
||||
pytest.param(
|
||||
4,
|
||||
marks=pytest.mark.xfail(
|
||||
strict=False,
|
||||
reason=(
|
||||
'doggy boot-race rc=2 under rapid same-name '
|
||||
'spawn — separate bug from cancel-cascade'
|
||||
),
|
||||
),
|
||||
),
|
||||
8,
|
||||
],
|
||||
ids=lambda n: f'n_dups={n}',
|
||||
)
|
||||
def test_dup_name_cancel_cascade_escalates_to_hard_kill(
|
||||
daemon: subprocess.Popen,
|
||||
reg_addr: UnwrappedAddress,
|
||||
n_dups: int,
|
||||
):
|
||||
'''
|
||||
Regression for the duplicate-name cancel-cascade hang under
|
||||
`tcp+main_thread_forkserver`.
|
||||
|
||||
When N actors share a single name and the parent calls
|
||||
`an.cancel()`, the daemon registrar gets N `register_actor` RPCs
|
||||
in tight succession. Under TCP+MTF, kernel-level socket-buffer
|
||||
contention can push at least one sub-actor's cancel-RPC ack past
|
||||
`Portal.cancel_timeout` (default 0.5s).
|
||||
|
||||
Pre-fix, `Portal.cancel_actor()` silently returned `False` on
|
||||
that timeout, the supervisor's outer `move_on_after(3)` never
|
||||
fired (each per-portal task always returned ≤0.5s, never
|
||||
exceeded 3s), and `soft_kill()`'s `await wait_func(proc)` parked
|
||||
forever — deadlocking nursery `__aexit__`.
|
||||
|
||||
Post-fix, `Portal.cancel_actor()` raises `ActorTooSlowError` on
|
||||
the bounded-wait timeout, and `ActorNursery.cancel()`'s
|
||||
per-child wrapper escalates to `proc.terminate()` (hard-kill).
|
||||
The full nursery teardown therefore stays bounded even under
|
||||
pathological timing.
|
||||
|
||||
`n_dups` is parametrized to widen the race window — more
|
||||
same-name siblings = more concurrent register-RPCs at the
|
||||
daemon = higher probability of hitting the contention path.
|
||||
|
||||
'''
|
||||
log = tractor.log.get_logger(
|
||||
'tractor.tests.test_multi_program'
|
||||
)
|
||||
|
||||
# outer hard ceiling: a regression should fail-fast, NOT hang
|
||||
# the test session for minutes. Budget scales with `n_dups`
|
||||
# since each extra same-name sibling adds ~spawn-cost +
|
||||
# potential cancel-ack-timeout escalation latency under
|
||||
# TCP+forkserver. ~5s/sibling + 15s baseline gives plenty of
|
||||
# headroom while still failing-loud on a real hang.
|
||||
fail_after_s: int = 15 + (5 * n_dups)
|
||||
|
||||
async def main():
|
||||
log.cancel(
|
||||
f'enter `main()` n_dups={n_dups}'
|
||||
)
|
||||
with trio.fail_after(fail_after_s):
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
) as an:
|
||||
portals: list[Portal] = []
|
||||
for i in range(n_dups):
|
||||
p: Portal = await an.start_actor('doggy')
|
||||
portals.append(p)
|
||||
log.cancel(
|
||||
f'spawned doggy #{i + 1}/{n_dups}'
|
||||
)
|
||||
|
||||
# at least one of the N must be discoverable by
|
||||
# name; doesn't matter which one (registrar will
|
||||
# have last-wins semantics under same-name).
|
||||
async with tractor.wait_for_actor('doggy') as portal:
|
||||
expected_uids = {p.channel.uid for p in portals}
|
||||
assert portal.channel.uid in expected_uids
|
||||
|
||||
# critical section: this MUST return within
|
||||
# `fail_after_s` even when one or more cancel-RPC
|
||||
# acks time out. Pre-fix, this hangs forever.
|
||||
log.cancel('about to call `an.cancel()`')
|
||||
await an.cancel()
|
||||
log.cancel('`an.cancel()` returned')
|
||||
|
||||
# post-teardown sanity: every child proc must be reaped.
|
||||
# If escalation worked, even timed-out cancel-RPCs would
|
||||
# have triggered `proc.terminate()` and the procs are dead.
|
||||
for p in portals:
|
||||
# `Portal.channel.connected()` -> False once the
|
||||
# underlying chan disconnected (clean exit OR
|
||||
# hard-killed proc both produce disconnect).
|
||||
assert not p.channel.connected(), (
|
||||
f'Portal chan still connected post-teardown?\n'
|
||||
f'{p.channel}'
|
||||
)
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def get_root_portal(
|
||||
ctx: Context,
|
||||
|
|
|
|||
|
|
@ -59,18 +59,15 @@ async def chk_tpts(
|
|||
)
|
||||
def test_root_passes_tpt_to_sub(
|
||||
tpt_proto_key: str,
|
||||
tpt_proto: str,
|
||||
reg_addr: tuple,
|
||||
debug_mode: bool,
|
||||
):
|
||||
# `reg_addr` is sourced from the CLI `--tpt-proto={tpt_proto}`,
|
||||
# so when the parametrized `tpt_proto_key` differs, the test
|
||||
# asks the runtime to `enable_transports=[<other_proto>]` while
|
||||
# pointing `registry_addrs` at a `reg_addr` of the wrong proto.
|
||||
# The layer-2 guard in `open_root_actor` is expected to fail
|
||||
# fast with `ValueError` on this mismatch (rather than the prior
|
||||
# silent hang during the registrar handshake).
|
||||
proto_mismatch: bool = (tpt_proto_key != tpt_proto)
|
||||
# XXX NOTE, the `reg_addr` addr won't be the same type as the
|
||||
# `tpt_proto_key` would deliver here unless you pass `--tpt-proto
|
||||
# <tpt_proto_key>` on the CLI.
|
||||
#
|
||||
# if tpt_proto_key == 'uds':
|
||||
# breakpoint()
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
|
|
@ -102,14 +99,4 @@ def test_root_passes_tpt_to_sub(
|
|||
# shutdown sub-actor(s)
|
||||
await an.cancel()
|
||||
|
||||
if proto_mismatch:
|
||||
# mismatched proto must raise `ValueError` from the
|
||||
# `open_root_actor` runtime guard before any subactor spawn.
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
trio.run(main)
|
||||
msg: str = str(excinfo.value)
|
||||
assert 'enable_transports' in msg
|
||||
assert 'registry_addrs' in msg
|
||||
assert tpt_proto_key in msg or tpt_proto in msg
|
||||
else:
|
||||
trio.run(main)
|
||||
trio.run(main)
|
||||
|
|
|
|||
|
|
@ -89,28 +89,6 @@ class ActorFailure(RuntimeFailure):
|
|||
'''
|
||||
|
||||
|
||||
class ActorTooSlowError(RuntimeFailure):
|
||||
'''
|
||||
A peer-`Actor` failed to ack an actor-runtime cancel-cascade
|
||||
request (e.g. `Portal.cancel_actor()` -> `Actor.cancel()`)
|
||||
within the bounded wait window.
|
||||
|
||||
Distinct exc-type (NOT a `trio.TooSlowError` subclass) so that
|
||||
`except trio.TooSlowError:` blocks elsewhere in the test-suite
|
||||
or `tractor` internals do NOT silently mask actor-cancel
|
||||
timeouts — these MUST propagate so a supervisor can escalate
|
||||
to `proc.terminate()` (hard-kill) per SC-discipline:
|
||||
|
||||
graceful cancel-req -> bounded wait -> hard-kill
|
||||
|
||||
Reason: see #subint_forkserver duplicate-name hang
|
||||
diagnosis where `Portal.cancel_actor()` silently swallowed
|
||||
the timeout and the supervisor never escalated, leaving
|
||||
a same-named sibling subactor parked forever.
|
||||
|
||||
'''
|
||||
|
||||
|
||||
class InternalError(RuntimeError):
|
||||
'''
|
||||
Entirely unexpected internal machinery error indicating
|
||||
|
|
|
|||
|
|
@ -371,35 +371,6 @@ async def open_root_actor(
|
|||
for uw_addr in uw_reg_addrs
|
||||
]
|
||||
|
||||
# fail-fast on `enable_transports` / `registry_addrs` proto
|
||||
# mismatch — historically this caused a silent indefinite
|
||||
# hang during the registrar handshake (registry was reachable
|
||||
# only via a transport not in `enable_transports`, so the
|
||||
# actor could never connect to register/discover). See
|
||||
# `tests/ipc/test_multi_tpt.py::test_root_passes_tpt_to_sub`
|
||||
# for the foot-gun case + its layer-1 skip-guard.
|
||||
bad_addrs: list[tuple[str, Address]] = [
|
||||
(addr.proto_key, addr)
|
||||
for addr in registry_addrs
|
||||
if addr.proto_key not in enable_transports
|
||||
]
|
||||
if bad_addrs:
|
||||
mismatch_lines: str = '\n'.join(
|
||||
f' - proto_key={pk!r} addr={a!r}'
|
||||
for pk, a in bad_addrs
|
||||
)
|
||||
raise ValueError(
|
||||
f'`registry_addrs` contains addr(s) whose proto is '
|
||||
f'not in `enable_transports`!\n'
|
||||
f'enable_transports: {enable_transports!r}\n'
|
||||
f'mismatched_addrs:\n'
|
||||
f'{mismatch_lines}\n'
|
||||
f'\n'
|
||||
f'Either add the missing proto to '
|
||||
f'`enable_transports`, or remove the addr from '
|
||||
f'`registry_addrs`.'
|
||||
)
|
||||
|
||||
# Debug-mode is currently only supported for backends whose
|
||||
# subactor root runtime is trio-native (so `tractor.devx.
|
||||
# debug._tty_lock` works). See `_DEBUG_COMPATIBLE_BACKENDS`
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ from typing import (
|
|||
get_args,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
import warnings
|
||||
|
||||
import pytest
|
||||
import tractor
|
||||
|
|
@ -53,19 +52,8 @@ pytest_plugins: tuple[str, ...] = (
|
|||
if TYPE_CHECKING:
|
||||
from argparse import Namespace
|
||||
|
||||
|
||||
_cap_sys_passed_as_flag: bool = False
|
||||
|
||||
# Spawn backends that need `--capture=sys` to avoid the
|
||||
# fork-child×pytest-capture-fd deadlock. See the long
|
||||
# NOTE in `pytest_load_initial_conftests` below for the
|
||||
# full mechanism + tradeoff write-up.
|
||||
_CAPSYS_REQUIRED_SPAWNERS: frozenset[str] = frozenset({
|
||||
'main_thread_forkserver',
|
||||
# TODO future variant-2 'subint_forkserver' lands
|
||||
# here too once the impl is unblocked.
|
||||
})
|
||||
|
||||
_cap_fd_set: bool = False
|
||||
|
||||
# XXX REQUIRED in order to enforce `--capture=` flag
|
||||
# pre test session.
|
||||
|
|
@ -76,108 +64,67 @@ def pytest_load_initial_conftests(
|
|||
parser: pytest.Parser,
|
||||
args: list[str],
|
||||
):
|
||||
'''
|
||||
Validate the `--capture=` × `--spawn-backend=`
|
||||
combination at session-startup.
|
||||
global _cap_sys_passed_as_flag, _cap_fd_set
|
||||
|
||||
Background
|
||||
----------
|
||||
`--capture=sys` is REQUIRED for fork-based spawn backends (e.g.
|
||||
`main_thread_forkserver`): default `--capture=fd` redirects fd
|
||||
1,2 to temp files, and fork children inherit those fds — opaque
|
||||
deadlocks happen in the pytest-capture-machinery ↔ fork-child
|
||||
stdio interaction. `--capture=sys` only redirects Python-level
|
||||
`sys.stdout`/`sys.stderr`, leaving fd 1,2 alone.
|
||||
opts: Namespace = early_config.option
|
||||
if opts.capture == 'fd':
|
||||
_cap_fd_set = True
|
||||
|
||||
Trade-off (vs. `--capture=fd`):
|
||||
|
||||
- LOST: per-test attribution of subactor *raw-fd* output (C-ext
|
||||
writes, `os.write(2, ...)`, subproc stdout). Not zero — those
|
||||
go to the terminal, captured by CI's terminal-level capture,
|
||||
just not per-test-scoped in the pytest failure report.
|
||||
|
||||
- KEPT: Python-level `print()` + `logging` capture per-test
|
||||
(tractor's logger uses `sys.stderr`, so tractor log output IS
|
||||
still attributed per-test).
|
||||
|
||||
- KEPT: user `pytest -s` for debugging (unaffected).
|
||||
|
||||
Full post-mortem in
|
||||
`ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`.
|
||||
|
||||
Validation policy:
|
||||
- **CI mode** (`CI` env-var set): fail-fast at
|
||||
session start if a fork-spawn backend is requested
|
||||
WITHOUT `--capture=sys`. CI must be explicit; no
|
||||
auto-fallbacks. Forces every CI matrix-row's run
|
||||
line to declare its capture mode plainly.
|
||||
- **Local mode** (no `CI` env-var): emit a loud
|
||||
warning + suggest `--capture=sys`, but allow the
|
||||
run to proceed. Lets devs experiment with the bad
|
||||
combo (e.g. to validate whether recent
|
||||
fork-survival fixes have made `--capture=fd` work
|
||||
after all).
|
||||
|
||||
'''
|
||||
global _cap_sys_passed_as_flag
|
||||
opts_w_args: Namespace = parser.parse_known_args(args)
|
||||
spawner: str|None = getattr(
|
||||
opts_w_args,
|
||||
'spawn_backend',
|
||||
None,
|
||||
)
|
||||
capture: str|None = getattr(
|
||||
opts_w_args,
|
||||
'capture',
|
||||
None,
|
||||
)
|
||||
if opts_w_args.capture == 'fd':
|
||||
_cap_fd_set = True
|
||||
|
||||
if '--capture=sys' in args:
|
||||
_cap_sys_passed_as_flag = True
|
||||
assert capture == 'sys'
|
||||
|
||||
in_ci: bool = bool(os.environ.get('CI'))
|
||||
|
||||
# XXX, ALWAYS apply capsys for fork based spawners:
|
||||
# * main_thread_forkserver
|
||||
# * (TODO) subint_forkserver
|
||||
# '--capture=sys',
|
||||
# ^XXX NOTE^ for `main_thread_forkserver` spawner
|
||||
#
|
||||
# => sys-level capture is REQUIRED for fork-based spawn
|
||||
# backends (e.g. `main_thread_forkserver`): default
|
||||
# `--capture=fd` redirects fd 1,2 to temp files, and fork
|
||||
# children inherit those fds — opaque deadlocks happen in
|
||||
# the pytest-capture-machinery ↔ fork-child stdio
|
||||
# interaction. `--capture=sys` only redirects Python-level
|
||||
# `sys.stdout`/`sys.stderr`, leaving fd 1,2 alone.
|
||||
#
|
||||
# Trade-off (vs. `--capture=fd`):
|
||||
# - LOST: per-test attribution of subactor *raw-fd* output
|
||||
# (C-ext writes, `os.write(2, ...)`, subproc stdout). Not
|
||||
# zero — those go to the terminal, captured by CI's
|
||||
# terminal-level capture, just not per-test-scoped in the
|
||||
# pytest failure report.
|
||||
# - KEPT: Python-level `print()` + `logging` capture per-
|
||||
# test (tractor's logger uses `sys.stderr`, so tractor
|
||||
# log output IS still attributed per-test).
|
||||
# - KEPT: user `pytest -s` for debugging (unaffected).
|
||||
#
|
||||
# Full post-mortem in
|
||||
# `ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`.
|
||||
if (
|
||||
spawner in _CAPSYS_REQUIRED_SPAWNERS
|
||||
and
|
||||
capture == 'fd'
|
||||
(spawner := opts_w_args.spawn_backend) in [
|
||||
'main_thread_forkserver',
|
||||
]
|
||||
):
|
||||
msg: str = (
|
||||
f'\n'
|
||||
f'XXX `--spawn-backend={spawner}` REQUIRES '
|
||||
f'`--capture=sys` XXX\n'
|
||||
f'fork-child × `--capture=fd` is a known '
|
||||
f'deadlock pattern.\n'
|
||||
f'See `tractor._testing.pytest`\'s '
|
||||
f'`pytest_load_initial_conftests` docstring '
|
||||
f'for the full mechanism.\n'
|
||||
f'\n'
|
||||
f'Re-invoke with `--capture=sys` (or run '
|
||||
f'with `pytest -s` for no capture).\n'
|
||||
print(
|
||||
f'XXX SETTING CAPSYS due to spawning backend XXX\n'
|
||||
f'--spawn-backend={spawner!r}\n'
|
||||
)
|
||||
# fail-fast: CI must declare capture explicitly for
|
||||
# fork-spawn backends.
|
||||
if in_ci:
|
||||
pytest.exit(
|
||||
f'{msg}\n'
|
||||
f'FAIL-FAST: CI=1 detected; aborting session.\n',
|
||||
returncode=2,
|
||||
)
|
||||
|
||||
# local: loud warn but let the run proceed so devs can
|
||||
# experiment.
|
||||
else:
|
||||
warnings.warn(
|
||||
f'{msg}\n'
|
||||
f'Local mode (no `CI` env var) — '
|
||||
f'continuing. Expect potential hangs.\n',
|
||||
category=UserWarning,
|
||||
stacklevel=1,
|
||||
)
|
||||
# ??TODO?? is there a way to force the `--capture=sys` sin CLI ??
|
||||
# - [x] ask pytest peeps in chat!
|
||||
# - [x] pytest` issue,
|
||||
# https://github.com/pytest-dev/pytest/issues/14444
|
||||
opts.capture = 'sys'
|
||||
# ^TODO XXX?/
|
||||
# seems like this doesn't get set by the above!?
|
||||
args.append(
|
||||
'--capture=sys',
|
||||
)
|
||||
out = parser.parse_known_and_unknown_args(
|
||||
args,
|
||||
early_config.option,
|
||||
)
|
||||
assert out[0].capture == 'sys'
|
||||
# breakpoint()
|
||||
|
||||
# TODO, set various `$TRACTOR_X*` osenv vars here!
|
||||
print(
|
||||
|
|
@ -452,6 +399,7 @@ def pytest_configure(
|
|||
)
|
||||
enable_stack_on_sig()
|
||||
except ImportError:
|
||||
import warnings
|
||||
warnings.warn(
|
||||
'`stackscope` not installed — '
|
||||
'--enable-stackscope is a no-op. '
|
||||
|
|
@ -674,36 +622,25 @@ def is_forking_spawner(
|
|||
|
||||
|
||||
def maybe_xfail_for_spawner(
|
||||
request: pytest.FixtureRequest,
|
||||
start_method: str,
|
||||
is_forking_spawner: bool,
|
||||
) -> None:
|
||||
'''
|
||||
Fork based spawning backends cause issues with
|
||||
`pytest`'s fd-capture mechanism and can cause various
|
||||
suites to hang.
|
||||
Fork based spawning backends caude issues with `pytest`'s
|
||||
fd-capture mechanism and can cause various suites to hang.
|
||||
|
||||
This helper allows skipping/xfailing from a test when
|
||||
a fork-spawn backend is being used WITHOUT
|
||||
`--capture=sys`.
|
||||
Instead this helper allows skipping/xfailing from a test
|
||||
when a certain spawner + CLI-flag input is detected.
|
||||
|
||||
'''
|
||||
capture_mode: str = request.config.option.capture
|
||||
# `tee-sys` is also sys-level capture (just additionally writes
|
||||
# to the original `sys.__stdout__/__stderr__`); fork-safe like
|
||||
# `sys`. Only `fd`-level capture is the deadlock pattern.
|
||||
if (
|
||||
capture_mode not in (
|
||||
'sys',
|
||||
'tee-sys',
|
||||
)
|
||||
not _cap_sys_passed_as_flag
|
||||
and
|
||||
is_forking_spawner
|
||||
):
|
||||
pytest.skip(
|
||||
f'Spawner {start_method!r} requires the flag,\n'
|
||||
f'--capture=sys or --capture=tee-sys..\n'
|
||||
f'(got --capture={capture_mode!r})\n'
|
||||
f'--capture=sys or similar..\n'
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -729,13 +666,8 @@ def set_fork_aware_capture(
|
|||
which can oddly make certain tests hang/fail.
|
||||
|
||||
'''
|
||||
# Fast-path: user already passed sys-level capture
|
||||
# (`sys` or `tee-sys`) at the CLI — no override needed.
|
||||
if request.config.option.capture in (
|
||||
'sys',
|
||||
'tee-sys',
|
||||
):
|
||||
return request.config.option.capture
|
||||
if _cap_sys_passed_as_flag:
|
||||
return 'sys'
|
||||
|
||||
capsys: pytest.CaptureFixture = maybe_override_capture(
|
||||
request=request,
|
||||
|
|
|
|||
|
|
@ -55,7 +55,6 @@ from ..msg import (
|
|||
Return,
|
||||
)
|
||||
from .._exceptions import (
|
||||
ActorTooSlowError,
|
||||
NoResult,
|
||||
TransportClosed,
|
||||
)
|
||||
|
|
@ -269,7 +268,6 @@ class Portal:
|
|||
async def cancel_actor(
|
||||
self,
|
||||
timeout: float | None = None,
|
||||
raise_on_timeout: bool = False,
|
||||
|
||||
) -> bool:
|
||||
'''
|
||||
|
|
@ -283,17 +281,6 @@ class Portal:
|
|||
`._context.Context.cancel()` which CAN be used for this
|
||||
purpose.
|
||||
|
||||
`raise_on_timeout` (default `False`):
|
||||
|
||||
- `False` (legacy): on bounded-wait expiry, log at DEBUG
|
||||
and return `False`. Used by callers that issue cancel
|
||||
fire-and-forget and have their own escalation
|
||||
(e.g. `_spawn.soft_kill()` checks `proc.poll()` after).
|
||||
- `True`: on bounded-wait expiry, raise `ActorTooSlowError`
|
||||
so the caller MUST handle the failure explicitly.
|
||||
`ActorNursery.cancel()` opts in so it can escalate via
|
||||
`proc.terminate()` per SC-discipline.
|
||||
|
||||
'''
|
||||
__runtimeframe__: int = 1 # noqa
|
||||
|
||||
|
|
@ -314,16 +301,15 @@ class Portal:
|
|||
|
||||
# XXX the one spot we set it?
|
||||
chan._cancel_called: bool = True
|
||||
cancel_timeout: float = (
|
||||
timeout
|
||||
or
|
||||
self.cancel_timeout
|
||||
)
|
||||
try:
|
||||
# send cancel cmd - might not get response
|
||||
# XXX: sure would be nice to make this work with
|
||||
# a proper shield
|
||||
with trio.move_on_after(cancel_timeout) as cs:
|
||||
with trio.move_on_after(
|
||||
timeout
|
||||
or
|
||||
self.cancel_timeout
|
||||
) as cs:
|
||||
cs.shield: bool = True
|
||||
await self.run_from_ns(
|
||||
'self',
|
||||
|
|
@ -331,24 +317,16 @@ class Portal:
|
|||
)
|
||||
return True
|
||||
|
||||
# `move_on_after` fired — peer didn't ack within
|
||||
# bounded window. Behaviour depends on
|
||||
# `raise_on_timeout`:
|
||||
assert cs.cancelled_caught
|
||||
if raise_on_timeout:
|
||||
raise ActorTooSlowError(
|
||||
f'Peer {peer_id} did not ack `Actor.cancel()`'
|
||||
f'-RPC within bounded wait of '
|
||||
f'{cancel_timeout!r}s'
|
||||
if cs.cancelled_caught:
|
||||
# may timeout and we never get an ack (obvi racy)
|
||||
# but that doesn't mean it wasn't cancelled.
|
||||
log.debug(
|
||||
f'May have failed to cancel peer?\n'
|
||||
f'\n'
|
||||
f'c)=?> {peer_id}\n'
|
||||
)
|
||||
|
||||
# legacy fire-and-forget path: log + return False so
|
||||
# the caller can decide whether to escalate.
|
||||
log.debug(
|
||||
f'May have failed to cancel peer?\n'
|
||||
f'\n'
|
||||
f'c)=?> {peer_id}\n'
|
||||
)
|
||||
# if we get here some weird cancellation case happened
|
||||
return False
|
||||
|
||||
except TransportClosed as tpt_err:
|
||||
|
|
|
|||
|
|
@ -38,14 +38,8 @@ from ..discovery._addr import (
|
|||
UnwrappedAddress,
|
||||
mk_uuid,
|
||||
)
|
||||
from ._state import (
|
||||
current_actor,
|
||||
is_main_process,
|
||||
)
|
||||
from ..log import (
|
||||
get_logger,
|
||||
get_loglevel,
|
||||
)
|
||||
from ._state import current_actor, is_main_process
|
||||
from ..log import get_logger, get_loglevel
|
||||
from ._runtime import Actor
|
||||
from ._portal import Portal
|
||||
from ..trionics import (
|
||||
|
|
@ -53,7 +47,6 @@ from ..trionics import (
|
|||
collapse_eg,
|
||||
)
|
||||
from .._exceptions import (
|
||||
ActorTooSlowError,
|
||||
ContextCancelled,
|
||||
)
|
||||
from .._root import (
|
||||
|
|
@ -67,93 +60,11 @@ if TYPE_CHECKING:
|
|||
import multiprocessing as mp
|
||||
# from ..ipc._server import IPCServer
|
||||
from ..ipc import IPCServer
|
||||
from ..spawn._spawn import ProcessType
|
||||
|
||||
|
||||
log = get_logger()
|
||||
|
||||
|
||||
async def _try_cancel_then_kill(
|
||||
portal: Portal,
|
||||
# `ProcessType` is `TYPE_CHECKING`-only (defined under that
|
||||
# guard in `..spawn._spawn`) so we stringify here to avoid
|
||||
# eager runtime eval of the annotation at function-def time
|
||||
# (this module has no `from __future__ import annotations`).
|
||||
proc: 'ProcessType',
|
||||
subactor: Actor,
|
||||
debug_mode_active: bool = False,
|
||||
) -> None:
|
||||
'''
|
||||
Per-child cancel-then-escalate helper used by
|
||||
`ActorNursery.cancel()`.
|
||||
|
||||
Sends a graceful actor-runtime cancel-RPC via
|
||||
`Portal.cancel_actor(raise_on_timeout=True)`. If the bounded-wait
|
||||
expires before the peer ack's, `ActorTooSlowError` is raised and
|
||||
we escalate via `proc.terminate()` (SIGTERM) per SC-discipline:
|
||||
|
||||
graceful cancel-req -> bounded wait -> hard-kill
|
||||
|
||||
Without this escalation, a same-name sibling subactor whose
|
||||
cancel-RPC failed to ack within `Portal.cancel_timeout` (e.g.
|
||||
under TCP+forkserver register-RPC contention) would park the
|
||||
parent's `soft_kill()` watcher forever waiting on `proc.poll()`,
|
||||
deadlocking nursery `__aexit__`. See `ActorTooSlowError` for
|
||||
the wider write-up.
|
||||
|
||||
'''
|
||||
-    # XXX, do NOT escalate to `proc.terminate()` while ANY of
-    # the following are true — SIGTERM-ing a sub would tear
-    # down its sub-tree including any descendant proxying
-    # stdio to/from a REPL-locked actor, clobbering the user's
-    # debug session:
-    #
-    # - `Lock.ctx_in_debug is not None`: most precise — some
-    #   actor in the tree is currently REPL-locked. Set in the
-    #   root actor for the lifetime of the lock. Raceable
-    #   (false negative if SIGINT arrives before lock-acquire
-    #   RPC completes).
-    #
-    # - `_runtime_vars['_debug_mode']`: root-actor was opened
-    #   with `debug_mode=True` (via `open_root_actor` /
-    #   `open_nursery`). Set once at root boot, never cleared.
-    #   Catches deep-descendant REPL sessions even when the
-    #   intermediate nurseries didn't pass `debug_mode=` per-
-    #   child.
-    #
-    # - `debug_mode_active`: this nursery has at least one
-    #   child started with an explicit `debug_mode=` arg
-    #   (`ActorNursery._at_least_one_child_in_debug`). Catches
-    #   the case where root is NOT in debug-mode but a
-    #   nursery-direct child opted in.
-    #
-    # Independent because root may NOT be in debug-mode even
-    # when a child is (only the child's `_runtime_vars` is
-    # mutated by per-child `debug_mode=True`). ORing covers
-    # every flavor without false-positively skipping
-    # legitimate hard-kill paths in non-debug trees.
-    if (
-        debug.Lock.ctx_in_debug is not None
-        or
-        _state._runtime_vars.get('_debug_mode', False)
-        or
-        debug_mode_active
-    ):
-        await portal.cancel_actor()
-        return
-
-    try:
-        await portal.cancel_actor(raise_on_timeout=True)
-    except ActorTooSlowError as too_slow:
-        log.error(
-            f'Cancel-ack TIMED OUT for sub-actor\n'
-            f' uid: {subactor.aid.reprol()!r}\n'
-            f' reason: {too_slow}\n'
-            f'-> escalating to `proc.terminate()` (hard-kill)\n'
-        )
-        proc.terminate()
-
-
 class ActorNursery:
     '''
     The fundamental actor supervision construct: spawn and manage
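To make the escalation order of the removed helper concrete (graceful cancel request, bounded wait, then hard kill), here is a minimal standalone sketch. It uses stdlib `asyncio` and a throwaway child process instead of `trio` and tractor's `Portal`, so `TooSlowError`, `cancel_request()` and `try_cancel_then_kill()` are hypothetical stand-ins rather than tractor APIs, and POSIX signal semantics are assumed.

```python
import asyncio
import signal
import sys


class TooSlowError(Exception):
    '''Hypothetical stand-in for `ActorTooSlowError`.'''


async def cancel_request(ack_after: float, timeout: float) -> None:
    '''
    Simulated cancel-RPC: the peer "acks" after `ack_after` seconds.
    Raise `TooSlowError` when no ack arrives within `timeout`.
    '''
    try:
        await asyncio.wait_for(asyncio.sleep(ack_after), timeout)
    except asyncio.TimeoutError as err:
        raise TooSlowError(f'no cancel-ack within {timeout}s') from err


async def try_cancel_then_kill(
    proc: asyncio.subprocess.Process,
    ack_after: float,
    timeout: float,
) -> bool:
    '''
    Graceful cancel-req -> bounded wait -> hard-kill.
    Return `True` when escalation to SIGTERM was needed.
    '''
    try:
        await cancel_request(ack_after, timeout)
        return False
    except TooSlowError:
        proc.terminate()  # SIGTERM on POSIX
        return True


async def demo() -> tuple[bool, int]:
    # a child that would otherwise outlive the parent's patience
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', 'import time; time.sleep(60)',
    )
    escalated = await try_cancel_then_kill(
        proc, ack_after=5.0, timeout=0.2,
    )
    rc = await proc.wait()  # reap so no zombie is left behind
    return escalated, rc


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The debug-mode guard from the original (skip escalation whenever a REPL may be active in the tree) is deliberately left out of this sketch; it only shows the timeout-then-terminate path.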
@@ -517,23 +428,10 @@ class ActorNursery:
                                     else:  # there's no other choice left
                                         proc.terminate()
 
-                            # spawn per-child cancel tasks; the helper
-                            # escalates to hard-kill on
-                            # `ActorTooSlowError` rather than silently
-                            # swallowing the cancel-ack timeout, EXCEPT
-                            # when this nursery has any debug-eligible
-                            # child (in which case we keep legacy
-                            # fire-and-forget semantics to avoid
-                            # clobbering an active REPL).
+                            # spawn cancel tasks for each sub-actor
                             assert portal
                             if portal.channel.connected():
-                                tn.start_soon(
-                                    _try_cancel_then_kill,
-                                    portal,
-                                    proc,
-                                    subactor,
-                                    self._at_least_one_child_in_debug,
-                                )
+                                tn.start_soon(portal.cancel_actor)
 
                     log.cancel(msg)
             # if we cancelled the cancel (we hung cancelling remote actors)

@@ -368,7 +368,6 @@ from tractor.runtime._portal import Portal
 from ._spawn import (
     cancel_on_completion,
     soft_kill,
-    wait_for_peer_or_proc_death,
 )
 
 if TYPE_CHECKING:
@@ -761,26 +760,6 @@ class _ForkedProc:
         self._pidfd = -1
         return self._returncode
 
-    def terminate(self) -> None:
-        '''
-        OS-level `SIGTERM` to the child. Swallows
-        `ProcessLookupError` (already dead).
-
-        Mirrors `trio.Process.terminate()` /
-        `multiprocessing.Process.terminate()` — sends SIGTERM
-        (graceful, allows the child a chance to clean up via
-        signal-handlers) rather than SIGKILL. Used by
-        `ActorNursery.cancel()`'s per-child escalation when
-        `Portal.cancel_actor()` raises `ActorTooSlowError`,
-        and by the legacy `hard_kill=True` branch on the same
-        path.
-
-        '''
-        try:
-            os.kill(self.pid, signal.SIGTERM)
-        except ProcessLookupError:
-            pass
-
     def kill(self) -> None:
         '''
         OS-level `SIGKILL` to the child. Swallows
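The removed `terminate()` method boils down to "send SIGTERM, ignore the error if the pid is already gone". A minimal standalone sketch of that behaviour, assuming a POSIX host, follows; `terminate()` and `demo()` here are illustrative free functions, not the `_ForkedProc` methods themselves.

```python
import os
import signal
import subprocess
import sys


def terminate(pid: int) -> bool:
    '''
    Send SIGTERM to `pid`. Return `False` (instead of raising) when
    the process no longer exists.
    '''
    try:
        os.kill(pid, signal.SIGTERM)
        return True
    except ProcessLookupError:
        return False


def demo() -> tuple[bool, int, bool]:
    child = subprocess.Popen(
        [sys.executable, '-c', 'import time; time.sleep(60)'],
    )
    first = terminate(child.pid)   # child is alive: signal delivered
    rc = child.wait()              # reap it; negative rc means "killed by signal"
    # the pid was reaped above, so the lookup fails and is swallowed
    # (this assumes the OS has not recycled the pid in the meantime)
    second = terminate(child.pid)
    return first, rc, second


if __name__ == '__main__':
    print(demo())
```

Signalling by raw pid is only safe while the pid cannot have been recycled, which is presumably one reason the surrounding class also tracks a `_pidfd`.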
@@ -969,18 +948,7 @@ async def main_thread_forkserver_proc(
                 f' |_{proc}\n'
             )
 
-            # race the handshake-wait against proc-death so a
-            # sub that dies during boot (e.g. crashed on import
-            # before reaching `_actor_child_main`, leaving a
-            # zombie + no cmdline) surfaces as `ActorFailure`
-            # instead of parking the spawning task forever on
-            # an unsignalled `_peer_connected[uid]` event.
-            event, chan = await wait_for_peer_or_proc_death(
-                ipc_server,
-                uid,
-                proc_wait=proc.wait,
-                proc_repr=repr(proc),
-            )
+            event, chan = await ipc_server.wait_for_peer(uid)
 
         except trio.Cancelled:
             cancelled_during_spawn = True

@@ -43,7 +43,6 @@ from tractor.log import get_logger
 from tractor.discovery._addr import (
     UnwrappedAddress,
 )
-from .._exceptions import ActorFailure
 from ._reap import unlink_uds_bind_addrs
 from tractor.runtime._portal import Portal
 from tractor.runtime._runtime import Actor
@@ -107,71 +106,6 @@ else:
         await trio.lowlevel.wait_readable(proc.sentinel)
 
 
-async def wait_for_peer_or_proc_death(
-    ipc_server,
-    uid: tuple[str, str],
-    # TODO? not not types?
-    proc_wait: 'Callable[[], Awaitable]',
-    proc_repr: str = '',
-
-) -> 'tuple[trio.Event, Channel]':
-    '''
-    Race `IPCServer.wait_for_peer(uid)` against the sub-proc's
-    own `.wait()` coroutine. Whichever completes first cancels
-    the other.
-
-    Used by every spawn-backend to detect a sub-actor that
-    *dies during boot* before completing the parent-handshake-
-    callback (e.g. crashed on import, exec'd-out, kernel-killed
-    pre-`_actor_child_main`). Without this race, the
-    handshake-wait — backed by an unsignalled `trio.Event` —
-    parks the spawning task forever and leaves the dead child
-    as a zombie since nobody calls `proc.wait()` to reap.
-
-    On normal handshake-complete: returns `(event, chan)`
-    identical to a bare `wait_for_peer`.
-
-    On proc-death-first: raises `ActorFailure` carrying the
-    proc's exit code, allowing the supervisor to surface a
-    clean error rather than hanging indefinitely.
-
-    `proc_wait` is a 0-arg async callable returning the proc's
-    exit-status — kept generic so each backend can pass its
-    own (`trio.Process.wait`, `_ForkedProc.wait`,
-    `proc_waiter(mp.Process)`, etc.).
-
-    `proc_repr` is an optional string used in the
-    `ActorFailure` message for diag.
-
-    '''
-    result: dict = {}
-
-    async def _await_handshake():
-        event, chan = await ipc_server.wait_for_peer(uid)
-        result['handshake'] = (event, chan)
-        boot_n.cancel_scope.cancel()
-
-    async def _await_death():
-        rc = await proc_wait()
-        result['died'] = rc
-        boot_n.cancel_scope.cancel()
-
-    async with trio.open_nursery() as boot_n:
-        boot_n.start_soon(_await_handshake)
-        boot_n.start_soon(_await_death)
-
-    if 'handshake' in result:
-        return result['handshake']
-
-    # only reached if proc-death won the race
-    raise ActorFailure(
-        f'Sub-actor {uid!r} died during boot '
-        f'(rc={result.get("died")!r}) before completing '
-        f'parent-handshake.\n'
-        f' proc: {proc_repr}'
-    )
-
-
 def try_set_start_method(
     key: SpawnMethodKey
 

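The "race the handshake against process death" idea in the removed `wait_for_peer_or_proc_death()` can be sketched without tractor. The version below is an approximation using stdlib `asyncio.wait()` rather than a `trio` nursery with a shared cancel scope; `BootFailure`, `wait_for_peer_or_death()` and `finish_after()` are hypothetical names introduced only for this illustration.

```python
import asyncio


class BootFailure(Exception):
    '''Hypothetical stand-in for `ActorFailure`.'''


async def wait_for_peer_or_death(handshake, death):
    '''
    Await two awaitables concurrently. Return the handshake result if
    it finishes first, otherwise raise `BootFailure` carrying the exit
    code produced by `death`.
    '''
    hs_task = asyncio.ensure_future(handshake)
    death_task = asyncio.ensure_future(death)
    done, pending = await asyncio.wait(
        {hs_task, death_task},
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()  # whichever side lost the race is cancelled
    await asyncio.gather(*pending, return_exceptions=True)

    if hs_task in done:
        return hs_task.result()

    # only reached if proc-death won the race
    raise BootFailure(
        f'died during boot (rc={death_task.result()!r})'
    )


async def finish_after(delay: float, value):
    '''Helper for the demo: complete with `value` after `delay` seconds.'''
    await asyncio.sleep(delay)
    return value


if __name__ == '__main__':
    # handshake wins: prints 'chan'
    print(asyncio.run(
        wait_for_peer_or_death(finish_after(0.01, 'chan'), finish_after(1.0, 0))
    ))
```

As in the original, a completed handshake takes precedence when both sides finish in the same scheduling step; the failure is raised only when the handshake did not complete.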
@@ -1,23 +1,15 @@
 """
 `xontrib_tractor_diag`: pytest/tractor diagnostic aliases.
 
-All aliases live under the `acli.` namespace so xonsh's
-prefix-completion treats them as a sub-cmd group — type
-`acli.<TAB>` to see the full set.
-
 Provides:
-- `acli.pytree <pid|pgrep-pat>`     psutil-backed proc tree,
-                                    live + zombies split.
-- `acli.hung_dump <pid|pat> [...]`  kernel `wchan`/`stack` +
-                                    `py-spy dump` (incl `--locals`)
-                                    for each pid in tree.
-- `acli.bindspace_scan [<dir>]`     find orphaned tractor UDS
-                                    sock files (no live owner pid).
-                                    default: `$XDG_RUNTIME_DIR/tractor`.
-- `acli.reap [opts]`                SC-polite zombie-subactor
-                                    reaper + optional `/dev/shm/`
-                                    + UDS sock-file sweeps.
-                                    alias for `scripts/tractor-reap`.
+- `pytree <pid|pgrep-pat>`     psutil-backed proc tree,
+                               live + zombies split.
+- `hung-dump <pid|pat> [...]`  kernel `wchan`/`stack` +
+                               `py-spy dump` (incl `--locals`)
+                               for each pid in tree.
+- `bindspace-scan [<dir>]`     find orphaned tractor UDS
+                               sock files (no live owner pid).
+                               default: `$XDG_RUNTIME_DIR/tractor`.
 
 Loading from repo root:
   xontrib load -p ./xontrib tractor_diag
@@ -44,7 +36,7 @@ except ImportError:
     psutil = None
     print(
         '[tractor-diag] `psutil` missing — '
-        'acli.pytree disabled, acli.hung_dump uses pgrep fallback. '
+        'pytree disabled, hung-dump uses pgrep fallback. '
         '`uv pip install psutil` for full functionality.'
     )
 
@@ -165,7 +157,7 @@ def _pytree(args):
     severity-ordered buckets so leaked / defunct procs
     don't hide in the noise of normal `live` rows.
 
-    usage: acli.pytree [--tree|-t] <pid|pgrep-pattern> [...]
+    usage: pytree <pid|pgrep-pattern> [...]
 
     classification (per-proc, not per-tree):
 
@@ -182,43 +174,20 @@ def _pytree(args):
       descendants show as `live` if they themselves still
       have a real (non-init) parent (the orphan root), but
       the orphan root itself appears in `orphans`.
-
-    Cross-bucket parent annotation (always emitted):
-      when a row's parent (by ppid) lives in a *different*
-      severity bucket, the row is suffixed with
-      `[parent: <pid> (in `<bucket>`)]` so the visual
-      `└─` marker still resolves to a findable parent
-      even when bucketing scatters parent and child into
-      separate sections.
-
-    `--tree` / `-t` flag (opt-in):
-      additionally emit a flat walk-order `## tree`
-      section at the top — a contiguous parent-child
-      tree shape with no severity-grouping. Same procs,
-      no annotations needed because each parent appears
-      directly above its children.
     '''
-    flag_tree: bool = False
-    pos_args: list = []
-    for a in args:
-        if a in ('--tree', '-t'):
-            flag_tree = True
-        else:
-            pos_args.append(a)
-
-    if not pos_args:
-        print('usage: acli.pytree [--tree|-t] <pid|pgrep-pattern> [...]')
+    if not args:
+        print('usage: pytree <pid|pgrep-pattern> [...]')
         return 1
     if psutil is None:
         print('pytree requires psutil; install via `uv pip install psutil`')
         return 1
 
     roots: list = []
-    for a in pos_args:
+    for a in args:
         roots.extend(_resolve_pids(a))
     roots = sorted(set(roots))
     if not roots:
-        print(f'(no procs match: {pos_args})')
+        print(f'(no procs match: {args})')
         return 1
 
     # statuses considered "defunct" — STATUS_ZOMBIE is the
@@ -230,16 +199,11 @@ def _pytree(args):
     }
 
     seen: set = set()
-    walk_order: list = []  # [(proc, depth)] preserved walk order
-    live: list = []        # [(proc, depth)]
+    live: list = []  # [(proc, depth)]
     orphans: list = []
     zombies: list = []
     gone: list = []
 
-    # parent-bucket lookup populated post-classification so
-    # `_row()` can annotate cross-bucket parent refs.
-    pid_to_bucket: dict = {}
-
     for r in roots:
         for (p, depth) in _walk_tree_with_depth(r):
             if p.pid in seen:
@@ -255,14 +219,10 @@ def _pytree(args):
             # severity order: zombie > orphan > live.
             if status in defunct_statuses:
                 zombies.append(entry)
-                pid_to_bucket[p.pid] = 'zombies'
             elif ppid == 1:
                 orphans.append(entry)
-                pid_to_bucket[p.pid] = 'orphans'
             else:
                 live.append(entry)
-                pid_to_bucket[p.pid] = 'live'
-            walk_order.append(entry)
 
     total: int = len(live) + len(orphans) + len(zombies)
     print(f'# pytree: {total} procs across roots {roots}')
@@ -270,44 +230,14 @@ def _pytree(args):
     hdr = ' ' + 'PID'.rjust(7) + ' ' + 'PPID'.rjust(7) + ' '
     hdr += 'STATUS'.ljust(10) + ' CMD'
 
-    def _row(entry, bucket: str|None = None):
+    def _row(entry):
         '''
         Render `(proc, depth)` as an aligned row. Tree depth is
         rendered as a `└─` marker on the CMD column so PID/PPID/
         STATUS stay column-aligned.
-
-        When `bucket` is given AND the row's parent lives in a
-        *different* bucket, append a `[parent: <pid> (in `<b>`)]`
-        suffix so the `└─` marker can be resolved across the
-        severity-section split.
         '''
         p, depth = entry
         tree_pfx = (' ' * depth) + ('└─ ' if depth > 0 else '')
-
-        # cross-bucket parent annotation; safe to compute up
-        # front because `p.ppid()` is cheap and rarely
-        # raises (parent pid is read from `/proc/<pid>/stat`,
-        # cached by psutil).
-        parent_anno: str = ''
-        if (
-            bucket is not None
-            and depth > 0
-        ):
-            try:
-                parent_pid: int = p.ppid()
-            except psutil.NoSuchProcess:
-                parent_pid = 0
-            if parent_pid and parent_pid != 1:
-                parent_bucket: str|None = pid_to_bucket.get(parent_pid)
-                if (
-                    parent_bucket is not None
-                    and parent_bucket != bucket
-                ):
-                    parent_anno = (
-                        f' [parent: {parent_pid} '
-                        f'(in `{parent_bucket}`)]'
-                    )
-
         # NOTE: `psutil.ZombieProcess` is a *subclass* of
         # `psutil.NoSuchProcess`, but the proc is NOT gone —
         # it's a zombie whose `/proc/<pid>/cmdline` is empty/
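The bucketing and cross-bucket parent annotation that these `_pytree` hunks remove can be illustrated on plain data. The sketch below is a simplified approximation: `Proc` is a hypothetical stand-in for the few `psutil.Process` fields involved, the `DEFUNCT` set is assumed to mirror psutil's zombie/dead status strings, and the `depth > 0` / `ppid != 1` guards of the original are folded into a single dict lookup.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Proc:
    '''Hypothetical stand-in for the `psutil.Process` fields used here.'''
    pid: int
    ppid: int
    status: str


# assumed to match psutil's "zombie"/"dead" status strings
DEFUNCT = {'zombie', 'dead'}


def classify(procs: list[Proc]) -> dict[int, str]:
    '''Map each pid to its severity bucket: zombie > orphan > live.'''
    buckets: dict[int, str] = {}
    for p in procs:
        if p.status in DEFUNCT:
            buckets[p.pid] = 'zombies'
        elif p.ppid == 1:
            buckets[p.pid] = 'orphans'
        else:
            buckets[p.pid] = 'live'
    return buckets


def parent_anno(p: Proc, buckets: dict[int, str]) -> str:
    '''Row suffix for procs whose parent landed in a different bucket.'''
    parent_bucket = buckets.get(p.ppid)
    if parent_bucket is None or parent_bucket == buckets[p.pid]:
        return ''
    return f' [parent: {p.ppid} (in `{parent_bucket}`)]'


if __name__ == '__main__':
    procs = [
        Proc(100, 1, 'sleeping'),    # reparented to init
        Proc(101, 100, 'zombie'),    # defunct child of the orphan
        Proc(102, 100, 'sleeping'),  # live child of the orphan
    ]
    buckets = classify(procs)
    for p in procs:
        print(p.pid, buckets[p.pid] + parent_anno(p, buckets))
```

Building the pid-to-bucket map only after every process has been classified is what lets a row point at a parent that is printed in a later section.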
@@ -319,61 +249,44 @@ def _pytree(args):
             r = ' ' + str(p.pid).rjust(7)
             r += ' ' + str(p.ppid()).rjust(7)
             r += ' ' + p.status().ljust(10)
-            r += ' ' + tree_pfx + cmd + parent_anno
+            r += ' ' + tree_pfx + cmd
             return r
         except psutil.ZombieProcess:
             try:
-                ppid_str = str(p.ppid())
+                ppid = str(p.ppid())
                 name = p.name()
             except psutil.NoSuchProcess:
-                ppid_str, name = '?', '?'
+                ppid, name = '?', '?'
             r = ' ' + str(p.pid).rjust(7)
-            r += ' ' + ppid_str.rjust(7)
+            r += ' ' + ppid.rjust(7)
             r += ' ' + 'zombie'.ljust(10)
-            r += ' ' + tree_pfx + '[' + name + ' <defunct>]' + parent_anno
+            r += ' ' + tree_pfx + '[' + name + ' <defunct>]'
             return r
         except psutil.NoSuchProcess:
             return ' ' + str(p.pid).rjust(7) + ' (gone mid-walk)'
 
-    def _section(
-        title: str,
-        procs: list,
-        hint: str = '',
-        bucket: str|None = None,
-    ):
+    def _section(title: str, procs: list, hint: str = ''):
         print(f'\n## {title} ({len(procs)})' + (f' — {hint}' if hint else ''))
         if not procs:
             print(' (none)')
             return
         print(hdr)
         for p in procs:
-            print(_row(p, bucket=bucket))
+            print(_row(p))
 
-    # `--tree` opt-in: emit a flat walk-order section first
-    # so the parent-child tree shape is contiguous (no
-    # severity-grouping). No `bucket` arg → no cross-bucket
-    # annotation, since each parent appears directly above
-    # its children here.
-    if flag_tree:
-        _section(
-            'tree', walk_order,
-            'flat walk-order, parent-child preserved',
-        )
-
-    # severity-ordered: most concerning first. Each section
-    # passes its own `bucket` name so `_row()` can annotate
-    # rows whose parents live in a different section.
+    # severity-ordered: most concerning first.
     _section(
         'zombies', zombies,
         'status `Z`/`X`, parent has not reaped',
-        bucket='zombies',
     )
     _section(
         'orphans', orphans,
         '`ppid==1`, reparented to init (leaked / parent gone)',
-        bucket='orphans',
     )
-    _section('live', live, bucket='live')
+    _section('live', live)
+
+    if gone:
+        print(f'\n## gone-during-walk ({len(gone)}): {gone}')
 
     if gone:
         print(f'\n## gone-during-walk ({len(gone)}): {gone}')
@@ -386,14 +299,14 @@ def _hung_dump(args):
     kernel + python state for a hung pytest/tractor tree.
     walks all descendants of each `<pid|pgrep-pat>` arg.
 
-    usage: acli.hung_dump <pid|pgrep-pattern> [...]
+    usage: hung-dump <pid|pgrep-pattern> [...]
 
     note: `/proc/<pid>/stack` and `py-spy dump` typically
     require CAP_SYS_PTRACE — invoked via `sudo -n`. run
     `sudo true` first to cache creds.
     '''
     if not args:
-        print('usage: acli.hung_dump <pid|pgrep-pattern> [...]')
+        print('usage: hung-dump <pid|pgrep-pattern> [...]')
         return 1
 
     # cache sudo creds upfront so per-pid `sudo -n` calls
@@ -473,7 +386,7 @@ def _bindspace_scan(args):
     (those whose embedded `<pid>` no longer corresponds to
     a live process).
 
-    usage: acli.bindspace_scan [<dir>]
+    usage: bindspace-scan [<dir>]
     default: `$XDG_RUNTIME_DIR/tractor`
     (or `/run/user/<uid>/tractor`)
     '''
@@ -541,203 +454,11 @@ def _bindspace_scan(args):
         print(f'\nto unlink orphans:\n rm {unlink_cmd}')
 
 
-# --- acli.reap ------------------------------------------------
-
-def _tractor_reap(args):
-    '''
-    SC-polite zombie-subactor reaper + optional `/dev/shm/`
-    orphan-segment sweep + optional UDS sock-file sweep.
-
-    usage: acli.reap [-h] [--parent PID] [--grace SEC]
-                     [--dry-run] [--shm | --shm-only]
-                     [--uds | --uds-only]
-
-    phases (run in order when enabled):
-
-    1. process reap — finds tractor subactor procs left
-       alive after a `pytest`/app run that failed to fully
-       cancel its tree. Default = orphan-mode (PPid==1
-       init-reparented procs whose cwd matches repo root
-       AND cmdline contains `python`). With `--parent`,
-       scopes to descendants of a specific live PID.
-       SIGINT first, then SIGKILL after `--grace` (default
-       3.0s).
-    2. shm sweep (`--shm`/`--shm-only`) — unlinks
-       `/dev/shm/<file>` entries owned by the current uid
-       that no live process has open. Needed because
-       `tractor` disables `mp.resource_tracker`.
-    3. UDS sweep (`--uds`/`--uds-only`) — unlinks
-       `${XDG_RUNTIME_DIR}/tractor/<name>@<pid>.sock`
-       files whose binder pid is dead (or the `1616`
-       registry sentinel). See issue #452.
-
-    Mirrors `scripts/tractor-reap` (use `-n`/`--dry-run`
-    first to see what would be touched).
-
-    '''
-    import argparse
-
-    parser = argparse.ArgumentParser(
-        prog='acli.reap',
-        description=_tractor_reap.__doc__,
-        formatter_class=argparse.RawDescriptionHelpFormatter,
-    )
-    parser.add_argument(
-        '--parent', '-p',
-        type=int,
-        default=None,
-        help='descendant-mode: reap procs with PPid==<pid>',
-    )
-    parser.add_argument(
-        '--grace', '-g',
-        type=float,
-        default=3.0,
-        help='SIGINT grace window in seconds (default 3.0)',
-    )
-    parser.add_argument(
-        '--dry-run', '-n',
-        action='store_true',
-        help='list matched pids/paths but do not signal/unlink',
-    )
-    parser.add_argument(
-        '--shm',
-        action='store_true',
-        help='also unlink orphaned /dev/shm segments',
-    )
-    parser.add_argument(
-        '--shm-only',
-        action='store_true',
-        help='skip process reap; only do the shm sweep',
-    )
-    parser.add_argument(
-        '--uds',
-        action='store_true',
-        help='also unlink orphaned UDS sock-files',
-    )
-    parser.add_argument(
-        '--uds-only',
-        action='store_true',
-        help='skip process reap + shm; only do the UDS sweep',
-    )
-
-    try:
-        ns = parser.parse_args(args)
-    except SystemExit as se:
-        # `argparse` raises SystemExit on `-h`/bad-args; let
-        # xonsh treat it as a normal alias return code.
-        return int(se.code) if se.code is not None else 0
-
-    skip_proc_reap: bool = (
-        ns.shm_only
-        or
-        ns.uds_only
-    )
-
-    # repo-root resolution: `git rev-parse --show-toplevel`
-    # first, falling back to the xontrib file's parent of
-    # parent. mirrors `scripts/tractor-reap._repo_root()`.
-    try:
-        repo_str: str = sp.check_output(
-            ['git', 'rev-parse', '--show-toplevel'],
-            stderr=sp.DEVNULL,
-            text=True,
-        ).strip()
-        repo: Path = Path(repo_str)
-    except (sp.CalledProcessError, FileNotFoundError):
-        repo: Path = Path(__file__).resolve().parent.parent
-
-    # lazy-import the reap helpers since the package may not
-    # have been on `sys.path` at xontrib-load time (e.g. the
-    # contrib was sourced before activating the venv).
-    import sys
-    if str(repo) not in sys.path:
-        sys.path.insert(0, str(repo))
-    from tractor._testing._reap import (
-        find_descendants,
-        find_orphans,
-        find_orphaned_shm,
-        find_orphaned_uds,
-        reap,
-        reap_shm,
-        reap_uds,
-    )
-
-    rc: int = 0
-
-    # phase 1: process reap (skipped under `--*-only`)
-    if not skip_proc_reap:
-        if ns.parent is not None:
-            pids: list = find_descendants(ns.parent)
-            mode: str = f'descendants of PPid={ns.parent}'
-        else:
-            pids = find_orphans(repo)
-            mode = f'orphans (PPid=1, cwd={repo})'
-
-        if not pids:
-            print(f'[acli.reap] no {mode} to reap')
-        elif ns.dry_run:
-            print(
-                f'[acli.reap] dry-run — {mode}:\n {pids}'
-            )
-        else:
-            _, survivors = reap(pids, grace=ns.grace)
-            if survivors:
-                rc = 1
-
-    # phase 2: shm sweep (opt-in)
-    if ns.shm or ns.shm_only:
-        leaked: list = find_orphaned_shm()
-        if not leaked:
-            print(
-                '[acli.reap] no orphaned /dev/shm '
-                'segments to sweep'
-            )
-        elif ns.dry_run:
-            print(
-                f'[acli.reap] dry-run — {len(leaked)} '
-                f'orphaned shm segment(s):\n {leaked}'
-            )
-        else:
-            _, errors = reap_shm(leaked)
-            if errors:
-                rc = 1
-
-    # phase 3: UDS sweep (opt-in)
-    if ns.uds or ns.uds_only:
-        leaked_uds: list = find_orphaned_uds()
-        if not leaked_uds:
-            print(
-                '[acli.reap] no orphaned UDS sock-files '
-                'to sweep'
-            )
-        elif ns.dry_run:
-            print(
-                f'[acli.reap] dry-run — {len(leaked_uds)} '
-                f'orphaned UDS sock-file(s):\n {leaked_uds}'
-            )
-        else:
-            _, errors = reap_uds(leaked_uds)
-            if errors:
-                rc = 1
-
-    return rc
-
-
 # --- registration ---------------------------------------------
 
-# all aliases under the `acli.` namespace so xonsh's prefix-
-# completion makes them feel like a sub-cmd group: type
-# `acli.<TAB>` and the full set is suggested. no parent
-# `acli` cmd exists — the dot is purely a naming convention.
-_TCLI_ALIASES: dict = {
-    'acli.pytree': _pytree,
-    'acli.hung_dump': _hung_dump,
-    'acli.bindspace_scan': _bindspace_scan,
-    'acli.reap': _tractor_reap,
-}
-
-for _name, _fn in _TCLI_ALIASES.items():
-    aliases[_name] = _fn
+aliases['pytree'] = _pytree
+aliases['hung-dump'] = _hung_dump
+aliases['bindspace-scan'] = _bindspace_scan
 
 
 # xontrib protocol hooks (for `xontrib load tractor_diag`).
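The removed `acli.reap` alias wraps `argparse` so that `-h` or a bad flag yields a return code instead of killing the shell. A minimal sketch of just that pattern, with a hypothetical `reap_alias()` function and only two of the flags, could look like this:

```python
import argparse


def reap_alias(args: list[str]) -> int:
    '''
    Parse alias args with `argparse`, converting its `SystemExit`
    into an ordinary integer return code.
    '''
    parser = argparse.ArgumentParser(prog='reap-sketch')
    parser.add_argument('--grace', '-g', type=float, default=3.0)
    parser.add_argument('--dry-run', '-n', action='store_true')
    try:
        ns = parser.parse_args(args)
    except SystemExit as se:
        # argparse exits with 0 after `-h` and with 2 on bad args
        return int(se.code) if se.code is not None else 0

    print(f'grace={ns.grace} dry_run={ns.dry_run}')
    return 0


if __name__ == '__main__':
    print(reap_alias(['--dry-run', '--grace', '1.5']))
```

Returning the code matters for a shell alias because an uncaught `SystemExit` would otherwise propagate into the interactive session rather than being reported as the command's exit status.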
@@ -747,6 +468,6 @@ def _load_xontrib_(xsh, **_):
 
 
 def _unload_xontrib_(xsh, **_):
-    for name in _TCLI_ALIASES:
+    for name in ('pytree', 'hung-dump', 'bindspace-scan'):
         aliases.pop(name, None)
     return {}
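One practical difference between the two sides of this hunk is where the alias names live: a single dict that both registration and unload iterate, versus names repeated in two places. The sketch below illustrates the dict-driven variant with a plain `dict` standing in for xonsh's `aliases` mapping; `ALIASES`, `load()`, `unload()` and the stub functions are hypothetical names for this example only.

```python
def _pytree(args):
    return 0


def _hung_dump(args):
    return 0


# single source of truth for every alias name
ALIASES: dict = {
    'acli.pytree': _pytree,
    'acli.hung_dump': _hung_dump,
}


def load(registry: dict) -> None:
    '''Register every alias from the shared table.'''
    for name, fn in ALIASES.items():
        registry[name] = fn


def unload(registry: dict) -> None:
    '''Remove exactly the aliases that `load()` added.'''
    for name in ALIASES:
        registry.pop(name, None)


if __name__ == '__main__':
    registry = {'ls': 'ls --color'}
    load(registry)
    print(sorted(registry))
    unload(registry)
    print(sorted(registry))
```

With the table approach, adding an alias is a one-line change and unload cannot drift out of sync; the hard-coded tuple on the other side of the diff has to be kept in step by hand.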