diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 694eaf9..7713811 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -38,7 +38,6 @@ from typing import ( Protocol, Type, TypeVar, - Union, ) import msgspec @@ -47,8 +46,9 @@ import trio from tractor.log import get_logger from tractor._exceptions import ( - TransportClosed, MsgTypeError, + pack_from_raise, + TransportClosed, ) from tractor.msg import ( _ctxvar_MsgCodec, @@ -118,40 +118,75 @@ class MsgTransport(Protocol[MsgType]): ... -def _raise_msg_type_err( +def _mk_msg_type_err( msg: Any|bytes, codec: MsgCodec, - validation_err: msgspec.ValidationError|None = None, + + message: str|None = None, verb_header: str = '', -) -> None: + src_validation_error: msgspec.ValidationError|None = None, + src_type_error: TypeError|None = None, - # if side == 'send': - if validation_err is None: # send-side +) -> MsgTypeError: - import traceback - from tractor._exceptions import pformat_boxed_tb + # `Channel.send()` case + if src_validation_error is None: # send-side - fmt_spec: str = '\n'.join( - map(str, codec.msg_spec.__args__) - ) - fmt_stack: str = ( - '\n'.join(traceback.format_stack(limit=3)) - ) - tb_fmt: str = pformat_boxed_tb( - tb_str=fmt_stack, - # fields_str=header, - field_prefix=' ', - indent='', - ) - raise MsgTypeError( - f'invalid msg -> {msg}: {type(msg)}\n\n' - f'{tb_fmt}\n' - f'Valid IPC msgs are:\n\n' - # f' ------ - ------\n' - f'{fmt_spec}\n' - ) + # no src error from `msgspec.msgpack.Decoder.decode()` so + # prolly a manual type-check on our part. + if message is None: + import traceback + from tractor._exceptions import pformat_boxed_tb + fmt_spec: str = '\n'.join( + map(str, codec.msg_spec.__args__) + ) + fmt_stack: str = ( + '\n'.join(traceback.format_stack(limit=3)) + ) + tb_fmt: str = pformat_boxed_tb( + tb_str=fmt_stack, + # fields_str=header, + field_prefix=' ', + indent='', + ) + message: str = ( + f'invalid msg -> {msg}: {type(msg)}\n\n' + f'{tb_fmt}\n' + f'Valid IPC msgs are:\n\n' + # f' ------ - ------\n' + f'{fmt_spec}\n', + ) + elif src_type_error: + src_message: str = str(src_type_error) + patt: str = 'type ' + type_idx: int = src_message.find('type ') + invalid_type: str = src_message[type_idx + len(patt):].split()[0] + + enc_hook: Callable|None = codec.enc.enc_hook + if enc_hook is None: + message += ( + '\n\n' + + f"The current IPC-msg codec can't encode type `{invalid_type}` !\n" + f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n' + + f'Check the `msgspec` docs for ad-hoc type extending:\n' + '|_ https://jcristharif.com/msgspec/extending.html\n' + '|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n' + ) + + + msgtyperr = MsgTypeError( + message=message, + ipc_msg=msg, + ) + # ya, might be `None` + msgtyperr.__cause__ = src_type_error + return msgtyperr + + # `Channel.recv()` case else: # decode the msg-bytes using the std msgpack # interchange-prot (i.e. without any @@ -161,29 +196,31 @@ def _raise_msg_type_err( msg_dict: dict = msgspec.msgpack.decode(msg) msg_type_name: str = msg_dict['msg_type'] msg_type = getattr(msgtypes, msg_type_name) - errmsg: str = ( + message: str = ( f'invalid `{msg_type_name}` IPC msg\n\n' ) if verb_header: - errmsg = f'{verb_header} ' + errmsg + message = f'{verb_header} ' + message # XXX see if we can determine the exact invalid field # such that we can comprehensively report the # specific field's type problem - msgspec_msg: str = validation_err.args[0].rstrip('`') + msgspec_msg: str = src_validation_error.args[0].rstrip('`') msg, _, maybe_field = msgspec_msg.rpartition('$.') obj = object() if (field_val := msg_dict.get(maybe_field, obj)) is not obj: - field_type: Union[Type] = msg_type.__signature__.parameters[ - maybe_field - ].annotation - errmsg += ( + message += ( f'{msg.rstrip("`")}\n\n' f'{msg_type}\n' - f' |_.{maybe_field}: {field_type} = {field_val!r}\n' + f' |_.{maybe_field}: {codec.pld_spec_str} = {field_val!r}\n' ) - raise MsgTypeError(errmsg) from validation_err + msgtyperr = MsgTypeError.from_decode( + message=message, + msgdict=msg_dict, + ) + msgtyperr.__cause__ = src_validation_error + return msgtyperr # TODO: not sure why we have to inherit here, but it seems to be an @@ -325,12 +362,15 @@ class MsgpackTCPStream(MsgTransport): # and always raise such that spec violations # are never allowed to be caught silently! except msgspec.ValidationError as verr: - # re-raise as type error - _raise_msg_type_err( + msgtyperr: MsgTypeError = _mk_msg_type_err( msg=msg_bytes, codec=codec, - validation_err=verr, + src_validation_error=verr, ) + # XXX deliver up to `Channel.recv()` where + # a re-raise and `Error`-pack can inject the far + # end actor `.uid`. + yield msgtyperr except ( msgspec.DecodeError, @@ -387,7 +427,7 @@ class MsgpackTCPStream(MsgTransport): if type(msg) not in msgtypes.__msg_types__: if strict_types: - _raise_msg_type_err( + raise _mk_msg_type_err( msg, codec=codec, ) @@ -400,11 +440,16 @@ class MsgpackTCPStream(MsgTransport): try: bytes_data: bytes = codec.encode(msg) except TypeError as typerr: - raise MsgTypeError( - 'A msg field violates the current spec\n' - f'{codec.pld_spec}\n\n' - f'{pretty_struct.Struct.pformat(msg)}' - ) from typerr + msgtyperr: MsgTypeError = _mk_msg_type_err( + msg, + codec=codec, + message=( + f'IPC-msg-spec violation in\n\n' + f'{pretty_struct.Struct.pformat(msg)}' + ), + src_type_error=typerr, + ) + raise msgtyperr from typerr # supposedly the fastest says, # https://stackoverflow.com/a/54027962 @@ -719,13 +764,35 @@ class Channel: assert self._transport while True: try: - async for item in self._transport: - yield item + async for msg in self._transport: + match msg: + # NOTE: if transport/interchange delivers + # a type error, we pack it with the far + # end peer `Actor.uid` and relay the + # `Error`-msg upward to the `._rpc` stack + # for normal RAE handling. + case MsgTypeError(): + yield pack_from_raise( + local_err=msg, + cid=msg.cid, + + # XXX we pack it here bc lower + # layers have no notion of an + # actor-id ;) + src_uid=self.uid, + ) + case _: + yield msg + + # TODO: if we were gonna do this it should be + # done up at the `MsgStream` layer! + # # sent = yield item # if sent is not None: # # optimization, passing None through all the # # time is pointless # await self._transport.send(sent) + except trio.BrokenResourceError: # if not self._autorecon: