Compare commits

..

16 Commits

Author SHA1 Message Date
Tyler Goodlet 0e8c60ee4a Better RAE `.pformat()`-ing for send-side MTEs
Send-side `MsgTypeError`s actually shouldn't have any "boxed" traceback
per say since they're raised in the transmitting actor's local task env
and we (normally) don't want the ascii decoration added around the
error's `._message: str`, that is not until the exc is `pack_error()`-ed
before transit. As such, the presentation of an embedded traceback (and
its ascii box) gets bypassed when only a `._message: str` is set (as we
now do for pld-spec failures in `_mk_msg_type_err()`).

Further this tweaks the `.pformat()` output to include the `._message`
part to look like `<RemoteActorError( <._message> ) ..` instead of
jamming it implicitly to the end of the embedded `.tb_str` (as was done
implicitly by `unpack_error()`) and also adds better handling for the
`with_type_header == False` case including forcing that case when we
detect that the currently handled exc is the RAE in `.pformat()`.
Toss in a lengthier doc-str explaining it all.

Surrounding/supporting changes,
- better `unpack_error()` message which just briefly reports the remote
  task's error type.
- add public `.message: str` prop.
- always set a `._extra_msgdata: dict` since some MTE props rely on it.
- handle `.boxed_type == None` for `.boxed_type_str`.
- maybe pack any detected input or `exc.message` in `pack_error()`.
- comment cruft cleanup in `_mk_msg_type_err()`.
2024-05-30 10:25:04 -04:00
Tyler Goodlet 1db5d4def2 Add `Error.message: str`
Allows passing a custom error msg other then the traceback-str over the
wire. Make `.tb_str` optional (in the blank `''` sense) since it's
treated that way thus far in `._exceptions.pack_error()`.
2024-05-30 09:14:04 -04:00
Tyler Goodlet 6e54abc56d Fix missing newline in task-cancel log-message 2024-05-30 09:06:10 -04:00
Tyler Goodlet 28af4749cc Don't need to pack an `Error` with send-side MTEs 2024-05-30 09:05:23 -04:00
Tyler Goodlet 02a7c7c276 Ensure only a boxed traceback for MTE on parent side 2024-05-30 01:11:29 -04:00
Tyler Goodlet 4fa71cc01c Ensure ctx error-state matches the MTE scenario
Namely checking that `Context._remote_error` is set to the raised MTE
in the invalid started and return value cases since prior to the recent
underlying changes to the `Context.result()` impl, it would not match.

Further,
- do asserts for non-MTE raising cases in both the parent and child.
- add todos for testing ctx-outcomes for per-side-validation policies
  i anticipate supporting and implied msg-dialog race cases therein.
2024-05-28 20:07:48 -04:00
Tyler Goodlet 6a4ee461f5 Raise remote errors rxed during `Context` child-sync
More specifically, if `.open_context()` is cancelled when awaiting the
first `Context.started()` during the child task sync phase, check to see
if it was due to `._scope.cancel_called` and raise any remote error via
`.maybe_raise()` instead the `trio.Cancelled` like in every other
remote-error handling case. Ensure we set `._scope[_nursery]` only after
the `Started` has arrived and audited.
2024-05-28 16:11:01 -04:00
Tyler Goodlet 2db03444f7 Don't (noisly) log about runtime cancel RPC tasks
Since in the case of the `Actor._cancel_task()` related runtime eps we
actually don't EVER register them in `Actor._rpc_tasks`.. logging about
them is just needless noise, though maybe we should track them in a diff
table; something like a `._runtime_rpc_tasks`?

Drop the cancel-request-for-stale-RPC-task (`KeyError` case in
`Actor._cancel_task()`) log-emit level in to `.runtime()`; it's
generally not useful info other then for granular race condition eval
when hacking the runtime.
2024-05-28 16:03:36 -04:00
Tyler Goodlet a1b124b62b Raise send-side MTEs inline in `PldRx.dec_msg()`
So when `is_started_send_side is True` we raise the newly created
`MsgTypeError` (MTE) directly instead of doing all the `Error`-msg pack
and unpack to raise stuff via `_raise_from_unexpected_msg()` since the
raise should happen send side anyway and so doesn't emulate any remote
fault like in a bad `Return` or `Started` without send-side pld-spec
validation.

Oh, and proxy-through the `hide_tb: bool` input from `.drain_to_final_msg()`
to `.recv_msg_w_pld()`.
2024-05-28 16:02:51 -04:00
Tyler Goodlet 59ca256183 Set remote errors in `_raise_from_unexpected_msg()`
By calling `Context._maybe_cancel_and_set_remote_error(exc)` on any
unpacked `Error` msg; provides for `Context.maybe_error` consistency to
match all other error delivery cases.
2024-05-28 15:48:01 -04:00
Tyler Goodlet 6c2efc96dc Factor `.started()` validation into `.msg._ops`
Filling out the helper `validate_payload_msg()` staged in a prior commit
and adjusting all imports to match.

Also add a `raise_mte: bool` flag for potential usage where the caller
wants to handle the MTE instance themselves.
2024-05-28 11:08:27 -04:00
Tyler Goodlet f7fd8278af Fix `test_basic_payload_spec` bad msg matching
Expecting `Started` or `Return` with respective bad `.pld` values
depending on what type of failure is test parametrized.

This makes the suite run green it seems B)
2024-05-28 11:05:46 -04:00
Tyler Goodlet 7ac730e326 Drop `msg.types.Msg` for new replacement types
The `TypeAlias` for the msg type-group is now `MsgType` and any user
touching shuttle messages can now be typed as `PayloadMsg`.

Relatedly, add MTE specific `Error._bad_msg[_as_dict]` fields which are
handy for introspection of remote decode failures.
2024-05-28 09:55:16 -04:00
Tyler Goodlet 582144830f Parameterize the `return_msg_type` in `._invoke()`
Since we also handle a runtime-specific `CancelAck`, allow the
caller-scheduler to pass in the expected return-type msg per the RPC msg
endpoint loop.
2024-05-28 09:36:26 -04:00
Tyler Goodlet 8b860f4245 Move `.devx` related deps to `dev` group 2024-05-28 09:34:08 -04:00
Tyler Goodlet 27fd96729a Tweaks to debugger examples
Light stuff like comments, typing, and a couple API usage updates.
2024-05-28 09:22:59 -04:00
16 changed files with 522 additions and 312 deletions

View File

@ -4,9 +4,15 @@ import trio
async def breakpoint_forever(): async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor." "Indefinitely re-enter debugger in child actor."
try:
while True: while True:
yield 'yo' yield 'yo'
await tractor.breakpoint() await tractor.breakpoint()
except BaseException:
tractor.log.get_console_log().exception(
'Cancelled while trying to enter pause point!'
)
raise
async def name_error(): async def name_error():
@ -19,7 +25,7 @@ async def main():
""" """
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
loglevel='error', loglevel='cancel',
) as n: ) as n:
p0 = await n.start_actor('bp_forever', enable_modules=[__name__]) p0 = await n.start_actor('bp_forever', enable_modules=[__name__])

View File

@ -45,6 +45,7 @@ async def spawn_until(depth=0):
) )
# TODO: notes on the new boxed-relayed errors through proxy actors
async def main(): async def main():
"""The main ``tractor`` routine. """The main ``tractor`` routine.

View File

@ -23,5 +23,6 @@ async def main():
n.start_soon(debug_actor.run, die) n.start_soon(debug_actor.run, die)
n.start_soon(crash_boi.run, die) n.start_soon(crash_boi.run, die)
if __name__ == '__main__': if __name__ == '__main__':
trio.run(main) trio.run(main)

View File

@ -2,10 +2,13 @@ import trio
import tractor import tractor
async def main(): async def main(
registry_addrs: tuple[str, int]|None = None
):
async with tractor.open_root_actor( async with tractor.open_root_actor(
debug_mode=True, debug_mode=True,
# loglevel='runtime',
): ):
while True: while True:
await tractor.breakpoint() await tractor.breakpoint()

View File

@ -3,16 +3,26 @@ import tractor
async def name_error(): async def name_error():
getattr(doggypants) getattr(doggypants) # noqa (on purpose)
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
) as n: # loglevel='transport',
) as an:
portal = await n.run_in_actor(name_error) # TODO: ideally the REPL arrives at this frame in the parent,
await portal.result() # ABOVE the @api_frame of `Portal.run_in_actor()` (which
# should eventually not even be a portal method ... XD)
# await tractor.pause()
p: tractor.Portal = await an.run_in_actor(name_error)
# with this style, should raise on this line
await p.result()
# with this alt style should raise at `open_nusery()`
# return await p.result()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -7,7 +7,7 @@ def sync_pause(
error: bool = False, error: bool = False,
): ):
if use_builtin: if use_builtin:
breakpoint() breakpoint(hide_tb=False)
else: else:
tractor.pause_from_sync() tractor.pause_from_sync()
@ -20,18 +20,20 @@ def sync_pause(
async def start_n_sync_pause( async def start_n_sync_pause(
ctx: tractor.Context, ctx: tractor.Context,
): ):
# sync to requesting peer actor: tractor.Actor = tractor.current_actor()
# sync to parent-side task
await ctx.started() await ctx.started()
actor: tractor.Actor = tractor.current_actor()
print(f'entering SYNC PAUSE in {actor.uid}') print(f'entering SYNC PAUSE in {actor.uid}')
sync_pause() sync_pause()
print(f'back from SYNC PAUSE in {actor.uid}') print(f'back from SYNC PAUSE in {actor.uid}')
async def main() -> None: async def main() -> None:
async with tractor.open_nursery( async with tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True, debug_mode=True,
) as an: ) as an:

View File

@ -39,8 +39,7 @@ msgspec='^0.18.5' # interchange
wrapt = "^1.16.0" # decorators wrapt = "^1.16.0" # decorators
colorlog = "^6.8.2" # logging colorlog = "^6.8.2" # logging
# .devx tooling # built-in multi-actor `pdb` REPL
stackscope = "^0.2.2"
pdbp = "^1.5.0" pdbp = "^1.5.0"
@ -49,15 +48,19 @@ pdbp = "^1.5.0"
# 'pyroute2 # 'pyroute2
# ------ - ------ # ------ - ------
xontrib-vox = "^0.0.1"
[tool.poetry.group.dev] [tool.poetry.group.dev]
optional = false optional = false
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
# testing
pytest = "^8.2.0" pytest = "^8.2.0"
pexpect = "^4.9.0" pexpect = "^4.9.0"
# only for xonsh as sh.. # .devx tooling
greenback = "^1.2.1"
stackscope = "^0.2.2"
# (light) xonsh usage/integration
xontrib-vox = "^0.0.1" xontrib-vox = "^0.0.1"
prompt-toolkit = "^3.0.43" prompt-toolkit = "^3.0.43"
xonsh-vox-tabcomplete = "^0.5" xonsh-vox-tabcomplete = "^0.5"

View File

@ -6,30 +6,19 @@ related settings around IPC contexts.
''' '''
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
contextmanager as cm,
)
# import typing
from typing import (
# Any,
TypeAlias,
# Union,
) )
from contextvars import ( from contextvars import (
Context, Context,
) )
from msgspec import ( from msgspec import (
# structs,
# msgpack,
Struct, Struct,
# ValidationError,
) )
import pytest import pytest
import trio import trio
import tractor import tractor
from tractor import ( from tractor import (
# _state,
MsgTypeError, MsgTypeError,
current_ipc_ctx, current_ipc_ctx,
Portal, Portal,
@ -40,20 +29,9 @@ from tractor.msg import (
) )
from tractor.msg import ( from tractor.msg import (
_codec, _codec,
# _ctxvar_MsgCodec,
# NamespacePath,
# MsgCodec,
# mk_codec,
# apply_codec,
# current_codec,
) )
from tractor.msg.types import ( from tractor.msg.types import (
log, log,
# _payload_msgs,
# PayloadMsg,
# Started,
# mk_msg_spec,
) )
@ -64,23 +42,10 @@ class PldMsg(Struct):
maybe_msg_spec = PldMsg|None maybe_msg_spec = PldMsg|None
@cm
def custom_spec(
ctx: Context,
spec: TypeAlias,
) -> _codec.MsgCodec:
'''
Apply a custom payload spec, remove on exit.
'''
rx: msgops.PldRx = ctx._pld_rx
@acm @acm
async def maybe_expect_raises( async def maybe_expect_raises(
raises: BaseException|None = None, raises: BaseException|None = None,
ensure_in_message: list[str]|None = None, ensure_in_message: list[str]|None = None,
reraise: bool = False, reraise: bool = False,
timeout: int = 3, timeout: int = 3,
) -> None: ) -> None:
@ -88,6 +53,9 @@ async def maybe_expect_raises(
Async wrapper for ensuring errors propagate from the inner scope. Async wrapper for ensuring errors propagate from the inner scope.
''' '''
if tractor._state.debug_mode():
timeout += 999
with trio.fail_after(timeout): with trio.fail_after(timeout):
try: try:
yield yield
@ -103,9 +71,10 @@ async def maybe_expect_raises(
# maybe check for error txt content # maybe check for error txt content
if ensure_in_message: if ensure_in_message:
part: str part: str
err_repr: str = repr(inner_err)
for part in ensure_in_message: for part in ensure_in_message:
for i, arg in enumerate(inner_err.args): for i, arg in enumerate(inner_err.args):
if part in arg: if part in err_repr:
break break
# if part never matches an arg, then we're # if part never matches an arg, then we're
# missing a match. # missing a match.
@ -166,13 +135,15 @@ async def child(
# 2 cases: hdndle send-side and recv-only validation # 2 cases: hdndle send-side and recv-only validation
# - when `raise_on_started_mte == True`, send validate # - when `raise_on_started_mte == True`, send validate
# - else, parent-recv-side only validation # - else, parent-recv-side only validation
mte: MsgTypeError|None = None
try: try:
await ctx.started( await ctx.started(
value=started_value, value=started_value,
validate_pld_spec=validate_pld_spec, validate_pld_spec=validate_pld_spec,
) )
except MsgTypeError: except MsgTypeError as _mte:
mte = _mte
log.exception('started()` raised an MTE!\n') log.exception('started()` raised an MTE!\n')
if not expect_started_mte: if not expect_started_mte:
raise RuntimeError( raise RuntimeError(
@ -180,16 +151,61 @@ async def child(
f'{started_value!r}\n' f'{started_value!r}\n'
) )
boxed_div: str = '------ - ------'
assert boxed_div not in mte._message
assert boxed_div not in mte.tb_str
assert boxed_div not in repr(mte)
assert boxed_div not in str(mte)
mte_repr: str = repr(mte)
for line in mte.message.splitlines():
assert line in mte_repr
# since this is a *local error* there should be no
# boxed traceback content!
assert not mte.tb_str
# propagate to parent? # propagate to parent?
if raise_on_started_mte: if raise_on_started_mte:
raise raise
else:
if expect_started_mte: # no-send-side-error fallthrough
if (
validate_pld_spec
and
expect_started_mte
):
raise RuntimeError( raise RuntimeError(
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n' 'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
f'{started_value!r}\n' f'{started_value!r}\n'
) )
assert (
not expect_started_mte
or
not validate_pld_spec
)
# if wait_for_parent_to_cancel:
# ...
#
# ^-TODO-^ logic for diff validation policies on each side:
#
# -[ ] ensure that if we don't validate on the send
# side, that we are eventually error-cancelled by our
# parent due to the bad `Started` payload!
# -[ ] the boxed error should be srced from the parent's
# runtime NOT ours!
# -[ ] we should still error on bad `return_value`s
# despite the parent not yet error-cancelling us?
# |_ how do we want the parent side to look in that
# case?
# -[ ] maybe the equiv of "during handling of the
# above error another occurred" for the case where
# the parent sends a MTE to this child and while
# waiting for the child to terminate it gets back
# the MTE for this case?
#
# XXX should always fail on recv side since we can't # XXX should always fail on recv side since we can't
# really do much else beside terminate and relay the # really do much else beside terminate and relay the
# msg-type-error from this RPC task ;) # msg-type-error from this RPC task ;)
@ -211,8 +227,8 @@ async def child(
@pytest.mark.parametrize( @pytest.mark.parametrize(
'return_value', 'return_value',
[ [
None,
'yo', 'yo',
None,
], ],
ids=[ ids=[
'return[invalid-"yo"]', 'return[invalid-"yo"]',
@ -271,16 +287,32 @@ def test_basic_payload_spec(
# since not opened yet. # since not opened yet.
assert current_ipc_ctx() is None assert current_ipc_ctx() is None
async with ( if invalid_started:
maybe_expect_raises( msg_type_str: str = 'Started'
raises=MsgTypeError if ( bad_value_str: str = '10'
elif invalid_return:
msg_type_str: str = 'Return'
bad_value_str: str = "'yo'"
else:
# XXX but should never be used below then..
msg_type_str: str = ''
bad_value_str: str = ''
maybe_mte: MsgTypeError|None = None
should_raise: Exception|None = (
MsgTypeError if (
invalid_return invalid_return
or or
invalid_started invalid_started
) else None, ) else None
)
async with (
maybe_expect_raises(
raises=should_raise,
ensure_in_message=[ ensure_in_message=[
"invalid `Return` payload", f"invalid `{msg_type_str}` msg payload",
"value: `'yo'` does not match type-spec: `Return.pld: PldMsg|NoneType`", f"value: `{bad_value_str}` does not "
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
], ],
), ),
p.open_context( p.open_context(
@ -298,18 +330,35 @@ def test_basic_payload_spec(
assert first.field == 'yo' assert first.field == 'yo'
try: try:
assert (await ctx.result()) is None res: None|PldMsg = await ctx.result(hide_tb=False)
assert res is None
except MsgTypeError as mte: except MsgTypeError as mte:
maybe_mte = mte
if not invalid_return: if not invalid_return:
raise raise
else: # expected this invalid `Return.pld` # expected this invalid `Return.pld` so audit
# the error state + meta-data
assert mte.expected_msg_type is Return
assert mte.cid == ctx.cid assert mte.cid == ctx.cid
mte_repr: str = repr(mte)
for line in mte.message.splitlines():
assert line in mte_repr
assert mte.tb_str
# await tractor.pause(shield=True)
# verify expected remote mte deats # verify expected remote mte deats
await tractor.pause() assert ctx._local_error is None
assert ctx._remote_error is mte assert (
assert mte.expected_msg_type is Return mte is
ctx._remote_error is
ctx.maybe_error is
ctx.outcome
)
if should_raise is None:
assert maybe_mte is None
await p.cancel_actor() await p.cancel_actor()

View File

@ -58,9 +58,6 @@ from typing import (
import warnings import warnings
# ------ - ------ # ------ - ------
import trio import trio
from msgspec import (
ValidationError,
)
# ------ - ------ # ------ - ------
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
@ -78,19 +75,16 @@ from .log import (
from .msg import ( from .msg import (
Error, Error,
MsgType, MsgType,
MsgCodec,
NamespacePath, NamespacePath,
PayloadT, PayloadT,
Started, Started,
Stop, Stop,
Yield, Yield,
current_codec,
pretty_struct, pretty_struct,
_ops as msgops, _ops as msgops,
) )
from ._ipc import ( from ._ipc import (
Channel, Channel,
_mk_msg_type_err,
) )
from ._streaming import MsgStream from ._streaming import MsgStream
from ._state import ( from ._state import (
@ -670,7 +664,7 @@ class Context:
'Setting remote error for ctx\n\n' 'Setting remote error for ctx\n\n'
f'<= {self.peer_side!r}: {self.chan.uid}\n' f'<= {self.peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}: {self._actor.uid}\n\n' f'=> {self.side!r}: {self._actor.uid}\n\n'
f'{error}' f'{error!r}'
) )
self._remote_error: BaseException = error self._remote_error: BaseException = error
@ -724,7 +718,7 @@ class Context:
log.error( log.error(
f'Remote context error:\n\n' f'Remote context error:\n\n'
# f'{pformat(self)}\n' # f'{pformat(self)}\n'
f'{error}' f'{error!r}'
) )
if self._canceller is None: if self._canceller is None:
@ -748,26 +742,27 @@ class Context:
and not cs.cancel_called and not cs.cancel_called
and not cs.cancelled_caught and not cs.cancelled_caught
): ):
if not ( if (
msgerr msgerr
# NOTE: we allow user to config not cancelling the # NOTE: we allow user to config not cancelling the
# local scope on `MsgTypeError`s # local scope on `MsgTypeError`s
and not self._cancel_on_msgerr and
not self._cancel_on_msgerr
): ):
# TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n'
self._scope.cancel()
else:
message: str = ( message: str = (
'NOT Cancelling `Context._scope` since,\n' 'NOT Cancelling `Context._scope` since,\n'
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
f'AND we got a msg-type-error!\n' f'AND we got a msg-type-error!\n'
f'{error}\n' f'{error}\n'
) )
else:
# TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n'
self._scope.cancel()
else: else:
message: str = 'NOT cancelling `Context._scope` !\n\n' message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb # from .devx import mk_pdb
@ -1657,54 +1652,21 @@ class Context:
# #
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
if validate_pld_spec: if validate_pld_spec:
# __tracebackhide__: bool = False msgops.validate_payload_msg(
codec: MsgCodec = current_codec() pld_msg=started_msg,
msg_bytes: bytes = codec.encode(started_msg) pld_value=value,
try:
roundtripped: Started = codec.decode(msg_bytes)
# pld: PayloadT = await self.pld_rx.recv_pld(
pld: PayloadT = self.pld_rx.dec_msg(
msg=roundtripped,
ipc=self, ipc=self,
expect_msg=Started, strict_pld_parity=strict_pld_parity,
hide_tb=hide_tb, hide_tb=hide_tb,
is_started_send_side=True,
) )
if (
strict_pld_parity
and
pld != value
):
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
roundtripped,
started_msg,
)
complaint: str = (
'Started value does not match after roundtrip?\n\n'
f'{diff}'
)
raise ValidationError(complaint)
# raise any msg type error NO MATTER WHAT!
except ValidationError as verr:
# always show this src frame in the tb
# __tracebackhide__: bool = False
raise _mk_msg_type_err(
msg=roundtripped,
codec=codec,
src_validation_error=verr,
verb_header='Trying to send ',
is_invalid_payload=True,
) from verr
# TODO: maybe a flag to by-pass encode op if already done # TODO: maybe a flag to by-pass encode op if already done
# here in caller? # here in caller?
await self.chan.send(started_msg) await self.chan.send(started_msg)
# set msg-related internal runtime-state # set msg-related internal runtime-state
self._started_called = True self._started_called: bool = True
self._started_msg = started_msg self._started_msg: Started = started_msg
self._started_pld = value self._started_pld = value
async def _drain_overflows( async def _drain_overflows(
@ -2097,6 +2059,12 @@ async def open_context_from_portal(
if maybe_msgdec: if maybe_msgdec:
assert maybe_msgdec.pld_spec == pld_spec assert maybe_msgdec.pld_spec == pld_spec
# NOTE: this in an implicit runtime nursery used to,
# - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user.
ctx._scope_nursery: trio.Nursery = tn
ctx._scope: trio.CancelScope = tn.cancel_scope
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the # XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered # `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will # in `._maybe_cancel_and_set_remote_error()` will
@ -2104,25 +2072,42 @@ async def open_context_from_portal(
# -> it's expected that if there is an error in this phase of # -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg` # the dialog, the `Error` msg should be raised from the `msg`
# handling block below. # handling block below.
try:
started_msg, first = await ctx._pld_rx.recv_msg_w_pld( started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
ipc=ctx, ipc=ctx,
expect_msg=Started, expect_msg=Started,
passthrough_non_pld_msgs=False, passthrough_non_pld_msgs=False,
hide_tb=hide_tb, hide_tb=hide_tb,
) )
except trio.Cancelled as taskc:
ctx_cs: trio.CancelScope = ctx._scope
if not ctx_cs.cancel_called:
raise
# from .devx import pause # from .devx import pause
# await pause() # await pause(shield=True)
log.cancel(
'IPC ctx was cancelled during "child" task sync due to\n\n'
f'{ctx.maybe_error}\n'
)
# OW if the ctx's scope was cancelled manually,
# likely the `Context` was cancelled via a call to
# `._maybe_cancel_and_set_remote_error()` so ensure
# we raise the underlying `._remote_error` directly
# instead of bubbling that taskc.
ctx.maybe_raise()
# OW, some other unexpected cancel condition
# that should prolly never happen right?
raise InternalError(
'Invalid cancellation during IPC ctx sync phase?\n'
) from taskc
ctx._started_called: bool = True ctx._started_called: bool = True
ctx._started_msg: bool = started_msg ctx._started_msg: bool = started_msg
ctx._started_pld: bool = first ctx._started_pld: bool = first
# NOTE: this in an implicit runtime nursery used to,
# - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user.
ctx._scope_nursery: trio.Nursery = tn
ctx._scope: trio.CancelScope = tn.cancel_scope
# deliver context instance and .started() msg value # deliver context instance and .started() msg value
# in enter tuple. # in enter tuple.
yield ctx, first yield ctx, first

View File

@ -22,6 +22,7 @@ from __future__ import annotations
import builtins import builtins
import importlib import importlib
from pprint import pformat from pprint import pformat
import sys
from types import ( from types import (
TracebackType, TracebackType,
) )
@ -110,6 +111,7 @@ _body_fields: list[str] = list(
'tb_str', 'tb_str',
'relay_path', 'relay_path',
'cid', 'cid',
'message',
# only ctxc should show it but `Error` does # only ctxc should show it but `Error` does
# have it as an optional field. # have it as an optional field.
@ -236,6 +238,7 @@ class RemoteActorError(Exception):
self._boxed_type: BaseException = boxed_type self._boxed_type: BaseException = boxed_type
self._src_type: BaseException|None = None self._src_type: BaseException|None = None
self._ipc_msg: Error|None = ipc_msg self._ipc_msg: Error|None = ipc_msg
self._extra_msgdata = extra_msgdata
if ( if (
extra_msgdata extra_msgdata
@ -250,8 +253,6 @@ class RemoteActorError(Exception):
k, k,
v, v,
) )
else:
self._extra_msgdata = extra_msgdata
# TODO: mask out eventually or place in `pack_error()` # TODO: mask out eventually or place in `pack_error()`
# pre-`return` lines? # pre-`return` lines?
@ -282,6 +283,17 @@ class RemoteActorError(Exception):
# ensure any roundtripping evals to the input value # ensure any roundtripping evals to the input value
assert self.boxed_type is boxed_type assert self.boxed_type is boxed_type
@property
def message(self) -> str:
'''
Be explicit, instead of trying to read it from the the parent
type's loosely defined `.args: tuple`:
https://docs.python.org/3/library/exceptions.html#BaseException.args
'''
return self._message
@property @property
def ipc_msg(self) -> Struct: def ipc_msg(self) -> Struct:
''' '''
@ -355,8 +367,11 @@ class RemoteActorError(Exception):
''' '''
bt: Type[BaseException] = self.boxed_type bt: Type[BaseException] = self.boxed_type
if bt:
return str(bt.__name__) return str(bt.__name__)
return ''
@property @property
def boxed_type(self) -> Type[BaseException]: def boxed_type(self) -> Type[BaseException]:
''' '''
@ -426,8 +441,7 @@ class RemoteActorError(Exception):
for key in fields: for key in fields:
if ( if (
key == 'relay_uid' key == 'relay_uid' and not self.is_inception()
and not self.is_inception()
): ):
continue continue
@ -504,19 +518,80 @@ class RemoteActorError(Exception):
def pformat( def pformat(
self, self,
with_type_header: bool = True, with_type_header: bool = True,
# with_ascii_box: bool = True,
) -> str: ) -> str:
''' '''
Nicely formatted boxed error meta data + traceback, OR just Format any boxed remote error by multi-line display of,
the normal message from `.args` (for eg. as you'd want shown
by a locally raised `ContextCancelled`). - error's src or relay actor meta-data,
- remote runtime env's traceback,
With optional control over the format of,
- whether the boxed traceback is ascii-decorated with
a surrounding "box" annotating the embedded stack-trace.
- if the error's type name should be added as margins
around the field and tb content like:
`<RemoteActorError(.. <<multi-line-content>> .. )>`
- the placement of the `.message: str` (explicit equiv of
`.args[0]`), either placed below the `.tb_str` or in the
first line's header when the error is raised locally (since
the type name is already implicitly shown by python).
''' '''
header: str = '' header: str = ''
body: str = '' body: str = ''
message: str = ''
# XXX when the currently raised exception is this instance,
# we do not ever use the "type header" style repr.
is_being_raised: bool = False
if (
(exc := sys.exception())
and
exc is self
):
is_being_raised: bool = True
with_type_header: bool = (
with_type_header
and
not is_being_raised
)
# <RemoteActorError( .. )> style
if with_type_header: if with_type_header:
header: str = f'<{type(self).__name__}(\n' header: str = f'<{type(self).__name__}('
if message := self._message:
# split off the first line so, if needed, it isn't
# indented the same like the "boxed content" which
# since there is no `.tb_str` is just the `.message`.
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
# with a type-style header we,
# - have no special message "first line" extraction/handling
# - place the message a space in from the header:
# `MsgTypeError( <message> ..`
# ^-here
# - indent the `.message` inside the type body.
if with_type_header:
first = f' {first} )>'
message: str = textwrap.indent(
message,
prefix=' '*2,
)
message: str = first + message
# IFF there is an embedded traceback-str we always
# draw the ascii-box around it.
if tb_str := self.tb_str: if tb_str := self.tb_str:
fields: str = self._mk_fields_str( fields: str = self._mk_fields_str(
_body_fields _body_fields
@ -535,36 +610,19 @@ class RemoteActorError(Exception):
# |___ .. # |___ ..
tb_body_indent=1, tb_body_indent=1,
) )
if not with_type_header:
body = '\n' + body
elif message := self._message:
# split off the first line so it isn't indented
# the same like the "boxed content".
if not with_type_header:
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
else:
first: str = ''
body: str = (
first
+
message
+
'\n'
)
if with_type_header:
tail: str = ')>'
else:
tail = '' tail = ''
if (
with_type_header
and not message
):
tail: str = '>'
return ( return (
header header
+ +
message
+
f'{body}' f'{body}'
+ +
tail tail
@ -577,7 +635,9 @@ class RemoteActorError(Exception):
# |_ i guess `pexepect` relies on `str`-casing # |_ i guess `pexepect` relies on `str`-casing
# of output? # of output?
def __str__(self) -> str: def __str__(self) -> str:
return self.pformat(with_type_header=False) return self.pformat(
with_type_header=False
)
def unwrap( def unwrap(
self, self,
@ -825,9 +885,6 @@ class MsgTypeError(
extra_msgdata['_bad_msg'] = bad_msg extra_msgdata['_bad_msg'] = bad_msg
extra_msgdata['cid'] = bad_msg.cid extra_msgdata['cid'] = bad_msg.cid
if 'cid' not in extra_msgdata:
import pdbp; pdbp.set_trace()
return cls( return cls(
message=message, message=message,
boxed_type=cls, boxed_type=cls,
@ -889,6 +946,7 @@ def pack_error(
src_uid: tuple[str, str]|None = None, src_uid: tuple[str, str]|None = None,
tb: TracebackType|None = None, tb: TracebackType|None = None,
tb_str: str = '', tb_str: str = '',
message: str = '',
) -> Error: ) -> Error:
''' '''
@ -902,7 +960,7 @@ def pack_error(
tb_str: str = ( tb_str: str = (
''.join(traceback.format_exception(exc)) ''.join(traceback.format_exception(exc))
# TODO: can we remove this is `exc` is required? # TODO: can we remove this since `exc` is required.. right?
or or
# NOTE: this is just a shorthand for the "last error" as # NOTE: this is just a shorthand for the "last error" as
# provided by `sys.exeception()`, see: # provided by `sys.exeception()`, see:
@ -917,8 +975,8 @@ def pack_error(
# when caller provides a tb instance (say pulled from some other # when caller provides a tb instance (say pulled from some other
# src error's `.__traceback__`) we use that as the "boxed" # src error's `.__traceback__`) we use that as the "boxed"
# tb-string instead. # tb-string instead.
if tb:
# https://docs.python.org/3/library/traceback.html#traceback.format_list # https://docs.python.org/3/library/traceback.html#traceback.format_list
if tb:
tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str
error_msg: dict[ # for IPC error_msg: dict[ # for IPC
@ -961,17 +1019,17 @@ def pack_error(
error_msg['src_type_str'] = type(exc).__name__ error_msg['src_type_str'] = type(exc).__name__
error_msg['boxed_type_str'] = type(exc).__name__ error_msg['boxed_type_str'] = type(exc).__name__
# XXX alawys append us the last relay in error propagation path # XXX always append us the last relay in error propagation path
error_msg.setdefault( error_msg.setdefault(
'relay_path', 'relay_path',
[], [],
).append(our_uid) ).append(our_uid)
# XXX NOTE: always ensure the traceback-str is from the # XXX NOTE XXX always ensure the traceback-str content is from
# locally raised error (**not** the prior relay's boxed # the locally raised error (so, NOT the prior relay's boxed
# content's in `._ipc_msg.tb_str`). # `._ipc_msg.tb_str`).
error_msg['tb_str'] = tb_str error_msg['tb_str'] = tb_str
error_msg['message'] = message or getattr(exc, 'message', '')
if cid is not None: if cid is not None:
error_msg['cid'] = cid error_msg['cid'] = cid
@ -995,26 +1053,24 @@ def unpack_error(
if not isinstance(msg, Error): if not isinstance(msg, Error):
return None return None
# retrieve the remote error's msg-encoded details
tb_str: str = msg.tb_str
message: str = (
f'{chan.uid}\n'
+
tb_str
)
# try to lookup a suitable error type from the local runtime # try to lookup a suitable error type from the local runtime
# env then use it to construct a local instance. # env then use it to construct a local instance.
# boxed_type_str: str = error_dict['boxed_type_str'] # boxed_type_str: str = error_dict['boxed_type_str']
boxed_type_str: str = msg.boxed_type_str boxed_type_str: str = msg.boxed_type_str
boxed_type: Type[BaseException] = get_err_type(boxed_type_str) boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
if boxed_type_str == 'ContextCancelled': # retrieve the error's msg-encoded remotoe-env info
box_type = ContextCancelled message: str = f'remote task raised a {msg.boxed_type_str!r}\n'
assert boxed_type is box_type
elif boxed_type_str == 'MsgTypeError': # TODO: do we even really need these checks for RAEs?
box_type = MsgTypeError if boxed_type_str in [
'ContextCancelled',
'MsgTypeError',
]:
box_type = {
'ContextCancelled': ContextCancelled,
'MsgTypeError': MsgTypeError,
}[boxed_type_str]
assert boxed_type is box_type assert boxed_type is box_type
# TODO: already included by `_this_mod` in else loop right? # TODO: already included by `_this_mod` in else loop right?
@ -1029,19 +1085,21 @@ def unpack_error(
exc = box_type( exc = box_type(
message, message,
ipc_msg=msg, ipc_msg=msg,
tb_str=msg.tb_str,
) )
return exc return exc
def is_multi_cancelled(exc: BaseException) -> bool: def is_multi_cancelled(
exc: BaseException|BaseExceptionGroup
) -> bool:
''' '''
Predicate to determine if a possible ``BaseExceptionGroup`` contains Predicate to determine if a possible ``BaseExceptionGroup`` contains
only ``trio.Cancelled`` sub-exceptions (and is likely the result of only ``trio.Cancelled`` sub-exceptions (and is likely the result of
cancelling a collection of subtasks. cancelling a collection of subtasks.
''' '''
# if isinstance(exc, eg.BaseExceptionGroup):
if isinstance(exc, BaseExceptionGroup): if isinstance(exc, BaseExceptionGroup):
return exc.subgroup( return exc.subgroup(
lambda exc: isinstance(exc, trio.Cancelled) lambda exc: isinstance(exc, trio.Cancelled)
@ -1109,6 +1167,7 @@ def _raise_from_unexpected_msg(
msg, msg,
ctx.chan, ctx.chan,
) )
ctx._maybe_cancel_and_set_remote_error(exc)
raise exc from src_err raise exc from src_err
# `MsgStream` termination msg. # `MsgStream` termination msg.
@ -1183,7 +1242,6 @@ def _mk_msg_type_err(
src_validation_error: ValidationError|None = None, src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None, src_type_error: TypeError|None = None,
is_invalid_payload: bool = False, is_invalid_payload: bool = False,
# src_err_msg: Error|None = None,
**mte_kwargs, **mte_kwargs,
@ -1250,19 +1308,11 @@ def _mk_msg_type_err(
msg_type: str = type(msg) msg_type: str = type(msg)
any_pld: Any = msgpack.decode(msg.pld) any_pld: Any = msgpack.decode(msg.pld)
message: str = ( message: str = (
f'invalid `{msg_type.__qualname__}` payload\n\n' f'invalid `{msg_type.__qualname__}` msg payload\n\n'
f'value: `{any_pld!r}` does not match type-spec: ' #\n' f'value: `{any_pld!r}` does not match type-spec: '
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
# f'<{type(msg).__qualname__}(\n'
# f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n'
# f')>\n\n'
) )
# src_err_msg = msg
bad_msg = msg bad_msg = msg
# TODO: should we just decode the msg to a dict despite
# only the payload being wrong?
# -[ ] maybe the better design is to break this construct
# logic into a separate explicit helper raiser-func?
else: else:
# decode the msg-bytes using the std msgpack # decode the msg-bytes using the std msgpack
@ -1307,20 +1357,20 @@ def _mk_msg_type_err(
if verb_header: if verb_header:
message = f'{verb_header} ' + message message = f'{verb_header} ' + message
# if not isinstance(bad_msg, PayloadMsg):
# import pdbp; pdbp.set_trace()
msgtyperr = MsgTypeError.from_decode( msgtyperr = MsgTypeError.from_decode(
message=message, message=message,
bad_msg=bad_msg, bad_msg=bad_msg,
bad_msg_as_dict=msg_dict, bad_msg_as_dict=msg_dict,
# NOTE: for the send-side `.started()` pld-validate # NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
# case we actually set the `._ipc_msg` AFTER we return # - for the send-side `.started()` pld-validate
# from here inside `Context.started()` since we actually # case we actually raise inline so we don't need to
# want to emulate the `Error` from the mte we build here # set the it at all.
# Bo # - for recv side we set it inside `PldRx.decode_pld()`
# so by default in that case this is set to `None` # after a manual call to `pack_error()` since we
# actually want to emulate the `Error` from the mte we
# build here. So by default in that case, this is left
# as `None` here.
# ipc_msg=src_err_msg, # ipc_msg=src_err_msg,
) )
msgtyperr.__cause__ = src_validation_error msgtyperr.__cause__ = src_validation_error

View File

@ -291,7 +291,7 @@ class MsgpackTCPStream(MsgTransport):
async def send( async def send(
self, self,
msg: msgtypes.Msg, msg: msgtypes.MsgType,
strict_types: bool = True, strict_types: bool = True,
# hide_tb: bool = False, # hide_tb: bool = False,

View File

@ -64,6 +64,7 @@ from .log import get_logger
from .msg import ( from .msg import (
current_codec, current_codec,
MsgCodec, MsgCodec,
PayloadT,
NamespacePath, NamespacePath,
pretty_struct, pretty_struct,
) )
@ -98,7 +99,7 @@ async def _invoke_non_context(
treat_as_gen: bool, treat_as_gen: bool,
is_rpc: bool, is_rpc: bool,
return_msg: Return|CancelAck = Return, return_msg_type: Return|CancelAck = Return,
task_status: TaskStatus[ task_status: TaskStatus[
Context | BaseException Context | BaseException
@ -220,7 +221,7 @@ async def _invoke_non_context(
and chan.connected() and chan.connected()
): ):
try: try:
ret_msg = return_msg( ret_msg = return_msg_type(
cid=cid, cid=cid,
pld=result, pld=result,
) )
@ -392,16 +393,22 @@ async def _errors_relayed_via_ipc(
# cancel scope will not have been inserted yet # cancel scope will not have been inserted yet
if is_rpc: if is_rpc:
log.warning( log.warning(
'RPC task likely errored or cancelled before start?' 'RPC task likely errored or cancelled before start?\n'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
else:
log.cancel(
'Failed to de-alloc internal runtime cancel task?\n'
f'|_{ctx._task}\n' f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n' f' >> {ctx.repr_rpc}\n'
) )
# TODO: remove this right? rn the only non-`is_rpc` cases
# are cancellation methods and according the RPC loop eps
# for thoses below, nothing is ever registered in
# `Actor._rpc_tasks` for those cases.. but should we?
#
# -[ ] maybe we should have an equiv `Actor._runtime_rpc_tasks`?
# else:
# log.cancel(
# 'Failed to de-alloc internal runtime cancel task?\n'
# f'|_{ctx._task}\n'
# f' >> {ctx.repr_rpc}\n'
# )
finally: finally:
if not actor._rpc_tasks: if not actor._rpc_tasks:
@ -419,7 +426,7 @@ async def _invoke(
is_rpc: bool = True, is_rpc: bool = True,
hide_tb: bool = True, hide_tb: bool = True,
return_msg: Return|CancelAck = Return, return_msg_type: Return|CancelAck = Return,
task_status: TaskStatus[ task_status: TaskStatus[
Context | BaseException Context | BaseException
@ -533,7 +540,7 @@ async def _invoke(
kwargs, kwargs,
treat_as_gen, treat_as_gen,
is_rpc, is_rpc,
return_msg, return_msg_type,
task_status, task_status,
) )
# XXX below fallthrough is ONLY for `@context` eps # XXX below fallthrough is ONLY for `@context` eps
@ -593,18 +600,21 @@ async def _invoke(
ctx._scope = tn.cancel_scope ctx._scope = tn.cancel_scope
task_status.started(ctx) task_status.started(ctx)
# TODO: should would be nice to have our # TODO: better `trionics` tooling:
# `TaskMngr` nursery here! # -[ ] should would be nice to have our `TaskMngr`
res: Any = await coro # nursery here!
ctx._result = res # -[ ] payload value checking like we do with
# `.started()` such that the debbuger can engage
# deliver final result to caller side. # here in the child task instead of waiting for the
await chan.send( # parent to crash with it's own MTE..
return_msg( res: Any|PayloadT = await coro
return_msg: Return|CancelAck = return_msg_type(
cid=cid, cid=cid,
pld=res, pld=res,
) )
) # set and shuttle final result to "parent"-side task.
ctx._result = res
await chan.send(return_msg)
# NOTE: this happens IFF `ctx._scope.cancel()` is # NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of, # called by any of,
@ -940,7 +950,7 @@ async def process_messages(
actor.cancel, actor.cancel,
kwargs, kwargs,
is_rpc=False, is_rpc=False,
return_msg=CancelAck, return_msg_type=CancelAck,
) )
log.runtime( log.runtime(
@ -974,7 +984,7 @@ async def process_messages(
actor._cancel_task, actor._cancel_task,
kwargs, kwargs,
is_rpc=False, is_rpc=False,
return_msg=CancelAck, return_msg_type=CancelAck,
) )
except BaseException: except BaseException:
log.exception( log.exception(

View File

@ -1256,9 +1256,10 @@ class Actor:
# - child returns a result before cancel-msg/ctxc-raised # - child returns a result before cancel-msg/ctxc-raised
# - child self raises ctxc before parent send request, # - child self raises ctxc before parent send request,
# - child errors prior to cancel req. # - child errors prior to cancel req.
log.cancel( log.runtime(
'Cancel request invalid, RPC task already completed?\n\n' 'Cancel request for invalid RPC task.\n'
f'<= canceller: {requesting_uid}\n\n' 'The task likely already completed or was never started!\n\n'
f'<= canceller: {requesting_uid}\n'
f'=> {cid}@{parent_chan.uid}\n' f'=> {cid}@{parent_chan.uid}\n'
f' |_{parent_chan}\n' f' |_{parent_chan}\n'
) )

View File

@ -140,7 +140,7 @@ class MsgDec(Struct):
# * also a `.__contains__()` for doing `None in # * also a `.__contains__()` for doing `None in
# TypeSpec[None|int]` since rn you need to do it on # TypeSpec[None|int]` since rn you need to do it on
# `.__args__` for unions.. # `.__args__` for unions..
# - `MsgSpec: Union[Type[Msg]] # - `MsgSpec: Union[MsgType]
# #
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo # -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params # |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
@ -188,7 +188,7 @@ def mk_dec(
return MsgDec( return MsgDec(
_dec=msgpack.Decoder( _dec=msgpack.Decoder(
type=spec, # like `Msg[Any]` type=spec, # like `MsgType[Any]`
dec_hook=dec_hook, dec_hook=dec_hook,
) )
) )
@ -561,7 +561,7 @@ def mk_codec(
''' '''
# (manually) generate a msg-payload-spec for all relevant # (manually) generate a msg-payload-spec for all relevant
# god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT` # god-boxing-msg subtypes, parameterizing the `PayloadMsg.pld: PayloadT`
# for the decoder such that all sub-type msgs in our SCIPP # for the decoder such that all sub-type msgs in our SCIPP
# will automatically decode to a type-"limited" payload (`Struct`) # will automatically decode to a type-"limited" payload (`Struct`)
# object (set). # object (set).
@ -607,7 +607,7 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
# The built-in IPC `Msg` spec. # The built-in IPC `Msg` spec.
# Our composing "shuttle" protocol which allows `tractor`-app code # Our composing "shuttle" protocol which allows `tractor`-app code
# to use any `msgspec` supported type as the `Msg.pld` payload, # to use any `msgspec` supported type as the `PayloadMsg.pld` payload,
# https://jcristharif.com/msgspec/supported-types.html # https://jcristharif.com/msgspec/supported-types.html
# #
_def_tractor_codec: MsgCodec = mk_codec( _def_tractor_codec: MsgCodec = mk_codec(
@ -743,7 +743,7 @@ def limit_msg_spec(
) -> MsgCodec: ) -> MsgCodec:
''' '''
Apply a `MsgCodec` that will natively decode the SC-msg set's Apply a `MsgCodec` that will natively decode the SC-msg set's
`Msg.pld: Union[Type[Struct]]` payload fields using `PayloadMsg.pld: Union[Type[Struct]]` payload fields using
tagged-unions of `msgspec.Struct`s from the `payload_types` tagged-unions of `msgspec.Struct`s from the `payload_types`
for all IPC contexts in use by the current `trio.Task`. for all IPC contexts in use by the current `trio.Task`.

View File

@ -53,6 +53,8 @@ from tractor._state import current_ipc_ctx
from ._codec import ( from ._codec import (
mk_dec, mk_dec,
MsgDec, MsgDec,
MsgCodec,
current_codec,
) )
from .types import ( from .types import (
CancelAck, CancelAck,
@ -213,6 +215,9 @@ class PldRx(Struct):
**dec_msg_kwargs, **dec_msg_kwargs,
) )
# TODO: rename to,
# -[ ] `.decode_pld()`?
# -[ ] `.dec_pld()`?
def dec_msg( def dec_msg(
self, self,
msg: MsgType, msg: MsgType,
@ -246,8 +251,8 @@ class PldRx(Struct):
pld: PayloadT = self._pld_dec.decode(pld) pld: PayloadT = self._pld_dec.decode(pld)
log.runtime( log.runtime(
'Decoded msg payload\n\n' 'Decoded msg payload\n\n'
f'{msg}\n\n' f'{msg}\n'
f'where payload is\n' f'where payload decoded as\n'
f'|_pld={pld!r}\n' f'|_pld={pld!r}\n'
) )
return pld return pld
@ -263,13 +268,29 @@ class PldRx(Struct):
src_validation_error=valerr, src_validation_error=valerr,
is_invalid_payload=True, is_invalid_payload=True,
expected_msg=expect_msg, expected_msg=expect_msg,
# ipc_msg=msg,
) )
# NOTE: override the `msg` passed to # NOTE: just raise the MTE inline instead of all
# `_raise_from_unexpected_msg()` (below) so so that # the pack-unpack-repack non-sense when this is
# we're effectively able to use that same func to # a "send side" validation error.
# unpack and raise an "emulated remote `Error`" of if is_started_send_side:
# this local MTE. raise mte
# XXX TODO: remove this right?
# => any bad stated/return values should
# always be treated a remote errors right?
#
# if (
# expect_msg is Return
# or expect_msg is Started
# ):
# # set emulated remote error more-or-less as the
# # runtime would
# ctx: Context = getattr(ipc, 'ctx', ipc)
# ctx._maybe_cancel_and_set_remote_error(mte)
# NOTE: the `.message` is automatically
# transferred into the message as long as we
# define it as a `Error.message` field.
err_msg: Error = pack_error( err_msg: Error = pack_error(
exc=mte, exc=mte,
cid=msg.cid, cid=msg.cid,
@ -279,36 +300,38 @@ class PldRx(Struct):
else ipc._actor.uid else ipc._actor.uid
), ),
# tb=valerr.__traceback__, # tb=valerr.__traceback__,
tb_str=mte._message, # tb_str=mte._message,
# message=mte._message,
) )
# ^-TODO-^ just raise this inline instead of all the
# pack-unpack-repack non-sense!
mte._ipc_msg = err_msg mte._ipc_msg = err_msg
msg = err_msg
# set emulated remote error more-or-less as the # XXX override the `msg` passed to
# runtime would # `_raise_from_unexpected_msg()` (below) so so
ctx: Context = getattr(ipc, 'ctx', ipc) # that we're effectively able to use that same
# func to unpack and raise an "emulated remote
# `Error`" of this local MTE.
msg = err_msg
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
# raises the boxed `err_msg` from above it raises
# it from the above caught interchange-lib
# validation error.
src_err = valerr
# TODO: should we instead make this explicit and # TODO: should we instead make this explicit and
# use the above masked `is_started_send_decode`, # use the above masked `is_started_send_decode`,
# expecting the `Context.started()` caller to set # expecting the `Context.started()` caller to set
# it? Rn this is kinda, howyousayyy, implicitly # it? Rn this is kinda, howyousayyy, implicitly
# edge-case-y.. # edge-case-y..
if ( # TODO: remove this since it's been added to
expect_msg is not Started # `_raise_from_unexpected_msg()`..?
and not is_started_send_side # if (
): # expect_msg is not Started
ctx._maybe_cancel_and_set_remote_error(mte) # and not is_started_send_side
# ):
# XXX NOTE: so when the `_raise_from_unexpected_msg()` # # set emulated remote error more-or-less as the
# raises the boxed `err_msg` from above it raises # # runtime would
# it from `None`. # ctx: Context = getattr(ipc, 'ctx', ipc)
src_err = valerr # ctx._maybe_cancel_and_set_remote_error(mte)
# if is_started_send_side:
# src_err = None
# XXX some other decoder specific failure? # XXX some other decoder specific failure?
# except TypeError as src_error: # except TypeError as src_error:
@ -559,6 +582,7 @@ async def drain_to_final_msg(
ipc=ctx, ipc=ctx,
expect_msg=Return, expect_msg=Return,
raise_error=False, raise_error=False,
hide_tb=hide_tb,
) )
# ^-TODO-^ some bad ideas? # ^-TODO-^ some bad ideas?
# -[ ] wrap final outcome .receive() in a scope so # -[ ] wrap final outcome .receive() in a scope so
@ -737,9 +761,61 @@ async def drain_to_final_msg(
) )
# TODO: factor logic from `.Context.started()` for send-side
# validate raising!
def validate_payload_msg( def validate_payload_msg(
msg: Started|Yield|Return, pld_msg: Started|Yield|Return,
pld_value: PayloadT,
ipc: Context|MsgStream,
raise_mte: bool = True,
strict_pld_parity: bool = False,
hide_tb: bool = True,
) -> MsgTypeError|None: ) -> MsgTypeError|None:
... '''
Validate a `PayloadMsg.pld` value with the current
IPC ctx's `PldRx` and raise an appropriate `MsgTypeError`
on failure.
'''
__tracebackhide__: bool = hide_tb
codec: MsgCodec = current_codec()
msg_bytes: bytes = codec.encode(pld_msg)
try:
roundtripped: Started = codec.decode(msg_bytes)
ctx: Context = getattr(ipc, 'ctx', ipc)
pld: PayloadT = ctx.pld_rx.dec_msg(
msg=roundtripped,
ipc=ipc,
expect_msg=Started,
hide_tb=hide_tb,
is_started_send_side=True,
)
if (
strict_pld_parity
and
pld != pld_value
):
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
roundtripped,
pld_msg,
)
complaint: str = (
'Started value does not match after roundtrip?\n\n'
f'{diff}'
)
raise ValidationError(complaint)
# raise any msg type error NO MATTER WHAT!
except ValidationError as verr:
mte: MsgTypeError = _mk_msg_type_err(
msg=roundtripped,
codec=codec,
src_validation_error=verr,
verb_header='Trying to send ',
is_invalid_payload=True,
)
if not raise_mte:
return mte
raise mte from verr

View File

@ -89,11 +89,12 @@ class PayloadMsg(
# -[ ] `uuid.UUID` which has multi-protocol support # -[ ] `uuid.UUID` which has multi-protocol support
# https://jcristharif.com/msgspec/supported-types.html#uuid # https://jcristharif.com/msgspec/supported-types.html#uuid
# The msgs "payload" (spelled without vowels): # The msg's "payload" (spelled without vowels):
# https://en.wikipedia.org/wiki/Payload_(computing) # https://en.wikipedia.org/wiki/Payload_(computing)
# pld: Raw
# NOTE: inherited from any `Msg` (and maybe overriden
# by use of `limit_msg_spec()`), but by default is # ^-NOTE-^ inherited from any `PayloadMsg` (and maybe type
# overriden via the `._ops.limit_plds()` API), but by default is
# parameterized to be `Any`. # parameterized to be `Any`.
# #
# XXX this `Union` must strictly NOT contain `Any` if # XXX this `Union` must strictly NOT contain `Any` if
@ -106,7 +107,6 @@ class PayloadMsg(
# TODO: could also be set to `msgspec.Raw` if the sub-decoders # TODO: could also be set to `msgspec.Raw` if the sub-decoders
# approach is preferred over the generic parameterization # approach is preferred over the generic parameterization
# approach as take by `mk_msg_spec()` below. # approach as take by `mk_msg_spec()` below.
pld: Raw
# TODO: complete rename # TODO: complete rename
@ -410,21 +410,32 @@ class Error(
src_type_str: str src_type_str: str
boxed_type_str: str boxed_type_str: str
relay_path: list[tuple[str, str]] relay_path: list[tuple[str, str]]
tb_str: str
cid: str|None = None # normally either both are provided or just
# a message for certain special cases where
# we pack a message for a locally raised
# mte or ctxc.
message: str|None = None
tb_str: str = ''
# TODO: use UNSET or don't include them via # TODO: only optionally include sub-type specfic fields?
# -[ ] use UNSET or don't include them via `omit_defaults` (see
# inheritance-line options above)
# #
# `ContextCancelled` # `ContextCancelled` reports the src cancelling `Actor.uid`
canceller: tuple[str, str]|None = None canceller: tuple[str, str]|None = None
# `StreamOverrun` # `StreamOverrun`-specific src `Actor.uid`
sender: tuple[str, str]|None = None sender: tuple[str, str]|None = None
# for the `MsgTypeError` case where the receiver side # `MsgTypeError` meta-data
# decodes the underlying original `Msg`-subtype cid: str|None = None
_msg_dict: dict|None = None # when the receiver side fails to decode a delivered
# `PayloadMsg`-subtype; one and/or both the msg-struct instance
# and `Any`-decoded to `dict` of the msg are set and relayed
# (back to the sender) for introspection.
_bad_msg: Started|Yield|Return|None = None
_bad_msg_as_dict: dict|None = None
def from_dict_msg( def from_dict_msg(
@ -436,9 +447,11 @@ def from_dict_msg(
) -> MsgType: ) -> MsgType:
''' '''
Helper to build a specific `MsgType` struct from Helper to build a specific `MsgType` struct from a "vanilla"
a "vanilla" decoded `dict`-ified equivalent of the decoded `dict`-ified equivalent of the msg: i.e. if the
msg: i.e. if the `msgpack.Decoder.type == Any`. `msgpack.Decoder.type == Any`, the default when using
`msgspec.msgpack` and not "typed decoding" using
`msgspec.Struct`.
''' '''
msg_type_tag_field: str = ( msg_type_tag_field: str = (