forked from goodboy/tractor
Refine `MsgTypeError` handling to relay-up-on-`.recv()`
Such that `Channel.recv()` + `MsgpackTCPStream.recv()` originating msg-type-errors are not raised at the IPC transport layer but instead relayed up the runtime stack for eventual handling by user-app code via the `Context`/`MsgStream` layer APIs. This design choice leads to a substantial amount of flexibility and modularity, and avoids `MsgTypeError` handling policies from being coupled to a particular backend IPC transport layer: - receive-side msg-type errors, as can be raised and handled in the `.open_stream()` "nasty" phase of a ctx, whilst being packed at the `MsgCodec`/transport layer (keeping the underlying src decode error coupled to the specific transport + interchange lib) and then relayed upward to app code for custom handling like a normal Error` msg. - the policy options for handling such cases could be implemented as `@acm` wrappers around `.open_context()`/`.open_stream()` blocks (and their respective delivered primitives) OR just plain old async generators around `MsgStream.receive()` such that both built-in policy handling and custom user-app solutions can be swapped without touching any `tractor` internals or providing specialized "registry APIs". -> eg. the ignore and relay-invalid-msg-to-sender approach can be more easily implemented as embedded `try: except MsgTypeError:` blocks around `MsgStream.receive()` possibly applied as either of an injected wrapper type around a stream or an async gen that `async for`s from the stream. - any performance based AOT-lang extensions used to implement a policy for handling recv-side errors space can avoid knowledge of the lower level IPC `Channel` (and-downward) primitives. - `Context` consuming code can choose to let all msg-type-errs bubble and handle them manually (like any other remote `Error` shuttled exception). - we can keep (as before) send-side msg type checks can be raised locally and cause offending senders to error and adjust before the streaming phase of an IPC ctx. Impl (related) deats: - obvi make `MsgpackTCPStream.recv()` yield up any `MsgTypeError` constructed by `_mk_msg_type_err()` such that the exception will eventually be relayed up to `._rpc.process_messages()` and from their delivered to the corresponding ctx-task. - in support of ^, make `Channel.recv()` detect said mtes and use the new `pack_from_raise()` to inject the far end `Actor.uid` for the `Error.src_uid`. - keep raising the send side equivalent (when strict enabled) errors inline immediately with no upward `Error` packing or relay. - improve `_mk_msg_type_err()` cases handling with far more detailed `MsgTypeError` "message" contents pertaining to `msgspec` specific failure-fixing-tips and type-spec mismatch info: * use `.from_decode()` constructor in recv-side case to inject the non-spec decoded `msg_dict: dict` and use the new `MsgCodec.pld_spec_str: str` when clarifying the type discrepancy with the offending field. * on send-side, if we detect that an unsupported field type was described in the original `src_type_error`, AND there is no `msgpack.Encoder.enc_hook()` set, that the real issue is likely that the user needs to extend the codec to support the non-std/custom type with a hook and link to `msgspec` docs. * if one of a `src_type/validation_error` is provided, set that error as the `.__cause__` in the new mte.runtime_to_msgspec
parent
15549f7c26
commit
a35c1d40ab
123
tractor/_ipc.py
123
tractor/_ipc.py
|
@ -38,7 +38,6 @@ from typing import (
|
|||
Protocol,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
)
|
||||
|
||||
import msgspec
|
||||
|
@ -47,8 +46,9 @@ import trio
|
|||
|
||||
from tractor.log import get_logger
|
||||
from tractor._exceptions import (
|
||||
TransportClosed,
|
||||
MsgTypeError,
|
||||
pack_from_raise,
|
||||
TransportClosed,
|
||||
)
|
||||
from tractor.msg import (
|
||||
_ctxvar_MsgCodec,
|
||||
|
@ -118,17 +118,24 @@ class MsgTransport(Protocol[MsgType]):
|
|||
...
|
||||
|
||||
|
||||
def _raise_msg_type_err(
|
||||
def _mk_msg_type_err(
|
||||
msg: Any|bytes,
|
||||
codec: MsgCodec,
|
||||
validation_err: msgspec.ValidationError|None = None,
|
||||
|
||||
message: str|None = None,
|
||||
verb_header: str = '',
|
||||
|
||||
) -> None:
|
||||
src_validation_error: msgspec.ValidationError|None = None,
|
||||
src_type_error: TypeError|None = None,
|
||||
|
||||
# if side == 'send':
|
||||
if validation_err is None: # send-side
|
||||
) -> MsgTypeError:
|
||||
|
||||
# `Channel.send()` case
|
||||
if src_validation_error is None: # send-side
|
||||
|
||||
# no src error from `msgspec.msgpack.Decoder.decode()` so
|
||||
# prolly a manual type-check on our part.
|
||||
if message is None:
|
||||
import traceback
|
||||
from tractor._exceptions import pformat_boxed_tb
|
||||
|
||||
|
@ -144,14 +151,42 @@ def _raise_msg_type_err(
|
|||
field_prefix=' ',
|
||||
indent='',
|
||||
)
|
||||
raise MsgTypeError(
|
||||
message: str = (
|
||||
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
||||
f'{tb_fmt}\n'
|
||||
f'Valid IPC msgs are:\n\n'
|
||||
# f' ------ - ------\n'
|
||||
f'{fmt_spec}\n'
|
||||
f'{fmt_spec}\n',
|
||||
)
|
||||
elif src_type_error:
|
||||
src_message: str = str(src_type_error)
|
||||
patt: str = 'type '
|
||||
type_idx: int = src_message.find('type ')
|
||||
invalid_type: str = src_message[type_idx + len(patt):].split()[0]
|
||||
|
||||
enc_hook: Callable|None = codec.enc.enc_hook
|
||||
if enc_hook is None:
|
||||
message += (
|
||||
'\n\n'
|
||||
|
||||
f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
|
||||
f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
|
||||
|
||||
f'Check the `msgspec` docs for ad-hoc type extending:\n'
|
||||
'|_ https://jcristharif.com/msgspec/extending.html\n'
|
||||
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
|
||||
)
|
||||
|
||||
|
||||
msgtyperr = MsgTypeError(
|
||||
message=message,
|
||||
ipc_msg=msg,
|
||||
)
|
||||
# ya, might be `None`
|
||||
msgtyperr.__cause__ = src_type_error
|
||||
return msgtyperr
|
||||
|
||||
# `Channel.recv()` case
|
||||
else:
|
||||
# decode the msg-bytes using the std msgpack
|
||||
# interchange-prot (i.e. without any
|
||||
|
@ -161,29 +196,31 @@ def _raise_msg_type_err(
|
|||
msg_dict: dict = msgspec.msgpack.decode(msg)
|
||||
msg_type_name: str = msg_dict['msg_type']
|
||||
msg_type = getattr(msgtypes, msg_type_name)
|
||||
errmsg: str = (
|
||||
message: str = (
|
||||
f'invalid `{msg_type_name}` IPC msg\n\n'
|
||||
)
|
||||
if verb_header:
|
||||
errmsg = f'{verb_header} ' + errmsg
|
||||
message = f'{verb_header} ' + message
|
||||
|
||||
# XXX see if we can determine the exact invalid field
|
||||
# such that we can comprehensively report the
|
||||
# specific field's type problem
|
||||
msgspec_msg: str = validation_err.args[0].rstrip('`')
|
||||
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
|
||||
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
||||
obj = object()
|
||||
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
|
||||
field_type: Union[Type] = msg_type.__signature__.parameters[
|
||||
maybe_field
|
||||
].annotation
|
||||
errmsg += (
|
||||
message += (
|
||||
f'{msg.rstrip("`")}\n\n'
|
||||
f'{msg_type}\n'
|
||||
f' |_.{maybe_field}: {field_type} = {field_val!r}\n'
|
||||
f' |_.{maybe_field}: {codec.pld_spec_str} = {field_val!r}\n'
|
||||
)
|
||||
|
||||
raise MsgTypeError(errmsg) from validation_err
|
||||
msgtyperr = MsgTypeError.from_decode(
|
||||
message=message,
|
||||
msgdict=msg_dict,
|
||||
)
|
||||
msgtyperr.__cause__ = src_validation_error
|
||||
return msgtyperr
|
||||
|
||||
|
||||
# TODO: not sure why we have to inherit here, but it seems to be an
|
||||
|
@ -325,12 +362,15 @@ class MsgpackTCPStream(MsgTransport):
|
|||
# and always raise such that spec violations
|
||||
# are never allowed to be caught silently!
|
||||
except msgspec.ValidationError as verr:
|
||||
# re-raise as type error
|
||||
_raise_msg_type_err(
|
||||
msgtyperr: MsgTypeError = _mk_msg_type_err(
|
||||
msg=msg_bytes,
|
||||
codec=codec,
|
||||
validation_err=verr,
|
||||
src_validation_error=verr,
|
||||
)
|
||||
# XXX deliver up to `Channel.recv()` where
|
||||
# a re-raise and `Error`-pack can inject the far
|
||||
# end actor `.uid`.
|
||||
yield msgtyperr
|
||||
|
||||
except (
|
||||
msgspec.DecodeError,
|
||||
|
@ -387,7 +427,7 @@ class MsgpackTCPStream(MsgTransport):
|
|||
|
||||
if type(msg) not in msgtypes.__msg_types__:
|
||||
if strict_types:
|
||||
_raise_msg_type_err(
|
||||
raise _mk_msg_type_err(
|
||||
msg,
|
||||
codec=codec,
|
||||
)
|
||||
|
@ -400,11 +440,16 @@ class MsgpackTCPStream(MsgTransport):
|
|||
try:
|
||||
bytes_data: bytes = codec.encode(msg)
|
||||
except TypeError as typerr:
|
||||
raise MsgTypeError(
|
||||
'A msg field violates the current spec\n'
|
||||
f'{codec.pld_spec}\n\n'
|
||||
msgtyperr: MsgTypeError = _mk_msg_type_err(
|
||||
msg,
|
||||
codec=codec,
|
||||
message=(
|
||||
f'IPC-msg-spec violation in\n\n'
|
||||
f'{pretty_struct.Struct.pformat(msg)}'
|
||||
) from typerr
|
||||
),
|
||||
src_type_error=typerr,
|
||||
)
|
||||
raise msgtyperr from typerr
|
||||
|
||||
# supposedly the fastest says,
|
||||
# https://stackoverflow.com/a/54027962
|
||||
|
@ -719,13 +764,35 @@ class Channel:
|
|||
assert self._transport
|
||||
while True:
|
||||
try:
|
||||
async for item in self._transport:
|
||||
yield item
|
||||
async for msg in self._transport:
|
||||
match msg:
|
||||
# NOTE: if transport/interchange delivers
|
||||
# a type error, we pack it with the far
|
||||
# end peer `Actor.uid` and relay the
|
||||
# `Error`-msg upward to the `._rpc` stack
|
||||
# for normal RAE handling.
|
||||
case MsgTypeError():
|
||||
yield pack_from_raise(
|
||||
local_err=msg,
|
||||
cid=msg.cid,
|
||||
|
||||
# XXX we pack it here bc lower
|
||||
# layers have no notion of an
|
||||
# actor-id ;)
|
||||
src_uid=self.uid,
|
||||
)
|
||||
case _:
|
||||
yield msg
|
||||
|
||||
# TODO: if we were gonna do this it should be
|
||||
# done up at the `MsgStream` layer!
|
||||
#
|
||||
# sent = yield item
|
||||
# if sent is not None:
|
||||
# # optimization, passing None through all the
|
||||
# # time is pointless
|
||||
# await self._transport.send(sent)
|
||||
|
||||
except trio.BrokenResourceError:
|
||||
|
||||
# if not self._autorecon:
|
||||
|
|
Loading…
Reference in New Issue