Add a multi-task fan out streaming test
This actually catches a lot of bugs to do with stream termination and ``MsgStream.subscribe()`` usage where the underlying stream closes from the producer side. When this passes the broadcaster logic will have to ensure non-lossy fan out semantics and closure tracking.end_of_channel_fixes
parent
3deb1b91e6
commit
79d63585b0
|
@ -1,7 +1,8 @@
|
||||||
"""
|
'''
|
||||||
Advanced streaming patterns using bidirectional streams and contexts.
|
Advanced streaming patterns using bidirectional streams and contexts.
|
||||||
|
|
||||||
"""
|
'''
|
||||||
|
from collections import Counter
|
||||||
import itertools
|
import itertools
|
||||||
from typing import Set, Dict, List
|
from typing import Set, Dict, List
|
||||||
|
|
||||||
|
@ -269,3 +270,98 @@ def test_sigint_both_stream_types():
|
||||||
assert 0, "Didn't receive KBI!?"
|
assert 0, "Didn't receive KBI!?"
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def inf_streamer(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Stream increasing ints until terminated with a 'done' msg.
|
||||||
|
|
||||||
|
'''
|
||||||
|
await ctx.started()
|
||||||
|
|
||||||
|
async with (
|
||||||
|
ctx.open_stream() as stream,
|
||||||
|
trio.open_nursery() as n,
|
||||||
|
):
|
||||||
|
async def bail_on_sentinel():
|
||||||
|
async for msg in stream:
|
||||||
|
if msg == 'done':
|
||||||
|
await stream.aclose()
|
||||||
|
else:
|
||||||
|
print(f'streamer received {msg}')
|
||||||
|
|
||||||
|
# start termination detector
|
||||||
|
n.start_soon(bail_on_sentinel)
|
||||||
|
|
||||||
|
for val in itertools.count():
|
||||||
|
try:
|
||||||
|
await stream.send(val)
|
||||||
|
except trio.ClosedResourceError:
|
||||||
|
# close out the stream gracefully
|
||||||
|
break
|
||||||
|
|
||||||
|
print('terminating streamer')
|
||||||
|
|
||||||
|
|
||||||
|
def test_local_task_fanout_from_stream():
|
||||||
|
'''
|
||||||
|
Single stream with multiple local consumer tasks using the
|
||||||
|
``MsgStream.subscribe()` api.
|
||||||
|
|
||||||
|
Ensure all tasks receive all values after stream completes sending.
|
||||||
|
|
||||||
|
'''
|
||||||
|
consumers = 22
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
|
||||||
|
counts = Counter()
|
||||||
|
|
||||||
|
async with tractor.open_nursery() as tn:
|
||||||
|
p = await tn.start_actor(
|
||||||
|
'inf_streamer',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
async with (
|
||||||
|
p.open_context(inf_streamer) as (ctx, _),
|
||||||
|
ctx.open_stream() as stream,
|
||||||
|
):
|
||||||
|
|
||||||
|
async def pull_and_count(name: str):
|
||||||
|
# name = trio.lowlevel.current_task().name
|
||||||
|
async with stream.subscribe() as recver:
|
||||||
|
assert isinstance(
|
||||||
|
recver,
|
||||||
|
tractor.trionics.BroadcastReceiver
|
||||||
|
)
|
||||||
|
async for val in recver:
|
||||||
|
# print(f'{name}: {val}')
|
||||||
|
counts[name] += 1
|
||||||
|
|
||||||
|
print(f'{name} bcaster ended')
|
||||||
|
|
||||||
|
print(f'{name} completed')
|
||||||
|
|
||||||
|
with trio.fail_after(3):
|
||||||
|
async with trio.open_nursery() as nurse:
|
||||||
|
for i in range(consumers):
|
||||||
|
nurse.start_soon(pull_and_count, i)
|
||||||
|
|
||||||
|
await trio.sleep(0.5)
|
||||||
|
print('\nterminating')
|
||||||
|
await stream.send('done')
|
||||||
|
|
||||||
|
print('closed stream connection')
|
||||||
|
|
||||||
|
assert len(counts) == consumers
|
||||||
|
mx = max(counts.values())
|
||||||
|
# make sure each task received all stream values
|
||||||
|
assert all(val == mx for val in counts.values())
|
||||||
|
|
||||||
|
await p.cancel_actor()
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
Loading…
Reference in New Issue