From 03644f59cccb5946215ee5bb893ea7c3b39d5b63 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Apr 2023 15:14:49 -0400 Subject: [PATCH] Augment test cases for callee-returns-result early Turns out stuff was totally broken in these cases because we're either closing the underlying mem chan too early or not handling the "allow_overruns" mode's cancellation correctly.. --- tests/test_context_stream_semantics.py | 122 +++++++++++++++++-------- 1 file changed, 84 insertions(+), 38 deletions(-) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index f7c1e92..2531fd8 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -234,19 +234,25 @@ def test_simple_context( trio.run(main) +@pytest.mark.parametrize( + 'callee_returns_early', + [True, False], + ids=lambda item: f'callee_returns_early={item}' +) @pytest.mark.parametrize( 'cancel_method', ['ctx', 'portal'], ids=lambda item: f'cancel_method={item}' ) @pytest.mark.parametrize( - 'result_before_exit', + 'chk_ctx_result_before_exit', [True, False], - ids=lambda item: f'result_before_exit={item}' + ids=lambda item: f'chk_ctx_result_before_exit={item}' ) def test_caller_cancels( cancel_method: str, - result_before_exit: bool, + chk_ctx_result_before_exit: bool, + callee_returns_early: bool, ): ''' Verify that when the opening side of a context (aka the caller) @@ -254,19 +260,24 @@ def test_caller_cancels( either calling `.result()` or on context exit. ''' - async def check_canceller( ctx: tractor.Context, ) -> None: # should not raise yet return the remote # context cancelled error. - err = await ctx.result() - assert isinstance(err, ContextCancelled) - assert ( - tuple(err.canceller) - == - tractor.current_actor().uid - ) + res = await ctx.result() + + if callee_returns_early: + assert res == 'yo' + + else: + err = res + assert isinstance(err, ContextCancelled) + assert ( + tuple(err.canceller) + == + tractor.current_actor().uid + ) async def main(): async with tractor.open_nursery() as nursery: @@ -274,22 +285,29 @@ def test_caller_cancels( 'simple_context', enable_modules=[__name__], ) - with trio.fail_after(0.5): + timeout = 0.5 if not callee_returns_early else 2 + with trio.fail_after(timeout): async with portal.open_context( simple_setup_teardown, data=10, - block_forever=True, + block_forever=not callee_returns_early, ) as (ctx, sent): + if callee_returns_early: + # ensure we block long enough before sending + # a cancel such that the callee has already + # returned it's result. + await trio.sleep(0.5) + if cancel_method == 'ctx': await ctx.cancel() else: await portal.cancel_actor() - if result_before_exit: + if chk_ctx_result_before_exit: await check_canceller(ctx) - if not result_before_exit: + if not chk_ctx_result_before_exit: await check_canceller(ctx) if cancel_method != 'portal': @@ -703,46 +721,71 @@ async def echo_back_sequence( ctx: tractor.Context, seq: list[int], + wait_for_cancel: bool, msg_buffer_size: int | None = None, ) -> None: ''' - Send endlessly on the calleee stream. + Send endlessly on the calleee stream using a small buffer size + setting on the contex to simulate backlogging that would normally + cause overruns. ''' + # NOTE: ensure that if the caller is expecting to cancel this task + # that we stay echoing much longer then they are so we don't + # return early instead of receive the cancel msg. + total_batches: int = 1000 if wait_for_cancel else 6 + await ctx.started() async with ctx.open_stream( msg_buffer_size=msg_buffer_size, - backpressure=True, + allow_overruns=True, ) as stream: seq = list(seq) # bleh, `msgpack`... - count = 0 - # while count < 10: - while True: + for _ in range(total_batches): batch = [] async for msg in stream: batch.append(msg) if batch == seq: break + print('callee waiting on next') + for msg in batch: print(f'callee sending {msg}') await stream.send(msg) - count += 1 - - print("EXITING CALLEEE") + print( + 'EXITING CALLEEE:\n' + f'{ctx.cancel_called_remote}' + ) return 'yo' -def test_stream_backpressure( +@pytest.mark.parametrize( + 'cancel_ctx', + [True, False], + ids=lambda item: f'cancel_ctx={item}' +) +def test_allow_overruns_stream( + cancel_ctx: bool, loglevel: str, ): ''' Demonstrate small overruns of each task back and forth on a stream not raising any errors by default by setting - the ``backpressure=True``. + the ``allow_overruns=True``. + + The original idea here was to show that if you set the feeder mem + chan to a size smaller then the # of msgs sent you could could not + get a `StreamOverrun` crash plus maybe get all the msgs that were + sent. The problem with the "real backpressure" case is that due to + the current arch it can result in the msg loop being blocked and thus + blocking cancellation - which is like super bad. So instead this test + had to be adjusted to more or less just "not send overrun errors" so + as to handle the case where the sender just moreso cares about not getting + errored out when it send to fast.. ''' async def main(): @@ -756,42 +799,45 @@ def test_stream_backpressure( async with portal.open_context( echo_back_sequence, seq=seq, + wait_for_cancel=cancel_ctx, ) as (ctx, sent): assert sent is None async with ctx.open_stream( msg_buffer_size=1, - backpressure=True, - # allow_overruns=True, + allow_overruns=True, ) as stream: count = 0 while count < 3: for msg in seq: - print(f'caller sending {msg}') + print(f'root tx {msg}') await stream.send(msg) await trio.sleep(0.1) batch = [] - # with trio.move_on_after(1) as cs: async for msg in stream: - print(f'RX {msg}') + print(f'root rx {msg}') batch.append(msg) if batch == seq: break count += 1 - # if cs.cancelled_caught: - # break + if cancel_ctx: + # cancel the remote task + print('sending root side cancel') + await ctx.cancel() - # cancel the remote task - # print('SENDING ROOT SIDE CANCEL') - # await ctx.cancel() - - # here the context should return res = await ctx.result() - assert res == 'yo' + + if cancel_ctx: + assert isinstance(res, ContextCancelled) + assert tuple(res.canceller) == tractor.current_actor().uid + + else: + print(f'RX ROOT SIDE RESULT {res}') + assert res == 'yo' # cancel the daemon await portal.cancel_actor()