forked from goodboy/tractor
1
0
Fork 0
tractor/tests/test_clustering.py

85 lines
1.9 KiB
Python
Raw Permalink Normal View History

2021-10-17 05:33:54 +00:00
import itertools
2022-12-12 00:46:33 +00:00
import pytest
2021-10-17 05:33:54 +00:00
import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import gather_contexts
2021-10-17 05:33:54 +00:00
from conftest import tractor_test
# Payload sent over every stream by the streaming test in this module; the
# `worker` endpoint asserts that each message it receives equals this value.
MESSAGE = 'tractoring at full speed'
2022-12-12 00:46:33 +00:00
def test_empty_mngrs_input_raises() -> None:
    """Expect ``ValueError`` when ``gather_contexts`` gets a generator.

    ``mngrs`` is deliberately passed as an inline generator expression
    rather than a list; per the original note this is what leads to the
    "empty input" condition the test expects to surface as ``ValueError``.

    """
    async def main():
        # Hard 1 second ceiling so a regression shows up as a timeout
        # instead of hanging the test run.
        with trio.fail_after(1):
            async with (
                open_actor_cluster(
                    modules=[__name__],

                    # NOTE: ensure we can passthrough runtime opts
                    loglevel='info',
                    # debug_mode=True,

                ) as portals,

                gather_contexts(
                    # NOTE: it's the use of inline-generator syntax
                    # here that causes the empty input.
                    mngrs=(
                        p.open_context(worker) for p in portals.values()
                    ),
                ),
            ):
                # The body is not expected to run: reaching it raises
                # ``AssertionError`` instead of the ``ValueError`` below.
                assert 0

    with pytest.raises(ValueError):
        trio.run(main)
2021-10-17 05:33:54 +00:00
@tractor.context
async def worker(
    ctx: tractor.Context,

) -> None:
    """Context endpoint that checks every streamed message.

    Signals ``ctx.started()``, opens a stream on ``ctx`` with
    ``allow_overruns=True`` and then, for each received message, prints
    it and asserts that it equals ``MESSAGE``.

    """
    await ctx.started()

    async with ctx.open_stream(
        allow_overruns=True,
    ) as stream:

        # TODO: this with the below assert causes a hang bug?
        # with trio.move_on_after(1):

        async for msg in stream:
            # do something with msg
            print(msg)
            assert msg == MESSAGE

        # TODO: does this ever cause a hang
        # assert 0
2021-10-17 05:33:54 +00:00
@tractor_test
async def test_streaming_to_actor_cluster() -> None:
    """Stream ``MESSAGE`` round-robin to every actor in a cluster.

    Opens an actor cluster with this module enabled, opens a ``worker``
    context through each portal, opens a stream on each context and
    then sends ``MESSAGE`` to the streams in rotation for about one
    second.

    """
    async with (
        open_actor_cluster(modules=[__name__]) as portals,

        gather_contexts(
            mngrs=[p.open_context(worker) for p in portals.values()],
        ) as contexts,

        gather_contexts(
            # Element 0 of each gathered entry is what exposes
            # ``open_stream()`` -- presumably the ``Context`` half of the
            # pair yielded by ``open_context()``; confirm against tractor.
            mngrs=[ctx[0].open_stream() for ctx in contexts],
        ) as streams,

    ):
        # ``itertools.cycle`` never terminates by itself; the cancel
        # scope is what ends the send loop after 1 second.
        with trio.move_on_after(1):
            for stream in itertools.cycle(streams):
                await stream.send(MESSAGE)