forked from goodboy/tractor
1
0
Fork 0

Move `MsgTypeError` maker func to `._exceptions`

Since it's going to be used from the IPC primitive APIs
(`Context`/`MsgStream`) for similarly handling payload type spec
validation errors and bc it's really not well situation in the IPC
module XD

Summary of (impl) tweaks:
- obvi move `_mk_msg_type_err()` and import and use it in `._ipc`; ends
  up avoiding a lot of ad-hoc imports we had from `._exceptions` anyway!
- mask out "new codec" runtime log emission from `MsgpackTCPStream`.
- allow passing a (coming in next commit) `codec: MsgDec` (message
  decoder) which supports the same required `.pld_spec_str: str` attr.
- for send side logging use existing `MsgCodec..pformat_msg_spec()`.
- rename `_raise_from_no_key_in_msg()` to the now more appropriate
  `_raise_from_unexpected_msg()`, but leaving alias for now.
runtime_to_msgspec
Tyler Goodlet 2024-04-22 18:01:09 -04:00
parent 7b020c42cc
commit 0df7d557db
2 changed files with 138 additions and 128 deletions

View File

@ -24,6 +24,7 @@ import importlib
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable,
Type, Type,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -32,8 +33,11 @@ import traceback
import trio import trio
from msgspec import ( from msgspec import (
structs,
defstruct, defstruct,
msgpack,
Raw,
structs,
ValidationError,
) )
from tractor._state import current_actor from tractor._state import current_actor
@ -44,6 +48,8 @@ from tractor.msg import (
Stop, Stop,
Yield, Yield,
types as msgtypes, types as msgtypes,
MsgCodec,
MsgDec,
) )
from tractor.msg.pretty_struct import ( from tractor.msg.pretty_struct import (
iter_fields, iter_fields,
@ -932,7 +938,7 @@ def is_multi_cancelled(exc: BaseException) -> bool:
return False return False
def _raise_from_no_key_in_msg( def _raise_from_unexpected_msg(
ctx: Context, ctx: Context,
msg: MsgType, msg: MsgType,
src_err: AttributeError, src_err: AttributeError,
@ -1032,7 +1038,6 @@ def _raise_from_no_key_in_msg(
# that arrived which is probably the source of this stream # that arrived which is probably the source of this stream
# closure # closure
ctx.maybe_raise() ctx.maybe_raise()
raise eoc from src_err raise eoc from src_err
if ( if (
@ -1052,3 +1057,128 @@ def _raise_from_no_key_in_msg(
" BUT received a non-error msg:\n" " BUT received a non-error msg:\n"
f'{pformat(msg)}' f'{pformat(msg)}'
) from src_err ) from src_err
_raise_from_no_key_in_msg = _raise_from_unexpected_msg
def _mk_msg_type_err(
msg: Any|bytes|Raw,
codec: MsgCodec|MsgDec,
message: str|None = None,
verb_header: str = '',
src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None,
) -> MsgTypeError:
'''
Compose a `MsgTypeError` from an input runtime context.
'''
# `Channel.send()` case
if src_validation_error is None:
if isinstance(codec, MsgDec):
raise RuntimeError(
'`codec` must be a `MsgCodec` for send-side errors?'
)
# no src error from `msgspec.msgpack.Decoder.decode()` so
# prolly a manual type-check on our part.
if message is None:
fmt_spec: str = codec.pformat_msg_spec()
fmt_stack: str = (
'\n'.join(traceback.format_stack(limit=3))
)
tb_fmt: str = pformat_boxed_tb(
tb_str=fmt_stack,
# fields_str=header,
field_prefix=' ',
indent='',
)
message: str = (
f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n'
f'Valid IPC msgs are:\n\n'
# f' ------ - ------\n'
f'{fmt_spec}\n',
)
elif src_type_error:
src_message: str = str(src_type_error)
patt: str = 'type '
type_idx: int = src_message.find('type ')
invalid_type: str = src_message[type_idx + len(patt):].split()[0]
enc_hook: Callable|None = codec.enc.enc_hook
if enc_hook is None:
message += (
'\n\n'
f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
f'Check the `msgspec` docs for ad-hoc type extending:\n'
'|_ https://jcristharif.com/msgspec/extending.html\n'
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
)
msgtyperr = MsgTypeError(
message=message,
ipc_msg=msg,
)
# ya, might be `None`
msgtyperr.__cause__ = src_type_error
return msgtyperr
# `Channel.recv()` case
else:
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name)
message: str = (
f'invalid `{msg_type_name}` IPC msg\n\n'
)
if verb_header:
message = f'{verb_header} ' + message
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
field_name_expr: str = (
f' |_{maybe_field}: {codec.pld_spec_str} = '
)
fmt_val_lines: list[str] = pformat(field_val).splitlines()
fmt_val: str = (
f'{fmt_val_lines[0]}\n'
+
textwrap.indent(
'\n'.join(fmt_val_lines[1:]),
prefix=' '*len(field_name_expr),
)
)
message += (
f'{msg.rstrip("`")}\n\n'
f'<{msg_type.__qualname__}(\n'
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
f'{field_name_expr}{fmt_val}\n'
f')>'
)
msgtyperr = MsgTypeError.from_decode(
message=message,
msgdict=msg_dict,
)
msgtyperr.__cause__ = src_validation_error
return msgtyperr

View File

@ -49,6 +49,7 @@ from tractor._exceptions import (
MsgTypeError, MsgTypeError,
pack_from_raise, pack_from_raise,
TransportClosed, TransportClosed,
_mk_msg_type_err,
) )
from tractor.msg import ( from tractor.msg import (
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
@ -118,127 +119,6 @@ class MsgTransport(Protocol[MsgType]):
... ...
def _mk_msg_type_err(
msg: Any|bytes,
codec: MsgCodec,
message: str|None = None,
verb_header: str = '',
src_validation_error: msgspec.ValidationError|None = None,
src_type_error: TypeError|None = None,
) -> MsgTypeError:
import textwrap
# `Channel.send()` case
if src_validation_error is None: # send-side
# no src error from `msgspec.msgpack.Decoder.decode()` so
# prolly a manual type-check on our part.
if message is None:
import traceback
from tractor._exceptions import pformat_boxed_tb
fmt_spec: str = '\n'.join(
map(str, codec.msg_spec.__args__)
)
fmt_stack: str = (
'\n'.join(traceback.format_stack(limit=3))
)
tb_fmt: str = pformat_boxed_tb(
tb_str=fmt_stack,
# fields_str=header,
field_prefix=' ',
indent='',
)
message: str = (
f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n'
f'Valid IPC msgs are:\n\n'
# f' ------ - ------\n'
f'{fmt_spec}\n',
)
elif src_type_error:
src_message: str = str(src_type_error)
patt: str = 'type '
type_idx: int = src_message.find('type ')
invalid_type: str = src_message[type_idx + len(patt):].split()[0]
enc_hook: Callable|None = codec.enc.enc_hook
if enc_hook is None:
message += (
'\n\n'
f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
f'Check the `msgspec` docs for ad-hoc type extending:\n'
'|_ https://jcristharif.com/msgspec/extending.html\n'
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
)
msgtyperr = MsgTypeError(
message=message,
ipc_msg=msg,
)
# ya, might be `None`
msgtyperr.__cause__ = src_type_error
return msgtyperr
# `Channel.recv()` case
else:
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgspec.msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name)
message: str = (
f'invalid `{msg_type_name}` IPC msg\n\n'
)
if verb_header:
message = f'{verb_header} ' + message
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
field_name_expr: str = (
f' |_{maybe_field}: {codec.pld_spec_str} = '
)
fmt_val_lines: list[str] = pformat(field_val).splitlines()
fmt_val: str = (
f'{fmt_val_lines[0]}\n'
+
textwrap.indent(
'\n'.join(fmt_val_lines[1:]),
prefix=' '*len(field_name_expr),
)
)
message += (
f'{msg.rstrip("`")}\n\n'
f'<{msg_type.__qualname__}(\n'
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
f'{field_name_expr}{fmt_val}\n'
f')>'
)
msgtyperr = MsgTypeError.from_decode(
message=message,
msgdict=msg_dict,
)
msgtyperr.__cause__ = src_validation_error
return msgtyperr
# TODO: not sure why we have to inherit here, but it seems to be an # TODO: not sure why we have to inherit here, but it seems to be an
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; # issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
# probably should make a `mypy` issue? # probably should make a `mypy` issue?
@ -299,10 +179,10 @@ class MsgpackTCPStream(MsgTransport):
_codec._ctxvar_MsgCodec.get() _codec._ctxvar_MsgCodec.get()
) )
# TODO: mask out before release? # TODO: mask out before release?
log.runtime( # log.runtime(
f'New {self} created with codec\n' # f'New {self} created with codec\n'
f'codec: {self._codec}\n' # f'codec: {self._codec}\n'
) # )
async def _iter_packets(self) -> AsyncGenerator[dict, None]: async def _iter_packets(self) -> AsyncGenerator[dict, None]:
''' '''