From be22f1c263303e5f733dea74e38c401124bca773 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet <jgbt@protonmail.com>
Date: Tue, 11 Mar 2025 14:04:55 -0400
Subject: [PATCH] Extend ctx semantics suite for streaming edge cases!

Muchas grax to @guilledk for finding the first issue which kicked off
this further scrutiny of the `tractor.Context` and `MsgStream` semantics
test suite with a strange edge case where,
- if the parent opened and immediately closed a stream while the remote
  child task started and continued (without terminating) to send msgs
  the parent's `open_context().__aexit__()` would **not block** on the
  child to complete!
=> this was seemingly due to a bug discovered inside the
  `.msg._ops.drain_to_final_msg()` stream handling case logic where we
  are NOT checking if `Context._stream` is non-`None`!

As such this,
- extends the `test_caller_closes_ctx_after_callee_opens_stream` (now
  renamed, see below) to include cases for all combinations of the child
  and parent sending before receiving on the stream as well as all
  placements of `Context.cancel()` in the parent before, around and after
  the stream open.
- uses the new `expect_ctxc()` for expecting the taskc (`trio.Task`
  cancelled) cases.
- also extends the `test_callee_closes_ctx_after_stream_open` (also
  renamed) to include the case where the parent sends a msg before it
  receives.
=> this case has unveiled yet-another-bug where somehow the underlying
  `MsgStream._rx_chan: trio.ReceiveMemoryChannel` is allowing the
  child's `Return[None]` msg to be consumed and NOT in a place where it is
  correctly set as `Context._result` resulting in the parent hanging
  forever inside `._ops.drain_to_final_msg()`..

Alongside,
- start renaming using the new "remote-task-peer-side" semantics
  throughout the test module: "caller" -> "parent", "callee" -> "child".
---
 tests/test_context_stream_semantics.py | 158 ++++++++++++++++++-------
 1 file changed, 117 insertions(+), 41 deletions(-)

diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py
index ade275aa..29e99b2e 100644
--- a/tests/test_context_stream_semantics.py
+++ b/tests/test_context_stream_semantics.py
@@ -443,7 +443,6 @@ def test_caller_cancels(
 
 @tractor.context
 async def close_ctx_immediately(
-
     ctx: Context,
 
 ) -> None:
@@ -454,10 +453,21 @@ async def close_ctx_immediately(
     async with ctx.open_stream():
         pass
 
+    print('child returning!')
 
+
+@pytest.mark.parametrize(
+    'parent_send_before_receive',
+    [
+        False,
+        True,
+    ],
+    ids=lambda item: f'child_send_before_receive={item}'
+)
 @tractor_test
-async def test_callee_closes_ctx_after_stream_open(
+async def test_child_exits_ctx_after_stream_open(
     debug_mode: bool,
+    parent_send_before_receive: bool,
 ):
     '''
     callee context closes without using stream.
@@ -474,6 +484,15 @@ async def test_callee_closes_ctx_after_stream_open(
     => {'stop': True, 'cid': <str>}
 
     '''
+    timeout: float = (
+        0.5 if (
+            not debug_mode
+            # NOTE, for debugging final
+            # Return-consumed-n-discarded-ishue!
+            # and
+            # not parent_send_before_receive
+        ) else 999
+    )
     async with tractor.open_nursery(
         debug_mode=debug_mode,
     ) as an:
@@ -482,7 +501,7 @@ async def test_callee_closes_ctx_after_stream_open(
             enable_modules=[__name__],
         )
 
-        with trio.fail_after(0.5):
+        with trio.fail_after(timeout):
             async with portal.open_context(
                 close_ctx_immediately,
 
@@ -494,41 +513,56 @@ async def test_callee_closes_ctx_after_stream_open(
 
                 with trio.fail_after(0.4):
                     async with ctx.open_stream() as stream:
+                        if parent_send_before_receive:
+                            print('sending first msg from parent!')
+                            await stream.send('yo')
 
                         # should fall through since ``StopAsyncIteration``
                         # should be raised through translation of
                         # a ``trio.EndOfChannel`` by
                         # ``trio.abc.ReceiveChannel.__anext__()``
-                        async for _ in stream:
+                        msg = 10
+                        async for msg in stream:
                             # trigger failure if we DO NOT
                             # get an EOC!
                             assert 0
                         else:
+                            # never should get anything new from
+                            # the underlying stream
+                            assert msg == 10
 
                             # verify stream is now closed
                             try:
                                 with trio.fail_after(0.3):
+                                    print('parent trying to `.receive()` on EoC stream!')
                                     await stream.receive()
+                                    assert 0, 'should have raised eoc!?'
                             except trio.EndOfChannel:
+                                print('parent got EoC as expected!')
                                 pass
+                                # raise
 
                 # TODO: should be just raise the closed resource err
                 # directly here to enforce not allowing a re-open
                 # of a stream to the context (at least until a time of
                 # if/when we decide that's a good idea?)
                 try:
-                    with trio.fail_after(0.5):
+                    with trio.fail_after(timeout):
                         async with ctx.open_stream() as stream:
                             pass
                 except trio.ClosedResourceError:
                     pass
 
+                # if ctx._rx_chan._state.data:
+                #     await tractor.pause()
+
         await portal.cancel_actor()
 
 
 @tractor.context
 async def expect_cancelled(
     ctx: Context,
+    send_before_receive: bool = False,
 
 ) -> None:
     global _state
@@ -538,6 +572,10 @@ async def expect_cancelled(
 
     try:
         async with ctx.open_stream() as stream:
+
+            if send_before_receive:
+                await stream.send('yo')
+
             async for msg in stream:
                 await stream.send(msg)  # echo server
 
@@ -567,23 +605,46 @@ async def expect_cancelled(
         assert 0, "callee wasn't cancelled !?"
 
 
+@pytest.mark.parametrize(
+    'child_send_before_receive',
+    [
+        False,
+        True,
+    ],
+    ids=lambda item: f'child_send_before_receive={item}'
+)
+@pytest.mark.parametrize(
+    'rent_wait_for_msg',
+    [
+        False,
+        True,
+    ],
+    ids=lambda item: f'rent_wait_for_msg={item}'
+)
 @pytest.mark.parametrize(
     'use_ctx_cancel_method',
-    [False, True],
+    [
+        False,
+        'pre_stream',
+        'post_stream_open',
+        'post_stream_close',
+    ],
+    ids=lambda item: f'use_ctx_cancel_method={item}'
 )
 @tractor_test
-async def test_caller_closes_ctx_after_callee_opens_stream(
-    use_ctx_cancel_method: bool,
+async def test_parent_exits_ctx_after_child_enters_stream(
+    use_ctx_cancel_method: bool|str,
     debug_mode: bool,
+    rent_wait_for_msg: bool,
+    child_send_before_receive: bool,
 ):
     '''
-    caller context closes without using/opening stream
+    Parent-side of IPC context closes without sending on `MsgStream`.
 
     '''
     async with tractor.open_nursery(
         debug_mode=debug_mode,
     ) as an:
-
         root: Actor = current_actor()
         portal = await an.start_actor(
             'ctx_cancelled',
@@ -592,41 +653,52 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
 
         async with portal.open_context(
             expect_cancelled,
+            send_before_receive=child_send_before_receive,
         ) as (ctx, sent):
             assert sent is None
 
             await portal.run(assert_state, value=True)
 
             # call `ctx.cancel()` explicitly
-            if use_ctx_cancel_method:
+            if use_ctx_cancel_method == 'pre_stream':
                 await ctx.cancel()
 
                 # NOTE: means the local side `ctx._scope` will
                 # have been cancelled by an ctxc ack and thus
                 # `._scope.cancelled_caught` should be set.
-                try:
+                async with (
+                    expect_ctxc(
+                        # XXX: the cause is US since we call
+                        # `Context.cancel()` just above!
+                        yay=True,
+
+                        # XXX: must be propagated to __aexit__
+                        # and should be silently absorbed there
+                        # since we called `.cancel()` just above ;)
+                        reraise=True,
+                    ) as maybe_ctxc,
+                ):
                     async with ctx.open_stream() as stream:
-                        async for msg in stream:
-                            pass
 
-                except tractor.ContextCancelled as ctxc:
-                    # XXX: the cause is US since we call
-                    # `Context.cancel()` just above!
-                    assert (
-                        ctxc.canceller
-                        ==
-                        current_actor().uid
-                        ==
-                        root.uid
-                    )
+                        if rent_wait_for_msg:
+                            async for msg in stream:
+                                print(f'PARENT rx: {msg!r}\n')
+                                break
 
-                    # XXX: must be propagated to __aexit__
-                    # and should be silently absorbed there
-                    # since we called `.cancel()` just above ;)
-                    raise
+                        if use_ctx_cancel_method == 'post_stream_open':
+                            await ctx.cancel()
 
-                else:
-                    assert 0, "Should have context cancelled?"
+                    if use_ctx_cancel_method == 'post_stream_close':
+                        await ctx.cancel()
+
+                ctxc: tractor.ContextCancelled = maybe_ctxc.value
+                assert (
+                    ctxc.canceller
+                    ==
+                    current_actor().uid
+                    ==
+                    root.uid
+                )
 
                 # channel should still be up
                 assert portal.channel.connected()
@@ -637,13 +709,20 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
                     value=False,
                 )
 
+            # XXX CHILD-BLOCKS case, we SHOULD NOT exit from the
+            # `.open_context()` before the child has returned,
+            # errored or been cancelled!
             else:
                 try:
-                    with trio.fail_after(0.2):
-                        await ctx.result()
+                    with trio.fail_after(
+                        0.5  # if not debug_mode else 999
+                    ):
+                        res = await ctx.wait_for_result()
+                        assert res is not tractor._context.Unresolved
                         assert 0, "Callee should have blocked!?"
                 except trio.TooSlowError:
-                    # NO-OP -> since already called above
+                    # NO-OP -> since already triggered by
+                    # `trio.fail_after()` above!
                     await ctx.cancel()
 
         # NOTE: local scope should have absorbed the cancellation since
@@ -683,7 +762,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
 
 
 @tractor_test
-async def test_multitask_caller_cancels_from_nonroot_task(
+async def test_multitask_parent_cancels_from_nonroot_task(
     debug_mode: bool,
 ):
     async with tractor.open_nursery(
@@ -735,7 +814,6 @@ async def test_multitask_caller_cancels_from_nonroot_task(
 
 @tractor.context
 async def cancel_self(
-
     ctx: Context,
 
 ) -> None:
@@ -775,7 +853,7 @@ async def cancel_self(
 
 
 @tractor_test
-async def test_callee_cancels_before_started(
+async def test_child_cancels_before_started(
     debug_mode: bool,
 ):
     '''
@@ -826,8 +904,7 @@ async def never_open_stream(
 
 
 @tractor.context
-async def keep_sending_from_callee(
-
+async def keep_sending_from_child(
     ctx:  Context,
     msg_buffer_size: int|None = None,
 
@@ -850,7 +927,7 @@ async def keep_sending_from_callee(
     'overrun_by',
     [
         ('caller', 1, never_open_stream),
-        ('callee', 0, keep_sending_from_callee),
+        ('callee', 0, keep_sending_from_child),
     ],
     ids=[
          ('caller_1buf_never_open_stream'),
@@ -931,8 +1008,7 @@ def test_one_end_stream_not_opened(
 
 @tractor.context
 async def echo_back_sequence(
-
-    ctx:  Context,
+    ctx: Context,
     seq: list[int],
     wait_for_cancel: bool,
     allow_overruns_side: str,