Fix msg-draining on `parent_never_opened_stream`!

Repairs a bug in `drain_to_final_msg()` where in the `Yield()` case
block we weren't guarding against the `ctx._stream is None` edge case
which should be treated a `continue`-draining (not a `break` or
attr-error!!) situation since the peer task maybe be continuing to send
`Yield` but has not yet sent an outcome msg (one of
`Return/Error/ContextCancelled`) to terminate the loop. Ensure we
explicitly warn about this case as well as `.cancel()` emit on a taskc.

Thanks again to @guille for discovering this!

Also add temporary `.info()`s around rxed `Return` msgs as part of
trying to debug a different bug discovered while updating the
context-semantics test suite (in a prior commit).
Tyler Goodlet 2025-03-11 14:31:53 -04:00
parent 37b8d77d98
commit fde6d84b9d
1 changed files with 64 additions and 20 deletions

View File

@ -186,10 +186,16 @@ class PldRx(Struct):
msg: MsgType = (
ipc_msg
or
# sync-rx msg from underlying IPC feeder (mem-)chan
ipc._rx_chan.receive_nowait()
)
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
return self.decode_pld(
msg,
ipc=ipc,
@ -219,6 +225,13 @@ class PldRx(Struct):
# async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive()
)
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
return self.decode_pld(
msg=msg,
ipc=ipc,
@ -407,8 +420,6 @@ class PldRx(Struct):
__tracebackhide__: bool = False
raise
dec_msg = decode_pld
async def recv_msg_w_pld(
self,
ipc: Context|MsgStream,
@ -422,12 +433,19 @@ class PldRx(Struct):
) -> tuple[MsgType, PayloadT]:
'''
Retrieve the next avail IPC msg, decode it's payload, and return
the pair of refs.
Retrieve the next avail IPC msg, decode it's payload, and
return the pair of refs.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive()
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
if passthrough_non_pld_msgs:
match msg:
@ -444,6 +462,10 @@ class PldRx(Struct):
hide_tb=hide_tb,
**kwargs,
)
# log.runtime(
# f'Delivering payload msg\n'
# f'{msg}\n'
# )
return msg, pld
@ -538,8 +560,8 @@ async def maybe_limit_plds(
async def drain_to_final_msg(
ctx: Context,
hide_tb: bool = True,
msg_limit: int = 6,
hide_tb: bool = True,
) -> tuple[
Return|None,
@ -568,8 +590,8 @@ async def drain_to_final_msg(
even after ctx closure and the `.open_context()` block exit.
'''
__tracebackhide__: bool = hide_tb
raise_overrun: bool = not ctx._allow_overruns
parent_never_opened_stream: bool = ctx._stream is None
# wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit
@ -578,7 +600,8 @@ async def drain_to_final_msg(
result_msg: Return|Error|None = None
while not (
ctx.maybe_error
and not ctx._final_result_is_set()
and
not ctx._final_result_is_set()
):
try:
# receive all msgs, scanning for either a final result
@ -631,6 +654,11 @@ async def drain_to_final_msg(
)
__tracebackhide__: bool = False
else:
log.cancel(
f'IPC ctx cancelled externally during result drain ?\n'
f'{ctx}'
)
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
# the source cause of this local task's
@ -662,17 +690,24 @@ async def drain_to_final_msg(
case Yield():
pre_result_drained.append(msg)
if (
not parent_never_opened_stream
and (
(ctx._stream.closed
and (reason := 'stream was already closed')
)
or (ctx.cancel_acked
and (reason := 'ctx cancelled other side')
and
(reason := 'stream was already closed')
) or
(ctx.cancel_acked
and
(reason := 'ctx cancelled other side')
)
or (ctx._cancel_called
and (reason := 'ctx called `.cancel()`')
and
(reason := 'ctx called `.cancel()`')
)
or (len(pre_result_drained) > msg_limit
and (reason := f'"yield" limit={msg_limit}')
and
(reason := f'"yield" limit={msg_limit}')
)
)
):
log.cancel(
@ -690,7 +725,7 @@ async def drain_to_final_msg(
# drain up to the `msg_limit` hoping to get
# a final result or error/ctxc.
else:
log.warning(
report: str = (
'Ignoring "yield" msg during `ctx.result()` drain..\n'
f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n'
@ -699,6 +734,14 @@ async def drain_to_final_msg(
f'{pretty_struct.pformat(msg)}\n'
)
if parent_never_opened_stream:
report = (
f'IPC ctx never opened stream on {ctx.side!r}-side!\n'
f'\n'
# f'{ctx}\n'
) + report
log.warning(report)
continue
# stream terminated, but no result yet..
@ -790,6 +833,7 @@ async def drain_to_final_msg(
f'{ctx.outcome}\n'
)
__tracebackhide__: bool = hide_tb
return (
result_msg,
pre_result_drained,