forked from goodboy/tractor
1
0
Fork 0
tractor/tests/test_local_task_broadcast.py

246 lines
7.2 KiB
Python
Raw Normal View History

"""
Broadcast channels for fan-out to local tasks.
"""
from functools import partial
from itertools import cycle
import time
import trio
from trio.lowlevel import current_task
import tractor
from tractor._broadcast import broadcast_receiver, Lagged
@tractor.context
async def echo_sequences(
2021-08-19 17:55:16 +00:00
ctx: tractor.Context,
2021-08-19 17:55:16 +00:00
) -> None:
'''Bidir streaming endpoint which will stream
back any sequence it is sent item-wise.
'''
await ctx.started()
async with ctx.open_stream() as stream:
async for sequence in stream:
seq = list(sequence)
for value in seq:
print(f'sending {value}')
await stream.send(value)
async def ensure_sequence(
stream: tractor.ReceiveMsgStream,
sequence: list,
) -> None:
name = current_task().name
async with stream.subscribe() as bcaster:
assert not isinstance(bcaster, type(stream))
async for value in bcaster:
print(f'{name} rx: {value}')
assert value == sequence[0]
sequence.remove(value)
if not sequence:
# fully consumed
break
def test_stream_fan_out_to_local_subscriptions(
arb_addr,
start_method,
):
sequence = list(range(1000))
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
start_method=start_method,
) as tn:
portal = await tn.start_actor(
'sequence_echoer',
enable_modules=[__name__],
)
async with portal.open_context(
echo_sequences,
) as (ctx, first):
assert first is None
async with ctx.open_stream() as stream:
async with trio.open_nursery() as n:
for i in range(10):
n.start_soon(
ensure_sequence,
stream,
sequence.copy(),
name=f'consumer_{i}',
)
await stream.send(tuple(sequence))
async for value in stream:
print(f'source stream rx: {value}')
assert value == sequence[0]
sequence.remove(value)
if not sequence:
# fully consumed
break
await portal.cancel_actor()
trio.run(main)
2021-08-20 17:04:51 +00:00
def test_subscribe_errors_after_close():
async def main():
size = 1
tx, rx = trio.open_memory_channel(size)
async with broadcast_receiver(rx, size) as brx:
pass
try:
# open and close
async with brx.subscribe():
pass
except trio.ClosedResourceError:
assert brx.key not in brx._state.subs
else:
assert 0
trio.run(main)
def test_ensure_slow_consumers_lag_out(
arb_addr,
start_method,
):
'''This is a pure local task test; no tractor
machinery is really required.
'''
async def main():
2021-08-19 17:55:16 +00:00
# make sure it all works within the runtime
async with tractor.open_root_actor():
num_laggers = 4
laggers: dict[str, int] = {}
retries = 3
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
async def sub_and_print(
delay: float,
) -> None:
task = current_task()
start = time.time()
async with brx.subscribe() as lbrx:
while True:
print(f'{task.name}: starting consume loop')
try:
async for value in lbrx:
print(f'{task.name}: {value}')
await trio.sleep(delay)
if task.name == 'sub_1':
# the non-lagger got
# a ``trio.EndOfChannel``
# because the ``tx`` below was closed
assert len(lbrx._state.subs) == 1
await lbrx.aclose()
assert len(lbrx._state.subs) == 0
except trio.ClosedResourceError:
# only the fast sub will try to re-enter
# iteration on the now closed bcaster
assert task.name == 'sub_1'
return
except Lagged:
lag_time = time.time() - start
lags = laggers[task.name]
print(
2021-08-20 17:04:51 +00:00
f'restarting slow task {task.name} '
f'that bailed out on {lags}:{value} '
f'after {lag_time:.3f}')
if lags <= retries:
laggers[task.name] += 1
continue
else:
print(
f'{task.name} was too slow and terminated '
f'on {lags}:{value}')
return
async with trio.open_nursery() as nursery:
for i in range(1, num_laggers):
task_name = f'sub_{i}'
laggers[task_name] = 0
nursery.start_soon(
partial(
sub_and_print,
delay=i*0.001,
),
name=task_name,
)
# allow subs to sched
await trio.sleep(0.1)
async with tx:
for i in cycle(range(size)):
await tx.send(i)
if len(brx._state.subs) == 2:
# only one, the non lagger, sub is left
break
# the non-lagger
assert laggers.pop('sub_1') == 0
for n, v in laggers.items():
assert v == 4
assert tx._closed
assert not tx._state.open_send_channels
# check that "first" bcaster that we created
# above, never wass iterated and is thus overrun
try:
await brx.receive()
except Lagged:
# expect tokio style index truncation
2021-08-20 17:04:51 +00:00
seq = brx._state.subs[brx.key]
assert seq == len(brx._state.queue) - 1
# all backpressured entries in the underlying
# channel should have been copied into the caster
# queue trailing-window
async for i in rx:
print(f'bped: {i}')
assert i in brx._state.queue
# should be noop
await brx.aclose()
trio.run(main)