# tractor/tests/test_clustering.py

import itertools
import pytest
import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import gather_contexts
from tractor._testing import tractor_test

MESSAGE = 'tractoring at full speed'


def test_empty_mngrs_input_raises(
    tpt_proto: str,
) -> None:
    # TODO, the `open_actor_cluster()` teardown hangs
    # intermittently on UDS when `gather_contexts(mngrs=())`
    # raises `ValueError` mid-setup; likely a race in the
    # actor-nursery cleanup vs UDS socket shutdown. Needs
    # a deeper look at `._clustering`/`._supervise` teardown
    # paths with the UDS transport.
    if tpt_proto == 'uds':
        pytest.skip(
            'actor-cluster teardown hangs intermittently on UDS'
        )

    # inflate under CPU throttle, incl. the sustained-load
    # power-cap invisible to static freq reads. See
    # `scripts/cpu-perf-check`.
    from .conftest import cpu_perf_headroom
    fail_after_s: float = 3 * cpu_perf_headroom()

    async def main():
        with trio.fail_after(fail_after_s):
            async with (
                open_actor_cluster(
                    modules=[__name__],

                    # NOTE: ensure we can passthrough runtime opts
                    loglevel='cancel',
                    debug_mode=False,

                ) as portals,

                gather_contexts(mngrs=()),
            ):
                # should fail before this?
                assert portals

                # test should fail if we mk it here!
                assert 0, 'Should have raised val-err !?'

    with pytest.raises(ValueError):
        trio.run(main)


@tractor.context
async def worker(
    ctx: tractor.Context,

) -> None:

    await ctx.started()

    async with ctx.open_stream(
        allow_overruns=True,
    ) as stream:

        # TODO: this with the below assert causes a hang bug?
        # with trio.move_on_after(1):

        async for msg in stream:
            # do something with msg
            print(msg)
            assert msg == MESSAGE

        # ?TODO, does this ever cause a hang?
        # assert 0


# ?TODO, but needs a fn-scoped tpt_proto fixture..
# @pytest.mark.no_tpt('uds')
@tractor_test
async def test_streaming_to_actor_cluster(
    tpt_proto: str,
    is_forking_spawner: bool,
):
    '''
    Open an actor "cluster" using the (experimental) `._clustering`
    API and conduct standard inter-task-ctx streaming.

    '''
    if tpt_proto == 'uds':
        pytest.skip(
            f'Test currently fails with tpt-proto={tpt_proto!r}\n'
        )

    delay: float = (
        10 if is_forking_spawner
        else 6
    )
    # inflate under CPU throttle, incl. the sustained-load
    # power-cap invisible to static freq reads. See
    # `scripts/cpu-perf-check`.
    from .conftest import cpu_perf_headroom
    headroom: float = cpu_perf_headroom()
    if headroom != 1.:
        delay *= headroom

    with trio.fail_after(delay):
        async with (
            open_actor_cluster(modules=[__name__]) as portals,

            gather_contexts(
                mngrs=[p.open_context(worker) for p in portals.values()],
            ) as contexts,

            gather_contexts(
                mngrs=[ctx[0].open_stream() for ctx in contexts],
            ) as streams,
        ):
            with trio.move_on_after(1):
                for stream in itertools.cycle(streams):
                    await stream.send(MESSAGE)