381 lines
9.6 KiB
Python
381 lines
9.6 KiB
Python
'''
|
|
Advanced streaming patterns using bidirectional streams and contexts.
|
|
|
|
'''
|
|
from collections import Counter
|
|
import itertools
|
|
import platform
|
|
|
|
import trio
|
|
import tractor
|
|
|
|
|
|
def is_win():
|
|
return platform.system() == 'Windows'
|
|
|
|
|
|
_registry: dict[str, set[tractor.ReceiveMsgStream]] = {
|
|
'even': set(),
|
|
'odd': set(),
|
|
}
|
|
|
|
|
|
async def publisher(
|
|
|
|
seed: int = 0,
|
|
|
|
) -> None:
|
|
|
|
global _registry
|
|
|
|
def is_even(i):
|
|
return i % 2 == 0
|
|
|
|
for val in itertools.count(seed):
|
|
|
|
sub = 'even' if is_even(val) else 'odd'
|
|
|
|
for sub_stream in _registry[sub].copy():
|
|
await sub_stream.send(val)
|
|
|
|
# throttle send rate to ~1kHz
|
|
# making it readable to a human user
|
|
await trio.sleep(1/1000)
|
|
|
|
|
|
@tractor.context
|
|
async def subscribe(
|
|
|
|
ctx: tractor.Context,
|
|
|
|
) -> None:
|
|
|
|
global _registry
|
|
|
|
# syn caller
|
|
await ctx.started(None)
|
|
|
|
async with ctx.open_stream() as stream:
|
|
|
|
# update subs list as consumer requests
|
|
async for new_subs in stream:
|
|
|
|
new_subs = set(new_subs)
|
|
remove = new_subs - _registry.keys()
|
|
|
|
print(f'setting sub to {new_subs} for {ctx.chan.uid}')
|
|
|
|
# remove old subs
|
|
for sub in remove:
|
|
_registry[sub].remove(stream)
|
|
|
|
# add new subs for consumer
|
|
for sub in new_subs:
|
|
_registry[sub].add(stream)
|
|
|
|
|
|
async def consumer(
|
|
|
|
subs: list[str],
|
|
|
|
) -> None:
|
|
|
|
uid = tractor.current_actor().uid
|
|
|
|
async with tractor.wait_for_actor('publisher') as portal:
|
|
async with portal.open_context(subscribe) as (ctx, first):
|
|
async with ctx.open_stream() as stream:
|
|
|
|
# flip between the provided subs dynamically
|
|
if len(subs) > 1:
|
|
|
|
for sub in itertools.cycle(subs):
|
|
print(f'setting dynamic sub to {sub}')
|
|
await stream.send([sub])
|
|
|
|
count = 0
|
|
async for value in stream:
|
|
print(f'{uid} got: {value}')
|
|
if count > 5:
|
|
break
|
|
count += 1
|
|
|
|
else: # static sub
|
|
|
|
await stream.send(subs)
|
|
async for value in stream:
|
|
print(f'{uid} got: {value}')
|
|
|
|
|
|
def test_dynamic_pub_sub():
|
|
|
|
global _registry
|
|
|
|
from multiprocessing import cpu_count
|
|
cpus = cpu_count()
|
|
|
|
async def main():
|
|
async with tractor.open_nursery() as n:
|
|
|
|
# name of this actor will be same as target func
|
|
await n.run_in_actor(publisher)
|
|
|
|
for i, sub in zip(
|
|
range(cpus - 2),
|
|
itertools.cycle(_registry.keys())
|
|
):
|
|
await n.run_in_actor(
|
|
consumer,
|
|
name=f'consumer_{sub}',
|
|
subs=[sub],
|
|
)
|
|
|
|
# make one dynamic subscriber
|
|
await n.run_in_actor(
|
|
consumer,
|
|
name='consumer_dynamic',
|
|
subs=list(_registry.keys()),
|
|
)
|
|
|
|
# block until cancelled by user
|
|
with trio.fail_after(3):
|
|
await trio.sleep_forever()
|
|
|
|
try:
|
|
trio.run(main)
|
|
except trio.TooSlowError:
|
|
pass
|
|
|
|
|
|
@tractor.context
|
|
async def one_task_streams_and_one_handles_reqresp(
|
|
|
|
ctx: tractor.Context,
|
|
|
|
) -> None:
|
|
|
|
await ctx.started()
|
|
|
|
async with ctx.open_stream() as stream:
|
|
|
|
async def pingpong():
|
|
'''Run a simple req/response service.
|
|
|
|
'''
|
|
async for msg in stream:
|
|
print('rpc server ping')
|
|
assert msg == 'ping'
|
|
print('rpc server pong')
|
|
await stream.send('pong')
|
|
|
|
async with trio.open_nursery() as n:
|
|
n.start_soon(pingpong)
|
|
|
|
for _ in itertools.count():
|
|
await stream.send('yo')
|
|
await trio.sleep(0.01)
|
|
|
|
|
|
def test_reqresp_ontopof_streaming():
|
|
'''
|
|
Test a subactor that both streams with one task and
|
|
spawns another which handles a small requests-response
|
|
dialogue over the same bidir-stream.
|
|
|
|
'''
|
|
async def main():
|
|
|
|
# flat to make sure we get at least one pong
|
|
got_pong: bool = False
|
|
timeout: int = 2
|
|
|
|
if is_win(): # smh
|
|
timeout = 4
|
|
|
|
with trio.move_on_after(timeout):
|
|
async with tractor.open_nursery() as n:
|
|
|
|
# name of this actor will be same as target func
|
|
portal = await n.start_actor(
|
|
'dual_tasks',
|
|
enable_modules=[__name__]
|
|
)
|
|
|
|
async with portal.open_context(
|
|
one_task_streams_and_one_handles_reqresp,
|
|
|
|
) as (ctx, first):
|
|
|
|
assert first is None
|
|
|
|
async with ctx.open_stream() as stream:
|
|
|
|
await stream.send('ping')
|
|
|
|
async for msg in stream:
|
|
print(f'client received: {msg}')
|
|
|
|
assert msg in {'pong', 'yo'}
|
|
|
|
if msg == 'pong':
|
|
got_pong = True
|
|
await stream.send('ping')
|
|
print('client sent ping')
|
|
|
|
assert got_pong
|
|
|
|
try:
|
|
trio.run(main)
|
|
except trio.TooSlowError:
|
|
pass
|
|
|
|
|
|
async def async_gen_stream(sequence):
|
|
for i in sequence:
|
|
yield i
|
|
await trio.sleep(0.1)
|
|
|
|
|
|
@tractor.context
|
|
async def echo_ctx_stream(
|
|
ctx: tractor.Context,
|
|
) -> None:
|
|
await ctx.started()
|
|
|
|
async with ctx.open_stream() as stream:
|
|
async for msg in stream:
|
|
await stream.send(msg)
|
|
|
|
|
|
def test_sigint_both_stream_types():
|
|
'''Verify that running a bi-directional and recv only stream
|
|
side-by-side will cancel correctly from SIGINT.
|
|
|
|
'''
|
|
timeout: float = 2
|
|
if is_win(): # smh
|
|
timeout += 1
|
|
|
|
async def main():
|
|
with trio.fail_after(timeout):
|
|
async with tractor.open_nursery() as n:
|
|
# name of this actor will be same as target func
|
|
portal = await n.start_actor(
|
|
'2_way',
|
|
enable_modules=[__name__]
|
|
)
|
|
|
|
async with portal.open_context(echo_ctx_stream) as (ctx, _):
|
|
async with ctx.open_stream() as stream:
|
|
async with portal.open_stream_from(
|
|
async_gen_stream,
|
|
sequence=list(range(1)),
|
|
) as gen_stream:
|
|
|
|
msg = await gen_stream.receive()
|
|
await stream.send(msg)
|
|
resp = await stream.receive()
|
|
assert resp == msg
|
|
raise KeyboardInterrupt
|
|
|
|
try:
|
|
trio.run(main)
|
|
assert 0, "Didn't receive KBI!?"
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
|
|
@tractor.context
|
|
async def inf_streamer(
|
|
ctx: tractor.Context,
|
|
|
|
) -> None:
|
|
'''
|
|
Stream increasing ints until terminated with a 'done' msg.
|
|
|
|
'''
|
|
await ctx.started()
|
|
|
|
async with (
|
|
ctx.open_stream() as stream,
|
|
trio.open_nursery() as n,
|
|
):
|
|
async def bail_on_sentinel():
|
|
async for msg in stream:
|
|
if msg == 'done':
|
|
await stream.aclose()
|
|
else:
|
|
print(f'streamer received {msg}')
|
|
|
|
# start termination detector
|
|
n.start_soon(bail_on_sentinel)
|
|
|
|
for val in itertools.count():
|
|
try:
|
|
await stream.send(val)
|
|
except trio.ClosedResourceError:
|
|
# close out the stream gracefully
|
|
break
|
|
|
|
print('terminating streamer')
|
|
|
|
|
|
def test_local_task_fanout_from_stream():
|
|
'''
|
|
Single stream with multiple local consumer tasks using the
|
|
``MsgStream.subscribe()` api.
|
|
|
|
Ensure all tasks receive all values after stream completes sending.
|
|
|
|
'''
|
|
consumers = 22
|
|
|
|
async def main():
|
|
|
|
counts = Counter()
|
|
|
|
async with tractor.open_nursery() as tn:
|
|
p = await tn.start_actor(
|
|
'inf_streamer',
|
|
enable_modules=[__name__],
|
|
)
|
|
async with (
|
|
p.open_context(inf_streamer) as (ctx, _),
|
|
ctx.open_stream() as stream,
|
|
):
|
|
|
|
async def pull_and_count(name: str):
|
|
# name = trio.lowlevel.current_task().name
|
|
async with stream.subscribe() as recver:
|
|
assert isinstance(
|
|
recver,
|
|
tractor.trionics.BroadcastReceiver
|
|
)
|
|
async for val in recver:
|
|
# print(f'{name}: {val}')
|
|
counts[name] += 1
|
|
|
|
print(f'{name} bcaster ended')
|
|
|
|
print(f'{name} completed')
|
|
|
|
with trio.fail_after(3):
|
|
async with trio.open_nursery() as nurse:
|
|
for i in range(consumers):
|
|
nurse.start_soon(pull_and_count, i)
|
|
|
|
await trio.sleep(0.5)
|
|
print('\nterminating')
|
|
await stream.send('done')
|
|
|
|
print('closed stream connection')
|
|
|
|
assert len(counts) == consumers
|
|
mx = max(counts.values())
|
|
# make sure each task received all stream values
|
|
assert all(val == mx for val in counts.values())
|
|
|
|
await p.cancel_actor()
|
|
|
|
trio.run(main)
|