Fix msg-draining on `parent_never_opened_stream`!
Repairs a bug in `drain_to_final_msg()` where in the `Yield()` case block we weren't guarding against the `ctx._stream is None` edge case which should be treated a `continue`-draining (not a `break` or attr-error!!) situation since the peer task maybe be continuing to send `Yield` but has not yet sent an outcome msg (one of `Return/Error/ContextCancelled`) to terminate the loop. Ensure we explicitly warn about this case as well as `.cancel()` emit on a taskc. Thanks again to @guille for discovering this! Also add temporary `.info()`s around rxed `Return` msgs as part of trying to debug a different bug discovered while updating the context-semantics test suite (in a prior commit).
parent
37b8d77d98
commit
fde6d84b9d
|
@ -186,10 +186,16 @@ class PldRx(Struct):
|
||||||
msg: MsgType = (
|
msg: MsgType = (
|
||||||
ipc_msg
|
ipc_msg
|
||||||
or
|
or
|
||||||
|
|
||||||
# sync-rx msg from underlying IPC feeder (mem-)chan
|
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||||
ipc._rx_chan.receive_nowait()
|
ipc._rx_chan.receive_nowait()
|
||||||
)
|
)
|
||||||
|
if (
|
||||||
|
type(msg) is Return
|
||||||
|
):
|
||||||
|
log.info(
|
||||||
|
f'Rxed final result msg\n'
|
||||||
|
f'{msg}\n'
|
||||||
|
)
|
||||||
return self.decode_pld(
|
return self.decode_pld(
|
||||||
msg,
|
msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
|
@ -219,6 +225,13 @@ class PldRx(Struct):
|
||||||
# async-rx msg from underlying IPC feeder (mem-)chan
|
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||||
await ipc._rx_chan.receive()
|
await ipc._rx_chan.receive()
|
||||||
)
|
)
|
||||||
|
if (
|
||||||
|
type(msg) is Return
|
||||||
|
):
|
||||||
|
log.info(
|
||||||
|
f'Rxed final result msg\n'
|
||||||
|
f'{msg}\n'
|
||||||
|
)
|
||||||
return self.decode_pld(
|
return self.decode_pld(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
|
@ -407,8 +420,6 @@ class PldRx(Struct):
|
||||||
__tracebackhide__: bool = False
|
__tracebackhide__: bool = False
|
||||||
raise
|
raise
|
||||||
|
|
||||||
dec_msg = decode_pld
|
|
||||||
|
|
||||||
async def recv_msg_w_pld(
|
async def recv_msg_w_pld(
|
||||||
self,
|
self,
|
||||||
ipc: Context|MsgStream,
|
ipc: Context|MsgStream,
|
||||||
|
@ -422,12 +433,19 @@ class PldRx(Struct):
|
||||||
|
|
||||||
) -> tuple[MsgType, PayloadT]:
|
) -> tuple[MsgType, PayloadT]:
|
||||||
'''
|
'''
|
||||||
Retrieve the next avail IPC msg, decode it's payload, and return
|
Retrieve the next avail IPC msg, decode it's payload, and
|
||||||
the pair of refs.
|
return the pair of refs.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
msg: MsgType = await ipc._rx_chan.receive()
|
msg: MsgType = await ipc._rx_chan.receive()
|
||||||
|
if (
|
||||||
|
type(msg) is Return
|
||||||
|
):
|
||||||
|
log.info(
|
||||||
|
f'Rxed final result msg\n'
|
||||||
|
f'{msg}\n'
|
||||||
|
)
|
||||||
|
|
||||||
if passthrough_non_pld_msgs:
|
if passthrough_non_pld_msgs:
|
||||||
match msg:
|
match msg:
|
||||||
|
@ -444,6 +462,10 @@ class PldRx(Struct):
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
|
# log.runtime(
|
||||||
|
# f'Delivering payload msg\n'
|
||||||
|
# f'{msg}\n'
|
||||||
|
# )
|
||||||
return msg, pld
|
return msg, pld
|
||||||
|
|
||||||
|
|
||||||
|
@ -538,8 +560,8 @@ async def maybe_limit_plds(
|
||||||
async def drain_to_final_msg(
|
async def drain_to_final_msg(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
|
||||||
hide_tb: bool = True,
|
|
||||||
msg_limit: int = 6,
|
msg_limit: int = 6,
|
||||||
|
hide_tb: bool = True,
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
Return|None,
|
Return|None,
|
||||||
|
@ -568,8 +590,8 @@ async def drain_to_final_msg(
|
||||||
even after ctx closure and the `.open_context()` block exit.
|
even after ctx closure and the `.open_context()` block exit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
|
||||||
raise_overrun: bool = not ctx._allow_overruns
|
raise_overrun: bool = not ctx._allow_overruns
|
||||||
|
parent_never_opened_stream: bool = ctx._stream is None
|
||||||
|
|
||||||
# wait for a final context result by collecting (but
|
# wait for a final context result by collecting (but
|
||||||
# basically ignoring) any bi-dir-stream msgs still in transit
|
# basically ignoring) any bi-dir-stream msgs still in transit
|
||||||
|
@ -578,7 +600,8 @@ async def drain_to_final_msg(
|
||||||
result_msg: Return|Error|None = None
|
result_msg: Return|Error|None = None
|
||||||
while not (
|
while not (
|
||||||
ctx.maybe_error
|
ctx.maybe_error
|
||||||
and not ctx._final_result_is_set()
|
and
|
||||||
|
not ctx._final_result_is_set()
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
# receive all msgs, scanning for either a final result
|
# receive all msgs, scanning for either a final result
|
||||||
|
@ -631,6 +654,11 @@ async def drain_to_final_msg(
|
||||||
)
|
)
|
||||||
__tracebackhide__: bool = False
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
|
else:
|
||||||
|
log.cancel(
|
||||||
|
f'IPC ctx cancelled externally during result drain ?\n'
|
||||||
|
f'{ctx}'
|
||||||
|
)
|
||||||
# CASE 2: mask the local cancelled-error(s)
|
# CASE 2: mask the local cancelled-error(s)
|
||||||
# only when we are sure the remote error is
|
# only when we are sure the remote error is
|
||||||
# the source cause of this local task's
|
# the source cause of this local task's
|
||||||
|
@ -662,17 +690,24 @@ async def drain_to_final_msg(
|
||||||
case Yield():
|
case Yield():
|
||||||
pre_result_drained.append(msg)
|
pre_result_drained.append(msg)
|
||||||
if (
|
if (
|
||||||
(ctx._stream.closed
|
not parent_never_opened_stream
|
||||||
and (reason := 'stream was already closed')
|
and (
|
||||||
)
|
(ctx._stream.closed
|
||||||
or (ctx.cancel_acked
|
and
|
||||||
and (reason := 'ctx cancelled other side')
|
(reason := 'stream was already closed')
|
||||||
)
|
) or
|
||||||
or (ctx._cancel_called
|
(ctx.cancel_acked
|
||||||
and (reason := 'ctx called `.cancel()`')
|
and
|
||||||
)
|
(reason := 'ctx cancelled other side')
|
||||||
or (len(pre_result_drained) > msg_limit
|
)
|
||||||
and (reason := f'"yield" limit={msg_limit}')
|
or (ctx._cancel_called
|
||||||
|
and
|
||||||
|
(reason := 'ctx called `.cancel()`')
|
||||||
|
)
|
||||||
|
or (len(pre_result_drained) > msg_limit
|
||||||
|
and
|
||||||
|
(reason := f'"yield" limit={msg_limit}')
|
||||||
|
)
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
|
@ -690,7 +725,7 @@ async def drain_to_final_msg(
|
||||||
# drain up to the `msg_limit` hoping to get
|
# drain up to the `msg_limit` hoping to get
|
||||||
# a final result or error/ctxc.
|
# a final result or error/ctxc.
|
||||||
else:
|
else:
|
||||||
log.warning(
|
report: str = (
|
||||||
'Ignoring "yield" msg during `ctx.result()` drain..\n'
|
'Ignoring "yield" msg during `ctx.result()` drain..\n'
|
||||||
f'<= {ctx.chan.uid}\n'
|
f'<= {ctx.chan.uid}\n'
|
||||||
f' |_{ctx._nsf}()\n\n'
|
f' |_{ctx._nsf}()\n\n'
|
||||||
|
@ -699,6 +734,14 @@ async def drain_to_final_msg(
|
||||||
|
|
||||||
f'{pretty_struct.pformat(msg)}\n'
|
f'{pretty_struct.pformat(msg)}\n'
|
||||||
)
|
)
|
||||||
|
if parent_never_opened_stream:
|
||||||
|
report = (
|
||||||
|
f'IPC ctx never opened stream on {ctx.side!r}-side!\n'
|
||||||
|
f'\n'
|
||||||
|
# f'{ctx}\n'
|
||||||
|
) + report
|
||||||
|
|
||||||
|
log.warning(report)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# stream terminated, but no result yet..
|
# stream terminated, but no result yet..
|
||||||
|
@ -790,6 +833,7 @@ async def drain_to_final_msg(
|
||||||
f'{ctx.outcome}\n'
|
f'{ctx.outcome}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
__tracebackhide__: bool = hide_tb
|
||||||
return (
|
return (
|
||||||
result_msg,
|
result_msg,
|
||||||
pre_result_drained,
|
pre_result_drained,
|
||||||
|
|
Loading…
Reference in New Issue