forked from goodboy/tractor
1
0
Fork 0
tractor/tests/test_clustering.py

83 lines
1.9 KiB
Python
Raw Normal View History

2021-10-17 05:33:54 +00:00
import itertools
2022-12-12 00:46:33 +00:00
import pytest
2021-10-17 05:33:54 +00:00
import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import gather_contexts
Add (back) a `tractor._testing` sub-pkg Since importing from our top level `conftest.py` is not scaleable or as "future forward thinking" in terms of: - LoC-wise (it's only one file), - prevents "external" (aka non-test) example scripts from importing content easily, - seemingly(?) can't be used via abs-import if using a `[tool.pytest.ini_options]` in a `pyproject.toml` vs. a `pytest.ini`, see: https://docs.pytest.org/en/8.0.x/reference/customize.html#pyproject-toml) => Go back to having an internal "testing" pkg like `trio` (kinda) does. Deats: - move generic top level helpers into pkg-mod including the new `expect_ctxc()` (which i needed in the advanced faults testing script. - move `@tractor_test` into `._testing.pytest` sub-mod. - adjust all the helper imports to be a `from tractor._testing import <..>` Rework `test_ipc_channel_break_during_stream()` and backing script: - make test(s) pull `debug_mode` from new fixture (which is now controlled manually from `--tpdb` flag) and drop the previous parametrized input. - update logic in ^ test for "which-side-fails" cases to better match recently updated/stricter cancel/failure semantics in terms of `ClosedResouruceError` vs. `EndOfChannel` expectations. - handle `ExceptionGroup`s with expected embedded errors in test. - better pendantics around whether to expect a user simulated KBI. - for `examples/advanced_faults/ipc_failure_during_stream.py` script: - generalize ipc breakage in new `break_ipc()` with support for diff internal `trio` methods and a #TODO for future disti frameworks - only make one sub-actor task break and the other just stream. - use new `._testing.expect_ctxc()` around ctx block. - add a bit of exception handling with `print()`s around ctxc (unused except if 'msg' break method is set) and eoc cases. - don't break parent side ipc in loop any more then once after first break, checked via flag var. - add a `pre_close: bool` flag to control whether `MsgStreama.aclose()` is called *before* any ipc breakage method. Still TODO: - drop `pytest.ini` and add the alt section to `pyproject.py`. -> currently can't get `--rootdir=` opt to work.. not showing in console header. -> ^ also breaks on 'tests' `enable_modules` imports in subactors during discovery tests?
2024-03-12 19:48:20 +00:00
from tractor._testing import tractor_test
2021-10-17 05:33:54 +00:00
MESSAGE = 'tractoring at full speed'
2022-12-12 00:46:33 +00:00
def test_empty_mngrs_input_raises() -> None:
async def main():
with trio.fail_after(1):
async with (
open_actor_cluster(
modules=[__name__],
# NOTE: ensure we can passthrough runtime opts
loglevel='info',
# debug_mode=True,
) as portals,
gather_contexts(
# NOTE: it's the use of inline-generator syntax
# here that causes the empty input.
mngrs=(
p.open_context(worker) for p in portals.values()
),
),
):
assert 0
with pytest.raises(ValueError):
trio.run(main)
2021-10-17 05:33:54 +00:00
@tractor.context
2022-12-12 00:46:33 +00:00
async def worker(
ctx: tractor.Context,
) -> None:
2021-10-17 05:33:54 +00:00
await ctx.started()
2022-12-12 00:46:33 +00:00
async with ctx.open_stream(
allow_overruns=True,
2022-12-12 00:46:33 +00:00
) as stream:
# TODO: this with the below assert causes a hang bug?
# with trio.move_on_after(1):
2021-10-17 05:33:54 +00:00
async for msg in stream:
# do something with msg
print(msg)
assert msg == MESSAGE
2022-12-12 00:46:33 +00:00
# TODO: does this ever cause a hang
# assert 0
2021-10-17 05:33:54 +00:00
@tractor_test
async def test_streaming_to_actor_cluster() -> None:
2022-12-12 00:46:33 +00:00
2021-10-17 05:33:54 +00:00
async with (
open_actor_cluster(modules=[__name__]) as portals,
2022-12-12 00:46:33 +00:00
gather_contexts(
2021-10-17 05:33:54 +00:00
mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts,
2022-12-12 00:46:33 +00:00
gather_contexts(
2021-10-17 05:33:54 +00:00
mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams,
2022-12-12 00:46:33 +00:00
2021-10-17 05:33:54 +00:00
):
with trio.move_on_after(1):
for stream in itertools.cycle(streams):
await stream.send(MESSAGE)