Move `get_cpu_state()` to `conftest` as shared latency headroom

Factor the CPU-freq-scaling helper out of
`test_legacy_one_way_streaming` into `conftest.py`
alongside a new `cpu_scaling_factor()` convenience fn
that returns a latency-headroom multiplier (>= 1.0).

Apply it to the two other flaky-timeout tests,
- `test_cancel_via_SIGINT_other_task`: 2s -> scaled
- `test_example[we_are_processes.py]`: 16s -> scaled

Deats,
- add `get_cpu_state()` + `cpu_scaling_factor()` to
  `conftest.py` so all test mods can share the logic.
- catch `IndexError` (empty glob) in addition to
  `FileNotFoundError`.
- rename `factor` var -> `headroom` at call sites for
  clarity on intent.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
multicast_revertable_streams
Gud Boi 2026-03-25 16:50:55 -04:00
parent 6215e3b2dd
commit 93d99ed2eb
4 changed files with 97 additions and 71 deletions

View File

@ -9,6 +9,8 @@ import os
import signal
import platform
import time
from pathlib import Path
from typing import Literal
import pytest
import tractor
@ -52,6 +54,76 @@ no_macos = pytest.mark.skipif(
)
def get_cpu_state(
icpu: int = 0,
setting: Literal[
'scaling_governor',
'*_pstate_max_freq',
'scaling_max_freq',
# 'scaling_cur_freq',
] = '*_pstate_max_freq',
) -> tuple[
Path,
str|int,
]|None:
'''
Attempt to read the (first) CPU's setting according
to the set `setting` from under the file-sys,
/sys/devices/system/cpu/cpu0/cpufreq/{setting}
Useful to determine latency headroom for various perf affected
test suites.
'''
try:
# Read governor for core 0 (usually same for all)
setting_path: Path = list(
Path(f'/sys/devices/system/cpu/cpu{icpu}/cpufreq/')
.glob(f'{setting}')
)[0] # <- XXX must be single match!
with open(
setting_path,
'r',
) as f:
return (
setting_path,
f.read().strip(),
)
except (FileNotFoundError, IndexError):
return None
def cpu_scaling_factor() -> float:
'''
Return a latency-headroom multiplier (>= 1.0) reflecting how
much to inflate time-limits when CPU-freq scaling is active on
linux.
When no scaling info is available (non-linux, missing sysfs),
returns 1.0 (i.e. no headroom adjustment needed).
'''
if _non_linux:
return 1.
mx = get_cpu_state()
cur = get_cpu_state(setting='scaling_max_freq')
if mx is None or cur is None:
return 1.
_mx_pth, max_freq = mx
_cur_pth, cur_freq = cur
cpu_scaled: float = int(cur_freq) / int(max_freq)
if cpu_scaled != 1.:
return 1. / (
cpu_scaled * 2 # <- bc likely "dual threaded"
)
return 1.
def pytest_addoption(
parser: pytest.Parser,
):

View File

@ -490,7 +490,7 @@ def test_cancel_via_SIGINT(
"""Ensure that a control-C (SIGINT) signal cancels both the parent and
child processes in trionic fashion
"""
pid = os.getpid()
pid: int = os.getpid()
async def main():
with trio.fail_after(2):
@ -517,6 +517,8 @@ def test_cancel_via_SIGINT_other_task(
started from a seperate ``trio`` child task.
'''
from .conftest import cpu_scaling_factor
pid: int = os.getpid()
timeout: float = (
4 if _non_linux
@ -525,6 +527,11 @@ def test_cancel_via_SIGINT_other_task(
if _friggin_windows: # smh
timeout += 1
# add latency headroom for CPU freq scaling (auto-cpufreq et al.)
headroom: float = cpu_scaling_factor()
if headroom != 1.:
timeout *= headroom
async def spawn_and_sleep_forever(
task_status=trio.TASK_STATUS_IGNORED
):

View File

@ -145,12 +145,19 @@ def test_example(
'This test does run just fine "in person" however..'
)
from .conftest import cpu_scaling_factor
timeout: float = (
60
if ci_env and _non_linux
else 16
)
# add latency headroom for CPU freq scaling (auto-cpufreq et al.)
headroom: float = cpu_scaling_factor()
if headroom != 1.:
timeout *= headroom
with open(ex_file, 'r') as ex:
code = ex.read()

View File

@ -4,12 +4,8 @@ Streaming via the, now legacy, "async-gen API".
"""
import time
from functools import partial
from pathlib import Path
import platform
from typing import (
Callable,
Literal,
)
from typing import Callable
import trio
import tractor
@ -300,46 +296,6 @@ def time_quad_ex(
return results, diff
def get_cpu_state(
icpu: int = 0,
setting: Literal[
'scaling_governor',
'*_pstate_max_freq',
'scaling_max_freq',
# 'scaling_cur_freq',
] = '*_pstate_max_freq',
) -> tuple[
Path,
str|int,
]|None:
'''
Attempt to read the (first) CPU's setting according
to the set `setting` from under the file-sys,
/sys/devices/system/cpu/cpu0/cpufreq/{setting}
Useful to determine latency limits for various perf affected test
suites.
'''
try:
# Read governor for core 0 (usually same for all)
setting_path: Path = list(
Path(f'/sys/devices/system/cpu/cpu{icpu}/cpufreq/')
.glob(f'{setting}')
)[0] # <- XXX must be single match!
with open(
setting_path,
'r',
) as f:
return (
setting_path,
f.read().strip(),
)
except FileNotFoundError:
return None
def test_a_quadruple_example(
time_quad_ex: tuple[list[int], float],
ci_env: bool,
@ -368,30 +324,14 @@ def test_a_quadruple_example(
# For ex, see the `auto-cpufreq` docs on such settings,
# https://github.com/AdnanHodzic/auto-cpufreq?tab=readme-ov-file#example-config-file-contents
#
# HENCE this below auxiliary compensation logic..
if not non_linux:
mx_pth, max_freq = get_cpu_state()
cur_pth, cur_freq = get_cpu_state(
setting='scaling_max_freq',
)
cpu_scaled: float = (
int(cur_freq) / int(max_freq)
)
if cpu_scaled != 1.:
this_fast = (
this_fast_on_linux / (
cpu_scaled * 2 # <- bc likely "dual threaded"
# ^TODO, calc the thr-per-core val?
)
)
# HENCE this below latency-headroom compensation logic..
from .conftest import cpu_scaling_factor
headroom: float = cpu_scaling_factor()
if headroom != 1.:
this_fast = this_fast_on_linux * headroom
test_log.warning(
f'Increasing time-limit on linux bc CPU scaling,\n'
f'\n'
f'{mx_pth} = {max_freq}\n'
f'{cur_pth} = {cur_freq}\n'
f'\n'
f'cpu_scaled = {cpu_scaled}\n'
f'Adding latency headroom on linux bc CPU scaling,\n'
f'headroom: {headroom}\n'
f'this_fast_on_linux: {this_fast_on_linux} -> {this_fast}\n'
)