Compare commits
3 Commits
e4ec6b7b0c
...
eee4c61b51
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | eee4c61b51 | |
Tyler Goodlet | 42ba855d1b | |
Tyler Goodlet | c2cc12e14f |
|
@ -0,0 +1,316 @@
|
||||||
|
'''
|
||||||
|
Audit sub-sys APIs from `.msg._ops`
|
||||||
|
mostly for ensuring correct `contextvars`
|
||||||
|
related settings around IPC contexts.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from contextlib import (
|
||||||
|
asynccontextmanager as acm,
|
||||||
|
contextmanager as cm,
|
||||||
|
)
|
||||||
|
# import typing
|
||||||
|
from typing import (
|
||||||
|
# Any,
|
||||||
|
TypeAlias,
|
||||||
|
# Union,
|
||||||
|
)
|
||||||
|
from contextvars import (
|
||||||
|
Context,
|
||||||
|
)
|
||||||
|
|
||||||
|
from msgspec import (
|
||||||
|
# structs,
|
||||||
|
# msgpack,
|
||||||
|
Struct,
|
||||||
|
# ValidationError,
|
||||||
|
)
|
||||||
|
import pytest
|
||||||
|
import trio
|
||||||
|
|
||||||
|
import tractor
|
||||||
|
from tractor import (
|
||||||
|
# _state,
|
||||||
|
MsgTypeError,
|
||||||
|
current_ipc_ctx,
|
||||||
|
Portal,
|
||||||
|
)
|
||||||
|
from tractor.msg import (
|
||||||
|
_ops as msgops,
|
||||||
|
Return,
|
||||||
|
)
|
||||||
|
from tractor.msg import (
|
||||||
|
_codec,
|
||||||
|
# _ctxvar_MsgCodec,
|
||||||
|
|
||||||
|
# NamespacePath,
|
||||||
|
# MsgCodec,
|
||||||
|
# mk_codec,
|
||||||
|
# apply_codec,
|
||||||
|
# current_codec,
|
||||||
|
)
|
||||||
|
from tractor.msg.types import (
|
||||||
|
log,
|
||||||
|
# _payload_msgs,
|
||||||
|
# PayloadMsg,
|
||||||
|
# Started,
|
||||||
|
# mk_msg_spec,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class PldMsg(Struct):
|
||||||
|
field: str
|
||||||
|
|
||||||
|
|
||||||
|
maybe_msg_spec = PldMsg|None
|
||||||
|
|
||||||
|
|
||||||
|
@cm
|
||||||
|
def custom_spec(
|
||||||
|
ctx: Context,
|
||||||
|
spec: TypeAlias,
|
||||||
|
) -> _codec.MsgCodec:
|
||||||
|
'''
|
||||||
|
Apply a custom payload spec, remove on exit.
|
||||||
|
|
||||||
|
'''
|
||||||
|
rx: msgops.PldRx = ctx._pld_rx
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def maybe_expect_raises(
|
||||||
|
raises: BaseException|None = None,
|
||||||
|
ensure_in_message: list[str]|None = None,
|
||||||
|
|
||||||
|
reraise: bool = False,
|
||||||
|
timeout: int = 3,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Async wrapper for ensuring errors propagate from the inner scope.
|
||||||
|
|
||||||
|
'''
|
||||||
|
with trio.fail_after(timeout):
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
except BaseException as _inner_err:
|
||||||
|
inner_err = _inner_err
|
||||||
|
# wasn't-expected to error..
|
||||||
|
if raises is None:
|
||||||
|
raise
|
||||||
|
|
||||||
|
else:
|
||||||
|
assert type(inner_err) is raises
|
||||||
|
|
||||||
|
# maybe check for error txt content
|
||||||
|
if ensure_in_message:
|
||||||
|
part: str
|
||||||
|
for part in ensure_in_message:
|
||||||
|
for i, arg in enumerate(inner_err.args):
|
||||||
|
if part in arg:
|
||||||
|
break
|
||||||
|
# if part never matches an arg, then we're
|
||||||
|
# missing a match.
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
'Failed to find error message content?\n\n'
|
||||||
|
f'expected: {ensure_in_message!r}\n'
|
||||||
|
f'part: {part!r}\n\n'
|
||||||
|
f'{inner_err.args}'
|
||||||
|
)
|
||||||
|
|
||||||
|
if reraise:
|
||||||
|
raise inner_err
|
||||||
|
|
||||||
|
else:
|
||||||
|
if raises:
|
||||||
|
raise RuntimeError(
|
||||||
|
f'Expected a {raises.__name__!r} to be raised?'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def child(
|
||||||
|
ctx: Context,
|
||||||
|
started_value: int|PldMsg|None,
|
||||||
|
return_value: str|None,
|
||||||
|
validate_pld_spec: bool,
|
||||||
|
raise_on_started_mte: bool = True,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Call ``Context.started()`` more then once (an error).
|
||||||
|
|
||||||
|
'''
|
||||||
|
expect_started_mte: bool = started_value == 10
|
||||||
|
|
||||||
|
# sanaity check that child RPC context is the current one
|
||||||
|
curr_ctx: Context = current_ipc_ctx()
|
||||||
|
assert ctx is curr_ctx
|
||||||
|
|
||||||
|
rx: msgops.PldRx = ctx._pld_rx
|
||||||
|
orig_pldec: _codec.MsgDec = rx.pld_dec
|
||||||
|
# senity that default pld-spec should be set
|
||||||
|
assert (
|
||||||
|
rx.pld_dec
|
||||||
|
is
|
||||||
|
msgops._def_any_pldec
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with msgops.limit_plds(
|
||||||
|
spec=maybe_msg_spec,
|
||||||
|
) as pldec:
|
||||||
|
# sanity on `MsgDec` state
|
||||||
|
assert rx.pld_dec is pldec
|
||||||
|
assert pldec.spec is maybe_msg_spec
|
||||||
|
|
||||||
|
# 2 cases: hdndle send-side and recv-only validation
|
||||||
|
# - when `raise_on_started_mte == True`, send validate
|
||||||
|
# - else, parent-recv-side only validation
|
||||||
|
try:
|
||||||
|
await ctx.started(
|
||||||
|
value=started_value,
|
||||||
|
validate_pld_spec=validate_pld_spec,
|
||||||
|
)
|
||||||
|
|
||||||
|
except MsgTypeError:
|
||||||
|
log.exception('started()` raised an MTE!\n')
|
||||||
|
if not expect_started_mte:
|
||||||
|
raise RuntimeError(
|
||||||
|
'Child-ctx-task SHOULD NOT HAVE raised an MTE for\n\n'
|
||||||
|
f'{started_value!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# propagate to parent?
|
||||||
|
if raise_on_started_mte:
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
if expect_started_mte:
|
||||||
|
raise RuntimeError(
|
||||||
|
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
|
||||||
|
f'{started_value!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX should always fail on recv side since we can't
|
||||||
|
# really do much else beside terminate and relay the
|
||||||
|
# msg-type-error from this RPC task ;)
|
||||||
|
return return_value
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# sanity on `limit_plds()` reversion
|
||||||
|
assert (
|
||||||
|
rx.pld_dec
|
||||||
|
is
|
||||||
|
msgops._def_any_pldec
|
||||||
|
)
|
||||||
|
log.runtime(
|
||||||
|
'Reverted to previous pld-spec\n\n'
|
||||||
|
f'{orig_pldec}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'return_value',
|
||||||
|
[
|
||||||
|
None,
|
||||||
|
'yo',
|
||||||
|
],
|
||||||
|
ids=[
|
||||||
|
'return[invalid-"yo"]',
|
||||||
|
'return[valid-None]',
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'started_value',
|
||||||
|
[
|
||||||
|
10,
|
||||||
|
PldMsg(field='yo'),
|
||||||
|
],
|
||||||
|
ids=[
|
||||||
|
'Started[invalid-10]',
|
||||||
|
'Started[valid-PldMsg]',
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'pld_check_started_value',
|
||||||
|
[
|
||||||
|
True,
|
||||||
|
False,
|
||||||
|
],
|
||||||
|
ids=[
|
||||||
|
'check-started-pld',
|
||||||
|
'no-started-pld-validate',
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_basic_payload_spec(
|
||||||
|
debug_mode: bool,
|
||||||
|
loglevel: str,
|
||||||
|
return_value: str|None,
|
||||||
|
started_value: int|PldMsg,
|
||||||
|
pld_check_started_value: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Validate the most basic `PldRx` msg-type-spec semantics around
|
||||||
|
a IPC `Context` endpoint start, started-sync, and final return
|
||||||
|
value depending on set payload types and the currently applied
|
||||||
|
pld-spec.
|
||||||
|
|
||||||
|
'''
|
||||||
|
invalid_return: bool = return_value == 'yo'
|
||||||
|
invalid_started: bool = started_value == 10
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
loglevel=loglevel,
|
||||||
|
) as an:
|
||||||
|
p: Portal = await an.start_actor(
|
||||||
|
'child',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
|
# since not opened yet.
|
||||||
|
assert current_ipc_ctx() is None
|
||||||
|
|
||||||
|
async with (
|
||||||
|
maybe_expect_raises(
|
||||||
|
raises=MsgTypeError if (
|
||||||
|
invalid_return
|
||||||
|
or
|
||||||
|
invalid_started
|
||||||
|
) else None,
|
||||||
|
ensure_in_message=[
|
||||||
|
"invalid `Return` payload",
|
||||||
|
"value: `'yo'` does not match type-spec: `Return.pld: PldMsg|NoneType`",
|
||||||
|
],
|
||||||
|
),
|
||||||
|
p.open_context(
|
||||||
|
child,
|
||||||
|
return_value=return_value,
|
||||||
|
started_value=started_value,
|
||||||
|
pld_spec=maybe_msg_spec,
|
||||||
|
validate_pld_spec=pld_check_started_value,
|
||||||
|
) as (ctx, first),
|
||||||
|
):
|
||||||
|
# now opened with 'child' sub
|
||||||
|
assert current_ipc_ctx() is ctx
|
||||||
|
|
||||||
|
assert type(first) is PldMsg
|
||||||
|
assert first.field == 'yo'
|
||||||
|
|
||||||
|
try:
|
||||||
|
assert (await ctx.result()) is None
|
||||||
|
except MsgTypeError as mte:
|
||||||
|
if not invalid_return:
|
||||||
|
raise
|
||||||
|
|
||||||
|
else: # expected this invalid `Return.pld`
|
||||||
|
assert mte.cid == ctx.cid
|
||||||
|
|
||||||
|
# verify expected remote mte deats
|
||||||
|
await tractor.pause()
|
||||||
|
assert ctx._remote_error is mte
|
||||||
|
assert mte.expected_msg_type is Return
|
||||||
|
|
||||||
|
await p.cancel_actor()
|
||||||
|
|
||||||
|
trio.run(main)
|
|
@ -15,12 +15,22 @@
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
The fundamental cross process SC abstraction: an inter-actor,
|
The fundamental cross-process SC abstraction: an inter-actor,
|
||||||
cancel-scope linked task "context".
|
transitively cancel-scope linked, (dual) task IPC coupled "context".
|
||||||
|
|
||||||
A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built
|
A `Context` is very similar to the look and feel of the
|
||||||
into each ``trio.Nursery`` except it links the lifetimes of memory space
|
`.cancel_scope: trio.CancelScope` built into each `trio.Nursery`
|
||||||
disjoint, parallel executing tasks in separate actors.
|
except that it links the lifetimes of 2 memory space disjoint,
|
||||||
|
parallel executing, tasks scheduled in separate "actors".
|
||||||
|
|
||||||
|
So while a `trio.Nursery` has a `.parent_task` which exists both
|
||||||
|
before (open) and then inside the body of the `async with` of the
|
||||||
|
nursery's scope (/block), a `Context` contains 2 tasks, a "parent"
|
||||||
|
and a "child" side, where both execute independently in separate
|
||||||
|
memory domains of different (host's) processes linked through
|
||||||
|
a SC-transitive IPC "shuttle dialog protocol". The underlying IPC
|
||||||
|
dialog-(un)protocol allows for the maintainance of SC properties
|
||||||
|
end-2-end between the tasks.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
@ -71,13 +81,11 @@ from .msg import (
|
||||||
MsgCodec,
|
MsgCodec,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
PayloadT,
|
PayloadT,
|
||||||
Return,
|
|
||||||
Started,
|
Started,
|
||||||
Stop,
|
Stop,
|
||||||
Yield,
|
Yield,
|
||||||
current_codec,
|
current_codec,
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
types as msgtypes,
|
|
||||||
_ops as msgops,
|
_ops as msgops,
|
||||||
)
|
)
|
||||||
from ._ipc import (
|
from ._ipc import (
|
||||||
|
@ -90,7 +98,7 @@ from ._state import (
|
||||||
debug_mode,
|
debug_mode,
|
||||||
_ctxvar_Context,
|
_ctxvar_Context,
|
||||||
)
|
)
|
||||||
|
# ------ - ------
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
|
@ -1598,16 +1606,15 @@ class Context:
|
||||||
async def started(
|
async def started(
|
||||||
self,
|
self,
|
||||||
|
|
||||||
# TODO: how to type this so that it's the
|
|
||||||
# same as the payload type? Is this enough?
|
|
||||||
value: PayloadT|None = None,
|
value: PayloadT|None = None,
|
||||||
|
validate_pld_spec: bool = True,
|
||||||
|
strict_pld_parity: bool = False,
|
||||||
|
|
||||||
strict_parity: bool = False,
|
# TODO: this will always emit for msgpack for any () vs. []
|
||||||
|
# inside the value.. do we want to offer warnings on that?
|
||||||
|
# complain_no_parity: bool = False,
|
||||||
|
|
||||||
# TODO: this will always emit now that we do `.pld: Raw`
|
hide_tb: bool = True,
|
||||||
# passthrough.. so maybe just only complain when above strict
|
|
||||||
# flag is set?
|
|
||||||
complain_no_parity: bool = False,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -1648,63 +1655,54 @@ class Context:
|
||||||
#
|
#
|
||||||
# https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
|
# https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
|
||||||
#
|
#
|
||||||
|
__tracebackhide__: bool = hide_tb
|
||||||
|
if validate_pld_spec:
|
||||||
|
# __tracebackhide__: bool = False
|
||||||
codec: MsgCodec = current_codec()
|
codec: MsgCodec = current_codec()
|
||||||
msg_bytes: bytes = codec.encode(started_msg)
|
msg_bytes: bytes = codec.encode(started_msg)
|
||||||
try:
|
try:
|
||||||
# be a "cheap" dialog (see above!)
|
roundtripped: Started = codec.decode(msg_bytes)
|
||||||
|
# pld: PayloadT = await self.pld_rx.recv_pld(
|
||||||
|
pld: PayloadT = self.pld_rx.dec_msg(
|
||||||
|
msg=roundtripped,
|
||||||
|
ipc=self,
|
||||||
|
expect_msg=Started,
|
||||||
|
hide_tb=hide_tb,
|
||||||
|
is_started_send_side=True,
|
||||||
|
)
|
||||||
if (
|
if (
|
||||||
strict_parity
|
strict_pld_parity
|
||||||
or
|
and
|
||||||
complain_no_parity
|
pld != value
|
||||||
):
|
):
|
||||||
rt_started: Started = codec.decode(msg_bytes)
|
|
||||||
|
|
||||||
# XXX something is prolly totes cucked with the
|
|
||||||
# codec state!
|
|
||||||
if isinstance(rt_started, dict):
|
|
||||||
rt_started = msgtypes.from_dict_msg(
|
|
||||||
dict_msg=rt_started,
|
|
||||||
)
|
|
||||||
raise RuntimeError(
|
|
||||||
'Failed to roundtrip `Started` msg?\n'
|
|
||||||
f'{pretty_struct.pformat(rt_started)}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
if rt_started != started_msg:
|
|
||||||
# TODO: make that one a mod func too..
|
# TODO: make that one a mod func too..
|
||||||
diff = pretty_struct.Struct.__sub__(
|
diff = pretty_struct.Struct.__sub__(
|
||||||
rt_started,
|
roundtripped,
|
||||||
started_msg,
|
started_msg,
|
||||||
)
|
)
|
||||||
complaint: str = (
|
complaint: str = (
|
||||||
'Started value does not match after roundtrip?\n\n'
|
'Started value does not match after roundtrip?\n\n'
|
||||||
f'{diff}'
|
f'{diff}'
|
||||||
)
|
)
|
||||||
|
raise ValidationError(complaint)
|
||||||
# TODO: rn this will pretty much always fail with
|
|
||||||
# any other sequence type embeded in the
|
|
||||||
# payload...
|
|
||||||
if (
|
|
||||||
self._strict_started
|
|
||||||
or
|
|
||||||
strict_parity
|
|
||||||
):
|
|
||||||
raise ValueError(complaint)
|
|
||||||
else:
|
|
||||||
log.warning(complaint)
|
|
||||||
|
|
||||||
await self.chan.send(started_msg)
|
|
||||||
|
|
||||||
# raise any msg type error NO MATTER WHAT!
|
# raise any msg type error NO MATTER WHAT!
|
||||||
except ValidationError as verr:
|
except ValidationError as verr:
|
||||||
|
# always show this src frame in the tb
|
||||||
|
# __tracebackhide__: bool = False
|
||||||
raise _mk_msg_type_err(
|
raise _mk_msg_type_err(
|
||||||
msg=msg_bytes,
|
msg=roundtripped,
|
||||||
codec=codec,
|
codec=codec,
|
||||||
src_validation_error=verr,
|
src_validation_error=verr,
|
||||||
verb_header='Trying to send payload'
|
verb_header='Trying to send ',
|
||||||
# > 'invalid `Started IPC msgs\n'
|
is_invalid_payload=True,
|
||||||
) from verr
|
) from verr
|
||||||
|
|
||||||
|
# TODO: maybe a flag to by-pass encode op if already done
|
||||||
|
# here in caller?
|
||||||
|
await self.chan.send(started_msg)
|
||||||
|
|
||||||
|
# set msg-related internal runtime-state
|
||||||
self._started_called = True
|
self._started_called = True
|
||||||
self._started_msg = started_msg
|
self._started_msg = started_msg
|
||||||
self._started_pld = value
|
self._started_pld = value
|
||||||
|
@ -1997,12 +1995,7 @@ async def open_context_from_portal(
|
||||||
|
|
||||||
pld_spec: TypeAlias|None = None,
|
pld_spec: TypeAlias|None = None,
|
||||||
allow_overruns: bool = False,
|
allow_overruns: bool = False,
|
||||||
|
hide_tb: bool = True,
|
||||||
# TODO: if we set this the wrapping `@acm` body will
|
|
||||||
# still be shown (awkwardly) on pdb REPL entry. Ideally
|
|
||||||
# we can similarly annotate that frame to NOT show? for now
|
|
||||||
# we DO SHOW this frame since it's awkward ow..
|
|
||||||
hide_tb: bool = False,
|
|
||||||
|
|
||||||
# proxied to RPC
|
# proxied to RPC
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
@ -2115,6 +2108,7 @@ async def open_context_from_portal(
|
||||||
ipc=ctx,
|
ipc=ctx,
|
||||||
expect_msg=Started,
|
expect_msg=Started,
|
||||||
passthrough_non_pld_msgs=False,
|
passthrough_non_pld_msgs=False,
|
||||||
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
|
|
||||||
# from .devx import pause
|
# from .devx import pause
|
||||||
|
|
|
@ -22,6 +22,9 @@ from __future__ import annotations
|
||||||
import builtins
|
import builtins
|
||||||
import importlib
|
import importlib
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
|
from types import (
|
||||||
|
TracebackType,
|
||||||
|
)
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
|
@ -92,26 +95,30 @@ _ipcmsg_keys: list[str] = [
|
||||||
fi.name
|
fi.name
|
||||||
for fi, k, v
|
for fi, k, v
|
||||||
in iter_fields(Error)
|
in iter_fields(Error)
|
||||||
|
|
||||||
]
|
]
|
||||||
|
|
||||||
_body_fields: list[str] = list(
|
_body_fields: list[str] = list(
|
||||||
set(_ipcmsg_keys)
|
set(_ipcmsg_keys)
|
||||||
|
|
||||||
# NOTE: don't show fields that either don't provide
|
# XXX NOTE: DON'T-SHOW-FIELDS
|
||||||
# any extra useful info or that are already shown
|
# - don't provide any extra useful info or,
|
||||||
# as part of `.__repr__()` output.
|
# - are already shown as part of `.__repr__()` or,
|
||||||
|
# - are sub-type specific.
|
||||||
- {
|
- {
|
||||||
'src_type_str',
|
'src_type_str',
|
||||||
'boxed_type_str',
|
'boxed_type_str',
|
||||||
'tb_str',
|
'tb_str',
|
||||||
'relay_path',
|
'relay_path',
|
||||||
'_msg_dict',
|
|
||||||
'cid',
|
'cid',
|
||||||
|
|
||||||
# since only ctxc should show it but `Error` does
|
# only ctxc should show it but `Error` does
|
||||||
# have it as an optional field.
|
# have it as an optional field.
|
||||||
'canceller',
|
'canceller',
|
||||||
|
|
||||||
|
# only for MTEs and generally only used
|
||||||
|
# when devving/testing/debugging.
|
||||||
|
'_msg_dict',
|
||||||
|
'_bad_msg',
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -146,6 +153,7 @@ def pack_from_raise(
|
||||||
|MsgTypeError
|
|MsgTypeError
|
||||||
),
|
),
|
||||||
cid: str,
|
cid: str,
|
||||||
|
hide_tb: bool = True,
|
||||||
|
|
||||||
**rae_fields,
|
**rae_fields,
|
||||||
|
|
||||||
|
@ -156,7 +164,7 @@ def pack_from_raise(
|
||||||
`Error`-msg using `pack_error()` to extract the tb info.
|
`Error`-msg using `pack_error()` to extract the tb info.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = hide_tb
|
||||||
try:
|
try:
|
||||||
raise local_err
|
raise local_err
|
||||||
except type(local_err) as local_err:
|
except type(local_err) as local_err:
|
||||||
|
@ -231,7 +239,8 @@ class RemoteActorError(Exception):
|
||||||
|
|
||||||
if (
|
if (
|
||||||
extra_msgdata
|
extra_msgdata
|
||||||
and ipc_msg
|
and
|
||||||
|
ipc_msg
|
||||||
):
|
):
|
||||||
# XXX mutate the orig msg directly from
|
# XXX mutate the orig msg directly from
|
||||||
# manually provided input params.
|
# manually provided input params.
|
||||||
|
@ -261,17 +270,16 @@ class RemoteActorError(Exception):
|
||||||
# either by customizing `ContextCancelled.__init__()` or
|
# either by customizing `ContextCancelled.__init__()` or
|
||||||
# through a special factor func?
|
# through a special factor func?
|
||||||
elif boxed_type:
|
elif boxed_type:
|
||||||
boxed_type_str: str = type(boxed_type).__name__
|
boxed_type_str: str = boxed_type.__name__
|
||||||
if (
|
if (
|
||||||
ipc_msg
|
ipc_msg
|
||||||
and not self._ipc_msg.boxed_type_str
|
and
|
||||||
|
self._ipc_msg.boxed_type_str != boxed_type_str
|
||||||
):
|
):
|
||||||
self._ipc_msg.boxed_type_str = boxed_type_str
|
self._ipc_msg.boxed_type_str = boxed_type_str
|
||||||
assert self.boxed_type_str == self._ipc_msg.boxed_type_str
|
assert self.boxed_type_str == self._ipc_msg.boxed_type_str
|
||||||
|
|
||||||
else:
|
# ensure any roundtripping evals to the input value
|
||||||
self._extra_msgdata['boxed_type_str'] = boxed_type_str
|
|
||||||
|
|
||||||
assert self.boxed_type is boxed_type
|
assert self.boxed_type is boxed_type
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -309,7 +317,9 @@ class RemoteActorError(Exception):
|
||||||
if self._ipc_msg
|
if self._ipc_msg
|
||||||
else {}
|
else {}
|
||||||
)
|
)
|
||||||
return self._extra_msgdata | msgdata
|
return {
|
||||||
|
k: v for k, v in self._extra_msgdata.items()
|
||||||
|
} | msgdata
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def src_type_str(self) -> str:
|
def src_type_str(self) -> str:
|
||||||
|
@ -502,6 +512,8 @@ class RemoteActorError(Exception):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
header: str = ''
|
header: str = ''
|
||||||
|
body: str = ''
|
||||||
|
|
||||||
if with_type_header:
|
if with_type_header:
|
||||||
header: str = f'<{type(self).__name__}(\n'
|
header: str = f'<{type(self).__name__}(\n'
|
||||||
|
|
||||||
|
@ -525,24 +537,22 @@ class RemoteActorError(Exception):
|
||||||
)
|
)
|
||||||
if not with_type_header:
|
if not with_type_header:
|
||||||
body = '\n' + body
|
body = '\n' + body
|
||||||
else:
|
|
||||||
first: str = ''
|
|
||||||
message: str = self._message
|
|
||||||
|
|
||||||
|
elif message := self._message:
|
||||||
# split off the first line so it isn't indented
|
# split off the first line so it isn't indented
|
||||||
# the same like the "boxed content".
|
# the same like the "boxed content".
|
||||||
if not with_type_header:
|
if not with_type_header:
|
||||||
lines: list[str] = message.splitlines()
|
lines: list[str] = message.splitlines()
|
||||||
first = lines[0]
|
first: str = lines[0]
|
||||||
message = ''.join(lines[1:])
|
message: str = message.removeprefix(first)
|
||||||
|
|
||||||
|
else:
|
||||||
|
first: str = ''
|
||||||
|
|
||||||
body: str = (
|
body: str = (
|
||||||
first
|
first
|
||||||
+
|
+
|
||||||
textwrap.indent(
|
message
|
||||||
message,
|
|
||||||
prefix=' ',
|
|
||||||
)
|
|
||||||
+
|
+
|
||||||
'\n'
|
'\n'
|
||||||
)
|
)
|
||||||
|
@ -708,52 +718,72 @@ class MsgTypeError(
|
||||||
]
|
]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def msg_dict(self) -> dict[str, Any]:
|
def bad_msg(self) -> PayloadMsg|None:
|
||||||
'''
|
'''
|
||||||
If the underlying IPC `MsgType` was received from a remote
|
Ref to the the original invalid IPC shuttle msg which failed
|
||||||
actor but was unable to be decoded to a native
|
to decode thus providing for the reason for this error.
|
||||||
`Yield`|`Started`|`Return` struct, the interchange backend
|
|
||||||
native format decoder can be used to stash a `dict`
|
|
||||||
version for introspection by the invalidating RPC task.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self.msgdata.get('_msg_dict')
|
if (
|
||||||
|
(_bad_msg := self.msgdata.get('_bad_msg'))
|
||||||
|
and
|
||||||
|
isinstance(_bad_msg, PayloadMsg)
|
||||||
|
):
|
||||||
|
return _bad_msg
|
||||||
|
|
||||||
@property
|
elif bad_msg_dict := self.bad_msg_as_dict:
|
||||||
def expected_msg(self) -> MsgType|None:
|
|
||||||
'''
|
|
||||||
Attempt to construct what would have been the original
|
|
||||||
`MsgType`-with-payload subtype (i.e. an instance from the set
|
|
||||||
of msgs in `.msg.types._payload_msgs`) which failed
|
|
||||||
validation.
|
|
||||||
|
|
||||||
'''
|
|
||||||
if msg_dict := self.msg_dict.copy():
|
|
||||||
return msgtypes.from_dict_msg(
|
return msgtypes.from_dict_msg(
|
||||||
dict_msg=msg_dict,
|
dict_msg=bad_msg_dict.copy(),
|
||||||
# use_pretty=True,
|
# use_pretty=True,
|
||||||
# ^-TODO-^ would luv to use this BUT then the
|
# ^-TODO-^ would luv to use this BUT then the
|
||||||
# `field_prefix` in `pformat_boxed_tb()` cucks it
|
# `field_prefix` in `pformat_boxed_tb()` cucks it
|
||||||
# all up.. XD
|
# all up.. XD
|
||||||
)
|
)
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def bad_msg_as_dict(self) -> dict[str, Any]:
|
||||||
|
'''
|
||||||
|
If the underlying IPC `MsgType` was received from a remote
|
||||||
|
actor but was unable to be decoded to a native `PayloadMsg`
|
||||||
|
(`Yield`|`Started`|`Return`) struct, the interchange backend
|
||||||
|
native format decoder can be used to stash a `dict` version
|
||||||
|
for introspection by the invalidating RPC task.
|
||||||
|
|
||||||
|
Optionally when this error is constructed from
|
||||||
|
`.from_decode()` the caller can attempt to construct what
|
||||||
|
would have been the original `MsgType`-with-payload subtype
|
||||||
|
(i.e. an instance from the set of msgs in
|
||||||
|
`.msg.types._payload_msgs`) which failed validation.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self.msgdata.get('_bad_msg_as_dict')
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def expected_msg_type(self) -> Type[MsgType]|None:
|
def expected_msg_type(self) -> Type[MsgType]|None:
|
||||||
return type(self.expected_msg)
|
return type(self.bad_msg)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def cid(self) -> str:
|
def cid(self) -> str:
|
||||||
# pre-packed using `.from_decode()` constructor
|
# pull from required `.bad_msg` ref (or src dict)
|
||||||
return self.msgdata.get('cid')
|
if bad_msg := self.bad_msg:
|
||||||
|
return bad_msg.cid
|
||||||
|
|
||||||
|
return self.msgdata['cid']
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_decode(
|
def from_decode(
|
||||||
cls,
|
cls,
|
||||||
message: str,
|
message: str,
|
||||||
|
|
||||||
ipc_msg: PayloadMsg|None = None,
|
bad_msg: PayloadMsg|None = None,
|
||||||
msgdict: dict|None = None,
|
bad_msg_as_dict: dict|None = None,
|
||||||
|
|
||||||
|
# if provided, expand and pack all RAE compat fields into the
|
||||||
|
# `._extra_msgdata` auxillary data `dict` internal to
|
||||||
|
# `RemoteActorError`.
|
||||||
|
**extra_msgdata,
|
||||||
|
|
||||||
) -> MsgTypeError:
|
) -> MsgTypeError:
|
||||||
'''
|
'''
|
||||||
|
@ -763,25 +793,44 @@ class MsgTypeError(
|
||||||
(which is normally the caller of this).
|
(which is normally the caller of this).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# if provided, expand and pack all RAE compat fields into the
|
if bad_msg_as_dict:
|
||||||
# `._extra_msgdata` auxillary data `dict` internal to
|
|
||||||
# `RemoteActorError`.
|
|
||||||
extra_msgdata: dict = {}
|
|
||||||
if msgdict:
|
|
||||||
extra_msgdata: dict = {
|
|
||||||
k: v
|
|
||||||
for k, v in msgdict.items()
|
|
||||||
if k in _ipcmsg_keys
|
|
||||||
}
|
|
||||||
# NOTE: original "vanilla decode" of the msg-bytes
|
# NOTE: original "vanilla decode" of the msg-bytes
|
||||||
# is placed inside a value readable from
|
# is placed inside a value readable from
|
||||||
# `.msgdata['_msg_dict']`
|
# `.msgdata['_msg_dict']`
|
||||||
extra_msgdata['_msg_dict'] = msgdict
|
extra_msgdata['_bad_msg_as_dict'] = bad_msg_as_dict
|
||||||
|
|
||||||
|
# scrape out any underlying fields from the
|
||||||
|
# msg that failed validation.
|
||||||
|
for k, v in bad_msg_as_dict.items():
|
||||||
|
if (
|
||||||
|
# always skip a duplicate entry
|
||||||
|
# if already provided as an arg
|
||||||
|
k == '_bad_msg' and bad_msg
|
||||||
|
or
|
||||||
|
# skip anything not in the default msg-field set.
|
||||||
|
k not in _ipcmsg_keys
|
||||||
|
# k not in _body_fields
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
|
||||||
|
extra_msgdata[k] = v
|
||||||
|
|
||||||
|
|
||||||
|
elif bad_msg:
|
||||||
|
if not isinstance(bad_msg, PayloadMsg):
|
||||||
|
raise TypeError(
|
||||||
|
'The provided `bad_msg` is not a `PayloadMsg` type?\n\n'
|
||||||
|
f'{bad_msg}'
|
||||||
|
)
|
||||||
|
extra_msgdata['_bad_msg'] = bad_msg
|
||||||
|
extra_msgdata['cid'] = bad_msg.cid
|
||||||
|
|
||||||
|
if 'cid' not in extra_msgdata:
|
||||||
|
import pdbp; pdbp.set_trace()
|
||||||
|
|
||||||
return cls(
|
return cls(
|
||||||
message=message,
|
message=message,
|
||||||
boxed_type=cls,
|
boxed_type=cls,
|
||||||
ipc_msg=ipc_msg,
|
|
||||||
**extra_msgdata,
|
**extra_msgdata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -836,9 +885,10 @@ class MessagingError(Exception):
|
||||||
def pack_error(
|
def pack_error(
|
||||||
exc: BaseException|RemoteActorError,
|
exc: BaseException|RemoteActorError,
|
||||||
|
|
||||||
tb: str|None = None,
|
|
||||||
cid: str|None = None,
|
cid: str|None = None,
|
||||||
src_uid: tuple[str, str]|None = None,
|
src_uid: tuple[str, str]|None = None,
|
||||||
|
tb: TracebackType|None = None,
|
||||||
|
tb_str: str = '',
|
||||||
|
|
||||||
) -> Error:
|
) -> Error:
|
||||||
'''
|
'''
|
||||||
|
@ -848,10 +898,28 @@ def pack_error(
|
||||||
the receiver side using `unpack_error()` below.
|
the receiver side using `unpack_error()` below.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if tb:
|
if not tb_str:
|
||||||
tb_str = ''.join(traceback.format_tb(tb))
|
tb_str: str = (
|
||||||
|
''.join(traceback.format_exception(exc))
|
||||||
|
|
||||||
|
# TODO: can we remove this is `exc` is required?
|
||||||
|
or
|
||||||
|
# NOTE: this is just a shorthand for the "last error" as
|
||||||
|
# provided by `sys.exeception()`, see:
|
||||||
|
# - https://docs.python.org/3/library/traceback.html#traceback.print_exc
|
||||||
|
# - https://docs.python.org/3/library/traceback.html#traceback.format_exc
|
||||||
|
traceback.format_exc()
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
tb_str = traceback.format_exc()
|
if tb_str[-2:] != '\n':
|
||||||
|
tb_str += '\n'
|
||||||
|
|
||||||
|
# when caller provides a tb instance (say pulled from some other
|
||||||
|
# src error's `.__traceback__`) we use that as the "boxed"
|
||||||
|
# tb-string instead.
|
||||||
|
if tb:
|
||||||
|
# https://docs.python.org/3/library/traceback.html#traceback.format_list
|
||||||
|
tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str
|
||||||
|
|
||||||
error_msg: dict[ # for IPC
|
error_msg: dict[ # for IPC
|
||||||
str,
|
str,
|
||||||
|
@ -1115,7 +1183,7 @@ def _mk_msg_type_err(
|
||||||
src_validation_error: ValidationError|None = None,
|
src_validation_error: ValidationError|None = None,
|
||||||
src_type_error: TypeError|None = None,
|
src_type_error: TypeError|None = None,
|
||||||
is_invalid_payload: bool = False,
|
is_invalid_payload: bool = False,
|
||||||
src_err_msg: Error|None = None,
|
# src_err_msg: Error|None = None,
|
||||||
|
|
||||||
**mte_kwargs,
|
**mte_kwargs,
|
||||||
|
|
||||||
|
@ -1164,10 +1232,10 @@ def _mk_msg_type_err(
|
||||||
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
|
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
msgtyperr = MsgTypeError(
|
msgtyperr = MsgTypeError(
|
||||||
message=message,
|
message=message,
|
||||||
ipc_msg=msg,
|
ipc_msg=msg,
|
||||||
|
bad_msg=msg,
|
||||||
)
|
)
|
||||||
# ya, might be `None`
|
# ya, might be `None`
|
||||||
msgtyperr.__cause__ = src_type_error
|
msgtyperr.__cause__ = src_type_error
|
||||||
|
@ -1175,6 +1243,9 @@ def _mk_msg_type_err(
|
||||||
|
|
||||||
# `Channel.recv()` case
|
# `Channel.recv()` case
|
||||||
else:
|
else:
|
||||||
|
msg_dict: dict|None = None
|
||||||
|
bad_msg: PayloadMsg|None = None
|
||||||
|
|
||||||
if is_invalid_payload:
|
if is_invalid_payload:
|
||||||
msg_type: str = type(msg)
|
msg_type: str = type(msg)
|
||||||
any_pld: Any = msgpack.decode(msg.pld)
|
any_pld: Any = msgpack.decode(msg.pld)
|
||||||
|
@ -1186,19 +1257,20 @@ def _mk_msg_type_err(
|
||||||
# f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n'
|
# f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n'
|
||||||
# f')>\n\n'
|
# f')>\n\n'
|
||||||
)
|
)
|
||||||
|
# src_err_msg = msg
|
||||||
|
bad_msg = msg
|
||||||
# TODO: should we just decode the msg to a dict despite
|
# TODO: should we just decode the msg to a dict despite
|
||||||
# only the payload being wrong?
|
# only the payload being wrong?
|
||||||
# -[ ] maybe the better design is to break this construct
|
# -[ ] maybe the better design is to break this construct
|
||||||
# logic into a separate explicit helper raiser-func?
|
# logic into a separate explicit helper raiser-func?
|
||||||
msg_dict = None
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
msg: bytes
|
|
||||||
# decode the msg-bytes using the std msgpack
|
# decode the msg-bytes using the std msgpack
|
||||||
# interchange-prot (i.e. without any
|
# interchange-prot (i.e. without any `msgspec.Struct`
|
||||||
# `msgspec.Struct` handling) so that we can
|
# handling) so that we can determine what
|
||||||
# determine what `.msg.types.Msg` is the culprit
|
# `.msg.types.PayloadMsg` is the culprit by reporting the
|
||||||
# by reporting the received value.
|
# received value.
|
||||||
|
msg: bytes
|
||||||
msg_dict: dict = msgpack.decode(msg)
|
msg_dict: dict = msgpack.decode(msg)
|
||||||
msg_type_name: str = msg_dict['msg_type']
|
msg_type_name: str = msg_dict['msg_type']
|
||||||
msg_type = getattr(msgtypes, msg_type_name)
|
msg_type = getattr(msgtypes, msg_type_name)
|
||||||
|
@ -1235,9 +1307,13 @@ def _mk_msg_type_err(
|
||||||
if verb_header:
|
if verb_header:
|
||||||
message = f'{verb_header} ' + message
|
message = f'{verb_header} ' + message
|
||||||
|
|
||||||
|
# if not isinstance(bad_msg, PayloadMsg):
|
||||||
|
# import pdbp; pdbp.set_trace()
|
||||||
|
|
||||||
msgtyperr = MsgTypeError.from_decode(
|
msgtyperr = MsgTypeError.from_decode(
|
||||||
message=message,
|
message=message,
|
||||||
msgdict=msg_dict,
|
bad_msg=bad_msg,
|
||||||
|
bad_msg_as_dict=msg_dict,
|
||||||
|
|
||||||
# NOTE: for the send-side `.started()` pld-validate
|
# NOTE: for the send-side `.started()` pld-validate
|
||||||
# case we actually set the `._ipc_msg` AFTER we return
|
# case we actually set the `._ipc_msg` AFTER we return
|
||||||
|
@ -1245,7 +1321,7 @@ def _mk_msg_type_err(
|
||||||
# want to emulate the `Error` from the mte we build here
|
# want to emulate the `Error` from the mte we build here
|
||||||
# Bo
|
# Bo
|
||||||
# so by default in that case this is set to `None`
|
# so by default in that case this is set to `None`
|
||||||
ipc_msg=src_err_msg,
|
# ipc_msg=src_err_msg,
|
||||||
)
|
)
|
||||||
msgtyperr.__cause__ = src_validation_error
|
msgtyperr.__cause__ = src_validation_error
|
||||||
return msgtyperr
|
return msgtyperr
|
||||||
|
|
|
@ -47,7 +47,7 @@ from tractor._exceptions import (
|
||||||
_raise_from_unexpected_msg,
|
_raise_from_unexpected_msg,
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
_mk_msg_type_err,
|
_mk_msg_type_err,
|
||||||
pack_from_raise,
|
pack_error,
|
||||||
)
|
)
|
||||||
from tractor._state import current_ipc_ctx
|
from tractor._state import current_ipc_ctx
|
||||||
from ._codec import (
|
from ._codec import (
|
||||||
|
@ -203,7 +203,6 @@ class PldRx(Struct):
|
||||||
msg: MsgType = (
|
msg: MsgType = (
|
||||||
ipc_msg
|
ipc_msg
|
||||||
or
|
or
|
||||||
|
|
||||||
# async-rx msg from underlying IPC feeder (mem-)chan
|
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||||
await ipc._rx_chan.receive()
|
await ipc._rx_chan.receive()
|
||||||
)
|
)
|
||||||
|
@ -223,6 +222,10 @@ class PldRx(Struct):
|
||||||
raise_error: bool = True,
|
raise_error: bool = True,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
|
||||||
|
# XXX for special (default?) case of send side call with
|
||||||
|
# `Context.started(validate_pld_spec=True)`
|
||||||
|
is_started_send_side: bool = False,
|
||||||
|
|
||||||
) -> PayloadT|Raw:
|
) -> PayloadT|Raw:
|
||||||
'''
|
'''
|
||||||
Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and
|
Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and
|
||||||
|
@ -230,8 +233,6 @@ class PldRx(Struct):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
_src_err = None
|
|
||||||
src_err: BaseException|None = None
|
src_err: BaseException|None = None
|
||||||
match msg:
|
match msg:
|
||||||
# payload-data shuttle msg; deliver the `.pld` value
|
# payload-data shuttle msg; deliver the `.pld` value
|
||||||
|
@ -256,18 +257,58 @@ class PldRx(Struct):
|
||||||
# pack mgterr into error-msg for
|
# pack mgterr into error-msg for
|
||||||
# reraise below; ensure remote-actor-err
|
# reraise below; ensure remote-actor-err
|
||||||
# info is displayed nicely?
|
# info is displayed nicely?
|
||||||
msgterr: MsgTypeError = _mk_msg_type_err(
|
mte: MsgTypeError = _mk_msg_type_err(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
codec=self.pld_dec,
|
codec=self.pld_dec,
|
||||||
src_validation_error=valerr,
|
src_validation_error=valerr,
|
||||||
is_invalid_payload=True,
|
is_invalid_payload=True,
|
||||||
|
expected_msg=expect_msg,
|
||||||
|
# ipc_msg=msg,
|
||||||
)
|
)
|
||||||
msg: Error = pack_from_raise(
|
# NOTE: override the `msg` passed to
|
||||||
local_err=msgterr,
|
# `_raise_from_unexpected_msg()` (below) so so that
|
||||||
|
# we're effectively able to use that same func to
|
||||||
|
# unpack and raise an "emulated remote `Error`" of
|
||||||
|
# this local MTE.
|
||||||
|
err_msg: Error = pack_error(
|
||||||
|
exc=mte,
|
||||||
cid=msg.cid,
|
cid=msg.cid,
|
||||||
src_uid=ipc.chan.uid,
|
src_uid=(
|
||||||
|
ipc.chan.uid
|
||||||
|
if not is_started_send_side
|
||||||
|
else ipc._actor.uid
|
||||||
|
),
|
||||||
|
# tb=valerr.__traceback__,
|
||||||
|
tb_str=mte._message,
|
||||||
)
|
)
|
||||||
|
# ^-TODO-^ just raise this inline instead of all the
|
||||||
|
# pack-unpack-repack non-sense!
|
||||||
|
|
||||||
|
mte._ipc_msg = err_msg
|
||||||
|
msg = err_msg
|
||||||
|
|
||||||
|
# set emulated remote error more-or-less as the
|
||||||
|
# runtime would
|
||||||
|
ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||||
|
|
||||||
|
# TODO: should we instead make this explicit and
|
||||||
|
# use the above masked `is_started_send_decode`,
|
||||||
|
# expecting the `Context.started()` caller to set
|
||||||
|
# it? Rn this is kinda, howyousayyy, implicitly
|
||||||
|
# edge-case-y..
|
||||||
|
if (
|
||||||
|
expect_msg is not Started
|
||||||
|
and not is_started_send_side
|
||||||
|
):
|
||||||
|
ctx._maybe_cancel_and_set_remote_error(mte)
|
||||||
|
|
||||||
|
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
|
||||||
|
# raises the boxed `err_msg` from above it raises
|
||||||
|
# it from `None`.
|
||||||
src_err = valerr
|
src_err = valerr
|
||||||
|
# if is_started_send_side:
|
||||||
|
# src_err = None
|
||||||
|
|
||||||
|
|
||||||
# XXX some other decoder specific failure?
|
# XXX some other decoder specific failure?
|
||||||
# except TypeError as src_error:
|
# except TypeError as src_error:
|
||||||
|
@ -379,6 +420,7 @@ class PldRx(Struct):
|
||||||
# NOTE: generally speaking only for handling `Stop`-msgs that
|
# NOTE: generally speaking only for handling `Stop`-msgs that
|
||||||
# arrive during a call to `drain_to_final_msg()` above!
|
# arrive during a call to `drain_to_final_msg()` above!
|
||||||
passthrough_non_pld_msgs: bool = True,
|
passthrough_non_pld_msgs: bool = True,
|
||||||
|
hide_tb: bool = True,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> tuple[MsgType, PayloadT]:
|
) -> tuple[MsgType, PayloadT]:
|
||||||
|
@ -387,6 +429,7 @@ class PldRx(Struct):
|
||||||
the pair of refs.
|
the pair of refs.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__tracebackhide__: bool = hide_tb
|
||||||
msg: MsgType = await ipc._rx_chan.receive()
|
msg: MsgType = await ipc._rx_chan.receive()
|
||||||
|
|
||||||
if passthrough_non_pld_msgs:
|
if passthrough_non_pld_msgs:
|
||||||
|
@ -401,6 +444,7 @@ class PldRx(Struct):
|
||||||
msg,
|
msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
|
hide_tb=hide_tb,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
return msg, pld
|
return msg, pld
|
||||||
|
@ -414,7 +458,7 @@ def limit_plds(
|
||||||
) -> MsgDec:
|
) -> MsgDec:
|
||||||
'''
|
'''
|
||||||
Apply a `MsgCodec` that will natively decode the SC-msg set's
|
Apply a `MsgCodec` that will natively decode the SC-msg set's
|
||||||
`Msg.pld: Union[Type[Struct]]` payload fields using
|
`PayloadMsg.pld: Union[Type[Struct]]` payload fields using
|
||||||
tagged-unions of `msgspec.Struct`s from the `payload_types`
|
tagged-unions of `msgspec.Struct`s from the `payload_types`
|
||||||
for all IPC contexts in use by the current `trio.Task`.
|
for all IPC contexts in use by the current `trio.Task`.
|
||||||
|
|
||||||
|
@ -691,3 +735,11 @@ async def drain_to_final_msg(
|
||||||
result_msg,
|
result_msg,
|
||||||
pre_result_drained,
|
pre_result_drained,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: factor logic from `.Context.started()` for send-side
|
||||||
|
# validate raising!
|
||||||
|
def validate_payload_msg(
|
||||||
|
msg: Started|Yield|Return,
|
||||||
|
) -> MsgTypeError|None:
|
||||||
|
...
|
||||||
|
|
Loading…
Reference in New Issue