From b56352b0e4951d73be2b94af1b692bd3bd98aec3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 3 Jul 2024 17:01:37 -0400 Subject: [PATCH] Quieter `Stop` handling on ctx result capture In the `drain_to_final_msg()` impl, since a stream terminating gracefully requires this msg, there's really no reason to `log.cancel()` about it; go `.runtime()` level instead since we're trying de-noise under "normal operation". Also, - passthrough `hide_tb` to taskc-handler's `ctx.maybe_raise()` call. - raise `MessagingError` for the `MsgType` unmatched `case _:`. - detail the doc string motivation a little more. --- tractor/msg/_ops.py | 55 +++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 91c0dde..f0f3b6b 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -374,7 +374,7 @@ class PldRx(Struct): case _: src_err = InternalError( - 'Unknown IPC msg ??\n\n' + 'Invalid IPC msg ??\n\n' f'{msg}\n' ) @@ -499,7 +499,7 @@ async def maybe_limit_plds( yield None return - # sanity on scoping + # sanity check on IPC scoping curr_ctx: Context = current_ipc_ctx() assert ctx is curr_ctx @@ -510,6 +510,8 @@ async def maybe_limit_plds( ) as msgdec: yield msgdec + # when the applied spec is unwound/removed, the same IPC-ctx + # should still be in scope. curr_ctx: Context = current_ipc_ctx() assert ctx is curr_ctx @@ -525,16 +527,26 @@ async def drain_to_final_msg( list[MsgType] ]: ''' - Drain IPC msgs delivered to the underlying IPC primitive's - rx-mem-chan (eg. `Context._rx_chan`) from the runtime in - search for a final result or error. + Drain IPC msgs delivered to the underlying IPC context's + rx-mem-chan (i.e. from `Context._rx_chan`) in search for a final + `Return` or `Error` msg. - The motivation here is to ideally capture errors during ctxc - conditions where a canc-request/or local error is sent but the - local task also excepts and enters the - `Portal.open_context().__aexit__()` block wherein we prefer to - capture and raise any remote error or ctxc-ack as part of the - `ctx.result()` cleanup and teardown sequence. + Deliver the `Return` + preceding drained msgs (`list[MsgType]`) + as a pair unless an `Error` is found, in which unpack and raise + it. + + The motivation here is to always capture any remote error relayed + by the remote peer task during a ctxc condition. + + For eg. a ctxc-request may be sent to the peer as part of the + local task's (request for) cancellation but then that same task + **also errors** before executing the teardown in the + `Portal.open_context().__aexit__()` block. In such error-on-exit + cases we want to always capture and raise any delivered remote + error (like an expected ctxc-ACK) as part of the final + `ctx.wait_for_result()` teardown sequence such that the + `Context.outcome` related state always reflect what transpired + even after ctx closure and the `.open_context()` block exit. ''' __tracebackhide__: bool = hide_tb @@ -572,7 +584,6 @@ async def drain_to_final_msg( # |_from tractor.devx._debug import pause # await pause() - # NOTE: we get here if the far end was # `ContextCancelled` in 2 cases: # 1. we requested the cancellation and thus @@ -580,13 +591,13 @@ async def drain_to_final_msg( # 2. WE DID NOT REQUEST that cancel and thus # SHOULD RAISE HERE! except trio.Cancelled as taskc: - # CASE 2: mask the local cancelled-error(s) # only when we are sure the remote error is # the source cause of this local task's # cancellation. ctx.maybe_raise( - # TODO: when use this/ + hide_tb=hide_tb, + # TODO: when use this? # from_src_exc=taskc, ) @@ -659,7 +670,7 @@ async def drain_to_final_msg( # Stop() case Stop(): pre_result_drained.append(msg) - log.cancel( + log.runtime( # normal/expected shutdown transaction 'Remote stream terminated due to "stop" msg:\n\n' f'{pretty_struct.pformat(msg)}\n' ) @@ -719,13 +730,19 @@ async def drain_to_final_msg( pre_result_drained.append(msg) # It's definitely an internal error if any other # msg type without a`'cid'` field arrives here! + report: str = ( + f'Invalid or unknown msg type {type(msg)!r}!?\n' + ) if not msg.cid: - raise InternalError( - 'Unexpected cid-missing msg?\n\n' - f'{msg}\n' + report += ( + '\nWhich also has no `.cid` field?\n' ) - raise RuntimeError('Unknown msg type: {msg}') + raise MessagingError( + report + + + f'\n{msg}\n' + ) else: log.cancel(