From fde6d84b9d119476ba64e3e20c1ecef0980864ae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Mar 2025 14:31:53 -0400 Subject: [PATCH] Fix msg-draining on `parent_never_opened_stream`! Repairs a bug in `drain_to_final_msg()` where in the `Yield()` case block we weren't guarding against the `ctx._stream is None` edge case which should be treated a `continue`-draining (not a `break` or attr-error!!) situation since the peer task maybe be continuing to send `Yield` but has not yet sent an outcome msg (one of `Return/Error/ContextCancelled`) to terminate the loop. Ensure we explicitly warn about this case as well as `.cancel()` emit on a taskc. Thanks again to @guille for discovering this! Also add temporary `.info()`s around rxed `Return` msgs as part of trying to debug a different bug discovered while updating the context-semantics test suite (in a prior commit). --- tractor/msg/_ops.py | 84 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 20 deletions(-) diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 839be532..5f4b9fe8 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -186,10 +186,16 @@ class PldRx(Struct): msg: MsgType = ( ipc_msg or - # sync-rx msg from underlying IPC feeder (mem-)chan ipc._rx_chan.receive_nowait() ) + if ( + type(msg) is Return + ): + log.info( + f'Rxed final result msg\n' + f'{msg}\n' + ) return self.decode_pld( msg, ipc=ipc, @@ -219,6 +225,13 @@ class PldRx(Struct): # async-rx msg from underlying IPC feeder (mem-)chan await ipc._rx_chan.receive() ) + if ( + type(msg) is Return + ): + log.info( + f'Rxed final result msg\n' + f'{msg}\n' + ) return self.decode_pld( msg=msg, ipc=ipc, @@ -407,8 +420,6 @@ class PldRx(Struct): __tracebackhide__: bool = False raise - dec_msg = decode_pld - async def recv_msg_w_pld( self, ipc: Context|MsgStream, @@ -422,12 +433,19 @@ class PldRx(Struct): ) -> tuple[MsgType, PayloadT]: ''' - Retrieve the next avail IPC msg, decode it's payload, and return - the pair of refs. + Retrieve the next avail IPC msg, decode it's payload, and + return the pair of refs. ''' __tracebackhide__: bool = hide_tb msg: MsgType = await ipc._rx_chan.receive() + if ( + type(msg) is Return + ): + log.info( + f'Rxed final result msg\n' + f'{msg}\n' + ) if passthrough_non_pld_msgs: match msg: @@ -444,6 +462,10 @@ class PldRx(Struct): hide_tb=hide_tb, **kwargs, ) + # log.runtime( + # f'Delivering payload msg\n' + # f'{msg}\n' + # ) return msg, pld @@ -538,8 +560,8 @@ async def maybe_limit_plds( async def drain_to_final_msg( ctx: Context, - hide_tb: bool = True, msg_limit: int = 6, + hide_tb: bool = True, ) -> tuple[ Return|None, @@ -568,8 +590,8 @@ async def drain_to_final_msg( even after ctx closure and the `.open_context()` block exit. ''' - __tracebackhide__: bool = hide_tb raise_overrun: bool = not ctx._allow_overruns + parent_never_opened_stream: bool = ctx._stream is None # wait for a final context result by collecting (but # basically ignoring) any bi-dir-stream msgs still in transit @@ -578,7 +600,8 @@ async def drain_to_final_msg( result_msg: Return|Error|None = None while not ( ctx.maybe_error - and not ctx._final_result_is_set() + and + not ctx._final_result_is_set() ): try: # receive all msgs, scanning for either a final result @@ -631,6 +654,11 @@ async def drain_to_final_msg( ) __tracebackhide__: bool = False + else: + log.cancel( + f'IPC ctx cancelled externally during result drain ?\n' + f'{ctx}' + ) # CASE 2: mask the local cancelled-error(s) # only when we are sure the remote error is # the source cause of this local task's @@ -662,17 +690,24 @@ async def drain_to_final_msg( case Yield(): pre_result_drained.append(msg) if ( - (ctx._stream.closed - and (reason := 'stream was already closed') - ) - or (ctx.cancel_acked - and (reason := 'ctx cancelled other side') - ) - or (ctx._cancel_called - and (reason := 'ctx called `.cancel()`') - ) - or (len(pre_result_drained) > msg_limit - and (reason := f'"yield" limit={msg_limit}') + not parent_never_opened_stream + and ( + (ctx._stream.closed + and + (reason := 'stream was already closed') + ) or + (ctx.cancel_acked + and + (reason := 'ctx cancelled other side') + ) + or (ctx._cancel_called + and + (reason := 'ctx called `.cancel()`') + ) + or (len(pre_result_drained) > msg_limit + and + (reason := f'"yield" limit={msg_limit}') + ) ) ): log.cancel( @@ -690,7 +725,7 @@ async def drain_to_final_msg( # drain up to the `msg_limit` hoping to get # a final result or error/ctxc. else: - log.warning( + report: str = ( 'Ignoring "yield" msg during `ctx.result()` drain..\n' f'<= {ctx.chan.uid}\n' f' |_{ctx._nsf}()\n\n' @@ -699,6 +734,14 @@ async def drain_to_final_msg( f'{pretty_struct.pformat(msg)}\n' ) + if parent_never_opened_stream: + report = ( + f'IPC ctx never opened stream on {ctx.side!r}-side!\n' + f'\n' + # f'{ctx}\n' + ) + report + + log.warning(report) continue # stream terminated, but no result yet.. @@ -790,6 +833,7 @@ async def drain_to_final_msg( f'{ctx.outcome}\n' ) + __tracebackhide__: bool = hide_tb return ( result_msg, pre_result_drained,