diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 9016324..b2ba6e8 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -24,6 +24,7 @@ import importlib from pprint import pformat from typing import ( Any, + Callable, Type, TYPE_CHECKING, ) @@ -32,8 +33,11 @@ import traceback import trio from msgspec import ( - structs, defstruct, + msgpack, + Raw, + structs, + ValidationError, ) from tractor._state import current_actor @@ -44,6 +48,8 @@ from tractor.msg import ( Stop, Yield, types as msgtypes, + MsgCodec, + MsgDec, ) from tractor.msg.pretty_struct import ( iter_fields, @@ -932,7 +938,7 @@ def is_multi_cancelled(exc: BaseException) -> bool: return False -def _raise_from_no_key_in_msg( +def _raise_from_unexpected_msg( ctx: Context, msg: MsgType, src_err: AttributeError, @@ -1032,7 +1038,6 @@ def _raise_from_no_key_in_msg( # that arrived which is probably the source of this stream # closure ctx.maybe_raise() - raise eoc from src_err if ( @@ -1052,3 +1057,128 @@ def _raise_from_no_key_in_msg( " BUT received a non-error msg:\n" f'{pformat(msg)}' ) from src_err + + +_raise_from_no_key_in_msg = _raise_from_unexpected_msg + + +def _mk_msg_type_err( + msg: Any|bytes|Raw, + codec: MsgCodec|MsgDec, + + message: str|None = None, + verb_header: str = '', + + src_validation_error: ValidationError|None = None, + src_type_error: TypeError|None = None, + +) -> MsgTypeError: + ''' + Compose a `MsgTypeError` from an input runtime context. + + ''' + # `Channel.send()` case + if src_validation_error is None: + + if isinstance(codec, MsgDec): + raise RuntimeError( + '`codec` must be a `MsgCodec` for send-side errors?' + ) + + # no src error from `msgspec.msgpack.Decoder.decode()` so + # prolly a manual type-check on our part. + if message is None: + fmt_spec: str = codec.pformat_msg_spec() + fmt_stack: str = ( + '\n'.join(traceback.format_stack(limit=3)) + ) + tb_fmt: str = pformat_boxed_tb( + tb_str=fmt_stack, + # fields_str=header, + field_prefix=' ', + indent='', + ) + message: str = ( + f'invalid msg -> {msg}: {type(msg)}\n\n' + f'{tb_fmt}\n' + f'Valid IPC msgs are:\n\n' + # f' ------ - ------\n' + f'{fmt_spec}\n', + ) + elif src_type_error: + src_message: str = str(src_type_error) + patt: str = 'type ' + type_idx: int = src_message.find('type ') + invalid_type: str = src_message[type_idx + len(patt):].split()[0] + + enc_hook: Callable|None = codec.enc.enc_hook + if enc_hook is None: + message += ( + '\n\n' + + f"The current IPC-msg codec can't encode type `{invalid_type}` !\n" + f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n' + + f'Check the `msgspec` docs for ad-hoc type extending:\n' + '|_ https://jcristharif.com/msgspec/extending.html\n' + '|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n' + ) + + + msgtyperr = MsgTypeError( + message=message, + ipc_msg=msg, + ) + # ya, might be `None` + msgtyperr.__cause__ = src_type_error + return msgtyperr + + # `Channel.recv()` case + else: + # decode the msg-bytes using the std msgpack + # interchange-prot (i.e. without any + # `msgspec.Struct` handling) so that we can + # determine what `.msg.types.Msg` is the culprit + # by reporting the received value. + msg_dict: dict = msgpack.decode(msg) + msg_type_name: str = msg_dict['msg_type'] + msg_type = getattr(msgtypes, msg_type_name) + message: str = ( + f'invalid `{msg_type_name}` IPC msg\n\n' + ) + if verb_header: + message = f'{verb_header} ' + message + + # XXX see if we can determine the exact invalid field + # such that we can comprehensively report the + # specific field's type problem + msgspec_msg: str = src_validation_error.args[0].rstrip('`') + msg, _, maybe_field = msgspec_msg.rpartition('$.') + obj = object() + if (field_val := msg_dict.get(maybe_field, obj)) is not obj: + field_name_expr: str = ( + f' |_{maybe_field}: {codec.pld_spec_str} = ' + ) + fmt_val_lines: list[str] = pformat(field_val).splitlines() + fmt_val: str = ( + f'{fmt_val_lines[0]}\n' + + + textwrap.indent( + '\n'.join(fmt_val_lines[1:]), + prefix=' '*len(field_name_expr), + ) + ) + message += ( + f'{msg.rstrip("`")}\n\n' + f'<{msg_type.__qualname__}(\n' + # f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n' + f'{field_name_expr}{fmt_val}\n' + f')>' + ) + + msgtyperr = MsgTypeError.from_decode( + message=message, + msgdict=msg_dict, + ) + msgtyperr.__cause__ = src_validation_error + return msgtyperr diff --git a/tractor/_ipc.py b/tractor/_ipc.py index f76d4ef..70774be 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -49,6 +49,7 @@ from tractor._exceptions import ( MsgTypeError, pack_from_raise, TransportClosed, + _mk_msg_type_err, ) from tractor.msg import ( _ctxvar_MsgCodec, @@ -118,127 +119,6 @@ class MsgTransport(Protocol[MsgType]): ... -def _mk_msg_type_err( - msg: Any|bytes, - codec: MsgCodec, - - message: str|None = None, - verb_header: str = '', - - src_validation_error: msgspec.ValidationError|None = None, - src_type_error: TypeError|None = None, - -) -> MsgTypeError: - - import textwrap - - # `Channel.send()` case - if src_validation_error is None: # send-side - - # no src error from `msgspec.msgpack.Decoder.decode()` so - # prolly a manual type-check on our part. - if message is None: - import traceback - from tractor._exceptions import pformat_boxed_tb - - fmt_spec: str = '\n'.join( - map(str, codec.msg_spec.__args__) - ) - fmt_stack: str = ( - '\n'.join(traceback.format_stack(limit=3)) - ) - tb_fmt: str = pformat_boxed_tb( - tb_str=fmt_stack, - # fields_str=header, - field_prefix=' ', - indent='', - ) - message: str = ( - f'invalid msg -> {msg}: {type(msg)}\n\n' - f'{tb_fmt}\n' - f'Valid IPC msgs are:\n\n' - # f' ------ - ------\n' - f'{fmt_spec}\n', - ) - elif src_type_error: - src_message: str = str(src_type_error) - patt: str = 'type ' - type_idx: int = src_message.find('type ') - invalid_type: str = src_message[type_idx + len(patt):].split()[0] - - enc_hook: Callable|None = codec.enc.enc_hook - if enc_hook is None: - message += ( - '\n\n' - - f"The current IPC-msg codec can't encode type `{invalid_type}` !\n" - f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n' - - f'Check the `msgspec` docs for ad-hoc type extending:\n' - '|_ https://jcristharif.com/msgspec/extending.html\n' - '|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n' - ) - - - msgtyperr = MsgTypeError( - message=message, - ipc_msg=msg, - ) - # ya, might be `None` - msgtyperr.__cause__ = src_type_error - return msgtyperr - - # `Channel.recv()` case - else: - # decode the msg-bytes using the std msgpack - # interchange-prot (i.e. without any - # `msgspec.Struct` handling) so that we can - # determine what `.msg.types.Msg` is the culprit - # by reporting the received value. - msg_dict: dict = msgspec.msgpack.decode(msg) - msg_type_name: str = msg_dict['msg_type'] - msg_type = getattr(msgtypes, msg_type_name) - message: str = ( - f'invalid `{msg_type_name}` IPC msg\n\n' - ) - if verb_header: - message = f'{verb_header} ' + message - - # XXX see if we can determine the exact invalid field - # such that we can comprehensively report the - # specific field's type problem - msgspec_msg: str = src_validation_error.args[0].rstrip('`') - msg, _, maybe_field = msgspec_msg.rpartition('$.') - obj = object() - if (field_val := msg_dict.get(maybe_field, obj)) is not obj: - field_name_expr: str = ( - f' |_{maybe_field}: {codec.pld_spec_str} = ' - ) - fmt_val_lines: list[str] = pformat(field_val).splitlines() - fmt_val: str = ( - f'{fmt_val_lines[0]}\n' - + - textwrap.indent( - '\n'.join(fmt_val_lines[1:]), - prefix=' '*len(field_name_expr), - ) - ) - message += ( - f'{msg.rstrip("`")}\n\n' - f'<{msg_type.__qualname__}(\n' - # f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n' - f'{field_name_expr}{fmt_val}\n' - f')>' - ) - - msgtyperr = MsgTypeError.from_decode( - message=message, - msgdict=msg_dict, - ) - msgtyperr.__cause__ = src_validation_error - return msgtyperr - - # TODO: not sure why we have to inherit here, but it seems to be an # issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; # probably should make a `mypy` issue? @@ -299,10 +179,10 @@ class MsgpackTCPStream(MsgTransport): _codec._ctxvar_MsgCodec.get() ) # TODO: mask out before release? - log.runtime( - f'New {self} created with codec\n' - f'codec: {self._codec}\n' - ) + # log.runtime( + # f'New {self} created with codec\n' + # f'codec: {self._codec}\n' + # ) async def _iter_packets(self) -> AsyncGenerator[dict, None]: '''