From b13cd4f16b5463d47c54249ebf7b60643883d5e6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Mar 2025 14:04:55 -0400 Subject: [PATCH] Extend ctx semantics suite for streaming edge cases! Muchas grax to @guilledk for finding the first issue which kicked of this further scrutiny of the `tractor.Context` and `MsgStream` semantics test suite with a strange edge case where, - if the parent opened and immediately closed a stream while the remote child task started and continued (without terminating) to send msgs the parent's `open_context().__aexit__()` would **not block** on the child to complete! => this was seemingly due to a bug discovered inside the `.msg._ops.drain_to_final_msg()` stream handling case logic where we are NOT checking if `Context._stream` is non-`None`! As such this, - extends the `test_caller_closes_ctx_after_callee_opens_stream` (now renamed, see below) to include cases for all combinations of the child and parent sending before receiving on the stream as well as all placements of `Context.cancel()` in the parent before, around and after the stream open. - uses the new `expect_ctxc()` for expecting the taskc (`trio.Task` cancelled)` cases. - also extends the `test_callee_closes_ctx_after_stream_open` (also renamed) to include the case where the parent sends a msg before it receives. => this case has unveiled yet-another-bug where somehow the underlying `MsgStream._rx_chan: trio.ReceiveMemoryChannel` is allowing the child's `Return[None]` msg be consumed and NOT in a place where it is correctly set as `Context._result` resulting in the parent hanging forever inside `._ops.drain_to_final_msg()`.. Alongside, - start renaming using the new "remote-task-peer-side" semantics throughout the test module: "caller" -> "parent", "callee" -> "child". --- tests/test_context_stream_semantics.py | 158 ++++++++++++++++++------- 1 file changed, 117 insertions(+), 41 deletions(-) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index ade275aa..29e99b2e 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -443,7 +443,6 @@ def test_caller_cancels( @tractor.context async def close_ctx_immediately( - ctx: Context, ) -> None: @@ -454,10 +453,21 @@ async def close_ctx_immediately( async with ctx.open_stream(): pass + print('child returning!') + +@pytest.mark.parametrize( + 'parent_send_before_receive', + [ + False, + True, + ], + ids=lambda item: f'child_send_before_receive={item}' +) @tractor_test -async def test_callee_closes_ctx_after_stream_open( +async def test_child_exits_ctx_after_stream_open( debug_mode: bool, + parent_send_before_receive: bool, ): ''' callee context closes without using stream. @@ -474,6 +484,15 @@ async def test_callee_closes_ctx_after_stream_open( => {'stop': True, 'cid': } ''' + timeout: float = ( + 0.5 if ( + not debug_mode + # NOTE, for debugging final + # Return-consumed-n-discarded-ishue! + # and + # not parent_send_before_receive + ) else 999 + ) async with tractor.open_nursery( debug_mode=debug_mode, ) as an: @@ -482,7 +501,7 @@ async def test_callee_closes_ctx_after_stream_open( enable_modules=[__name__], ) - with trio.fail_after(0.5): + with trio.fail_after(timeout): async with portal.open_context( close_ctx_immediately, @@ -494,41 +513,56 @@ async def test_callee_closes_ctx_after_stream_open( with trio.fail_after(0.4): async with ctx.open_stream() as stream: + if parent_send_before_receive: + print('sending first msg from parent!') + await stream.send('yo') # should fall through since ``StopAsyncIteration`` # should be raised through translation of # a ``trio.EndOfChannel`` by # ``trio.abc.ReceiveChannel.__anext__()`` - async for _ in stream: + msg = 10 + async for msg in stream: # trigger failure if we DO NOT # get an EOC! assert 0 else: + # never should get anythinig new from + # the underlying stream + assert msg == 10 # verify stream is now closed try: with trio.fail_after(0.3): + print('parent trying to `.receive()` on EoC stream!') await stream.receive() + assert 0, 'should have raised eoc!?' except trio.EndOfChannel: + print('parent got EoC as expected!') pass + # raise # TODO: should be just raise the closed resource err # directly here to enforce not allowing a re-open # of a stream to the context (at least until a time of # if/when we decide that's a good idea?) try: - with trio.fail_after(0.5): + with trio.fail_after(timeout): async with ctx.open_stream() as stream: pass except trio.ClosedResourceError: pass + # if ctx._rx_chan._state.data: + # await tractor.pause() + await portal.cancel_actor() @tractor.context async def expect_cancelled( ctx: Context, + send_before_receive: bool = False, ) -> None: global _state @@ -538,6 +572,10 @@ async def expect_cancelled( try: async with ctx.open_stream() as stream: + + if send_before_receive: + await stream.send('yo') + async for msg in stream: await stream.send(msg) # echo server @@ -567,23 +605,46 @@ async def expect_cancelled( assert 0, "callee wasn't cancelled !?" +@pytest.mark.parametrize( + 'child_send_before_receive', + [ + False, + True, + ], + ids=lambda item: f'child_send_before_receive={item}' +) +@pytest.mark.parametrize( + 'rent_wait_for_msg', + [ + False, + True, + ], + ids=lambda item: f'rent_wait_for_msg={item}' +) @pytest.mark.parametrize( 'use_ctx_cancel_method', - [False, True], + [ + False, + 'pre_stream', + 'post_stream_open', + 'post_stream_close', + ], + ids=lambda item: f'use_ctx_cancel_method={item}' ) @tractor_test -async def test_caller_closes_ctx_after_callee_opens_stream( - use_ctx_cancel_method: bool, +async def test_parent_exits_ctx_after_child_enters_stream( + use_ctx_cancel_method: bool|str, debug_mode: bool, + rent_wait_for_msg: bool, + child_send_before_receive: bool, ): ''' - caller context closes without using/opening stream + Parent-side of IPC context closes without sending on `MsgStream`. ''' async with tractor.open_nursery( debug_mode=debug_mode, ) as an: - root: Actor = current_actor() portal = await an.start_actor( 'ctx_cancelled', @@ -592,41 +653,52 @@ async def test_caller_closes_ctx_after_callee_opens_stream( async with portal.open_context( expect_cancelled, + send_before_receive=child_send_before_receive, ) as (ctx, sent): assert sent is None await portal.run(assert_state, value=True) # call `ctx.cancel()` explicitly - if use_ctx_cancel_method: + if use_ctx_cancel_method == 'pre_stream': await ctx.cancel() # NOTE: means the local side `ctx._scope` will # have been cancelled by an ctxc ack and thus # `._scope.cancelled_caught` should be set. - try: + async with ( + expect_ctxc( + # XXX: the cause is US since we call + # `Context.cancel()` just above! + yay=True, + + # XXX: must be propagated to __aexit__ + # and should be silently absorbed there + # since we called `.cancel()` just above ;) + reraise=True, + ) as maybe_ctxc, + ): async with ctx.open_stream() as stream: - async for msg in stream: - pass - except tractor.ContextCancelled as ctxc: - # XXX: the cause is US since we call - # `Context.cancel()` just above! - assert ( - ctxc.canceller - == - current_actor().uid - == - root.uid - ) + if rent_wait_for_msg: + async for msg in stream: + print(f'PARENT rx: {msg!r}\n') + break - # XXX: must be propagated to __aexit__ - # and should be silently absorbed there - # since we called `.cancel()` just above ;) - raise + if use_ctx_cancel_method == 'post_stream_open': + await ctx.cancel() - else: - assert 0, "Should have context cancelled?" + if use_ctx_cancel_method == 'post_stream_close': + await ctx.cancel() + + ctxc: tractor.ContextCancelled = maybe_ctxc.value + assert ( + ctxc.canceller + == + current_actor().uid + == + root.uid + ) # channel should still be up assert portal.channel.connected() @@ -637,13 +709,20 @@ async def test_caller_closes_ctx_after_callee_opens_stream( value=False, ) + # XXX CHILD-BLOCKS case, we SHOULD NOT exit from the + # `.open_context()` before the child has returned, + # errored or been cancelled! else: try: - with trio.fail_after(0.2): - await ctx.result() + with trio.fail_after( + 0.5 # if not debug_mode else 999 + ): + res = await ctx.wait_for_result() + assert res is not tractor._context.Unresolved assert 0, "Callee should have blocked!?" except trio.TooSlowError: - # NO-OP -> since already called above + # NO-OP -> since already triggered by + # `trio.fail_after()` above! await ctx.cancel() # NOTE: local scope should have absorbed the cancellation since @@ -683,7 +762,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream( @tractor_test -async def test_multitask_caller_cancels_from_nonroot_task( +async def test_multitask_parent_cancels_from_nonroot_task( debug_mode: bool, ): async with tractor.open_nursery( @@ -735,7 +814,6 @@ async def test_multitask_caller_cancels_from_nonroot_task( @tractor.context async def cancel_self( - ctx: Context, ) -> None: @@ -775,7 +853,7 @@ async def cancel_self( @tractor_test -async def test_callee_cancels_before_started( +async def test_child_cancels_before_started( debug_mode: bool, ): ''' @@ -826,8 +904,7 @@ async def never_open_stream( @tractor.context -async def keep_sending_from_callee( - +async def keep_sending_from_child( ctx: Context, msg_buffer_size: int|None = None, @@ -850,7 +927,7 @@ async def keep_sending_from_callee( 'overrun_by', [ ('caller', 1, never_open_stream), - ('callee', 0, keep_sending_from_callee), + ('callee', 0, keep_sending_from_child), ], ids=[ ('caller_1buf_never_open_stream'), @@ -931,8 +1008,7 @@ def test_one_end_stream_not_opened( @tractor.context async def echo_back_sequence( - - ctx: Context, + ctx: Context, seq: list[int], wait_for_cancel: bool, allow_overruns_side: str,