#!/usr/bin/env python3
# tractor: distributed structured concurrency.
# Copyright 2018-eternity Tyler Goodlet.
#
# SPDX-License-Identifier: AGPL-3.0-or-later
'''
`cpu-perf-check` — sustained-load CPU throttle detector.

Idle freq snapshots LIE. A laptop can read
`governor=performance`, `EPP=performance`,
`platform_profile=performance`, `scaling_max_freq=<full>`
and momentarily clock a P-core at 5GHz — while a
firmware/EC power cap (AMD PPT/STAPM and friends) clamps
the whole package to ~1.5GHz the instant a sustained
multi-core load lands. That throttle masquerades as a
`trio`-backend test *regression*: a wave of `fail_after` /
`TooSlowError` / `Cancelled(source='deadline')` deadline
misses on spawn-heavy tests, on byte-identical code that
was green yesterday.

The existing `tests/conftest.py:cpu_scaling_factor()` only
reads STATIC `scaling_max_freq` vs `*_pstate_max_freq`, so
it returns `1.0` (no throttle) during exactly this failure
— it can't see the cap. This script complements it by
BURNING every core for a few seconds and sampling the
ACHIEVED `scaling_cur_freq`, which is the only thing that
exposes the clamp.

Exit code: `0` if sustained perf looks restored, `1` if
throttled — so it gates a test run:

    py313/bin/python scripts/cpu-perf-check && pytest tests/ ...

Tunables (env-overridable):
    CPU_PERF_SECS        load duration       (default 4.0)
    CPU_PERF_HEALTHY_FRAC sustained/max floor (default 0.45)

'''
from __future__ import annotations
import glob
import os
import time
import multiprocessing as mp


def _read(path: str) -> str | None:
    try:
        with open(path) as f:
            return f.read().strip()
    except OSError:
        return None


def _cur_freqs_mhz() -> list[int]:
    '''
    Snapshot the current frequency of every CPU that exposes
    a `cpufreq/scaling_cur_freq` sysfs node, in whole MHz.

    Nodes which can't be read, or which read back empty, are
    left out; the list is empty when nothing is readable.

    '''
    pattern: str = (
        '/sys/devices/system/cpu/cpu[0-9]*/cpufreq/scaling_cur_freq'
    )
    return [
        # sysfs cpufreq values are kHz; truncate down to MHz
        int(raw) // 1000
        for node in glob.glob(pattern)
        if (raw := _read(node))
    ]


def _pkg_max_mhz() -> int:
    '''
    Largest `scaling_max_freq` found on any CPU, in whole
    MHz — on hybrid parts that is the P-core ceiling.

    Unreadable (or empty) nodes are ignored; returns `0`
    when no ceiling could be read at all.

    '''
    pattern: str = (
        '/sys/devices/system/cpu/cpu[0-9]*/cpufreq/scaling_max_freq'
    )
    ceilings = (
        # sysfs cpufreq values are kHz; truncate down to MHz
        int(raw) // 1000
        for node in glob.glob(pattern)
        if (raw := _read(node))
    )
    return max(ceilings, default=0)


def _burn(stop: float) -> None:
    '''
    Busy-loop on integer arithmetic until this process'
    `time.perf_counter()` reaches `stop`.

    `stop` is an absolute `time.perf_counter()` reading, not
    a duration; a value already in the past returns at once.

    NOTE(review): when run in a child process the deadline is
    only meaningful if `perf_counter()` shares its reference
    point with the process which computed `stop` — presumably
    true for the monotonic clock on linux; confirm before
    relying on it elsewhere.

    '''
    # Pin the accumulator to 64 bits. Left unmasked the
    # `x * x` term roughly squares `x` on every pass, so the
    # int's bit-length doubles per iteration: after a few
    # dozen passes each multiply is a multi-megabyte bigint
    # op which inflates worker memory and — since the clock
    # is only checked between iterations — can run well past
    # `stop`.
    mask: int = 0xFFFF_FFFF_FFFF_FFFF
    x: int = 1
    while time.perf_counter() < stop:
        x = (x + ((x * x) ^ 0x5)) & mask


def main(
    secs: float = float(os.environ.get('CPU_PERF_SECS', 4.0)),
    # sustained aggregate must clear this fraction of the
    # package max-freq ceiling. Throttled (~1.5GHz of 5.1GHz)
    # ~= 0.29; a healthy all-core load easily clears 0.5.
    healthy_frac: float = float(
        os.environ.get('CPU_PERF_HEALTHY_FRAC', 0.45)
    ),
) -> int:
    '''
    Load every core for `secs` seconds, sample the achieved
    `scaling_cur_freq` while the load runs and report whether
    the sustained average clears `healthy_frac` of the package
    max-freq ceiling.

    Both defaults come from the environment (`CPU_PERF_SECS`,
    `CPU_PERF_HEALTHY_FRAC`) and are read once, when this
    `def` is evaluated.

    Returns a process exit code: `1` when throttled, `0` when
    perf looks fine OR can't be judged at all (no cpufreq
    sysfs, no usable samples, no readable max freq).

    '''
    cpufreq_dir: str = '/sys/devices/system/cpu/cpu0/cpufreq'
    if not os.path.exists(cpufreq_dir):
        print('no cpufreq sysfs (non-linux?) — skipping, assume OK')
        return 0

    base: str = cpufreq_dir + '/'
    pkg_max: int = _pkg_max_mhz()
    print('=== static knobs (ALL can read fine while throttled) ===')
    print(f'  governor          : {_read(base + "scaling_governor")}')
    print(f'  EPP               : {_read(base + "energy_performance_preference")}')
    print(f'  platform_profile  : '
          f'{_read("/sys/firmware/acpi/platform_profile")}')
    print(f'  pkg max freq      : {pkg_max} MHz')

    ncpu: int = os.cpu_count() or 1
    print(f'\n=== sustained {ncpu}-core load ({secs:.0f}s) — the real test ===')

    # one absolute deadline shared by every worker and by the
    # sampling loop below.
    stop: float = time.perf_counter() + secs
    procs: list[mp.Process] = []
    samples: list[int] = []
    try:
        # only started workers are tracked, so the cleanup
        # below never touches a never-started `Process`.
        for _ in range(ncpu):
            p = mp.Process(target=_burn, args=(stop,))
            p.start()
            procs.append(p)

        # skip the initial ~0.6s ramp, then sample steady-state
        time.sleep(0.6)
        # stop sampling 0.2s short of the deadline so the
        # last sample is still taken under load.
        while time.perf_counter() < stop - 0.2:
            if (fr := _cur_freqs_mhz()):
                # one sample == mean across all reporting CPUs
                samples.append(sum(fr) // len(fr))
            time.sleep(0.3)

    except BaseException:
        # interrupted or failed mid-run: don't leave every
        # core burning until the deadline.
        for p in procs:
            if p.is_alive():
                p.terminate()
        raise

    finally:
        # reap workers on every path; on the normal path they
        # exit by themselves once `stop` passes.
        for p in procs:
            p.join()

    if not (samples and pkg_max):
        print('  could not sample cur freq — assume OK')
        return 0

    sustained: int = sum(samples) // len(samples)
    # NB: an all-CPU *mean* compared against the single
    # highest per-CPU ceiling.
    frac: float = sustained / pkg_max
    print(f'  aggregate cur-freq samples: {samples}')
    print(f'  sustained avg             : {sustained} MHz '
          f'({frac * 100:.0f}% of {pkg_max} MHz max)')

    if frac < healthy_frac:
        print(
            f'\n  ❌ THROTTLED — sustained {sustained}MHz is only '
            f'{frac * 100:.0f}% of max (< {healthy_frac * 100:.0f}%).\n'
            f'     Power cap (PPT/STAPM) still engaged. Fixes:\n'
            f'       - bounce /sys/firmware/acpi/platform_profile\n'
            f'         (balanced -> performance)\n'
            f'       - unplug/replug USB-C to re-negotiate PD\n'
            f'       - ryzenadj to lift STAPM/PPT\n'
            f'       - else reboot\n'
            f'     Do NOT bump test budgets — the box is slow, not the code.'
        )
        return 1

    print(
        f'\n  ✅ PERF OK — sustained {sustained}MHz holds '
        f'{frac * 100:.0f}% of max. Cap looks lifted; safe to run tests.'
    )
    return 0


if __name__ == '__main__':
    # the exit status is the gate: 1 == throttled, 0 == perf
    # OK (or it could not be judged).
    exit_code: int = main()
    raise SystemExit(exit_code)
