diff --git a/tests/test_context_streams.py b/tests/test_context_streams.py index e1d4de4..48ccbc8 100644 --- a/tests/test_context_streams.py +++ b/tests/test_context_streams.py @@ -5,9 +5,10 @@ Verify the we raise errors when streams are opened prior to sync-opening a ``tractor.Context`` beforehand. ''' +from itertools import count + import pytest import trio -from trio.lowlevel import current_task import tractor @@ -18,7 +19,7 @@ async def really_started( await ctx.started() try: await ctx.started() - except RuntimeError as err: + except RuntimeError: raise @@ -32,9 +33,10 @@ def test_started_called_more_then_once(): ) async with portal.open_context(really_started) as (ctx, sent): - pass + await trio.sleep(1) + # pass - with pytest.raises(tractor.RemoteActorError) as excinfo: + with pytest.raises(tractor.RemoteActorError): trio.run(main) @@ -44,20 +46,50 @@ async def never_open_stream( ctx: tractor.Context, ) -> None: - '''Bidir streaming endpoint which will stream - back any sequence it is sent item-wise. + ''' + Context which never opens a stream and blocks. ''' await ctx.started() await trio.sleep_forever() -def test_no_far_end_stream_opened(): +@tractor.context +async def keep_sending_from_callee( + + ctx: tractor.Context, + +) -> None: + ''' + Send endlessly on the calleee stream. + + ''' + await ctx.started() + async with ctx.open_stream() as stream: + for msg in count(): + await stream.send(msg) + await trio.sleep(0.01) + + +@pytest.mark.parametrize( + 'overrun_by', + [ + (None, 0, never_open_stream), # use default settings + ('caller', 1, never_open_stream), + ('callee', 0, keep_sending_from_callee), + ], + ids='overrun_condition_by={}'.format, +) +def test_one_end_stream_not_opened(overrun_by): ''' This should exemplify the bug from: https://github.com/goodboy/tractor/issues/265 ''' + overrunner, buf_size_increase, entrypoint = overrun_by + from tractor._actor import Actor + buf_size = buf_size_increase + Actor.msg_buffer_size + async def main(): async with tractor.open_nursery() as n: portal = await n.start_actor( @@ -65,23 +97,37 @@ def test_no_far_end_stream_opened(): enable_modules=[__name__], ) - async with ( - portal.open_context( - never_open_stream,) as (ctx, sent), - ctx.open_stream() as stream, - ): + async with portal.open_context(entrypoint) as (ctx, sent): assert sent is None - # XXX: so the question is whether - # this should error if the far end - # has not yet called `ctx.open_stream()`? - # If we decide to do that we need a synchronization - # message which is sent from that call? - await stream.send('yo') + if overrunner in (None, 'caller'): + + async with ctx.open_stream() as stream: + for i in range(buf_size - 1): + await stream.send('yo') + + else: + # callee overruns caller case so we do nothing here + await trio.sleep_forever() # without this we block waiting on the child side await ctx.cancel() await portal.cancel_actor() - trio.run(main) + # 2 overrun cases and the no overrun case (which pushes right up to + # the msg limit) + if overrunner == 'caller': + with pytest.raises(tractor.RemoteActorError) as excinfo: + trio.run(main) + + assert excinfo.value.type == tractor._exceptions.StreamOverrun + + elif overrunner == 'callee': + with pytest.raises(tractor.RemoteActorError) as excinfo: + trio.run(main) + + assert excinfo.value.type == tractor.RemoteActorError + + else: + trio.run(main)