Compare commits

..

No commits in common. "0e8c60ee4aab56c4668f192b541bd7804256e6f1" and "eee4c61b51e6d3053549e67650be04fcd03ab2d5" have entirely different histories.

16 changed files with 312 additions and 522 deletions

View File

@ -4,15 +4,9 @@ import trio
async def breakpoint_forever(): async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor." "Indefinitely re-enter debugger in child actor."
try: while True:
while True: yield 'yo'
yield 'yo' await tractor.breakpoint()
await tractor.breakpoint()
except BaseException:
tractor.log.get_console_log().exception(
'Cancelled while trying to enter pause point!'
)
raise
async def name_error(): async def name_error():
@ -25,7 +19,7 @@ async def main():
""" """
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
loglevel='cancel', loglevel='error',
) as n: ) as n:
p0 = await n.start_actor('bp_forever', enable_modules=[__name__]) p0 = await n.start_actor('bp_forever', enable_modules=[__name__])

View File

@ -45,7 +45,6 @@ async def spawn_until(depth=0):
) )
# TODO: notes on the new boxed-relayed errors through proxy actors
async def main(): async def main():
"""The main ``tractor`` routine. """The main ``tractor`` routine.

View File

@ -23,6 +23,5 @@ async def main():
n.start_soon(debug_actor.run, die) n.start_soon(debug_actor.run, die)
n.start_soon(crash_boi.run, die) n.start_soon(crash_boi.run, die)
if __name__ == '__main__': if __name__ == '__main__':
trio.run(main) trio.run(main)

View File

@ -2,13 +2,10 @@ import trio
import tractor import tractor
async def main( async def main():
registry_addrs: tuple[str, int]|None = None
):
async with tractor.open_root_actor( async with tractor.open_root_actor(
debug_mode=True, debug_mode=True,
# loglevel='runtime',
): ):
while True: while True:
await tractor.breakpoint() await tractor.breakpoint()

View File

@ -3,26 +3,16 @@ import tractor
async def name_error(): async def name_error():
getattr(doggypants) # noqa (on purpose) getattr(doggypants)
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
# loglevel='transport', ) as n:
) as an:
# TODO: ideally the REPL arrives at this frame in the parent, portal = await n.run_in_actor(name_error)
# ABOVE the @api_frame of `Portal.run_in_actor()` (which await portal.result()
# should eventually not even be a portal method ... XD)
# await tractor.pause()
p: tractor.Portal = await an.run_in_actor(name_error)
# with this style, should raise on this line
await p.result()
# with this alt style should raise at `open_nusery()`
# return await p.result()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -7,7 +7,7 @@ def sync_pause(
error: bool = False, error: bool = False,
): ):
if use_builtin: if use_builtin:
breakpoint(hide_tb=False) breakpoint()
else: else:
tractor.pause_from_sync() tractor.pause_from_sync()
@ -20,20 +20,18 @@ def sync_pause(
async def start_n_sync_pause( async def start_n_sync_pause(
ctx: tractor.Context, ctx: tractor.Context,
): ):
actor: tractor.Actor = tractor.current_actor() # sync to requesting peer
# sync to parent-side task
await ctx.started() await ctx.started()
actor: tractor.Actor = tractor.current_actor()
print(f'entering SYNC PAUSE in {actor.uid}') print(f'entering SYNC PAUSE in {actor.uid}')
sync_pause() sync_pause()
print(f'back from SYNC PAUSE in {actor.uid}') print(f'back from SYNC PAUSE in {actor.uid}')
async def main() -> None: async def main() -> None:
async with tractor.open_nursery( async with tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True, debug_mode=True,
) as an: ) as an:

View File

@ -39,7 +39,8 @@ msgspec='^0.18.5' # interchange
wrapt = "^1.16.0" # decorators wrapt = "^1.16.0" # decorators
colorlog = "^6.8.2" # logging colorlog = "^6.8.2" # logging
# built-in multi-actor `pdb` REPL # .devx tooling
stackscope = "^0.2.2"
pdbp = "^1.5.0" pdbp = "^1.5.0"
@ -48,19 +49,15 @@ pdbp = "^1.5.0"
# 'pyroute2 # 'pyroute2
# ------ - ------ # ------ - ------
xontrib-vox = "^0.0.1"
[tool.poetry.group.dev] [tool.poetry.group.dev]
optional = false optional = false
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
# testing
pytest = "^8.2.0" pytest = "^8.2.0"
pexpect = "^4.9.0" pexpect = "^4.9.0"
# .devx tooling # only for xonsh as sh..
greenback = "^1.2.1"
stackscope = "^0.2.2"
# (light) xonsh usage/integration
xontrib-vox = "^0.0.1" xontrib-vox = "^0.0.1"
prompt-toolkit = "^3.0.43" prompt-toolkit = "^3.0.43"
xonsh-vox-tabcomplete = "^0.5" xonsh-vox-tabcomplete = "^0.5"

View File

@ -6,19 +6,30 @@ related settings around IPC contexts.
''' '''
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
contextmanager as cm,
)
# import typing
from typing import (
# Any,
TypeAlias,
# Union,
) )
from contextvars import ( from contextvars import (
Context, Context,
) )
from msgspec import ( from msgspec import (
# structs,
# msgpack,
Struct, Struct,
# ValidationError,
) )
import pytest import pytest
import trio import trio
import tractor import tractor
from tractor import ( from tractor import (
# _state,
MsgTypeError, MsgTypeError,
current_ipc_ctx, current_ipc_ctx,
Portal, Portal,
@ -29,9 +40,20 @@ from tractor.msg import (
) )
from tractor.msg import ( from tractor.msg import (
_codec, _codec,
# _ctxvar_MsgCodec,
# NamespacePath,
# MsgCodec,
# mk_codec,
# apply_codec,
# current_codec,
) )
from tractor.msg.types import ( from tractor.msg.types import (
log, log,
# _payload_msgs,
# PayloadMsg,
# Started,
# mk_msg_spec,
) )
@ -42,10 +64,23 @@ class PldMsg(Struct):
maybe_msg_spec = PldMsg|None maybe_msg_spec = PldMsg|None
@cm
def custom_spec(
ctx: Context,
spec: TypeAlias,
) -> _codec.MsgCodec:
'''
Apply a custom payload spec, remove on exit.
'''
rx: msgops.PldRx = ctx._pld_rx
@acm @acm
async def maybe_expect_raises( async def maybe_expect_raises(
raises: BaseException|None = None, raises: BaseException|None = None,
ensure_in_message: list[str]|None = None, ensure_in_message: list[str]|None = None,
reraise: bool = False, reraise: bool = False,
timeout: int = 3, timeout: int = 3,
) -> None: ) -> None:
@ -53,9 +88,6 @@ async def maybe_expect_raises(
Async wrapper for ensuring errors propagate from the inner scope. Async wrapper for ensuring errors propagate from the inner scope.
''' '''
if tractor._state.debug_mode():
timeout += 999
with trio.fail_after(timeout): with trio.fail_after(timeout):
try: try:
yield yield
@ -71,10 +103,9 @@ async def maybe_expect_raises(
# maybe check for error txt content # maybe check for error txt content
if ensure_in_message: if ensure_in_message:
part: str part: str
err_repr: str = repr(inner_err)
for part in ensure_in_message: for part in ensure_in_message:
for i, arg in enumerate(inner_err.args): for i, arg in enumerate(inner_err.args):
if part in err_repr: if part in arg:
break break
# if part never matches an arg, then we're # if part never matches an arg, then we're
# missing a match. # missing a match.
@ -101,7 +132,7 @@ async def child(
ctx: Context, ctx: Context,
started_value: int|PldMsg|None, started_value: int|PldMsg|None,
return_value: str|None, return_value: str|None,
validate_pld_spec: bool, validate_pld_spec: bool,
raise_on_started_mte: bool = True, raise_on_started_mte: bool = True,
) -> None: ) -> None:
@ -135,15 +166,13 @@ async def child(
# 2 cases: hdndle send-side and recv-only validation # 2 cases: hdndle send-side and recv-only validation
# - when `raise_on_started_mte == True`, send validate # - when `raise_on_started_mte == True`, send validate
# - else, parent-recv-side only validation # - else, parent-recv-side only validation
mte: MsgTypeError|None = None
try: try:
await ctx.started( await ctx.started(
value=started_value, value=started_value,
validate_pld_spec=validate_pld_spec, validate_pld_spec=validate_pld_spec,
) )
except MsgTypeError as _mte: except MsgTypeError:
mte = _mte
log.exception('started()` raised an MTE!\n') log.exception('started()` raised an MTE!\n')
if not expect_started_mte: if not expect_started_mte:
raise RuntimeError( raise RuntimeError(
@ -151,60 +180,15 @@ async def child(
f'{started_value!r}\n' f'{started_value!r}\n'
) )
boxed_div: str = '------ - ------'
assert boxed_div not in mte._message
assert boxed_div not in mte.tb_str
assert boxed_div not in repr(mte)
assert boxed_div not in str(mte)
mte_repr: str = repr(mte)
for line in mte.message.splitlines():
assert line in mte_repr
# since this is a *local error* there should be no
# boxed traceback content!
assert not mte.tb_str
# propagate to parent? # propagate to parent?
if raise_on_started_mte: if raise_on_started_mte:
raise raise
else:
# no-send-side-error fallthrough if expect_started_mte:
if ( raise RuntimeError(
validate_pld_spec 'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
and f'{started_value!r}\n'
expect_started_mte )
):
raise RuntimeError(
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
f'{started_value!r}\n'
)
assert (
not expect_started_mte
or
not validate_pld_spec
)
# if wait_for_parent_to_cancel:
# ...
#
# ^-TODO-^ logic for diff validation policies on each side:
#
# -[ ] ensure that if we don't validate on the send
# side, that we are eventually error-cancelled by our
# parent due to the bad `Started` payload!
# -[ ] the boxed error should be srced from the parent's
# runtime NOT ours!
# -[ ] we should still error on bad `return_value`s
# despite the parent not yet error-cancelling us?
# |_ how do we want the parent side to look in that
# case?
# -[ ] maybe the equiv of "during handling of the
# above error another occurred" for the case where
# the parent sends a MTE to this child and while
# waiting for the child to terminate it gets back
# the MTE for this case?
#
# XXX should always fail on recv side since we can't # XXX should always fail on recv side since we can't
# really do much else beside terminate and relay the # really do much else beside terminate and relay the
@ -227,8 +211,8 @@ async def child(
@pytest.mark.parametrize( @pytest.mark.parametrize(
'return_value', 'return_value',
[ [
'yo',
None, None,
'yo',
], ],
ids=[ ids=[
'return[invalid-"yo"]', 'return[invalid-"yo"]',
@ -287,32 +271,16 @@ def test_basic_payload_spec(
# since not opened yet. # since not opened yet.
assert current_ipc_ctx() is None assert current_ipc_ctx() is None
if invalid_started:
msg_type_str: str = 'Started'
bad_value_str: str = '10'
elif invalid_return:
msg_type_str: str = 'Return'
bad_value_str: str = "'yo'"
else:
# XXX but should never be used below then..
msg_type_str: str = ''
bad_value_str: str = ''
maybe_mte: MsgTypeError|None = None
should_raise: Exception|None = (
MsgTypeError if (
invalid_return
or
invalid_started
) else None
)
async with ( async with (
maybe_expect_raises( maybe_expect_raises(
raises=should_raise, raises=MsgTypeError if (
invalid_return
or
invalid_started
) else None,
ensure_in_message=[ ensure_in_message=[
f"invalid `{msg_type_str}` msg payload", "invalid `Return` payload",
f"value: `{bad_value_str}` does not " "value: `'yo'` does not match type-spec: `Return.pld: PldMsg|NoneType`",
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
], ],
), ),
p.open_context( p.open_context(
@ -330,35 +298,18 @@ def test_basic_payload_spec(
assert first.field == 'yo' assert first.field == 'yo'
try: try:
res: None|PldMsg = await ctx.result(hide_tb=False) assert (await ctx.result()) is None
assert res is None
except MsgTypeError as mte: except MsgTypeError as mte:
maybe_mte = mte
if not invalid_return: if not invalid_return:
raise raise
# expected this invalid `Return.pld` so audit else: # expected this invalid `Return.pld`
# the error state + meta-data assert mte.cid == ctx.cid
assert mte.expected_msg_type is Return
assert mte.cid == ctx.cid
mte_repr: str = repr(mte)
for line in mte.message.splitlines():
assert line in mte_repr
assert mte.tb_str # verify expected remote mte deats
# await tractor.pause(shield=True) await tractor.pause()
assert ctx._remote_error is mte
# verify expected remote mte deats assert mte.expected_msg_type is Return
assert ctx._local_error is None
assert (
mte is
ctx._remote_error is
ctx.maybe_error is
ctx.outcome
)
if should_raise is None:
assert maybe_mte is None
await p.cancel_actor() await p.cancel_actor()

View File

@ -58,6 +58,9 @@ from typing import (
import warnings import warnings
# ------ - ------ # ------ - ------
import trio import trio
from msgspec import (
ValidationError,
)
# ------ - ------ # ------ - ------
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
@ -75,16 +78,19 @@ from .log import (
from .msg import ( from .msg import (
Error, Error,
MsgType, MsgType,
MsgCodec,
NamespacePath, NamespacePath,
PayloadT, PayloadT,
Started, Started,
Stop, Stop,
Yield, Yield,
current_codec,
pretty_struct, pretty_struct,
_ops as msgops, _ops as msgops,
) )
from ._ipc import ( from ._ipc import (
Channel, Channel,
_mk_msg_type_err,
) )
from ._streaming import MsgStream from ._streaming import MsgStream
from ._state import ( from ._state import (
@ -664,7 +670,7 @@ class Context:
'Setting remote error for ctx\n\n' 'Setting remote error for ctx\n\n'
f'<= {self.peer_side!r}: {self.chan.uid}\n' f'<= {self.peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}: {self._actor.uid}\n\n' f'=> {self.side!r}: {self._actor.uid}\n\n'
f'{error!r}' f'{error}'
) )
self._remote_error: BaseException = error self._remote_error: BaseException = error
@ -718,7 +724,7 @@ class Context:
log.error( log.error(
f'Remote context error:\n\n' f'Remote context error:\n\n'
# f'{pformat(self)}\n' # f'{pformat(self)}\n'
f'{error!r}' f'{error}'
) )
if self._canceller is None: if self._canceller is None:
@ -742,27 +748,26 @@ class Context:
and not cs.cancel_called and not cs.cancel_called
and not cs.cancelled_caught and not cs.cancelled_caught
): ):
if ( if not (
msgerr msgerr
# NOTE: we allow user to config not cancelling the # NOTE: we allow user to config not cancelling the
# local scope on `MsgTypeError`s # local scope on `MsgTypeError`s
and and not self._cancel_on_msgerr
not self._cancel_on_msgerr
): ):
message: str = (
'NOT Cancelling `Context._scope` since,\n'
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
f'AND we got a msg-type-error!\n'
f'{error}\n'
)
else:
# TODO: it'd sure be handy to inject our own # TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;) # `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368 # https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n' message: str = 'Cancelling `Context._scope` !\n\n'
self._scope.cancel() self._scope.cancel()
else:
message: str = (
'NOT Cancelling `Context._scope` since,\n'
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
f'AND we got a msg-type-error!\n'
f'{error}\n'
)
else: else:
message: str = 'NOT cancelling `Context._scope` !\n\n' message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb # from .devx import mk_pdb
@ -1652,21 +1657,54 @@ class Context:
# #
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
if validate_pld_spec: if validate_pld_spec:
msgops.validate_payload_msg( # __tracebackhide__: bool = False
pld_msg=started_msg, codec: MsgCodec = current_codec()
pld_value=value, msg_bytes: bytes = codec.encode(started_msg)
ipc=self, try:
strict_pld_parity=strict_pld_parity, roundtripped: Started = codec.decode(msg_bytes)
hide_tb=hide_tb, # pld: PayloadT = await self.pld_rx.recv_pld(
) pld: PayloadT = self.pld_rx.dec_msg(
msg=roundtripped,
ipc=self,
expect_msg=Started,
hide_tb=hide_tb,
is_started_send_side=True,
)
if (
strict_pld_parity
and
pld != value
):
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
roundtripped,
started_msg,
)
complaint: str = (
'Started value does not match after roundtrip?\n\n'
f'{diff}'
)
raise ValidationError(complaint)
# raise any msg type error NO MATTER WHAT!
except ValidationError as verr:
# always show this src frame in the tb
# __tracebackhide__: bool = False
raise _mk_msg_type_err(
msg=roundtripped,
codec=codec,
src_validation_error=verr,
verb_header='Trying to send ',
is_invalid_payload=True,
) from verr
# TODO: maybe a flag to by-pass encode op if already done # TODO: maybe a flag to by-pass encode op if already done
# here in caller? # here in caller?
await self.chan.send(started_msg) await self.chan.send(started_msg)
# set msg-related internal runtime-state # set msg-related internal runtime-state
self._started_called: bool = True self._started_called = True
self._started_msg: Started = started_msg self._started_msg = started_msg
self._started_pld = value self._started_pld = value
async def _drain_overflows( async def _drain_overflows(
@ -2059,12 +2097,6 @@ async def open_context_from_portal(
if maybe_msgdec: if maybe_msgdec:
assert maybe_msgdec.pld_spec == pld_spec assert maybe_msgdec.pld_spec == pld_spec
# NOTE: this in an implicit runtime nursery used to,
# - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user.
ctx._scope_nursery: trio.Nursery = tn
ctx._scope: trio.CancelScope = tn.cancel_scope
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the # XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered # `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will # in `._maybe_cancel_and_set_remote_error()` will
@ -2072,42 +2104,25 @@ async def open_context_from_portal(
# -> it's expected that if there is an error in this phase of # -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg` # the dialog, the `Error` msg should be raised from the `msg`
# handling block below. # handling block below.
try: started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
started_msg, first = await ctx._pld_rx.recv_msg_w_pld( ipc=ctx,
ipc=ctx, expect_msg=Started,
expect_msg=Started, passthrough_non_pld_msgs=False,
passthrough_non_pld_msgs=False, hide_tb=hide_tb,
hide_tb=hide_tb, )
)
except trio.Cancelled as taskc:
ctx_cs: trio.CancelScope = ctx._scope
if not ctx_cs.cancel_called:
raise
# from .devx import pause
# await pause(shield=True)
log.cancel(
'IPC ctx was cancelled during "child" task sync due to\n\n'
f'{ctx.maybe_error}\n'
)
# OW if the ctx's scope was cancelled manually,
# likely the `Context` was cancelled via a call to
# `._maybe_cancel_and_set_remote_error()` so ensure
# we raise the underlying `._remote_error` directly
# instead of bubbling that taskc.
ctx.maybe_raise()
# OW, some other unexpected cancel condition
# that should prolly never happen right?
raise InternalError(
'Invalid cancellation during IPC ctx sync phase?\n'
) from taskc
# from .devx import pause
# await pause()
ctx._started_called: bool = True ctx._started_called: bool = True
ctx._started_msg: bool = started_msg ctx._started_msg: bool = started_msg
ctx._started_pld: bool = first ctx._started_pld: bool = first
# NOTE: this in an implicit runtime nursery used to,
# - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user.
ctx._scope_nursery: trio.Nursery = tn
ctx._scope: trio.CancelScope = tn.cancel_scope
# deliver context instance and .started() msg value # deliver context instance and .started() msg value
# in enter tuple. # in enter tuple.
yield ctx, first yield ctx, first

View File

@ -22,7 +22,6 @@ from __future__ import annotations
import builtins import builtins
import importlib import importlib
from pprint import pformat from pprint import pformat
import sys
from types import ( from types import (
TracebackType, TracebackType,
) )
@ -111,7 +110,6 @@ _body_fields: list[str] = list(
'tb_str', 'tb_str',
'relay_path', 'relay_path',
'cid', 'cid',
'message',
# only ctxc should show it but `Error` does # only ctxc should show it but `Error` does
# have it as an optional field. # have it as an optional field.
@ -238,7 +236,6 @@ class RemoteActorError(Exception):
self._boxed_type: BaseException = boxed_type self._boxed_type: BaseException = boxed_type
self._src_type: BaseException|None = None self._src_type: BaseException|None = None
self._ipc_msg: Error|None = ipc_msg self._ipc_msg: Error|None = ipc_msg
self._extra_msgdata = extra_msgdata
if ( if (
extra_msgdata extra_msgdata
@ -253,6 +250,8 @@ class RemoteActorError(Exception):
k, k,
v, v,
) )
else:
self._extra_msgdata = extra_msgdata
# TODO: mask out eventually or place in `pack_error()` # TODO: mask out eventually or place in `pack_error()`
# pre-`return` lines? # pre-`return` lines?
@ -283,17 +282,6 @@ class RemoteActorError(Exception):
# ensure any roundtripping evals to the input value # ensure any roundtripping evals to the input value
assert self.boxed_type is boxed_type assert self.boxed_type is boxed_type
@property
def message(self) -> str:
'''
Be explicit, instead of trying to read it from the the parent
type's loosely defined `.args: tuple`:
https://docs.python.org/3/library/exceptions.html#BaseException.args
'''
return self._message
@property @property
def ipc_msg(self) -> Struct: def ipc_msg(self) -> Struct:
''' '''
@ -367,10 +355,7 @@ class RemoteActorError(Exception):
''' '''
bt: Type[BaseException] = self.boxed_type bt: Type[BaseException] = self.boxed_type
if bt: return str(bt.__name__)
return str(bt.__name__)
return ''
@property @property
def boxed_type(self) -> Type[BaseException]: def boxed_type(self) -> Type[BaseException]:
@ -441,7 +426,8 @@ class RemoteActorError(Exception):
for key in fields: for key in fields:
if ( if (
key == 'relay_uid' and not self.is_inception() key == 'relay_uid'
and not self.is_inception()
): ):
continue continue
@ -518,80 +504,19 @@ class RemoteActorError(Exception):
def pformat( def pformat(
self, self,
with_type_header: bool = True, with_type_header: bool = True,
# with_ascii_box: bool = True,
) -> str: ) -> str:
''' '''
Format any boxed remote error by multi-line display of, Nicely formatted boxed error meta data + traceback, OR just
the normal message from `.args` (for eg. as you'd want shown
- error's src or relay actor meta-data, by a locally raised `ContextCancelled`).
- remote runtime env's traceback,
With optional control over the format of,
- whether the boxed traceback is ascii-decorated with
a surrounding "box" annotating the embedded stack-trace.
- if the error's type name should be added as margins
around the field and tb content like:
`<RemoteActorError(.. <<multi-line-content>> .. )>`
- the placement of the `.message: str` (explicit equiv of
`.args[0]`), either placed below the `.tb_str` or in the
first line's header when the error is raised locally (since
the type name is already implicitly shown by python).
''' '''
header: str = '' header: str = ''
body: str = '' body: str = ''
message: str = ''
# XXX when the currently raised exception is this instance,
# we do not ever use the "type header" style repr.
is_being_raised: bool = False
if (
(exc := sys.exception())
and
exc is self
):
is_being_raised: bool = True
with_type_header: bool = (
with_type_header
and
not is_being_raised
)
# <RemoteActorError( .. )> style
if with_type_header: if with_type_header:
header: str = f'<{type(self).__name__}(' header: str = f'<{type(self).__name__}(\n'
if message := self._message:
# split off the first line so, if needed, it isn't
# indented the same like the "boxed content" which
# since there is no `.tb_str` is just the `.message`.
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
# with a type-style header we,
# - have no special message "first line" extraction/handling
# - place the message a space in from the header:
# `MsgTypeError( <message> ..`
# ^-here
# - indent the `.message` inside the type body.
if with_type_header:
first = f' {first} )>'
message: str = textwrap.indent(
message,
prefix=' '*2,
)
message: str = first + message
# IFF there is an embedded traceback-str we always
# draw the ascii-box around it.
if tb_str := self.tb_str: if tb_str := self.tb_str:
fields: str = self._mk_fields_str( fields: str = self._mk_fields_str(
_body_fields _body_fields
@ -610,19 +535,36 @@ class RemoteActorError(Exception):
# |___ .. # |___ ..
tb_body_indent=1, tb_body_indent=1,
) )
if not with_type_header:
body = '\n' + body
tail = '' elif message := self._message:
if ( # split off the first line so it isn't indented
with_type_header # the same like the "boxed content".
and not message if not with_type_header:
): lines: list[str] = message.splitlines()
tail: str = '>' first: str = lines[0]
message: str = message.removeprefix(first)
else:
first: str = ''
body: str = (
first
+
message
+
'\n'
)
if with_type_header:
tail: str = ')>'
else:
tail = ''
return ( return (
header header
+ +
message
+
f'{body}' f'{body}'
+ +
tail tail
@ -635,9 +577,7 @@ class RemoteActorError(Exception):
# |_ i guess `pexepect` relies on `str`-casing # |_ i guess `pexepect` relies on `str`-casing
# of output? # of output?
def __str__(self) -> str: def __str__(self) -> str:
return self.pformat( return self.pformat(with_type_header=False)
with_type_header=False
)
def unwrap( def unwrap(
self, self,
@ -885,6 +825,9 @@ class MsgTypeError(
extra_msgdata['_bad_msg'] = bad_msg extra_msgdata['_bad_msg'] = bad_msg
extra_msgdata['cid'] = bad_msg.cid extra_msgdata['cid'] = bad_msg.cid
if 'cid' not in extra_msgdata:
import pdbp; pdbp.set_trace()
return cls( return cls(
message=message, message=message,
boxed_type=cls, boxed_type=cls,
@ -946,7 +889,6 @@ def pack_error(
src_uid: tuple[str, str]|None = None, src_uid: tuple[str, str]|None = None,
tb: TracebackType|None = None, tb: TracebackType|None = None,
tb_str: str = '', tb_str: str = '',
message: str = '',
) -> Error: ) -> Error:
''' '''
@ -960,7 +902,7 @@ def pack_error(
tb_str: str = ( tb_str: str = (
''.join(traceback.format_exception(exc)) ''.join(traceback.format_exception(exc))
# TODO: can we remove this since `exc` is required.. right? # TODO: can we remove this is `exc` is required?
or or
# NOTE: this is just a shorthand for the "last error" as # NOTE: this is just a shorthand for the "last error" as
# provided by `sys.exeception()`, see: # provided by `sys.exeception()`, see:
@ -975,8 +917,8 @@ def pack_error(
# when caller provides a tb instance (say pulled from some other # when caller provides a tb instance (say pulled from some other
# src error's `.__traceback__`) we use that as the "boxed" # src error's `.__traceback__`) we use that as the "boxed"
# tb-string instead. # tb-string instead.
# https://docs.python.org/3/library/traceback.html#traceback.format_list
if tb: if tb:
# https://docs.python.org/3/library/traceback.html#traceback.format_list
tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str
error_msg: dict[ # for IPC error_msg: dict[ # for IPC
@ -1019,17 +961,17 @@ def pack_error(
error_msg['src_type_str'] = type(exc).__name__ error_msg['src_type_str'] = type(exc).__name__
error_msg['boxed_type_str'] = type(exc).__name__ error_msg['boxed_type_str'] = type(exc).__name__
# XXX always append us the last relay in error propagation path # XXX alawys append us the last relay in error propagation path
error_msg.setdefault( error_msg.setdefault(
'relay_path', 'relay_path',
[], [],
).append(our_uid) ).append(our_uid)
# XXX NOTE XXX always ensure the traceback-str content is from # XXX NOTE: always ensure the traceback-str is from the
# the locally raised error (so, NOT the prior relay's boxed # locally raised error (**not** the prior relay's boxed
# `._ipc_msg.tb_str`). # content's in `._ipc_msg.tb_str`).
error_msg['tb_str'] = tb_str error_msg['tb_str'] = tb_str
error_msg['message'] = message or getattr(exc, 'message', '')
if cid is not None: if cid is not None:
error_msg['cid'] = cid error_msg['cid'] = cid
@ -1053,24 +995,26 @@ def unpack_error(
if not isinstance(msg, Error): if not isinstance(msg, Error):
return None return None
# retrieve the remote error's msg-encoded details
tb_str: str = msg.tb_str
message: str = (
f'{chan.uid}\n'
+
tb_str
)
# try to lookup a suitable error type from the local runtime # try to lookup a suitable error type from the local runtime
# env then use it to construct a local instance. # env then use it to construct a local instance.
# boxed_type_str: str = error_dict['boxed_type_str'] # boxed_type_str: str = error_dict['boxed_type_str']
boxed_type_str: str = msg.boxed_type_str boxed_type_str: str = msg.boxed_type_str
boxed_type: Type[BaseException] = get_err_type(boxed_type_str) boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
# retrieve the error's msg-encoded remotoe-env info if boxed_type_str == 'ContextCancelled':
message: str = f'remote task raised a {msg.boxed_type_str!r}\n' box_type = ContextCancelled
assert boxed_type is box_type
# TODO: do we even really need these checks for RAEs? elif boxed_type_str == 'MsgTypeError':
if boxed_type_str in [ box_type = MsgTypeError
'ContextCancelled',
'MsgTypeError',
]:
box_type = {
'ContextCancelled': ContextCancelled,
'MsgTypeError': MsgTypeError,
}[boxed_type_str]
assert boxed_type is box_type assert boxed_type is box_type
# TODO: already included by `_this_mod` in else loop right? # TODO: already included by `_this_mod` in else loop right?
@ -1085,21 +1029,19 @@ def unpack_error(
exc = box_type( exc = box_type(
message, message,
ipc_msg=msg, ipc_msg=msg,
tb_str=msg.tb_str,
) )
return exc return exc
def is_multi_cancelled( def is_multi_cancelled(exc: BaseException) -> bool:
exc: BaseException|BaseExceptionGroup
) -> bool:
''' '''
Predicate to determine if a possible ``BaseExceptionGroup`` contains Predicate to determine if a possible ``BaseExceptionGroup`` contains
only ``trio.Cancelled`` sub-exceptions (and is likely the result of only ``trio.Cancelled`` sub-exceptions (and is likely the result of
cancelling a collection of subtasks. cancelling a collection of subtasks.
''' '''
# if isinstance(exc, eg.BaseExceptionGroup):
if isinstance(exc, BaseExceptionGroup): if isinstance(exc, BaseExceptionGroup):
return exc.subgroup( return exc.subgroup(
lambda exc: isinstance(exc, trio.Cancelled) lambda exc: isinstance(exc, trio.Cancelled)
@ -1167,7 +1109,6 @@ def _raise_from_unexpected_msg(
msg, msg,
ctx.chan, ctx.chan,
) )
ctx._maybe_cancel_and_set_remote_error(exc)
raise exc from src_err raise exc from src_err
# `MsgStream` termination msg. # `MsgStream` termination msg.
@ -1242,6 +1183,7 @@ def _mk_msg_type_err(
src_validation_error: ValidationError|None = None, src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None, src_type_error: TypeError|None = None,
is_invalid_payload: bool = False, is_invalid_payload: bool = False,
# src_err_msg: Error|None = None,
**mte_kwargs, **mte_kwargs,
@ -1308,11 +1250,19 @@ def _mk_msg_type_err(
msg_type: str = type(msg) msg_type: str = type(msg)
any_pld: Any = msgpack.decode(msg.pld) any_pld: Any = msgpack.decode(msg.pld)
message: str = ( message: str = (
f'invalid `{msg_type.__qualname__}` msg payload\n\n' f'invalid `{msg_type.__qualname__}` payload\n\n'
f'value: `{any_pld!r}` does not match type-spec: ' f'value: `{any_pld!r}` does not match type-spec: ' #\n'
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
# f'<{type(msg).__qualname__}(\n'
# f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n'
# f')>\n\n'
) )
# src_err_msg = msg
bad_msg = msg bad_msg = msg
# TODO: should we just decode the msg to a dict despite
# only the payload being wrong?
# -[ ] maybe the better design is to break this construct
# logic into a separate explicit helper raiser-func?
else: else:
# decode the msg-bytes using the std msgpack # decode the msg-bytes using the std msgpack
@ -1357,21 +1307,21 @@ def _mk_msg_type_err(
if verb_header: if verb_header:
message = f'{verb_header} ' + message message = f'{verb_header} ' + message
# if not isinstance(bad_msg, PayloadMsg):
# import pdbp; pdbp.set_trace()
msgtyperr = MsgTypeError.from_decode( msgtyperr = MsgTypeError.from_decode(
message=message, message=message,
bad_msg=bad_msg, bad_msg=bad_msg,
bad_msg_as_dict=msg_dict, bad_msg_as_dict=msg_dict,
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually: # NOTE: for the send-side `.started()` pld-validate
# - for the send-side `.started()` pld-validate # case we actually set the `._ipc_msg` AFTER we return
# case we actually raise inline so we don't need to # from here inside `Context.started()` since we actually
# set the it at all. # want to emulate the `Error` from the mte we build here
# - for recv side we set it inside `PldRx.decode_pld()` # Bo
# after a manual call to `pack_error()` since we # so by default in that case this is set to `None`
# actually want to emulate the `Error` from the mte we # ipc_msg=src_err_msg,
# build here. So by default in that case, this is left
# as `None` here.
# ipc_msg=src_err_msg,
) )
msgtyperr.__cause__ = src_validation_error msgtyperr.__cause__ = src_validation_error
return msgtyperr return msgtyperr

View File

@ -291,7 +291,7 @@ class MsgpackTCPStream(MsgTransport):
async def send( async def send(
self, self,
msg: msgtypes.MsgType, msg: msgtypes.Msg,
strict_types: bool = True, strict_types: bool = True,
# hide_tb: bool = False, # hide_tb: bool = False,

View File

@ -64,7 +64,6 @@ from .log import get_logger
from .msg import ( from .msg import (
current_codec, current_codec,
MsgCodec, MsgCodec,
PayloadT,
NamespacePath, NamespacePath,
pretty_struct, pretty_struct,
) )
@ -99,7 +98,7 @@ async def _invoke_non_context(
treat_as_gen: bool, treat_as_gen: bool,
is_rpc: bool, is_rpc: bool,
return_msg_type: Return|CancelAck = Return, return_msg: Return|CancelAck = Return,
task_status: TaskStatus[ task_status: TaskStatus[
Context | BaseException Context | BaseException
@ -221,7 +220,7 @@ async def _invoke_non_context(
and chan.connected() and chan.connected()
): ):
try: try:
ret_msg = return_msg_type( ret_msg = return_msg(
cid=cid, cid=cid,
pld=result, pld=result,
) )
@ -393,22 +392,16 @@ async def _errors_relayed_via_ipc(
# cancel scope will not have been inserted yet # cancel scope will not have been inserted yet
if is_rpc: if is_rpc:
log.warning( log.warning(
'RPC task likely errored or cancelled before start?\n' 'RPC task likely errored or cancelled before start?'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
else:
log.cancel(
'Failed to de-alloc internal runtime cancel task?\n'
f'|_{ctx._task}\n' f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n' f' >> {ctx.repr_rpc}\n'
) )
# TODO: remove this right? rn the only non-`is_rpc` cases
# are cancellation methods and according the RPC loop eps
# for thoses below, nothing is ever registered in
# `Actor._rpc_tasks` for those cases.. but should we?
#
# -[ ] maybe we should have an equiv `Actor._runtime_rpc_tasks`?
# else:
# log.cancel(
# 'Failed to de-alloc internal runtime cancel task?\n'
# f'|_{ctx._task}\n'
# f' >> {ctx.repr_rpc}\n'
# )
finally: finally:
if not actor._rpc_tasks: if not actor._rpc_tasks:
@ -426,7 +419,7 @@ async def _invoke(
is_rpc: bool = True, is_rpc: bool = True,
hide_tb: bool = True, hide_tb: bool = True,
return_msg_type: Return|CancelAck = Return, return_msg: Return|CancelAck = Return,
task_status: TaskStatus[ task_status: TaskStatus[
Context | BaseException Context | BaseException
@ -540,7 +533,7 @@ async def _invoke(
kwargs, kwargs,
treat_as_gen, treat_as_gen,
is_rpc, is_rpc,
return_msg_type, return_msg,
task_status, task_status,
) )
# XXX below fallthrough is ONLY for `@context` eps # XXX below fallthrough is ONLY for `@context` eps
@ -600,21 +593,18 @@ async def _invoke(
ctx._scope = tn.cancel_scope ctx._scope = tn.cancel_scope
task_status.started(ctx) task_status.started(ctx)
# TODO: better `trionics` tooling: # TODO: should would be nice to have our
# -[ ] should would be nice to have our `TaskMngr` # `TaskMngr` nursery here!
# nursery here! res: Any = await coro
# -[ ] payload value checking like we do with
# `.started()` such that the debbuger can engage
# here in the child task instead of waiting for the
# parent to crash with it's own MTE..
res: Any|PayloadT = await coro
return_msg: Return|CancelAck = return_msg_type(
cid=cid,
pld=res,
)
# set and shuttle final result to "parent"-side task.
ctx._result = res ctx._result = res
await chan.send(return_msg)
# deliver final result to caller side.
await chan.send(
return_msg(
cid=cid,
pld=res,
)
)
# NOTE: this happens IFF `ctx._scope.cancel()` is # NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of, # called by any of,
@ -950,7 +940,7 @@ async def process_messages(
actor.cancel, actor.cancel,
kwargs, kwargs,
is_rpc=False, is_rpc=False,
return_msg_type=CancelAck, return_msg=CancelAck,
) )
log.runtime( log.runtime(
@ -984,7 +974,7 @@ async def process_messages(
actor._cancel_task, actor._cancel_task,
kwargs, kwargs,
is_rpc=False, is_rpc=False,
return_msg_type=CancelAck, return_msg=CancelAck,
) )
except BaseException: except BaseException:
log.exception( log.exception(

View File

@ -1256,10 +1256,9 @@ class Actor:
# - child returns a result before cancel-msg/ctxc-raised # - child returns a result before cancel-msg/ctxc-raised
# - child self raises ctxc before parent send request, # - child self raises ctxc before parent send request,
# - child errors prior to cancel req. # - child errors prior to cancel req.
log.runtime( log.cancel(
'Cancel request for invalid RPC task.\n' 'Cancel request invalid, RPC task already completed?\n\n'
'The task likely already completed or was never started!\n\n' f'<= canceller: {requesting_uid}\n\n'
f'<= canceller: {requesting_uid}\n'
f'=> {cid}@{parent_chan.uid}\n' f'=> {cid}@{parent_chan.uid}\n'
f' |_{parent_chan}\n' f' |_{parent_chan}\n'
) )

View File

@ -140,7 +140,7 @@ class MsgDec(Struct):
# * also a `.__contains__()` for doing `None in # * also a `.__contains__()` for doing `None in
# TypeSpec[None|int]` since rn you need to do it on # TypeSpec[None|int]` since rn you need to do it on
# `.__args__` for unions.. # `.__args__` for unions..
# - `MsgSpec: Union[MsgType] # - `MsgSpec: Union[Type[Msg]]
# #
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo # -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params # |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
@ -188,7 +188,7 @@ def mk_dec(
return MsgDec( return MsgDec(
_dec=msgpack.Decoder( _dec=msgpack.Decoder(
type=spec, # like `MsgType[Any]` type=spec, # like `Msg[Any]`
dec_hook=dec_hook, dec_hook=dec_hook,
) )
) )
@ -561,7 +561,7 @@ def mk_codec(
''' '''
# (manually) generate a msg-payload-spec for all relevant # (manually) generate a msg-payload-spec for all relevant
# god-boxing-msg subtypes, parameterizing the `PayloadMsg.pld: PayloadT` # god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT`
# for the decoder such that all sub-type msgs in our SCIPP # for the decoder such that all sub-type msgs in our SCIPP
# will automatically decode to a type-"limited" payload (`Struct`) # will automatically decode to a type-"limited" payload (`Struct`)
# object (set). # object (set).
@ -607,7 +607,7 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
# The built-in IPC `Msg` spec. # The built-in IPC `Msg` spec.
# Our composing "shuttle" protocol which allows `tractor`-app code # Our composing "shuttle" protocol which allows `tractor`-app code
# to use any `msgspec` supported type as the `PayloadMsg.pld` payload, # to use any `msgspec` supported type as the `Msg.pld` payload,
# https://jcristharif.com/msgspec/supported-types.html # https://jcristharif.com/msgspec/supported-types.html
# #
_def_tractor_codec: MsgCodec = mk_codec( _def_tractor_codec: MsgCodec = mk_codec(
@ -743,7 +743,7 @@ def limit_msg_spec(
) -> MsgCodec: ) -> MsgCodec:
''' '''
Apply a `MsgCodec` that will natively decode the SC-msg set's Apply a `MsgCodec` that will natively decode the SC-msg set's
`PayloadMsg.pld: Union[Type[Struct]]` payload fields using `Msg.pld: Union[Type[Struct]]` payload fields using
tagged-unions of `msgspec.Struct`s from the `payload_types` tagged-unions of `msgspec.Struct`s from the `payload_types`
for all IPC contexts in use by the current `trio.Task`. for all IPC contexts in use by the current `trio.Task`.

View File

@ -53,8 +53,6 @@ from tractor._state import current_ipc_ctx
from ._codec import ( from ._codec import (
mk_dec, mk_dec,
MsgDec, MsgDec,
MsgCodec,
current_codec,
) )
from .types import ( from .types import (
CancelAck, CancelAck,
@ -215,9 +213,6 @@ class PldRx(Struct):
**dec_msg_kwargs, **dec_msg_kwargs,
) )
# TODO: rename to,
# -[ ] `.decode_pld()`?
# -[ ] `.dec_pld()`?
def dec_msg( def dec_msg(
self, self,
msg: MsgType, msg: MsgType,
@ -251,8 +246,8 @@ class PldRx(Struct):
pld: PayloadT = self._pld_dec.decode(pld) pld: PayloadT = self._pld_dec.decode(pld)
log.runtime( log.runtime(
'Decoded msg payload\n\n' 'Decoded msg payload\n\n'
f'{msg}\n' f'{msg}\n\n'
f'where payload decoded as\n' f'where payload is\n'
f'|_pld={pld!r}\n' f'|_pld={pld!r}\n'
) )
return pld return pld
@ -268,29 +263,13 @@ class PldRx(Struct):
src_validation_error=valerr, src_validation_error=valerr,
is_invalid_payload=True, is_invalid_payload=True,
expected_msg=expect_msg, expected_msg=expect_msg,
# ipc_msg=msg,
) )
# NOTE: just raise the MTE inline instead of all # NOTE: override the `msg` passed to
# the pack-unpack-repack non-sense when this is # `_raise_from_unexpected_msg()` (below) so so that
# a "send side" validation error. # we're effectively able to use that same func to
if is_started_send_side: # unpack and raise an "emulated remote `Error`" of
raise mte # this local MTE.
# XXX TODO: remove this right?
# => any bad stated/return values should
# always be treated a remote errors right?
#
# if (
# expect_msg is Return
# or expect_msg is Started
# ):
# # set emulated remote error more-or-less as the
# # runtime would
# ctx: Context = getattr(ipc, 'ctx', ipc)
# ctx._maybe_cancel_and_set_remote_error(mte)
# NOTE: the `.message` is automatically
# transferred into the message as long as we
# define it as a `Error.message` field.
err_msg: Error = pack_error( err_msg: Error = pack_error(
exc=mte, exc=mte,
cid=msg.cid, cid=msg.cid,
@ -300,38 +279,36 @@ class PldRx(Struct):
else ipc._actor.uid else ipc._actor.uid
), ),
# tb=valerr.__traceback__, # tb=valerr.__traceback__,
# tb_str=mte._message, tb_str=mte._message,
# message=mte._message,
) )
mte._ipc_msg = err_msg # ^-TODO-^ just raise this inline instead of all the
# pack-unpack-repack non-sense!
# XXX override the `msg` passed to mte._ipc_msg = err_msg
# `_raise_from_unexpected_msg()` (below) so so
# that we're effectively able to use that same
# func to unpack and raise an "emulated remote
# `Error`" of this local MTE.
msg = err_msg msg = err_msg
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
# raises the boxed `err_msg` from above it raises # set emulated remote error more-or-less as the
# it from the above caught interchange-lib # runtime would
# validation error. ctx: Context = getattr(ipc, 'ctx', ipc)
src_err = valerr
# TODO: should we instead make this explicit and # TODO: should we instead make this explicit and
# use the above masked `is_started_send_decode`, # use the above masked `is_started_send_decode`,
# expecting the `Context.started()` caller to set # expecting the `Context.started()` caller to set
# it? Rn this is kinda, howyousayyy, implicitly # it? Rn this is kinda, howyousayyy, implicitly
# edge-case-y.. # edge-case-y..
# TODO: remove this since it's been added to if (
# `_raise_from_unexpected_msg()`..? expect_msg is not Started
# if ( and not is_started_send_side
# expect_msg is not Started ):
# and not is_started_send_side ctx._maybe_cancel_and_set_remote_error(mte)
# ):
# # set emulated remote error more-or-less as the # XXX NOTE: so when the `_raise_from_unexpected_msg()`
# # runtime would # raises the boxed `err_msg` from above it raises
# ctx: Context = getattr(ipc, 'ctx', ipc) # it from `None`.
# ctx._maybe_cancel_and_set_remote_error(mte) src_err = valerr
# if is_started_send_side:
# src_err = None
# XXX some other decoder specific failure? # XXX some other decoder specific failure?
# except TypeError as src_error: # except TypeError as src_error:
@ -582,7 +559,6 @@ async def drain_to_final_msg(
ipc=ctx, ipc=ctx,
expect_msg=Return, expect_msg=Return,
raise_error=False, raise_error=False,
hide_tb=hide_tb,
) )
# ^-TODO-^ some bad ideas? # ^-TODO-^ some bad ideas?
# -[ ] wrap final outcome .receive() in a scope so # -[ ] wrap final outcome .receive() in a scope so
@ -761,61 +737,9 @@ async def drain_to_final_msg(
) )
# TODO: factor logic from `.Context.started()` for send-side
# validate raising!
def validate_payload_msg( def validate_payload_msg(
pld_msg: Started|Yield|Return, msg: Started|Yield|Return,
pld_value: PayloadT,
ipc: Context|MsgStream,
raise_mte: bool = True,
strict_pld_parity: bool = False,
hide_tb: bool = True,
) -> MsgTypeError|None: ) -> MsgTypeError|None:
''' ...
Validate a `PayloadMsg.pld` value with the current
IPC ctx's `PldRx` and raise an appropriate `MsgTypeError`
on failure.
'''
__tracebackhide__: bool = hide_tb
codec: MsgCodec = current_codec()
msg_bytes: bytes = codec.encode(pld_msg)
try:
roundtripped: Started = codec.decode(msg_bytes)
ctx: Context = getattr(ipc, 'ctx', ipc)
pld: PayloadT = ctx.pld_rx.dec_msg(
msg=roundtripped,
ipc=ipc,
expect_msg=Started,
hide_tb=hide_tb,
is_started_send_side=True,
)
if (
strict_pld_parity
and
pld != pld_value
):
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
roundtripped,
pld_msg,
)
complaint: str = (
'Started value does not match after roundtrip?\n\n'
f'{diff}'
)
raise ValidationError(complaint)
# raise any msg type error NO MATTER WHAT!
except ValidationError as verr:
mte: MsgTypeError = _mk_msg_type_err(
msg=roundtripped,
codec=codec,
src_validation_error=verr,
verb_header='Trying to send ',
is_invalid_payload=True,
)
if not raise_mte:
return mte
raise mte from verr

View File

@ -89,12 +89,11 @@ class PayloadMsg(
# -[ ] `uuid.UUID` which has multi-protocol support # -[ ] `uuid.UUID` which has multi-protocol support
# https://jcristharif.com/msgspec/supported-types.html#uuid # https://jcristharif.com/msgspec/supported-types.html#uuid
# The msg's "payload" (spelled without vowels): # The msgs "payload" (spelled without vowels):
# https://en.wikipedia.org/wiki/Payload_(computing) # https://en.wikipedia.org/wiki/Payload_(computing)
pld: Raw #
# NOTE: inherited from any `Msg` (and maybe overriden
# ^-NOTE-^ inherited from any `PayloadMsg` (and maybe type # by use of `limit_msg_spec()`), but by default is
# overriden via the `._ops.limit_plds()` API), but by default is
# parameterized to be `Any`. # parameterized to be `Any`.
# #
# XXX this `Union` must strictly NOT contain `Any` if # XXX this `Union` must strictly NOT contain `Any` if
@ -107,6 +106,7 @@ class PayloadMsg(
# TODO: could also be set to `msgspec.Raw` if the sub-decoders # TODO: could also be set to `msgspec.Raw` if the sub-decoders
# approach is preferred over the generic parameterization # approach is preferred over the generic parameterization
# approach as take by `mk_msg_spec()` below. # approach as take by `mk_msg_spec()` below.
pld: Raw
# TODO: complete rename # TODO: complete rename
@ -410,32 +410,21 @@ class Error(
src_type_str: str src_type_str: str
boxed_type_str: str boxed_type_str: str
relay_path: list[tuple[str, str]] relay_path: list[tuple[str, str]]
tb_str: str
# normally either both are provided or just cid: str|None = None
# a message for certain special cases where
# we pack a message for a locally raised
# mte or ctxc.
message: str|None = None
tb_str: str = ''
# TODO: only optionally include sub-type specfic fields? # TODO: use UNSET or don't include them via
# -[ ] use UNSET or don't include them via `omit_defaults` (see
# inheritance-line options above)
# #
# `ContextCancelled` reports the src cancelling `Actor.uid` # `ContextCancelled`
canceller: tuple[str, str]|None = None canceller: tuple[str, str]|None = None
# `StreamOverrun`-specific src `Actor.uid` # `StreamOverrun`
sender: tuple[str, str]|None = None sender: tuple[str, str]|None = None
# `MsgTypeError` meta-data # for the `MsgTypeError` case where the receiver side
cid: str|None = None # decodes the underlying original `Msg`-subtype
# when the receiver side fails to decode a delivered _msg_dict: dict|None = None
# `PayloadMsg`-subtype; one and/or both the msg-struct instance
# and `Any`-decoded to `dict` of the msg are set and relayed
# (back to the sender) for introspection.
_bad_msg: Started|Yield|Return|None = None
_bad_msg_as_dict: dict|None = None
def from_dict_msg( def from_dict_msg(
@ -447,11 +436,9 @@ def from_dict_msg(
) -> MsgType: ) -> MsgType:
''' '''
Helper to build a specific `MsgType` struct from a "vanilla" Helper to build a specific `MsgType` struct from
decoded `dict`-ified equivalent of the msg: i.e. if the a "vanilla" decoded `dict`-ified equivalent of the
`msgpack.Decoder.type == Any`, the default when using msg: i.e. if the `msgpack.Decoder.type == Any`.
`msgspec.msgpack` and not "typed decoding" using
`msgspec.Struct`.
''' '''
msg_type_tag_field: str = ( msg_type_tag_field: str = (