Compare commits

..

5 Commits

Author SHA1 Message Date
Gud Boi f8178df0fd Return parent `pid: int` from new `reap_subactors_per_test` fixture 2026-04-27 23:27:19 -04:00
Gud Boi 530160fa69 Use `trio.fail_after` cap in `test_dynamic_pub_sub`
Drop `@pytest.mark.timeout(...)` for the per-test wall-clock
cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)`
inside `main()` instead.

Both pytest-timeout enforcement modes are incompatible with
trio under fork-based backends:

- `method='signal'` (SIGALRM) synchronously raises `Failed`
  in trio's main thread mid-`epoll.poll()`, leaving
  `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got
  abandoned") so EVERY subsequent `trio.run()` in the same
  pytest process bails with
  `RuntimeError: Attempted to call run() from inside a run()`
  — full-session poison.
- `method='thread'` calls `_thread.interrupt_main()` which
  can let the KBI escape trio's `KIManager` under fork-
  cascade teardown races and bubble out of pytest entirely
  — kills the whole session.

`trio.fail_after()` keeps cancellation inside the trio loop:
- Raises `TooSlowError` cleanly through the open-nursery's
  cancel cascade.
- Doesn't disturb any out-of-band signal/thread state.
- Failure stays scoped to the single test — no cross-test
  global state corruption either way.

Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub`
go from 5/10 fail (with global-state poison) to 3/10 fail
(no poison, all sibling tests still pass). The ~30%
remaining flake rate is a genuine fork-cancel-cascade
hang — separate from this fix but no longer contaminates.

Module-level NOTE comment explains the rationale so future
readers don't re-introduce the bug.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 23:25:04 -04:00
Gud Boi b376eb0332 Add opt-in `reap_subactors_per_test` fixture
Function-scoped, NON-autouse zombie-subactor reaper for
modules whose teardown is known-leaky enough to cascade-
fail every following test in a session.

Sibling to the autouse session-scoped `_reap_orphaned_subactors`. The
session-scoped one fires at session end — too late to save tests that
follow a hung/leaky test in the suite. The new fixture, opted into via
`pytestmark = pytest.mark.usefixtures(...)`, runs between tests in
a problem-module so a leftover subactor from test N can't squat on
registrar ports / UDS paths / shm segments needed by tests N+1,
N+2, ...

Intentionally NOT autouse — the fixture's presence on a module signals
"this module's teardown leaks; please root-cause instead of relying
forever on cleanup". A visibility-vs-convenience trade picked in favor
of the former.

Apply to `tests/test_infected_asyncio.py` since both recent full-suite
runs (parallel-tpt-proto + TCP-only) showed the cascade originating in
this file's KBI- and SIGINT-flavored tests under
`main_thread_forkserver`. Module-comment names the specific offenders so
future de-flake work has a starting point.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 21:41:02 -04:00
Gud Boi 7c5dd4d033 Fix `_testing.addr.get_rando_addr` cross-process collisions
Previously the random port was a default-arg expression
(`_rando_port: str = random.randint(1000, 9999)`) — evaluated
ONCE at module import time, making it a per-process singleton.
Two parallel pytest sessions had a 1/9000 birthday-pair chance
of picking the same port; when it hit, every `reg_addr`-using
test in BOTH runs would cascade-fail with "Address already in
use".

Switch to per-call `random.randint()` salted with `os.getpid()`
so:

- within one session: two calls return distinct ports — e.g.
  `test_tpt_bind_addrs::bind-subset-reg` now actually gets two
  different reg addrs on the TCP backend (it was silently
  duplicating before),
- across parallel sessions: pid salt biases each process's
  port choices apart, making cross-run collisions
  vanishingly rare.

Drop the bogus `: str` annotation (was always `int`). UDS already gets
per-process isolation via `UDSAddress.get_random()`'s `@<pid>`
socket-path suffix, so no change needed there.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 20:15:20 -04:00
Gud Boi cbdf1eb6db Guard `subint_forkserver` stub against re-alias
Add `test_subint_forkserver_key_errors_cleanly` — a tn-tier
regression guard that pins down the variant-2 reservation
contract: the `'subint_forkserver'` key in
`_spawn._methods` MUST raise `NotImplementedError` today,
not silently dispatch to `main_thread_forkserver_proc`.

The transient alias-state existed briefly during the rename
(commit `57dae0e4`'s "Split forkserver backend into variant
1/2 mods" landed the alias; `5e83881f` flipped it to the
stub). Without a guard, a future refactor could easily
re-collapse the two keys back to a single coro and silently
break the variant-1 / variant-2 contract.

Also asserts the stub's error msg surfaces the two pointers
an operator hitting it actually needs:

- `'main_thread_forkserver'` — the working backend they
  prolly meant,
- `'msgspec#1026'` — the upstream blocker that has to land
  before variant-2 can ship.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-27 20:06:44 -04:00
5 changed files with 247 additions and 42 deletions

View File

@ -603,3 +603,50 @@ def test_orphaned_subactor_sigint_cleanup_DRAFT(
proc.wait(timeout=2.0) proc.wait(timeout=2.0)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
pass pass
# ----------------------------------------------------------------
# regression guard: variant-2 (`subint_forkserver`) placeholder
# MUST raise `NotImplementedError` today — guards against future
# commits accidentally re-aliasing the key to the variant-1
# coroutine (which was a transient state during the rename).
# ----------------------------------------------------------------
def test_subint_forkserver_key_errors_cleanly() -> None:
'''
`--spawn-backend=subint_forkserver` is reserved for the
eventual variant-2 (subint-isolated child runtime)
backend, gated on jcrist/msgspec#1026 unblocking PEP 684
isolated-mode subints upstream.
Until that lands, the dispatch entry MUST raise
`NotImplementedError` immediately rather than silently
aliasing to `main_thread_forkserver_proc`. Verify the
error message also surfaces both the working-backend
pointer and the upstream-blocker ref so an operator
arriving at the error has somewhere to go.
'''
import asyncio
from tractor.spawn._spawn import _methods
proc = _methods['subint_forkserver']
with pytest.raises(NotImplementedError) as ei:
# signature args match `main_thread_forkserver_proc`'s
# — the stub raises before touching them so dummy
# values are fine.
asyncio.run(
proc(
'x', None, None, {}, [],
('127.0.0.1', 0), {},
)
)
msg: str = str(ei.value)
assert 'main_thread_forkserver' in msg, (
f'stub error msg should redirect to the working '
f'variant-1 backend; got: {msg!r}'
)
assert 'msgspec#1026' in msg or '1026' in msg, (
f'stub error msg should reference the upstream '
f'blocker (jcrist/msgspec#1026); got: {msg!r}'
)

View File

@ -5,6 +5,7 @@ Advanced streaming patterns using bidirectional streams and contexts.
from collections import Counter from collections import Counter
import itertools import itertools
import platform import platform
from typing import Type
import pytest import pytest
import trio import trio
@ -106,61 +107,120 @@ async def consumer(
print(f'{uid} got: {value}') print(f'{uid} got: {value}')
# NOTE: deliberately NOT using `@pytest.mark.timeout(...)` —
# both pytest-timeout enforcement modes break trio under
# fork-based backends:
#
# - `method='signal'` (SIGALRM): the handler synchronously
# raises `Failed` in trio's main thread mid-`epoll.poll()`,
# leaves `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest
# run got abandoned"), and EVERY subsequent `trio.run()`
# in the same pytest process bails with
# `RuntimeError: Attempted to call run() from inside a
# run()` — session-wide poison.
#
# - `method='thread'`: calls `_thread.interrupt_main()`
# raising `KeyboardInterrupt` into the main thread. Under
# fork-based backends with mid-cascade fd-juggling the KBI
# can escape trio's `KIManager` and bubble out of pytest
# itself — kills the WHOLE session.
#
# Instead we use `trio.fail_after()` INSIDE `main()` below:
# trio's own `Cancelled`/`TooSlowError` machinery handles the
# timeout, cleanly unwinds the actor nursery's cancel
# cascade, and only fails the single test (no cross-test
# state corruption either way).
#
# `pyproject.toml`'s default `timeout = 200` is still a
# last-resort safety net.
@pytest.mark.parametrize(
'expect_cancel_exc', [
KeyboardInterrupt,
trio.TooSlowError,
],
ids=lambda item:
f'expect_user_exc_raised={item.__name__}'
)
def test_dynamic_pub_sub( def test_dynamic_pub_sub(
reg_addr: tuple, reg_addr: tuple,
debug_mode: bool, debug_mode: bool,
test_log: tractor.log.StackLevelAdapter, test_log: tractor.log.StackLevelAdapter,
reap_subactors_per_test: int,
expect_cancel_exc: Type[BaseException],
): ):
failed_to_raise_report: str = (
f'Never got a {expect_cancel_exc!r} ??'
)
global _registry global _registry
from multiprocessing import cpu_count from multiprocessing import cpu_count
cpus = cpu_count() cpus = cpu_count()
async def main(): async def main():
async with tractor.open_nursery( # Hard safety cap via trio's own cancellation — see
registry_addrs=[reg_addr], # the module-level NOTE on why we avoid `pytest-timeout`
debug_mode=debug_mode, # for this test. Total expected runtime: ~1s spawn + 3s
) as n: # sleep + ~1-2s cancel cascade ≈ 5-6s. 12s gives plenty
# of headroom; if exceeded, trio raises `TooSlowError`
# which the outer `try` block treats as a hang report
# (or, if `expect_cancel_exc is trio.TooSlowError`, as
# the test passing — either way, no global state
# corruption).
with trio.fail_after(12):
async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as n:
# name of this actor will be same as target func # name of this actor will be same as target func
await n.run_in_actor(publisher) await n.run_in_actor(publisher)
for i, sub in zip( for i, sub in zip(
range(cpus - 2), range(cpus - 2),
itertools.cycle(_registry.keys()) itertools.cycle(_registry.keys())
): ):
await n.run_in_actor(
consumer,
name=f'consumer_{sub}',
subs=[sub],
)
# make one dynamic subscriber
await n.run_in_actor( await n.run_in_actor(
consumer, consumer,
name=f'consumer_{sub}', name='consumer_dynamic',
subs=[sub], subs=list(_registry.keys()),
) )
# make one dynamic subscriber # block until "cancelled by user"
await n.run_in_actor( await trio.sleep(3)
consumer, test_log.warning(
name='consumer_dynamic', f'Raising user cancel exc: '
subs=list(_registry.keys()), f'{expect_cancel_exc!r}'
) )
raise expect_cancel_exc('simulate user cancel!')
# block until cancelled by user
with trio.fail_after(3):
await trio.sleep_forever()
try: try:
trio.run(main) trio.run(main)
except ( pytest.fail(failed_to_raise_report)
trio.TooSlowError, except expect_cancel_exc:
ExceptionGroup, # parent-side raised the user-cancel exc directly and
) as err: # it propagated unwrapped; clean path.
if isinstance(err, ExceptionGroup): test_log.exception('Got user-cancel exc AS EXPECTED')
for suberr in err.exceptions: except BaseExceptionGroup as err:
if isinstance(suberr, trio.TooSlowError): # under fork-based backends the user-raised cancel
break # can race with subactor-side stream teardown
else: # (`trio.EndOfChannel` from a publisher's `send()`
pytest.fail('Never got a `TooSlowError` ?') # whose remote half got cut). The expected exc may
# then be nested deeper in the group rather than at
# the top level. `BaseExceptionGroup.split()` walks
# the exc tree recursively (Python 3.11+).
matched, _ = err.split(expect_cancel_exc)
if matched is None:
pytest.fail(failed_to_raise_report)
assert err test_log.exception('Got user-cancel exc AS EXPECTED')
test_log.exception('Timed out AS EXPECTED')
@tractor.context @tractor.context
@ -292,7 +352,6 @@ def test_sigint_both_stream_types():
resp = await stream.receive() resp = await stream.receive()
assert resp == msg assert resp == msg
raise KeyboardInterrupt raise KeyboardInterrupt
try: try:
trio.run(main) trio.run(main)
assert 0, "Didn't receive KBI!?" assert 0, "Didn't receive KBI!?"
@ -362,7 +421,12 @@ async def inf_streamer(
print('streamer exited .open_streamer() block') print('streamer exited .open_streamer() block')
@pytest.mark.timeout(
6,
method='signal',
)
def test_local_task_fanout_from_stream( def test_local_task_fanout_from_stream(
reg_addr: tuple,
debug_mode: bool, debug_mode: bool,
): ):
''' '''
@ -427,4 +491,9 @@ def test_local_task_fanout_from_stream(
await p.cancel_actor() await p.cancel_actor()
trio.run(main) async def w_timeout():
with trio.fail_after(6):
await main()
# trio.run(main)
trio.run(w_timeout)

View File

@ -32,6 +32,22 @@ from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc from tractor._testing import expect_ctxc
# Per-test zombie-subactor reaper. Opt-in (NOT autouse) —
# see `tractor._testing.pytest.reap_subactors_per_test`'s
# docstring for the full rationale. This module specifically
# needs it because tests like
# `test_echoserver_detailed_mechanics[KeyboardInterrupt]`
# and the `test_sigint_closes_lifetime_stack[*]` matrix have
# been observed to hang past pytest's wall-clock under
# `main_thread_forkserver`, leaving subactor forks that
# squat on registrar resources and cascade-fail every
# subsequent test (`test_inter_peer_cancellation`,
# `test_legacy_one_way_streaming`, etc.).
pytestmark = pytest.mark.usefixtures(
'reap_subactors_per_test',
)
@pytest.fixture( @pytest.fixture(
scope='module', scope='module',
) )

View File

@ -22,6 +22,7 @@ Might be eventually useful to expose as a util set from
our `tractor.discovery` subsys? our `tractor.discovery` subsys?
''' '''
import os
import random import random
from typing import ( from typing import (
Type, Type,
@ -31,17 +32,28 @@ from tractor.discovery import _addr
def get_rando_addr( def get_rando_addr(
tpt_proto: str, tpt_proto: str,
*,
# choose random port at import time
_rando_port: str = random.randint(1000, 9999)
) -> tuple[str, str|int]: ) -> tuple[str, str|int]:
''' '''
Used to globally override the runtime to the Used to globally override the runtime to the
per-test-session-dynamic addr so that all tests never conflict per-test-session-dynamic addr so that all tests never conflict
with any other actor tree using the default. with any other actor tree using the default.
Cross-process isolation: TCP-port picks salt
`random.randint()` with `os.getpid()` so two parallel
pytest sessions (e.g. one running `--tpt-proto=tcp` and
another `--tpt-proto=uds` concurrently) almost-never
collide on the same port. Without the salt, the prior
impl's import-time `random.randint(1000, 9999)` default
arg was effectively a process-singleton with a 1/9000
chance of cross-run collision per pair and when it
happened EVERY `reg_addr`-using test in BOTH runs would
fight over the bind, cascading into a chain of
"Address already in use" failures.
For UDS this concern doesn't apply: `UDSAddress.get_random()`
already builds socket paths from `os.getpid()` so each
pytest process gets its own socket-path namespace.
''' '''
addr_type: Type[_addr.Addres] = _addr._address_types[tpt_proto] addr_type: Type[_addr.Addres] = _addr._address_types[tpt_proto]
def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto] def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
@ -51,9 +63,21 @@ def get_rando_addr(
testrun_reg_addr: tuple[str, int|str] testrun_reg_addr: tuple[str, int|str]
match tpt_proto: match tpt_proto:
case 'tcp': case 'tcp':
# Per-call randomness mixed with `os.getpid()` —
# see the docstring above for the cross-process
# isolation rationale. The mix means:
# - within one pytest session, two calls return
# distinct ports (good for tests that need a
# second-different-reg-addr in one fn body, e.g.
# `test_tpt_bind_addrs::bind-subset-reg`),
# - across parallel pytest sessions, the pid bias
# makes coincident port choices unlikely.
port: int = 1000 + (
random.randint(0, 8999) + os.getpid()
) % 9000
testrun_reg_addr = ( testrun_reg_addr = (
addr_type.def_bindspace, addr_type.def_bindspace,
_rando_port, port,
) )
# NOTE, file-name uniqueness (no-collisions) will be based on # NOTE, file-name uniqueness (no-collisions) will be based on

View File

@ -327,6 +327,55 @@ def _reap_orphaned_subactors():
reap(pids, grace=3.0) reap(pids, grace=3.0)
@pytest.fixture
def reap_subactors_per_test() -> int:
'''
Per-test (function-scoped) zombie-subactor reaper
**opt-in**, NOT autouse.
When a test's teardown fails to fully cancel its actor
tree (e.g. an asyncio cancel-cascade times out under
`main_thread_forkserver`, pytest hits its 200s wall-
clock and abandons), the leftover subactor lingers as a
direct child of `pytest` and squats on whatever
registrar port / UDS path / shm segment it had bound.
Subsequent tests trying to allocate the same resource
fail and with backends that bind a session-shared
`reg_addr`, that means EVERY following test in the
suite cascades. The session-scoped sibling
(`_reap_orphaned_subactors`) only kicks in at session
end which is too late to save the cascade.
Apply at module-level on the topically-problematic
test files via:
```python
pytestmark = pytest.mark.usefixtures(
'reap_subactors_per_test',
)
```
Or per-test via the same `usefixtures` mark on a
specific function. Intentionally NOT autouse so the
fixture's presence on a module signals "this module's
teardown is known-leaky enough to contaminate
siblings"; the visibility helps future-us track down
root causes rather than burying them under blanket
cleanup.
'''
import os
parent_pid: int = os.getpid()
yield parent_pid
from tractor._testing._reap import (
find_descendants,
reap,
)
pids: list[int] = find_descendants(parent_pid)
if pids:
reap(pids, grace=3.0)
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def debug_mode( def debug_mode(
request: pytest.FixtureRequest, request: pytest.FixtureRequest,