From 1cdc7fb30233c5c287be96ca1719d570a229c4dc Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 30 Apr 2026 19:21:02 -0400 Subject: [PATCH] Add UDS orphan-sweep helpers + reap fixtures to `_reap` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend the `_testing._reap` mod with UDS sock-file leak detection + cleanup, complementing the existing shm and subactor-process reaping: - `get_uds_dir()`, `_parse_uds_name()`, `find_orphaned_uds()`, `reap_uds()` — detect `@.sock` files under `${XDG_RUNTIME_DIR}/tractor/` whose binder pid is dead (including the `1616` registry sentinel). - `_reap_orphaned_subactors` session-scoped autouse fixture: SIGINT lingering subactors, wait, SIGKILL survivors, then sweep orphaned UDS files. - `_track_orphaned_uds_per_test` fn-scoped autouse fixture: snapshot sock-file dir before/after each test, warn + reap new orphans to prevent cascade flakiness under `--tpt-proto=uds`. - `reap_subactors_per_test` opt-in fn-scoped fixture for modules with known-leaky teardown. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tractor/_testing/_reap.py | 318 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 318 insertions(+) diff --git a/tractor/_testing/_reap.py b/tractor/_testing/_reap.py index f16c22d3..e1209835 100644 --- a/tractor/_testing/_reap.py +++ b/tractor/_testing/_reap.py @@ -93,6 +93,7 @@ from __future__ import annotations import os import pathlib +import re import signal import stat import sys @@ -106,6 +107,25 @@ _SHM_PLATFORM_OK: bool = sys.platform.startswith( ) SHM_DIR: str = '/dev/shm' +# UDS-socket leak sweep — see `find_orphaned_uds()` / +# `reap_uds()` below. Tractor's UDS transport +# (`tractor.ipc._uds`) creates sock files under +# `${XDG_RUNTIME_DIR}/tractor/@.sock`; a +# crash / SIGKILL / mid-cancel teardown can leave the +# file behind because `os.unlink()` lives in the +# `_serve_ipc_eps` `finally:` block which doesn't always +# get to run on hard exits. The reaper here is best-effort +# cleanup for the test harness + the `tractor-reap` CLI. +_UDS_SUBDIR: str = 'tractor' +# `@.sock` — pid is the binder's pid at +# creation time. Special sentinel: `registry@1616.sock` +# uses the magic `1616` not a real pid (the root +# registrar's known address; see `UDSAddress.get_root`). +_UDS_NAME_RE: re.Pattern = re.compile( + r'^(?P.+)@(?P\d+)\.sock$' +) +_UDS_REGISTRY_SENTINEL_PID: int = 1616 + def _ensure_shm_supported() -> None: ''' @@ -460,3 +480,301 @@ def reap_shm( f'{exc!r}' ) return (unlinked, errors) + + +def get_uds_dir() -> str|None: + ''' + Path of tractor's per-user UDS sock-file dir + (`${XDG_RUNTIME_DIR}/tractor/`). + + Returns `None` when `XDG_RUNTIME_DIR` is unset (e.g. + non-systemd hosts, or inside a container without the + var plumbed through). Caller should treat that as + "no UDS leaks possible to detect — skip". + + ''' + xdg: str|None = os.environ.get('XDG_RUNTIME_DIR') + if not xdg: + return None + return os.path.join(xdg, _UDS_SUBDIR) + + +def _parse_uds_name(filename: str) -> tuple[str, int]|None: + ''' + Extract `(actor_name, pid)` from a tractor UDS sock + filename. Returns `None` for unrecognized names. + + ''' + m = _UDS_NAME_RE.match(filename) + if not m: + return None + return (m['name'], int(m['pid'])) + + +def find_orphaned_uds( + *, + uds_dir: str|None = None, +) -> list[str]: + ''' + `/*.sock` paths whose binder pid is no + longer alive (orphaned). Includes the + `registry@1616.sock` sentinel — `1616` is a magic + sentinel pid (not a real one) so the file's + presence alone signals a leak from a dead session. + + Returns `[]` on platforms without `XDG_RUNTIME_DIR` + or when the dir doesn't exist. Files whose name + doesn't match the `@.sock` pattern are + skipped (we don't unlink things we don't recognize). + + ''' + dir_path: str = uds_dir or get_uds_dir() + if not dir_path: + return [] + + try: + entries: list[str] = os.listdir(dir_path) + except OSError: + return [] + + leaked: list[str] = [] + prefix: str = dir_path.rstrip('/') + '/' + for entry in entries: + path: str = prefix + entry + if not entry.endswith('.sock'): + continue + try: + st: os.stat_result = os.stat(path) + except OSError: + continue + # only sockets; skip stray regular files / subdirs + if not stat.S_ISSOCK(st.st_mode): + continue + parsed = _parse_uds_name(entry) + if parsed is None: + # unknown naming — skip rather than risk + # unlinking something we don't own + continue + _name, pid = parsed + if pid == _UDS_REGISTRY_SENTINEL_PID: + # sentinel — never a real pid; if the file + # exists nobody live is "owning" it via + # /proc lookup, so always orphaned + leaked.append(path) + continue + if not _is_alive(pid): + leaked.append(path) + return leaked + + +def reap_uds( + paths: list[str], + *, + log=print, +) -> tuple[list[str], list[tuple[str, OSError]]]: + ''' + Unlink the given UDS sock-file paths. + + Returns `(unlinked, errors)`; race-already-gone + `FileNotFoundError`s count as success. Same shape + as `reap_shm` so callers can pipeline both. + + ''' + unlinked: list[str] = [] + errors: list[tuple[str, OSError]] = [] + for path in paths: + try: + os.unlink(path) + unlinked.append(path) + except FileNotFoundError: + unlinked.append(path) + except OSError as exc: + errors.append((path, exc)) + + if unlinked: + log( + f'[tractor-reap] unlinked {len(unlinked)} ' + f'orphaned UDS sock-file(s): {unlinked}' + ) + for path, exc in errors: + log( + f'[tractor-reap] could not unlink {path}: ' + f'{exc!r}' + ) + return (unlinked, errors) + + +# ---------------------------------------------------------- +# Pytest fixtures — sub-plugin surface +# ---------------------------------------------------------- +# Loaded as a pytest plugin via the `pytest_plugins` line in +# `tractor._testing.pytest`. Keeps the reaping infra (helpers +# above + fixtures below) co-located so adding a new reap +# target is a single-file change. Sibling-module +# (`tractor._testing.pytest`) keeps its core +# tractor-tooling surface (option/marker/parametrize hooks, +# `tractor_test` deco, transport / spawn-method fixtures) +# uncluttered. +import pytest + + +@pytest.fixture( + scope='session', + autouse=True, +) +def _reap_orphaned_subactors(): + ''' + Session-scoped autouse fixture: after the whole test + session finishes, SIGINT any subactor processes still + parented to this `pytest` process, wait a bounded + grace window, then SIGKILL survivors. + + Rationale: under fork-based spawn backends (notably + `main_thread_forkserver`), a test that times out or bails + mid-teardown can leave subactor forks alive. Without + this reap, they linger across sessions and compete + for ports / inherit pytest's capture-pipe fds — which + flakifies later tests. SC-polite discipline: SIGINT + first to let the subactor's trio cancel shield + IPC + teardown paths run before we escalate. + + Matching companion CLI: `scripts/tractor-reap` for + the pytest-died-mid-session case. + + ''' + parent_pid: int = os.getpid() + yield + pids: list[int] = find_descendants(parent_pid) + if pids: + reap(pids, grace=3.0) + # NOTE, sweep UDS sock-files AFTER reaping subactors — + # killed actors' bind paths only become "orphaned" once + # their owning pid is gone. See `find_orphaned_uds()` + # for the leak-detection algorithm + the `1616` + # registry-sentinel special case. + leaked_uds: list[str] = find_orphaned_uds() + if leaked_uds: + reap_uds(leaked_uds) + + +@pytest.fixture( + scope='function', + autouse=True, +) +def _track_orphaned_uds_per_test(): + ''' + Per-test (function-scoped) autouse UDS sock-file leak + detector + reaper. + + Snapshots `${XDG_RUNTIME_DIR}/tractor/` before and + after each test; any `@.sock` files + created during the test that survive teardown AND + whose creator pid is dead are surfaced as a loud + warning AND reaped, so the next test starts with a + clean dir. + + Why per-test (not just session-scoped): under + `--tpt-proto=uds`, a single hard-killed subactor + leaves a sock file that a sibling test's + `wait_for_actor`/`find_actor` discovery probes can + accidentally hit (FileExistsError on rebind, or + epoll register on a half-closed peer-FIN'd fd → see + issue #452). Catching the leak the test that caused + it (vs. blanket session-end sweep) makes blame + obvious + prevents cascade flakiness. + + Cheap: 2x `os.listdir` + a few `os.stat`s per test. + Skips silently when `XDG_RUNTIME_DIR` isn't set. + + ''' + uds_dir: str|None = get_uds_dir() + # snapshot pre-test sock-file population so we only + # blame this test for files it added (others may have + # been left around by session-scoped fixtures / + # cross-session leaks pending reaper). + before: set[str] = set() + if uds_dir: + try: + before = { + e for e in os.listdir(uds_dir) + if e.endswith('.sock') + } + except OSError: + pass + + yield + + if not uds_dir: + return + try: + after: set[str] = { + e for e in os.listdir(uds_dir) + if e.endswith('.sock') + } + except OSError: + return + new_files: set[str] = after - before + if not new_files: + return + # only consider files whose binder pid is dead (or the + # 1616 sentinel) — a still-running test that legit + # holds a sock open will be ignored here and caught at + # session-end if it really is leaked. + orphans: list[str] = find_orphaned_uds(uds_dir=uds_dir) + new_orphans: list[str] = [ + os.path.join(uds_dir, n) for n in new_files + if os.path.join(uds_dir, n) in orphans + ] + if new_orphans: + import warnings + warnings.warn( + f'UDS sock-file LEAK detected from test ' + f'(reaping):\n ' + + '\n '.join(new_orphans), + stacklevel=1, + ) + reap_uds(new_orphans) + + +@pytest.fixture +def reap_subactors_per_test() -> int: + ''' + Per-test (function-scoped) zombie-subactor reaper — + **opt-in**, NOT autouse. + + When a test's teardown fails to fully cancel its actor + tree (e.g. an asyncio cancel-cascade times out under + `main_thread_forkserver`, pytest hits its 200s wall- + clock and abandons), the leftover subactor lingers as a + direct child of `pytest` and squats on whatever + registrar port / UDS path / shm segment it had bound. + Subsequent tests trying to allocate the same resource + fail — and with backends that bind a session-shared + `reg_addr`, that means EVERY following test in the + suite cascades. The session-scoped sibling + (`_reap_orphaned_subactors`) only kicks in at session + end which is too late to save the cascade. + + Apply at module-level on the topically-problematic + test files via: + + ```python + pytestmark = pytest.mark.usefixtures( + 'reap_subactors_per_test', + ) + ``` + + Or per-test via the same `usefixtures` mark on a + specific function. Intentionally NOT autouse so the + fixture's presence on a module signals "this module's + teardown is known-leaky enough to contaminate + siblings"; the visibility helps future-us track down + root causes rather than burying them under blanket + cleanup. + + ''' + parent_pid: int = os.getpid() + yield parent_pid + pids: list[int] = find_descendants(parent_pid) + if pids: + reap(pids, grace=3.0)