Use trace CM helpers in `test_dynamic_pub_sub`

Replace the inline `trio.fail_after` + manual `signal.alarm` guard with
the `_testing.trace` context-manager (CM) helpers, which auto-capture a
full ptree/wchan/py-spy diag snapshot to disk on timeout.

Details:
- inner guard: `trio.fail_after` → `fail_after_w_trace` (async CM,
  captures on `TooSlowError`).
- outer AFK guard: raw `signal.alarm` → `afk_alarm_w_trace` (sync
  CM, captures on `SIGALRM`), only armed under fork backends.
  Extracts `_run_and_match()` helper to keep branching clean.
- bump `fail_after_s` from 4s/12s to 8s/20s (forking/non-forking
  spawner) to stop borderline flakes while the diag harness
  accumulates evidence.
- drop `_DIAG_CAP_S` var + manual signal import (now internal to
  `afk_alarm_w_trace`).

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
subint_forkserver_backend
Gud Boi 2026-05-13 20:39:37 -04:00
parent 32955db02e
commit bd07a95d80
1 changed files with 38 additions and 37 deletions

View File

@ -10,6 +10,10 @@ from typing import Type
import pytest import pytest
import trio import trio
import tractor import tractor
from tractor._testing.trace import (
AfkAlarmWTraceFactory,
FailAfterWTraceFactory,
)
def is_win(): def is_win():
@ -150,6 +154,9 @@ def test_dynamic_pub_sub(
is_forking_spawner: bool, is_forking_spawner: bool,
set_fork_aware_capture, set_fork_aware_capture,
fail_after_w_trace: FailAfterWTraceFactory,
afk_alarm_w_trace: AfkAlarmWTraceFactory,
): ):
failed_to_raise_report: str = ( failed_to_raise_report: str = (
f'Never got a {expect_cancel_exc!r} ??' f'Never got a {expect_cancel_exc!r} ??'
@ -167,42 +174,36 @@ def test_dynamic_pub_sub(
# a per-spawn cost (forkserver round-trip + IPC peer-handshake) # a per-spawn cost (forkserver round-trip + IPC peer-handshake)
# that can stack up over `cpus - 1` sequential `n.run_in_actor()` # that can stack up over `cpus - 1` sequential `n.run_in_actor()`
# calls — especially on UDS under cross-pytest contention # calls — especially on UDS under cross-pytest contention
# (#451 / #452). Empirically a flat 15s flakes on # (#451 / #452). 4s was flaking right at the edge under fork
# `main_thread_forkserver` for many-cpu hosts (a single bad # backends — bumped to 8s with diag-snapshot-on-timeout via
# spawn-stack puts total run-time at ~15.5s, just over); # `fail_after_w_trace` so a borderline run still fails loud
# 30s gives plenty of headroom while still failing-loud on # but lands a ptree/wchan/py-spy dump in
# a real hang. # `$XDG_CACHE_HOME/tractor/hung-dumps/` for inspection.
# #
# XXX caveat: this is an *inner* `trio.fail_after` — its # XXX caveat: this is an *inner* trio cancel — its `Cancelled`
# `Cancelled` cannot reach a task parked in a shielded `await` # cannot reach a task parked in a shielded `await` (e.g. inside
# (e.g. inside actor-nursery teardown). When the in-band cancel # actor-nursery teardown). When the in-band cancel path is
# path is itself buggy (the bug-class-3 `raise KBI` swallow we're # itself buggy (the bug-class-3 `raise KBI` swallow we're
# currently chasing) this guard does NOT fire and the test sits # currently chasing) this guard does NOT fire and the test
# forever until external SIGINT. The `_DIAG_CAP_S` outer guard # sits forever until external SIGINT. The `afk_alarm_w_trace`
# below is the AFK-safety counterpart. # outer guard below is the AFK-safety counterpart (SIGALRM
# raises in the main thread regardless of trio scope state).
fail_after_s: int = ( fail_after_s: int = (
4 8
if is_forking_spawner if is_forking_spawner
else 12 else 20
) )
# outer guard: when the inner fail_after fails to fire because of
# a shielded-await deadlock, this cap *aborts the trio run via
# signal.alarm → KBI* so AFK runs don't sit for >20min on the
# bug-class-3 hang. Slightly larger than `fail_after_s` so the
# trio-native path always wins when it works.
_DIAG_CAP_S: int = fail_after_s + 5
async def main(): async def main():
# bug-class-3 breadcrumb: tag each level of the cancel path # bug-class-3 breadcrumb: tag each level of the cancel path
# so when the run hangs and we capture cancel-level logs, the # so when the run hangs and we capture cancel-level logs, the
# *last* breadcrumb that fired names the swallow point. # *last* breadcrumb that fired names the swallow point.
test_log.cancel('test_dynamic_pub_sub: enter main()') test_log.cancel('test_dynamic_pub_sub: enter main()')
try: try:
with trio.fail_after(fail_after_s): async with fail_after_w_trace(fail_after_s):
test_log.cancel( test_log.cancel(
f'test_dynamic_pub_sub: ' f'test_dynamic_pub_sub: '
f'enter `trio.fail_after({fail_after_s})` scope' f'enter `fail_after_w_trace({fail_after_s})` scope'
) )
try: try:
async with tractor.open_nursery( async with tractor.open_nursery(
@ -258,15 +259,7 @@ def test_dynamic_pub_sub(
'test_dynamic_pub_sub: leaving `main()`' 'test_dynamic_pub_sub: leaving `main()`'
) )
# outer signal-based guard — survives a shielded-await deadlock def _run_and_match():
# since `signal.alarm` raises in the main thread regardless of
# trio's scope state. ONLY armed under fork-based backends since
# the bug we're chasing is MTF-specific.
import signal
armed_alarm: bool = bool(is_forking_spawner)
if armed_alarm:
signal.alarm(_DIAG_CAP_S)
try:
try: try:
trio.run(main) trio.run(main)
pytest.fail(failed_to_raise_report) pytest.fail(failed_to_raise_report)
@ -287,11 +280,19 @@ def test_dynamic_pub_sub(
pytest.fail(failed_to_raise_report) pytest.fail(failed_to_raise_report)
test_log.exception('Got user-cancel exc AS EXPECTED') test_log.exception('Got user-cancel exc AS EXPECTED')
finally:
# always disarm so a passing test doesn't get killed # outer SIGALRM-based guard — survives a shielded-await
# post-trio.run by a stale alarm. # deadlock since `signal.alarm` raises in the main thread
if armed_alarm: # regardless of trio's scope state, AND captures a full diag
signal.alarm(0) # snapshot to `$XDG_CACHE_HOME/tractor/hung-dumps/` before
# re-raising. ONLY armed under fork-based backends since the
# bug we're chasing is MTF-specific. Cap = `fail_after_s + 5`
# so the trio-native path always wins when it works.
if is_forking_spawner:
with afk_alarm_w_trace(fail_after_s + 5):
_run_and_match()
else:
_run_and_match()
@tractor.context @tractor.context