Fix msg-draining on `parent_never_opened_stream`!
Repairs a bug in `drain_to_final_msg()` where in the `Yield()` case block we weren't guarding against the `ctx._stream is None` edge case which should be treated a `continue`-draining (not a `break` or attr-error!!) situation since the peer task maybe be continuing to send `Yield` but has not yet sent an outcome msg (one of `Return/Error/ContextCancelled`) to terminate the loop. Ensure we explicitly warn about this case as well as `.cancel()` emit on a taskc. Thanks again to @guille for discovering this! Also add temporary `.info()`s around rxed `Return` msgs as part of trying to debug a different bug discovered while updating the context-semantics test suite (in a prior commit).ext_type_plds_XPS_BACKUP
							parent
							
								
									f0561fc8c0
								
							
						
					
					
						commit
						3a3fd36890
					
				|  | @ -186,10 +186,16 @@ class PldRx(Struct): | |||
|         msg: MsgType = ( | ||||
|             ipc_msg | ||||
|             or | ||||
| 
 | ||||
|             # sync-rx msg from underlying IPC feeder (mem-)chan | ||||
|             ipc._rx_chan.receive_nowait() | ||||
|         ) | ||||
|         if ( | ||||
|             type(msg) is Return | ||||
|         ): | ||||
|             log.info( | ||||
|                 f'Rxed final result msg\n' | ||||
|                 f'{msg}\n' | ||||
|             ) | ||||
|         return self.decode_pld( | ||||
|             msg, | ||||
|             ipc=ipc, | ||||
|  | @ -219,6 +225,13 @@ class PldRx(Struct): | |||
|             # async-rx msg from underlying IPC feeder (mem-)chan | ||||
|             await ipc._rx_chan.receive() | ||||
|         ) | ||||
|         if ( | ||||
|             type(msg) is Return | ||||
|         ): | ||||
|             log.info( | ||||
|                 f'Rxed final result msg\n' | ||||
|                 f'{msg}\n' | ||||
|             ) | ||||
|         return self.decode_pld( | ||||
|             msg=msg, | ||||
|             ipc=ipc, | ||||
|  | @ -407,8 +420,6 @@ class PldRx(Struct): | |||
|             __tracebackhide__: bool = False | ||||
|             raise | ||||
| 
 | ||||
|     dec_msg = decode_pld | ||||
| 
 | ||||
|     async def recv_msg_w_pld( | ||||
|         self, | ||||
|         ipc: Context|MsgStream, | ||||
|  | @ -422,12 +433,19 @@ class PldRx(Struct): | |||
| 
 | ||||
|     ) -> tuple[MsgType, PayloadT]: | ||||
|         ''' | ||||
|         Retrieve the next avail IPC msg, decode it's payload, and return | ||||
|         the pair of refs. | ||||
|         Retrieve the next avail IPC msg, decode it's payload, and | ||||
|         return the pair of refs. | ||||
| 
 | ||||
|         ''' | ||||
|         __tracebackhide__: bool = hide_tb | ||||
|         msg: MsgType = await ipc._rx_chan.receive() | ||||
|         if ( | ||||
|             type(msg) is Return | ||||
|         ): | ||||
|             log.info( | ||||
|                 f'Rxed final result msg\n' | ||||
|                 f'{msg}\n' | ||||
|             ) | ||||
| 
 | ||||
|         if passthrough_non_pld_msgs: | ||||
|             match msg: | ||||
|  | @ -444,6 +462,10 @@ class PldRx(Struct): | |||
|             hide_tb=hide_tb, | ||||
|             **kwargs, | ||||
|         ) | ||||
|         # log.runtime( | ||||
|         #     f'Delivering payload msg\n' | ||||
|         #     f'{msg}\n' | ||||
|         # ) | ||||
|         return msg, pld | ||||
| 
 | ||||
| 
 | ||||
|  | @ -538,8 +560,8 @@ async def maybe_limit_plds( | |||
| async def drain_to_final_msg( | ||||
|     ctx: Context, | ||||
| 
 | ||||
|     hide_tb: bool = True, | ||||
|     msg_limit: int = 6, | ||||
|     hide_tb: bool = True, | ||||
| 
 | ||||
| ) -> tuple[ | ||||
|     Return|None, | ||||
|  | @ -568,8 +590,8 @@ async def drain_to_final_msg( | |||
|     even after ctx closure and the `.open_context()` block exit. | ||||
| 
 | ||||
|     ''' | ||||
|     __tracebackhide__: bool = hide_tb | ||||
|     raise_overrun: bool = not ctx._allow_overruns | ||||
|     parent_never_opened_stream: bool = ctx._stream is None | ||||
| 
 | ||||
|     # wait for a final context result by collecting (but | ||||
|     # basically ignoring) any bi-dir-stream msgs still in transit | ||||
|  | @ -578,7 +600,8 @@ async def drain_to_final_msg( | |||
|     result_msg: Return|Error|None = None | ||||
|     while not ( | ||||
|         ctx.maybe_error | ||||
|         and not ctx._final_result_is_set() | ||||
|         and | ||||
|         not ctx._final_result_is_set() | ||||
|     ): | ||||
|         try: | ||||
|             # receive all msgs, scanning for either a final result | ||||
|  | @ -631,6 +654,11 @@ async def drain_to_final_msg( | |||
|                     ) | ||||
|                     __tracebackhide__: bool = False | ||||
| 
 | ||||
|             else: | ||||
|                 log.cancel( | ||||
|                     f'IPC ctx cancelled externally during result drain ?\n' | ||||
|                     f'{ctx}' | ||||
|                 ) | ||||
|             # CASE 2: mask the local cancelled-error(s) | ||||
|             # only when we are sure the remote error is | ||||
|             # the source cause of this local task's | ||||
|  | @ -662,17 +690,24 @@ async def drain_to_final_msg( | |||
|             case Yield(): | ||||
|                 pre_result_drained.append(msg) | ||||
|                 if ( | ||||
|                     (ctx._stream.closed | ||||
|                      and (reason := 'stream was already closed') | ||||
|                     ) | ||||
|                     or (ctx.cancel_acked | ||||
|                         and (reason := 'ctx cancelled other side') | ||||
|                     ) | ||||
|                     or (ctx._cancel_called | ||||
|                         and (reason := 'ctx called `.cancel()`') | ||||
|                     ) | ||||
|                     or (len(pre_result_drained) > msg_limit | ||||
|                         and (reason := f'"yield" limit={msg_limit}') | ||||
|                     not parent_never_opened_stream | ||||
|                     and ( | ||||
|                         (ctx._stream.closed | ||||
|                          and | ||||
|                          (reason := 'stream was already closed') | ||||
|                         ) or | ||||
|                         (ctx.cancel_acked | ||||
|                             and | ||||
|                             (reason := 'ctx cancelled other side') | ||||
|                         ) | ||||
|                         or (ctx._cancel_called | ||||
|                             and | ||||
|                             (reason := 'ctx called `.cancel()`') | ||||
|                         ) | ||||
|                         or (len(pre_result_drained) > msg_limit | ||||
|                             and | ||||
|                             (reason := f'"yield" limit={msg_limit}') | ||||
|                         ) | ||||
|                     ) | ||||
|                 ): | ||||
|                     log.cancel( | ||||
|  | @ -690,7 +725,7 @@ async def drain_to_final_msg( | |||
|                 # drain up to the `msg_limit` hoping to get | ||||
|                 # a final result or error/ctxc. | ||||
|                 else: | ||||
|                     log.warning( | ||||
|                     report: str = ( | ||||
|                         'Ignoring "yield" msg during `ctx.result()` drain..\n' | ||||
|                         f'<= {ctx.chan.uid}\n' | ||||
|                         f'  |_{ctx._nsf}()\n\n' | ||||
|  | @ -699,6 +734,14 @@ async def drain_to_final_msg( | |||
| 
 | ||||
|                         f'{pretty_struct.pformat(msg)}\n' | ||||
|                     ) | ||||
|                     if parent_never_opened_stream: | ||||
|                         report = ( | ||||
|                             f'IPC ctx never opened stream on {ctx.side!r}-side!\n' | ||||
|                             f'\n' | ||||
|                             # f'{ctx}\n' | ||||
|                         ) + report | ||||
| 
 | ||||
|                     log.warning(report) | ||||
|                     continue | ||||
| 
 | ||||
|             # stream terminated, but no result yet.. | ||||
|  | @ -790,6 +833,7 @@ async def drain_to_final_msg( | |||
|             f'{ctx.outcome}\n' | ||||
|         ) | ||||
| 
 | ||||
|     __tracebackhide__: bool = hide_tb | ||||
|     return ( | ||||
|         result_msg, | ||||
|         pre_result_drained, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue