# tractor: distributed structured concurrency.
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
'''
Pure-Python diagnostic state-capture for hung
`pytest`/`tractor` process trees.
This module is the load-bearing core for two consumers:
1. The `xontrib/tractor_diag.xsh::acli.*` xonsh aliases
(`acli.ptree`, `acli.hung_dump`, `acli.bindspace_scan`,
`acli.dump_all`) — interactive terminal diag tools.
2. In-test "capture-on-hang" helpers like
`fail_after_w_trace()` / `afk_alarm_w_trace()` that drop a
full diag snapshot to disk when a test exceeds its timeout
budget instead of just emitting a context-less
`trio.TooSlowError`.
All public dump-* functions RETURN formatted text rather than
printing, so callers can render to a terminal OR write to a
file. `dump_all()` does the file-writing for snapshot-archive
use cases.
Sudo policy:
Per-pid kernel `stack` + `py-spy dump` need `CAP_SYS_PTRACE`,
invoked via `sudo -n`. Two modes:
- `allow_sudo_prompt=True` (terminal CLI default):
`ensure_sudo_cached()` prompts the user once via `sudo -v`
if creds aren't cached, then re-uses them per-pid.
- `allow_sudo_prompt=False` (pytest / in-test default):
silently skip sudo-required diagnostics; emit a banner
pointing the human at `sudo -v && acli.hung_dump `
for a follow-up manual capture.
'''
from __future__ import annotations
import json
import os
import re
import signal
import subprocess as sp
from contextlib import (
AbstractAsyncContextManager,
AbstractContextManager,
asynccontextmanager,
contextmanager,
)
from datetime import datetime
from io import StringIO
from pathlib import Path
from typing import (
AsyncIterator,
Callable,
Iterator,
TypeAlias,
)
# Public type aliases for the `fail_after_w_trace` /
# `afk_alarm_w_trace` fixture-returned CM-factory callables.
# Test signatures can annotate the fixture param directly::
#
# def test_foo(
# fail_after_w_trace: FailAfterWTraceFactory,
# ):
# async with fail_after_w_trace(5.0):
# ...
#
# NOTE the fixture name intentionally shadows the underlying
# `fail_after_w_trace` function at test-fn scope; pytest's
# param-resolution overrides the module-level import, so the
# fixture-returned CM-factory wins inside the test body.
#
# `Callable[..., ...]` keeps the kwargs surface loose (caller
# can pass `label=`, `pid=`, `out_dir=`); precise checking of
# the first-arg `seconds` is left to runtime since most callers
# pass an `int|float` literal.
FailAfterWTraceFactory: TypeAlias = Callable[
...,
AbstractAsyncContextManager[None],
]
AfkAlarmWTraceFactory: TypeAlias = Callable[
...,
AbstractContextManager[None],
]
try:
import psutil
except ImportError:
psutil = None
try:
import pytest as _pytest
except ImportError:
# `trace.py`'s pure-Python core (proc-tree + bindspace +
# dump_*) is intentionally pytest-free so the `xontrib`
# CLI can `import` it from any venv. The fixtures at
# the bottom of this module require `pytest` and are
# only defined when it's importable.
_pytest = None
# matches tractor's UDS sock naming: `@.sock`
_UDS_SOCK_RE = re.compile(
r'^(?P.+)@(?P\d+)\.sock$'
)
# ---------------------------------------------------------------
# pid + proc-tree resolution
# ---------------------------------------------------------------
def resolve_pids(arg: str) -> list[int]:
'''
Resolve a numeric pid OR a `pgrep -f` pattern to a list of
pids. Returns `[]` on no match.
'''
if arg.isdigit():
return [int(arg)]
try:
out: str = sp.check_output(
['pgrep', '-f', arg],
text=True,
)
except sp.CalledProcessError:
return []
return [int(p) for p in out.split() if p]
def walk_tree_psutil(pid: int) -> list:
'''Flat `[Process, *descendants]` via `psutil` (or `[]`).'''
if psutil is None:
return []
try:
p = psutil.Process(pid)
except psutil.NoSuchProcess:
return []
return [p] + p.children(recursive=True)
def _walk_tree_with_depth(pid: int) -> Iterator[tuple]:
'''Yield `(proc, depth)` pairs walking `pid`'s subtree.'''
if psutil is None:
return
try:
root = psutil.Process(pid)
except psutil.NoSuchProcess:
return
yield root, 0
stack: list = [(root, 0)]
seen: set = {pid}
while stack:
parent, d = stack.pop()
try:
kids = parent.children()
except psutil.NoSuchProcess:
continue
for k in kids:
if k.pid in seen:
continue
seen.add(k.pid)
yield k, d + 1
stack.append((k, d + 1))
def _walk_tree_pgrep(pid: int) -> list[int]:
'''psutil-less fallback — recursive `pgrep -P`.'''
out: list[int] = [pid]
try:
kids: list = sp.check_output(
['pgrep', '-P', str(pid)],
text=True,
).split()
except sp.CalledProcessError:
return out
for k in kids:
out.extend(_walk_tree_pgrep(int(k)))
return out
def _which_cgroup_slice(pid: int) -> str | None:
'''
Return `'system'` / `'user'` / `None` for `pid`'s top-level
systemd cgroup slice. See the full `xontrib` docstring on
`_which_cgroup_slice` for the bucket semantics.
'''
try:
with open(f'/proc/{pid}/cgroup') as f:
cg: str = f.read()
except (
FileNotFoundError,
PermissionError,
ProcessLookupError,
OSError,
):
return None
if '/system.slice/' in cg:
return 'system'
if '/user.slice/' in cg:
return 'user'
return None
def _find_tractor_strays(seen: set[int]) -> list[int]:
'''
Scan `/proc/*/cmdline` (+ `/comm` as zombie-safe fallback) for
`tractor._child` / `tractor[]` proctitle matches whose
`pid` is NOT in the `seen` set AND whose `ppid` disposition
indicates the proc belongs to THIS test run's process tree:
- `ppid == 1` → init-adopted (parent died) — a real leaked
subactor from this (or a prior killed) test run.
- `ppid in seen` → subtree-descendant the recursive walk
missed due to a race (proc appeared between iterations).
Procs whose `ppid` points to some OTHER live, non-pytest
process are skipped — they belong to a different tractor app
(e.g. `piker`, another `pytest` invocation in another shell,
a long-running tractor daemon) and falsely flagging them as
"cross-test ghosts" of THIS run is misleading.
Used by `dump_proc_tree(include_strays=True)` to surface ghost
subactor trees from PRIOR test runs that aren't descendants of
the snapshot's root pid (typically the pytest worker). These
are usually the source of cross-test launchpad contamination —
e.g. orphaned `tractor._child` procs still squatting on UDS
bindspace from a hung-then-killed pytest invocation.
Returns the pids; caller decides what to do with them
(typically: walk their subtrees as additional roots and let
the existing zombie/orphan/live classification handle them).
Reuses `_reap._is_tractor_subactor` for the cmdline/comm
intrinsic-marker test so the detection stays in lock-step
with the reaper's own definition.
'''
# lazy-imported to avoid module-import cycle: `_reap.py` is a
# pytest plugin that imports from this module's siblings.
from ._reap import _is_tractor_subactor
strays: list[int] = []
proc = Path('/proc')
if not proc.is_dir():
return strays
for entry in proc.iterdir():
if not entry.name.isdigit():
continue
pid: int = int(entry.name)
if pid in seen:
continue
if not _is_tractor_subactor(pid):
continue
# ownership filter: only flag procs whose `ppid` ties them
# back to THIS test run (init-adopted orphan, or a
# descendant the walk missed).
ppid: int | None = _ppid_from_proc(pid)
if ppid is None:
# proc disappeared between `iterdir()` and `stat` —
# treat as gone, don't flag.
continue
if ppid == 1 or ppid in seen:
strays.append(pid)
return sorted(strays)
def _ppid_from_proc(pid: int) -> int | None:
'''
Read `ppid` from `/proc//stat`. Returns None on race
(proc died) / permission / non-linux.
NB: stat field [1] is `(comm)` which can contain spaces +
parens — `rsplit(')', 1)` is the safe way to skip past it.
'''
try:
with open(f'/proc/{pid}/stat') as f:
stat: str = f.read()
after_comm: str = stat.rsplit(')', 1)[1].strip()
return int(after_comm.split()[1]) # state(0) ppid(1)
except (
FileNotFoundError,
PermissionError,
ProcessLookupError,
OSError,
):
return None
# ---------------------------------------------------------------
# sudo probe / prompt
# ---------------------------------------------------------------
def is_sudo_cached() -> bool:
'''
Quietly probe whether `sudo` creds are cached. Never
prompts — safe to call from non-interactive contexts.
'''
try:
return sp.run(
['sudo', '-n', 'true'],
capture_output=True,
).returncode == 0
except FileNotFoundError:
return False
def ensure_sudo_cached() -> bool:
'''
Like `is_sudo_cached()` but PROMPTS interactively via
`sudo -v` if not yet cached. Suitable for terminal-CLI use
only — DO NOT call from inside a pytest run.
'''
if is_sudo_cached():
return True
print(
'[tractor-trace] needs `sudo` for '
'/proc//stack and `py-spy dump`; caching creds '
'via `sudo -v`...'
)
try:
rc: int = sp.run(['sudo', '-v']).returncode
except KeyboardInterrupt:
print(' cancelled — proceeding without sudo')
return False
except FileNotFoundError:
print(' sudo not on PATH — proceeding without sudo')
return False
return rc == 0
# ---------------------------------------------------------------
# dump_proc_tree (== acli.ptree)
# ---------------------------------------------------------------
def dump_proc_tree(
roots: list[int],
*,
flag_tree: bool = False,
include_strays: bool = True,
) -> str:
'''
Severity-classified proc-tree rendering of `roots` and
their descendants. Returns formatted text.
Buckets (severity-ordered):
- zombies: `status in (Z, X)`
- orphans: `ppid==1`, NOT in a systemd cgroup slice
- system-slice: `ppid==1`, under `/system.slice/`
- user-slice: `ppid==1`, under `/user.slice/.../*.scope`
- live: real (`ppid > 1`) parent
`flag_tree=True` additionally prepends a flat walk-order
`## tree` section preserving parent-child shape.
`include_strays=True` (default) additionally scans
`/proc/*/cmdline` for `tractor._child` / `tractor[]`
procs that are NOT descendants of any provided root — these
are typically ghost subactor trees from PRIOR test runs
(cross-test launchpad contamination). Their subtrees are
walked and classified normally; the bucket counts then
include them. See `_find_tractor_strays()`.
'''
buf = StringIO()
def echo(line: str = '') -> None:
buf.write(line + '\n')
if psutil is None:
echo(
'ptree requires `psutil`; '
'install via `uv pip install psutil`'
)
return buf.getvalue()
# statuses considered "defunct"
defunct_statuses: set = {
psutil.STATUS_ZOMBIE,
getattr(psutil, 'STATUS_DEAD', 'dead'),
}
seen: set = set()
walk_order: list = []
live: list = []
orphans: list = []
system_slice: list = []
user_slice: list = []
zombies: list = []
gone: list = []
pid_to_bucket: dict = {}
# lazy-imported, used to override cgroup-slice classification
# for `tractor._child` strays (they're orphans regardless of
# whether they happen to be in the user.slice / system.slice
# cgroup — `desktop-launched app` is the *wrong* read for a
# leaked subactor that just happens to inherit user-session
# cgroup membership from its now-dead parent).
from ._reap import _is_tractor_subactor
def _classify_walk(walk_roots: list[int]) -> None:
'''Walk + classify into the closure-shared bucket lists.'''
for r in walk_roots:
for (p, depth) in _walk_tree_with_depth(r):
if p.pid in seen:
continue
seen.add(p.pid)
try:
status: str = p.status()
ppid: int = p.ppid()
except psutil.NoSuchProcess:
gone.append(p.pid)
continue
entry = (p, depth)
if status in defunct_statuses:
zombies.append(entry)
pid_to_bucket[p.pid] = 'zombies'
elif ppid == 1:
# `tractor._child` procs reparented to init are
# leaked subactors regardless of cgroup-slice —
# short-circuit to `orphans` before falling back
# to the systemd-slice categorization (which is
# only meaningful for NON-tractor procs).
if _is_tractor_subactor(p.pid):
orphans.append(entry)
pid_to_bucket[p.pid] = 'orphans'
else:
slice_kind: str | None = _which_cgroup_slice(p.pid)
if slice_kind == 'system':
system_slice.append(entry)
pid_to_bucket[p.pid] = 'system-slice'
elif slice_kind == 'user':
user_slice.append(entry)
pid_to_bucket[p.pid] = 'user-slice'
else:
orphans.append(entry)
pid_to_bucket[p.pid] = 'orphans'
else:
live.append(entry)
pid_to_bucket[p.pid] = 'live'
walk_order.append(entry)
_classify_walk(roots)
explicit_seen: set = set(seen)
stray_roots: list[int] = []
if include_strays:
stray_roots = _find_tractor_strays(seen)
if stray_roots:
_classify_walk(stray_roots)
total: int = (
len(live)
+ len(orphans)
+ len(system_slice)
+ len(user_slice)
+ len(zombies)
)
echo(f'# ptree: {total} procs across roots {roots}')
if stray_roots:
n_stray_proc: int = len(seen) - len(explicit_seen)
echo(
f'# + {n_stray_proc} `tractor._child` stray proc(s) '
f'NOT descendants of {roots} '
f'(likely cross-test ghosts; see bindspace dump for '
f'their UDS sock state):'
)
for sr in stray_roots:
echo(f'# stray-root: {sr}')
hdr: str = (
' ' + 'PID'.rjust(7)
+ ' ' + 'PPID'.rjust(7)
+ ' ' + 'STATUS'.ljust(10)
+ ' CMD'
)
def _row(entry, bucket: str | None = None) -> str:
p, depth = entry
tree_pfx: str = (' ' * depth) + ('└─ ' if depth > 0 else '')
parent_anno: str = ''
if (
bucket is not None
and depth > 0
):
try:
parent_pid: int = p.ppid()
except psutil.NoSuchProcess:
parent_pid = 0
if parent_pid and parent_pid != 1:
parent_bucket: str | None = pid_to_bucket.get(parent_pid)
if (
parent_bucket is not None
and parent_bucket != bucket
):
parent_anno = (
f' [parent: {parent_pid} '
f'(in `{parent_bucket}`)]'
)
try:
cmd: str = (
' '.join(p.cmdline())[:140]
or '[' + p.name() + ']'
)
r: str = ' ' + str(p.pid).rjust(7)
r += ' ' + str(p.ppid()).rjust(7)
r += ' ' + p.status().ljust(10)
r += ' ' + tree_pfx + cmd + parent_anno
return r
except psutil.ZombieProcess:
try:
ppid_str: str = str(p.ppid())
name: str = p.name()
except psutil.NoSuchProcess:
ppid_str, name = '?', '?'
r = ' ' + str(p.pid).rjust(7)
r += ' ' + ppid_str.rjust(7)
r += ' ' + 'zombie'.ljust(10)
r += (
' ' + tree_pfx
+ '[' + name + ' ]'
+ parent_anno
)
return r
except psutil.NoSuchProcess:
return (
' ' + str(p.pid).rjust(7)
+ ' (gone mid-walk)'
)
def _section(
title: str,
procs: list,
hint: str = '',
bucket: str | None = None,
) -> None:
echo()
echo(
f'## {title} ({len(procs)})'
+ (f' — {hint}' if hint else '')
)
if not procs:
echo(' (none)')
return
echo(hdr)
for p in procs:
echo(_row(p, bucket=bucket))
if flag_tree:
_section(
'tree', walk_order,
'flat walk-order, parent-child preserved',
)
_section(
'zombies', zombies,
'status `Z`/`X`, parent has not reaped',
bucket='zombies',
)
_section(
'orphans', orphans,
'`ppid==1` + leaked: either NOT in a `system.slice`/'
'`user.slice` cgroup, OR a known `tractor._child` '
'proc (leaked subactor, regardless of cgroup-slice)',
bucket='orphans',
)
_section(
'system-slice', system_slice,
'`ppid==1`, rooted under `/system.slice/` '
'(real systemd-managed service — daemon, login '
'session manager, etc; not a leak)',
bucket='system-slice',
)
_section(
'user-slice', user_slice,
'`ppid==1`, rooted under `/user.slice/.../*.scope` '
'(desktop-launched app wrapped by systemd-user — '
'browser, editor, etc; not a leak)',
bucket='user-slice',
)
_section('live', live, bucket='live')
if gone:
echo()
echo(f'## gone-during-walk ({len(gone)}): {gone}')
return buf.getvalue()
# ---------------------------------------------------------------
# dump_hung_state (== acli.hung_dump)
# ---------------------------------------------------------------
def dump_hung_state(
roots: list[int],
*,
allow_sudo_prompt: bool = False,
) -> str:
'''
Per-pid kernel + python state for a hung pytest/tractor
process tree. Walks descendants of each root.
Captures per-pid:
- `/proc//wchan` (world-readable)
- `/proc//stack` (CAP_SYS_PTRACE — needs sudo)
- `py-spy dump --pid --locals` (needs sudo)
Sudo policy controlled by `allow_sudo_prompt`:
- `True`: call `ensure_sudo_cached()` which prompts via
`sudo -v` if creds aren't cached. Use from terminal CLI.
- `False` (default): only probe via `is_sudo_cached()` —
never prompts. If not cached, skip stack+py-spy and emit
a banner pointing the human at the manual follow-up cmd.
Use from inside a pytest run.
'''
buf = StringIO()
def echo(line: str = '') -> None:
buf.write(line + '\n')
if allow_sudo_prompt:
have_sudo: bool = ensure_sudo_cached()
else:
have_sudo: bool = is_sudo_cached()
pids: list[int] = []
seen: set = set()
for r in roots:
if psutil is not None:
walk: list[int] = [p.pid for p in walk_tree_psutil(r)]
else:
walk = _walk_tree_pgrep(r)
for pid in walk:
if pid not in seen:
seen.add(pid)
pids.append(pid)
echo(f'# tree: {pids}')
if not have_sudo:
echo()
echo(
'💡 sudo creds NOT cached — '
'`/proc//stack` + `py-spy dump` SKIPPED '
'for all pids below.'
)
echo(
' For full kernel-stack + py-spy frames, '
're-run manually with sudo cached:'
)
echo(f' sudo -v && acli.hung_dump {pids[0] if pids else ""}')
echo()
echo('## ps forest')
if pids:
try:
ps_out: str = sp.check_output(
[
'ps',
'-o', 'pid,ppid,pgid,stat,cmd',
'-p', ','.join(map(str, pids)),
],
text=True,
)
echo(ps_out.rstrip())
except (sp.CalledProcessError, FileNotFoundError) as e:
echo(f' (ps failed: {e})')
for pid in pids:
echo()
echo(f'## pid {pid}' + (
''
if have_sudo
else ' (sudo NOT cached — stack/py-spy SKIPPED)'
))
for f in ('wchan', 'stack'):
path = Path(f'/proc/{pid}/{f}')
try:
txt: str = path.read_text().rstrip()
echo(f'-- /proc/{pid}/{f} --')
echo(txt)
except PermissionError:
if not have_sudo:
echo(
f'-- /proc/{pid}/{f}: '
'PermissionError (no sudo) --'
)
continue
try:
txt = sp.check_output(
['sudo', '-n', 'cat', str(path)],
text=True,
stderr=sp.DEVNULL,
).rstrip()
echo(f'-- /proc/{pid}/{f} (sudo) --')
echo(txt)
except sp.CalledProcessError:
echo(
f'-- /proc/{pid}/{f}: '
'sudo cred expired? rerun --'
)
except FileNotFoundError:
echo(f'-- /proc/{pid}/{f}: proc gone --')
echo(f'-- py-spy {pid} --')
if not have_sudo:
echo(' (skipped — no sudo)')
continue
try:
py_spy_out: str = sp.check_output(
['sudo', '-n', 'py-spy', 'dump', '--pid', str(pid), '--locals'],
text=True,
stderr=sp.STDOUT,
)
echo(py_spy_out.rstrip())
except (sp.CalledProcessError, FileNotFoundError) as e:
echo(f' (py-spy failed: {e})')
return buf.getvalue()
# ---------------------------------------------------------------
# scan_bindspace (== acli.bindspace_scan)
# ---------------------------------------------------------------
def scan_bindspace(arg: str | None = None) -> str:
'''
Scan a tractor UDS bindspace dir for orphan sock files.
`arg` semantics:
- `None` -> `$XDG_RUNTIME_DIR/tractor`
- bare `` -> `$XDG_RUNTIME_DIR/` (e.g. `piker`)
- path -> use as-is
Output buckets: `live-active`, `orphaned-alive`,
`orphaned-dead`, `non-tractor`.
'''
buf = StringIO()
def echo(line: str = '') -> None:
buf.write(line + '\n')
runtime: str = os.environ.get(
'XDG_RUNTIME_DIR',
f'/run/user/{os.getuid()}',
)
if arg:
if arg.startswith('/') or '/' in arg:
bs_dir = Path(arg)
else:
bs_dir = Path(runtime) / arg
else:
bs_dir = Path(runtime) / 'tractor'
if not bs_dir.exists():
echo(f'(no bindspace at {bs_dir})')
return buf.getvalue()
socks: list = sorted(bs_dir.glob('*.sock'))
echo(f'## bindspace {bs_dir} ({len(socks)} sock file(s))')
live_active: list = []
live_orphaned: list = []
dead_orphans: list = []
bogus: list = []
for s in socks:
m = _UDS_SOCK_RE.match(s.name)
if not m:
bogus.append(s)
continue
pid = int(m['pid'])
name = m['name']
try:
os.kill(pid, 0)
except ProcessLookupError:
dead_orphans.append((s, pid, name))
continue
except PermissionError:
live_active.append((s, pid, name, None))
continue
ppid: int | None = _ppid_from_proc(pid)
if ppid == 1:
live_orphaned.append((s, pid, name, ppid))
else:
live_active.append((s, pid, name, ppid))
echo()
echo(
f'## live-active ({len(live_active)}) '
f'— PID alive, parent still own it'
)
if not live_active:
echo(' (none)')
for s, pid, name, ppid in live_active:
row: str = ' ' + str(pid).rjust(7)
row += ' ' + name.ljust(32)
row += ' ' + s.name
if ppid is not None:
row += f' (ppid={ppid})'
echo(row)
echo()
echo(
f'## orphaned-alive ({len(live_orphaned)}) '
f'— PID alive but `ppid==1`, parent reaped; '
f'`acli.reap` candidate'
)
if not live_orphaned:
echo(' (none)')
for s, pid, name, ppid in live_orphaned:
row = ' ' + str(pid).rjust(7)
row += ' ' + name.ljust(32)
row += ' ' + s.name + ' (adopted by init)'
echo(row)
echo()
echo(
f'## orphaned-dead ({len(dead_orphans)}) '
f'— PID gone, sock stale'
)
if not dead_orphans:
echo(' (none)')
for s, pid, name in dead_orphans:
row = ' ' + str(pid).rjust(7)
row += ' ' + name.ljust(32)
row += ' ' + s.name + ' (no live proc)'
echo(row)
if bogus:
echo()
echo(
f'## non-tractor ({len(bogus)}) '
f'— filename lacks `@` suffix, '
f'cannot determine liveness intrinsically'
)
for s in bogus:
echo(f' {s.name}')
echo()
echo('to check liveness manually (needs `iproute2`/`ss`):')
for s in bogus:
echo(f" ss -lpx 'src = {s}'")
if dead_orphans or live_orphaned:
echo()
echo(
'to sweep BOTH orphaned-alive subs '
'(graceful SIGINT -> SIGKILL) AND dead-orphan '
'socks in one shot:'
)
echo(' acli.reap --uds')
if dead_orphans:
unlink_cmd: str = ' '.join(str(o[0]) for o in dead_orphans)
echo()
echo(
'(or to unlink dead-orphan socks manually, '
"skipping `acli.reap`'s graceful-cancel ladder:)"
)
echo(f' rm {unlink_cmd}')
return buf.getvalue()
# ---------------------------------------------------------------
# dump_all — file-writing snapshot capture
# ---------------------------------------------------------------
def _default_dump_root() -> Path:
'''
`$XDG_CACHE_HOME/tractor/hung-dumps/` with
`~/.cache/tractor/hung-dumps/` fallback.
'''
cache: str = os.environ.get(
'XDG_CACHE_HOME',
str(Path.home() / '.cache'),
)
return Path(cache) / 'tractor' / 'hung-dumps'
def dump_all(
pid: int,
out_dir: Path | None = None,
*,
label: str,
allow_sudo_prompt: bool = False,
) -> Path:
'''
Capture full diag snapshot for the proc tree rooted at
`pid` into a new sub-directory under `out_dir`.
Layout:
`/