diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py index 3d8714cf..d4b206f1 100644 --- a/tests/test_advanced_streaming.py +++ b/tests/test_advanced_streaming.py @@ -157,17 +157,26 @@ def test_dynamic_pub_sub( from multiprocessing import cpu_count cpus = cpu_count() + # Hard safety cap via trio's own cancellation — see the + # module-level NOTE on why we avoid `pytest-timeout` for + # this test. Picked backend-aware: under `trio` backend + # spawn is cheap (~1s for `cpus` actors) but fork-based + # backends pay a per-spawn cost (forkserver round-trip + + # IPC peer-handshake) that can stack up over `cpus - 1` + # sequential `n.run_in_actor()` calls — especially on UDS + # under cross-pytest contention (#451 / #452). Empirically + # 12s flakes on `main_thread_forkserver`; 30s gives + # plenty of headroom while still failing-loud on a real + # hang. + from tractor.spawn import _spawn as _spawn_mod + fail_after_s: int = ( + 30 + if _spawn_mod._spawn_method == 'main_thread_forkserver' + else 12 + ) + async def main(): - # Hard safety cap via trio's own cancellation — see - # the module-level NOTE on why we avoid `pytest-timeout` - # for this test. Total expected runtime: ~1s spawn + 3s - # sleep + ~1-2s cancel cascade ≈ 5-6s. 12s gives plenty - # of headroom; if exceeded, trio raises `TooSlowError` - # which the outer `try` block treats as a hang report - # (or, if `expect_cancel_exc is trio.TooSlowError`, as - # the test passing — either way, no global state - # corruption). - with trio.fail_after(12): + with trio.fail_after(fail_after_s): async with tractor.open_nursery( registry_addrs=[reg_addr], debug_mode=debug_mode,