Compare commits

...

9 Commits

Author SHA1 Message Date
Gud Boi 66f1941f46 Wire `reg_addr` into `test_context_stream_semantics`
Same wire-up pattern as the prior `test_dynamic_pub_sub`
commit: each test that already pulled in `debug_mode`
now also pulls in `reg_addr` and passes
`registry_addrs=[reg_addr]` into `tractor.open_nursery()`,
so the suite's standard registry-addr conventions apply.

Tests touched:
- `test_started_misuse`
- `test_simple_context`
- `test_parent_cancels`
- `test_one_end_stream_not_opened`
- `test_maybe_allow_overruns_stream`
- `test_ctx_with_self_actor`

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 13:52:28 -04:00
Gud Boi 9b05f659b3 Wire `test_dynamic_pub_sub` to standard fixtures
Pull in the `reg_addr`, `debug_mode`, and `test_log`
fixtures so this test follows the same conventions as
the rest of the suite:

- pass `registry_addrs=[reg_addr]` + `debug_mode` into
  `tractor.open_nursery()` (so `--tpdb` etc work).
- after the `pytest.raises` block, add `assert err` +
  `test_log.exception('Timed out AS EXPECTED')` so the
  expected timeout is logged explicitly instead of
  swallowed.

Also,
- drop whitespace-only blank lines around the
  `subs` param of `consumer()` and `ctx` param of
  `one_task_streams_and_one_handles_reqresp()`.
- promote `test_sigint_both_stream_types`'s one-line
  docstring to multi-line form.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 12:59:00 -04:00
Gud Boi 65fcfbf224 Bump `test_stale_entry_is_deleted`'s timeout to 30
Seems that when run in-suite it delays more then the so-measured "happy
path" timing; better to have no suite-global interruption then asserting
a fast single test's run.
2026-04-27 11:46:45 -04:00
Gud Boi 4f12d69b41 Add `--shm` orphan sweep to `tractor-reap`
Since `tractor.ipc._mp_bs.disable_mantracker()` turns off
`mp.resource_tracker` entirely (see the conc-anal doc
`subint_forkserver_mp_shared_memory_issue.md`), a
hard-crashing actor can leave `/dev/shm/<key>` segments
that nothing else GCs. New `tractor-reap` phase 2 sweeps
them.

Deats,
- `tractor/_testing/_reap.py`: add `find_orphaned_shm()`
  + `reap_shm()` helpers. Match criteria: regular file
  under `/dev/shm`, owned by current uid, AND no live
  proc has it open (mmap'd or fd-held). In-use
  enumeration via `psutil.Process.memory_maps()` +
  `.open_files()` — xplatform, kernel-canonical (same
  answer `lsof` would give), no reliance on
  tractor-specific shm-key naming.
- `_ensure_shm_supported()` guard: helpers raise
  `NotImplementedError` outside Linux/FreeBSD bc macOS
  POSIX shm has no fs-visible path (`shm_open` only)
  and Windows is a different story.
- `scripts/tractor-reap`: new `--shm` (run after
  process reap) and `--shm-only` (skip process phase)
  flags. `-n` dry-runs both phases. Exit code is `1`
  if either phase had survivors/errors.
- `pyproject.toml` + `uv.lock`: add `psutil>=7.0.0` to
  the `testing` dep group; lazy-imported in `_reap.py`
  so the process-reap path stays import-clean without
  it.

Also,
- doc `--shm` in `.claude/skills/run-tests/SKILL.md`
  (new section 10c) — covers match criteria + the
  preservation guarantee for unrelated apps.
- flip mitigation status in
  `subint_forkserver_mp_shared_memory_issue.md` from
  "could extend `tractor-reap`" to "implemented", with
  a note that callers should still UUID-pin shm keys to
  avoid cross-session collisions.

Verified locally vs 81 in-use segments held by `piker`,
`lttng-ust-*`, `aja-shm-*` — all preserved; only the
genuinely-orphaned tractor segments got unlinked.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 11:35:33 -04:00
Gud Boi aa3e230926 Fix `SharedMemory` under `subint_forkserver`
Implements the resolution described in c99d475d's
`subint_forkserver_mp_shared_memory_issue.md` (now
updated with the resolution post-mortem). Two-part
fix that side-steps `mp.resource_tracker` entirely
rather than try to make it fork-safe — turns out
that's both simpler AND more correct given tractor
already SC-manages allocation lifetimes.

Deats,
- `tractor/ipc/_mp_bs.py::disable_mantracker()`: drop the
  `platform.python_version_tuple()[:-1] >= ('3', '13')` branch — patches
  now run unconditionally:
  * monkey-patch `mp.resource_tracker. _resource_tracker` to a no-op
    `ManTracker` subclass (empty `register` / `unregister`
    / `ensure_running`).
  * return `partial(SharedMemory, track=False)` for the per-allocation
    opt-out.
  * belt + suspenders: even if something dodges the wrapper, the
    singleton can't talk to the inherited (broken) parent fd.

- `tractor/ipc/_shm.py::open_shm_list()`: drop the 3.13+ conditional
  skip of the unlink-callback; install a `try_unlink()` wrapper that
  swallows `FileNotFoundError` (sibling-already-cleaned race in
  shared-key setups). Without `mp.resource_tracker` doing it for us, we
  own the unlink — `actor. lifetime_stack` is the right place since
  tractor already controls actor lifecycle.

- `tests/test_shm.py`: uncomment-out `subint_forkserver` from the
  module-level skip- list (tests pass now). Inline comment cross-refs
  the two `_mp_bs` / `_shm` workarounds.

- `ai/conc-anal/subint_forkserver_mp_shared_memory_ issue.md`: heavy
  rewrite — flips status from "open / unresolvable in tractor" to
  "resolved, kept as decision record". Adds Resolution section, "Why
  this is the right call" rationale (mp tracker is widely criticized;
  tractor already owns lifecycle), trade-offs (crash-leaked segments,
  lost mp leak warning), verification (7 passed under both
  `subint_forkserver` and `trio` backends), and upstream issue links

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 10:51:28 -04:00
Gud Boi c99d475d03 Document `SharedMemory` × `subint_forkserver` incompat
New `ai/conc-anal/` doc: `mp.SharedMemory` is
fork-without-exec unsafe — child inherits parent's
`resource_tracker` fd → EBADF on first shm op;
leaked `/shm_list` cascades `FileExistsError`
across parametrize variants. Canonical CPython
issue class, NOT a tractor bug. Includes two
longer-term mitigation paths (reset inherited
tracker fd vs migrate off `mp.shared_memory`).

Also, update `tests/test_shm.py`:
- comment out `subint_forkserver` from skip list
- rewrite reason with precise failure-mode
  descriptions + link to the analysis doc

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-26 20:13:24 -04:00
Gud Boi 6d76b60404 Add `tractor-reap` CLI + document auto-reap
New `scripts/tractor-reap` CLI wraps the
`_testing._reap` mod for manual zombie-subactor
cleanup after crashed pytest sessions. Two modes:

- orphan-mode (default): finds PPid==1 procs
  with cwd matching repo root + `python` in
  cmdline.
- descendant-mode (`--parent <pid>`): scoped
  sweep under a still-live supervisor.

SC-polite: SIGINT with bounded grace window
(default 3s) before escalating to SIGKILL.
Exit code signals whether escalation was needed
(useful for CI health-checks).

Also, document both the auto-reap fixture and
the CLI in `/run-tests` SKILL.md (section 10).

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-26 18:04:40 -04:00
Gud Boi eae478f3d5 Add `_testing._reap` + auto-reap fixture
Zombie-subactor cleanup for the test suite, SC-polite discipline
(`SIGINT` first, bounded grace, `SIGKILL` only on survivors). Two parts:
a shared reaper module + an autouse session-end fixture that runs it.

Deats,
- new `tractor/_testing/_reap.py` (+230 LOC) — Linux- only reaper using
  `/proc/<pid>/{status,cwd,cmdline}` inspection. Two detection modes:
  - `find_descendants(parent_pid)` for the in-session case
    (PPid-direct-match while pytest is still alive).
  - `find_orphans(repo_root)` for the CLI / post- mortem case (`PPid==1`
    reparented to init + `cwd` filter to repo root + `python` cmdline
    filter).
- `reap(pids, *, grace=3.0, poll=0.25)` does the signal ladder: SIGINT
  all, poll up to `grace` for exit, SIGKILL any survivors. Returns
  `(signalled, killed)` for caller-side reporting.
- new `_reap_orphaned_subactors` session-scoped autouse fixture in
  `tractor/_testing/pytest.py` — after `yield`, runs
  `find_descendants(os.getpid())` + `reap(...)` so each pytest session
  leaves no surviving forks.
- companion CLI scaffolding lives at `scripts/tractor-reap` (separate
  commit) for the pytest-died-mid-session case where the in-session
  fixture didn't get to run.

Also,
- promote `from tractor.spawn._spawn import SpawnMethodKey` to
  module-top in `pytest.py` (was inline-imported inside
  `pytest_generate_tests`), and reuse it in
  `pytest_collection_modifyitems` to assert each `skipon_spawn_backend`
  mark arg is a valid spawn-method literal — catches typos at collection
  time.
- inline `# ?TODO` flags running these through the `try_set_backend`
  checker for stronger validation.

Cross-refs `feedback_sc_graceful_cancel_first.md` for the
SIGINT-before-SIGKILL discipline rationale.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-25 00:05:58 -04:00
Gud Boi 44bdb1697c Tighten orphan-SIGINT xfail to `strict=True`
Re-classify `test_orphaned_subactor_sigint_cleanup_DRAFT` from
flakey-env-sensitive (`strict=False` w/ "passes in isolation, flakey in
full suite") to a hard known-gap (`strict=True`) with the orphan-SIGINT
hang as the documented cause. The previous framing ("env pollution") let
the test silently pass when ordering happened to favor it; the new
framing forces an XPASS-as-FAIL the moment the underlying gap is
actually closed, so we can drop the mark intentionally instead of
accidentally.

Reason text + leading `# Known-gap test —` comment both point at
`ai/conc-anal/subint_forkserver_orphan_sigint_hang_issue.md`
for the full diagnosis.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-24 22:48:35 -04:00
14 changed files with 1096 additions and 77 deletions

View File

@ -521,3 +521,105 @@ filling log volume. Full post-mortem in
`ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`. `ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`.
Lesson codified here so future-me grep-finds the Lesson codified here so future-me grep-finds the
workaround before digging. workaround before digging.
## 10. Reaping zombie subactors (`tractor-reap`)
**Symptom:** after a `pytest` run crashes, times out,
or is `Ctrl+C`'d, subactor forks (esp. under
`subint_forkserver`) can be reparented to `init`
(PPid==1) and linger. They hold onto ports, inherit
pytest's capture-pipe fds, and flakify later
sessions.
**Two layers of defense:**
### a) Session-scoped auto-fixture (always on)
`tractor/_testing/pytest.py::_reap_orphaned_subactors`
runs at pytest session teardown. It walks `/proc` for
direct descendants of the pytest pid, SIGINTs them,
waits up to 3s, then SIGKILLs survivors. SC-polite:
gives the subactor runtime a chance to run its trio
cancel shield + IPC teardown before escalation.
This is *autouse* and session-scoped — you don't need
to do anything. It just runs.
### b) `scripts/tractor-reap` CLI (manual reap)
For the **pytest-died-mid-session** case (Ctrl+C, OOM
kill, hung process you had to `kill -9`), the fixture
never ran. Reach for the CLI:
```sh
# default: orphans (PPid==1, cwd==repo, cmd contains python)
scripts/tractor-reap
# descendant-mode: from a still-live supervisor
scripts/tractor-reap --parent <pytest-pid>
# see what would be reaped, don't signal
scripts/tractor-reap -n
# tune the SIGINT → SIGKILL grace window
scripts/tractor-reap --grace 5
```
Exit code: `0` if everyone exited on SIGINT, `1` if
SIGKILL had to escalate — so you can chain it in CI
health-checks (`scripts/tractor-reap || <alert>`).
**What it matches** (orphan-mode):
- `PPid == 1` (reparented to init → definitely
orphaned, not just a currently-running child)
- `cwd == <repo-root>` (keeps the sweep scoped; won't
touch unrelated init-children elsewhere)
- `python` in cmdline
**What it does not do:** kill anything whose PPid is
still a live tractor parent. If the parent is alive
it's not an orphan; use `--parent <pid>` if you need
to force-reap under a still-live supervisor.
**When NOT to run it:** while a pytest session is
active in another terminal. It's safe (won't touch
that session's live children in orphan-mode) but can
race if the target session is mid-teardown.
### c) `--shm` / `--shm-only`: orphan-segment sweep
Because `tractor.ipc._mp_bs.disable_mantracker()`
turns off `mp.resource_tracker` (see
`ai/conc-anal/subint_forkserver_mp_shared_memory_issue.md`),
a hard-crashing actor can leave `/dev/shm/<key>`
segments behind that nothing else GCs.
```sh
# process reap THEN shm sweep
scripts/tractor-reap --shm
# shm sweep only (skip process phase)
scripts/tractor-reap --shm-only
# dry-run: list candidates, don't unlink
scripts/tractor-reap --shm -n
```
**Match criteria** (very conservative — this is a
shared-system path, can't be wrong):
- segment is a regular file under `/dev/shm`,
- owned by the **current uid** (`stat.st_uid`),
- AND **no live process holds it open**
enumerated by walking every readable
`/proc/<pid>/maps` (post-mmap mappings) AND
`/proc/<pid>/fd/*` (pre-mmap shm-opened fds).
The "nobody has it open" check is the
kernel-canonical "is this leaked?" test — same
answer `lsof /dev/shm/<key>` would give. No
reliance on tractor-specific naming, so it works
for any tractor app. Critically, it WILL NOT touch
segments held by other apps you have running
(e.g. `piker`, `lttng-ust-*`, `aja-shm-*`
verified locally with 81 in-use segments correctly
preserved).

View File

@ -0,0 +1,187 @@
# `subint_forkserver` × `multiprocessing.SharedMemory`: fork-inherited `resource_tracker` fd
Surfaced by `tests/test_shm.py` under
`--spawn-backend=subint_forkserver`. Two distinct
failure modes, one root cause:
**`multiprocessing.resource_tracker` is fork-without-exec
unsafe** (canonical CPython class — bpo-38119, bpo-45209).
**Status: resolved by `tractor/ipc/_mp_bs.py` +
`tractor/ipc/_shm.py` changes (see "Resolution" below).
This doc kept as the
post-mortem / decision record.**
## TL;DR
`mp.shared_memory.SharedMemory` registers each shm
allocation with the per-process
`multiprocessing.resource_tracker` singleton. The
tracker is a daemon process started lazily; the
parent owns a unix-pipe-fd to it. When the parent
forks-without-execing into a `subint_forkserver`
child, the child inherits that fd — but it refers to
the *parent's* tracker, which the child has no
business writing to.
Two manifestations under the original (pre-fix) code:
1. **`test_child_attaches_alot`** — child loops 1000×
`attach_shm_list()`. First `mp.SharedMemory` call
in the child triggers
`resource_tracker._ensure_running_and_write`
`_teardown_dead_process``os.close(self._fd)` on
an fd the child should never have touched. Surfaces
as `OSError: [Errno 9] Bad file descriptor`
wrapped in `tractor.RemoteActorError`.
2. **`test_parent_writer_child_reader[*]`** — first
parametrize variant "passes" (with
`resource_tracker: leaked shared_memory` warning)
because nobody ever cleans up `/shm_list`.
Subsequent variants then fail with
`FileExistsError: '/shm_list'` because the leak
persists across the parametrize loop and forkserver
children can't `shm_open(create=True)` an existing
key.
Trio backend (`mp_spawn`-style) doesn't surface this:
each subactor `exec`s a fresh interpreter →
independent resource tracker per subactor → no
inherited-fd issue, and the test's pre-existing leak
gets masked by the per-process tracker reset.
Under `subint_forkserver`, the child is `os.fork()`'d
from a worker thread (no `exec`) → inherits parent's
`mp.resource_tracker._resource_tracker._fd` → EBADF
/ cross-talk on first `mp.SharedMemory` op.
## Resolution
We side-step the broken upstream machinery entirely
rather than try to make it fork-safe. Two-part fix
landed (commits to follow this doc):
### 1. `tractor/ipc/_mp_bs.py::disable_mantracker()`
— unconditional disable
The previous "3.13+ short-circuit" path used
`partial(SharedMemory, track=False)` to opt-out of
registration on 3.13+. The `track=False` switch is
necessary but not sufficient under fork: the
inherited tracker fd can still be touched indirectly
(e.g. through `_ensure_running_and_write`'s
self-check path).
The fix takes both belts AND suspenders:
- **Always** monkey-patch
`mp.resource_tracker._resource_tracker` to a
no-op `ManTracker` subclass whose
`register`/`unregister`/`ensure_running` are all
empty.
- **Always** wrap `SharedMemory` with
`track=False`.
Result: the inherited tracker fd in the fork child
is still inherited (fd is a kernel object; we can't
un-inherit it across fork) but **nothing in the
shm code path will ever try to use it** — both the
tracker singleton and the per-allocation registration
are short-circuited.
### 2. `tractor/ipc/_shm.py::open_shm_list()`
— own the cleanup
Without `mp.resource_tracker`, nobody else will
unlink leaked segments at process exit. tractor
already controls actor lifecycle, so we register
unlink on the actor's lifetime stack:
```python
def try_unlink():
try:
shml.shm.unlink()
except FileNotFoundError as fne:
log.exception(...) # benign sibling-already-cleaned race
actor.lifetime_stack.callback(try_unlink)
```
The `FileNotFoundError` swallow handles the case
where a sibling actor already unlinked the same
segment (legitimate race in shared-key setups).
## Why this is the right call
- **mp's tracker is widely criticized.** The
in-tree comment "non-SC madness" predates this
fix and matches CPython upstream's own discomfort
(e.g. the per-context tracker design rework
discussions in bpo-43475).
- **tractor already owns process lifecycle.** We
have `actor.lifetime_stack`, `Portal.cancel_actor`,
and the IPC cancel cascade. Adding mp's tracker
on top buys nothing we can't do better ourselves.
- **Backend-uniform.** No special-casing per spawn
backend. trio (`mp_spawn`-style), `subint_forkserver`,
and the future `subint` all behave identically
— register-time no-op, exit-time unlink-via-
lifetime-stack.
## Trade-offs / known gaps
- **Crash-leaked segments.** If an actor segfaults
or is `SIGKILL`'d before its lifetime stack runs,
`/dev/shm/<key>` will leak. Mitigation:
`scripts/tractor-reap --shm` walks `/dev/shm`,
filters to segments owned by the current uid that
no live process is mapping or holding open (via
`/proc/*/maps` + `/proc/*/fd/*`), and unlinks
them. The "nobody-has-it-open" filter is
kernel-canonical so it never touches in-flight
segments held by sibling apps (verified locally
against 81 piker/lttng/aja-held segments — all
preserved).
- Higher-level apps using shm should still pin a
UUID into the key (the `'shml_<uuid>'` pattern
in `test_child_attaches_alot`) so concurrent
sessions don't collide on the same key.
- **Cross-actor unlink races.** Two actors holding
the same shm key racing on `unlink()` — handled
by the `FileNotFoundError` swallow.
- **Crashes won't show up in mp's leak warning.**
We've turned off `resource_tracker`, so the usual
`resource_tracker: There appear to be N leaked
shared_memory objects to clean up at shutdown`
warning is gone too. If we ever want it back as
a crash-detection signal, we'd need our own
equivalent (walk the actor's `_shm_list_keys` set
at root teardown, log any unfreed).
## Verification
```sh
# fixed under both backends:
./py314/bin/python -m pytest tests/test_shm.py \
--spawn-backend=subint_forkserver
# 7 passed
./py314/bin/python -m pytest tests/test_shm.py \
--spawn-backend=trio
# 7 passed (regression check)
```
## References
- CPython upstream issues:
- https://bugs.python.org/issue38119 (fork
+ resource_tracker fd inheritance)
- https://bugs.python.org/issue45209
(SharedMemory + resource_tracker)
- https://bugs.python.org/issue43475
(per-context tracker rework discussion)
- Long-term alternative: migrate off
`multiprocessing.shared_memory` entirely to
`posix_ipc` (no tracker) or finish the
`hotbaud`-based ringbuf transport. Not blocked on
this fix — both are independently tracked.

View File

@ -84,6 +84,11 @@ testing = [
# known-hanging `subint`-backend audit tests; see # known-hanging `subint`-backend audit tests; see
# `ai/conc-anal/subint_*_issue.md`). # `ai/conc-anal/subint_*_issue.md`).
"pytest-timeout>=2.3", "pytest-timeout>=2.3",
# used by `tractor._testing._reap` for the
# `tractor-reap` zombie-subactor + leaked-shm
# cleanup utility (xplatform `Process.memory_maps`,
# `Process.open_files`).
"psutil>=7.0.0",
] ]
repl = [ repl = [
"pyperclip>=1.9.0", "pyperclip>=1.9.0",

View File

@ -0,0 +1,182 @@
#!/usr/bin/env python3
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
#
# SPDX-License-Identifier: AGPL-3.0-or-later
'''
`tractor-reap` — SC-polite zombie-subactor reaper +
optional `/dev/shm/` orphan-segment sweep.
Two cleanup phases (run in order when both are enabled):
1. **process reap** — finds `tractor` subactor processes
left alive after a `pytest` (or any tractor-app) run
that failed to fully cancel its actor tree, then sends
SIGINT with a bounded grace window before escalating
to SIGKILL.
2. **shm sweep** (`--shm` / `--shm-only`) — unlinks
`/dev/shm/<file>` entries owned by the current uid
that no live process has open (mmap'd or fd-held).
Needed because `tractor` disables
`mp.resource_tracker` (see `tractor.ipc._mp_bs`), so a
hard-crashing actor leaves leaked segments that
nothing else GCs.
Process-reap detection modes (auto-selected):
--parent <pid> : descendant-mode — kill procs whose
PPid == <pid>. Use when a parent
is still alive and you want to
scope the sweep precisely (e.g.
CI wrapper calling in from outside
pytest).
(default) : orphan-mode — kill procs with
PPid==1 (init-reparented) whose
cwd matches the repo root AND
whose cmdline contains `python`.
The cwd filter is what prevents
sweeping unrelated init-children.
Usage:
# process reap only (default)
scripts/tractor-reap
# process reap + shm sweep
scripts/tractor-reap --shm
# only the shm sweep, skip process reap
scripts/tractor-reap --shm-only
# from inside a still-live supervisor
scripts/tractor-reap --parent 12345
# dry-run: list what would be reaped, don't act
scripts/tractor-reap -n
scripts/tractor-reap --shm -n
'''
import argparse
import pathlib
import subprocess
import sys
def _repo_root() -> pathlib.Path:
'''
Use `git rev-parse --show-toplevel` when available;
fall back to the repo this script lives in.
'''
try:
out: str = subprocess.check_output(
['git', 'rev-parse', '--show-toplevel'],
stderr=subprocess.DEVNULL,
text=True,
).strip()
return pathlib.Path(out)
except (subprocess.CalledProcessError, FileNotFoundError):
return pathlib.Path(__file__).resolve().parent.parent
def main() -> int:
parser = argparse.ArgumentParser(
prog='tractor-reap',
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
'--parent', '-p',
type=int,
default=None,
help='descendant-mode: reap procs with PPid==<pid>',
)
parser.add_argument(
'--grace', '-g',
type=float,
default=3.0,
help='SIGINT grace window in seconds (default 3.0)',
)
parser.add_argument(
'--dry-run', '-n',
action='store_true',
help='list matched pids/paths but do not signal/unlink',
)
parser.add_argument(
'--shm',
action='store_true',
help=(
'after process reap, also unlink orphaned '
'/dev/shm segments owned by the current user '
'that no live process is mapping or holding open'
),
)
parser.add_argument(
'--shm-only',
action='store_true',
help='skip process reap; only do the shm sweep',
)
args = parser.parse_args()
# import lazily so `--help` doesn't require the tractor
# package to be importable (e.g. when running from a
# shell not inside a venv).
repo = _repo_root()
sys.path.insert(0, str(repo))
from tractor._testing._reap import (
find_descendants,
find_orphans,
find_orphaned_shm,
reap,
reap_shm,
)
rc: int = 0
# --- phase 1: process reap (skipped under --shm-only) ---
if not args.shm_only:
if args.parent is not None:
pids: list[int] = find_descendants(args.parent)
mode: str = f'descendants of PPid={args.parent}'
else:
pids = find_orphans(repo)
mode = f'orphans (PPid=1, cwd={repo})'
if not pids:
print(f'[tractor-reap] no {mode} to reap')
elif args.dry_run:
print(
f'[tractor-reap] dry-run — {mode}:\n {pids}'
)
else:
_, survivors = reap(pids, grace=args.grace)
if survivors:
rc = 1
# --- phase 2: shm sweep (opt-in) ---
if args.shm or args.shm_only:
leaked: list[str] = find_orphaned_shm()
if not leaked:
print(
'[tractor-reap] no orphaned /dev/shm '
'segments to sweep'
)
elif args.dry_run:
print(
f'[tractor-reap] dry-run — {len(leaked)} '
f'orphaned shm segment(s):\n {leaked}'
)
else:
_, errors = reap_shm(leaked)
if errors:
rc = 1
# exit 0 if everything cleaned cleanly, else 1 — useful
# for CI health-check chaining.
return rc
if __name__ == '__main__':
raise SystemExit(main())

View File

@ -520,6 +520,10 @@ async def kill_transport(
# ?TODO, do a OSc style signalling test on this?
# -[ ] doesn't work for fork backends
# @pytest.mark.parametrize('use_signal', [False, True])
#
# Wall-clock bound via `pytest-timeout` (`method='thread'`). # Wall-clock bound via `pytest-timeout` (`method='thread'`).
# Under `--spawn-backend=subint` this test can wedge in an # Under `--spawn-backend=subint` this test can wedge in an
# un-Ctrl-C-able state (abandoned-subint + shared-GIL # un-Ctrl-C-able state (abandoned-subint + shared-GIL
@ -532,19 +536,21 @@ async def kill_transport(
# the intended behavior here; the alternative is an unattended # the intended behavior here; the alternative is an unattended
# suite run that never returns. # suite run that never returns.
@pytest.mark.timeout( @pytest.mark.timeout(
3, # NOTE should be a 2.1s happy path. 30,
# NOTE should be a 2.1s happy path.
# XXX for `subint_forkserver` this is SUPER SENSITIVE so keep it
# higher to avoid flaky runs..
method='thread', method='thread',
) )
@pytest.mark.skipon_spawn_backend( @pytest.mark.skipon_spawn_backend(
'subint', 'subint',
# 'subint_forkserver',
reason=( reason=(
'XXX SUBINT HANGING TEST XXX\n' 'XXX SUBINT HANGING TEST XXX\n'
'See oustanding issue(s)\n' 'See oustanding issue(s)\n'
# TODO, put issue link! # TODO, put issue link!
) )
) )
# @pytest.mark.parametrize('use_signal', [False, True])
#
def test_stale_entry_is_deleted( def test_stale_entry_is_deleted(
debug_mode: bool, debug_mode: bool,
daemon: subprocess.Popen, daemon: subprocess.Popen,
@ -558,7 +564,6 @@ def test_stale_entry_is_deleted(
''' '''
async def main(): async def main():
name: str = 'transport_fails_actor' name: str = 'transport_fails_actor'
_reg_ptl: tractor.Portal _reg_ptl: tractor.Portal
an: tractor.ActorNursery an: tractor.ActorNursery
@ -591,6 +596,14 @@ def test_stale_entry_is_deleted(
await ptl.cancel_actor() await ptl.cancel_actor()
await an.cancel() await an.cancel()
# XXX, for tracing if this starts being flaky again..
#
# async def _timeout_main():
# with trio.move_on_after(4) as cs:
# await main()
# if cs.cancel_called:
# await tractor.pause()
# TODO, remove once the `[subint]` variant no longer hangs. # TODO, remove once the `[subint]` variant no longer hangs.
# #
# Status (as of Phase B hard-kill landing): # Status (as of Phase B hard-kill landing):
@ -641,3 +654,4 @@ def test_stale_entry_is_deleted(
path=f'/tmp/test_stale_entry_is_deleted_{start_method}.dump', path=f'/tmp/test_stale_entry_is_deleted_{start_method}.dump',
): ):
trio.run(main) trio.run(main)
# trio.run(_timeout_main)

View File

@ -446,20 +446,19 @@ def _process_alive(pid: int) -> bool:
return False return False
# Flakey under session-level env pollution (leftover # Known-gap test — `subint_forkserver` orphan-SIGINT
# subactor PIDs from earlier tests competing for ports / # handling. See
# inheriting the harness subprocess's FDs). Passes
# cleanly in isolation, fails in suite; `strict=False`
# so either outcome is tolerated until the env isolation
# is improved. Tracker:
# `ai/conc-anal/subint_forkserver_orphan_sigint_hang_issue.md`. # `ai/conc-anal/subint_forkserver_orphan_sigint_hang_issue.md`.
# `strict=True` so if a future fix closes the gap the
# XPASS surfaces as a FAIL and forces us to drop the
# mark intentionally.
@pytest.mark.xfail( @pytest.mark.xfail(
strict=False, strict=True,
reason=( reason=(
'Env-pollution sensitive. Passes in isolation, ' 'Orphan subactor SIGINT delivery: trio event loop '
'flakey in full-suite runs; orphan subactor may ' 'on non-main thread post-fork doesn\'t see the '
'take longer than 10s to exit when competing for ' 'external SIGINT → KBI path. See tracker doc.\n'
'resources with leftover state from earlier tests.' 'ai/conc-anal/subint_forkserver_orphan_sigint_hang_issue.md'
), ),
) )
@pytest.mark.timeout( @pytest.mark.timeout(

View File

@ -76,9 +76,7 @@ async def subscribe(
async def consumer( async def consumer(
subs: list[str], subs: list[str],
) -> None: ) -> None:
uid = tractor.current_actor().uid uid = tractor.current_actor().uid
@ -108,15 +106,21 @@ async def consumer(
print(f'{uid} got: {value}') print(f'{uid} got: {value}')
def test_dynamic_pub_sub(): def test_dynamic_pub_sub(
reg_addr: tuple,
debug_mode: bool,
test_log: tractor.log.StackLevelAdapter,
):
global _registry global _registry
from multiprocessing import cpu_count from multiprocessing import cpu_count
cpus = cpu_count() cpus = cpu_count()
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as n:
# name of this actor will be same as target func # name of this actor will be same as target func
await n.run_in_actor(publisher) await n.run_in_actor(publisher)
@ -155,12 +159,13 @@ def test_dynamic_pub_sub():
else: else:
pytest.fail('Never got a `TooSlowError` ?') pytest.fail('Never got a `TooSlowError` ?')
assert err
test_log.exception('Timed out AS EXPECTED')
@tractor.context @tractor.context
async def one_task_streams_and_one_handles_reqresp( async def one_task_streams_and_one_handles_reqresp(
ctx: tractor.Context, ctx: tractor.Context,
) -> None: ) -> None:
await ctx.started() await ctx.started()
@ -257,7 +262,8 @@ async def echo_ctx_stream(
def test_sigint_both_stream_types(): def test_sigint_both_stream_types():
'''Verify that running a bi-directional and recv only stream '''
Verify that running a bi-directional and recv only stream
side-by-side will cancel correctly from SIGINT. side-by-side will cancel correctly from SIGINT.
''' '''

View File

@ -115,10 +115,12 @@ async def not_started_but_stream_opened(
) )
def test_started_misuse( def test_started_misuse(
target: Callable, target: Callable,
reg_addr: tuple,
debug_mode: bool, debug_mode: bool,
): ):
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
portal = await an.start_actor( portal = await an.start_actor(
@ -184,6 +186,7 @@ def test_simple_context(
error_parent, error_parent,
child_blocks_forever, child_blocks_forever,
pointlessly_open_stream, pointlessly_open_stream,
reg_addr: tuple,
debug_mode: bool, debug_mode: bool,
): ):
@ -193,6 +196,7 @@ def test_simple_context(
with trio.fail_after(timeout): with trio.fail_after(timeout):
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
portal = await an.start_actor( portal = await an.start_actor(
@ -278,6 +282,7 @@ def test_parent_cancels(
cancel_method: str, cancel_method: str,
chk_ctx_result_before_exit: bool, chk_ctx_result_before_exit: bool,
child_returns_early: bool, child_returns_early: bool,
reg_addr: tuple,
debug_mode: bool, debug_mode: bool,
): ):
''' '''
@ -355,6 +360,7 @@ def test_parent_cancels(
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
portal = await an.start_actor( portal = await an.start_actor(
@ -931,6 +937,7 @@ async def keep_sending_from_child(
) )
def test_one_end_stream_not_opened( def test_one_end_stream_not_opened(
overrun_by: tuple[str, int, Callable], overrun_by: tuple[str, int, Callable],
reg_addr: tuple,
debug_mode: bool, debug_mode: bool,
): ):
''' '''
@ -949,6 +956,7 @@ def test_one_end_stream_not_opened(
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
portal = await an.start_actor( portal = await an.start_actor(
@ -1113,6 +1121,7 @@ def test_maybe_allow_overruns_stream(
# conftest wide # conftest wide
loglevel: str, loglevel: str,
reg_addr: tuple,
debug_mode: bool, debug_mode: bool,
): ):
''' '''
@ -1133,6 +1142,7 @@ def test_maybe_allow_overruns_stream(
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
portal = await an.start_actor( portal = await an.start_actor(
@ -1249,6 +1259,7 @@ def test_maybe_allow_overruns_stream(
def test_ctx_with_self_actor( def test_ctx_with_self_actor(
loglevel: str, loglevel: str,
reg_addr: tuple,
debug_mode: bool, debug_mode: bool,
): ):
''' '''
@ -1263,6 +1274,7 @@ def test_ctx_with_self_actor(
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode, debug_mode=debug_mode,
enable_modules=[__name__], enable_modules=[__name__],
) as an: ) as an:

View File

@ -16,14 +16,22 @@ from tractor.ipc._shm import (
pytestmark = pytest.mark.skipon_spawn_backend( pytestmark = pytest.mark.skipon_spawn_backend(
'subint', 'subint',
'subint_forkserver', # 'subint_forkserver',
# XXX we hack around this stdlib limitation by both,
# - setting `ShareMemory(track=False)`
# - overriding the `mp.ResourceTracker` nonsense in
# `.ipc._mp_bs`.
reason=( reason=(
'subint: GIL-contention hanging class.\n' 'subint: GIL-contention hanging class.\n'
'subint_forkserver: `multiprocessing.SharedMemory` ' 'subint_forkserver: `multiprocessing.SharedMemory` '
'has known issues with fork-without-exec (mp\'s ' 'is fork-without-exec unsafe — child inherits parent\'s '
'resource_tracker and SharedMemory internals assume ' '`resource_tracker` fd → EBADF on first shm op '
'fresh-process state). RemoteActorError surfaces from ' '(`test_child_attaches_alot`); leaked `/shm_list` from '
'the shm-attach path. TODO, put issue link!\n' 'a "passing" run cascades into `FileExistsError` across '
'parametrize variants (`test_parent_writer_child_reader`). '
'Canonical CPython issue class, NOT a tractor bug; full '
'tracker doc:\n'
'ai/conc-anal/subint_forkserver_mp_shared_memory_issue.md'
) )
) )

View File

@ -0,0 +1,462 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Zombie-subactor reaper SC-polite (SIGINT first, SIGKILL
as last resort with a bounded grace window) plus optional
`/dev/shm/` orphan-segment sweep.
Shared implementation between the `tractor-reap` CLI
(`scripts/tractor-reap`) and the pytest session-scoped
auto-fixture that guards the test suite against leftover
subactor processes.
Design notes process reap
---------------------------
- Linux-only today: reads `/proc/<pid>/{status,cwd,cmdline}`.
Module imports cleanly elsewhere; calling `find_*` on a
non-Linux box returns an empty list (no `/proc`
enumeration). A future xplatform pass could swap this
for `psutil.Process.children()` /
`psutil.process_iter()` since `psutil` is already a
test-time dependency.
- Two detection modes:
1. **descendant-mode** when invoked from a still-live
parent (e.g. a pytest session-end fixture), match by
`PPid == parent_pid`. Direct + precise; the target
PIDs are still reparented to the live pytest process
at teardown time, before pytest exits.
2. **orphan-mode** when invoked after the parent died
(e.g. the `tractor-reap` CLI run post-Ctrl+C), match
by `PPid == 1` (reparented to init) AND `cwd ==
<repo-root>` AND cmdline contains `python`. The cwd
filter is what keeps the heuristic from sweeping up
unrelated init-children on the box.
- Escalation: for every matched PID, SIGINT, poll for up
to `grace` seconds, then SIGKILL any survivors. The
two-phase pattern is the SC-graceful-cancel discipline
documented in `feedback_sc_graceful_cancel_first.md`
we want the subactor runtime to run its trio cancel
shield + IPC teardown paths where it can.
Design notes shm sweep
------------------------
Since `tractor/ipc/_mp_bs.disable_mantracker()` turns off
`mp.resource_tracker` entirely, a hard-crashing actor can
leave `/dev/shm/<key>` segments behind that nothing else
GCs (see
`ai/conc-anal/subint_forkserver_mp_shared_memory_issue.md`,
"Trade-offs / known gaps").
The shm sweep is **Linux-/FreeBSD-only**: both expose
POSIX shared-memory segments as regular files under
`/dev/shm`, so `os.stat()` + `os.unlink()` are the
correct primitives. macOS POSIX shm has no fs-visible
path (segments live behind `shm_open`/`shm_unlink`
syscalls only), and Windows is a different story
entirely. Calling the shm helpers on an unsupported
platform raises `NotImplementedError`.
In-use enumeration delegates to `psutil`
`Process.memory_maps()` (post-mmap) +
`Process.open_files()` (pre-mmap shm-opened fds)
xplatform, mature, and handles the per-process
permission/race edge cases correctly. Segments matching
neither are genuinely leaked safe to unlink.
The "nobody has it open" check is the kernel-canonical
test same answer `lsof /dev/shm/<key>` would give. No
reliance on tractor-specific naming conventions (shm
keys are caller-defined).
'''
from __future__ import annotations
import os
import pathlib
import signal
import stat
import sys
import time
# `/dev/shm` is the POSIX-shm filesystem on Linux + FreeBSD.
# macOS uses `shm_open` syscalls without a fs-visible path,
# so the shm helpers refuse to run there.
_SHM_PLATFORM_OK: bool = sys.platform.startswith(
('linux', 'freebsd')
)
SHM_DIR: str = '/dev/shm'
def _ensure_shm_supported() -> None:
'''
Guard for shm helpers they assume `/dev/shm` exists
as a tmpfs and `os.unlink()` is the right primitive.
Both true on Linux + FreeBSD; not true elsewhere.
'''
if not _SHM_PLATFORM_OK:
raise NotImplementedError(
f'shm reap is only supported on Linux/FreeBSD; '
f'got sys.platform={sys.platform!r}. macOS '
f'POSIX shm has no fs-visible path; Windows '
f'has no /dev/shm equivalent.'
)
def _read_status_ppid(pid: int) -> int | None:
'''
Return the parent-pid from `/proc/<pid>/status` or
`None` if the proc went away / is unreadable.
'''
try:
with open(f'/proc/{pid}/status') as f:
for line in f:
if line.startswith('PPid:'):
return int(line.split()[1])
except (
FileNotFoundError,
PermissionError,
ProcessLookupError,
):
return None
return None
def _read_cwd(pid: int) -> str | None:
try:
return os.readlink(f'/proc/{pid}/cwd')
except (
FileNotFoundError,
PermissionError,
ProcessLookupError,
):
return None
def _read_cmdline(pid: int) -> str:
try:
with open(f'/proc/{pid}/cmdline', 'rb') as f:
return f.read().replace(b'\0', b' ').decode(
errors='replace',
)
except (
FileNotFoundError,
PermissionError,
ProcessLookupError,
):
return ''
def _iter_live_pids() -> list[int]:
'''
Enumerate currently-alive pids from `/proc`. Returns
`[]` on systems without `/proc` (e.g. macOS).
'''
try:
entries: list[str] = os.listdir('/proc')
except OSError:
return []
return [int(e) for e in entries if e.isdigit()]
def find_descendants(
parent_pid: int,
) -> list[int]:
'''
PIDs whose `PPid == parent_pid` i.e. direct
children of the given pid. Used by the pytest
session-end fixture where `parent_pid` is still
alive as the pytest-python process.
'''
return [
pid
for pid in _iter_live_pids()
if _read_status_ppid(pid) == parent_pid
]
def find_orphans(
repo_root: pathlib.Path,
) -> list[int]:
'''
PIDs that are:
- reparented to init (`PPid == 1`),
- have `cwd == <repo_root>`,
- and have a `python` in their cmdline.
This is the "pytest-died-mid-session" case where the
subactor forks got reparented. The cwd filter is the
critical bit that keeps us from sweeping up unrelated
init-children on the box.
'''
repo: str = str(repo_root)
hits: list[int] = []
for pid in _iter_live_pids():
if _read_status_ppid(pid) != 1:
continue
cwd: str | None = _read_cwd(pid)
if cwd != repo:
continue
cmd: str = _read_cmdline(pid)
if 'python' not in cmd:
continue
hits.append(pid)
return hits
def reap(
pids: list[int],
*,
grace: float = 3.0,
poll: float = 0.25,
log=print,
) -> tuple[list[int], list[int]]:
'''
Deliver SIGINT to each pid, wait up to `grace`
seconds for them to exit, then SIGKILL any that
survive.
Returns `(signalled, survivors_killed)` so callers
can report / assert.
`log` is the logger function for user-visible
progress lines default `print`; pytest fixture
swaps it for a `pytest`-friendly writer.
'''
if not pids:
return ([], [])
signalled: list[int] = []
for pid in pids:
try:
os.kill(pid, signal.SIGINT)
signalled.append(pid)
except ProcessLookupError:
# raced — already gone
pass
if signalled:
log(
f'[tractor-reap] SIGINT → {len(signalled)} '
f'proc(s): {signalled}'
)
deadline: float = time.monotonic() + grace
while time.monotonic() < deadline:
time.sleep(poll)
alive: list[int] = [
pid for pid in signalled if _is_alive(pid)
]
if not alive:
return (signalled, [])
survivors: list[int] = [
pid for pid in signalled if _is_alive(pid)
]
if survivors:
log(
f'[tractor-reap] SIGKILL (after {grace}s '
f'grace) → {survivors}'
)
for pid in survivors:
try:
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
pass
return (signalled, survivors)
def _is_alive(pid: int) -> bool:
'''
True iff `/proc/<pid>` still exists AND the proc
isn't already a zombie (Z state).
'''
try:
with open(f'/proc/{pid}/status') as f:
for line in f:
if line.startswith('State:'):
# e.g. 'State:\tZ (zombie)'
return 'Z' not in line.split()[1]
except (
FileNotFoundError,
ProcessLookupError,
):
return False
return True
def _enumerate_in_use_shm(
shm_dir: str = SHM_DIR,
) -> set[str]:
'''
Return the set of `<shm_dir>/<file>` paths currently
held open by any live process via `psutil`'s
xplatform `Process.memory_maps()` (post-mmap
segments) and `Process.open_files()` (pre-mmap
shm-opened fds).
Lazy-imports `psutil` so the module stays importable
on installs without it (it's a `testing` group dep).
'''
_ensure_shm_supported()
# lazy + actionable failure: leaked shm sweep is the
# only thing in this module that needs psutil; we
# don't want a top-level ImportError breaking the
# process-reap path.
try:
import psutil
except ImportError as exc:
raise RuntimeError(
'shm reap requires `psutil` — install the '
'`testing` dep group, e.g. '
'`uv sync --group testing`.'
) from exc
in_use: set[str] = set()
prefix: str = shm_dir.rstrip('/') + '/'
for proc in psutil.process_iter(['pid']):
try:
for m in proc.memory_maps(grouped=False):
if m.path.startswith(prefix):
in_use.add(m.path)
for f in proc.open_files():
if f.path.startswith(prefix):
in_use.add(f.path)
except (
psutil.NoSuchProcess,
psutil.AccessDenied,
psutil.ZombieProcess,
FileNotFoundError,
PermissionError,
):
# raced — proc died or we can't see its
# mappings (e.g. root-owned). Skip; missing
# an in-use entry only means we'd preserve
# something we could reap, never the
# reverse — safe-by-default.
continue
return in_use
def find_orphaned_shm(
*,
uid: int | None = None,
shm_dir: str = SHM_DIR,
) -> list[str]:
'''
`<shm_dir>/<file>` paths that are:
- owned by `uid` (default: the current effective uid),
- and currently held by NO live process i.e.
genuinely leaked.
Linux/FreeBSD only see module docstring. No reliance
on caller-defined shm-key naming, so this works for
any tractor app (not just the test suite).
'''
_ensure_shm_supported()
if uid is None:
uid = os.geteuid()
try:
entries: list[str] = os.listdir(shm_dir)
except OSError:
return []
in_use: set[str] = _enumerate_in_use_shm(shm_dir=shm_dir)
leaked: list[str] = []
prefix: str = shm_dir.rstrip('/') + '/'
for entry in entries:
path: str = prefix + entry
try:
st: os.stat_result = os.stat(path)
except OSError:
continue
# only regular files — skip subdirs / sockets etc.
if not stat.S_ISREG(st.st_mode):
continue
if st.st_uid != uid:
continue
if path in in_use:
continue
leaked.append(path)
return leaked
def reap_shm(
paths: list[str],
*,
log=print,
) -> tuple[list[str], list[tuple[str, OSError]]]:
'''
Unlink the given `/dev/shm/...` paths.
Linux/FreeBSD only `os.unlink()` is the correct
primitive on the POSIX-shm tmpfs there. macOS POSIX
shm has no fs-visible path; the equivalent there is
`posix_ipc.unlink_shared_memory(name)` (not
implemented here see module docstring).
Returns `(unlinked, errors)` where `errors` is a list
of `(path, exc)` for paths that could not be removed
(e.g. permissions). Paths that raced to being already-
gone are counted as successfully unlinked.
'''
_ensure_shm_supported()
unlinked: list[str] = []
errors: list[tuple[str, OSError]] = []
for path in paths:
try:
os.unlink(path)
unlinked.append(path)
except FileNotFoundError:
# raced — already gone, treat as success
unlinked.append(path)
except OSError as exc:
errors.append((path, exc))
if unlinked:
log(
f'[tractor-reap] unlinked {len(unlinked)} '
f'orphaned shm segment(s): {unlinked}'
)
for path, exc in errors:
log(
f'[tractor-reap] could not unlink {path}: '
f'{exc!r}'
)
return (unlinked, errors)

View File

@ -32,6 +32,7 @@ from typing import (
import pytest import pytest
import tractor import tractor
from tractor.spawn._spawn import SpawnMethodKey
import trio import trio
@ -274,7 +275,12 @@ def pytest_collection_modifyitems(
default_reason: str = f'Borked on --spawn-backend={backend!r}' default_reason: str = f'Borked on --spawn-backend={backend!r}'
for item in items: for item in items:
for mark in item.iter_markers(name='skipon_spawn_backend'): for mark in item.iter_markers(name='skipon_spawn_backend'):
if backend in mark.args: skip_backends: tuple[str] = mark.args
for skip_backend in skip_backends:
assert skip_backend in get_args(SpawnMethodKey)
# ?TODO, run these through the try-set-backend checker to
# avoid typos?
if backend in skip_backends:
reason: str = mark.kwargs.get( reason: str = mark.kwargs.get(
'reason', 'reason',
default_reason, default_reason,
@ -285,6 +291,42 @@ def pytest_collection_modifyitems(
break break
@pytest.fixture(
scope='session',
autouse=True,
)
def _reap_orphaned_subactors():
'''
Session-scoped autouse fixture: after the whole test
session finishes, SIGINT any subactor processes still
parented to this `pytest` process, wait a bounded
grace window, then SIGKILL survivors.
Rationale: under fork-based spawn backends (notably
`subint_forkserver`), a test that times out or bails
mid-teardown can leave subactor forks alive. Without
this reap, they linger across sessions and compete
for ports / inherit pytest's capture-pipe fds — which
flakifies later tests. SC-polite discipline: SIGINT
first to let the subactor's trio cancel shield + IPC
teardown paths run before we escalate.
Matching companion CLI: `scripts/tractor-reap` for
the pytest-died-mid-session case.
'''
import os
parent_pid: int = os.getpid()
yield
from tractor._testing._reap import (
find_descendants,
reap,
)
pids: list[int] = find_descendants(parent_pid)
if pids:
reap(pids, grace=3.0)
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def debug_mode( def debug_mode(
request: pytest.FixtureRequest, request: pytest.FixtureRequest,
@ -398,7 +440,6 @@ def pytest_generate_tests(
# drive the valid-backend set from the canonical `Literal` so # drive the valid-backend set from the canonical `Literal` so
# adding a new spawn backend (e.g. `'subint'`) doesn't require # adding a new spawn backend (e.g. `'subint'`) doesn't require
# touching the harness. # touching the harness.
from tractor.spawn._spawn import SpawnMethodKey
assert spawn_backend in get_args(SpawnMethodKey) assert spawn_backend in get_args(SpawnMethodKey)
# NOTE: used-to-be-used-to dyanmically parametrize tests for when # NOTE: used-to-be-used-to dyanmically parametrize tests for when

View File

@ -17,7 +17,7 @@
Utils to tame mp non-SC madeness Utils to tame mp non-SC madeness
''' '''
import platform from functools import partial
def disable_mantracker(): def disable_mantracker():
@ -27,49 +27,37 @@ def disable_mantracker():
''' '''
from multiprocessing.shared_memory import SharedMemory from multiprocessing.shared_memory import SharedMemory
from multiprocessing import (
resource_tracker as mantracker,
)
# XXX ALWAYS disable the stdlib's "resource tracker"; it prevents
# fork backends and never was useful to us since we're SC
# lifetime managing all allocations.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype):
pass
def ensure_running(self):
pass
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
# 3.13+ only.. can pass `track=False` to disable # 3.13+ only.. can pass `track=False` to disable
# all the resource tracker bs. # all the resource tracker bs.
# https://docs.python.org/3/library/multiprocessing.shared_memory.html # https://docs.python.org/3/library/multiprocessing.shared_memory.html
if (_py_313 := ( shmT = partial(
platform.python_version_tuple()[:-1] SharedMemory,
>= track=False,
('3', '13') )
)
):
from functools import partial
return partial(
SharedMemory,
track=False,
)
# !TODO, once we drop 3.12- we can obvi remove all this!
else:
from multiprocessing import (
resource_tracker as mantracker,
)
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype):
pass
def ensure_running(self):
pass
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
# use std type verbatim
shmT = SharedMemory
return shmT return shmT

View File

@ -929,15 +929,26 @@ def open_shm_list(
# "close" attached shm on actor teardown # "close" attached shm on actor teardown
try: try:
actor = tractor.current_actor() actor = tractor.current_actor()
actor.lifetime_stack.callback(shml.shm.close) actor.lifetime_stack.callback(shml.shm.close)
# XXX on 3.13+ we don't need to call this? # >XXX NOTE< on 3.13+ we need to call this AS WELL AS pass
# -> bc we pass `track=False` for `SharedMemeory` orr? # `track=False` for `mp.SharedMemeory` otherwise fork based
if ( # backends will error out due to long lived stdlib
platform.python_version_tuple()[:-1] < ('3', '13') # limitations,
): # - https://bugs.python.org/issue38119
actor.lifetime_stack.callback(shml.shm.unlink) # - https://bugs.python.org/issue45209
#
def try_unlink():
try:
shml.shm.unlink()
except FileNotFoundError as fne:
log.debug(
f'ShmList already deallocated pre-actor-shutdown.\n'
f'{fne!r}\n'
)
actor.lifetime_stack.callback(try_unlink)
except RuntimeError: except RuntimeError:
log.warning('tractor runtime not active, skipping teardown steps') log.warning('tractor runtime not active, skipping teardown steps')

View File

@ -716,6 +716,7 @@ sync-pause = [
] ]
testing = [ testing = [
{ name = "pexpect" }, { name = "pexpect" },
{ name = "psutil" },
{ name = "pytest" }, { name = "pytest" },
{ name = "pytest-timeout" }, { name = "pytest-timeout" },
] ]
@ -761,6 +762,7 @@ subints = [{ name = "msgspec", marker = "python_full_version >= '3.14'", specifi
sync-pause = [{ name = "greenback", marker = "python_full_version == '3.13.*'", specifier = ">=1.2.1,<2" }] sync-pause = [{ name = "greenback", marker = "python_full_version == '3.13.*'", specifier = ">=1.2.1,<2" }]
testing = [ testing = [
{ name = "pexpect", specifier = ">=4.9.0,<5" }, { name = "pexpect", specifier = ">=4.9.0,<5" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pytest", specifier = ">=8.3.5" }, { name = "pytest", specifier = ">=8.3.5" },
{ name = "pytest-timeout", specifier = ">=2.3" }, { name = "pytest-timeout", specifier = ">=2.3" },
] ]