Compare commits
11 Commits
2ee44a6fdd...caebf60f4e

| Author | SHA1 | Date |
|---|---|---|
| | caebf60f4e | |
| | 3b0724eba8 | |
| | cec6cc2a56 | |
| | 34f333a026 | |
| | 38ffb875bd | |
| | 4c00913b3b | |
| | 5cd06810db | |
| | 255c9c3a7c | |
| | 0f4e671862 | |
| | d036ef7d7f | |
| | 7882c37ce0 | |

@@ -0,0 +1,125 @@
# `RuntimeVars` env-var lift — design plan

Status: **draft, awaiting user edits**

## Goal

Consolidate the sprawl of pytest CLI flags + ad-hoc env vars +
hardcoded fixture defaults into a *single* env-var-encoded
runtime-vars envelope, with a typed in-memory representation
(`tractor.runtime._state.RuntimeVars`) as the sole source of
truth.

## Why now

- `--tpt-proto`, `--spawn-backend`, `--diag-on-hang`,
  `--diag-capture-delay` and (soon) `TRACTOR_REG_ADDR` etc. are
  proliferating. Each adds a parsing seam.
- `tests/devx/test_debugger.py` invokes example scripts as
  separate subprocesses; they currently can't see the
  fixture-allocated `reg_addr` at all (root cause of why
  parametrizing devx scripts on `reg_addr` is on your TODO).
- Concurrent pytest sessions on the same host collide on
  shared defaults (the `registry@1616` race we just fixed is
  one symptom; per-session unique addr is the structural
  fix).
- `tractor.runtime._state.RuntimeVars: Struct` is already
  defined and **unused** — its docstring even says it
  "should be utilized as possible for future calls."

## Design

### Module: `tractor/_testing/_rtvars.py`

Lifted from `modden.runtime.env`, ~50 LOC, no new deps.

```python
_TRACTOR_RT_VARS_OSENV: str = '_TRACTOR_RT_VARS'

def dump_rtvars(rtvars: RuntimeVars|dict) -> tuple[str, str]:
    '''str-serialize via `str(dict)` — ast.literal_eval-able'''

def load_rtvars(env: dict) -> RuntimeVars:
    '''ast.literal_eval the env-var value, hydrate to struct'''

def get_rtvars(proc: psutil.Process|None = None) -> RuntimeVars:
    '''read the var from a target proc's env (or current)'''

def update_rtvars(
    rtvars: RuntimeVars|dict|None = None,
    update_osenv: bool|dict = True,
) -> tuple[str, str]:
    '''mutate + re-encode + (optionally) write to os.environ'''
```

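The plan above only lists signatures, so the following is a minimal sketch of the dump/load round-trip under stated assumptions: the envelope is a plain `dict` (the real `RuntimeVars` struct hydration and the `psutil` lookup are left out), and the `loglevel` key is a hypothetical example field. Only the env-var name and the `_registry_addrs` key come from the plan itself.

```python
import ast
import os

_TRACTOR_RT_VARS_OSENV: str = '_TRACTOR_RT_VARS'


def dump_rtvars(rtvars: dict) -> tuple[str, str]:
    # `str()` of a dict holding only builtin literals produces
    # text that `ast.literal_eval()` can parse back.
    return _TRACTOR_RT_VARS_OSENV, str(rtvars)


def load_rtvars(env: dict) -> dict:
    # Parse the env-var value back to a dict; `literal_eval`
    # only accepts literals, so no arbitrary code is executed.
    loaded = ast.literal_eval(env[_TRACTOR_RT_VARS_OSENV])
    if not isinstance(loaded, dict):
        raise TypeError(f'Expected a dict envelope, got {type(loaded)!r}')
    return loaded


original = {
    '_registry_addrs': [('127.0.0.1', 1616)],
    'loglevel': 'cancel',  # hypothetical field, for illustration only
}
key, val = dump_rtvars(original)

# Build a child-process style env mapping without mutating `os.environ`.
env = {**os.environ, key: val}
roundtripped = load_rtvars(env)
```

Note that the tuple inside `_registry_addrs` survives the round-trip as a tuple, which is the property the encoding choice below relies on.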
### Encoding choice: `str(dict)` + `ast.literal_eval`

Pros:
- stdlib only
- handles all the types tractor's tests need: `str`, `int`,
  `float`, `bool`, `None`, `list`, `tuple`, `dict`
- human-readable in the env (greppable, inspectable via
  `cat /proc/<pid>/environ | tr '\0' '\n'`)

Cons:
- non-stdlib types (msgspec Structs, `Path`, custom classes)
  must be lowered first — fine for the test fixture set
- not stable across Python versions for esoteric repr cases
  (we don't hit any)

Alternatives considered:
- **msgpack**: adds a dep + binary form is ungreppable
- **json**: doesn't preserve tuples (becomes lists), which is
  a common type for `reg_addr`
- **toml/yaml**: heavier deps, no real benefit

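The tuple-preservation argument and the "must be lowered first" caveat can both be checked with the stdlib alone. This is a small illustrative sketch; the address and socket path values are made up.

```python
import ast
import json
from pathlib import Path

reg_addr = ('127.0.0.1', 1616)  # example value only

# JSON has no tuple type, so the tuple comes back as a list.
via_json = json.loads(json.dumps({'reg_addr': reg_addr}))

# `str(dict)` + `ast.literal_eval` keeps the tuple intact.
via_literal = ast.literal_eval(str({'reg_addr': reg_addr}))

# A `Path` repr is a call expression, not a literal, so it is
# rejected unless lowered (e.g. to `str`) before encoding.
try:
    ast.literal_eval(str({'sock': Path('/tmp/reg.sock')}))
    path_needs_lowering = False
except ValueError:
    path_needs_lowering = True

lowered = ast.literal_eval(str({'sock': str(Path('/tmp/reg.sock'))}))
```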
### `RuntimeVars` becomes the single source of truth

The legacy `_runtime_vars: dict[str, Any]` global in
`runtime/_state.py` becomes a *cached view* of a
`RuntimeVars` singleton instance:

- `get_runtime_vars()` returns either the struct or a
  `.to_dict()` view depending on caller's preference
- `set_runtime_vars(...)` validates against the struct schema
- spawn-time SpawnSpec sends the struct (already does
  conceptually — just gets typed)
- `__setattr__` `breakpoint()` debug instrumentation gets
  removed (unrelated cleanup, mentioned in conversation)

### Migration path

**Phase 0** *(prep)*: strip the stray `breakpoint()` from
`RuntimeVars.__setattr__`.

**Phase 1**: land `_rtvars.py` as a leaf module, used only by
test infra. Subprocess-spawned scripts in `tests/devx/`
read `_TRACTOR_RT_VARS` on startup → reconstruct
`RuntimeVars` → call `tractor.open_root_actor(**rtvars.as_kwargs())`.
Concurrent runs become deterministic-isolated because each
session writes a unique `_registry_addrs` into the env.

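The Phase 1 hand-off can be sketched with a parent that encodes the envelope into a child's environment and a child that decodes it on startup. This is an illustration under assumptions: the inline child source stands in for a `tests/devx/` example script, the port number is arbitrary, and the final `tractor.open_root_actor(...)` call is omitted so the sketch runs with the stdlib only.

```python
import ast
import os
import subprocess
import sys

ENV_KEY = '_TRACTOR_RT_VARS'

# Stand-in for an example script: it decodes the envelope from
# its own environment and reports what it received.
CHILD_SRC = (
    "import ast, os\n"
    "rtvars = ast.literal_eval(os.environ['_TRACTOR_RT_VARS'])\n"
    "print(rtvars['_registry_addrs'])\n"
)

# Per-session values as a fixture might allocate them (made up here).
session_rtvars = {'_registry_addrs': [('127.0.0.1', 41616)]}
child_env = {**os.environ, ENV_KEY: str(session_rtvars)}

proc = subprocess.run(
    [sys.executable, '-c', CHILD_SRC],
    env=child_env,
    capture_output=True,
    text=True,
    check=True,
)
child_saw = ast.literal_eval(proc.stdout.strip())
```

Because the address travels in the child's environment rather than through a CLI flag, two sessions with different envelopes never share a default.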
**Phase 2**: migrate runtime callers (`_state.get_runtime_vars`,
spawn `SpawnSpec`, `Actor.async_main`) to operate on the
struct directly, with the dict as a compat view that gets
deprecated.

**Phase 3** *(structural)*: per-session bindspace subdir
`/run/user/<uid>/tractor/<session_uuid>/` — encoded in the
rt-vars envelope, picked up by every subactor automatically.
Obsoletes the entire bindspace-leak warning class.

## Open design questions (user input wanted)

- (placeholder for your edits)
- (placeholder)
- (placeholder)

## Out-of-scope for this lift

- Anything in `modden.runtime.env` related to `Spawn`,
  `WmCtl`, `Wks` — that's a workspace orchestration layer,
  not an env-var helper. We only lift the four utility
  functions + the var name constant.
- Switching to msgpack/json — explicitly chosen against
  above.

@@ -86,26 +86,182 @@ def test_register_duplicate_name(
     daemon: subprocess.Popen,
     reg_addr: UnwrappedAddress,
 ):
+    # bug-class-3 breadcrumbs: the *last* `[CANCEL]` line that
+    # appears under `--ll cancel`/`TRACTOR_LOG_FILE=...` names the
+    # cancel-cascade boundary that's parked. Pair with
+    # `_trio_main` entry/exit breadcrumbs in
+    # `tractor/spawn/_entry.py` to triangulate the swallow point.
+    log = tractor.log.get_logger('tractor.tests.test_multi_program')
+
     async def main():
+        log.cancel('test_register_duplicate_name: enter `main()`')
+        try:
             async with tractor.open_nursery(
                 registry_addrs=[reg_addr],
             ) as an:
+                log.cancel(
+                    'test_register_duplicate_name: '
+                    'actor nursery opened'
+                )
+
                 assert not current_actor().is_registrar
 
                 p1 = await an.start_actor('doggy')
+                log.cancel(
+                    'test_register_duplicate_name: '
+                    'spawned doggy #1'
+                )
                 p2 = await an.start_actor('doggy')
+                log.cancel(
+                    'test_register_duplicate_name: '
+                    'spawned doggy #2'
+                )
 
                 async with tractor.wait_for_actor('doggy') as portal:
+                    log.cancel(
+                        'test_register_duplicate_name: '
+                        '`wait_for_actor` returned'
+                    )
                     assert portal.channel.uid in (p2.channel.uid, p1.channel.uid)
 
+                log.cancel(
+                    'test_register_duplicate_name: '
+                    'ABOUT TO CALL `an.cancel()`'
+                )
                 await an.cancel()
+                log.cancel(
+                    'test_register_duplicate_name: '
+                    '`an.cancel()` returned'
+                )
+        finally:
+            log.cancel(
+                'test_register_duplicate_name: '
+                '`open_nursery.__aexit__` returned, leaving `main()`'
+            )
 
     # XXX, run manually since we want to start this root **after**
     # the other "daemon" program with it's own root.
     trio.run(main)
 
 
+@pytest.mark.parametrize(
+    'n_dups',
+    [
+        2,
+        # `n_dups=4` exposes a SEPARATE pre-existing race: under
+        # rapid same-name spawning against a forkserver +
+        # registrar, ONE of the spawned doggies (typically the
+        # 3rd) `sys.exit(2)`s during boot before completing
+        # parent-handshake. Surfaces now (post the spawn-time
+        # `wait_for_peer_or_proc_death` fix) as `ActorFailure
+        # rc=2`; previously it was silently masked by the
+        # handshake-wait parking forever.
+        #
+        # Tracked separately in,
+        # https://github.com/goodboy/tractor/issues/456
+        pytest.param(
+            4,
+            marks=pytest.mark.xfail(
+                strict=False,
+                reason=(
+                    'doggy boot-race rc=2 under rapid same-name '
+                    'spawn — separate bug from cancel-cascade'
+                ),
+            ),
+        ),
+        8,
+    ],
+    ids=lambda n: f'n_dups={n}',
+)
+def test_dup_name_cancel_cascade_escalates_to_hard_kill(
+    daemon: subprocess.Popen,
+    reg_addr: UnwrappedAddress,
+    n_dups: int,
+):
+    '''
+    Regression for the duplicate-name cancel-cascade hang under
+    `tcp+main_thread_forkserver`.
+
+    When N actors share a single name and the parent calls
+    `an.cancel()`, the daemon registrar gets N `register_actor` RPCs
+    in tight succession. Under TCP+MTF, kernel-level socket-buffer
+    contention can push at least one sub-actor's cancel-RPC ack past
+    `Portal.cancel_timeout` (default 0.5s).
+
+    Pre-fix, `Portal.cancel_actor()` silently returned `False` on
+    that timeout, the supervisor's outer `move_on_after(3)` never
+    fired (each per-portal task always returned ≤0.5s, never
+    exceeded 3s), and `soft_kill()`'s `await wait_func(proc)` parked
+    forever — deadlocking nursery `__aexit__`.
+
+    Post-fix, `Portal.cancel_actor()` raises `ActorTooSlowError` on
+    the bounded-wait timeout, and `ActorNursery.cancel()`'s
+    per-child wrapper escalates to `proc.terminate()` (hard-kill).
+    The full nursery teardown therefore stays bounded even under
+    pathological timing.
+
+    `n_dups` is parametrized to widen the race window — more
+    same-name siblings = more concurrent register-RPCs at the
+    daemon = higher probability of hitting the contention path.
+
+    '''
+    log = tractor.log.get_logger(
+        'tractor.tests.test_multi_program'
+    )
+
+    # outer hard ceiling: a regression should fail-fast, NOT hang
+    # the test session for minutes. Budget scales with `n_dups`
+    # since each extra same-name sibling adds ~spawn-cost +
+    # potential cancel-ack-timeout escalation latency under
+    # TCP+forkserver. ~5s/sibling + 15s baseline gives plenty of
+    # headroom while still failing-loud on a real hang.
+    fail_after_s: int = 15 + (5 * n_dups)
+
+    async def main():
+        log.cancel(
+            f'enter `main()` n_dups={n_dups}'
+        )
+        with trio.fail_after(fail_after_s):
+            async with tractor.open_nursery(
+                registry_addrs=[reg_addr],
+            ) as an:
+                portals: list[Portal] = []
+                for i in range(n_dups):
+                    p: Portal = await an.start_actor('doggy')
+                    portals.append(p)
+                    log.cancel(
+                        f'spawned doggy #{i + 1}/{n_dups}'
+                    )
+
+                # at least one of the N must be discoverable by
+                # name; doesn't matter which one (registrar will
+                # have last-wins semantics under same-name).
+                async with tractor.wait_for_actor('doggy') as portal:
+                    expected_uids = {p.channel.uid for p in portals}
+                    assert portal.channel.uid in expected_uids
+
+                # critical section: this MUST return within
+                # `fail_after_s` even when one or more cancel-RPC
+                # acks time out. Pre-fix, this hangs forever.
+                log.cancel('about to call `an.cancel()`')
+                await an.cancel()
+                log.cancel('`an.cancel()` returned')
+
+        # post-teardown sanity: every child proc must be reaped.
+        # If escalation worked, even timed-out cancel-RPCs would
+        # have triggered `proc.terminate()` and the procs are dead.
+        for p in portals:
+            # `Portal.channel.connected()` -> False once the
+            # underlying chan disconnected (clean exit OR
+            # hard-killed proc both produce disconnect).
+            assert not p.channel.connected(), (
+                f'Portal chan still connected post-teardown?\n'
+                f'{p.channel}'
+            )
+
+    trio.run(main)
+
+
 @tractor.context
 async def get_root_portal(
     ctx: Context,

@@ -59,15 +59,18 @@ async def chk_tpts(
 )
 def test_root_passes_tpt_to_sub(
     tpt_proto_key: str,
+    tpt_proto: str,
     reg_addr: tuple,
     debug_mode: bool,
 ):
-    # XXX NOTE, the `reg_addr` addr won't be the same type as the
-    # `tpt_proto_key` would deliver here unless you pass `--tpt-proto
-    # <tpt_proto_key>` on the CLI.
-    #
-    # if tpt_proto_key == 'uds':
-    # breakpoint()
+    # `reg_addr` is sourced from the CLI `--tpt-proto={tpt_proto}`,
+    # so when the parametrized `tpt_proto_key` differs, the test
+    # asks the runtime to `enable_transports=[<other_proto>]` while
+    # pointing `registry_addrs` at a `reg_addr` of the wrong proto.
+    # The layer-2 guard in `open_root_actor` is expected to fail
+    # fast with `ValueError` on this mismatch (rather than the prior
+    # silent hang during the registrar handshake).
+    proto_mismatch: bool = (tpt_proto_key != tpt_proto)
 
     async def main():
         async with tractor.open_nursery(
@@ -99,4 +102,14 @@ def test_root_passes_tpt_to_sub(
             # shudown sub-actor(s)
             await an.cancel()
 
+    if proto_mismatch:
+        # mismatched proto must raise `ValueError` from the
+        # `open_root_actor` runtime guard before any subactor spawn.
+        with pytest.raises(ValueError) as excinfo:
+            trio.run(main)
+        msg: str = str(excinfo.value)
+        assert 'enable_transports' in msg
+        assert 'registry_addrs' in msg
+        assert tpt_proto_key in msg or tpt_proto in msg
+    else:
         trio.run(main)

@@ -89,6 +89,28 @@ class ActorFailure(RuntimeFailure):
     '''
 
 
+class ActorTooSlowError(RuntimeFailure):
+    '''
+    A peer-`Actor` failed to ack an actor-runtime cancel-cascade
+    request (e.g. `Portal.cancel_actor()` -> `Actor.cancel()`)
+    within the bounded wait window.
+
+    Distinct exc-type (NOT a `trio.TooSlowError` subclass) so that
+    `except trio.TooSlowError:` blocks elsewhere in the test-suite
+    or `tractor` internals do NOT silently mask actor-cancel
+    timeouts — these MUST propagate so a supervisor can escalate
+    to `proc.terminate()` (hard-kill) per SC-discipline:
+
+      graceful cancel-req -> bounded wait -> hard-kill
+
+    Reason: see #subint_forkserver duplicate-name hang
+    diagnosis where `Portal.cancel_actor()` silently swallowed
+    the timeout and the supervisor never escalated, leaving
+    a same-named sibling subactor parked forever.
+
+    '''
+
+
 class InternalError(RuntimeError):
     '''
     Entirely unexpected internal machinery error indicating

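The docstring's reasoning for a distinct exception type can be illustrated without `trio`. In this sketch the builtin `TimeoutError` stands in for `trio.TooSlowError`, and both exception classes plus the `cancel_peer()` and `supervise()` helpers are hypothetical names invented for the demonstration; none of them are tractor APIs.

```python
class SlowAsTimeoutSubclass(TimeoutError):
    '''Hypothetical: cancel-timeout modelled as a timeout subclass.'''


class SlowAsDistinctType(RuntimeError):
    '''Hypothetical: cancel-timeout with an unrelated base class.'''


def cancel_peer(exc_type: type) -> str:
    # Stand-in for pre-existing code that has a broad timeout
    # handler somewhere between the raise site and the supervisor.
    try:
        raise exc_type('peer did not ack cancel in time')
    except TimeoutError:
        return 'masked'


def supervise(exc_type: type) -> str:
    try:
        return cancel_peer(exc_type)
    except SlowAsDistinctType:
        # Stand-in for escalating via `proc.terminate()`.
        return 'escalated to hard-kill'


subclass_outcome = supervise(SlowAsTimeoutSubclass)
distinct_outcome = supervise(SlowAsDistinctType)
```

The subclass variant is swallowed by the broad handler and never reaches the supervisor, while the distinct type passes through it and triggers the escalation branch.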
@@ -371,6 +371,35 @@ async def open_root_actor(
         for uw_addr in uw_reg_addrs
     ]
 
+    # fail-fast on `enable_transports` / `registry_addrs` proto
+    # mismatch — historically this caused a silent indefinite
+    # hang during the registrar handshake (registry was reachable
+    # only via a transport not in `enable_transports`, so the
+    # actor could never connect to register/discover). See
+    # `tests/ipc/test_multi_tpt.py::test_root_passes_tpt_to_sub`
+    # for the foot-gun case + its layer-1 skip-guard.
+    bad_addrs: list[tuple[str, Address]] = [
+        (addr.proto_key, addr)
+        for addr in registry_addrs
+        if addr.proto_key not in enable_transports
+    ]
+    if bad_addrs:
+        mismatch_lines: str = '\n'.join(
+            f' - proto_key={pk!r} addr={a!r}'
+            for pk, a in bad_addrs
+        )
+        raise ValueError(
+            f'`registry_addrs` contains addr(s) whose proto is '
+            f'not in `enable_transports`!\n'
+            f'enable_transports: {enable_transports!r}\n'
+            f'mismatched_addrs:\n'
+            f'{mismatch_lines}\n'
+            f'\n'
+            f'Either add the missing proto to '
+            f'`enable_transports`, or remove the addr from '
+            f'`registry_addrs`.'
+        )
+
     # Debug-mode is currently only supported for backends whose
     # subactor root runtime is trio-native (so `tractor.devx.
     # debug._tty_lock` works). See `_DEBUG_COMPATIBLE_BACKENDS`

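The guard in this hunk can be exercised in isolation. The sketch below is an approximation rather than the runtime code: `Addr` is a hypothetical stand-in for tractor's `Address` types (only a `proto_key` attribute is assumed), `check_registry_protos()` is an invented helper name, and the error text is shortened.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Addr:
    # Hypothetical stand-in: only `proto_key` matters to the guard.
    proto_key: str
    target: object


def check_registry_protos(
    registry_addrs: list,
    enable_transports: list,
) -> None:
    # Collect every registry addr whose transport is not enabled.
    bad_addrs = [
        (addr.proto_key, addr)
        for addr in registry_addrs
        if addr.proto_key not in enable_transports
    ]
    if bad_addrs:
        mismatch_lines = '\n'.join(
            f' - proto_key={pk!r} addr={a!r}'
            for pk, a in bad_addrs
        )
        raise ValueError(
            f'`registry_addrs` contains addr(s) whose proto is '
            f'not in `enable_transports`!\n'
            f'enable_transports: {enable_transports!r}\n'
            f'mismatched_addrs:\n'
            f'{mismatch_lines}'
        )


# Matching protos pass silently.
check_registry_protos([Addr('tcp', ('127.0.0.1', 1616))], ['tcp'])

# A mismatch fails fast instead of hanging later in a handshake.
try:
    check_registry_protos([Addr('uds', '/tmp/reg.sock')], ['tcp'])
    mismatch_msg = ''
except ValueError as err:
    mismatch_msg = str(err)
```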
@@ -31,6 +31,7 @@ from typing import (
     get_args,
     TYPE_CHECKING,
 )
+import warnings
 
 import pytest
 import tractor
@@ -52,8 +53,19 @@ pytest_plugins: tuple[str, ...] = (
 if TYPE_CHECKING:
     from argparse import Namespace
 
 
 _cap_sys_passed_as_flag: bool = False
-_cap_fd_set: bool = False
+
+# Spawn backends that need `--capture=sys` to avoid the
+# fork-child×pytest-capture-fd deadlock. See the long
+# NOTE in `pytest_load_initial_conftests` below for the
+# full mechanism + tradeoff write-up.
+_CAPSYS_REQUIRED_SPAWNERS: frozenset[str] = frozenset({
+    'main_thread_forkserver',
+    # TODO future variant-2 'subint_forkserver' lands
+    # here too once the impl is unblocked.
+})
+
+
 # XXX REQUIRED in order to enforce `--capture=` flag
 # pre test session.
@@ -64,67 +76,108 @@ def pytest_load_initial_conftests(
     parser: pytest.Parser,
     args: list[str],
 ):
-    global _cap_sys_passed_as_flag, _cap_fd_set
+    '''
+    Validate the `--capture=` × `--spawn-backend=`
+    combination at session-startup.
 
-    opts: Namespace = early_config.option
-    if opts.capture == 'fd':
-        _cap_fd_set = True
+    Background
+    ----------
+    `--capture=sys` is REQUIRED for fork-based spawn backends (e.g.
+    `main_thread_forkserver`): default `--capture=fd` redirects fd
+    1,2 to temp files, and fork children inherit those fds — opaque
+    deadlocks happen in the pytest-capture-machinery ↔ fork-child
+    stdio interaction. `--capture=sys` only redirects Python- level
+    `sys.stdout`/`sys.stderr`, leaving fd 1,2 alone.
+
+    Trade-off (vs. `--capture=fd`):
+
+    - LOST: per-test attribution of subactor *raw-fd* output (C-ext
+      writes, `os.write(2, ...)`, subproc stdout). Not zero — those
+      go to the terminal, captured by CI's terminal-level capture,
+      just not per-test-scoped in the pytest failure report.
+
+    - KEPT: Python-level `print()` + `logging` capture per-test
+      (tractor's logger uses `sys.stderr`, so tractor log output IS
+      still attributed per-test).
+
+    - KEPT: user `pytest -s` for debugging (unaffected).
+
+    Full post-mortem in
+    `ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`.
+
+    Validation policy:
+    - **CI mode** (`CI` env-var set): fail-fast at
+      session start if a fork-spawn backend is requested
+      WITHOUT `--capture=sys`. CI must be explicit; no
+      auto-fallbacks. Forces every CI matrix-row's run
+      line to declare its capture mode plainly.
+    - **Local mode** (no `CI` env-var): emit a loud
+      warning + suggest `--capture=sys`, but allow the
+      run to proceed. Lets devs experiment with the bad
+      combo (e.g. to validate whether recent
+      fork-survival fixes have made `--capture=fd` work
+      after all).
+
+    '''
+    global _cap_sys_passed_as_flag
     opts_w_args: Namespace = parser.parse_known_args(args)
-    if opts_w_args.capture == 'fd':
-        _cap_fd_set = True
+    spawner: str|None = getattr(
+        opts_w_args,
+        'spawn_backend',
+        None,
+    )
+    capture: str|None = getattr(
+        opts_w_args,
+        'capture',
+        None,
+    )
     if '--capture=sys' in args:
         _cap_sys_passed_as_flag = True
+        assert capture == 'sys'
+
+    in_ci: bool = bool(os.environ.get('CI'))
 
-    # XXX, ALWAYS apply capsys for fork based spawners:
-    # * main_thread_forkserver
-    # * (TODO) subint_forkserver
-    # '--capture=sys',
-    # ^XXX NOTE^ for `main_thread_forkserver` spawner
-    #
-    # => sys-level capture is REQUIRED for fork-based spawn
-    # backends (e.g. `main_thread_forkserver`): default
-    # `--capture=fd` redirects fd 1,2 to temp files, and fork
-    # children inherit those fds — opaque deadlocks happen in
-    # the pytest-capture-machinery ↔ fork-child stdio
-    # interaction. `--capture=sys` only redirects Python-level
-    # `sys.stdout`/`sys.stderr`, leaving fd 1,2 alone.
-    #
-    # Trade-off (vs. `--capture=fd`):
-    # - LOST: per-test attribution of subactor *raw-fd* output
-    # (C-ext writes, `os.write(2, ...)`, subproc stdout). Not
-    # zero — those go to the terminal, captured by CI's
-    # terminal-level capture, just not per-test-scoped in the
-    # pytest failure report.
-    # - KEPT: Python-level `print()` + `logging` capture per-
-    # test (tractor's logger uses `sys.stderr`, so tractor
-    # log output IS still attributed per-test).
-    # - KEPT: user `pytest -s` for debugging (unaffected).
-    #
-    # Full post-mortem in
-    # `ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`.
     if (
-        (spawner := opts_w_args.spawn_backend) in [
-            'main_thread_forkserver',
-        ]
+        spawner in _CAPSYS_REQUIRED_SPAWNERS
+        and
+        capture == 'fd'
     ):
-        print(
-            f'XXX SETTING CAPSYS due to spawning backend XXX\n'
-            f'--spawn-backend={spawner!r}\n'
+        msg: str = (
+            f'\n'
+            f'XXX `--spawn-backend={spawner}` REQUIRES '
+            f'`--capture=sys` XXX\n'
+            f'fork-child × `--capture=fd` is a known '
+            f'deadlock pattern.\n'
+            f'See `tractor._testing.pytest`\'s '
+            f'`pytest_load_initial_conftests` docstring '
+            f'for the full mechanism.\n'
+            f'\n'
+            f'Re-invoke with `--capture=sys` (or run '
+            f'with `pytest -s` for no capture).\n'
         )
-        opts.capture = 'sys'
-        # ^TODO XXX?/
-        # seems like this doesn't get set by the above!?
-        args.append(
-            '--capture=sys',
+        # fail-fast: CI must declare capture explicitly for
+        # fork-spawn backends.
+        if in_ci:
+            pytest.exit(
+                f'{msg}\n'
+                f'FAIL-FAST: CI=1 detected; aborting session.\n',
+                returncode=2,
             )
-        out = parser.parse_known_and_unknown_args(
-            args,
-            early_config.option,
+
+        # local: loud warn but let the run proceed so devs can
+        # experiment.
+        else:
+            warnings.warn(
+                f'{msg}\n'
+                f'Local mode (no `CI` env var) — '
+                f'continuing. Expect potential hangs.\n',
+                category=UserWarning,
+                stacklevel=1,
             )
-        assert out[0].capture == 'sys'
-        # breakpoint()
+        # ??TODO?? is there a way to force the `--capture=sys` sin CLI ??
+        # - [x] ask pytest peeps in chat!
+        # - [x] pytest` issue,
+        # https://github.com/pytest-dev/pytest/issues/14444
 
     # TODO, set various `$TRACTOR_X*` osenv vars here!
     print(
@@ -399,7 +452,6 @@ def pytest_configure(
             )
             enable_stack_on_sig()
         except ImportError:
-            import warnings
             warnings.warn(
                 '`stackscope` not installed — '
                 '--enable-stackscope is a no-op. '
@@ -622,25 +674,36 @@ def is_forking_spawner(
 
 
 def maybe_xfail_for_spawner(
+    request: pytest.FixtureRequest,
     start_method: str,
     is_forking_spawner: bool,
 ) -> None:
     '''
-    Fork based spawning backends caude issues with `pytest`'s
-    fd-capture mechanism and can cause various suites to hang.
+    Fork based spawning backends cause issues with
+    `pytest`'s fd-capture mechanism and can cause various
+    suites to hang.
 
-    Instead this helper allows skipping/xfailing from a test
-    when a certain spawner + CLI-flag input is detected.
+    This helper allows skipping/xfailing from a test when
+    a fork-spawn backend is being used WITHOUT
+    `--capture=sys`.
 
     '''
+    capture_mode: str = request.config.option.capture
+    # `tee-sys` is also sys-level capture (just additionally writes
+    # to the original `sys.__stdout__/__stderr__`); fork-safe like
+    # `sys`. Only `fd`-level capture is the deadlock pattern.
     if (
-        not _cap_sys_passed_as_flag
+        capture_mode not in (
+            'sys',
+            'tee-sys',
+        )
         and
         is_forking_spawner
     ):
         pytest.skip(
             f'Spawner {start_method!r} requires the flag,\n'
-            f'--capture=sys or similar..\n'
+            f'--capture=sys or --capture=tee-sys..\n'
+            f'(got --capture={capture_mode!r})\n'
         )
 
 
@@ -666,8 +729,13 @@ def set_fork_aware_capture(
     which can oddly make certain tests hang/fail.
 
     '''
-    if _cap_sys_passed_as_flag:
-        return 'sys'
+    # Fast-path: user already passed sys-level capture
+    # (`sys` or `tee-sys`) at the CLI — no override needed.
+    if request.config.option.capture in (
+        'sys',
+        'tee-sys',
+    ):
+        return request.config.option.capture
 
     capsys: pytest.CaptureFixture = maybe_override_capture(
         request=request,

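The validation policy from the `pytest_load_initial_conftests` docstring reduces to a small decision function. The sketch below is a simplification for illustration: `capture_policy()` and its `'ok'`/`'warn'`/`'exit'` return values are invented here, the `'trio'` backend name is used only as an example of a non-fork spawner, and the real hook calls `pytest.exit(...)` or `warnings.warn(...)` instead of returning a string.

```python
CAPSYS_REQUIRED_SPAWNERS = frozenset({'main_thread_forkserver'})


def capture_policy(spawner, capture, env) -> str:
    '''
    Decide what to do for a `--spawn-backend` x `--capture` pair:
    'ok' (nothing to do), 'warn' (local run) or 'exit' (CI run).

    '''
    # Only the fork-spawner + fd-capture combo is a known problem.
    if (
        spawner not in CAPSYS_REQUIRED_SPAWNERS
        or capture != 'fd'
    ):
        return 'ok'

    # Same truthiness test as `bool(os.environ.get('CI'))`.
    return 'exit' if env.get('CI') else 'warn'


ci_bad = capture_policy('main_thread_forkserver', 'fd', {'CI': '1'})
local_bad = capture_policy('main_thread_forkserver', 'fd', {})
ci_good = capture_policy('main_thread_forkserver', 'sys', {'CI': '1'})
other_backend = capture_policy('trio', 'fd', {'CI': '1'})
```

Keeping the decision separate from its side effects makes the CI and local branches easy to unit test without starting a pytest session.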
@@ -55,6 +55,7 @@ from ..msg import (
     Return,
 )
 from .._exceptions import (
+    ActorTooSlowError,
     NoResult,
     TransportClosed,
 )
@@ -268,6 +269,7 @@ class Portal:
     async def cancel_actor(
         self,
         timeout: float | None = None,
+        raise_on_timeout: bool = False,
 
     ) -> bool:
         '''
@@ -281,6 +283,17 @@ class Portal:
         `._context.Context.cancel()` which CAN be used for this
         purpose.
 
+        `raise_on_timeout` (default `False`):
+
+        - `False` (legacy): on bounded-wait expiry, log at DEBUG
+          and return `False`. Used by callers that issue cancel
+          fire-and-forget and have their own escalation
+          (e.g. `_spawn.soft_kill()` checks `proc.poll()` after).
+        - `True`: on bounded-wait expiry, raise `ActorTooSlowError`
+          so the caller MUST handle the failure explicitly.
+          `ActorNursery.cancel()` opts in so it can escalate via
+          `proc.terminate()` per SC-discipline.
+
         '''
         __runtimeframe__: int = 1 # noqa
 
@@ -301,15 +314,16 @@ class Portal:
 
         # XXX the one spot we set it?
         chan._cancel_called: bool = True
+        cancel_timeout: float = (
+            timeout
+            or
+            self.cancel_timeout
+        )
         try:
             # send cancel cmd - might not get response
             # XXX: sure would be nice to make this work with
             # a proper shield
-            with trio.move_on_after(
-                timeout
-                or
-                self.cancel_timeout
-            ) as cs:
+            with trio.move_on_after(cancel_timeout) as cs:
                 cs.shield: bool = True
                 await self.run_from_ns(
                     'self',
@ -317,16 +331,24 @@ class Portal:
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
# `move_on_after` fired — peer didn't ack within
|
||||||
# may timeout and we never get an ack (obvi racy)
|
# bounded window. Behaviour depends on
|
||||||
# but that doesn't mean it wasn't cancelled.
|
# `raise_on_timeout`:
|
||||||
|
assert cs.cancelled_caught
|
||||||
|
if raise_on_timeout:
|
||||||
|
raise ActorTooSlowError(
|
||||||
|
f'Peer {peer_id} did not ack `Actor.cancel()`'
|
||||||
|
f'-RPC within bounded wait of '
|
||||||
|
f'{cancel_timeout!r}s'
|
||||||
|
)
|
||||||
|
|
||||||
|
# legacy fire-and-forget path: log + return False so
|
||||||
|
# the caller can decide whether to escalate.
|
||||||
log.debug(
|
log.debug(
|
||||||
f'May have failed to cancel peer?\n'
|
f'May have failed to cancel peer?\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f'c)=?> {peer_id}\n'
|
f'c)=?> {peer_id}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# if we get here some weird cancellation case happened
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
except TransportClosed as tpt_err:
|
except TransportClosed as tpt_err:
|
||||||
|
|
|
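Reviewer sketch (not part of the patch): the hunk above resolves the bounded wait as `timeout or self.cancel_timeout`, so any falsy `timeout`, including an explicit `0`, falls back to the portal default. The snippet below is a minimal illustration of that fallback. The names `resolve_cancel_timeout`, `resolve_cancel_timeout_strict` and the `0.5` default are hypothetical stand-ins, not tractor APIs.

```python
from typing import Optional

# Hypothetical stand-in for `Portal.cancel_timeout`; the real value may differ.
DEFAULT_CANCEL_TIMEOUT: float = 0.5


def resolve_cancel_timeout(
    timeout: Optional[float],
    default: float = DEFAULT_CANCEL_TIMEOUT,
) -> float:
    # Same shape as the hunk: any falsy `timeout` (None, 0, 0.0)
    # falls back to the default.
    return timeout or default


def resolve_cancel_timeout_strict(
    timeout: Optional[float],
    default: float = DEFAULT_CANCEL_TIMEOUT,
) -> float:
    # Alternative (assumption, not in the patch): only `None` means
    # "unset", so an explicit 0 is honoured as "do not wait".
    return default if timeout is None else timeout
```

If a caller ever needs a zero-length wait, the strict variant is the one that preserves it; with the `or` form a `0` silently becomes the default.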
||||||
|
|
@ -38,8 +38,14 @@ from ..discovery._addr import (
|
||||||
UnwrappedAddress,
|
UnwrappedAddress,
|
||||||
mk_uuid,
|
mk_uuid,
|
||||||
)
|
)
|
||||||
from ._state import current_actor, is_main_process
|
from ._state import (
|
||||||
from ..log import get_logger, get_loglevel
|
current_actor,
|
||||||
|
is_main_process,
|
||||||
|
)
|
||||||
|
from ..log import (
|
||||||
|
get_logger,
|
||||||
|
get_loglevel,
|
||||||
|
)
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ..trionics import (
|
from ..trionics import (
|
||||||
|
|
@ -47,6 +53,7 @@ from ..trionics import (
|
||||||
collapse_eg,
|
collapse_eg,
|
||||||
)
|
)
|
||||||
from .._exceptions import (
|
from .._exceptions import (
|
||||||
|
ActorTooSlowError,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
)
|
)
|
||||||
from .._root import (
|
from .._root import (
|
||||||
|
|
@ -60,11 +67,93 @@ if TYPE_CHECKING:
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
# from ..ipc._server import IPCServer
|
# from ..ipc._server import IPCServer
|
||||||
from ..ipc import IPCServer
|
from ..ipc import IPCServer
|
||||||
|
from ..spawn._spawn import ProcessType
|
||||||
|
|
||||||
|
|
||||||
log = get_logger()
|
log = get_logger()
|
||||||
|
|
||||||
|
|
||||||
|
async def _try_cancel_then_kill(
|
||||||
|
portal: Portal,
|
||||||
|
# `ProcessType` is `TYPE_CHECKING`-only (defined under that
|
||||||
|
# guard in `..spawn._spawn`) so we stringify here to avoid
|
||||||
|
# eager runtime eval of the annotation at function-def time
|
||||||
|
# (this module has no `from __future__ import annotations`).
|
||||||
|
proc: 'ProcessType',
|
||||||
|
subactor: Actor,
|
||||||
|
debug_mode_active: bool = False,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Per-child cancel-then-escalate helper used by
|
||||||
|
`ActorNursery.cancel()`.
|
||||||
|
|
||||||
|
Sends a graceful actor-runtime cancel-RPC via
|
||||||
|
`Portal.cancel_actor(raise_on_timeout=True)`. If the bounded-wait
|
||||||
|
expires before the peer acks, `ActorTooSlowError` is raised and
|
||||||
|
we escalate via `proc.terminate()` (SIGTERM) per SC-discipline:
|
||||||
|
|
||||||
|
graceful cancel-req -> bounded wait -> hard-kill
|
||||||
|
|
||||||
|
Without this escalation, a same-name sibling subactor whose
|
||||||
|
cancel-RPC failed to ack within `Portal.cancel_timeout` (e.g.
|
||||||
|
under TCP+forkserver register-RPC contention) would park the
|
||||||
|
parent's `soft_kill()` watcher forever waiting on `proc.poll()`,
|
||||||
|
deadlocking nursery `__aexit__`. See `ActorTooSlowError` for
|
||||||
|
the wider write-up.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# XXX, do NOT escalate to `proc.terminate()` while ANY of
|
||||||
|
# the following are true — SIGTERM-ing a sub would tear
|
||||||
|
# down its sub-tree including any descendant proxying
|
||||||
|
# stdio to/from a REPL-locked actor, clobbering the user's
|
||||||
|
# debug session:
|
||||||
|
#
|
||||||
|
# - `Lock.ctx_in_debug is not None`: most precise — some
|
||||||
|
# actor in the tree is currently REPL-locked. Set in the
|
||||||
|
# root actor for the lifetime of the lock. Raceable
|
||||||
|
# (false negative if SIGINT arrives before lock-acquire
|
||||||
|
# RPC completes).
|
||||||
|
#
|
||||||
|
# - `_runtime_vars['_debug_mode']`: root-actor was opened
|
||||||
|
# with `debug_mode=True` (via `open_root_actor` /
|
||||||
|
# `open_nursery`). Set once at root boot, never cleared.
|
||||||
|
# Catches deep-descendant REPL sessions even when the
|
||||||
|
# intermediate nurseries didn't pass `debug_mode=` per-
|
||||||
|
# child.
|
||||||
|
#
|
||||||
|
# - `debug_mode_active`: this nursery has at least one
|
||||||
|
# child started with an explicit `debug_mode=` arg
|
||||||
|
# (`ActorNursery._at_least_one_child_in_debug`). Catches
|
||||||
|
# the case where root is NOT in debug-mode but a
|
||||||
|
# nursery-direct child opted in.
|
||||||
|
#
|
||||||
|
# Independent because root may NOT be in debug-mode even
|
||||||
|
# when a child is (only the child's `_runtime_vars` is
|
||||||
|
# mutated by per-child `debug_mode=True`). ORing covers
|
||||||
|
# every flavor without false-positively skipping
|
||||||
|
# legitimate hard-kill paths in non-debug trees.
|
||||||
|
if (
|
||||||
|
debug.Lock.ctx_in_debug is not None
|
||||||
|
or
|
||||||
|
_state._runtime_vars.get('_debug_mode', False)
|
||||||
|
or
|
||||||
|
debug_mode_active
|
||||||
|
):
|
||||||
|
await portal.cancel_actor()
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await portal.cancel_actor(raise_on_timeout=True)
|
||||||
|
except ActorTooSlowError as too_slow:
|
||||||
|
log.error(
|
||||||
|
f'Cancel-ack TIMED OUT for sub-actor\n'
|
||||||
|
f' uid: {subactor.aid.reprol()!r}\n'
|
||||||
|
f' reason: {too_slow}\n'
|
||||||
|
f'-> escalating to `proc.terminate()` (hard-kill)\n'
|
||||||
|
)
|
||||||
|
proc.terminate()
|
||||||
|
|
||||||
|
|
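Reviewer sketch (not part of the patch): the `graceful cancel-req -> bounded wait -> hard-kill` sequence described in the `_try_cancel_then_kill()` docstring can be illustrated without tractor. The version below uses stdlib `asyncio` instead of `trio`, and every name in it (`cancel_then_kill`, `request_cancel`, `hard_kill`, `debug_active`) is a hypothetical placeholder for the portal, process and debug-mode checks in the real helper.

```python
import asyncio
from typing import Awaitable, Callable


async def cancel_then_kill(
    request_cancel: Callable[[], Awaitable[None]],
    hard_kill: Callable[[], None],
    *,
    timeout: float,
    debug_active: bool = False,
) -> str:
    try:
        # graceful request with a bounded wait for the ack
        await asyncio.wait_for(request_cancel(), timeout)
    except asyncio.TimeoutError:
        if debug_active:
            # legacy fire-and-forget: never hard-kill while a debug
            # session may be live anywhere in the tree
            return 'skipped-escalation'
        # ack never arrived inside the window: escalate
        hard_kill()
        return 'hard-killed'
    return 'acked'
```

The point of the shape is that the timeout is surfaced to the one caller that owns the process handle, so escalation is an explicit decision rather than a side effect of a swallowed timeout.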
||||||
class ActorNursery:
|
class ActorNursery:
|
||||||
'''
|
'''
|
||||||
The fundamental actor supervision construct: spawn and manage
|
The fundamental actor supervision construct: spawn and manage
|
||||||
|
|
@ -428,10 +517,23 @@ class ActorNursery:
|
||||||
else: # there's no other choice left
|
else: # there's no other choice left
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
|
|
||||||
# spawn cancel tasks for each sub-actor
|
# spawn per-child cancel tasks; the helper
|
||||||
|
# escalates to hard-kill on
|
||||||
|
# `ActorTooSlowError` rather than silently
|
||||||
|
# swallowing the cancel-ack timeout, EXCEPT
|
||||||
|
# when this nursery has any debug-eligible
|
||||||
|
# child (in which case we keep legacy
|
||||||
|
# fire-and-forget semantics to avoid
|
||||||
|
# clobbering an active REPL).
|
||||||
assert portal
|
assert portal
|
||||||
if portal.channel.connected():
|
if portal.channel.connected():
|
||||||
tn.start_soon(portal.cancel_actor)
|
tn.start_soon(
|
||||||
|
_try_cancel_then_kill,
|
||||||
|
portal,
|
||||||
|
proc,
|
||||||
|
subactor,
|
||||||
|
self._at_least_one_child_in_debug,
|
||||||
|
)
|
||||||
|
|
||||||
log.cancel(msg)
|
log.cancel(msg)
|
||||||
# if we cancelled the cancel (we hung cancelling remote actors)
|
# if we cancelled the cancel (we hung cancelling remote actors)
|
||||||
|
|
|
||||||
|
|
@ -368,6 +368,7 @@ from tractor.runtime._portal import Portal
|
||||||
from ._spawn import (
|
from ._spawn import (
|
||||||
cancel_on_completion,
|
cancel_on_completion,
|
||||||
soft_kill,
|
soft_kill,
|
||||||
|
wait_for_peer_or_proc_death,
|
||||||
)
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
|
@ -760,6 +761,26 @@ class _ForkedProc:
|
||||||
self._pidfd = -1
|
self._pidfd = -1
|
||||||
return self._returncode
|
return self._returncode
|
||||||
|
|
||||||
|
def terminate(self) -> None:
|
||||||
|
'''
|
||||||
|
OS-level `SIGTERM` to the child. Swallows
|
||||||
|
`ProcessLookupError` (already dead).
|
||||||
|
|
||||||
|
Mirrors `trio.Process.terminate()` /
|
||||||
|
`multiprocessing.Process.terminate()` — sends SIGTERM
|
||||||
|
(graceful, allows the child a chance to clean up via
|
||||||
|
signal-handlers) rather than SIGKILL. Used by
|
||||||
|
`ActorNursery.cancel()`'s per-child escalation when
|
||||||
|
`Portal.cancel_actor()` raises `ActorTooSlowError`,
|
||||||
|
and by the legacy `hard_kill=True` branch on the same
|
||||||
|
path.
|
||||||
|
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
os.kill(self.pid, signal.SIGTERM)
|
||||||
|
except ProcessLookupError:
|
||||||
|
pass
|
||||||
|
|
||||||
def kill(self) -> None:
|
def kill(self) -> None:
|
||||||
'''
|
'''
|
||||||
OS-level `SIGKILL` to the child. Swallows
|
OS-level `SIGKILL` to the child. Swallows
|
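Reviewer sketch (not part of the patch): the `terminate()` method added above sends `SIGTERM` via `os.kill()` and swallows `ProcessLookupError`. The snippet below exercises the same pattern against a throwaway child process. It assumes a POSIX host, and `terminate_pid` and `demo` are hypothetical names used only for illustration.

```python
import os
import signal
import subprocess
import sys


def terminate_pid(pid: int) -> bool:
    # Send SIGTERM; report False instead of raising when the pid
    # no longer exists (mirrors the swallow in the patch).
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        return False
    return True


def demo() -> tuple:
    # Spawn a child that would otherwise sleep for a minute.
    child = subprocess.Popen(
        [sys.executable, '-c', 'import time; time.sleep(60)'],
    )
    delivered = terminate_pid(child.pid)
    # Reap the child; on POSIX a signal death is reported as a
    # negative return code.
    rc = child.wait(timeout=10)
    return delivered, rc
```

Because the default `SIGTERM` disposition ends the process, the child exits well before its sleep finishes, and the parent still has to call `wait()` to reap it.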
||||||
|
|
@ -948,7 +969,18 @@ async def main_thread_forkserver_proc(
|
||||||
f' |_{proc}\n'
|
f' |_{proc}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
event, chan = await ipc_server.wait_for_peer(uid)
|
# race the handshake-wait against proc-death so a
|
||||||
|
# sub that dies during boot (e.g. crashed on import
|
||||||
|
# before reaching `_actor_child_main`, leaving a
|
||||||
|
# zombie + no cmdline) surfaces as `ActorFailure`
|
||||||
|
# instead of parking the spawning task forever on
|
||||||
|
# an unsignalled `_peer_connected[uid]` event.
|
||||||
|
event, chan = await wait_for_peer_or_proc_death(
|
||||||
|
ipc_server,
|
||||||
|
uid,
|
||||||
|
proc_wait=proc.wait,
|
||||||
|
proc_repr=repr(proc),
|
||||||
|
)
|
||||||
|
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
cancelled_during_spawn = True
|
cancelled_during_spawn = True
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,7 @@ from tractor.log import get_logger
|
||||||
from tractor.discovery._addr import (
|
from tractor.discovery._addr import (
|
||||||
UnwrappedAddress,
|
UnwrappedAddress,
|
||||||
)
|
)
|
||||||
|
from .._exceptions import ActorFailure
|
||||||
from ._reap import unlink_uds_bind_addrs
|
from ._reap import unlink_uds_bind_addrs
|
||||||
from tractor.runtime._portal import Portal
|
from tractor.runtime._portal import Portal
|
||||||
from tractor.runtime._runtime import Actor
|
from tractor.runtime._runtime import Actor
|
||||||
|
|
@ -106,6 +107,71 @@ else:
|
||||||
await trio.lowlevel.wait_readable(proc.sentinel)
|
await trio.lowlevel.wait_readable(proc.sentinel)
|
||||||
|
|
||||||
|
|
||||||
|
async def wait_for_peer_or_proc_death(
|
||||||
|
ipc_server,
|
||||||
|
uid: tuple[str, str],
|
||||||
|
# TODO? not types?
|
||||||
|
proc_wait: 'Callable[[], Awaitable]',
|
||||||
|
proc_repr: str = '',
|
||||||
|
|
||||||
|
) -> 'tuple[trio.Event, Channel]':
|
||||||
|
'''
|
||||||
|
Race `IPCServer.wait_for_peer(uid)` against the sub-proc's
|
||||||
|
own `.wait()` coroutine. Whichever completes first cancels
|
||||||
|
the other.
|
||||||
|
|
||||||
|
Used by every spawn-backend to detect a sub-actor that
|
||||||
|
*dies during boot* before completing the parent-handshake-
|
||||||
|
callback (e.g. crashed on import, exec'd-out, kernel-killed
|
||||||
|
pre-`_actor_child_main`). Without this race, the
|
||||||
|
handshake-wait — backed by an unsignalled `trio.Event` —
|
||||||
|
parks the spawning task forever and leaves the dead child
|
||||||
|
as a zombie since nobody calls `proc.wait()` to reap.
|
||||||
|
|
||||||
|
On normal handshake-complete: returns `(event, chan)`
|
||||||
|
identical to a bare `wait_for_peer`.
|
||||||
|
|
||||||
|
On proc-death-first: raises `ActorFailure` carrying the
|
||||||
|
proc's exit code, allowing the supervisor to surface a
|
||||||
|
clean error rather than hanging indefinitely.
|
||||||
|
|
||||||
|
`proc_wait` is a 0-arg async callable returning the proc's
|
||||||
|
exit-status — kept generic so each backend can pass its
|
||||||
|
own (`trio.Process.wait`, `_ForkedProc.wait`,
|
||||||
|
`proc_waiter(mp.Process)`, etc.).
|
||||||
|
|
||||||
|
`proc_repr` is an optional string used in the
|
||||||
|
`ActorFailure` message for diag.
|
||||||
|
|
||||||
|
'''
|
||||||
|
result: dict = {}
|
||||||
|
|
||||||
|
async def _await_handshake():
|
||||||
|
event, chan = await ipc_server.wait_for_peer(uid)
|
||||||
|
result['handshake'] = (event, chan)
|
||||||
|
boot_n.cancel_scope.cancel()
|
||||||
|
|
||||||
|
async def _await_death():
|
||||||
|
rc = await proc_wait()
|
||||||
|
result['died'] = rc
|
||||||
|
boot_n.cancel_scope.cancel()
|
||||||
|
|
||||||
|
async with trio.open_nursery() as boot_n:
|
||||||
|
boot_n.start_soon(_await_handshake)
|
||||||
|
boot_n.start_soon(_await_death)
|
||||||
|
|
||||||
|
if 'handshake' in result:
|
||||||
|
return result['handshake']
|
||||||
|
|
||||||
|
# only reached if proc-death won the race
|
||||||
|
raise ActorFailure(
|
||||||
|
f'Sub-actor {uid!r} died during boot '
|
||||||
|
f'(rc={result.get("died")!r}) before completing '
|
||||||
|
f'parent-handshake.\n'
|
||||||
|
f' proc: {proc_repr}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
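Reviewer sketch (not part of the patch): `wait_for_peer_or_proc_death()` races the handshake against the child's exit and lets the winner cancel the loser. The same idea is shown below with stdlib `asyncio` rather than a `trio` nursery. `BootFailure` and `wait_for_handshake_or_death` are hypothetical stand-ins for `ActorFailure` and the real helper.

```python
import asyncio
from typing import Any, Awaitable, Callable


class BootFailure(RuntimeError):
    '''Hypothetical stand-in for `ActorFailure`.'''


async def wait_for_handshake_or_death(
    handshake: Callable[[], Awaitable[Any]],
    proc_wait: Callable[[], Awaitable[Any]],
    proc_repr: str = '',
) -> Any:
    hs_task = asyncio.ensure_future(handshake())
    death_task = asyncio.ensure_future(proc_wait())
    done, pending = await asyncio.wait(
        {hs_task, death_task},
        return_when=asyncio.FIRST_COMPLETED,
    )
    # The winner cancels the loser; gather lets the cancelled
    # task unwind before we return or raise.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # Prefer the handshake if both finished in the same tick,
    # matching the `'handshake' in result` check in the patch.
    if hs_task in done:
        return hs_task.result()

    raise BootFailure(
        f'child died during boot (rc={death_task.result()!r}) '
        f'before completing handshake. proc: {proc_repr}'
    )
```

Awaiting the exit status in the losing branch is also what reaps the child, which is why the patch's docstring ties the race to zombie avoidance.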
||||||
def try_set_start_method(
|
def try_set_start_method(
|
||||||
key: SpawnMethodKey
|
key: SpawnMethodKey
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,15 +1,23 @@
|
||||||
"""
|
"""
|
||||||
`xontrib_tractor_diag`: pytest/tractor diagnostic aliases.
|
`xontrib_tractor_diag`: pytest/tractor diagnostic aliases.
|
||||||
|
|
||||||
|
All aliases live under the `acli.` namespace so xonsh's
|
||||||
|
prefix-completion treats them as a sub-cmd group — type
|
||||||
|
`acli.<TAB>` to see the full set.
|
||||||
|
|
||||||
Provides:
|
Provides:
|
||||||
- `pytree <pid|pgrep-pat>` psutil-backed proc tree,
|
- `acli.pytree <pid|pgrep-pat>` psutil-backed proc tree,
|
||||||
live + zombies split.
|
live + zombies split.
|
||||||
- `hung-dump <pid|pat> [...]` kernel `wchan`/`stack` +
|
- `acli.hung_dump <pid|pat> [...]` kernel `wchan`/`stack` +
|
||||||
`py-spy dump` (incl `--locals`)
|
`py-spy dump` (incl `--locals`)
|
||||||
for each pid in tree.
|
for each pid in tree.
|
||||||
- `bindspace-scan [<dir>]` find orphaned tractor UDS
|
- `acli.bindspace_scan [<dir>]` find orphaned tractor UDS
|
||||||
sock files (no live owner pid).
|
sock files (no live owner pid).
|
||||||
default: `$XDG_RUNTIME_DIR/tractor`.
|
default: `$XDG_RUNTIME_DIR/tractor`.
|
||||||
|
- `acli.reap [opts]` SC-polite zombie-subactor
|
||||||
|
reaper + optional `/dev/shm/`
|
||||||
|
+ UDS sock-file sweeps.
|
||||||
|
alias for `scripts/tractor-reap`.
|
||||||
|
|
||||||
Loading from repo root:
|
Loading from repo root:
|
||||||
xontrib load -p ./xontrib tractor_diag
|
xontrib load -p ./xontrib tractor_diag
|
||||||
|
|
@ -36,7 +44,7 @@ except ImportError:
|
||||||
psutil = None
|
psutil = None
|
||||||
print(
|
print(
|
||||||
'[tractor-diag] `psutil` missing — '
|
'[tractor-diag] `psutil` missing — '
|
||||||
'pytree disabled, hung-dump uses pgrep fallback. '
|
'acli.pytree disabled, acli.hung_dump uses pgrep fallback. '
|
||||||
'`uv pip install psutil` for full functionality.'
|
'`uv pip install psutil` for full functionality.'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -157,7 +165,7 @@ def _pytree(args):
|
||||||
severity-ordered buckets so leaked / defunct procs
|
severity-ordered buckets so leaked / defunct procs
|
||||||
don't hide in the noise of normal `live` rows.
|
don't hide in the noise of normal `live` rows.
|
||||||
|
|
||||||
usage: pytree <pid|pgrep-pattern> [...]
|
usage: acli.pytree [--tree|-t] <pid|pgrep-pattern> [...]
|
||||||
|
|
||||||
classification (per-proc, not per-tree):
|
classification (per-proc, not per-tree):
|
||||||
|
|
||||||
|
|
@ -174,20 +182,43 @@ def _pytree(args):
|
||||||
descendants show as `live` if they themselves still
|
descendants show as `live` if they themselves still
|
||||||
have a real (non-init) parent (the orphan root), but
|
have a real (non-init) parent (the orphan root), but
|
||||||
the orphan root itself appears in `orphans`.
|
the orphan root itself appears in `orphans`.
|
||||||
|
|
||||||
|
Cross-bucket parent annotation (always emitted):
|
||||||
|
when a row's parent (by ppid) lives in a *different*
|
||||||
|
severity bucket, the row is suffixed with
|
||||||
|
`[parent: <pid> (in `<bucket>`)]` so the visual
|
||||||
|
`└─` marker still resolves to a findable parent
|
||||||
|
even when bucketing scatters parent and child into
|
||||||
|
separate sections.
|
||||||
|
|
||||||
|
`--tree` / `-t` flag (opt-in):
|
||||||
|
additionally emit a flat walk-order `## tree`
|
||||||
|
section at the top — a contiguous parent-child
|
||||||
|
tree shape with no severity-grouping. Same procs,
|
||||||
|
no annotations needed because each parent appears
|
||||||
|
directly above its children.
|
||||||
'''
|
'''
|
||||||
if not args:
|
flag_tree: bool = False
|
||||||
print('usage: pytree <pid|pgrep-pattern> [...]')
|
pos_args: list = []
|
||||||
|
for a in args:
|
||||||
|
if a in ('--tree', '-t'):
|
||||||
|
flag_tree = True
|
||||||
|
else:
|
||||||
|
pos_args.append(a)
|
||||||
|
|
||||||
|
if not pos_args:
|
||||||
|
print('usage: acli.pytree [--tree|-t] <pid|pgrep-pattern> [...]')
|
||||||
return 1
|
return 1
|
||||||
if psutil is None:
|
if psutil is None:
|
||||||
print('pytree requires psutil; install via `uv pip install psutil`')
|
print('pytree requires psutil; install via `uv pip install psutil`')
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
roots: list = []
|
roots: list = []
|
||||||
for a in args:
|
for a in pos_args:
|
||||||
roots.extend(_resolve_pids(a))
|
roots.extend(_resolve_pids(a))
|
||||||
roots = sorted(set(roots))
|
roots = sorted(set(roots))
|
||||||
if not roots:
|
if not roots:
|
||||||
print(f'(no procs match: {args})')
|
print(f'(no procs match: {pos_args})')
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
# statuses considered "defunct" — STATUS_ZOMBIE is the
|
# statuses considered "defunct" — STATUS_ZOMBIE is the
|
||||||
|
|
@ -199,11 +230,16 @@ def _pytree(args):
|
||||||
}
|
}
|
||||||
|
|
||||||
seen: set = set()
|
seen: set = set()
|
||||||
|
walk_order: list = [] # [(proc, depth)] preserved walk order
|
||||||
live: list = [] # [(proc, depth)]
|
live: list = [] # [(proc, depth)]
|
||||||
orphans: list = []
|
orphans: list = []
|
||||||
zombies: list = []
|
zombies: list = []
|
||||||
gone: list = []
|
gone: list = []
|
||||||
|
|
||||||
|
# parent-bucket lookup populated post-classification so
|
||||||
|
# `_row()` can annotate cross-bucket parent refs.
|
||||||
|
pid_to_bucket: dict = {}
|
||||||
|
|
||||||
for r in roots:
|
for r in roots:
|
||||||
for (p, depth) in _walk_tree_with_depth(r):
|
for (p, depth) in _walk_tree_with_depth(r):
|
||||||
if p.pid in seen:
|
if p.pid in seen:
|
||||||
|
|
@ -219,10 +255,14 @@ def _pytree(args):
|
||||||
# severity order: zombie > orphan > live.
|
# severity order: zombie > orphan > live.
|
||||||
if status in defunct_statuses:
|
if status in defunct_statuses:
|
||||||
zombies.append(entry)
|
zombies.append(entry)
|
||||||
|
pid_to_bucket[p.pid] = 'zombies'
|
||||||
elif ppid == 1:
|
elif ppid == 1:
|
||||||
orphans.append(entry)
|
orphans.append(entry)
|
||||||
|
pid_to_bucket[p.pid] = 'orphans'
|
||||||
else:
|
else:
|
||||||
live.append(entry)
|
live.append(entry)
|
||||||
|
pid_to_bucket[p.pid] = 'live'
|
||||||
|
walk_order.append(entry)
|
||||||
|
|
||||||
total: int = len(live) + len(orphans) + len(zombies)
|
total: int = len(live) + len(orphans) + len(zombies)
|
||||||
print(f'# pytree: {total} procs across roots {roots}')
|
print(f'# pytree: {total} procs across roots {roots}')
|
||||||
|
|
@ -230,14 +270,44 @@ def _pytree(args):
|
||||||
hdr = ' ' + 'PID'.rjust(7) + ' ' + 'PPID'.rjust(7) + ' '
|
hdr = ' ' + 'PID'.rjust(7) + ' ' + 'PPID'.rjust(7) + ' '
|
||||||
hdr += 'STATUS'.ljust(10) + ' CMD'
|
hdr += 'STATUS'.ljust(10) + ' CMD'
|
||||||
|
|
||||||
def _row(entry):
|
def _row(entry, bucket: str|None = None):
|
||||||
'''
|
'''
|
||||||
Render `(proc, depth)` as an aligned row. Tree depth is
|
Render `(proc, depth)` as an aligned row. Tree depth is
|
||||||
rendered as a `└─` marker on the CMD column so PID/PPID/
|
rendered as a `└─` marker on the CMD column so PID/PPID/
|
||||||
STATUS stay column-aligned.
|
STATUS stay column-aligned.
|
||||||
|
|
||||||
|
When `bucket` is given AND the row's parent lives in a
|
||||||
|
*different* bucket, append a `[parent: <pid> (in `<b>`)]`
|
||||||
|
suffix so the `└─` marker can be resolved across the
|
||||||
|
severity-section split.
|
||||||
'''
|
'''
|
||||||
p, depth = entry
|
p, depth = entry
|
||||||
tree_pfx = (' ' * depth) + ('└─ ' if depth > 0 else '')
|
tree_pfx = (' ' * depth) + ('└─ ' if depth > 0 else '')
|
||||||
|
|
||||||
|
# cross-bucket parent annotation; safe to compute up
|
||||||
|
# front because `p.ppid()` is cheap and rarely
|
||||||
|
# raises (parent pid is read from `/proc/<pid>/stat`,
|
||||||
|
# cached by psutil).
|
||||||
|
parent_anno: str = ''
|
||||||
|
if (
|
||||||
|
bucket is not None
|
||||||
|
and depth > 0
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
parent_pid: int = p.ppid()
|
||||||
|
except psutil.NoSuchProcess:
|
||||||
|
parent_pid = 0
|
||||||
|
if parent_pid and parent_pid != 1:
|
||||||
|
parent_bucket: str|None = pid_to_bucket.get(parent_pid)
|
||||||
|
if (
|
||||||
|
parent_bucket is not None
|
||||||
|
and parent_bucket != bucket
|
||||||
|
):
|
||||||
|
parent_anno = (
|
||||||
|
f' [parent: {parent_pid} '
|
||||||
|
f'(in `{parent_bucket}`)]'
|
||||||
|
)
|
||||||
|
|
||||||
# NOTE: `psutil.ZombieProcess` is a *subclass* of
|
# NOTE: `psutil.ZombieProcess` is a *subclass* of
|
||||||
# `psutil.NoSuchProcess`, but the proc is NOT gone —
|
# `psutil.NoSuchProcess`, but the proc is NOT gone —
|
||||||
# it's a zombie whose `/proc/<pid>/cmdline` is empty/
|
# it's a zombie whose `/proc/<pid>/cmdline` is empty/
|
||||||
|
|
@ -249,44 +319,61 @@ def _pytree(args):
|
||||||
r = ' ' + str(p.pid).rjust(7)
|
r = ' ' + str(p.pid).rjust(7)
|
||||||
r += ' ' + str(p.ppid()).rjust(7)
|
r += ' ' + str(p.ppid()).rjust(7)
|
||||||
r += ' ' + p.status().ljust(10)
|
r += ' ' + p.status().ljust(10)
|
||||||
r += ' ' + tree_pfx + cmd
|
r += ' ' + tree_pfx + cmd + parent_anno
|
||||||
return r
|
return r
|
||||||
except psutil.ZombieProcess:
|
except psutil.ZombieProcess:
|
||||||
try:
|
try:
|
||||||
ppid = str(p.ppid())
|
ppid_str = str(p.ppid())
|
||||||
name = p.name()
|
name = p.name()
|
||||||
except psutil.NoSuchProcess:
|
except psutil.NoSuchProcess:
|
||||||
ppid, name = '?', '?'
|
ppid_str, name = '?', '?'
|
||||||
r = ' ' + str(p.pid).rjust(7)
|
r = ' ' + str(p.pid).rjust(7)
|
||||||
r += ' ' + ppid.rjust(7)
|
r += ' ' + ppid_str.rjust(7)
|
||||||
r += ' ' + 'zombie'.ljust(10)
|
r += ' ' + 'zombie'.ljust(10)
|
||||||
r += ' ' + tree_pfx + '[' + name + ' <defunct>]'
|
r += ' ' + tree_pfx + '[' + name + ' <defunct>]' + parent_anno
|
||||||
return r
|
return r
|
||||||
except psutil.NoSuchProcess:
|
except psutil.NoSuchProcess:
|
||||||
return ' ' + str(p.pid).rjust(7) + ' (gone mid-walk)'
|
return ' ' + str(p.pid).rjust(7) + ' (gone mid-walk)'
|
||||||
|
|
||||||
def _section(title: str, procs: list, hint: str = ''):
|
def _section(
|
||||||
|
title: str,
|
||||||
|
procs: list,
|
||||||
|
hint: str = '',
|
||||||
|
bucket: str|None = None,
|
||||||
|
):
|
||||||
print(f'\n## {title} ({len(procs)})' + (f' — {hint}' if hint else ''))
|
print(f'\n## {title} ({len(procs)})' + (f' — {hint}' if hint else ''))
|
||||||
if not procs:
|
if not procs:
|
||||||
print(' (none)')
|
print(' (none)')
|
||||||
return
|
return
|
||||||
print(hdr)
|
print(hdr)
|
||||||
for p in procs:
|
for p in procs:
|
||||||
print(_row(p))
|
print(_row(p, bucket=bucket))
|
||||||
|
|
||||||
# severity-ordered: most concerning first.
|
# `--tree` opt-in: emit a flat walk-order section first
|
||||||
|
# so the parent-child tree shape is contiguous (no
|
||||||
|
# severity-grouping). No `bucket` arg → no cross-bucket
|
||||||
|
# annotation, since each parent appears directly above
|
||||||
|
# its children here.
|
||||||
|
if flag_tree:
|
||||||
|
_section(
|
||||||
|
'tree', walk_order,
|
||||||
|
'flat walk-order, parent-child preserved',
|
||||||
|
)
|
||||||
|
|
||||||
|
# severity-ordered: most concerning first. Each section
|
||||||
|
# passes its own `bucket` name so `_row()` can annotate
|
||||||
|
# rows whose parents live in a different section.
|
||||||
_section(
|
_section(
|
||||||
'zombies', zombies,
|
'zombies', zombies,
|
||||||
'status `Z`/`X`, parent has not reaped',
|
'status `Z`/`X`, parent has not reaped',
|
||||||
|
bucket='zombies',
|
||||||
)
|
)
|
||||||
_section(
|
_section(
|
||||||
'orphans', orphans,
|
'orphans', orphans,
|
||||||
'`ppid==1`, reparented to init (leaked / parent gone)',
|
'`ppid==1`, reparented to init (leaked / parent gone)',
|
||||||
|
bucket='orphans',
|
||||||
)
|
)
|
||||||
_section('live', live)
|
_section('live', live, bucket='live')
|
||||||
|
|
||||||
if gone:
|
|
||||||
print(f'\n## gone-during-walk ({len(gone)}): {gone}')
|
|
||||||
|
|
||||||
if gone:
|
if gone:
|
||||||
print(f'\n## gone-during-walk ({len(gone)}): {gone}')
|
print(f'\n## gone-during-walk ({len(gone)}): {gone}')
|
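Reviewer sketch (not part of the patch): the two `_pytree()` additions, the `--tree`/`-t` flag split and the cross-bucket parent annotation, are both pure functions of their inputs and can be checked in isolation. The helpers below restate that logic without `psutil`. `split_flags` and `parent_annotation` are hypothetical names, and the parent pid is passed in directly rather than read from a process object.

```python
from typing import Optional


def split_flags(args: list) -> tuple:
    # Separate the opt-in tree flag from positional pid/pattern args.
    flag_tree = False
    pos_args = []
    for a in args:
        if a in ('--tree', '-t'):
            flag_tree = True
        else:
            pos_args.append(a)
    return flag_tree, pos_args


def parent_annotation(
    ppid: int,
    depth: int,
    bucket: Optional[str],
    pid_to_bucket: dict,
) -> str:
    # Only annotate non-root rows rendered inside a severity bucket.
    if bucket is None or depth <= 0:
        return ''
    # Unknown parent (0) or init (1) never gets an annotation.
    if not ppid or ppid == 1:
        return ''
    parent_bucket = pid_to_bucket.get(ppid)
    # Parent outside the walked tree, or in the same section.
    if parent_bucket is None or parent_bucket == bucket:
        return ''
    return f' [parent: {ppid} (in `{parent_bucket}`)]'
```

Passing `bucket=None`, as the `## tree` section does, disables the annotation entirely, which matches the comment that each parent already sits directly above its children there.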
||||||
|
|
@ -299,14 +386,14 @@ def _hung_dump(args):
|
||||||
kernel + python state for a hung pytest/tractor tree.
|
kernel + python state for a hung pytest/tractor tree.
|
||||||
walks all descendants of each `<pid|pgrep-pat>` arg.
|
walks all descendants of each `<pid|pgrep-pat>` arg.
|
||||||
|
|
||||||
usage: hung-dump <pid|pgrep-pattern> [...]
|
usage: acli.hung_dump <pid|pgrep-pattern> [...]
|
||||||
|
|
||||||
note: `/proc/<pid>/stack` and `py-spy dump` typically
|
note: `/proc/<pid>/stack` and `py-spy dump` typically
|
||||||
require CAP_SYS_PTRACE — invoked via `sudo -n`. run
|
require CAP_SYS_PTRACE — invoked via `sudo -n`. run
|
||||||
`sudo true` first to cache creds.
|
`sudo true` first to cache creds.
|
||||||
'''
|
'''
|
||||||
if not args:
|
if not args:
|
||||||
print('usage: hung-dump <pid|pgrep-pattern> [...]')
|
print('usage: acli.hung_dump <pid|pgrep-pattern> [...]')
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
# cache sudo creds upfront so per-pid `sudo -n` calls
|
# cache sudo creds upfront so per-pid `sudo -n` calls
|
||||||
|
|
@ -386,7 +473,7 @@ def _bindspace_scan(args):
|
||||||
(those whose embedded `<pid>` no longer corresponds to
|
(those whose embedded `<pid>` no longer corresponds to
|
||||||
a live process).
|
a live process).
|
||||||
|
|
||||||
usage: bindspace-scan [<dir>]
|
usage: acli.bindspace_scan [<dir>]
|
||||||
default: `$XDG_RUNTIME_DIR/tractor`
|
default: `$XDG_RUNTIME_DIR/tractor`
|
||||||
(or `/run/user/<uid>/tractor`)
|
(or `/run/user/<uid>/tractor`)
|
||||||
'''
|
'''
|
||||||
|
|
@ -454,11 +541,203 @@ def _bindspace_scan(args):
|
||||||
print(f'\nto unlink orphans:\n rm {unlink_cmd}')
|
print(f'\nto unlink orphans:\n rm {unlink_cmd}')
|
||||||
|
|
||||||
|
|
||||||
|
# --- acli.reap ------------------------------------------------
|
||||||
|
|
||||||
|
def _tractor_reap(args):
|
||||||
|
'''
|
||||||
|
SC-polite zombie-subactor reaper + optional `/dev/shm/`
|
||||||
|
orphan-segment sweep + optional UDS sock-file sweep.
|
||||||
|
|
||||||
|
usage: acli.reap [-h] [--parent PID] [--grace SEC]
|
||||||
|
[--dry-run] [--shm | --shm-only]
|
||||||
|
[--uds | --uds-only]
|
||||||
|
|
||||||
|
phases (run in order when enabled):
|
||||||
|
|
||||||
|
1. process reap — finds tractor subactor procs left
|
||||||
|
alive after a `pytest`/app run that failed to fully
|
||||||
|
cancel its tree. Default = orphan-mode (PPid==1
|
||||||
|
init-reparented procs whose cwd matches repo root
|
||||||
|
AND cmdline contains `python`). With `--parent`,
|
||||||
|
scopes to descendants of a specific live PID.
|
||||||
|
SIGINT first, then SIGKILL after `--grace` (default
|
||||||
|
3.0s).
|
||||||
|
2. shm sweep (`--shm`/`--shm-only`) — unlinks
|
||||||
|
`/dev/shm/<file>` entries owned by the current uid
|
||||||
|
that no live process has open. Needed because
|
||||||
|
`tractor` disables `mp.resource_tracker`.
|
||||||
|
3. UDS sweep (`--uds`/`--uds-only`) — unlinks
|
||||||
|
`${XDG_RUNTIME_DIR}/tractor/<name>@<pid>.sock`
|
||||||
|
files whose binder pid is dead (or the `1616`
|
||||||
|
registry sentinel). See issue #452.
|
||||||
|
|
||||||
|
Mirrors `scripts/tractor-reap` (use `-n`/`--dry-run`
|
||||||
|
first to see what would be touched).
|
||||||
|
|
||||||
|
'''
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
prog='acli.reap',
|
||||||
|
description=_tractor_reap.__doc__,
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'--parent', '-p',
|
||||||
|
type=int,
|
||||||
|
default=None,
|
||||||
|
help='descendant-mode: reap procs with PPid==<pid>',
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'--grace', '-g',
|
||||||
|
type=float,
|
||||||
|
default=3.0,
|
||||||
|
help='SIGINT grace window in seconds (default 3.0)',
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'--dry-run', '-n',
|
||||||
|
action='store_true',
|
||||||
|
help='list matched pids/paths but do not signal/unlink',
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'--shm',
|
||||||
|
action='store_true',
|
||||||
|
help='also unlink orphaned /dev/shm segments',
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'--shm-only',
|
||||||
|
action='store_true',
|
||||||
|
help='skip process reap; only do the shm sweep',
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'--uds',
|
||||||
|
action='store_true',
|
||||||
|
help='also unlink orphaned UDS sock-files',
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'--uds-only',
|
||||||
|
action='store_true',
|
||||||
|
help='skip process reap + shm; only do the UDS sweep',
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
ns = parser.parse_args(args)
|
||||||
|
except SystemExit as se:
|
||||||
|
# `argparse` raises SystemExit on `-h`/bad-args; let
|
||||||
|
# xonsh treat it as a normal alias return code.
|
||||||
|
return int(se.code) if se.code is not None else 0
|
||||||
|
|
||||||
|
skip_proc_reap: bool = (
|
||||||
|
ns.shm_only
|
||||||
|
or
|
||||||
|
ns.uds_only
|
||||||
|
)
|
||||||
|
|
||||||
|
# repo-root resolution: `git rev-parse --show-toplevel`
|
||||||
|
# first, falling back to the xontrib file's parent of
|
||||||
|
# parent. mirrors `scripts/tractor-reap._repo_root()`.
|
||||||
|
try:
|
||||||
|
repo_str: str = sp.check_output(
|
||||||
|
['git', 'rev-parse', '--show-toplevel'],
|
||||||
|
stderr=sp.DEVNULL,
|
||||||
|
text=True,
|
||||||
|
).strip()
|
||||||
|
repo: Path = Path(repo_str)
|
||||||
|
except (sp.CalledProcessError, FileNotFoundError):
|
||||||
|
repo: Path = Path(__file__).resolve().parent.parent
|
||||||
|
|
||||||
|
# lazy-import the reap helpers since the package may not
|
||||||
|
# have been on `sys.path` at xontrib-load time (e.g. the
|
||||||
|
# xontrib was sourced before activating the venv).
|
||||||
|
import sys
|
||||||
|
if str(repo) not in sys.path:
|
||||||
|
sys.path.insert(0, str(repo))
|
||||||
|
from tractor._testing._reap import (
|
||||||
|
find_descendants,
|
||||||
|
find_orphans,
|
||||||
|
find_orphaned_shm,
|
||||||
|
find_orphaned_uds,
|
||||||
|
reap,
|
||||||
|
reap_shm,
|
||||||
|
reap_uds,
|
||||||
|
)
|
||||||
|
|
||||||
|
rc: int = 0
|
||||||
|
|
||||||
|
# phase 1: process reap (skipped under `--*-only`)
|
||||||
|
if not skip_proc_reap:
|
||||||
|
if ns.parent is not None:
|
||||||
|
pids: list = find_descendants(ns.parent)
|
||||||
|
mode: str = f'descendants of PPid={ns.parent}'
|
||||||
|
else:
|
||||||
|
pids = find_orphans(repo)
|
||||||
|
mode = f'orphans (PPid=1, cwd={repo})'
|
||||||
|
|
||||||
|
if not pids:
|
||||||
|
print(f'[acli.reap] no {mode} to reap')
|
||||||
|
elif ns.dry_run:
|
||||||
|
print(
|
||||||
|
f'[acli.reap] dry-run — {mode}:\n {pids}'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
_, survivors = reap(pids, grace=ns.grace)
|
||||||
|
if survivors:
|
||||||
|
rc = 1
|
||||||
|
|
||||||
|
# phase 2: shm sweep (opt-in)
|
||||||
|
if ns.shm or ns.shm_only:
|
||||||
|
leaked: list = find_orphaned_shm()
|
||||||
|
if not leaked:
|
||||||
|
print(
|
||||||
|
'[acli.reap] no orphaned /dev/shm '
|
||||||
|
'segments to sweep'
|
||||||
|
)
|
||||||
|
elif ns.dry_run:
|
||||||
|
print(
|
||||||
|
f'[acli.reap] dry-run — {len(leaked)} '
|
||||||
|
f'orphaned shm segment(s):\n {leaked}'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
_, errors = reap_shm(leaked)
|
||||||
|
if errors:
|
||||||
|
rc = 1
|
||||||
|
|
||||||
|
# phase 3: UDS sweep (opt-in)
|
||||||
|
if ns.uds or ns.uds_only:
|
||||||
|
leaked_uds: list = find_orphaned_uds()
|
||||||
|
if not leaked_uds:
|
||||||
|
print(
|
||||||
|
'[acli.reap] no orphaned UDS sock-files '
|
||||||
|
'to sweep'
|
||||||
|
)
|
||||||
|
elif ns.dry_run:
|
||||||
|
print(
|
||||||
|
f'[acli.reap] dry-run — {len(leaked_uds)} '
|
||||||
|
f'orphaned UDS sock-file(s):\n {leaked_uds}'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
_, errors = reap_uds(leaked_uds)
|
||||||
|
if errors:
|
||||||
|
rc = 1
|
||||||
|
|
||||||
|
return rc
|
||||||
|
|
||||||
|
|
||||||
# --- registration ---------------------------------------------
|
# --- registration ---------------------------------------------
|
||||||
|
|
||||||
aliases['pytree'] = _pytree
|
# all aliases under the `acli.` namespace so xonsh's prefix-
|
||||||
aliases['hung-dump'] = _hung_dump
|
# completion makes them feel like a sub-cmd group: type
|
||||||
aliases['bindspace-scan'] = _bindspace_scan
|
# `acli.<TAB>` and the full set is suggested. no parent
|
||||||
|
# `acli` cmd exists — the dot is purely a naming convention.
|
||||||
|
_TCLI_ALIASES: dict = {
|
||||||
|
'acli.pytree': _pytree,
|
||||||
|
'acli.hung_dump': _hung_dump,
|
||||||
|
'acli.bindspace_scan': _bindspace_scan,
|
||||||
|
'acli.reap': _tractor_reap,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _name, _fn in _TCLI_ALIASES.items():
|
||||||
|
aliases[_name] = _fn
|
||||||
|
|
||||||
|
|
||||||
# xontrib protocol hooks (for `xontrib load tractor_diag`).
|
# xontrib protocol hooks (for `xontrib load tractor_diag`).
|
||||||
|
|
@ -468,6 +747,6 @@ def _load_xontrib_(xsh, **_):
|
||||||
|
|
||||||
|
|
||||||
def _unload_xontrib_(xsh, **_):
|
def _unload_xontrib_(xsh, **_):
|
||||||
for name in ('pytree', 'hung-dump', 'bindspace-scan'):
|
for name in _TCLI_ALIASES:
|
||||||
aliases.pop(name, None)
|
aliases.pop(name, None)
|
||||||
return {}
|
return {}
|
||||||
|
|
|
||||||
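The registration change above drives both load and unload from one alias table, so the two can no longer drift apart the way the old hardcoded `('pytree', 'hung-dump', 'bindspace-scan')` tuple could. A minimal sketch of that pattern follows. It assumes a plain `dict` as a stand-in for xonsh's `aliases` mapping, and the handler bodies, `load_aliases()` and `unload_aliases()` are hypothetical names used only for illustration, not part of the diff.

```python
from typing import Callable

# Stand-in for xonsh's global `aliases` mapping (assumption:
# a plain dict is enough to show the pattern).
aliases: dict[str, Callable[[list[str]], int]] = {
    'll': lambda args: 0,  # a pre-existing, unrelated alias
}


def _pytree(args: list[str]) -> int:
    # placeholder handler; the real one lives in the xontrib
    return 0


def _tractor_reap(args: list[str]) -> int:
    # placeholder handler; the real one lives in the xontrib
    return 0


# Single source of truth: every alias sits under the `acli.`
# prefix so prefix-completion groups them together.
_ALIAS_TABLE: dict[str, Callable[[list[str]], int]] = {
    'acli.pytree': _pytree,
    'acli.reap': _tractor_reap,
}


def load_aliases(registry: dict) -> None:
    # register every entry of the table
    for name, fn in _ALIAS_TABLE.items():
        registry[name] = fn


def unload_aliases(registry: dict) -> None:
    # iterate the same table, so newly added aliases are
    # always removed too; `pop(..., None)` tolerates entries
    # that were never registered.
    for name in _ALIAS_TABLE:
        registry.pop(name, None)


load_aliases(aliases)
loaded: list[str] = sorted(aliases)
unload_aliases(aliases)
remaining: list[str] = sorted(aliases)
```

Because unload only pops names found in the table, aliases defined elsewhere (here `ll`) are left untouched.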