Compare commits
9 Commits
2ca0f41e61...66f1941f46
| Author | SHA1 | Date |
|---|---|---|
| | 66f1941f46 | |
| | 9b05f659b3 | |
| | 65fcfbf224 | |
| | 4f12d69b41 | |
| | aa3e230926 | |
| | c99d475d03 | |
| | 6d76b60404 | |
| | eae478f3d5 | |
| | 44bdb1697c | |
|
|
@ -521,3 +521,105 @@ filling log volume. Full post-mortem in
|
|||
`ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`.
|
||||
Lesson codified here so future-me grep-finds the
|
||||
workaround before digging.
|
||||
|
||||
## 10. Reaping zombie subactors (`tractor-reap`)
|
||||
|
||||
**Symptom:** after a `pytest` run crashes, times out,
|
||||
or is `Ctrl+C`'d, subactor forks (esp. under
|
||||
`subint_forkserver`) can be reparented to `init`
|
||||
(PPid==1) and linger. They hold onto ports, inherit
|
||||
pytest's capture-pipe fds, and flakify later
|
||||
sessions.
|
||||
|
||||
**Two layers of defense:**
|
||||
|
||||
### a) Session-scoped auto-fixture (always on)
|
||||
|
||||
`tractor/_testing/pytest.py::_reap_orphaned_subactors`
|
||||
runs at pytest session teardown. It walks `/proc` for
|
||||
direct descendants of the pytest pid, SIGINTs them,
|
||||
waits up to 3s, then SIGKILLs survivors. SC-polite:
|
||||
gives the subactor runtime a chance to run its trio
|
||||
cancel shield + IPC teardown before escalation.
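
The SIGINT, wait, SIGKILL escalation described above can be sketched roughly as follows. This is a minimal illustration and not the fixture's actual code: `reap_politely`, `_alive`, the `poll` interval and the injectable `alive` predicate are all hypothetical names chosen for this example.

```python
import os
import signal
import time


def _alive(pid: int) -> bool:
    # Signal 0 only performs the existence/permission check.
    # NOTE: an un-waited zombie child still counts as "alive" here.
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # exists, but owned by another user
    return True


def reap_politely(
    pids: list[int],
    grace: float = 3.0,
    poll: float = 0.05,
    alive=_alive,  # hypothetical hook, handy for testing
) -> tuple[list[int], list[int]]:
    '''
    SIGINT every pid, poll for up to `grace` seconds, then
    SIGKILL whatever is left. Returns `(exited, killed)`.

    '''
    signalled: list[int] = []
    for pid in pids:
        try:
            os.kill(pid, signal.SIGINT)
            signalled.append(pid)
        except ProcessLookupError:
            pass  # already gone before we got to it

    survivors: list[int] = list(signalled)
    deadline: float = time.monotonic() + grace
    while survivors and time.monotonic() < deadline:
        time.sleep(poll)
        survivors = [pid for pid in survivors if alive(pid)]

    for pid in survivors:
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # exited between the last poll and the kill

    exited = [pid for pid in signalled if pid not in survivors]
    return exited, survivors
```

A non-empty second list is the "had to escalate" case, which a caller could map onto a non-zero exit status.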
|
||||
|
||||
This is *autouse* and session-scoped — you don't need
|
||||
to do anything. It just runs.
|
||||
|
||||
### b) `scripts/tractor-reap` CLI (manual reap)
|
||||
|
||||
For the **pytest-died-mid-session** case (Ctrl+C, OOM
|
||||
kill, hung process you had to `kill -9`), the fixture
|
||||
never ran. Reach for the CLI:
|
||||
|
||||
```sh
# default: orphans (PPid==1, cwd==repo, cmd contains python)
scripts/tractor-reap

# descendant-mode: from a still-live supervisor
scripts/tractor-reap --parent <pytest-pid>

# see what would be reaped, don't signal
scripts/tractor-reap -n

# tune the SIGINT → SIGKILL grace window
scripts/tractor-reap --grace 5
```
|
||||
|
||||
Exit code: `0` if everyone exited on SIGINT, `1` if
|
||||
SIGKILL had to escalate — so you can chain it in CI
|
||||
health-checks (`scripts/tractor-reap || <alert>`).
|
||||
|
||||
**What it matches** (orphan-mode):
|
||||
- `PPid == 1` (reparented to init → definitely
|
||||
orphaned, not just a currently-running child)
|
||||
- `cwd == <repo-root>` (keeps the sweep scoped; won't
|
||||
touch unrelated init-children elsewhere)
|
||||
- `python` in cmdline
|
||||
|
||||
**What it does not do:** kill anything whose PPid is
|
||||
still a live tractor parent. If the parent is alive
|
||||
it's not an orphan; use `--parent <pid>` if you need
|
||||
to force-reap under a still-live supervisor.
|
||||
|
||||
**When NOT to run it:** while a pytest session is
|
||||
active in another terminal. It's safe (won't touch
|
||||
that session's live children in orphan-mode) but can
|
||||
race if the target session is mid-teardown.
|
||||
|
||||
### c) `--shm` / `--shm-only`: orphan-segment sweep
|
||||
|
||||
Because `tractor.ipc._mp_bs.disable_mantracker()`
|
||||
turns off `mp.resource_tracker` (see
|
||||
`ai/conc-anal/subint_forkserver_mp_shared_memory_issue.md`),
|
||||
a hard-crashing actor can leave `/dev/shm/<key>`
|
||||
segments behind that nothing else GCs.
|
||||
|
||||
```sh
# process reap THEN shm sweep
scripts/tractor-reap --shm

# shm sweep only (skip process phase)
scripts/tractor-reap --shm-only

# dry-run: list candidates, don't unlink
scripts/tractor-reap --shm -n
```
|
||||
|
||||
**Match criteria** (very conservative — this is a
|
||||
shared-system path, can't be wrong):
|
||||
- segment is a regular file under `/dev/shm`,
|
||||
- owned by the **current uid** (`stat.st_uid`),
|
||||
- AND **no live process holds it open** —
|
||||
enumerated by walking every readable
|
||||
`/proc/<pid>/maps` (post-mmap mappings) AND
|
||||
`/proc/<pid>/fd/*` (pre-mmap shm-opened fds).
|
||||
|
||||
The "nobody has it open" check is the
|
||||
kernel-canonical "is this leaked?" test — same
|
||||
answer `lsof /dev/shm/<key>` would give. No
|
||||
reliance on tractor-specific naming, so it works
|
||||
for any tractor app. Critically, it WILL NOT touch
|
||||
segments held by other apps you have running
|
||||
(e.g. `piker`, `lttng-ust-*`, `aja-shm-*` —
|
||||
verified locally with 81 in-use segments correctly
|
||||
preserved).
|
||||
|
|
|
|||
|
|
@ -0,0 +1,187 @@
|
|||
# `subint_forkserver` × `multiprocessing.SharedMemory`: fork-inherited `resource_tracker` fd
|
||||
|
||||
Surfaced by `tests/test_shm.py` under
|
||||
`--spawn-backend=subint_forkserver`. Two distinct
|
||||
failure modes, one root cause:
|
||||
**`multiprocessing.resource_tracker` is fork-without-exec
|
||||
unsafe** (canonical CPython class — bpo-38119, bpo-45209).
|
||||
|
||||
**Status: resolved by `tractor/ipc/_mp_bs.py` +
|
||||
`tractor/ipc/_shm.py` changes (see "Resolution" below).
|
||||
This doc kept as the
|
||||
post-mortem / decision record.**
|
||||
|
||||
## TL;DR
|
||||
|
||||
`mp.shared_memory.SharedMemory` registers each shm
|
||||
allocation with the per-process
|
||||
`multiprocessing.resource_tracker` singleton. The
|
||||
tracker is a daemon process started lazily; the
|
||||
parent owns a unix-pipe-fd to it. When the parent
|
||||
forks-without-execing into a `subint_forkserver`
|
||||
child, the child inherits that fd — but it refers to
|
||||
the *parent's* tracker, which the child has no
|
||||
business writing to.
|
||||
|
||||
Two manifestations under the original (pre-fix) code:
|
||||
|
||||
1. **`test_child_attaches_alot`** — child loops 1000×
|
||||
`attach_shm_list()`. First `mp.SharedMemory` call
|
||||
in the child triggers
|
||||
`resource_tracker._ensure_running_and_write` →
|
||||
`_teardown_dead_process` → `os.close(self._fd)` on
|
||||
an fd the child should never have touched. Surfaces
|
||||
as `OSError: [Errno 9] Bad file descriptor`
|
||||
wrapped in `tractor.RemoteActorError`.
|
||||
|
||||
2. **`test_parent_writer_child_reader[*]`** — first
|
||||
parametrize variant "passes" (with
|
||||
`resource_tracker: leaked shared_memory` warning)
|
||||
because nobody ever cleans up `/shm_list`.
|
||||
Subsequent variants then fail with
|
||||
`FileExistsError: '/shm_list'` because the leak
|
||||
persists across the parametrize loop and forkserver
|
||||
children can't `shm_open(create=True)` an existing
|
||||
key.
|
||||
|
||||
Trio backend (`mp_spawn`-style) doesn't surface this:
|
||||
each subactor `exec`s a fresh interpreter →
|
||||
independent resource tracker per subactor → no
|
||||
inherited-fd issue, and the test's pre-existing leak
|
||||
gets masked by the per-process tracker reset.
|
||||
|
||||
Under `subint_forkserver`, the child is `os.fork()`'d
|
||||
from a worker thread (no `exec`) → inherits parent's
|
||||
`mp.resource_tracker._resource_tracker._fd` → EBADF
|
||||
/ cross-talk on first `mp.SharedMemory` op.
|
||||
|
||||
## Resolution
|
||||
|
||||
We side-step the broken upstream machinery entirely
|
||||
rather than try to make it fork-safe. Two-part fix
|
||||
landed (commits to follow this doc):
|
||||
|
||||
### 1. `tractor/ipc/_mp_bs.py::disable_mantracker()`
|
||||
— unconditional disable
|
||||
|
||||
The previous "3.13+ short-circuit" path used
|
||||
`partial(SharedMemory, track=False)` to opt-out of
|
||||
registration on 3.13+. The `track=False` switch is
|
||||
necessary but not sufficient under fork: the
|
||||
inherited tracker fd can still be touched indirectly
|
||||
(e.g. through `_ensure_running_and_write`'s
|
||||
self-check path).
|
||||
|
||||
The fix takes both belts AND suspenders:
|
||||
|
||||
- **Always** monkey-patch
|
||||
`mp.resource_tracker._resource_tracker` to a
|
||||
no-op `ManTracker` subclass whose
|
||||
`register`/`unregister`/`ensure_running` are all
|
||||
empty.
|
||||
- **Always** wrap `SharedMemory` with
|
||||
`track=False`.
|
||||
|
||||
Result: the inherited tracker fd in the fork child
|
||||
is still inherited (fd is a kernel object; we can't
|
||||
un-inherit it across fork) but **nothing in the
|
||||
shm code path will ever try to use it** — both the
|
||||
tracker singleton and the per-allocation registration
|
||||
are short-circuited.
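
A minimal sketch of what such a two-part disable could look like. It is not the code in `_mp_bs.py`: `NoopTracker` and `disable_tracking` are hypothetical names, it relies on private `multiprocessing.resource_tracker` attributes, and the version guard around `track=False` (a 3.13+ parameter) is an addition so the example also runs on older interpreters.

```python
from functools import partial
from multiprocessing import resource_tracker as mantracker
from multiprocessing.shared_memory import SharedMemory
import sys


class NoopTracker(mantracker.ResourceTracker):
    # Never spawn the tracker process, never write to its pipe.

    def register(self, name, rtype):
        pass

    def unregister(self, name, rtype):
        pass

    def ensure_running(self):
        pass


def disable_tracking():
    '''
    Swap the module-level singleton for a no-op and return a
    `SharedMemory` factory that opts out of registration.

    '''
    tracker = NoopTracker()
    mantracker._resource_tracker = tracker
    # The module also exports bound methods of the original
    # singleton; rebind them so callers hit the no-op too.
    mantracker.register = tracker.register
    mantracker.unregister = tracker.unregister
    mantracker.ensure_running = tracker.ensure_running

    if sys.version_info >= (3, 13):
        return partial(SharedMemory, track=False)
    return SharedMemory
```

Because `shared_memory` looks up `resource_tracker.register` through the module at call time, rebinding the module attributes is enough; no change to `SharedMemory` itself is needed in this sketch.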
|
||||
|
||||
### 2. `tractor/ipc/_shm.py::open_shm_list()`
|
||||
— own the cleanup
|
||||
|
||||
Without `mp.resource_tracker`, nobody else will
|
||||
unlink leaked segments at process exit. tractor
|
||||
already controls actor lifecycle, so we register
|
||||
unlink on the actor's lifetime stack:
|
||||
|
||||
```python
def try_unlink():
    try:
        shml.shm.unlink()
    except FileNotFoundError as fne:
        log.exception(...)  # benign sibling-already-cleaned race

actor.lifetime_stack.callback(try_unlink)
```
|
||||
|
||||
The `FileNotFoundError` swallow handles the case
|
||||
where a sibling actor already unlinked the same
|
||||
segment (legitimate race in shared-key setups).
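
To make the race concrete, here is a small stand-alone sketch. It assumes the lifetime stack behaves like a `contextlib.ExitStack` (the snippet above only shows `.callback`), and it uses a temp file in place of a real shm segment; `make_unlinker` and `demo` are hypothetical names.

```python
import contextlib
import os
import tempfile


def make_unlinker(path: str, log=print):
    def try_unlink() -> None:
        try:
            os.unlink(path)
        except FileNotFoundError:
            # benign: a sibling already removed the same key
            log(f'{path} already unlinked by a sibling')
    return try_unlink


def demo() -> bool:
    fd, path = tempfile.mkstemp()
    os.close(fd)
    with contextlib.ExitStack() as lifetime_stack:
        # two "actors" both register cleanup for one key
        lifetime_stack.callback(make_unlinker(path))
        lifetime_stack.callback(make_unlinker(path))
    # the second callback to run hit `FileNotFoundError`
    # and swallowed it; the file is gone either way.
    return os.path.exists(path)
```

Without the `except` clause the second callback would raise out of the stack's exit and mask an otherwise clean teardown.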
|
||||
|
||||
## Why this is the right call
|
||||
|
||||
- **mp's tracker is widely criticized.** The
|
||||
in-tree comment "non-SC madness" predates this
|
||||
fix and matches CPython upstream's own discomfort
|
||||
(e.g. the per-context tracker design rework
|
||||
discussions in bpo-43475).
|
||||
- **tractor already owns process lifecycle.** We
|
||||
have `actor.lifetime_stack`, `Portal.cancel_actor`,
|
||||
and the IPC cancel cascade. Adding mp's tracker
|
||||
on top buys nothing we can't do better ourselves.
|
||||
- **Backend-uniform.** No special-casing per spawn
|
||||
backend. trio (`mp_spawn`-style), `subint_forkserver`,
|
||||
and the future `subint` all behave identically
|
||||
— register-time no-op, exit-time unlink-via-
|
||||
lifetime-stack.
|
||||
|
||||
## Trade-offs / known gaps
|
||||
|
||||
- **Crash-leaked segments.** If an actor segfaults
|
||||
or is `SIGKILL`'d before its lifetime stack runs,
|
||||
`/dev/shm/<key>` will leak. Mitigation:
|
||||
`scripts/tractor-reap --shm` walks `/dev/shm`,
|
||||
filters to segments owned by the current uid that
|
||||
no live process is mapping or holding open (via
|
||||
`/proc/*/maps` + `/proc/*/fd/*`), and unlinks
|
||||
them. The "nobody-has-it-open" filter is
|
||||
kernel-canonical so it never touches in-flight
|
||||
segments held by sibling apps (verified locally
|
||||
against 81 piker/lttng/aja-held segments — all
|
||||
preserved).
|
||||
- Higher-level apps using shm should still pin a
|
||||
UUID into the key (the `'shml_<uuid>'` pattern
|
||||
in `test_child_attaches_alot`) so concurrent
|
||||
sessions don't collide on the same key.
|
||||
- **Cross-actor unlink races.** Two actors holding
|
||||
the same shm key racing on `unlink()` — handled
|
||||
by the `FileNotFoundError` swallow.
|
||||
- **Crashes won't show up in mp's leak warning.**
|
||||
We've turned off `resource_tracker`, so the usual
|
||||
`resource_tracker: There appear to be N leaked
|
||||
shared_memory objects to clean up at shutdown`
|
||||
warning is gone too. If we ever want it back as
|
||||
a crash-detection signal, we'd need our own
|
||||
equivalent (walk the actor's `_shm_list_keys` set
|
||||
at root teardown, log any unfreed).
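
For the UUID-in-key recommendation above, a minimal sketch of the `'shml_<uuid>'` pattern; `unique_shm_key` is a hypothetical helper name, not part of tractor's API.

```python
import uuid


def unique_shm_key(prefix: str = 'shml') -> str:
    # A fresh random UUID per call, so two concurrent
    # sessions never compete for the same segment name.
    return f'{prefix}_{uuid.uuid4()}'
```

The key then has to be passed to whichever actor attaches to the segment, since it can no longer be hard-coded on both sides.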
|
||||
|
||||
## Verification
|
||||
|
||||
```sh
# fixed under both backends:
./py314/bin/python -m pytest tests/test_shm.py \
    --spawn-backend=subint_forkserver
# 7 passed

./py314/bin/python -m pytest tests/test_shm.py \
    --spawn-backend=trio
# 7 passed (regression check)
```
|
||||
|
||||
## References
|
||||
|
||||
- CPython upstream issues:
|
||||
- https://bugs.python.org/issue38119 (fork
|
||||
+ resource_tracker fd inheritance)
|
||||
- https://bugs.python.org/issue45209
|
||||
(SharedMemory + resource_tracker)
|
||||
- https://bugs.python.org/issue43475
|
||||
(per-context tracker rework discussion)
|
||||
- Long-term alternative: migrate off
|
||||
`multiprocessing.shared_memory` entirely to
|
||||
`posix_ipc` (no tracker) or finish the
|
||||
`hotbaud`-based ringbuf transport. Not blocked on
|
||||
this fix — both are independently tracked.
|
||||
|
|
@ -84,6 +84,11 @@ testing = [
|
|||
# known-hanging `subint`-backend audit tests; see
|
||||
# `ai/conc-anal/subint_*_issue.md`).
|
||||
"pytest-timeout>=2.3",
|
||||
# used by `tractor._testing._reap` for the
|
||||
# `tractor-reap` zombie-subactor + leaked-shm
|
||||
# cleanup utility (xplatform `Process.memory_maps`,
|
||||
# `Process.open_files`).
|
||||
"psutil>=7.0.0",
|
||||
]
|
||||
repl = [
|
||||
"pyperclip>=1.9.0",
|
||||
|
|
|
|||
|
|
@ -0,0 +1,182 @@
|
|||
#!/usr/bin/env python3
|
||||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
'''
|
||||
`tractor-reap` — SC-polite zombie-subactor reaper +
|
||||
optional `/dev/shm/` orphan-segment sweep.
|
||||
|
||||
Two cleanup phases (run in order when both are enabled):
|
||||
|
||||
1. **process reap** — finds `tractor` subactor processes
|
||||
left alive after a `pytest` (or any tractor-app) run
|
||||
that failed to fully cancel its actor tree, then sends
|
||||
SIGINT with a bounded grace window before escalating
|
||||
to SIGKILL.
|
||||
|
||||
2. **shm sweep** (`--shm` / `--shm-only`) — unlinks
|
||||
`/dev/shm/<file>` entries owned by the current uid
|
||||
that no live process has open (mmap'd or fd-held).
|
||||
Needed because `tractor` disables
|
||||
`mp.resource_tracker` (see `tractor.ipc._mp_bs`), so a
|
||||
hard-crashing actor leaves leaked segments that
|
||||
nothing else GCs.
|
||||
|
||||
Process-reap detection modes (auto-selected):
|
||||
|
||||
--parent <pid> : descendant-mode — kill procs whose
|
||||
PPid == <pid>. Use when a parent
|
||||
is still alive and you want to
|
||||
scope the sweep precisely (e.g.
|
||||
CI wrapper calling in from outside
|
||||
pytest).
|
||||
|
||||
(default) : orphan-mode — kill procs with
|
||||
PPid==1 (init-reparented) whose
|
||||
cwd matches the repo root AND
|
||||
whose cmdline contains `python`.
|
||||
The cwd filter is what prevents
|
||||
sweeping unrelated init-children.
|
||||
|
||||
Usage:
|
||||
|
||||
# process reap only (default)
|
||||
scripts/tractor-reap
|
||||
|
||||
# process reap + shm sweep
|
||||
scripts/tractor-reap --shm
|
||||
|
||||
# only the shm sweep, skip process reap
|
||||
scripts/tractor-reap --shm-only
|
||||
|
||||
# from inside a still-live supervisor
|
||||
scripts/tractor-reap --parent 12345
|
||||
|
||||
# dry-run: list what would be reaped, don't act
|
||||
scripts/tractor-reap -n
|
||||
scripts/tractor-reap --shm -n
|
||||
|
||||
'''
|
||||
import argparse
|
||||
import pathlib
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
def _repo_root() -> pathlib.Path:
|
||||
'''
|
||||
Use `git rev-parse --show-toplevel` when available;
|
||||
fall back to the repo this script lives in.
|
||||
|
||||
'''
|
||||
try:
|
||||
out: str = subprocess.check_output(
|
||||
['git', 'rev-parse', '--show-toplevel'],
|
||||
stderr=subprocess.DEVNULL,
|
||||
text=True,
|
||||
).strip()
|
||||
return pathlib.Path(out)
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
return pathlib.Path(__file__).resolve().parent.parent
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='tractor-reap',
|
||||
description=__doc__,
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
)
|
||||
parser.add_argument(
|
||||
'--parent', '-p',
|
||||
type=int,
|
||||
default=None,
|
||||
help='descendant-mode: reap procs with PPid==<pid>',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--grace', '-g',
|
||||
type=float,
|
||||
default=3.0,
|
||||
help='SIGINT grace window in seconds (default 3.0)',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--dry-run', '-n',
|
||||
action='store_true',
|
||||
help='list matched pids/paths but do not signal/unlink',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--shm',
|
||||
action='store_true',
|
||||
help=(
|
||||
'after process reap, also unlink orphaned '
|
||||
'/dev/shm segments owned by the current user '
|
||||
'that no live process is mapping or holding open'
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
'--shm-only',
|
||||
action='store_true',
|
||||
help='skip process reap; only do the shm sweep',
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# import lazily so `--help` doesn't require the tractor
|
||||
# package to be importable (e.g. when running from a
|
||||
# shell not inside a venv).
|
||||
repo = _repo_root()
|
||||
sys.path.insert(0, str(repo))
|
||||
from tractor._testing._reap import (
|
||||
find_descendants,
|
||||
find_orphans,
|
||||
find_orphaned_shm,
|
||||
reap,
|
||||
reap_shm,
|
||||
)
|
||||
|
||||
rc: int = 0
|
||||
|
||||
# --- phase 1: process reap (skipped under --shm-only) ---
|
||||
if not args.shm_only:
|
||||
if args.parent is not None:
|
||||
pids: list[int] = find_descendants(args.parent)
|
||||
mode: str = f'descendants of PPid={args.parent}'
|
||||
else:
|
||||
pids = find_orphans(repo)
|
||||
mode = f'orphans (PPid=1, cwd={repo})'
|
||||
|
||||
if not pids:
|
||||
print(f'[tractor-reap] no {mode} to reap')
|
||||
elif args.dry_run:
|
||||
print(
|
||||
f'[tractor-reap] dry-run — {mode}:\n {pids}'
|
||||
)
|
||||
else:
|
||||
_, survivors = reap(pids, grace=args.grace)
|
||||
if survivors:
|
||||
rc = 1
|
||||
|
||||
# --- phase 2: shm sweep (opt-in) ---
|
||||
if args.shm or args.shm_only:
|
||||
leaked: list[str] = find_orphaned_shm()
|
||||
if not leaked:
|
||||
print(
|
||||
'[tractor-reap] no orphaned /dev/shm '
|
||||
'segments to sweep'
|
||||
)
|
||||
elif args.dry_run:
|
||||
print(
|
||||
f'[tractor-reap] dry-run — {len(leaked)} '
|
||||
f'orphaned shm segment(s):\n {leaked}'
|
||||
)
|
||||
else:
|
||||
_, errors = reap_shm(leaked)
|
||||
if errors:
|
||||
rc = 1
|
||||
|
||||
# exit 0 if everything cleaned cleanly, else 1 — useful
|
||||
# for CI health-check chaining.
|
||||
return rc
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
raise SystemExit(main())
|
||||
|
|
@ -520,6 +520,10 @@ async def kill_transport(
|
|||
|
||||
|
||||
|
||||
# ?TODO, do a OSc style signalling test on this?
|
||||
# -[ ] doesn't work for fork backends
|
||||
# @pytest.mark.parametrize('use_signal', [False, True])
|
||||
#
|
||||
# Wall-clock bound via `pytest-timeout` (`method='thread'`).
|
||||
# Under `--spawn-backend=subint` this test can wedge in an
|
||||
# un-Ctrl-C-able state (abandoned-subint + shared-GIL
|
||||
|
|
@ -532,19 +536,21 @@ async def kill_transport(
|
|||
# the intended behavior here; the alternative is an unattended
|
||||
# suite run that never returns.
|
||||
@pytest.mark.timeout(
|
||||
3, # NOTE should be a 2.1s happy path.
|
||||
30,
|
||||
# NOTE should be a 2.1s happy path.
|
||||
# XXX for `subint_forkserver` this is SUPER SENSITIVE so keep it
|
||||
# higher to avoid flaky runs..
|
||||
method='thread',
|
||||
)
|
||||
@pytest.mark.skipon_spawn_backend(
|
||||
'subint',
|
||||
# 'subint_forkserver',
|
||||
reason=(
|
||||
'XXX SUBINT HANGING TEST XXX\n'
|
||||
'See outstanding issue(s)\n'
|
||||
# TODO, put issue link!
|
||||
)
|
||||
)
|
||||
# @pytest.mark.parametrize('use_signal', [False, True])
|
||||
#
|
||||
def test_stale_entry_is_deleted(
|
||||
debug_mode: bool,
|
||||
daemon: subprocess.Popen,
|
||||
|
|
@ -558,7 +564,6 @@ def test_stale_entry_is_deleted(
|
|||
|
||||
'''
|
||||
async def main():
|
||||
|
||||
name: str = 'transport_fails_actor'
|
||||
_reg_ptl: tractor.Portal
|
||||
an: tractor.ActorNursery
|
||||
|
|
@ -591,6 +596,14 @@ def test_stale_entry_is_deleted(
|
|||
await ptl.cancel_actor()
|
||||
await an.cancel()
|
||||
|
||||
# XXX, for tracing if this starts being flaky again..
|
||||
#
|
||||
# async def _timeout_main():
|
||||
# with trio.move_on_after(4) as cs:
|
||||
# await main()
|
||||
# if cs.cancel_called:
|
||||
# await tractor.pause()
|
||||
|
||||
# TODO, remove once the `[subint]` variant no longer hangs.
|
||||
#
|
||||
# Status (as of Phase B hard-kill landing):
|
||||
|
|
@ -641,3 +654,4 @@ def test_stale_entry_is_deleted(
|
|||
path=f'/tmp/test_stale_entry_is_deleted_{start_method}.dump',
|
||||
):
|
||||
trio.run(main)
|
||||
# trio.run(_timeout_main)
|
||||
|
|
|
|||
|
|
@ -446,20 +446,19 @@ def _process_alive(pid: int) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
# Flakey under session-level env pollution (leftover
|
||||
# subactor PIDs from earlier tests competing for ports /
|
||||
# inheriting the harness subprocess's FDs). Passes
|
||||
# cleanly in isolation, fails in suite; `strict=False`
|
||||
# so either outcome is tolerated until the env isolation
|
||||
# is improved. Tracker:
|
||||
# Known-gap test — `subint_forkserver` orphan-SIGINT
|
||||
# handling. See
|
||||
# `ai/conc-anal/subint_forkserver_orphan_sigint_hang_issue.md`.
|
||||
# `strict=True` so if a future fix closes the gap the
|
||||
# XPASS surfaces as a FAIL and forces us to drop the
|
||||
# mark intentionally.
|
||||
@pytest.mark.xfail(
|
||||
strict=False,
|
||||
strict=True,
|
||||
reason=(
|
||||
'Env-pollution sensitive. Passes in isolation, '
|
||||
'flakey in full-suite runs; orphan subactor may '
|
||||
'take longer than 10s to exit when competing for '
|
||||
'resources with leftover state from earlier tests.'
|
||||
'Orphan subactor SIGINT delivery: trio event loop '
|
||||
'on non-main thread post-fork doesn\'t see the '
|
||||
'external SIGINT → KBI path. See tracker doc.\n'
|
||||
'ai/conc-anal/subint_forkserver_orphan_sigint_hang_issue.md'
|
||||
),
|
||||
)
|
||||
@pytest.mark.timeout(
|
||||
|
|
|
|||
|
|
@ -76,9 +76,7 @@ async def subscribe(
|
|||
|
||||
|
||||
async def consumer(
|
||||
|
||||
subs: list[str],
|
||||
|
||||
) -> None:
|
||||
|
||||
uid = tractor.current_actor().uid
|
||||
|
|
@ -108,15 +106,21 @@ async def consumer(
|
|||
print(f'{uid} got: {value}')
|
||||
|
||||
|
||||
def test_dynamic_pub_sub():
|
||||
|
||||
def test_dynamic_pub_sub(
|
||||
reg_addr: tuple,
|
||||
debug_mode: bool,
|
||||
test_log: tractor.log.StackLevelAdapter,
|
||||
):
|
||||
global _registry
|
||||
|
||||
from multiprocessing import cpu_count
|
||||
cpus = cpu_count()
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as n:
|
||||
|
||||
# name of this actor will be same as target func
|
||||
await n.run_in_actor(publisher)
|
||||
|
|
@ -155,12 +159,13 @@ def test_dynamic_pub_sub():
|
|||
else:
|
||||
pytest.fail('Never got a `TooSlowError` ?')
|
||||
|
||||
assert err
|
||||
test_log.exception('Timed out AS EXPECTED')
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def one_task_streams_and_one_handles_reqresp(
|
||||
|
||||
ctx: tractor.Context,
|
||||
|
||||
) -> None:
|
||||
|
||||
await ctx.started()
|
||||
|
|
@ -257,7 +262,8 @@ async def echo_ctx_stream(
|
|||
|
||||
|
||||
def test_sigint_both_stream_types():
|
||||
'''Verify that running a bi-directional and recv only stream
|
||||
'''
|
||||
Verify that running a bi-directional and recv only stream
|
||||
side-by-side will cancel correctly from SIGINT.
|
||||
|
||||
'''
|
||||
|
|
|
|||
|
|
@ -115,10 +115,12 @@ async def not_started_but_stream_opened(
|
|||
)
|
||||
def test_started_misuse(
|
||||
target: Callable,
|
||||
reg_addr: tuple,
|
||||
debug_mode: bool,
|
||||
):
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
|
|
@ -184,6 +186,7 @@ def test_simple_context(
|
|||
error_parent,
|
||||
child_blocks_forever,
|
||||
pointlessly_open_stream,
|
||||
reg_addr: tuple,
|
||||
debug_mode: bool,
|
||||
):
|
||||
|
||||
|
|
@ -193,6 +196,7 @@ def test_simple_context(
|
|||
|
||||
with trio.fail_after(timeout):
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
|
|
@ -278,6 +282,7 @@ def test_parent_cancels(
|
|||
cancel_method: str,
|
||||
chk_ctx_result_before_exit: bool,
|
||||
child_returns_early: bool,
|
||||
reg_addr: tuple,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
|
|
@ -355,6 +360,7 @@ def test_parent_cancels(
|
|||
async def main():
|
||||
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
|
|
@ -931,6 +937,7 @@ async def keep_sending_from_child(
|
|||
)
|
||||
def test_one_end_stream_not_opened(
|
||||
overrun_by: tuple[str, int, Callable],
|
||||
reg_addr: tuple,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
|
|
@ -949,6 +956,7 @@ def test_one_end_stream_not_opened(
|
|||
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
|
|
@ -1113,6 +1121,7 @@ def test_maybe_allow_overruns_stream(
|
|||
|
||||
# conftest wide
|
||||
loglevel: str,
|
||||
reg_addr: tuple,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
|
|
@ -1133,6 +1142,7 @@ def test_maybe_allow_overruns_stream(
|
|||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
|
|
@ -1249,6 +1259,7 @@ def test_maybe_allow_overruns_stream(
|
|||
|
||||
def test_ctx_with_self_actor(
|
||||
loglevel: str,
|
||||
reg_addr: tuple,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
|
|
@ -1263,6 +1274,7 @@ def test_ctx_with_self_actor(
|
|||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
enable_modules=[__name__],
|
||||
) as an:
|
||||
|
|
|
|||
|
|
@ -16,14 +16,22 @@ from tractor.ipc._shm import (
|
|||
|
||||
pytestmark = pytest.mark.skipon_spawn_backend(
|
||||
'subint',
|
||||
'subint_forkserver',
|
||||
# 'subint_forkserver',
|
||||
# XXX we hack around this stdlib limitation by both,
|
||||
# - setting `SharedMemory(track=False)`
|
||||
# - overriding the `mp.ResourceTracker` nonsense in
|
||||
# `.ipc._mp_bs`.
|
||||
reason=(
|
||||
'subint: GIL-contention hanging class.\n'
|
||||
'subint_forkserver: `multiprocessing.SharedMemory` '
|
||||
'has known issues with fork-without-exec (mp\'s '
|
||||
'resource_tracker and SharedMemory internals assume '
|
||||
'fresh-process state). RemoteActorError surfaces from '
|
||||
'the shm-attach path. TODO, put issue link!\n'
|
||||
'is fork-without-exec unsafe — child inherits parent\'s '
|
||||
'`resource_tracker` fd → EBADF on first shm op '
|
||||
'(`test_child_attaches_alot`); leaked `/shm_list` from '
|
||||
'a "passing" run cascades into `FileExistsError` across '
|
||||
'parametrize variants (`test_parent_writer_child_reader`). '
|
||||
'Canonical CPython issue class, NOT a tractor bug; full '
|
||||
'tracker doc:\n'
|
||||
'ai/conc-anal/subint_forkserver_mp_shared_memory_issue.md'
|
||||
)
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,462 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Zombie-subactor reaper — SC-polite (SIGINT first, SIGKILL
|
||||
as last resort with a bounded grace window) plus optional
|
||||
`/dev/shm/` orphan-segment sweep.
|
||||
|
||||
Shared implementation between the `tractor-reap` CLI
|
||||
(`scripts/tractor-reap`) and the pytest session-scoped
|
||||
auto-fixture that guards the test suite against leftover
|
||||
subactor processes.
|
||||
|
||||
Design notes — process reap
|
||||
---------------------------
|
||||
|
||||
- Linux-only today: reads `/proc/<pid>/{status,cwd,cmdline}`.
|
||||
Module imports cleanly elsewhere; calling `find_*` on a
|
||||
non-Linux box returns an empty list (no `/proc`
|
||||
enumeration). A future xplatform pass could swap this
|
||||
for `psutil.Process.children()` /
|
||||
`psutil.process_iter()` since `psutil` is already a
|
||||
test-time dependency.
|
||||
|
||||
- Two detection modes:
|
||||
|
||||
1. **descendant-mode** — when invoked from a still-live
|
||||
parent (e.g. a pytest session-end fixture), match by
|
||||
`PPid == parent_pid`. Direct + precise; the target
|
||||
PIDs are still parented to the live pytest process
|
||||
at teardown time, before pytest exits.
|
||||
|
||||
2. **orphan-mode** — when invoked after the parent died
|
||||
(e.g. the `tractor-reap` CLI run post-Ctrl+C), match
|
||||
by `PPid == 1` (reparented to init) AND `cwd ==
|
||||
<repo-root>` AND cmdline contains `python`. The cwd
|
||||
filter is what keeps the heuristic from sweeping up
|
||||
unrelated init-children on the box.
|
||||
|
||||
- Escalation: for every matched PID, SIGINT, poll for up
|
||||
to `grace` seconds, then SIGKILL any survivors. The
|
||||
two-phase pattern is the SC-graceful-cancel discipline
|
||||
documented in `feedback_sc_graceful_cancel_first.md` —
|
||||
we want the subactor runtime to run its trio cancel
|
||||
shield + IPC teardown paths where it can.
|
||||
|
||||
Design notes — shm sweep
|
||||
------------------------
|
||||
|
||||
Since `tractor/ipc/_mp_bs.disable_mantracker()` turns off
|
||||
`mp.resource_tracker` entirely, a hard-crashing actor can
|
||||
leave `/dev/shm/<key>` segments behind that nothing else
|
||||
GCs (see
|
||||
`ai/conc-anal/subint_forkserver_mp_shared_memory_issue.md`,
|
||||
"Trade-offs / known gaps").
|
||||
|
||||
The shm sweep is **Linux-/FreeBSD-only**: both expose
|
||||
POSIX shared-memory segments as regular files under
|
||||
`/dev/shm`, so `os.stat()` + `os.unlink()` are the
|
||||
correct primitives. macOS POSIX shm has no fs-visible
|
||||
path (segments live behind `shm_open`/`shm_unlink`
|
||||
syscalls only), and Windows is a different story
|
||||
entirely. Calling the shm helpers on an unsupported
|
||||
platform raises `NotImplementedError`.
|
||||
|
||||
In-use enumeration delegates to `psutil` —
|
||||
`Process.memory_maps()` (post-mmap) +
|
||||
`Process.open_files()` (pre-mmap shm-opened fds) —
|
||||
xplatform, mature, and handles the per-process
|
||||
permission/race edge cases correctly. Segments matching
|
||||
neither are genuinely leaked → safe to unlink.
|
||||
|
||||
The "nobody has it open" check is the kernel-canonical
|
||||
test — same answer `lsof /dev/shm/<key>` would give. No
|
||||
reliance on tractor-specific naming conventions (shm
|
||||
keys are caller-defined).
|
||||
|
||||
'''
|
||||
from __future__ import annotations

import os
import pathlib
import signal
import stat
import sys
import time

# `/dev/shm` is the POSIX-shm filesystem on Linux + FreeBSD.
# macOS uses `shm_open` syscalls without a fs-visible path,
# so the shm helpers refuse to run there.
_SHM_PLATFORM_OK: bool = sys.platform.startswith(
    ('linux', 'freebsd')
)
SHM_DIR: str = '/dev/shm'


def _ensure_shm_supported() -> None:
    '''
    Guard for shm helpers: they assume `/dev/shm` exists
    as a tmpfs and `os.unlink()` is the right primitive.
    Both true on Linux + FreeBSD; not true elsewhere.

    '''
    if not _SHM_PLATFORM_OK:
        raise NotImplementedError(
            f'shm reap is only supported on Linux/FreeBSD; '
            f'got sys.platform={sys.platform!r}. macOS '
            f'POSIX shm has no fs-visible path; Windows '
            f'has no /dev/shm equivalent.'
        )
|
||||
def _read_status_ppid(pid: int) -> int | None:
    '''
    Return the parent-pid from `/proc/<pid>/status` or
    `None` if the proc went away / is unreadable.

    '''
    try:
        with open(f'/proc/{pid}/status') as f:
            for line in f:
                if line.startswith('PPid:'):
                    return int(line.split()[1])
    except (
        FileNotFoundError,
        PermissionError,
        ProcessLookupError,
    ):
        return None
    return None
|
||||
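To make the `PPid:` lookup above easier to follow, here is a minimal sketch that parses the same kind of text from a string instead of a live `/proc` entry. The helper name `parse_status_field` and the `SAMPLE` text are hypothetical and exist only for this illustration; they are not part of the module.

```python
from __future__ import annotations


def parse_status_field(status_text: str, field: str) -> str | None:
    '''
    Return the first whitespace-separated value of `field`
    (e.g. 'PPid') from `/proc/<pid>/status`-style text, or
    `None` when the field is absent.

    '''
    prefix = field + ':'
    for line in status_text.splitlines():
        if line.startswith(prefix):
            parts = line.split()
            return parts[1] if len(parts) > 1 else None
    return None


# hypothetical excerpt of a `/proc/<pid>/status` file
SAMPLE = (
    'Name:\tpython\n'
    'State:\tS (sleeping)\n'
    'Pid:\t4242\n'
    'PPid:\t1\n'
)
ppid = parse_status_field(SAMPLE, 'PPid')
print(ppid)
```

Because the match is on the full `PPid:` prefix, the neighbouring `Pid:` line is never mistaken for it.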
def _read_cwd(pid: int) -> str | None:
    try:
        return os.readlink(f'/proc/{pid}/cwd')
    except (
        FileNotFoundError,
        PermissionError,
        ProcessLookupError,
    ):
        return None


def _read_cmdline(pid: int) -> str:
    try:
        with open(f'/proc/{pid}/cmdline', 'rb') as f:
            return f.read().replace(b'\0', b' ').decode(
                errors='replace',
            )
    except (
        FileNotFoundError,
        PermissionError,
        ProcessLookupError,
    ):
        return ''


def _iter_live_pids() -> list[int]:
    '''
    Enumerate currently-alive pids from `/proc`. Returns
    `[]` on systems without `/proc` (e.g. macOS).

    '''
    try:
        entries: list[str] = os.listdir('/proc')
    except OSError:
        return []
    return [int(e) for e in entries if e.isdigit()]


def find_descendants(
    parent_pid: int,
) -> list[int]:
    '''
    PIDs whose `PPid == parent_pid`, i.e. direct
    children of the given pid. Used by the pytest
    session-end fixture where `parent_pid` is still
    alive as the pytest-python process.

    '''
    return [
        pid
        for pid in _iter_live_pids()
        if _read_status_ppid(pid) == parent_pid
    ]
|
||||
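`find_descendants()` returns direct children only. If grandchildren ever need to be included, one possible approach is to walk a pid-to-ppid map transitively. The sketch below is an illustration under that assumption; `all_descendants` and the sample `ppid_of` table are hypothetical names, not part of this changeset.

```python
from __future__ import annotations


def all_descendants(
    root: int,
    ppid_of: dict[int, int],
) -> list[int]:
    '''
    Return every pid reachable from `root` by following
    parent -> child links in `ppid_of` (pid -> ppid).

    '''
    # invert the pid -> ppid map into ppid -> [children]
    children: dict[int, list[int]] = {}
    for pid, ppid in ppid_of.items():
        children.setdefault(ppid, []).append(pid)

    found: list[int] = []
    seen: set[int] = {root}
    stack: list[int] = [root]
    while stack:
        for child in children.get(stack.pop(), []):
            if child not in seen:
                seen.add(child)
                found.append(child)
                stack.append(child)
    return sorted(found)


# hypothetical process table: 10 -> {11, 13}, 11 -> {12}
ppid_of = {10: 1, 11: 10, 12: 11, 13: 10, 20: 1}
print(all_descendants(10, ppid_of))
```

The `seen` set keeps the walk bounded even if the snapshot of `/proc` is inconsistent because pids were recycled mid-scan.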
def find_orphans(
    repo_root: pathlib.Path,
) -> list[int]:
    '''
    PIDs that are:

    - reparented to init (`PPid == 1`),
    - have `cwd == <repo_root>`,
    - and have a `python` in their cmdline.

    This is the "pytest-died-mid-session" case where the
    subactor forks got reparented. The cwd filter is the
    critical bit that keeps us from sweeping up unrelated
    init-children on the box.

    '''
    repo: str = str(repo_root)
    hits: list[int] = []
    for pid in _iter_live_pids():
        if _read_status_ppid(pid) != 1:
            continue
        cwd: str | None = _read_cwd(pid)
        if cwd != repo:
            continue
        cmd: str = _read_cmdline(pid)
        if 'python' not in cmd:
            continue
        hits.append(pid)
    return hits
|
||||
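The three filters in `find_orphans()` can be read as a single predicate. The sketch below restates them as a pure function so the behaviour is easy to check without a live `/proc`; `looks_like_orphan` and the sample paths are hypothetical. One caveat worth keeping in mind: on hosts where a child subreaper is active, orphans can be reparented to that process rather than pid 1, in which case a `PPid == 1` test would not match them.

```python
from __future__ import annotations


def looks_like_orphan(
    ppid: int | None,
    cwd: str | None,
    cmdline: str,
    repo_root: str,
) -> bool:
    '''
    True only when all three conditions hold: reparented
    to init, running from the repo, and a python process.

    '''
    return (
        ppid == 1
        and cwd == repo_root
        and 'python' in cmdline
    )


# hypothetical repo location used only for the demo
REPO = '/home/dev/tractor'
print(looks_like_orphan(1, REPO, 'python -m pytest', REPO))
```

The cwd comparison is an exact string match, so a process started from a subdirectory of the repo would not be selected.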
def reap(
    pids: list[int],
    *,
    grace: float = 3.0,
    poll: float = 0.25,
    log=print,
) -> tuple[list[int], list[int]]:
    '''
    Deliver SIGINT to each pid, wait up to `grace`
    seconds for them to exit, then SIGKILL any that
    survive.

    Returns `(signalled, survivors_killed)` so callers
    can report / assert.

    `log` is the logger function for user-visible
    progress lines (default `print`); pytest fixture
    swaps it for a `pytest`-friendly writer.

    '''
    if not pids:
        return ([], [])

    signalled: list[int] = []
    for pid in pids:
        try:
            os.kill(pid, signal.SIGINT)
            signalled.append(pid)
        except ProcessLookupError:
            # raced: already gone
            pass

    if signalled:
        log(
            f'[tractor-reap] SIGINT → {len(signalled)} '
            f'proc(s): {signalled}'
        )

    deadline: float = time.monotonic() + grace
    while time.monotonic() < deadline:
        time.sleep(poll)
        alive: list[int] = [
            pid for pid in signalled if _is_alive(pid)
        ]
        if not alive:
            return (signalled, [])

    survivors: list[int] = [
        pid for pid in signalled if _is_alive(pid)
    ]
    if survivors:
        log(
            f'[tractor-reap] SIGKILL (after {grace}s '
            f'grace) → {survivors}'
        )
    for pid in survivors:
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass

    return (signalled, survivors)
|
||||
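The polite-then-hard escalation in `reap()` can be exercised without real processes by injecting the signalling and liveness callables. This is a sketch under that assumption: `escalate`, `fake_sigint`, and the fake pid table are hypothetical stand-ins, not the module's API.

```python
import time


def escalate(
    pids,
    send_polite,
    send_hard,
    is_alive,
    grace=3.0,
    poll=0.25,
):
    '''
    Politely signal every pid, poll until `grace` seconds
    elapse, then hard-kill whatever is still alive.
    Returns `(signalled, survivors)`.

    '''
    signalled = list(pids)
    for pid in signalled:
        send_polite(pid)

    deadline = time.monotonic() + grace
    while time.monotonic() < deadline:
        time.sleep(poll)
        if not any(is_alive(pid) for pid in signalled):
            return (signalled, [])

    survivors = [pid for pid in signalled if is_alive(pid)]
    for pid in survivors:
        send_hard(pid)
    return (signalled, survivors)


# fake process table: pid 101 honours the polite signal,
# pid 102 ignores it and must be hard-killed.
alive = {101, 102}


def fake_sigint(pid):
    if pid == 101:
        alive.discard(pid)


result = escalate(
    [101, 102],
    send_polite=fake_sigint,
    send_hard=alive.discard,
    is_alive=alive.__contains__,
    grace=0.2,
    poll=0.05,
)
print(result)
```

Using `time.monotonic()` for the deadline, as the original does, keeps the grace window immune to wall-clock adjustments.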
def _is_alive(pid: int) -> bool:
    '''
    True iff `/proc/<pid>` still exists AND the proc
    isn't already a zombie (Z state).

    '''
    try:
        with open(f'/proc/{pid}/status') as f:
            for line in f:
                if line.startswith('State:'):
                    # e.g. 'State:\tZ (zombie)'
                    return 'Z' not in line.split()[1]
    except (
        FileNotFoundError,
        ProcessLookupError,
    ):
        return False
    return True
|
||||
def _enumerate_in_use_shm(
    shm_dir: str = SHM_DIR,
) -> set[str]:
    '''
    Return the set of `<shm_dir>/<file>` paths currently
    held open by any live process, via `psutil`'s
    cross-platform `Process.memory_maps()` (post-mmap
    segments) and `Process.open_files()` (pre-mmap
    shm-opened fds).

    Lazy-imports `psutil` so the module stays importable
    on installs without it (it's a `testing` group dep).

    '''
    _ensure_shm_supported()

    # lazy + actionable failure: leaked shm sweep is the
    # only thing in this module that needs psutil; we
    # don't want a top-level ImportError breaking the
    # process-reap path.
    try:
        import psutil
    except ImportError as exc:
        raise RuntimeError(
            'shm reap requires `psutil`; install the '
            '`testing` dep group, e.g. '
            '`uv sync --group testing`.'
        ) from exc

    in_use: set[str] = set()
    prefix: str = shm_dir.rstrip('/') + '/'
    for proc in psutil.process_iter(['pid']):
        try:
            for m in proc.memory_maps(grouped=False):
                if m.path.startswith(prefix):
                    in_use.add(m.path)
            for f in proc.open_files():
                if f.path.startswith(prefix):
                    in_use.add(f.path)
        except (
            psutil.NoSuchProcess,
            psutil.AccessDenied,
            psutil.ZombieProcess,
            FileNotFoundError,
            PermissionError,
        ):
            # raced: proc died or we can't see its
            # mappings (e.g. root-owned). Skip it, but
            # NOTE a skipped proc contributes nothing to
            # `in_use`, so a segment only it holds will
            # look leaked; the per-uid filter in
            # `find_orphaned_shm()` narrows that risk.
            continue
    return in_use
|
||||
def find_orphaned_shm(
    *,
    uid: int | None = None,
    shm_dir: str = SHM_DIR,
) -> list[str]:
    '''
    `<shm_dir>/<file>` paths that are:

    - owned by `uid` (default: the current effective uid),
    - and currently held by NO live process, i.e.
      genuinely leaked.

    Linux/FreeBSD only; see module docstring. No reliance
    on caller-defined shm-key naming, so this works for
    any tractor app (not just the test suite).

    '''
    _ensure_shm_supported()

    if uid is None:
        uid = os.geteuid()

    try:
        entries: list[str] = os.listdir(shm_dir)
    except OSError:
        return []

    in_use: set[str] = _enumerate_in_use_shm(shm_dir=shm_dir)
    leaked: list[str] = []
    prefix: str = shm_dir.rstrip('/') + '/'
    for entry in entries:
        path: str = prefix + entry
        try:
            st: os.stat_result = os.stat(path)
        except OSError:
            continue
        # only regular files; skip subdirs / sockets etc.
        if not stat.S_ISREG(st.st_mode):
            continue
        if st.st_uid != uid:
            continue
        if path in in_use:
            continue
        leaked.append(path)
    return leaked
|
||||
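The selection rule in `find_orphaned_shm()` (regular file, owned by `uid`, not in the in-use set) can be tried against a throwaway directory instead of `/dev/shm`. The sketch below assumes a Unix host; `find_leaked` and the file names are hypothetical, and the in-use set is supplied by hand rather than gathered via `psutil`.

```python
import os
import stat
import tempfile


def find_leaked(shm_dir, in_use, uid):
    '''
    Paths under `shm_dir` that are regular files owned by
    `uid` and absent from the `in_use` set.

    '''
    leaked = []
    for entry in sorted(os.listdir(shm_dir)):
        path = os.path.join(shm_dir, entry)
        try:
            st = os.stat(path)
        except OSError:
            # raced: entry vanished between listdir and stat
            continue
        if (
            stat.S_ISREG(st.st_mode)
            and st.st_uid == uid
            and path not in in_use
        ):
            leaked.append(path)
    return leaked


with tempfile.TemporaryDirectory() as tmp:
    # two fake segments plus a subdir that must be skipped
    for name in ('held', 'leaked'):
        open(os.path.join(tmp, name), 'w').close()
    os.mkdir(os.path.join(tmp, 'subdir'))

    pretend_in_use = {os.path.join(tmp, 'held')}
    demo = find_leaked(tmp, pretend_in_use, os.geteuid())
    demo_names = [os.path.basename(p) for p in demo]

print(demo_names)
```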
def reap_shm(
    paths: list[str],
    *,
    log=print,
) -> tuple[list[str], list[tuple[str, OSError]]]:
    '''
    Unlink the given `/dev/shm/...` paths.

    Linux/FreeBSD only: `os.unlink()` is the correct
    primitive on the POSIX-shm tmpfs there. macOS POSIX
    shm has no fs-visible path; the equivalent there is
    `posix_ipc.unlink_shared_memory(name)` (not
    implemented here; see module docstring).

    Returns `(unlinked, errors)` where `errors` is a list
    of `(path, exc)` for paths that could not be removed
    (e.g. permissions). Paths that raced to being
    already-gone are counted as successfully unlinked.

    '''
    _ensure_shm_supported()

    unlinked: list[str] = []
    errors: list[tuple[str, OSError]] = []
    for path in paths:
        try:
            os.unlink(path)
            unlinked.append(path)
        except FileNotFoundError:
            # raced: already gone, treat as success
            unlinked.append(path)
        except OSError as exc:
            errors.append((path, exc))

    if unlinked:
        log(
            f'[tractor-reap] unlinked {len(unlinked)} '
            f'orphaned shm segment(s): {unlinked}'
        )
    for path, exc in errors:
        log(
            f'[tractor-reap] could not unlink {path}: '
            f'{exc!r}'
        )
    return (unlinked, errors)
|
||||
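The "already gone counts as success" rule in `reap_shm()` is easy to see with a temp directory. In this sketch, `unlink_all` is a hypothetical stand-in that mirrors the same `try`/`except` ordering; the directory entry is included only to provoke a non-`FileNotFoundError` `OSError`.

```python
import os
import tempfile


def unlink_all(paths):
    '''
    Unlink each path; a missing path is treated as
    already-reaped, any other `OSError` is collected.

    '''
    unlinked, errors = [], []
    for path in paths:
        try:
            os.unlink(path)
        except FileNotFoundError:
            # raced: someone else removed it first
            pass
        except OSError as exc:
            errors.append((path, exc))
            continue
        unlinked.append(path)
    return unlinked, errors


with tempfile.TemporaryDirectory() as tmp:
    present = os.path.join(tmp, 'present')
    missing = os.path.join(tmp, 'missing')
    a_dir = os.path.join(tmp, 'a_dir')
    open(present, 'w').close()
    os.mkdir(a_dir)

    done, failed = unlink_all([present, missing, a_dir])
    still_there = os.path.exists(present)

print(len(done), len(failed))
```

`FileNotFoundError` is a subclass of `OSError`, so it has to be caught first, as in the original.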
|
|
@ -32,6 +32,7 @@ from typing import (
|
|||
|
||||
import pytest
|
||||
import tractor
|
||||
from tractor.spawn._spawn import SpawnMethodKey
|
||||
import trio
|
||||
|
||||
|
||||
|
|
@ -274,7 +275,12 @@ def pytest_collection_modifyitems(
|
|||
default_reason: str = f'Borked on --spawn-backend={backend!r}'
|
||||
for item in items:
|
||||
for mark in item.iter_markers(name='skipon_spawn_backend'):
|
||||
if backend in mark.args:
|
||||
skip_backends: tuple[str, ...] = mark.args
|
||||
for skip_backend in skip_backends:
|
||||
assert skip_backend in get_args(SpawnMethodKey)
|
||||
# ?TODO, run these through the try-set-backend checker to
|
||||
# avoid typos?
|
||||
if backend in skip_backends:
|
||||
reason: str = mark.kwargs.get(
|
||||
'reason',
|
||||
default_reason,
|
||||
|
|
@ -285,6 +291,42 @@ def pytest_collection_modifyitems(
|
|||
break
|
||||
|
||||
|
||||
@pytest.fixture(
    scope='session',
    autouse=True,
)
def _reap_orphaned_subactors():
    '''
    Session-scoped autouse fixture: after the whole test
    session finishes, SIGINT any subactor processes still
    parented to this `pytest` process, wait a bounded
    grace window, then SIGKILL survivors.

    Rationale: under fork-based spawn backends (notably
    `subint_forkserver`), a test that times out or bails
    mid-teardown can leave subactor forks alive. Without
    this reap, they linger across sessions and compete
    for ports / inherit pytest's capture-pipe fds, which
    flakifies later tests. SC-polite discipline: SIGINT
    first to let the subactor's trio cancel shield + IPC
    teardown paths run before we escalate.

    Matching companion CLI: `scripts/tractor-reap` for
    the pytest-died-mid-session case.

    '''
    import os
    parent_pid: int = os.getpid()
    yield
    from tractor._testing._reap import (
        find_descendants,
        reap,
    )
    pids: list[int] = find_descendants(parent_pid)
    if pids:
        reap(pids, grace=3.0)
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def debug_mode(
|
||||
request: pytest.FixtureRequest,
|
||||
|
|
@ -398,7 +440,6 @@ def pytest_generate_tests(
|
|||
# drive the valid-backend set from the canonical `Literal` so
|
||||
# adding a new spawn backend (e.g. `'subint'`) doesn't require
|
||||
# touching the harness.
|
||||
from tractor.spawn._spawn import SpawnMethodKey
|
||||
assert spawn_backend in get_args(SpawnMethodKey)
|
||||
|
||||
# NOTE: used-to-be-used-to dynamically parametrize tests for when
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
Utils to tame mp non-SC madness
|
||||
|
||||
'''
|
||||
import platform
|
||||
from functools import partial
|
||||
|
||||
|
||||
def disable_mantracker():
|
||||
|
|
@ -27,49 +27,37 @@ def disable_mantracker():
|
|||
|
||||
'''
|
||||
from multiprocessing.shared_memory import SharedMemory
|
||||
from multiprocessing import (
|
||||
resource_tracker as mantracker,
|
||||
)
|
||||
|
||||
# XXX ALWAYS disable the stdlib's "resource tracker"; it prevents
|
||||
# fork backends and never was useful to us since we're SC
|
||||
# lifetime managing all allocations.
|
||||
class ManTracker(mantracker.ResourceTracker):
|
||||
def register(self, name, rtype):
|
||||
pass
|
||||
|
||||
def unregister(self, name, rtype):
|
||||
pass
|
||||
|
||||
def ensure_running(self):
|
||||
pass
|
||||
|
||||
# "know your land and know your prey"
|
||||
# https://www.dailymotion.com/video/x6ozzco
|
||||
mantracker._resource_tracker = ManTracker()
|
||||
mantracker.register = mantracker._resource_tracker.register
|
||||
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
|
||||
mantracker.unregister = mantracker._resource_tracker.unregister
|
||||
mantracker.getfd = mantracker._resource_tracker.getfd
|
||||
|
||||
# 3.13+ only.. can pass `track=False` to disable
|
||||
# all the resource tracker bs.
|
||||
# https://docs.python.org/3/library/multiprocessing.shared_memory.html
|
||||
if (_py_313 := (
|
||||
platform.python_version_tuple()[:-1]
|
||||
>=
|
||||
('3', '13')
|
||||
)
|
||||
):
|
||||
from functools import partial
|
||||
return partial(
|
||||
SharedMemory,
|
||||
track=False,
|
||||
)
|
||||
|
||||
# !TODO, once we drop 3.12- we can obvi remove all this!
|
||||
else:
|
||||
from multiprocessing import (
|
||||
resource_tracker as mantracker,
|
||||
)
|
||||
|
||||
# Tell the "resource tracker" thing to fuck off.
|
||||
class ManTracker(mantracker.ResourceTracker):
|
||||
def register(self, name, rtype):
|
||||
pass
|
||||
|
||||
def unregister(self, name, rtype):
|
||||
pass
|
||||
|
||||
def ensure_running(self):
|
||||
pass
|
||||
|
||||
# "know your land and know your prey"
|
||||
# https://www.dailymotion.com/video/x6ozzco
|
||||
mantracker._resource_tracker = ManTracker()
|
||||
mantracker.register = mantracker._resource_tracker.register
|
||||
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
|
||||
mantracker.unregister = mantracker._resource_tracker.unregister
|
||||
mantracker.getfd = mantracker._resource_tracker.getfd
|
||||
|
||||
# use std type verbatim
|
||||
shmT = SharedMemory
|
||||
shmT = partial(
|
||||
SharedMemory,
|
||||
track=False,
|
||||
)
|
||||
|
||||
return shmT
|
||||
|
|
|
|||
|
|
@ -929,15 +929,26 @@ def open_shm_list(
|
|||
# "close" attached shm on actor teardown
|
||||
try:
|
||||
actor = tractor.current_actor()
|
||||
|
||||
actor.lifetime_stack.callback(shml.shm.close)
|
||||
|
||||
# XXX on 3.13+ we don't need to call this?
|
||||
# -> bc we pass `track=False` for `SharedMemory` orr?
|
||||
if (
|
||||
platform.python_version_tuple()[:-1] < ('3', '13')
|
||||
):
|
||||
actor.lifetime_stack.callback(shml.shm.unlink)
|
||||
# >XXX NOTE< on 3.13+ we need to call this AS WELL AS pass
|
||||
# `track=False` for `mp.SharedMemory` otherwise fork based
|
||||
# backends will error out due to long lived stdlib
|
||||
# limitations,
|
||||
# - https://bugs.python.org/issue38119
|
||||
# - https://bugs.python.org/issue45209
|
||||
#
|
||||
def try_unlink():
|
||||
try:
|
||||
shml.shm.unlink()
|
||||
except FileNotFoundError as fne:
|
||||
log.debug(
|
||||
f'ShmList already deallocated pre-actor-shutdown.\n'
|
||||
f'{fne!r}\n'
|
||||
)
|
||||
|
||||
actor.lifetime_stack.callback(try_unlink)
|
||||
|
||||
except RuntimeError:
|
||||
log.warning('tractor runtime not active, skipping teardown steps')
|
||||
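The `platform.python_version_tuple()` guards that appear in these hunks compare tuples of strings, which sort lexicographically. That ordering happens to agree with numeric order for two-digit minors, but it misorders single-digit ones such as `'9'`. A minimal sketch of an integer-based check using `sys.version_info` follows; the helper name `at_least` is hypothetical.

```python
import sys


def at_least(
    major: int,
    minor: int,
    version_info=sys.version_info,
) -> bool:
    '''
    Integer comparison of `(major, minor)` against the
    first two fields of `version_info`.

    '''
    return tuple(version_info[:2]) >= (major, minor)


# string tuples: '9' sorts after '13', so 3.9 looks "newer"
string_compare = ('3', '9') >= ('3', '13')

# integer tuples: 9 < 13, as intended
int_compare = at_least(3, 13, version_info=(3, 9, 0))

print(string_compare, int_compare)
```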
|
||||
|
|
|
|||
2
uv.lock
|
|
@ -716,6 +716,7 @@ sync-pause = [
|
|||
]
|
||||
testing = [
|
||||
{ name = "pexpect" },
|
||||
{ name = "psutil" },
|
||||
{ name = "pytest" },
|
||||
{ name = "pytest-timeout" },
|
||||
]
|
||||
|
|
@ -761,6 +762,7 @@ subints = [{ name = "msgspec", marker = "python_full_version >= '3.14'", specifi
|
|||
sync-pause = [{ name = "greenback", marker = "python_full_version == '3.13.*'", specifier = ">=1.2.1,<2" }]
|
||||
testing = [
|
||||
{ name = "pexpect", specifier = ">=4.9.0,<5" },
|
||||
{ name = "psutil", specifier = ">=7.0.0" },
|
||||
{ name = "pytest", specifier = ">=8.3.5" },
|
||||
{ name = "pytest-timeout", specifier = ">=2.3" },
|
||||
]
|
||||
|
|
|
|||