diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 4cf2049..1ba623d 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -161,9 +161,10 @@ class PldRx(Struct): ipc_msg: MsgType|None = None, expect_msg: Type[MsgType]|None = None, - **kwargs, + **dec_msg_kwargs, ) -> Any|Raw: + __tracebackhide__: bool = True msg: MsgType = ( ipc_msg @@ -176,6 +177,7 @@ class PldRx(Struct): msg, ctx=ctx, expect_msg=expect_msg, + **dec_msg_kwargs, ) async def recv_pld( @@ -183,14 +185,16 @@ class PldRx(Struct): ctx: Context, ipc_msg: MsgType|None = None, expect_msg: Type[MsgType]|None = None, + hide_tb: bool = True, - **kwargs + **dec_msg_kwargs, ) -> Any|Raw: ''' Receive a `MsgType`, then decode and return its `.pld` field. ''' + __tracebackhide__: bool = hide_tb msg: MsgType = ( ipc_msg or @@ -199,9 +203,10 @@ class PldRx(Struct): await ctx._rx_chan.receive() ) return self.dec_msg( - msg, + msg=msg, ctx=ctx, expect_msg=expect_msg, + **dec_msg_kwargs, ) def dec_msg( @@ -210,12 +215,16 @@ class PldRx(Struct): ctx: Context, expect_msg: Type[MsgType]|None, + raise_error: bool = True, + hide_tb: bool = True, + ) -> PayloadT|Raw: ''' Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and return the value or raise an appropriate error. ''' + __tracebackhide__: bool = hide_tb match msg: # payload-data shuttle msg; deliver the `.pld` value # directly to IPC (primitive) client-consumer code. @@ -228,7 +237,8 @@ class PldRx(Struct): pld: PayloadT = self._pldec.decode(pld) log.runtime( 'Decoded msg payload\n\n' - f'{msg}\n' + f'{msg}\n\n' + f'where payload is\n' f'|_pld={pld!r}\n' ) return pld @@ -237,8 +247,9 @@ class PldRx(Struct): except ValidationError as src_err: msgterr: MsgTypeError = _mk_msg_type_err( msg=msg, - codec=self.dec, + codec=self.pld_dec, src_validation_error=src_err, + is_invalid_payload=True, ) msg: Error = pack_from_raise( local_err=msgterr, @@ -263,8 +274,29 @@ class PldRx(Struct): case Error(): src_err = MessagingError( - 'IPC ctx dialog terminated without `Return`-ing a result' + 'IPC ctx dialog terminated without `Return`-ing a result\n' + f'Instead it raised {msg.boxed_type_str!r}!' ) + # XXX NOTE XXX another super subtle runtime-y thing.. + # + # - when user code (transitively) calls into this + # func (usually via a `Context/MsgStream` API) we + # generally want errors to propagate immediately + # and directly so that the user can define how it + # wants to handle them. + # + # HOWEVER, + # + # - for certain runtime calling cases, we don't want to + # directly raise since the calling code might have + # special logic around whether to raise the error + # or supress it silently (eg. a `ContextCancelled` + # received from the far end which was requested by + # this side, aka a self-cancel). + # + # SO, we offer a flag to control this. + if not raise_error: + return src_err case Stop(cid=cid): message: str = ( @@ -305,6 +337,9 @@ class PldRx(Struct): f'{msg}\n' ) + # TODO: maybe use the new `.add_note()` from 3.11? + # |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note + # # fallthrough and raise from `src_err` _raise_from_unexpected_msg( ctx=ctx, @@ -312,7 +347,7 @@ class PldRx(Struct): src_err=src_err, log=log, expect_msg=expect_msg, - hide_tb=False, + hide_tb=hide_tb, ) async def recv_msg_w_pld( @@ -320,6 +355,8 @@ class PldRx(Struct): ipc: Context|MsgStream, expect_msg: MsgType, + **kwargs, + ) -> tuple[MsgType, PayloadT]: ''' Retrieve the next avail IPC msg, decode it's payload, and return @@ -335,6 +372,7 @@ class PldRx(Struct): msg, ctx=ipc, expect_msg=expect_msg, + **kwargs, ) return msg, pld @@ -433,70 +471,33 @@ async def drain_to_final_msg( # basically ignoring) any bi-dir-stream msgs still in transit # from the far end. pre_result_drained: list[MsgType] = [] - return_msg: Return|None = None + result_msg: Return|Error|None = None while not ( ctx.maybe_error and not ctx._final_result_is_set() ): try: - # TODO: can remove? - # await trio.lowlevel.checkpoint() - - # NOTE: this REPL usage actually works here dawg! Bo - # from .devx._debug import pause - # await pause() - - # TODO: bad idea? - # -[ ] wrap final outcome channel wait in a scope so - # it can be cancelled out of band if needed? - # - # with trio.CancelScope() as res_cs: - # ctx._res_scope = res_cs - # msg: dict = await ctx._rx_chan.receive() - # if res_cs.cancelled_caught: - - # TODO: ensure there's no more hangs, debugging the - # runtime pretty preaase! - # from .devx._debug import pause - # await pause() - - # TODO: can remove this finally? - # we have no more need for the sync draining right - # since we're can kinda guarantee the async - # `.receive()` below will never block yah? - # - # if ( - # ctx._cancel_called and ( - # ctx.cancel_acked - # # or ctx.chan._cancel_called - # ) - # # or not ctx._final_result_is_set() - # # ctx.outcome is not - # # or ctx.chan._closed - # ): - # try: - # msg: dict = await ctx._rx_chan.receive_nowait()() - # except trio.WouldBlock: - # log.warning( - # 'When draining already `.cancel_called` ctx!\n' - # 'No final msg arrived..\n' - # ) - # break - # else: - # msg: dict = await ctx._rx_chan.receive() - - # TODO: don't need it right jefe? - # with trio.move_on_after(1) as cs: - # if cs.cancelled_caught: - # from .devx._debug import pause - # await pause() - - # pray to the `trio` gawds that we're corrent with this - # msg: dict = await ctx._rx_chan.receive() + # receive all msgs, scanning for either a final result + # or error; the underlying call should never raise any + # remote error directly! msg, pld = await ctx._pld_rx.recv_msg_w_pld( ipc=ctx, expect_msg=Return, + raise_error=False, ) + # ^-TODO-^ some bad ideas? + # -[ ] wrap final outcome .receive() in a scope so + # it can be cancelled out of band if needed? + # |_with trio.CancelScope() as res_cs: + # ctx._res_scope = res_cs + # msg: dict = await ctx._rx_chan.receive() + # if res_cs.cancelled_caught: + # + # -[ ] make sure pause points work here for REPLing + # the runtime itself; i.e. ensure there's no hangs! + # |_from tractor.devx._debug import pause + # await pause() + # NOTE: we get here if the far end was # `ContextCancelled` in 2 cases: @@ -504,7 +505,7 @@ async def drain_to_final_msg( # SHOULD NOT raise that far end error, # 2. WE DID NOT REQUEST that cancel and thus # SHOULD RAISE HERE! - except trio.Cancelled: + except trio.Cancelled as taskc: # CASE 2: mask the local cancelled-error(s) # only when we are sure the remote error is @@ -514,7 +515,7 @@ async def drain_to_final_msg( # CASE 1: we DID request the cancel we simply # continue to bubble up as normal. - raise + raise taskc match msg: @@ -534,7 +535,7 @@ async def drain_to_final_msg( # if ctx._rx_chan: # await ctx._rx_chan.aclose() # TODO: ^ we don't need it right? - return_msg = msg + result_msg = msg break # far end task is still streaming to us so discard @@ -565,10 +566,7 @@ async def drain_to_final_msg( f'{pretty_struct.pformat(msg)}\n' ) - return ( - return_msg, - pre_result_drained, - ) + break # drain up to the `msg_limit` hoping to get # a final result or error/ctxc. @@ -604,9 +602,9 @@ async def drain_to_final_msg( case Error(): # TODO: can we replace this with `ctx.maybe_raise()`? # -[ ] would this be handier for this case maybe? - # async with maybe_raise_on_exit() as raises: - # if raises: - # log.error('some msg about raising..') + # |_async with maybe_raise_on_exit() as raises: + # if raises: + # log.error('some msg about raising..') # re: Exception|None = ctx._remote_error if re: @@ -640,7 +638,7 @@ async def drain_to_final_msg( # raise_overrun_from_self=False, raise_overrun_from_self=raise_overrun, ) - + result_msg = msg break # OOOOOF, yeah obvi we need this.. # XXX we should never really get here @@ -686,6 +684,6 @@ async def drain_to_final_msg( ) return ( - return_msg, + result_msg, pre_result_drained, )