From 11832766536abd557c7eb88ab7064b688120eb6a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Apr 2023 19:46:35 -0400 Subject: [PATCH] Seriously cover all overrun cases This actually caught further runtime bugs so it's gud i tried.. Add overrun-ignore enabled / disabled cases and error catching for all of them. More or less this should cover every possible outcome when it comes to setting `allow_overruns: bool` i hope XD --- tests/test_context_stream_semantics.py | 107 +++++++++++++++++++++---- tractor/_streaming.py | 6 +- 2 files changed, 98 insertions(+), 15 deletions(-) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index 2531fd8..615ccd7 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -722,7 +722,9 @@ async def echo_back_sequence( ctx: tractor.Context, seq: list[int], wait_for_cancel: bool, - msg_buffer_size: int | None = None, + allow_overruns_side: str, + be_slow: bool = False, + msg_buffer_size: int = 1, ) -> None: ''' @@ -737,12 +739,22 @@ async def echo_back_sequence( total_batches: int = 1000 if wait_for_cancel else 6 await ctx.started() + # await tractor.breakpoint() async with ctx.open_stream( msg_buffer_size=msg_buffer_size, - allow_overruns=True, + + # literally the point of this test XD + allow_overruns=(allow_overruns_side in {'child', 'both'}), ) as stream: - seq = list(seq) # bleh, `msgpack`... + # ensure mem chan settings are correct + assert ( + ctx._send_chan._state.max_buffer_size + == + msg_buffer_size + ) + + seq = list(seq) # bleh, msgpack sometimes ain't decoded right for _ in range(total_batches): batch = [] async for msg in stream: @@ -750,6 +762,9 @@ async def echo_back_sequence( if batch == seq: break + if be_slow: + await trio.sleep(0.05) + print('callee waiting on next') for msg in batch: @@ -763,13 +778,29 @@ async def echo_back_sequence( return 'yo' +@pytest.mark.parametrize( + # aka the side that will / should raise + # and overrun under normal conditions. + 'allow_overruns_side', + ['parent', 'child', 'none', 'both'], + ids=lambda item: f'allow_overruns_side={item}' +) +@pytest.mark.parametrize( + # aka the side that will / should raise + # and overrun under normal conditions. + 'slow_side', + ['parent', 'child'], + ids=lambda item: f'slow_side={item}' +) @pytest.mark.parametrize( 'cancel_ctx', [True, False], ids=lambda item: f'cancel_ctx={item}' ) -def test_allow_overruns_stream( +def test_maybe_allow_overruns_stream( cancel_ctx: bool, + slow_side: str, + allow_overruns_side: str, loglevel: str, ): ''' @@ -794,26 +825,35 @@ def test_allow_overruns_stream( 'callee_sends_forever', enable_modules=[__name__], loglevel=loglevel, + + # debug_mode=True, ) - seq = list(range(3)) + seq = list(range(10)) async with portal.open_context( echo_back_sequence, seq=seq, wait_for_cancel=cancel_ctx, + be_slow=(slow_side == 'child'), + allow_overruns_side=allow_overruns_side, ) as (ctx, sent): assert sent is None async with ctx.open_stream( - msg_buffer_size=1, - allow_overruns=True, + msg_buffer_size=1 if slow_side == 'parent' else None, + allow_overruns=(allow_overruns_side in {'parent', 'both'}), ) as stream: - count = 0 - while count < 3: + + total_batches: int = 2 + for _ in range(total_batches): for msg in seq: - print(f'root tx {msg}') + # print(f'root tx {msg}') await stream.send(msg) - await trio.sleep(0.1) + if slow_side == 'parent': + # NOTE: we make the parent slightly + # slower, when it is slow, to make sure + # that in the overruns everywhere case + await trio.sleep(0.16) batch = [] async for msg in stream: @@ -822,8 +862,6 @@ def test_allow_overruns_stream( if batch == seq: break - count += 1 - if cancel_ctx: # cancel the remote task print('sending root side cancel') @@ -842,7 +880,48 @@ def test_allow_overruns_stream( # cancel the daemon await portal.cancel_actor() - trio.run(main) + if ( + allow_overruns_side == 'both' + or slow_side == allow_overruns_side + ): + trio.run(main) + + elif ( + slow_side != allow_overruns_side + ): + + with pytest.raises(tractor.RemoteActorError) as excinfo: + trio.run(main) + + err = excinfo.value + + if ( + allow_overruns_side == 'none' + ): + # depends on timing is is racy which side will + # overrun first :sadkitty: + + # NOTE: i tried to isolate to a deterministic case here + # based on timeing, but i was kinda wasted, and i don't + # think it's sane to catch them.. + assert err.type in ( + tractor.RemoteActorError, + StreamOverrun, + ) + + elif ( + slow_side == 'child' + ): + assert err.type == StreamOverrun + + elif slow_side == 'parent': + assert err.type == tractor.RemoteActorError + assert 'StreamOverrun' in err.msgdata['tb_str'] + + else: + # if this hits the logic blocks from above are not + # exhaustive.. + pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO') @tractor.context diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 450c712..0598443 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -426,6 +426,10 @@ class Context: if remote_uid: return tuple(remote_uid) + @property + def cancelled_caught(self) -> bool: + return self._scope.cancelled_caught + # init and streaming state _started_called: bool = False _started_received: bool = False @@ -743,7 +747,7 @@ class Context: ): return err - raise err from None + raise err # from None async def result(self) -> Any | Exception: '''