diff --git a/tractor/_context.py b/tractor/_context.py index 3b29761..32acf83 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -49,7 +49,6 @@ from typing import ( Any, AsyncGenerator, Callable, - Mapping, Type, TypeAlias, TYPE_CHECKING, @@ -1484,13 +1483,21 @@ class Context: # __tracebackhide__: bool = hide_tb if validate_pld_spec: - msgops.validate_payload_msg( - pld_msg=started_msg, - pld_value=value, - ipc=self, - strict_pld_parity=strict_pld_parity, - hide_tb=hide_tb, - ) + # TODO: prolly wrap this as a `show_frame_when_not()` + try: + msgops.validate_payload_msg( + pld_msg=started_msg, + pld_value=value, + ipc=self, + strict_pld_parity=strict_pld_parity, + hide_tb=hide_tb, + ) + except BaseException as err: + if not isinstance(err, MsgTypeError): + __tracebackhide__: bool = False + + raise + # TODO: maybe a flag to by-pass encode op if already done # here in caller? @@ -2185,11 +2192,6 @@ async def open_context_from_portal( try: result_or_err: Exception|Any = await ctx.result() except BaseException as berr: - # cancelled before (or maybe during?) final result capture - # if isinstance(trio.Cancelled, berr): - # from .devx import mk_pdb - # mk_pdb.set_trace() - # on normal teardown, if we get some error # raised in `Context.result()` we still want to # save that error on the ctx's state to @@ -2201,7 +2203,7 @@ async def open_context_from_portal( ctx._local_error: BaseException = scope_err raise - # yes! this worx Bp + # yes this worx! # from .devx import _debug # await _debug.pause() diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 8ed46eb..7164d6a 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -1232,14 +1232,13 @@ def _raise_from_unexpected_msg( _raise_from_no_key_in_msg = _raise_from_unexpected_msg -def _mk_msg_type_err( +def _mk_send_mte( msg: Any|bytes|MsgType, codec: MsgCodec|MsgDec, message: str|None = None, verb_header: str = '', - src_validation_error: ValidationError|None = None, src_type_error: TypeError|None = None, is_invalid_payload: bool = False, @@ -1247,131 +1246,148 @@ def _mk_msg_type_err( ) -> MsgTypeError: ''' - Compose a `MsgTypeError` from an input runtime context. + Compose a `MsgTypeError` from a `Channel.send()`-side error, + normally raised witih a runtime IPC `Context`. ''' - # `Channel.send()` case - if src_validation_error is None: + if isinstance(codec, MsgDec): + raise RuntimeError( + '`codec` must be a `MsgCodec` for send-side errors?' + ) - if isinstance(codec, MsgDec): - raise RuntimeError( - '`codec` must be a `MsgCodec` for send-side errors?' + from tractor.devx import ( + pformat_caller_frame, + ) + # no src error from `msgspec.msgpack.Decoder.decode()` so + # prolly a manual type-check on our part. + if message is None: + tb_fmt: str = pformat_caller_frame(stack_limit=3) + message: str = ( + f'invalid msg -> {msg}: {type(msg)}\n\n' + f'{tb_fmt}\n' + f'Valid IPC msgs are:\n\n' + f'{codec.msg_spec_str}\n', + ) + elif src_type_error: + src_message: str = str(src_type_error) + patt: str = 'type ' + type_idx: int = src_message.find('type ') + invalid_type: str = src_message[type_idx + len(patt):].split()[0] + + enc_hook: Callable|None = codec.enc.enc_hook + if enc_hook is None: + message += ( + '\n\n' + + f"The current IPC-msg codec can't encode type `{invalid_type}` !\n" + f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n' + + f'Check the `msgspec` docs for ad-hoc type extending:\n' + '|_ https://jcristharif.com/msgspec/extending.html\n' + '|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n' ) - from tractor.devx import ( - pformat_caller_frame, + msgtyperr = MsgTypeError( + message=message, + _bad_msg=msg, + ) + # ya, might be `None` + msgtyperr.__cause__ = src_type_error + return msgtyperr + + +def _mk_recv_mte( + msg: Any|bytes|MsgType, + codec: MsgCodec|MsgDec, + + message: str|None = None, + verb_header: str = '', + + src_validation_error: ValidationError|None = None, + is_invalid_payload: bool = False, + + **mte_kwargs, + +) -> MsgTypeError: + ''' + Compose a `MsgTypeError` from a + `Channel|Context|MsgStream.receive()`-side error, + normally raised witih a runtime IPC ctx or streaming + block. + + ''' + msg_dict: dict|None = None + bad_msg: PayloadMsg|None = None + + if is_invalid_payload: + msg_type: str = type(msg) + any_pld: Any = msgpack.decode(msg.pld) + message: str = ( + f'invalid `{msg_type.__qualname__}` msg payload\n\n' + f'value: `{any_pld!r}` does not match type-spec: ' + f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' ) - # no src error from `msgspec.msgpack.Decoder.decode()` so - # prolly a manual type-check on our part. - if message is None: - tb_fmt: str = pformat_caller_frame(stack_limit=3) - message: str = ( - f'invalid msg -> {msg}: {type(msg)}\n\n' - f'{tb_fmt}\n' - f'Valid IPC msgs are:\n\n' - f'{codec.msg_spec_str}\n', - ) - elif src_type_error: - src_message: str = str(src_type_error) - patt: str = 'type ' - type_idx: int = src_message.find('type ') - invalid_type: str = src_message[type_idx + len(patt):].split()[0] + bad_msg = msg - enc_hook: Callable|None = codec.enc.enc_hook - if enc_hook is None: - message += ( - '\n\n' - - f"The current IPC-msg codec can't encode type `{invalid_type}` !\n" - f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n' - - f'Check the `msgspec` docs for ad-hoc type extending:\n' - '|_ https://jcristharif.com/msgspec/extending.html\n' - '|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n' - ) - - msgtyperr = MsgTypeError( - message=message, - _bad_msg=msg, - ) - # ya, might be `None` - msgtyperr.__cause__ = src_type_error - return msgtyperr - - # `Channel.recv()` case else: - msg_dict: dict|None = None - bad_msg: PayloadMsg|None = None - - if is_invalid_payload: - msg_type: str = type(msg) - any_pld: Any = msgpack.decode(msg.pld) - message: str = ( - f'invalid `{msg_type.__qualname__}` msg payload\n\n' - f'value: `{any_pld!r}` does not match type-spec: ' - f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' - ) - bad_msg = msg - - else: - # decode the msg-bytes using the std msgpack - # interchange-prot (i.e. without any `msgspec.Struct` - # handling) so that we can determine what - # `.msg.types.PayloadMsg` is the culprit by reporting the - # received value. - msg: bytes - msg_dict: dict = msgpack.decode(msg) - msg_type_name: str = msg_dict['msg_type'] - msg_type = getattr(msgtypes, msg_type_name) - message: str = ( - f'invalid `{msg_type_name}` IPC msg\n\n' - ) - # XXX be "fancy" and see if we can determine the exact - # invalid field such that we can comprehensively report - # the specific field's type problem. - msgspec_msg: str = src_validation_error.args[0].rstrip('`') - msg, _, maybe_field = msgspec_msg.rpartition('$.') - obj = object() - if (field_val := msg_dict.get(maybe_field, obj)) is not obj: - field_name_expr: str = ( - f' |_{maybe_field}: {codec.pld_spec_str} = ' - ) - fmt_val_lines: list[str] = pformat(field_val).splitlines() - fmt_val: str = ( - f'{fmt_val_lines[0]}\n' - + - textwrap.indent( - '\n'.join(fmt_val_lines[1:]), - prefix=' '*len(field_name_expr), - ) - ) - message += ( - f'{msg.rstrip("`")}\n\n' - f'<{msg_type.__qualname__}(\n' - # f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n' - f'{field_name_expr}{fmt_val}\n' - f')>' - ) - - if verb_header: - message = f'{verb_header} ' + message - - msgtyperr = MsgTypeError.from_decode( - message=message, - bad_msg=bad_msg, - bad_msg_as_dict=msg_dict, - boxed_type=type(src_validation_error), - - # NOTE: for pld-spec MTEs we set the `._ipc_msg` manually: - # - for the send-side `.started()` pld-validate - # case we actually raise inline so we don't need to - # set the it at all. - # - for recv side we set it inside `PldRx.decode_pld()` - # after a manual call to `pack_error()` since we - # actually want to emulate the `Error` from the mte we - # build here. So by default in that case, this is left - # as `None` here. - # ipc_msg=src_err_msg, + # decode the msg-bytes using the std msgpack + # interchange-prot (i.e. without any `msgspec.Struct` + # handling) so that we can determine what + # `.msg.types.PayloadMsg` is the culprit by reporting the + # received value. + msg: bytes + msg_dict: dict = msgpack.decode(msg) + msg_type_name: str = msg_dict['msg_type'] + msg_type = getattr(msgtypes, msg_type_name) + message: str = ( + f'invalid `{msg_type_name}` IPC msg\n\n' ) - msgtyperr.__cause__ = src_validation_error - return msgtyperr + # XXX be "fancy" and see if we can determine the exact + # invalid field such that we can comprehensively report + # the specific field's type problem. + msgspec_msg: str = src_validation_error.args[0].rstrip('`') + msg, _, maybe_field = msgspec_msg.rpartition('$.') + obj = object() + if (field_val := msg_dict.get(maybe_field, obj)) is not obj: + field_name_expr: str = ( + f' |_{maybe_field}: {codec.pld_spec_str} = ' + ) + fmt_val_lines: list[str] = pformat(field_val).splitlines() + fmt_val: str = ( + f'{fmt_val_lines[0]}\n' + + + textwrap.indent( + '\n'.join(fmt_val_lines[1:]), + prefix=' '*len(field_name_expr), + ) + ) + message += ( + f'{msg.rstrip("`")}\n\n' + f'<{msg_type.__qualname__}(\n' + # f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n' + f'{field_name_expr}{fmt_val}\n' + f')>' + ) + + if verb_header: + message = f'{verb_header} ' + message + + msgtyperr = MsgTypeError.from_decode( + message=message, + bad_msg=bad_msg, + bad_msg_as_dict=msg_dict, + boxed_type=type(src_validation_error), + + # NOTE: for pld-spec MTEs we set the `._ipc_msg` manually: + # - for the send-side `.started()` pld-validate + # case we actually raise inline so we don't need to + # set the it at all. + # - for recv side we set it inside `PldRx.decode_pld()` + # after a manual call to `pack_error()` since we + # actually want to emulate the `Error` from the mte we + # build here. So by default in that case, this is left + # as `None` here. + # ipc_msg=src_err_msg, + ) + msgtyperr.__cause__ = src_validation_error + return msgtyperr diff --git a/tractor/_ipc.py b/tractor/_ipc.py index ec7d348..e5e3d10 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -49,7 +49,8 @@ from tractor._exceptions import ( MsgTypeError, pack_from_raise, TransportClosed, - _mk_msg_type_err, + _mk_send_mte, + _mk_recv_mte, ) from tractor.msg import ( _ctxvar_MsgCodec, @@ -256,7 +257,7 @@ class MsgpackTCPStream(MsgTransport): # and always raise such that spec violations # are never allowed to be caught silently! except msgspec.ValidationError as verr: - msgtyperr: MsgTypeError = _mk_msg_type_err( + msgtyperr: MsgTypeError = _mk_recv_mte( msg=msg_bytes, codec=codec, src_validation_error=verr, @@ -321,7 +322,7 @@ class MsgpackTCPStream(MsgTransport): if type(msg) not in msgtypes.__msg_types__: if strict_types: - raise _mk_msg_type_err( + raise _mk_send_mte( msg, codec=codec, ) @@ -333,8 +334,9 @@ class MsgpackTCPStream(MsgTransport): try: bytes_data: bytes = codec.encode(msg) - except TypeError as typerr: - msgtyperr: MsgTypeError = _mk_msg_type_err( + except TypeError as _err: + typerr = _err + msgtyperr: MsgTypeError = _mk_send_mte( msg, codec=codec, message=( diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 80633e7..91c0dde 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -47,7 +47,7 @@ from tractor._exceptions import ( InternalError, _raise_from_unexpected_msg, MsgTypeError, - _mk_msg_type_err, + _mk_recv_mte, pack_error, ) from tractor._state import current_ipc_ctx @@ -264,7 +264,7 @@ class PldRx(Struct): # pack mgterr into error-msg for # reraise below; ensure remote-actor-err # info is displayed nicely? - mte: MsgTypeError = _mk_msg_type_err( + mte: MsgTypeError = _mk_recv_mte( msg=msg, codec=self.pld_dec, src_validation_error=valerr, @@ -277,19 +277,6 @@ class PldRx(Struct): if is_started_send_side: raise mte - # XXX TODO: remove this right? - # => any bad stated/return values should - # always be treated a remote errors right? - # - # if ( - # expect_msg is Return - # or expect_msg is Started - # ): - # # set emulated remote error more-or-less as the - # # runtime would - # ctx: Context = getattr(ipc, 'ctx', ipc) - # ctx._maybe_cancel_and_set_remote_error(mte) - # NOTE: the `.message` is automatically # transferred into the message as long as we # define it as a `Error.message` field. @@ -799,13 +786,18 @@ def validate_payload_msg( # raise any msg type error NO MATTER WHAT! except ValidationError as verr: - mte: MsgTypeError = _mk_msg_type_err( - msg=roundtripped, - codec=codec, - src_validation_error=verr, - verb_header='Trying to send ', - is_invalid_payload=True, - ) + try: + mte: MsgTypeError = _mk_recv_mte( + msg=roundtripped, + codec=codec, + src_validation_error=verr, + verb_header='Trying to send ', + is_invalid_payload=True, + ) + except BaseException: + __tracebackhide__: bool = False + raise + if not raise_mte: return mte