2021-10-17 05:33:54 +00:00
|
|
|
import itertools
|
|
|
|
|
|
|
|
import trio
|
|
|
|
import tractor
|
|
|
|
from tractor import open_actor_cluster
|
2021-10-24 17:48:36 +00:00
|
|
|
from tractor.trionics import gather_contexts
|
2021-10-17 05:33:54 +00:00
|
|
|
|
|
|
|
from conftest import tractor_test
|
|
|
|
|
|
|
|
|
|
|
|
# Payload the test sends over every stream; `worker` asserts that each
# message it receives is equal to this value.
MESSAGE = 'tractoring at full speed'
|
|
|
|
|
|
|
|
|
|
|
|
@tractor.context
async def worker(ctx: tractor.Context) -> None:
    """Consume messages from the caller and check each one.

    Signals ``started`` on ``ctx``, opens a stream on it with
    ``backpressure=True``, then prints every received message and asserts
    that it equals ``MESSAGE``. Returns once the stream iteration ends.
    """
    await ctx.started()

    async with ctx.open_stream(backpressure=True) as inbound:
        async for payload in inbound:
            # Echo what arrived before validating it, so a mismatching
            # payload is still visible in the captured output.
            print(payload)
            assert payload == MESSAGE
|
|
|
|
|
|
|
|
|
|
|
|
@tractor_test
async def test_streaming_to_actor_cluster() -> None:
    """Send ``MESSAGE`` round-robin to every actor in a cluster.

    Opens an actor cluster exposing this module, opens a ``worker`` context
    through each portal, opens a stream on each of those contexts, and then
    keeps sending ``MESSAGE`` over the streams in turn until the one-second
    ``trio.move_on_after`` scope expires.
    """
    async with open_actor_cluster(modules=[__name__]) as portals:
        # One `worker` context manager per portal in the cluster.
        context_mngrs = [
            portal.open_context(worker) for portal in portals.values()
        ]
        async with gather_contexts(mngrs=context_mngrs) as contexts:
            # NOTE(review): each gathered entry is indexed at [0] to reach
            # the context object -- presumably a (ctx, first) pair; confirm
            # against `Portal.open_context`.
            stream_mngrs = [entry[0].open_stream() for entry in contexts]
            async with gather_contexts(mngrs=stream_mngrs) as streams:
                with trio.move_on_after(1):
                    # Endless rotation over the streams; only the enclosing
                    # timeout scope ends the loop.
                    round_robin = itertools.cycle(streams)
                    for outbound in round_robin:
                        await outbound.send(MESSAGE)
|