From bd07a95d8098ec10a9f3f92b304835a2028ae324 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 13 May 2026 20:39:37 -0400 Subject: [PATCH] Use trace CM helpers in `test_dynamic_pub_sub` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace inline `trio.fail_after` + manual `signal.alarm` guard with the `_testing.trace` CM helpers that auto-capture a full ptree/wchan/py-spy diag snapshot to disk on timeout. Deats, - inner guard: `trio.fail_after` → `fail_after_w_trace` (async CM, captures on `TooSlowError`). - outer AFK guard: raw `signal.alarm` → `afk_alarm_w_trace` (sync CM, captures on `SIGALRM`), only armed under fork backends. Extracts `_run_and_match()` helper to keep branching clean. - bump `fail_after_s` from 4/12 → 8/20 to stop borderline flakes while diag harness accumulates evidence. - drop `_DIAG_CAP_S` var + manual signal import (now internal to `afk_alarm_w_trace`). (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tests/test_advanced_streaming.py | 75 ++++++++++++++++---------------- 1 file changed, 38 insertions(+), 37 deletions(-) diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py index 23cd886d..9b1a476f 100644 --- a/tests/test_advanced_streaming.py +++ b/tests/test_advanced_streaming.py @@ -10,6 +10,10 @@ from typing import Type import pytest import trio import tractor +from tractor._testing.trace import ( + AfkAlarmWTraceFactory, + FailAfterWTraceFactory, +) def is_win(): @@ -150,6 +154,9 @@ def test_dynamic_pub_sub( is_forking_spawner: bool, set_fork_aware_capture, + + fail_after_w_trace: FailAfterWTraceFactory, + afk_alarm_w_trace: AfkAlarmWTraceFactory, ): failed_to_raise_report: str = ( f'Never got a {expect_cancel_exc!r} ??' @@ -167,42 +174,36 @@ def test_dynamic_pub_sub( # a per-spawn cost (forkserver round-trip + IPC peer-handshake) # that can stack up over `cpus - 1` sequential `n.run_in_actor()` # calls — especially on UDS under cross-pytest contention - # (#451 / #452). Empirically a flat 15s flakes on - # `main_thread_forkserver` for many-cpu hosts (a single bad - # spawn-stack puts total run-time at ~15.5s, just over); - # 30s gives plenty of headroom while still failing-loud on - # a real hang. + # (#451 / #452). 4s was flaking right at the edge under fork + # backends — bumped to 8s with diag-snapshot-on-timeout via + # `fail_after_w_trace` so a borderline run still fails loud + # but lands a ptree/wchan/py-spy dump in + # `$XDG_CACHE_HOME/tractor/hung-dumps/` for inspection. # - # XXX caveat: this is an *inner* `trio.fail_after` — its - # `Cancelled` cannot reach a task parked in a shielded `await` - # (e.g. inside actor-nursery teardown). When the in-band cancel - # path is itself buggy (the bug-class-3 `raise KBI` swallow we're - # currently chasing) this guard does NOT fire and the test sits - # forever until external SIGINT. The `_DIAG_CAP_S` outer guard - # below is the AFK-safety counterpart. + # XXX caveat: this is an *inner* trio cancel — its `Cancelled` + # cannot reach a task parked in a shielded `await` (e.g. inside + # actor-nursery teardown). When the in-band cancel path is + # itself buggy (the bug-class-3 `raise KBI` swallow we're + # currently chasing) this guard does NOT fire and the test + # sits forever until external SIGINT. The `afk_alarm_w_trace` + # outer guard below is the AFK-safety counterpart (SIGALRM + # raises in the main thread regardless of trio scope state). fail_after_s: int = ( - 4 + 8 if is_forking_spawner - else 12 + else 20 ) - # outer guard: when the inner fail_after fails to fire because of - # a shielded-await deadlock, this cap *aborts the trio run via - # signal.alarm → KBI* so AFK runs don't sit for >20min on the - # bug-class-3 hang. Slightly larger than `fail_after_s` so the - # trio-native path always wins when it works. - _DIAG_CAP_S: int = fail_after_s + 5 - async def main(): # bug-class-3 breadcrumb: tag each level of the cancel path # so when the run hangs and we capture cancel-level logs, the # *last* breadcrumb that fired names the swallow point. test_log.cancel('test_dynamic_pub_sub: enter main()') try: - with trio.fail_after(fail_after_s): + async with fail_after_w_trace(fail_after_s): test_log.cancel( f'test_dynamic_pub_sub: ' - f'enter `trio.fail_after({fail_after_s})` scope' + f'enter `fail_after_w_trace({fail_after_s})` scope' ) try: async with tractor.open_nursery( @@ -258,15 +259,7 @@ def test_dynamic_pub_sub( 'test_dynamic_pub_sub: leaving `main()`' ) - # outer signal-based guard — survives a shielded-await deadlock - # since `signal.alarm` raises in the main thread regardless of - # trio's scope state. ONLY armed under fork-based backends since - # the bug we're chasing is MTF-specific. - import signal - armed_alarm: bool = bool(is_forking_spawner) - if armed_alarm: - signal.alarm(_DIAG_CAP_S) - try: + def _run_and_match(): try: trio.run(main) pytest.fail(failed_to_raise_report) @@ -287,11 +280,19 @@ def test_dynamic_pub_sub( pytest.fail(failed_to_raise_report) test_log.exception('Got user-cancel exc AS EXPECTED') - finally: - # always disarm so a passing test doesn't get killed - # post-trio.run by a stale alarm. - if armed_alarm: - signal.alarm(0) + + # outer SIGALRM-based guard — survives a shielded-await + # deadlock since `signal.alarm` raises in the main thread + # regardless of trio's scope state, AND captures a full diag + # snapshot to `$XDG_CACHE_HOME/tractor/hung-dumps/` before + # re-raising. ONLY armed under fork-based backends since the + # bug we're chasing is MTF-specific. Cap = `fail_after_s + 5` + # so the trio-native path always wins when it works. + if is_forking_spawner: + with afk_alarm_w_trace(fail_after_s + 5): + _run_and_match() + else: + _run_and_match() @tractor.context