From c82ca67263a8f2464133eda7558b4a511605cc9e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 13:03:20 -0400 Subject: [PATCH] Add laggy parent stream tests Add a couple more tests to check that a parent and sub-task stream can be lagged and recovered (depending on who's slower). Factor some of the test machinery into a new ctx mngr to make it all happen. --- tests/test_local_task_broadcast.py | 179 +++++++++++++++++++++++------ 1 file changed, 145 insertions(+), 34 deletions(-) diff --git a/tests/test_local_task_broadcast.py b/tests/test_local_task_broadcast.py index fe30e33..c2200b3 100644 --- a/tests/test_local_task_broadcast.py +++ b/tests/test_local_task_broadcast.py @@ -1,10 +1,13 @@ """ Broadcast channels for fan-out to local tasks. """ +from contextlib import asynccontextmanager from functools import partial from itertools import cycle import time +from typing import Optional +import pytest import trio from trio.lowlevel import current_task import tractor @@ -32,8 +35,11 @@ async def echo_sequences( async def ensure_sequence( + stream: tractor.ReceiveMsgStream, sequence: list, + delay: Optional[float] = None, + ) -> None: name = current_task().name @@ -44,11 +50,44 @@ async def ensure_sequence( assert value == sequence[0] sequence.remove(value) + if delay: + await trio.sleep(delay) + if not sequence: # fully consumed break +@asynccontextmanager +async def open_sequence_streamer( + + sequence: list[int], + arb_addr: tuple[str, int], + start_method: str, + +) -> tractor.MsgStream: + + async with tractor.open_nursery( + arbiter_addr=arb_addr, + start_method=start_method, + ) as tn: + + portal = await tn.start_actor( + 'sequence_echoer', + enable_modules=[__name__], + ) + + async with portal.open_context( + echo_sequences, + ) as (ctx, first): + + assert first is None + async with ctx.open_stream() as stream: + yield stream + + await portal.cancel_actor() + + def test_stream_fan_out_to_local_subscriptions( arb_addr, start_method, @@ -58,48 +97,120 @@ def test_stream_fan_out_to_local_subscriptions( async def main(): - async with tractor.open_nursery( - arbiter_addr=arb_addr, - start_method=start_method, - ) as tn: + async with open_sequence_streamer( + sequence, + arb_addr, + start_method, + ) as stream: - portal = await tn.start_actor( - 'sequence_echoer', - enable_modules=[__name__], - ) + async with trio.open_nursery() as n: + for i in range(10): + n.start_soon( + ensure_sequence, + stream, + sequence.copy(), + name=f'consumer_{i}', + ) - async with portal.open_context( - echo_sequences, - ) as (ctx, first): + await stream.send(tuple(sequence)) - assert first is None - async with ctx.open_stream() as stream: + async for value in stream: + print(f'source stream rx: {value}') + assert value == sequence[0] + sequence.remove(value) - async with trio.open_nursery() as n: - for i in range(10): - n.start_soon( - ensure_sequence, - stream, - sequence.copy(), - name=f'consumer_{i}', - ) - - await stream.send(tuple(sequence)) - - async for value in stream: - print(f'source stream rx: {value}') - assert value == sequence[0] - sequence.remove(value) - - if not sequence: - # fully consumed - break - - await portal.cancel_actor() + if not sequence: + # fully consumed + break trio.run(main) +@pytest.mark.parametrize( + 'task_delays', + [ + (0.01, 0.001), + (0.001, 0.01), + ] +) +def test_consumer_and_parent_maybe_lag( + arb_addr, + start_method, + task_delays, +): + + async def main(): + + sequence = list(range(1000)) + parent_delay, sub_delay = task_delays + + async with open_sequence_streamer( + sequence, + arb_addr, + start_method, + ) as stream: + + try: + async with trio.open_nursery() as n: + + n.start_soon( + ensure_sequence, + stream, + sequence.copy(), + sub_delay, + name='consumer_task', + ) + + await stream.send(tuple(sequence)) + + # async for value in stream: + lagged = False + lag_count = 0 + + while True: + try: + value = await stream.receive() + print(f'source stream rx: {value}') + + if lagged: + # re set the sequence starting at our last + # value + sequence = sequence[sequence.index(value) + 1:] + else: + assert value == sequence[0] + sequence.remove(value) + + lagged = False + + except Lagged: + lagged = True + print(f'source stream lagged after {value}') + lag_count += 1 + continue + + # lag the parent + await trio.sleep(parent_delay) + + if not sequence: + # fully consumed + break + print(f'parent + source stream lagged: {lag_count}') + + if parent_delay > sub_delay: + assert lag_count > 0 + + except Lagged: + # child was lagged + assert parent_delay < sub_delay + + trio.run(main) + + +# TODO: +# def test_first_task_to_recv_is_cancelled(): +# ... + + def test_subscribe_errors_after_close(): async def main():