From d3cbc92751046924b19932770b7a139dcdcbf8ff Mon Sep 17 00:00:00 2001 From: goodboy Date: Mon, 11 May 2026 21:43:19 -0400 Subject: [PATCH] Adjust legacy streaming test timeouts for fork+UDS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Forking spawner + UDS transport has different timing vs `trio_proc` — streaming example completes faster in some cases, slower in others depending on fork overhead + sock setup. Deats, - add `expect_cancel` param to `cancel_after()`, raise `ActorTooSlowError` when cancel scope fires unexpectedly instead of silently returning `None`. - `time_quad_ex` fixture: bump timeout +1 for forking+UDS, explicit `ActorTooSlowError` on `None` result instead of bare `assert results`. - `test_not_fast_enough_quad`: `xfail` for forking+UDS being "too fast" (cancel doesn't fire bc streaming finishes before delay). - add `is_forking_spawner`, `tpt_proto` fixture params throughout. Also, - `_testing/pytest.py`: widen `start_method` parametrize and `is_forking_spawner` fixture to `scope='session'`. - `"""` -> `'''` docstring style throughout. - hoist `_non_linux` to module scope (was redefined locally in two places). - type hints, kwarg-style `partial()` calls. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tests/test_legacy_one_way_streaming.py | 121 +++++++++++++++++++------ tractor/_testing/pytest.py | 4 +- 2 files changed, 96 insertions(+), 29 deletions(-) diff --git a/tests/test_legacy_one_way_streaming.py b/tests/test_legacy_one_way_streaming.py index 954328e5..d3787e97 100644 --- a/tests/test_legacy_one_way_streaming.py +++ b/tests/test_legacy_one_way_streaming.py @@ -1,7 +1,7 @@ -""" +''' Streaming via the, now legacy, "async-gen API". -""" +''' import time from functools import partial import platform @@ -12,6 +12,11 @@ import tractor import pytest from tractor._testing import tractor_test +from tractor._exceptions import ActorTooSlowError + +_non_linux: bool = ( + _sys := platform.system() +) != 'Linux' def test_must_define_ctx(): @@ -68,8 +73,10 @@ async def stream_from_single_subactor( start_method, stream_func, ): - """Verify we can spawn a daemon actor and retrieve streamed data. - """ + ''' + Verify we can spawn a daemon actor and retrieve streamed data. + + ''' # only one per host address, spawns an actor if None async with tractor.open_nursery( @@ -242,14 +249,19 @@ async def a_quadruple_example() -> list[int]: start = time.time() # the portal call returns exactly what you'd expect # as if the remote "aggregate" function was called locally - result_stream = [] + result_stream: list[int] = [] - async with portal.open_stream_from(aggregate, seed=seed) as stream: + async with portal.open_stream_from( + aggregate, + seed=seed, + ) as stream: async for value in stream: result_stream.append(value) - print(f"STREAM TIME = {time.time() - start}") - print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") + print( + f"STREAM TIME = {time.time() - start}\n" + f"STREAM + SPAWN TIME = {time.time() - pre_start}\n" + ) assert result_stream == list(range(seed)) await portal.cancel_actor() return result_stream @@ -258,13 +270,24 @@ async def a_quadruple_example() -> list[int]: async def cancel_after( wait: float, reg_addr: tuple, + expect_cancel: bool, ) -> list[int]: async with tractor.open_root_actor( registry_addrs=[reg_addr], ): - with trio.move_on_after(wait): - return await a_quadruple_example() + res: list[int]|None = None + with trio.move_on_after(wait) as cs: + res: list[int] = await a_quadruple_example() + return res + + if ( + not expect_cancel + and + cs.cancelled_caught + ): + assert not res + raise ActorTooSlowError @pytest.fixture(scope='module') @@ -272,9 +295,14 @@ def time_quad_ex( reg_addr: tuple, ci_env: bool, spawn_backend: str, + is_forking_spawner: bool, + tpt_proto: str, ): - non_linux: bool = (_sys := platform.system()) != 'Linux' - if ci_env and non_linux: + if ( + ci_env + and + _non_linux + ): pytest.skip(f'Test is too flaky on {_sys!r} in CI') if spawn_backend == 'mp': @@ -284,15 +312,36 @@ def time_quad_ex( ''' pytest.skip("Test is too flaky on mp in CI") - timeout = 7 if non_linux else 4 - start = time.time() - results: list[int] = trio.run( - cancel_after, - timeout, - reg_addr, + timeout: float = ( + 7 if _non_linux + else 4 ) + + if ( + is_forking_spawner + and + tpt_proto in [ + 'uds', + ] + ): + timeout += 1 + + start: float = time.time() + results: list[int] = trio.run(partial( + cancel_after, + wait=timeout, + reg_addr=reg_addr, + expect_cancel=True, + )) diff: float = time.time() - start - assert results + if results is None: + raise ActorTooSlowError( + f'Streaming example took longer then timeout ??\n' + f'timeout={timeout!r}\n' + f'diff={diff!r}\n' + f'results={results!r}\n' + ) + return results, diff @@ -307,11 +356,10 @@ def test_a_quadruple_example( given past empirical eval of this suite. ''' - non_linux: bool = (_sys := platform.system()) != 'Linux' this_fast_on_linux: float = 3 this_fast = ( - 6 if non_linux + 6 if _non_linux else this_fast_on_linux ) # ^ XXX NOTE, @@ -348,21 +396,26 @@ def test_not_fast_enough_quad( reg_addr: tuple, time_quad_ex: tuple[list[int], float], cancel_delay: float, + ci_env: bool, spawn_backend: str, + is_forking_spawner: bool, + tpt_proto: str, + test_log: tractor.log.StackLevelAdapter, ): ''' - Verify we can cancel midway through the quad example and all - actors cancel gracefully. + Verify we can cancel midway through `a_quadruple_example()`, at + various delays, and all subactors cancel gracefully. ''' results, diff = time_quad_ex delay = max(diff - cancel_delay, 0) - results = trio.run( + results: list[int] = trio.run(partial( cancel_after, - delay, - reg_addr, - ) + wait=delay, + reg_addr=reg_addr, + expect_cancel=True, + )) system: str = platform.system() if ( system in ('Windows', 'Darwin') @@ -373,6 +426,20 @@ def test_not_fast_enough_quad( # so just ignore these print(f'Woa there {system} caught your breath eh?') else: + if ( + results + and + is_forking_spawner + and + tpt_proto in [ + 'uds', + ] + ): + pytest.xfail( + f'Spawning backend + tpt-proto is too fast XD\n' + f'{spawn_backend!r} + {tpt_proto!r}\n' + ) + # should be cancelled mid-streaming assert results is None diff --git a/tractor/_testing/pytest.py b/tractor/_testing/pytest.py index f60144c5..c203a171 100644 --- a/tractor/_testing/pytest.py +++ b/tractor/_testing/pytest.py @@ -640,7 +640,7 @@ def pytest_generate_tests( metafunc.parametrize( "start_method", [spawn_backend], - scope='module', + scope='session', ids=lambda item: f'start_method={spawn_backend}', ) @@ -662,7 +662,7 @@ def _is_forking_spawner( ] -@pytest.fixture +@pytest.fixture(scope='session') def is_forking_spawner( start_method: str, ) -> bool: