Compare commits
8 Commits
fc2e298a29
...
0ef549fadb
| Author | SHA1 | Date |
|---|---|---|
|
|
0ef549fadb | |
|
|
e9712dcaeb | |
|
|
5cf0312c78 | |
|
|
32e89c67ee | |
|
|
d549c72052 | |
|
|
5a9926fc32 | |
|
|
72a0465c52 | |
|
|
9431a81d37 |
|
|
@ -0,0 +1,378 @@
|
||||||
|
# `infect_asyncio` × `main_thread_forkserver` Mode-A deadlock
|
||||||
|
|
||||||
|
## Reproducer
|
||||||
|
|
||||||
|
```bash
|
||||||
|
./py313/bin/python -m pytest \
|
||||||
|
tests/test_infected_asyncio.py::test_aio_simple_error \
|
||||||
|
--tpt-proto=tcp \
|
||||||
|
--spawn-backend=main_thread_forkserver \
|
||||||
|
-v --capture=sys
|
||||||
|
```
|
||||||
|
|
||||||
|
Hangs indefinitely. Mode-A signature — both processes
|
||||||
|
parked in `epoll_wait`, **neither burning CPU**.
|
||||||
|
|
||||||
|
## Empirical observations (caught alive)
|
||||||
|
|
||||||
|
### Outer pytest (parent)
|
||||||
|
|
||||||
|
`py-spy dump` on the test runner pid shows the trio
|
||||||
|
event loop parked at the bottom of `trio.run()`:
|
||||||
|
|
||||||
|
```
|
||||||
|
Thread <pid> (idle): "MainThread"
|
||||||
|
get_events (trio/_core/_io_epoll.py:245)
|
||||||
|
self: <EpollIOManager at 0x...>
|
||||||
|
timeout: 86400
|
||||||
|
run (trio/_core/_run.py:2415)
|
||||||
|
next_send: []
|
||||||
|
timeout: 86400
|
||||||
|
test_aio_simple_error (tests/test_infected_asyncio.py:175)
|
||||||
|
```
|
||||||
|
|
||||||
|
`timeout: 86400` is trio's "no scheduled work, just wait
|
||||||
|
for I/O forever" sentinel. `next_send: []` confirms
|
||||||
|
nothing is queued. The parent is stuck inside
|
||||||
|
`tractor.open_nursery(...).run_in_actor(...)` waiting
|
||||||
|
for `ipc_server.wait_for_peer(uid)` to fire — i.e.
|
||||||
|
waiting for the spawned subactor to connect back.
|
||||||
|
|
||||||
|
### Subactor (forked child)
|
||||||
|
|
||||||
|
`/proc/<pid>/stack`:
|
||||||
|
|
||||||
|
```
|
||||||
|
do_epoll_wait+0x4c0/0x500
|
||||||
|
__x64_sys_epoll_wait+0x70/0x120
|
||||||
|
do_syscall_64+0xef/0x1540
|
||||||
|
entry_SYSCALL_64_after_hwframe+0x77/0x7f
|
||||||
|
```
|
||||||
|
|
||||||
|
`strace -p <pid> -f`:
|
||||||
|
|
||||||
|
```
|
||||||
|
[pid <child-A>] epoll_wait(6 <unfinished ...>
|
||||||
|
[pid <child-B>] epoll_wait(3
|
||||||
|
```
|
||||||
|
|
||||||
|
**Two threads**, both parked in `epoll_wait` on
|
||||||
|
distinct epoll fds. Both blocked, neither making
|
||||||
|
progress.
|
||||||
|
|
||||||
|
### Subactor file-descriptor table
|
||||||
|
|
||||||
|
```
|
||||||
|
fd=0,1,2 stdio
|
||||||
|
fd=3 eventpoll [watches fd 4]
|
||||||
|
fd=4 ↔ fd=5 unix STREAM (CONNECTED) — internal pair
|
||||||
|
fd=6 eventpoll [watches fds 7, 9]
|
||||||
|
fd=7 ↔ fd=8 unix STREAM (CONNECTED) — internal pair
|
||||||
|
fd=9 ↔ fd=10 unix STREAM (CONNECTED) — internal pair
|
||||||
|
```
|
||||||
|
|
||||||
|
Confirmed via `ss -xp` peer-inode lookup: **all 6 unix
|
||||||
|
sockets are internal socketpairs** (peer in same pid).
|
||||||
|
|
||||||
|
**Critical**: zero TCP/IPv4/IPv6 sockets, despite
|
||||||
|
`--tpt-proto=tcp`:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ sudo lsof -p <subactor> | grep -iE 'TCP|IPv'
|
||||||
|
(empty)
|
||||||
|
$ sudo ss -tnp | grep <subactor>
|
||||||
|
(empty)
|
||||||
|
```
|
||||||
|
|
||||||
|
**The subactor never opened a TCP connection back to
|
||||||
|
the parent.**
|
||||||
|
|
||||||
|
## Diagnosis
|
||||||
|
|
||||||
|
The subactor reaches `_actor_child_main` →
|
||||||
|
`_trio_main(actor)` →
|
||||||
|
`run_as_asyncio_guest(trio_main)`. Code path
|
||||||
|
(`tractor.spawn._entry`):
|
||||||
|
|
||||||
|
```python
|
||||||
|
if infect_asyncio:
|
||||||
|
actor._infected_aio = True
|
||||||
|
run_as_asyncio_guest(trio_main) # ← this branch
|
||||||
|
else:
|
||||||
|
trio.run(trio_main)
|
||||||
|
```
|
||||||
|
|
||||||
|
`run_as_asyncio_guest` (`tractor.to_asyncio`):
|
||||||
|
|
||||||
|
```python
|
||||||
|
def run_as_asyncio_guest(trio_main, ...):
|
||||||
|
async def aio_main(trio_main):
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
trio_done_fute = asyncio.Future()
|
||||||
|
...
|
||||||
|
trio.lowlevel.start_guest_run(
|
||||||
|
trio_main,
|
||||||
|
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
||||||
|
done_callback=trio_done_callback,
|
||||||
|
)
|
||||||
|
out = await asyncio.shield(trio_done_fute)
|
||||||
|
return out.unwrap()
|
||||||
|
...
|
||||||
|
return asyncio.run(aio_main(trio_main))
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected flow:
|
||||||
|
1. `asyncio.run(aio_main(...))` — boots fresh asyncio
|
||||||
|
loop in calling thread.
|
||||||
|
2. `aio_main` calls `trio.lowlevel.start_guest_run(...)`
|
||||||
|
— initializes trio's I/O manager, schedules first
|
||||||
|
trio slice via `loop.call_soon_threadsafe`.
|
||||||
|
3. asyncio loop dispatches the callback → trio runs a
|
||||||
|
slice → yields back via `call_soon_threadsafe`.
|
||||||
|
4. Trio's `async_main` (the user function) runs →
|
||||||
|
`Channel.from_addr(parent_addr)` → TCP connect to
|
||||||
|
parent.
|
||||||
|
|
||||||
|
What we observe instead:
|
||||||
|
- 2 threads in `epoll_wait` (one trio epoll, one
|
||||||
|
asyncio epoll, both inactive)
|
||||||
|
- 6 unix-socket fds (3 socketpairs: trio
|
||||||
|
wakeup-fd-pair, asyncio wakeup-fd-pair, trio kicker
|
||||||
|
socketpair)
|
||||||
|
- ZERO TCP — `Channel.from_addr` never ran
|
||||||
|
|
||||||
|
Most likely cause: **trio's guest-run scheduling
|
||||||
|
callback didn't get dispatched by asyncio's loop in
|
||||||
|
the forked child**, so trio's `async_main` never
|
||||||
|
executes past trio bootstrap, and the
|
||||||
|
parent-IPC-connect step is never reached.
|
||||||
|
|
||||||
|
## Fork-survival risk surface (hypothesis)
|
||||||
|
|
||||||
|
`trio.lowlevel.start_guest_run` builds Python-level
|
||||||
|
closures + signal handlers + wakeup-fd registrations
|
||||||
|
that depend on:
|
||||||
|
|
||||||
|
- The asyncio event loop's `call_soon_threadsafe`
|
||||||
|
thread-id matching the loop owner thread.
|
||||||
|
- Process-wide signal-wakeup-fd state
|
||||||
|
(`signal.set_wakeup_fd`).
|
||||||
|
- Trio's `KIManager` SIGINT handler.
|
||||||
|
|
||||||
|
Under `main_thread_forkserver`, the fork happens from
|
||||||
|
a worker thread that has **never entered trio**
|
||||||
|
(intentional — trio-free launchpad). But the FORKED
|
||||||
|
child then tries to bring up BOTH asyncio AND
|
||||||
|
trio-as-guest fresh from this trio-free thread. The
|
||||||
|
asyncio loop boots fine; trio's `start_guest_run`
|
||||||
|
initializes BUT the cross-loop dispatch (asyncio
|
||||||
|
queue → trio slice) appears to silently fail to wire
|
||||||
|
up.
|
||||||
|
|
||||||
|
Two more hypotheses worth probing:
|
||||||
|
|
||||||
|
1. **Wakeup-fd contention**: asyncio installs
|
||||||
|
`signal.set_wakeup_fd(<own_pair>)`. trio's
|
||||||
|
guest-run also wants a wakeup-fd. Whoever installs
|
||||||
|
second wins; the loser's `epoll_wait` no longer
|
||||||
|
wakes on signals. Combined with the `asyncio.shield(
|
||||||
|
trio_done_fute)` + `asyncio.CancelledError`
|
||||||
|
handling in `run_as_asyncio_guest`, a missed signal
|
||||||
|
delivery could explain the indefinite park.
|
||||||
|
|
||||||
|
2. **Trio kicker socketpair race**: trio's I/O manager
|
||||||
|
uses an internal `socket.socketpair()` to "kick"
|
||||||
|
itself out of `epoll_wait` when a non-IO task needs
|
||||||
|
scheduling. In guest mode, the kicker is still
|
||||||
|
present but is supposed to be triggered via the
|
||||||
|
asyncio dispatch. If the kicker write never gets
|
||||||
|
issued by asyncio's callback, trio's epoll never
|
||||||
|
wakes.
|
||||||
|
|
||||||
|
## Confirmed via py-spy (live capture)
|
||||||
|
|
||||||
|
After detaching `strace` (ptrace is exclusive — that's
|
||||||
|
why `py-spy` returns EPERM if strace is attached):
|
||||||
|
|
||||||
|
```
|
||||||
|
Thread <pid> (idle): "main-thread-forkserver[asyncio_actor]"
|
||||||
|
select (selectors.py:452) # asyncio epoll
|
||||||
|
_run_once (asyncio/base_events.py:2012)
|
||||||
|
run_forever (asyncio/base_events.py:683)
|
||||||
|
run_until_complete (asyncio/base_events.py:712)
|
||||||
|
run (asyncio/runners.py:118)
|
||||||
|
run (asyncio/runners.py:195)
|
||||||
|
run_as_asyncio_guest (tractor/to_asyncio.py:1770)
|
||||||
|
_trio_main (tractor/spawn/_entry.py:160)
|
||||||
|
_actor_child_main (tractor/_child.py:72)
|
||||||
|
_child_target (tractor/spawn/_main_thread_forkserver.py:910)
|
||||||
|
_worker (tractor/spawn/_main_thread_forkserver.py:605)
|
||||||
|
[thread bootstrap]
|
||||||
|
|
||||||
|
Thread <pid+1> (idle): "Trio thread 14"
|
||||||
|
get_events (trio/_core/_io_epoll.py:245) # trio epoll
|
||||||
|
get_events (trio/_core/_run.py:1678)
|
||||||
|
capture (outcome/_impl.py:67)
|
||||||
|
_handle_job (trio/_core/_thread_cache.py:173)
|
||||||
|
_work (trio/_core/_thread_cache.py:196)
|
||||||
|
[thread bootstrap]
|
||||||
|
```
|
||||||
|
|
||||||
|
This data **rewrites the diagnosis**: trio guest-run
|
||||||
|
isn't broken across the fork — it's working as designed.
|
||||||
|
The two threads ARE the canonical guest-run architecture:
|
||||||
|
|
||||||
|
1. **Asyncio main loop** runs in the lead thread. Parked
|
||||||
|
in `selectors.EpollSelector.select(timeout=-1)` —
|
||||||
|
waiting indefinitely for ANY callback to be queued.
|
||||||
|
2. **Trio's I/O manager** offloads `get_events`
|
||||||
|
(`epoll_wait`) onto a `trio._core._thread_cache`
|
||||||
|
worker thread. The worker calls
|
||||||
|
`outcome.capture(get_events)` and parks in
|
||||||
|
`epoll_wait(timeout=86400)`.
|
||||||
|
3. When trio I/O fires (or its kicker socketpair gets a
|
||||||
|
write), the worker returns from `epoll_wait`,
|
||||||
|
delivers the result via `_handle_job`'s `deliver`
|
||||||
|
callback, which schedules the next trio slice on
|
||||||
|
asyncio via `loop.call_soon_threadsafe`.
|
||||||
|
|
||||||
|
The fact that the trio thread is *already* in
|
||||||
|
`_thread_cache._handle_job` doing `capture(get_events)`
|
||||||
|
means **trio's scheduler HAS started** — the bridge
|
||||||
|
asyncio↔trio is wired correctly post-fork.
|
||||||
|
|
||||||
|
So `async_main` DID run far enough to register some
|
||||||
|
trio task that's now awaiting I/O. The question
|
||||||
|
becomes: **what is `async_main` waiting on?**
|
||||||
|
|
||||||
|
Process state confirms it's NOT waiting on the TCP
|
||||||
|
connect to parent:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ sudo lsof -p <subactor> | grep -iE 'TCP|IPv'
|
||||||
|
(empty)
|
||||||
|
$ sudo ss -tnp | grep <subactor>
|
||||||
|
(empty)
|
||||||
|
```
|
||||||
|
|
||||||
|
`Channel.from_addr(parent_addr)` — the very first
|
||||||
|
thing `async_main` does — was never reached, OR was
|
||||||
|
reached but errored before `socket()` was called. The
|
||||||
|
parent (running `ipc_server.wait_for_peer`) waits
|
||||||
|
forever for the connection; it never comes.
|
||||||
|
|
||||||
|
## Refined hypothesis
|
||||||
|
|
||||||
|
`async_main` is stalled in some PRE-`Channel.from_addr`
|
||||||
|
checkpoint. Candidates:
|
||||||
|
|
||||||
|
1. **`get_console_log` / logger init** — called early in
|
||||||
|
`_trio_main` if `actor.loglevel is not None`. Logging
|
||||||
|
setup involves file/handler init that could block on
|
||||||
|
something fork-inherited (e.g. a stale lock).
|
||||||
|
2. **`debug.maybe_init_greenback`** — `start_guest_run`
|
||||||
|
includes a check (`if debug_mode(): assert 0` —
|
||||||
|
currently asserts unsupported). For non-debug mode
|
||||||
|
this is bypassed but related machinery may run.
|
||||||
|
3. **Stackscope SIGUSR1 handler install** — gated on
|
||||||
|
`_debug_mode` OR `TRACTOR_ENABLE_STACKSCOPE` env-var.
|
||||||
|
The `enable_stack_on_sig()` path captures a trio
|
||||||
|
token via `trio.lowlevel.current_trio_token()` —
|
||||||
|
could block under guest mode.
|
||||||
|
4. **Initial `await trio.sleep(0)` / first checkpoint**
|
||||||
|
in `async_main` before reaching the
|
||||||
|
`Channel.from_addr` line. Under guest mode, if the
|
||||||
|
FIRST `call_soon_threadsafe` callback never gets
|
||||||
|
processed by asyncio, trio's first slice never
|
||||||
|
completes — but the worker thread WOULD still be in
|
||||||
|
`epoll_wait` having been started by trio's I/O
|
||||||
|
manager init.
|
||||||
|
|
||||||
|
## Confirming `async_main`'s parked location
|
||||||
|
|
||||||
|
Add temporary logging at the top of `Actor.async_main`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
# tractor/runtime/_runtime.py around line 855
|
||||||
|
async def async_main(self, parent_addr=None):
|
||||||
|
log.devx('async_main: ENTERED') # marker A
|
||||||
|
try:
|
||||||
|
log.devx('async_main: pre-Channel.from_addr') # marker B
|
||||||
|
chan = await Channel.from_addr(
|
||||||
|
addr=wrap_address(parent_addr)
|
||||||
|
)
|
||||||
|
log.devx('async_main: post-Channel.from_addr') # marker C
|
||||||
|
...
|
||||||
|
```
|
||||||
|
|
||||||
|
Re-run the test with `--ll=devx`. The last marker logged
|
||||||
|
tells us exactly where `async_main` parked. If only A
|
||||||
|
fires, the issue is between A and B (logger init,
|
||||||
|
stackscope, etc.). If A and B fire but not C, it's in
|
||||||
|
`Channel.from_addr` (DNS, socket creation, connect).
|
||||||
|
|
||||||
|
## Related sibling bug
|
||||||
|
|
||||||
|
`tests/test_multi_program.py::test_register_duplicate_name`
|
||||||
|
hangs under the same backend with a DIFFERENT
|
||||||
|
fingerprint:
|
||||||
|
|
||||||
|
- Subactor at 100% CPU (busy-loop), not parked
|
||||||
|
- `recvfrom(6, "", 65536, 0, NULL, NULL) = 0` repeating
|
||||||
|
with no `epoll_wait` in between
|
||||||
|
- fd=6 is one of trio's internal AF_UNIX
|
||||||
|
socketpair fds (the kicker mechanism)
|
||||||
|
|
||||||
|
Distinct root cause — possibly trio's kicker socketpair
|
||||||
|
inheriting a half-closed state across the fork — but
|
||||||
|
shares the broader theme: **trio internal-state
|
||||||
|
initialization isn't fully fork-safe under
|
||||||
|
`main_thread_forkserver`** for the more exotic
|
||||||
|
dispatch paths.
|
||||||
|
|
||||||
|
## Workarounds (until fix lands)
|
||||||
|
|
||||||
|
1. **Skip-mark on the fork backend** — temporarily mark
|
||||||
|
`tests/test_infected_asyncio.py` with
|
||||||
|
`pytest.mark.skipon_spawn_backend('main_thread_forkserver',
|
||||||
|
reason='infect_asyncio + fork interaction broken,
|
||||||
|
see ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md')`.
|
||||||
|
Lets the rest of the test suite run green while
|
||||||
|
this is being fixed properly.
|
||||||
|
|
||||||
|
2. **Run infected-asyncio tests under the `trio`
|
||||||
|
backend only** — they don't exercise fork
|
||||||
|
semantics, so they won't hit this bug.
|
||||||
|
|
||||||
|
## Investigation next steps
|
||||||
|
|
||||||
|
In rough priority:
|
||||||
|
|
||||||
|
1. Catch the hang alive again, **detach strace**,
|
||||||
|
`py-spy --locals` the subactor — confirm trio
|
||||||
|
thread is NOT yet at `async_main`.
|
||||||
|
2. Diff `start_guest_run` setup pre-fork vs post-fork
|
||||||
|
by adding `log.devx()` markers in
|
||||||
|
`tractor.to_asyncio.run_as_asyncio_guest::aio_main`
|
||||||
|
at:
|
||||||
|
- asyncio loop bringup
|
||||||
|
- immediately before `start_guest_run`
|
||||||
|
- immediately after `start_guest_run`
|
||||||
|
- inside the `trio_done_callback` registration
|
||||||
|
3. Check whether the asyncio loop dispatches ANY
|
||||||
|
callbacks in the forked child — instrument
|
||||||
|
`loop.call_soon_threadsafe` (e.g. monkey-patch
|
||||||
|
`loop._call_soon` to log).
|
||||||
|
4. If steps 1–3 confirm that asyncio's queue is
|
||||||
|
stuck, look at whether the asyncio event-loop
|
||||||
|
policy or selector is being inherited from a
|
||||||
|
pre-fork (parent-process) state in a way that
|
||||||
|
breaks the new loop.
|
||||||
|
|
||||||
|
## See also
|
||||||
|
|
||||||
|
- [#379](https://github.com/goodboy/tractor/issues/379) — subint umbrella
|
||||||
|
- [#451](https://github.com/goodboy/tractor/issues/451) — Mode-A cancel-cascade hang
|
||||||
|
- `ai/conc-anal/fork_thread_semantics_execution_vs_memory.md`
|
||||||
|
- `ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`
|
||||||
|
- python-trio/trio#1614 — trio + fork hazards
|
||||||
|
|
@ -0,0 +1,221 @@
|
||||||
|
# trio `WakeupSocketpair.drain()` busy-loop in forked child (peer-closed missed-EOF)
|
||||||
|
|
||||||
|
## Reproducer
|
||||||
|
|
||||||
|
```bash
|
||||||
|
./py313/bin/python -m pytest \
|
||||||
|
tests/test_multi_program.py::test_register_duplicate_name \
|
||||||
|
--tpt-proto=tcp \
|
||||||
|
--spawn-backend=main_thread_forkserver \
|
||||||
|
-v --capture=sys
|
||||||
|
```
|
||||||
|
|
||||||
|
Subactor pegs a CPU core indefinitely; parent test
|
||||||
|
hangs waiting for the subactor.
|
||||||
|
|
||||||
|
## Empirical evidence (caught alive)
|
||||||
|
|
||||||
|
```
|
||||||
|
$ sudo strace -p <subactor-pid>
|
||||||
|
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
|
||||||
|
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
|
||||||
|
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
|
||||||
|
... (no `epoll_wait`, no other syscalls, just this back-to-back)
|
||||||
|
```
|
||||||
|
|
||||||
|
Pattern: tight C-level `recvfrom` loop returning 0
|
||||||
|
each call. No `epoll_wait` between iterations →
|
||||||
|
**not trio's task scheduler**. Pure synchronous C
|
||||||
|
loop.
|
||||||
|
|
||||||
|
```
|
||||||
|
$ sudo readlink /proc/<subactor-pid>/fd/6
|
||||||
|
socket:[<inode>]
|
||||||
|
|
||||||
|
$ sudo lsof -p <subactor-pid> | grep ' 6u'
|
||||||
|
<cmd> <pid> goodboy 6u unix 0xffff... 0t0 <inode> type=STREAM (CONNECTED)
|
||||||
|
```
|
||||||
|
|
||||||
|
fd=6 is an **AF_UNIX socket** in CONNECTED state.
|
||||||
|
Even though the test uses `--tpt-proto=tcp`, this fd
|
||||||
|
is NOT a tractor IPC channel — it's an internal
|
||||||
|
trio socketpair.
|
||||||
|
|
||||||
|
## Root-cause: `WakeupSocketpair.drain()`
|
||||||
|
|
||||||
|
`/site-packages/trio/_core/_wakeup_socketpair.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
class WakeupSocketpair:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.wakeup_sock, self.write_sock = socket.socketpair()
|
||||||
|
self.wakeup_sock.setblocking(False)
|
||||||
|
self.write_sock.setblocking(False)
|
||||||
|
...
|
||||||
|
|
||||||
|
def drain(self) -> None:
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
self.wakeup_sock.recv(2**16)
|
||||||
|
except BlockingIOError:
|
||||||
|
pass
|
||||||
|
```
|
||||||
|
|
||||||
|
`socket.socketpair()` on Linux defaults to AF_UNIX
|
||||||
|
SOCK_STREAM. Both ends non-blocking. Normal flow:
|
||||||
|
|
||||||
|
1. Signal/wake event → `write_sock.send(b'\x00')`
|
||||||
|
queues a byte.
|
||||||
|
2. `wakeup_sock` becomes readable → trio's epoll
|
||||||
|
triggers.
|
||||||
|
3. Trio calls `drain()` to flush the buffer.
|
||||||
|
4. drain loops on `wakeup_sock.recv(64KB)`.
|
||||||
|
5. Eventually buffer empty → non-blocking socket
|
||||||
|
raises `BlockingIOError` → except → break.
|
||||||
|
|
||||||
|
**Bug surface — peer-closed missed-EOF**:
|
||||||
|
|
||||||
|
Non-blocking socket semantics:
|
||||||
|
- buffer has data → `recv` returns N>0 bytes (loop continues)
|
||||||
|
- buffer empty → `recv` raises `BlockingIOError`
|
||||||
|
- **peer FIN'd → `recv` returns 0 bytes (NEITHER exception NOR
|
||||||
|
break — infinite tight loop)**
|
||||||
|
|
||||||
|
`drain()` does not handle the `b''` return-value
|
||||||
|
(EOF) case. If `write_sock` has been closed (or the
|
||||||
|
process holding it is gone), every iteration returns
|
||||||
|
0 → infinite loop → 100% CPU on a single core.
|
||||||
|
|
||||||
|
## Why this triggers under `main_thread_forkserver`
|
||||||
|
|
||||||
|
Under `os.fork()` from the forkserver-worker thread:
|
||||||
|
|
||||||
|
1. Parent has a `WakeupSocketpair` instance with
|
||||||
|
`wakeup_sock=fdN`, `write_sock=fdM`. Both fds
|
||||||
|
open in parent.
|
||||||
|
2. Fork → child inherits BOTH fds (kernel-level fd
|
||||||
|
table dup).
|
||||||
|
3. `_close_inherited_fds()` runs in child →
|
||||||
|
closes everything except stdio. `wakeup_sock` and
|
||||||
|
`write_sock` of the parent's `WakeupSocketpair`
|
||||||
|
ARE closed in child.
|
||||||
|
4. Child's trio (running fresh) creates its OWN
|
||||||
|
`WakeupSocketpair` → NEW fd numbers (e.g. fd 6, 7).
|
||||||
|
5. **In `infect_asyncio` mode** the asyncio loop is
|
||||||
|
the host; trio runs as guest via
|
||||||
|
`start_guest_run`. trio still creates its
|
||||||
|
`WakeupSocketpair` in the I/O manager but its
|
||||||
|
role is different.
|
||||||
|
|
||||||
|
The race window: somewhere between (3) and (5), if a
|
||||||
|
`WakeupSocketpair` Python object reference inherited
|
||||||
|
via COW (from parent's pre-fork heap) survives long
|
||||||
|
enough that `drain()` is called on it AFTER its fds
|
||||||
|
were closed but BEFORE the child's NEW socketpair
|
||||||
|
takes over the recycled fd numbers — the recycled fd
|
||||||
|
will be one of the child's NEW socketpair ends, whose
|
||||||
|
peer might be FIN-flagged (e.g. parent-process
|
||||||
|
peer-end is closed).
|
||||||
|
|
||||||
|
Or simpler: the `wait_for_actor`/`find_actor` discovery
|
||||||
|
flow in `test_register_duplicate_name` triggers an
|
||||||
|
unusual code path where a stale `WakeupSocketpair`
|
||||||
|
gets `drain()`-called on a fd whose peer has already
|
||||||
|
closed.
|
||||||
|
|
||||||
|
## Why `drain()` shouldn't loop indefinitely on EOF
|
||||||
|
(upstream trio bug)
|
||||||
|
|
||||||
|
Even WITHOUT fork, `drain()` should treat `b''` as
|
||||||
|
EOF and break. The current code is correct for the
|
||||||
|
"buffer drained on a healthy socketpair" scenario but
|
||||||
|
incorrect for the "peer is gone" scenario. It's a
|
||||||
|
defensive-programming gap in trio.
|
||||||
|
|
||||||
|
A one-line patch upstream:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def drain(self) -> None:
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = self.wakeup_sock.recv(2**16)
|
||||||
|
if not data:
|
||||||
|
break # peer-closed; nothing more to drain
|
||||||
|
except BlockingIOError:
|
||||||
|
pass
|
||||||
|
```
|
||||||
|
|
||||||
|
## Workarounds (until the underlying issue lands)
|
||||||
|
|
||||||
|
1. **Skip-mark on the fork backend**:
|
||||||
|
`tests/test_multi_program.py` →
|
||||||
|
`pytest.mark.skipon_spawn_backend('main_thread_forkserver',
|
||||||
|
reason='trio WakeupSocketpair.drain busy-loop, see ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md')`.
|
||||||
|
|
||||||
|
2. **Defensive monkey-patch in tractor's
|
||||||
|
forkserver-child prelude** — wrap
|
||||||
|
`WakeupSocketpair.drain` to handle `b''`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
# in `_actor_child_main` or `_close_inherited_fds`'s
|
||||||
|
# post-fork prelude:
|
||||||
|
from trio._core._wakeup_socketpair import WakeupSocketpair
|
||||||
|
_orig_drain = WakeupSocketpair.drain
|
||||||
|
def _safe_drain(self):
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = self.wakeup_sock.recv(2**16)
|
||||||
|
if not data:
|
||||||
|
return # peer closed
|
||||||
|
except BlockingIOError:
|
||||||
|
pass
|
||||||
|
WakeupSocketpair.drain = _safe_drain
|
||||||
|
```
|
||||||
|
|
||||||
|
Tracks upstream — remove once trio fixes.
|
||||||
|
|
||||||
|
3. **Upstream the fix**: 1-line PR to `python-trio/trio`
|
||||||
|
adding `if not data: break` to `drain()`.
|
||||||
|
|
||||||
|
## Investigation next steps
|
||||||
|
|
||||||
|
1. **Confirm via py-spy**: when caught alive, detach
|
||||||
|
strace first then
|
||||||
|
`sudo py-spy dump --pid <subactor> --locals`. The
|
||||||
|
busy thread should show `drain` from `WakeupSocketpair`
|
||||||
|
in the call chain.
|
||||||
|
2. **Identify which write-end peer is closed**: from
|
||||||
|
the inode of fd 6, look up the matching peer
|
||||||
|
inode via `ss -xp` and see whose process it
|
||||||
|
was/is.
|
||||||
|
3. **Verify the missed-EOF hypothesis**: hand-craft a
|
||||||
|
minimal `WakeupSocketpair` repro:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from trio._core._wakeup_socketpair import WakeupSocketpair
|
||||||
|
ws = WakeupSocketpair()
|
||||||
|
ws.write_sock.close() # simulate peer-gone
|
||||||
|
ws.drain() # should hang forever
|
||||||
|
```
|
||||||
|
|
||||||
|
## Sibling bug
|
||||||
|
|
||||||
|
`tests/test_infected_asyncio.py::test_aio_simple_error`
|
||||||
|
hangs under the same backend with a DIFFERENT
|
||||||
|
fingerprint (Mode-A deadlock, both parties in
|
||||||
|
`epoll_wait`, no busy-loop). Distinct root cause —
|
||||||
|
see `infected_asyncio_under_main_thread_forkserver_hang_issue.md`.
|
||||||
|
|
||||||
|
Both share the broader theme: **trio internal-state
|
||||||
|
initialization isn't fully fork-safe under
|
||||||
|
`main_thread_forkserver`** for the more exotic
|
||||||
|
dispatch paths.
|
||||||
|
|
||||||
|
## See also
|
||||||
|
|
||||||
|
- [#379](https://github.com/goodboy/tractor/issues/379) — subint umbrella
|
||||||
|
- python-trio/trio#1614 — trio + fork hazards
|
||||||
|
- `trio._core._wakeup_socketpair.WakeupSocketpair`
|
||||||
|
source (the smoking gun)
|
||||||
|
- `ai/conc-anal/fork_thread_semantics_execution_vs_memory.md`
|
||||||
|
- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md`
|
||||||
|
|
@ -27,12 +27,9 @@ async def main():
|
||||||
'''
|
'''
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
loglevel='cancel',
|
) as an:
|
||||||
# loglevel='devx',
|
p0 = await an.start_actor('bp_forever', enable_modules=[__name__])
|
||||||
) as n:
|
p1 = await an.start_actor('name_error', enable_modules=[__name__])
|
||||||
|
|
||||||
p0 = await n.start_actor('bp_forever', enable_modules=[__name__])
|
|
||||||
p1 = await n.start_actor('name_error', enable_modules=[__name__])
|
|
||||||
|
|
||||||
# retreive results
|
# retreive results
|
||||||
async with p0.open_stream_from(breakpoint_forever) as stream:
|
async with p0.open_stream_from(breakpoint_forever) as stream:
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,7 @@ async def main():
|
||||||
"""
|
"""
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
# loglevel='cancel',
|
loglevel='pdb',
|
||||||
) as n:
|
) as n:
|
||||||
|
|
||||||
# spawn both actors
|
# spawn both actors
|
||||||
|
|
|
||||||
|
|
@ -39,8 +39,8 @@ async def main():
|
||||||
'''
|
'''
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
loglevel='devx',
|
enable_transports=['uds'], # TODO, apss this via osenv?
|
||||||
enable_transports=['uds'],
|
loglevel='devx', # XXX, required for test!
|
||||||
) as n:
|
) as n:
|
||||||
|
|
||||||
# spawn both actors
|
# spawn both actors
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,3 @@
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
|
|
@ -9,16 +8,22 @@ async def key_error():
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
"""Root dies
|
'''
|
||||||
|
Root is fail-after-cancelled while blocking and child RPC fails
|
||||||
|
simultaneously.
|
||||||
|
|
||||||
"""
|
'''
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
loglevel='debug'
|
# loglevel='debug' # ?XXX required?
|
||||||
) as n:
|
) as n:
|
||||||
|
|
||||||
# spawn both actors
|
# spawn both actors
|
||||||
portal = await n.run_in_actor(key_error)
|
portal = await n.run_in_actor(key_error)
|
||||||
|
print(
|
||||||
|
f'Child is up @ {portal.chan.aid.reprol()}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# XXX: originally a bug caused by this is where root would enter
|
# XXX: originally a bug caused by this is where root would enter
|
||||||
# the debugger and clobber the tty used by the repl even though
|
# the debugger and clobber the tty used by the repl even though
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,11 @@ async def just_bp(
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
|
# !TODO, parametrize the --tpt-proto={key} with osenv vars just
|
||||||
|
# like we do for loglevel/spawn-backend!
|
||||||
|
# - [ ] run on both tpts for all such debugger tests?
|
||||||
|
# - [ ] special skip for macos!
|
||||||
|
#
|
||||||
if platform.system() != 'Darwin':
|
if platform.system() != 'Darwin':
|
||||||
tpt = 'uds'
|
tpt = 'uds'
|
||||||
else:
|
else:
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ async def name_error():
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
# loglevel='transport',
|
|
||||||
) as an:
|
) as an:
|
||||||
|
|
||||||
# TODO: ideally the REPL arrives at this frame in the parent,
|
# TODO: ideally the REPL arrives at this frame in the parent,
|
||||||
|
|
|
||||||
|
|
@ -135,25 +135,30 @@ def pytest_addoption(
|
||||||
"--ll",
|
"--ll",
|
||||||
action="store",
|
action="store",
|
||||||
dest='loglevel',
|
dest='loglevel',
|
||||||
default='ERROR', help="logging level to set when testing"
|
default=None,
|
||||||
|
help="logging level to set when testing",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session', autouse=True)
|
@pytest.fixture(scope='session', autouse=True)
|
||||||
def loglevel(
|
def loglevel(
|
||||||
request: pytest.FixtureRequest,
|
request: pytest.FixtureRequest,
|
||||||
) -> str:
|
) -> str|None:
|
||||||
import tractor
|
import tractor
|
||||||
orig = tractor.log._default_loglevel
|
orig = tractor.log._default_loglevel
|
||||||
level = tractor.log._default_loglevel = request.config.option.loglevel
|
flag_level: str|None = request.config.option.loglevel
|
||||||
|
|
||||||
|
if flag_level is not None:
|
||||||
|
tractor.log._default_loglevel = flag_level
|
||||||
|
|
||||||
log = tractor.log.get_console_log(
|
log = tractor.log.get_console_log(
|
||||||
level=level,
|
level=flag_level,
|
||||||
name='tractor', # <- enable root logger
|
name='tractor', # <- enable root logger
|
||||||
)
|
)
|
||||||
log.info(
|
log.info(
|
||||||
f'Test-harness set runtime loglevel: {level!r}\n'
|
f'Test-harness set runtime loglevel: {flag_level!r}\n'
|
||||||
)
|
)
|
||||||
yield level
|
yield flag_level
|
||||||
tractor.log._default_loglevel = orig
|
tractor.log._default_loglevel = orig
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -269,13 +269,10 @@ def ctlc(
|
||||||
|
|
||||||
def expect(
|
def expect(
|
||||||
child,
|
child,
|
||||||
|
patt: str, # often a `pdbp`-prompt
|
||||||
# normally a `pdb` prompt by default
|
|
||||||
patt: str,
|
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> None:
|
) -> str:
|
||||||
'''
|
'''
|
||||||
Expect wrapper that prints last seen console
|
Expect wrapper that prints last seen console
|
||||||
data before failing.
|
data before failing.
|
||||||
|
|
@ -286,6 +283,8 @@ def expect(
|
||||||
patt,
|
patt,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
|
before = str(child.before.decode())
|
||||||
|
return before
|
||||||
except TIMEOUT:
|
except TIMEOUT:
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
print(before)
|
print(before)
|
||||||
|
|
|
||||||
|
|
@ -765,6 +765,7 @@ def test_multi_subactors_root_errors(
|
||||||
def test_multi_nested_subactors_error_through_nurseries(
|
def test_multi_nested_subactors_error_through_nurseries(
|
||||||
ci_env: bool,
|
ci_env: bool,
|
||||||
spawn: PexpectSpawner,
|
spawn: PexpectSpawner,
|
||||||
|
start_method: str,
|
||||||
|
|
||||||
# TODO: address debugger issue for nested tree:
|
# TODO: address debugger issue for nested tree:
|
||||||
# https://github.com/goodboy/tractor/issues/320
|
# https://github.com/goodboy/tractor/issues/320
|
||||||
|
|
@ -781,16 +782,16 @@ def test_multi_nested_subactors_error_through_nurseries(
|
||||||
# A test (below) has now been added to explicitly verify this is
|
# A test (below) has now been added to explicitly verify this is
|
||||||
# fixed.
|
# fixed.
|
||||||
|
|
||||||
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
child = spawn(
|
||||||
|
'multi_nested_subactors_error_up_through_nurseries',
|
||||||
# timed_out_early: bool = False
|
loglevel='pdb',
|
||||||
|
)
|
||||||
for (
|
for (
|
||||||
i,
|
i,
|
||||||
send_char,
|
send_char,
|
||||||
) in enumerate(itertools.cycle(['c', 'q'])):
|
) in enumerate(itertools.cycle(['c', 'q'])):
|
||||||
|
|
||||||
timeout: float = -1
|
timeout: float = child.timeout
|
||||||
if (
|
if (
|
||||||
_non_linux
|
_non_linux
|
||||||
and
|
and
|
||||||
|
|
@ -803,6 +804,15 @@ def test_multi_nested_subactors_error_through_nurseries(
|
||||||
elif i == 0:
|
elif i == 0:
|
||||||
timeout = 5
|
timeout = 5
|
||||||
|
|
||||||
|
# XXX forking backends may take longer due to
|
||||||
|
# determinstic IPC cancellation.
|
||||||
|
if (
|
||||||
|
start_method in [
|
||||||
|
'main_thread_forkserver',
|
||||||
|
]
|
||||||
|
):
|
||||||
|
timeout += 4
|
||||||
|
|
||||||
try:
|
try:
|
||||||
child.expect(
|
child.expect(
|
||||||
PROMPT,
|
PROMPT,
|
||||||
|
|
@ -1187,7 +1197,11 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
|
||||||
mashed and zombie reaper kills sub with no hangs.
|
mashed and zombie reaper kills sub with no hangs.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
child = spawn('subactor_bp_in_ctx')
|
child = spawn(
|
||||||
|
'subactor_bp_in_ctx',
|
||||||
|
loglevel='devx'
|
||||||
|
# ^XXX REQUIRED for below patt matching!
|
||||||
|
)
|
||||||
child.expect(PROMPT)
|
child.expect(PROMPT)
|
||||||
|
|
||||||
# 3 iters for the `gen()` pause-points
|
# 3 iters for the `gen()` pause-points
|
||||||
|
|
@ -1277,7 +1291,11 @@ def test_crash_handling_within_cancelled_root_actor(
|
||||||
call.
|
call.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
child = spawn('root_self_cancelled_w_error')
|
child = spawn(
|
||||||
|
'root_self_cancelled_w_error',
|
||||||
|
loglevel='cancel',
|
||||||
|
# ^XXX REQUIRED for below patt matching!
|
||||||
|
)
|
||||||
child.expect(PROMPT)
|
child.expect(PROMPT)
|
||||||
|
|
||||||
assert_before(
|
assert_before(
|
||||||
|
|
|
||||||
|
|
@ -52,6 +52,8 @@ def test_shield_pause(
|
||||||
...,
|
...,
|
||||||
PexpectSpawner,
|
PexpectSpawner,
|
||||||
],
|
],
|
||||||
|
start_method: str,
|
||||||
|
request: pytest.FixtureRequest,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Verify the `tractor.pause()/.post_mortem()` API works inside an
|
Verify the `tractor.pause()/.post_mortem()` API works inside an
|
||||||
|
|
@ -119,6 +121,21 @@ def test_shield_pause(
|
||||||
# (above) and `end-of-('hanger'` (below).
|
# (above) and `end-of-('hanger'` (below).
|
||||||
handle_out_of_order: bool = False
|
handle_out_of_order: bool = False
|
||||||
|
|
||||||
|
# XXX, when capfd is NOT used we don't expect to
|
||||||
|
# see the logging output from the subactor.
|
||||||
|
if (no_capfd := (start_method in [
|
||||||
|
'main_thread_forkserver',
|
||||||
|
])
|
||||||
|
):
|
||||||
|
opts = request.config.option
|
||||||
|
assert opts.spawn_backend == start_method
|
||||||
|
# ?XXX? i guess the `testdir` fixture "pretends to" reset
|
||||||
|
# this to the default 'fd'??
|
||||||
|
# assert opts.capture in [
|
||||||
|
# 'sys',
|
||||||
|
# 'no',
|
||||||
|
# ]
|
||||||
|
|
||||||
if (
|
if (
|
||||||
handle_out_of_order
|
handle_out_of_order
|
||||||
and
|
and
|
||||||
|
|
@ -128,25 +145,30 @@ def test_shield_pause(
|
||||||
assert 'Relaying `SIGUSR1`[10] to sub-actor' in _before
|
assert 'Relaying `SIGUSR1`[10] to sub-actor' in _before
|
||||||
|
|
||||||
else:
|
else:
|
||||||
expect(
|
_before = expect(
|
||||||
child,
|
child,
|
||||||
'Relaying `SIGUSR1`\\[10\\] to sub-actor',
|
'Relaying `SIGUSR1`\\[10\\] to sub-actor',
|
||||||
)
|
)
|
||||||
expect(
|
# _before: str = assert_before(
|
||||||
child,
|
# child,
|
||||||
# end-of-subactor's-tree delimiter
|
# ["('hanger'",] # uid line
|
||||||
"end-of-\('hanger'",
|
# )
|
||||||
)
|
if not no_capfd:
|
||||||
_before: str = assert_before(
|
expect(
|
||||||
child,
|
child,
|
||||||
[
|
# end-of-subactor's-tree delimiter
|
||||||
"('hanger'", # uid line
|
"end-of-\('hanger'",
|
||||||
|
)
|
||||||
|
_before: str = assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
"('hanger'", # uid line
|
||||||
|
|
||||||
# TODO!? SEE ABOVE
|
# TODO!? SEE ABOVE
|
||||||
# hanger LOC where it's shield-halted
|
# hanger LOC where it's shield-halted
|
||||||
# 'await trio.sleep_forever() # in subactor',
|
# 'await trio.sleep_forever() # in subactor',
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# simulate the user sending a ctl-c to the hanging program.
|
# simulate the user sending a ctl-c to the hanging program.
|
||||||
|
|
@ -163,14 +185,19 @@ def test_shield_pause(
|
||||||
_shutdown_msg,
|
_shutdown_msg,
|
||||||
timeout=6,
|
timeout=6,
|
||||||
)
|
)
|
||||||
assert_before(
|
expect_on_teardown: list[str] = [
|
||||||
child,
|
'raise KeyboardInterrupt',
|
||||||
[
|
'Root actor terminated',
|
||||||
'raise KeyboardInterrupt',
|
]
|
||||||
|
if not no_capfd:
|
||||||
|
expect_on_teardown += [
|
||||||
# 'Shutting down actor runtime',
|
# 'Shutting down actor runtime',
|
||||||
'#T-800 deployed to collect zombie B0',
|
'#T-800 deployed to collect zombie B0',
|
||||||
"'--uid', \"('hanger',",
|
"'--uid', \"('hanger',",
|
||||||
]
|
]
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
expect_on_teardown,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,99 @@
|
||||||
|
'''
|
||||||
|
Regression tests for `tractor.trionics.patches` —
|
||||||
|
defensive monkey-patches on upstream `trio` bugs.
|
||||||
|
|
||||||
|
Each test asserts:
|
||||||
|
|
||||||
|
1. The bug exists (or is gone — skip cleanly if
|
||||||
|
upstream shipped the fix and our `is_needed()` now
|
||||||
|
returns `False`).
|
||||||
|
2. Our patch fixes it (post-`apply()` the `repro()`
|
||||||
|
returns cleanly within a tight wall-clock cap).
|
||||||
|
|
||||||
|
Wall-clock caps are critical here — the bugs we patch
|
||||||
|
are tight-loops or deadlocks, so a regression would
|
||||||
|
HANG the test runner unless we hard-cap each
|
||||||
|
`repro()` call.
|
||||||
|
|
||||||
|
'''
|
||||||
|
import signal
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from tractor.trionics import patches
|
||||||
|
from tractor.trionics.patches import _wakeup_socketpair as wsp
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _alarm_cleanup():
|
||||||
|
'''
|
||||||
|
Ensure no leftover SIGALRM survives a test failure
|
||||||
|
or unexpected return.
|
||||||
|
|
||||||
|
'''
|
||||||
|
yield
|
||||||
|
signal.alarm(0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_wakeup_socketpair_drain_eof_patch_works():
|
||||||
|
'''
|
||||||
|
Without the patch, `WakeupSocketpair.drain()` on a
|
||||||
|
socketpair whose write-end has been closed spins
|
||||||
|
forever. With the patch applied, it returns
|
||||||
|
cleanly within milliseconds.
|
||||||
|
|
||||||
|
Wall-clock cap: 2s. If the patch regresses, SIGALRM
|
||||||
|
fires and the test hard-fails with a clear signal
|
||||||
|
instead of hanging CI indefinitely.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if not wsp.is_needed():
|
||||||
|
pytest.skip(
|
||||||
|
'upstream trio shipped the fix — '
|
||||||
|
'patch no longer needed for trio '
|
||||||
|
'(see `is_needed()` for version gate)'
|
||||||
|
)
|
||||||
|
|
||||||
|
# Apply the patch.
|
||||||
|
applied: bool = wsp.apply()
|
||||||
|
# First call MUST return True; idempotent guard
|
||||||
|
# prevents False on subsequent calls within the
|
||||||
|
# same process.
|
||||||
|
assert applied is True or applied is False # idempotent
|
||||||
|
|
||||||
|
# Cap wall-clock at 2s; SIGALRM raises in main
|
||||||
|
# thread which interrupts the C-level recv loop
|
||||||
|
# IF the patch regresses (since `signal.alarm`
|
||||||
|
# uses Python's signal-wakeup-fd which the patch
|
||||||
|
# itself relies on... but `repro()` runs OUTSIDE
|
||||||
|
# a trio.run, so it's plain stdlib semantics here
|
||||||
|
# — alarm WILL fire during `recv` syscall).
|
||||||
|
signal.alarm(2)
|
||||||
|
wsp.repro()
|
||||||
|
signal.alarm(0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_apply_all_idempotent():
|
||||||
|
'''
|
||||||
|
Calling `apply_all()` twice should not double-
|
||||||
|
apply: second call's dict has all-False values
|
||||||
|
(every patch reports "already applied").
|
||||||
|
|
||||||
|
'''
|
||||||
|
first: dict[str, bool] = patches.apply_all()
|
||||||
|
second: dict[str, bool] = patches.apply_all()
|
||||||
|
|
||||||
|
# Second call: every patch reports skipped.
|
||||||
|
assert all(v is False for v in second.values()), (
|
||||||
|
f'apply_all() not idempotent: {second}'
|
||||||
|
)
|
||||||
|
|
||||||
|
# First call: at least one patch was applied
|
||||||
|
# (or all are no-ops because `is_needed()` is
|
||||||
|
# False everywhere — the all-fixed-upstream future
|
||||||
|
# state which is also valid).
|
||||||
|
assert isinstance(first, dict)
|
||||||
|
for name, applied in first.items():
|
||||||
|
assert isinstance(applied, bool), (
|
||||||
|
f'patch {name!r} returned non-bool: {applied!r}'
|
||||||
|
)
|
||||||
|
|
@ -63,6 +63,14 @@ def _actor_child_main(
|
||||||
sub-interpreter via `Interpreter.call()`.
|
sub-interpreter via `Interpreter.call()`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# Apply defensive monkey-patches for upstream `trio`
|
||||||
|
# bugs we've encountered while running tractor — see
|
||||||
|
# `tractor.trionics.patches` for the catalog +
|
||||||
|
# per-patch upstream-fix tracking. Must run BEFORE
|
||||||
|
# any trio runtime init.
|
||||||
|
from .trionics.patches import apply_all
|
||||||
|
apply_all()
|
||||||
|
|
||||||
subactor = Actor(
|
subactor = Actor(
|
||||||
name=uid[0],
|
name=uid[0],
|
||||||
uuid=uid[1],
|
uuid=uid[1],
|
||||||
|
|
|
||||||
|
|
@ -218,6 +218,65 @@ def find_descendants(
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def find_runaway_subactors(
|
||||||
|
parent_pid: int,
|
||||||
|
*,
|
||||||
|
cpu_threshold: float = 95.0,
|
||||||
|
sample_interval: float = 0.5,
|
||||||
|
only_pids: set[int]|None = None,
|
||||||
|
) -> list[tuple[int, float, str]]:
|
||||||
|
'''
|
||||||
|
Return `(pid, cpu_pct, cmdline)` for any descendant
|
||||||
|
of `parent_pid` currently burning CPU above
|
||||||
|
`cpu_threshold` (default 95%) — the smoking-gun
|
||||||
|
signature of a runaway tight-loop bug (e.g. a C-level
|
||||||
|
`recvfrom` loop on a closed socket that missed EOF
|
||||||
|
detection — see
|
||||||
|
`ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`).
|
||||||
|
|
||||||
|
`cpu_percent(interval=sample_interval)` is the
|
||||||
|
canonical psutil API for a "what %CPU is this proc
|
||||||
|
using NOW" answer — it samples twice with a delta to
|
||||||
|
compute true utilization.
|
||||||
|
|
||||||
|
`only_pids` filters to a specific pre-snapshotted set
|
||||||
|
(e.g. "pids spawned during this test only"); when
|
||||||
|
`None`, all live descendants are checked.
|
||||||
|
|
||||||
|
Returns `[]` when `psutil` isn't installed or no
|
||||||
|
descendants match.
|
||||||
|
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
import psutil
|
||||||
|
except ImportError:
|
||||||
|
return []
|
||||||
|
|
||||||
|
candidates: list[int] = find_descendants(parent_pid)
|
||||||
|
if only_pids is not None:
|
||||||
|
candidates = [p for p in candidates if p in only_pids]
|
||||||
|
if not candidates:
|
||||||
|
return []
|
||||||
|
|
||||||
|
runaways: list[tuple[int, float, str]] = []
|
||||||
|
for pid in candidates:
|
||||||
|
try:
|
||||||
|
proc = psutil.Process(pid)
|
||||||
|
cpu: float = proc.cpu_percent(
|
||||||
|
interval=sample_interval,
|
||||||
|
)
|
||||||
|
if cpu < cpu_threshold:
|
||||||
|
continue
|
||||||
|
cmdline: str = ' '.join(proc.cmdline())
|
||||||
|
runaways.append((pid, cpu, cmdline))
|
||||||
|
except (
|
||||||
|
psutil.NoSuchProcess,
|
||||||
|
psutil.AccessDenied,
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
return runaways
|
||||||
|
|
||||||
|
|
||||||
def find_orphans(
|
def find_orphans(
|
||||||
repo_root: pathlib.Path,
|
repo_root: pathlib.Path,
|
||||||
) -> list[int]:
|
) -> list[int]:
|
||||||
|
|
@ -679,7 +738,7 @@ def _track_orphaned_uds_per_test():
|
||||||
`wait_for_actor`/`find_actor` discovery probes can
|
`wait_for_actor`/`find_actor` discovery probes can
|
||||||
accidentally hit (FileExistsError on rebind, or
|
accidentally hit (FileExistsError on rebind, or
|
||||||
epoll register on a half-closed peer-FIN'd fd → see
|
epoll register on a half-closed peer-FIN'd fd → see
|
||||||
issue #452). Catching the leak the test that caused
|
issue #454). Catching the leak the test that caused
|
||||||
it (vs. blanket session-end sweep) makes blame
|
it (vs. blanket session-end sweep) makes blame
|
||||||
obvious + prevents cascade flakiness.
|
obvious + prevents cascade flakiness.
|
||||||
|
|
||||||
|
|
@ -728,14 +787,133 @@ def _track_orphaned_uds_per_test():
|
||||||
if new_orphans:
|
if new_orphans:
|
||||||
import warnings
|
import warnings
|
||||||
warnings.warn(
|
warnings.warn(
|
||||||
f'UDS sock-file LEAK detected from test '
|
'UDS sock-file LEAK detected from test '
|
||||||
f'(reaping):\n '
|
'(reaping):\n '
|
||||||
+ '\n '.join(new_orphans),
|
+ '\n '.join(new_orphans),
|
||||||
stacklevel=1,
|
stacklevel=1,
|
||||||
)
|
)
|
||||||
reap_uds(new_orphans)
|
reap_uds(new_orphans)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(
|
||||||
|
scope='function',
|
||||||
|
autouse=True,
|
||||||
|
)
|
||||||
|
def _detect_runaway_subactors_per_test():
|
||||||
|
'''
|
||||||
|
Per-test (function-scoped) autouse runaway-subactor
|
||||||
|
detector.
|
||||||
|
|
||||||
|
Snapshots descendant pids before+after each test;
|
||||||
|
for any pid spawned during the test that's still
|
||||||
|
ALIVE at teardown AND burning >95% CPU, emits a loud
|
||||||
|
warning with `pid`, sampled `cpu%`, full `cmdline`,
|
||||||
|
AND copy-pastable diag commands (`strace`, `lsof`,
|
||||||
|
`ss`, `kill`).
|
||||||
|
|
||||||
|
**Does NOT kill the runaway** — by design.
|
||||||
|
The point of this fixture is to make tight-loop bugs
|
||||||
|
(e.g. C-level `recvfrom` loop on a closed socket
|
||||||
|
that missed EOF detection — see
|
||||||
|
`ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`)
|
||||||
|
loudly visible AT the test that triggers, while
|
||||||
|
keeping the live pid available for hands-on
|
||||||
|
diagnosis. The session-end
|
||||||
|
`_reap_orphaned_subactors` fixture will
|
||||||
|
SIGINT-then-SIGKILL any survivors when the test
|
||||||
|
session completes normally; if the user Ctrl-C's
|
||||||
|
pytest mid-warning, the pid stays alive for as long
|
||||||
|
as needed.
|
||||||
|
|
||||||
|
Cost: one extra `os.listdir('/proc')` snapshot
|
||||||
|
pre-test, one snapshot + N×`psutil.cpu_percent(0.5)`
|
||||||
|
post-test (only when there ARE new descendants —
|
||||||
|
most tests don't trigger any sampling). Skips
|
||||||
|
silently when `psutil` isn't installed.
|
||||||
|
|
||||||
|
'''
|
||||||
|
parent_pid: int = os.getpid()
|
||||||
|
|
||||||
|
def _emit_runaway_warning(
|
||||||
|
runaways: list[tuple[int, float, str]],
|
||||||
|
when: str,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Format + emit the runaway warning. Shared between
|
||||||
|
the SETUP-side (pre-yield, catches survivors of a
|
||||||
|
prior hung test) and TEARDOWN-side (post-yield,
|
||||||
|
catches normally-completing tests that left a
|
||||||
|
runaway behind) detection passes.
|
||||||
|
|
||||||
|
'''
|
||||||
|
msg_lines: list[str] = [
|
||||||
|
f'RUNAWAY subactor(s) detected at {when} — '
|
||||||
|
f'burning CPU (>95%):',
|
||||||
|
]
|
||||||
|
for pid, cpu, cmdline in runaways:
|
||||||
|
msg_lines.extend([
|
||||||
|
f' pid={pid} cpu={cpu:.1f}% cmdline={cmdline!r}',
|
||||||
|
f' diagnose live (pid stays alive — NOT killed):',
|
||||||
|
f' sudo strace -p {pid} -f -tt -e trace=recvfrom,epoll_wait,read,write',
|
||||||
|
f' sudo readlink /proc/{pid}/fd/* 2>/dev/null | head -20',
|
||||||
|
f' sudo ss -tnp | grep {pid}',
|
||||||
|
f' sudo lsof -p {pid}',
|
||||||
|
f' manual kill when done:',
|
||||||
|
f' kill -SIGINT {pid} # graceful first',
|
||||||
|
f' kill -SIGKILL {pid} # if SIGINT ignored (busy in C)',
|
||||||
|
'',
|
||||||
|
])
|
||||||
|
import warnings
|
||||||
|
warnings.warn(
|
||||||
|
'\n'.join(msg_lines),
|
||||||
|
stacklevel=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
# SETUP-side detection: catches runaways inherited
|
||||||
|
# from a PRIOR test that hung (and the user
|
||||||
|
# Ctrl-C'd or pytest-timeout fired) — those tests'
|
||||||
|
# teardown-side detector never ran, but the
|
||||||
|
# subactor is still burning CPU when the next test
|
||||||
|
# starts. The warning comes ONE TEST LATE which is
|
||||||
|
# imperfect but better than silence.
|
||||||
|
pre_existing: set[int] = set(find_descendants(parent_pid))
|
||||||
|
pre_runaways: list[tuple[int, float, str]] = (
|
||||||
|
find_runaway_subactors(
|
||||||
|
parent_pid,
|
||||||
|
only_pids=pre_existing,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if pre_runaways:
|
||||||
|
_emit_runaway_warning(
|
||||||
|
pre_runaways,
|
||||||
|
when='test SETUP (leftover from prior hung test)',
|
||||||
|
)
|
||||||
|
|
||||||
|
yield
|
||||||
|
|
||||||
|
# TEARDOWN-side detection: catches runaways spawned
|
||||||
|
# by THIS test that survived a normal teardown
|
||||||
|
# (i.e. parent's `hard_kill` SIGKILL didn't actually
|
||||||
|
# stop the runaway because it was in C tight-loop
|
||||||
|
# somewhere unreachable to signals — see
|
||||||
|
# `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`
|
||||||
|
# for the canonical fork-spawn forkserver-worker
|
||||||
|
# post-fork-close gap).
|
||||||
|
post_runaways: list[tuple[int, float, str]] = (
|
||||||
|
find_runaway_subactors(
|
||||||
|
parent_pid,
|
||||||
|
only_pids=set(
|
||||||
|
find_descendants(parent_pid)
|
||||||
|
) - pre_existing,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if post_runaways:
|
||||||
|
_emit_runaway_warning(
|
||||||
|
post_runaways,
|
||||||
|
when='test teardown',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def reap_subactors_per_test() -> int:
|
def reap_subactors_per_test() -> int:
|
||||||
'''
|
'''
|
||||||
|
|
|
||||||
|
|
@ -52,16 +52,30 @@ pytest_plugins: tuple[str, ...] = (
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from argparse import Namespace
|
from argparse import Namespace
|
||||||
|
|
||||||
|
_cap_sys_passed_as_flag: bool = False
|
||||||
|
_cap_fd_set: bool = False
|
||||||
|
|
||||||
# XXX REQUIRED in order to enforce `--capture=` flag
|
# XXX REQUIRED in order to enforce `--capture=` flag
|
||||||
# pre test session.
|
# pre test session.
|
||||||
# https://docs.pytest.org/en/stable/reference/reference.html#bootstrapping-hooks
|
# https://docs.pytest.org/en/stable/reference/reference.html#bootstrapping-hooks
|
||||||
|
@pytest.hookimpl(tryfirst=True)
|
||||||
def pytest_load_initial_conftests(
|
def pytest_load_initial_conftests(
|
||||||
early_config: pytest.Config,
|
early_config: pytest.Config,
|
||||||
parser: pytest.Parser,
|
parser: pytest.Parser,
|
||||||
args: list[str],
|
args: list[str],
|
||||||
):
|
):
|
||||||
|
global _cap_sys_passed_as_flag, _cap_fd_set
|
||||||
|
|
||||||
opts: Namespace = early_config.option
|
opts: Namespace = early_config.option
|
||||||
|
if opts.capture == 'fd':
|
||||||
|
_cap_fd_set = True
|
||||||
|
|
||||||
opts_w_args: Namespace = parser.parse_known_args(args)
|
opts_w_args: Namespace = parser.parse_known_args(args)
|
||||||
|
if opts_w_args.capture == 'fd':
|
||||||
|
_cap_fd_set = True
|
||||||
|
|
||||||
|
if '--capture=sys' in args:
|
||||||
|
_cap_sys_passed_as_flag = True
|
||||||
|
|
||||||
# XXX, ALWAYS apply capsys for fork based spawners:
|
# XXX, ALWAYS apply capsys for fork based spawners:
|
||||||
# * main_thread_forkserver
|
# * main_thread_forkserver
|
||||||
|
|
@ -94,14 +108,23 @@ def pytest_load_initial_conftests(
|
||||||
(spawner := opts_w_args.spawn_backend) in [
|
(spawner := opts_w_args.spawn_backend) in [
|
||||||
'main_thread_forkserver',
|
'main_thread_forkserver',
|
||||||
]
|
]
|
||||||
and
|
|
||||||
opts.capture == 'fd'
|
|
||||||
):
|
):
|
||||||
print(
|
print(
|
||||||
f'XXX SETTING CAPSYS due to spawning backend XXX\n'
|
f'XXX SETTING CAPSYS due to spawning backend XXX\n'
|
||||||
f'--spawn-backend={spawner!r}\n'
|
f'--spawn-backend={spawner!r}\n'
|
||||||
)
|
)
|
||||||
opts.capture = 'sys'
|
opts.capture = 'sys'
|
||||||
|
# ^TODO XXX?/
|
||||||
|
# seems like this doesn't get set by the above!?
|
||||||
|
args.append(
|
||||||
|
'--capture=sys',
|
||||||
|
)
|
||||||
|
out = parser.parse_known_and_unknown_args(
|
||||||
|
args,
|
||||||
|
early_config.option,
|
||||||
|
)
|
||||||
|
assert out[0].capture == 'sys'
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
# TODO, set various `$TRACTOR_X*` osenv vars here!
|
# TODO, set various `$TRACTOR_X*` osenv vars here!
|
||||||
print(
|
print(
|
||||||
|
|
@ -187,11 +210,17 @@ def tractor_test(
|
||||||
# injection (via `__wrapped__`) without leaking the async
|
# injection (via `__wrapped__`) without leaking the async
|
||||||
# nature.
|
# nature.
|
||||||
@wraps(wrapped)
|
@wraps(wrapped)
|
||||||
def wrapper(**kwargs):
|
def wrapper(
|
||||||
|
set_fork_aware_capture: pytest.CaptureFixture|None = None,
|
||||||
|
# ^NOTE when set, the decorated fn declared as fixture-param.
|
||||||
|
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
# NOTE, ensure we inject any test-fn declared fixture
|
# NOTE, ensure we inject any test-fn declared fixture
|
||||||
# names.
|
# names.
|
||||||
|
sig = inspect.signature(wrapped)
|
||||||
for kw in [
|
for kw in [
|
||||||
'reg_addr',
|
'reg_addr',
|
||||||
'loglevel',
|
'loglevel',
|
||||||
|
|
@ -200,9 +229,13 @@ def tractor_test(
|
||||||
'tpt_proto',
|
'tpt_proto',
|
||||||
'timeout',
|
'timeout',
|
||||||
]:
|
]:
|
||||||
if kw in inspect.signature(wrapped).parameters:
|
if kw in sig.parameters:
|
||||||
assert kw in kwargs
|
assert kw in kwargs
|
||||||
|
|
||||||
|
if 'set_fork_aware_capture' in sig.parameters:
|
||||||
|
assert set_fork_aware_capture
|
||||||
|
kwargs['set_fork_aware_capture'] = set_fork_aware_capture
|
||||||
|
|
||||||
# Extract runtime settings as locals for
|
# Extract runtime settings as locals for
|
||||||
# `open_root_actor()`; these must NOT leak into
|
# `open_root_actor()`; these must NOT leak into
|
||||||
# `kwargs` when the test fn doesn't declare them
|
# `kwargs` when the test fn doesn't declare them
|
||||||
|
|
@ -245,7 +278,6 @@ def tractor_test(
|
||||||
# invoke test-fn body IN THIS task
|
# invoke test-fn body IN THIS task
|
||||||
await wrapped(**kwargs)
|
await wrapped(**kwargs)
|
||||||
|
|
||||||
# invoke runtime via a root task.
|
|
||||||
return trio.run(
|
return trio.run(
|
||||||
partial(
|
partial(
|
||||||
_main,
|
_main,
|
||||||
|
|
@ -259,13 +291,6 @@ def tractor_test(
|
||||||
def pytest_addoption(
|
def pytest_addoption(
|
||||||
parser: pytest.Parser,
|
parser: pytest.Parser,
|
||||||
):
|
):
|
||||||
# parser.addoption(
|
|
||||||
# "--ll",
|
|
||||||
# action="store",
|
|
||||||
# dest='loglevel',
|
|
||||||
# default='ERROR', help="logging level to set when testing"
|
|
||||||
# )
|
|
||||||
|
|
||||||
parser.addoption(
|
parser.addoption(
|
||||||
"--spawn-backend",
|
"--spawn-backend",
|
||||||
action="store",
|
action="store",
|
||||||
|
|
@ -291,7 +316,7 @@ def pytest_addoption(
|
||||||
"--enable-stackscope",
|
"--enable-stackscope",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
dest='enable_stackscope',
|
dest='enable_stackscope',
|
||||||
# default=False,
|
default=False,
|
||||||
help=(
|
help=(
|
||||||
'Install `stackscope` SIGUSR1 handler in pytest + '
|
'Install `stackscope` SIGUSR1 handler in pytest + '
|
||||||
'every spawned subactor for live trio task-tree '
|
'every spawned subactor for live trio task-tree '
|
||||||
|
|
@ -317,6 +342,13 @@ def pytest_addoption(
|
||||||
def pytest_configure(
|
def pytest_configure(
|
||||||
config: pytest.Config,
|
config: pytest.Config,
|
||||||
):
|
):
|
||||||
|
# opts: Namespace = config.option
|
||||||
|
# print(
|
||||||
|
# f'PYTEST_CONFIGURE\n'
|
||||||
|
# f'capture={opts.capture!r}\n'
|
||||||
|
# )
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
backend: str = config.option.spawn_backend
|
backend: str = config.option.spawn_backend
|
||||||
from tractor.spawn._spawn import try_set_start_method
|
from tractor.spawn._spawn import try_set_start_method
|
||||||
try:
|
try:
|
||||||
|
|
@ -414,6 +446,25 @@ def pytest_collection_modifyitems(
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(
|
||||||
|
scope="session",
|
||||||
|
autouse=True,
|
||||||
|
)
|
||||||
|
def alert_on_finish():
|
||||||
|
'''
|
||||||
|
Ring a terminal notification on full test session
|
||||||
|
completion to alert any would be human.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# TODO, check attached to tty or skip!
|
||||||
|
yield # run all tests
|
||||||
|
print("\a") # trigger terminal bell
|
||||||
|
# ?TODO, any other nice-tricks/specific tuis we could try?
|
||||||
|
# - supposedly works in many terminals:
|
||||||
|
# >> print("\033]5;Alert: Tests Finished\a")
|
||||||
|
# - sway/i3-nag?
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session')
|
@pytest.fixture(scope='session')
|
||||||
def debug_mode(
|
def debug_mode(
|
||||||
request: pytest.FixtureRequest,
|
request: pytest.FixtureRequest,
|
||||||
|
|
@ -538,6 +589,7 @@ def pytest_generate_tests(
|
||||||
"start_method",
|
"start_method",
|
||||||
[spawn_backend],
|
[spawn_backend],
|
||||||
scope='module',
|
scope='module',
|
||||||
|
ids=lambda item: f'start_method={spawn_backend}',
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO, parametrize any `tpt_proto: str` declaring tests!
|
# TODO, parametrize any `tpt_proto: str` declaring tests!
|
||||||
|
|
@ -548,3 +600,85 @@ def pytest_generate_tests(
|
||||||
# proto_tpts, # TODO, double check this list usage!
|
# proto_tpts, # TODO, double check this list usage!
|
||||||
# scope='module',
|
# scope='module',
|
||||||
# )
|
# )
|
||||||
|
|
||||||
|
def _is_forking_spawner(
|
||||||
|
start_method: str,
|
||||||
|
) -> bool:
|
||||||
|
return start_method in [
|
||||||
|
'main_thread_forkserver',
|
||||||
|
'mp_forkserver',
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def is_forking_spawner(
|
||||||
|
start_method: str,
|
||||||
|
) -> bool:
|
||||||
|
'''
|
||||||
|
Is the `pytest` run using a `fork()`ing process spawning-backend?
|
||||||
|
|
||||||
|
'''
|
||||||
|
return _is_forking_spawner
|
||||||
|
|
||||||
|
|
||||||
|
def maybe_xfail_for_spawner(
|
||||||
|
start_method: str,
|
||||||
|
is_forking_spawner: bool,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Fork based spawning backends caude issues with `pytest`'s
|
||||||
|
fd-capture mechanism and can cause various suites to hang.
|
||||||
|
|
||||||
|
Instead this helper allows skipping/xfailing from a test
|
||||||
|
when a certain spawner + CLI-flag input is detected.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if (
|
||||||
|
not _cap_sys_passed_as_flag
|
||||||
|
and
|
||||||
|
is_forking_spawner
|
||||||
|
):
|
||||||
|
pytest.skip(
|
||||||
|
f'Spawner {start_method!r} requires the flag,\n'
|
||||||
|
f'--capture=sys or similar..\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def maybe_override_capture(
|
||||||
|
request: pytest.FixtureRequest,
|
||||||
|
start_method: bool,
|
||||||
|
) -> str:
|
||||||
|
if _is_forking_spawner(start_method):
|
||||||
|
request.getfixturevalue('capsys')
|
||||||
|
return 'sys'
|
||||||
|
|
||||||
|
return request.config.option.capture
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def set_fork_aware_capture(
|
||||||
|
request: pytest.FixtureRequest,
|
||||||
|
start_method: str,
|
||||||
|
) -> pytest.CaptureFixture|str:
|
||||||
|
'''
|
||||||
|
Force `--capture=sys` method for tests using
|
||||||
|
a forking-spawner backend due to fd-copying issues
|
||||||
|
which can oddly make certain tests hang/fail.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if _cap_sys_passed_as_flag:
|
||||||
|
return 'sys'
|
||||||
|
|
||||||
|
capsys: pytest.CaptureFixture = maybe_override_capture(
|
||||||
|
request=request,
|
||||||
|
start_method=start_method,
|
||||||
|
)
|
||||||
|
return capsys
|
||||||
|
# XXX reset?
|
||||||
|
# with capsys.disabled():
|
||||||
|
# pass
|
||||||
|
# return partial(
|
||||||
|
# maybe_override_capture,
|
||||||
|
# request=request,
|
||||||
|
# start_method=start_method,
|
||||||
|
# )
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,183 @@
|
||||||
|
# tractor: structured concurrent "actors".
|
||||||
|
# Copyright 2018-eternity Tyler Goodlet.
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
'''
|
||||||
|
Post-mortem subactor cleanup primitives — things the parent
|
||||||
|
runtime has to clean up because the dead-or-SIGKILL'd child
|
||||||
|
couldn't.
|
||||||
|
|
||||||
|
Sibling of `tractor._testing._reap` which is the test-harness
|
||||||
|
equivalent (orphan-pid + leaked-shm + leaked-UDS-sock sweeper
|
||||||
|
fixtures). This module is the spawn-layer counterpart, called
|
||||||
|
inline from `hard_kill` and the broader subactor reap path.
|
||||||
|
|
||||||
|
Today this is just `unlink_uds_bind_addrs()`. As future
|
||||||
|
post-mortem cleanup needs surface (e.g. `/dev/shm` segment
|
||||||
|
unlink for hard-crashed actors, leaked-pidfile cleanup), they
|
||||||
|
land here too.
|
||||||
|
|
||||||
|
Future-work TODO — authoritative UDS bind-addr tracking
|
||||||
|
-------------------------------------------------------
|
||||||
|
|
||||||
|
`unlink_uds_bind_addrs()` currently has two cleanup paths:
|
||||||
|
|
||||||
|
1. Explicit `bind_addrs` (when parent set them at spawn time)
|
||||||
|
2. **Convention-based reconstruction** —
|
||||||
|
`<XDG_RUNTIME_DIR>/tractor/<name>@<pid>.sock` — for the
|
||||||
|
common case where the subactor self-assigned a random sock
|
||||||
|
via `UDSAddress.get_random()`.
|
||||||
|
|
||||||
|
Path (2) hardcodes the `<name>@<pid>.sock` convention from
|
||||||
|
`tractor.ipc._uds.UDSAddress`. If that convention ever
|
||||||
|
changes — or the subactor binds to a non-default
|
||||||
|
`bindspace`/`filedir` — we'll silently fail to unlink.
|
||||||
|
|
||||||
|
A more authoritative approach would be:
|
||||||
|
|
||||||
|
- Subactors register their bound UDS sockpaths in a
|
||||||
|
per-process registry inside `tractor.ipc._uds` at
|
||||||
|
`start_listener()` time.
|
||||||
|
- The subactor reports its bound sockpath(s) back to the
|
||||||
|
parent over IPC immediately post-bind (extension to
|
||||||
|
`SpawnSpec` reply / a new handshake msg).
|
||||||
|
- Parent caches the subactor's authoritative sockpaths.
|
||||||
|
- `unlink_uds_bind_addrs()` checks the cache FIRST, falls
|
||||||
|
back to convention-reconstruction if the subactor died
|
||||||
|
before reporting (which is the SIGKILL case this fn
|
||||||
|
primarily exists for).
|
||||||
|
|
||||||
|
Tracked as future work in #454 (the parent UDS-leak
|
||||||
|
issue this module addresses); a separate issue may be
|
||||||
|
filed if/when the registry impl is scoped.
|
||||||
|
|
||||||
|
See also #452 — the discovery-client `CLOSE_WAIT` TCP
|
||||||
|
fd leak. Different bug class but same broader theme of
|
||||||
|
"fork-spawn unmasked latent cleanup gaps".
|
||||||
|
|
||||||
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
import trio
|
||||||
|
|
||||||
|
from tractor.discovery._addr import (
|
||||||
|
UnwrappedAddress,
|
||||||
|
wrap_address,
|
||||||
|
)
|
||||||
|
from tractor.ipc._uds import UDSAddress
|
||||||
|
from tractor.log import get_logger
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from tractor.runtime._runtime import Actor
|
||||||
|
|
||||||
|
|
||||||
|
log = get_logger('tractor')
|
||||||
|
|
||||||
|
|
||||||
|
def unlink_uds_bind_addrs(
|
||||||
|
proc: trio.Process,
|
||||||
|
*,
|
||||||
|
bind_addrs: list[UnwrappedAddress] | None = None,
|
||||||
|
subactor: Actor | None = None,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Best-effort post-mortem cleanup of any UDS sock-files
|
||||||
|
a hard-killed subactor was bound to.
|
||||||
|
|
||||||
|
SIGKILL bypasses Python execution → the subactor's
|
||||||
|
`_serve_ipc_eps` `finally:` block (which normally calls
|
||||||
|
`os.unlink(addr.sockpath)`) never runs. Without this
|
||||||
|
parent-side cleanup, the dead subactor's
|
||||||
|
`${XDG_RUNTIME_DIR}/tractor/<name>@<pid>.sock` file
|
||||||
|
accumulates on the filesystem (see issue #454 + the
|
||||||
|
autouse `_track_orphaned_uds_per_test` fixture).
|
||||||
|
|
||||||
|
Two cleanup paths, in order:
|
||||||
|
|
||||||
|
1. **Explicit `bind_addrs`** — when the parent set the
|
||||||
|
subactor's bind addrs at spawn time, unlink each
|
||||||
|
UDS-flavored sockpath directly.
|
||||||
|
2. **Self-assigned reconstruction** — when
|
||||||
|
`bind_addrs` is empty (the common case: subactor
|
||||||
|
picked its own random sock via
|
||||||
|
`UDSAddress.get_random()`), reconstruct the path
|
||||||
|
from `(subactor.aid.name, proc.pid)` using the
|
||||||
|
same `<name>@<pid>.sock` convention. We can do this
|
||||||
|
because the subactor uses its OWN `os.getpid()` at
|
||||||
|
bind time, which equals `proc.pid` from the
|
||||||
|
parent's view.
|
||||||
|
|
||||||
|
Idempotent: `FileNotFoundError` (graceful exit
|
||||||
|
already-unlinked, or sock never bound under early-
|
||||||
|
spawn cancel) is silenced; other `OSError`s log a
|
||||||
|
warning but never raise. TCP / non-UDS bind addrs are
|
||||||
|
skipped.
|
||||||
|
|
||||||
|
'''
|
||||||
|
sockpaths: list[str] = []
|
||||||
|
|
||||||
|
# path 1: explicit bind_addrs set at spawn time
|
||||||
|
for unwrapped in (bind_addrs or ()):
|
||||||
|
try:
|
||||||
|
addr = wrap_address(unwrapped)
|
||||||
|
except Exception:
|
||||||
|
log.exception(
|
||||||
|
f'Failed to wrap addr for UDS post-kill cleanup '
|
||||||
|
f'— skipping {unwrapped!r}\n'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
if isinstance(addr, UDSAddress):
|
||||||
|
sockpaths.append(str(addr.sockpath))
|
||||||
|
|
||||||
|
# path 2: reconstruct from subactor name + proc pid
|
||||||
|
# for the random-self-assign case (bind_addrs=None)
|
||||||
|
#
|
||||||
|
# TODO authoritative tracking — see module docstring.
|
||||||
|
if (
|
||||||
|
not sockpaths
|
||||||
|
and subactor is not None
|
||||||
|
and proc.pid is not None
|
||||||
|
):
|
||||||
|
sockname: str = f'{subactor.aid.name}@{proc.pid}.sock'
|
||||||
|
sockpath: str = str(
|
||||||
|
UDSAddress.def_bindspace / sockname
|
||||||
|
)
|
||||||
|
sockpaths.append(sockpath)
|
||||||
|
|
||||||
|
for sockpath in sockpaths:
|
||||||
|
try:
|
||||||
|
os.unlink(sockpath)
|
||||||
|
log.runtime(
|
||||||
|
f'Unlinked orphaned UDS sock-file post-SIGKILL\n'
|
||||||
|
f' |_{proc}\n'
|
||||||
|
f' |_{sockpath}\n'
|
||||||
|
)
|
||||||
|
except FileNotFoundError:
|
||||||
|
# raced — subactor cleaned up before SIGKILL,
|
||||||
|
# OR sockfile never bound (early-spawn cancel),
|
||||||
|
# OR transport wasn't UDS this run.
|
||||||
|
pass
|
||||||
|
except OSError as exc:
|
||||||
|
log.warning(
|
||||||
|
f'Failed to unlink subactor UDS sock-file '
|
||||||
|
f'post-SIGKILL\n'
|
||||||
|
f' |_{proc}\n'
|
||||||
|
f' |_{sockpath}\n'
|
||||||
|
f' |_{exc!r}\n'
|
||||||
|
)
|
||||||
|
|
@ -40,7 +40,10 @@ from tractor.runtime._state import (
|
||||||
_runtime_vars,
|
_runtime_vars,
|
||||||
)
|
)
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor.discovery._addr import UnwrappedAddress
|
from tractor.discovery._addr import (
|
||||||
|
UnwrappedAddress,
|
||||||
|
)
|
||||||
|
from ._reap import unlink_uds_bind_addrs
|
||||||
from tractor.runtime._portal import Portal
|
from tractor.runtime._portal import Portal
|
||||||
from tractor.runtime._runtime import Actor
|
from tractor.runtime._runtime import Actor
|
||||||
from tractor.msg import types as msgtypes
|
from tractor.msg import types as msgtypes
|
||||||
|
|
@ -279,6 +282,16 @@ async def hard_kill(
|
||||||
# whilst also hacking on it XD
|
# whilst also hacking on it XD
|
||||||
# terminate_after: int = 99999,
|
# terminate_after: int = 99999,
|
||||||
|
|
||||||
|
*,
|
||||||
|
# Subactor's bind addresses + subactor record, used
|
||||||
|
# for post-SIGKILL UDS sockpath cleanup. Optional for
|
||||||
|
# legacy callers; new call sites should pass at least
|
||||||
|
# `subactor` (which lets us reconstruct the sock path
|
||||||
|
# from `aid.name + proc.pid` when `bind_addrs` is
|
||||||
|
# empty/self-assigned). See `._reap.unlink_uds_bind_addrs()`.
|
||||||
|
bind_addrs: list[UnwrappedAddress] | None = None,
|
||||||
|
subactor: Actor | None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Un-gracefully terminate an OS level `trio.Process` after timeout.
|
Un-gracefully terminate an OS level `trio.Process` after timeout.
|
||||||
|
|
@ -366,6 +379,21 @@ async def hard_kill(
|
||||||
)
|
)
|
||||||
proc.kill()
|
proc.kill()
|
||||||
|
|
||||||
|
# Post-mortem UDS sockpath cleanup. SIGKILL bypassed
|
||||||
|
# the subactor's normal `os.unlink(addr.sockpath)` in
|
||||||
|
# `_serve_ipc_eps`'s `finally:`; the parent has the
|
||||||
|
# bind addrs (or can reconstruct from name + pid) so
|
||||||
|
# we do it here. Runs UNCONDITIONALLY (graceful-exit
|
||||||
|
# case is a no-op via `FileNotFoundError` skip in the
|
||||||
|
# helper) so the cleanup also covers the "cancelled
|
||||||
|
# during spawn" path where the subactor never reached
|
||||||
|
# its IPC server finally block.
|
||||||
|
unlink_uds_bind_addrs(
|
||||||
|
proc,
|
||||||
|
bind_addrs=bind_addrs,
|
||||||
|
subactor=subactor,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def soft_kill(
|
async def soft_kill(
|
||||||
proc: ProcessType,
|
proc: ProcessType,
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ from tractor.runtime._state import (
|
||||||
current_actor,
|
current_actor,
|
||||||
is_root_process,
|
is_root_process,
|
||||||
debug_mode,
|
debug_mode,
|
||||||
get_runtime_vars,
|
# get_runtime_vars,
|
||||||
)
|
)
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor.discovery._addr import UnwrappedAddress
|
from tractor.discovery._addr import UnwrappedAddress
|
||||||
|
|
@ -282,7 +282,23 @@ async def trio_proc(
|
||||||
|
|
||||||
if proc.poll() is None:
|
if proc.poll() is None:
|
||||||
log.cancel(f"Attempting to hard kill {proc}")
|
log.cancel(f"Attempting to hard kill {proc}")
|
||||||
await hard_kill(proc)
|
await hard_kill(
|
||||||
|
proc,
|
||||||
|
# NOTE, pass through so post-SIGKILL we
|
||||||
|
# can `os.unlink()` the subactor's
|
||||||
|
# orphaned UDS sock-file(s) — the
|
||||||
|
# subactor's own
|
||||||
|
# `_serve_ipc_eps`-`finally:` cleanup
|
||||||
|
# never runs under SIGKILL. `subactor`
|
||||||
|
# lets the helper reconstruct the
|
||||||
|
# sock path via `aid.name + proc.pid`
|
||||||
|
# when `bind_addrs` is the common
|
||||||
|
# self-assigned-random case
|
||||||
|
# (bind_addrs=None at spawn). See
|
||||||
|
# `_unlink_uds_bind_addrs()` in `_spawn`.
|
||||||
|
bind_addrs=bind_addrs,
|
||||||
|
subactor=subactor,
|
||||||
|
)
|
||||||
|
|
||||||
log.debug(f"Joined {proc}")
|
log.debug(f"Joined {proc}")
|
||||||
else:
|
else:
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,95 @@
|
||||||
|
# `tractor.trionics.patches`
|
||||||
|
|
||||||
|
Defensive monkey-patches for bugs in `trio` itself.
|
||||||
|
|
||||||
|
## What goes here
|
||||||
|
|
||||||
|
- Bugs in upstream `trio` that we've encountered while
|
||||||
|
running `tractor` and need to work around until
|
||||||
|
upstream releases a fix.
|
||||||
|
- Each patch fixes EXACTLY one trio internal — no
|
||||||
|
multi-bug omnibus patches.
|
||||||
|
|
||||||
|
## What does NOT go here
|
||||||
|
|
||||||
|
- Bugs in `tractor`'s own code (those get fixed
|
||||||
|
in-tree, in the offending tractor module).
|
||||||
|
- Bugs in `asyncio`, `pytest`, the stdlib, etc. (file
|
||||||
|
separate `tractor.<lib>.patches` subpkgs as
|
||||||
|
needed).
|
||||||
|
- Workarounds for behavior we *disagree* with but that
|
||||||
|
isn't a bug per se. If trio's API does what it says
|
||||||
|
on the tin, we don't override it here.
|
||||||
|
|
||||||
|
## Per-patch contract
|
||||||
|
|
||||||
|
Every `_<topic>.py` module in this directory MUST
|
||||||
|
expose:
|
||||||
|
|
||||||
|
- **`apply() -> bool`** — apply the patch. Idempotent
|
||||||
|
(safe to call multiple times). Version-gated — must
|
||||||
|
consult `is_needed()` and skip when False. Returns
|
||||||
|
`True` if patched this call, `False` if skipped.
|
||||||
|
|
||||||
|
- **`is_needed() -> bool`** — does upstream still need
|
||||||
|
patching? Today most patches return `True`
|
||||||
|
unconditionally, but as upstream releases land each
|
||||||
|
should gate on `Version(trio.__version__) <
|
||||||
|
Version('X.Y.Z')`. When the gated version is
|
||||||
|
released, the patch can be DELETED entirely.
|
||||||
|
|
||||||
|
- **`repro() -> None`** — minimal demonstration of the
|
||||||
|
bug. Used by the regression test suite to assert (a)
|
||||||
|
the upstream bug still exists, (b) our patch fixes
|
||||||
|
it. Should be tight enough that calling it post-
|
||||||
|
`apply()` returns cleanly within a few hundred
|
||||||
|
milliseconds — tests wrap it with a wall-clock cap.
|
||||||
|
|
||||||
|
Each module's docstring MUST contain:
|
||||||
|
|
||||||
|
- **Problem**: what trio does wrong + the trigger
|
||||||
|
conditions (e.g. "fork-spawn backend, peer-closed
|
||||||
|
socketpair, etc.")
|
||||||
|
- **Fix**: the one-line (ideally) patch
|
||||||
|
- **Repro**: the standalone snippet `repro()`
|
||||||
|
implements
|
||||||
|
- **Upstream**: link to filed issue/PR (or
|
||||||
|
`TODO: file`)
|
||||||
|
- **REMOVE WHEN**: `trio>=X.Y.Z` ships the upstream
|
||||||
|
fix
|
||||||
|
|
||||||
|
## Adding a patch
|
||||||
|
|
||||||
|
1. Create `_<topic>.py` with the `apply` /
|
||||||
|
`is_needed` / `repro` API.
|
||||||
|
2. Register it in `__init__.py::_PATCHES`.
|
||||||
|
3. Add a regression test in
|
||||||
|
`tests/trionics/test_patches.py` that uses
|
||||||
|
`repro()` to assert pre/post-patch behavior with a
|
||||||
|
wall-clock cap.
|
||||||
|
4. File the upstream issue/PR. Add the link to your
|
||||||
|
module's `Upstream:` and `# REMOVE WHEN:` lines.
|
||||||
|
|
||||||
|
## Removing a patch (when upstream releases the fix)
|
||||||
|
|
||||||
|
1. Confirm the upstream-fixed `trio` version is the
|
||||||
|
minimum we depend on, OR keep the version-gate in
|
||||||
|
`is_needed()` if we still support older trio.
|
||||||
|
2. If we've fully bumped past the broken versions:
|
||||||
|
- Delete `_<topic>.py`
|
||||||
|
- Remove the entry from `__init__.py::_PATCHES`
|
||||||
|
- Delete the corresponding test in
|
||||||
|
`tests/trionics/test_patches.py`
|
||||||
|
- Bump the conc-anal doc with a "FIXED" header
|
||||||
|
|
||||||
|
## Calling
|
||||||
|
|
||||||
|
```python
|
||||||
|
from tractor.trionics.patches import apply_all
|
||||||
|
apply_all()
|
||||||
|
```
|
||||||
|
|
||||||
|
Currently invoked from `tractor._child._actor_child_main`
|
||||||
|
before `_trio_main` so every spawned subactor gets
|
||||||
|
patched. The root actor's entry could opt in too if a
|
||||||
|
patch turns out to bite the root (none do today).
|
||||||
|
|
@ -0,0 +1,84 @@
|
||||||
|
# tractor: structured concurrent "actors".
|
||||||
|
# Copyright 2018-eternity Tyler Goodlet.
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
'''
|
||||||
|
Defensive monkey-patches for `trio` internals.
|
||||||
|
|
||||||
|
Every patch in this package fixes a bug in `trio` itself
|
||||||
|
that we've encountered while running `tractor` — usually
|
||||||
|
a fork-survival edge case that upstream `trio` hasn't
|
||||||
|
filed/fixed yet. Each patch is:
|
||||||
|
|
||||||
|
- **idempotent** — safe to call multiple times
|
||||||
|
- **version-gated** — checks `trio.__version__` and skips
|
||||||
|
itself if upstream has shipped the fix
|
||||||
|
- **scoped** — only modifies the specific trio internal
|
||||||
|
it's targeting; no broad side effects
|
||||||
|
- **removable** — every patch carries a `# REMOVE WHEN:`
|
||||||
|
marker in its docstring pointing at the upstream PR
|
||||||
|
whose release allows us to drop it
|
||||||
|
|
||||||
|
Add a new patch by:
|
||||||
|
|
||||||
|
1. Create `tractor/trionics/patches/_<topic>.py` exposing
|
||||||
|
the `apply()` / `is_needed()` / `repro()` API
|
||||||
|
contract.
|
||||||
|
2. Import it in this `__init__.py` and add an entry to
|
||||||
|
`_PATCHES`.
|
||||||
|
3. Document upstream-fix-tracking in the module
|
||||||
|
docstring's `# REMOVE WHEN:` line.
|
||||||
|
4. Add a regression test in
|
||||||
|
`tests/trionics/test_patches.py` that uses the
|
||||||
|
patch's `repro()` to assert the bug exists + the
|
||||||
|
patch fixes it.
|
||||||
|
|
||||||
|
Calling `apply_all()` from a tractor entry point (e.g.
|
||||||
|
`tractor._child._actor_child_main`) applies every
|
||||||
|
registered patch + returns `{patch_name: applied?}` so
|
||||||
|
callers can log/assert as needed.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
from . import _wakeup_socketpair
|
||||||
|
|
||||||
|
|
||||||
|
_PATCHES: list[tuple[str, Callable[[], bool]]] = [
|
||||||
|
(
|
||||||
|
'trio_wakeup_socketpair_drain_eof',
|
||||||
|
_wakeup_socketpair.apply,
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def apply_all() -> dict[str, bool]:
|
||||||
|
'''
|
||||||
|
Apply every registered patch. Idempotent — calling
|
||||||
|
twice is fine, second call's dict will be all
|
||||||
|
`False`.
|
||||||
|
|
||||||
|
Returns `{patch_name: applied?}`:
|
||||||
|
|
||||||
|
- `True` — patch was applied THIS call (inaugural
|
||||||
|
apply, or first-call-since-process-start).
|
||||||
|
- `False` — skipped (already applied OR upstream fix
|
||||||
|
detected via `is_needed() == False`).
|
||||||
|
|
||||||
|
'''
|
||||||
|
results: dict[str, bool] = {}
|
||||||
|
for name, applier in _PATCHES:
|
||||||
|
results[name] = applier()
|
||||||
|
return results
|
||||||
|
|
@ -0,0 +1,171 @@
|
||||||
|
# tractor: structured concurrent "actors".
|
||||||
|
# Copyright 2018-eternity Tyler Goodlet.
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
'''
|
||||||
|
Patch `trio._core._wakeup_socketpair.WakeupSocketpair.drain()`
|
||||||
|
to break on peer-closed EOF.
|
||||||
|
|
||||||
|
Problem
|
||||||
|
-------
|
||||||
|
`drain()` loops on `self.wakeup_sock.recv(2**16)` and
|
||||||
|
exits ONLY on `BlockingIOError` (buffer-empty on a
|
||||||
|
non-blocking socket), NEVER on `recv() == b''`
|
||||||
|
(peer-closed FIN). When the socketpair's write-end
|
||||||
|
has been closed, `recv` returns 0 bytes each call →
|
||||||
|
infinite C-level tight loop → 100% CPU, no Python
|
||||||
|
checkpoints, no signal delivery, no progress.
|
||||||
|
|
||||||
|
Most reliably triggered under fork-spawn backends —
|
||||||
|
`os.fork()` + `_close_inherited_fds()` can leave a
|
||||||
|
`WakeupSocketpair` instance whose `write_sock` was
|
||||||
|
closed in the child (or whose peer-end is held by a
|
||||||
|
process that has since exited).
|
||||||
|
|
||||||
|
Repro
|
||||||
|
-----
|
||||||
|
```python
|
||||||
|
from trio._core._wakeup_socketpair import WakeupSocketpair
|
||||||
|
ws = WakeupSocketpair()
|
||||||
|
ws.write_sock.close()
|
||||||
|
ws.drain() # spins forever pre-patch
|
||||||
|
```
|
||||||
|
|
||||||
|
Fix
|
||||||
|
---
|
||||||
|
One line: break the drain loop on `b''` EOF
|
||||||
|
in addition to the existing `BlockingIOError` exit.
|
||||||
|
|
||||||
|
```python
|
||||||
|
def _safe_drain(self) -> None:
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = self.wakeup_sock.recv(2**16)
|
||||||
|
if not data: # ← peer-closed; nothing more to drain
|
||||||
|
return
|
||||||
|
except BlockingIOError:
|
||||||
|
pass
|
||||||
|
```
|
||||||
|
|
||||||
|
Upstream
|
||||||
|
--------
|
||||||
|
TODO: file at `python-trio/trio` — the standalone
|
||||||
|
`repro()` below + this docstring is the issue body's
|
||||||
|
evidence section.
|
||||||
|
|
||||||
|
REMOVE WHEN: trio>=`<TBD>` ships the EOF-break in
|
||||||
|
`_wakeup_socketpair.WakeupSocketpair.drain()`.
|
||||||
|
|
||||||
|
See also
|
||||||
|
--------
|
||||||
|
- `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`
|
||||||
|
- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md`
|
||||||
|
— sibling-bug analysis fixed by the same patch.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
|
||||||
|
# Module-local sentinel — set True by `apply()` after the
|
||||||
|
# first successful patch. Idempotency guard.
|
||||||
|
_APPLIED: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
def is_needed() -> bool:
|
||||||
|
'''
|
||||||
|
True iff upstream `trio` is the broken version that
|
||||||
|
needs our patch.
|
||||||
|
|
||||||
|
Today: always True since no released `trio` has the
|
||||||
|
fix. When upstream lands it, gate on:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from packaging.version import Version
|
||||||
|
import trio
|
||||||
|
return Version(trio.__version__) < Version('<TBD>')
|
||||||
|
```
|
||||||
|
|
||||||
|
'''
|
||||||
|
# TODO version-gate once upstream lands the fix.
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def repro() -> None:
|
||||||
|
'''
|
||||||
|
Minimal hang demonstrator + regression test target.
|
||||||
|
|
||||||
|
Returns CLEANLY when `apply()` has been called
|
||||||
|
earlier in this process (the patched
|
||||||
|
`_safe_drain` breaks on EOF). Spins forever
|
||||||
|
UNPATCHED — caller should wrap with a wall-clock
|
||||||
|
cap (e.g. `signal.alarm(N)` or `trio.fail_after`)
|
||||||
|
to avoid hanging the test runner if regressing.
|
||||||
|
|
||||||
|
Used by `tests/trionics/test_patches.py` to assert
|
||||||
|
both:
|
||||||
|
|
||||||
|
1. The bug exists upstream (sanity check the
|
||||||
|
repro is real).
|
||||||
|
2. Our patch fixes it (post-`apply()` returns
|
||||||
|
cleanly).
|
||||||
|
|
||||||
|
'''
|
||||||
|
from trio._core._wakeup_socketpair import (
|
||||||
|
WakeupSocketpair,
|
||||||
|
)
|
||||||
|
ws = WakeupSocketpair()
|
||||||
|
ws.write_sock.close()
|
||||||
|
ws.drain() # ← targeted operation
|
||||||
|
|
||||||
|
|
||||||
|
def apply() -> bool:
|
||||||
|
'''
|
||||||
|
Apply the EOF-break patch to
|
||||||
|
`WakeupSocketpair.drain`. Idempotent + version-
|
||||||
|
gated.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
- `True` if patched THIS call (inaugural apply).
|
||||||
|
- `False` if skipped (already applied this process,
|
||||||
|
OR `is_needed() == False` because upstream fixed
|
||||||
|
it).
|
||||||
|
|
||||||
|
'''
|
||||||
|
global _APPLIED
|
||||||
|
if _APPLIED or not is_needed():
|
||||||
|
return False
|
||||||
|
|
||||||
|
from trio._core._wakeup_socketpair import (
|
||||||
|
WakeupSocketpair as _WSP,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _safe_drain(self) -> None:
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = self.wakeup_sock.recv(2**16)
|
||||||
|
# XXX patch — break on EOF instead of
|
||||||
|
# spinning. Upstream trio's `drain()`
|
||||||
|
# only handles the `BlockingIOError`
|
||||||
|
# (buffer-empty) case; missed the
|
||||||
|
# peer-closed (`recv == b''`) case.
|
||||||
|
if not data:
|
||||||
|
return
|
||||||
|
except BlockingIOError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
_WSP.drain = _safe_drain
|
||||||
|
_APPLIED = True
|
||||||
|
return True
|
||||||
Loading…
Reference in New Issue