diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py index 74c06ca..b3e2bc1 100644 --- a/tests/test_advanced_streaming.py +++ b/tests/test_advanced_streaming.py @@ -1,7 +1,8 @@ -""" +''' Advanced streaming patterns using bidirectional streams and contexts. -""" +''' +from collections import Counter import itertools from typing import Set, Dict, List @@ -269,3 +270,98 @@ def test_sigint_both_stream_types(): assert 0, "Didn't receive KBI!?" except KeyboardInterrupt: pass + + +@tractor.context +async def inf_streamer( + ctx: tractor.Context, + +) -> None: + ''' + Stream increasing ints until terminated with a 'done' msg. + + ''' + await ctx.started() + + async with ( + ctx.open_stream() as stream, + trio.open_nursery() as n, + ): + async def bail_on_sentinel(): + async for msg in stream: + if msg == 'done': + await stream.aclose() + else: + print(f'streamer received {msg}') + + # start termination detector + n.start_soon(bail_on_sentinel) + + for val in itertools.count(): + try: + await stream.send(val) + except trio.ClosedResourceError: + # close out the stream gracefully + break + + print('terminating streamer') + + +def test_local_task_fanout_from_stream(): + ''' + Single stream with multiple local consumer tasks using the + ``MsgStream.subscribe()` api. + + Ensure all tasks receive all values after stream completes sending. + + ''' + consumers = 22 + + async def main(): + + counts = Counter() + + async with tractor.open_nursery() as tn: + p = await tn.start_actor( + 'inf_streamer', + enable_modules=[__name__], + ) + async with ( + p.open_context(inf_streamer) as (ctx, _), + ctx.open_stream() as stream, + ): + + async def pull_and_count(name: str): + # name = trio.lowlevel.current_task().name + async with stream.subscribe() as recver: + assert isinstance( + recver, + tractor.trionics.BroadcastReceiver + ) + async for val in recver: + # print(f'{name}: {val}') + counts[name] += 1 + + print(f'{name} bcaster ended') + + print(f'{name} completed') + + with trio.fail_after(3): + async with trio.open_nursery() as nurse: + for i in range(consumers): + nurse.start_soon(pull_and_count, i) + + await trio.sleep(0.5) + print('\nterminating') + await stream.send('done') + + print('closed stream connection') + + assert len(counts) == consumers + mx = max(counts.values()) + # make sure each task received all stream values + assert all(val == mx for val in counts.values()) + + await p.cancel_actor() + + trio.run(main)