Compare commits

..

16 Commits

Author SHA1 Message Date
Tyler Goodlet 0e8c60ee4a Better RAE `.pformat()`-ing for send-side MTEs
Send-side `MsgTypeError`s actually shouldn't have any "boxed" traceback
per say since they're raised in the transmitting actor's local task env
and we (normally) don't want the ascii decoration added around the
error's `._message: str`, that is not until the exc is `pack_error()`-ed
before transit. As such, the presentation of an embedded traceback (and
its ascii box) gets bypassed when only a `._message: str` is set (as we
now do for pld-spec failures in `_mk_msg_type_err()`).

Further this tweaks the `.pformat()` output to include the `._message`
part to look like `<RemoteActorError( <._message> ) ..` instead of
jamming it implicitly to the end of the embedded `.tb_str` (as was done
implicitly by `unpack_error()`) and also adds better handling for the
`with_type_header == False` case including forcing that case when we
detect that the currently handled exc is the RAE in `.pformat()`.
Toss in a lengthier doc-str explaining it all.

Surrounding/supporting changes,
- better `unpack_error()` message which just briefly reports the remote
  task's error type.
- add public `.message: str` prop.
- always set a `._extra_msgdata: dict` since some MTE props rely on it.
- handle `.boxed_type == None` for `.boxed_type_str`.
- maybe pack any detected input or `exc.message` in `pack_error()`.
- comment cruft cleanup in `_mk_msg_type_err()`.
2024-05-30 10:25:04 -04:00
Tyler Goodlet 1db5d4def2 Add `Error.message: str`
Allows passing a custom error msg other then the traceback-str over the
wire. Make `.tb_str` optional (in the blank `''` sense) since it's
treated that way thus far in `._exceptions.pack_error()`.
2024-05-30 09:14:04 -04:00
Tyler Goodlet 6e54abc56d Fix missing newline in task-cancel log-message 2024-05-30 09:06:10 -04:00
Tyler Goodlet 28af4749cc Don't need to pack an `Error` with send-side MTEs 2024-05-30 09:05:23 -04:00
Tyler Goodlet 02a7c7c276 Ensure only a boxed traceback for MTE on parent side 2024-05-30 01:11:29 -04:00
Tyler Goodlet 4fa71cc01c Ensure ctx error-state matches the MTE scenario
Namely checking that `Context._remote_error` is set to the raised MTE
in the invalid started and return value cases since prior to the recent
underlying changes to the `Context.result()` impl, it would not match.

Further,
- do asserts for non-MTE raising cases in both the parent and child.
- add todos for testing ctx-outcomes for per-side-validation policies
  i anticipate supporting and implied msg-dialog race cases therein.
2024-05-28 20:07:48 -04:00
Tyler Goodlet 6a4ee461f5 Raise remote errors rxed during `Context` child-sync
More specifically, if `.open_context()` is cancelled when awaiting the
first `Context.started()` during the child task sync phase, check to see
if it was due to `._scope.cancel_called` and raise any remote error via
`.maybe_raise()` instead the `trio.Cancelled` like in every other
remote-error handling case. Ensure we set `._scope[_nursery]` only after
the `Started` has arrived and audited.
2024-05-28 16:11:01 -04:00
Tyler Goodlet 2db03444f7 Don't (noisly) log about runtime cancel RPC tasks
Since in the case of the `Actor._cancel_task()` related runtime eps we
actually don't EVER register them in `Actor._rpc_tasks`.. logging about
them is just needless noise, though maybe we should track them in a diff
table; something like a `._runtime_rpc_tasks`?

Drop the cancel-request-for-stale-RPC-task (`KeyError` case in
`Actor._cancel_task()`) log-emit level in to `.runtime()`; it's
generally not useful info other then for granular race condition eval
when hacking the runtime.
2024-05-28 16:03:36 -04:00
Tyler Goodlet a1b124b62b Raise send-side MTEs inline in `PldRx.dec_msg()`
So when `is_started_send_side is True` we raise the newly created
`MsgTypeError` (MTE) directly instead of doing all the `Error`-msg pack
and unpack to raise stuff via `_raise_from_unexpected_msg()` since the
raise should happen send side anyway and so doesn't emulate any remote
fault like in a bad `Return` or `Started` without send-side pld-spec
validation.

Oh, and proxy-through the `hide_tb: bool` input from `.drain_to_final_msg()`
to `.recv_msg_w_pld()`.
2024-05-28 16:02:51 -04:00
Tyler Goodlet 59ca256183 Set remote errors in `_raise_from_unexpected_msg()`
By calling `Context._maybe_cancel_and_set_remote_error(exc)` on any
unpacked `Error` msg; provides for `Context.maybe_error` consistency to
match all other error delivery cases.
2024-05-28 15:48:01 -04:00
Tyler Goodlet 6c2efc96dc Factor `.started()` validation into `.msg._ops`
Filling out the helper `validate_payload_msg()` staged in a prior commit
and adjusting all imports to match.

Also add a `raise_mte: bool` flag for potential usage where the caller
wants to handle the MTE instance themselves.
2024-05-28 11:08:27 -04:00
Tyler Goodlet f7fd8278af Fix `test_basic_payload_spec` bad msg matching
Expecting `Started` or `Return` with respective bad `.pld` values
depending on what type of failure is test parametrized.

This makes the suite run green it seems B)
2024-05-28 11:05:46 -04:00
Tyler Goodlet 7ac730e326 Drop `msg.types.Msg` for new replacement types
The `TypeAlias` for the msg type-group is now `MsgType` and any user
touching shuttle messages can now be typed as `PayloadMsg`.

Relatedly, add MTE specific `Error._bad_msg[_as_dict]` fields which are
handy for introspection of remote decode failures.
2024-05-28 09:55:16 -04:00
Tyler Goodlet 582144830f Parameterize the `return_msg_type` in `._invoke()`
Since we also handle a runtime-specific `CancelAck`, allow the
caller-scheduler to pass in the expected return-type msg per the RPC msg
endpoint loop.
2024-05-28 09:36:26 -04:00
Tyler Goodlet 8b860f4245 Move `.devx` related deps to `dev` group 2024-05-28 09:34:08 -04:00
Tyler Goodlet 27fd96729a Tweaks to debugger examples
Light stuff like comments, typing, and a couple API usage updates.
2024-05-28 09:22:59 -04:00
16 changed files with 522 additions and 312 deletions

View File

@ -4,9 +4,15 @@ import trio
async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor."
while True:
yield 'yo'
await tractor.breakpoint()
try:
while True:
yield 'yo'
await tractor.breakpoint()
except BaseException:
tractor.log.get_console_log().exception(
'Cancelled while trying to enter pause point!'
)
raise
async def name_error():
@ -19,7 +25,7 @@ async def main():
"""
async with tractor.open_nursery(
debug_mode=True,
loglevel='error',
loglevel='cancel',
) as n:
p0 = await n.start_actor('bp_forever', enable_modules=[__name__])

View File

@ -45,6 +45,7 @@ async def spawn_until(depth=0):
)
# TODO: notes on the new boxed-relayed errors through proxy actors
async def main():
"""The main ``tractor`` routine.

View File

@ -23,5 +23,6 @@ async def main():
n.start_soon(debug_actor.run, die)
n.start_soon(crash_boi.run, die)
if __name__ == '__main__':
trio.run(main)

View File

@ -2,10 +2,13 @@ import trio
import tractor
async def main():
async def main(
registry_addrs: tuple[str, int]|None = None
):
async with tractor.open_root_actor(
debug_mode=True,
# loglevel='runtime',
):
while True:
await tractor.breakpoint()

View File

@ -3,16 +3,26 @@ import tractor
async def name_error():
getattr(doggypants)
getattr(doggypants) # noqa (on purpose)
async def main():
async with tractor.open_nursery(
debug_mode=True,
) as n:
# loglevel='transport',
) as an:
portal = await n.run_in_actor(name_error)
await portal.result()
# TODO: ideally the REPL arrives at this frame in the parent,
# ABOVE the @api_frame of `Portal.run_in_actor()` (which
# should eventually not even be a portal method ... XD)
# await tractor.pause()
p: tractor.Portal = await an.run_in_actor(name_error)
# with this style, should raise on this line
await p.result()
# with this alt style should raise at `open_nusery()`
# return await p.result()
if __name__ == '__main__':

View File

@ -7,7 +7,7 @@ def sync_pause(
error: bool = False,
):
if use_builtin:
breakpoint()
breakpoint(hide_tb=False)
else:
tractor.pause_from_sync()
@ -20,18 +20,20 @@ def sync_pause(
async def start_n_sync_pause(
ctx: tractor.Context,
):
# sync to requesting peer
actor: tractor.Actor = tractor.current_actor()
# sync to parent-side task
await ctx.started()
actor: tractor.Actor = tractor.current_actor()
print(f'entering SYNC PAUSE in {actor.uid}')
sync_pause()
print(f'back from SYNC PAUSE in {actor.uid}')
async def main() -> None:
async with tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True,
) as an:

View File

@ -39,8 +39,7 @@ msgspec='^0.18.5' # interchange
wrapt = "^1.16.0" # decorators
colorlog = "^6.8.2" # logging
# .devx tooling
stackscope = "^0.2.2"
# built-in multi-actor `pdb` REPL
pdbp = "^1.5.0"
@ -49,15 +48,19 @@ pdbp = "^1.5.0"
# 'pyroute2
# ------ - ------
xontrib-vox = "^0.0.1"
[tool.poetry.group.dev]
optional = false
[tool.poetry.group.dev.dependencies]
# testing
pytest = "^8.2.0"
pexpect = "^4.9.0"
# only for xonsh as sh..
# .devx tooling
greenback = "^1.2.1"
stackscope = "^0.2.2"
# (light) xonsh usage/integration
xontrib-vox = "^0.0.1"
prompt-toolkit = "^3.0.43"
xonsh-vox-tabcomplete = "^0.5"

View File

@ -6,30 +6,19 @@ related settings around IPC contexts.
'''
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
# import typing
from typing import (
# Any,
TypeAlias,
# Union,
)
from contextvars import (
Context,
)
from msgspec import (
# structs,
# msgpack,
Struct,
# ValidationError,
)
import pytest
import trio
import tractor
from tractor import (
# _state,
MsgTypeError,
current_ipc_ctx,
Portal,
@ -40,20 +29,9 @@ from tractor.msg import (
)
from tractor.msg import (
_codec,
# _ctxvar_MsgCodec,
# NamespacePath,
# MsgCodec,
# mk_codec,
# apply_codec,
# current_codec,
)
from tractor.msg.types import (
log,
# _payload_msgs,
# PayloadMsg,
# Started,
# mk_msg_spec,
)
@ -64,23 +42,10 @@ class PldMsg(Struct):
maybe_msg_spec = PldMsg|None
@cm
def custom_spec(
ctx: Context,
spec: TypeAlias,
) -> _codec.MsgCodec:
'''
Apply a custom payload spec, remove on exit.
'''
rx: msgops.PldRx = ctx._pld_rx
@acm
async def maybe_expect_raises(
raises: BaseException|None = None,
ensure_in_message: list[str]|None = None,
reraise: bool = False,
timeout: int = 3,
) -> None:
@ -88,6 +53,9 @@ async def maybe_expect_raises(
Async wrapper for ensuring errors propagate from the inner scope.
'''
if tractor._state.debug_mode():
timeout += 999
with trio.fail_after(timeout):
try:
yield
@ -103,9 +71,10 @@ async def maybe_expect_raises(
# maybe check for error txt content
if ensure_in_message:
part: str
err_repr: str = repr(inner_err)
for part in ensure_in_message:
for i, arg in enumerate(inner_err.args):
if part in arg:
if part in err_repr:
break
# if part never matches an arg, then we're
# missing a match.
@ -132,7 +101,7 @@ async def child(
ctx: Context,
started_value: int|PldMsg|None,
return_value: str|None,
validate_pld_spec: bool,
validate_pld_spec: bool,
raise_on_started_mte: bool = True,
) -> None:
@ -166,13 +135,15 @@ async def child(
# 2 cases: hdndle send-side and recv-only validation
# - when `raise_on_started_mte == True`, send validate
# - else, parent-recv-side only validation
mte: MsgTypeError|None = None
try:
await ctx.started(
value=started_value,
validate_pld_spec=validate_pld_spec,
)
except MsgTypeError:
except MsgTypeError as _mte:
mte = _mte
log.exception('started()` raised an MTE!\n')
if not expect_started_mte:
raise RuntimeError(
@ -180,15 +151,60 @@ async def child(
f'{started_value!r}\n'
)
boxed_div: str = '------ - ------'
assert boxed_div not in mte._message
assert boxed_div not in mte.tb_str
assert boxed_div not in repr(mte)
assert boxed_div not in str(mte)
mte_repr: str = repr(mte)
for line in mte.message.splitlines():
assert line in mte_repr
# since this is a *local error* there should be no
# boxed traceback content!
assert not mte.tb_str
# propagate to parent?
if raise_on_started_mte:
raise
else:
if expect_started_mte:
raise RuntimeError(
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
f'{started_value!r}\n'
)
# no-send-side-error fallthrough
if (
validate_pld_spec
and
expect_started_mte
):
raise RuntimeError(
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
f'{started_value!r}\n'
)
assert (
not expect_started_mte
or
not validate_pld_spec
)
# if wait_for_parent_to_cancel:
# ...
#
# ^-TODO-^ logic for diff validation policies on each side:
#
# -[ ] ensure that if we don't validate on the send
# side, that we are eventually error-cancelled by our
# parent due to the bad `Started` payload!
# -[ ] the boxed error should be srced from the parent's
# runtime NOT ours!
# -[ ] we should still error on bad `return_value`s
# despite the parent not yet error-cancelling us?
# |_ how do we want the parent side to look in that
# case?
# -[ ] maybe the equiv of "during handling of the
# above error another occurred" for the case where
# the parent sends a MTE to this child and while
# waiting for the child to terminate it gets back
# the MTE for this case?
#
# XXX should always fail on recv side since we can't
# really do much else beside terminate and relay the
@ -211,8 +227,8 @@ async def child(
@pytest.mark.parametrize(
'return_value',
[
None,
'yo',
None,
],
ids=[
'return[invalid-"yo"]',
@ -271,16 +287,32 @@ def test_basic_payload_spec(
# since not opened yet.
assert current_ipc_ctx() is None
if invalid_started:
msg_type_str: str = 'Started'
bad_value_str: str = '10'
elif invalid_return:
msg_type_str: str = 'Return'
bad_value_str: str = "'yo'"
else:
# XXX but should never be used below then..
msg_type_str: str = ''
bad_value_str: str = ''
maybe_mte: MsgTypeError|None = None
should_raise: Exception|None = (
MsgTypeError if (
invalid_return
or
invalid_started
) else None
)
async with (
maybe_expect_raises(
raises=MsgTypeError if (
invalid_return
or
invalid_started
) else None,
raises=should_raise,
ensure_in_message=[
"invalid `Return` payload",
"value: `'yo'` does not match type-spec: `Return.pld: PldMsg|NoneType`",
f"invalid `{msg_type_str}` msg payload",
f"value: `{bad_value_str}` does not "
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
],
),
p.open_context(
@ -298,18 +330,35 @@ def test_basic_payload_spec(
assert first.field == 'yo'
try:
assert (await ctx.result()) is None
res: None|PldMsg = await ctx.result(hide_tb=False)
assert res is None
except MsgTypeError as mte:
maybe_mte = mte
if not invalid_return:
raise
else: # expected this invalid `Return.pld`
assert mte.cid == ctx.cid
# expected this invalid `Return.pld` so audit
# the error state + meta-data
assert mte.expected_msg_type is Return
assert mte.cid == ctx.cid
mte_repr: str = repr(mte)
for line in mte.message.splitlines():
assert line in mte_repr
# verify expected remote mte deats
await tractor.pause()
assert ctx._remote_error is mte
assert mte.expected_msg_type is Return
assert mte.tb_str
# await tractor.pause(shield=True)
# verify expected remote mte deats
assert ctx._local_error is None
assert (
mte is
ctx._remote_error is
ctx.maybe_error is
ctx.outcome
)
if should_raise is None:
assert maybe_mte is None
await p.cancel_actor()

View File

@ -58,9 +58,6 @@ from typing import (
import warnings
# ------ - ------
import trio
from msgspec import (
ValidationError,
)
# ------ - ------
from ._exceptions import (
ContextCancelled,
@ -78,19 +75,16 @@ from .log import (
from .msg import (
Error,
MsgType,
MsgCodec,
NamespacePath,
PayloadT,
Started,
Stop,
Yield,
current_codec,
pretty_struct,
_ops as msgops,
)
from ._ipc import (
Channel,
_mk_msg_type_err,
)
from ._streaming import MsgStream
from ._state import (
@ -670,7 +664,7 @@ class Context:
'Setting remote error for ctx\n\n'
f'<= {self.peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}: {self._actor.uid}\n\n'
f'{error}'
f'{error!r}'
)
self._remote_error: BaseException = error
@ -724,7 +718,7 @@ class Context:
log.error(
f'Remote context error:\n\n'
# f'{pformat(self)}\n'
f'{error}'
f'{error!r}'
)
if self._canceller is None:
@ -748,26 +742,27 @@ class Context:
and not cs.cancel_called
and not cs.cancelled_caught
):
if not (
if (
msgerr
# NOTE: we allow user to config not cancelling the
# local scope on `MsgTypeError`s
and not self._cancel_on_msgerr
and
not self._cancel_on_msgerr
):
# TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n'
self._scope.cancel()
else:
message: str = (
'NOT Cancelling `Context._scope` since,\n'
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
f'AND we got a msg-type-error!\n'
f'{error}\n'
)
else:
# TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n'
self._scope.cancel()
else:
message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb
@ -1657,54 +1652,21 @@ class Context:
#
__tracebackhide__: bool = hide_tb
if validate_pld_spec:
# __tracebackhide__: bool = False
codec: MsgCodec = current_codec()
msg_bytes: bytes = codec.encode(started_msg)
try:
roundtripped: Started = codec.decode(msg_bytes)
# pld: PayloadT = await self.pld_rx.recv_pld(
pld: PayloadT = self.pld_rx.dec_msg(
msg=roundtripped,
ipc=self,
expect_msg=Started,
hide_tb=hide_tb,
is_started_send_side=True,
)
if (
strict_pld_parity
and
pld != value
):
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
roundtripped,
started_msg,
)
complaint: str = (
'Started value does not match after roundtrip?\n\n'
f'{diff}'
)
raise ValidationError(complaint)
# raise any msg type error NO MATTER WHAT!
except ValidationError as verr:
# always show this src frame in the tb
# __tracebackhide__: bool = False
raise _mk_msg_type_err(
msg=roundtripped,
codec=codec,
src_validation_error=verr,
verb_header='Trying to send ',
is_invalid_payload=True,
) from verr
msgops.validate_payload_msg(
pld_msg=started_msg,
pld_value=value,
ipc=self,
strict_pld_parity=strict_pld_parity,
hide_tb=hide_tb,
)
# TODO: maybe a flag to by-pass encode op if already done
# here in caller?
await self.chan.send(started_msg)
# set msg-related internal runtime-state
self._started_called = True
self._started_msg = started_msg
self._started_called: bool = True
self._started_msg: Started = started_msg
self._started_pld = value
async def _drain_overflows(
@ -2097,6 +2059,12 @@ async def open_context_from_portal(
if maybe_msgdec:
assert maybe_msgdec.pld_spec == pld_spec
# NOTE: this in an implicit runtime nursery used to,
# - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user.
ctx._scope_nursery: trio.Nursery = tn
ctx._scope: trio.CancelScope = tn.cancel_scope
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will
@ -2104,25 +2072,42 @@ async def open_context_from_portal(
# -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
ipc=ctx,
expect_msg=Started,
passthrough_non_pld_msgs=False,
hide_tb=hide_tb,
)
try:
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
ipc=ctx,
expect_msg=Started,
passthrough_non_pld_msgs=False,
hide_tb=hide_tb,
)
except trio.Cancelled as taskc:
ctx_cs: trio.CancelScope = ctx._scope
if not ctx_cs.cancel_called:
raise
# from .devx import pause
# await pause(shield=True)
log.cancel(
'IPC ctx was cancelled during "child" task sync due to\n\n'
f'{ctx.maybe_error}\n'
)
# OW if the ctx's scope was cancelled manually,
# likely the `Context` was cancelled via a call to
# `._maybe_cancel_and_set_remote_error()` so ensure
# we raise the underlying `._remote_error` directly
# instead of bubbling that taskc.
ctx.maybe_raise()
# OW, some other unexpected cancel condition
# that should prolly never happen right?
raise InternalError(
'Invalid cancellation during IPC ctx sync phase?\n'
) from taskc
# from .devx import pause
# await pause()
ctx._started_called: bool = True
ctx._started_msg: bool = started_msg
ctx._started_pld: bool = first
# NOTE: this in an implicit runtime nursery used to,
# - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user.
ctx._scope_nursery: trio.Nursery = tn
ctx._scope: trio.CancelScope = tn.cancel_scope
# deliver context instance and .started() msg value
# in enter tuple.
yield ctx, first

View File

@ -22,6 +22,7 @@ from __future__ import annotations
import builtins
import importlib
from pprint import pformat
import sys
from types import (
TracebackType,
)
@ -110,6 +111,7 @@ _body_fields: list[str] = list(
'tb_str',
'relay_path',
'cid',
'message',
# only ctxc should show it but `Error` does
# have it as an optional field.
@ -236,6 +238,7 @@ class RemoteActorError(Exception):
self._boxed_type: BaseException = boxed_type
self._src_type: BaseException|None = None
self._ipc_msg: Error|None = ipc_msg
self._extra_msgdata = extra_msgdata
if (
extra_msgdata
@ -250,8 +253,6 @@ class RemoteActorError(Exception):
k,
v,
)
else:
self._extra_msgdata = extra_msgdata
# TODO: mask out eventually or place in `pack_error()`
# pre-`return` lines?
@ -282,6 +283,17 @@ class RemoteActorError(Exception):
# ensure any roundtripping evals to the input value
assert self.boxed_type is boxed_type
@property
def message(self) -> str:
'''
Be explicit, instead of trying to read it from the the parent
type's loosely defined `.args: tuple`:
https://docs.python.org/3/library/exceptions.html#BaseException.args
'''
return self._message
@property
def ipc_msg(self) -> Struct:
'''
@ -355,7 +367,10 @@ class RemoteActorError(Exception):
'''
bt: Type[BaseException] = self.boxed_type
return str(bt.__name__)
if bt:
return str(bt.__name__)
return ''
@property
def boxed_type(self) -> Type[BaseException]:
@ -426,8 +441,7 @@ class RemoteActorError(Exception):
for key in fields:
if (
key == 'relay_uid'
and not self.is_inception()
key == 'relay_uid' and not self.is_inception()
):
continue
@ -504,19 +518,80 @@ class RemoteActorError(Exception):
def pformat(
self,
with_type_header: bool = True,
# with_ascii_box: bool = True,
) -> str:
'''
Nicely formatted boxed error meta data + traceback, OR just
the normal message from `.args` (for eg. as you'd want shown
by a locally raised `ContextCancelled`).
Format any boxed remote error by multi-line display of,
- error's src or relay actor meta-data,
- remote runtime env's traceback,
With optional control over the format of,
- whether the boxed traceback is ascii-decorated with
a surrounding "box" annotating the embedded stack-trace.
- if the error's type name should be added as margins
around the field and tb content like:
`<RemoteActorError(.. <<multi-line-content>> .. )>`
- the placement of the `.message: str` (explicit equiv of
`.args[0]`), either placed below the `.tb_str` or in the
first line's header when the error is raised locally (since
the type name is already implicitly shown by python).
'''
header: str = ''
body: str = ''
message: str = ''
# XXX when the currently raised exception is this instance,
# we do not ever use the "type header" style repr.
is_being_raised: bool = False
if (
(exc := sys.exception())
and
exc is self
):
is_being_raised: bool = True
with_type_header: bool = (
with_type_header
and
not is_being_raised
)
# <RemoteActorError( .. )> style
if with_type_header:
header: str = f'<{type(self).__name__}(\n'
header: str = f'<{type(self).__name__}('
if message := self._message:
# split off the first line so, if needed, it isn't
# indented the same like the "boxed content" which
# since there is no `.tb_str` is just the `.message`.
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
# with a type-style header we,
# - have no special message "first line" extraction/handling
# - place the message a space in from the header:
# `MsgTypeError( <message> ..`
# ^-here
# - indent the `.message` inside the type body.
if with_type_header:
first = f' {first} )>'
message: str = textwrap.indent(
message,
prefix=' '*2,
)
message: str = first + message
# IFF there is an embedded traceback-str we always
# draw the ascii-box around it.
if tb_str := self.tb_str:
fields: str = self._mk_fields_str(
_body_fields
@ -535,36 +610,19 @@ class RemoteActorError(Exception):
# |___ ..
tb_body_indent=1,
)
if not with_type_header:
body = '\n' + body
elif message := self._message:
# split off the first line so it isn't indented
# the same like the "boxed content".
if not with_type_header:
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
else:
first: str = ''
body: str = (
first
+
message
+
'\n'
)
if with_type_header:
tail: str = ')>'
else:
tail = ''
tail = ''
if (
with_type_header
and not message
):
tail: str = '>'
return (
header
+
message
+
f'{body}'
+
tail
@ -577,7 +635,9 @@ class RemoteActorError(Exception):
# |_ i guess `pexepect` relies on `str`-casing
# of output?
def __str__(self) -> str:
return self.pformat(with_type_header=False)
return self.pformat(
with_type_header=False
)
def unwrap(
self,
@ -825,9 +885,6 @@ class MsgTypeError(
extra_msgdata['_bad_msg'] = bad_msg
extra_msgdata['cid'] = bad_msg.cid
if 'cid' not in extra_msgdata:
import pdbp; pdbp.set_trace()
return cls(
message=message,
boxed_type=cls,
@ -889,6 +946,7 @@ def pack_error(
src_uid: tuple[str, str]|None = None,
tb: TracebackType|None = None,
tb_str: str = '',
message: str = '',
) -> Error:
'''
@ -902,7 +960,7 @@ def pack_error(
tb_str: str = (
''.join(traceback.format_exception(exc))
# TODO: can we remove this is `exc` is required?
# TODO: can we remove this since `exc` is required.. right?
or
# NOTE: this is just a shorthand for the "last error" as
# provided by `sys.exeception()`, see:
@ -917,8 +975,8 @@ def pack_error(
# when caller provides a tb instance (say pulled from some other
# src error's `.__traceback__`) we use that as the "boxed"
# tb-string instead.
# https://docs.python.org/3/library/traceback.html#traceback.format_list
if tb:
# https://docs.python.org/3/library/traceback.html#traceback.format_list
tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str
error_msg: dict[ # for IPC
@ -961,17 +1019,17 @@ def pack_error(
error_msg['src_type_str'] = type(exc).__name__
error_msg['boxed_type_str'] = type(exc).__name__
# XXX alawys append us the last relay in error propagation path
# XXX always append us the last relay in error propagation path
error_msg.setdefault(
'relay_path',
[],
).append(our_uid)
# XXX NOTE: always ensure the traceback-str is from the
# locally raised error (**not** the prior relay's boxed
# content's in `._ipc_msg.tb_str`).
# XXX NOTE XXX always ensure the traceback-str content is from
# the locally raised error (so, NOT the prior relay's boxed
# `._ipc_msg.tb_str`).
error_msg['tb_str'] = tb_str
error_msg['message'] = message or getattr(exc, 'message', '')
if cid is not None:
error_msg['cid'] = cid
@ -995,26 +1053,24 @@ def unpack_error(
if not isinstance(msg, Error):
return None
# retrieve the remote error's msg-encoded details
tb_str: str = msg.tb_str
message: str = (
f'{chan.uid}\n'
+
tb_str
)
# try to lookup a suitable error type from the local runtime
# env then use it to construct a local instance.
# boxed_type_str: str = error_dict['boxed_type_str']
boxed_type_str: str = msg.boxed_type_str
boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
if boxed_type_str == 'ContextCancelled':
box_type = ContextCancelled
assert boxed_type is box_type
# retrieve the error's msg-encoded remotoe-env info
message: str = f'remote task raised a {msg.boxed_type_str!r}\n'
elif boxed_type_str == 'MsgTypeError':
box_type = MsgTypeError
# TODO: do we even really need these checks for RAEs?
if boxed_type_str in [
'ContextCancelled',
'MsgTypeError',
]:
box_type = {
'ContextCancelled': ContextCancelled,
'MsgTypeError': MsgTypeError,
}[boxed_type_str]
assert boxed_type is box_type
# TODO: already included by `_this_mod` in else loop right?
@ -1029,19 +1085,21 @@ def unpack_error(
exc = box_type(
message,
ipc_msg=msg,
tb_str=msg.tb_str,
)
return exc
def is_multi_cancelled(exc: BaseException) -> bool:
def is_multi_cancelled(
exc: BaseException|BaseExceptionGroup
) -> bool:
'''
Predicate to determine if a possible ``BaseExceptionGroup`` contains
only ``trio.Cancelled`` sub-exceptions (and is likely the result of
cancelling a collection of subtasks.
'''
# if isinstance(exc, eg.BaseExceptionGroup):
if isinstance(exc, BaseExceptionGroup):
return exc.subgroup(
lambda exc: isinstance(exc, trio.Cancelled)
@ -1109,6 +1167,7 @@ def _raise_from_unexpected_msg(
msg,
ctx.chan,
)
ctx._maybe_cancel_and_set_remote_error(exc)
raise exc from src_err
# `MsgStream` termination msg.
@ -1183,7 +1242,6 @@ def _mk_msg_type_err(
src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None,
is_invalid_payload: bool = False,
# src_err_msg: Error|None = None,
**mte_kwargs,
@ -1250,19 +1308,11 @@ def _mk_msg_type_err(
msg_type: str = type(msg)
any_pld: Any = msgpack.decode(msg.pld)
message: str = (
f'invalid `{msg_type.__qualname__}` payload\n\n'
f'value: `{any_pld!r}` does not match type-spec: ' #\n'
f'invalid `{msg_type.__qualname__}` msg payload\n\n'
f'value: `{any_pld!r}` does not match type-spec: '
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
# f'<{type(msg).__qualname__}(\n'
# f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n'
# f')>\n\n'
)
# src_err_msg = msg
bad_msg = msg
# TODO: should we just decode the msg to a dict despite
# only the payload being wrong?
# -[ ] maybe the better design is to break this construct
# logic into a separate explicit helper raiser-func?
else:
# decode the msg-bytes using the std msgpack
@ -1307,21 +1357,21 @@ def _mk_msg_type_err(
if verb_header:
message = f'{verb_header} ' + message
# if not isinstance(bad_msg, PayloadMsg):
# import pdbp; pdbp.set_trace()
msgtyperr = MsgTypeError.from_decode(
message=message,
bad_msg=bad_msg,
bad_msg_as_dict=msg_dict,
# NOTE: for the send-side `.started()` pld-validate
# case we actually set the `._ipc_msg` AFTER we return
# from here inside `Context.started()` since we actually
# want to emulate the `Error` from the mte we build here
# Bo
# so by default in that case this is set to `None`
# ipc_msg=src_err_msg,
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
# - for the send-side `.started()` pld-validate
# case we actually raise inline so we don't need to
# set the it at all.
# - for recv side we set it inside `PldRx.decode_pld()`
# after a manual call to `pack_error()` since we
# actually want to emulate the `Error` from the mte we
# build here. So by default in that case, this is left
# as `None` here.
# ipc_msg=src_err_msg,
)
msgtyperr.__cause__ = src_validation_error
return msgtyperr

View File

@ -291,7 +291,7 @@ class MsgpackTCPStream(MsgTransport):
async def send(
self,
msg: msgtypes.Msg,
msg: msgtypes.MsgType,
strict_types: bool = True,
# hide_tb: bool = False,

View File

@ -64,6 +64,7 @@ from .log import get_logger
from .msg import (
current_codec,
MsgCodec,
PayloadT,
NamespacePath,
pretty_struct,
)
@ -98,7 +99,7 @@ async def _invoke_non_context(
treat_as_gen: bool,
is_rpc: bool,
return_msg: Return|CancelAck = Return,
return_msg_type: Return|CancelAck = Return,
task_status: TaskStatus[
Context | BaseException
@ -220,7 +221,7 @@ async def _invoke_non_context(
and chan.connected()
):
try:
ret_msg = return_msg(
ret_msg = return_msg_type(
cid=cid,
pld=result,
)
@ -392,16 +393,22 @@ async def _errors_relayed_via_ipc(
# cancel scope will not have been inserted yet
if is_rpc:
log.warning(
'RPC task likely errored or cancelled before start?'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
else:
log.cancel(
'Failed to de-alloc internal runtime cancel task?\n'
'RPC task likely errored or cancelled before start?\n'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
# TODO: remove this right? rn the only non-`is_rpc` cases
# are cancellation methods and according the RPC loop eps
# for thoses below, nothing is ever registered in
# `Actor._rpc_tasks` for those cases.. but should we?
#
# -[ ] maybe we should have an equiv `Actor._runtime_rpc_tasks`?
# else:
# log.cancel(
# 'Failed to de-alloc internal runtime cancel task?\n'
# f'|_{ctx._task}\n'
# f' >> {ctx.repr_rpc}\n'
# )
finally:
if not actor._rpc_tasks:
@ -419,7 +426,7 @@ async def _invoke(
is_rpc: bool = True,
hide_tb: bool = True,
return_msg: Return|CancelAck = Return,
return_msg_type: Return|CancelAck = Return,
task_status: TaskStatus[
Context | BaseException
@ -533,7 +540,7 @@ async def _invoke(
kwargs,
treat_as_gen,
is_rpc,
return_msg,
return_msg_type,
task_status,
)
# XXX below fallthrough is ONLY for `@context` eps
@ -593,18 +600,21 @@ async def _invoke(
ctx._scope = tn.cancel_scope
task_status.started(ctx)
# TODO: should would be nice to have our
# `TaskMngr` nursery here!
res: Any = await coro
ctx._result = res
# deliver final result to caller side.
await chan.send(
return_msg(
cid=cid,
pld=res,
)
# TODO: better `trionics` tooling:
# -[ ] should would be nice to have our `TaskMngr`
# nursery here!
# -[ ] payload value checking like we do with
# `.started()` such that the debbuger can engage
# here in the child task instead of waiting for the
# parent to crash with it's own MTE..
res: Any|PayloadT = await coro
return_msg: Return|CancelAck = return_msg_type(
cid=cid,
pld=res,
)
# set and shuttle final result to "parent"-side task.
ctx._result = res
await chan.send(return_msg)
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,
@ -940,7 +950,7 @@ async def process_messages(
actor.cancel,
kwargs,
is_rpc=False,
return_msg=CancelAck,
return_msg_type=CancelAck,
)
log.runtime(
@ -974,7 +984,7 @@ async def process_messages(
actor._cancel_task,
kwargs,
is_rpc=False,
return_msg=CancelAck,
return_msg_type=CancelAck,
)
except BaseException:
log.exception(

View File

@ -1256,9 +1256,10 @@ class Actor:
# - child returns a result before cancel-msg/ctxc-raised
# - child self raises ctxc before parent send request,
# - child errors prior to cancel req.
log.cancel(
'Cancel request invalid, RPC task already completed?\n\n'
f'<= canceller: {requesting_uid}\n\n'
log.runtime(
'Cancel request for invalid RPC task.\n'
'The task likely already completed or was never started!\n\n'
f'<= canceller: {requesting_uid}\n'
f'=> {cid}@{parent_chan.uid}\n'
f' |_{parent_chan}\n'
)

View File

@ -140,7 +140,7 @@ class MsgDec(Struct):
# * also a `.__contains__()` for doing `None in
# TypeSpec[None|int]` since rn you need to do it on
# `.__args__` for unions..
# - `MsgSpec: Union[Type[Msg]]
# - `MsgSpec: Union[MsgType]
#
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
@ -188,7 +188,7 @@ def mk_dec(
return MsgDec(
_dec=msgpack.Decoder(
type=spec, # like `Msg[Any]`
type=spec, # like `MsgType[Any]`
dec_hook=dec_hook,
)
)
@ -561,7 +561,7 @@ def mk_codec(
'''
# (manually) generate a msg-payload-spec for all relevant
# god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT`
# god-boxing-msg subtypes, parameterizing the `PayloadMsg.pld: PayloadT`
# for the decoder such that all sub-type msgs in our SCIPP
# will automatically decode to a type-"limited" payload (`Struct`)
# object (set).
@ -607,7 +607,7 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
# The built-in IPC `Msg` spec.
# Our composing "shuttle" protocol which allows `tractor`-app code
# to use any `msgspec` supported type as the `Msg.pld` payload,
# to use any `msgspec` supported type as the `PayloadMsg.pld` payload,
# https://jcristharif.com/msgspec/supported-types.html
#
_def_tractor_codec: MsgCodec = mk_codec(
@ -743,7 +743,7 @@ def limit_msg_spec(
) -> MsgCodec:
'''
Apply a `MsgCodec` that will natively decode the SC-msg set's
`Msg.pld: Union[Type[Struct]]` payload fields using
`PayloadMsg.pld: Union[Type[Struct]]` payload fields using
tagged-unions of `msgspec.Struct`s from the `payload_types`
for all IPC contexts in use by the current `trio.Task`.

View File

@ -53,6 +53,8 @@ from tractor._state import current_ipc_ctx
from ._codec import (
mk_dec,
MsgDec,
MsgCodec,
current_codec,
)
from .types import (
CancelAck,
@ -213,6 +215,9 @@ class PldRx(Struct):
**dec_msg_kwargs,
)
# TODO: rename to,
# -[ ] `.decode_pld()`?
# -[ ] `.dec_pld()`?
def dec_msg(
self,
msg: MsgType,
@ -246,8 +251,8 @@ class PldRx(Struct):
pld: PayloadT = self._pld_dec.decode(pld)
log.runtime(
'Decoded msg payload\n\n'
f'{msg}\n\n'
f'where payload is\n'
f'{msg}\n'
f'where payload decoded as\n'
f'|_pld={pld!r}\n'
)
return pld
@ -263,13 +268,29 @@ class PldRx(Struct):
src_validation_error=valerr,
is_invalid_payload=True,
expected_msg=expect_msg,
# ipc_msg=msg,
)
# NOTE: override the `msg` passed to
# `_raise_from_unexpected_msg()` (below) so so that
# we're effectively able to use that same func to
# unpack and raise an "emulated remote `Error`" of
# this local MTE.
# NOTE: just raise the MTE inline instead of all
# the pack-unpack-repack non-sense when this is
# a "send side" validation error.
if is_started_send_side:
raise mte
# XXX TODO: remove this right?
# => any bad stated/return values should
# always be treated a remote errors right?
#
# if (
# expect_msg is Return
# or expect_msg is Started
# ):
# # set emulated remote error more-or-less as the
# # runtime would
# ctx: Context = getattr(ipc, 'ctx', ipc)
# ctx._maybe_cancel_and_set_remote_error(mte)
# NOTE: the `.message` is automatically
# transferred into the message as long as we
# define it as a `Error.message` field.
err_msg: Error = pack_error(
exc=mte,
cid=msg.cid,
@ -279,36 +300,38 @@ class PldRx(Struct):
else ipc._actor.uid
),
# tb=valerr.__traceback__,
tb_str=mte._message,
# tb_str=mte._message,
# message=mte._message,
)
# ^-TODO-^ just raise this inline instead of all the
# pack-unpack-repack non-sense!
mte._ipc_msg = err_msg
msg = err_msg
# set emulated remote error more-or-less as the
# runtime would
ctx: Context = getattr(ipc, 'ctx', ipc)
# XXX override the `msg` passed to
# `_raise_from_unexpected_msg()` (below) so so
# that we're effectively able to use that same
# func to unpack and raise an "emulated remote
# `Error`" of this local MTE.
msg = err_msg
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
# raises the boxed `err_msg` from above it raises
# it from the above caught interchange-lib
# validation error.
src_err = valerr
# TODO: should we instead make this explicit and
# use the above masked `is_started_send_decode`,
# expecting the `Context.started()` caller to set
# it? Rn this is kinda, howyousayyy, implicitly
# edge-case-y..
if (
expect_msg is not Started
and not is_started_send_side
):
ctx._maybe_cancel_and_set_remote_error(mte)
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
# raises the boxed `err_msg` from above it raises
# it from `None`.
src_err = valerr
# if is_started_send_side:
# src_err = None
# TODO: remove this since it's been added to
# `_raise_from_unexpected_msg()`..?
# if (
# expect_msg is not Started
# and not is_started_send_side
# ):
# # set emulated remote error more-or-less as the
# # runtime would
# ctx: Context = getattr(ipc, 'ctx', ipc)
# ctx._maybe_cancel_and_set_remote_error(mte)
# XXX some other decoder specific failure?
# except TypeError as src_error:
@ -559,6 +582,7 @@ async def drain_to_final_msg(
ipc=ctx,
expect_msg=Return,
raise_error=False,
hide_tb=hide_tb,
)
# ^-TODO-^ some bad ideas?
# -[ ] wrap final outcome .receive() in a scope so
@ -737,9 +761,61 @@ async def drain_to_final_msg(
)
# TODO: factor logic from `.Context.started()` for send-side
# validate raising!
def validate_payload_msg(
msg: Started|Yield|Return,
pld_msg: Started|Yield|Return,
pld_value: PayloadT,
ipc: Context|MsgStream,
raise_mte: bool = True,
strict_pld_parity: bool = False,
hide_tb: bool = True,
) -> MsgTypeError|None:
...
'''
Validate a `PayloadMsg.pld` value with the current
IPC ctx's `PldRx` and raise an appropriate `MsgTypeError`
on failure.
'''
__tracebackhide__: bool = hide_tb
codec: MsgCodec = current_codec()
msg_bytes: bytes = codec.encode(pld_msg)
try:
roundtripped: Started = codec.decode(msg_bytes)
ctx: Context = getattr(ipc, 'ctx', ipc)
pld: PayloadT = ctx.pld_rx.dec_msg(
msg=roundtripped,
ipc=ipc,
expect_msg=Started,
hide_tb=hide_tb,
is_started_send_side=True,
)
if (
strict_pld_parity
and
pld != pld_value
):
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
roundtripped,
pld_msg,
)
complaint: str = (
'Started value does not match after roundtrip?\n\n'
f'{diff}'
)
raise ValidationError(complaint)
# raise any msg type error NO MATTER WHAT!
except ValidationError as verr:
mte: MsgTypeError = _mk_msg_type_err(
msg=roundtripped,
codec=codec,
src_validation_error=verr,
verb_header='Trying to send ',
is_invalid_payload=True,
)
if not raise_mte:
return mte
raise mte from verr

View File

@ -89,11 +89,12 @@ class PayloadMsg(
# -[ ] `uuid.UUID` which has multi-protocol support
# https://jcristharif.com/msgspec/supported-types.html#uuid
# The msgs "payload" (spelled without vowels):
# The msg's "payload" (spelled without vowels):
# https://en.wikipedia.org/wiki/Payload_(computing)
#
# NOTE: inherited from any `Msg` (and maybe overriden
# by use of `limit_msg_spec()`), but by default is
pld: Raw
# ^-NOTE-^ inherited from any `PayloadMsg` (and maybe type
# overriden via the `._ops.limit_plds()` API), but by default is
# parameterized to be `Any`.
#
# XXX this `Union` must strictly NOT contain `Any` if
@ -106,7 +107,6 @@ class PayloadMsg(
# TODO: could also be set to `msgspec.Raw` if the sub-decoders
# approach is preferred over the generic parameterization
# approach as take by `mk_msg_spec()` below.
pld: Raw
# TODO: complete rename
@ -410,21 +410,32 @@ class Error(
src_type_str: str
boxed_type_str: str
relay_path: list[tuple[str, str]]
tb_str: str
cid: str|None = None
# normally either both are provided or just
# a message for certain special cases where
# we pack a message for a locally raised
# mte or ctxc.
message: str|None = None
tb_str: str = ''
# TODO: use UNSET or don't include them via
# TODO: only optionally include sub-type specfic fields?
# -[ ] use UNSET or don't include them via `omit_defaults` (see
# inheritance-line options above)
#
# `ContextCancelled`
# `ContextCancelled` reports the src cancelling `Actor.uid`
canceller: tuple[str, str]|None = None
# `StreamOverrun`
# `StreamOverrun`-specific src `Actor.uid`
sender: tuple[str, str]|None = None
# for the `MsgTypeError` case where the receiver side
# decodes the underlying original `Msg`-subtype
_msg_dict: dict|None = None
# `MsgTypeError` meta-data
cid: str|None = None
# when the receiver side fails to decode a delivered
# `PayloadMsg`-subtype; one and/or both the msg-struct instance
# and `Any`-decoded to `dict` of the msg are set and relayed
# (back to the sender) for introspection.
_bad_msg: Started|Yield|Return|None = None
_bad_msg_as_dict: dict|None = None
def from_dict_msg(
@ -436,9 +447,11 @@ def from_dict_msg(
) -> MsgType:
'''
Helper to build a specific `MsgType` struct from
a "vanilla" decoded `dict`-ified equivalent of the
msg: i.e. if the `msgpack.Decoder.type == Any`.
Helper to build a specific `MsgType` struct from a "vanilla"
decoded `dict`-ified equivalent of the msg: i.e. if the
`msgpack.Decoder.type == Any`, the default when using
`msgspec.msgpack` and not "typed decoding" using
`msgspec.Struct`.
'''
msg_type_tag_field: str = (