forked from goodboy/tractor
Break `_mk_msg_type_err()` into recv/send side funcs
Name them `_mk_send_mte()`/`_mk_recv_mte()` and change the runtime to call each appropriately depending on location/usage. Also add some dynamic call-frame "unhide" blocks such that when we expect raised MTE from the aboves calls but we get a different unexpected error from the runtime, we ensure the call stack downward is shown in tbs/pdb. |_ ideally in the longer run we come up with a fancier dynamic sys for this, prolly something in `.devx._frame_stack`?runtime_to_msgspec
parent
8477919fc9
commit
711f639fc5
|
@ -49,7 +49,6 @@ from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncGenerator,
|
AsyncGenerator,
|
||||||
Callable,
|
Callable,
|
||||||
Mapping,
|
|
||||||
Type,
|
Type,
|
||||||
TypeAlias,
|
TypeAlias,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
@ -1484,13 +1483,21 @@ class Context:
|
||||||
#
|
#
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
if validate_pld_spec:
|
if validate_pld_spec:
|
||||||
msgops.validate_payload_msg(
|
# TODO: prolly wrap this as a `show_frame_when_not()`
|
||||||
pld_msg=started_msg,
|
try:
|
||||||
pld_value=value,
|
msgops.validate_payload_msg(
|
||||||
ipc=self,
|
pld_msg=started_msg,
|
||||||
strict_pld_parity=strict_pld_parity,
|
pld_value=value,
|
||||||
hide_tb=hide_tb,
|
ipc=self,
|
||||||
)
|
strict_pld_parity=strict_pld_parity,
|
||||||
|
hide_tb=hide_tb,
|
||||||
|
)
|
||||||
|
except BaseException as err:
|
||||||
|
if not isinstance(err, MsgTypeError):
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
# TODO: maybe a flag to by-pass encode op if already done
|
# TODO: maybe a flag to by-pass encode op if already done
|
||||||
# here in caller?
|
# here in caller?
|
||||||
|
@ -2185,11 +2192,6 @@ async def open_context_from_portal(
|
||||||
try:
|
try:
|
||||||
result_or_err: Exception|Any = await ctx.result()
|
result_or_err: Exception|Any = await ctx.result()
|
||||||
except BaseException as berr:
|
except BaseException as berr:
|
||||||
# cancelled before (or maybe during?) final result capture
|
|
||||||
# if isinstance(trio.Cancelled, berr):
|
|
||||||
# from .devx import mk_pdb
|
|
||||||
# mk_pdb.set_trace()
|
|
||||||
|
|
||||||
# on normal teardown, if we get some error
|
# on normal teardown, if we get some error
|
||||||
# raised in `Context.result()` we still want to
|
# raised in `Context.result()` we still want to
|
||||||
# save that error on the ctx's state to
|
# save that error on the ctx's state to
|
||||||
|
@ -2201,7 +2203,7 @@ async def open_context_from_portal(
|
||||||
ctx._local_error: BaseException = scope_err
|
ctx._local_error: BaseException = scope_err
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# yes! this worx Bp
|
# yes this worx!
|
||||||
# from .devx import _debug
|
# from .devx import _debug
|
||||||
# await _debug.pause()
|
# await _debug.pause()
|
||||||
|
|
||||||
|
|
|
@ -1232,14 +1232,13 @@ def _raise_from_unexpected_msg(
|
||||||
_raise_from_no_key_in_msg = _raise_from_unexpected_msg
|
_raise_from_no_key_in_msg = _raise_from_unexpected_msg
|
||||||
|
|
||||||
|
|
||||||
def _mk_msg_type_err(
|
def _mk_send_mte(
|
||||||
msg: Any|bytes|MsgType,
|
msg: Any|bytes|MsgType,
|
||||||
codec: MsgCodec|MsgDec,
|
codec: MsgCodec|MsgDec,
|
||||||
|
|
||||||
message: str|None = None,
|
message: str|None = None,
|
||||||
verb_header: str = '',
|
verb_header: str = '',
|
||||||
|
|
||||||
src_validation_error: ValidationError|None = None,
|
|
||||||
src_type_error: TypeError|None = None,
|
src_type_error: TypeError|None = None,
|
||||||
is_invalid_payload: bool = False,
|
is_invalid_payload: bool = False,
|
||||||
|
|
||||||
|
@ -1247,131 +1246,148 @@ def _mk_msg_type_err(
|
||||||
|
|
||||||
) -> MsgTypeError:
|
) -> MsgTypeError:
|
||||||
'''
|
'''
|
||||||
Compose a `MsgTypeError` from an input runtime context.
|
Compose a `MsgTypeError` from a `Channel.send()`-side error,
|
||||||
|
normally raised witih a runtime IPC `Context`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# `Channel.send()` case
|
if isinstance(codec, MsgDec):
|
||||||
if src_validation_error is None:
|
raise RuntimeError(
|
||||||
|
'`codec` must be a `MsgCodec` for send-side errors?'
|
||||||
|
)
|
||||||
|
|
||||||
if isinstance(codec, MsgDec):
|
from tractor.devx import (
|
||||||
raise RuntimeError(
|
pformat_caller_frame,
|
||||||
'`codec` must be a `MsgCodec` for send-side errors?'
|
)
|
||||||
|
# no src error from `msgspec.msgpack.Decoder.decode()` so
|
||||||
|
# prolly a manual type-check on our part.
|
||||||
|
if message is None:
|
||||||
|
tb_fmt: str = pformat_caller_frame(stack_limit=3)
|
||||||
|
message: str = (
|
||||||
|
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
||||||
|
f'{tb_fmt}\n'
|
||||||
|
f'Valid IPC msgs are:\n\n'
|
||||||
|
f'{codec.msg_spec_str}\n',
|
||||||
|
)
|
||||||
|
elif src_type_error:
|
||||||
|
src_message: str = str(src_type_error)
|
||||||
|
patt: str = 'type '
|
||||||
|
type_idx: int = src_message.find('type ')
|
||||||
|
invalid_type: str = src_message[type_idx + len(patt):].split()[0]
|
||||||
|
|
||||||
|
enc_hook: Callable|None = codec.enc.enc_hook
|
||||||
|
if enc_hook is None:
|
||||||
|
message += (
|
||||||
|
'\n\n'
|
||||||
|
|
||||||
|
f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
|
||||||
|
f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
|
||||||
|
|
||||||
|
f'Check the `msgspec` docs for ad-hoc type extending:\n'
|
||||||
|
'|_ https://jcristharif.com/msgspec/extending.html\n'
|
||||||
|
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
from tractor.devx import (
|
msgtyperr = MsgTypeError(
|
||||||
pformat_caller_frame,
|
message=message,
|
||||||
|
_bad_msg=msg,
|
||||||
|
)
|
||||||
|
# ya, might be `None`
|
||||||
|
msgtyperr.__cause__ = src_type_error
|
||||||
|
return msgtyperr
|
||||||
|
|
||||||
|
|
||||||
|
def _mk_recv_mte(
|
||||||
|
msg: Any|bytes|MsgType,
|
||||||
|
codec: MsgCodec|MsgDec,
|
||||||
|
|
||||||
|
message: str|None = None,
|
||||||
|
verb_header: str = '',
|
||||||
|
|
||||||
|
src_validation_error: ValidationError|None = None,
|
||||||
|
is_invalid_payload: bool = False,
|
||||||
|
|
||||||
|
**mte_kwargs,
|
||||||
|
|
||||||
|
) -> MsgTypeError:
|
||||||
|
'''
|
||||||
|
Compose a `MsgTypeError` from a
|
||||||
|
`Channel|Context|MsgStream.receive()`-side error,
|
||||||
|
normally raised witih a runtime IPC ctx or streaming
|
||||||
|
block.
|
||||||
|
|
||||||
|
'''
|
||||||
|
msg_dict: dict|None = None
|
||||||
|
bad_msg: PayloadMsg|None = None
|
||||||
|
|
||||||
|
if is_invalid_payload:
|
||||||
|
msg_type: str = type(msg)
|
||||||
|
any_pld: Any = msgpack.decode(msg.pld)
|
||||||
|
message: str = (
|
||||||
|
f'invalid `{msg_type.__qualname__}` msg payload\n\n'
|
||||||
|
f'value: `{any_pld!r}` does not match type-spec: '
|
||||||
|
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
|
||||||
)
|
)
|
||||||
# no src error from `msgspec.msgpack.Decoder.decode()` so
|
bad_msg = msg
|
||||||
# prolly a manual type-check on our part.
|
|
||||||
if message is None:
|
|
||||||
tb_fmt: str = pformat_caller_frame(stack_limit=3)
|
|
||||||
message: str = (
|
|
||||||
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
|
||||||
f'{tb_fmt}\n'
|
|
||||||
f'Valid IPC msgs are:\n\n'
|
|
||||||
f'{codec.msg_spec_str}\n',
|
|
||||||
)
|
|
||||||
elif src_type_error:
|
|
||||||
src_message: str = str(src_type_error)
|
|
||||||
patt: str = 'type '
|
|
||||||
type_idx: int = src_message.find('type ')
|
|
||||||
invalid_type: str = src_message[type_idx + len(patt):].split()[0]
|
|
||||||
|
|
||||||
enc_hook: Callable|None = codec.enc.enc_hook
|
|
||||||
if enc_hook is None:
|
|
||||||
message += (
|
|
||||||
'\n\n'
|
|
||||||
|
|
||||||
f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
|
|
||||||
f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
|
|
||||||
|
|
||||||
f'Check the `msgspec` docs for ad-hoc type extending:\n'
|
|
||||||
'|_ https://jcristharif.com/msgspec/extending.html\n'
|
|
||||||
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
msgtyperr = MsgTypeError(
|
|
||||||
message=message,
|
|
||||||
_bad_msg=msg,
|
|
||||||
)
|
|
||||||
# ya, might be `None`
|
|
||||||
msgtyperr.__cause__ = src_type_error
|
|
||||||
return msgtyperr
|
|
||||||
|
|
||||||
# `Channel.recv()` case
|
|
||||||
else:
|
else:
|
||||||
msg_dict: dict|None = None
|
# decode the msg-bytes using the std msgpack
|
||||||
bad_msg: PayloadMsg|None = None
|
# interchange-prot (i.e. without any `msgspec.Struct`
|
||||||
|
# handling) so that we can determine what
|
||||||
if is_invalid_payload:
|
# `.msg.types.PayloadMsg` is the culprit by reporting the
|
||||||
msg_type: str = type(msg)
|
# received value.
|
||||||
any_pld: Any = msgpack.decode(msg.pld)
|
msg: bytes
|
||||||
message: str = (
|
msg_dict: dict = msgpack.decode(msg)
|
||||||
f'invalid `{msg_type.__qualname__}` msg payload\n\n'
|
msg_type_name: str = msg_dict['msg_type']
|
||||||
f'value: `{any_pld!r}` does not match type-spec: '
|
msg_type = getattr(msgtypes, msg_type_name)
|
||||||
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
|
message: str = (
|
||||||
)
|
f'invalid `{msg_type_name}` IPC msg\n\n'
|
||||||
bad_msg = msg
|
|
||||||
|
|
||||||
else:
|
|
||||||
# decode the msg-bytes using the std msgpack
|
|
||||||
# interchange-prot (i.e. without any `msgspec.Struct`
|
|
||||||
# handling) so that we can determine what
|
|
||||||
# `.msg.types.PayloadMsg` is the culprit by reporting the
|
|
||||||
# received value.
|
|
||||||
msg: bytes
|
|
||||||
msg_dict: dict = msgpack.decode(msg)
|
|
||||||
msg_type_name: str = msg_dict['msg_type']
|
|
||||||
msg_type = getattr(msgtypes, msg_type_name)
|
|
||||||
message: str = (
|
|
||||||
f'invalid `{msg_type_name}` IPC msg\n\n'
|
|
||||||
)
|
|
||||||
# XXX be "fancy" and see if we can determine the exact
|
|
||||||
# invalid field such that we can comprehensively report
|
|
||||||
# the specific field's type problem.
|
|
||||||
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
|
|
||||||
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
|
||||||
obj = object()
|
|
||||||
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
|
|
||||||
field_name_expr: str = (
|
|
||||||
f' |_{maybe_field}: {codec.pld_spec_str} = '
|
|
||||||
)
|
|
||||||
fmt_val_lines: list[str] = pformat(field_val).splitlines()
|
|
||||||
fmt_val: str = (
|
|
||||||
f'{fmt_val_lines[0]}\n'
|
|
||||||
+
|
|
||||||
textwrap.indent(
|
|
||||||
'\n'.join(fmt_val_lines[1:]),
|
|
||||||
prefix=' '*len(field_name_expr),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
message += (
|
|
||||||
f'{msg.rstrip("`")}\n\n'
|
|
||||||
f'<{msg_type.__qualname__}(\n'
|
|
||||||
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
|
|
||||||
f'{field_name_expr}{fmt_val}\n'
|
|
||||||
f')>'
|
|
||||||
)
|
|
||||||
|
|
||||||
if verb_header:
|
|
||||||
message = f'{verb_header} ' + message
|
|
||||||
|
|
||||||
msgtyperr = MsgTypeError.from_decode(
|
|
||||||
message=message,
|
|
||||||
bad_msg=bad_msg,
|
|
||||||
bad_msg_as_dict=msg_dict,
|
|
||||||
boxed_type=type(src_validation_error),
|
|
||||||
|
|
||||||
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
|
|
||||||
# - for the send-side `.started()` pld-validate
|
|
||||||
# case we actually raise inline so we don't need to
|
|
||||||
# set the it at all.
|
|
||||||
# - for recv side we set it inside `PldRx.decode_pld()`
|
|
||||||
# after a manual call to `pack_error()` since we
|
|
||||||
# actually want to emulate the `Error` from the mte we
|
|
||||||
# build here. So by default in that case, this is left
|
|
||||||
# as `None` here.
|
|
||||||
# ipc_msg=src_err_msg,
|
|
||||||
)
|
)
|
||||||
msgtyperr.__cause__ = src_validation_error
|
# XXX be "fancy" and see if we can determine the exact
|
||||||
return msgtyperr
|
# invalid field such that we can comprehensively report
|
||||||
|
# the specific field's type problem.
|
||||||
|
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
|
||||||
|
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
||||||
|
obj = object()
|
||||||
|
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
|
||||||
|
field_name_expr: str = (
|
||||||
|
f' |_{maybe_field}: {codec.pld_spec_str} = '
|
||||||
|
)
|
||||||
|
fmt_val_lines: list[str] = pformat(field_val).splitlines()
|
||||||
|
fmt_val: str = (
|
||||||
|
f'{fmt_val_lines[0]}\n'
|
||||||
|
+
|
||||||
|
textwrap.indent(
|
||||||
|
'\n'.join(fmt_val_lines[1:]),
|
||||||
|
prefix=' '*len(field_name_expr),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
message += (
|
||||||
|
f'{msg.rstrip("`")}\n\n'
|
||||||
|
f'<{msg_type.__qualname__}(\n'
|
||||||
|
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
|
||||||
|
f'{field_name_expr}{fmt_val}\n'
|
||||||
|
f')>'
|
||||||
|
)
|
||||||
|
|
||||||
|
if verb_header:
|
||||||
|
message = f'{verb_header} ' + message
|
||||||
|
|
||||||
|
msgtyperr = MsgTypeError.from_decode(
|
||||||
|
message=message,
|
||||||
|
bad_msg=bad_msg,
|
||||||
|
bad_msg_as_dict=msg_dict,
|
||||||
|
boxed_type=type(src_validation_error),
|
||||||
|
|
||||||
|
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
|
||||||
|
# - for the send-side `.started()` pld-validate
|
||||||
|
# case we actually raise inline so we don't need to
|
||||||
|
# set the it at all.
|
||||||
|
# - for recv side we set it inside `PldRx.decode_pld()`
|
||||||
|
# after a manual call to `pack_error()` since we
|
||||||
|
# actually want to emulate the `Error` from the mte we
|
||||||
|
# build here. So by default in that case, this is left
|
||||||
|
# as `None` here.
|
||||||
|
# ipc_msg=src_err_msg,
|
||||||
|
)
|
||||||
|
msgtyperr.__cause__ = src_validation_error
|
||||||
|
return msgtyperr
|
||||||
|
|
|
@ -49,7 +49,8 @@ from tractor._exceptions import (
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
pack_from_raise,
|
pack_from_raise,
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
_mk_msg_type_err,
|
_mk_send_mte,
|
||||||
|
_mk_recv_mte,
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
_ctxvar_MsgCodec,
|
_ctxvar_MsgCodec,
|
||||||
|
@ -256,7 +257,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
# and always raise such that spec violations
|
# and always raise such that spec violations
|
||||||
# are never allowed to be caught silently!
|
# are never allowed to be caught silently!
|
||||||
except msgspec.ValidationError as verr:
|
except msgspec.ValidationError as verr:
|
||||||
msgtyperr: MsgTypeError = _mk_msg_type_err(
|
msgtyperr: MsgTypeError = _mk_recv_mte(
|
||||||
msg=msg_bytes,
|
msg=msg_bytes,
|
||||||
codec=codec,
|
codec=codec,
|
||||||
src_validation_error=verr,
|
src_validation_error=verr,
|
||||||
|
@ -321,7 +322,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
|
|
||||||
if type(msg) not in msgtypes.__msg_types__:
|
if type(msg) not in msgtypes.__msg_types__:
|
||||||
if strict_types:
|
if strict_types:
|
||||||
raise _mk_msg_type_err(
|
raise _mk_send_mte(
|
||||||
msg,
|
msg,
|
||||||
codec=codec,
|
codec=codec,
|
||||||
)
|
)
|
||||||
|
@ -333,8 +334,9 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
bytes_data: bytes = codec.encode(msg)
|
bytes_data: bytes = codec.encode(msg)
|
||||||
except TypeError as typerr:
|
except TypeError as _err:
|
||||||
msgtyperr: MsgTypeError = _mk_msg_type_err(
|
typerr = _err
|
||||||
|
msgtyperr: MsgTypeError = _mk_send_mte(
|
||||||
msg,
|
msg,
|
||||||
codec=codec,
|
codec=codec,
|
||||||
message=(
|
message=(
|
||||||
|
|
|
@ -47,7 +47,7 @@ from tractor._exceptions import (
|
||||||
InternalError,
|
InternalError,
|
||||||
_raise_from_unexpected_msg,
|
_raise_from_unexpected_msg,
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
_mk_msg_type_err,
|
_mk_recv_mte,
|
||||||
pack_error,
|
pack_error,
|
||||||
)
|
)
|
||||||
from tractor._state import current_ipc_ctx
|
from tractor._state import current_ipc_ctx
|
||||||
|
@ -264,7 +264,7 @@ class PldRx(Struct):
|
||||||
# pack mgterr into error-msg for
|
# pack mgterr into error-msg for
|
||||||
# reraise below; ensure remote-actor-err
|
# reraise below; ensure remote-actor-err
|
||||||
# info is displayed nicely?
|
# info is displayed nicely?
|
||||||
mte: MsgTypeError = _mk_msg_type_err(
|
mte: MsgTypeError = _mk_recv_mte(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
codec=self.pld_dec,
|
codec=self.pld_dec,
|
||||||
src_validation_error=valerr,
|
src_validation_error=valerr,
|
||||||
|
@ -277,19 +277,6 @@ class PldRx(Struct):
|
||||||
if is_started_send_side:
|
if is_started_send_side:
|
||||||
raise mte
|
raise mte
|
||||||
|
|
||||||
# XXX TODO: remove this right?
|
|
||||||
# => any bad stated/return values should
|
|
||||||
# always be treated a remote errors right?
|
|
||||||
#
|
|
||||||
# if (
|
|
||||||
# expect_msg is Return
|
|
||||||
# or expect_msg is Started
|
|
||||||
# ):
|
|
||||||
# # set emulated remote error more-or-less as the
|
|
||||||
# # runtime would
|
|
||||||
# ctx: Context = getattr(ipc, 'ctx', ipc)
|
|
||||||
# ctx._maybe_cancel_and_set_remote_error(mte)
|
|
||||||
|
|
||||||
# NOTE: the `.message` is automatically
|
# NOTE: the `.message` is automatically
|
||||||
# transferred into the message as long as we
|
# transferred into the message as long as we
|
||||||
# define it as a `Error.message` field.
|
# define it as a `Error.message` field.
|
||||||
|
@ -799,13 +786,18 @@ def validate_payload_msg(
|
||||||
|
|
||||||
# raise any msg type error NO MATTER WHAT!
|
# raise any msg type error NO MATTER WHAT!
|
||||||
except ValidationError as verr:
|
except ValidationError as verr:
|
||||||
mte: MsgTypeError = _mk_msg_type_err(
|
try:
|
||||||
msg=roundtripped,
|
mte: MsgTypeError = _mk_recv_mte(
|
||||||
codec=codec,
|
msg=roundtripped,
|
||||||
src_validation_error=verr,
|
codec=codec,
|
||||||
verb_header='Trying to send ',
|
src_validation_error=verr,
|
||||||
is_invalid_payload=True,
|
verb_header='Trying to send ',
|
||||||
)
|
is_invalid_payload=True,
|
||||||
|
)
|
||||||
|
except BaseException:
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
raise
|
||||||
|
|
||||||
if not raise_mte:
|
if not raise_mte:
|
||||||
return mte
|
return mte
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue