Fix msg-draining on `parent_never_opened_stream`!

Repairs a bug in `drain_to_final_msg()` where in the `Yield()` case
block we weren't guarding against the `ctx._stream is None` edge case
which should be treated a `continue`-draining (not a `break` or
attr-error!!) situation since the peer task maybe be continuing to send
`Yield` but has not yet sent an outcome msg (one of
`Return/Error/ContextCancelled`) to terminate the loop. Ensure we
explicitly warn about this case as well as `.cancel()` emit on a taskc.

Thanks again to @guille for discovering this!

Also add temporary `.info()`s around rxed `Return` msgs as part of
trying to debug a different bug discovered while updating the
context-semantics test suite (in a prior commit).
Tyler Goodlet 2025-03-11 14:31:53 -04:00
parent 37b8d77d98
commit fde6d84b9d
1 changed files with 64 additions and 20 deletions

View File

@ -186,10 +186,16 @@ class PldRx(Struct):
msg: MsgType = ( msg: MsgType = (
ipc_msg ipc_msg
or or
# sync-rx msg from underlying IPC feeder (mem-)chan # sync-rx msg from underlying IPC feeder (mem-)chan
ipc._rx_chan.receive_nowait() ipc._rx_chan.receive_nowait()
) )
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
return self.decode_pld( return self.decode_pld(
msg, msg,
ipc=ipc, ipc=ipc,
@ -219,6 +225,13 @@ class PldRx(Struct):
# async-rx msg from underlying IPC feeder (mem-)chan # async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive() await ipc._rx_chan.receive()
) )
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
return self.decode_pld( return self.decode_pld(
msg=msg, msg=msg,
ipc=ipc, ipc=ipc,
@ -407,8 +420,6 @@ class PldRx(Struct):
__tracebackhide__: bool = False __tracebackhide__: bool = False
raise raise
dec_msg = decode_pld
async def recv_msg_w_pld( async def recv_msg_w_pld(
self, self,
ipc: Context|MsgStream, ipc: Context|MsgStream,
@ -422,12 +433,19 @@ class PldRx(Struct):
) -> tuple[MsgType, PayloadT]: ) -> tuple[MsgType, PayloadT]:
''' '''
Retrieve the next avail IPC msg, decode it's payload, and return Retrieve the next avail IPC msg, decode it's payload, and
the pair of refs. return the pair of refs.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive() msg: MsgType = await ipc._rx_chan.receive()
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
if passthrough_non_pld_msgs: if passthrough_non_pld_msgs:
match msg: match msg:
@ -444,6 +462,10 @@ class PldRx(Struct):
hide_tb=hide_tb, hide_tb=hide_tb,
**kwargs, **kwargs,
) )
# log.runtime(
# f'Delivering payload msg\n'
# f'{msg}\n'
# )
return msg, pld return msg, pld
@ -538,8 +560,8 @@ async def maybe_limit_plds(
async def drain_to_final_msg( async def drain_to_final_msg(
ctx: Context, ctx: Context,
hide_tb: bool = True,
msg_limit: int = 6, msg_limit: int = 6,
hide_tb: bool = True,
) -> tuple[ ) -> tuple[
Return|None, Return|None,
@ -568,8 +590,8 @@ async def drain_to_final_msg(
even after ctx closure and the `.open_context()` block exit. even after ctx closure and the `.open_context()` block exit.
''' '''
__tracebackhide__: bool = hide_tb
raise_overrun: bool = not ctx._allow_overruns raise_overrun: bool = not ctx._allow_overruns
parent_never_opened_stream: bool = ctx._stream is None
# wait for a final context result by collecting (but # wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit # basically ignoring) any bi-dir-stream msgs still in transit
@ -578,7 +600,8 @@ async def drain_to_final_msg(
result_msg: Return|Error|None = None result_msg: Return|Error|None = None
while not ( while not (
ctx.maybe_error ctx.maybe_error
and not ctx._final_result_is_set() and
not ctx._final_result_is_set()
): ):
try: try:
# receive all msgs, scanning for either a final result # receive all msgs, scanning for either a final result
@ -631,6 +654,11 @@ async def drain_to_final_msg(
) )
__tracebackhide__: bool = False __tracebackhide__: bool = False
else:
log.cancel(
f'IPC ctx cancelled externally during result drain ?\n'
f'{ctx}'
)
# CASE 2: mask the local cancelled-error(s) # CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is # only when we are sure the remote error is
# the source cause of this local task's # the source cause of this local task's
@ -662,17 +690,24 @@ async def drain_to_final_msg(
case Yield(): case Yield():
pre_result_drained.append(msg) pre_result_drained.append(msg)
if ( if (
(ctx._stream.closed not parent_never_opened_stream
and (reason := 'stream was already closed') and (
) (ctx._stream.closed
or (ctx.cancel_acked and
and (reason := 'ctx cancelled other side') (reason := 'stream was already closed')
) ) or
or (ctx._cancel_called (ctx.cancel_acked
and (reason := 'ctx called `.cancel()`') and
) (reason := 'ctx cancelled other side')
or (len(pre_result_drained) > msg_limit )
and (reason := f'"yield" limit={msg_limit}') or (ctx._cancel_called
and
(reason := 'ctx called `.cancel()`')
)
or (len(pre_result_drained) > msg_limit
and
(reason := f'"yield" limit={msg_limit}')
)
) )
): ):
log.cancel( log.cancel(
@ -690,7 +725,7 @@ async def drain_to_final_msg(
# drain up to the `msg_limit` hoping to get # drain up to the `msg_limit` hoping to get
# a final result or error/ctxc. # a final result or error/ctxc.
else: else:
log.warning( report: str = (
'Ignoring "yield" msg during `ctx.result()` drain..\n' 'Ignoring "yield" msg during `ctx.result()` drain..\n'
f'<= {ctx.chan.uid}\n' f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n' f' |_{ctx._nsf}()\n\n'
@ -699,6 +734,14 @@ async def drain_to_final_msg(
f'{pretty_struct.pformat(msg)}\n' f'{pretty_struct.pformat(msg)}\n'
) )
if parent_never_opened_stream:
report = (
f'IPC ctx never opened stream on {ctx.side!r}-side!\n'
f'\n'
# f'{ctx}\n'
) + report
log.warning(report)
continue continue
# stream terminated, but no result yet.. # stream terminated, but no result yet..
@ -790,6 +833,7 @@ async def drain_to_final_msg(
f'{ctx.outcome}\n' f'{ctx.outcome}\n'
) )
__tracebackhide__: bool = hide_tb
return ( return (
result_msg, result_msg,
pre_result_drained, pre_result_drained,