Mk `drain_to_final_msg()` never raise from `Error`

Since we usually want them raised from some (internal) call to
`Context.maybe_raise()` and NOT directly from the drainage call, make it
possible via a new `raise_error: bool` to both `PldRx.recv_msg_w_pld()`
and `.dec_msg()`.

In support,
- rename `return_msg` -> `result_msg` since we expect to return
  `Error`s.
- do a `result_msg` assign and `break` in the `case Error()`.
- add `**dec_msg_kwargs` passthrough for other `.dec_msg()` calling
  methods.

Other,
- drop/aggregate todo-notes around the main loop's
  `ctx._pld_rx.recv_msg_w_pld()` call.
- add (configurable) frame hiding to most payload receive meths.
runtime_to_msgspec
Tyler Goodlet 2024-05-06 13:27:00 -04:00
parent 8ffa6a5e68
commit b278164f83
1 changed files with 72 additions and 74 deletions

View File

@ -161,9 +161,10 @@ class PldRx(Struct):
ipc_msg: MsgType|None = None, ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None, expect_msg: Type[MsgType]|None = None,
**kwargs, **dec_msg_kwargs,
) -> Any|Raw: ) -> Any|Raw:
__tracebackhide__: bool = True
msg: MsgType = ( msg: MsgType = (
ipc_msg ipc_msg
@ -176,6 +177,7 @@ class PldRx(Struct):
msg, msg,
ctx=ctx, ctx=ctx,
expect_msg=expect_msg, expect_msg=expect_msg,
**dec_msg_kwargs,
) )
async def recv_pld( async def recv_pld(
@ -183,14 +185,16 @@ class PldRx(Struct):
ctx: Context, ctx: Context,
ipc_msg: MsgType|None = None, ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None, expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True,
**kwargs **dec_msg_kwargs,
) -> Any|Raw: ) -> Any|Raw:
''' '''
Receive a `MsgType`, then decode and return its `.pld` field. Receive a `MsgType`, then decode and return its `.pld` field.
''' '''
__tracebackhide__: bool = hide_tb
msg: MsgType = ( msg: MsgType = (
ipc_msg ipc_msg
or or
@ -199,9 +203,10 @@ class PldRx(Struct):
await ctx._rx_chan.receive() await ctx._rx_chan.receive()
) )
return self.dec_msg( return self.dec_msg(
msg, msg=msg,
ctx=ctx, ctx=ctx,
expect_msg=expect_msg, expect_msg=expect_msg,
**dec_msg_kwargs,
) )
def dec_msg( def dec_msg(
@ -210,12 +215,16 @@ class PldRx(Struct):
ctx: Context, ctx: Context,
expect_msg: Type[MsgType]|None, expect_msg: Type[MsgType]|None,
raise_error: bool = True,
hide_tb: bool = True,
) -> PayloadT|Raw: ) -> PayloadT|Raw:
''' '''
Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and
return the value or raise an appropriate error. return the value or raise an appropriate error.
''' '''
__tracebackhide__: bool = hide_tb
match msg: match msg:
# payload-data shuttle msg; deliver the `.pld` value # payload-data shuttle msg; deliver the `.pld` value
# directly to IPC (primitive) client-consumer code. # directly to IPC (primitive) client-consumer code.
@ -228,7 +237,8 @@ class PldRx(Struct):
pld: PayloadT = self._pldec.decode(pld) pld: PayloadT = self._pldec.decode(pld)
log.runtime( log.runtime(
'Decoded msg payload\n\n' 'Decoded msg payload\n\n'
f'{msg}\n' f'{msg}\n\n'
f'where payload is\n'
f'|_pld={pld!r}\n' f'|_pld={pld!r}\n'
) )
return pld return pld
@ -237,8 +247,9 @@ class PldRx(Struct):
except ValidationError as src_err: except ValidationError as src_err:
msgterr: MsgTypeError = _mk_msg_type_err( msgterr: MsgTypeError = _mk_msg_type_err(
msg=msg, msg=msg,
codec=self.dec, codec=self.pld_dec,
src_validation_error=src_err, src_validation_error=src_err,
is_invalid_payload=True,
) )
msg: Error = pack_from_raise( msg: Error = pack_from_raise(
local_err=msgterr, local_err=msgterr,
@ -263,8 +274,29 @@ class PldRx(Struct):
case Error(): case Error():
src_err = MessagingError( src_err = MessagingError(
'IPC ctx dialog terminated without `Return`-ing a result' 'IPC ctx dialog terminated without `Return`-ing a result\n'
f'Instead it raised {msg.boxed_type_str!r}!'
) )
# XXX NOTE XXX another super subtle runtime-y thing..
#
# - when user code (transitively) calls into this
# func (usually via a `Context/MsgStream` API) we
# generally want errors to propagate immediately
# and directly so that the user can define how it
# wants to handle them.
#
# HOWEVER,
#
# - for certain runtime calling cases, we don't want to
# directly raise since the calling code might have
# special logic around whether to raise the error
# or supress it silently (eg. a `ContextCancelled`
# received from the far end which was requested by
# this side, aka a self-cancel).
#
# SO, we offer a flag to control this.
if not raise_error:
return src_err
case Stop(cid=cid): case Stop(cid=cid):
message: str = ( message: str = (
@ -305,6 +337,9 @@ class PldRx(Struct):
f'{msg}\n' f'{msg}\n'
) )
# TODO: maybe use the new `.add_note()` from 3.11?
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
#
# fallthrough and raise from `src_err` # fallthrough and raise from `src_err`
_raise_from_unexpected_msg( _raise_from_unexpected_msg(
ctx=ctx, ctx=ctx,
@ -312,7 +347,7 @@ class PldRx(Struct):
src_err=src_err, src_err=src_err,
log=log, log=log,
expect_msg=expect_msg, expect_msg=expect_msg,
hide_tb=False, hide_tb=hide_tb,
) )
async def recv_msg_w_pld( async def recv_msg_w_pld(
@ -320,6 +355,8 @@ class PldRx(Struct):
ipc: Context|MsgStream, ipc: Context|MsgStream,
expect_msg: MsgType, expect_msg: MsgType,
**kwargs,
) -> tuple[MsgType, PayloadT]: ) -> tuple[MsgType, PayloadT]:
''' '''
Retrieve the next avail IPC msg, decode it's payload, and return Retrieve the next avail IPC msg, decode it's payload, and return
@ -335,6 +372,7 @@ class PldRx(Struct):
msg, msg,
ctx=ipc, ctx=ipc,
expect_msg=expect_msg, expect_msg=expect_msg,
**kwargs,
) )
return msg, pld return msg, pld
@ -433,70 +471,33 @@ async def drain_to_final_msg(
# basically ignoring) any bi-dir-stream msgs still in transit # basically ignoring) any bi-dir-stream msgs still in transit
# from the far end. # from the far end.
pre_result_drained: list[MsgType] = [] pre_result_drained: list[MsgType] = []
return_msg: Return|None = None result_msg: Return|Error|None = None
while not ( while not (
ctx.maybe_error ctx.maybe_error
and not ctx._final_result_is_set() and not ctx._final_result_is_set()
): ):
try: try:
# TODO: can remove? # receive all msgs, scanning for either a final result
# await trio.lowlevel.checkpoint() # or error; the underlying call should never raise any
# remote error directly!
# NOTE: this REPL usage actually works here dawg! Bo
# from .devx._debug import pause
# await pause()
# TODO: bad idea?
# -[ ] wrap final outcome channel wait in a scope so
# it can be cancelled out of band if needed?
#
# with trio.CancelScope() as res_cs:
# ctx._res_scope = res_cs
# msg: dict = await ctx._rx_chan.receive()
# if res_cs.cancelled_caught:
# TODO: ensure there's no more hangs, debugging the
# runtime pretty preaase!
# from .devx._debug import pause
# await pause()
# TODO: can remove this finally?
# we have no more need for the sync draining right
# since we're can kinda guarantee the async
# `.receive()` below will never block yah?
#
# if (
# ctx._cancel_called and (
# ctx.cancel_acked
# # or ctx.chan._cancel_called
# )
# # or not ctx._final_result_is_set()
# # ctx.outcome is not
# # or ctx.chan._closed
# ):
# try:
# msg: dict = await ctx._rx_chan.receive_nowait()()
# except trio.WouldBlock:
# log.warning(
# 'When draining already `.cancel_called` ctx!\n'
# 'No final msg arrived..\n'
# )
# break
# else:
# msg: dict = await ctx._rx_chan.receive()
# TODO: don't need it right jefe?
# with trio.move_on_after(1) as cs:
# if cs.cancelled_caught:
# from .devx._debug import pause
# await pause()
# pray to the `trio` gawds that we're corrent with this
# msg: dict = await ctx._rx_chan.receive()
msg, pld = await ctx._pld_rx.recv_msg_w_pld( msg, pld = await ctx._pld_rx.recv_msg_w_pld(
ipc=ctx, ipc=ctx,
expect_msg=Return, expect_msg=Return,
raise_error=False,
) )
# ^-TODO-^ some bad ideas?
# -[ ] wrap final outcome .receive() in a scope so
# it can be cancelled out of band if needed?
# |_with trio.CancelScope() as res_cs:
# ctx._res_scope = res_cs
# msg: dict = await ctx._rx_chan.receive()
# if res_cs.cancelled_caught:
#
# -[ ] make sure pause points work here for REPLing
# the runtime itself; i.e. ensure there's no hangs!
# |_from tractor.devx._debug import pause
# await pause()
# NOTE: we get here if the far end was # NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases: # `ContextCancelled` in 2 cases:
@ -504,7 +505,7 @@ async def drain_to_final_msg(
# SHOULD NOT raise that far end error, # SHOULD NOT raise that far end error,
# 2. WE DID NOT REQUEST that cancel and thus # 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE! # SHOULD RAISE HERE!
except trio.Cancelled: except trio.Cancelled as taskc:
# CASE 2: mask the local cancelled-error(s) # CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is # only when we are sure the remote error is
@ -514,7 +515,7 @@ async def drain_to_final_msg(
# CASE 1: we DID request the cancel we simply # CASE 1: we DID request the cancel we simply
# continue to bubble up as normal. # continue to bubble up as normal.
raise raise taskc
match msg: match msg:
@ -534,7 +535,7 @@ async def drain_to_final_msg(
# if ctx._rx_chan: # if ctx._rx_chan:
# await ctx._rx_chan.aclose() # await ctx._rx_chan.aclose()
# TODO: ^ we don't need it right? # TODO: ^ we don't need it right?
return_msg = msg result_msg = msg
break break
# far end task is still streaming to us so discard # far end task is still streaming to us so discard
@ -565,10 +566,7 @@ async def drain_to_final_msg(
f'{pretty_struct.pformat(msg)}\n' f'{pretty_struct.pformat(msg)}\n'
) )
return ( break
return_msg,
pre_result_drained,
)
# drain up to the `msg_limit` hoping to get # drain up to the `msg_limit` hoping to get
# a final result or error/ctxc. # a final result or error/ctxc.
@ -604,7 +602,7 @@ async def drain_to_final_msg(
case Error(): case Error():
# TODO: can we replace this with `ctx.maybe_raise()`? # TODO: can we replace this with `ctx.maybe_raise()`?
# -[ ] would this be handier for this case maybe? # -[ ] would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises: # |_async with maybe_raise_on_exit() as raises:
# if raises: # if raises:
# log.error('some msg about raising..') # log.error('some msg about raising..')
# #
@ -640,7 +638,7 @@ async def drain_to_final_msg(
# raise_overrun_from_self=False, # raise_overrun_from_self=False,
raise_overrun_from_self=raise_overrun, raise_overrun_from_self=raise_overrun,
) )
result_msg = msg
break # OOOOOF, yeah obvi we need this.. break # OOOOOF, yeah obvi we need this..
# XXX we should never really get here # XXX we should never really get here
@ -686,6 +684,6 @@ async def drain_to_final_msg(
) )
return ( return (
return_msg, result_msg,
pre_result_drained, pre_result_drained,
) )