Compare commits

..

No commits in common. "eee4c61b51e6d3053549e67650be04fcd03ab2d5" and "e4ec6b7b0c3401b336e03a4ea7b42015480b7677" have entirely different histories.

4 changed files with 148 additions and 586 deletions

View File

@ -1,316 +0,0 @@
'''
Audit sub-sys APIs from `.msg._ops`
mostly for ensuring correct `contextvars`
related settings around IPC contexts.
'''
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
# import typing
from typing import (
# Any,
TypeAlias,
# Union,
)
from contextvars import (
Context,
)
from msgspec import (
# structs,
# msgpack,
Struct,
# ValidationError,
)
import pytest
import trio
import tractor
from tractor import (
# _state,
MsgTypeError,
current_ipc_ctx,
Portal,
)
from tractor.msg import (
_ops as msgops,
Return,
)
from tractor.msg import (
_codec,
# _ctxvar_MsgCodec,
# NamespacePath,
# MsgCodec,
# mk_codec,
# apply_codec,
# current_codec,
)
from tractor.msg.types import (
log,
# _payload_msgs,
# PayloadMsg,
# Started,
# mk_msg_spec,
)
class PldMsg(Struct):
field: str
maybe_msg_spec = PldMsg|None
@cm
def custom_spec(
ctx: Context,
spec: TypeAlias,
) -> _codec.MsgCodec:
'''
Apply a custom payload spec, remove on exit.
'''
rx: msgops.PldRx = ctx._pld_rx
@acm
async def maybe_expect_raises(
raises: BaseException|None = None,
ensure_in_message: list[str]|None = None,
reraise: bool = False,
timeout: int = 3,
) -> None:
'''
Async wrapper for ensuring errors propagate from the inner scope.
'''
with trio.fail_after(timeout):
try:
yield
except BaseException as _inner_err:
inner_err = _inner_err
# wasn't-expected to error..
if raises is None:
raise
else:
assert type(inner_err) is raises
# maybe check for error txt content
if ensure_in_message:
part: str
for part in ensure_in_message:
for i, arg in enumerate(inner_err.args):
if part in arg:
break
# if part never matches an arg, then we're
# missing a match.
else:
raise ValueError(
'Failed to find error message content?\n\n'
f'expected: {ensure_in_message!r}\n'
f'part: {part!r}\n\n'
f'{inner_err.args}'
)
if reraise:
raise inner_err
else:
if raises:
raise RuntimeError(
f'Expected a {raises.__name__!r} to be raised?'
)
@tractor.context
async def child(
ctx: Context,
started_value: int|PldMsg|None,
return_value: str|None,
validate_pld_spec: bool,
raise_on_started_mte: bool = True,
) -> None:
'''
Call ``Context.started()`` more then once (an error).
'''
expect_started_mte: bool = started_value == 10
# sanaity check that child RPC context is the current one
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
rx: msgops.PldRx = ctx._pld_rx
orig_pldec: _codec.MsgDec = rx.pld_dec
# senity that default pld-spec should be set
assert (
rx.pld_dec
is
msgops._def_any_pldec
)
try:
with msgops.limit_plds(
spec=maybe_msg_spec,
) as pldec:
# sanity on `MsgDec` state
assert rx.pld_dec is pldec
assert pldec.spec is maybe_msg_spec
# 2 cases: hdndle send-side and recv-only validation
# - when `raise_on_started_mte == True`, send validate
# - else, parent-recv-side only validation
try:
await ctx.started(
value=started_value,
validate_pld_spec=validate_pld_spec,
)
except MsgTypeError:
log.exception('started()` raised an MTE!\n')
if not expect_started_mte:
raise RuntimeError(
'Child-ctx-task SHOULD NOT HAVE raised an MTE for\n\n'
f'{started_value!r}\n'
)
# propagate to parent?
if raise_on_started_mte:
raise
else:
if expect_started_mte:
raise RuntimeError(
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
f'{started_value!r}\n'
)
# XXX should always fail on recv side since we can't
# really do much else beside terminate and relay the
# msg-type-error from this RPC task ;)
return return_value
finally:
# sanity on `limit_plds()` reversion
assert (
rx.pld_dec
is
msgops._def_any_pldec
)
log.runtime(
'Reverted to previous pld-spec\n\n'
f'{orig_pldec}\n'
)
@pytest.mark.parametrize(
'return_value',
[
None,
'yo',
],
ids=[
'return[invalid-"yo"]',
'return[valid-None]',
],
)
@pytest.mark.parametrize(
'started_value',
[
10,
PldMsg(field='yo'),
],
ids=[
'Started[invalid-10]',
'Started[valid-PldMsg]',
],
)
@pytest.mark.parametrize(
'pld_check_started_value',
[
True,
False,
],
ids=[
'check-started-pld',
'no-started-pld-validate',
],
)
def test_basic_payload_spec(
debug_mode: bool,
loglevel: str,
return_value: str|None,
started_value: int|PldMsg,
pld_check_started_value: bool,
):
'''
Validate the most basic `PldRx` msg-type-spec semantics around
a IPC `Context` endpoint start, started-sync, and final return
value depending on set payload types and the currently applied
pld-spec.
'''
invalid_return: bool = return_value == 'yo'
invalid_started: bool = started_value == 10
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
loglevel=loglevel,
) as an:
p: Portal = await an.start_actor(
'child',
enable_modules=[__name__],
)
# since not opened yet.
assert current_ipc_ctx() is None
async with (
maybe_expect_raises(
raises=MsgTypeError if (
invalid_return
or
invalid_started
) else None,
ensure_in_message=[
"invalid `Return` payload",
"value: `'yo'` does not match type-spec: `Return.pld: PldMsg|NoneType`",
],
),
p.open_context(
child,
return_value=return_value,
started_value=started_value,
pld_spec=maybe_msg_spec,
validate_pld_spec=pld_check_started_value,
) as (ctx, first),
):
# now opened with 'child' sub
assert current_ipc_ctx() is ctx
assert type(first) is PldMsg
assert first.field == 'yo'
try:
assert (await ctx.result()) is None
except MsgTypeError as mte:
if not invalid_return:
raise
else: # expected this invalid `Return.pld`
assert mte.cid == ctx.cid
# verify expected remote mte deats
await tractor.pause()
assert ctx._remote_error is mte
assert mte.expected_msg_type is Return
await p.cancel_actor()
trio.run(main)

View File

@ -15,22 +15,12 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
The fundamental cross-process SC abstraction: an inter-actor, The fundamental cross process SC abstraction: an inter-actor,
transitively cancel-scope linked, (dual) task IPC coupled "context". cancel-scope linked task "context".
A `Context` is very similar to the look and feel of the A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built
`.cancel_scope: trio.CancelScope` built into each `trio.Nursery` into each ``trio.Nursery`` except it links the lifetimes of memory space
except that it links the lifetimes of 2 memory space disjoint, disjoint, parallel executing tasks in separate actors.
parallel executing, tasks scheduled in separate "actors".
So while a `trio.Nursery` has a `.parent_task` which exists both
before (open) and then inside the body of the `async with` of the
nursery's scope (/block), a `Context` contains 2 tasks, a "parent"
and a "child" side, where both execute independently in separate
memory domains of different (host's) processes linked through
a SC-transitive IPC "shuttle dialog protocol". The underlying IPC
dialog-(un)protocol allows for the maintainance of SC properties
end-2-end between the tasks.
''' '''
from __future__ import annotations from __future__ import annotations
@ -81,11 +71,13 @@ from .msg import (
MsgCodec, MsgCodec,
NamespacePath, NamespacePath,
PayloadT, PayloadT,
Return,
Started, Started,
Stop, Stop,
Yield, Yield,
current_codec, current_codec,
pretty_struct, pretty_struct,
types as msgtypes,
_ops as msgops, _ops as msgops,
) )
from ._ipc import ( from ._ipc import (
@ -98,7 +90,7 @@ from ._state import (
debug_mode, debug_mode,
_ctxvar_Context, _ctxvar_Context,
) )
# ------ - ------
if TYPE_CHECKING: if TYPE_CHECKING:
from ._portal import Portal from ._portal import Portal
from ._runtime import Actor from ._runtime import Actor
@ -1606,15 +1598,16 @@ class Context:
async def started( async def started(
self, self,
# TODO: how to type this so that it's the
# same as the payload type? Is this enough?
value: PayloadT|None = None, value: PayloadT|None = None,
validate_pld_spec: bool = True,
strict_pld_parity: bool = False,
# TODO: this will always emit for msgpack for any () vs. [] strict_parity: bool = False,
# inside the value.. do we want to offer warnings on that?
# complain_no_parity: bool = False,
hide_tb: bool = True, # TODO: this will always emit now that we do `.pld: Raw`
# passthrough.. so maybe just only complain when above strict
# flag is set?
complain_no_parity: bool = False,
) -> None: ) -> None:
''' '''
@ -1655,54 +1648,63 @@ class Context:
# #
# https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern # https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
# #
__tracebackhide__: bool = hide_tb codec: MsgCodec = current_codec()
if validate_pld_spec: msg_bytes: bytes = codec.encode(started_msg)
# __tracebackhide__: bool = False try:
codec: MsgCodec = current_codec() # be a "cheap" dialog (see above!)
msg_bytes: bytes = codec.encode(started_msg) if (
try: strict_parity
roundtripped: Started = codec.decode(msg_bytes) or
# pld: PayloadT = await self.pld_rx.recv_pld( complain_no_parity
pld: PayloadT = self.pld_rx.dec_msg( ):
msg=roundtripped, rt_started: Started = codec.decode(msg_bytes)
ipc=self,
expect_msg=Started, # XXX something is prolly totes cucked with the
hide_tb=hide_tb, # codec state!
is_started_send_side=True, if isinstance(rt_started, dict):
) rt_started = msgtypes.from_dict_msg(
if ( dict_msg=rt_started,
strict_pld_parity )
and raise RuntimeError(
pld != value 'Failed to roundtrip `Started` msg?\n'
): f'{pretty_struct.pformat(rt_started)}\n'
)
if rt_started != started_msg:
# TODO: make that one a mod func too.. # TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__( diff = pretty_struct.Struct.__sub__(
roundtripped, rt_started,
started_msg, started_msg,
) )
complaint: str = ( complaint: str = (
'Started value does not match after roundtrip?\n\n' 'Started value does not match after roundtrip?\n\n'
f'{diff}' f'{diff}'
) )
raise ValidationError(complaint)
# raise any msg type error NO MATTER WHAT! # TODO: rn this will pretty much always fail with
except ValidationError as verr: # any other sequence type embeded in the
# always show this src frame in the tb # payload...
# __tracebackhide__: bool = False if (
raise _mk_msg_type_err( self._strict_started
msg=roundtripped, or
codec=codec, strict_parity
src_validation_error=verr, ):
verb_header='Trying to send ', raise ValueError(complaint)
is_invalid_payload=True, else:
) from verr log.warning(complaint)
# TODO: maybe a flag to by-pass encode op if already done await self.chan.send(started_msg)
# here in caller?
await self.chan.send(started_msg) # raise any msg type error NO MATTER WHAT!
except ValidationError as verr:
raise _mk_msg_type_err(
msg=msg_bytes,
codec=codec,
src_validation_error=verr,
verb_header='Trying to send payload'
# > 'invalid `Started IPC msgs\n'
) from verr
# set msg-related internal runtime-state
self._started_called = True self._started_called = True
self._started_msg = started_msg self._started_msg = started_msg
self._started_pld = value self._started_pld = value
@ -1995,7 +1997,12 @@ async def open_context_from_portal(
pld_spec: TypeAlias|None = None, pld_spec: TypeAlias|None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
hide_tb: bool = True,
# TODO: if we set this the wrapping `@acm` body will
# still be shown (awkwardly) on pdb REPL entry. Ideally
# we can similarly annotate that frame to NOT show? for now
# we DO SHOW this frame since it's awkward ow..
hide_tb: bool = False,
# proxied to RPC # proxied to RPC
**kwargs, **kwargs,
@ -2108,7 +2115,6 @@ async def open_context_from_portal(
ipc=ctx, ipc=ctx,
expect_msg=Started, expect_msg=Started,
passthrough_non_pld_msgs=False, passthrough_non_pld_msgs=False,
hide_tb=hide_tb,
) )
# from .devx import pause # from .devx import pause

View File

@ -22,9 +22,6 @@ from __future__ import annotations
import builtins import builtins
import importlib import importlib
from pprint import pformat from pprint import pformat
from types import (
TracebackType,
)
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -95,30 +92,26 @@ _ipcmsg_keys: list[str] = [
fi.name fi.name
for fi, k, v for fi, k, v
in iter_fields(Error) in iter_fields(Error)
] ]
_body_fields: list[str] = list( _body_fields: list[str] = list(
set(_ipcmsg_keys) set(_ipcmsg_keys)
# XXX NOTE: DON'T-SHOW-FIELDS # NOTE: don't show fields that either don't provide
# - don't provide any extra useful info or, # any extra useful info or that are already shown
# - are already shown as part of `.__repr__()` or, # as part of `.__repr__()` output.
# - are sub-type specific.
- { - {
'src_type_str', 'src_type_str',
'boxed_type_str', 'boxed_type_str',
'tb_str', 'tb_str',
'relay_path', 'relay_path',
'_msg_dict',
'cid', 'cid',
# only ctxc should show it but `Error` does # since only ctxc should show it but `Error` does
# have it as an optional field. # have it as an optional field.
'canceller', 'canceller',
# only for MTEs and generally only used
# when devving/testing/debugging.
'_msg_dict',
'_bad_msg',
} }
) )
@ -153,7 +146,6 @@ def pack_from_raise(
|MsgTypeError |MsgTypeError
), ),
cid: str, cid: str,
hide_tb: bool = True,
**rae_fields, **rae_fields,
@ -164,7 +156,7 @@ def pack_from_raise(
`Error`-msg using `pack_error()` to extract the tb info. `Error`-msg using `pack_error()` to extract the tb info.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = True
try: try:
raise local_err raise local_err
except type(local_err) as local_err: except type(local_err) as local_err:
@ -239,8 +231,7 @@ class RemoteActorError(Exception):
if ( if (
extra_msgdata extra_msgdata
and and ipc_msg
ipc_msg
): ):
# XXX mutate the orig msg directly from # XXX mutate the orig msg directly from
# manually provided input params. # manually provided input params.
@ -270,16 +261,17 @@ class RemoteActorError(Exception):
# either by customizing `ContextCancelled.__init__()` or # either by customizing `ContextCancelled.__init__()` or
# through a special factor func? # through a special factor func?
elif boxed_type: elif boxed_type:
boxed_type_str: str = boxed_type.__name__ boxed_type_str: str = type(boxed_type).__name__
if ( if (
ipc_msg ipc_msg
and and not self._ipc_msg.boxed_type_str
self._ipc_msg.boxed_type_str != boxed_type_str
): ):
self._ipc_msg.boxed_type_str = boxed_type_str self._ipc_msg.boxed_type_str = boxed_type_str
assert self.boxed_type_str == self._ipc_msg.boxed_type_str assert self.boxed_type_str == self._ipc_msg.boxed_type_str
# ensure any roundtripping evals to the input value else:
self._extra_msgdata['boxed_type_str'] = boxed_type_str
assert self.boxed_type is boxed_type assert self.boxed_type is boxed_type
@property @property
@ -317,9 +309,7 @@ class RemoteActorError(Exception):
if self._ipc_msg if self._ipc_msg
else {} else {}
) )
return { return self._extra_msgdata | msgdata
k: v for k, v in self._extra_msgdata.items()
} | msgdata
@property @property
def src_type_str(self) -> str: def src_type_str(self) -> str:
@ -512,8 +502,6 @@ class RemoteActorError(Exception):
''' '''
header: str = '' header: str = ''
body: str = ''
if with_type_header: if with_type_header:
header: str = f'<{type(self).__name__}(\n' header: str = f'<{type(self).__name__}(\n'
@ -537,22 +525,24 @@ class RemoteActorError(Exception):
) )
if not with_type_header: if not with_type_header:
body = '\n' + body body = '\n' + body
else:
first: str = ''
message: str = self._message
elif message := self._message:
# split off the first line so it isn't indented # split off the first line so it isn't indented
# the same like the "boxed content". # the same like the "boxed content".
if not with_type_header: if not with_type_header:
lines: list[str] = message.splitlines() lines: list[str] = message.splitlines()
first: str = lines[0] first = lines[0]
message: str = message.removeprefix(first) message = ''.join(lines[1:])
else:
first: str = ''
body: str = ( body: str = (
first first
+ +
message textwrap.indent(
message,
prefix=' ',
)
+ +
'\n' '\n'
) )
@ -718,72 +708,52 @@ class MsgTypeError(
] ]
@property @property
def bad_msg(self) -> PayloadMsg|None: def msg_dict(self) -> dict[str, Any]:
''' '''
Ref to the the original invalid IPC shuttle msg which failed If the underlying IPC `MsgType` was received from a remote
to decode thus providing for the reason for this error. actor but was unable to be decoded to a native
`Yield`|`Started`|`Return` struct, the interchange backend
native format decoder can be used to stash a `dict`
version for introspection by the invalidating RPC task.
''' '''
if ( return self.msgdata.get('_msg_dict')
(_bad_msg := self.msgdata.get('_bad_msg'))
and
isinstance(_bad_msg, PayloadMsg)
):
return _bad_msg
elif bad_msg_dict := self.bad_msg_as_dict: @property
def expected_msg(self) -> MsgType|None:
'''
Attempt to construct what would have been the original
`MsgType`-with-payload subtype (i.e. an instance from the set
of msgs in `.msg.types._payload_msgs`) which failed
validation.
'''
if msg_dict := self.msg_dict.copy():
return msgtypes.from_dict_msg( return msgtypes.from_dict_msg(
dict_msg=bad_msg_dict.copy(), dict_msg=msg_dict,
# use_pretty=True, # use_pretty=True,
# ^-TODO-^ would luv to use this BUT then the # ^-TODO-^ would luv to use this BUT then the
# `field_prefix` in `pformat_boxed_tb()` cucks it # `field_prefix` in `pformat_boxed_tb()` cucks it
# all up.. XD # all up.. XD
) )
return None return None
@property
def bad_msg_as_dict(self) -> dict[str, Any]:
'''
If the underlying IPC `MsgType` was received from a remote
actor but was unable to be decoded to a native `PayloadMsg`
(`Yield`|`Started`|`Return`) struct, the interchange backend
native format decoder can be used to stash a `dict` version
for introspection by the invalidating RPC task.
Optionally when this error is constructed from
`.from_decode()` the caller can attempt to construct what
would have been the original `MsgType`-with-payload subtype
(i.e. an instance from the set of msgs in
`.msg.types._payload_msgs`) which failed validation.
'''
return self.msgdata.get('_bad_msg_as_dict')
@property @property
def expected_msg_type(self) -> Type[MsgType]|None: def expected_msg_type(self) -> Type[MsgType]|None:
return type(self.bad_msg) return type(self.expected_msg)
@property @property
def cid(self) -> str: def cid(self) -> str:
# pull from required `.bad_msg` ref (or src dict) # pre-packed using `.from_decode()` constructor
if bad_msg := self.bad_msg: return self.msgdata.get('cid')
return bad_msg.cid
return self.msgdata['cid']
@classmethod @classmethod
def from_decode( def from_decode(
cls, cls,
message: str, message: str,
bad_msg: PayloadMsg|None = None, ipc_msg: PayloadMsg|None = None,
bad_msg_as_dict: dict|None = None, msgdict: dict|None = None,
# if provided, expand and pack all RAE compat fields into the
# `._extra_msgdata` auxillary data `dict` internal to
# `RemoteActorError`.
**extra_msgdata,
) -> MsgTypeError: ) -> MsgTypeError:
''' '''
@ -793,44 +763,25 @@ class MsgTypeError(
(which is normally the caller of this). (which is normally the caller of this).
''' '''
if bad_msg_as_dict: # if provided, expand and pack all RAE compat fields into the
# `._extra_msgdata` auxillary data `dict` internal to
# `RemoteActorError`.
extra_msgdata: dict = {}
if msgdict:
extra_msgdata: dict = {
k: v
for k, v in msgdict.items()
if k in _ipcmsg_keys
}
# NOTE: original "vanilla decode" of the msg-bytes # NOTE: original "vanilla decode" of the msg-bytes
# is placed inside a value readable from # is placed inside a value readable from
# `.msgdata['_msg_dict']` # `.msgdata['_msg_dict']`
extra_msgdata['_bad_msg_as_dict'] = bad_msg_as_dict extra_msgdata['_msg_dict'] = msgdict
# scrape out any underlying fields from the
# msg that failed validation.
for k, v in bad_msg_as_dict.items():
if (
# always skip a duplicate entry
# if already provided as an arg
k == '_bad_msg' and bad_msg
or
# skip anything not in the default msg-field set.
k not in _ipcmsg_keys
# k not in _body_fields
):
continue
extra_msgdata[k] = v
elif bad_msg:
if not isinstance(bad_msg, PayloadMsg):
raise TypeError(
'The provided `bad_msg` is not a `PayloadMsg` type?\n\n'
f'{bad_msg}'
)
extra_msgdata['_bad_msg'] = bad_msg
extra_msgdata['cid'] = bad_msg.cid
if 'cid' not in extra_msgdata:
import pdbp; pdbp.set_trace()
return cls( return cls(
message=message, message=message,
boxed_type=cls, boxed_type=cls,
ipc_msg=ipc_msg,
**extra_msgdata, **extra_msgdata,
) )
@ -885,10 +836,9 @@ class MessagingError(Exception):
def pack_error( def pack_error(
exc: BaseException|RemoteActorError, exc: BaseException|RemoteActorError,
tb: str|None = None,
cid: str|None = None, cid: str|None = None,
src_uid: tuple[str, str]|None = None, src_uid: tuple[str, str]|None = None,
tb: TracebackType|None = None,
tb_str: str = '',
) -> Error: ) -> Error:
''' '''
@ -898,28 +848,10 @@ def pack_error(
the receiver side using `unpack_error()` below. the receiver side using `unpack_error()` below.
''' '''
if not tb_str:
tb_str: str = (
''.join(traceback.format_exception(exc))
# TODO: can we remove this is `exc` is required?
or
# NOTE: this is just a shorthand for the "last error" as
# provided by `sys.exeception()`, see:
# - https://docs.python.org/3/library/traceback.html#traceback.print_exc
# - https://docs.python.org/3/library/traceback.html#traceback.format_exc
traceback.format_exc()
)
else:
if tb_str[-2:] != '\n':
tb_str += '\n'
# when caller provides a tb instance (say pulled from some other
# src error's `.__traceback__`) we use that as the "boxed"
# tb-string instead.
if tb: if tb:
# https://docs.python.org/3/library/traceback.html#traceback.format_list tb_str = ''.join(traceback.format_tb(tb))
tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str else:
tb_str = traceback.format_exc()
error_msg: dict[ # for IPC error_msg: dict[ # for IPC
str, str,
@ -1183,7 +1115,7 @@ def _mk_msg_type_err(
src_validation_error: ValidationError|None = None, src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None, src_type_error: TypeError|None = None,
is_invalid_payload: bool = False, is_invalid_payload: bool = False,
# src_err_msg: Error|None = None, src_err_msg: Error|None = None,
**mte_kwargs, **mte_kwargs,
@ -1232,10 +1164,10 @@ def _mk_msg_type_err(
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n' '|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
) )
msgtyperr = MsgTypeError( msgtyperr = MsgTypeError(
message=message, message=message,
ipc_msg=msg, ipc_msg=msg,
bad_msg=msg,
) )
# ya, might be `None` # ya, might be `None`
msgtyperr.__cause__ = src_type_error msgtyperr.__cause__ = src_type_error
@ -1243,9 +1175,6 @@ def _mk_msg_type_err(
# `Channel.recv()` case # `Channel.recv()` case
else: else:
msg_dict: dict|None = None
bad_msg: PayloadMsg|None = None
if is_invalid_payload: if is_invalid_payload:
msg_type: str = type(msg) msg_type: str = type(msg)
any_pld: Any = msgpack.decode(msg.pld) any_pld: Any = msgpack.decode(msg.pld)
@ -1257,20 +1186,19 @@ def _mk_msg_type_err(
# f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n' # f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n'
# f')>\n\n' # f')>\n\n'
) )
# src_err_msg = msg
bad_msg = msg
# TODO: should we just decode the msg to a dict despite # TODO: should we just decode the msg to a dict despite
# only the payload being wrong? # only the payload being wrong?
# -[ ] maybe the better design is to break this construct # -[ ] maybe the better design is to break this construct
# logic into a separate explicit helper raiser-func? # logic into a separate explicit helper raiser-func?
msg_dict = None
else: else:
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any `msgspec.Struct`
# handling) so that we can determine what
# `.msg.types.PayloadMsg` is the culprit by reporting the
# received value.
msg: bytes msg: bytes
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgpack.decode(msg) msg_dict: dict = msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type'] msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name) msg_type = getattr(msgtypes, msg_type_name)
@ -1307,13 +1235,9 @@ def _mk_msg_type_err(
if verb_header: if verb_header:
message = f'{verb_header} ' + message message = f'{verb_header} ' + message
# if not isinstance(bad_msg, PayloadMsg):
# import pdbp; pdbp.set_trace()
msgtyperr = MsgTypeError.from_decode( msgtyperr = MsgTypeError.from_decode(
message=message, message=message,
bad_msg=bad_msg, msgdict=msg_dict,
bad_msg_as_dict=msg_dict,
# NOTE: for the send-side `.started()` pld-validate # NOTE: for the send-side `.started()` pld-validate
# case we actually set the `._ipc_msg` AFTER we return # case we actually set the `._ipc_msg` AFTER we return
@ -1321,7 +1245,7 @@ def _mk_msg_type_err(
# want to emulate the `Error` from the mte we build here # want to emulate the `Error` from the mte we build here
# Bo # Bo
# so by default in that case this is set to `None` # so by default in that case this is set to `None`
# ipc_msg=src_err_msg, ipc_msg=src_err_msg,
) )
msgtyperr.__cause__ = src_validation_error msgtyperr.__cause__ = src_validation_error
return msgtyperr return msgtyperr

View File

@ -47,7 +47,7 @@ from tractor._exceptions import (
_raise_from_unexpected_msg, _raise_from_unexpected_msg,
MsgTypeError, MsgTypeError,
_mk_msg_type_err, _mk_msg_type_err,
pack_error, pack_from_raise,
) )
from tractor._state import current_ipc_ctx from tractor._state import current_ipc_ctx
from ._codec import ( from ._codec import (
@ -203,6 +203,7 @@ class PldRx(Struct):
msg: MsgType = ( msg: MsgType = (
ipc_msg ipc_msg
or or
# async-rx msg from underlying IPC feeder (mem-)chan # async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive() await ipc._rx_chan.receive()
) )
@ -222,10 +223,6 @@ class PldRx(Struct):
raise_error: bool = True, raise_error: bool = True,
hide_tb: bool = True, hide_tb: bool = True,
# XXX for special (default?) case of send side call with
# `Context.started(validate_pld_spec=True)`
is_started_send_side: bool = False,
) -> PayloadT|Raw: ) -> PayloadT|Raw:
''' '''
Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and
@ -233,6 +230,8 @@ class PldRx(Struct):
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
_src_err = None
src_err: BaseException|None = None src_err: BaseException|None = None
match msg: match msg:
# payload-data shuttle msg; deliver the `.pld` value # payload-data shuttle msg; deliver the `.pld` value
@ -257,58 +256,18 @@ class PldRx(Struct):
# pack mgterr into error-msg for # pack mgterr into error-msg for
# reraise below; ensure remote-actor-err # reraise below; ensure remote-actor-err
# info is displayed nicely? # info is displayed nicely?
mte: MsgTypeError = _mk_msg_type_err( msgterr: MsgTypeError = _mk_msg_type_err(
msg=msg, msg=msg,
codec=self.pld_dec, codec=self.pld_dec,
src_validation_error=valerr, src_validation_error=valerr,
is_invalid_payload=True, is_invalid_payload=True,
expected_msg=expect_msg,
# ipc_msg=msg,
) )
# NOTE: override the `msg` passed to msg: Error = pack_from_raise(
# `_raise_from_unexpected_msg()` (below) so so that local_err=msgterr,
# we're effectively able to use that same func to
# unpack and raise an "emulated remote `Error`" of
# this local MTE.
err_msg: Error = pack_error(
exc=mte,
cid=msg.cid, cid=msg.cid,
src_uid=( src_uid=ipc.chan.uid,
ipc.chan.uid
if not is_started_send_side
else ipc._actor.uid
),
# tb=valerr.__traceback__,
tb_str=mte._message,
) )
# ^-TODO-^ just raise this inline instead of all the
# pack-unpack-repack non-sense!
mte._ipc_msg = err_msg
msg = err_msg
# set emulated remote error more-or-less as the
# runtime would
ctx: Context = getattr(ipc, 'ctx', ipc)
# TODO: should we instead make this explicit and
# use the above masked `is_started_send_decode`,
# expecting the `Context.started()` caller to set
# it? Rn this is kinda, howyousayyy, implicitly
# edge-case-y..
if (
expect_msg is not Started
and not is_started_send_side
):
ctx._maybe_cancel_and_set_remote_error(mte)
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
# raises the boxed `err_msg` from above it raises
# it from `None`.
src_err = valerr src_err = valerr
# if is_started_send_side:
# src_err = None
# XXX some other decoder specific failure? # XXX some other decoder specific failure?
# except TypeError as src_error: # except TypeError as src_error:
@ -420,7 +379,6 @@ class PldRx(Struct):
# NOTE: generally speaking only for handling `Stop`-msgs that # NOTE: generally speaking only for handling `Stop`-msgs that
# arrive during a call to `drain_to_final_msg()` above! # arrive during a call to `drain_to_final_msg()` above!
passthrough_non_pld_msgs: bool = True, passthrough_non_pld_msgs: bool = True,
hide_tb: bool = True,
**kwargs, **kwargs,
) -> tuple[MsgType, PayloadT]: ) -> tuple[MsgType, PayloadT]:
@ -429,7 +387,6 @@ class PldRx(Struct):
the pair of refs. the pair of refs.
''' '''
__tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive() msg: MsgType = await ipc._rx_chan.receive()
if passthrough_non_pld_msgs: if passthrough_non_pld_msgs:
@ -444,7 +401,6 @@ class PldRx(Struct):
msg, msg,
ipc=ipc, ipc=ipc,
expect_msg=expect_msg, expect_msg=expect_msg,
hide_tb=hide_tb,
**kwargs, **kwargs,
) )
return msg, pld return msg, pld
@ -458,7 +414,7 @@ def limit_plds(
) -> MsgDec: ) -> MsgDec:
''' '''
Apply a `MsgCodec` that will natively decode the SC-msg set's Apply a `MsgCodec` that will natively decode the SC-msg set's
`PayloadMsg.pld: Union[Type[Struct]]` payload fields using `Msg.pld: Union[Type[Struct]]` payload fields using
tagged-unions of `msgspec.Struct`s from the `payload_types` tagged-unions of `msgspec.Struct`s from the `payload_types`
for all IPC contexts in use by the current `trio.Task`. for all IPC contexts in use by the current `trio.Task`.
@ -735,11 +691,3 @@ async def drain_to_final_msg(
result_msg, result_msg,
pre_result_drained, pre_result_drained,
) )
# TODO: factor logic from `.Context.started()` for send-side
# validate raising!
def validate_payload_msg(
msg: Started|Yield|Return,
) -> MsgTypeError|None:
...