Compare commits

..

3 Commits

Author SHA1 Message Date
Tyler Goodlet eee4c61b51 Add `MsgTypeError` "bad msg" capture
Such that if caught by user code and/or the runtime we can introspect
the original msg which caused the type error. Previously this was kinda
half-baked with a `.msg_dict` which was delivered from an `Any`-decode
of the shuttle msg in `_mk_msg_type_err()` but now this more explicitly
refines the API and supports both `PayloadMsg`-instance or the msg-dict
style injection:
- allow passing either of `bad_msg: PayloadMsg|None` or
  `bad_msg_as_dict: dict|None` to `MsgTypeError.from_decode()`.
- expose public props for both ^ whilst dropping prior `.msgdict`.
- rework `.from_decode()` to explicitly accept `**extra_msgdata: dict`
  |_ only overriding it from any `bad_msg_as_dict` if the keys are found in
    `_ipcmsg_keys`, **except** for `_bad_msg` when `bad_msg` is passed.
  |_ drop `.ipc_msg` passthrough.
  |_ drop `msgdict` input.
- adjust `.cid` to only pull from the `.bad_msg` if set.

Related fixes/adjustments:
- `pack_from_raise()` should pull `boxed_type_str` from
  `boxed_type.__name__`, not the `type()` of it.. also add a
  `hide_tb: bool` flag.
- don't include `_msg_dict` and `_bad_msg` in the `_body_fields` set.
- allow more granular boxed traceback-str controls:
  |_ allow passing a `tb_str: str` explicitly in which case we use it
    verbatim and presume caller knows what they're doing.
  |_ when not provided, use the more explicit
    `traceback.format_exception(exc)` since the error instance is
    a required input (we still fail back to the old `.format_exc()` call
    if for some reason the caller passes `None`; but that should be
    a bug right?).
  |_ if a `tb: TracebackType` and a `tb_str` is passed, concat them.
- in `RemoteActorError.pformat()` don't indent the `._message` part used
  for the `body` when `with_type_header == False`.
- update `_mk_msg_type_err()` to use `bad_msg`/`bad_msg_as_dict`
  appropriately and drop passing `ipc_msg`.
2024-05-27 22:36:05 -04:00
Tyler Goodlet 42ba855d1b More correct/explicit `.started()` send-side validation
In the sense that we handle it as a special case that exposed
through to `RxPld.dec_msg()` with a new `is_started_send_side: bool`.

(Non-ideal) `Context.started()` impl deats:
- only do send-side pld-spec validation when a new `validate_pld_spec`
  is set (by default it's not).
- call `self.pld_rx.dec_msg(is_started_send_side=True)` to validate the
  payload field from the just codec-ed `Started` msg's `msg_bytes` by
  passing the `roundtripped` msg (with it's `.pld: Raw`) directly.
- add a `hide_tb: bool` param and proxy it to the `.dec_msg()` call.

(Non-ideal) `PldRx.dec_msg()` impl deats:
- for now we're packing the MTE inside an `Error` via a manual call to
  `pack_error()` and then setting that as the `msg` passed to
  `_raise_from_unexpected_msg()` (though really we should just raise
  inline?).
- manually set the `MsgTypeError._ipc_msg` to the above..

Other,
- more comprehensive `Context` type doc string.
- various `hide_tb: bool` kwarg additions through `._ops.PldRx` meths.
- proto a `.msg._ops.validate_payload_msg()` helper planned to get the
  logic from this version of `.started()`'s send-side validation so as
  to be useful more generally elsewhere.. (like for raising back
  `Return` values on the child side?).

Warning: this commit may have been made out of order from required
changes to `._exceptions` which will come in a follow up!
2024-05-27 14:59:40 -04:00
Tyler Goodlet c2cc12e14f Add basic payload-spec test suite
Starts with some very basic cases:
- verify both subactor-as-child-ctx-task send side validation (failures)
  as well as relay and raise on root-parent-side-task.
- wrap failure expectation cases that bubble out of `@acm`s with
  a `maybe_expect_raises()` equiv wrapper with an embedded timeout.
- add `Return` cases including invalid by `str` and valid by a `None`.

Still ToDo:
- commit impl changes to make the bulk of this suite pass.
- adjust how `MsgTypeError`s format the local (`.started()`) send side
  `.tb_str` such that we don't do a "boxed" error prior to
  `pack_error()` being called normally prior to `Error` transit.
2024-05-27 13:52:35 -04:00
4 changed files with 586 additions and 148 deletions

View File

@ -0,0 +1,316 @@
'''
Audit sub-sys APIs from `.msg._ops`
mostly for ensuring correct `contextvars`
related settings around IPC contexts.
'''
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
# import typing
from typing import (
# Any,
TypeAlias,
# Union,
)
from contextvars import (
Context,
)
from msgspec import (
# structs,
# msgpack,
Struct,
# ValidationError,
)
import pytest
import trio
import tractor
from tractor import (
# _state,
MsgTypeError,
current_ipc_ctx,
Portal,
)
from tractor.msg import (
_ops as msgops,
Return,
)
from tractor.msg import (
_codec,
# _ctxvar_MsgCodec,
# NamespacePath,
# MsgCodec,
# mk_codec,
# apply_codec,
# current_codec,
)
from tractor.msg.types import (
log,
# _payload_msgs,
# PayloadMsg,
# Started,
# mk_msg_spec,
)
class PldMsg(Struct):
field: str
maybe_msg_spec = PldMsg|None
@cm
def custom_spec(
ctx: Context,
spec: TypeAlias,
) -> _codec.MsgCodec:
'''
Apply a custom payload spec, remove on exit.
'''
rx: msgops.PldRx = ctx._pld_rx
@acm
async def maybe_expect_raises(
raises: BaseException|None = None,
ensure_in_message: list[str]|None = None,
reraise: bool = False,
timeout: int = 3,
) -> None:
'''
Async wrapper for ensuring errors propagate from the inner scope.
'''
with trio.fail_after(timeout):
try:
yield
except BaseException as _inner_err:
inner_err = _inner_err
# wasn't-expected to error..
if raises is None:
raise
else:
assert type(inner_err) is raises
# maybe check for error txt content
if ensure_in_message:
part: str
for part in ensure_in_message:
for i, arg in enumerate(inner_err.args):
if part in arg:
break
# if part never matches an arg, then we're
# missing a match.
else:
raise ValueError(
'Failed to find error message content?\n\n'
f'expected: {ensure_in_message!r}\n'
f'part: {part!r}\n\n'
f'{inner_err.args}'
)
if reraise:
raise inner_err
else:
if raises:
raise RuntimeError(
f'Expected a {raises.__name__!r} to be raised?'
)
@tractor.context
async def child(
ctx: Context,
started_value: int|PldMsg|None,
return_value: str|None,
validate_pld_spec: bool,
raise_on_started_mte: bool = True,
) -> None:
'''
Call ``Context.started()`` more then once (an error).
'''
expect_started_mte: bool = started_value == 10
# sanaity check that child RPC context is the current one
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
rx: msgops.PldRx = ctx._pld_rx
orig_pldec: _codec.MsgDec = rx.pld_dec
# senity that default pld-spec should be set
assert (
rx.pld_dec
is
msgops._def_any_pldec
)
try:
with msgops.limit_plds(
spec=maybe_msg_spec,
) as pldec:
# sanity on `MsgDec` state
assert rx.pld_dec is pldec
assert pldec.spec is maybe_msg_spec
# 2 cases: hdndle send-side and recv-only validation
# - when `raise_on_started_mte == True`, send validate
# - else, parent-recv-side only validation
try:
await ctx.started(
value=started_value,
validate_pld_spec=validate_pld_spec,
)
except MsgTypeError:
log.exception('started()` raised an MTE!\n')
if not expect_started_mte:
raise RuntimeError(
'Child-ctx-task SHOULD NOT HAVE raised an MTE for\n\n'
f'{started_value!r}\n'
)
# propagate to parent?
if raise_on_started_mte:
raise
else:
if expect_started_mte:
raise RuntimeError(
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
f'{started_value!r}\n'
)
# XXX should always fail on recv side since we can't
# really do much else beside terminate and relay the
# msg-type-error from this RPC task ;)
return return_value
finally:
# sanity on `limit_plds()` reversion
assert (
rx.pld_dec
is
msgops._def_any_pldec
)
log.runtime(
'Reverted to previous pld-spec\n\n'
f'{orig_pldec}\n'
)
@pytest.mark.parametrize(
'return_value',
[
None,
'yo',
],
ids=[
'return[invalid-"yo"]',
'return[valid-None]',
],
)
@pytest.mark.parametrize(
'started_value',
[
10,
PldMsg(field='yo'),
],
ids=[
'Started[invalid-10]',
'Started[valid-PldMsg]',
],
)
@pytest.mark.parametrize(
'pld_check_started_value',
[
True,
False,
],
ids=[
'check-started-pld',
'no-started-pld-validate',
],
)
def test_basic_payload_spec(
debug_mode: bool,
loglevel: str,
return_value: str|None,
started_value: int|PldMsg,
pld_check_started_value: bool,
):
'''
Validate the most basic `PldRx` msg-type-spec semantics around
a IPC `Context` endpoint start, started-sync, and final return
value depending on set payload types and the currently applied
pld-spec.
'''
invalid_return: bool = return_value == 'yo'
invalid_started: bool = started_value == 10
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
loglevel=loglevel,
) as an:
p: Portal = await an.start_actor(
'child',
enable_modules=[__name__],
)
# since not opened yet.
assert current_ipc_ctx() is None
async with (
maybe_expect_raises(
raises=MsgTypeError if (
invalid_return
or
invalid_started
) else None,
ensure_in_message=[
"invalid `Return` payload",
"value: `'yo'` does not match type-spec: `Return.pld: PldMsg|NoneType`",
],
),
p.open_context(
child,
return_value=return_value,
started_value=started_value,
pld_spec=maybe_msg_spec,
validate_pld_spec=pld_check_started_value,
) as (ctx, first),
):
# now opened with 'child' sub
assert current_ipc_ctx() is ctx
assert type(first) is PldMsg
assert first.field == 'yo'
try:
assert (await ctx.result()) is None
except MsgTypeError as mte:
if not invalid_return:
raise
else: # expected this invalid `Return.pld`
assert mte.cid == ctx.cid
# verify expected remote mte deats
await tractor.pause()
assert ctx._remote_error is mte
assert mte.expected_msg_type is Return
await p.cancel_actor()
trio.run(main)

View File

@ -15,12 +15,22 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
The fundamental cross process SC abstraction: an inter-actor,
cancel-scope linked task "context".
The fundamental cross-process SC abstraction: an inter-actor,
transitively cancel-scope linked, (dual) task IPC coupled "context".
A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built
into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors.
A `Context` is very similar to the look and feel of the
`.cancel_scope: trio.CancelScope` built into each `trio.Nursery`
except that it links the lifetimes of 2 memory space disjoint,
parallel executing, tasks scheduled in separate "actors".
So while a `trio.Nursery` has a `.parent_task` which exists both
before (open) and then inside the body of the `async with` of the
nursery's scope (/block), a `Context` contains 2 tasks, a "parent"
and a "child" side, where both execute independently in separate
memory domains of different (host's) processes linked through
a SC-transitive IPC "shuttle dialog protocol". The underlying IPC
dialog-(un)protocol allows for the maintainance of SC properties
end-2-end between the tasks.
'''
from __future__ import annotations
@ -71,13 +81,11 @@ from .msg import (
MsgCodec,
NamespacePath,
PayloadT,
Return,
Started,
Stop,
Yield,
current_codec,
pretty_struct,
types as msgtypes,
_ops as msgops,
)
from ._ipc import (
@ -90,7 +98,7 @@ from ._state import (
debug_mode,
_ctxvar_Context,
)
# ------ - ------
if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
@ -1598,16 +1606,15 @@ class Context:
async def started(
self,
# TODO: how to type this so that it's the
# same as the payload type? Is this enough?
value: PayloadT|None = None,
validate_pld_spec: bool = True,
strict_pld_parity: bool = False,
strict_parity: bool = False,
# TODO: this will always emit for msgpack for any () vs. []
# inside the value.. do we want to offer warnings on that?
# complain_no_parity: bool = False,
# TODO: this will always emit now that we do `.pld: Raw`
# passthrough.. so maybe just only complain when above strict
# flag is set?
complain_no_parity: bool = False,
hide_tb: bool = True,
) -> None:
'''
@ -1648,63 +1655,54 @@ class Context:
#
# https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
#
__tracebackhide__: bool = hide_tb
if validate_pld_spec:
# __tracebackhide__: bool = False
codec: MsgCodec = current_codec()
msg_bytes: bytes = codec.encode(started_msg)
try:
# be a "cheap" dialog (see above!)
roundtripped: Started = codec.decode(msg_bytes)
# pld: PayloadT = await self.pld_rx.recv_pld(
pld: PayloadT = self.pld_rx.dec_msg(
msg=roundtripped,
ipc=self,
expect_msg=Started,
hide_tb=hide_tb,
is_started_send_side=True,
)
if (
strict_parity
or
complain_no_parity
strict_pld_parity
and
pld != value
):
rt_started: Started = codec.decode(msg_bytes)
# XXX something is prolly totes cucked with the
# codec state!
if isinstance(rt_started, dict):
rt_started = msgtypes.from_dict_msg(
dict_msg=rt_started,
)
raise RuntimeError(
'Failed to roundtrip `Started` msg?\n'
f'{pretty_struct.pformat(rt_started)}\n'
)
if rt_started != started_msg:
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
rt_started,
roundtripped,
started_msg,
)
complaint: str = (
'Started value does not match after roundtrip?\n\n'
f'{diff}'
)
# TODO: rn this will pretty much always fail with
# any other sequence type embeded in the
# payload...
if (
self._strict_started
or
strict_parity
):
raise ValueError(complaint)
else:
log.warning(complaint)
await self.chan.send(started_msg)
raise ValidationError(complaint)
# raise any msg type error NO MATTER WHAT!
except ValidationError as verr:
# always show this src frame in the tb
# __tracebackhide__: bool = False
raise _mk_msg_type_err(
msg=msg_bytes,
msg=roundtripped,
codec=codec,
src_validation_error=verr,
verb_header='Trying to send payload'
# > 'invalid `Started IPC msgs\n'
verb_header='Trying to send ',
is_invalid_payload=True,
) from verr
# TODO: maybe a flag to by-pass encode op if already done
# here in caller?
await self.chan.send(started_msg)
# set msg-related internal runtime-state
self._started_called = True
self._started_msg = started_msg
self._started_pld = value
@ -1997,12 +1995,7 @@ async def open_context_from_portal(
pld_spec: TypeAlias|None = None,
allow_overruns: bool = False,
# TODO: if we set this the wrapping `@acm` body will
# still be shown (awkwardly) on pdb REPL entry. Ideally
# we can similarly annotate that frame to NOT show? for now
# we DO SHOW this frame since it's awkward ow..
hide_tb: bool = False,
hide_tb: bool = True,
# proxied to RPC
**kwargs,
@ -2115,6 +2108,7 @@ async def open_context_from_portal(
ipc=ctx,
expect_msg=Started,
passthrough_non_pld_msgs=False,
hide_tb=hide_tb,
)
# from .devx import pause

View File

@ -22,6 +22,9 @@ from __future__ import annotations
import builtins
import importlib
from pprint import pformat
from types import (
TracebackType,
)
from typing import (
Any,
Callable,
@ -92,26 +95,30 @@ _ipcmsg_keys: list[str] = [
fi.name
for fi, k, v
in iter_fields(Error)
]
_body_fields: list[str] = list(
set(_ipcmsg_keys)
# NOTE: don't show fields that either don't provide
# any extra useful info or that are already shown
# as part of `.__repr__()` output.
# XXX NOTE: DON'T-SHOW-FIELDS
# - don't provide any extra useful info or,
# - are already shown as part of `.__repr__()` or,
# - are sub-type specific.
- {
'src_type_str',
'boxed_type_str',
'tb_str',
'relay_path',
'_msg_dict',
'cid',
# since only ctxc should show it but `Error` does
# only ctxc should show it but `Error` does
# have it as an optional field.
'canceller',
# only for MTEs and generally only used
# when devving/testing/debugging.
'_msg_dict',
'_bad_msg',
}
)
@ -146,6 +153,7 @@ def pack_from_raise(
|MsgTypeError
),
cid: str,
hide_tb: bool = True,
**rae_fields,
@ -156,7 +164,7 @@ def pack_from_raise(
`Error`-msg using `pack_error()` to extract the tb info.
'''
__tracebackhide__: bool = True
__tracebackhide__: bool = hide_tb
try:
raise local_err
except type(local_err) as local_err:
@ -231,7 +239,8 @@ class RemoteActorError(Exception):
if (
extra_msgdata
and ipc_msg
and
ipc_msg
):
# XXX mutate the orig msg directly from
# manually provided input params.
@ -261,17 +270,16 @@ class RemoteActorError(Exception):
# either by customizing `ContextCancelled.__init__()` or
# through a special factor func?
elif boxed_type:
boxed_type_str: str = type(boxed_type).__name__
boxed_type_str: str = boxed_type.__name__
if (
ipc_msg
and not self._ipc_msg.boxed_type_str
and
self._ipc_msg.boxed_type_str != boxed_type_str
):
self._ipc_msg.boxed_type_str = boxed_type_str
assert self.boxed_type_str == self._ipc_msg.boxed_type_str
else:
self._extra_msgdata['boxed_type_str'] = boxed_type_str
# ensure any roundtripping evals to the input value
assert self.boxed_type is boxed_type
@property
@ -309,7 +317,9 @@ class RemoteActorError(Exception):
if self._ipc_msg
else {}
)
return self._extra_msgdata | msgdata
return {
k: v for k, v in self._extra_msgdata.items()
} | msgdata
@property
def src_type_str(self) -> str:
@ -502,6 +512,8 @@ class RemoteActorError(Exception):
'''
header: str = ''
body: str = ''
if with_type_header:
header: str = f'<{type(self).__name__}(\n'
@ -525,24 +537,22 @@ class RemoteActorError(Exception):
)
if not with_type_header:
body = '\n' + body
else:
first: str = ''
message: str = self._message
elif message := self._message:
# split off the first line so it isn't indented
# the same like the "boxed content".
if not with_type_header:
lines: list[str] = message.splitlines()
first = lines[0]
message = ''.join(lines[1:])
first: str = lines[0]
message: str = message.removeprefix(first)
else:
first: str = ''
body: str = (
first
+
textwrap.indent(
message,
prefix=' ',
)
message
+
'\n'
)
@ -708,52 +718,72 @@ class MsgTypeError(
]
@property
def msg_dict(self) -> dict[str, Any]:
def bad_msg(self) -> PayloadMsg|None:
'''
If the underlying IPC `MsgType` was received from a remote
actor but was unable to be decoded to a native
`Yield`|`Started`|`Return` struct, the interchange backend
native format decoder can be used to stash a `dict`
version for introspection by the invalidating RPC task.
Ref to the the original invalid IPC shuttle msg which failed
to decode thus providing for the reason for this error.
'''
return self.msgdata.get('_msg_dict')
if (
(_bad_msg := self.msgdata.get('_bad_msg'))
and
isinstance(_bad_msg, PayloadMsg)
):
return _bad_msg
@property
def expected_msg(self) -> MsgType|None:
'''
Attempt to construct what would have been the original
`MsgType`-with-payload subtype (i.e. an instance from the set
of msgs in `.msg.types._payload_msgs`) which failed
validation.
'''
if msg_dict := self.msg_dict.copy():
elif bad_msg_dict := self.bad_msg_as_dict:
return msgtypes.from_dict_msg(
dict_msg=msg_dict,
dict_msg=bad_msg_dict.copy(),
# use_pretty=True,
# ^-TODO-^ would luv to use this BUT then the
# `field_prefix` in `pformat_boxed_tb()` cucks it
# all up.. XD
)
return None
@property
def bad_msg_as_dict(self) -> dict[str, Any]:
'''
If the underlying IPC `MsgType` was received from a remote
actor but was unable to be decoded to a native `PayloadMsg`
(`Yield`|`Started`|`Return`) struct, the interchange backend
native format decoder can be used to stash a `dict` version
for introspection by the invalidating RPC task.
Optionally when this error is constructed from
`.from_decode()` the caller can attempt to construct what
would have been the original `MsgType`-with-payload subtype
(i.e. an instance from the set of msgs in
`.msg.types._payload_msgs`) which failed validation.
'''
return self.msgdata.get('_bad_msg_as_dict')
@property
def expected_msg_type(self) -> Type[MsgType]|None:
return type(self.expected_msg)
return type(self.bad_msg)
@property
def cid(self) -> str:
# pre-packed using `.from_decode()` constructor
return self.msgdata.get('cid')
# pull from required `.bad_msg` ref (or src dict)
if bad_msg := self.bad_msg:
return bad_msg.cid
return self.msgdata['cid']
@classmethod
def from_decode(
cls,
message: str,
ipc_msg: PayloadMsg|None = None,
msgdict: dict|None = None,
bad_msg: PayloadMsg|None = None,
bad_msg_as_dict: dict|None = None,
# if provided, expand and pack all RAE compat fields into the
# `._extra_msgdata` auxillary data `dict` internal to
# `RemoteActorError`.
**extra_msgdata,
) -> MsgTypeError:
'''
@ -763,25 +793,44 @@ class MsgTypeError(
(which is normally the caller of this).
'''
# if provided, expand and pack all RAE compat fields into the
# `._extra_msgdata` auxillary data `dict` internal to
# `RemoteActorError`.
extra_msgdata: dict = {}
if msgdict:
extra_msgdata: dict = {
k: v
for k, v in msgdict.items()
if k in _ipcmsg_keys
}
if bad_msg_as_dict:
# NOTE: original "vanilla decode" of the msg-bytes
# is placed inside a value readable from
# `.msgdata['_msg_dict']`
extra_msgdata['_msg_dict'] = msgdict
extra_msgdata['_bad_msg_as_dict'] = bad_msg_as_dict
# scrape out any underlying fields from the
# msg that failed validation.
for k, v in bad_msg_as_dict.items():
if (
# always skip a duplicate entry
# if already provided as an arg
k == '_bad_msg' and bad_msg
or
# skip anything not in the default msg-field set.
k not in _ipcmsg_keys
# k not in _body_fields
):
continue
extra_msgdata[k] = v
elif bad_msg:
if not isinstance(bad_msg, PayloadMsg):
raise TypeError(
'The provided `bad_msg` is not a `PayloadMsg` type?\n\n'
f'{bad_msg}'
)
extra_msgdata['_bad_msg'] = bad_msg
extra_msgdata['cid'] = bad_msg.cid
if 'cid' not in extra_msgdata:
import pdbp; pdbp.set_trace()
return cls(
message=message,
boxed_type=cls,
ipc_msg=ipc_msg,
**extra_msgdata,
)
@ -836,9 +885,10 @@ class MessagingError(Exception):
def pack_error(
exc: BaseException|RemoteActorError,
tb: str|None = None,
cid: str|None = None,
src_uid: tuple[str, str]|None = None,
tb: TracebackType|None = None,
tb_str: str = '',
) -> Error:
'''
@ -848,10 +898,28 @@ def pack_error(
the receiver side using `unpack_error()` below.
'''
if tb:
tb_str = ''.join(traceback.format_tb(tb))
if not tb_str:
tb_str: str = (
''.join(traceback.format_exception(exc))
# TODO: can we remove this is `exc` is required?
or
# NOTE: this is just a shorthand for the "last error" as
# provided by `sys.exeception()`, see:
# - https://docs.python.org/3/library/traceback.html#traceback.print_exc
# - https://docs.python.org/3/library/traceback.html#traceback.format_exc
traceback.format_exc()
)
else:
tb_str = traceback.format_exc()
if tb_str[-2:] != '\n':
tb_str += '\n'
# when caller provides a tb instance (say pulled from some other
# src error's `.__traceback__`) we use that as the "boxed"
# tb-string instead.
if tb:
# https://docs.python.org/3/library/traceback.html#traceback.format_list
tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str
error_msg: dict[ # for IPC
str,
@ -1115,7 +1183,7 @@ def _mk_msg_type_err(
src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None,
is_invalid_payload: bool = False,
src_err_msg: Error|None = None,
# src_err_msg: Error|None = None,
**mte_kwargs,
@ -1164,10 +1232,10 @@ def _mk_msg_type_err(
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
)
msgtyperr = MsgTypeError(
message=message,
ipc_msg=msg,
bad_msg=msg,
)
# ya, might be `None`
msgtyperr.__cause__ = src_type_error
@ -1175,6 +1243,9 @@ def _mk_msg_type_err(
# `Channel.recv()` case
else:
msg_dict: dict|None = None
bad_msg: PayloadMsg|None = None
if is_invalid_payload:
msg_type: str = type(msg)
any_pld: Any = msgpack.decode(msg.pld)
@ -1186,19 +1257,20 @@ def _mk_msg_type_err(
# f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n'
# f')>\n\n'
)
# src_err_msg = msg
bad_msg = msg
# TODO: should we just decode the msg to a dict despite
# only the payload being wrong?
# -[ ] maybe the better design is to break this construct
# logic into a separate explicit helper raiser-func?
msg_dict = None
else:
msg: bytes
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
# interchange-prot (i.e. without any `msgspec.Struct`
# handling) so that we can determine what
# `.msg.types.PayloadMsg` is the culprit by reporting the
# received value.
msg: bytes
msg_dict: dict = msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name)
@ -1235,9 +1307,13 @@ def _mk_msg_type_err(
if verb_header:
message = f'{verb_header} ' + message
# if not isinstance(bad_msg, PayloadMsg):
# import pdbp; pdbp.set_trace()
msgtyperr = MsgTypeError.from_decode(
message=message,
msgdict=msg_dict,
bad_msg=bad_msg,
bad_msg_as_dict=msg_dict,
# NOTE: for the send-side `.started()` pld-validate
# case we actually set the `._ipc_msg` AFTER we return
@ -1245,7 +1321,7 @@ def _mk_msg_type_err(
# want to emulate the `Error` from the mte we build here
# Bo
# so by default in that case this is set to `None`
ipc_msg=src_err_msg,
# ipc_msg=src_err_msg,
)
msgtyperr.__cause__ = src_validation_error
return msgtyperr

View File

@ -47,7 +47,7 @@ from tractor._exceptions import (
_raise_from_unexpected_msg,
MsgTypeError,
_mk_msg_type_err,
pack_from_raise,
pack_error,
)
from tractor._state import current_ipc_ctx
from ._codec import (
@ -203,7 +203,6 @@ class PldRx(Struct):
msg: MsgType = (
ipc_msg
or
# async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive()
)
@ -223,6 +222,10 @@ class PldRx(Struct):
raise_error: bool = True,
hide_tb: bool = True,
# XXX for special (default?) case of send side call with
# `Context.started(validate_pld_spec=True)`
is_started_send_side: bool = False,
) -> PayloadT|Raw:
'''
Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and
@ -230,8 +233,6 @@ class PldRx(Struct):
'''
__tracebackhide__: bool = hide_tb
_src_err = None
src_err: BaseException|None = None
match msg:
# payload-data shuttle msg; deliver the `.pld` value
@ -256,18 +257,58 @@ class PldRx(Struct):
# pack mgterr into error-msg for
# reraise below; ensure remote-actor-err
# info is displayed nicely?
msgterr: MsgTypeError = _mk_msg_type_err(
mte: MsgTypeError = _mk_msg_type_err(
msg=msg,
codec=self.pld_dec,
src_validation_error=valerr,
is_invalid_payload=True,
expected_msg=expect_msg,
# ipc_msg=msg,
)
msg: Error = pack_from_raise(
local_err=msgterr,
# NOTE: override the `msg` passed to
# `_raise_from_unexpected_msg()` (below) so so that
# we're effectively able to use that same func to
# unpack and raise an "emulated remote `Error`" of
# this local MTE.
err_msg: Error = pack_error(
exc=mte,
cid=msg.cid,
src_uid=ipc.chan.uid,
src_uid=(
ipc.chan.uid
if not is_started_send_side
else ipc._actor.uid
),
# tb=valerr.__traceback__,
tb_str=mte._message,
)
# ^-TODO-^ just raise this inline instead of all the
# pack-unpack-repack non-sense!
mte._ipc_msg = err_msg
msg = err_msg
# set emulated remote error more-or-less as the
# runtime would
ctx: Context = getattr(ipc, 'ctx', ipc)
# TODO: should we instead make this explicit and
# use the above masked `is_started_send_decode`,
# expecting the `Context.started()` caller to set
# it? Rn this is kinda, howyousayyy, implicitly
# edge-case-y..
if (
expect_msg is not Started
and not is_started_send_side
):
ctx._maybe_cancel_and_set_remote_error(mte)
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
# raises the boxed `err_msg` from above it raises
# it from `None`.
src_err = valerr
# if is_started_send_side:
# src_err = None
# XXX some other decoder specific failure?
# except TypeError as src_error:
@ -379,6 +420,7 @@ class PldRx(Struct):
# NOTE: generally speaking only for handling `Stop`-msgs that
# arrive during a call to `drain_to_final_msg()` above!
passthrough_non_pld_msgs: bool = True,
hide_tb: bool = True,
**kwargs,
) -> tuple[MsgType, PayloadT]:
@ -387,6 +429,7 @@ class PldRx(Struct):
the pair of refs.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive()
if passthrough_non_pld_msgs:
@ -401,6 +444,7 @@ class PldRx(Struct):
msg,
ipc=ipc,
expect_msg=expect_msg,
hide_tb=hide_tb,
**kwargs,
)
return msg, pld
@ -414,7 +458,7 @@ def limit_plds(
) -> MsgDec:
'''
Apply a `MsgCodec` that will natively decode the SC-msg set's
`Msg.pld: Union[Type[Struct]]` payload fields using
`PayloadMsg.pld: Union[Type[Struct]]` payload fields using
tagged-unions of `msgspec.Struct`s from the `payload_types`
for all IPC contexts in use by the current `trio.Task`.
@ -691,3 +735,11 @@ async def drain_to_final_msg(
result_msg,
pre_result_drained,
)
# TODO: factor logic from `.Context.started()` for send-side
# validate raising!
def validate_payload_msg(
msg: Started|Yield|Return,
) -> MsgTypeError|None:
...