Compare commits

..

No commits in common. "72df312e71dd3d29b16118b6233e40e7512347bf" and "8477919fc92b9811ec10bb9931da184319fa4d4c" have entirely different histories.

5 changed files with 165 additions and 188 deletions

View File

@ -49,6 +49,7 @@ from typing import (
Any, Any,
AsyncGenerator, AsyncGenerator,
Callable, Callable,
Mapping,
Type, Type,
TypeAlias, TypeAlias,
TYPE_CHECKING, TYPE_CHECKING,
@ -1483,8 +1484,6 @@ class Context:
# #
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
if validate_pld_spec: if validate_pld_spec:
# TODO: prolly wrap this as a `show_frame_when_not()`
try:
msgops.validate_payload_msg( msgops.validate_payload_msg(
pld_msg=started_msg, pld_msg=started_msg,
pld_value=value, pld_value=value,
@ -1492,12 +1491,6 @@ class Context:
strict_pld_parity=strict_pld_parity, strict_pld_parity=strict_pld_parity,
hide_tb=hide_tb, hide_tb=hide_tb,
) )
except BaseException as err:
if not isinstance(err, MsgTypeError):
__tracebackhide__: bool = False
raise
# TODO: maybe a flag to by-pass encode op if already done # TODO: maybe a flag to by-pass encode op if already done
# here in caller? # here in caller?
@ -2192,6 +2185,11 @@ async def open_context_from_portal(
try: try:
result_or_err: Exception|Any = await ctx.result() result_or_err: Exception|Any = await ctx.result()
except BaseException as berr: except BaseException as berr:
# cancelled before (or maybe during?) final result capture
# if isinstance(trio.Cancelled, berr):
# from .devx import mk_pdb
# mk_pdb.set_trace()
# on normal teardown, if we get some error # on normal teardown, if we get some error
# raised in `Context.result()` we still want to # raised in `Context.result()` we still want to
# save that error on the ctx's state to # save that error on the ctx's state to
@ -2203,7 +2201,7 @@ async def open_context_from_portal(
ctx._local_error: BaseException = scope_err ctx._local_error: BaseException = scope_err
raise raise
# yes this worx! # yes! this worx Bp
# from .devx import _debug # from .devx import _debug
# await _debug.pause() # await _debug.pause()

View File

@ -1232,13 +1232,14 @@ def _raise_from_unexpected_msg(
_raise_from_no_key_in_msg = _raise_from_unexpected_msg _raise_from_no_key_in_msg = _raise_from_unexpected_msg
def _mk_send_mte( def _mk_msg_type_err(
msg: Any|bytes|MsgType, msg: Any|bytes|MsgType,
codec: MsgCodec|MsgDec, codec: MsgCodec|MsgDec,
message: str|None = None, message: str|None = None,
verb_header: str = '', verb_header: str = '',
src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None, src_type_error: TypeError|None = None,
is_invalid_payload: bool = False, is_invalid_payload: bool = False,
@ -1246,10 +1247,12 @@ def _mk_send_mte(
) -> MsgTypeError: ) -> MsgTypeError:
''' '''
Compose a `MsgTypeError` from a `Channel.send()`-side error, Compose a `MsgTypeError` from an input runtime context.
normally raised witih a runtime IPC `Context`.
''' '''
# `Channel.send()` case
if src_validation_error is None:
if isinstance(codec, MsgDec): if isinstance(codec, MsgDec):
raise RuntimeError( raise RuntimeError(
'`codec` must be a `MsgCodec` for send-side errors?' '`codec` must be a `MsgCodec` for send-side errors?'
@ -1295,27 +1298,8 @@ def _mk_send_mte(
msgtyperr.__cause__ = src_type_error msgtyperr.__cause__ = src_type_error
return msgtyperr return msgtyperr
# `Channel.recv()` case
def _mk_recv_mte( else:
msg: Any|bytes|MsgType,
codec: MsgCodec|MsgDec,
message: str|None = None,
verb_header: str = '',
src_validation_error: ValidationError|None = None,
is_invalid_payload: bool = False,
**mte_kwargs,
) -> MsgTypeError:
'''
Compose a `MsgTypeError` from a
`Channel|Context|MsgStream.receive()`-side error,
normally raised witih a runtime IPC ctx or streaming
block.
'''
msg_dict: dict|None = None msg_dict: dict|None = None
bad_msg: PayloadMsg|None = None bad_msg: PayloadMsg|None = None

View File

@ -49,8 +49,7 @@ from tractor._exceptions import (
MsgTypeError, MsgTypeError,
pack_from_raise, pack_from_raise,
TransportClosed, TransportClosed,
_mk_send_mte, _mk_msg_type_err,
_mk_recv_mte,
) )
from tractor.msg import ( from tractor.msg import (
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
@ -257,7 +256,7 @@ class MsgpackTCPStream(MsgTransport):
# and always raise such that spec violations # and always raise such that spec violations
# are never allowed to be caught silently! # are never allowed to be caught silently!
except msgspec.ValidationError as verr: except msgspec.ValidationError as verr:
msgtyperr: MsgTypeError = _mk_recv_mte( msgtyperr: MsgTypeError = _mk_msg_type_err(
msg=msg_bytes, msg=msg_bytes,
codec=codec, codec=codec,
src_validation_error=verr, src_validation_error=verr,
@ -322,7 +321,7 @@ class MsgpackTCPStream(MsgTransport):
if type(msg) not in msgtypes.__msg_types__: if type(msg) not in msgtypes.__msg_types__:
if strict_types: if strict_types:
raise _mk_send_mte( raise _mk_msg_type_err(
msg, msg,
codec=codec, codec=codec,
) )
@ -334,9 +333,8 @@ class MsgpackTCPStream(MsgTransport):
try: try:
bytes_data: bytes = codec.encode(msg) bytes_data: bytes = codec.encode(msg)
except TypeError as _err: except TypeError as typerr:
typerr = _err msgtyperr: MsgTypeError = _mk_msg_type_err(
msgtyperr: MsgTypeError = _mk_send_mte(
msg, msg,
codec=codec, codec=codec,
message=( message=(

View File

@ -47,7 +47,7 @@ from tractor._exceptions import (
InternalError, InternalError,
_raise_from_unexpected_msg, _raise_from_unexpected_msg,
MsgTypeError, MsgTypeError,
_mk_recv_mte, _mk_msg_type_err,
pack_error, pack_error,
) )
from tractor._state import current_ipc_ctx from tractor._state import current_ipc_ctx
@ -264,7 +264,7 @@ class PldRx(Struct):
# pack mgterr into error-msg for # pack mgterr into error-msg for
# reraise below; ensure remote-actor-err # reraise below; ensure remote-actor-err
# info is displayed nicely? # info is displayed nicely?
mte: MsgTypeError = _mk_recv_mte( mte: MsgTypeError = _mk_msg_type_err(
msg=msg, msg=msg,
codec=self.pld_dec, codec=self.pld_dec,
src_validation_error=valerr, src_validation_error=valerr,
@ -277,6 +277,19 @@ class PldRx(Struct):
if is_started_send_side: if is_started_send_side:
raise mte raise mte
# XXX TODO: remove this right?
# => any bad stated/return values should
# always be treated a remote errors right?
#
# if (
# expect_msg is Return
# or expect_msg is Started
# ):
# # set emulated remote error more-or-less as the
# # runtime would
# ctx: Context = getattr(ipc, 'ctx', ipc)
# ctx._maybe_cancel_and_set_remote_error(mte)
# NOTE: the `.message` is automatically # NOTE: the `.message` is automatically
# transferred into the message as long as we # transferred into the message as long as we
# define it as a `Error.message` field. # define it as a `Error.message` field.
@ -786,18 +799,13 @@ def validate_payload_msg(
# raise any msg type error NO MATTER WHAT! # raise any msg type error NO MATTER WHAT!
except ValidationError as verr: except ValidationError as verr:
try: mte: MsgTypeError = _mk_msg_type_err(
mte: MsgTypeError = _mk_recv_mte(
msg=roundtripped, msg=roundtripped,
codec=codec, codec=codec,
src_validation_error=verr, src_validation_error=verr,
verb_header='Trying to send ', verb_header='Trying to send ',
is_invalid_payload=True, is_invalid_payload=True,
) )
except BaseException:
__tracebackhide__: bool = False
raise
if not raise_mte: if not raise_mte:
return mte return mte

View File

@ -73,22 +73,11 @@ class PayloadMsg(
# as_array=True, # as_array=True,
): ):
''' '''
An abstract payload boxing/shuttling IPC msg type. The "god" boxing msg type.
Boxes data-values passed to/from user code Boxes user data-msgs in a `.pld` and uses `msgspec`'s tagged
unions support to enable a spec from a common msg inheritance
(i.e. any values passed by `tractor` application code using any of tree.
|_ `._streaming.MsgStream.send/receive()`
|_ `._context.Context.started/result()`
|_ `._ipc.Channel.send/recv()`
aka our "IPC primitive APIs")
as message "payloads" set to the `.pld` field and uses
`msgspec`'s "tagged unions" feature to support a subset of our
"SC-transitive shuttle protocol" specification with
a `msgspec.Struct` inheritance tree.
''' '''
cid: str # call/context-id cid: str # call/context-id