Compare commits

...

8 Commits

Author SHA1 Message Date
Gud Boi 0ef549fadb Add `tractor.trionics.patches` subpkg + first fix
With a seminal patch fixing `trio`'s `WakeupSocketpair.drain()` which
can busy-loop due to lack of handling `EOF`.

New `tractor.trionics.patches` subpkg housing defensive monkey-patches
for upstream `trio` bugs we've encountered while running `tractor`
— particularly as of recent, fork-survival edge cases that haven't been
filed/fixed upstream yet. Each patch is idempotent, version-gated via
`is_needed()`, and carries a `# REMOVE WHEN:` marker pointing at the
upstream release whose adoption allows deletion.

Subpkg layout + per-patch contract documented in
`tractor/trionics/patches/README.md` — `apply()` / `is_needed()`
/ `repro()` API, registry pattern via `_PATCHES` in `__init__.py`,
single-call entry point `apply_all()`.

First patch, `_wakeup_socketpair`:
- `trio`'s `WakeupSocketpair.drain()` loops on `recv(64KB)` and exits
  ONLY on `BlockingIOError`, NEVER on `recv() == b''` (peer-closed FIN).
- under `fork()`-spawning backends the COW-inherited socketpair fds
  & `_close_inherited_fds()` teardown can leave a `WakeupSocketpair`
  instance whose write-end is closed, and `drain()` then **spins forever
  in C with no Python checkpoints**,
- this obviously burns 100% CPU and no signal delivery.

Standalone repro:

    from trio._core._wakeup_socketpair import WakeupSocketpair
    ws = WakeupSocketpair()
    ws.write_sock.close()
    ws.drain()  # spins forever

Patch is one-line — break the drain loop on b'' EOF.

Manifested as two distinct test failures:

- `tests/test_multi_program.py::test_register_duplicate_name` hung at
  100% CPU on the busy-loop directly (fork child's worker thread)
- `tests/test_infected_asyncio.py::test_aio_simple_error` Mode-A
  deadlock — busy-loop wedged trio's scheduler inside `start_guest_run`,
  both threads parked in `epoll_wait`, no TCP connect-back to parent
  ever happened.

Same patch fixes both. Restored 99.7% pass rate on full
suite under `--spawn-backend=main_thread_forkserver`
(was hanging indefinitely before).

Wired into `tractor._child._actor_child_main` via `apply_all()` BEFORE
any trio runtime init. Harmless on non-fork backends.

Conc-anal write-ups, including strace + py-spy evidence:

- `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`
- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md`

Regression tests in `tests/trionics/test_patches.py`: each test asserts
(a) the bug exists pre-patch (or is fixed upstream — skip cleanly), (b)
the patch fixes it with a SIGALRM wall-clock cap so a regression hangs
loud instead of silently.

TODO:
- [ ] file the upstream `python-trio/trio` issue + PR.
- [ ] use the `repro()` callable in `_wakeup_socketpair.py` IS the issue
      body's evidence section.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-04 12:18:03 -04:00
Gud Boi e9712dcaeb Add `tractor.spawn._reap.unlink_uds_bind_addrs()`
Inside a new new `tractor.spawn._reap` submod which kicks off providing
post-mortem subactor cleanup primitives, parent-side; consider it the
"sibling" of `tractor._testing._reap` which is the test-harness-oriented
brother mod.

Today: `unlink_uds_bind_addrs()` provides a starter bug-fix for #454
where `hard_kill()`'s `SIGKILL` bypasses the subactor's
`_serve_ipc_eps`-`finally:` `os.unlink(addr.sockpath)`, leaking
`${XDG_RUNTIME_DIR}/tractor/<name>@<pid>.sock` files..

This adds 2 cleanup paths:
- explicit `bind_addrs` (when set at spawn time),
OR
- convention-based reconstruction from `subactor.aid.name + proc.pid`
  for the random-self-assign case.

`.spawn.hard_kill()` now invokes the cleanup unconditionally
post-`SIGKILL`; graceful-exit case is a no-op via `FileNotFoundError`
skip.

Future work — authoritative tracking via a per-process
UDS bind-addr registry — documented in module docstring,
deferred to a follow-up PR.

Co-fix: `tractor/spawn/_trio.py::new_proc` already passes
`bind_addrs` + `subactor` to `hard_kill` via prior work
on this branch.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-04 11:13:59 -04:00
Gud Boi 5cf0312c78 Add per-test runaway-subactor CPU detector to `_reap`
New `find_runaway_subactors()` helper + autouse
`_detect_runaway_subactors_per_test` fixture that
samples `psutil.cpu_percent()` on descendants to
catch tight-loop bugs (e.g. #452-class `recvfrom`
on a closed socket). Checks both at setup
(leftovers from a prior hung test) and teardown
(spawned by this test).

Intentionally does NOT kill the runaway — emits
a loud warning with diag commands (`strace`,
`lsof`, `ss`, `kill`) so the pid stays alive for
hands-on investigation. Session-end reaper still
SIGINT/SIGKILL survivors on normal exit.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-04 10:15:55 -04:00
Gud Boi 32e89c67ee Fix `maybe_override_capture` to not get invalid capX fixture names.. 2026-05-04 10:07:57 -04:00
Gud Boi d549c72052 Add fork-aware capture fixtures to `_testing.pytest`
Extend the pytest plugin with helpers that detect
and adapt to `--capture=sys` under fork-based
spawners (`main_thread_forkserver`, `mp_forkserver`)
where fd-capture causes hangs.

Deats,
- track `_cap_sys_passed_as_flag` + `_cap_fd_set`
  globals in `pytest_load_initial_conftests()`.
- add `@pytest.hookimpl(tryfirst=True)` + re-parse
  args after appending `--capture=sys`.
- `_is_forking_spawner()` predicate + fixture.
- `maybe_xfail_for_spawner()` — enalbes skipping tests that need capsys
  but weren't passed `--capture=sys`.
- `set_fork_aware_capture` fixture — returns the appropriate capture
  fixture per spawner backend based on `start_method: str` set via CLI.
- wire `set_fork_aware_capture` into `tractor_test`
  wrapper's fixture injection.

Also,
- add `alert_on_finish` session fixture (terminal
  bell on completion; tho not sure it works fully..)
- add `ids=` to `start_method` parametrize.
- restore `default=False` on `--enable-stackscope`.
- drop commented-out `--ll` option block; we will likely factor it to
  our plugin eventually however..

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-02 01:09:02 -04:00
Gud Boi 5a9926fc32 Adjust `test_shield_pause` for capsys backends
Under `main_thread_forkserver` the bootstrapping
hook switches to `--capture=sys`, so subactor
fd-level output (tree dumps, zombie-reaper msgs)
isn't captured per-test by pexpect. Gate those
expects behind a `no_capfd` check so the test
passes on both capture modes.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-01 19:08:55 -04:00
Gud Boi 72a0465c52 Default `--ll` to `None` in test harness
Only override `tractor.log._default_loglevel` when
the flag is explicitly passed — lets per-spawn and
per-example `loglevel` kwargs take effect instead
of being clobbered by the hard-coded `'ERROR'`
default.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-01 00:18:18 -04:00
Gud Boi 9431a81d37 Update debug examples + harden `test_debugger`
Pass explicit `loglevel` to `spawn()` calls in
`test_debugger` tests — required for pexpect
pattern matching now that examples no longer
hard-code log levels.

Also,
- make `expect()` return the decoded `before` str.
- add `start_method` param + fork-backend timeout
  slack (+4s) in nested-error test.
- clean up debug examples: drop unused loglevels,
  rename `n` -> `an`, fix docstrings, add TODO
  comments for tpt parametrize via osenv.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-01 00:13:22 -04:00
23 changed files with 1720 additions and 70 deletions

View File

@ -0,0 +1,378 @@
# `infect_asyncio` × `main_thread_forkserver` Mode-A deadlock
## Reproducer
```bash
./py313/bin/python -m pytest \
tests/test_infected_asyncio.py::test_aio_simple_error \
--tpt-proto=tcp \
--spawn-backend=main_thread_forkserver \
-v --capture=sys
```
Hangs indefinitely. Mode-A signature — both processes
parked in `epoll_wait`, **neither burning CPU**.
## Empirical observations (caught alive)
### Outer pytest (parent)
`py-spy dump` on the test runner pid shows the trio
event loop parked at the bottom of `trio.run()`:
```
Thread <pid> (idle): "MainThread"
get_events (trio/_core/_io_epoll.py:245)
self: <EpollIOManager at 0x...>
timeout: 86400
run (trio/_core/_run.py:2415)
next_send: []
timeout: 86400
test_aio_simple_error (tests/test_infected_asyncio.py:175)
```
`timeout: 86400` is trio's "no scheduled work, just wait
for I/O forever" sentinel. `next_send: []` confirms
nothing is queued. The parent is stuck inside
`tractor.open_nursery(...).run_in_actor(...)` waiting
for `ipc_server.wait_for_peer(uid)` to fire — i.e.
waiting for the spawned subactor to connect back.
### Subactor (forked child)
`/proc/<pid>/stack`:
```
do_epoll_wait+0x4c0/0x500
__x64_sys_epoll_wait+0x70/0x120
do_syscall_64+0xef/0x1540
entry_SYSCALL_64_after_hwframe+0x77/0x7f
```
`strace -p <pid> -f`:
```
[pid <child-A>] epoll_wait(6 <unfinished ...>
[pid <child-B>] epoll_wait(3
```
**Two threads**, both parked in `epoll_wait` on
distinct epoll fds. Both blocked, neither making
progress.
### Subactor file-descriptor table
```
fd=0,1,2 stdio
fd=3 eventpoll [watches fd 4]
fd=4 ↔ fd=5 unix STREAM (CONNECTED) — internal pair
fd=6 eventpoll [watches fds 7, 9]
fd=7 ↔ fd=8 unix STREAM (CONNECTED) — internal pair
fd=9 ↔ fd=10 unix STREAM (CONNECTED) — internal pair
```
Confirmed via `ss -xp` peer-inode lookup: **all 6 unix
sockets are internal socketpairs** (peer in same pid).
**Critical**: zero TCP/IPv4/IPv6 sockets, despite
`--tpt-proto=tcp`:
```
$ sudo lsof -p <subactor> | grep -iE 'TCP|IPv'
(empty)
$ sudo ss -tnp | grep <subactor>
(empty)
```
**The subactor never opened a TCP connection back to
the parent.**
## Diagnosis
The subactor reaches `_actor_child_main`
`_trio_main(actor)`
`run_as_asyncio_guest(trio_main)`. Code path
(`tractor.spawn._entry`):
```python
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main) # ← this branch
else:
trio.run(trio_main)
```
`run_as_asyncio_guest` (`tractor.to_asyncio`):
```python
def run_as_asyncio_guest(trio_main, ...):
async def aio_main(trio_main):
loop = asyncio.get_running_loop()
trio_done_fute = asyncio.Future()
...
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
out = await asyncio.shield(trio_done_fute)
return out.unwrap()
...
return asyncio.run(aio_main(trio_main))
```
Expected flow:
1. `asyncio.run(aio_main(...))` — boots fresh asyncio
loop in calling thread.
2. `aio_main` calls `trio.lowlevel.start_guest_run(...)`
— initializes trio's I/O manager, schedules first
trio slice via `loop.call_soon_threadsafe`.
3. asyncio loop dispatches the callback → trio runs a
slice → yields back via `call_soon_threadsafe`.
4. Trio's `async_main` (the user function) runs →
`Channel.from_addr(parent_addr)` → TCP connect to
parent.
What we observe instead:
- 2 threads in `epoll_wait` (one trio epoll, one
asyncio epoll, both inactive)
- 6 unix-socket fds (3 socketpairs: trio
wakeup-fd-pair, asyncio wakeup-fd-pair, trio kicker
socketpair)
- ZERO TCP — `Channel.from_addr` never ran
Most likely cause: **trio's guest-run scheduling
callback didn't get dispatched by asyncio's loop in
the forked child**, so trio's `async_main` never
executes past trio bootstrap, and the
parent-IPC-connect step is never reached.
## Fork-survival risk surface (hypothesis)
`trio.lowlevel.start_guest_run` builds Python-level
closures + signal handlers + wakeup-fd registrations
that depend on:
- The asyncio event loop's `call_soon_threadsafe`
thread-id matching the loop owner thread.
- Process-wide signal-wakeup-fd state
(`signal.set_wakeup_fd`).
- Trio's `KIManager` SIGINT handler.
Under `main_thread_forkserver`, the fork happens from
a worker thread that has **never entered trio**
(intentional — trio-free launchpad). But the FORKED
child then tries to bring up BOTH asyncio AND
trio-as-guest fresh from this trio-free thread. The
asyncio loop boots fine; trio's `start_guest_run`
initializes BUT the cross-loop dispatch (asyncio
queue → trio slice) appears to silently fail to wire
up.
Two more hypotheses worth probing:
1. **Wakeup-fd contention**: asyncio installs
`signal.set_wakeup_fd(<own_pair>)`. trio's
guest-run also wants a wakeup-fd. Whoever installs
second wins; the loser's `epoll_wait` no longer
wakes on signals. Combined with the `asyncio.shield(
trio_done_fute)` + `asyncio.CancelledError`
handling in `run_as_asyncio_guest`, a missed signal
delivery could explain the indefinite park.
2. **Trio kicker socketpair race**: trio's I/O manager
uses an internal `socket.socketpair()` to "kick"
itself out of `epoll_wait` when a non-IO task needs
scheduling. In guest mode, the kicker is still
present but is supposed to be triggered via the
asyncio dispatch. If the kicker write never gets
issued by asyncio's callback, trio's epoll never
wakes.
## Confirmed via py-spy (live capture)
After detaching `strace` (ptrace is exclusive — that's
why `py-spy` returns EPERM if strace is attached):
```
Thread <pid> (idle): "main-thread-forkserver[asyncio_actor]"
select (selectors.py:452) # asyncio epoll
_run_once (asyncio/base_events.py:2012)
run_forever (asyncio/base_events.py:683)
run_until_complete (asyncio/base_events.py:712)
run (asyncio/runners.py:118)
run (asyncio/runners.py:195)
run_as_asyncio_guest (tractor/to_asyncio.py:1770)
_trio_main (tractor/spawn/_entry.py:160)
_actor_child_main (tractor/_child.py:72)
_child_target (tractor/spawn/_main_thread_forkserver.py:910)
_worker (tractor/spawn/_main_thread_forkserver.py:605)
[thread bootstrap]
Thread <pid+1> (idle): "Trio thread 14"
get_events (trio/_core/_io_epoll.py:245) # trio epoll
get_events (trio/_core/_run.py:1678)
capture (outcome/_impl.py:67)
_handle_job (trio/_core/_thread_cache.py:173)
_work (trio/_core/_thread_cache.py:196)
[thread bootstrap]
```
This data **rewrites the diagnosis**: trio guest-run
isn't broken across the fork — it's working as designed.
The two threads ARE the canonical guest-run architecture:
1. **Asyncio main loop** runs in the lead thread. Parked
in `selectors.EpollSelector.select(timeout=-1)`
waiting indefinitely for ANY callback to be queued.
2. **Trio's I/O manager** offloads `get_events`
(`epoll_wait`) onto a `trio._core._thread_cache`
worker thread. The worker calls
`outcome.capture(get_events)` and parks in
`epoll_wait(timeout=86400)`.
3. When trio I/O fires (or its kicker socketpair gets a
write), the worker returns from `epoll_wait`,
delivers the result via `_handle_job`'s `deliver`
callback, which schedules the next trio slice on
asyncio via `loop.call_soon_threadsafe`.
The fact that the trio thread is *already* in
`_thread_cache._handle_job` doing `capture(get_events)`
means **trio's scheduler HAS started** — the bridge
asyncio↔trio is wired correctly post-fork.
So `async_main` DID run far enough to register some
trio task that's now awaiting I/O. The question
becomes: **what is `async_main` waiting on?**
Process state confirms it's NOT waiting on the TCP
connect to parent:
```
$ sudo lsof -p <subactor> | grep -iE 'TCP|IPv'
(empty)
$ sudo ss -tnp | grep <subactor>
(empty)
```
`Channel.from_addr(parent_addr)` — the very first
thing `async_main` does — was never reached, OR was
reached but errored before `socket()` was called. The
parent (running `ipc_server.wait_for_peer`) waits
forever for the connection; it never comes.
## Refined hypothesis
`async_main` is stalled in some PRE-`Channel.from_addr`
checkpoint. Candidates:
1. **`get_console_log` / logger init** — called early in
`_trio_main` if `actor.loglevel is not None`. Logging
setup involves file/handler init that could block on
something fork-inherited (e.g. a stale lock).
2. **`debug.maybe_init_greenback`** — `start_guest_run`
includes a check (`if debug_mode(): assert 0` —
currently asserts unsupported). For non-debug mode
this is bypassed but related machinery may run.
3. **Stackscope SIGUSR1 handler install** — gated on
`_debug_mode` OR `TRACTOR_ENABLE_STACKSCOPE` env-var.
The `enable_stack_on_sig()` path captures a trio
token via `trio.lowlevel.current_trio_token()`
could block under guest mode.
4. **Initial `await trio.sleep(0)` / first checkpoint**
in `async_main` before reaching the
`Channel.from_addr` line. Under guest mode, if the
FIRST `call_soon_threadsafe` callback never gets
processed by asyncio, trio's first slice never
completes — but the worker thread WOULD still be in
`epoll_wait` having been started by trio's I/O
manager init.
## Confirming `async_main`'s parked location
Add temporary logging at the top of `Actor.async_main`:
```python
# tractor/runtime/_runtime.py around line 855
async def async_main(self, parent_addr=None):
log.devx('async_main: ENTERED') # marker A
try:
log.devx('async_main: pre-Channel.from_addr') # marker B
chan = await Channel.from_addr(
addr=wrap_address(parent_addr)
)
log.devx('async_main: post-Channel.from_addr') # marker C
...
```
Re-run the test with `--ll=devx`. The last marker logged
tells us exactly where `async_main` parked. If only A
fires, the issue is between A and B (logger init,
stackscope, etc.). If A and B fire but not C, it's in
`Channel.from_addr` (DNS, socket creation, connect).
## Related sibling bug
`tests/test_multi_program.py::test_register_duplicate_name`
hangs under the same backend with a DIFFERENT
fingerprint:
- Subactor at 100% CPU (busy-loop), not parked
- `recvfrom(6, "", 65536, 0, NULL, NULL) = 0` repeating
with no `epoll_wait` in between
- fd=6 is one of trio's internal AF_UNIX
socketpair fds (the kicker mechanism)
Distinct root cause — possibly trio's kicker socketpair
inheriting a half-closed state across the fork — but
shares the broader theme: **trio internal-state
initialization isn't fully fork-safe under
`main_thread_forkserver`** for the more exotic
dispatch paths.
## Workarounds (until fix lands)
1. **Skip-mark on the fork backend** — temporarily mark
`tests/test_infected_asyncio.py` with
`pytest.mark.skipon_spawn_backend('main_thread_forkserver',
reason='infect_asyncio + fork interaction broken,
see ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md')`.
Lets the rest of the test suite run green while
this is being fixed properly.
2. **Run infected-asyncio tests under the `trio`
backend only** — they don't exercise fork
semantics, so they won't hit this bug.
## Investigation next steps
In rough priority:
1. Catch the hang alive again, **detach strace**,
`py-spy --locals` the subactor — confirm trio
thread is NOT yet at `async_main`.
2. Diff `start_guest_run` setup pre-fork vs post-fork
by adding `log.devx()` markers in
`tractor.to_asyncio.run_as_asyncio_guest::aio_main`
at:
- asyncio loop bringup
- immediately before `start_guest_run`
- immediately after `start_guest_run`
- inside the `trio_done_callback` registration
3. Check whether the asyncio loop dispatches ANY
callbacks in the forked child — instrument
`loop.call_soon_threadsafe` (e.g. monkey-patch
`loop._call_soon` to log).
4. If steps 13 confirm that asyncio's queue is
stuck, look at whether the asyncio event-loop
policy or selector is being inherited from a
pre-fork (parent-process) state in a way that
breaks the new loop.
## See also
- [#379](https://github.com/goodboy/tractor/issues/379) — subint umbrella
- [#451](https://github.com/goodboy/tractor/issues/451) — Mode-A cancel-cascade hang
- `ai/conc-anal/fork_thread_semantics_execution_vs_memory.md`
- `ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`
- python-trio/trio#1614 — trio + fork hazards

View File

@ -0,0 +1,221 @@
# trio `WakeupSocketpair.drain()` busy-loop in forked child (peer-closed missed-EOF)
## Reproducer
```bash
./py313/bin/python -m pytest \
tests/test_multi_program.py::test_register_duplicate_name \
--tpt-proto=tcp \
--spawn-backend=main_thread_forkserver \
-v --capture=sys
```
Subactor pegs a CPU core indefinitely; parent test
hangs waiting for the subactor.
## Empirical evidence (caught alive)
```
$ sudo strace -p <subactor-pid>
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
... (no `epoll_wait`, no other syscalls, just this back-to-back)
```
Pattern: tight C-level `recvfrom` loop returning 0
each call. No `epoll_wait` between iterations →
**not trio's task scheduler**. Pure synchronous C
loop.
```
$ sudo readlink /proc/<subactor-pid>/fd/6
socket:[<inode>]
$ sudo lsof -p <subactor-pid> | grep ' 6u'
<cmd> <pid> goodboy 6u unix 0xffff... 0t0 <inode> type=STREAM (CONNECTED)
```
fd=6 is an **AF_UNIX socket** in CONNECTED state.
Even though the test uses `--tpt-proto=tcp`, this fd
is NOT a tractor IPC channel — it's an internal
trio socketpair.
## Root-cause: `WakeupSocketpair.drain()`
`/site-packages/trio/_core/_wakeup_socketpair.py`:
```python
class WakeupSocketpair:
def __init__(self) -> None:
self.wakeup_sock, self.write_sock = socket.socketpair()
self.wakeup_sock.setblocking(False)
self.write_sock.setblocking(False)
...
def drain(self) -> None:
try:
while True:
self.wakeup_sock.recv(2**16)
except BlockingIOError:
pass
```
`socket.socketpair()` on Linux defaults to AF_UNIX
SOCK_STREAM. Both ends non-blocking. Normal flow:
1. Signal/wake event → `write_sock.send(b'\x00')`
queues a byte.
2. `wakeup_sock` becomes readable → trio's epoll
triggers.
3. Trio calls `drain()` to flush the buffer.
4. drain loops on `wakeup_sock.recv(64KB)`.
5. Eventually buffer empty → non-blocking socket
raises `BlockingIOError` → except → break.
**Bug surface — peer-closed missed-EOF**:
Non-blocking socket semantics:
- buffer has data → `recv` returns N>0 bytes (loop continues)
- buffer empty → `recv` raises `BlockingIOError`
- **peer FIN'd → `recv` returns 0 bytes (NEITHER exception NOR
break — infinite tight loop)**
`drain()` does not handle the `b''` return-value
(EOF) case. If `write_sock` has been closed (or the
process holding it is gone), every iteration returns
0 → infinite loop → 100% CPU on a single core.
## Why this triggers under `main_thread_forkserver`
Under `os.fork()` from the forkserver-worker thread:
1. Parent has a `WakeupSocketpair` instance with
`wakeup_sock=fdN`, `write_sock=fdM`. Both fds
open in parent.
2. Fork → child inherits BOTH fds (kernel-level fd
table dup).
3. `_close_inherited_fds()` runs in child →
closes everything except stdio. `wakeup_sock` and
`write_sock` of the parent's `WakeupSocketpair`
ARE closed in child.
4. Child's trio (running fresh) creates its OWN
`WakeupSocketpair` → NEW fd numbers (e.g. fd 6, 7).
5. **In `infect_asyncio` mode** the asyncio loop is
the host; trio runs as guest via
`start_guest_run`. trio still creates its
`WakeupSocketpair` in the I/O manager but its
role is different.
The race window: somewhere between (3) and (5), if a
`WakeupSocketpair` Python object reference inherited
via COW (from parent's pre-fork heap) survives long
enough that `drain()` is called on it AFTER its fds
were closed but BEFORE the child's NEW socketpair
takes over the recycled fd numbers — the recycled fd
will be one of the child's NEW socketpair ends, whose
peer might be FIN-flagged (e.g. parent-process
peer-end is closed).
Or simpler: the `wait_for_actor`/`find_actor` discovery
flow in `test_register_duplicate_name` triggers an
unusual code path where a stale `WakeupSocketpair`
gets `drain()`-called on a fd whose peer has already
closed.
## Why `drain()` shouldn't loop indefinitely on EOF
(upstream trio bug)
Even WITHOUT fork, `drain()` should treat `b''` as
EOF and break. The current code is correct for the
"buffer drained on a healthy socketpair" scenario but
incorrect for the "peer is gone" scenario. It's a
defensive-programming gap in trio.
A one-line patch upstream:
```python
def drain(self) -> None:
try:
while True:
data = self.wakeup_sock.recv(2**16)
if not data:
break # peer-closed; nothing more to drain
except BlockingIOError:
pass
```
## Workarounds (until the underlying issue lands)
1. **Skip-mark on the fork backend**:
`tests/test_multi_program.py`
`pytest.mark.skipon_spawn_backend('main_thread_forkserver',
reason='trio WakeupSocketpair.drain busy-loop, see ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md')`.
2. **Defensive monkey-patch in tractor's
forkserver-child prelude** — wrap
`WakeupSocketpair.drain` to handle `b''`:
```python
# in `_actor_child_main` or `_close_inherited_fds`'s
# post-fork prelude:
from trio._core._wakeup_socketpair import WakeupSocketpair
_orig_drain = WakeupSocketpair.drain
def _safe_drain(self):
try:
while True:
data = self.wakeup_sock.recv(2**16)
if not data:
return # peer closed
except BlockingIOError:
pass
WakeupSocketpair.drain = _safe_drain
```
Tracks upstream — remove once trio fixes.
3. **Upstream the fix**: 1-line PR to `python-trio/trio`
adding `if not data: break` to `drain()`.
## Investigation next steps
1. **Confirm via py-spy**: when caught alive, detach
strace first then
`sudo py-spy dump --pid <subactor> --locals`. The
busy thread should show `drain` from `WakeupSocketpair`
in the call chain.
2. **Identify which write-end peer is closed**: from
the inode of fd 6, look up the matching peer
inode via `ss -xp` and see whose process it
was/is.
3. **Verify the missed-EOF hypothesis**: hand-craft a
minimal `WakeupSocketpair` repro:
```python
from trio._core._wakeup_socketpair import WakeupSocketpair
ws = WakeupSocketpair()
ws.write_sock.close() # simulate peer-gone
ws.drain() # should hang forever
```
## Sibling bug
`tests/test_infected_asyncio.py::test_aio_simple_error`
hangs under the same backend with a DIFFERENT
fingerprint (Mode-A deadlock, both parties in
`epoll_wait`, no busy-loop). Distinct root cause —
see `infected_asyncio_under_main_thread_forkserver_hang_issue.md`.
Both share the broader theme: **trio internal-state
initialization isn't fully fork-safe under
`main_thread_forkserver`** for the more exotic
dispatch paths.
## See also
- [#379](https://github.com/goodboy/tractor/issues/379) — subint umbrella
- python-trio/trio#1614 — trio + fork hazards
- `trio._core._wakeup_socketpair.WakeupSocketpair`
source (the smoking gun)
- `ai/conc-anal/fork_thread_semantics_execution_vs_memory.md`
- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md`

View File

@ -27,12 +27,9 @@ async def main():
''' '''
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
loglevel='cancel', ) as an:
# loglevel='devx', p0 = await an.start_actor('bp_forever', enable_modules=[__name__])
) as n: p1 = await an.start_actor('name_error', enable_modules=[__name__])
p0 = await n.start_actor('bp_forever', enable_modules=[__name__])
p1 = await n.start_actor('name_error', enable_modules=[__name__])
# retreive results # retreive results
async with p0.open_stream_from(breakpoint_forever) as stream: async with p0.open_stream_from(breakpoint_forever) as stream:

View File

@ -67,7 +67,7 @@ async def main():
""" """
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
# loglevel='cancel', loglevel='pdb',
) as n: ) as n:
# spawn both actors # spawn both actors

View File

@ -39,8 +39,8 @@ async def main():
''' '''
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
loglevel='devx', enable_transports=['uds'], # TODO, apss this via osenv?
enable_transports=['uds'], loglevel='devx', # XXX, required for test!
) as n: ) as n:
# spawn both actors # spawn both actors

View File

@ -1,4 +1,3 @@
import trio import trio
import tractor import tractor
@ -9,16 +8,22 @@ async def key_error():
async def main(): async def main():
"""Root dies '''
Root is fail-after-cancelled while blocking and child RPC fails
simultaneously.
""" '''
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
loglevel='debug' # loglevel='debug' # ?XXX required?
) as n: ) as n:
# spawn both actors # spawn both actors
portal = await n.run_in_actor(key_error) portal = await n.run_in_actor(key_error)
print(
f'Child is up @ {portal.chan.aid.reprol()}'
)
# XXX: originally a bug caused by this is where root would enter # XXX: originally a bug caused by this is where root would enter
# the debugger and clobber the tty used by the repl even though # the debugger and clobber the tty used by the repl even though

View File

@ -36,6 +36,11 @@ async def just_bp(
async def main(): async def main():
# !TODO, parametrize the --tpt-proto={key} with osenv vars just
# like we do for loglevel/spawn-backend!
# - [ ] run on both tpts for all such debugger tests?
# - [ ] special skip for macos!
#
if platform.system() != 'Darwin': if platform.system() != 'Darwin':
tpt = 'uds' tpt = 'uds'
else: else:

View File

@ -9,7 +9,6 @@ async def name_error():
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
# loglevel='transport',
) as an: ) as an:
# TODO: ideally the REPL arrives at this frame in the parent, # TODO: ideally the REPL arrives at this frame in the parent,

View File

@ -135,25 +135,30 @@ def pytest_addoption(
"--ll", "--ll",
action="store", action="store",
dest='loglevel', dest='loglevel',
default='ERROR', help="logging level to set when testing" default=None,
help="logging level to set when testing",
) )
@pytest.fixture(scope='session', autouse=True) @pytest.fixture(scope='session', autouse=True)
def loglevel( def loglevel(
request: pytest.FixtureRequest, request: pytest.FixtureRequest,
) -> str: ) -> str|None:
import tractor import tractor
orig = tractor.log._default_loglevel orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel flag_level: str|None = request.config.option.loglevel
if flag_level is not None:
tractor.log._default_loglevel = flag_level
log = tractor.log.get_console_log( log = tractor.log.get_console_log(
level=level, level=flag_level,
name='tractor', # <- enable root logger name='tractor', # <- enable root logger
) )
log.info( log.info(
f'Test-harness set runtime loglevel: {level!r}\n' f'Test-harness set runtime loglevel: {flag_level!r}\n'
) )
yield level yield flag_level
tractor.log._default_loglevel = orig tractor.log._default_loglevel = orig

View File

@ -269,13 +269,10 @@ def ctlc(
def expect( def expect(
child, child,
patt: str, # often a `pdbp`-prompt
# normally a `pdb` prompt by default
patt: str,
**kwargs, **kwargs,
) -> None: ) -> str:
''' '''
Expect wrapper that prints last seen console Expect wrapper that prints last seen console
data before failing. data before failing.
@ -286,6 +283,8 @@ def expect(
patt, patt,
**kwargs, **kwargs,
) )
before = str(child.before.decode())
return before
except TIMEOUT: except TIMEOUT:
before = str(child.before.decode()) before = str(child.before.decode())
print(before) print(before)

View File

@ -765,6 +765,7 @@ def test_multi_subactors_root_errors(
def test_multi_nested_subactors_error_through_nurseries( def test_multi_nested_subactors_error_through_nurseries(
ci_env: bool, ci_env: bool,
spawn: PexpectSpawner, spawn: PexpectSpawner,
start_method: str,
# TODO: address debugger issue for nested tree: # TODO: address debugger issue for nested tree:
# https://github.com/goodboy/tractor/issues/320 # https://github.com/goodboy/tractor/issues/320
@ -781,16 +782,16 @@ def test_multi_nested_subactors_error_through_nurseries(
# A test (below) has now been added to explicitly verify this is # A test (below) has now been added to explicitly verify this is
# fixed. # fixed.
child = spawn('multi_nested_subactors_error_up_through_nurseries') child = spawn(
'multi_nested_subactors_error_up_through_nurseries',
# timed_out_early: bool = False loglevel='pdb',
)
for ( for (
i, i,
send_char, send_char,
) in enumerate(itertools.cycle(['c', 'q'])): ) in enumerate(itertools.cycle(['c', 'q'])):
timeout: float = -1 timeout: float = child.timeout
if ( if (
_non_linux _non_linux
and and
@ -803,6 +804,15 @@ def test_multi_nested_subactors_error_through_nurseries(
elif i == 0: elif i == 0:
timeout = 5 timeout = 5
# XXX forking backends may take longer due to
# determinstic IPC cancellation.
if (
start_method in [
'main_thread_forkserver',
]
):
timeout += 4
try: try:
child.expect( child.expect(
PROMPT, PROMPT,
@ -1187,7 +1197,11 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
mashed and zombie reaper kills sub with no hangs. mashed and zombie reaper kills sub with no hangs.
''' '''
child = spawn('subactor_bp_in_ctx') child = spawn(
'subactor_bp_in_ctx',
loglevel='devx'
# ^XXX REQUIRED for below patt matching!
)
child.expect(PROMPT) child.expect(PROMPT)
# 3 iters for the `gen()` pause-points # 3 iters for the `gen()` pause-points
@ -1277,7 +1291,11 @@ def test_crash_handling_within_cancelled_root_actor(
call. call.
''' '''
child = spawn('root_self_cancelled_w_error') child = spawn(
'root_self_cancelled_w_error',
loglevel='cancel',
# ^XXX REQUIRED for below patt matching!
)
child.expect(PROMPT) child.expect(PROMPT)
assert_before( assert_before(

View File

@ -52,6 +52,8 @@ def test_shield_pause(
..., ...,
PexpectSpawner, PexpectSpawner,
], ],
start_method: str,
request: pytest.FixtureRequest,
): ):
''' '''
Verify the `tractor.pause()/.post_mortem()` API works inside an Verify the `tractor.pause()/.post_mortem()` API works inside an
@ -119,6 +121,21 @@ def test_shield_pause(
# (above) and `end-of-('hanger'` (below). # (above) and `end-of-('hanger'` (below).
handle_out_of_order: bool = False handle_out_of_order: bool = False
# XXX, when capfd is NOT used we don't expect to
# see the logging output from the subactor.
if (no_capfd := (start_method in [
'main_thread_forkserver',
])
):
opts = request.config.option
assert opts.spawn_backend == start_method
# ?XXX? i guess the `testdir` fixture "pretends to" reset
# this to the default 'fd'??
# assert opts.capture in [
# 'sys',
# 'no',
# ]
if ( if (
handle_out_of_order handle_out_of_order
and and
@ -128,25 +145,30 @@ def test_shield_pause(
assert 'Relaying `SIGUSR1`[10] to sub-actor' in _before assert 'Relaying `SIGUSR1`[10] to sub-actor' in _before
else: else:
expect( _before = expect(
child, child,
'Relaying `SIGUSR1`\\[10\\] to sub-actor', 'Relaying `SIGUSR1`\\[10\\] to sub-actor',
) )
expect( # _before: str = assert_before(
child, # child,
# end-of-subactor's-tree delimiter # ["('hanger'",] # uid line
"end-of-\('hanger'", # )
) if not no_capfd:
_before: str = assert_before( expect(
child, child,
[ # end-of-subactor's-tree delimiter
"('hanger'", # uid line "end-of-\('hanger'",
)
_before: str = assert_before(
child,
[
"('hanger'", # uid line
# TODO!? SEE ABOVE # TODO!? SEE ABOVE
# hanger LOC where it's shield-halted # hanger LOC where it's shield-halted
# 'await trio.sleep_forever() # in subactor', # 'await trio.sleep_forever() # in subactor',
] ]
) )
# simulate the user sending a ctl-c to the hanging program. # simulate the user sending a ctl-c to the hanging program.
@ -163,14 +185,19 @@ def test_shield_pause(
_shutdown_msg, _shutdown_msg,
timeout=6, timeout=6,
) )
assert_before( expect_on_teardown: list[str] = [
child, 'raise KeyboardInterrupt',
[ 'Root actor terminated',
'raise KeyboardInterrupt', ]
if not no_capfd:
expect_on_teardown += [
# 'Shutting down actor runtime', # 'Shutting down actor runtime',
'#T-800 deployed to collect zombie B0', '#T-800 deployed to collect zombie B0',
"'--uid', \"('hanger',", "'--uid', \"('hanger',",
] ]
assert_before(
child,
expect_on_teardown,
) )

View File

View File

@ -0,0 +1,99 @@
'''
Regression tests for `tractor.trionics.patches`
defensive monkey-patches on upstream `trio` bugs.
Each test asserts:
1. The bug exists (or is gone skip cleanly if
upstream shipped the fix and our `is_needed()` now
returns `False`).
2. Our patch fixes it (post-`apply()` the `repro()`
returns cleanly within a tight wall-clock cap).
Wall-clock caps are critical here the bugs we patch
are tight-loops or deadlocks, so a regression would
HANG the test runner unless we hard-cap each
`repro()` call.
'''
import signal
import pytest
from tractor.trionics import patches
from tractor.trionics.patches import _wakeup_socketpair as wsp
@pytest.fixture(autouse=True)
def _alarm_cleanup():
'''
Ensure no leftover SIGALRM survives a test failure
or unexpected return.
'''
yield
signal.alarm(0)
def test_wakeup_socketpair_drain_eof_patch_works():
'''
Without the patch, `WakeupSocketpair.drain()` on a
socketpair whose write-end has been closed spins
forever. With the patch applied, it returns
cleanly within milliseconds.
Wall-clock cap: 2s. If the patch regresses, SIGALRM
fires and the test hard-fails with a clear signal
instead of hanging CI indefinitely.
'''
if not wsp.is_needed():
pytest.skip(
'upstream trio shipped the fix — '
'patch no longer needed for trio '
'(see `is_needed()` for version gate)'
)
# Apply the patch.
applied: bool = wsp.apply()
# First call MUST return True; idempotent guard
# prevents False on subsequent calls within the
# same process.
assert applied is True or applied is False # idempotent
# Cap wall-clock at 2s; SIGALRM raises in main
# thread which interrupts the C-level recv loop
# IF the patch regresses (since `signal.alarm`
# uses Python's signal-wakeup-fd which the patch
# itself relies on... but `repro()` runs OUTSIDE
# a trio.run, so it's plain stdlib semantics here
# — alarm WILL fire during `recv` syscall).
signal.alarm(2)
wsp.repro()
signal.alarm(0)
def test_apply_all_idempotent():
'''
Calling `apply_all()` twice should not double-
apply: second call's dict has all-False values
(every patch reports "already applied").
'''
first: dict[str, bool] = patches.apply_all()
second: dict[str, bool] = patches.apply_all()
# Second call: every patch reports skipped.
assert all(v is False for v in second.values()), (
f'apply_all() not idempotent: {second}'
)
# First call: at least one patch was applied
# (or all are no-ops because `is_needed()` is
# False everywhere — the all-fixed-upstream future
# state which is also valid).
assert isinstance(first, dict)
for name, applied in first.items():
assert isinstance(applied, bool), (
f'patch {name!r} returned non-bool: {applied!r}'
)

View File

@ -63,6 +63,14 @@ def _actor_child_main(
sub-interpreter via `Interpreter.call()`. sub-interpreter via `Interpreter.call()`.
''' '''
# Apply defensive monkey-patches for upstream `trio`
# bugs we've encountered while running tractor — see
# `tractor.trionics.patches` for the catalog +
# per-patch upstream-fix tracking. Must run BEFORE
# any trio runtime init.
from .trionics.patches import apply_all
apply_all()
subactor = Actor( subactor = Actor(
name=uid[0], name=uid[0],
uuid=uid[1], uuid=uid[1],

View File

@ -218,6 +218,65 @@ def find_descendants(
] ]
def find_runaway_subactors(
parent_pid: int,
*,
cpu_threshold: float = 95.0,
sample_interval: float = 0.5,
only_pids: set[int]|None = None,
) -> list[tuple[int, float, str]]:
'''
Return `(pid, cpu_pct, cmdline)` for any descendant
of `parent_pid` currently burning CPU above
`cpu_threshold` (default 95%) the smoking-gun
signature of a runaway tight-loop bug (e.g. a C-level
`recvfrom` loop on a closed socket that missed EOF
detection see
`ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`).
`cpu_percent(interval=sample_interval)` is the
canonical psutil API for a "what %CPU is this proc
using NOW" answer — it samples twice with a delta to
compute true utilization.
`only_pids` filters to a specific pre-snapshotted set
(e.g. "pids spawned during this test only"); when
`None`, all live descendants are checked.
Returns `[]` when `psutil` isn't installed or no
descendants match.
'''
try:
import psutil
except ImportError:
return []
candidates: list[int] = find_descendants(parent_pid)
if only_pids is not None:
candidates = [p for p in candidates if p in only_pids]
if not candidates:
return []
runaways: list[tuple[int, float, str]] = []
for pid in candidates:
try:
proc = psutil.Process(pid)
cpu: float = proc.cpu_percent(
interval=sample_interval,
)
if cpu < cpu_threshold:
continue
cmdline: str = ' '.join(proc.cmdline())
runaways.append((pid, cpu, cmdline))
except (
psutil.NoSuchProcess,
psutil.AccessDenied,
):
continue
return runaways
def find_orphans( def find_orphans(
repo_root: pathlib.Path, repo_root: pathlib.Path,
) -> list[int]: ) -> list[int]:
@ -679,7 +738,7 @@ def _track_orphaned_uds_per_test():
`wait_for_actor`/`find_actor` discovery probes can `wait_for_actor`/`find_actor` discovery probes can
accidentally hit (FileExistsError on rebind, or accidentally hit (FileExistsError on rebind, or
epoll register on a half-closed peer-FIN'd fd → see epoll register on a half-closed peer-FIN'd fd → see
issue #452). Catching the leak the test that caused issue #454). Catching the leak the test that caused
it (vs. blanket session-end sweep) makes blame it (vs. blanket session-end sweep) makes blame
obvious + prevents cascade flakiness. obvious + prevents cascade flakiness.
@ -728,14 +787,133 @@ def _track_orphaned_uds_per_test():
if new_orphans: if new_orphans:
import warnings import warnings
warnings.warn( warnings.warn(
f'UDS sock-file LEAK detected from test ' 'UDS sock-file LEAK detected from test '
f'(reaping):\n ' '(reaping):\n '
+ '\n '.join(new_orphans), + '\n '.join(new_orphans),
stacklevel=1, stacklevel=1,
) )
reap_uds(new_orphans) reap_uds(new_orphans)
@pytest.fixture(
scope='function',
autouse=True,
)
def _detect_runaway_subactors_per_test():
'''
Per-test (function-scoped) autouse runaway-subactor
detector.
Snapshots descendant pids before+after each test;
for any pid spawned during the test that's still
ALIVE at teardown AND burning >95% CPU, emits a loud
warning with `pid`, sampled `cpu%`, full `cmdline`,
AND copy-pastable diag commands (`strace`, `lsof`,
`ss`, `kill`).
**Does NOT kill the runaway** by design.
The point of this fixture is to make tight-loop bugs
(e.g. C-level `recvfrom` loop on a closed socket
that missed EOF detection see
`ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`)
loudly visible AT the test that triggers, while
keeping the live pid available for hands-on
diagnosis. The session-end
`_reap_orphaned_subactors` fixture will
SIGINT-then-SIGKILL any survivors when the test
session completes normally; if the user Ctrl-C's
pytest mid-warning, the pid stays alive for as long
as needed.
Cost: one extra `os.listdir('/proc')` snapshot
pre-test, one snapshot + N×`psutil.cpu_percent(0.5)`
post-test (only when there ARE new descendants
most tests don't trigger any sampling). Skips
silently when `psutil` isn't installed.
'''
parent_pid: int = os.getpid()
def _emit_runaway_warning(
runaways: list[tuple[int, float, str]],
when: str,
) -> None:
'''
Format + emit the runaway warning. Shared between
the SETUP-side (pre-yield, catches survivors of a
prior hung test) and TEARDOWN-side (post-yield,
catches normally-completing tests that left a
runaway behind) detection passes.
'''
msg_lines: list[str] = [
f'RUNAWAY subactor(s) detected at {when}'
f'burning CPU (>95%):',
]
for pid, cpu, cmdline in runaways:
msg_lines.extend([
f' pid={pid} cpu={cpu:.1f}% cmdline={cmdline!r}',
f' diagnose live (pid stays alive — NOT killed):',
f' sudo strace -p {pid} -f -tt -e trace=recvfrom,epoll_wait,read,write',
f' sudo readlink /proc/{pid}/fd/* 2>/dev/null | head -20',
f' sudo ss -tnp | grep {pid}',
f' sudo lsof -p {pid}',
f' manual kill when done:',
f' kill -SIGINT {pid} # graceful first',
f' kill -SIGKILL {pid} # if SIGINT ignored (busy in C)',
'',
])
import warnings
warnings.warn(
'\n'.join(msg_lines),
stacklevel=1,
)
# SETUP-side detection: catches runaways inherited
# from a PRIOR test that hung (and the user
# Ctrl-C'd or pytest-timeout fired) — those tests'
# teardown-side detector never ran, but the
# subactor is still burning CPU when the next test
# starts. The warning comes ONE TEST LATE which is
# imperfect but better than silence.
pre_existing: set[int] = set(find_descendants(parent_pid))
pre_runaways: list[tuple[int, float, str]] = (
find_runaway_subactors(
parent_pid,
only_pids=pre_existing,
)
)
if pre_runaways:
_emit_runaway_warning(
pre_runaways,
when='test SETUP (leftover from prior hung test)',
)
yield
# TEARDOWN-side detection: catches runaways spawned
# by THIS test that survived a normal teardown
# (i.e. parent's `hard_kill` SIGKILL didn't actually
# stop the runaway because it was in C tight-loop
# somewhere unreachable to signals — see
# `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`
# for the canonical fork-spawn forkserver-worker
# post-fork-close gap).
post_runaways: list[tuple[int, float, str]] = (
find_runaway_subactors(
parent_pid,
only_pids=set(
find_descendants(parent_pid)
) - pre_existing,
)
)
if post_runaways:
_emit_runaway_warning(
post_runaways,
when='test teardown',
)
@pytest.fixture @pytest.fixture
def reap_subactors_per_test() -> int: def reap_subactors_per_test() -> int:
''' '''

View File

@ -52,16 +52,30 @@ pytest_plugins: tuple[str, ...] = (
if TYPE_CHECKING: if TYPE_CHECKING:
from argparse import Namespace from argparse import Namespace
_cap_sys_passed_as_flag: bool = False
_cap_fd_set: bool = False
# XXX REQUIRED in order to enforce `--capture=` flag # XXX REQUIRED in order to enforce `--capture=` flag
# pre test session. # pre test session.
# https://docs.pytest.org/en/stable/reference/reference.html#bootstrapping-hooks # https://docs.pytest.org/en/stable/reference/reference.html#bootstrapping-hooks
@pytest.hookimpl(tryfirst=True)
def pytest_load_initial_conftests( def pytest_load_initial_conftests(
early_config: pytest.Config, early_config: pytest.Config,
parser: pytest.Parser, parser: pytest.Parser,
args: list[str], args: list[str],
): ):
global _cap_sys_passed_as_flag, _cap_fd_set
opts: Namespace = early_config.option opts: Namespace = early_config.option
if opts.capture == 'fd':
_cap_fd_set = True
opts_w_args: Namespace = parser.parse_known_args(args) opts_w_args: Namespace = parser.parse_known_args(args)
if opts_w_args.capture == 'fd':
_cap_fd_set = True
if '--capture=sys' in args:
_cap_sys_passed_as_flag = True
# XXX, ALWAYS apply capsys for fork based spawners: # XXX, ALWAYS apply capsys for fork based spawners:
# * main_thread_forkserver # * main_thread_forkserver
@ -94,14 +108,23 @@ def pytest_load_initial_conftests(
(spawner := opts_w_args.spawn_backend) in [ (spawner := opts_w_args.spawn_backend) in [
'main_thread_forkserver', 'main_thread_forkserver',
] ]
and
opts.capture == 'fd'
): ):
print( print(
f'XXX SETTING CAPSYS due to spawning backend XXX\n' f'XXX SETTING CAPSYS due to spawning backend XXX\n'
f'--spawn-backend={spawner!r}\n' f'--spawn-backend={spawner!r}\n'
) )
opts.capture = 'sys' opts.capture = 'sys'
# ^TODO XXX?/
# seems like this doesn't get set by the above!?
args.append(
'--capture=sys',
)
out = parser.parse_known_and_unknown_args(
args,
early_config.option,
)
assert out[0].capture == 'sys'
# breakpoint()
# TODO, set various `$TRACTOR_X*` osenv vars here! # TODO, set various `$TRACTOR_X*` osenv vars here!
print( print(
@ -187,11 +210,17 @@ def tractor_test(
# injection (via `__wrapped__`) without leaking the async # injection (via `__wrapped__`) without leaking the async
# nature. # nature.
@wraps(wrapped) @wraps(wrapped)
def wrapper(**kwargs): def wrapper(
set_fork_aware_capture: pytest.CaptureFixture|None = None,
# ^NOTE when set, the decorated fn declared as fixture-param.
**kwargs,
):
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# NOTE, ensure we inject any test-fn declared fixture # NOTE, ensure we inject any test-fn declared fixture
# names. # names.
sig = inspect.signature(wrapped)
for kw in [ for kw in [
'reg_addr', 'reg_addr',
'loglevel', 'loglevel',
@ -200,9 +229,13 @@ def tractor_test(
'tpt_proto', 'tpt_proto',
'timeout', 'timeout',
]: ]:
if kw in inspect.signature(wrapped).parameters: if kw in sig.parameters:
assert kw in kwargs assert kw in kwargs
if 'set_fork_aware_capture' in sig.parameters:
assert set_fork_aware_capture
kwargs['set_fork_aware_capture'] = set_fork_aware_capture
# Extract runtime settings as locals for # Extract runtime settings as locals for
# `open_root_actor()`; these must NOT leak into # `open_root_actor()`; these must NOT leak into
# `kwargs` when the test fn doesn't declare them # `kwargs` when the test fn doesn't declare them
@ -245,7 +278,6 @@ def tractor_test(
# invoke test-fn body IN THIS task # invoke test-fn body IN THIS task
await wrapped(**kwargs) await wrapped(**kwargs)
# invoke runtime via a root task.
return trio.run( return trio.run(
partial( partial(
_main, _main,
@ -259,13 +291,6 @@ def tractor_test(
def pytest_addoption( def pytest_addoption(
parser: pytest.Parser, parser: pytest.Parser,
): ):
# parser.addoption(
# "--ll",
# action="store",
# dest='loglevel',
# default='ERROR', help="logging level to set when testing"
# )
parser.addoption( parser.addoption(
"--spawn-backend", "--spawn-backend",
action="store", action="store",
@ -291,7 +316,7 @@ def pytest_addoption(
"--enable-stackscope", "--enable-stackscope",
action="store_true", action="store_true",
dest='enable_stackscope', dest='enable_stackscope',
# default=False, default=False,
help=( help=(
'Install `stackscope` SIGUSR1 handler in pytest + ' 'Install `stackscope` SIGUSR1 handler in pytest + '
'every spawned subactor for live trio task-tree ' 'every spawned subactor for live trio task-tree '
@ -317,6 +342,13 @@ def pytest_addoption(
def pytest_configure( def pytest_configure(
config: pytest.Config, config: pytest.Config,
): ):
# opts: Namespace = config.option
# print(
# f'PYTEST_CONFIGURE\n'
# f'capture={opts.capture!r}\n'
# )
# breakpoint()
backend: str = config.option.spawn_backend backend: str = config.option.spawn_backend
from tractor.spawn._spawn import try_set_start_method from tractor.spawn._spawn import try_set_start_method
try: try:
@ -414,6 +446,25 @@ def pytest_collection_modifyitems(
break break
@pytest.fixture(
scope="session",
autouse=True,
)
def alert_on_finish():
'''
Ring a terminal notification on full test session
completion to alert any would be human.
'''
# TODO, check attached to tty or skip!
yield # run all tests
print("\a") # trigger terminal bell
# ?TODO, any other nice-tricks/specific tuis we could try?
# - supposedly works in many terminals:
# >> print("\033]5;Alert: Tests Finished\a")
# - sway/i3-nag?
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def debug_mode( def debug_mode(
request: pytest.FixtureRequest, request: pytest.FixtureRequest,
@ -538,6 +589,7 @@ def pytest_generate_tests(
"start_method", "start_method",
[spawn_backend], [spawn_backend],
scope='module', scope='module',
ids=lambda item: f'start_method={spawn_backend}',
) )
# TODO, parametrize any `tpt_proto: str` declaring tests! # TODO, parametrize any `tpt_proto: str` declaring tests!
@ -548,3 +600,85 @@ def pytest_generate_tests(
# proto_tpts, # TODO, double check this list usage! # proto_tpts, # TODO, double check this list usage!
# scope='module', # scope='module',
# ) # )
def _is_forking_spawner(
start_method: str,
) -> bool:
return start_method in [
'main_thread_forkserver',
'mp_forkserver',
]
@pytest.fixture
def is_forking_spawner(
start_method: str,
) -> bool:
'''
Is the `pytest` run using a `fork()`ing process spawning-backend?
'''
return _is_forking_spawner
def maybe_xfail_for_spawner(
start_method: str,
is_forking_spawner: bool,
) -> None:
'''
Fork based spawning backends caude issues with `pytest`'s
fd-capture mechanism and can cause various suites to hang.
Instead this helper allows skipping/xfailing from a test
when a certain spawner + CLI-flag input is detected.
'''
if (
not _cap_sys_passed_as_flag
and
is_forking_spawner
):
pytest.skip(
f'Spawner {start_method!r} requires the flag,\n'
f'--capture=sys or similar..\n'
)
def maybe_override_capture(
request: pytest.FixtureRequest,
start_method: bool,
) -> str:
if _is_forking_spawner(start_method):
request.getfixturevalue('capsys')
return 'sys'
return request.config.option.capture
@pytest.fixture
def set_fork_aware_capture(
request: pytest.FixtureRequest,
start_method: str,
) -> pytest.CaptureFixture|str:
'''
Force `--capture=sys` method for tests using
a forking-spawner backend due to fd-copying issues
which can oddly make certain tests hang/fail.
'''
if _cap_sys_passed_as_flag:
return 'sys'
capsys: pytest.CaptureFixture = maybe_override_capture(
request=request,
start_method=start_method,
)
return capsys
# XXX reset?
# with capsys.disabled():
# pass
# return partial(
# maybe_override_capture,
# request=request,
# start_method=start_method,
# )

View File

@ -0,0 +1,183 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Post-mortem subactor cleanup primitives things the parent
runtime has to clean up because the dead-or-SIGKILL'd child
couldn't.
Sibling of `tractor._testing._reap` which is the test-harness
equivalent (orphan-pid + leaked-shm + leaked-UDS-sock sweeper
fixtures). This module is the spawn-layer counterpart, called
inline from `hard_kill` and the broader subactor reap path.
Today this is just `unlink_uds_bind_addrs()`. As future
post-mortem cleanup needs surface (e.g. `/dev/shm` segment
unlink for hard-crashed actors, leaked-pidfile cleanup), they
land here too.
Future-work TODO authoritative UDS bind-addr tracking
-------------------------------------------------------
`unlink_uds_bind_addrs()` currently has two cleanup paths:
1. Explicit `bind_addrs` (when parent set them at spawn time)
2. **Convention-based reconstruction**
`<XDG_RUNTIME_DIR>/tractor/<name>@<pid>.sock` for the
common case where the subactor self-assigned a random sock
via `UDSAddress.get_random()`.
Path (2) hardcodes the `<name>@<pid>.sock` convention from
`tractor.ipc._uds.UDSAddress`. If that convention ever
changes or the subactor binds to a non-default
`bindspace`/`filedir` we'll silently fail to unlink.
A more authoritative approach would be:
- Subactors register their bound UDS sockpaths in a
per-process registry inside `tractor.ipc._uds` at
`start_listener()` time.
- The subactor reports its bound sockpath(s) back to the
parent over IPC immediately post-bind (extension to
`SpawnSpec` reply / a new handshake msg).
- Parent caches the subactor's authoritative sockpaths.
- `unlink_uds_bind_addrs()` checks the cache FIRST, falls
back to convention-reconstruction if the subactor died
before reporting (which is the SIGKILL case this fn
primarily exists for).
Tracked as future work in #454 (the parent UDS-leak
issue this module addresses); a separate issue may be
filed if/when the registry impl is scoped.
See also #452 — the discovery-client `CLOSE_WAIT` TCP
fd leak. Different bug class but same broader theme of
"fork-spawn unmasked latent cleanup gaps".
'''
from __future__ import annotations
import os
from typing import TYPE_CHECKING
import trio
from tractor.discovery._addr import (
UnwrappedAddress,
wrap_address,
)
from tractor.ipc._uds import UDSAddress
from tractor.log import get_logger
if TYPE_CHECKING:
from tractor.runtime._runtime import Actor
log = get_logger('tractor')
def unlink_uds_bind_addrs(
proc: trio.Process,
*,
bind_addrs: list[UnwrappedAddress] | None = None,
subactor: Actor | None = None,
) -> None:
'''
Best-effort post-mortem cleanup of any UDS sock-files
a hard-killed subactor was bound to.
SIGKILL bypasses Python execution the subactor's
`_serve_ipc_eps` `finally:` block (which normally calls
`os.unlink(addr.sockpath)`) never runs. Without this
parent-side cleanup, the dead subactor's
`${XDG_RUNTIME_DIR}/tractor/<name>@<pid>.sock` file
accumulates on the filesystem (see issue #454 + the
autouse `_track_orphaned_uds_per_test` fixture).
Two cleanup paths, in order:
1. **Explicit `bind_addrs`** when the parent set the
subactor's bind addrs at spawn time, unlink each
UDS-flavored sockpath directly.
2. **Self-assigned reconstruction** when
`bind_addrs` is empty (the common case: subactor
picked its own random sock via
`UDSAddress.get_random()`), reconstruct the path
from `(subactor.aid.name, proc.pid)` using the
same `<name>@<pid>.sock` convention. We can do this
because the subactor uses its OWN `os.getpid()` at
bind time, which equals `proc.pid` from the
parent's view.
Idempotent: `FileNotFoundError` (graceful exit
already-unlinked, or sock never bound under early-
spawn cancel) is silenced; other `OSError`s log a
warning but never raise. TCP / non-UDS bind addrs are
skipped.
'''
sockpaths: list[str] = []
# path 1: explicit bind_addrs set at spawn time
for unwrapped in (bind_addrs or ()):
try:
addr = wrap_address(unwrapped)
except Exception:
log.exception(
f'Failed to wrap addr for UDS post-kill cleanup '
f'— skipping {unwrapped!r}\n'
)
continue
if isinstance(addr, UDSAddress):
sockpaths.append(str(addr.sockpath))
# path 2: reconstruct from subactor name + proc pid
# for the random-self-assign case (bind_addrs=None)
#
# TODO authoritative tracking — see module docstring.
if (
not sockpaths
and subactor is not None
and proc.pid is not None
):
sockname: str = f'{subactor.aid.name}@{proc.pid}.sock'
sockpath: str = str(
UDSAddress.def_bindspace / sockname
)
sockpaths.append(sockpath)
for sockpath in sockpaths:
try:
os.unlink(sockpath)
log.runtime(
f'Unlinked orphaned UDS sock-file post-SIGKILL\n'
f' |_{proc}\n'
f' |_{sockpath}\n'
)
except FileNotFoundError:
# raced — subactor cleaned up before SIGKILL,
# OR sockfile never bound (early-spawn cancel),
# OR transport wasn't UDS this run.
pass
except OSError as exc:
log.warning(
f'Failed to unlink subactor UDS sock-file '
f'post-SIGKILL\n'
f' |_{proc}\n'
f' |_{sockpath}\n'
f' |_{exc!r}\n'
)

View File

@ -40,7 +40,10 @@ from tractor.runtime._state import (
_runtime_vars, _runtime_vars,
) )
from tractor.log import get_logger from tractor.log import get_logger
from tractor.discovery._addr import UnwrappedAddress from tractor.discovery._addr import (
UnwrappedAddress,
)
from ._reap import unlink_uds_bind_addrs
from tractor.runtime._portal import Portal from tractor.runtime._portal import Portal
from tractor.runtime._runtime import Actor from tractor.runtime._runtime import Actor
from tractor.msg import types as msgtypes from tractor.msg import types as msgtypes
@ -279,6 +282,16 @@ async def hard_kill(
# whilst also hacking on it XD # whilst also hacking on it XD
# terminate_after: int = 99999, # terminate_after: int = 99999,
*,
# Subactor's bind addresses + subactor record, used
# for post-SIGKILL UDS sockpath cleanup. Optional for
# legacy callers; new call sites should pass at least
# `subactor` (which lets us reconstruct the sock path
# from `aid.name + proc.pid` when `bind_addrs` is
# empty/self-assigned). See `._reap.unlink_uds_bind_addrs()`.
bind_addrs: list[UnwrappedAddress] | None = None,
subactor: Actor | None = None,
) -> None: ) -> None:
''' '''
Un-gracefully terminate an OS level `trio.Process` after timeout. Un-gracefully terminate an OS level `trio.Process` after timeout.
@ -366,6 +379,21 @@ async def hard_kill(
) )
proc.kill() proc.kill()
# Post-mortem UDS sockpath cleanup. SIGKILL bypassed
# the subactor's normal `os.unlink(addr.sockpath)` in
# `_serve_ipc_eps`'s `finally:`; the parent has the
# bind addrs (or can reconstruct from name + pid) so
# we do it here. Runs UNCONDITIONALLY (graceful-exit
# case is a no-op via `FileNotFoundError` skip in the
# helper) so the cleanup also covers the "cancelled
# during spawn" path where the subactor never reached
# its IPC server finally block.
unlink_uds_bind_addrs(
proc,
bind_addrs=bind_addrs,
subactor=subactor,
)
async def soft_kill( async def soft_kill(
proc: ProcessType, proc: ProcessType,

View File

@ -39,7 +39,7 @@ from tractor.runtime._state import (
current_actor, current_actor,
is_root_process, is_root_process,
debug_mode, debug_mode,
get_runtime_vars, # get_runtime_vars,
) )
from tractor.log import get_logger from tractor.log import get_logger
from tractor.discovery._addr import UnwrappedAddress from tractor.discovery._addr import UnwrappedAddress
@ -282,7 +282,23 @@ async def trio_proc(
if proc.poll() is None: if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}") log.cancel(f"Attempting to hard kill {proc}")
await hard_kill(proc) await hard_kill(
proc,
# NOTE, pass through so post-SIGKILL we
# can `os.unlink()` the subactor's
# orphaned UDS sock-file(s) — the
# subactor's own
# `_serve_ipc_eps`-`finally:` cleanup
# never runs under SIGKILL. `subactor`
# lets the helper reconstruct the
# sock path via `aid.name + proc.pid`
# when `bind_addrs` is the common
# self-assigned-random case
# (bind_addrs=None at spawn). See
# `_unlink_uds_bind_addrs()` in `_spawn`.
bind_addrs=bind_addrs,
subactor=subactor,
)
log.debug(f"Joined {proc}") log.debug(f"Joined {proc}")
else: else:

View File

@ -0,0 +1,95 @@
# `tractor.trionics.patches`
Defensive monkey-patches for bugs in `trio` itself.
## What goes here
- Bugs in upstream `trio` that we've encountered while
running `tractor` and need to work around until
upstream releases a fix.
- Each patch fixes EXACTLY one trio internal — no
multi-bug omnibus patches.
## What does NOT go here
- Bugs in `tractor`'s own code (those get fixed
in-tree, in the offending tractor module).
- Bugs in `asyncio`, `pytest`, the stdlib, etc. (file
separate `tractor.<lib>.patches` subpkgs as
needed).
- Workarounds for behavior we *disagree* with but that
isn't a bug per se. If trio's API does what it says
on the tin, we don't override it here.
## Per-patch contract
Every `_<topic>.py` module in this directory MUST
expose:
- **`apply() -> bool`** — apply the patch. Idempotent
(safe to call multiple times). Version-gated — must
consult `is_needed()` and skip when False. Returns
`True` if patched this call, `False` if skipped.
- **`is_needed() -> bool`** — does upstream still need
patching? Today most patches return `True`
unconditionally, but as upstream releases land each
should gate on `Version(trio.__version__) <
Version('X.Y.Z')`. When the gated version is
released, the patch can be DELETED entirely.
- **`repro() -> None`** — minimal demonstration of the
bug. Used by the regression test suite to assert (a)
the upstream bug still exists, (b) our patch fixes
it. Should be tight enough that calling it post-
`apply()` returns cleanly within a few hundred
milliseconds — tests wrap it with a wall-clock cap.
Each module's docstring MUST contain:
- **Problem**: what trio does wrong + the trigger
conditions (e.g. "fork-spawn backend, peer-closed
socketpair, etc.")
- **Fix**: the one-line (ideally) patch
- **Repro**: the standalone snippet `repro()`
implements
- **Upstream**: link to filed issue/PR (or
`TODO: file`)
- **REMOVE WHEN**: `trio>=X.Y.Z` ships the upstream
fix
## Adding a patch
1. Create `_<topic>.py` with the `apply` /
`is_needed` / `repro` API.
2. Register it in `__init__.py::_PATCHES`.
3. Add a regression test in
`tests/trionics/test_patches.py` that uses
`repro()` to assert pre/post-patch behavior with a
wall-clock cap.
4. File the upstream issue/PR. Add the link to your
module's `Upstream:` and `# REMOVE WHEN:` lines.
## Removing a patch (when upstream releases the fix)
1. Confirm the upstream-fixed `trio` version is the
minimum we depend on, OR keep the version-gate in
`is_needed()` if we still support older trio.
2. If we've fully bumped past the broken versions:
- Delete `_<topic>.py`
- Remove the entry from `__init__.py::_PATCHES`
- Delete the corresponding test in
`tests/trionics/test_patches.py`
- Bump the conc-anal doc with a "FIXED" header
## Calling
```python
from tractor.trionics.patches import apply_all
apply_all()
```
Currently invoked from `tractor._child._actor_child_main`
before `_trio_main` so every spawned subactor gets
patched. The root actor's entry could opt in too if a
patch turns out to bite the root (none do today).

View File

@ -0,0 +1,84 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Defensive monkey-patches for `trio` internals.
Every patch in this package fixes a bug in `trio` itself
that we've encountered while running `tractor` — usually
a fork-survival edge case that upstream `trio` hasn't
filed/fixed yet. Each patch is:
- **idempotent** safe to call multiple times
- **version-gated** checks `trio.__version__` and skips
itself if upstream has shipped the fix
- **scoped** only modifies the specific trio internal
it's targeting; no broad side effects
- **removable** every patch carries a `# REMOVE WHEN:`
marker in its docstring pointing at the upstream PR
whose release allows us to drop it
Add a new patch by:
1. Create `tractor/trionics/patches/_<topic>.py` exposing
the `apply()` / `is_needed()` / `repro()` API
contract.
2. Import it in this `__init__.py` and add an entry to
`_PATCHES`.
3. Document upstream-fix-tracking in the module
docstring's `# REMOVE WHEN:` line.
4. Add a regression test in
`tests/trionics/test_patches.py` that uses the
patch's `repro()` to assert the bug exists + the
patch fixes it.
Calling `apply_all()` from a tractor entry point (e.g.
`tractor._child._actor_child_main`) applies every
registered patch + returns `{patch_name: applied?}` so
callers can log/assert as needed.
'''
from typing import Callable
from . import _wakeup_socketpair
_PATCHES: list[tuple[str, Callable[[], bool]]] = [
(
'trio_wakeup_socketpair_drain_eof',
_wakeup_socketpair.apply,
),
]
def apply_all() -> dict[str, bool]:
'''
Apply every registered patch. Idempotent calling
twice is fine, second call's dict will be all
`False`.
Returns `{patch_name: applied?}`:
- `True` patch was applied THIS call (inaugural
apply, or first-call-since-process-start).
- `False` skipped (already applied OR upstream fix
detected via `is_needed() == False`).
'''
results: dict[str, bool] = {}
for name, applier in _PATCHES:
results[name] = applier()
return results

View File

@ -0,0 +1,171 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Patch `trio._core._wakeup_socketpair.WakeupSocketpair.drain()`
to break on peer-closed EOF.
Problem
-------
`drain()` loops on `self.wakeup_sock.recv(2**16)` and
exits ONLY on `BlockingIOError` (buffer-empty on a
non-blocking socket), NEVER on `recv() == b''`
(peer-closed FIN). When the socketpair's write-end
has been closed, `recv` returns 0 bytes each call
infinite C-level tight loop 100% CPU, no Python
checkpoints, no signal delivery, no progress.
Most reliably triggered under fork-spawn backends
`os.fork()` + `_close_inherited_fds()` can leave a
`WakeupSocketpair` instance whose `write_sock` was
closed in the child (or whose peer-end is held by a
process that has since exited).
Repro
-----
```python
from trio._core._wakeup_socketpair import WakeupSocketpair
ws = WakeupSocketpair()
ws.write_sock.close()
ws.drain() # spins forever pre-patch
```
Fix
---
One line: break the drain loop on `b''` EOF
in addition to the existing `BlockingIOError` exit.
```python
def _safe_drain(self) -> None:
try:
while True:
data = self.wakeup_sock.recv(2**16)
if not data: # ← peer-closed; nothing more to drain
return
except BlockingIOError:
pass
```
Upstream
--------
TODO: file at `python-trio/trio` the standalone
`repro()` below + this docstring is the issue body's
evidence section.
REMOVE WHEN: trio>=`<TBD>` ships the EOF-break in
`_wakeup_socketpair.WakeupSocketpair.drain()`.
See also
--------
- `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`
- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md`
sibling-bug analysis fixed by the same patch.
'''
from __future__ import annotations
# Module-local sentinel — set True by `apply()` after the
# first successful patch. Idempotency guard.
_APPLIED: bool = False
def is_needed() -> bool:
'''
True iff upstream `trio` is the broken version that
needs our patch.
Today: always True since no released `trio` has the
fix. When upstream lands it, gate on:
```python
from packaging.version import Version
import trio
return Version(trio.__version__) < Version('<TBD>')
```
'''
# TODO version-gate once upstream lands the fix.
return True
def repro() -> None:
'''
Minimal hang demonstrator + regression test target.
Returns CLEANLY when `apply()` has been called
earlier in this process (the patched
`_safe_drain` breaks on EOF). Spins forever
UNPATCHED caller should wrap with a wall-clock
cap (e.g. `signal.alarm(N)` or `trio.fail_after`)
to avoid hanging the test runner if regressing.
Used by `tests/trionics/test_patches.py` to assert
both:
1. The bug exists upstream (sanity check the
repro is real).
2. Our patch fixes it (post-`apply()` returns
cleanly).
'''
from trio._core._wakeup_socketpair import (
WakeupSocketpair,
)
ws = WakeupSocketpair()
ws.write_sock.close()
ws.drain() # ← targeted operation
def apply() -> bool:
'''
Apply the EOF-break patch to
`WakeupSocketpair.drain`. Idempotent + version-
gated.
Returns:
- `True` if patched THIS call (inaugural apply).
- `False` if skipped (already applied this process,
OR `is_needed() == False` because upstream fixed
it).
'''
global _APPLIED
if _APPLIED or not is_needed():
return False
from trio._core._wakeup_socketpair import (
WakeupSocketpair as _WSP,
)
def _safe_drain(self) -> None:
try:
while True:
data = self.wakeup_sock.recv(2**16)
# XXX patch — break on EOF instead of
# spinning. Upstream trio's `drain()`
# only handles the `BlockingIOError`
# (buffer-empty) case; missed the
# peer-closed (`recv == b''`) case.
if not data:
return
except BlockingIOError:
pass
_WSP.drain = _safe_drain
_APPLIED = True
return True