Add `tractor.trionics.patches` subpkg + first fix
With an initial patch fixing `trio`'s `WakeupSocketpair.drain()`, which
can busy-loop because it never handles `EOF`.
New `tractor.trionics.patches` subpkg housing defensive monkey-patches
for upstream `trio` bugs we've encountered while running `tractor`
— lately, in particular, fork-survival edge cases that haven't been
filed/fixed upstream yet. Each patch is idempotent, version-gated via
`is_needed()`, and carries a `# REMOVE WHEN:` marker pointing at the
upstream release whose adoption allows deletion.
Subpkg layout + per-patch contract documented in
`tractor/trionics/patches/README.md` — `apply()` / `is_needed()`
/ `repro()` API, registry pattern via `_PATCHES` in `__init__.py`,
single-call entry point `apply_all()`.
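Single-call usage (a sketch; returns the `{patch_name: applied?}` map
described in the subpkg's `__init__.py`):
    from tractor.trionics.patches import apply_all
    results: dict[str, bool] = apply_all()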
First patch, `_wakeup_socketpair`:
- `trio`'s `WakeupSocketpair.drain()` loops on `recv(64KB)` and exits
ONLY on `BlockingIOError`, NEVER on `recv() == b''` (peer-closed FIN).
- under `fork()`-spawning backends the COW-inherited socketpair fds
& `_close_inherited_fds()` teardown can leave a `WakeupSocketpair`
instance whose write-end is closed, and `drain()` then **spins forever
in C with no Python checkpoints**,
- net effect: 100% CPU burn and no useful signal delivery.
Standalone repro:
from trio._core._wakeup_socketpair import WakeupSocketpair
ws = WakeupSocketpair()
ws.write_sock.close()
ws.drain() # spins forever
Patch is one-line — break the drain loop on b'' EOF.
Manifested as two distinct test failures:
- `tests/test_multi_program.py::test_register_duplicate_name` hung at
100% CPU on the busy-loop directly (fork child's worker thread)
- `tests/test_infected_asyncio.py::test_aio_simple_error` Mode-A
deadlock — busy-loop wedged trio's scheduler inside `start_guest_run`,
both threads parked in `epoll_wait`, no TCP connect-back to parent
ever happened.
Same patch fixes both. Restored 99.7% pass rate on full
suite under `--spawn-backend=main_thread_forkserver`
(was hanging indefinitely before).
Wired into `tractor._child._actor_child_main` via `apply_all()` BEFORE
any trio runtime init. Harmless on non-fork backends.
Conc-anal write-ups, including strace + py-spy evidence:
- `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`
- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md`
Regression tests in `tests/trionics/test_patches.py`: each test asserts
(a) the bug exists pre-patch (or is fixed upstream — skip cleanly), (b)
the patch fixes it, under a SIGALRM wall-clock cap so a regression fails
loudly instead of hanging silently.
TODO:
- [ ] file the upstream `python-trio/trio` issue + PR.
- [ ] use the `repro()` callable in `_wakeup_socketpair.py` as the issue
  body's evidence section.
(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md
@@ -0,0 +1,378 @@
# `infect_asyncio` × `main_thread_forkserver` Mode-A deadlock

## Reproducer

```bash
./py313/bin/python -m pytest \
  tests/test_infected_asyncio.py::test_aio_simple_error \
  --tpt-proto=tcp \
  --spawn-backend=main_thread_forkserver \
  -v --capture=sys
```

Hangs indefinitely. Mode-A signature — both processes parked in
`epoll_wait`, **neither burning CPU**.
## Empirical observations (caught alive)

### Outer pytest (parent)

`py-spy dump` on the test runner pid shows the trio event loop parked at
the bottom of `trio.run()`:

```
Thread <pid> (idle): "MainThread"
    get_events (trio/_core/_io_epoll.py:245)
        self: <EpollIOManager at 0x...>
        timeout: 86400
    run (trio/_core/_run.py:2415)
        next_send: []
        timeout: 86400
    test_aio_simple_error (tests/test_infected_asyncio.py:175)
```

`timeout: 86400` is trio's "no scheduled work, just wait for I/O forever"
sentinel. `next_send: []` confirms nothing is queued. The parent is stuck
inside `tractor.open_nursery(...).run_in_actor(...)` waiting for
`ipc_server.wait_for_peer(uid)` to fire — i.e. waiting for the spawned
subactor to connect back.

### Subactor (forked child)

`/proc/<pid>/stack`:

```
do_epoll_wait+0x4c0/0x500
__x64_sys_epoll_wait+0x70/0x120
do_syscall_64+0xef/0x1540
entry_SYSCALL_64_after_hwframe+0x77/0x7f
```

`strace -p <pid> -f`:

```
[pid <child-A>] epoll_wait(6 <unfinished ...>
[pid <child-B>] epoll_wait(3
```

**Two threads**, both parked in `epoll_wait` on distinct epoll fds. Both
blocked, neither making progress.

### Subactor file-descriptor table

```
fd=0,1,2      stdio
fd=3          eventpoll [watches fd 4]
fd=4 ↔ fd=5   unix STREAM (CONNECTED) — internal pair
fd=6          eventpoll [watches fds 7, 9]
fd=7 ↔ fd=8   unix STREAM (CONNECTED) — internal pair
fd=9 ↔ fd=10  unix STREAM (CONNECTED) — internal pair
```

Confirmed via `ss -xp` peer-inode lookup: **all 6 unix sockets are
internal socketpairs** (peer in same pid).

**Critical**: zero TCP/IPv4/IPv6 sockets, despite `--tpt-proto=tcp`:

```
$ sudo lsof -p <subactor> | grep -iE 'TCP|IPv'
(empty)
$ sudo ss -tnp | grep <subactor>
(empty)
```

**The subactor never opened a TCP connection back to the parent.**
## Diagnosis

The subactor reaches `_actor_child_main` → `_trio_main(actor)` →
`run_as_asyncio_guest(trio_main)`. Code path (`tractor.spawn._entry`):

```python
if infect_asyncio:
    actor._infected_aio = True
    run_as_asyncio_guest(trio_main)  # ← this branch
else:
    trio.run(trio_main)
```

`run_as_asyncio_guest` (`tractor.to_asyncio`):

```python
def run_as_asyncio_guest(trio_main, ...):
    async def aio_main(trio_main):
        loop = asyncio.get_running_loop()
        trio_done_fute = asyncio.Future()
        ...
        trio.lowlevel.start_guest_run(
            trio_main,
            run_sync_soon_threadsafe=loop.call_soon_threadsafe,
            done_callback=trio_done_callback,
        )
        out = await asyncio.shield(trio_done_fute)
        return out.unwrap()
    ...
    return asyncio.run(aio_main(trio_main))
```

Expected flow:

1. `asyncio.run(aio_main(...))` — boots a fresh asyncio loop in the
   calling thread.
2. `aio_main` calls `trio.lowlevel.start_guest_run(...)` — initializes
   trio's I/O manager, schedules the first trio slice via
   `loop.call_soon_threadsafe`.
3. asyncio loop dispatches the callback → trio runs a slice → yields back
   via `call_soon_threadsafe`.
4. Trio's `async_main` (the user function) runs →
   `Channel.from_addr(parent_addr)` → TCP connect to parent.

What we observe instead:

- 2 threads in `epoll_wait` (one trio epoll, one asyncio epoll, both
  inactive)
- 6 unix-socket fds (3 socketpairs: trio wakeup-fd-pair, asyncio
  wakeup-fd-pair, trio kicker socketpair)
- ZERO TCP — `Channel.from_addr` never ran

Most likely cause: **trio's guest-run scheduling callback didn't get
dispatched by asyncio's loop in the forked child**, so trio's
`async_main` never executes past trio bootstrap, and the
parent-IPC-connect step is never reached.
## Fork-survival risk surface (hypothesis)

`trio.lowlevel.start_guest_run` builds Python-level closures + signal
handlers + wakeup-fd registrations that depend on:

- The asyncio event loop's `call_soon_threadsafe` thread-id matching the
  loop owner thread.
- Process-wide signal-wakeup-fd state (`signal.set_wakeup_fd`).
- Trio's `KIManager` SIGINT handler.

Under `main_thread_forkserver`, the fork happens from a worker thread
that has **never entered trio** (intentional — trio-free launchpad). But
the FORKED child then tries to bring up BOTH asyncio AND trio-as-guest
fresh from this trio-free thread. The asyncio loop boots fine; trio's
`start_guest_run` initializes BUT the cross-loop dispatch (asyncio queue
→ trio slice) appears to silently fail to wire up.

Two more hypotheses worth probing:

1. **Wakeup-fd contention**: asyncio installs
   `signal.set_wakeup_fd(<own_pair>)`. trio's guest-run also wants a
   wakeup-fd. Whoever installs second wins; the loser's `epoll_wait` no
   longer wakes on signals. Combined with the
   `asyncio.shield(trio_done_fute)` + `asyncio.CancelledError` handling
   in `run_as_asyncio_guest`, a missed signal delivery could explain the
   indefinite park (see the sketch after this list).

2. **Trio kicker socketpair race**: trio's I/O manager uses an internal
   `socket.socketpair()` to "kick" itself out of `epoll_wait` when a
   non-IO task needs scheduling. In guest mode, the kicker is still
   present but is supposed to be triggered via the asyncio dispatch. If
   the kicker write never gets issued by asyncio's callback, trio's
   epoll never wakes.
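As a quick sanity check on hypothesis 1, the process-global nature of the
signal wakeup-fd slot can be shown with the stdlib alone (no trio or
asyncio involved; the variable names here are illustrative, not tractor
code):

```python
import signal
import socket

# two would-be "event loops", each with its own wakeup socketpair
r1, w1 = socket.socketpair()
r2, w2 = socket.socketpair()
w1.setblocking(False)
w2.setblocking(False)

signal.set_wakeup_fd(w1.fileno())          # first registrant
prev = signal.set_wakeup_fd(w2.fileno())   # second registrant replaces it

# there is exactly one slot: the second call returns the fd it evicted,
# so only w2 will see future signal-wakeup bytes.
assert prev == w1.fileno()
signal.set_wakeup_fd(-1)  # restore default before exiting
```

Which loop ends up owning that single slot in the forked child is exactly
what hypothesis 1 questions.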
## Confirmed via py-spy (live capture)

After detaching `strace` (ptrace is exclusive — that's why `py-spy`
returns EPERM if strace is attached):

```
Thread <pid> (idle): "main-thread-forkserver[asyncio_actor]"
    select (selectors.py:452)                  # asyncio epoll
    _run_once (asyncio/base_events.py:2012)
    run_forever (asyncio/base_events.py:683)
    run_until_complete (asyncio/base_events.py:712)
    run (asyncio/runners.py:118)
    run (asyncio/runners.py:195)
    run_as_asyncio_guest (tractor/to_asyncio.py:1770)
    _trio_main (tractor/spawn/_entry.py:160)
    _actor_child_main (tractor/_child.py:72)
    _child_target (tractor/spawn/_main_thread_forkserver.py:910)
    _worker (tractor/spawn/_main_thread_forkserver.py:605)
    [thread bootstrap]

Thread <pid+1> (idle): "Trio thread 14"
    get_events (trio/_core/_io_epoll.py:245)   # trio epoll
    get_events (trio/_core/_run.py:1678)
    capture (outcome/_impl.py:67)
    _handle_job (trio/_core/_thread_cache.py:173)
    _work (trio/_core/_thread_cache.py:196)
    [thread bootstrap]
```

This data **rewrites the diagnosis**: trio guest-run isn't broken across
the fork — it's working as designed. The two threads ARE the canonical
guest-run architecture:

1. **Asyncio main loop** runs in the lead thread. Parked in
   `selectors.EpollSelector.select(timeout=-1)` — waiting indefinitely
   for ANY callback to be queued.
2. **Trio's I/O manager** offloads `get_events` (`epoll_wait`) onto a
   `trio._core._thread_cache` worker thread. The worker calls
   `outcome.capture(get_events)` and parks in `epoll_wait(timeout=86400)`.
3. When trio I/O fires (or its kicker socketpair gets a write), the
   worker returns from `epoll_wait`, delivers the result via
   `_handle_job`'s `deliver` callback, which schedules the next trio
   slice on asyncio via `loop.call_soon_threadsafe`.

The fact that the trio thread is *already* in `_thread_cache._handle_job`
doing `capture(get_events)` means **trio's scheduler HAS started** — the
asyncio↔trio bridge is wired correctly post-fork.

So `async_main` DID run far enough to register some trio task that's now
awaiting I/O. The question becomes: **what is `async_main` waiting on?**

Process state confirms it's NOT waiting on the TCP connect to parent:

```
$ sudo lsof -p <subactor> | grep -iE 'TCP|IPv'
(empty)
$ sudo ss -tnp | grep <subactor>
(empty)
```

`Channel.from_addr(parent_addr)` — the very first thing `async_main`
does — was never reached, OR was reached but errored before `socket()`
was called. The parent (running `ipc_server.wait_for_peer`) waits forever
for the connection; it never comes.
## Refined hypothesis

`async_main` is stalled in some PRE-`Channel.from_addr` checkpoint.
Candidates:

1. **`get_console_log` / logger init** — called early in `_trio_main` if
   `actor.loglevel is not None`. Logging setup involves file/handler
   init that could block on something fork-inherited (e.g. a stale
   lock).

2. **`debug.maybe_init_greenback`** — `start_guest_run` includes a check
   (`if debug_mode(): assert 0` — currently asserts unsupported). For
   non-debug mode this is bypassed but related machinery may run.

3. **Stackscope SIGUSR1 handler install** — gated on `_debug_mode` OR
   the `TRACTOR_ENABLE_STACKSCOPE` env-var. The `enable_stack_on_sig()`
   path captures a trio token via `trio.lowlevel.current_trio_token()` —
   could block under guest mode.

4. **Initial `await trio.sleep(0)` / first checkpoint** in `async_main`
   before reaching the `Channel.from_addr` line. Under guest mode, if
   the FIRST `call_soon_threadsafe` callback never gets processed by
   asyncio, trio's first slice never completes — but the worker thread
   WOULD still be in `epoll_wait` having been started by trio's I/O
   manager init.
## Confirming `async_main`'s parked location

Add temporary logging at the top of `Actor.async_main`:

```python
# tractor/runtime/_runtime.py around line 855
async def async_main(self, parent_addr=None):
    log.devx('async_main: ENTERED')  # marker A
    try:
        log.devx('async_main: pre-Channel.from_addr')  # marker B
        chan = await Channel.from_addr(
            addr=wrap_address(parent_addr)
        )
        log.devx('async_main: post-Channel.from_addr')  # marker C
        ...
```

Re-run the test with `--ll=devx`. The last marker logged tells us exactly
where `async_main` parked. If only A fires, the issue is between A and B
(logger init, stackscope, etc.). If A and B fire but not C, it's in
`Channel.from_addr` (DNS, socket creation, connect).
## Related sibling bug

`tests/test_multi_program.py::test_register_duplicate_name` hangs under
the same backend with a DIFFERENT fingerprint:

- Subactor at 100% CPU (busy-loop), not parked
- `recvfrom(6, "", 65536, 0, NULL, NULL) = 0` repeating with no
  `epoll_wait` in between
- fd=6 is one of trio's internal AF_UNIX socketpair fds (the kicker
  mechanism)

Distinct root cause — possibly trio's kicker socketpair inheriting a
half-closed state across the fork — but shares the broader theme: **trio
internal-state initialization isn't fully fork-safe under
`main_thread_forkserver`** for the more exotic dispatch paths.
## Workarounds (until fix lands)

1. **Skip-mark on the fork backend** — temporarily mark
   `tests/test_infected_asyncio.py` with
   `pytest.mark.skipon_spawn_backend('main_thread_forkserver',
   reason='infect_asyncio + fork interaction broken, see
   ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md')`.
   Lets the rest of the test suite run green while this is being fixed
   properly.

2. **Run infected-asyncio tests under the `trio` backend only** — they
   don't exercise fork semantics, so they won't hit this bug.
## Investigation next steps

In rough priority:

1. Catch the hang alive again, **detach strace**, `py-spy --locals` the
   subactor — confirm whether the trio thread is at `async_main` yet.

2. Diff `start_guest_run` setup pre-fork vs post-fork by adding
   `log.devx()` markers in
   `tractor.to_asyncio.run_as_asyncio_guest::aio_main` at:
   - asyncio loop bringup
   - immediately before `start_guest_run`
   - immediately after `start_guest_run`
   - inside the `trio_done_callback` registration

3. Check whether the asyncio loop dispatches ANY callbacks in the forked
   child — instrument `loop.call_soon_threadsafe` (e.g. monkey-patch it
   to log; a sketch follows this list).

4. If steps 1–3 confirm that asyncio's queue is stuck, look at whether
   the asyncio event-loop policy or selector is being inherited from a
   pre-fork (parent-process) state in a way that breaks the new loop.
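For step 3, a minimal instrumentation sketch (the wrapper is
illustrative, not existing tractor code; it must run inside `aio_main`
BEFORE `start_guest_run` is handed `loop.call_soon_threadsafe`):

```python
import functools

def spy_on_call_soon_threadsafe(loop):
    # Wrap `loop.call_soon_threadsafe` so every cross-thread dispatch
    # into the asyncio loop gets logged; if nothing ever prints in the
    # forked child, the asyncio->trio bridge is never being driven.
    orig = loop.call_soon_threadsafe

    @functools.wraps(orig)
    def spy(callback, *args, **kwargs):
        print(f'call_soon_threadsafe: {callback!r}')  # or `log.devx(...)`
        return orig(callback, *args, **kwargs)

    loop.call_soon_threadsafe = spy
    return loop
```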
## See also

- [#379](https://github.com/goodboy/tractor/issues/379) — subint umbrella
- [#451](https://github.com/goodboy/tractor/issues/451) — Mode-A cancel-cascade hang
- `ai/conc-anal/fork_thread_semantics_execution_vs_memory.md`
- `ai/conc-anal/subint_forkserver_test_cancellation_leak_issue.md`
- python-trio/trio#1614 — trio + fork hazards
ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md
@@ -0,0 +1,221 @@
# trio `WakeupSocketpair.drain()` busy-loop in forked child (peer-closed missed-EOF)

## Reproducer

```bash
./py313/bin/python -m pytest \
  tests/test_multi_program.py::test_register_duplicate_name \
  --tpt-proto=tcp \
  --spawn-backend=main_thread_forkserver \
  -v --capture=sys
```

Subactor pegs a CPU core indefinitely; parent test hangs waiting for the
subactor.
## Empirical evidence (caught alive)

```
$ sudo strace -p <subactor-pid>
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
... (no `epoll_wait`, no other syscalls, just this back-to-back)
```

Pattern: tight C-level `recvfrom` loop returning 0 each call. No
`epoll_wait` between iterations → **not trio's task scheduler**. Pure
synchronous C loop.

```
$ sudo readlink /proc/<subactor-pid>/fd/6
socket:[<inode>]

$ sudo lsof -p <subactor-pid> | grep ' 6u'
<cmd> <pid> goodboy 6u unix 0xffff... 0t0 <inode> type=STREAM (CONNECTED)
```

fd=6 is an **AF_UNIX socket** in CONNECTED state. Even though the test
uses `--tpt-proto=tcp`, this fd is NOT a tractor IPC channel — it's an
internal trio socketpair.
## Root-cause: `WakeupSocketpair.drain()`

`/site-packages/trio/_core/_wakeup_socketpair.py`:

```python
class WakeupSocketpair:
    def __init__(self) -> None:
        self.wakeup_sock, self.write_sock = socket.socketpair()
        self.wakeup_sock.setblocking(False)
        self.write_sock.setblocking(False)
        ...

    def drain(self) -> None:
        try:
            while True:
                self.wakeup_sock.recv(2**16)
        except BlockingIOError:
            pass
```

`socket.socketpair()` on Linux defaults to AF_UNIX SOCK_STREAM. Both ends
non-blocking. Normal flow:

1. Signal/wake event → `write_sock.send(b'\x00')` queues a byte.
2. `wakeup_sock` becomes readable → trio's epoll triggers.
3. Trio calls `drain()` to flush the buffer.
4. `drain()` loops on `wakeup_sock.recv(64KB)`.
5. Eventually buffer empty → non-blocking socket raises
   `BlockingIOError` → except → break.
**Bug surface — peer-closed missed-EOF**:

Non-blocking socket semantics:

- buffer has data → `recv` returns N>0 bytes (loop continues)
- buffer empty → `recv` raises `BlockingIOError`
- **peer FIN'd → `recv` returns 0 bytes (NEITHER exception NOR break —
  infinite tight loop)**

`drain()` does not handle the `b''` return-value (EOF) case. If
`write_sock` has been closed (or the process holding it is gone), every
iteration returns 0 → infinite loop → 100% CPU on a single core.
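The EOF behaviour itself is easy to confirm with the stdlib alone,
independent of trio (a minimal sketch, not trio or tractor code):

```python
import socket

wakeup_sock, write_sock = socket.socketpair()  # AF_UNIX SOCK_STREAM on Linux
wakeup_sock.setblocking(False)

write_sock.send(b'\x00')
write_sock.close()                 # simulate the peer going away

print(wakeup_sock.recv(2**16))     # b'\x00' -- the queued wakeup byte
print(wakeup_sock.recv(2**16))     # b''     -- EOF, *not* BlockingIOError
```

Every further `recv()` keeps returning `b''`, which is exactly the case
`drain()`'s loop never exits on.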
## Why this triggers under `main_thread_forkserver`

Under `os.fork()` from the forkserver-worker thread:

1. Parent has a `WakeupSocketpair` instance with `wakeup_sock=fdN`,
   `write_sock=fdM`. Both fds open in parent.
2. Fork → child inherits BOTH fds (kernel-level fd table dup).
3. `_close_inherited_fds()` runs in child → closes everything except
   stdio. `wakeup_sock` and `write_sock` of the parent's
   `WakeupSocketpair` ARE closed in child.
4. Child's trio (running fresh) creates its OWN `WakeupSocketpair` → NEW
   fd numbers (e.g. fd 6, 7).
5. **In `infect_asyncio` mode** the asyncio loop is the host; trio runs
   as guest via `start_guest_run`. trio still creates its
   `WakeupSocketpair` in the I/O manager but its role is different.

The race window: somewhere between (3) and (5), if a `WakeupSocketpair`
Python object reference inherited via COW (from the parent's pre-fork
heap) survives long enough that `drain()` is called on it AFTER its fds
were closed but BEFORE the child's NEW socketpair takes over the recycled
fd numbers — the recycled fd will be one of the child's NEW socketpair
ends, whose peer might be FIN-flagged (e.g. the parent-process peer-end
is closed).

Or simpler: the `wait_for_actor`/`find_actor` discovery flow in
`test_register_duplicate_name` triggers an unusual code path where a
stale `WakeupSocketpair` gets `drain()`-called on a fd whose peer has
already closed.
## Why `drain()` shouldn't loop indefinitely on EOF (upstream trio bug)

Even WITHOUT fork, `drain()` should treat `b''` as EOF and break. The
current code is correct for the "buffer drained on a healthy socketpair"
scenario but incorrect for the "peer is gone" scenario. It's a
defensive-programming gap in trio.

A one-line patch upstream:

```python
def drain(self) -> None:
    try:
        while True:
            data = self.wakeup_sock.recv(2**16)
            if not data:
                break  # peer-closed; nothing more to drain
    except BlockingIOError:
        pass
```
## Workarounds (until the upstream fix lands)

1. **Skip-mark on the fork backend**: `tests/test_multi_program.py` →
   `pytest.mark.skipon_spawn_backend('main_thread_forkserver',
   reason='trio WakeupSocketpair.drain busy-loop, see
   ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md')`.

2. **Defensive monkey-patch in tractor's forkserver-child prelude** —
   wrap `WakeupSocketpair.drain` to handle `b''`:

   ```python
   # in `_actor_child_main` or `_close_inherited_fds`'s
   # post-fork prelude:
   from trio._core._wakeup_socketpair import WakeupSocketpair

   _orig_drain = WakeupSocketpair.drain

   def _safe_drain(self):
       try:
           while True:
               data = self.wakeup_sock.recv(2**16)
               if not data:
                   return  # peer closed
       except BlockingIOError:
           pass

   WakeupSocketpair.drain = _safe_drain
   ```

   Tracks upstream — remove once trio fixes it.

3. **Upstream the fix**: 1-line PR to `python-trio/trio` adding
   `if not data: break` to `drain()`.
## Investigation next steps

1. **Confirm via py-spy**: when caught alive, detach strace first then
   `sudo py-spy dump --pid <subactor> --locals`. The busy thread should
   show `drain` from `WakeupSocketpair` in the call chain.

2. **Identify which write-end peer is closed**: from the inode of fd 6,
   look up the matching peer inode via `ss -xp` and see whose process it
   was/is.

3. **Verify the missed-EOF hypothesis**: hand-craft a minimal
   `WakeupSocketpair` repro:

   ```python
   from trio._core._wakeup_socketpair import WakeupSocketpair

   ws = WakeupSocketpair()
   ws.write_sock.close()  # simulate peer-gone
   ws.drain()             # should hang forever
   ```
## Sibling bug

`tests/test_infected_asyncio.py::test_aio_simple_error` hangs under the
same backend with a DIFFERENT fingerprint (Mode-A deadlock, both parties
in `epoll_wait`, no busy-loop). Distinct root cause — see
`infected_asyncio_under_main_thread_forkserver_hang_issue.md`.

Both share the broader theme: **trio internal-state initialization isn't
fully fork-safe under `main_thread_forkserver`** for the more exotic
dispatch paths.

## See also

- [#379](https://github.com/goodboy/tractor/issues/379) — subint umbrella
- python-trio/trio#1614 — trio + fork hazards
- `trio._core._wakeup_socketpair.WakeupSocketpair` source (the smoking gun)
- `ai/conc-anal/fork_thread_semantics_execution_vs_memory.md`
- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md`
tests/trionics/test_patches.py
@@ -0,0 +1,99 @@
'''
Regression tests for `tractor.trionics.patches` —
defensive monkey-patches on upstream `trio` bugs.

Each test asserts:

1. The bug exists (or is gone — skip cleanly if upstream shipped the
   fix and our `is_needed()` now returns `False`).
2. Our patch fixes it (post-`apply()` the `repro()` returns cleanly
   within a tight wall-clock cap).

Wall-clock caps are critical here — the bugs we patch are tight-loops
or deadlocks, so a regression would HANG the test runner unless we
hard-cap each `repro()` call.

'''
import signal

import pytest

from tractor.trionics import patches
from tractor.trionics.patches import _wakeup_socketpair as wsp


@pytest.fixture(autouse=True)
def _alarm_cleanup():
    '''
    Ensure no leftover SIGALRM survives a test failure or unexpected
    return.

    '''
    yield
    signal.alarm(0)


def test_wakeup_socketpair_drain_eof_patch_works():
    '''
    Without the patch, `WakeupSocketpair.drain()` on a socketpair whose
    write-end has been closed spins forever. With the patch applied, it
    returns cleanly within milliseconds.

    Wall-clock cap: 2s. If the patch regresses, SIGALRM fires and the
    test hard-fails with a clear signal instead of hanging CI
    indefinitely.

    '''
    if not wsp.is_needed():
        pytest.skip(
            'upstream trio shipped the fix — '
            'patch no longer needed for trio '
            '(see `is_needed()` for version gate)'
        )

    # Apply the patch.
    applied: bool = wsp.apply()
    # `apply()` returns True only on the inaugural in-process call; the
    # idempotency guard returns False if anything (another test, the
    # runtime) already applied it, so only require a bool here.
    assert applied is True or applied is False  # idempotent

    # Cap wall-clock at 2s. `repro()` runs OUTSIDE any `trio.run()` so
    # plain stdlib signal semantics apply here — if the patch regresses
    # and the drain loop spins, SIGALRM still fires and aborts the run
    # loudly instead of hanging the suite.
    signal.alarm(2)
    wsp.repro()
    signal.alarm(0)


def test_apply_all_idempotent():
    '''
    Calling `apply_all()` twice should not double-apply: the second
    call's dict has all-False values (every patch reports "already
    applied").

    '''
    first: dict[str, bool] = patches.apply_all()
    second: dict[str, bool] = patches.apply_all()

    # Second call: every patch reports skipped.
    assert all(v is False for v in second.values()), (
        f'apply_all() not idempotent: {second}'
    )

    # First call: at least one patch was applied (or all are no-ops
    # because `is_needed()` is False everywhere — the all-fixed-upstream
    # future state, which is also valid).
    assert isinstance(first, dict)
    for name, applied in first.items():
        assert isinstance(applied, bool), (
            f'patch {name!r} returned non-bool: {applied!r}'
        )
tractor/_child.py
@@ -63,6 +63,14 @@ def _actor_child_main(
    sub-interpreter via `Interpreter.call()`.

    '''
    # Apply defensive monkey-patches for upstream `trio`
    # bugs we've encountered while running tractor — see
    # `tractor.trionics.patches` for the catalog +
    # per-patch upstream-fix tracking. Must run BEFORE
    # any trio runtime init.
    from .trionics.patches import apply_all
    apply_all()

    subactor = Actor(
        name=uid[0],
        uuid=uid[1],
tractor/trionics/patches/README.md
@@ -0,0 +1,95 @@
# `tractor.trionics.patches`

Defensive monkey-patches for bugs in `trio` itself.

## What goes here

- Bugs in upstream `trio` that we've encountered while running `tractor`
  and need to work around until upstream releases a fix.
- Each patch fixes EXACTLY one trio internal — no multi-bug omnibus
  patches.

## What does NOT go here

- Bugs in `tractor`'s own code (those get fixed in-tree, in the
  offending tractor module).
- Bugs in `asyncio`, `pytest`, the stdlib, etc. (file separate
  `tractor.<lib>.patches` subpkgs as needed).
- Workarounds for behavior we *disagree* with but that isn't a bug per
  se. If trio's API does what it says on the tin, we don't override it
  here.
## Per-patch contract

Every `_<topic>.py` module in this directory MUST expose (a minimal
skeleton is sketched at the end of this section):

- **`apply() -> bool`** — apply the patch. Idempotent (safe to call
  multiple times). Version-gated — must consult `is_needed()` and skip
  when False. Returns `True` if patched this call, `False` if skipped.

- **`is_needed() -> bool`** — does upstream still need patching? Today
  most patches return `True` unconditionally, but as upstream releases
  land each should gate on
  `Version(trio.__version__) < Version('X.Y.Z')`. When the gated version
  is released, the patch can be DELETED entirely.

- **`repro() -> None`** — minimal demonstration of the bug. Used by the
  regression test suite to assert (a) the upstream bug still exists,
  (b) our patch fixes it. Should be tight enough that calling it
  post-`apply()` returns cleanly within a few hundred milliseconds —
  tests wrap it with a wall-clock cap.

Each module's docstring MUST contain:

- **Problem**: what trio does wrong + the trigger conditions (e.g.
  "fork-spawn backend, peer-closed socketpair, etc.")
- **Fix**: the one-line (ideally) patch
- **Repro**: the standalone snippet `repro()` implements
- **Upstream**: link to filed issue/PR (or `TODO: file`)
- **REMOVE WHEN**: `trio>=X.Y.Z` ships the upstream fix
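A minimal skeleton of that contract (a hypothetical `_example.py`; the
patched trio internal, version bound, and names are placeholders, not a
real patch):

```python
# tractor/trionics/patches/_example.py  (hypothetical)
'''
Problem: <what trio does wrong + trigger conditions>
Fix: <the one-line patch>
Repro: see `repro()` below
Upstream: TODO: file
# REMOVE WHEN: trio>=X.Y.Z ships the fix

'''
_APPLIED: bool = False


def is_needed() -> bool:
    # TODO: gate on `Version(trio.__version__) < Version('X.Y.Z')`
    # once upstream lands the fix.
    return True


def repro() -> None:
    # Minimal demonstration of the bug; must return quickly once
    # `apply()` has run, so tests can wrap it in a wall-clock cap.
    ...


def apply() -> bool:
    global _APPLIED
    if _APPLIED or not is_needed():
        return False
    # ... monkey-patch the single trio internal being targeted ...
    _APPLIED = True
    return True
```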
## Adding a patch

1. Create `_<topic>.py` with the `apply` / `is_needed` / `repro` API.
2. Register it in `__init__.py::_PATCHES`.
3. Add a regression test in `tests/trionics/test_patches.py` that uses
   `repro()` to assert pre/post-patch behavior with a wall-clock cap.
4. File the upstream issue/PR. Add the link to your module's
   `Upstream:` and `# REMOVE WHEN:` lines.

## Removing a patch (when upstream releases the fix)

1. Confirm the upstream-fixed `trio` version is the minimum we depend
   on, OR keep the version-gate in `is_needed()` if we still support
   older trio.
2. If we've fully bumped past the broken versions:
   - Delete `_<topic>.py`
   - Remove the entry from `__init__.py::_PATCHES`
   - Delete the corresponding test in `tests/trionics/test_patches.py`
   - Bump the conc-anal doc with a "FIXED" header

## Calling

```python
from tractor.trionics.patches import apply_all

apply_all()
```

Currently invoked from `tractor._child._actor_child_main` before
`_trio_main` so every spawned subactor gets patched. The root actor's
entry could opt in too if a patch turns out to bite the root (none do
today).
tractor/trionics/patches/__init__.py
@@ -0,0 +1,84 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

'''
Defensive monkey-patches for `trio` internals.

Every patch in this package fixes a bug in `trio` itself that we've
encountered while running `tractor` — usually a fork-survival edge case
that hasn't been filed/fixed upstream yet. Each patch is:

- **idempotent** — safe to call multiple times
- **version-gated** — checks `trio.__version__` and skips itself if
  upstream has shipped the fix
- **scoped** — only modifies the specific trio internal it's targeting;
  no broad side effects
- **removable** — every patch carries a `# REMOVE WHEN:` marker in its
  docstring pointing at the upstream PR whose release allows us to drop
  it

To add a new patch:

1. Create `tractor/trionics/patches/_<topic>.py` exposing the
   `apply()` / `is_needed()` / `repro()` API contract.
2. Import it in this `__init__.py` and add an entry to `_PATCHES`.
3. Document upstream-fix-tracking in the module docstring's
   `# REMOVE WHEN:` line.
4. Add a regression test in `tests/trionics/test_patches.py` that uses
   the patch's `repro()` to assert the bug exists + the patch fixes it.

Calling `apply_all()` from a tractor entry point (e.g.
`tractor._child._actor_child_main`) applies every registered patch +
returns `{patch_name: applied?}` so callers can log/assert as needed.

'''
from typing import Callable

from . import _wakeup_socketpair


_PATCHES: list[tuple[str, Callable[[], bool]]] = [
    (
        'trio_wakeup_socketpair_drain_eof',
        _wakeup_socketpair.apply,
    ),
]


def apply_all() -> dict[str, bool]:
    '''
    Apply every registered patch. Idempotent — calling twice is fine,
    the second call's dict will be all `False`.

    Returns `{patch_name: applied?}`:

    - `True` — patch was applied THIS call (inaugural apply since
      process start).
    - `False` — skipped (already applied OR upstream fix detected via
      `is_needed() == False`).

    '''
    results: dict[str, bool] = {}
    for name, applier in _PATCHES:
        results[name] = applier()
    return results
tractor/trionics/patches/_wakeup_socketpair.py
@@ -0,0 +1,171 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

'''
Patch `trio._core._wakeup_socketpair.WakeupSocketpair.drain()` to break
on peer-closed EOF.

Problem
-------
`drain()` loops on `self.wakeup_sock.recv(2**16)` and exits ONLY on
`BlockingIOError` (buffer-empty on a non-blocking socket), NEVER on
`recv() == b''` (peer-closed FIN). When the socketpair's write-end has
been closed, `recv` returns 0 bytes each call → infinite C-level tight
loop → 100% CPU, no Python checkpoints, no signal delivery, no progress.

Most reliably triggered under fork-spawn backends — `os.fork()` +
`_close_inherited_fds()` can leave a `WakeupSocketpair` instance whose
`write_sock` was closed in the child (or whose peer-end is held by a
process that has since exited).

Repro
-----
```python
from trio._core._wakeup_socketpair import WakeupSocketpair

ws = WakeupSocketpair()
ws.write_sock.close()
ws.drain()  # spins forever pre-patch
```

Fix
---
One line: break the drain loop on `b''` EOF in addition to the existing
`BlockingIOError` exit.

```python
def _safe_drain(self) -> None:
    try:
        while True:
            data = self.wakeup_sock.recv(2**16)
            if not data:  # ← peer-closed; nothing more to drain
                return
    except BlockingIOError:
        pass
```

Upstream
--------
TODO: file at `python-trio/trio` — the standalone `repro()` below + this
docstring is the issue body's evidence section.

REMOVE WHEN: trio>=`<TBD>` ships the EOF-break in
`_wakeup_socketpair.WakeupSocketpair.drain()`.

See also
--------
- `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`
- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md`
  — sibling-bug analysis fixed by the same patch.

'''
from __future__ import annotations


# Module-local sentinel — set True by `apply()` after the
# first successful patch. Idempotency guard.
_APPLIED: bool = False


def is_needed() -> bool:
    '''
    True iff upstream `trio` is the broken version that needs our patch.

    Today: always True since no released `trio` has the fix. When
    upstream lands it, gate on:

    ```python
    from packaging.version import Version
    import trio
    return Version(trio.__version__) < Version('<TBD>')
    ```

    '''
    # TODO: version-gate once upstream lands the fix.
    return True


def repro() -> None:
    '''
    Minimal hang demonstrator + regression test target.

    Returns CLEANLY when `apply()` has been called earlier in this
    process (the patched `_safe_drain` breaks on EOF). Spins forever
    UNPATCHED — the caller should wrap it with a wall-clock cap (e.g.
    `signal.alarm(N)`) to avoid hanging the test runner on a regression.

    Used by `tests/trionics/test_patches.py` to assert both:

    1. The bug exists upstream (sanity check that the repro is real).
    2. Our patch fixes it (post-`apply()` it returns cleanly).

    '''
    from trio._core._wakeup_socketpair import (
        WakeupSocketpair,
    )
    ws = WakeupSocketpair()
    ws.write_sock.close()
    ws.drain()  # ← targeted operation


def apply() -> bool:
    '''
    Apply the EOF-break patch to `WakeupSocketpair.drain`. Idempotent +
    version-gated.

    Returns:

    - `True` if patched THIS call (inaugural apply).
    - `False` if skipped (already applied this process, OR
      `is_needed() == False` because upstream fixed it).

    '''
    global _APPLIED
    if _APPLIED or not is_needed():
        return False

    from trio._core._wakeup_socketpair import (
        WakeupSocketpair as _WSP,
    )

    def _safe_drain(self) -> None:
        try:
            while True:
                data = self.wakeup_sock.recv(2**16)
                # XXX patch — break on EOF instead of spinning.
                # Upstream trio's `drain()` only handles the
                # `BlockingIOError` (buffer-empty) case; it misses the
                # peer-closed (`recv == b''`) case.
                if not data:
                    return
        except BlockingIOError:
            pass

    _WSP.drain = _safe_drain
    _APPLIED = True
    return True