diff --git a/tests/test_context_streams.py b/tests/test_context_streams.py
index 9c45231..2ff361e 100644
--- a/tests/test_context_streams.py
+++ b/tests/test_context_streams.py
@@ -6,6 +6,7 @@ a ``tractor.Context`` beforehand.
 
 '''
 from itertools import count
+from typing import Optional
 
 import pytest
 import trio
@@ -59,6 +60,7 @@ async def never_open_stream(
 async def keep_sending_from_callee(
 
     ctx: tractor.Context,
+    msg_buffer_size: Optional[int] = None,
 
 ) -> None:
     '''
@@ -66,8 +68,11 @@ async def keep_sending_from_callee(
 
     '''
     await ctx.started()
-    async with ctx.open_stream() as stream:
+    async with ctx.open_stream(
+        msg_buffer_size=msg_buffer_size,
+    ) as stream:
         for msg in count():
+            print(f'callee sending {msg}')
             await stream.send(msg)
             await trio.sleep(0.01)
 
@@ -94,7 +99,7 @@ def test_one_end_stream_not_opened(overrun_by):
     async def main():
         async with tractor.open_nursery() as n:
             portal = await n.start_actor(
-                'starts_no_stream',
+                entrypoint.__name__,
                 enable_modules=[__name__],
             )
 
@@ -158,3 +163,81 @@ def test_one_end_stream_not_opened(overrun_by):
 
     else:
         trio.run(main)
+
+@tractor.context
+async def echo_back_sequence(
+
+    ctx: tractor.Context,
+    seq: list[int],
+    msg_buffer_size: Optional[int] = None,
+
+) -> None:
+    '''
+    Echo the caller's sequence back on the stream, one batch at a time.
+
+    '''
+    await ctx.started()
+    async with ctx.open_stream(
+        msg_buffer_size=msg_buffer_size,
+    ) as stream:
+
+        count = 0
+        while count < 3:
+            batch = []
+            async for msg in stream:
+                batch.append(msg)
+                if batch == seq:
+                    break
+
+            for msg in batch:
+                print(f'callee sending {msg}')
+                await stream.send(msg)
+
+            count += 1
+
+        return 'yo'
+
+
+def test_stream_backpressure():
+    '''
+    Demonstrate that small overruns by each task, sending back and
+    forth on a stream, do not raise any errors by default.
+
+    '''
+    async def main():
+        async with tractor.open_nursery() as n:
+            portal = await n.start_actor(
+                'callee_sends_forever',
+                enable_modules=[__name__],
+            )
+            seq = list(range(3))
+            async with portal.open_context(
+                echo_back_sequence,
+                seq=seq,
+                msg_buffer_size=1,
+            ) as (ctx, sent):
+                assert sent is None
+
+                async with ctx.open_stream(msg_buffer_size=1) as stream:
+                    count = 0
+                    while count < 3:
+                        for msg in seq:
+                            print(f'caller sending {msg}')
+                            await stream.send(msg)
+                            await trio.sleep(0.1)
+
+                        batch = []
+                        async for msg in stream:
+                            batch.append(msg)
+                            if batch == seq:
+                                break
+
+                        count += 1
+
+                # here the context should return
+                assert await ctx.result() == 'yo'
+
+            # cancel the daemon
+            await portal.cancel_actor()
+
+    trio.run(main)