From 13ea500a44130ae25c8a93e3e9bcc8356625c427 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 30 May 2024 16:09:59 -0400 Subject: [PATCH] Rename `PldRx.dec_msg()` -> `.decode_pld()` Keep the old alias, but i think it's better form to use longer names for internal public APIs and this name better reflects the functionality: decoding and returning a `PayloadMsg.pld` field. --- tractor/msg/_ops.py | 54 +++++++++++++-------------------------------- 1 file changed, 15 insertions(+), 39 deletions(-) diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 97cd3f2..86f8039 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -167,7 +167,7 @@ class PldRx(Struct): ipc_msg: MsgType|None = None, expect_msg: Type[MsgType]|None = None, hide_tb: bool = False, - **dec_msg_kwargs, + **dec_pld_kwargs, ) -> Any|Raw: __tracebackhide__: bool = hide_tb @@ -179,12 +179,12 @@ class PldRx(Struct): # sync-rx msg from underlying IPC feeder (mem-)chan ipc._rx_chan.receive_nowait() ) - return self.dec_msg( + return self.decode_pld( msg, ipc=ipc, expect_msg=expect_msg, hide_tb=hide_tb, - **dec_msg_kwargs, + **dec_pld_kwargs, ) async def recv_pld( @@ -194,7 +194,7 @@ class PldRx(Struct): expect_msg: Type[MsgType]|None = None, hide_tb: bool = True, - **dec_msg_kwargs, + **dec_pld_kwargs, ) -> Any|Raw: ''' @@ -208,17 +208,14 @@ class PldRx(Struct): # async-rx msg from underlying IPC feeder (mem-)chan await ipc._rx_chan.receive() ) - return self.dec_msg( + return self.decode_pld( msg=msg, ipc=ipc, expect_msg=expect_msg, - **dec_msg_kwargs, + **dec_pld_kwargs, ) - # TODO: rename to, - # -[ ] `.decode_pld()`? - # -[ ] `.dec_pld()`? - def dec_msg( + def decode_pld( self, msg: MsgType, ipc: Context|MsgStream, @@ -299,9 +296,6 @@ class PldRx(Struct): if not is_started_send_side else ipc._actor.uid ), - # tb=valerr.__traceback__, - # tb_str=mte._message, - # message=mte._message, ) mte._ipc_msg = err_msg @@ -317,29 +311,6 @@ class PldRx(Struct): # validation error. src_err = valerr - # TODO: should we instead make this explicit and - # use the above masked `is_started_send_decode`, - # expecting the `Context.started()` caller to set - # it? Rn this is kinda, howyousayyy, implicitly - # edge-case-y.. - # TODO: remove this since it's been added to - # `_raise_from_unexpected_msg()`..? - # if ( - # expect_msg is not Started - # and not is_started_send_side - # ): - # # set emulated remote error more-or-less as the - # # runtime would - # ctx: Context = getattr(ipc, 'ctx', ipc) - # ctx._maybe_cancel_and_set_remote_error(mte) - - # XXX some other decoder specific failure? - # except TypeError as src_error: - # from .devx import mk_pdb - # mk_pdb().set_trace() - # raise src_error - # ^-TODO-^ can remove? - # a runtime-internal RPC endpoint response. # always passthrough since (internal) runtime # responses are generally never exposed to consumer @@ -435,6 +406,8 @@ class PldRx(Struct): __tracebackhide__: bool = False raise + dec_msg = decode_pld + async def recv_msg_w_pld( self, ipc: Context|MsgStream, @@ -463,7 +436,7 @@ class PldRx(Struct): # TODO: is there some way we can inject the decoded # payload into an existing output buffer for the original # msg instance? - pld: PayloadT = self.dec_msg( + pld: PayloadT = self.decode_pld( msg, ipc=ipc, expect_msg=expect_msg, @@ -610,7 +583,10 @@ async def drain_to_final_msg( # only when we are sure the remote error is # the source cause of this local task's # cancellation. - ctx.maybe_raise() + ctx.maybe_raise( + # TODO: when use this/ + # from_src_exc=taskc, + ) # CASE 1: we DID request the cancel we simply # continue to bubble up as normal. @@ -783,7 +759,7 @@ def validate_payload_msg( try: roundtripped: Started = codec.decode(msg_bytes) ctx: Context = getattr(ipc, 'ctx', ipc) - pld: PayloadT = ctx.pld_rx.dec_msg( + pld: PayloadT = ctx.pld_rx.decode_pld( msg=roundtripped, ipc=ipc, expect_msg=Started,