Compare commits

...

5 Commits

Author SHA1 Message Date
Gud Boi f8178df0fd Return parent `pid: int` from new `reap_subactors_per_test` fixture 2026-04-27 23:27:19 -04:00
Gud Boi 530160fa69 Use `trio.fail_after` cap in `test_dynamic_pub_sub`
Drop `@pytest.mark.timeout(...)` for the per-test wall-clock
cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)`
inside `main()` instead.

Both pytest-timeout enforcement modes are incompatible with
trio under fork-based backends:

- `method='signal'` (SIGALRM) synchronously raises `Failed`
  in trio's main thread mid-`epoll.poll()`, leaving
  `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got
  abandoned") so EVERY subsequent `trio.run()` in the same
  pytest process bails with
  `RuntimeError: Attempted to call run() from inside a run()`
  — full-session poison.
- `method='thread'` calls `_thread.interrupt_main()` which
  can let the KBI escape trio's `KIManager` under fork-
  cascade teardown races and bubble out of pytest entirely
  — kills the whole session.

`trio.fail_after()` keeps cancellation inside the trio loop:
- Raises `TooSlowError` cleanly through the open-nursery's
  cancel cascade.
- Doesn't disturb any out-of-band signal/thread state.
- Failure stays scoped to the single test — no cross-test
  global state corruption either way.

Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub`
go from 5/10 fail (with global-state poison) to 3/10 fail
(no poison, all sibling tests still pass). The ~30%
remaining flake rate is a genuine fork-cancel-cascade
hang — separate from this fix but no longer contaminates.

Module-level NOTE comment explains the rationale so future
readers don't re-introduce the bug.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 23:25:04 -04:00
Gud Boi b376eb0332 Add opt-in `reap_subactors_per_test` fixture
Function-scoped, NON-autouse zombie-subactor reaper for
modules whose teardown is known-leaky enough to cascade-
fail every following test in a session.

Sibling to the autouse session-scoped `_reap_orphaned_subactors`. The
session-scoped one fires at session end — too late to save tests that
follow a hung/leaky test in the suite. The new fixture, opted into via
`pytestmark = pytest.mark.usefixtures(...)`, runs between tests in
a problem-module so a leftover subactor from test N can't squat on
registrar ports / UDS paths / shm segments needed by tests N+1,
N+2, ...

Intentionally NOT autouse — the fixture's presence on a module signals
"this module's teardown leaks; please root-cause instead of relying
forever on cleanup". A visibility-vs-convenience trade picked in favor
of the former.

Apply to `tests/test_infected_asyncio.py` since both recent full-suite
runs (parallel-tpt-proto + TCP-only) showed the cascade originating in
this file's KBI- and SIGINT-flavored tests under
`main_thread_forkserver`. Module-comment names the specific offenders so
future de-flake work has a starting point.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 21:41:02 -04:00
Gud Boi 7c5dd4d033 Fix `_testing.addr.get_rando_addr` cross-process collisions
Previously the random port was a default-arg expression
(`_rando_port: str = random.randint(1000, 9999)`) — evaluated
ONCE at module import time, making it a per-process singleton.
Two parallel pytest sessions had a 1/9000 birthday-pair chance
of picking the same port; when it hit, every `reg_addr`-using
test in BOTH runs would cascade-fail with "Address already in
use".

Switch to per-call `random.randint()` salted with `os.getpid()`
so:

- within one session: two calls return distinct ports — e.g.
  `test_tpt_bind_addrs::bind-subset-reg` now actually gets two
  different reg addrs on the TCP backend (it was silently
  duplicating before),
- across parallel sessions: pid salt biases each process's
  port choices apart, making cross-run collisions
  vanishingly rare.

Drop the bogus `: str` annotation (was always `int`). UDS already gets
per-process isolation via `UDSAddress.get_random()`'s `@<pid>`
socket-path suffix, so no change needed there.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 20:15:20 -04:00
Gud Boi cbdf1eb6db Guard `subint_forkserver` stub against re-alias
Add `test_subint_forkserver_key_errors_cleanly` — a tn-tier
regression guard that pins down the variant-2 reservation
contract: the `'subint_forkserver'` key in
`_spawn._methods` MUST raise `NotImplementedError` today,
not silently dispatch to `main_thread_forkserver_proc`.

The transient alias-state existed briefly during the rename
(commit `57dae0e4`'s "Split forkserver backend into variant
1/2 mods" landed the alias; `5e83881f` flipped it to the
stub). Without a guard, a future refactor could easily
re-collapse the two keys back to a single coro and silently
break the variant-1 / variant-2 contract.

Also asserts the stub's error msg surfaces the two pointers
an operator hitting it actually needs:

- `'main_thread_forkserver'` — the working backend they
  prolly meant,
- `'msgspec#1026'` — the upstream blocker that has to land
  before variant-2 can ship.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 20:06:44 -04:00
5 changed files with 247 additions and 42 deletions

View File

@ -603,3 +603,50 @@ def test_orphaned_subactor_sigint_cleanup_DRAFT(
proc.wait(timeout=2.0)
except subprocess.TimeoutExpired:
pass
# ----------------------------------------------------------------
# regression guard: variant-2 (`subint_forkserver`) placeholder
# MUST raise `NotImplementedError` today — guards against future
# commits accidentally re-aliasing the key to the variant-1
# coroutine (which was a transient state during the rename).
# ----------------------------------------------------------------
def test_subint_forkserver_key_errors_cleanly() -> None:
'''
`--spawn-backend=subint_forkserver` is reserved for the
eventual variant-2 (subint-isolated child runtime)
backend, gated on jcrist/msgspec#1026 unblocking PEP 684
isolated-mode subints upstream.
Until that lands, the dispatch entry MUST raise
`NotImplementedError` immediately rather than silently
aliasing to `main_thread_forkserver_proc`. Verify the
error message also surfaces both the working-backend
pointer and the upstream-blocker ref so an operator
arriving at the error has somewhere to go.
'''
import asyncio
from tractor.spawn._spawn import _methods
proc = _methods['subint_forkserver']
with pytest.raises(NotImplementedError) as ei:
# signature args match `main_thread_forkserver_proc`'s
# — the stub raises before touching them so dummy
# values are fine.
asyncio.run(
proc(
'x', None, None, {}, [],
('127.0.0.1', 0), {},
)
)
msg: str = str(ei.value)
assert 'main_thread_forkserver' in msg, (
f'stub error msg should redirect to the working '
f'variant-1 backend; got: {msg!r}'
)
assert 'msgspec#1026' in msg or '1026' in msg, (
f'stub error msg should reference the upstream '
f'blocker (jcrist/msgspec#1026); got: {msg!r}'
)

View File

@ -5,6 +5,7 @@ Advanced streaming patterns using bidirectional streams and contexts.
from collections import Counter
import itertools
import platform
from typing import Type
import pytest
import trio
@ -106,61 +107,120 @@ async def consumer(
print(f'{uid} got: {value}')
# NOTE: deliberately NOT using `@pytest.mark.timeout(...)` —
# both pytest-timeout enforcement modes break trio under
# fork-based backends:
#
# - `method='signal'` (SIGALRM): the handler synchronously
# raises `Failed` in trio's main thread mid-`epoll.poll()`,
# leaves `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest
# run got abandoned"), and EVERY subsequent `trio.run()`
# in the same pytest process bails with
# `RuntimeError: Attempted to call run() from inside a
# run()` — session-wide poison.
#
# - `method='thread'`: calls `_thread.interrupt_main()`
# raising `KeyboardInterrupt` into the main thread. Under
# fork-based backends with mid-cascade fd-juggling the KBI
# can escape trio's `KIManager` and bubble out of pytest
# itself — kills the WHOLE session.
#
# Instead we use `trio.fail_after()` INSIDE `main()` below:
# trio's own `Cancelled`/`TooSlowError` machinery handles the
# timeout, cleanly unwinds the actor nursery's cancel
# cascade, and only fails the single test (no cross-test
# state corruption either way).
#
# `pyproject.toml`'s default `timeout = 200` is still a
# last-resort safety net.
@pytest.mark.parametrize(
'expect_cancel_exc', [
KeyboardInterrupt,
trio.TooSlowError,
],
ids=lambda item:
f'expect_user_exc_raised={item.__name__}'
)
def test_dynamic_pub_sub(
reg_addr: tuple,
debug_mode: bool,
test_log: tractor.log.StackLevelAdapter,
reap_subactors_per_test: int,
expect_cancel_exc: Type[BaseException],
):
failed_to_raise_report: str = (
f'Never got a {expect_cancel_exc!r} ??'
)
global _registry
from multiprocessing import cpu_count
cpus = cpu_count()
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as n:
# Hard safety cap via trio's own cancellation — see
# the module-level NOTE on why we avoid `pytest-timeout`
# for this test. Total expected runtime: ~1s spawn + 3s
# sleep + ~1-2s cancel cascade ≈ 5-6s. 12s gives plenty
# of headroom; if exceeded, trio raises `TooSlowError`
# which the outer `try` block treats as a hang report
# (or, if `expect_cancel_exc is trio.TooSlowError`, as
# the test passing — either way, no global state
# corruption).
with trio.fail_after(12):
async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as n:
# name of this actor will be same as target func
await n.run_in_actor(publisher)
# name of this actor will be same as target func
await n.run_in_actor(publisher)
for i, sub in zip(
range(cpus - 2),
itertools.cycle(_registry.keys())
):
for i, sub in zip(
range(cpus - 2),
itertools.cycle(_registry.keys())
):
await n.run_in_actor(
consumer,
name=f'consumer_{sub}',
subs=[sub],
)
# make one dynamic subscriber
await n.run_in_actor(
consumer,
name=f'consumer_{sub}',
subs=[sub],
name='consumer_dynamic',
subs=list(_registry.keys()),
)
# make one dynamic subscriber
await n.run_in_actor(
consumer,
name='consumer_dynamic',
subs=list(_registry.keys()),
)
# block until cancelled by user
with trio.fail_after(3):
await trio.sleep_forever()
# block until "cancelled by user"
await trio.sleep(3)
test_log.warning(
f'Raising user cancel exc: '
f'{expect_cancel_exc!r}'
)
raise expect_cancel_exc('simulate user cancel!')
try:
trio.run(main)
except (
trio.TooSlowError,
ExceptionGroup,
) as err:
if isinstance(err, ExceptionGroup):
for suberr in err.exceptions:
if isinstance(suberr, trio.TooSlowError):
break
else:
pytest.fail('Never got a `TooSlowError` ?')
pytest.fail(failed_to_raise_report)
except expect_cancel_exc:
# parent-side raised the user-cancel exc directly and
# it propagated unwrapped; clean path.
test_log.exception('Got user-cancel exc AS EXPECTED')
except BaseExceptionGroup as err:
# under fork-based backends the user-raised cancel
# can race with subactor-side stream teardown
# (`trio.EndOfChannel` from a publisher's `send()`
# whose remote half got cut). The expected exc may
# then be nested deeper in the group rather than at
# the top level. `BaseExceptionGroup.split()` walks
# the exc tree recursively (Python 3.11+).
matched, _ = err.split(expect_cancel_exc)
if matched is None:
pytest.fail(failed_to_raise_report)
assert err
test_log.exception('Timed out AS EXPECTED')
test_log.exception('Got user-cancel exc AS EXPECTED')
@tractor.context
@ -292,7 +352,6 @@ def test_sigint_both_stream_types():
resp = await stream.receive()
assert resp == msg
raise KeyboardInterrupt
try:
trio.run(main)
assert 0, "Didn't receive KBI!?"
@ -362,7 +421,12 @@ async def inf_streamer(
print('streamer exited .open_streamer() block')
@pytest.mark.timeout(
6,
method='signal',
)
def test_local_task_fanout_from_stream(
reg_addr: tuple,
debug_mode: bool,
):
'''
@ -427,4 +491,9 @@ def test_local_task_fanout_from_stream(
await p.cancel_actor()
trio.run(main)
async def w_timeout():
with trio.fail_after(6):
await main()
# trio.run(main)
trio.run(w_timeout)

View File

@ -32,6 +32,22 @@ from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc
# Per-test zombie-subactor reaper. Opt-in (NOT autouse) —
# see `tractor._testing.pytest.reap_subactors_per_test`'s
# docstring for the full rationale. This module specifically
# needs it because tests like
# `test_echoserver_detailed_mechanics[KeyboardInterrupt]`
# and the `test_sigint_closes_lifetime_stack[*]` matrix have
# been observed to hang past pytest's wall-clock under
# `main_thread_forkserver`, leaving subactor forks that
# squat on registrar resources and cascade-fail every
# subsequent test (`test_inter_peer_cancellation`,
# `test_legacy_one_way_streaming`, etc.).
pytestmark = pytest.mark.usefixtures(
'reap_subactors_per_test',
)
@pytest.fixture(
scope='module',
)

View File

@ -22,6 +22,7 @@ Might be eventually useful to expose as a util set from
our `tractor.discovery` subsys?
'''
import os
import random
from typing import (
Type,
@ -31,17 +32,28 @@ from tractor.discovery import _addr
def get_rando_addr(
tpt_proto: str,
*,
# choose random port at import time
_rando_port: str = random.randint(1000, 9999)
) -> tuple[str, str|int]:
'''
Used to globally override the runtime to the
per-test-session-dynamic addr so that all tests never conflict
with any other actor tree using the default.
Cross-process isolation: TCP-port picks salt
`random.randint()` with `os.getpid()` so two parallel
pytest sessions (e.g. one running `--tpt-proto=tcp` and
another `--tpt-proto=uds` concurrently) almost-never
collide on the same port. Without the salt, the prior
impl's import-time `random.randint(1000, 9999)` default
arg was effectively a process-singleton with a 1/9000
chance of cross-run collision per pair and when it
happened EVERY `reg_addr`-using test in BOTH runs would
fight over the bind, cascading into a chain of
"Address already in use" failures.
For UDS this concern doesn't apply: `UDSAddress.get_random()`
already builds socket paths from `os.getpid()` so each
pytest process gets its own socket-path namespace.
'''
addr_type: Type[_addr.Addres] = _addr._address_types[tpt_proto]
def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
@ -51,9 +63,21 @@ def get_rando_addr(
testrun_reg_addr: tuple[str, int|str]
match tpt_proto:
case 'tcp':
# Per-call randomness mixed with `os.getpid()` —
# see the docstring above for the cross-process
# isolation rationale. The mix means:
# - within one pytest session, two calls return
# distinct ports (good for tests that need a
# second-different-reg-addr in one fn body, e.g.
# `test_tpt_bind_addrs::bind-subset-reg`),
# - across parallel pytest sessions, the pid bias
# makes coincident port choices unlikely.
port: int = 1000 + (
random.randint(0, 8999) + os.getpid()
) % 9000
testrun_reg_addr = (
addr_type.def_bindspace,
_rando_port,
port,
)
# NOTE, file-name uniqueness (no-collisions) will be based on

View File

@ -327,6 +327,55 @@ def _reap_orphaned_subactors():
reap(pids, grace=3.0)
@pytest.fixture
def reap_subactors_per_test() -> int:
'''
Per-test (function-scoped) zombie-subactor reaper
**opt-in**, NOT autouse.
When a test's teardown fails to fully cancel its actor
tree (e.g. an asyncio cancel-cascade times out under
`main_thread_forkserver`, pytest hits its 200s wall-
clock and abandons), the leftover subactor lingers as a
direct child of `pytest` and squats on whatever
registrar port / UDS path / shm segment it had bound.
Subsequent tests trying to allocate the same resource
fail and with backends that bind a session-shared
`reg_addr`, that means EVERY following test in the
suite cascades. The session-scoped sibling
(`_reap_orphaned_subactors`) only kicks in at session
end which is too late to save the cascade.
Apply at module-level on the topically-problematic
test files via:
```python
pytestmark = pytest.mark.usefixtures(
'reap_subactors_per_test',
)
```
Or per-test via the same `usefixtures` mark on a
specific function. Intentionally NOT autouse so the
fixture's presence on a module signals "this module's
teardown is known-leaky enough to contaminate
siblings"; the visibility helps future-us track down
root causes rather than burying them under blanket
cleanup.
'''
import os
parent_pid: int = os.getpid()
yield parent_pid
from tractor._testing._reap import (
find_descendants,
reap,
)
pids: list[int] = find_descendants(parent_pid)
if pids:
reap(pids, grace=3.0)
@pytest.fixture(scope='session')
def debug_mode(
request: pytest.FixtureRequest,