From a35c1d40ab37a9cc45a23b35ff3a843b9449cda8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 9 Apr 2024 10:36:25 -0400 Subject: [PATCH] Refine `MsgTypeError` handling to relay-up-on-`.recv()` Such that `Channel.recv()` + `MsgpackTCPStream.recv()` originating msg-type-errors are not raised at the IPC transport layer but instead relayed up the runtime stack for eventual handling by user-app code via the `Context`/`MsgStream` layer APIs. This design choice leads to a substantial amount of flexibility and modularity, and avoids `MsgTypeError` handling policies from being coupled to a particular backend IPC transport layer: - receive-side msg-type errors, as can be raised and handled in the `.open_stream()` "nasty" phase of a ctx, whilst being packed at the `MsgCodec`/transport layer (keeping the underlying src decode error coupled to the specific transport + interchange lib) and then relayed upward to app code for custom handling like a normal Error` msg. - the policy options for handling such cases could be implemented as `@acm` wrappers around `.open_context()`/`.open_stream()` blocks (and their respective delivered primitives) OR just plain old async generators around `MsgStream.receive()` such that both built-in policy handling and custom user-app solutions can be swapped without touching any `tractor` internals or providing specialized "registry APIs". -> eg. the ignore and relay-invalid-msg-to-sender approach can be more easily implemented as embedded `try: except MsgTypeError:` blocks around `MsgStream.receive()` possibly applied as either of an injected wrapper type around a stream or an async gen that `async for`s from the stream. - any performance based AOT-lang extensions used to implement a policy for handling recv-side errors space can avoid knowledge of the lower level IPC `Channel` (and-downward) primitives. - `Context` consuming code can choose to let all msg-type-errs bubble and handle them manually (like any other remote `Error` shuttled exception). - we can keep (as before) send-side msg type checks can be raised locally and cause offending senders to error and adjust before the streaming phase of an IPC ctx. Impl (related) deats: - obvi make `MsgpackTCPStream.recv()` yield up any `MsgTypeError` constructed by `_mk_msg_type_err()` such that the exception will eventually be relayed up to `._rpc.process_messages()` and from their delivered to the corresponding ctx-task. - in support of ^, make `Channel.recv()` detect said mtes and use the new `pack_from_raise()` to inject the far end `Actor.uid` for the `Error.src_uid`. - keep raising the send side equivalent (when strict enabled) errors inline immediately with no upward `Error` packing or relay. - improve `_mk_msg_type_err()` cases handling with far more detailed `MsgTypeError` "message" contents pertaining to `msgspec` specific failure-fixing-tips and type-spec mismatch info: * use `.from_decode()` constructor in recv-side case to inject the non-spec decoded `msg_dict: dict` and use the new `MsgCodec.pld_spec_str: str` when clarifying the type discrepancy with the offending field. * on send-side, if we detect that an unsupported field type was described in the original `src_type_error`, AND there is no `msgpack.Encoder.enc_hook()` set, that the real issue is likely that the user needs to extend the codec to support the non-std/custom type with a hook and link to `msgspec` docs. * if one of a `src_type/validation_error` is provided, set that error as the `.__cause__` in the new mte. --- tractor/_ipc.py | 163 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 115 insertions(+), 48 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 694eaf9..7713811 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -38,7 +38,6 @@ from typing import ( Protocol, Type, TypeVar, - Union, ) import msgspec @@ -47,8 +46,9 @@ import trio from tractor.log import get_logger from tractor._exceptions import ( - TransportClosed, MsgTypeError, + pack_from_raise, + TransportClosed, ) from tractor.msg import ( _ctxvar_MsgCodec, @@ -118,40 +118,75 @@ class MsgTransport(Protocol[MsgType]): ... -def _raise_msg_type_err( +def _mk_msg_type_err( msg: Any|bytes, codec: MsgCodec, - validation_err: msgspec.ValidationError|None = None, + + message: str|None = None, verb_header: str = '', -) -> None: + src_validation_error: msgspec.ValidationError|None = None, + src_type_error: TypeError|None = None, - # if side == 'send': - if validation_err is None: # send-side +) -> MsgTypeError: - import traceback - from tractor._exceptions import pformat_boxed_tb + # `Channel.send()` case + if src_validation_error is None: # send-side - fmt_spec: str = '\n'.join( - map(str, codec.msg_spec.__args__) - ) - fmt_stack: str = ( - '\n'.join(traceback.format_stack(limit=3)) - ) - tb_fmt: str = pformat_boxed_tb( - tb_str=fmt_stack, - # fields_str=header, - field_prefix=' ', - indent='', - ) - raise MsgTypeError( - f'invalid msg -> {msg}: {type(msg)}\n\n' - f'{tb_fmt}\n' - f'Valid IPC msgs are:\n\n' - # f' ------ - ------\n' - f'{fmt_spec}\n' - ) + # no src error from `msgspec.msgpack.Decoder.decode()` so + # prolly a manual type-check on our part. + if message is None: + import traceback + from tractor._exceptions import pformat_boxed_tb + fmt_spec: str = '\n'.join( + map(str, codec.msg_spec.__args__) + ) + fmt_stack: str = ( + '\n'.join(traceback.format_stack(limit=3)) + ) + tb_fmt: str = pformat_boxed_tb( + tb_str=fmt_stack, + # fields_str=header, + field_prefix=' ', + indent='', + ) + message: str = ( + f'invalid msg -> {msg}: {type(msg)}\n\n' + f'{tb_fmt}\n' + f'Valid IPC msgs are:\n\n' + # f' ------ - ------\n' + f'{fmt_spec}\n', + ) + elif src_type_error: + src_message: str = str(src_type_error) + patt: str = 'type ' + type_idx: int = src_message.find('type ') + invalid_type: str = src_message[type_idx + len(patt):].split()[0] + + enc_hook: Callable|None = codec.enc.enc_hook + if enc_hook is None: + message += ( + '\n\n' + + f"The current IPC-msg codec can't encode type `{invalid_type}` !\n" + f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n' + + f'Check the `msgspec` docs for ad-hoc type extending:\n' + '|_ https://jcristharif.com/msgspec/extending.html\n' + '|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n' + ) + + + msgtyperr = MsgTypeError( + message=message, + ipc_msg=msg, + ) + # ya, might be `None` + msgtyperr.__cause__ = src_type_error + return msgtyperr + + # `Channel.recv()` case else: # decode the msg-bytes using the std msgpack # interchange-prot (i.e. without any @@ -161,29 +196,31 @@ def _raise_msg_type_err( msg_dict: dict = msgspec.msgpack.decode(msg) msg_type_name: str = msg_dict['msg_type'] msg_type = getattr(msgtypes, msg_type_name) - errmsg: str = ( + message: str = ( f'invalid `{msg_type_name}` IPC msg\n\n' ) if verb_header: - errmsg = f'{verb_header} ' + errmsg + message = f'{verb_header} ' + message # XXX see if we can determine the exact invalid field # such that we can comprehensively report the # specific field's type problem - msgspec_msg: str = validation_err.args[0].rstrip('`') + msgspec_msg: str = src_validation_error.args[0].rstrip('`') msg, _, maybe_field = msgspec_msg.rpartition('$.') obj = object() if (field_val := msg_dict.get(maybe_field, obj)) is not obj: - field_type: Union[Type] = msg_type.__signature__.parameters[ - maybe_field - ].annotation - errmsg += ( + message += ( f'{msg.rstrip("`")}\n\n' f'{msg_type}\n' - f' |_.{maybe_field}: {field_type} = {field_val!r}\n' + f' |_.{maybe_field}: {codec.pld_spec_str} = {field_val!r}\n' ) - raise MsgTypeError(errmsg) from validation_err + msgtyperr = MsgTypeError.from_decode( + message=message, + msgdict=msg_dict, + ) + msgtyperr.__cause__ = src_validation_error + return msgtyperr # TODO: not sure why we have to inherit here, but it seems to be an @@ -325,12 +362,15 @@ class MsgpackTCPStream(MsgTransport): # and always raise such that spec violations # are never allowed to be caught silently! except msgspec.ValidationError as verr: - # re-raise as type error - _raise_msg_type_err( + msgtyperr: MsgTypeError = _mk_msg_type_err( msg=msg_bytes, codec=codec, - validation_err=verr, + src_validation_error=verr, ) + # XXX deliver up to `Channel.recv()` where + # a re-raise and `Error`-pack can inject the far + # end actor `.uid`. + yield msgtyperr except ( msgspec.DecodeError, @@ -387,7 +427,7 @@ class MsgpackTCPStream(MsgTransport): if type(msg) not in msgtypes.__msg_types__: if strict_types: - _raise_msg_type_err( + raise _mk_msg_type_err( msg, codec=codec, ) @@ -400,11 +440,16 @@ class MsgpackTCPStream(MsgTransport): try: bytes_data: bytes = codec.encode(msg) except TypeError as typerr: - raise MsgTypeError( - 'A msg field violates the current spec\n' - f'{codec.pld_spec}\n\n' - f'{pretty_struct.Struct.pformat(msg)}' - ) from typerr + msgtyperr: MsgTypeError = _mk_msg_type_err( + msg, + codec=codec, + message=( + f'IPC-msg-spec violation in\n\n' + f'{pretty_struct.Struct.pformat(msg)}' + ), + src_type_error=typerr, + ) + raise msgtyperr from typerr # supposedly the fastest says, # https://stackoverflow.com/a/54027962 @@ -719,13 +764,35 @@ class Channel: assert self._transport while True: try: - async for item in self._transport: - yield item + async for msg in self._transport: + match msg: + # NOTE: if transport/interchange delivers + # a type error, we pack it with the far + # end peer `Actor.uid` and relay the + # `Error`-msg upward to the `._rpc` stack + # for normal RAE handling. + case MsgTypeError(): + yield pack_from_raise( + local_err=msg, + cid=msg.cid, + + # XXX we pack it here bc lower + # layers have no notion of an + # actor-id ;) + src_uid=self.uid, + ) + case _: + yield msg + + # TODO: if we were gonna do this it should be + # done up at the `MsgStream` layer! + # # sent = yield item # if sent is not None: # # optimization, passing None through all the # # time is pointless # await self._transport.send(sent) + except trio.BrokenResourceError: # if not self._autorecon: