Compare commits

...

7 Commits

Author SHA1 Message Date
Gud Boi 7ee0dc2e8f Harden `test_infected_asyncio` for fork spawners
Deats,
- `test_echoserver_detailed_mechanics`: add `is_forking_spawner`
  param, wrap `main()` in `fa_main()` with per-backend
  `trio.fail_after` (4s fork / 1s trio) to cap cancel-cascade
  teardown that compounds under forkserver.
- `test_sigint_closes_lifetime_stack`: swap `start_method` param
  for `is_forking_spawner`, pre-init `tmp_file`/`ctx` to `None` so
  KBI firing before `open_context` body doesn't `UnboundLocalError`,
  add `pytest.fail` guard for the spawn-time IPC race case, arm
  `signal.alarm` AFK-safety cap (10s) under fork backends

Also,
- `pytestmark`: add `track_orphaned_uds_per_test` +
  `detect_runaway_subactors_per_test` fixtures.
- `delay()`: hardcode `return 1e3` at top (debug override still in
  place).

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-13 15:56:35 -04:00
Gud Boi b10011a36e Adjust `test_streaming_to_actor_cluster` timeout
For forking spawner backends that is.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-13 15:47:36 -04:00
Gud Boi 7d0a53d205 Enrich `pytestmark` in `test_inter_peer_cancellation`
- `skipon_spawn_backend('subint')`: expand reason with specific
  analysis doc refs + GH issue #379 umbrella link.
- add `track_orphaned_uds_per_test` fixture via `usefixtures` to
  blame-attribute UDS sock-file orphans left by SIGKILL cancel
  cascades.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-13 12:28:17 -04:00
Gud Boi 75d5b4cf7b Adjust `test_simple_context` timeout for forking spawner
(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-13 12:03:58 -04:00
Gud Boi 8aa07a7932 Add `set_fork_aware_capture`, timeout to msg tests
- `test_ext_types_over_ipc`: wrap `main()` in `fa_main()` with
  `trio.fail_after(2)` + commented `capfd.disabled()` investigation
  (pytest#14444).
- `test_basic_payload_spec`: add fixture param with note on fork-spawner
  hang prevention.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-13 11:59:37 -04:00
Gud Boi 10db117864 Add signal-alarm guard to `test_dynamic_pub_sub`
Outer `signal.alarm` cap that fires even when trio's
`fail_after` is blocked by a shielded-await deadlock
(the bug-class-3 hang under MTF backends). Only armed
for fork-based spawners where the bug lives.

Deats,
- `_DIAG_CAP_S = fail_after_s + 5` — slightly larger than the
  trio-native guard so it always loses when the in-band path works.
- `test_log.cancel()` breadcrumbs at each cancel-scope boundary so the
  last-fired breadcrumb names the swallow point on hang.
- try/finally wrapping around each scope level for deterministic
  breadcrumb emission.
- add `is_forking_spawner`, `set_fork_aware_capture` fixture params.
- rework `fail_after_s`: 4s for fork, 12s for trio (was 30/12).

Also,
- `test_sigint_both_stream_types`: `assert 0` -> `pytest.fail()`, add
  TODO re `pytest.raises()`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-13 11:43:17 -04:00
Gud Boi 83b6a3373a Fix `is_forking_spawner` fixture to call helper fn
(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-05-13 11:20:17 -04:00
8 changed files with 284 additions and 82 deletions

View File

@ -57,6 +57,7 @@ from tractor.msg._ops import (
limit_plds,
)
def enc_nsp(obj: Any) -> Any:
actor: Actor = tractor.current_actor(
err_on_no_runtime=False,
@ -617,6 +618,17 @@ def test_ext_types_over_ipc(
debug_mode: bool,
pld_spec: Union[Type],
add_hooks: bool,
set_fork_aware_capture,
# ^^XXX? for forking spawners
# capfd: pytest.CaptureFixture,
# ^^NOTE, super interesting that if
# we disable this below then the tpt-layer
# suffers as an "unclean EOF"??
# ?TODO, determine why/how that mks sense when addressing,
# https://github.com/pytest-dev/pytest/issues/14444
#
):
'''
Ensure we can support extension types coverted using
@ -725,18 +737,26 @@ def test_ext_types_over_ipc(
await p.cancel_actor()
async def fa_main():
with (
trio.fail_after(2),
# ?TODO, investigate? see NOTE above..
# capfd.disabled(),
):
await main()
if (
NamespacePath in pld_types
and
add_hooks
):
trio.run(main)
trio.run(fa_main)
else:
with pytest.raises(
expected_exception=tractor.RemoteActorError,
) as excinfo:
trio.run(main)
trio.run(fa_main)
exc = excinfo.value
# bc `.started(nsp: NamespacePath)` will raise

View File

@ -284,6 +284,11 @@ def test_basic_payload_spec(
return_value: str|None,
started_value: int|PldMsg,
pld_check_started_value: bool,
set_fork_aware_capture,
# ^XXX TODO? for forking spawners, seems to prevent hangs when
# --capture=sys not set, but only for a while then the problem
# accumulates?
):
'''
Validate the most basic `PldRx` msg-type-spec semantics around

View File

@ -147,6 +147,9 @@ def test_dynamic_pub_sub(
test_log: tractor.log.StackLevelAdapter,
reap_subactors_per_test: int,
expect_cancel_exc: Type[BaseException],
is_forking_spawner: bool,
set_fork_aware_capture,
):
failed_to_raise_report: str = (
f'Never got a {expect_cancel_exc!r} ??'
@ -157,79 +160,138 @@ def test_dynamic_pub_sub(
from multiprocessing import cpu_count
cpus = cpu_count()
# Hard safety cap via trio's own cancellation — see the
# module-level NOTE on why we avoid `pytest-timeout` for
# this test. Picked backend-aware: under `trio` backend
# spawn is cheap (~1s for `cpus` actors) but fork-based
# backends pay a per-spawn cost (forkserver round-trip +
# IPC peer-handshake) that can stack up over `cpus - 1`
# sequential `n.run_in_actor()` calls — especially on UDS
# under cross-pytest contention (#451 / #452). Empirically
# 12s flakes on `main_thread_forkserver`; 30s gives
# plenty of headroom while still failing-loud on a real
# hang.
from tractor.spawn import _spawn as _spawn_mod
# Hard safety cap via trio's own cancellation. NOTE see the
# module-level note on why we avoid `pytest-timeout` for this
# test. Picked backend-aware: under `trio` backend spawn is
# cheap (~1s for `cpus` actors) but fork-based backends pay
# a per-spawn cost (forkserver round-trip + IPC peer-handshake)
# that can stack up over `cpus - 1` sequential `n.run_in_actor()`
# calls — especially on UDS under cross-pytest contention
# (#451 / #452). Empirically a flat 15s flakes on
# `main_thread_forkserver` for many-cpu hosts (a single bad
# spawn-stack puts total run-time at ~15.5s, just over);
# 30s gives plenty of headroom while still failing-loud on
# a real hang.
#
# XXX caveat: this is an *inner* `trio.fail_after` — its
# `Cancelled` cannot reach a task parked in a shielded `await`
# (e.g. inside actor-nursery teardown). When the in-band cancel
# path is itself buggy (the bug-class-3 `raise KBI` swallow we're
# currently chasing) this guard does NOT fire and the test sits
# forever until external SIGINT. The `_DIAG_CAP_S` outer guard
# below is the AFK-safety counterpart.
fail_after_s: int = (
30
if _spawn_mod._spawn_method == 'main_thread_forkserver'
4
if is_forking_spawner
else 12
)
# outer guard: when the inner fail_after fails to fire because of
# a shielded-await deadlock, this cap *aborts the trio run via
# signal.alarm → KBI* so AFK runs don't sit for >20min on the
# bug-class-3 hang. Slightly larger than `fail_after_s` so the
# trio-native path always wins when it works.
_DIAG_CAP_S: int = fail_after_s + 5
async def main():
with trio.fail_after(fail_after_s):
async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as n:
# bug-class-3 breadcrumb: tag each level of the cancel path
# so when the run hangs and we capture cancel-level logs, the
# *last* breadcrumb that fired names the swallow point.
test_log.cancel('test_dynamic_pub_sub: enter main()')
try:
with trio.fail_after(fail_after_s):
test_log.cancel(
f'test_dynamic_pub_sub: '
f'enter `trio.fail_after({fail_after_s})` scope'
)
try:
async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as n:
test_log.cancel(
'test_dynamic_pub_sub: '
'actor nursery opened'
)
# name of this actor will be same as target func
await n.run_in_actor(publisher)
# name of this actor will be same as target func
await n.run_in_actor(publisher)
for i, sub in zip(
range(cpus - 2),
itertools.cycle(_registry.keys())
):
await n.run_in_actor(
consumer,
name=f'consumer_{sub}',
subs=[sub],
for i, sub in zip(
range(cpus - 2),
itertools.cycle(_registry.keys())
):
await n.run_in_actor(
consumer,
name=f'consumer_{sub}',
subs=[sub],
)
# make one dynamic subscriber
await n.run_in_actor(
consumer,
name='consumer_dynamic',
subs=list(_registry.keys()),
)
# block until "cancelled by user"
await trio.sleep(3)
test_log.warning(
f'Raising user cancel exc: '
f'{expect_cancel_exc!r}'
)
test_log.cancel(
f'test_dynamic_pub_sub: '
f'ABOUT TO RAISE {expect_cancel_exc!r}'
)
raise expect_cancel_exc('simulate user cancel!')
finally:
test_log.cancel(
'test_dynamic_pub_sub: '
'actor nursery `__aexit__` returned'
)
test_log.cancel(
'test_dynamic_pub_sub: `fail_after` scope exited'
)
finally:
test_log.cancel(
'test_dynamic_pub_sub: leaving `main()`'
)
# make one dynamic subscriber
await n.run_in_actor(
consumer,
name='consumer_dynamic',
subs=list(_registry.keys()),
)
# block until "cancelled by user"
await trio.sleep(3)
test_log.warning(
f'Raising user cancel exc: '
f'{expect_cancel_exc!r}'
)
raise expect_cancel_exc('simulate user cancel!')
# outer signal-based guard — survives a shielded-await deadlock
# since `signal.alarm` raises in the main thread regardless of
# trio's scope state. ONLY armed under fork-based backends since
# the bug we're chasing is MTF-specific.
import signal
armed_alarm: bool = bool(is_forking_spawner)
if armed_alarm:
signal.alarm(_DIAG_CAP_S)
try:
trio.run(main)
pytest.fail(failed_to_raise_report)
except expect_cancel_exc:
# parent-side raised the user-cancel exc directly and
# it propagated unwrapped; clean path.
test_log.exception('Got user-cancel exc AS EXPECTED')
except BaseExceptionGroup as err:
# under fork-based backends the user-raised cancel
# can race with subactor-side stream teardown
# (`trio.EndOfChannel` from a publisher's `send()`
# whose remote half got cut). The expected exc may
# then be nested deeper in the group rather than at
# the top level. `BaseExceptionGroup.split()` walks
# the exc tree recursively (Python 3.11+).
matched, _ = err.split(expect_cancel_exc)
if matched is None:
try:
trio.run(main)
pytest.fail(failed_to_raise_report)
except expect_cancel_exc:
# parent-side raised the user-cancel exc directly and
# it propagated unwrapped; clean path.
test_log.exception('Got user-cancel exc AS EXPECTED')
except BaseExceptionGroup as err:
# under fork-based backends the user-raised cancel
# can race with subactor-side stream teardown
# (`trio.EndOfChannel` from a publisher's `send()`
# whose remote half got cut). The expected exc may
# then be nested deeper in the group rather than at
# the top level. `BaseExceptionGroup.split()` walks
# the exc tree recursively (Python 3.11+).
matched, _ = err.split(expect_cancel_exc)
if matched is None:
pytest.fail(failed_to_raise_report)
test_log.exception('Got user-cancel exc AS EXPECTED')
test_log.exception('Got user-cancel exc AS EXPECTED')
finally:
# always disarm so a passing test doesn't get killed
# post-trio.run by a stale alarm.
if armed_alarm:
signal.alarm(0)
@tractor.context
@ -361,9 +423,12 @@ def test_sigint_both_stream_types():
resp = await stream.receive()
assert resp == msg
raise KeyboardInterrupt
# TODO, use pytest.raises() here instead?
# (why weren't we originally?)
try:
trio.run(main)
assert 0, "Didn't receive KBI!?"
pytest.fail("Didn't receive KBI!?")
except KeyboardInterrupt:
pass

View File

@ -77,6 +77,7 @@ async def worker(
@tractor_test
async def test_streaming_to_actor_cluster(
tpt_proto: str,
is_forking_spawner: bool,
):
'''
Open an actor "cluster" using the (experimental) `._clustering`
@ -88,7 +89,11 @@ async def test_streaming_to_actor_cluster(
f'Test currently fails with tpt-proto={tpt_proto!r}\n'
)
with trio.fail_after(6):
delay: float = (
10 if is_forking_spawner
else 6
)
with trio.fail_after(delay):
async with (
open_actor_cluster(modules=[__name__]) as portals,

View File

@ -188,9 +188,16 @@ def test_simple_context(
pointlessly_open_stream,
reg_addr: tuple,
debug_mode: bool,
is_forking_spawner: bool,
):
timeout = 1.5 if not platform.system() == 'Windows' else 4
timeout: float = 1.5
# windows and forking-spawner both have "slower but more
# deterministic" cancel teardown.
if platform.system() == 'Windows':
timeout = 4
elif is_forking_spawner:
timeout = 3
async def main():

View File

@ -45,6 +45,12 @@ from tractor._testing import expect_ctxc
# `test_legacy_one_way_streaming`, etc.).
pytestmark = pytest.mark.usefixtures(
'reap_subactors_per_test',
# NOTE, asyncio cancel cascade has historically
# triggered both UDS sockfile leaks (SIGKILL path)
# AND the trio `WakeupSocketpair.drain()` busy-loop
# — see `test_aio_simple_error`'s history.
'track_orphaned_uds_per_test',
'detect_runaway_subactors_per_test',
)
@ -52,6 +58,7 @@ pytestmark = pytest.mark.usefixtures(
scope='module',
)
def delay(debug_mode: bool) -> int:
return 1e3
if debug_mode:
return 999
else:
@ -826,13 +833,19 @@ async def trio_to_aio_echo_server(
@pytest.mark.parametrize(
'raise_error_mid_stream',
[False, Exception, KeyboardInterrupt],
[
False,
Exception,
KeyboardInterrupt,
],
ids='raise_error={}'.format,
)
def test_echoserver_detailed_mechanics(
reg_addr: tuple[str, int],
debug_mode: bool,
raise_error_mid_stream,
is_forking_spawner: bool,
):
async def main():
async with tractor.open_nursery(
@ -880,12 +893,34 @@ def test_echoserver_detailed_mechanics(
# is cancelled by kbi or out of task cancellation
await p.cancel_actor()
# NOTE: under fork-based backends the cancel-cascade
# path is structurally slower than `trio`'s subproc-exec
# (per-spawn forkserver-handshake compounds during
# teardown). Bump the cap so cross-test contamination
# doesn't flake this — see
# `ai/conc-anal/cancel_cascade_too_slow_under_main_thread_forkserver_issue.md`.
timeout: float = (
999 if tractor.debug_mode()
else 4 if is_forking_spawner
else 1
)
with_timeout: bool = (
True
# False
)
async def fa_main():
if with_timeout:
with trio.fail_after(timeout):
await main()
else:
await main()
if raise_error_mid_stream:
with pytest.raises(raise_error_mid_stream):
trio.run(main)
trio.run(fa_main)
else:
trio.run(main)
trio.run(fa_main)
@tractor.context
@ -1038,7 +1073,7 @@ def test_sigint_closes_lifetime_stack(
bg_aio_task: bool,
trio_side_is_shielded: bool,
send_sigint_to: str,
start_method: str,
is_forking_spawner: bool,
):
'''
Ensure that an infected child can use the `Actor.lifetime_stack`
@ -1053,6 +1088,14 @@ def test_sigint_closes_lifetime_stack(
if debug_mode
else 1
)
# pre-init so the `except (KeyboardInterrupt, ContextCancelled)`
# handler below doesn't `UnboundLocalError` if KBI fires BEFORE
# we ever enter the `as (ctx, first)` body (e.g. when
# `p.open_context().__aenter__` is hung waiting for the
# subactor's `StartAck` due to a fork-child IPC race —
# see `dynamic_pub_sub_spawn_time_transport_close_under_mtf_issue.md`).
tmp_file: Path|None = None
ctx: tractor.Context|None = None
try:
an: tractor.ActorNursery
async with tractor.open_nursery(
@ -1078,7 +1121,7 @@ def test_sigint_closes_lifetime_stack(
) as (ctx, first):
path_str, cpid = first
tmp_file: Path = Path(path_str)
tmp_file = Path(path_str)
assert tmp_file.exists()
# XXX originally to simulate what (hopefully)
@ -1129,7 +1172,7 @@ def test_sigint_closes_lifetime_stack(
if (
send_sigint_to == 'child'
and
start_method == 'main_thread_forkserver'
is_forking_spawner
):
pytest.xfail(
reason=(
@ -1156,6 +1199,21 @@ def test_sigint_closes_lifetime_stack(
KeyboardInterrupt,
ContextCancelled,
):
# If we got here BEFORE entering the ctx body (e.g.
# spawn-time IPC race hung `open_context.__aenter__` and
# the AFK-guard `signal.alarm` fired KBI from outside the
# trio loop), `tmp_file`/`ctx` are still `None` — surface
# that fact directly instead of `UnboundLocalError`.
if tmp_file is None:
pytest.fail(
'KBI/ctxc fired BEFORE `p.open_context()` returned '
"the child's `started` value — likely fork-child "
'IPC race; see '
'`ai/conc-anal/'
'dynamic_pub_sub_spawn_time_transport_close_'
'under_mtf_issue.md`'
)
# XXX CASE 2: without the bug fixed, in the
# KBI-raised-in-parent case, the actor teardown should
# never get run (silently abaondoned by `asyncio`..) and
@ -1163,7 +1221,32 @@ def test_sigint_closes_lifetime_stack(
assert not tmp_file.exists()
assert ctx.maybe_error
trio.run(main)
# outer signal-based AFK-safety guard. mirrors the pattern in
# `tests/test_advanced_streaming.py::test_dynamic_pub_sub`: when
# the in-band trio cancel path doesn't fire (e.g. parent is
# parked in a shielded `await` inside actor-nursery teardown, or
# `open_context.__aenter__` hangs waiting for a child's
# `StartAck` that never comes), `signal.alarm` raises KBI in the
# main thread regardless of trio's scope state. This caps the
# absolute wall-clock so an AFK run can't sit for an hour on a
# forkserver-launchpad-contamination hang. Only armed under fork-
# based backends since the bug class is MTF-specific.
_AFK_CAP_S: int = (
999 if debug_mode
else 10
)
armed_alarm: bool = (
not debug_mode
and
is_forking_spawner
)
if armed_alarm:
signal.alarm(_AFK_CAP_S)
try:
trio.run(main)
finally:
if armed_alarm:
signal.alarm(0)

View File

@ -26,14 +26,30 @@ from tractor._testing import (
from .conftest import cpu_scaling_factor
pytestmark = pytest.mark.skipon_spawn_backend(
'subint',
reason=(
'XXX SUBINT GIL-CONTENTION HANGING TEST XXX\n'
'See oustanding issue(s)\n'
# TODO, put issue link!
)
)
pytestmark = [
pytest.mark.skipon_spawn_backend(
'subint',
reason=(
'XXX SUBINT GIL-CONTENTION HANGING TEST XXX\n'
'Inter-peer cancel cascades under '
'`--spawn-backend=subint` trip the abandoned-subint '
'GIL-hostage class — see\n'
' - `ai/conc-anal/subint_sigint_starvation_issue.md` '
'(GIL-hostage, SIGINT-unresponsive)\n'
' - `ai/conc-anal/subint_cancel_delivery_hang_issue.md` '
'(sibling: parent parks on dead chan)\n'
' - https://github.com/goodboy/tractor/issues/379 '
'(subint umbrella)\n'
)
),
# NOTE, inter-peer cancellation tests stress the
# multi-actor cancel cascade which under SIGKILL
# leaves UDS sock-files orphaned. Track per-test
# for blame attribution.
pytest.mark.usefixtures(
'track_orphaned_uds_per_test',
),
]
# XXX TODO cases:
# - [x] WE cancelled the peer and thus should not see any raised

View File

@ -653,6 +653,7 @@ def pytest_generate_tests(
# scope='module',
# )
def _is_forking_spawner(
start_method: str,
) -> bool:
@ -670,7 +671,7 @@ def is_forking_spawner(
Is the `pytest` run using a `fork()`ing process spawning-backend?
'''
return _is_forking_spawner
return _is_forking_spawner(start_method)
def maybe_xfail_for_spawner(