Compare commits

..

No commits in common. "0e8c60ee4aab56c4668f192b541bd7804256e6f1" and "eee4c61b51e6d3053549e67650be04fcd03ab2d5" have entirely different histories.

16 changed files with 312 additions and 522 deletions

View File

@ -4,15 +4,9 @@ import trio
async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor."
try:
while True:
yield 'yo'
await tractor.breakpoint()
except BaseException:
tractor.log.get_console_log().exception(
'Cancelled while trying to enter pause point!'
)
raise
async def name_error():
@ -25,7 +19,7 @@ async def main():
"""
async with tractor.open_nursery(
debug_mode=True,
loglevel='cancel',
loglevel='error',
) as n:
p0 = await n.start_actor('bp_forever', enable_modules=[__name__])

View File

@ -45,7 +45,6 @@ async def spawn_until(depth=0):
)
# TODO: notes on the new boxed-relayed errors through proxy actors
async def main():
"""The main ``tractor`` routine.

View File

@ -23,6 +23,5 @@ async def main():
n.start_soon(debug_actor.run, die)
n.start_soon(crash_boi.run, die)
if __name__ == '__main__':
trio.run(main)

View File

@ -2,13 +2,10 @@ import trio
import tractor
async def main(
registry_addrs: tuple[str, int]|None = None
):
async def main():
async with tractor.open_root_actor(
debug_mode=True,
# loglevel='runtime',
):
while True:
await tractor.breakpoint()

View File

@ -3,26 +3,16 @@ import tractor
async def name_error():
getattr(doggypants) # noqa (on purpose)
getattr(doggypants)
async def main():
async with tractor.open_nursery(
debug_mode=True,
# loglevel='transport',
) as an:
) as n:
# TODO: ideally the REPL arrives at this frame in the parent,
# ABOVE the @api_frame of `Portal.run_in_actor()` (which
# should eventually not even be a portal method ... XD)
# await tractor.pause()
p: tractor.Portal = await an.run_in_actor(name_error)
# with this style, should raise on this line
await p.result()
# with this alt style should raise at `open_nusery()`
# return await p.result()
portal = await n.run_in_actor(name_error)
await portal.result()
if __name__ == '__main__':

View File

@ -7,7 +7,7 @@ def sync_pause(
error: bool = False,
):
if use_builtin:
breakpoint(hide_tb=False)
breakpoint()
else:
tractor.pause_from_sync()
@ -20,20 +20,18 @@ def sync_pause(
async def start_n_sync_pause(
ctx: tractor.Context,
):
actor: tractor.Actor = tractor.current_actor()
# sync to parent-side task
# sync to requesting peer
await ctx.started()
actor: tractor.Actor = tractor.current_actor()
print(f'entering SYNC PAUSE in {actor.uid}')
sync_pause()
print(f'back from SYNC PAUSE in {actor.uid}')
async def main() -> None:
async with tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True,
) as an:

View File

@ -39,7 +39,8 @@ msgspec='^0.18.5' # interchange
wrapt = "^1.16.0" # decorators
colorlog = "^6.8.2" # logging
# built-in multi-actor `pdb` REPL
# .devx tooling
stackscope = "^0.2.2"
pdbp = "^1.5.0"
@ -48,19 +49,15 @@ pdbp = "^1.5.0"
# 'pyroute2
# ------ - ------
xontrib-vox = "^0.0.1"
[tool.poetry.group.dev]
optional = false
[tool.poetry.group.dev.dependencies]
# testing
pytest = "^8.2.0"
pexpect = "^4.9.0"
# .devx tooling
greenback = "^1.2.1"
stackscope = "^0.2.2"
# (light) xonsh usage/integration
# only for xonsh as sh..
xontrib-vox = "^0.0.1"
prompt-toolkit = "^3.0.43"
xonsh-vox-tabcomplete = "^0.5"

View File

@ -6,19 +6,30 @@ related settings around IPC contexts.
'''
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
# import typing
from typing import (
# Any,
TypeAlias,
# Union,
)
from contextvars import (
Context,
)
from msgspec import (
# structs,
# msgpack,
Struct,
# ValidationError,
)
import pytest
import trio
import tractor
from tractor import (
# _state,
MsgTypeError,
current_ipc_ctx,
Portal,
@ -29,9 +40,20 @@ from tractor.msg import (
)
from tractor.msg import (
_codec,
# _ctxvar_MsgCodec,
# NamespacePath,
# MsgCodec,
# mk_codec,
# apply_codec,
# current_codec,
)
from tractor.msg.types import (
log,
# _payload_msgs,
# PayloadMsg,
# Started,
# mk_msg_spec,
)
@ -42,10 +64,23 @@ class PldMsg(Struct):
maybe_msg_spec = PldMsg|None
@cm
def custom_spec(
ctx: Context,
spec: TypeAlias,
) -> _codec.MsgCodec:
'''
Apply a custom payload spec, remove on exit.
'''
rx: msgops.PldRx = ctx._pld_rx
@acm
async def maybe_expect_raises(
raises: BaseException|None = None,
ensure_in_message: list[str]|None = None,
reraise: bool = False,
timeout: int = 3,
) -> None:
@ -53,9 +88,6 @@ async def maybe_expect_raises(
Async wrapper for ensuring errors propagate from the inner scope.
'''
if tractor._state.debug_mode():
timeout += 999
with trio.fail_after(timeout):
try:
yield
@ -71,10 +103,9 @@ async def maybe_expect_raises(
# maybe check for error txt content
if ensure_in_message:
part: str
err_repr: str = repr(inner_err)
for part in ensure_in_message:
for i, arg in enumerate(inner_err.args):
if part in err_repr:
if part in arg:
break
# if part never matches an arg, then we're
# missing a match.
@ -135,15 +166,13 @@ async def child(
# 2 cases: hdndle send-side and recv-only validation
# - when `raise_on_started_mte == True`, send validate
# - else, parent-recv-side only validation
mte: MsgTypeError|None = None
try:
await ctx.started(
value=started_value,
validate_pld_spec=validate_pld_spec,
)
except MsgTypeError as _mte:
mte = _mte
except MsgTypeError:
log.exception('started()` raised an MTE!\n')
if not expect_started_mte:
raise RuntimeError(
@ -151,61 +180,16 @@ async def child(
f'{started_value!r}\n'
)
boxed_div: str = '------ - ------'
assert boxed_div not in mte._message
assert boxed_div not in mte.tb_str
assert boxed_div not in repr(mte)
assert boxed_div not in str(mte)
mte_repr: str = repr(mte)
for line in mte.message.splitlines():
assert line in mte_repr
# since this is a *local error* there should be no
# boxed traceback content!
assert not mte.tb_str
# propagate to parent?
if raise_on_started_mte:
raise
# no-send-side-error fallthrough
if (
validate_pld_spec
and
expect_started_mte
):
else:
if expect_started_mte:
raise RuntimeError(
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
f'{started_value!r}\n'
)
assert (
not expect_started_mte
or
not validate_pld_spec
)
# if wait_for_parent_to_cancel:
# ...
#
# ^-TODO-^ logic for diff validation policies on each side:
#
# -[ ] ensure that if we don't validate on the send
# side, that we are eventually error-cancelled by our
# parent due to the bad `Started` payload!
# -[ ] the boxed error should be srced from the parent's
# runtime NOT ours!
# -[ ] we should still error on bad `return_value`s
# despite the parent not yet error-cancelling us?
# |_ how do we want the parent side to look in that
# case?
# -[ ] maybe the equiv of "during handling of the
# above error another occurred" for the case where
# the parent sends a MTE to this child and while
# waiting for the child to terminate it gets back
# the MTE for this case?
#
# XXX should always fail on recv side since we can't
# really do much else beside terminate and relay the
# msg-type-error from this RPC task ;)
@ -227,8 +211,8 @@ async def child(
@pytest.mark.parametrize(
'return_value',
[
'yo',
None,
'yo',
],
ids=[
'return[invalid-"yo"]',
@ -287,32 +271,16 @@ def test_basic_payload_spec(
# since not opened yet.
assert current_ipc_ctx() is None
if invalid_started:
msg_type_str: str = 'Started'
bad_value_str: str = '10'
elif invalid_return:
msg_type_str: str = 'Return'
bad_value_str: str = "'yo'"
else:
# XXX but should never be used below then..
msg_type_str: str = ''
bad_value_str: str = ''
maybe_mte: MsgTypeError|None = None
should_raise: Exception|None = (
MsgTypeError if (
async with (
maybe_expect_raises(
raises=MsgTypeError if (
invalid_return
or
invalid_started
) else None
)
async with (
maybe_expect_raises(
raises=should_raise,
) else None,
ensure_in_message=[
f"invalid `{msg_type_str}` msg payload",
f"value: `{bad_value_str}` does not "
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
"invalid `Return` payload",
"value: `'yo'` does not match type-spec: `Return.pld: PldMsg|NoneType`",
],
),
p.open_context(
@ -330,35 +298,18 @@ def test_basic_payload_spec(
assert first.field == 'yo'
try:
res: None|PldMsg = await ctx.result(hide_tb=False)
assert res is None
assert (await ctx.result()) is None
except MsgTypeError as mte:
maybe_mte = mte
if not invalid_return:
raise
# expected this invalid `Return.pld` so audit
# the error state + meta-data
assert mte.expected_msg_type is Return
else: # expected this invalid `Return.pld`
assert mte.cid == ctx.cid
mte_repr: str = repr(mte)
for line in mte.message.splitlines():
assert line in mte_repr
assert mte.tb_str
# await tractor.pause(shield=True)
# verify expected remote mte deats
assert ctx._local_error is None
assert (
mte is
ctx._remote_error is
ctx.maybe_error is
ctx.outcome
)
if should_raise is None:
assert maybe_mte is None
await tractor.pause()
assert ctx._remote_error is mte
assert mte.expected_msg_type is Return
await p.cancel_actor()

View File

@ -58,6 +58,9 @@ from typing import (
import warnings
# ------ - ------
import trio
from msgspec import (
ValidationError,
)
# ------ - ------
from ._exceptions import (
ContextCancelled,
@ -75,16 +78,19 @@ from .log import (
from .msg import (
Error,
MsgType,
MsgCodec,
NamespacePath,
PayloadT,
Started,
Stop,
Yield,
current_codec,
pretty_struct,
_ops as msgops,
)
from ._ipc import (
Channel,
_mk_msg_type_err,
)
from ._streaming import MsgStream
from ._state import (
@ -664,7 +670,7 @@ class Context:
'Setting remote error for ctx\n\n'
f'<= {self.peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}: {self._actor.uid}\n\n'
f'{error!r}'
f'{error}'
)
self._remote_error: BaseException = error
@ -718,7 +724,7 @@ class Context:
log.error(
f'Remote context error:\n\n'
# f'{pformat(self)}\n'
f'{error!r}'
f'{error}'
)
if self._canceller is None:
@ -742,27 +748,26 @@ class Context:
and not cs.cancel_called
and not cs.cancelled_caught
):
if (
if not (
msgerr
# NOTE: we allow user to config not cancelling the
# local scope on `MsgTypeError`s
and
not self._cancel_on_msgerr
and not self._cancel_on_msgerr
):
message: str = (
'NOT Cancelling `Context._scope` since,\n'
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
f'AND we got a msg-type-error!\n'
f'{error}\n'
)
else:
# TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n'
self._scope.cancel()
else:
message: str = (
'NOT Cancelling `Context._scope` since,\n'
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
f'AND we got a msg-type-error!\n'
f'{error}\n'
)
else:
message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb
@ -1652,21 +1657,54 @@ class Context:
#
__tracebackhide__: bool = hide_tb
if validate_pld_spec:
msgops.validate_payload_msg(
pld_msg=started_msg,
pld_value=value,
# __tracebackhide__: bool = False
codec: MsgCodec = current_codec()
msg_bytes: bytes = codec.encode(started_msg)
try:
roundtripped: Started = codec.decode(msg_bytes)
# pld: PayloadT = await self.pld_rx.recv_pld(
pld: PayloadT = self.pld_rx.dec_msg(
msg=roundtripped,
ipc=self,
strict_pld_parity=strict_pld_parity,
expect_msg=Started,
hide_tb=hide_tb,
is_started_send_side=True,
)
if (
strict_pld_parity
and
pld != value
):
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
roundtripped,
started_msg,
)
complaint: str = (
'Started value does not match after roundtrip?\n\n'
f'{diff}'
)
raise ValidationError(complaint)
# raise any msg type error NO MATTER WHAT!
except ValidationError as verr:
# always show this src frame in the tb
# __tracebackhide__: bool = False
raise _mk_msg_type_err(
msg=roundtripped,
codec=codec,
src_validation_error=verr,
verb_header='Trying to send ',
is_invalid_payload=True,
) from verr
# TODO: maybe a flag to by-pass encode op if already done
# here in caller?
await self.chan.send(started_msg)
# set msg-related internal runtime-state
self._started_called: bool = True
self._started_msg: Started = started_msg
self._started_called = True
self._started_msg = started_msg
self._started_pld = value
async def _drain_overflows(
@ -2059,12 +2097,6 @@ async def open_context_from_portal(
if maybe_msgdec:
assert maybe_msgdec.pld_spec == pld_spec
# NOTE: this in an implicit runtime nursery used to,
# - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user.
ctx._scope_nursery: trio.Nursery = tn
ctx._scope: trio.CancelScope = tn.cancel_scope
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will
@ -2072,42 +2104,25 @@ async def open_context_from_portal(
# -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
try:
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
ipc=ctx,
expect_msg=Started,
passthrough_non_pld_msgs=False,
hide_tb=hide_tb,
)
except trio.Cancelled as taskc:
ctx_cs: trio.CancelScope = ctx._scope
if not ctx_cs.cancel_called:
raise
# from .devx import pause
# await pause(shield=True)
log.cancel(
'IPC ctx was cancelled during "child" task sync due to\n\n'
f'{ctx.maybe_error}\n'
)
# OW if the ctx's scope was cancelled manually,
# likely the `Context` was cancelled via a call to
# `._maybe_cancel_and_set_remote_error()` so ensure
# we raise the underlying `._remote_error` directly
# instead of bubbling that taskc.
ctx.maybe_raise()
# OW, some other unexpected cancel condition
# that should prolly never happen right?
raise InternalError(
'Invalid cancellation during IPC ctx sync phase?\n'
) from taskc
# await pause()
ctx._started_called: bool = True
ctx._started_msg: bool = started_msg
ctx._started_pld: bool = first
# NOTE: this in an implicit runtime nursery used to,
# - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user.
ctx._scope_nursery: trio.Nursery = tn
ctx._scope: trio.CancelScope = tn.cancel_scope
# deliver context instance and .started() msg value
# in enter tuple.
yield ctx, first

View File

@ -22,7 +22,6 @@ from __future__ import annotations
import builtins
import importlib
from pprint import pformat
import sys
from types import (
TracebackType,
)
@ -111,7 +110,6 @@ _body_fields: list[str] = list(
'tb_str',
'relay_path',
'cid',
'message',
# only ctxc should show it but `Error` does
# have it as an optional field.
@ -238,7 +236,6 @@ class RemoteActorError(Exception):
self._boxed_type: BaseException = boxed_type
self._src_type: BaseException|None = None
self._ipc_msg: Error|None = ipc_msg
self._extra_msgdata = extra_msgdata
if (
extra_msgdata
@ -253,6 +250,8 @@ class RemoteActorError(Exception):
k,
v,
)
else:
self._extra_msgdata = extra_msgdata
# TODO: mask out eventually or place in `pack_error()`
# pre-`return` lines?
@ -283,17 +282,6 @@ class RemoteActorError(Exception):
# ensure any roundtripping evals to the input value
assert self.boxed_type is boxed_type
@property
def message(self) -> str:
'''
Be explicit, instead of trying to read it from the the parent
type's loosely defined `.args: tuple`:
https://docs.python.org/3/library/exceptions.html#BaseException.args
'''
return self._message
@property
def ipc_msg(self) -> Struct:
'''
@ -367,11 +355,8 @@ class RemoteActorError(Exception):
'''
bt: Type[BaseException] = self.boxed_type
if bt:
return str(bt.__name__)
return ''
@property
def boxed_type(self) -> Type[BaseException]:
'''
@ -441,7 +426,8 @@ class RemoteActorError(Exception):
for key in fields:
if (
key == 'relay_uid' and not self.is_inception()
key == 'relay_uid'
and not self.is_inception()
):
continue
@ -518,80 +504,19 @@ class RemoteActorError(Exception):
def pformat(
self,
with_type_header: bool = True,
# with_ascii_box: bool = True,
) -> str:
'''
Format any boxed remote error by multi-line display of,
- error's src or relay actor meta-data,
- remote runtime env's traceback,
With optional control over the format of,
- whether the boxed traceback is ascii-decorated with
a surrounding "box" annotating the embedded stack-trace.
- if the error's type name should be added as margins
around the field and tb content like:
`<RemoteActorError(.. <<multi-line-content>> .. )>`
- the placement of the `.message: str` (explicit equiv of
`.args[0]`), either placed below the `.tb_str` or in the
first line's header when the error is raised locally (since
the type name is already implicitly shown by python).
Nicely formatted boxed error meta data + traceback, OR just
the normal message from `.args` (for eg. as you'd want shown
by a locally raised `ContextCancelled`).
'''
header: str = ''
body: str = ''
message: str = ''
# XXX when the currently raised exception is this instance,
# we do not ever use the "type header" style repr.
is_being_raised: bool = False
if (
(exc := sys.exception())
and
exc is self
):
is_being_raised: bool = True
with_type_header: bool = (
with_type_header
and
not is_being_raised
)
# <RemoteActorError( .. )> style
if with_type_header:
header: str = f'<{type(self).__name__}('
header: str = f'<{type(self).__name__}(\n'
if message := self._message:
# split off the first line so, if needed, it isn't
# indented the same like the "boxed content" which
# since there is no `.tb_str` is just the `.message`.
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
# with a type-style header we,
# - have no special message "first line" extraction/handling
# - place the message a space in from the header:
# `MsgTypeError( <message> ..`
# ^-here
# - indent the `.message` inside the type body.
if with_type_header:
first = f' {first} )>'
message: str = textwrap.indent(
message,
prefix=' '*2,
)
message: str = first + message
# IFF there is an embedded traceback-str we always
# draw the ascii-box around it.
if tb_str := self.tb_str:
fields: str = self._mk_fields_str(
_body_fields
@ -610,19 +535,36 @@ class RemoteActorError(Exception):
# |___ ..
tb_body_indent=1,
)
if not with_type_header:
body = '\n' + body
elif message := self._message:
# split off the first line so it isn't indented
# the same like the "boxed content".
if not with_type_header:
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
else:
first: str = ''
body: str = (
first
+
message
+
'\n'
)
if with_type_header:
tail: str = ')>'
else:
tail = ''
if (
with_type_header
and not message
):
tail: str = '>'
return (
header
+
message
+
f'{body}'
+
tail
@ -635,9 +577,7 @@ class RemoteActorError(Exception):
# |_ i guess `pexepect` relies on `str`-casing
# of output?
def __str__(self) -> str:
return self.pformat(
with_type_header=False
)
return self.pformat(with_type_header=False)
def unwrap(
self,
@ -885,6 +825,9 @@ class MsgTypeError(
extra_msgdata['_bad_msg'] = bad_msg
extra_msgdata['cid'] = bad_msg.cid
if 'cid' not in extra_msgdata:
import pdbp; pdbp.set_trace()
return cls(
message=message,
boxed_type=cls,
@ -946,7 +889,6 @@ def pack_error(
src_uid: tuple[str, str]|None = None,
tb: TracebackType|None = None,
tb_str: str = '',
message: str = '',
) -> Error:
'''
@ -960,7 +902,7 @@ def pack_error(
tb_str: str = (
''.join(traceback.format_exception(exc))
# TODO: can we remove this since `exc` is required.. right?
# TODO: can we remove this is `exc` is required?
or
# NOTE: this is just a shorthand for the "last error" as
# provided by `sys.exeception()`, see:
@ -975,8 +917,8 @@ def pack_error(
# when caller provides a tb instance (say pulled from some other
# src error's `.__traceback__`) we use that as the "boxed"
# tb-string instead.
# https://docs.python.org/3/library/traceback.html#traceback.format_list
if tb:
# https://docs.python.org/3/library/traceback.html#traceback.format_list
tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str
error_msg: dict[ # for IPC
@ -1019,17 +961,17 @@ def pack_error(
error_msg['src_type_str'] = type(exc).__name__
error_msg['boxed_type_str'] = type(exc).__name__
# XXX always append us the last relay in error propagation path
# XXX alawys append us the last relay in error propagation path
error_msg.setdefault(
'relay_path',
[],
).append(our_uid)
# XXX NOTE XXX always ensure the traceback-str content is from
# the locally raised error (so, NOT the prior relay's boxed
# `._ipc_msg.tb_str`).
# XXX NOTE: always ensure the traceback-str is from the
# locally raised error (**not** the prior relay's boxed
# content's in `._ipc_msg.tb_str`).
error_msg['tb_str'] = tb_str
error_msg['message'] = message or getattr(exc, 'message', '')
if cid is not None:
error_msg['cid'] = cid
@ -1053,24 +995,26 @@ def unpack_error(
if not isinstance(msg, Error):
return None
# retrieve the remote error's msg-encoded details
tb_str: str = msg.tb_str
message: str = (
f'{chan.uid}\n'
+
tb_str
)
# try to lookup a suitable error type from the local runtime
# env then use it to construct a local instance.
# boxed_type_str: str = error_dict['boxed_type_str']
boxed_type_str: str = msg.boxed_type_str
boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
# retrieve the error's msg-encoded remotoe-env info
message: str = f'remote task raised a {msg.boxed_type_str!r}\n'
if boxed_type_str == 'ContextCancelled':
box_type = ContextCancelled
assert boxed_type is box_type
# TODO: do we even really need these checks for RAEs?
if boxed_type_str in [
'ContextCancelled',
'MsgTypeError',
]:
box_type = {
'ContextCancelled': ContextCancelled,
'MsgTypeError': MsgTypeError,
}[boxed_type_str]
elif boxed_type_str == 'MsgTypeError':
box_type = MsgTypeError
assert boxed_type is box_type
# TODO: already included by `_this_mod` in else loop right?
@ -1085,21 +1029,19 @@ def unpack_error(
exc = box_type(
message,
ipc_msg=msg,
tb_str=msg.tb_str,
)
return exc
def is_multi_cancelled(
exc: BaseException|BaseExceptionGroup
) -> bool:
def is_multi_cancelled(exc: BaseException) -> bool:
'''
Predicate to determine if a possible ``BaseExceptionGroup`` contains
only ``trio.Cancelled`` sub-exceptions (and is likely the result of
cancelling a collection of subtasks.
'''
# if isinstance(exc, eg.BaseExceptionGroup):
if isinstance(exc, BaseExceptionGroup):
return exc.subgroup(
lambda exc: isinstance(exc, trio.Cancelled)
@ -1167,7 +1109,6 @@ def _raise_from_unexpected_msg(
msg,
ctx.chan,
)
ctx._maybe_cancel_and_set_remote_error(exc)
raise exc from src_err
# `MsgStream` termination msg.
@ -1242,6 +1183,7 @@ def _mk_msg_type_err(
src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None,
is_invalid_payload: bool = False,
# src_err_msg: Error|None = None,
**mte_kwargs,
@ -1308,11 +1250,19 @@ def _mk_msg_type_err(
msg_type: str = type(msg)
any_pld: Any = msgpack.decode(msg.pld)
message: str = (
f'invalid `{msg_type.__qualname__}` msg payload\n\n'
f'value: `{any_pld!r}` does not match type-spec: '
f'invalid `{msg_type.__qualname__}` payload\n\n'
f'value: `{any_pld!r}` does not match type-spec: ' #\n'
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
# f'<{type(msg).__qualname__}(\n'
# f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n'
# f')>\n\n'
)
# src_err_msg = msg
bad_msg = msg
# TODO: should we just decode the msg to a dict despite
# only the payload being wrong?
# -[ ] maybe the better design is to break this construct
# logic into a separate explicit helper raiser-func?
else:
# decode the msg-bytes using the std msgpack
@ -1357,20 +1307,20 @@ def _mk_msg_type_err(
if verb_header:
message = f'{verb_header} ' + message
# if not isinstance(bad_msg, PayloadMsg):
# import pdbp; pdbp.set_trace()
msgtyperr = MsgTypeError.from_decode(
message=message,
bad_msg=bad_msg,
bad_msg_as_dict=msg_dict,
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
# - for the send-side `.started()` pld-validate
# case we actually raise inline so we don't need to
# set the it at all.
# - for recv side we set it inside `PldRx.decode_pld()`
# after a manual call to `pack_error()` since we
# actually want to emulate the `Error` from the mte we
# build here. So by default in that case, this is left
# as `None` here.
# NOTE: for the send-side `.started()` pld-validate
# case we actually set the `._ipc_msg` AFTER we return
# from here inside `Context.started()` since we actually
# want to emulate the `Error` from the mte we build here
# Bo
# so by default in that case this is set to `None`
# ipc_msg=src_err_msg,
)
msgtyperr.__cause__ = src_validation_error

View File

@ -291,7 +291,7 @@ class MsgpackTCPStream(MsgTransport):
async def send(
self,
msg: msgtypes.MsgType,
msg: msgtypes.Msg,
strict_types: bool = True,
# hide_tb: bool = False,

View File

@ -64,7 +64,6 @@ from .log import get_logger
from .msg import (
current_codec,
MsgCodec,
PayloadT,
NamespacePath,
pretty_struct,
)
@ -99,7 +98,7 @@ async def _invoke_non_context(
treat_as_gen: bool,
is_rpc: bool,
return_msg_type: Return|CancelAck = Return,
return_msg: Return|CancelAck = Return,
task_status: TaskStatus[
Context | BaseException
@ -221,7 +220,7 @@ async def _invoke_non_context(
and chan.connected()
):
try:
ret_msg = return_msg_type(
ret_msg = return_msg(
cid=cid,
pld=result,
)
@ -393,22 +392,16 @@ async def _errors_relayed_via_ipc(
# cancel scope will not have been inserted yet
if is_rpc:
log.warning(
'RPC task likely errored or cancelled before start?\n'
'RPC task likely errored or cancelled before start?'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
else:
log.cancel(
'Failed to de-alloc internal runtime cancel task?\n'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
# TODO: remove this right? rn the only non-`is_rpc` cases
# are cancellation methods and according the RPC loop eps
# for thoses below, nothing is ever registered in
# `Actor._rpc_tasks` for those cases.. but should we?
#
# -[ ] maybe we should have an equiv `Actor._runtime_rpc_tasks`?
# else:
# log.cancel(
# 'Failed to de-alloc internal runtime cancel task?\n'
# f'|_{ctx._task}\n'
# f' >> {ctx.repr_rpc}\n'
# )
finally:
if not actor._rpc_tasks:
@ -426,7 +419,7 @@ async def _invoke(
is_rpc: bool = True,
hide_tb: bool = True,
return_msg_type: Return|CancelAck = Return,
return_msg: Return|CancelAck = Return,
task_status: TaskStatus[
Context | BaseException
@ -540,7 +533,7 @@ async def _invoke(
kwargs,
treat_as_gen,
is_rpc,
return_msg_type,
return_msg,
task_status,
)
# XXX below fallthrough is ONLY for `@context` eps
@ -600,21 +593,18 @@ async def _invoke(
ctx._scope = tn.cancel_scope
task_status.started(ctx)
# TODO: better `trionics` tooling:
# -[ ] should would be nice to have our `TaskMngr`
# nursery here!
# -[ ] payload value checking like we do with
# `.started()` such that the debbuger can engage
# here in the child task instead of waiting for the
# parent to crash with it's own MTE..
res: Any|PayloadT = await coro
return_msg: Return|CancelAck = return_msg_type(
# TODO: should would be nice to have our
# `TaskMngr` nursery here!
res: Any = await coro
ctx._result = res
# deliver final result to caller side.
await chan.send(
return_msg(
cid=cid,
pld=res,
)
# set and shuttle final result to "parent"-side task.
ctx._result = res
await chan.send(return_msg)
)
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,
@ -950,7 +940,7 @@ async def process_messages(
actor.cancel,
kwargs,
is_rpc=False,
return_msg_type=CancelAck,
return_msg=CancelAck,
)
log.runtime(
@ -984,7 +974,7 @@ async def process_messages(
actor._cancel_task,
kwargs,
is_rpc=False,
return_msg_type=CancelAck,
return_msg=CancelAck,
)
except BaseException:
log.exception(

View File

@ -1256,10 +1256,9 @@ class Actor:
# - child returns a result before cancel-msg/ctxc-raised
# - child self raises ctxc before parent send request,
# - child errors prior to cancel req.
log.runtime(
'Cancel request for invalid RPC task.\n'
'The task likely already completed or was never started!\n\n'
f'<= canceller: {requesting_uid}\n'
log.cancel(
'Cancel request invalid, RPC task already completed?\n\n'
f'<= canceller: {requesting_uid}\n\n'
f'=> {cid}@{parent_chan.uid}\n'
f' |_{parent_chan}\n'
)

View File

@ -140,7 +140,7 @@ class MsgDec(Struct):
# * also a `.__contains__()` for doing `None in
# TypeSpec[None|int]` since rn you need to do it on
# `.__args__` for unions..
# - `MsgSpec: Union[MsgType]
# - `MsgSpec: Union[Type[Msg]]
#
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
@ -188,7 +188,7 @@ def mk_dec(
return MsgDec(
_dec=msgpack.Decoder(
type=spec, # like `MsgType[Any]`
type=spec, # like `Msg[Any]`
dec_hook=dec_hook,
)
)
@ -561,7 +561,7 @@ def mk_codec(
'''
# (manually) generate a msg-payload-spec for all relevant
# god-boxing-msg subtypes, parameterizing the `PayloadMsg.pld: PayloadT`
# god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT`
# for the decoder such that all sub-type msgs in our SCIPP
# will automatically decode to a type-"limited" payload (`Struct`)
# object (set).
@ -607,7 +607,7 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
# The built-in IPC `Msg` spec.
# Our composing "shuttle" protocol which allows `tractor`-app code
# to use any `msgspec` supported type as the `PayloadMsg.pld` payload,
# to use any `msgspec` supported type as the `Msg.pld` payload,
# https://jcristharif.com/msgspec/supported-types.html
#
_def_tractor_codec: MsgCodec = mk_codec(
@ -743,7 +743,7 @@ def limit_msg_spec(
) -> MsgCodec:
'''
Apply a `MsgCodec` that will natively decode the SC-msg set's
`PayloadMsg.pld: Union[Type[Struct]]` payload fields using
`Msg.pld: Union[Type[Struct]]` payload fields using
tagged-unions of `msgspec.Struct`s from the `payload_types`
for all IPC contexts in use by the current `trio.Task`.

View File

@ -53,8 +53,6 @@ from tractor._state import current_ipc_ctx
from ._codec import (
mk_dec,
MsgDec,
MsgCodec,
current_codec,
)
from .types import (
CancelAck,
@ -215,9 +213,6 @@ class PldRx(Struct):
**dec_msg_kwargs,
)
# TODO: rename to,
# -[ ] `.decode_pld()`?
# -[ ] `.dec_pld()`?
def dec_msg(
self,
msg: MsgType,
@ -251,8 +246,8 @@ class PldRx(Struct):
pld: PayloadT = self._pld_dec.decode(pld)
log.runtime(
'Decoded msg payload\n\n'
f'{msg}\n'
f'where payload decoded as\n'
f'{msg}\n\n'
f'where payload is\n'
f'|_pld={pld!r}\n'
)
return pld
@ -268,29 +263,13 @@ class PldRx(Struct):
src_validation_error=valerr,
is_invalid_payload=True,
expected_msg=expect_msg,
# ipc_msg=msg,
)
# NOTE: just raise the MTE inline instead of all
# the pack-unpack-repack non-sense when this is
# a "send side" validation error.
if is_started_send_side:
raise mte
# XXX TODO: remove this right?
# => any bad stated/return values should
# always be treated a remote errors right?
#
# if (
# expect_msg is Return
# or expect_msg is Started
# ):
# # set emulated remote error more-or-less as the
# # runtime would
# ctx: Context = getattr(ipc, 'ctx', ipc)
# ctx._maybe_cancel_and_set_remote_error(mte)
# NOTE: the `.message` is automatically
# transferred into the message as long as we
# define it as a `Error.message` field.
# NOTE: override the `msg` passed to
# `_raise_from_unexpected_msg()` (below) so so that
# we're effectively able to use that same func to
# unpack and raise an "emulated remote `Error`" of
# this local MTE.
err_msg: Error = pack_error(
exc=mte,
cid=msg.cid,
@ -300,38 +279,36 @@ class PldRx(Struct):
else ipc._actor.uid
),
# tb=valerr.__traceback__,
# tb_str=mte._message,
# message=mte._message,
tb_str=mte._message,
)
mte._ipc_msg = err_msg
# ^-TODO-^ just raise this inline instead of all the
# pack-unpack-repack non-sense!
# XXX override the `msg` passed to
# `_raise_from_unexpected_msg()` (below) so so
# that we're effectively able to use that same
# func to unpack and raise an "emulated remote
# `Error`" of this local MTE.
mte._ipc_msg = err_msg
msg = err_msg
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
# raises the boxed `err_msg` from above it raises
# it from the above caught interchange-lib
# validation error.
src_err = valerr
# set emulated remote error more-or-less as the
# runtime would
ctx: Context = getattr(ipc, 'ctx', ipc)
# TODO: should we instead make this explicit and
# use the above masked `is_started_send_decode`,
# expecting the `Context.started()` caller to set
# it? Rn this is kinda, howyousayyy, implicitly
# edge-case-y..
# TODO: remove this since it's been added to
# `_raise_from_unexpected_msg()`..?
# if (
# expect_msg is not Started
# and not is_started_send_side
# ):
# # set emulated remote error more-or-less as the
# # runtime would
# ctx: Context = getattr(ipc, 'ctx', ipc)
# ctx._maybe_cancel_and_set_remote_error(mte)
if (
expect_msg is not Started
and not is_started_send_side
):
ctx._maybe_cancel_and_set_remote_error(mte)
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
# raises the boxed `err_msg` from above it raises
# it from `None`.
src_err = valerr
# if is_started_send_side:
# src_err = None
# XXX some other decoder specific failure?
# except TypeError as src_error:
@ -582,7 +559,6 @@ async def drain_to_final_msg(
ipc=ctx,
expect_msg=Return,
raise_error=False,
hide_tb=hide_tb,
)
# ^-TODO-^ some bad ideas?
# -[ ] wrap final outcome .receive() in a scope so
@ -761,61 +737,9 @@ async def drain_to_final_msg(
)
# TODO: factor logic from `.Context.started()` for send-side
# validate raising!
def validate_payload_msg(
pld_msg: Started|Yield|Return,
pld_value: PayloadT,
ipc: Context|MsgStream,
raise_mte: bool = True,
strict_pld_parity: bool = False,
hide_tb: bool = True,
msg: Started|Yield|Return,
) -> MsgTypeError|None:
'''
Validate a `PayloadMsg.pld` value with the current
IPC ctx's `PldRx` and raise an appropriate `MsgTypeError`
on failure.
'''
__tracebackhide__: bool = hide_tb
codec: MsgCodec = current_codec()
msg_bytes: bytes = codec.encode(pld_msg)
try:
roundtripped: Started = codec.decode(msg_bytes)
ctx: Context = getattr(ipc, 'ctx', ipc)
pld: PayloadT = ctx.pld_rx.dec_msg(
msg=roundtripped,
ipc=ipc,
expect_msg=Started,
hide_tb=hide_tb,
is_started_send_side=True,
)
if (
strict_pld_parity
and
pld != pld_value
):
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
roundtripped,
pld_msg,
)
complaint: str = (
'Started value does not match after roundtrip?\n\n'
f'{diff}'
)
raise ValidationError(complaint)
# raise any msg type error NO MATTER WHAT!
except ValidationError as verr:
mte: MsgTypeError = _mk_msg_type_err(
msg=roundtripped,
codec=codec,
src_validation_error=verr,
verb_header='Trying to send ',
is_invalid_payload=True,
)
if not raise_mte:
return mte
raise mte from verr
...

View File

@ -89,12 +89,11 @@ class PayloadMsg(
# -[ ] `uuid.UUID` which has multi-protocol support
# https://jcristharif.com/msgspec/supported-types.html#uuid
# The msg's "payload" (spelled without vowels):
# The msgs "payload" (spelled without vowels):
# https://en.wikipedia.org/wiki/Payload_(computing)
pld: Raw
# ^-NOTE-^ inherited from any `PayloadMsg` (and maybe type
# overriden via the `._ops.limit_plds()` API), but by default is
#
# NOTE: inherited from any `Msg` (and maybe overriden
# by use of `limit_msg_spec()`), but by default is
# parameterized to be `Any`.
#
# XXX this `Union` must strictly NOT contain `Any` if
@ -107,6 +106,7 @@ class PayloadMsg(
# TODO: could also be set to `msgspec.Raw` if the sub-decoders
# approach is preferred over the generic parameterization
# approach as take by `mk_msg_spec()` below.
pld: Raw
# TODO: complete rename
@ -410,32 +410,21 @@ class Error(
src_type_str: str
boxed_type_str: str
relay_path: list[tuple[str, str]]
tb_str: str
# normally either both are provided or just
# a message for certain special cases where
# we pack a message for a locally raised
# mte or ctxc.
message: str|None = None
tb_str: str = ''
cid: str|None = None
# TODO: only optionally include sub-type specfic fields?
# -[ ] use UNSET or don't include them via `omit_defaults` (see
# inheritance-line options above)
# TODO: use UNSET or don't include them via
#
# `ContextCancelled` reports the src cancelling `Actor.uid`
# `ContextCancelled`
canceller: tuple[str, str]|None = None
# `StreamOverrun`-specific src `Actor.uid`
# `StreamOverrun`
sender: tuple[str, str]|None = None
# `MsgTypeError` meta-data
cid: str|None = None
# when the receiver side fails to decode a delivered
# `PayloadMsg`-subtype; one and/or both the msg-struct instance
# and `Any`-decoded to `dict` of the msg are set and relayed
# (back to the sender) for introspection.
_bad_msg: Started|Yield|Return|None = None
_bad_msg_as_dict: dict|None = None
# for the `MsgTypeError` case where the receiver side
# decodes the underlying original `Msg`-subtype
_msg_dict: dict|None = None
def from_dict_msg(
@ -447,11 +436,9 @@ def from_dict_msg(
) -> MsgType:
'''
Helper to build a specific `MsgType` struct from a "vanilla"
decoded `dict`-ified equivalent of the msg: i.e. if the
`msgpack.Decoder.type == Any`, the default when using
`msgspec.msgpack` and not "typed decoding" using
`msgspec.Struct`.
Helper to build a specific `MsgType` struct from
a "vanilla" decoded `dict`-ified equivalent of the
msg: i.e. if the `msgpack.Decoder.type == Any`.
'''
msg_type_tag_field: str = (