Compare commits
16 Commits
eee4c61b51
...
0e8c60ee4a
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 0e8c60ee4a | |
Tyler Goodlet | 1db5d4def2 | |
Tyler Goodlet | 6e54abc56d | |
Tyler Goodlet | 28af4749cc | |
Tyler Goodlet | 02a7c7c276 | |
Tyler Goodlet | 4fa71cc01c | |
Tyler Goodlet | 6a4ee461f5 | |
Tyler Goodlet | 2db03444f7 | |
Tyler Goodlet | a1b124b62b | |
Tyler Goodlet | 59ca256183 | |
Tyler Goodlet | 6c2efc96dc | |
Tyler Goodlet | f7fd8278af | |
Tyler Goodlet | 7ac730e326 | |
Tyler Goodlet | 582144830f | |
Tyler Goodlet | 8b860f4245 | |
Tyler Goodlet | 27fd96729a |
|
@ -4,9 +4,15 @@ import trio
|
||||||
|
|
||||||
async def breakpoint_forever():
|
async def breakpoint_forever():
|
||||||
"Indefinitely re-enter debugger in child actor."
|
"Indefinitely re-enter debugger in child actor."
|
||||||
while True:
|
try:
|
||||||
yield 'yo'
|
while True:
|
||||||
await tractor.breakpoint()
|
yield 'yo'
|
||||||
|
await tractor.breakpoint()
|
||||||
|
except BaseException:
|
||||||
|
tractor.log.get_console_log().exception(
|
||||||
|
'Cancelled while trying to enter pause point!'
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
async def name_error():
|
async def name_error():
|
||||||
|
@ -19,7 +25,7 @@ async def main():
|
||||||
"""
|
"""
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
loglevel='error',
|
loglevel='cancel',
|
||||||
) as n:
|
) as n:
|
||||||
|
|
||||||
p0 = await n.start_actor('bp_forever', enable_modules=[__name__])
|
p0 = await n.start_actor('bp_forever', enable_modules=[__name__])
|
||||||
|
|
|
@ -45,6 +45,7 @@ async def spawn_until(depth=0):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: notes on the new boxed-relayed errors through proxy actors
|
||||||
async def main():
|
async def main():
|
||||||
"""The main ``tractor`` routine.
|
"""The main ``tractor`` routine.
|
||||||
|
|
||||||
|
|
|
@ -23,5 +23,6 @@ async def main():
|
||||||
n.start_soon(debug_actor.run, die)
|
n.start_soon(debug_actor.run, die)
|
||||||
n.start_soon(crash_boi.run, die)
|
n.start_soon(crash_boi.run, die)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
|
@ -2,10 +2,13 @@ import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main(
|
||||||
|
registry_addrs: tuple[str, int]|None = None
|
||||||
|
):
|
||||||
|
|
||||||
async with tractor.open_root_actor(
|
async with tractor.open_root_actor(
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
|
# loglevel='runtime',
|
||||||
):
|
):
|
||||||
while True:
|
while True:
|
||||||
await tractor.breakpoint()
|
await tractor.breakpoint()
|
||||||
|
|
|
@ -3,16 +3,26 @@ import tractor
|
||||||
|
|
||||||
|
|
||||||
async def name_error():
|
async def name_error():
|
||||||
getattr(doggypants)
|
getattr(doggypants) # noqa (on purpose)
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
) as n:
|
# loglevel='transport',
|
||||||
|
) as an:
|
||||||
|
|
||||||
portal = await n.run_in_actor(name_error)
|
# TODO: ideally the REPL arrives at this frame in the parent,
|
||||||
await portal.result()
|
# ABOVE the @api_frame of `Portal.run_in_actor()` (which
|
||||||
|
# should eventually not even be a portal method ... XD)
|
||||||
|
# await tractor.pause()
|
||||||
|
p: tractor.Portal = await an.run_in_actor(name_error)
|
||||||
|
|
||||||
|
# with this style, should raise on this line
|
||||||
|
await p.result()
|
||||||
|
|
||||||
|
# with this alt style should raise at `open_nusery()`
|
||||||
|
# return await p.result()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
|
@ -7,7 +7,7 @@ def sync_pause(
|
||||||
error: bool = False,
|
error: bool = False,
|
||||||
):
|
):
|
||||||
if use_builtin:
|
if use_builtin:
|
||||||
breakpoint()
|
breakpoint(hide_tb=False)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
tractor.pause_from_sync()
|
tractor.pause_from_sync()
|
||||||
|
@ -20,18 +20,20 @@ def sync_pause(
|
||||||
async def start_n_sync_pause(
|
async def start_n_sync_pause(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
):
|
):
|
||||||
# sync to requesting peer
|
actor: tractor.Actor = tractor.current_actor()
|
||||||
|
|
||||||
|
# sync to parent-side task
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
|
|
||||||
actor: tractor.Actor = tractor.current_actor()
|
|
||||||
print(f'entering SYNC PAUSE in {actor.uid}')
|
print(f'entering SYNC PAUSE in {actor.uid}')
|
||||||
sync_pause()
|
sync_pause()
|
||||||
print(f'back from SYNC PAUSE in {actor.uid}')
|
print(f'back from SYNC PAUSE in {actor.uid}')
|
||||||
|
|
||||||
|
|
||||||
async def main() -> None:
|
async def main() -> None:
|
||||||
|
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
|
# NOTE: required for pausing from sync funcs
|
||||||
|
maybe_enable_greenback=True,
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
) as an:
|
) as an:
|
||||||
|
|
||||||
|
|
|
@ -39,8 +39,7 @@ msgspec='^0.18.5' # interchange
|
||||||
wrapt = "^1.16.0" # decorators
|
wrapt = "^1.16.0" # decorators
|
||||||
colorlog = "^6.8.2" # logging
|
colorlog = "^6.8.2" # logging
|
||||||
|
|
||||||
# .devx tooling
|
# built-in multi-actor `pdb` REPL
|
||||||
stackscope = "^0.2.2"
|
|
||||||
pdbp = "^1.5.0"
|
pdbp = "^1.5.0"
|
||||||
|
|
||||||
|
|
||||||
|
@ -49,15 +48,19 @@ pdbp = "^1.5.0"
|
||||||
# 'pyroute2
|
# 'pyroute2
|
||||||
|
|
||||||
# ------ - ------
|
# ------ - ------
|
||||||
xontrib-vox = "^0.0.1"
|
|
||||||
|
|
||||||
[tool.poetry.group.dev]
|
[tool.poetry.group.dev]
|
||||||
optional = false
|
optional = false
|
||||||
[tool.poetry.group.dev.dependencies]
|
[tool.poetry.group.dev.dependencies]
|
||||||
|
# testing
|
||||||
pytest = "^8.2.0"
|
pytest = "^8.2.0"
|
||||||
pexpect = "^4.9.0"
|
pexpect = "^4.9.0"
|
||||||
|
|
||||||
# only for xonsh as sh..
|
# .devx tooling
|
||||||
|
greenback = "^1.2.1"
|
||||||
|
stackscope = "^0.2.2"
|
||||||
|
|
||||||
|
# (light) xonsh usage/integration
|
||||||
xontrib-vox = "^0.0.1"
|
xontrib-vox = "^0.0.1"
|
||||||
prompt-toolkit = "^3.0.43"
|
prompt-toolkit = "^3.0.43"
|
||||||
xonsh-vox-tabcomplete = "^0.5"
|
xonsh-vox-tabcomplete = "^0.5"
|
||||||
|
|
|
@ -6,30 +6,19 @@ related settings around IPC contexts.
|
||||||
'''
|
'''
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
contextmanager as cm,
|
|
||||||
)
|
|
||||||
# import typing
|
|
||||||
from typing import (
|
|
||||||
# Any,
|
|
||||||
TypeAlias,
|
|
||||||
# Union,
|
|
||||||
)
|
)
|
||||||
from contextvars import (
|
from contextvars import (
|
||||||
Context,
|
Context,
|
||||||
)
|
)
|
||||||
|
|
||||||
from msgspec import (
|
from msgspec import (
|
||||||
# structs,
|
|
||||||
# msgpack,
|
|
||||||
Struct,
|
Struct,
|
||||||
# ValidationError,
|
|
||||||
)
|
)
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
from tractor import (
|
from tractor import (
|
||||||
# _state,
|
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
current_ipc_ctx,
|
current_ipc_ctx,
|
||||||
Portal,
|
Portal,
|
||||||
|
@ -40,20 +29,9 @@ from tractor.msg import (
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
_codec,
|
_codec,
|
||||||
# _ctxvar_MsgCodec,
|
|
||||||
|
|
||||||
# NamespacePath,
|
|
||||||
# MsgCodec,
|
|
||||||
# mk_codec,
|
|
||||||
# apply_codec,
|
|
||||||
# current_codec,
|
|
||||||
)
|
)
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
log,
|
log,
|
||||||
# _payload_msgs,
|
|
||||||
# PayloadMsg,
|
|
||||||
# Started,
|
|
||||||
# mk_msg_spec,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -64,23 +42,10 @@ class PldMsg(Struct):
|
||||||
maybe_msg_spec = PldMsg|None
|
maybe_msg_spec = PldMsg|None
|
||||||
|
|
||||||
|
|
||||||
@cm
|
|
||||||
def custom_spec(
|
|
||||||
ctx: Context,
|
|
||||||
spec: TypeAlias,
|
|
||||||
) -> _codec.MsgCodec:
|
|
||||||
'''
|
|
||||||
Apply a custom payload spec, remove on exit.
|
|
||||||
|
|
||||||
'''
|
|
||||||
rx: msgops.PldRx = ctx._pld_rx
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_expect_raises(
|
async def maybe_expect_raises(
|
||||||
raises: BaseException|None = None,
|
raises: BaseException|None = None,
|
||||||
ensure_in_message: list[str]|None = None,
|
ensure_in_message: list[str]|None = None,
|
||||||
|
|
||||||
reraise: bool = False,
|
reraise: bool = False,
|
||||||
timeout: int = 3,
|
timeout: int = 3,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -88,6 +53,9 @@ async def maybe_expect_raises(
|
||||||
Async wrapper for ensuring errors propagate from the inner scope.
|
Async wrapper for ensuring errors propagate from the inner scope.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
if tractor._state.debug_mode():
|
||||||
|
timeout += 999
|
||||||
|
|
||||||
with trio.fail_after(timeout):
|
with trio.fail_after(timeout):
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
|
@ -103,9 +71,10 @@ async def maybe_expect_raises(
|
||||||
# maybe check for error txt content
|
# maybe check for error txt content
|
||||||
if ensure_in_message:
|
if ensure_in_message:
|
||||||
part: str
|
part: str
|
||||||
|
err_repr: str = repr(inner_err)
|
||||||
for part in ensure_in_message:
|
for part in ensure_in_message:
|
||||||
for i, arg in enumerate(inner_err.args):
|
for i, arg in enumerate(inner_err.args):
|
||||||
if part in arg:
|
if part in err_repr:
|
||||||
break
|
break
|
||||||
# if part never matches an arg, then we're
|
# if part never matches an arg, then we're
|
||||||
# missing a match.
|
# missing a match.
|
||||||
|
@ -132,7 +101,7 @@ async def child(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
started_value: int|PldMsg|None,
|
started_value: int|PldMsg|None,
|
||||||
return_value: str|None,
|
return_value: str|None,
|
||||||
validate_pld_spec: bool,
|
validate_pld_spec: bool,
|
||||||
raise_on_started_mte: bool = True,
|
raise_on_started_mte: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -166,13 +135,15 @@ async def child(
|
||||||
# 2 cases: hdndle send-side and recv-only validation
|
# 2 cases: hdndle send-side and recv-only validation
|
||||||
# - when `raise_on_started_mte == True`, send validate
|
# - when `raise_on_started_mte == True`, send validate
|
||||||
# - else, parent-recv-side only validation
|
# - else, parent-recv-side only validation
|
||||||
|
mte: MsgTypeError|None = None
|
||||||
try:
|
try:
|
||||||
await ctx.started(
|
await ctx.started(
|
||||||
value=started_value,
|
value=started_value,
|
||||||
validate_pld_spec=validate_pld_spec,
|
validate_pld_spec=validate_pld_spec,
|
||||||
)
|
)
|
||||||
|
|
||||||
except MsgTypeError:
|
except MsgTypeError as _mte:
|
||||||
|
mte = _mte
|
||||||
log.exception('started()` raised an MTE!\n')
|
log.exception('started()` raised an MTE!\n')
|
||||||
if not expect_started_mte:
|
if not expect_started_mte:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
|
@ -180,15 +151,60 @@ async def child(
|
||||||
f'{started_value!r}\n'
|
f'{started_value!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
boxed_div: str = '------ - ------'
|
||||||
|
assert boxed_div not in mte._message
|
||||||
|
assert boxed_div not in mte.tb_str
|
||||||
|
assert boxed_div not in repr(mte)
|
||||||
|
assert boxed_div not in str(mte)
|
||||||
|
mte_repr: str = repr(mte)
|
||||||
|
for line in mte.message.splitlines():
|
||||||
|
assert line in mte_repr
|
||||||
|
|
||||||
|
# since this is a *local error* there should be no
|
||||||
|
# boxed traceback content!
|
||||||
|
assert not mte.tb_str
|
||||||
|
|
||||||
# propagate to parent?
|
# propagate to parent?
|
||||||
if raise_on_started_mte:
|
if raise_on_started_mte:
|
||||||
raise
|
raise
|
||||||
else:
|
|
||||||
if expect_started_mte:
|
# no-send-side-error fallthrough
|
||||||
raise RuntimeError(
|
if (
|
||||||
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
|
validate_pld_spec
|
||||||
f'{started_value!r}\n'
|
and
|
||||||
)
|
expect_started_mte
|
||||||
|
):
|
||||||
|
raise RuntimeError(
|
||||||
|
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
|
||||||
|
f'{started_value!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
not expect_started_mte
|
||||||
|
or
|
||||||
|
not validate_pld_spec
|
||||||
|
)
|
||||||
|
|
||||||
|
# if wait_for_parent_to_cancel:
|
||||||
|
# ...
|
||||||
|
#
|
||||||
|
# ^-TODO-^ logic for diff validation policies on each side:
|
||||||
|
#
|
||||||
|
# -[ ] ensure that if we don't validate on the send
|
||||||
|
# side, that we are eventually error-cancelled by our
|
||||||
|
# parent due to the bad `Started` payload!
|
||||||
|
# -[ ] the boxed error should be srced from the parent's
|
||||||
|
# runtime NOT ours!
|
||||||
|
# -[ ] we should still error on bad `return_value`s
|
||||||
|
# despite the parent not yet error-cancelling us?
|
||||||
|
# |_ how do we want the parent side to look in that
|
||||||
|
# case?
|
||||||
|
# -[ ] maybe the equiv of "during handling of the
|
||||||
|
# above error another occurred" for the case where
|
||||||
|
# the parent sends a MTE to this child and while
|
||||||
|
# waiting for the child to terminate it gets back
|
||||||
|
# the MTE for this case?
|
||||||
|
#
|
||||||
|
|
||||||
# XXX should always fail on recv side since we can't
|
# XXX should always fail on recv side since we can't
|
||||||
# really do much else beside terminate and relay the
|
# really do much else beside terminate and relay the
|
||||||
|
@ -211,8 +227,8 @@ async def child(
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'return_value',
|
'return_value',
|
||||||
[
|
[
|
||||||
None,
|
|
||||||
'yo',
|
'yo',
|
||||||
|
None,
|
||||||
],
|
],
|
||||||
ids=[
|
ids=[
|
||||||
'return[invalid-"yo"]',
|
'return[invalid-"yo"]',
|
||||||
|
@ -271,16 +287,32 @@ def test_basic_payload_spec(
|
||||||
# since not opened yet.
|
# since not opened yet.
|
||||||
assert current_ipc_ctx() is None
|
assert current_ipc_ctx() is None
|
||||||
|
|
||||||
|
if invalid_started:
|
||||||
|
msg_type_str: str = 'Started'
|
||||||
|
bad_value_str: str = '10'
|
||||||
|
elif invalid_return:
|
||||||
|
msg_type_str: str = 'Return'
|
||||||
|
bad_value_str: str = "'yo'"
|
||||||
|
else:
|
||||||
|
# XXX but should never be used below then..
|
||||||
|
msg_type_str: str = ''
|
||||||
|
bad_value_str: str = ''
|
||||||
|
|
||||||
|
maybe_mte: MsgTypeError|None = None
|
||||||
|
should_raise: Exception|None = (
|
||||||
|
MsgTypeError if (
|
||||||
|
invalid_return
|
||||||
|
or
|
||||||
|
invalid_started
|
||||||
|
) else None
|
||||||
|
)
|
||||||
async with (
|
async with (
|
||||||
maybe_expect_raises(
|
maybe_expect_raises(
|
||||||
raises=MsgTypeError if (
|
raises=should_raise,
|
||||||
invalid_return
|
|
||||||
or
|
|
||||||
invalid_started
|
|
||||||
) else None,
|
|
||||||
ensure_in_message=[
|
ensure_in_message=[
|
||||||
"invalid `Return` payload",
|
f"invalid `{msg_type_str}` msg payload",
|
||||||
"value: `'yo'` does not match type-spec: `Return.pld: PldMsg|NoneType`",
|
f"value: `{bad_value_str}` does not "
|
||||||
|
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
|
||||||
],
|
],
|
||||||
),
|
),
|
||||||
p.open_context(
|
p.open_context(
|
||||||
|
@ -298,18 +330,35 @@ def test_basic_payload_spec(
|
||||||
assert first.field == 'yo'
|
assert first.field == 'yo'
|
||||||
|
|
||||||
try:
|
try:
|
||||||
assert (await ctx.result()) is None
|
res: None|PldMsg = await ctx.result(hide_tb=False)
|
||||||
|
assert res is None
|
||||||
except MsgTypeError as mte:
|
except MsgTypeError as mte:
|
||||||
|
maybe_mte = mte
|
||||||
if not invalid_return:
|
if not invalid_return:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
else: # expected this invalid `Return.pld`
|
# expected this invalid `Return.pld` so audit
|
||||||
assert mte.cid == ctx.cid
|
# the error state + meta-data
|
||||||
|
assert mte.expected_msg_type is Return
|
||||||
|
assert mte.cid == ctx.cid
|
||||||
|
mte_repr: str = repr(mte)
|
||||||
|
for line in mte.message.splitlines():
|
||||||
|
assert line in mte_repr
|
||||||
|
|
||||||
# verify expected remote mte deats
|
assert mte.tb_str
|
||||||
await tractor.pause()
|
# await tractor.pause(shield=True)
|
||||||
assert ctx._remote_error is mte
|
|
||||||
assert mte.expected_msg_type is Return
|
# verify expected remote mte deats
|
||||||
|
assert ctx._local_error is None
|
||||||
|
assert (
|
||||||
|
mte is
|
||||||
|
ctx._remote_error is
|
||||||
|
ctx.maybe_error is
|
||||||
|
ctx.outcome
|
||||||
|
)
|
||||||
|
|
||||||
|
if should_raise is None:
|
||||||
|
assert maybe_mte is None
|
||||||
|
|
||||||
await p.cancel_actor()
|
await p.cancel_actor()
|
||||||
|
|
||||||
|
|
|
@ -58,9 +58,6 @@ from typing import (
|
||||||
import warnings
|
import warnings
|
||||||
# ------ - ------
|
# ------ - ------
|
||||||
import trio
|
import trio
|
||||||
from msgspec import (
|
|
||||||
ValidationError,
|
|
||||||
)
|
|
||||||
# ------ - ------
|
# ------ - ------
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
|
@ -78,19 +75,16 @@ from .log import (
|
||||||
from .msg import (
|
from .msg import (
|
||||||
Error,
|
Error,
|
||||||
MsgType,
|
MsgType,
|
||||||
MsgCodec,
|
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
PayloadT,
|
PayloadT,
|
||||||
Started,
|
Started,
|
||||||
Stop,
|
Stop,
|
||||||
Yield,
|
Yield,
|
||||||
current_codec,
|
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
_ops as msgops,
|
_ops as msgops,
|
||||||
)
|
)
|
||||||
from ._ipc import (
|
from ._ipc import (
|
||||||
Channel,
|
Channel,
|
||||||
_mk_msg_type_err,
|
|
||||||
)
|
)
|
||||||
from ._streaming import MsgStream
|
from ._streaming import MsgStream
|
||||||
from ._state import (
|
from ._state import (
|
||||||
|
@ -670,7 +664,7 @@ class Context:
|
||||||
'Setting remote error for ctx\n\n'
|
'Setting remote error for ctx\n\n'
|
||||||
f'<= {self.peer_side!r}: {self.chan.uid}\n'
|
f'<= {self.peer_side!r}: {self.chan.uid}\n'
|
||||||
f'=> {self.side!r}: {self._actor.uid}\n\n'
|
f'=> {self.side!r}: {self._actor.uid}\n\n'
|
||||||
f'{error}'
|
f'{error!r}'
|
||||||
)
|
)
|
||||||
self._remote_error: BaseException = error
|
self._remote_error: BaseException = error
|
||||||
|
|
||||||
|
@ -724,7 +718,7 @@ class Context:
|
||||||
log.error(
|
log.error(
|
||||||
f'Remote context error:\n\n'
|
f'Remote context error:\n\n'
|
||||||
# f'{pformat(self)}\n'
|
# f'{pformat(self)}\n'
|
||||||
f'{error}'
|
f'{error!r}'
|
||||||
)
|
)
|
||||||
|
|
||||||
if self._canceller is None:
|
if self._canceller is None:
|
||||||
|
@ -748,26 +742,27 @@ class Context:
|
||||||
and not cs.cancel_called
|
and not cs.cancel_called
|
||||||
and not cs.cancelled_caught
|
and not cs.cancelled_caught
|
||||||
):
|
):
|
||||||
if not (
|
if (
|
||||||
msgerr
|
msgerr
|
||||||
|
|
||||||
# NOTE: we allow user to config not cancelling the
|
# NOTE: we allow user to config not cancelling the
|
||||||
# local scope on `MsgTypeError`s
|
# local scope on `MsgTypeError`s
|
||||||
and not self._cancel_on_msgerr
|
and
|
||||||
|
not self._cancel_on_msgerr
|
||||||
):
|
):
|
||||||
# TODO: it'd sure be handy to inject our own
|
|
||||||
# `trio.Cancelled` subtype here ;)
|
|
||||||
# https://github.com/goodboy/tractor/issues/368
|
|
||||||
message: str = 'Cancelling `Context._scope` !\n\n'
|
|
||||||
self._scope.cancel()
|
|
||||||
|
|
||||||
else:
|
|
||||||
message: str = (
|
message: str = (
|
||||||
'NOT Cancelling `Context._scope` since,\n'
|
'NOT Cancelling `Context._scope` since,\n'
|
||||||
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
|
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
|
||||||
f'AND we got a msg-type-error!\n'
|
f'AND we got a msg-type-error!\n'
|
||||||
f'{error}\n'
|
f'{error}\n'
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
# TODO: it'd sure be handy to inject our own
|
||||||
|
# `trio.Cancelled` subtype here ;)
|
||||||
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
|
message: str = 'Cancelling `Context._scope` !\n\n'
|
||||||
|
self._scope.cancel()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||||
# from .devx import mk_pdb
|
# from .devx import mk_pdb
|
||||||
|
@ -1657,54 +1652,21 @@ class Context:
|
||||||
#
|
#
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
if validate_pld_spec:
|
if validate_pld_spec:
|
||||||
# __tracebackhide__: bool = False
|
msgops.validate_payload_msg(
|
||||||
codec: MsgCodec = current_codec()
|
pld_msg=started_msg,
|
||||||
msg_bytes: bytes = codec.encode(started_msg)
|
pld_value=value,
|
||||||
try:
|
ipc=self,
|
||||||
roundtripped: Started = codec.decode(msg_bytes)
|
strict_pld_parity=strict_pld_parity,
|
||||||
# pld: PayloadT = await self.pld_rx.recv_pld(
|
hide_tb=hide_tb,
|
||||||
pld: PayloadT = self.pld_rx.dec_msg(
|
)
|
||||||
msg=roundtripped,
|
|
||||||
ipc=self,
|
|
||||||
expect_msg=Started,
|
|
||||||
hide_tb=hide_tb,
|
|
||||||
is_started_send_side=True,
|
|
||||||
)
|
|
||||||
if (
|
|
||||||
strict_pld_parity
|
|
||||||
and
|
|
||||||
pld != value
|
|
||||||
):
|
|
||||||
# TODO: make that one a mod func too..
|
|
||||||
diff = pretty_struct.Struct.__sub__(
|
|
||||||
roundtripped,
|
|
||||||
started_msg,
|
|
||||||
)
|
|
||||||
complaint: str = (
|
|
||||||
'Started value does not match after roundtrip?\n\n'
|
|
||||||
f'{diff}'
|
|
||||||
)
|
|
||||||
raise ValidationError(complaint)
|
|
||||||
|
|
||||||
# raise any msg type error NO MATTER WHAT!
|
|
||||||
except ValidationError as verr:
|
|
||||||
# always show this src frame in the tb
|
|
||||||
# __tracebackhide__: bool = False
|
|
||||||
raise _mk_msg_type_err(
|
|
||||||
msg=roundtripped,
|
|
||||||
codec=codec,
|
|
||||||
src_validation_error=verr,
|
|
||||||
verb_header='Trying to send ',
|
|
||||||
is_invalid_payload=True,
|
|
||||||
) from verr
|
|
||||||
|
|
||||||
# TODO: maybe a flag to by-pass encode op if already done
|
# TODO: maybe a flag to by-pass encode op if already done
|
||||||
# here in caller?
|
# here in caller?
|
||||||
await self.chan.send(started_msg)
|
await self.chan.send(started_msg)
|
||||||
|
|
||||||
# set msg-related internal runtime-state
|
# set msg-related internal runtime-state
|
||||||
self._started_called = True
|
self._started_called: bool = True
|
||||||
self._started_msg = started_msg
|
self._started_msg: Started = started_msg
|
||||||
self._started_pld = value
|
self._started_pld = value
|
||||||
|
|
||||||
async def _drain_overflows(
|
async def _drain_overflows(
|
||||||
|
@ -2097,6 +2059,12 @@ async def open_context_from_portal(
|
||||||
if maybe_msgdec:
|
if maybe_msgdec:
|
||||||
assert maybe_msgdec.pld_spec == pld_spec
|
assert maybe_msgdec.pld_spec == pld_spec
|
||||||
|
|
||||||
|
# NOTE: this in an implicit runtime nursery used to,
|
||||||
|
# - start overrun queuing tasks when as well as
|
||||||
|
# for cancellation of the scope opened by the user.
|
||||||
|
ctx._scope_nursery: trio.Nursery = tn
|
||||||
|
ctx._scope: trio.CancelScope = tn.cancel_scope
|
||||||
|
|
||||||
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
|
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
|
||||||
# `Started`-msg any cancellation triggered
|
# `Started`-msg any cancellation triggered
|
||||||
# in `._maybe_cancel_and_set_remote_error()` will
|
# in `._maybe_cancel_and_set_remote_error()` will
|
||||||
|
@ -2104,25 +2072,42 @@ async def open_context_from_portal(
|
||||||
# -> it's expected that if there is an error in this phase of
|
# -> it's expected that if there is an error in this phase of
|
||||||
# the dialog, the `Error` msg should be raised from the `msg`
|
# the dialog, the `Error` msg should be raised from the `msg`
|
||||||
# handling block below.
|
# handling block below.
|
||||||
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
|
try:
|
||||||
ipc=ctx,
|
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
|
||||||
expect_msg=Started,
|
ipc=ctx,
|
||||||
passthrough_non_pld_msgs=False,
|
expect_msg=Started,
|
||||||
hide_tb=hide_tb,
|
passthrough_non_pld_msgs=False,
|
||||||
)
|
hide_tb=hide_tb,
|
||||||
|
)
|
||||||
|
except trio.Cancelled as taskc:
|
||||||
|
ctx_cs: trio.CancelScope = ctx._scope
|
||||||
|
if not ctx_cs.cancel_called:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# from .devx import pause
|
||||||
|
# await pause(shield=True)
|
||||||
|
|
||||||
|
log.cancel(
|
||||||
|
'IPC ctx was cancelled during "child" task sync due to\n\n'
|
||||||
|
f'{ctx.maybe_error}\n'
|
||||||
|
)
|
||||||
|
# OW if the ctx's scope was cancelled manually,
|
||||||
|
# likely the `Context` was cancelled via a call to
|
||||||
|
# `._maybe_cancel_and_set_remote_error()` so ensure
|
||||||
|
# we raise the underlying `._remote_error` directly
|
||||||
|
# instead of bubbling that taskc.
|
||||||
|
ctx.maybe_raise()
|
||||||
|
|
||||||
|
# OW, some other unexpected cancel condition
|
||||||
|
# that should prolly never happen right?
|
||||||
|
raise InternalError(
|
||||||
|
'Invalid cancellation during IPC ctx sync phase?\n'
|
||||||
|
) from taskc
|
||||||
|
|
||||||
# from .devx import pause
|
|
||||||
# await pause()
|
|
||||||
ctx._started_called: bool = True
|
ctx._started_called: bool = True
|
||||||
ctx._started_msg: bool = started_msg
|
ctx._started_msg: bool = started_msg
|
||||||
ctx._started_pld: bool = first
|
ctx._started_pld: bool = first
|
||||||
|
|
||||||
# NOTE: this in an implicit runtime nursery used to,
|
|
||||||
# - start overrun queuing tasks when as well as
|
|
||||||
# for cancellation of the scope opened by the user.
|
|
||||||
ctx._scope_nursery: trio.Nursery = tn
|
|
||||||
ctx._scope: trio.CancelScope = tn.cancel_scope
|
|
||||||
|
|
||||||
# deliver context instance and .started() msg value
|
# deliver context instance and .started() msg value
|
||||||
# in enter tuple.
|
# in enter tuple.
|
||||||
yield ctx, first
|
yield ctx, first
|
||||||
|
|
|
@ -22,6 +22,7 @@ from __future__ import annotations
|
||||||
import builtins
|
import builtins
|
||||||
import importlib
|
import importlib
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
|
import sys
|
||||||
from types import (
|
from types import (
|
||||||
TracebackType,
|
TracebackType,
|
||||||
)
|
)
|
||||||
|
@ -110,6 +111,7 @@ _body_fields: list[str] = list(
|
||||||
'tb_str',
|
'tb_str',
|
||||||
'relay_path',
|
'relay_path',
|
||||||
'cid',
|
'cid',
|
||||||
|
'message',
|
||||||
|
|
||||||
# only ctxc should show it but `Error` does
|
# only ctxc should show it but `Error` does
|
||||||
# have it as an optional field.
|
# have it as an optional field.
|
||||||
|
@ -236,6 +238,7 @@ class RemoteActorError(Exception):
|
||||||
self._boxed_type: BaseException = boxed_type
|
self._boxed_type: BaseException = boxed_type
|
||||||
self._src_type: BaseException|None = None
|
self._src_type: BaseException|None = None
|
||||||
self._ipc_msg: Error|None = ipc_msg
|
self._ipc_msg: Error|None = ipc_msg
|
||||||
|
self._extra_msgdata = extra_msgdata
|
||||||
|
|
||||||
if (
|
if (
|
||||||
extra_msgdata
|
extra_msgdata
|
||||||
|
@ -250,8 +253,6 @@ class RemoteActorError(Exception):
|
||||||
k,
|
k,
|
||||||
v,
|
v,
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
self._extra_msgdata = extra_msgdata
|
|
||||||
|
|
||||||
# TODO: mask out eventually or place in `pack_error()`
|
# TODO: mask out eventually or place in `pack_error()`
|
||||||
# pre-`return` lines?
|
# pre-`return` lines?
|
||||||
|
@ -282,6 +283,17 @@ class RemoteActorError(Exception):
|
||||||
# ensure any roundtripping evals to the input value
|
# ensure any roundtripping evals to the input value
|
||||||
assert self.boxed_type is boxed_type
|
assert self.boxed_type is boxed_type
|
||||||
|
|
||||||
|
@property
|
||||||
|
def message(self) -> str:
|
||||||
|
'''
|
||||||
|
Be explicit, instead of trying to read it from the the parent
|
||||||
|
type's loosely defined `.args: tuple`:
|
||||||
|
|
||||||
|
https://docs.python.org/3/library/exceptions.html#BaseException.args
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self._message
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def ipc_msg(self) -> Struct:
|
def ipc_msg(self) -> Struct:
|
||||||
'''
|
'''
|
||||||
|
@ -355,7 +367,10 @@ class RemoteActorError(Exception):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
bt: Type[BaseException] = self.boxed_type
|
bt: Type[BaseException] = self.boxed_type
|
||||||
return str(bt.__name__)
|
if bt:
|
||||||
|
return str(bt.__name__)
|
||||||
|
|
||||||
|
return ''
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def boxed_type(self) -> Type[BaseException]:
|
def boxed_type(self) -> Type[BaseException]:
|
||||||
|
@ -426,8 +441,7 @@ class RemoteActorError(Exception):
|
||||||
|
|
||||||
for key in fields:
|
for key in fields:
|
||||||
if (
|
if (
|
||||||
key == 'relay_uid'
|
key == 'relay_uid' and not self.is_inception()
|
||||||
and not self.is_inception()
|
|
||||||
):
|
):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -504,19 +518,80 @@ class RemoteActorError(Exception):
|
||||||
def pformat(
|
def pformat(
|
||||||
self,
|
self,
|
||||||
with_type_header: bool = True,
|
with_type_header: bool = True,
|
||||||
|
# with_ascii_box: bool = True,
|
||||||
|
|
||||||
) -> str:
|
) -> str:
|
||||||
'''
|
'''
|
||||||
Nicely formatted boxed error meta data + traceback, OR just
|
Format any boxed remote error by multi-line display of,
|
||||||
the normal message from `.args` (for eg. as you'd want shown
|
|
||||||
by a locally raised `ContextCancelled`).
|
- error's src or relay actor meta-data,
|
||||||
|
- remote runtime env's traceback,
|
||||||
|
|
||||||
|
With optional control over the format of,
|
||||||
|
|
||||||
|
- whether the boxed traceback is ascii-decorated with
|
||||||
|
a surrounding "box" annotating the embedded stack-trace.
|
||||||
|
- if the error's type name should be added as margins
|
||||||
|
around the field and tb content like:
|
||||||
|
|
||||||
|
`<RemoteActorError(.. <<multi-line-content>> .. )>`
|
||||||
|
|
||||||
|
- the placement of the `.message: str` (explicit equiv of
|
||||||
|
`.args[0]`), either placed below the `.tb_str` or in the
|
||||||
|
first line's header when the error is raised locally (since
|
||||||
|
the type name is already implicitly shown by python).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
header: str = ''
|
header: str = ''
|
||||||
body: str = ''
|
body: str = ''
|
||||||
|
message: str = ''
|
||||||
|
|
||||||
|
# XXX when the currently raised exception is this instance,
|
||||||
|
# we do not ever use the "type header" style repr.
|
||||||
|
is_being_raised: bool = False
|
||||||
|
if (
|
||||||
|
(exc := sys.exception())
|
||||||
|
and
|
||||||
|
exc is self
|
||||||
|
):
|
||||||
|
is_being_raised: bool = True
|
||||||
|
|
||||||
|
with_type_header: bool = (
|
||||||
|
with_type_header
|
||||||
|
and
|
||||||
|
not is_being_raised
|
||||||
|
)
|
||||||
|
|
||||||
|
# <RemoteActorError( .. )> style
|
||||||
if with_type_header:
|
if with_type_header:
|
||||||
header: str = f'<{type(self).__name__}(\n'
|
header: str = f'<{type(self).__name__}('
|
||||||
|
|
||||||
|
if message := self._message:
|
||||||
|
|
||||||
|
# split off the first line so, if needed, it isn't
|
||||||
|
# indented the same like the "boxed content" which
|
||||||
|
# since there is no `.tb_str` is just the `.message`.
|
||||||
|
lines: list[str] = message.splitlines()
|
||||||
|
first: str = lines[0]
|
||||||
|
message: str = message.removeprefix(first)
|
||||||
|
|
||||||
|
# with a type-style header we,
|
||||||
|
# - have no special message "first line" extraction/handling
|
||||||
|
# - place the message a space in from the header:
|
||||||
|
# `MsgTypeError( <message> ..`
|
||||||
|
# ^-here
|
||||||
|
# - indent the `.message` inside the type body.
|
||||||
|
if with_type_header:
|
||||||
|
first = f' {first} )>'
|
||||||
|
|
||||||
|
message: str = textwrap.indent(
|
||||||
|
message,
|
||||||
|
prefix=' '*2,
|
||||||
|
)
|
||||||
|
message: str = first + message
|
||||||
|
|
||||||
|
# IFF there is an embedded traceback-str we always
|
||||||
|
# draw the ascii-box around it.
|
||||||
if tb_str := self.tb_str:
|
if tb_str := self.tb_str:
|
||||||
fields: str = self._mk_fields_str(
|
fields: str = self._mk_fields_str(
|
||||||
_body_fields
|
_body_fields
|
||||||
|
@ -535,36 +610,19 @@ class RemoteActorError(Exception):
|
||||||
# |___ ..
|
# |___ ..
|
||||||
tb_body_indent=1,
|
tb_body_indent=1,
|
||||||
)
|
)
|
||||||
if not with_type_header:
|
|
||||||
body = '\n' + body
|
|
||||||
|
|
||||||
elif message := self._message:
|
tail = ''
|
||||||
# split off the first line so it isn't indented
|
if (
|
||||||
# the same like the "boxed content".
|
with_type_header
|
||||||
if not with_type_header:
|
and not message
|
||||||
lines: list[str] = message.splitlines()
|
):
|
||||||
first: str = lines[0]
|
tail: str = '>'
|
||||||
message: str = message.removeprefix(first)
|
|
||||||
|
|
||||||
else:
|
|
||||||
first: str = ''
|
|
||||||
|
|
||||||
body: str = (
|
|
||||||
first
|
|
||||||
+
|
|
||||||
message
|
|
||||||
+
|
|
||||||
'\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
if with_type_header:
|
|
||||||
tail: str = ')>'
|
|
||||||
else:
|
|
||||||
tail = ''
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
header
|
header
|
||||||
+
|
+
|
||||||
|
message
|
||||||
|
+
|
||||||
f'{body}'
|
f'{body}'
|
||||||
+
|
+
|
||||||
tail
|
tail
|
||||||
|
@ -577,7 +635,9 @@ class RemoteActorError(Exception):
|
||||||
# |_ i guess `pexepect` relies on `str`-casing
|
# |_ i guess `pexepect` relies on `str`-casing
|
||||||
# of output?
|
# of output?
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
return self.pformat(with_type_header=False)
|
return self.pformat(
|
||||||
|
with_type_header=False
|
||||||
|
)
|
||||||
|
|
||||||
def unwrap(
|
def unwrap(
|
||||||
self,
|
self,
|
||||||
|
@ -825,9 +885,6 @@ class MsgTypeError(
|
||||||
extra_msgdata['_bad_msg'] = bad_msg
|
extra_msgdata['_bad_msg'] = bad_msg
|
||||||
extra_msgdata['cid'] = bad_msg.cid
|
extra_msgdata['cid'] = bad_msg.cid
|
||||||
|
|
||||||
if 'cid' not in extra_msgdata:
|
|
||||||
import pdbp; pdbp.set_trace()
|
|
||||||
|
|
||||||
return cls(
|
return cls(
|
||||||
message=message,
|
message=message,
|
||||||
boxed_type=cls,
|
boxed_type=cls,
|
||||||
|
@ -889,6 +946,7 @@ def pack_error(
|
||||||
src_uid: tuple[str, str]|None = None,
|
src_uid: tuple[str, str]|None = None,
|
||||||
tb: TracebackType|None = None,
|
tb: TracebackType|None = None,
|
||||||
tb_str: str = '',
|
tb_str: str = '',
|
||||||
|
message: str = '',
|
||||||
|
|
||||||
) -> Error:
|
) -> Error:
|
||||||
'''
|
'''
|
||||||
|
@ -902,7 +960,7 @@ def pack_error(
|
||||||
tb_str: str = (
|
tb_str: str = (
|
||||||
''.join(traceback.format_exception(exc))
|
''.join(traceback.format_exception(exc))
|
||||||
|
|
||||||
# TODO: can we remove this is `exc` is required?
|
# TODO: can we remove this since `exc` is required.. right?
|
||||||
or
|
or
|
||||||
# NOTE: this is just a shorthand for the "last error" as
|
# NOTE: this is just a shorthand for the "last error" as
|
||||||
# provided by `sys.exeception()`, see:
|
# provided by `sys.exeception()`, see:
|
||||||
|
@ -917,8 +975,8 @@ def pack_error(
|
||||||
# when caller provides a tb instance (say pulled from some other
|
# when caller provides a tb instance (say pulled from some other
|
||||||
# src error's `.__traceback__`) we use that as the "boxed"
|
# src error's `.__traceback__`) we use that as the "boxed"
|
||||||
# tb-string instead.
|
# tb-string instead.
|
||||||
|
# https://docs.python.org/3/library/traceback.html#traceback.format_list
|
||||||
if tb:
|
if tb:
|
||||||
# https://docs.python.org/3/library/traceback.html#traceback.format_list
|
|
||||||
tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str
|
tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str
|
||||||
|
|
||||||
error_msg: dict[ # for IPC
|
error_msg: dict[ # for IPC
|
||||||
|
@ -961,17 +1019,17 @@ def pack_error(
|
||||||
error_msg['src_type_str'] = type(exc).__name__
|
error_msg['src_type_str'] = type(exc).__name__
|
||||||
error_msg['boxed_type_str'] = type(exc).__name__
|
error_msg['boxed_type_str'] = type(exc).__name__
|
||||||
|
|
||||||
# XXX alawys append us the last relay in error propagation path
|
# XXX always append us the last relay in error propagation path
|
||||||
error_msg.setdefault(
|
error_msg.setdefault(
|
||||||
'relay_path',
|
'relay_path',
|
||||||
[],
|
[],
|
||||||
).append(our_uid)
|
).append(our_uid)
|
||||||
|
|
||||||
# XXX NOTE: always ensure the traceback-str is from the
|
# XXX NOTE XXX always ensure the traceback-str content is from
|
||||||
# locally raised error (**not** the prior relay's boxed
|
# the locally raised error (so, NOT the prior relay's boxed
|
||||||
# content's in `._ipc_msg.tb_str`).
|
# `._ipc_msg.tb_str`).
|
||||||
error_msg['tb_str'] = tb_str
|
error_msg['tb_str'] = tb_str
|
||||||
|
error_msg['message'] = message or getattr(exc, 'message', '')
|
||||||
if cid is not None:
|
if cid is not None:
|
||||||
error_msg['cid'] = cid
|
error_msg['cid'] = cid
|
||||||
|
|
||||||
|
@ -995,26 +1053,24 @@ def unpack_error(
|
||||||
if not isinstance(msg, Error):
|
if not isinstance(msg, Error):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# retrieve the remote error's msg-encoded details
|
|
||||||
tb_str: str = msg.tb_str
|
|
||||||
message: str = (
|
|
||||||
f'{chan.uid}\n'
|
|
||||||
+
|
|
||||||
tb_str
|
|
||||||
)
|
|
||||||
|
|
||||||
# try to lookup a suitable error type from the local runtime
|
# try to lookup a suitable error type from the local runtime
|
||||||
# env then use it to construct a local instance.
|
# env then use it to construct a local instance.
|
||||||
# boxed_type_str: str = error_dict['boxed_type_str']
|
# boxed_type_str: str = error_dict['boxed_type_str']
|
||||||
boxed_type_str: str = msg.boxed_type_str
|
boxed_type_str: str = msg.boxed_type_str
|
||||||
boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
|
boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
|
||||||
|
|
||||||
if boxed_type_str == 'ContextCancelled':
|
# retrieve the error's msg-encoded remotoe-env info
|
||||||
box_type = ContextCancelled
|
message: str = f'remote task raised a {msg.boxed_type_str!r}\n'
|
||||||
assert boxed_type is box_type
|
|
||||||
|
|
||||||
elif boxed_type_str == 'MsgTypeError':
|
# TODO: do we even really need these checks for RAEs?
|
||||||
box_type = MsgTypeError
|
if boxed_type_str in [
|
||||||
|
'ContextCancelled',
|
||||||
|
'MsgTypeError',
|
||||||
|
]:
|
||||||
|
box_type = {
|
||||||
|
'ContextCancelled': ContextCancelled,
|
||||||
|
'MsgTypeError': MsgTypeError,
|
||||||
|
}[boxed_type_str]
|
||||||
assert boxed_type is box_type
|
assert boxed_type is box_type
|
||||||
|
|
||||||
# TODO: already included by `_this_mod` in else loop right?
|
# TODO: already included by `_this_mod` in else loop right?
|
||||||
|
@ -1029,19 +1085,21 @@ def unpack_error(
|
||||||
exc = box_type(
|
exc = box_type(
|
||||||
message,
|
message,
|
||||||
ipc_msg=msg,
|
ipc_msg=msg,
|
||||||
|
tb_str=msg.tb_str,
|
||||||
)
|
)
|
||||||
|
|
||||||
return exc
|
return exc
|
||||||
|
|
||||||
|
|
||||||
def is_multi_cancelled(exc: BaseException) -> bool:
|
def is_multi_cancelled(
|
||||||
|
exc: BaseException|BaseExceptionGroup
|
||||||
|
) -> bool:
|
||||||
'''
|
'''
|
||||||
Predicate to determine if a possible ``BaseExceptionGroup`` contains
|
Predicate to determine if a possible ``BaseExceptionGroup`` contains
|
||||||
only ``trio.Cancelled`` sub-exceptions (and is likely the result of
|
only ``trio.Cancelled`` sub-exceptions (and is likely the result of
|
||||||
cancelling a collection of subtasks.
|
cancelling a collection of subtasks.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# if isinstance(exc, eg.BaseExceptionGroup):
|
|
||||||
if isinstance(exc, BaseExceptionGroup):
|
if isinstance(exc, BaseExceptionGroup):
|
||||||
return exc.subgroup(
|
return exc.subgroup(
|
||||||
lambda exc: isinstance(exc, trio.Cancelled)
|
lambda exc: isinstance(exc, trio.Cancelled)
|
||||||
|
@ -1109,6 +1167,7 @@ def _raise_from_unexpected_msg(
|
||||||
msg,
|
msg,
|
||||||
ctx.chan,
|
ctx.chan,
|
||||||
)
|
)
|
||||||
|
ctx._maybe_cancel_and_set_remote_error(exc)
|
||||||
raise exc from src_err
|
raise exc from src_err
|
||||||
|
|
||||||
# `MsgStream` termination msg.
|
# `MsgStream` termination msg.
|
||||||
|
@ -1183,7 +1242,6 @@ def _mk_msg_type_err(
|
||||||
src_validation_error: ValidationError|None = None,
|
src_validation_error: ValidationError|None = None,
|
||||||
src_type_error: TypeError|None = None,
|
src_type_error: TypeError|None = None,
|
||||||
is_invalid_payload: bool = False,
|
is_invalid_payload: bool = False,
|
||||||
# src_err_msg: Error|None = None,
|
|
||||||
|
|
||||||
**mte_kwargs,
|
**mte_kwargs,
|
||||||
|
|
||||||
|
@ -1250,19 +1308,11 @@ def _mk_msg_type_err(
|
||||||
msg_type: str = type(msg)
|
msg_type: str = type(msg)
|
||||||
any_pld: Any = msgpack.decode(msg.pld)
|
any_pld: Any = msgpack.decode(msg.pld)
|
||||||
message: str = (
|
message: str = (
|
||||||
f'invalid `{msg_type.__qualname__}` payload\n\n'
|
f'invalid `{msg_type.__qualname__}` msg payload\n\n'
|
||||||
f'value: `{any_pld!r}` does not match type-spec: ' #\n'
|
f'value: `{any_pld!r}` does not match type-spec: '
|
||||||
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
|
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
|
||||||
# f'<{type(msg).__qualname__}(\n'
|
|
||||||
# f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n'
|
|
||||||
# f')>\n\n'
|
|
||||||
)
|
)
|
||||||
# src_err_msg = msg
|
|
||||||
bad_msg = msg
|
bad_msg = msg
|
||||||
# TODO: should we just decode the msg to a dict despite
|
|
||||||
# only the payload being wrong?
|
|
||||||
# -[ ] maybe the better design is to break this construct
|
|
||||||
# logic into a separate explicit helper raiser-func?
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# decode the msg-bytes using the std msgpack
|
# decode the msg-bytes using the std msgpack
|
||||||
|
@ -1307,21 +1357,21 @@ def _mk_msg_type_err(
|
||||||
if verb_header:
|
if verb_header:
|
||||||
message = f'{verb_header} ' + message
|
message = f'{verb_header} ' + message
|
||||||
|
|
||||||
# if not isinstance(bad_msg, PayloadMsg):
|
|
||||||
# import pdbp; pdbp.set_trace()
|
|
||||||
|
|
||||||
msgtyperr = MsgTypeError.from_decode(
|
msgtyperr = MsgTypeError.from_decode(
|
||||||
message=message,
|
message=message,
|
||||||
bad_msg=bad_msg,
|
bad_msg=bad_msg,
|
||||||
bad_msg_as_dict=msg_dict,
|
bad_msg_as_dict=msg_dict,
|
||||||
|
|
||||||
# NOTE: for the send-side `.started()` pld-validate
|
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
|
||||||
# case we actually set the `._ipc_msg` AFTER we return
|
# - for the send-side `.started()` pld-validate
|
||||||
# from here inside `Context.started()` since we actually
|
# case we actually raise inline so we don't need to
|
||||||
# want to emulate the `Error` from the mte we build here
|
# set the it at all.
|
||||||
# Bo
|
# - for recv side we set it inside `PldRx.decode_pld()`
|
||||||
# so by default in that case this is set to `None`
|
# after a manual call to `pack_error()` since we
|
||||||
# ipc_msg=src_err_msg,
|
# actually want to emulate the `Error` from the mte we
|
||||||
|
# build here. So by default in that case, this is left
|
||||||
|
# as `None` here.
|
||||||
|
# ipc_msg=src_err_msg,
|
||||||
)
|
)
|
||||||
msgtyperr.__cause__ = src_validation_error
|
msgtyperr.__cause__ = src_validation_error
|
||||||
return msgtyperr
|
return msgtyperr
|
||||||
|
|
|
@ -291,7 +291,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
|
|
||||||
async def send(
|
async def send(
|
||||||
self,
|
self,
|
||||||
msg: msgtypes.Msg,
|
msg: msgtypes.MsgType,
|
||||||
|
|
||||||
strict_types: bool = True,
|
strict_types: bool = True,
|
||||||
# hide_tb: bool = False,
|
# hide_tb: bool = False,
|
||||||
|
|
|
@ -64,6 +64,7 @@ from .log import get_logger
|
||||||
from .msg import (
|
from .msg import (
|
||||||
current_codec,
|
current_codec,
|
||||||
MsgCodec,
|
MsgCodec,
|
||||||
|
PayloadT,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
)
|
)
|
||||||
|
@ -98,7 +99,7 @@ async def _invoke_non_context(
|
||||||
|
|
||||||
treat_as_gen: bool,
|
treat_as_gen: bool,
|
||||||
is_rpc: bool,
|
is_rpc: bool,
|
||||||
return_msg: Return|CancelAck = Return,
|
return_msg_type: Return|CancelAck = Return,
|
||||||
|
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
Context | BaseException
|
Context | BaseException
|
||||||
|
@ -220,7 +221,7 @@ async def _invoke_non_context(
|
||||||
and chan.connected()
|
and chan.connected()
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
ret_msg = return_msg(
|
ret_msg = return_msg_type(
|
||||||
cid=cid,
|
cid=cid,
|
||||||
pld=result,
|
pld=result,
|
||||||
)
|
)
|
||||||
|
@ -392,16 +393,22 @@ async def _errors_relayed_via_ipc(
|
||||||
# cancel scope will not have been inserted yet
|
# cancel scope will not have been inserted yet
|
||||||
if is_rpc:
|
if is_rpc:
|
||||||
log.warning(
|
log.warning(
|
||||||
'RPC task likely errored or cancelled before start?'
|
'RPC task likely errored or cancelled before start?\n'
|
||||||
f'|_{ctx._task}\n'
|
|
||||||
f' >> {ctx.repr_rpc}\n'
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
log.cancel(
|
|
||||||
'Failed to de-alloc internal runtime cancel task?\n'
|
|
||||||
f'|_{ctx._task}\n'
|
f'|_{ctx._task}\n'
|
||||||
f' >> {ctx.repr_rpc}\n'
|
f' >> {ctx.repr_rpc}\n'
|
||||||
)
|
)
|
||||||
|
# TODO: remove this right? rn the only non-`is_rpc` cases
|
||||||
|
# are cancellation methods and according the RPC loop eps
|
||||||
|
# for thoses below, nothing is ever registered in
|
||||||
|
# `Actor._rpc_tasks` for those cases.. but should we?
|
||||||
|
#
|
||||||
|
# -[ ] maybe we should have an equiv `Actor._runtime_rpc_tasks`?
|
||||||
|
# else:
|
||||||
|
# log.cancel(
|
||||||
|
# 'Failed to de-alloc internal runtime cancel task?\n'
|
||||||
|
# f'|_{ctx._task}\n'
|
||||||
|
# f' >> {ctx.repr_rpc}\n'
|
||||||
|
# )
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if not actor._rpc_tasks:
|
if not actor._rpc_tasks:
|
||||||
|
@ -419,7 +426,7 @@ async def _invoke(
|
||||||
|
|
||||||
is_rpc: bool = True,
|
is_rpc: bool = True,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
return_msg: Return|CancelAck = Return,
|
return_msg_type: Return|CancelAck = Return,
|
||||||
|
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
Context | BaseException
|
Context | BaseException
|
||||||
|
@ -533,7 +540,7 @@ async def _invoke(
|
||||||
kwargs,
|
kwargs,
|
||||||
treat_as_gen,
|
treat_as_gen,
|
||||||
is_rpc,
|
is_rpc,
|
||||||
return_msg,
|
return_msg_type,
|
||||||
task_status,
|
task_status,
|
||||||
)
|
)
|
||||||
# XXX below fallthrough is ONLY for `@context` eps
|
# XXX below fallthrough is ONLY for `@context` eps
|
||||||
|
@ -593,18 +600,21 @@ async def _invoke(
|
||||||
ctx._scope = tn.cancel_scope
|
ctx._scope = tn.cancel_scope
|
||||||
task_status.started(ctx)
|
task_status.started(ctx)
|
||||||
|
|
||||||
# TODO: should would be nice to have our
|
# TODO: better `trionics` tooling:
|
||||||
# `TaskMngr` nursery here!
|
# -[ ] should would be nice to have our `TaskMngr`
|
||||||
res: Any = await coro
|
# nursery here!
|
||||||
ctx._result = res
|
# -[ ] payload value checking like we do with
|
||||||
|
# `.started()` such that the debbuger can engage
|
||||||
# deliver final result to caller side.
|
# here in the child task instead of waiting for the
|
||||||
await chan.send(
|
# parent to crash with it's own MTE..
|
||||||
return_msg(
|
res: Any|PayloadT = await coro
|
||||||
cid=cid,
|
return_msg: Return|CancelAck = return_msg_type(
|
||||||
pld=res,
|
cid=cid,
|
||||||
)
|
pld=res,
|
||||||
)
|
)
|
||||||
|
# set and shuttle final result to "parent"-side task.
|
||||||
|
ctx._result = res
|
||||||
|
await chan.send(return_msg)
|
||||||
|
|
||||||
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
||||||
# called by any of,
|
# called by any of,
|
||||||
|
@ -940,7 +950,7 @@ async def process_messages(
|
||||||
actor.cancel,
|
actor.cancel,
|
||||||
kwargs,
|
kwargs,
|
||||||
is_rpc=False,
|
is_rpc=False,
|
||||||
return_msg=CancelAck,
|
return_msg_type=CancelAck,
|
||||||
)
|
)
|
||||||
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
|
@ -974,7 +984,7 @@ async def process_messages(
|
||||||
actor._cancel_task,
|
actor._cancel_task,
|
||||||
kwargs,
|
kwargs,
|
||||||
is_rpc=False,
|
is_rpc=False,
|
||||||
return_msg=CancelAck,
|
return_msg_type=CancelAck,
|
||||||
)
|
)
|
||||||
except BaseException:
|
except BaseException:
|
||||||
log.exception(
|
log.exception(
|
||||||
|
|
|
@ -1256,9 +1256,10 @@ class Actor:
|
||||||
# - child returns a result before cancel-msg/ctxc-raised
|
# - child returns a result before cancel-msg/ctxc-raised
|
||||||
# - child self raises ctxc before parent send request,
|
# - child self raises ctxc before parent send request,
|
||||||
# - child errors prior to cancel req.
|
# - child errors prior to cancel req.
|
||||||
log.cancel(
|
log.runtime(
|
||||||
'Cancel request invalid, RPC task already completed?\n\n'
|
'Cancel request for invalid RPC task.\n'
|
||||||
f'<= canceller: {requesting_uid}\n\n'
|
'The task likely already completed or was never started!\n\n'
|
||||||
|
f'<= canceller: {requesting_uid}\n'
|
||||||
f'=> {cid}@{parent_chan.uid}\n'
|
f'=> {cid}@{parent_chan.uid}\n'
|
||||||
f' |_{parent_chan}\n'
|
f' |_{parent_chan}\n'
|
||||||
)
|
)
|
||||||
|
|
|
@ -140,7 +140,7 @@ class MsgDec(Struct):
|
||||||
# * also a `.__contains__()` for doing `None in
|
# * also a `.__contains__()` for doing `None in
|
||||||
# TypeSpec[None|int]` since rn you need to do it on
|
# TypeSpec[None|int]` since rn you need to do it on
|
||||||
# `.__args__` for unions..
|
# `.__args__` for unions..
|
||||||
# - `MsgSpec: Union[Type[Msg]]
|
# - `MsgSpec: Union[MsgType]
|
||||||
#
|
#
|
||||||
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
|
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
|
||||||
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
|
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
|
||||||
|
@ -188,7 +188,7 @@ def mk_dec(
|
||||||
|
|
||||||
return MsgDec(
|
return MsgDec(
|
||||||
_dec=msgpack.Decoder(
|
_dec=msgpack.Decoder(
|
||||||
type=spec, # like `Msg[Any]`
|
type=spec, # like `MsgType[Any]`
|
||||||
dec_hook=dec_hook,
|
dec_hook=dec_hook,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -561,7 +561,7 @@ def mk_codec(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# (manually) generate a msg-payload-spec for all relevant
|
# (manually) generate a msg-payload-spec for all relevant
|
||||||
# god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT`
|
# god-boxing-msg subtypes, parameterizing the `PayloadMsg.pld: PayloadT`
|
||||||
# for the decoder such that all sub-type msgs in our SCIPP
|
# for the decoder such that all sub-type msgs in our SCIPP
|
||||||
# will automatically decode to a type-"limited" payload (`Struct`)
|
# will automatically decode to a type-"limited" payload (`Struct`)
|
||||||
# object (set).
|
# object (set).
|
||||||
|
@ -607,7 +607,7 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
|
||||||
|
|
||||||
# The built-in IPC `Msg` spec.
|
# The built-in IPC `Msg` spec.
|
||||||
# Our composing "shuttle" protocol which allows `tractor`-app code
|
# Our composing "shuttle" protocol which allows `tractor`-app code
|
||||||
# to use any `msgspec` supported type as the `Msg.pld` payload,
|
# to use any `msgspec` supported type as the `PayloadMsg.pld` payload,
|
||||||
# https://jcristharif.com/msgspec/supported-types.html
|
# https://jcristharif.com/msgspec/supported-types.html
|
||||||
#
|
#
|
||||||
_def_tractor_codec: MsgCodec = mk_codec(
|
_def_tractor_codec: MsgCodec = mk_codec(
|
||||||
|
@ -743,7 +743,7 @@ def limit_msg_spec(
|
||||||
) -> MsgCodec:
|
) -> MsgCodec:
|
||||||
'''
|
'''
|
||||||
Apply a `MsgCodec` that will natively decode the SC-msg set's
|
Apply a `MsgCodec` that will natively decode the SC-msg set's
|
||||||
`Msg.pld: Union[Type[Struct]]` payload fields using
|
`PayloadMsg.pld: Union[Type[Struct]]` payload fields using
|
||||||
tagged-unions of `msgspec.Struct`s from the `payload_types`
|
tagged-unions of `msgspec.Struct`s from the `payload_types`
|
||||||
for all IPC contexts in use by the current `trio.Task`.
|
for all IPC contexts in use by the current `trio.Task`.
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,8 @@ from tractor._state import current_ipc_ctx
|
||||||
from ._codec import (
|
from ._codec import (
|
||||||
mk_dec,
|
mk_dec,
|
||||||
MsgDec,
|
MsgDec,
|
||||||
|
MsgCodec,
|
||||||
|
current_codec,
|
||||||
)
|
)
|
||||||
from .types import (
|
from .types import (
|
||||||
CancelAck,
|
CancelAck,
|
||||||
|
@ -213,6 +215,9 @@ class PldRx(Struct):
|
||||||
**dec_msg_kwargs,
|
**dec_msg_kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO: rename to,
|
||||||
|
# -[ ] `.decode_pld()`?
|
||||||
|
# -[ ] `.dec_pld()`?
|
||||||
def dec_msg(
|
def dec_msg(
|
||||||
self,
|
self,
|
||||||
msg: MsgType,
|
msg: MsgType,
|
||||||
|
@ -246,8 +251,8 @@ class PldRx(Struct):
|
||||||
pld: PayloadT = self._pld_dec.decode(pld)
|
pld: PayloadT = self._pld_dec.decode(pld)
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Decoded msg payload\n\n'
|
'Decoded msg payload\n\n'
|
||||||
f'{msg}\n\n'
|
f'{msg}\n'
|
||||||
f'where payload is\n'
|
f'where payload decoded as\n'
|
||||||
f'|_pld={pld!r}\n'
|
f'|_pld={pld!r}\n'
|
||||||
)
|
)
|
||||||
return pld
|
return pld
|
||||||
|
@ -263,13 +268,29 @@ class PldRx(Struct):
|
||||||
src_validation_error=valerr,
|
src_validation_error=valerr,
|
||||||
is_invalid_payload=True,
|
is_invalid_payload=True,
|
||||||
expected_msg=expect_msg,
|
expected_msg=expect_msg,
|
||||||
# ipc_msg=msg,
|
|
||||||
)
|
)
|
||||||
# NOTE: override the `msg` passed to
|
# NOTE: just raise the MTE inline instead of all
|
||||||
# `_raise_from_unexpected_msg()` (below) so so that
|
# the pack-unpack-repack non-sense when this is
|
||||||
# we're effectively able to use that same func to
|
# a "send side" validation error.
|
||||||
# unpack and raise an "emulated remote `Error`" of
|
if is_started_send_side:
|
||||||
# this local MTE.
|
raise mte
|
||||||
|
|
||||||
|
# XXX TODO: remove this right?
|
||||||
|
# => any bad stated/return values should
|
||||||
|
# always be treated a remote errors right?
|
||||||
|
#
|
||||||
|
# if (
|
||||||
|
# expect_msg is Return
|
||||||
|
# or expect_msg is Started
|
||||||
|
# ):
|
||||||
|
# # set emulated remote error more-or-less as the
|
||||||
|
# # runtime would
|
||||||
|
# ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||||
|
# ctx._maybe_cancel_and_set_remote_error(mte)
|
||||||
|
|
||||||
|
# NOTE: the `.message` is automatically
|
||||||
|
# transferred into the message as long as we
|
||||||
|
# define it as a `Error.message` field.
|
||||||
err_msg: Error = pack_error(
|
err_msg: Error = pack_error(
|
||||||
exc=mte,
|
exc=mte,
|
||||||
cid=msg.cid,
|
cid=msg.cid,
|
||||||
|
@ -279,36 +300,38 @@ class PldRx(Struct):
|
||||||
else ipc._actor.uid
|
else ipc._actor.uid
|
||||||
),
|
),
|
||||||
# tb=valerr.__traceback__,
|
# tb=valerr.__traceback__,
|
||||||
tb_str=mte._message,
|
# tb_str=mte._message,
|
||||||
|
# message=mte._message,
|
||||||
)
|
)
|
||||||
# ^-TODO-^ just raise this inline instead of all the
|
|
||||||
# pack-unpack-repack non-sense!
|
|
||||||
|
|
||||||
mte._ipc_msg = err_msg
|
mte._ipc_msg = err_msg
|
||||||
msg = err_msg
|
|
||||||
|
|
||||||
# set emulated remote error more-or-less as the
|
# XXX override the `msg` passed to
|
||||||
# runtime would
|
# `_raise_from_unexpected_msg()` (below) so so
|
||||||
ctx: Context = getattr(ipc, 'ctx', ipc)
|
# that we're effectively able to use that same
|
||||||
|
# func to unpack and raise an "emulated remote
|
||||||
|
# `Error`" of this local MTE.
|
||||||
|
msg = err_msg
|
||||||
|
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
|
||||||
|
# raises the boxed `err_msg` from above it raises
|
||||||
|
# it from the above caught interchange-lib
|
||||||
|
# validation error.
|
||||||
|
src_err = valerr
|
||||||
|
|
||||||
# TODO: should we instead make this explicit and
|
# TODO: should we instead make this explicit and
|
||||||
# use the above masked `is_started_send_decode`,
|
# use the above masked `is_started_send_decode`,
|
||||||
# expecting the `Context.started()` caller to set
|
# expecting the `Context.started()` caller to set
|
||||||
# it? Rn this is kinda, howyousayyy, implicitly
|
# it? Rn this is kinda, howyousayyy, implicitly
|
||||||
# edge-case-y..
|
# edge-case-y..
|
||||||
if (
|
# TODO: remove this since it's been added to
|
||||||
expect_msg is not Started
|
# `_raise_from_unexpected_msg()`..?
|
||||||
and not is_started_send_side
|
# if (
|
||||||
):
|
# expect_msg is not Started
|
||||||
ctx._maybe_cancel_and_set_remote_error(mte)
|
# and not is_started_send_side
|
||||||
|
# ):
|
||||||
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
|
# # set emulated remote error more-or-less as the
|
||||||
# raises the boxed `err_msg` from above it raises
|
# # runtime would
|
||||||
# it from `None`.
|
# ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||||
src_err = valerr
|
# ctx._maybe_cancel_and_set_remote_error(mte)
|
||||||
# if is_started_send_side:
|
|
||||||
# src_err = None
|
|
||||||
|
|
||||||
|
|
||||||
# XXX some other decoder specific failure?
|
# XXX some other decoder specific failure?
|
||||||
# except TypeError as src_error:
|
# except TypeError as src_error:
|
||||||
|
@ -559,6 +582,7 @@ async def drain_to_final_msg(
|
||||||
ipc=ctx,
|
ipc=ctx,
|
||||||
expect_msg=Return,
|
expect_msg=Return,
|
||||||
raise_error=False,
|
raise_error=False,
|
||||||
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
# ^-TODO-^ some bad ideas?
|
# ^-TODO-^ some bad ideas?
|
||||||
# -[ ] wrap final outcome .receive() in a scope so
|
# -[ ] wrap final outcome .receive() in a scope so
|
||||||
|
@ -737,9 +761,61 @@ async def drain_to_final_msg(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# TODO: factor logic from `.Context.started()` for send-side
|
|
||||||
# validate raising!
|
|
||||||
def validate_payload_msg(
|
def validate_payload_msg(
|
||||||
msg: Started|Yield|Return,
|
pld_msg: Started|Yield|Return,
|
||||||
|
pld_value: PayloadT,
|
||||||
|
ipc: Context|MsgStream,
|
||||||
|
|
||||||
|
raise_mte: bool = True,
|
||||||
|
strict_pld_parity: bool = False,
|
||||||
|
hide_tb: bool = True,
|
||||||
|
|
||||||
) -> MsgTypeError|None:
|
) -> MsgTypeError|None:
|
||||||
...
|
'''
|
||||||
|
Validate a `PayloadMsg.pld` value with the current
|
||||||
|
IPC ctx's `PldRx` and raise an appropriate `MsgTypeError`
|
||||||
|
on failure.
|
||||||
|
|
||||||
|
'''
|
||||||
|
__tracebackhide__: bool = hide_tb
|
||||||
|
codec: MsgCodec = current_codec()
|
||||||
|
msg_bytes: bytes = codec.encode(pld_msg)
|
||||||
|
try:
|
||||||
|
roundtripped: Started = codec.decode(msg_bytes)
|
||||||
|
ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||||
|
pld: PayloadT = ctx.pld_rx.dec_msg(
|
||||||
|
msg=roundtripped,
|
||||||
|
ipc=ipc,
|
||||||
|
expect_msg=Started,
|
||||||
|
hide_tb=hide_tb,
|
||||||
|
is_started_send_side=True,
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
strict_pld_parity
|
||||||
|
and
|
||||||
|
pld != pld_value
|
||||||
|
):
|
||||||
|
# TODO: make that one a mod func too..
|
||||||
|
diff = pretty_struct.Struct.__sub__(
|
||||||
|
roundtripped,
|
||||||
|
pld_msg,
|
||||||
|
)
|
||||||
|
complaint: str = (
|
||||||
|
'Started value does not match after roundtrip?\n\n'
|
||||||
|
f'{diff}'
|
||||||
|
)
|
||||||
|
raise ValidationError(complaint)
|
||||||
|
|
||||||
|
# raise any msg type error NO MATTER WHAT!
|
||||||
|
except ValidationError as verr:
|
||||||
|
mte: MsgTypeError = _mk_msg_type_err(
|
||||||
|
msg=roundtripped,
|
||||||
|
codec=codec,
|
||||||
|
src_validation_error=verr,
|
||||||
|
verb_header='Trying to send ',
|
||||||
|
is_invalid_payload=True,
|
||||||
|
)
|
||||||
|
if not raise_mte:
|
||||||
|
return mte
|
||||||
|
|
||||||
|
raise mte from verr
|
||||||
|
|
|
@ -89,11 +89,12 @@ class PayloadMsg(
|
||||||
# -[ ] `uuid.UUID` which has multi-protocol support
|
# -[ ] `uuid.UUID` which has multi-protocol support
|
||||||
# https://jcristharif.com/msgspec/supported-types.html#uuid
|
# https://jcristharif.com/msgspec/supported-types.html#uuid
|
||||||
|
|
||||||
# The msgs "payload" (spelled without vowels):
|
# The msg's "payload" (spelled without vowels):
|
||||||
# https://en.wikipedia.org/wiki/Payload_(computing)
|
# https://en.wikipedia.org/wiki/Payload_(computing)
|
||||||
#
|
pld: Raw
|
||||||
# NOTE: inherited from any `Msg` (and maybe overriden
|
|
||||||
# by use of `limit_msg_spec()`), but by default is
|
# ^-NOTE-^ inherited from any `PayloadMsg` (and maybe type
|
||||||
|
# overriden via the `._ops.limit_plds()` API), but by default is
|
||||||
# parameterized to be `Any`.
|
# parameterized to be `Any`.
|
||||||
#
|
#
|
||||||
# XXX this `Union` must strictly NOT contain `Any` if
|
# XXX this `Union` must strictly NOT contain `Any` if
|
||||||
|
@ -106,7 +107,6 @@ class PayloadMsg(
|
||||||
# TODO: could also be set to `msgspec.Raw` if the sub-decoders
|
# TODO: could also be set to `msgspec.Raw` if the sub-decoders
|
||||||
# approach is preferred over the generic parameterization
|
# approach is preferred over the generic parameterization
|
||||||
# approach as take by `mk_msg_spec()` below.
|
# approach as take by `mk_msg_spec()` below.
|
||||||
pld: Raw
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: complete rename
|
# TODO: complete rename
|
||||||
|
@ -410,21 +410,32 @@ class Error(
|
||||||
src_type_str: str
|
src_type_str: str
|
||||||
boxed_type_str: str
|
boxed_type_str: str
|
||||||
relay_path: list[tuple[str, str]]
|
relay_path: list[tuple[str, str]]
|
||||||
tb_str: str
|
|
||||||
|
|
||||||
cid: str|None = None
|
# normally either both are provided or just
|
||||||
|
# a message for certain special cases where
|
||||||
|
# we pack a message for a locally raised
|
||||||
|
# mte or ctxc.
|
||||||
|
message: str|None = None
|
||||||
|
tb_str: str = ''
|
||||||
|
|
||||||
# TODO: use UNSET or don't include them via
|
# TODO: only optionally include sub-type specfic fields?
|
||||||
|
# -[ ] use UNSET or don't include them via `omit_defaults` (see
|
||||||
|
# inheritance-line options above)
|
||||||
#
|
#
|
||||||
# `ContextCancelled`
|
# `ContextCancelled` reports the src cancelling `Actor.uid`
|
||||||
canceller: tuple[str, str]|None = None
|
canceller: tuple[str, str]|None = None
|
||||||
|
|
||||||
# `StreamOverrun`
|
# `StreamOverrun`-specific src `Actor.uid`
|
||||||
sender: tuple[str, str]|None = None
|
sender: tuple[str, str]|None = None
|
||||||
|
|
||||||
# for the `MsgTypeError` case where the receiver side
|
# `MsgTypeError` meta-data
|
||||||
# decodes the underlying original `Msg`-subtype
|
cid: str|None = None
|
||||||
_msg_dict: dict|None = None
|
# when the receiver side fails to decode a delivered
|
||||||
|
# `PayloadMsg`-subtype; one and/or both the msg-struct instance
|
||||||
|
# and `Any`-decoded to `dict` of the msg are set and relayed
|
||||||
|
# (back to the sender) for introspection.
|
||||||
|
_bad_msg: Started|Yield|Return|None = None
|
||||||
|
_bad_msg_as_dict: dict|None = None
|
||||||
|
|
||||||
|
|
||||||
def from_dict_msg(
|
def from_dict_msg(
|
||||||
|
@ -436,9 +447,11 @@ def from_dict_msg(
|
||||||
|
|
||||||
) -> MsgType:
|
) -> MsgType:
|
||||||
'''
|
'''
|
||||||
Helper to build a specific `MsgType` struct from
|
Helper to build a specific `MsgType` struct from a "vanilla"
|
||||||
a "vanilla" decoded `dict`-ified equivalent of the
|
decoded `dict`-ified equivalent of the msg: i.e. if the
|
||||||
msg: i.e. if the `msgpack.Decoder.type == Any`.
|
`msgpack.Decoder.type == Any`, the default when using
|
||||||
|
`msgspec.msgpack` and not "typed decoding" using
|
||||||
|
`msgspec.Struct`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
msg_type_tag_field: str = (
|
msg_type_tag_field: str = (
|
||||||
|
|
Loading…
Reference in New Issue