From f595acc76c696317255fca9861d1940603f8d93b Mon Sep 17 00:00:00 2001 From: goodboy Date: Mon, 1 Jun 2026 19:29:46 -0400 Subject: [PATCH] Add `supervise_run_process` to `trionics._subproc` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A `trio.Nursery.start()`-style wrapper around `trio.run_process()` that surfaces rc!=0 errors deterministically, ALWAYS isolates the parent controlling-tty, and optionally live-relays the child's std-streams to `log.` per-line. Suits both short-lived test-runners + long-lived daemons. `supervise_run_process()`, - Deterministic rc!=0: pass `check=False` to `trio` and do our OWN post-drain rc-check from the supervisor coro body AFTER `own_tn.__aexit__` — NOT inside the internal nursery, since that would race-cancel the still-draining relay reader and lose stderr lines. (Re)build + raise a BARE `subprocess.CalledProcessError`: `.stderr=` for programmatic callers + an `add_note()`'d `|_.stderr:` block for human teardown logs. No nursery-eg-wrapped CPE to `collapse_eg` around. - Parent controlling-tty isolation: `stdin=DEVNULL` always, `stdout=DEVNULL` unless relayed/overridden (via `stdout=` kwarg w/ `_UNSET` sentinel so explicit `None` = inherit still works). Prevents a spawned program from clobbering the launching tty's scrollback w/ control-seqs. - Live per-line relay: `relay_stdout=True`/ `relay_stderr=True` → relayed to `log.` (default `'io'`, our custom level 21). Picked to sort just above stdlib `INFO`=20 so it shows at usual `info`/`devx` levels yet stays separately filterable; `runtime`=15 was REJECTED as a default since it'd be silently filtered at usual verbosity — footgun for daemon supervisors whose whole point is visibility. STREAMED, not buffered-until-exit. - Non-blocking `tn.start()` semantics: live `trio.Process` handed up via `task_status.started()` immediately (else `tn.start()` would block till child exit, losing the long-lived-daemon use case). Supervise/relay bg tasks run to completion in this coro. - `**run_process_kwargs` forwarded verbatim (env, shell, cwd, start_new_session, executable, ...); MANAGED keys (`stdin`/`stdout`/`stderr`/`check`) win on conflict. - Crash-handling layer intentionally NOT baked in — compose `maybe_open_crash_handler()` ON TOP at the call-site. `_relay_stream_lines()` helper, - Concurrent pipe-drain reader. MANDATORY whenever piping w/o `capture_*` since nothing else drains the OS pipe — child blocks on `write()` once kernel buf (~64KiB) fills → deadlock. - Modes (combine freely): `emit`-only live relay, `accum`-only silent drain+capture (for the CPE note), or both. Per-line splitting handles cross-chunk residuals + flushes any trailing un-newline-term'd line at EOF. `_add_stderr_note()` helper, - Attaches an indented `|_.stderr:` note to a CPE via `add_note()` for legible rc!=0 reporting at teardown. Tests (`tests/trionics/test_subproc.py`), - Hermetic `trio`-only (no actor-runtime). - `test_stdout_relayed_per_line`: per-line stdout relay. - `test_parent_tty_isolated`: child fd1 is OUR pipe (no `/dev/pts/*`), fd0 pinned to `/dev/null`. - `test_no_deadlock_on_big_unnewlined_output`: 200KiB no-newline output completes under `fail_after(2)` — exercises the concurrent drain (without it, the child blocks at ~64KiB). - `test_stderr_relay_and_cpe_rebuild`: rc!=0 w/ `relay_stderr=True` → bare `CalledProcessError` w/ the `.stderr` note + per-line live relay. - `test_nonrelay_cpe_note`: rc!=0 w/o relay → same deterministic post-drain CPE w/ `.stderr` note (silent drain+capture path). Re-export `supervise_run_process` from `tractor.trionics`. Prompt-IO: ai/prompt-io/claude/20260601T231429Z_0e3e008b_prompt_io.md (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- .../20260601T231429Z_0e3e008b_prompt_io.md | 146 +++++++++ ...20260601T231429Z_0e3e008b_prompt_io.raw.md | 106 +++++++ tests/trionics/test_subproc.py | 230 ++++++++++++++ tractor/trionics/__init__.py | 3 + tractor/trionics/_subproc.py | 296 ++++++++++++++++++ 5 files changed, 781 insertions(+) create mode 100644 ai/prompt-io/claude/20260601T231429Z_0e3e008b_prompt_io.md create mode 100644 ai/prompt-io/claude/20260601T231429Z_0e3e008b_prompt_io.raw.md create mode 100644 tests/trionics/test_subproc.py create mode 100644 tractor/trionics/_subproc.py diff --git a/ai/prompt-io/claude/20260601T231429Z_0e3e008b_prompt_io.md b/ai/prompt-io/claude/20260601T231429Z_0e3e008b_prompt_io.md new file mode 100644 index 00000000..2f58db49 --- /dev/null +++ b/ai/prompt-io/claude/20260601T231429Z_0e3e008b_prompt_io.md @@ -0,0 +1,146 @@ +--- +model: claude-opus-4-7[1m] +service: claude +session: trio-0.33-subproc-supervisor-retroactive +timestamp: 2026-06-01T23:14:29Z +git_ref: 0e3e008b +scope: code +substantive: true +raw_file: 20260601T231429Z_0e3e008b_prompt_io.raw.md +--- + +## Prompt + +**RETROACTIVE LOG** — original session prompts not +preserved; reconstructed from the staged work product. + +The work designs a `trio.Nursery.start()`-style wrapper +around `trio.run_process()` for SC-friendly subprocess +supervision. From the resulting code shape, the +prompting intent was: + +1. Surface rc!=0 `CalledProcessError` DETERMINISTICALLY, + without the nursery-eg-wrapping that complicates + `collapse_eg()` usage and races the relay reader on + trio's `check=True`-driven cancel cascade. +2. ALWAYS isolate the parent controlling-tty so a + spawned child can't emit terminal control-seqs onto + the launching tty (clobbering scrollback). Default + `stdin=DEVNULL`; default `stdout=DEVNULL` unless + explicitly relayed/overridden; distinguish "caller + passed nothing" from "caller passed `None` for + inherit". +3. Optional live per-line relay of child std-streams to + the `tractor` log — STREAMED (not + buffered-until-exit) so long-lived daemon output is + visible during the run. Pick a custom log level that + shows at usual `info`/`devx` console levels but is + separately filterable. +4. Concurrent pipe-drain reader MANDATORY when piping + without `capture_*` — without it the child blocks on + `write()` once the OS pipe buffer fills (~64KiB), + causing deadlocks on output bursts. +5. Non-blocking `tn.start()` semantics: hand the live + `trio.Process` to the parent immediately; + supervise/relay run to completion in the supervisor + coro. +6. Hermetic `trio`-only unit tests (no actor-runtime) + covering each of: per-line relay, tty isolation, + no-deadlock on >64KiB unnewlined output, CPE + rebuild w/ stderr relay, CPE rebuild on the silent + drain+capture path. + +## Response summary + +Adds `tractor/trionics/_subproc.py` (296 LOC) + +`tests/trionics/test_subproc.py` (230 LOC) + a +re-export in `tractor/trionics/__init__.py`. + +**`supervise_run_process()`** (public, re-exported) +- `check=False` is forced to `trio.run_process`; the + rc-check runs in the supervisor coro AFTER `own_tn` + unwinds (both the child AND the relay readers have + hit EOF + fully drained). A BARE + `subprocess.CalledProcessError` is rebuilt + raised + from there, with `.stderr` bytes passed in the + constructor AND attached as an `add_note()`'d + `|_.stderr:` block for legible teardown logs. +- `stdin=DEVNULL` always. `stdout` default chosen via a + `_UNSET` sentinel: `relay_stdout=True` → PIPE, + explicit `stdout=...` → as given, else `DEVNULL`. + `stderr` defaults to PIPE whenever we relay OR need + the CPE note (when `check=True`), else `DEVNULL`. +- `relay_level='io'` (custom level 21; sorts just + above stdlib `INFO`=20 so it shows at usual + `info`/`devx` levels and stays separately + filterable). `runtime`=15 would silently filter at + default levels, so it's rejected as a default. +- `task_status.started(trio_proc)` delivers the live + process immediately. The internal `own_tn` + supervises `trio.run_process` + any relay readers to + completion. +- `**run_process_kwargs` forward verbatim; + `stdin/stdout/stderr/check` are MANAGED keys + (override on conflict). +- Crash-handling deliberately NOT baked in — compose + `maybe_open_crash_handler()` on top at the call-site. + +**`_relay_stream_lines()`** (internal helper) +- Three modes (combinable): `emit`-only (live per-line + relay), `accum`-only (silent drain+capture for a CPE + note), or both (live relay AND capture). +- Per-line split handles cross-chunk residuals via a + rolling `residual` bytes buffer; flushes any trailing + un-newline-term'd line at EOF. +- `async with stream:` ensures aclose at EOF/cancel + (mirrors trio's internal `_subprocess` drain idiom). + +**`_add_stderr_note()`** (internal helper) +- `add_note()`s a `textwrap.indent(...)`'d + `|_.stderr:` block onto a `CalledProcessError` for + teardown logs. + +**Tests** (5 hermetic, trio-only) — `_capture_relay` +fixture monkeypatches `_subproc.log.` to a list: +- `test_stdout_relayed_per_line`: per-line stdout + relay carries each `line=N` to the records. +- `test_parent_tty_isolated`: `readlink /proc/self/fd/0` + and `fd/1` from the child show `pipe:` (fd1) + + `/dev/null` (fd0); NO `/dev/pts/*`. +- `test_no_deadlock_on_big_unnewlined_output`: 200KiB + of `x` with no newlines completes inside + `fail_after(2)` — exercises the concurrent drain. +- `test_stderr_relay_and_cpe_rebuild`: rc=3 with + `relay_stderr=True` raises bare CPE + (via `collapse_eg()`) with `b'boom' in cpe.stderr`, + the note attached, AND per-line live relay. +- `test_nonrelay_cpe_note`: rc=7 with no relay still + produces CPE with `.stderr` + note via the silent + drain+capture path. + +## Files changed + +- `tractor/trionics/_subproc.py` — NEW. Public + `supervise_run_process()` + helpers + `_relay_stream_lines()` / `_add_stderr_note()` + the + `_UNSET` sentinel. +- `tests/trionics/test_subproc.py` — NEW. 5 hermetic + trio-only tests + `_capture_relay` monkeypatch + fixture. +- `tractor/trionics/__init__.py` — re-export + `supervise_run_process`. + +## Human edits + +**RETROACTIVE**: this log is being written from the +staged diff, not from a live session. The code as +staged is the canonical artifact; any human edits the +user made during the originating design session are +already integrated and cannot be separated post-hoc. +The `.raw.md` sibling is a diff-pointer placeholder, +NOT a pre-edit transcript. + +Future prompt-io entries for in-flight work should be +written DURING the design session per the skill +contract so the pre-edit `.raw.md` captures the +unedited model output for genuine provenance. diff --git a/ai/prompt-io/claude/20260601T231429Z_0e3e008b_prompt_io.raw.md b/ai/prompt-io/claude/20260601T231429Z_0e3e008b_prompt_io.raw.md new file mode 100644 index 00000000..362c4f4b --- /dev/null +++ b/ai/prompt-io/claude/20260601T231429Z_0e3e008b_prompt_io.raw.md @@ -0,0 +1,106 @@ +--- +model: claude-opus-4-7[1m] +service: claude +timestamp: 2026-06-01T23:14:29Z +git_ref: 0e3e008b +diff_cmd: git diff HEAD~1..HEAD +--- + +# RETROACTIVE — original model output not preserved + +This `.raw.md` would normally contain the verbatim +pre-human-edit response from the design session that +produced the staged `_subproc.py` module + tests. That +session's transcript is not available, so this file +serves as a diff-pointer placeholder + transparency +note. + +## Authoritative artifact + +The committed code IS the artifact of record. Once the +companion commit lands, the unified diff is: + +> `git diff HEAD~1..HEAD -- tractor/trionics/_subproc.py` +> `git diff HEAD~1..HEAD -- tests/trionics/test_subproc.py` +> `git diff HEAD~1..HEAD -- tractor/trionics/__init__.py` + +Before committing, substitute `--cached` for the +pre-commit form. + +## What is NOT here + +Because this is retroactive: +- No verbatim chain-of-thought / discussion prose from + the design session. +- No rejected alternatives the model considered before + arriving at the final shape (e.g. whether the + rc-check should live inside `own_tn` vs after it; the + `_UNSET` sentinel vs a `None`-means-DEVNULL + convention; `io` vs `info` as the default relay + level). +- No pre-edit code blocks as the model first emitted + them, separable from any user cleanup applied before + the diff was staged. + +## Inferred design choices visible in the final code + +(Documented here because they're the kind of decision +detail an unedited raw transcript would have captured.) + +1. **Post-drain rc-check in the supervisor coro body, + AFTER `own_tn.__aexit__`.** Placing the + `CalledProcessError` raise here (not inside + `own_tn`) means the EG-unwrap happens at the OUTER + `tn.start()` boundary — callers do `collapse_eg()` + if they want bare. Doing the raise INSIDE `own_tn` + would cancel the still-draining relay reader + mid-flight and lose stderr lines. + +2. **`_UNSET` sentinel for `stdout`.** A plain default + of `None` couldn't distinguish "use the safe + `DEVNULL` default" from "caller explicitly passed + `None` (inherit, presumably knowingly)". The + sentinel keeps the SAFE default while letting power + users opt into inherit. + +3. **`relay_level='io'` (custom level 21).** Chosen to + sort just above stdlib `INFO`=20 so a default + `--ll info` shows the relay, but it remains a + distinct level so users can filter + `tractor.trionics:io` separately. Picking + `runtime`=15 would have made the relay invisible at + default verbosity (a footgun for daemon supervisors + whose whole point is "I want to see this output"). + +4. **Reader is MANDATORY, not opt-in cosmetic.** With + `stdout=PIPE` / `stderr=PIPE` we OWN the drain + responsibility — there's no `trio.capture_*` running + under the hood here. The ~64KiB OS pipe buffer + means a child writing more than that without us + reading hangs at `write()` — a deadlock that won't + show up in small-output tests, which is why the + 200KiB-no-newline test is in the suite. + +5. **`task_status.started(trio_proc)` BEFORE the + `own_tn` exits.** Without this, `tn.start()` would + block until the child exits — losing the "start a + long-lived daemon and continue with parent work" + use case. With it, the parent gets the live process + handle immediately and the supervise+relay tasks + run in the supervisor coro until the child exits. + +6. **`__notes__` via `add_note()` for the CPE + `.stderr`.** The `.stderr` attribute is what + `subprocess` callers expect; the `add_note()` is + what trio's exception-rendering shows. Both wired so + programmatic AND human consumers see the stderr at + teardown. + +## Honesty statement + +This file's content is RECONSTRUCTED from the staged +code, not extracted from a verbatim model transcript. +The prompt-io skill's intent is for the `.raw.md` to +be a pre-edit fossil; that's not possible here. Future +work should write the prompt-io entry DURING the +design session. diff --git a/tests/trionics/test_subproc.py b/tests/trionics/test_subproc.py new file mode 100644 index 00000000..136bca9f --- /dev/null +++ b/tests/trionics/test_subproc.py @@ -0,0 +1,230 @@ +''' +Unit tests for `tractor.trionics.supervise_run_process` (in +`tractor.trionics._subproc`) and its per-line std-stream relay. + +Hermetic `trio`-only coverage (no actor-runtime needed): + +- per-line stdout relay -> `log.io` +- parent controlling-tty isolation (child fd1 is a pipe, fd0 + `/dev/null` — never the parent `/dev/pts/*`) +- mandatory concurrent pipe-drain (no deadlock on >64KiB + no-newline output) +- live stderr relay + `CalledProcessError` rebuild (rc!=0 note) +- legacy capture-stderr CPE note path + +''' +from functools import partial +import subprocess + +import pytest +import trio + +from tractor.trionics import ( + _subproc, + collapse_eg, + supervise_run_process, +) + + +def _capture_relay(monkeypatch, level: str = 'io') -> list[str]: + ''' + Redirect `_subproc.log.` (the relay's emit method — + `io` by default, see `supervise_run_process(relay_level=...)`) + into a list so tests can assert on the relayed lines. + + ''' + records: list[str] = [] + monkeypatch.setattr( + _subproc.log, + level, + lambda msg, *a, **k: records.append(msg), + ) + return records + + +def test_stdout_relayed_per_line(monkeypatch): + records = _capture_relay(monkeypatch) + + cmd = [ + 'sh', '-c', + 'for i in 1 2 3; do echo line=$i; done', + ] + + async def main(): + async with trio.open_nursery() as tn: + await tn.start( + partial( + supervise_run_process, + cmd, + label='t-out', + relay_stdout=True, + ) + ) + + trio.run(main) + + out_lines = [r for r in records if '[t-out:out]' in r] + assert any('line=1' in r for r in out_lines) + assert any('line=2' in r for r in out_lines) + assert any('line=3' in r for r in out_lines) + + +def test_parent_tty_isolated(monkeypatch): + records = _capture_relay(monkeypatch) + + cmd = [ + 'sh', '-c', + 'readlink /proc/self/fd/0; readlink /proc/self/fd/1', + ] + + async def main(): + async with trio.open_nursery() as tn: + await tn.start( + partial( + supervise_run_process, + cmd, + label='t-tty', + relay_stdout=True, + ) + ) + + trio.run(main) + + relayed = '\n'.join(records) + # fd1 (stdout) must be OUR pipe, never a controlling tty. + assert 'pipe:' in relayed + assert '/dev/pts/' not in relayed + # fd0 (stdin) is pinned to DEVNULL. + assert '/dev/null' in relayed + + +def test_no_deadlock_on_big_unnewlined_output(monkeypatch): + ''' + >64KiB of output with NO newline: only completes because the + relay reader concurrently drains the pipe (else the child + blocks on `write()` when the OS pipe buffer fills). + + ''' + records = _capture_relay(monkeypatch) + + cmd = [ + 'sh', '-c', + 'head -c 200000 /dev/zero | tr "\\0" x', + ] + + async def main(): + # generous vs the ~ms real runtime, but bounded so a + # genuine pipe-fill deadlock fails fast. + with trio.fail_after(2): + async with trio.open_nursery() as tn: + await tn.start( + partial( + supervise_run_process, + cmd, + label='t-big', + relay_stdout=True, + ) + ) + + trio.run(main) + + big = ''.join( + r.split('] ', 1)[-1] + for r in records + if '[t-big:out]' in r + ) + assert len(big) == 200_000 + + +def test_stderr_relay_and_cpe_rebuild(monkeypatch): + ''' + `relay_stderr=True` PIPEs stderr ourselves (mutually + exclusive with trio's `capture_stderr`), so on rc!=0 the + wrapper rebuilds a `CalledProcessError` from the live + accumulator and `.add_note()`s its `.stderr` — AND the + stderr is relayed per-line live. + + ''' + records = _capture_relay(monkeypatch) + + cmd = [ + 'sh', '-c', + 'echo boom 1>&2; exit 3', + ] + + async def main(): + # `collapse_eg()` unwraps the parent-nursery's single-exc + # eg so the bare CPE bubbles straight out (mirrors real + # caller usage). + async with ( + collapse_eg(), + trio.open_nursery() as tn, + ): + await tn.start( + partial( + supervise_run_process, + cmd, + label='t-err', + relay_stderr=True, + check=True, + ) + ) + + with pytest.raises(subprocess.CalledProcessError) as ei: + trio.run(main) + + cpe = ei.value + assert cpe.returncode == 3 + # rebuilt `.stderr` (trio did NOT capture since we PIPE'd it). + assert b'boom' in (cpe.stderr or b'') + # note attached for legible teardown reporting. + assert any( + 'boom' in n + for n in getattr(cpe, '__notes__', []) + ) + # AND it was relayed live per-line. + assert any( + '[t-err:err]' in r and 'boom' in r + for r in records + ) + + +def test_nonrelay_cpe_note(monkeypatch): + ''' + No live relay: stderr is silently drained + captured (NOT + emitted), and on rc!=0 the wrapper rebuilds the + `CalledProcessError` from that accumulator with a `.stderr` + note — same deterministic post-drain path as the relay case. + + ''' + cmd = [ + 'sh', '-c', + 'echo nope 1>&2; exit 7', + ] + + async def main(): + async with ( + collapse_eg(), + trio.open_nursery() as tn, + ): + await tn.start( + partial( + supervise_run_process, + cmd, + label='t-legacy', + check=True, + # relay_* default False -> silent + # drain+capture for the CPE note. + ) + ) + + with pytest.raises(subprocess.CalledProcessError) as ei: + trio.run(main) + + cpe = ei.value + assert cpe.returncode == 7 + assert b'nope' in (cpe.stderr or b'') + assert any( + 'nope' in n + for n in getattr(cpe, '__notes__', []) + ) diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py index 7271b0f3..6cf57b7b 100644 --- a/tractor/trionics/__init__.py +++ b/tractor/trionics/__init__.py @@ -38,3 +38,6 @@ from ._taskc import ( maybe_raise_from_masking_exc as maybe_raise_from_masking_exc, start_or_cancel as start_or_cancel, ) +from ._subproc import ( + supervise_run_process as supervise_run_process, +) diff --git a/tractor/trionics/_subproc.py b/tractor/trionics/_subproc.py new file mode 100644 index 00000000..4cd62200 --- /dev/null +++ b/tractor/trionics/_subproc.py @@ -0,0 +1,296 @@ +# tractor: distributed structured concurrency. +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +SC-friendly `trio.run_process()` supervision: a `tn.start()` +style wrapper which surfaces rc!=0 errors deterministically and +(optionally) live-relays the child's std-streams to the `tractor` +log. + +''' +from __future__ import annotations +from functools import partial +import subprocess +import textwrap +from typing import ( + Callable, +) + +import trio + +from ..log import get_logger + +log = get_logger() + + +# sentinel so `supervise_run_process(stdout=...)` can tell +# "caller passed nothing" (-> tty-safe `DEVNULL` default) from +# an explicit `stdout=None` (inherit) override. +_UNSET = object() + + +def _add_stderr_note( + cpe: subprocess.CalledProcessError, + stderr_bytes: bytes, +) -> None: + ''' + Attach an indented `|_.stderr:` note to a + `CalledProcessError` for legible rc!=0 reporting at + teardown. + + ''' + stderr_str: str = stderr_bytes.decode(errors='replace') + cpe.add_note( + f'|_.stderr:\n' + f'{textwrap.indent(stderr_str, prefix=" "*3)}' + ) + + +async def _relay_stream_lines( + stream: trio.abc.ReceiveStream, + *, + emit: Callable[[str], None]|None = None, + tag: str = '', + accum: bytearray|None = None, +) -> None: + ''' + Concurrently drain a child subproc's `stdout`/`stderr` + PIPE; relay each COMPLETE line to `emit` (a bound + `log.` method) prefixed with `tag` (e.g. + `f'{label}:out'`) and/or append raw bytes to `accum`. + + This reader is MANDATORY whenever a bare + `stdout=`/`stderr=PIPE` is used WITHOUT `trio`'s + `capture_*` (which would spawn trio's own internal drain + task): nothing else drains the OS pipe, so once its kernel + buffer (~64KiB) fills the child blocks on `write()` -> + deadlock. + + Modes (combine freely): + - `emit`-only: live per-line relay (e.g. `relay_stdout`). + - `accum`-only: silent drain + capture (e.g. stderr kept + for a `CalledProcessError` note WITHOUT relaying it). + - both: relay AND capture (e.g. `relay_stderr` with `check=True`). + + ''' + # NOTE, mirrors `trio._subprocess`'s internal + # `async with stream: async for ...` drain idiom — except + # here we EMIT per-line (and/or accumulate) instead of + # only accumulating. + residual: bytes = b'' + async with stream: # aclose at EOF/cancel + async for chunk in stream: # ends at child-exit EOF + if accum is not None: + accum += chunk + if emit is None: + continue # drain(+accum)-only + buf: bytes = residual + chunk + *lines, residual = buf.split(b'\n') + for raw in lines: + line: str = raw.decode( + errors='replace', + ).rstrip('\r') + emit(f'[{tag}] {line}') + + # flush any trailing partial (un-newline-term'd) line @ EOF + if ( + emit is not None + and + residual + ): + line: str = residual.decode( + errors='replace', + ).rstrip('\r') + emit(f'[{tag}] {line}') + + +async def supervise_run_process( + cmd: list[str]|str, + *, + check: bool = True, + label: str|None = None, + + # per-line `log.*` relay of the child's std-streams + # (tty-safe, capture-safe, STREAMED — not + # buffered-until-exit, so it suits long-lived daemons). + relay_stdout: bool = False, + relay_stderr: bool = False, + + # default `io` (our custom level, value 21): the relay + # exists to make windowless-spawn output VISIBLE, and + # `IO`(21) sorts just ABOVE `INFO`(20) so it shows at the + # usual `info`/`devx` console levels (a `runtime`(15) relay + # would be silently filtered) while staying distinctly + # labelled + separately filterable. + relay_level: str = 'io', + + # non-relay `stdout` override; defaults (via `_UNSET`) to + # `DEVNULL` so we NEVER inherit (+ thus can't clobber) the + # parent controlling-tty. + stdout: int = _UNSET, + + task_status: trio.TaskStatus[ + trio.Process + ] = trio.TASK_STATUS_IGNORED, + + # any other `trio.run_process()` kwarg (env, shell, cwd, + # start_new_session, executable, ...) forwarded verbatim; + # our MANAGED keys (stdin/stdout/stderr/check) are set + # below and WIN on conflict. + **run_process_kwargs, +) -> None: + ''' + A `trio.Nursery.start()`-style `trio.run_process()` + wrapper which, + + - surfaces a rc!=0 `subprocess.CalledProcessError` + DETERMINISTICALLY: we pass `check=False` to `trio` and + do our OWN post-drain rc-check, (re)building + raising a + BARE CPE (with a `.stderr` note) from this coro's body + AFTER the child exits — so there's no nursery-eg-wrapped + CPE to catch/`collapse_eg`, and the relay reader is never + race-cancelled mid-drain. + + - ALWAYS isolates the parent controlling-tty + (`stdin=DEVNULL`, and `stdout=DEVNULL` unless + relayed/overridden) so a spawned program can't emit + terminal control-seqs onto the launching tty (which + would clobber its scrollback). + + - optionally live-relays `stdout`/`stderr` per-line to + `log.` via concurrent reader tasks (see + `_relay_stream_lines`). + + Delivers the live `trio.Process` via + `task_status.started()` then SUPERVISES it (the + `run_process` bg task + any relay readers) to completion + in this coro — i.e. the parent `tn.start()` returns + immediately/non-blocking. + + NOTE: any crash-handling / `repl_fixture` layer is + intentionally NOT baked in here — compose it ON TOP at the + call-site, e.g. + + async with maybe_open_crash_handler(): + await tn.start( + partial(supervise_run_process, cmd, ...), + ) + + ''' + emit: Callable[[str], None] = getattr(log, relay_level) + tag: str = ( + label + or + (cmd if isinstance(cmd, str) else ' '.join(cmd)) + ) + + # forward any extra `trio.run_process` kwargs verbatim; + # MANAGED keys below override on conflict. + rp_kwargs: dict = dict(run_process_kwargs) + + # XXX ALWAYS isolate the controlling-tty's stdin. + rp_kwargs['stdin'] = subprocess.DEVNULL + + # stdout: relay -> our own PIPE (drained by the reader + # below); else an explicit override; else tty-safe + # `DEVNULL`. + if relay_stdout: + rp_kwargs['stdout'] = subprocess.PIPE + elif stdout is not _UNSET: + rp_kwargs['stdout'] = stdout + else: + rp_kwargs['stdout'] = subprocess.DEVNULL + + # stderr: PIPE (+ our reader) when we either RELAY it OR + # need it captured for a rc!=0 CPE note; else tty-safe + # `DEVNULL`. We accumulate ONLY when `check` (the note is + # the only consumer). + # + # XXX we ALWAYS pass `check=False` to `trio` and do our + # OWN deterministic post-drain rc-check (below) so `trio` + # never raises a nursery-eg-wrapped CPE — no `collapse_eg` + # workaround, no reader race-cancel. + want_stderr_pipe: bool = relay_stderr or check + stderr_accum: bytearray|None = bytearray() if check else None + rp_kwargs['check'] = False + rp_kwargs['stderr'] = ( + subprocess.PIPE if want_stderr_pipe + else subprocess.DEVNULL + ) + + async with trio.open_nursery() as own_tn: + trio_proc: trio.Process = await own_tn.start( + partial( + trio.run_process, + cmd, + **rp_kwargs, + ) + ) + + # spin up the concurrent pipe-drain relay reader(s) — + # see `_relay_stream_lines` for why these are mandatory + # (not cosmetic) when piping without `capture_*`. + if relay_stdout: + own_tn.start_soon( + partial( + _relay_stream_lines, + trio_proc.stdout, + emit=emit, + tag=f'{tag}:out', + ) + ) + if want_stderr_pipe: + own_tn.start_soon( + partial( + _relay_stream_lines, + trio_proc.stderr, + # relay live only if asked; else silent + # drain+capture for the CPE note. + emit=emit if relay_stderr else None, + tag=f'{tag}:err', + accum=stderr_accum, + ) + ) + + # hand the live proc up to the parent WITHOUT blocking + # on the bg supervise/relay tasks (keeps non-blocking + # `tn.start()` semantics). + task_status.started(trio_proc) + + # ===== deterministic post-drain rc-check (BOTH paths) ===== + # `own_tn` only unwinds once `run_process` AND the relay + # reader(s) have hit EOF + FULLY drained — so `stderr_accum` + # is COMPLETE here (no race vs an early CPE-cancel). Rebuild + # + raise a BARE `CalledProcessError` (the parent `tn` will + # eg-wrap it like any task-raise; callers `collapse_eg()` if + # they want it bare). + if ( + check + and + trio_proc.returncode + ): + stderr_bytes: bytes = ( + bytes(stderr_accum) + if stderr_accum is not None + else b'' + ) + cpe = subprocess.CalledProcessError( + returncode=trio_proc.returncode, + cmd=trio_proc.args, + stderr=stderr_bytes, + ) + _add_stderr_note(cpe, stderr_bytes) + raise cpe