From 10db117864e57e5030b92051972cc4bdea68addf Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 13 May 2026 11:33:49 -0400 Subject: [PATCH] Add signal-alarm guard to `test_dynamic_pub_sub` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Outer `signal.alarm` cap that fires even when trio's `fail_after` is blocked by a shielded-await deadlock (the bug-class-3 hang under MTF backends). Only armed for fork-based spawners where the bug lives. Deats, - `_DIAG_CAP_S = fail_after_s + 5` — slightly larger than the trio-native guard so it always loses when the in-band path works. - `test_log.cancel()` breadcrumbs at each cancel-scope boundary so the last-fired breadcrumb names the swallow point on hang. - try/finally wrapping around each scope level for deterministic breadcrumb emission. - add `is_forking_spawner`, `set_fork_aware_capture` fixture params. - rework `fail_after_s`: 4s for fork, 12s for trio (was 30/12). Also, - `test_sigint_both_stream_types`: `assert 0` -> `pytest.fail()`, add TODO re `pytest.raises()`. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tests/test_advanced_streaming.py | 189 +++++++++++++++++++++---------- 1 file changed, 127 insertions(+), 62 deletions(-) diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py index 645d2759..23cd886d 100644 --- a/tests/test_advanced_streaming.py +++ b/tests/test_advanced_streaming.py @@ -147,6 +147,9 @@ def test_dynamic_pub_sub( test_log: tractor.log.StackLevelAdapter, reap_subactors_per_test: int, expect_cancel_exc: Type[BaseException], + + is_forking_spawner: bool, + set_fork_aware_capture, ): failed_to_raise_report: str = ( f'Never got a {expect_cancel_exc!r} ??' 
@@ -157,79 +160,138 @@ def test_dynamic_pub_sub( from multiprocessing import cpu_count cpus = cpu_count() - # Hard safety cap via trio's own cancellation — see the - # module-level NOTE on why we avoid `pytest-timeout` for - # this test. Picked backend-aware: under `trio` backend - # spawn is cheap (~1s for `cpus` actors) but fork-based - # backends pay a per-spawn cost (forkserver round-trip + - # IPC peer-handshake) that can stack up over `cpus - 1` - # sequential `n.run_in_actor()` calls — especially on UDS - # under cross-pytest contention (#451 / #452). Empirically - # 12s flakes on `main_thread_forkserver`; 30s gives - # plenty of headroom while still failing-loud on a real - # hang. - from tractor.spawn import _spawn as _spawn_mod + # Hard safety cap via trio's own cancellation. NOTE see the + # module-level note on why we avoid `pytest-timeout` for this + # test. Picked backend-aware: under `trio` backend spawn is + # cheap (~1s for `cpus` actors) but fork-based backends pay + # a per-spawn cost (forkserver round-trip + IPC peer-handshake) + # that can stack up over `cpus - 1` sequential `n.run_in_actor()` + # calls — especially on UDS under cross-pytest contention + # (#451 / #452). Empirically a flat 15s flakes on + # `main_thread_forkserver` for many-cpu hosts (a single bad + # spawn-stack puts total run-time at ~15.5s, just over); + # 30s gives plenty of headroom while still failing-loud on + # a real hang. NOTE(review): the fork-spawner cap set below is 4s, not 30s (and under the 15s said to flake) — confirm this is intended. + # + # XXX caveat: this is an *inner* `trio.fail_after` — its + # `Cancelled` cannot reach a task parked in a shielded `await` + # (e.g. inside actor-nursery teardown). When the in-band cancel + # path is itself buggy (the bug-class-3 `raise KBI` swallow we're + # currently chasing) this guard does NOT fire and the test sits + # forever until external SIGINT. The `_DIAG_CAP_S` outer guard + # below is the AFK-safety counterpart. 
 fail_after_s: int = ( - 30 - if _spawn_mod._spawn_method == 'main_thread_forkserver' + 4 + if is_forking_spawner else 12 ) + # outer guard: when the inner fail_after fails to fire because of + # a shielded-await deadlock, this cap *aborts the trio run via + # signal.alarm → KBI* so AFK runs don't sit for >20min on the + # bug-class-3 hang. Slightly larger than `fail_after_s` so the + # trio-native path always wins when it works. NOTE(review): no SIGALRM handler is installed in this test; the default SIGALRM action terminates the process rather than raising KBI — confirm a handler is set elsewhere. + _DIAG_CAP_S: int = fail_after_s + 5 + async def main(): - with trio.fail_after(fail_after_s): - async with tractor.open_nursery( - registry_addrs=[reg_addr], - debug_mode=debug_mode, - ) as n: + # bug-class-3 breadcrumb: tag each level of the cancel path + # so when the run hangs and we capture cancel-level logs, the + # *last* breadcrumb that fired names the swallow point. + test_log.cancel('test_dynamic_pub_sub: enter main()') + try: + with trio.fail_after(fail_after_s): + test_log.cancel( + f'test_dynamic_pub_sub: ' + f'enter `trio.fail_after({fail_after_s})` scope' + ) + try: + async with tractor.open_nursery( + registry_addrs=[reg_addr], + debug_mode=debug_mode, + ) as n: + test_log.cancel( + 'test_dynamic_pub_sub: ' + 'actor nursery opened' + ) - # name of this actor will be same as target func - await n.run_in_actor(publisher) + # name of this actor will be same as target func + await n.run_in_actor(publisher) - for i, sub in zip( - range(cpus - 2), - itertools.cycle(_registry.keys()) - ): - await n.run_in_actor( - consumer, - name=f'consumer_{sub}', - subs=[sub], + for i, sub in zip( + range(cpus - 2), + itertools.cycle(_registry.keys()) + ): + await n.run_in_actor( + consumer, + name=f'consumer_{sub}', + subs=[sub], + ) + + # make one dynamic subscriber + await n.run_in_actor( + consumer, + name='consumer_dynamic', + subs=list(_registry.keys()), + ) + + # block until "cancelled by user" + await trio.sleep(3) + test_log.warning( + f'Raising user cancel exc: ' + f'{expect_cancel_exc!r}' + ) + test_log.cancel( 
f'test_dynamic_pub_sub: ' + f'ABOUT TO RAISE {expect_cancel_exc!r}' + ) + raise expect_cancel_exc('simulate user cancel!') + finally: + test_log.cancel( + 'test_dynamic_pub_sub: ' + 'actor nursery `__aexit__` returned' ) + test_log.cancel( + 'test_dynamic_pub_sub: `fail_after` scope exited' + ) + finally: + test_log.cancel( + 'test_dynamic_pub_sub: leaving `main()`' + ) - # make one dynamic subscriber - await n.run_in_actor( - consumer, - name='consumer_dynamic', - subs=list(_registry.keys()), - ) - - # block until "cancelled by user" - await trio.sleep(3) - test_log.warning( - f'Raising user cancel exc: ' - f'{expect_cancel_exc!r}' - ) - raise expect_cancel_exc('simulate user cancel!') - + # outer signal-based guard — survives a shielded-await deadlock + # since `signal.alarm` raises in the main thread regardless of + # trio's scope state. ONLY armed under fork-based backends since + # the bug we're chasing is MTF-specific. + import signal + armed_alarm: bool = bool(is_forking_spawner) + if armed_alarm: + signal.alarm(_DIAG_CAP_S) try: - trio.run(main) - pytest.fail(failed_to_raise_report) - except expect_cancel_exc: - # parent-side raised the user-cancel exc directly and - # it propagated unwrapped; clean path. - test_log.exception('Got user-cancel exc AS EXPECTED') - except BaseExceptionGroup as err: - # under fork-based backends the user-raised cancel - # can race with subactor-side stream teardown - # (`trio.EndOfChannel` from a publisher's `send()` - # whose remote half got cut). The expected exc may - # then be nested deeper in the group rather than at - # the top level. `BaseExceptionGroup.split()` walks - # the exc tree recursively (Python 3.11+). - matched, _ = err.split(expect_cancel_exc) - if matched is None: + try: + trio.run(main) pytest.fail(failed_to_raise_report) + except expect_cancel_exc: + # parent-side raised the user-cancel exc directly and + # it propagated unwrapped; clean path. 
+ test_log.exception('Got user-cancel exc AS EXPECTED') + except BaseExceptionGroup as err: + # under fork-based backends the user-raised cancel + # can race with subactor-side stream teardown + # (`trio.EndOfChannel` from a publisher's `send()` + # whose remote half got cut). The expected exc may + # then be nested deeper in the group rather than at + # the top level. `BaseExceptionGroup.split()` walks + # the exc tree recursively (Python 3.11+). + matched, _ = err.split(expect_cancel_exc) + if matched is None: + pytest.fail(failed_to_raise_report) - test_log.exception('Got user-cancel exc AS EXPECTED') + test_log.exception('Got user-cancel exc AS EXPECTED') + finally: + # always disarm so a passing test doesn't get killed + # post-trio.run by a stale alarm. + if armed_alarm: + signal.alarm(0) @tractor.context @@ -361,9 +423,12 @@ def test_sigint_both_stream_types(): resp = await stream.receive() assert resp == msg raise KeyboardInterrupt + + # TODO, use pytest.raises() here instead? + # (why weren't we originally?) try: trio.run(main) - assert 0, "Didn't receive KBI!?" + pytest.fail("Didn't receive KBI!?") except KeyboardInterrupt: pass