forked from goodboy/tractor
1
0
Fork 0

Rename `PldRx.dec_msg()` -> `.decode_pld()`

Keep the old alias, but i think it's better form to use longer names for
internal public APIs and this name better reflects the functionality:
decoding and returning a `PayloadMsg.pld` field.
runtime_to_msgspec
Tyler Goodlet 2024-05-30 16:09:59 -04:00
parent 2f854a3e86
commit 13ea500a44
1 changed files with 15 additions and 39 deletions

View File

@ -167,7 +167,7 @@ class PldRx(Struct):
ipc_msg: MsgType|None = None, ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None, expect_msg: Type[MsgType]|None = None,
hide_tb: bool = False, hide_tb: bool = False,
**dec_msg_kwargs, **dec_pld_kwargs,
) -> Any|Raw: ) -> Any|Raw:
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
@ -179,12 +179,12 @@ class PldRx(Struct):
# sync-rx msg from underlying IPC feeder (mem-)chan # sync-rx msg from underlying IPC feeder (mem-)chan
ipc._rx_chan.receive_nowait() ipc._rx_chan.receive_nowait()
) )
return self.dec_msg( return self.decode_pld(
msg, msg,
ipc=ipc, ipc=ipc,
expect_msg=expect_msg, expect_msg=expect_msg,
hide_tb=hide_tb, hide_tb=hide_tb,
**dec_msg_kwargs, **dec_pld_kwargs,
) )
async def recv_pld( async def recv_pld(
@ -194,7 +194,7 @@ class PldRx(Struct):
expect_msg: Type[MsgType]|None = None, expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True, hide_tb: bool = True,
**dec_msg_kwargs, **dec_pld_kwargs,
) -> Any|Raw: ) -> Any|Raw:
''' '''
@ -208,17 +208,14 @@ class PldRx(Struct):
# async-rx msg from underlying IPC feeder (mem-)chan # async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive() await ipc._rx_chan.receive()
) )
return self.dec_msg( return self.decode_pld(
msg=msg, msg=msg,
ipc=ipc, ipc=ipc,
expect_msg=expect_msg, expect_msg=expect_msg,
**dec_msg_kwargs, **dec_pld_kwargs,
) )
# TODO: rename to, def decode_pld(
# -[ ] `.decode_pld()`?
# -[ ] `.dec_pld()`?
def dec_msg(
self, self,
msg: MsgType, msg: MsgType,
ipc: Context|MsgStream, ipc: Context|MsgStream,
@ -299,9 +296,6 @@ class PldRx(Struct):
if not is_started_send_side if not is_started_send_side
else ipc._actor.uid else ipc._actor.uid
), ),
# tb=valerr.__traceback__,
# tb_str=mte._message,
# message=mte._message,
) )
mte._ipc_msg = err_msg mte._ipc_msg = err_msg
@ -317,29 +311,6 @@ class PldRx(Struct):
# validation error. # validation error.
src_err = valerr src_err = valerr
# TODO: should we instead make this explicit and
# use the above masked `is_started_send_decode`,
# expecting the `Context.started()` caller to set
# it? Rn this is kinda, howyousayyy, implicitly
# edge-case-y..
# TODO: remove this since it's been added to
# `_raise_from_unexpected_msg()`..?
# if (
# expect_msg is not Started
# and not is_started_send_side
# ):
# # set emulated remote error more-or-less as the
# # runtime would
# ctx: Context = getattr(ipc, 'ctx', ipc)
# ctx._maybe_cancel_and_set_remote_error(mte)
# XXX some other decoder specific failure?
# except TypeError as src_error:
# from .devx import mk_pdb
# mk_pdb().set_trace()
# raise src_error
# ^-TODO-^ can remove?
# a runtime-internal RPC endpoint response. # a runtime-internal RPC endpoint response.
# always passthrough since (internal) runtime # always passthrough since (internal) runtime
# responses are generally never exposed to consumer # responses are generally never exposed to consumer
@ -435,6 +406,8 @@ class PldRx(Struct):
__tracebackhide__: bool = False __tracebackhide__: bool = False
raise raise
dec_msg = decode_pld
async def recv_msg_w_pld( async def recv_msg_w_pld(
self, self,
ipc: Context|MsgStream, ipc: Context|MsgStream,
@ -463,7 +436,7 @@ class PldRx(Struct):
# TODO: is there some way we can inject the decoded # TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original # payload into an existing output buffer for the original
# msg instance? # msg instance?
pld: PayloadT = self.dec_msg( pld: PayloadT = self.decode_pld(
msg, msg,
ipc=ipc, ipc=ipc,
expect_msg=expect_msg, expect_msg=expect_msg,
@ -610,7 +583,10 @@ async def drain_to_final_msg(
# only when we are sure the remote error is # only when we are sure the remote error is
# the source cause of this local task's # the source cause of this local task's
# cancellation. # cancellation.
ctx.maybe_raise() ctx.maybe_raise(
# TODO: when use this/
# from_src_exc=taskc,
)
# CASE 1: we DID request the cancel we simply # CASE 1: we DID request the cancel we simply
# continue to bubble up as normal. # continue to bubble up as normal.
@ -783,7 +759,7 @@ def validate_payload_msg(
try: try:
roundtripped: Started = codec.decode(msg_bytes) roundtripped: Started = codec.decode(msg_bytes)
ctx: Context = getattr(ipc, 'ctx', ipc) ctx: Context = getattr(ipc, 'ctx', ipc)
pld: PayloadT = ctx.pld_rx.dec_msg( pld: PayloadT = ctx.pld_rx.decode_pld(
msg=roundtripped, msg=roundtripped,
ipc=ipc, ipc=ipc,
expect_msg=Started, expect_msg=Started,