Compare commits
2 Commits
8477919fc9
...
72df312e71
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 72df312e71 | |
Tyler Goodlet | 711f639fc5 |
|
@ -49,7 +49,6 @@ from typing import (
|
|||
Any,
|
||||
AsyncGenerator,
|
||||
Callable,
|
||||
Mapping,
|
||||
Type,
|
||||
TypeAlias,
|
||||
TYPE_CHECKING,
|
||||
|
@ -1484,6 +1483,8 @@ class Context:
|
|||
#
|
||||
__tracebackhide__: bool = hide_tb
|
||||
if validate_pld_spec:
|
||||
# TODO: prolly wrap this as a `show_frame_when_not()`
|
||||
try:
|
||||
msgops.validate_payload_msg(
|
||||
pld_msg=started_msg,
|
||||
pld_value=value,
|
||||
|
@ -1491,6 +1492,12 @@ class Context:
|
|||
strict_pld_parity=strict_pld_parity,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
except BaseException as err:
|
||||
if not isinstance(err, MsgTypeError):
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
raise
|
||||
|
||||
|
||||
# TODO: maybe a flag to by-pass encode op if already done
|
||||
# here in caller?
|
||||
|
@ -2185,11 +2192,6 @@ async def open_context_from_portal(
|
|||
try:
|
||||
result_or_err: Exception|Any = await ctx.result()
|
||||
except BaseException as berr:
|
||||
# cancelled before (or maybe during?) final result capture
|
||||
# if isinstance(trio.Cancelled, berr):
|
||||
# from .devx import mk_pdb
|
||||
# mk_pdb.set_trace()
|
||||
|
||||
# on normal teardown, if we get some error
|
||||
# raised in `Context.result()` we still want to
|
||||
# save that error on the ctx's state to
|
||||
|
@ -2201,7 +2203,7 @@ async def open_context_from_portal(
|
|||
ctx._local_error: BaseException = scope_err
|
||||
raise
|
||||
|
||||
# yes! this worx Bp
|
||||
# yes this worx!
|
||||
# from .devx import _debug
|
||||
# await _debug.pause()
|
||||
|
||||
|
|
|
@ -1232,14 +1232,13 @@ def _raise_from_unexpected_msg(
|
|||
_raise_from_no_key_in_msg = _raise_from_unexpected_msg
|
||||
|
||||
|
||||
def _mk_msg_type_err(
|
||||
def _mk_send_mte(
|
||||
msg: Any|bytes|MsgType,
|
||||
codec: MsgCodec|MsgDec,
|
||||
|
||||
message: str|None = None,
|
||||
verb_header: str = '',
|
||||
|
||||
src_validation_error: ValidationError|None = None,
|
||||
src_type_error: TypeError|None = None,
|
||||
is_invalid_payload: bool = False,
|
||||
|
||||
|
@ -1247,12 +1246,10 @@ def _mk_msg_type_err(
|
|||
|
||||
) -> MsgTypeError:
|
||||
'''
|
||||
Compose a `MsgTypeError` from an input runtime context.
|
||||
Compose a `MsgTypeError` from a `Channel.send()`-side error,
|
||||
normally raised witih a runtime IPC `Context`.
|
||||
|
||||
'''
|
||||
# `Channel.send()` case
|
||||
if src_validation_error is None:
|
||||
|
||||
if isinstance(codec, MsgDec):
|
||||
raise RuntimeError(
|
||||
'`codec` must be a `MsgCodec` for send-side errors?'
|
||||
|
@ -1298,8 +1295,27 @@ def _mk_msg_type_err(
|
|||
msgtyperr.__cause__ = src_type_error
|
||||
return msgtyperr
|
||||
|
||||
# `Channel.recv()` case
|
||||
else:
|
||||
|
||||
def _mk_recv_mte(
|
||||
msg: Any|bytes|MsgType,
|
||||
codec: MsgCodec|MsgDec,
|
||||
|
||||
message: str|None = None,
|
||||
verb_header: str = '',
|
||||
|
||||
src_validation_error: ValidationError|None = None,
|
||||
is_invalid_payload: bool = False,
|
||||
|
||||
**mte_kwargs,
|
||||
|
||||
) -> MsgTypeError:
|
||||
'''
|
||||
Compose a `MsgTypeError` from a
|
||||
`Channel|Context|MsgStream.receive()`-side error,
|
||||
normally raised witih a runtime IPC ctx or streaming
|
||||
block.
|
||||
|
||||
'''
|
||||
msg_dict: dict|None = None
|
||||
bad_msg: PayloadMsg|None = None
|
||||
|
||||
|
|
|
@ -49,7 +49,8 @@ from tractor._exceptions import (
|
|||
MsgTypeError,
|
||||
pack_from_raise,
|
||||
TransportClosed,
|
||||
_mk_msg_type_err,
|
||||
_mk_send_mte,
|
||||
_mk_recv_mte,
|
||||
)
|
||||
from tractor.msg import (
|
||||
_ctxvar_MsgCodec,
|
||||
|
@ -256,7 +257,7 @@ class MsgpackTCPStream(MsgTransport):
|
|||
# and always raise such that spec violations
|
||||
# are never allowed to be caught silently!
|
||||
except msgspec.ValidationError as verr:
|
||||
msgtyperr: MsgTypeError = _mk_msg_type_err(
|
||||
msgtyperr: MsgTypeError = _mk_recv_mte(
|
||||
msg=msg_bytes,
|
||||
codec=codec,
|
||||
src_validation_error=verr,
|
||||
|
@ -321,7 +322,7 @@ class MsgpackTCPStream(MsgTransport):
|
|||
|
||||
if type(msg) not in msgtypes.__msg_types__:
|
||||
if strict_types:
|
||||
raise _mk_msg_type_err(
|
||||
raise _mk_send_mte(
|
||||
msg,
|
||||
codec=codec,
|
||||
)
|
||||
|
@ -333,8 +334,9 @@ class MsgpackTCPStream(MsgTransport):
|
|||
|
||||
try:
|
||||
bytes_data: bytes = codec.encode(msg)
|
||||
except TypeError as typerr:
|
||||
msgtyperr: MsgTypeError = _mk_msg_type_err(
|
||||
except TypeError as _err:
|
||||
typerr = _err
|
||||
msgtyperr: MsgTypeError = _mk_send_mte(
|
||||
msg,
|
||||
codec=codec,
|
||||
message=(
|
||||
|
|
|
@ -47,7 +47,7 @@ from tractor._exceptions import (
|
|||
InternalError,
|
||||
_raise_from_unexpected_msg,
|
||||
MsgTypeError,
|
||||
_mk_msg_type_err,
|
||||
_mk_recv_mte,
|
||||
pack_error,
|
||||
)
|
||||
from tractor._state import current_ipc_ctx
|
||||
|
@ -264,7 +264,7 @@ class PldRx(Struct):
|
|||
# pack mgterr into error-msg for
|
||||
# reraise below; ensure remote-actor-err
|
||||
# info is displayed nicely?
|
||||
mte: MsgTypeError = _mk_msg_type_err(
|
||||
mte: MsgTypeError = _mk_recv_mte(
|
||||
msg=msg,
|
||||
codec=self.pld_dec,
|
||||
src_validation_error=valerr,
|
||||
|
@ -277,19 +277,6 @@ class PldRx(Struct):
|
|||
if is_started_send_side:
|
||||
raise mte
|
||||
|
||||
# XXX TODO: remove this right?
|
||||
# => any bad stated/return values should
|
||||
# always be treated a remote errors right?
|
||||
#
|
||||
# if (
|
||||
# expect_msg is Return
|
||||
# or expect_msg is Started
|
||||
# ):
|
||||
# # set emulated remote error more-or-less as the
|
||||
# # runtime would
|
||||
# ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||
# ctx._maybe_cancel_and_set_remote_error(mte)
|
||||
|
||||
# NOTE: the `.message` is automatically
|
||||
# transferred into the message as long as we
|
||||
# define it as a `Error.message` field.
|
||||
|
@ -799,13 +786,18 @@ def validate_payload_msg(
|
|||
|
||||
# raise any msg type error NO MATTER WHAT!
|
||||
except ValidationError as verr:
|
||||
mte: MsgTypeError = _mk_msg_type_err(
|
||||
try:
|
||||
mte: MsgTypeError = _mk_recv_mte(
|
||||
msg=roundtripped,
|
||||
codec=codec,
|
||||
src_validation_error=verr,
|
||||
verb_header='Trying to send ',
|
||||
is_invalid_payload=True,
|
||||
)
|
||||
except BaseException:
|
||||
__tracebackhide__: bool = False
|
||||
raise
|
||||
|
||||
if not raise_mte:
|
||||
return mte
|
||||
|
||||
|
|
|
@ -73,11 +73,22 @@ class PayloadMsg(
|
|||
# as_array=True,
|
||||
):
|
||||
'''
|
||||
The "god" boxing msg type.
|
||||
An abstract payload boxing/shuttling IPC msg type.
|
||||
|
||||
Boxes user data-msgs in a `.pld` and uses `msgspec`'s tagged
|
||||
unions support to enable a spec from a common msg inheritance
|
||||
tree.
|
||||
Boxes data-values passed to/from user code
|
||||
|
||||
(i.e. any values passed by `tractor` application code using any of
|
||||
|
||||
|_ `._streaming.MsgStream.send/receive()`
|
||||
|_ `._context.Context.started/result()`
|
||||
|_ `._ipc.Channel.send/recv()`
|
||||
|
||||
aka our "IPC primitive APIs")
|
||||
|
||||
as message "payloads" set to the `.pld` field and uses
|
||||
`msgspec`'s "tagged unions" feature to support a subset of our
|
||||
"SC-transitive shuttle protocol" specification with
|
||||
a `msgspec.Struct` inheritance tree.
|
||||
|
||||
'''
|
||||
cid: str # call/context-id
|
||||
|
|
Loading…
Reference in New Issue