forked from goodboy/tractor
1
0
Fork 0

Add laggy parent stream tests

Add a couple more tests to check that a parent and sub-task stream can
be lagged and recovered (depending on who's slower). Factor some of the
test machinery into a new ctx mngr to make it all happen.
live_on_air_from_tokio
Tyler Goodlet 2021-08-31 13:03:20 -04:00
parent 093e7d921c
commit 0d70e3081a
1 changed files with 145 additions and 34 deletions

View File

@ -1,10 +1,13 @@
""" """
Broadcast channels for fan-out to local tasks. Broadcast channels for fan-out to local tasks.
""" """
from contextlib import asynccontextmanager
from functools import partial from functools import partial
from itertools import cycle from itertools import cycle
import time import time
from typing import Optional
import pytest
import trio import trio
from trio.lowlevel import current_task from trio.lowlevel import current_task
import tractor import tractor
@ -32,8 +35,11 @@ async def echo_sequences(
async def ensure_sequence( async def ensure_sequence(
stream: tractor.ReceiveMsgStream, stream: tractor.ReceiveMsgStream,
sequence: list, sequence: list,
delay: Optional[float] = None,
) -> None: ) -> None:
name = current_task().name name = current_task().name
@ -44,11 +50,44 @@ async def ensure_sequence(
assert value == sequence[0] assert value == sequence[0]
sequence.remove(value) sequence.remove(value)
if delay:
await trio.sleep(delay)
if not sequence: if not sequence:
# fully consumed # fully consumed
break break
@asynccontextmanager
async def open_sequence_streamer(
sequence: list[int],
arb_addr: tuple[str, int],
start_method: str,
) -> tractor.MsgStream:
async with tractor.open_nursery(
arbiter_addr=arb_addr,
start_method=start_method,
) as tn:
portal = await tn.start_actor(
'sequence_echoer',
enable_modules=[__name__],
)
async with portal.open_context(
echo_sequences,
) as (ctx, first):
assert first is None
async with ctx.open_stream() as stream:
yield stream
await portal.cancel_actor()
def test_stream_fan_out_to_local_subscriptions( def test_stream_fan_out_to_local_subscriptions(
arb_addr, arb_addr,
start_method, start_method,
@ -58,48 +97,120 @@ def test_stream_fan_out_to_local_subscriptions(
async def main(): async def main():
async with tractor.open_nursery( async with open_sequence_streamer(
arbiter_addr=arb_addr, sequence,
start_method=start_method, arb_addr,
) as tn: start_method,
) as stream:
portal = await tn.start_actor( async with trio.open_nursery() as n:
'sequence_echoer', for i in range(10):
enable_modules=[__name__], n.start_soon(
) ensure_sequence,
stream,
sequence.copy(),
name=f'consumer_{i}',
)
async with portal.open_context( await stream.send(tuple(sequence))
echo_sequences,
) as (ctx, first):
assert first is None async for value in stream:
async with ctx.open_stream() as stream: print(f'source stream rx: {value}')
assert value == sequence[0]
sequence.remove(value)
async with trio.open_nursery() as n: if not sequence:
for i in range(10): # fully consumed
n.start_soon( break
ensure_sequence,
stream,
sequence.copy(),
name=f'consumer_{i}',
)
await stream.send(tuple(sequence))
async for value in stream:
print(f'source stream rx: {value}')
assert value == sequence[0]
sequence.remove(value)
if not sequence:
# fully consumed
break
await portal.cancel_actor()
trio.run(main) trio.run(main)
@pytest.mark.parametrize(
'task_delays',
[
(0.01, 0.001),
(0.001, 0.01),
]
)
def test_consumer_and_parent_maybe_lag(
arb_addr,
start_method,
task_delays,
):
async def main():
sequence = list(range(1000))
parent_delay, sub_delay = task_delays
async with open_sequence_streamer(
sequence,
arb_addr,
start_method,
) as stream:
try:
async with trio.open_nursery() as n:
n.start_soon(
ensure_sequence,
stream,
sequence.copy(),
sub_delay,
name='consumer_task',
)
await stream.send(tuple(sequence))
# async for value in stream:
lagged = False
lag_count = 0
while True:
try:
value = await stream.receive()
print(f'source stream rx: {value}')
if lagged:
# re set the sequence starting at our last
# value
sequence = sequence[sequence.index(value) + 1:]
else:
assert value == sequence[0]
sequence.remove(value)
lagged = False
except Lagged:
lagged = True
print(f'source stream lagged after {value}')
lag_count += 1
continue
# lag the parent
await trio.sleep(parent_delay)
if not sequence:
# fully consumed
break
print(f'parent + source stream lagged: {lag_count}')
if parent_delay > sub_delay:
assert lag_count > 0
except Lagged:
# child was lagged
assert parent_delay < sub_delay
trio.run(main)
# TODO:
# def test_first_task_to_recv_is_cancelled():
# ...
def test_subscribe_errors_after_close(): def test_subscribe_errors_after_close():
async def main(): async def main():