Compare commits
16 Commits
eee4c61b51
...
0e8c60ee4a
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 0e8c60ee4a | |
Tyler Goodlet | 1db5d4def2 | |
Tyler Goodlet | 6e54abc56d | |
Tyler Goodlet | 28af4749cc | |
Tyler Goodlet | 02a7c7c276 | |
Tyler Goodlet | 4fa71cc01c | |
Tyler Goodlet | 6a4ee461f5 | |
Tyler Goodlet | 2db03444f7 | |
Tyler Goodlet | a1b124b62b | |
Tyler Goodlet | 59ca256183 | |
Tyler Goodlet | 6c2efc96dc | |
Tyler Goodlet | f7fd8278af | |
Tyler Goodlet | 7ac730e326 | |
Tyler Goodlet | 582144830f | |
Tyler Goodlet | 8b860f4245 | |
Tyler Goodlet | 27fd96729a |
|
@ -4,9 +4,15 @@ import trio
|
|||
|
||||
async def breakpoint_forever():
|
||||
"Indefinitely re-enter debugger in child actor."
|
||||
while True:
|
||||
yield 'yo'
|
||||
await tractor.breakpoint()
|
||||
try:
|
||||
while True:
|
||||
yield 'yo'
|
||||
await tractor.breakpoint()
|
||||
except BaseException:
|
||||
tractor.log.get_console_log().exception(
|
||||
'Cancelled while trying to enter pause point!'
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
async def name_error():
|
||||
|
@ -19,7 +25,7 @@ async def main():
|
|||
"""
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=True,
|
||||
loglevel='error',
|
||||
loglevel='cancel',
|
||||
) as n:
|
||||
|
||||
p0 = await n.start_actor('bp_forever', enable_modules=[__name__])
|
||||
|
|
|
@ -45,6 +45,7 @@ async def spawn_until(depth=0):
|
|||
)
|
||||
|
||||
|
||||
# TODO: notes on the new boxed-relayed errors through proxy actors
|
||||
async def main():
|
||||
"""The main ``tractor`` routine.
|
||||
|
||||
|
|
|
@ -23,5 +23,6 @@ async def main():
|
|||
n.start_soon(debug_actor.run, die)
|
||||
n.start_soon(crash_boi.run, die)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
trio.run(main)
|
||||
|
|
|
@ -2,10 +2,13 @@ import trio
|
|||
import tractor
|
||||
|
||||
|
||||
async def main():
|
||||
async def main(
|
||||
registry_addrs: tuple[str, int]|None = None
|
||||
):
|
||||
|
||||
async with tractor.open_root_actor(
|
||||
debug_mode=True,
|
||||
# loglevel='runtime',
|
||||
):
|
||||
while True:
|
||||
await tractor.breakpoint()
|
||||
|
|
|
@ -3,16 +3,26 @@ import tractor
|
|||
|
||||
|
||||
async def name_error():
|
||||
getattr(doggypants)
|
||||
getattr(doggypants) # noqa (on purpose)
|
||||
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=True,
|
||||
) as n:
|
||||
# loglevel='transport',
|
||||
) as an:
|
||||
|
||||
portal = await n.run_in_actor(name_error)
|
||||
await portal.result()
|
||||
# TODO: ideally the REPL arrives at this frame in the parent,
|
||||
# ABOVE the @api_frame of `Portal.run_in_actor()` (which
|
||||
# should eventually not even be a portal method ... XD)
|
||||
# await tractor.pause()
|
||||
p: tractor.Portal = await an.run_in_actor(name_error)
|
||||
|
||||
# with this style, should raise on this line
|
||||
await p.result()
|
||||
|
||||
# with this alt style should raise at `open_nusery()`
|
||||
# return await p.result()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -7,7 +7,7 @@ def sync_pause(
|
|||
error: bool = False,
|
||||
):
|
||||
if use_builtin:
|
||||
breakpoint()
|
||||
breakpoint(hide_tb=False)
|
||||
|
||||
else:
|
||||
tractor.pause_from_sync()
|
||||
|
@ -20,18 +20,20 @@ def sync_pause(
|
|||
async def start_n_sync_pause(
|
||||
ctx: tractor.Context,
|
||||
):
|
||||
# sync to requesting peer
|
||||
actor: tractor.Actor = tractor.current_actor()
|
||||
|
||||
# sync to parent-side task
|
||||
await ctx.started()
|
||||
|
||||
actor: tractor.Actor = tractor.current_actor()
|
||||
print(f'entering SYNC PAUSE in {actor.uid}')
|
||||
sync_pause()
|
||||
print(f'back from SYNC PAUSE in {actor.uid}')
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
|
||||
async with tractor.open_nursery(
|
||||
# NOTE: required for pausing from sync funcs
|
||||
maybe_enable_greenback=True,
|
||||
debug_mode=True,
|
||||
) as an:
|
||||
|
||||
|
|
|
@ -39,8 +39,7 @@ msgspec='^0.18.5' # interchange
|
|||
wrapt = "^1.16.0" # decorators
|
||||
colorlog = "^6.8.2" # logging
|
||||
|
||||
# .devx tooling
|
||||
stackscope = "^0.2.2"
|
||||
# built-in multi-actor `pdb` REPL
|
||||
pdbp = "^1.5.0"
|
||||
|
||||
|
||||
|
@ -49,15 +48,19 @@ pdbp = "^1.5.0"
|
|||
# 'pyroute2
|
||||
|
||||
# ------ - ------
|
||||
xontrib-vox = "^0.0.1"
|
||||
|
||||
[tool.poetry.group.dev]
|
||||
optional = false
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
# testing
|
||||
pytest = "^8.2.0"
|
||||
pexpect = "^4.9.0"
|
||||
|
||||
# only for xonsh as sh..
|
||||
# .devx tooling
|
||||
greenback = "^1.2.1"
|
||||
stackscope = "^0.2.2"
|
||||
|
||||
# (light) xonsh usage/integration
|
||||
xontrib-vox = "^0.0.1"
|
||||
prompt-toolkit = "^3.0.43"
|
||||
xonsh-vox-tabcomplete = "^0.5"
|
||||
|
|
|
@ -6,30 +6,19 @@ related settings around IPC contexts.
|
|||
'''
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
contextmanager as cm,
|
||||
)
|
||||
# import typing
|
||||
from typing import (
|
||||
# Any,
|
||||
TypeAlias,
|
||||
# Union,
|
||||
)
|
||||
from contextvars import (
|
||||
Context,
|
||||
)
|
||||
|
||||
from msgspec import (
|
||||
# structs,
|
||||
# msgpack,
|
||||
Struct,
|
||||
# ValidationError,
|
||||
)
|
||||
import pytest
|
||||
import trio
|
||||
|
||||
import tractor
|
||||
from tractor import (
|
||||
# _state,
|
||||
MsgTypeError,
|
||||
current_ipc_ctx,
|
||||
Portal,
|
||||
|
@ -40,20 +29,9 @@ from tractor.msg import (
|
|||
)
|
||||
from tractor.msg import (
|
||||
_codec,
|
||||
# _ctxvar_MsgCodec,
|
||||
|
||||
# NamespacePath,
|
||||
# MsgCodec,
|
||||
# mk_codec,
|
||||
# apply_codec,
|
||||
# current_codec,
|
||||
)
|
||||
from tractor.msg.types import (
|
||||
log,
|
||||
# _payload_msgs,
|
||||
# PayloadMsg,
|
||||
# Started,
|
||||
# mk_msg_spec,
|
||||
)
|
||||
|
||||
|
||||
|
@ -64,23 +42,10 @@ class PldMsg(Struct):
|
|||
maybe_msg_spec = PldMsg|None
|
||||
|
||||
|
||||
@cm
|
||||
def custom_spec(
|
||||
ctx: Context,
|
||||
spec: TypeAlias,
|
||||
) -> _codec.MsgCodec:
|
||||
'''
|
||||
Apply a custom payload spec, remove on exit.
|
||||
|
||||
'''
|
||||
rx: msgops.PldRx = ctx._pld_rx
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_expect_raises(
|
||||
raises: BaseException|None = None,
|
||||
ensure_in_message: list[str]|None = None,
|
||||
|
||||
reraise: bool = False,
|
||||
timeout: int = 3,
|
||||
) -> None:
|
||||
|
@ -88,6 +53,9 @@ async def maybe_expect_raises(
|
|||
Async wrapper for ensuring errors propagate from the inner scope.
|
||||
|
||||
'''
|
||||
if tractor._state.debug_mode():
|
||||
timeout += 999
|
||||
|
||||
with trio.fail_after(timeout):
|
||||
try:
|
||||
yield
|
||||
|
@ -103,9 +71,10 @@ async def maybe_expect_raises(
|
|||
# maybe check for error txt content
|
||||
if ensure_in_message:
|
||||
part: str
|
||||
err_repr: str = repr(inner_err)
|
||||
for part in ensure_in_message:
|
||||
for i, arg in enumerate(inner_err.args):
|
||||
if part in arg:
|
||||
if part in err_repr:
|
||||
break
|
||||
# if part never matches an arg, then we're
|
||||
# missing a match.
|
||||
|
@ -132,7 +101,7 @@ async def child(
|
|||
ctx: Context,
|
||||
started_value: int|PldMsg|None,
|
||||
return_value: str|None,
|
||||
validate_pld_spec: bool,
|
||||
validate_pld_spec: bool,
|
||||
raise_on_started_mte: bool = True,
|
||||
|
||||
) -> None:
|
||||
|
@ -166,13 +135,15 @@ async def child(
|
|||
# 2 cases: hdndle send-side and recv-only validation
|
||||
# - when `raise_on_started_mte == True`, send validate
|
||||
# - else, parent-recv-side only validation
|
||||
mte: MsgTypeError|None = None
|
||||
try:
|
||||
await ctx.started(
|
||||
value=started_value,
|
||||
validate_pld_spec=validate_pld_spec,
|
||||
)
|
||||
|
||||
except MsgTypeError:
|
||||
except MsgTypeError as _mte:
|
||||
mte = _mte
|
||||
log.exception('started()` raised an MTE!\n')
|
||||
if not expect_started_mte:
|
||||
raise RuntimeError(
|
||||
|
@ -180,15 +151,60 @@ async def child(
|
|||
f'{started_value!r}\n'
|
||||
)
|
||||
|
||||
boxed_div: str = '------ - ------'
|
||||
assert boxed_div not in mte._message
|
||||
assert boxed_div not in mte.tb_str
|
||||
assert boxed_div not in repr(mte)
|
||||
assert boxed_div not in str(mte)
|
||||
mte_repr: str = repr(mte)
|
||||
for line in mte.message.splitlines():
|
||||
assert line in mte_repr
|
||||
|
||||
# since this is a *local error* there should be no
|
||||
# boxed traceback content!
|
||||
assert not mte.tb_str
|
||||
|
||||
# propagate to parent?
|
||||
if raise_on_started_mte:
|
||||
raise
|
||||
else:
|
||||
if expect_started_mte:
|
||||
raise RuntimeError(
|
||||
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
|
||||
f'{started_value!r}\n'
|
||||
)
|
||||
|
||||
# no-send-side-error fallthrough
|
||||
if (
|
||||
validate_pld_spec
|
||||
and
|
||||
expect_started_mte
|
||||
):
|
||||
raise RuntimeError(
|
||||
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
|
||||
f'{started_value!r}\n'
|
||||
)
|
||||
|
||||
assert (
|
||||
not expect_started_mte
|
||||
or
|
||||
not validate_pld_spec
|
||||
)
|
||||
|
||||
# if wait_for_parent_to_cancel:
|
||||
# ...
|
||||
#
|
||||
# ^-TODO-^ logic for diff validation policies on each side:
|
||||
#
|
||||
# -[ ] ensure that if we don't validate on the send
|
||||
# side, that we are eventually error-cancelled by our
|
||||
# parent due to the bad `Started` payload!
|
||||
# -[ ] the boxed error should be srced from the parent's
|
||||
# runtime NOT ours!
|
||||
# -[ ] we should still error on bad `return_value`s
|
||||
# despite the parent not yet error-cancelling us?
|
||||
# |_ how do we want the parent side to look in that
|
||||
# case?
|
||||
# -[ ] maybe the equiv of "during handling of the
|
||||
# above error another occurred" for the case where
|
||||
# the parent sends a MTE to this child and while
|
||||
# waiting for the child to terminate it gets back
|
||||
# the MTE for this case?
|
||||
#
|
||||
|
||||
# XXX should always fail on recv side since we can't
|
||||
# really do much else beside terminate and relay the
|
||||
|
@ -211,8 +227,8 @@ async def child(
|
|||
@pytest.mark.parametrize(
|
||||
'return_value',
|
||||
[
|
||||
None,
|
||||
'yo',
|
||||
None,
|
||||
],
|
||||
ids=[
|
||||
'return[invalid-"yo"]',
|
||||
|
@ -271,16 +287,32 @@ def test_basic_payload_spec(
|
|||
# since not opened yet.
|
||||
assert current_ipc_ctx() is None
|
||||
|
||||
if invalid_started:
|
||||
msg_type_str: str = 'Started'
|
||||
bad_value_str: str = '10'
|
||||
elif invalid_return:
|
||||
msg_type_str: str = 'Return'
|
||||
bad_value_str: str = "'yo'"
|
||||
else:
|
||||
# XXX but should never be used below then..
|
||||
msg_type_str: str = ''
|
||||
bad_value_str: str = ''
|
||||
|
||||
maybe_mte: MsgTypeError|None = None
|
||||
should_raise: Exception|None = (
|
||||
MsgTypeError if (
|
||||
invalid_return
|
||||
or
|
||||
invalid_started
|
||||
) else None
|
||||
)
|
||||
async with (
|
||||
maybe_expect_raises(
|
||||
raises=MsgTypeError if (
|
||||
invalid_return
|
||||
or
|
||||
invalid_started
|
||||
) else None,
|
||||
raises=should_raise,
|
||||
ensure_in_message=[
|
||||
"invalid `Return` payload",
|
||||
"value: `'yo'` does not match type-spec: `Return.pld: PldMsg|NoneType`",
|
||||
f"invalid `{msg_type_str}` msg payload",
|
||||
f"value: `{bad_value_str}` does not "
|
||||
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
|
||||
],
|
||||
),
|
||||
p.open_context(
|
||||
|
@ -298,18 +330,35 @@ def test_basic_payload_spec(
|
|||
assert first.field == 'yo'
|
||||
|
||||
try:
|
||||
assert (await ctx.result()) is None
|
||||
res: None|PldMsg = await ctx.result(hide_tb=False)
|
||||
assert res is None
|
||||
except MsgTypeError as mte:
|
||||
maybe_mte = mte
|
||||
if not invalid_return:
|
||||
raise
|
||||
|
||||
else: # expected this invalid `Return.pld`
|
||||
assert mte.cid == ctx.cid
|
||||
# expected this invalid `Return.pld` so audit
|
||||
# the error state + meta-data
|
||||
assert mte.expected_msg_type is Return
|
||||
assert mte.cid == ctx.cid
|
||||
mte_repr: str = repr(mte)
|
||||
for line in mte.message.splitlines():
|
||||
assert line in mte_repr
|
||||
|
||||
# verify expected remote mte deats
|
||||
await tractor.pause()
|
||||
assert ctx._remote_error is mte
|
||||
assert mte.expected_msg_type is Return
|
||||
assert mte.tb_str
|
||||
# await tractor.pause(shield=True)
|
||||
|
||||
# verify expected remote mte deats
|
||||
assert ctx._local_error is None
|
||||
assert (
|
||||
mte is
|
||||
ctx._remote_error is
|
||||
ctx.maybe_error is
|
||||
ctx.outcome
|
||||
)
|
||||
|
||||
if should_raise is None:
|
||||
assert maybe_mte is None
|
||||
|
||||
await p.cancel_actor()
|
||||
|
||||
|
|
|
@ -58,9 +58,6 @@ from typing import (
|
|||
import warnings
|
||||
# ------ - ------
|
||||
import trio
|
||||
from msgspec import (
|
||||
ValidationError,
|
||||
)
|
||||
# ------ - ------
|
||||
from ._exceptions import (
|
||||
ContextCancelled,
|
||||
|
@ -78,19 +75,16 @@ from .log import (
|
|||
from .msg import (
|
||||
Error,
|
||||
MsgType,
|
||||
MsgCodec,
|
||||
NamespacePath,
|
||||
PayloadT,
|
||||
Started,
|
||||
Stop,
|
||||
Yield,
|
||||
current_codec,
|
||||
pretty_struct,
|
||||
_ops as msgops,
|
||||
)
|
||||
from ._ipc import (
|
||||
Channel,
|
||||
_mk_msg_type_err,
|
||||
)
|
||||
from ._streaming import MsgStream
|
||||
from ._state import (
|
||||
|
@ -670,7 +664,7 @@ class Context:
|
|||
'Setting remote error for ctx\n\n'
|
||||
f'<= {self.peer_side!r}: {self.chan.uid}\n'
|
||||
f'=> {self.side!r}: {self._actor.uid}\n\n'
|
||||
f'{error}'
|
||||
f'{error!r}'
|
||||
)
|
||||
self._remote_error: BaseException = error
|
||||
|
||||
|
@ -724,7 +718,7 @@ class Context:
|
|||
log.error(
|
||||
f'Remote context error:\n\n'
|
||||
# f'{pformat(self)}\n'
|
||||
f'{error}'
|
||||
f'{error!r}'
|
||||
)
|
||||
|
||||
if self._canceller is None:
|
||||
|
@ -748,26 +742,27 @@ class Context:
|
|||
and not cs.cancel_called
|
||||
and not cs.cancelled_caught
|
||||
):
|
||||
if not (
|
||||
if (
|
||||
msgerr
|
||||
|
||||
# NOTE: we allow user to config not cancelling the
|
||||
# local scope on `MsgTypeError`s
|
||||
and not self._cancel_on_msgerr
|
||||
and
|
||||
not self._cancel_on_msgerr
|
||||
):
|
||||
# TODO: it'd sure be handy to inject our own
|
||||
# `trio.Cancelled` subtype here ;)
|
||||
# https://github.com/goodboy/tractor/issues/368
|
||||
message: str = 'Cancelling `Context._scope` !\n\n'
|
||||
self._scope.cancel()
|
||||
|
||||
else:
|
||||
message: str = (
|
||||
'NOT Cancelling `Context._scope` since,\n'
|
||||
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
|
||||
f'AND we got a msg-type-error!\n'
|
||||
f'{error}\n'
|
||||
)
|
||||
else:
|
||||
# TODO: it'd sure be handy to inject our own
|
||||
# `trio.Cancelled` subtype here ;)
|
||||
# https://github.com/goodboy/tractor/issues/368
|
||||
message: str = 'Cancelling `Context._scope` !\n\n'
|
||||
self._scope.cancel()
|
||||
|
||||
else:
|
||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||
# from .devx import mk_pdb
|
||||
|
@ -1657,54 +1652,21 @@ class Context:
|
|||
#
|
||||
__tracebackhide__: bool = hide_tb
|
||||
if validate_pld_spec:
|
||||
# __tracebackhide__: bool = False
|
||||
codec: MsgCodec = current_codec()
|
||||
msg_bytes: bytes = codec.encode(started_msg)
|
||||
try:
|
||||
roundtripped: Started = codec.decode(msg_bytes)
|
||||
# pld: PayloadT = await self.pld_rx.recv_pld(
|
||||
pld: PayloadT = self.pld_rx.dec_msg(
|
||||
msg=roundtripped,
|
||||
ipc=self,
|
||||
expect_msg=Started,
|
||||
hide_tb=hide_tb,
|
||||
is_started_send_side=True,
|
||||
)
|
||||
if (
|
||||
strict_pld_parity
|
||||
and
|
||||
pld != value
|
||||
):
|
||||
# TODO: make that one a mod func too..
|
||||
diff = pretty_struct.Struct.__sub__(
|
||||
roundtripped,
|
||||
started_msg,
|
||||
)
|
||||
complaint: str = (
|
||||
'Started value does not match after roundtrip?\n\n'
|
||||
f'{diff}'
|
||||
)
|
||||
raise ValidationError(complaint)
|
||||
|
||||
# raise any msg type error NO MATTER WHAT!
|
||||
except ValidationError as verr:
|
||||
# always show this src frame in the tb
|
||||
# __tracebackhide__: bool = False
|
||||
raise _mk_msg_type_err(
|
||||
msg=roundtripped,
|
||||
codec=codec,
|
||||
src_validation_error=verr,
|
||||
verb_header='Trying to send ',
|
||||
is_invalid_payload=True,
|
||||
) from verr
|
||||
msgops.validate_payload_msg(
|
||||
pld_msg=started_msg,
|
||||
pld_value=value,
|
||||
ipc=self,
|
||||
strict_pld_parity=strict_pld_parity,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
|
||||
# TODO: maybe a flag to by-pass encode op if already done
|
||||
# here in caller?
|
||||
await self.chan.send(started_msg)
|
||||
|
||||
# set msg-related internal runtime-state
|
||||
self._started_called = True
|
||||
self._started_msg = started_msg
|
||||
self._started_called: bool = True
|
||||
self._started_msg: Started = started_msg
|
||||
self._started_pld = value
|
||||
|
||||
async def _drain_overflows(
|
||||
|
@ -2097,6 +2059,12 @@ async def open_context_from_portal(
|
|||
if maybe_msgdec:
|
||||
assert maybe_msgdec.pld_spec == pld_spec
|
||||
|
||||
# NOTE: this in an implicit runtime nursery used to,
|
||||
# - start overrun queuing tasks when as well as
|
||||
# for cancellation of the scope opened by the user.
|
||||
ctx._scope_nursery: trio.Nursery = tn
|
||||
ctx._scope: trio.CancelScope = tn.cancel_scope
|
||||
|
||||
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
|
||||
# `Started`-msg any cancellation triggered
|
||||
# in `._maybe_cancel_and_set_remote_error()` will
|
||||
|
@ -2104,25 +2072,42 @@ async def open_context_from_portal(
|
|||
# -> it's expected that if there is an error in this phase of
|
||||
# the dialog, the `Error` msg should be raised from the `msg`
|
||||
# handling block below.
|
||||
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
|
||||
ipc=ctx,
|
||||
expect_msg=Started,
|
||||
passthrough_non_pld_msgs=False,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
try:
|
||||
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
|
||||
ipc=ctx,
|
||||
expect_msg=Started,
|
||||
passthrough_non_pld_msgs=False,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
except trio.Cancelled as taskc:
|
||||
ctx_cs: trio.CancelScope = ctx._scope
|
||||
if not ctx_cs.cancel_called:
|
||||
raise
|
||||
|
||||
# from .devx import pause
|
||||
# await pause(shield=True)
|
||||
|
||||
log.cancel(
|
||||
'IPC ctx was cancelled during "child" task sync due to\n\n'
|
||||
f'{ctx.maybe_error}\n'
|
||||
)
|
||||
# OW if the ctx's scope was cancelled manually,
|
||||
# likely the `Context` was cancelled via a call to
|
||||
# `._maybe_cancel_and_set_remote_error()` so ensure
|
||||
# we raise the underlying `._remote_error` directly
|
||||
# instead of bubbling that taskc.
|
||||
ctx.maybe_raise()
|
||||
|
||||
# OW, some other unexpected cancel condition
|
||||
# that should prolly never happen right?
|
||||
raise InternalError(
|
||||
'Invalid cancellation during IPC ctx sync phase?\n'
|
||||
) from taskc
|
||||
|
||||
# from .devx import pause
|
||||
# await pause()
|
||||
ctx._started_called: bool = True
|
||||
ctx._started_msg: bool = started_msg
|
||||
ctx._started_pld: bool = first
|
||||
|
||||
# NOTE: this in an implicit runtime nursery used to,
|
||||
# - start overrun queuing tasks when as well as
|
||||
# for cancellation of the scope opened by the user.
|
||||
ctx._scope_nursery: trio.Nursery = tn
|
||||
ctx._scope: trio.CancelScope = tn.cancel_scope
|
||||
|
||||
# deliver context instance and .started() msg value
|
||||
# in enter tuple.
|
||||
yield ctx, first
|
||||
|
|
|
@ -22,6 +22,7 @@ from __future__ import annotations
|
|||
import builtins
|
||||
import importlib
|
||||
from pprint import pformat
|
||||
import sys
|
||||
from types import (
|
||||
TracebackType,
|
||||
)
|
||||
|
@ -110,6 +111,7 @@ _body_fields: list[str] = list(
|
|||
'tb_str',
|
||||
'relay_path',
|
||||
'cid',
|
||||
'message',
|
||||
|
||||
# only ctxc should show it but `Error` does
|
||||
# have it as an optional field.
|
||||
|
@ -236,6 +238,7 @@ class RemoteActorError(Exception):
|
|||
self._boxed_type: BaseException = boxed_type
|
||||
self._src_type: BaseException|None = None
|
||||
self._ipc_msg: Error|None = ipc_msg
|
||||
self._extra_msgdata = extra_msgdata
|
||||
|
||||
if (
|
||||
extra_msgdata
|
||||
|
@ -250,8 +253,6 @@ class RemoteActorError(Exception):
|
|||
k,
|
||||
v,
|
||||
)
|
||||
else:
|
||||
self._extra_msgdata = extra_msgdata
|
||||
|
||||
# TODO: mask out eventually or place in `pack_error()`
|
||||
# pre-`return` lines?
|
||||
|
@ -282,6 +283,17 @@ class RemoteActorError(Exception):
|
|||
# ensure any roundtripping evals to the input value
|
||||
assert self.boxed_type is boxed_type
|
||||
|
||||
@property
|
||||
def message(self) -> str:
|
||||
'''
|
||||
Be explicit, instead of trying to read it from the the parent
|
||||
type's loosely defined `.args: tuple`:
|
||||
|
||||
https://docs.python.org/3/library/exceptions.html#BaseException.args
|
||||
|
||||
'''
|
||||
return self._message
|
||||
|
||||
@property
|
||||
def ipc_msg(self) -> Struct:
|
||||
'''
|
||||
|
@ -355,7 +367,10 @@ class RemoteActorError(Exception):
|
|||
|
||||
'''
|
||||
bt: Type[BaseException] = self.boxed_type
|
||||
return str(bt.__name__)
|
||||
if bt:
|
||||
return str(bt.__name__)
|
||||
|
||||
return ''
|
||||
|
||||
@property
|
||||
def boxed_type(self) -> Type[BaseException]:
|
||||
|
@ -426,8 +441,7 @@ class RemoteActorError(Exception):
|
|||
|
||||
for key in fields:
|
||||
if (
|
||||
key == 'relay_uid'
|
||||
and not self.is_inception()
|
||||
key == 'relay_uid' and not self.is_inception()
|
||||
):
|
||||
continue
|
||||
|
||||
|
@ -504,19 +518,80 @@ class RemoteActorError(Exception):
|
|||
def pformat(
|
||||
self,
|
||||
with_type_header: bool = True,
|
||||
# with_ascii_box: bool = True,
|
||||
|
||||
) -> str:
|
||||
'''
|
||||
Nicely formatted boxed error meta data + traceback, OR just
|
||||
the normal message from `.args` (for eg. as you'd want shown
|
||||
by a locally raised `ContextCancelled`).
|
||||
Format any boxed remote error by multi-line display of,
|
||||
|
||||
- error's src or relay actor meta-data,
|
||||
- remote runtime env's traceback,
|
||||
|
||||
With optional control over the format of,
|
||||
|
||||
- whether the boxed traceback is ascii-decorated with
|
||||
a surrounding "box" annotating the embedded stack-trace.
|
||||
- if the error's type name should be added as margins
|
||||
around the field and tb content like:
|
||||
|
||||
`<RemoteActorError(.. <<multi-line-content>> .. )>`
|
||||
|
||||
- the placement of the `.message: str` (explicit equiv of
|
||||
`.args[0]`), either placed below the `.tb_str` or in the
|
||||
first line's header when the error is raised locally (since
|
||||
the type name is already implicitly shown by python).
|
||||
|
||||
'''
|
||||
header: str = ''
|
||||
body: str = ''
|
||||
message: str = ''
|
||||
|
||||
# XXX when the currently raised exception is this instance,
|
||||
# we do not ever use the "type header" style repr.
|
||||
is_being_raised: bool = False
|
||||
if (
|
||||
(exc := sys.exception())
|
||||
and
|
||||
exc is self
|
||||
):
|
||||
is_being_raised: bool = True
|
||||
|
||||
with_type_header: bool = (
|
||||
with_type_header
|
||||
and
|
||||
not is_being_raised
|
||||
)
|
||||
|
||||
# <RemoteActorError( .. )> style
|
||||
if with_type_header:
|
||||
header: str = f'<{type(self).__name__}(\n'
|
||||
header: str = f'<{type(self).__name__}('
|
||||
|
||||
if message := self._message:
|
||||
|
||||
# split off the first line so, if needed, it isn't
|
||||
# indented the same like the "boxed content" which
|
||||
# since there is no `.tb_str` is just the `.message`.
|
||||
lines: list[str] = message.splitlines()
|
||||
first: str = lines[0]
|
||||
message: str = message.removeprefix(first)
|
||||
|
||||
# with a type-style header we,
|
||||
# - have no special message "first line" extraction/handling
|
||||
# - place the message a space in from the header:
|
||||
# `MsgTypeError( <message> ..`
|
||||
# ^-here
|
||||
# - indent the `.message` inside the type body.
|
||||
if with_type_header:
|
||||
first = f' {first} )>'
|
||||
|
||||
message: str = textwrap.indent(
|
||||
message,
|
||||
prefix=' '*2,
|
||||
)
|
||||
message: str = first + message
|
||||
|
||||
# IFF there is an embedded traceback-str we always
|
||||
# draw the ascii-box around it.
|
||||
if tb_str := self.tb_str:
|
||||
fields: str = self._mk_fields_str(
|
||||
_body_fields
|
||||
|
@ -535,36 +610,19 @@ class RemoteActorError(Exception):
|
|||
# |___ ..
|
||||
tb_body_indent=1,
|
||||
)
|
||||
if not with_type_header:
|
||||
body = '\n' + body
|
||||
|
||||
elif message := self._message:
|
||||
# split off the first line so it isn't indented
|
||||
# the same like the "boxed content".
|
||||
if not with_type_header:
|
||||
lines: list[str] = message.splitlines()
|
||||
first: str = lines[0]
|
||||
message: str = message.removeprefix(first)
|
||||
|
||||
else:
|
||||
first: str = ''
|
||||
|
||||
body: str = (
|
||||
first
|
||||
+
|
||||
message
|
||||
+
|
||||
'\n'
|
||||
)
|
||||
|
||||
if with_type_header:
|
||||
tail: str = ')>'
|
||||
else:
|
||||
tail = ''
|
||||
tail = ''
|
||||
if (
|
||||
with_type_header
|
||||
and not message
|
||||
):
|
||||
tail: str = '>'
|
||||
|
||||
return (
|
||||
header
|
||||
+
|
||||
message
|
||||
+
|
||||
f'{body}'
|
||||
+
|
||||
tail
|
||||
|
@ -577,7 +635,9 @@ class RemoteActorError(Exception):
|
|||
# |_ i guess `pexepect` relies on `str`-casing
|
||||
# of output?
|
||||
def __str__(self) -> str:
|
||||
return self.pformat(with_type_header=False)
|
||||
return self.pformat(
|
||||
with_type_header=False
|
||||
)
|
||||
|
||||
def unwrap(
|
||||
self,
|
||||
|
@ -825,9 +885,6 @@ class MsgTypeError(
|
|||
extra_msgdata['_bad_msg'] = bad_msg
|
||||
extra_msgdata['cid'] = bad_msg.cid
|
||||
|
||||
if 'cid' not in extra_msgdata:
|
||||
import pdbp; pdbp.set_trace()
|
||||
|
||||
return cls(
|
||||
message=message,
|
||||
boxed_type=cls,
|
||||
|
@ -889,6 +946,7 @@ def pack_error(
|
|||
src_uid: tuple[str, str]|None = None,
|
||||
tb: TracebackType|None = None,
|
||||
tb_str: str = '',
|
||||
message: str = '',
|
||||
|
||||
) -> Error:
|
||||
'''
|
||||
|
@ -902,7 +960,7 @@ def pack_error(
|
|||
tb_str: str = (
|
||||
''.join(traceback.format_exception(exc))
|
||||
|
||||
# TODO: can we remove this is `exc` is required?
|
||||
# TODO: can we remove this since `exc` is required.. right?
|
||||
or
|
||||
# NOTE: this is just a shorthand for the "last error" as
|
||||
# provided by `sys.exeception()`, see:
|
||||
|
@ -917,8 +975,8 @@ def pack_error(
|
|||
# when caller provides a tb instance (say pulled from some other
|
||||
# src error's `.__traceback__`) we use that as the "boxed"
|
||||
# tb-string instead.
|
||||
# https://docs.python.org/3/library/traceback.html#traceback.format_list
|
||||
if tb:
|
||||
# https://docs.python.org/3/library/traceback.html#traceback.format_list
|
||||
tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str
|
||||
|
||||
error_msg: dict[ # for IPC
|
||||
|
@ -961,17 +1019,17 @@ def pack_error(
|
|||
error_msg['src_type_str'] = type(exc).__name__
|
||||
error_msg['boxed_type_str'] = type(exc).__name__
|
||||
|
||||
# XXX alawys append us the last relay in error propagation path
|
||||
# XXX always append us the last relay in error propagation path
|
||||
error_msg.setdefault(
|
||||
'relay_path',
|
||||
[],
|
||||
).append(our_uid)
|
||||
|
||||
# XXX NOTE: always ensure the traceback-str is from the
|
||||
# locally raised error (**not** the prior relay's boxed
|
||||
# content's in `._ipc_msg.tb_str`).
|
||||
# XXX NOTE XXX always ensure the traceback-str content is from
|
||||
# the locally raised error (so, NOT the prior relay's boxed
|
||||
# `._ipc_msg.tb_str`).
|
||||
error_msg['tb_str'] = tb_str
|
||||
|
||||
error_msg['message'] = message or getattr(exc, 'message', '')
|
||||
if cid is not None:
|
||||
error_msg['cid'] = cid
|
||||
|
||||
|
@ -995,26 +1053,24 @@ def unpack_error(
|
|||
if not isinstance(msg, Error):
|
||||
return None
|
||||
|
||||
# retrieve the remote error's msg-encoded details
|
||||
tb_str: str = msg.tb_str
|
||||
message: str = (
|
||||
f'{chan.uid}\n'
|
||||
+
|
||||
tb_str
|
||||
)
|
||||
|
||||
# try to lookup a suitable error type from the local runtime
|
||||
# env then use it to construct a local instance.
|
||||
# boxed_type_str: str = error_dict['boxed_type_str']
|
||||
boxed_type_str: str = msg.boxed_type_str
|
||||
boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
|
||||
|
||||
if boxed_type_str == 'ContextCancelled':
|
||||
box_type = ContextCancelled
|
||||
assert boxed_type is box_type
|
||||
# retrieve the error's msg-encoded remotoe-env info
|
||||
message: str = f'remote task raised a {msg.boxed_type_str!r}\n'
|
||||
|
||||
elif boxed_type_str == 'MsgTypeError':
|
||||
box_type = MsgTypeError
|
||||
# TODO: do we even really need these checks for RAEs?
|
||||
if boxed_type_str in [
|
||||
'ContextCancelled',
|
||||
'MsgTypeError',
|
||||
]:
|
||||
box_type = {
|
||||
'ContextCancelled': ContextCancelled,
|
||||
'MsgTypeError': MsgTypeError,
|
||||
}[boxed_type_str]
|
||||
assert boxed_type is box_type
|
||||
|
||||
# TODO: already included by `_this_mod` in else loop right?
|
||||
|
@ -1029,19 +1085,21 @@ def unpack_error(
|
|||
exc = box_type(
|
||||
message,
|
||||
ipc_msg=msg,
|
||||
tb_str=msg.tb_str,
|
||||
)
|
||||
|
||||
return exc
|
||||
|
||||
|
||||
def is_multi_cancelled(exc: BaseException) -> bool:
|
||||
def is_multi_cancelled(
|
||||
exc: BaseException|BaseExceptionGroup
|
||||
) -> bool:
|
||||
'''
|
||||
Predicate to determine if a possible ``BaseExceptionGroup`` contains
|
||||
only ``trio.Cancelled`` sub-exceptions (and is likely the result of
|
||||
cancelling a collection of subtasks.
|
||||
|
||||
'''
|
||||
# if isinstance(exc, eg.BaseExceptionGroup):
|
||||
if isinstance(exc, BaseExceptionGroup):
|
||||
return exc.subgroup(
|
||||
lambda exc: isinstance(exc, trio.Cancelled)
|
||||
|
@ -1109,6 +1167,7 @@ def _raise_from_unexpected_msg(
|
|||
msg,
|
||||
ctx.chan,
|
||||
)
|
||||
ctx._maybe_cancel_and_set_remote_error(exc)
|
||||
raise exc from src_err
|
||||
|
||||
# `MsgStream` termination msg.
|
||||
|
@ -1183,7 +1242,6 @@ def _mk_msg_type_err(
|
|||
src_validation_error: ValidationError|None = None,
|
||||
src_type_error: TypeError|None = None,
|
||||
is_invalid_payload: bool = False,
|
||||
# src_err_msg: Error|None = None,
|
||||
|
||||
**mte_kwargs,
|
||||
|
||||
|
@ -1250,19 +1308,11 @@ def _mk_msg_type_err(
|
|||
msg_type: str = type(msg)
|
||||
any_pld: Any = msgpack.decode(msg.pld)
|
||||
message: str = (
|
||||
f'invalid `{msg_type.__qualname__}` payload\n\n'
|
||||
f'value: `{any_pld!r}` does not match type-spec: ' #\n'
|
||||
f'invalid `{msg_type.__qualname__}` msg payload\n\n'
|
||||
f'value: `{any_pld!r}` does not match type-spec: '
|
||||
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
|
||||
# f'<{type(msg).__qualname__}(\n'
|
||||
# f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n'
|
||||
# f')>\n\n'
|
||||
)
|
||||
# src_err_msg = msg
|
||||
bad_msg = msg
|
||||
# TODO: should we just decode the msg to a dict despite
|
||||
# only the payload being wrong?
|
||||
# -[ ] maybe the better design is to break this construct
|
||||
# logic into a separate explicit helper raiser-func?
|
||||
|
||||
else:
|
||||
# decode the msg-bytes using the std msgpack
|
||||
|
@ -1307,21 +1357,21 @@ def _mk_msg_type_err(
|
|||
if verb_header:
|
||||
message = f'{verb_header} ' + message
|
||||
|
||||
# if not isinstance(bad_msg, PayloadMsg):
|
||||
# import pdbp; pdbp.set_trace()
|
||||
|
||||
msgtyperr = MsgTypeError.from_decode(
|
||||
message=message,
|
||||
bad_msg=bad_msg,
|
||||
bad_msg_as_dict=msg_dict,
|
||||
|
||||
# NOTE: for the send-side `.started()` pld-validate
|
||||
# case we actually set the `._ipc_msg` AFTER we return
|
||||
# from here inside `Context.started()` since we actually
|
||||
# want to emulate the `Error` from the mte we build here
|
||||
# Bo
|
||||
# so by default in that case this is set to `None`
|
||||
# ipc_msg=src_err_msg,
|
||||
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
|
||||
# - for the send-side `.started()` pld-validate
|
||||
# case we actually raise inline so we don't need to
|
||||
# set the it at all.
|
||||
# - for recv side we set it inside `PldRx.decode_pld()`
|
||||
# after a manual call to `pack_error()` since we
|
||||
# actually want to emulate the `Error` from the mte we
|
||||
# build here. So by default in that case, this is left
|
||||
# as `None` here.
|
||||
# ipc_msg=src_err_msg,
|
||||
)
|
||||
msgtyperr.__cause__ = src_validation_error
|
||||
return msgtyperr
|
||||
|
|
|
@ -291,7 +291,7 @@ class MsgpackTCPStream(MsgTransport):
|
|||
|
||||
async def send(
|
||||
self,
|
||||
msg: msgtypes.Msg,
|
||||
msg: msgtypes.MsgType,
|
||||
|
||||
strict_types: bool = True,
|
||||
# hide_tb: bool = False,
|
||||
|
|
|
@ -64,6 +64,7 @@ from .log import get_logger
|
|||
from .msg import (
|
||||
current_codec,
|
||||
MsgCodec,
|
||||
PayloadT,
|
||||
NamespacePath,
|
||||
pretty_struct,
|
||||
)
|
||||
|
@ -98,7 +99,7 @@ async def _invoke_non_context(
|
|||
|
||||
treat_as_gen: bool,
|
||||
is_rpc: bool,
|
||||
return_msg: Return|CancelAck = Return,
|
||||
return_msg_type: Return|CancelAck = Return,
|
||||
|
||||
task_status: TaskStatus[
|
||||
Context | BaseException
|
||||
|
@ -220,7 +221,7 @@ async def _invoke_non_context(
|
|||
and chan.connected()
|
||||
):
|
||||
try:
|
||||
ret_msg = return_msg(
|
||||
ret_msg = return_msg_type(
|
||||
cid=cid,
|
||||
pld=result,
|
||||
)
|
||||
|
@ -392,16 +393,22 @@ async def _errors_relayed_via_ipc(
|
|||
# cancel scope will not have been inserted yet
|
||||
if is_rpc:
|
||||
log.warning(
|
||||
'RPC task likely errored or cancelled before start?'
|
||||
f'|_{ctx._task}\n'
|
||||
f' >> {ctx.repr_rpc}\n'
|
||||
)
|
||||
else:
|
||||
log.cancel(
|
||||
'Failed to de-alloc internal runtime cancel task?\n'
|
||||
'RPC task likely errored or cancelled before start?\n'
|
||||
f'|_{ctx._task}\n'
|
||||
f' >> {ctx.repr_rpc}\n'
|
||||
)
|
||||
# TODO: remove this right? rn the only non-`is_rpc` cases
|
||||
# are cancellation methods and according the RPC loop eps
|
||||
# for thoses below, nothing is ever registered in
|
||||
# `Actor._rpc_tasks` for those cases.. but should we?
|
||||
#
|
||||
# -[ ] maybe we should have an equiv `Actor._runtime_rpc_tasks`?
|
||||
# else:
|
||||
# log.cancel(
|
||||
# 'Failed to de-alloc internal runtime cancel task?\n'
|
||||
# f'|_{ctx._task}\n'
|
||||
# f' >> {ctx.repr_rpc}\n'
|
||||
# )
|
||||
|
||||
finally:
|
||||
if not actor._rpc_tasks:
|
||||
|
@ -419,7 +426,7 @@ async def _invoke(
|
|||
|
||||
is_rpc: bool = True,
|
||||
hide_tb: bool = True,
|
||||
return_msg: Return|CancelAck = Return,
|
||||
return_msg_type: Return|CancelAck = Return,
|
||||
|
||||
task_status: TaskStatus[
|
||||
Context | BaseException
|
||||
|
@ -533,7 +540,7 @@ async def _invoke(
|
|||
kwargs,
|
||||
treat_as_gen,
|
||||
is_rpc,
|
||||
return_msg,
|
||||
return_msg_type,
|
||||
task_status,
|
||||
)
|
||||
# XXX below fallthrough is ONLY for `@context` eps
|
||||
|
@ -593,18 +600,21 @@ async def _invoke(
|
|||
ctx._scope = tn.cancel_scope
|
||||
task_status.started(ctx)
|
||||
|
||||
# TODO: should would be nice to have our
|
||||
# `TaskMngr` nursery here!
|
||||
res: Any = await coro
|
||||
ctx._result = res
|
||||
|
||||
# deliver final result to caller side.
|
||||
await chan.send(
|
||||
return_msg(
|
||||
cid=cid,
|
||||
pld=res,
|
||||
)
|
||||
# TODO: better `trionics` tooling:
|
||||
# -[ ] should would be nice to have our `TaskMngr`
|
||||
# nursery here!
|
||||
# -[ ] payload value checking like we do with
|
||||
# `.started()` such that the debbuger can engage
|
||||
# here in the child task instead of waiting for the
|
||||
# parent to crash with it's own MTE..
|
||||
res: Any|PayloadT = await coro
|
||||
return_msg: Return|CancelAck = return_msg_type(
|
||||
cid=cid,
|
||||
pld=res,
|
||||
)
|
||||
# set and shuttle final result to "parent"-side task.
|
||||
ctx._result = res
|
||||
await chan.send(return_msg)
|
||||
|
||||
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
||||
# called by any of,
|
||||
|
@ -940,7 +950,7 @@ async def process_messages(
|
|||
actor.cancel,
|
||||
kwargs,
|
||||
is_rpc=False,
|
||||
return_msg=CancelAck,
|
||||
return_msg_type=CancelAck,
|
||||
)
|
||||
|
||||
log.runtime(
|
||||
|
@ -974,7 +984,7 @@ async def process_messages(
|
|||
actor._cancel_task,
|
||||
kwargs,
|
||||
is_rpc=False,
|
||||
return_msg=CancelAck,
|
||||
return_msg_type=CancelAck,
|
||||
)
|
||||
except BaseException:
|
||||
log.exception(
|
||||
|
|
|
@ -1256,9 +1256,10 @@ class Actor:
|
|||
# - child returns a result before cancel-msg/ctxc-raised
|
||||
# - child self raises ctxc before parent send request,
|
||||
# - child errors prior to cancel req.
|
||||
log.cancel(
|
||||
'Cancel request invalid, RPC task already completed?\n\n'
|
||||
f'<= canceller: {requesting_uid}\n\n'
|
||||
log.runtime(
|
||||
'Cancel request for invalid RPC task.\n'
|
||||
'The task likely already completed or was never started!\n\n'
|
||||
f'<= canceller: {requesting_uid}\n'
|
||||
f'=> {cid}@{parent_chan.uid}\n'
|
||||
f' |_{parent_chan}\n'
|
||||
)
|
||||
|
|
|
@ -140,7 +140,7 @@ class MsgDec(Struct):
|
|||
# * also a `.__contains__()` for doing `None in
|
||||
# TypeSpec[None|int]` since rn you need to do it on
|
||||
# `.__args__` for unions..
|
||||
# - `MsgSpec: Union[Type[Msg]]
|
||||
# - `MsgSpec: Union[MsgType]
|
||||
#
|
||||
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
|
||||
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
|
||||
|
@ -188,7 +188,7 @@ def mk_dec(
|
|||
|
||||
return MsgDec(
|
||||
_dec=msgpack.Decoder(
|
||||
type=spec, # like `Msg[Any]`
|
||||
type=spec, # like `MsgType[Any]`
|
||||
dec_hook=dec_hook,
|
||||
)
|
||||
)
|
||||
|
@ -561,7 +561,7 @@ def mk_codec(
|
|||
|
||||
'''
|
||||
# (manually) generate a msg-payload-spec for all relevant
|
||||
# god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT`
|
||||
# god-boxing-msg subtypes, parameterizing the `PayloadMsg.pld: PayloadT`
|
||||
# for the decoder such that all sub-type msgs in our SCIPP
|
||||
# will automatically decode to a type-"limited" payload (`Struct`)
|
||||
# object (set).
|
||||
|
@ -607,7 +607,7 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
|
|||
|
||||
# The built-in IPC `Msg` spec.
|
||||
# Our composing "shuttle" protocol which allows `tractor`-app code
|
||||
# to use any `msgspec` supported type as the `Msg.pld` payload,
|
||||
# to use any `msgspec` supported type as the `PayloadMsg.pld` payload,
|
||||
# https://jcristharif.com/msgspec/supported-types.html
|
||||
#
|
||||
_def_tractor_codec: MsgCodec = mk_codec(
|
||||
|
@ -743,7 +743,7 @@ def limit_msg_spec(
|
|||
) -> MsgCodec:
|
||||
'''
|
||||
Apply a `MsgCodec` that will natively decode the SC-msg set's
|
||||
`Msg.pld: Union[Type[Struct]]` payload fields using
|
||||
`PayloadMsg.pld: Union[Type[Struct]]` payload fields using
|
||||
tagged-unions of `msgspec.Struct`s from the `payload_types`
|
||||
for all IPC contexts in use by the current `trio.Task`.
|
||||
|
||||
|
|
|
@ -53,6 +53,8 @@ from tractor._state import current_ipc_ctx
|
|||
from ._codec import (
|
||||
mk_dec,
|
||||
MsgDec,
|
||||
MsgCodec,
|
||||
current_codec,
|
||||
)
|
||||
from .types import (
|
||||
CancelAck,
|
||||
|
@ -213,6 +215,9 @@ class PldRx(Struct):
|
|||
**dec_msg_kwargs,
|
||||
)
|
||||
|
||||
# TODO: rename to,
|
||||
# -[ ] `.decode_pld()`?
|
||||
# -[ ] `.dec_pld()`?
|
||||
def dec_msg(
|
||||
self,
|
||||
msg: MsgType,
|
||||
|
@ -246,8 +251,8 @@ class PldRx(Struct):
|
|||
pld: PayloadT = self._pld_dec.decode(pld)
|
||||
log.runtime(
|
||||
'Decoded msg payload\n\n'
|
||||
f'{msg}\n\n'
|
||||
f'where payload is\n'
|
||||
f'{msg}\n'
|
||||
f'where payload decoded as\n'
|
||||
f'|_pld={pld!r}\n'
|
||||
)
|
||||
return pld
|
||||
|
@ -263,13 +268,29 @@ class PldRx(Struct):
|
|||
src_validation_error=valerr,
|
||||
is_invalid_payload=True,
|
||||
expected_msg=expect_msg,
|
||||
# ipc_msg=msg,
|
||||
)
|
||||
# NOTE: override the `msg` passed to
|
||||
# `_raise_from_unexpected_msg()` (below) so so that
|
||||
# we're effectively able to use that same func to
|
||||
# unpack and raise an "emulated remote `Error`" of
|
||||
# this local MTE.
|
||||
# NOTE: just raise the MTE inline instead of all
|
||||
# the pack-unpack-repack non-sense when this is
|
||||
# a "send side" validation error.
|
||||
if is_started_send_side:
|
||||
raise mte
|
||||
|
||||
# XXX TODO: remove this right?
|
||||
# => any bad stated/return values should
|
||||
# always be treated a remote errors right?
|
||||
#
|
||||
# if (
|
||||
# expect_msg is Return
|
||||
# or expect_msg is Started
|
||||
# ):
|
||||
# # set emulated remote error more-or-less as the
|
||||
# # runtime would
|
||||
# ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||
# ctx._maybe_cancel_and_set_remote_error(mte)
|
||||
|
||||
# NOTE: the `.message` is automatically
|
||||
# transferred into the message as long as we
|
||||
# define it as a `Error.message` field.
|
||||
err_msg: Error = pack_error(
|
||||
exc=mte,
|
||||
cid=msg.cid,
|
||||
|
@ -279,36 +300,38 @@ class PldRx(Struct):
|
|||
else ipc._actor.uid
|
||||
),
|
||||
# tb=valerr.__traceback__,
|
||||
tb_str=mte._message,
|
||||
# tb_str=mte._message,
|
||||
# message=mte._message,
|
||||
)
|
||||
# ^-TODO-^ just raise this inline instead of all the
|
||||
# pack-unpack-repack non-sense!
|
||||
|
||||
mte._ipc_msg = err_msg
|
||||
msg = err_msg
|
||||
|
||||
# set emulated remote error more-or-less as the
|
||||
# runtime would
|
||||
ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||
# XXX override the `msg` passed to
|
||||
# `_raise_from_unexpected_msg()` (below) so so
|
||||
# that we're effectively able to use that same
|
||||
# func to unpack and raise an "emulated remote
|
||||
# `Error`" of this local MTE.
|
||||
msg = err_msg
|
||||
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
|
||||
# raises the boxed `err_msg` from above it raises
|
||||
# it from the above caught interchange-lib
|
||||
# validation error.
|
||||
src_err = valerr
|
||||
|
||||
# TODO: should we instead make this explicit and
|
||||
# use the above masked `is_started_send_decode`,
|
||||
# expecting the `Context.started()` caller to set
|
||||
# it? Rn this is kinda, howyousayyy, implicitly
|
||||
# edge-case-y..
|
||||
if (
|
||||
expect_msg is not Started
|
||||
and not is_started_send_side
|
||||
):
|
||||
ctx._maybe_cancel_and_set_remote_error(mte)
|
||||
|
||||
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
|
||||
# raises the boxed `err_msg` from above it raises
|
||||
# it from `None`.
|
||||
src_err = valerr
|
||||
# if is_started_send_side:
|
||||
# src_err = None
|
||||
|
||||
# TODO: remove this since it's been added to
|
||||
# `_raise_from_unexpected_msg()`..?
|
||||
# if (
|
||||
# expect_msg is not Started
|
||||
# and not is_started_send_side
|
||||
# ):
|
||||
# # set emulated remote error more-or-less as the
|
||||
# # runtime would
|
||||
# ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||
# ctx._maybe_cancel_and_set_remote_error(mte)
|
||||
|
||||
# XXX some other decoder specific failure?
|
||||
# except TypeError as src_error:
|
||||
|
@ -559,6 +582,7 @@ async def drain_to_final_msg(
|
|||
ipc=ctx,
|
||||
expect_msg=Return,
|
||||
raise_error=False,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
# ^-TODO-^ some bad ideas?
|
||||
# -[ ] wrap final outcome .receive() in a scope so
|
||||
|
@ -737,9 +761,61 @@ async def drain_to_final_msg(
|
|||
)
|
||||
|
||||
|
||||
# TODO: factor logic from `.Context.started()` for send-side
|
||||
# validate raising!
|
||||
def validate_payload_msg(
|
||||
msg: Started|Yield|Return,
|
||||
pld_msg: Started|Yield|Return,
|
||||
pld_value: PayloadT,
|
||||
ipc: Context|MsgStream,
|
||||
|
||||
raise_mte: bool = True,
|
||||
strict_pld_parity: bool = False,
|
||||
hide_tb: bool = True,
|
||||
|
||||
) -> MsgTypeError|None:
|
||||
...
|
||||
'''
|
||||
Validate a `PayloadMsg.pld` value with the current
|
||||
IPC ctx's `PldRx` and raise an appropriate `MsgTypeError`
|
||||
on failure.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
codec: MsgCodec = current_codec()
|
||||
msg_bytes: bytes = codec.encode(pld_msg)
|
||||
try:
|
||||
roundtripped: Started = codec.decode(msg_bytes)
|
||||
ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||
pld: PayloadT = ctx.pld_rx.dec_msg(
|
||||
msg=roundtripped,
|
||||
ipc=ipc,
|
||||
expect_msg=Started,
|
||||
hide_tb=hide_tb,
|
||||
is_started_send_side=True,
|
||||
)
|
||||
if (
|
||||
strict_pld_parity
|
||||
and
|
||||
pld != pld_value
|
||||
):
|
||||
# TODO: make that one a mod func too..
|
||||
diff = pretty_struct.Struct.__sub__(
|
||||
roundtripped,
|
||||
pld_msg,
|
||||
)
|
||||
complaint: str = (
|
||||
'Started value does not match after roundtrip?\n\n'
|
||||
f'{diff}'
|
||||
)
|
||||
raise ValidationError(complaint)
|
||||
|
||||
# raise any msg type error NO MATTER WHAT!
|
||||
except ValidationError as verr:
|
||||
mte: MsgTypeError = _mk_msg_type_err(
|
||||
msg=roundtripped,
|
||||
codec=codec,
|
||||
src_validation_error=verr,
|
||||
verb_header='Trying to send ',
|
||||
is_invalid_payload=True,
|
||||
)
|
||||
if not raise_mte:
|
||||
return mte
|
||||
|
||||
raise mte from verr
|
||||
|
|
|
@ -89,11 +89,12 @@ class PayloadMsg(
|
|||
# -[ ] `uuid.UUID` which has multi-protocol support
|
||||
# https://jcristharif.com/msgspec/supported-types.html#uuid
|
||||
|
||||
# The msgs "payload" (spelled without vowels):
|
||||
# The msg's "payload" (spelled without vowels):
|
||||
# https://en.wikipedia.org/wiki/Payload_(computing)
|
||||
#
|
||||
# NOTE: inherited from any `Msg` (and maybe overriden
|
||||
# by use of `limit_msg_spec()`), but by default is
|
||||
pld: Raw
|
||||
|
||||
# ^-NOTE-^ inherited from any `PayloadMsg` (and maybe type
|
||||
# overriden via the `._ops.limit_plds()` API), but by default is
|
||||
# parameterized to be `Any`.
|
||||
#
|
||||
# XXX this `Union` must strictly NOT contain `Any` if
|
||||
|
@ -106,7 +107,6 @@ class PayloadMsg(
|
|||
# TODO: could also be set to `msgspec.Raw` if the sub-decoders
|
||||
# approach is preferred over the generic parameterization
|
||||
# approach as take by `mk_msg_spec()` below.
|
||||
pld: Raw
|
||||
|
||||
|
||||
# TODO: complete rename
|
||||
|
@ -410,21 +410,32 @@ class Error(
|
|||
src_type_str: str
|
||||
boxed_type_str: str
|
||||
relay_path: list[tuple[str, str]]
|
||||
tb_str: str
|
||||
|
||||
cid: str|None = None
|
||||
# normally either both are provided or just
|
||||
# a message for certain special cases where
|
||||
# we pack a message for a locally raised
|
||||
# mte or ctxc.
|
||||
message: str|None = None
|
||||
tb_str: str = ''
|
||||
|
||||
# TODO: use UNSET or don't include them via
|
||||
# TODO: only optionally include sub-type specfic fields?
|
||||
# -[ ] use UNSET or don't include them via `omit_defaults` (see
|
||||
# inheritance-line options above)
|
||||
#
|
||||
# `ContextCancelled`
|
||||
# `ContextCancelled` reports the src cancelling `Actor.uid`
|
||||
canceller: tuple[str, str]|None = None
|
||||
|
||||
# `StreamOverrun`
|
||||
# `StreamOverrun`-specific src `Actor.uid`
|
||||
sender: tuple[str, str]|None = None
|
||||
|
||||
# for the `MsgTypeError` case where the receiver side
|
||||
# decodes the underlying original `Msg`-subtype
|
||||
_msg_dict: dict|None = None
|
||||
# `MsgTypeError` meta-data
|
||||
cid: str|None = None
|
||||
# when the receiver side fails to decode a delivered
|
||||
# `PayloadMsg`-subtype; one and/or both the msg-struct instance
|
||||
# and `Any`-decoded to `dict` of the msg are set and relayed
|
||||
# (back to the sender) for introspection.
|
||||
_bad_msg: Started|Yield|Return|None = None
|
||||
_bad_msg_as_dict: dict|None = None
|
||||
|
||||
|
||||
def from_dict_msg(
|
||||
|
@ -436,9 +447,11 @@ def from_dict_msg(
|
|||
|
||||
) -> MsgType:
|
||||
'''
|
||||
Helper to build a specific `MsgType` struct from
|
||||
a "vanilla" decoded `dict`-ified equivalent of the
|
||||
msg: i.e. if the `msgpack.Decoder.type == Any`.
|
||||
Helper to build a specific `MsgType` struct from a "vanilla"
|
||||
decoded `dict`-ified equivalent of the msg: i.e. if the
|
||||
`msgpack.Decoder.type == Any`, the default when using
|
||||
`msgspec.msgpack` and not "typed decoding" using
|
||||
`msgspec.Struct`.
|
||||
|
||||
'''
|
||||
msg_type_tag_field: str = (
|
||||
|
|
Loading…
Reference in New Issue