Add UDS orphan-sweep helpers + reap fixtures to `_reap`
Extend the `_testing._reap` mod with UDS sock-file leak detection +
cleanup, complementing the existing shm and subactor-process
reaping:
- `get_uds_dir()`, `_parse_uds_name()`, `find_orphaned_uds()`,
`reap_uds()` — detect `<name>@<pid>.sock` files under
`${XDG_RUNTIME_DIR}/tractor/` whose binder pid is dead (including
the `1616` registry sentinel).
- `_reap_orphaned_subactors` session-scoped autouse fixture: SIGINT
lingering subactors, wait, SIGKILL survivors, then sweep orphaned
UDS files.
- `_track_orphaned_uds_per_test` fn-scoped autouse fixture:
snapshot sock-file dir before/after each test, warn + reap new
orphans to prevent cascade flakiness under `--tpt-proto=uds`.
- `reap_subactors_per_test` opt-in fn-scoped fixture for modules
with known-leaky teardown.
(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
subint_forkserver_backend
parent
486249d74f
commit
1cdc7fb302
|
|
@ -93,6 +93,7 @@ from __future__ import annotations
|
|||
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import signal
|
||||
import stat
|
||||
import sys
|
||||
|
|
@ -106,6 +107,25 @@ _SHM_PLATFORM_OK: bool = sys.platform.startswith(
|
|||
)
|
||||
SHM_DIR: str = '/dev/shm'
|
||||
|
||||
# UDS-socket leak sweep — see `find_orphaned_uds()` /
|
||||
# `reap_uds()` below. Tractor's UDS transport
|
||||
# (`tractor.ipc._uds`) creates sock files under
|
||||
# `${XDG_RUNTIME_DIR}/tractor/<name>@<pid>.sock`; a
|
||||
# crash / SIGKILL / mid-cancel teardown can leave the
|
||||
# file behind because `os.unlink()` lives in the
|
||||
# `_serve_ipc_eps` `finally:` block which doesn't always
|
||||
# get to run on hard exits. The reaper here is best-effort
|
||||
# cleanup for the test harness + the `tractor-reap` CLI.
|
||||
_UDS_SUBDIR: str = 'tractor'
|
||||
# `<actor-name>@<pid>.sock` — pid is the binder's pid at
|
||||
# creation time. Special sentinel: `registry@1616.sock`
|
||||
# uses the magic `1616` not a real pid (the root
|
||||
# registrar's known address; see `UDSAddress.get_root`).
|
||||
_UDS_NAME_RE: re.Pattern = re.compile(
|
||||
r'^(?P<name>.+)@(?P<pid>\d+)\.sock$'
|
||||
)
|
||||
_UDS_REGISTRY_SENTINEL_PID: int = 1616
|
||||
|
||||
|
||||
def _ensure_shm_supported() -> None:
|
||||
'''
|
||||
|
|
@ -460,3 +480,301 @@ def reap_shm(
|
|||
f'{exc!r}'
|
||||
)
|
||||
return (unlinked, errors)
|
||||
|
||||
|
||||
def get_uds_dir() -> str|None:
|
||||
'''
|
||||
Path of tractor's per-user UDS sock-file dir
|
||||
(`${XDG_RUNTIME_DIR}/tractor/`).
|
||||
|
||||
Returns `None` when `XDG_RUNTIME_DIR` is unset (e.g.
|
||||
non-systemd hosts, or inside a container without the
|
||||
var plumbed through). Caller should treat that as
|
||||
"no UDS leaks possible to detect — skip".
|
||||
|
||||
'''
|
||||
xdg: str|None = os.environ.get('XDG_RUNTIME_DIR')
|
||||
if not xdg:
|
||||
return None
|
||||
return os.path.join(xdg, _UDS_SUBDIR)
|
||||
|
||||
|
||||
def _parse_uds_name(filename: str) -> tuple[str, int]|None:
|
||||
'''
|
||||
Extract `(actor_name, pid)` from a tractor UDS sock
|
||||
filename. Returns `None` for unrecognized names.
|
||||
|
||||
'''
|
||||
m = _UDS_NAME_RE.match(filename)
|
||||
if not m:
|
||||
return None
|
||||
return (m['name'], int(m['pid']))
|
||||
|
||||
|
||||
def find_orphaned_uds(
|
||||
*,
|
||||
uds_dir: str|None = None,
|
||||
) -> list[str]:
|
||||
'''
|
||||
`<uds_dir>/*.sock` paths whose binder pid is no
|
||||
longer alive (orphaned). Includes the
|
||||
`registry@1616.sock` sentinel — `1616` is a magic
|
||||
sentinel pid (not a real one) so the file's
|
||||
presence alone signals a leak from a dead session.
|
||||
|
||||
Returns `[]` on platforms without `XDG_RUNTIME_DIR`
|
||||
or when the dir doesn't exist. Files whose name
|
||||
doesn't match the `<name>@<pid>.sock` pattern are
|
||||
skipped (we don't unlink things we don't recognize).
|
||||
|
||||
'''
|
||||
dir_path: str = uds_dir or get_uds_dir()
|
||||
if not dir_path:
|
||||
return []
|
||||
|
||||
try:
|
||||
entries: list[str] = os.listdir(dir_path)
|
||||
except OSError:
|
||||
return []
|
||||
|
||||
leaked: list[str] = []
|
||||
prefix: str = dir_path.rstrip('/') + '/'
|
||||
for entry in entries:
|
||||
path: str = prefix + entry
|
||||
if not entry.endswith('.sock'):
|
||||
continue
|
||||
try:
|
||||
st: os.stat_result = os.stat(path)
|
||||
except OSError:
|
||||
continue
|
||||
# only sockets; skip stray regular files / subdirs
|
||||
if not stat.S_ISSOCK(st.st_mode):
|
||||
continue
|
||||
parsed = _parse_uds_name(entry)
|
||||
if parsed is None:
|
||||
# unknown naming — skip rather than risk
|
||||
# unlinking something we don't own
|
||||
continue
|
||||
_name, pid = parsed
|
||||
if pid == _UDS_REGISTRY_SENTINEL_PID:
|
||||
# sentinel — never a real pid; if the file
|
||||
# exists nobody live is "owning" it via
|
||||
# /proc lookup, so always orphaned
|
||||
leaked.append(path)
|
||||
continue
|
||||
if not _is_alive(pid):
|
||||
leaked.append(path)
|
||||
return leaked
|
||||
|
||||
|
||||
def reap_uds(
|
||||
paths: list[str],
|
||||
*,
|
||||
log=print,
|
||||
) -> tuple[list[str], list[tuple[str, OSError]]]:
|
||||
'''
|
||||
Unlink the given UDS sock-file paths.
|
||||
|
||||
Returns `(unlinked, errors)`; race-already-gone
|
||||
`FileNotFoundError`s count as success. Same shape
|
||||
as `reap_shm` so callers can pipeline both.
|
||||
|
||||
'''
|
||||
unlinked: list[str] = []
|
||||
errors: list[tuple[str, OSError]] = []
|
||||
for path in paths:
|
||||
try:
|
||||
os.unlink(path)
|
||||
unlinked.append(path)
|
||||
except FileNotFoundError:
|
||||
unlinked.append(path)
|
||||
except OSError as exc:
|
||||
errors.append((path, exc))
|
||||
|
||||
if unlinked:
|
||||
log(
|
||||
f'[tractor-reap] unlinked {len(unlinked)} '
|
||||
f'orphaned UDS sock-file(s): {unlinked}'
|
||||
)
|
||||
for path, exc in errors:
|
||||
log(
|
||||
f'[tractor-reap] could not unlink {path}: '
|
||||
f'{exc!r}'
|
||||
)
|
||||
return (unlinked, errors)
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# Pytest fixtures — sub-plugin surface
|
||||
# ----------------------------------------------------------
|
||||
# Loaded as a pytest plugin via the `pytest_plugins` line in
|
||||
# `tractor._testing.pytest`. Keeps the reaping infra (helpers
|
||||
# above + fixtures below) co-located so adding a new reap
|
||||
# target is a single-file change. Sibling-module
|
||||
# (`tractor._testing.pytest`) keeps its core
|
||||
# tractor-tooling surface (option/marker/parametrize hooks,
|
||||
# `tractor_test` deco, transport / spawn-method fixtures)
|
||||
# uncluttered.
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope='session',
|
||||
autouse=True,
|
||||
)
|
||||
def _reap_orphaned_subactors():
|
||||
'''
|
||||
Session-scoped autouse fixture: after the whole test
|
||||
session finishes, SIGINT any subactor processes still
|
||||
parented to this `pytest` process, wait a bounded
|
||||
grace window, then SIGKILL survivors.
|
||||
|
||||
Rationale: under fork-based spawn backends (notably
|
||||
`main_thread_forkserver`), a test that times out or bails
|
||||
mid-teardown can leave subactor forks alive. Without
|
||||
this reap, they linger across sessions and compete
|
||||
for ports / inherit pytest's capture-pipe fds — which
|
||||
flakifies later tests. SC-polite discipline: SIGINT
|
||||
first to let the subactor's trio cancel shield + IPC
|
||||
teardown paths run before we escalate.
|
||||
|
||||
Matching companion CLI: `scripts/tractor-reap` for
|
||||
the pytest-died-mid-session case.
|
||||
|
||||
'''
|
||||
parent_pid: int = os.getpid()
|
||||
yield
|
||||
pids: list[int] = find_descendants(parent_pid)
|
||||
if pids:
|
||||
reap(pids, grace=3.0)
|
||||
# NOTE, sweep UDS sock-files AFTER reaping subactors —
|
||||
# killed actors' bind paths only become "orphaned" once
|
||||
# their owning pid is gone. See `find_orphaned_uds()`
|
||||
# for the leak-detection algorithm + the `1616`
|
||||
# registry-sentinel special case.
|
||||
leaked_uds: list[str] = find_orphaned_uds()
|
||||
if leaked_uds:
|
||||
reap_uds(leaked_uds)
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope='function',
|
||||
autouse=True,
|
||||
)
|
||||
def _track_orphaned_uds_per_test():
|
||||
'''
|
||||
Per-test (function-scoped) autouse UDS sock-file leak
|
||||
detector + reaper.
|
||||
|
||||
Snapshots `${XDG_RUNTIME_DIR}/tractor/` before and
|
||||
after each test; any `<name>@<pid>.sock` files
|
||||
created during the test that survive teardown AND
|
||||
whose creator pid is dead are surfaced as a loud
|
||||
warning AND reaped, so the next test starts with a
|
||||
clean dir.
|
||||
|
||||
Why per-test (not just session-scoped): under
|
||||
`--tpt-proto=uds`, a single hard-killed subactor
|
||||
leaves a sock file that a sibling test's
|
||||
`wait_for_actor`/`find_actor` discovery probes can
|
||||
accidentally hit (FileExistsError on rebind, or
|
||||
epoll register on a half-closed peer-FIN'd fd → see
|
||||
issue #452). Catching the leak the test that caused
|
||||
it (vs. blanket session-end sweep) makes blame
|
||||
obvious + prevents cascade flakiness.
|
||||
|
||||
Cheap: 2x `os.listdir` + a few `os.stat`s per test.
|
||||
Skips silently when `XDG_RUNTIME_DIR` isn't set.
|
||||
|
||||
'''
|
||||
uds_dir: str|None = get_uds_dir()
|
||||
# snapshot pre-test sock-file population so we only
|
||||
# blame this test for files it added (others may have
|
||||
# been left around by session-scoped fixtures /
|
||||
# cross-session leaks pending reaper).
|
||||
before: set[str] = set()
|
||||
if uds_dir:
|
||||
try:
|
||||
before = {
|
||||
e for e in os.listdir(uds_dir)
|
||||
if e.endswith('.sock')
|
||||
}
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
yield
|
||||
|
||||
if not uds_dir:
|
||||
return
|
||||
try:
|
||||
after: set[str] = {
|
||||
e for e in os.listdir(uds_dir)
|
||||
if e.endswith('.sock')
|
||||
}
|
||||
except OSError:
|
||||
return
|
||||
new_files: set[str] = after - before
|
||||
if not new_files:
|
||||
return
|
||||
# only consider files whose binder pid is dead (or the
|
||||
# 1616 sentinel) — a still-running test that legit
|
||||
# holds a sock open will be ignored here and caught at
|
||||
# session-end if it really is leaked.
|
||||
orphans: list[str] = find_orphaned_uds(uds_dir=uds_dir)
|
||||
new_orphans: list[str] = [
|
||||
os.path.join(uds_dir, n) for n in new_files
|
||||
if os.path.join(uds_dir, n) in orphans
|
||||
]
|
||||
if new_orphans:
|
||||
import warnings
|
||||
warnings.warn(
|
||||
f'UDS sock-file LEAK detected from test '
|
||||
f'(reaping):\n '
|
||||
+ '\n '.join(new_orphans),
|
||||
stacklevel=1,
|
||||
)
|
||||
reap_uds(new_orphans)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def reap_subactors_per_test() -> int:
|
||||
'''
|
||||
Per-test (function-scoped) zombie-subactor reaper —
|
||||
**opt-in**, NOT autouse.
|
||||
|
||||
When a test's teardown fails to fully cancel its actor
|
||||
tree (e.g. an asyncio cancel-cascade times out under
|
||||
`main_thread_forkserver`, pytest hits its 200s wall-
|
||||
clock and abandons), the leftover subactor lingers as a
|
||||
direct child of `pytest` and squats on whatever
|
||||
registrar port / UDS path / shm segment it had bound.
|
||||
Subsequent tests trying to allocate the same resource
|
||||
fail — and with backends that bind a session-shared
|
||||
`reg_addr`, that means EVERY following test in the
|
||||
suite cascades. The session-scoped sibling
|
||||
(`_reap_orphaned_subactors`) only kicks in at session
|
||||
end which is too late to save the cascade.
|
||||
|
||||
Apply at module-level on the topically-problematic
|
||||
test files via:
|
||||
|
||||
```python
|
||||
pytestmark = pytest.mark.usefixtures(
|
||||
'reap_subactors_per_test',
|
||||
)
|
||||
```
|
||||
|
||||
Or per-test via the same `usefixtures` mark on a
|
||||
specific function. Intentionally NOT autouse so the
|
||||
fixture's presence on a module signals "this module's
|
||||
teardown is known-leaky enough to contaminate
|
||||
siblings"; the visibility helps future-us track down
|
||||
root causes rather than burying them under blanket
|
||||
cleanup.
|
||||
|
||||
'''
|
||||
parent_pid: int = os.getpid()
|
||||
yield parent_pid
|
||||
pids: list[int] = find_descendants(parent_pid)
|
||||
if pids:
|
||||
reap(pids, grace=3.0)
|
||||
|
|
|
|||
Loading…
Reference in New Issue