tractor/tests/trionics/test_subproc.py

231 lines
6.1 KiB
Python
Raw Normal View History

Add `supervise_run_process` to `trionics._subproc` A `trio.Nursery.start()`-style wrapper around `trio.run_process()` that surfaces rc!=0 errors deterministically, ALWAYS isolates the parent controlling-tty, and optionally live-relays the child's std-streams to `log.<level>` per-line. Suits both short-lived test-runners + long-lived daemons. `supervise_run_process()`, - Deterministic rc!=0: pass `check=False` to `trio` and do our OWN post-drain rc-check from the supervisor coro body AFTER `own_tn.__aexit__` — NOT inside the internal nursery, since that would race-cancel the still-draining relay reader and lose stderr lines. (Re)build + raise a BARE `subprocess.CalledProcessError`: `.stderr=` for programmatic callers + an `add_note()`'d `|_.stderr:` block for human teardown logs. No nursery-eg-wrapped CPE to `collapse_eg` around. - Parent controlling-tty isolation: `stdin=DEVNULL` always, `stdout=DEVNULL` unless relayed/overridden (via `stdout=` kwarg w/ `_UNSET` sentinel so explicit `None` = inherit still works). Prevents a spawned program from clobbering the launching tty's scrollback w/ control-seqs. - Live per-line relay: `relay_stdout=True`/ `relay_stderr=True` → relayed to `log.<relay_level>` (default `'io'`, our custom level 21). Picked to sort just above stdlib `INFO`=20 so it shows at usual `info`/`devx` levels yet stays separately filterable; `runtime`=15 was REJECTED as a default since it'd be silently filtered at usual verbosity — footgun for daemon supervisors whose whole point is visibility. STREAMED, not buffered-until-exit. - Non-blocking `tn.start()` semantics: live `trio.Process` handed up via `task_status.started()` immediately (else `tn.start()` would block till child exit, losing the long-lived-daemon use case). Supervise/relay bg tasks run to completion in this coro. - `**run_process_kwargs` forwarded verbatim (env, shell, cwd, start_new_session, executable, ...); MANAGED keys (`stdin`/`stdout`/`stderr`/`check`) win on conflict. 
- Crash-handling layer intentionally NOT baked in — compose `maybe_open_crash_handler()` ON TOP at the call-site. `_relay_stream_lines()` helper, - Concurrent pipe-drain reader. MANDATORY whenever piping w/o `capture_*` since nothing else drains the OS pipe — child blocks on `write()` once kernel buf (~64KiB) fills → deadlock. - Modes (combine freely): `emit`-only live relay, `accum`-only silent drain+capture (for the CPE note), or both. Per-line splitting handles cross-chunk residuals + flushes any trailing un-newline-term'd line at EOF. `_add_stderr_note()` helper, - Attaches an indented `|_.stderr:` note to a CPE via `add_note()` for legible rc!=0 reporting at teardown. Tests (`tests/trionics/test_subproc.py`), - Hermetic `trio`-only (no actor-runtime). - `test_stdout_relayed_per_line`: per-line stdout relay. - `test_parent_tty_isolated`: child fd1 is OUR pipe (no `/dev/pts/*`), fd0 pinned to `/dev/null`. - `test_no_deadlock_on_big_unnewlined_output`: 200KiB no-newline output completes under `fail_after(2)` — exercises the concurrent drain (without it, the child blocks at ~64KiB). - `test_stderr_relay_and_cpe_rebuild`: rc!=0 w/ `relay_stderr=True` → bare `CalledProcessError` w/ the `.stderr` note + per-line live relay. - `test_nonrelay_cpe_note`: rc!=0 w/o relay → same deterministic post-drain CPE w/ `.stderr` note (silent drain+capture path). Re-export `supervise_run_process` from `tractor.trionics`. Prompt-IO: ai/prompt-io/claude/20260601T231429Z_0e3e008b_prompt_io.md (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-06-01 23:29:46 +00:00
'''
Unit tests for `tractor.trionics.supervise_run_process` (in
`tractor.trionics._subproc`) and its per-line std-stream relay.
Hermetic `trio`-only coverage (no actor-runtime needed):
- per-line stdout relay -> `log.io`
- parent controlling-tty isolation (child fd1 is a pipe, fd0 is
`/dev/null`; never the parent `/dev/pts/*`)
- mandatory concurrent pipe-drain (no deadlock on >64KiB
no-newline output)
- live stderr relay + `CalledProcessError` rebuild (rc!=0 note)
- non-relay (silent drain+capture) stderr CPE note path
'''
from functools import partial
import subprocess
import pytest
import trio
from tractor.trionics import (
_subproc,
collapse_eg,
supervise_run_process,
)
def _capture_relay(monkeypatch, level: str = 'io') -> list[str]:
    '''
    Patch `_subproc.log.<level>` with a recorder and return the
    list that recorder appends to.

    `level` names the log method to replace (`'io'` unless
    overridden). Only the first positional arg of each call (the
    message) is stored; any extra args/kwargs are dropped.

    '''
    captured: list[str] = []

    def _record(msg, *_args, **_kwargs) -> None:
        # keep just the message, mirroring a log-method call
        # signature while ignoring format args.
        captured.append(msg)

    monkeypatch.setattr(_subproc.log, level, _record)
    return captured
def test_stdout_relayed_per_line(monkeypatch):
    '''
    Every line the child `echo`s to stdout must appear among the
    captured relay records tagged `[t-out:out]`.

    '''
    relayed = _capture_relay(monkeypatch)
    spawn = partial(
        supervise_run_process,
        ['sh', '-c', 'for i in 1 2 3; do echo line=$i; done'],
        label='t-out',
        relay_stdout=True,
    )

    async def main():
        async with trio.open_nursery() as tn:
            await tn.start(spawn)

    trio.run(main)

    # only consider records carrying this test's stdout tag.
    tagged = [rec for rec in relayed if '[t-out:out]' in rec]
    for expected in ('line=1', 'line=2', 'line=3'):
        assert any(expected in rec for rec in tagged)
def test_parent_tty_isolated(monkeypatch):
    '''
    The child's fd0/fd1 must never be the parent's controlling
    tty.

    The child prints the `readlink` targets of `/proc/self/fd/0`
    and `/proc/self/fd/1` on its (relayed) stdout; the captured
    records must then mention a `pipe:` and `/dev/null` but no
    `/dev/pts/` entry.

    Skipped where `/proc/self/fd` does not exist since the child
    cmd can't introspect its fds without it.

    '''
    # local import so this block stands alone w/o touching the
    # module-level import list.
    from pathlib import Path

    if not Path('/proc/self/fd').is_dir():
        pytest.skip('requires a procfs `/proc/self/fd` dir')

    records = _capture_relay(monkeypatch)
    cmd = [
        'sh', '-c',
        'readlink /proc/self/fd/0; readlink /proc/self/fd/1',
    ]

    async def main():
        async with trio.open_nursery() as tn:
            await tn.start(
                partial(
                    supervise_run_process,
                    cmd,
                    label='t-tty',
                    relay_stdout=True,
                )
            )

    trio.run(main)

    relayed = '\n'.join(records)
    # fd1 (stdout) must be OUR pipe, never a controlling tty.
    assert 'pipe:' in relayed
    assert '/dev/pts/' not in relayed
    # fd0 (stdin) is pinned to DEVNULL.
    assert '/dev/null' in relayed
def test_no_deadlock_on_big_unnewlined_output(monkeypatch):
    '''
    Spawn a child which writes 200000 bytes to stdout with NO
    newline and require it to finish inside a 2s deadline, with
    all 200000 chars showing up in the `[t-big:out]` records.

    Per the original author's note: this can only complete if
    the relay reader drains the pipe concurrently, otherwise the
    child blocks on `write()` once the OS pipe buffer fills.

    '''
    relayed = _capture_relay(monkeypatch)
    spawn = partial(
        supervise_run_process,
        ['sh', '-c', 'head -c 200000 /dev/zero | tr "\\0" x'],
        label='t-big',
        relay_stdout=True,
    )

    async def main():
        # bounded so a real pipe-fill deadlock fails fast instead
        # of hanging the suite.
        with trio.fail_after(2):
            async with trio.open_nursery() as tn:
                await tn.start(spawn)

    trio.run(main)

    # strip everything up to (and incl.) the first '] ' so only
    # the relayed payload of each tagged record is measured.
    payloads = [
        rec.split('] ', 1)[-1]
        for rec in relayed
        if '[t-big:out]' in rec
    ]
    assert sum(map(len, payloads)) == 200_000
def test_stderr_relay_and_cpe_rebuild(monkeypatch):
    '''
    With `relay_stderr=True` and `check=True`, a child exiting
    rc=3 after writing `boom` to stderr must surface as a bare
    `subprocess.CalledProcessError` which,

    - reports `.returncode == 3`,
    - carries the stderr bytes on `.stderr`,
    - has a note (`__notes__`) mentioning the stderr text,

    AND the same stderr line must have been relayed under the
    `[t-err:err]` tag.

    '''
    relayed = _capture_relay(monkeypatch)
    spawn = partial(
        supervise_run_process,
        ['sh', '-c', 'echo boom 1>&2; exit 3'],
        label='t-err',
        relay_stderr=True,
        check=True,
    )

    async def main():
        # `collapse_eg()` wraps the nursery so the lone CPE
        # bubbles out bare instead of inside an exception-group.
        async with (
            collapse_eg(),
            trio.open_nursery() as tn,
        ):
            await tn.start(spawn)

    with pytest.raises(subprocess.CalledProcessError) as excinfo:
        trio.run(main)

    err = excinfo.value
    assert err.returncode == 3

    # `.stderr` may be unset, fall back to empty bytes for the
    # membership check.
    assert b'boom' in (err.stderr or b'')

    # note attached for legible teardown reporting.
    notes = getattr(err, '__notes__', [])
    assert any('boom' in note for note in notes)

    # AND it was relayed live per-line.
    assert any(
        '[t-err:err]' in rec and 'boom' in rec
        for rec in relayed
    )
def test_nonrelay_cpe_note(monkeypatch):
    '''
    No live relay: stderr is silently drained + captured (NOT
    emitted), and on rc!=0 the wrapper raises a
    `CalledProcessError` carrying that stderr both on `.stderr`
    and in an attached note; the same outcome as asserted for
    the relay case.

    The "NOT emitted" half is verified by capturing the default
    `io` relay level and requiring that no `[t-legacy:err]`
    tagged record was produced.

    '''
    # capture the default relay level so we can prove nothing
    # was relayed when `relay_*` are left at their defaults.
    records = _capture_relay(monkeypatch)
    cmd = [
        'sh', '-c',
        'echo nope 1>&2; exit 7',
    ]

    async def main():
        async with (
            collapse_eg(),
            trio.open_nursery() as tn,
        ):
            await tn.start(
                partial(
                    supervise_run_process,
                    cmd,
                    label='t-legacy',
                    check=True,
                    # relay_* default False -> silent
                    # drain+capture for the CPE note.
                )
            )

    with pytest.raises(subprocess.CalledProcessError) as ei:
        trio.run(main)

    cpe = ei.value
    assert cpe.returncode == 7
    assert b'nope' in (cpe.stderr or b'')
    assert any(
        'nope' in n
        for n in getattr(cpe, '__notes__', [])
    )
    # silent path: no stderr line may have been relayed. Match
    # on the relay tag (not the payload) since the payload text
    # also appears in the cmd itself.
    assert not any(
        '[t-legacy:err]' in r
        for r in records
    )