From c1089dbd95ac8b27214f2cde04d06a85794c6b07 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sun, 17 Oct 2021 07:33:54 +0200 Subject: [PATCH] Add a clustering test --- tests/test_clustering.py | 41 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 tests/test_clustering.py diff --git a/tests/test_clustering.py b/tests/test_clustering.py new file mode 100644 index 0000000..8f28d85 --- /dev/null +++ b/tests/test_clustering.py @@ -0,0 +1,41 @@ +import itertools + +import trio +import tractor +from tractor import open_actor_cluster +from tractor.trionics import async_enter_all + +from conftest import tractor_test + + +MESSAGE = 'tractoring at full speed' + + +@tractor.context +async def worker(ctx: tractor.Context) -> None: + await ctx.started() + async with ctx.open_stream() as stream: + async for msg in stream: + # do something with msg + print(msg) + assert msg == MESSAGE + + +@tractor_test +async def test_streaming_to_actor_cluster() -> None: + teardown_trigger = trio.Event() + async with ( + open_actor_cluster(modules=[__name__]) as portals, + async_enter_all( + mngrs=[p.open_context(worker) for p in portals.values()], + teardown_trigger=teardown_trigger, + ) as contexts, + async_enter_all( + mngrs=[ctx[0].open_stream() for ctx in contexts], + teardown_trigger=teardown_trigger, + ) as streams, + ): + with trio.move_on_after(1): + for stream in itertools.cycle(streams): + await stream.send(MESSAGE) + teardown_trigger.set() \ No newline at end of file