From 8e2bcf5098c2fb3cf26f1872f3f13b64ee891c70 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet <jgbt@protonmail.com>
Date: Tue, 11 Mar 2025 14:31:53 -0400
Subject: [PATCH] Fix msg-draining on `parent_never_opened_stream`!

Repairs a bug in `drain_to_final_msg()` where in the `Yield()` case
block we weren't guarding against the `ctx._stream is None` edge case
which should be treated a `continue`-draining (not a `break` or
attr-error!!) situation since the peer task maybe be continuing to send
`Yield` but has not yet sent an outcome msg (one of
`Return/Error/ContextCancelled`) to terminate the loop. Ensure we
explicitly warn about this case as well as `.cancel()` emit on a taskc.

Thanks again to @guille for discovering this!

Also add temporary `.info()`s around rxed `Return` msgs as part of
trying to debug a different bug discovered while updating the
context-semantics test suite (in a prior commit).
---
 tractor/msg/_ops.py | 84 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 64 insertions(+), 20 deletions(-)

diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py
index 839be532..5f4b9fe8 100644
--- a/tractor/msg/_ops.py
+++ b/tractor/msg/_ops.py
@@ -186,10 +186,16 @@ class PldRx(Struct):
         msg: MsgType = (
             ipc_msg
             or
-
             # sync-rx msg from underlying IPC feeder (mem-)chan
             ipc._rx_chan.receive_nowait()
         )
+        if (
+            type(msg) is Return
+        ):
+            log.info(
+                f'Rxed final result msg\n'
+                f'{msg}\n'
+            )
         return self.decode_pld(
             msg,
             ipc=ipc,
@@ -219,6 +225,13 @@ class PldRx(Struct):
             # async-rx msg from underlying IPC feeder (mem-)chan
             await ipc._rx_chan.receive()
         )
+        if (
+            type(msg) is Return
+        ):
+            log.info(
+                f'Rxed final result msg\n'
+                f'{msg}\n'
+            )
         return self.decode_pld(
             msg=msg,
             ipc=ipc,
@@ -407,8 +420,6 @@ class PldRx(Struct):
             __tracebackhide__: bool = False
             raise
 
-    dec_msg = decode_pld
-
     async def recv_msg_w_pld(
         self,
         ipc: Context|MsgStream,
@@ -422,12 +433,19 @@ class PldRx(Struct):
 
     ) -> tuple[MsgType, PayloadT]:
         '''
-        Retrieve the next avail IPC msg, decode it's payload, and return
-        the pair of refs.
+        Retrieve the next avail IPC msg, decode it's payload, and
+        return the pair of refs.
 
         '''
         __tracebackhide__: bool = hide_tb
         msg: MsgType = await ipc._rx_chan.receive()
+        if (
+            type(msg) is Return
+        ):
+            log.info(
+                f'Rxed final result msg\n'
+                f'{msg}\n'
+            )
 
         if passthrough_non_pld_msgs:
             match msg:
@@ -444,6 +462,10 @@ class PldRx(Struct):
             hide_tb=hide_tb,
             **kwargs,
         )
+        # log.runtime(
+        #     f'Delivering payload msg\n'
+        #     f'{msg}\n'
+        # )
         return msg, pld
 
 
@@ -538,8 +560,8 @@ async def maybe_limit_plds(
 async def drain_to_final_msg(
     ctx: Context,
 
-    hide_tb: bool = True,
     msg_limit: int = 6,
+    hide_tb: bool = True,
 
 ) -> tuple[
     Return|None,
@@ -568,8 +590,8 @@ async def drain_to_final_msg(
     even after ctx closure and the `.open_context()` block exit.
 
     '''
-    __tracebackhide__: bool = hide_tb
     raise_overrun: bool = not ctx._allow_overruns
+    parent_never_opened_stream: bool = ctx._stream is None
 
     # wait for a final context result by collecting (but
     # basically ignoring) any bi-dir-stream msgs still in transit
@@ -578,7 +600,8 @@ async def drain_to_final_msg(
     result_msg: Return|Error|None = None
     while not (
         ctx.maybe_error
-        and not ctx._final_result_is_set()
+        and
+        not ctx._final_result_is_set()
     ):
         try:
             # receive all msgs, scanning for either a final result
@@ -631,6 +654,11 @@ async def drain_to_final_msg(
                     )
                     __tracebackhide__: bool = False
 
+            else:
+                log.cancel(
+                    f'IPC ctx cancelled externally during result drain ?\n'
+                    f'{ctx}'
+                )
             # CASE 2: mask the local cancelled-error(s)
             # only when we are sure the remote error is
             # the source cause of this local task's
@@ -662,17 +690,24 @@ async def drain_to_final_msg(
             case Yield():
                 pre_result_drained.append(msg)
                 if (
-                    (ctx._stream.closed
-                     and (reason := 'stream was already closed')
-                    )
-                    or (ctx.cancel_acked
-                        and (reason := 'ctx cancelled other side')
-                    )
-                    or (ctx._cancel_called
-                        and (reason := 'ctx called `.cancel()`')
-                    )
-                    or (len(pre_result_drained) > msg_limit
-                        and (reason := f'"yield" limit={msg_limit}')
+                    not parent_never_opened_stream
+                    and (
+                        (ctx._stream.closed
+                         and
+                         (reason := 'stream was already closed')
+                        ) or
+                        (ctx.cancel_acked
+                            and
+                            (reason := 'ctx cancelled other side')
+                        )
+                        or (ctx._cancel_called
+                            and
+                            (reason := 'ctx called `.cancel()`')
+                        )
+                        or (len(pre_result_drained) > msg_limit
+                            and
+                            (reason := f'"yield" limit={msg_limit}')
+                        )
                     )
                 ):
                     log.cancel(
@@ -690,7 +725,7 @@ async def drain_to_final_msg(
                 # drain up to the `msg_limit` hoping to get
                 # a final result or error/ctxc.
                 else:
-                    log.warning(
+                    report: str = (
                         'Ignoring "yield" msg during `ctx.result()` drain..\n'
                         f'<= {ctx.chan.uid}\n'
                         f'  |_{ctx._nsf}()\n\n'
@@ -699,6 +734,14 @@ async def drain_to_final_msg(
 
                         f'{pretty_struct.pformat(msg)}\n'
                     )
+                    if parent_never_opened_stream:
+                        report = (
+                            f'IPC ctx never opened stream on {ctx.side!r}-side!\n'
+                            f'\n'
+                            # f'{ctx}\n'
+                        ) + report
+
+                    log.warning(report)
                     continue
 
             # stream terminated, but no result yet..
@@ -790,6 +833,7 @@ async def drain_to_final_msg(
             f'{ctx.outcome}\n'
         )
 
+    __tracebackhide__: bool = hide_tb
     return (
         result_msg,
         pre_result_drained,