""" Top level of the testing suites! """ from __future__ import annotations import sys import subprocess import os import signal import platform import time from pathlib import Path from typing import Literal import pytest import tractor from tractor._testing import ( examples_dir as examples_dir, tractor_test as tractor_test, expect_ctxc as expect_ctxc, ) pytest_plugins: list[str] = [ 'pytester', # NOTE, now loaded in `pytest-ini` section of `pyproject.toml` # 'tractor._testing.pytest', ] _ci_env: bool = os.environ.get('CI', False) _non_linux: bool = platform.system() != 'Linux' # Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives if platform.system() == 'Windows': _KILL_SIGNAL = signal.CTRL_BREAK_EVENT _INT_SIGNAL = signal.CTRL_C_EVENT _INT_RETURN_CODE = 3221225786 else: _KILL_SIGNAL = signal.SIGKILL _INT_SIGNAL = signal.SIGINT _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value no_windows = pytest.mark.skipif( platform.system() == "Windows", reason="Test is unsupported on windows", ) no_macos = pytest.mark.skipif( platform.system() == "Darwin", reason="Test is unsupported on MacOS", ) def get_cpu_state( icpu: int = 0, setting: Literal[ 'scaling_governor', '*_pstate_max_freq', 'scaling_max_freq', # 'scaling_cur_freq', ] = '*_pstate_max_freq', ) -> tuple[ Path, str|int, ]|None: ''' Attempt to read the (first) CPU's setting according to the set `setting` from under the file-sys, /sys/devices/system/cpu/cpu0/cpufreq/{setting} Useful to determine latency headroom for various perf affected test suites. ''' try: # Read governor for core 0 (usually same for all) setting_path: Path = list( Path(f'/sys/devices/system/cpu/cpu{icpu}/cpufreq/') .glob(f'{setting}') )[0] # <- XXX must be single match! with open( setting_path, 'r', ) as f: return ( setting_path, f.read().strip(), ) except (FileNotFoundError, IndexError): return None def cpu_scaling_factor() -> float: ''' Return a latency-headroom multiplier (>= 1.0) reflecting how much to inflate time-limits when CPU-freq scaling is active on linux. When no scaling info is available (non-linux, missing sysfs), returns 1.0 (i.e. no headroom adjustment needed). ''' if _non_linux: return 1. mx = get_cpu_state() cur = get_cpu_state(setting='scaling_max_freq') if mx is None or cur is None: return 1. _mx_pth, max_freq = mx _cur_pth, cur_freq = cur cpu_scaled: float = int(cur_freq) / int(max_freq) if cpu_scaled != 1.: return 1. / ( cpu_scaled * 2 # <- bc likely "dual threaded" ) return 1. # session-cached sustained-load throttle multiplier — measured # once (lazily) on the first `cpu_perf_headroom()` call. `None` # = not-yet-measured. _sustained_headroom: float|None = None def _measure_sustained_headroom( secs: float = 0.9, # a healthy all-core sustained clock holds AT/ABOVE this # fraction of the package single-core max ceiling (boost sags # under full multi-core load even un-throttled, but not far); # at/above it we assume no throttle and return 1.0. throttle_gate: float = 0.6, max_headroom: float = 4., ) -> float: ''' One-shot all-core burn returning a latency multiplier (>= 1.0) that reflects *sustained-load* CPU throttle. Catches the firmware/EC power-cap clamp (AMD PPT/STAPM & friends) that pins achieved `scaling_cur_freq` to a fraction of the ceiling under multi-core load while EVERY static knob (`governor`, `scaling_max_freq`, `EPP`, `platform_profile`) still reads "full performance". That cap is INVISIBLE to `cpu_scaling_factor()` and is the gremlin behind mass `trio` deadline-miss failures on byte-identical code — see `scripts/cpu-perf-check`. Best-effort: returns 1.0 on non-linux / missing sysfs / any error so it can never break a test run. ''' import glob import multiprocessing as mp def _read_mhz(path: str) -> int|None: try: return int(open(path).read()) // 1000 except OSError: return None try: maxs: list[int] = [ v for f in glob.glob( '/sys/devices/system/cpu/cpu[0-9]*/cpufreq/scaling_max_freq' ) if (v := _read_mhz(f)) is not None ] pkg_max: int = max(maxs) if maxs else 0 if not pkg_max: return 1. def _burn(stop: float) -> None: x: int = 1 while time.perf_counter() < stop: x += x * x ^ 0x5 # explicit `fork` ctx so we're immune to whatever global # mp start-method tractor/the suite may have set (`spawn` # would re-exec + re-import 24x — slow and pointless here). ctx = mp.get_context('fork') ncpu: int = os.cpu_count() or 1 stop: float = time.perf_counter() + secs procs = [ ctx.Process(target=_burn, args=(stop,), daemon=True) for _ in range(ncpu) ] for p in procs: p.start() # skip the ~0.4s boost window so we sample the steady # state AFTER any power-cap has engaged. samples: list[int] = [] time.sleep(0.4) while time.perf_counter() < stop - 0.1: curs: list[int] = [ v for f in glob.glob( '/sys/devices/system/cpu/cpu[0-9]*/cpufreq/scaling_cur_freq' ) if (v := _read_mhz(f)) is not None ] if curs: samples.append(sum(curs) // len(curs)) time.sleep(0.15) for p in procs: p.join() if not samples: return 1. frac: float = (sum(samples) // len(samples)) / pkg_max # below the gate we read it as a power-cap throttle. The # spawn/IPC/fork-bound work these budgets guard slows ~1:1 # with the achieved-vs-max freq ratio, so compensate by the # FULL inverse fraction (a boost-discounted factor # under-shoots and still trips the marginal cases). if frac >= throttle_gate: return 1. return min(max_headroom, 1. / frac) except Exception: return 1. def cpu_perf_headroom() -> float: ''' Latency-headroom multiplier (>= 1.0) covering BOTH cpu-perf throttle classes — multiply a test's deadline by it, e.g. `timeout *= cpu_perf_headroom()`: - static cpu-freq scaling — via `cpu_scaling_factor()` (governor/policy lowered the `scaling_max_freq` ceiling). - sustained-load power-cap throttle — via `_measure_sustained_headroom()` (firmware/EC PPT/STAPM clamps achieved freq under load while every static knob reads "performance"; INVISIBLE to the static check). This is the gremlin behind mass `trio` deadline-miss failures on unchanged code — see `ai/conc-anal/trio_033_cancel_cascade_slowdown_depth3_issue.md`. The sustained probe runs ONCE per session (cached); the cost is a ~0.9s all-core burn on first call only. ''' global _sustained_headroom static: float = cpu_scaling_factor() if _non_linux: return static if _sustained_headroom is None: _sustained_headroom = _measure_sustained_headroom() return max(static, _sustained_headroom) # NOTE, the `--ll`/`--tl` CLI flags + the `loglevel`, `test_log` # and `testing_pkg_name` fixtures have been factored into the # `tractor._testing.pytest` plugin (loaded via the `-p` entry in # `pyproject.toml`'s `[tool.pytest.ini_options]`) so downstream # consuming projects (eg. `modden`) inherit them for free. The # plugin's `testing_pkg_name` fixture defaults to `'tractor'`, so # this suite keeps treating `--ll` as the runtime loglevel. @pytest.fixture(scope='session') def ci_env() -> bool: ''' Detect CI environment. ''' return _ci_env def sig_prog( proc: subprocess.Popen, sig: int, canc_timeout: float = 0.2, tries: int = 3, ) -> int: ''' Kill the actor-process with `sig`. Prefer to kill with the provided signal and failing a `canc_timeout`, send a `SIKILL`-like to ensure termination. ''' for i in range(tries): proc.send_signal(sig) if proc.poll() is None: print( f'WARNING, proc still alive after,\n' f'canc_timeout={canc_timeout!r}\n' f'sig={sig!r}\n' f'\n' f'{proc.args!r}\n' ) time.sleep(canc_timeout) else: # TODO: why sometimes does SIGINT not work on teardown? # seems to happen only when trace logging enabled? if proc.poll() is None: print( f'XXX WARNING KILLING PROG WITH SIGINT XXX\n' f'canc_timeout={canc_timeout!r}\n' f'{proc.args!r}\n' ) proc.send_signal(_KILL_SIGNAL) ret: int = proc.wait() assert ret # NOTE, the `daemon` fixture (+ its `_wait_for_daemon_ready` # helper + the post-yield teardown drain logic) has been # moved to `tests/discovery/conftest.py` since 100% of its # consumers are discovery-protocol tests now living under # that subdir. See: # - `tests/discovery/test_multi_program.py` # - `tests/discovery/test_registrar.py` # - `tests/discovery/test_tpt_bind_addrs.py` # @pytest.fixture(autouse=True) # def shared_last_failed(pytestconfig): # val = pytestconfig.cache.get("example/value", None) # breakpoint() # if val is None: # pytestconfig.cache.set("example/value", val) # return val # TODO: a way to let test scripts (like from `examples/`) # guarantee they won't `registry_addrs` collide! # -[ ] maybe use some kinda standard `def main()` arg-spec that # we can introspect from a fixture that is called from the test # body? # -[ ] test and figure out typing for below prototype! Bp # # @pytest.fixture # def set_script_runtime_args( # reg_addr: tuple, # ) -> Callable[[...], None]: # def import_n_partial_in_args_n_triorun( # script: Path, # under examples? # **runtime_args, # ) -> Callable[[], Any]: # a `partial`-ed equiv of `trio.run()` # # NOTE, below is taken from # # `.test_advanced_faults.test_ipc_channel_break_during_stream` # mod: ModuleType = import_path( # examples_dir() / 'advanced_faults' # / 'ipc_failure_during_stream.py', # root=examples_dir(), # consider_namespace_packages=False, # ) # return partial( # trio.run, # partial( # mod.main, # **runtime_args, # ) # ) # return import_n_partial_in_args_n_triorun