Adjust legacy streaming test timeouts for fork+UDS

Forking spawner + UDS transport has different timing
vs `trio_proc` — streaming example completes faster
in some cases, slower in others depending on fork
overhead + sock setup.

Deats,
- add `expect_cancel` param to `cancel_after()`, raise
  `ActorTooSlowError` when cancel scope fires unexpectedly instead of
  silently returning `None`.
- `time_quad_ex` fixture: bump timeout +1 for forking+UDS, explicit
  `ActorTooSlowError` on `None` result instead of bare `assert results`.
- `test_not_fast_enough_quad`: `xfail` for forking+UDS being "too fast"
  (cancel doesn't fire bc streaming finishes before delay).
- add `is_forking_spawner`, `tpt_proto` fixture params throughout.

Also,
- `_testing/pytest.py`: widen `start_method` parametrize and
  `is_forking_spawner` fixture to `scope='session'`.
- `"""` -> `'''` docstring style throughout.
- hoist `_non_linux` to module scope (was redefined locally in two
  places).
- type hints, kwarg-style `partial()` calls.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
subint_forkserver_backend
Gud Boi 2026-05-11 21:43:19 -04:00
parent 099104e0af
commit d3cbc92751
2 changed files with 96 additions and 29 deletions

View File

@ -1,7 +1,7 @@
""" '''
Streaming via the, now legacy, "async-gen API". Streaming via the, now legacy, "async-gen API".
""" '''
import time import time
from functools import partial from functools import partial
import platform import platform
@ -12,6 +12,11 @@ import tractor
import pytest import pytest
from tractor._testing import tractor_test from tractor._testing import tractor_test
from tractor._exceptions import ActorTooSlowError
_non_linux: bool = (
_sys := platform.system()
) != 'Linux'
def test_must_define_ctx(): def test_must_define_ctx():
@ -68,8 +73,10 @@ async def stream_from_single_subactor(
start_method, start_method,
stream_func, stream_func,
): ):
"""Verify we can spawn a daemon actor and retrieve streamed data. '''
""" Verify we can spawn a daemon actor and retrieve streamed data.
'''
# only one per host address, spawns an actor if None # only one per host address, spawns an actor if None
async with tractor.open_nursery( async with tractor.open_nursery(
@ -242,14 +249,19 @@ async def a_quadruple_example() -> list[int]:
start = time.time() start = time.time()
# the portal call returns exactly what you'd expect # the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally # as if the remote "aggregate" function was called locally
result_stream = [] result_stream: list[int] = []
async with portal.open_stream_from(aggregate, seed=seed) as stream: async with portal.open_stream_from(
aggregate,
seed=seed,
) as stream:
async for value in stream: async for value in stream:
result_stream.append(value) result_stream.append(value)
print(f"STREAM TIME = {time.time() - start}") print(
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") f"STREAM TIME = {time.time() - start}\n"
f"STREAM + SPAWN TIME = {time.time() - pre_start}\n"
)
assert result_stream == list(range(seed)) assert result_stream == list(range(seed))
await portal.cancel_actor() await portal.cancel_actor()
return result_stream return result_stream
@ -258,13 +270,24 @@ async def a_quadruple_example() -> list[int]:
async def cancel_after( async def cancel_after(
wait: float, wait: float,
reg_addr: tuple, reg_addr: tuple,
expect_cancel: bool,
) -> list[int]: ) -> list[int]:
async with tractor.open_root_actor( async with tractor.open_root_actor(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
): ):
with trio.move_on_after(wait): res: list[int]|None = None
return await a_quadruple_example() with trio.move_on_after(wait) as cs:
res: list[int] = await a_quadruple_example()
return res
if (
not expect_cancel
and
cs.cancelled_caught
):
assert not res
raise ActorTooSlowError
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
@ -272,9 +295,14 @@ def time_quad_ex(
reg_addr: tuple, reg_addr: tuple,
ci_env: bool, ci_env: bool,
spawn_backend: str, spawn_backend: str,
is_forking_spawner: bool,
tpt_proto: str,
): ):
non_linux: bool = (_sys := platform.system()) != 'Linux' if (
if ci_env and non_linux: ci_env
and
_non_linux
):
pytest.skip(f'Test is too flaky on {_sys!r} in CI') pytest.skip(f'Test is too flaky on {_sys!r} in CI')
if spawn_backend == 'mp': if spawn_backend == 'mp':
@ -284,15 +312,36 @@ def time_quad_ex(
''' '''
pytest.skip("Test is too flaky on mp in CI") pytest.skip("Test is too flaky on mp in CI")
timeout = 7 if non_linux else 4 timeout: float = (
start = time.time() 7 if _non_linux
results: list[int] = trio.run( else 4
cancel_after,
timeout,
reg_addr,
) )
if (
is_forking_spawner
and
tpt_proto in [
'uds',
]
):
timeout += 1
start: float = time.time()
results: list[int] = trio.run(partial(
cancel_after,
wait=timeout,
reg_addr=reg_addr,
expect_cancel=True,
))
diff: float = time.time() - start diff: float = time.time() - start
assert results if results is None:
raise ActorTooSlowError(
f'Streaming example took longer then timeout ??\n'
f'timeout={timeout!r}\n'
f'diff={diff!r}\n'
f'results={results!r}\n'
)
return results, diff return results, diff
@ -307,11 +356,10 @@ def test_a_quadruple_example(
given past empirical eval of this suite. given past empirical eval of this suite.
''' '''
non_linux: bool = (_sys := platform.system()) != 'Linux'
this_fast_on_linux: float = 3 this_fast_on_linux: float = 3
this_fast = ( this_fast = (
6 if non_linux 6 if _non_linux
else this_fast_on_linux else this_fast_on_linux
) )
# ^ XXX NOTE, # ^ XXX NOTE,
@ -348,21 +396,26 @@ def test_not_fast_enough_quad(
reg_addr: tuple, reg_addr: tuple,
time_quad_ex: tuple[list[int], float], time_quad_ex: tuple[list[int], float],
cancel_delay: float, cancel_delay: float,
ci_env: bool, ci_env: bool,
spawn_backend: str, spawn_backend: str,
is_forking_spawner: bool,
tpt_proto: str,
test_log: tractor.log.StackLevelAdapter,
): ):
''' '''
Verify we can cancel midway through the quad example and all Verify we can cancel midway through `a_quadruple_example()`, at
actors cancel gracefully. various delays, and all subactors cancel gracefully.
''' '''
results, diff = time_quad_ex results, diff = time_quad_ex
delay = max(diff - cancel_delay, 0) delay = max(diff - cancel_delay, 0)
results = trio.run( results: list[int] = trio.run(partial(
cancel_after, cancel_after,
delay, wait=delay,
reg_addr, reg_addr=reg_addr,
) expect_cancel=True,
))
system: str = platform.system() system: str = platform.system()
if ( if (
system in ('Windows', 'Darwin') system in ('Windows', 'Darwin')
@ -373,6 +426,20 @@ def test_not_fast_enough_quad(
# so just ignore these # so just ignore these
print(f'Woa there {system} caught your breath eh?') print(f'Woa there {system} caught your breath eh?')
else: else:
if (
results
and
is_forking_spawner
and
tpt_proto in [
'uds',
]
):
pytest.xfail(
f'Spawning backend + tpt-proto is too fast XD\n'
f'{spawn_backend!r} + {tpt_proto!r}\n'
)
# should be cancelled mid-streaming # should be cancelled mid-streaming
assert results is None assert results is None

View File

@ -640,7 +640,7 @@ def pytest_generate_tests(
metafunc.parametrize( metafunc.parametrize(
"start_method", "start_method",
[spawn_backend], [spawn_backend],
scope='module', scope='session',
ids=lambda item: f'start_method={spawn_backend}', ids=lambda item: f'start_method={spawn_backend}',
) )
@ -662,7 +662,7 @@ def _is_forking_spawner(
] ]
@pytest.fixture @pytest.fixture(scope='session')
def is_forking_spawner( def is_forking_spawner(
start_method: str, start_method: str,
) -> bool: ) -> bool: