Compare commits

..

No commits in common. "af013912acbb5aabadae26d168ae5c45599c9f71" and "cf48fdecfeb76cdf67a3f779d98c086e75659dd6" have entirely different histories.

8 changed files with 197 additions and 290 deletions

View File

@ -49,21 +49,20 @@ from ._exceptions import (
InternalError, InternalError,
RemoteActorError, RemoteActorError,
StreamOverrun, StreamOverrun,
pack_from_raise, pack_error,
unpack_error, unpack_error,
_raise_from_no_key_in_msg, _raise_from_no_key_in_msg,
) )
from .log import get_logger from .log import get_logger
from .msg import ( from .msg import (
Error,
MsgType,
MsgCodec,
NamespacePath, NamespacePath,
Msg,
Return, Return,
Started, Started,
Stop, Stop,
Yield, Yield,
current_codec, current_codec,
MsgCodec,
pretty_struct, pretty_struct,
) )
from ._ipc import Channel from ._ipc import Channel
@ -108,7 +107,8 @@ async def _drain_to_final_msg(
# wait for a final context result by collecting (but # wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit # basically ignoring) any bi-dir-stream msgs still in transit
# from the far end. # from the far end.
pre_result_drained: list[MsgType] = [] # pre_result_drained: list[dict] = []
pre_result_drained: list[Msg] = []
while not ( while not (
ctx.maybe_error ctx.maybe_error
and not ctx._final_result_is_set() and not ctx._final_result_is_set()
@ -168,7 +168,7 @@ async def _drain_to_final_msg(
# pray to the `trio` gawds that we're corrent with this # pray to the `trio` gawds that we're corrent with this
# msg: dict = await ctx._recv_chan.receive() # msg: dict = await ctx._recv_chan.receive()
msg: MsgType = await ctx._recv_chan.receive() msg: Msg = await ctx._recv_chan.receive()
# always capture unexpected/non-result msgs # always capture unexpected/non-result msgs
pre_result_drained.append(msg) pre_result_drained.append(msg)
@ -191,12 +191,13 @@ async def _drain_to_final_msg(
raise raise
match msg: match msg:
# final result arrived!
case Return( case Return(
# cid=cid, cid=cid,
pld=res, pld=res,
): ):
# try:
# ctx._result: Any = msg['return']
# ctx._result: Any = msg.pld
ctx._result: Any = res ctx._result: Any = res
log.runtime( log.runtime(
'Context delivered final draining msg:\n' 'Context delivered final draining msg:\n'
@ -209,9 +210,13 @@ async def _drain_to_final_msg(
# TODO: ^ we don't need it right? # TODO: ^ we don't need it right?
break break
# far end task is still streaming to us so discard # except KeyError:
# and report depending on local ctx state. # except AttributeError:
case Yield(): case Yield():
# if 'yield' in msg:
# far end task is still streaming to us so discard
# and report per local context state.
if ( if (
(ctx._stream.closed (ctx._stream.closed
and (reason := 'stream was already closed') and (reason := 'stream was already closed')
@ -252,34 +257,45 @@ async def _drain_to_final_msg(
) )
continue continue
# stream terminated, but no result yet..
#
# TODO: work out edge cases here where # TODO: work out edge cases here where
# a stream is open but the task also calls # a stream is open but the task also calls
# this? # this?
# -[ ] should be a runtime error if a stream is open right? # -[ ] should be a runtime error if a stream is open right?
# Stop() # Stop()
case Stop(): case Stop():
# elif 'stop' in msg:
log.cancel( log.cancel(
'Remote stream terminated due to "stop" msg:\n\n' 'Remote stream terminated due to "stop" msg:\n\n'
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) )
continue continue
# remote error msg, likely already handled inside # It's an internal error if any other msg type without
# `Context._deliver_msg()` # a`'cid'` field arrives here!
case Error(): case _:
# if not msg.get('cid'):
if not msg.cid:
raise InternalError(
'Unexpected cid-missing msg?\n\n'
f'{msg}\n'
)
# TODO: can we replace this with `ctx.maybe_raise()`? # XXX fallthrough to handle expected error XXX
# -[ ] would this be handier for this case maybe? # TODO: replace this with `ctx.maybe_raise()`
# async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
# #
# TODO: would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
re: Exception|None = ctx._remote_error re: Exception|None = ctx._remote_error
if re: if re:
log.critical(
'Remote ctx terminated due to "error" msg:\n'
f'{re}'
)
assert msg is ctx._cancel_msg assert msg is ctx._cancel_msg
# NOTE: this solved a super duper edge case XD # NOTE: this solved a super dupe edge case XD
# this was THE super duper edge case of: # this was THE super duper edge case of:
# - local task opens a remote task, # - local task opens a remote task,
# - requests remote cancellation of far end # - requests remote cancellation of far end
@ -296,10 +312,9 @@ async def _drain_to_final_msg(
# does not re-raise any ctxc it receives # does not re-raise any ctxc it receives
# IFF **it** was the cancellation # IFF **it** was the cancellation
# requester.. # requester..
# # will raise if necessary, ow break from
# XXX will raise if necessary but ow break # loop presuming any error terminates the
# from loop presuming any supressed error # context!
# (ctxc) should terminate the context!
ctx._maybe_raise_remote_err( ctx._maybe_raise_remote_err(
re, re,
# NOTE: obvi we don't care if we # NOTE: obvi we don't care if we
@ -323,7 +338,6 @@ async def _drain_to_final_msg(
log.critical('SHOULD NEVER GET HERE!?') log.critical('SHOULD NEVER GET HERE!?')
assert msg is ctx._cancel_msg assert msg is ctx._cancel_msg
assert error.msgdata == ctx._remote_error.msgdata assert error.msgdata == ctx._remote_error.msgdata
assert error.ipc_msg == ctx._remote_error.ipc_msg
from .devx._debug import pause from .devx._debug import pause
await pause() await pause()
ctx._maybe_cancel_and_set_remote_error(error) ctx._maybe_cancel_and_set_remote_error(error)
@ -332,20 +346,6 @@ async def _drain_to_final_msg(
else: else:
# bubble the original src key error # bubble the original src key error
raise raise
# XXX should pretty much never get here unless someone
# overrides the default `MsgType` spec.
case _:
# It's definitely an internal error if any other
# msg type without a`'cid'` field arrives here!
if not msg.cid:
raise InternalError(
'Unexpected cid-missing msg?\n\n'
f'{msg}\n'
)
raise RuntimeError('Unknown msg type: {msg}')
else: else:
log.cancel( log.cancel(
'Skipping `MsgStream` drain since final outcome is set\n\n' 'Skipping `MsgStream` drain since final outcome is set\n\n'
@ -1342,11 +1342,8 @@ class Context:
# `._cancel_called == True`. # `._cancel_called == True`.
not raise_overrun_from_self not raise_overrun_from_self
and isinstance(remote_error, RemoteActorError) and isinstance(remote_error, RemoteActorError)
and remote_error.msgdata['boxed_type_str'] == 'StreamOverrun'
and remote_error.boxed_type_str == 'StreamOverrun' and tuple(remote_error.msgdata['sender']) == our_uid
# and tuple(remote_error.msgdata['sender']) == our_uid
and tuple(remote_error.sender) == our_uid
): ):
# NOTE: we set the local scope error to any "self # NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing" # cancellation" error-response thus "absorbing"
@ -1415,11 +1412,16 @@ class Context:
assert self._recv_chan assert self._recv_chan
raise_overrun: bool = not self._allow_overruns raise_overrun: bool = not self._allow_overruns
# res_placeholder: int = id(self)
if ( if (
# self._result == res_placeholder
# and not self._remote_error
self.maybe_error is None self.maybe_error is None
and # not self._remote_error
not self._recv_chan._closed # type: ignore # and not self._local_error
and not self._recv_chan._closed # type: ignore
): ):
# wait for a final context result/error by "draining" # wait for a final context result/error by "draining"
# (by more or less ignoring) any bi-dir-stream "yield" # (by more or less ignoring) any bi-dir-stream "yield"
# msgs still in transit from the far end. # msgs still in transit from the far end.
@ -1430,6 +1432,7 @@ class Context:
for msg in drained_msgs: for msg in drained_msgs:
# TODO: mask this by default.. # TODO: mask this by default..
# if 'return' in msg:
if isinstance(msg, Return): if isinstance(msg, Return):
# from .devx import pause # from .devx import pause
# await pause() # await pause()
@ -1445,9 +1448,6 @@ class Context:
) )
self.maybe_raise( self.maybe_raise(
# NOTE: obvi we don't care if we
# overran the far end if we're already
# waiting on a final result (msg).
raise_overrun_from_self=( raise_overrun_from_self=(
raise_overrun raise_overrun
and and
@ -1458,12 +1458,34 @@ class Context:
(not self._cancel_called) (not self._cancel_called)
) )
) )
# if (
# (re := self._remote_error)
# # and self._result == res_placeholder
# ):
# self._maybe_raise_remote_err(
# re,
# # NOTE: obvi we don't care if we
# # overran the far end if we're already
# # waiting on a final result (msg).
# # raise_overrun_from_self=False,
# raise_overrun_from_self=(
# raise_overrun
# and
# # only when we ARE NOT the canceller
# # should we raise overruns, bc ow we're
# # raising something we know might happen
# # during cancellation ;)
# (not self._cancel_called)
# ),
# )
# if maybe_err:
# self._result = maybe_err
return self.outcome return self.outcome
# TODO: switch this with above! # TODO: switch this with above which should be named
# -[ ] should be named `.wait_for_outcome()` and instead do # `.wait_for_outcome()` and instead do
# a `.outcome.Outcome.unwrap()` ? # a `.outcome.Outcome.unwrap()` ?
#
# @property # @property
# def result(self) -> Any|None: # def result(self) -> Any|None:
# if self._final_result_is_set(): # if self._final_result_is_set():
@ -1522,6 +1544,7 @@ class Context:
return None return None
def _final_result_is_set(self) -> bool: def _final_result_is_set(self) -> bool:
# return not (self._result == id(self))
return self._result is not Unresolved return self._result is not Unresolved
# def get_result_nowait(self) -> Any|None: # def get_result_nowait(self) -> Any|None:
@ -1738,7 +1761,8 @@ class Context:
async def _deliver_msg( async def _deliver_msg(
self, self,
msg: MsgType, # msg: dict,
msg: Msg,
) -> bool: ) -> bool:
''' '''
@ -1752,20 +1776,6 @@ class Context:
`._scope_nursery: trio.Nursery`) which ensures that such `._scope_nursery: trio.Nursery`) which ensures that such
messages are queued up and eventually sent if possible. messages are queued up and eventually sent if possible.
XXX RULES XXX
------ - ------
- NEVER raise remote errors from this method; a runtime task caller.
An error "delivered" to a ctx should always be raised by
the corresponding local task operating on the
`Portal`/`Context` APIs.
- NEVER `return` early before delivering the msg!
bc if the error is a ctxc and there is a task waiting on
`.result()` we need the msg to be
`send_chan.send_nowait()`-ed over the `._recv_chan` so
that the error is relayed to that waiter task and thus
raised in user code!
''' '''
cid: str = self.cid cid: str = self.cid
chan: Channel = self.chan chan: Channel = self.chan
@ -1796,14 +1806,28 @@ class Context:
) )
self._cancel_msg: dict = msg self._cancel_msg: dict = msg
# XXX NOTE: this will not raise an error, merely set # NOTE: this will not raise an error, merely set
# `._remote_error` and maybe cancel any task currently # `._remote_error` and maybe cancel any task currently
# entered in `Portal.open_context()` presuming the # entered in `Portal.open_context()` presuming the
# error is "cancel causing" (i.e. a `ContextCancelled` # error is "cancel causing" (i.e. a `ContextCancelled`
# or `RemoteActorError`). # or `RemoteActorError`).
self._maybe_cancel_and_set_remote_error(re) self._maybe_cancel_and_set_remote_error(re)
# XXX only case where returning early is fine! # XXX NEVER do this XXX..!!
# bc if the error is a ctxc and there is a task
# waiting on `.result()` we need the msg to be sent
# over the `send_chan`/`._recv_chan` so that the error
# is relayed to that waiter task..
# return True
#
# XXX ALSO NO!! XXX
# => NEVER raise remote errors from the calling
# runtime task, they should always be raised by
# consumer side tasks operating on the
# `Portal`/`Context` APIs.
# if self._remote_error:
# self._maybe_raise_remote_err(error)
if self._in_overrun: if self._in_overrun:
log.warning( log.warning(
f'Queueing OVERRUN msg on caller task:\n' f'Queueing OVERRUN msg on caller task:\n'
@ -1922,27 +1946,31 @@ class Context:
# anything different. # anything different.
return False return False
else: else:
# txt += f'\n{msg}\n'
# raise local overrun and immediately pack as IPC # raise local overrun and immediately pack as IPC
# msg for far end. # msg for far end.
err_msg: Error = pack_from_raise( try:
local_err=StreamOverrun( raise StreamOverrun(
txt, txt,
sender=from_uid, sender=from_uid,
),
cid=cid,
)
try:
# relay condition to sender side remote task
await chan.send(err_msg)
return True
# XXX: local consumer has closed their side of
# the IPC so cancel the far end streaming task
except trio.BrokenResourceError:
log.warning(
'Channel for ctx is already closed?\n'
f'|_{chan}\n'
) )
except StreamOverrun as err:
err_msg: dict[str, dict] = pack_error(
err,
cid=cid,
)
try:
# relay condition to sender side remote task
await chan.send(err_msg)
return True
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(
'Channel for ctx is already closed?\n'
f'|_{chan}\n'
)
# ow, indicate unable to deliver by default # ow, indicate unable to deliver by default
return False return False
@ -2351,17 +2379,28 @@ async def open_context_from_portal(
# an exception type boxed in a `RemoteActorError` # an exception type boxed in a `RemoteActorError`
# is returned (meaning it was obvi not raised) # is returned (meaning it was obvi not raised)
# that we want to log-report on. # that we want to log-report on.
match result_or_err: msgdata: str|None = getattr(
case ContextCancelled() as ctxc: result_or_err,
log.cancel(ctxc.tb_str) 'msgdata',
None
)
match (msgdata, result_or_err):
case (
{'tb_str': tbstr},
ContextCancelled(),
):
log.cancel(tbstr)
case RemoteActorError() as rae: case (
{'tb_str': tbstr},
RemoteActorError(),
):
log.exception( log.exception(
'Context remotely errored!\n' 'Context remotely errored!\n'
f'<= peer: {uid}\n' f'<= peer: {uid}\n'
f' |_ {nsf}()\n\n' f' |_ {nsf}()\n\n'
f'{rae.tb_str}' f'{tbstr}'
) )
case (None, _): case (None, _):
log.runtime( log.runtime(
@ -2371,6 +2410,7 @@ async def open_context_from_portal(
f'`{result_or_err}`\n' f'`{result_or_err}`\n'
) )
finally: finally:
# XXX: (MEGA IMPORTANT) if this is a root opened process we # XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the # wait for any immediate child in debug before popping the

View File

@ -38,6 +38,7 @@ from typing import (
Protocol, Protocol,
Type, Type,
TypeVar, TypeVar,
Union,
) )
import msgspec import msgspec
@ -46,9 +47,8 @@ import trio
from tractor.log import get_logger from tractor.log import get_logger
from tractor._exceptions import ( from tractor._exceptions import (
MsgTypeError,
pack_from_raise,
TransportClosed, TransportClosed,
MsgTypeError,
) )
from tractor.msg import ( from tractor.msg import (
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
@ -118,75 +118,40 @@ class MsgTransport(Protocol[MsgType]):
... ...
def _mk_msg_type_err( def _raise_msg_type_err(
msg: Any|bytes, msg: Any|bytes,
codec: MsgCodec, codec: MsgCodec,
validation_err: msgspec.ValidationError|None = None,
message: str|None = None,
verb_header: str = '', verb_header: str = '',
src_validation_error: msgspec.ValidationError|None = None, ) -> None:
src_type_error: TypeError|None = None,
) -> MsgTypeError: # if side == 'send':
if validation_err is None: # send-side
# `Channel.send()` case import traceback
if src_validation_error is None: # send-side from tractor._exceptions import pformat_boxed_tb
# no src error from `msgspec.msgpack.Decoder.decode()` so fmt_spec: str = '\n'.join(
# prolly a manual type-check on our part. map(str, codec.msg_spec.__args__)
if message is None: )
import traceback fmt_stack: str = (
from tractor._exceptions import pformat_boxed_tb '\n'.join(traceback.format_stack(limit=3))
)
fmt_spec: str = '\n'.join( tb_fmt: str = pformat_boxed_tb(
map(str, codec.msg_spec.__args__) tb_str=fmt_stack,
) # fields_str=header,
fmt_stack: str = ( field_prefix=' ',
'\n'.join(traceback.format_stack(limit=3)) indent='',
) )
tb_fmt: str = pformat_boxed_tb( raise MsgTypeError(
tb_str=fmt_stack, f'invalid msg -> {msg}: {type(msg)}\n\n'
# fields_str=header, f'{tb_fmt}\n'
field_prefix=' ', f'Valid IPC msgs are:\n\n'
indent='', # f' ------ - ------\n'
) f'{fmt_spec}\n'
message: str = (
f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n'
f'Valid IPC msgs are:\n\n'
# f' ------ - ------\n'
f'{fmt_spec}\n',
)
elif src_type_error:
src_message: str = str(src_type_error)
patt: str = 'type '
type_idx: int = src_message.find('type ')
invalid_type: str = src_message[type_idx + len(patt):].split()[0]
enc_hook: Callable|None = codec.enc.enc_hook
if enc_hook is None:
message += (
'\n\n'
f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
f'Check the `msgspec` docs for ad-hoc type extending:\n'
'|_ https://jcristharif.com/msgspec/extending.html\n'
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
)
msgtyperr = MsgTypeError(
message=message,
ipc_msg=msg,
) )
# ya, might be `None`
msgtyperr.__cause__ = src_type_error
return msgtyperr
# `Channel.recv()` case
else: else:
# decode the msg-bytes using the std msgpack # decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any # interchange-prot (i.e. without any
@ -196,31 +161,29 @@ def _mk_msg_type_err(
msg_dict: dict = msgspec.msgpack.decode(msg) msg_dict: dict = msgspec.msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type'] msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name) msg_type = getattr(msgtypes, msg_type_name)
message: str = ( errmsg: str = (
f'invalid `{msg_type_name}` IPC msg\n\n' f'invalid `{msg_type_name}` IPC msg\n\n'
) )
if verb_header: if verb_header:
message = f'{verb_header} ' + message errmsg = f'{verb_header} ' + errmsg
# XXX see if we can determine the exact invalid field # XXX see if we can determine the exact invalid field
# such that we can comprehensively report the # such that we can comprehensively report the
# specific field's type problem # specific field's type problem
msgspec_msg: str = src_validation_error.args[0].rstrip('`') msgspec_msg: str = validation_err.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.') msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object() obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj: if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
message += ( field_type: Union[Type] = msg_type.__signature__.parameters[
maybe_field
].annotation
errmsg += (
f'{msg.rstrip("`")}\n\n' f'{msg.rstrip("`")}\n\n'
f'{msg_type}\n' f'{msg_type}\n'
f' |_.{maybe_field}: {codec.pld_spec_str} = {field_val!r}\n' f' |_.{maybe_field}: {field_type} = {field_val!r}\n'
) )
msgtyperr = MsgTypeError.from_decode( raise MsgTypeError(errmsg) from validation_err
message=message,
msgdict=msg_dict,
)
msgtyperr.__cause__ = src_validation_error
return msgtyperr
# TODO: not sure why we have to inherit here, but it seems to be an # TODO: not sure why we have to inherit here, but it seems to be an
@ -362,15 +325,12 @@ class MsgpackTCPStream(MsgTransport):
# and always raise such that spec violations # and always raise such that spec violations
# are never allowed to be caught silently! # are never allowed to be caught silently!
except msgspec.ValidationError as verr: except msgspec.ValidationError as verr:
msgtyperr: MsgTypeError = _mk_msg_type_err( # re-raise as type error
_raise_msg_type_err(
msg=msg_bytes, msg=msg_bytes,
codec=codec, codec=codec,
src_validation_error=verr, validation_err=verr,
) )
# XXX deliver up to `Channel.recv()` where
# a re-raise and `Error`-pack can inject the far
# end actor `.uid`.
yield msgtyperr
except ( except (
msgspec.DecodeError, msgspec.DecodeError,
@ -427,7 +387,7 @@ class MsgpackTCPStream(MsgTransport):
if type(msg) not in msgtypes.__msg_types__: if type(msg) not in msgtypes.__msg_types__:
if strict_types: if strict_types:
raise _mk_msg_type_err( _raise_msg_type_err(
msg, msg,
codec=codec, codec=codec,
) )
@ -440,16 +400,11 @@ class MsgpackTCPStream(MsgTransport):
try: try:
bytes_data: bytes = codec.encode(msg) bytes_data: bytes = codec.encode(msg)
except TypeError as typerr: except TypeError as typerr:
msgtyperr: MsgTypeError = _mk_msg_type_err( raise MsgTypeError(
msg, 'A msg field violates the current spec\n'
codec=codec, f'{codec.pld_spec}\n\n'
message=( f'{pretty_struct.Struct.pformat(msg)}'
f'IPC-msg-spec violation in\n\n' ) from typerr
f'{pretty_struct.Struct.pformat(msg)}'
),
src_type_error=typerr,
)
raise msgtyperr from typerr
# supposedly the fastest says, # supposedly the fastest says,
# https://stackoverflow.com/a/54027962 # https://stackoverflow.com/a/54027962
@ -764,35 +719,13 @@ class Channel:
assert self._transport assert self._transport
while True: while True:
try: try:
async for msg in self._transport: async for item in self._transport:
match msg: yield item
# NOTE: if transport/interchange delivers
# a type error, we pack it with the far
# end peer `Actor.uid` and relay the
# `Error`-msg upward to the `._rpc` stack
# for normal RAE handling.
case MsgTypeError():
yield pack_from_raise(
local_err=msg,
cid=msg.cid,
# XXX we pack it here bc lower
# layers have no notion of an
# actor-id ;)
src_uid=self.uid,
)
case _:
yield msg
# TODO: if we were gonna do this it should be
# done up at the `MsgStream` layer!
#
# sent = yield item # sent = yield item
# if sent is not None: # if sent is not None:
# # optimization, passing None through all the # # optimization, passing None through all the
# # time is pointless # # time is pointless
# await self._transport.send(sent) # await self._transport.send(sent)
except trio.BrokenResourceError: except trio.BrokenResourceError:
# if not self._autorecon: # if not self._autorecon:

View File

@ -46,7 +46,6 @@ from ._state import (
from ._ipc import Channel from ._ipc import Channel
from .log import get_logger from .log import get_logger
from .msg import ( from .msg import (
Error,
NamespacePath, NamespacePath,
Return, Return,
) )
@ -70,7 +69,8 @@ log = get_logger(__name__)
# `._raise_from_no_key_in_msg()` (after tweak to # `._raise_from_no_key_in_msg()` (after tweak to
# accept a `chan: Channel` arg) in key block! # accept a `chan: Channel` arg) in key block!
def _unwrap_msg( def _unwrap_msg(
msg: Return|Error, # msg: dict[str, Any],
msg: Return,
channel: Channel, channel: Channel,
hide_tb: bool = True, hide_tb: bool = True,

View File

@ -47,13 +47,12 @@ from ._context import (
Context, Context,
) )
from ._exceptions import ( from ._exceptions import (
ContextCancelled,
ModuleNotExposed, ModuleNotExposed,
MsgTypeError,
TransportClosed,
is_multi_cancelled, is_multi_cancelled,
ContextCancelled,
pack_error, pack_error,
unpack_error, unpack_error,
TransportClosed,
) )
from .devx import ( from .devx import (
maybe_wait_for_debugger, maybe_wait_for_debugger,
@ -637,7 +636,7 @@ async def _invoke(
# (callee) task, so relay this cancel signal to the # (callee) task, so relay this cancel signal to the
# other side. # other side.
ctxc = ContextCancelled( ctxc = ContextCancelled(
message=msg, msg,
boxed_type=trio.Cancelled, boxed_type=trio.Cancelled,
canceller=canceller, canceller=canceller,
) )
@ -827,12 +826,7 @@ async def process_messages(
| Stop(cid=cid) | Stop(cid=cid)
| Return(cid=cid) | Return(cid=cid)
| CancelAck(cid=cid) | CancelAck(cid=cid)
| Error(cid=cid) # RPC-task ctx specific
# `.cid` means RPC-ctx-task specific
| Error(cid=cid)
# recv-side `MsgType` decode violation
| MsgTypeError(cid=cid)
): ):
# deliver response to local caller/waiter # deliver response to local caller/waiter
# via its per-remote-context memory channel. # via its per-remote-context memory channel.

View File

@ -49,6 +49,7 @@ from pprint import pformat
import signal import signal
import sys import sys
from typing import ( from typing import (
Any,
Callable, Callable,
TYPE_CHECKING, TYPE_CHECKING,
) )

View File

@ -19,6 +19,7 @@ Built-in messaging patterns, types, APIs and helpers.
''' '''
from typing import ( from typing import (
Union,
TypeAlias, TypeAlias,
) )
from .ptr import ( from .ptr import (
@ -55,9 +56,8 @@ from .types import (
# full msg class set from above as list # full msg class set from above as list
__msg_types__ as __msg_types__, __msg_types__ as __msg_types__,
# type-alias for union of all msgs
MsgType as MsgType,
) )
# TODO: use new type declaration syntax for msg-type-spec
__msg_spec__: TypeAlias = MsgType # https://docs.python.org/3/library/typing.html#type-aliases
# https://docs.python.org/3/reference/simple_stmts.html#type
__msg_spec__: TypeAlias = Union[*__msg_types__]

View File

@ -57,7 +57,7 @@ from trio.lowlevel import (
from tractor.msg.pretty_struct import Struct from tractor.msg.pretty_struct import Struct
from tractor.msg.types import ( from tractor.msg.types import (
mk_msg_spec, mk_msg_spec,
MsgType, Msg,
) )
@ -87,50 +87,12 @@ class MsgCodec(Struct):
pld_spec: Union[Type[Struct]]|None pld_spec: Union[Type[Struct]]|None
@property
def pld_spec_str(self) -> str:
spec: Union[Type]|Type = self.pld_spec
# TODO: could also use match: instead?
if getattr(spec, '__args__', False):
# `typing.Union` case
return str(spec)
else:
return spec.__name__
# struct type unions # struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions # https://jcristharif.com/msgspec/structs.html#tagged-unions
@property @property
def msg_spec(self) -> Union[Type[Struct]]: def msg_spec(self) -> Union[Type[Struct]]:
return self._dec.type return self._dec.type
def msg_spec_items(
self,
msg: MsgType|None = None,
) -> dict[str, MsgType]|str:
msgt_table: dict[str, MsgType] = {
msgt: str(msgt)
for msgt in self.msg_spec.__args__
}
if msg:
msgt: MsgType = type(msg)
str_repr: str = msgt_table[msgt]
return {msgt: str_repr}
return msgt_table
# TODO: some way to make `pretty_struct.Struct` use this
# wrapped field over the `.msg_spec` one?
def pformat_msg_spec(
self,
msg: MsgType|None = None,
) -> str:
return '\n'.join(
self.msg_spec_items(msg=msg).values()
)
lib: ModuleType = msgspec lib: ModuleType = msgspec
# TODO: a sub-decoder system as well? # TODO: a sub-decoder system as well?
@ -146,7 +108,7 @@ class MsgCodec(Struct):
# OR # OR
# ) = { # ) = {
# # pre-seed decoders for std-py-type-set for use when # # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`. # # `Msg.pld == None|Any`.
# None: msgpack.Decoder(Any), # None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any), # Any: msgpack.Decoder(Any),
# } # }
@ -341,7 +303,7 @@ def mk_codec(
# by `tag_field: str` value key? # by `tag_field: str` value key?
# payload_msg_specs: dict[ # payload_msg_specs: dict[
# str, # tag_field value as sub-decoder key # str, # tag_field value as sub-decoder key
# Union[Type[Struct]] # `MsgType.pld` type spec # Union[Type[Struct]] # `Msg.pld` type spec
# ]|None = None, # ]|None = None,
libname: str = 'msgspec', libname: str = 'msgspec',
@ -374,7 +336,7 @@ def mk_codec(
raise RuntimeError( raise RuntimeError(
f'If a payload spec is provided,\n' f'If a payload spec is provided,\n'
"the builtin SC-shuttle-protocol's msg set\n" "the builtin SC-shuttle-protocol's msg set\n"
f'(i.e. a `{MsgType}`) MUST be used!\n\n' f'(i.e. `{Msg}`) MUST be used!\n\n'
f'However both values were passed as => mk_codec(\n' f'However both values were passed as => mk_codec(\n'
f' ipc_msg_spec={ipc_msg_spec}`\n' f' ipc_msg_spec={ipc_msg_spec}`\n'
f' ipc_pld_spec={ipc_pld_spec}`\n)\n' f' ipc_pld_spec={ipc_pld_spec}`\n)\n'

View File

@ -31,7 +31,6 @@ from typing import (
Literal, Literal,
Type, Type,
TypeVar, TypeVar,
TypeAlias,
Union, Union,
) )
@ -401,29 +400,16 @@ class CancelAck(
pld: bool pld: bool
# TODO: unify this with `._exceptions.RemoteActorError`
# such that we can have a msg which is both raisable and
# IPC-wire ready?
# B~o
class Error( class Error(
Struct, Struct,
tag=True, tag=True,
tag_field='msg_type', tag_field='msg_type',
# TODO may omit defaults?
# https://jcristharif.com/msgspec/structs.html#omitting-default-values
# omit_defaults=True,
): ):
''' '''
A pkt that wraps `RemoteActorError`s for relay and raising. A pkt that wraps `RemoteActorError`s for relay and raising.
Fields are 1-to-1 meta-data as needed originally by Fields are 1-to-1 meta-data as needed originally by
`RemoteActorError.msgdata: dict` but now are defined here. `RemoteActorError.msgdata: dict`.
Note: this msg shuttles `ContextCancelled` and `StreamOverrun`
as well is used to rewrap any `MsgTypeError` for relay-reponse
to bad `Yield.pld` senders during an IPC ctx's streaming dialog
phase.
''' '''
src_uid: tuple[str, str] src_uid: tuple[str, str]
@ -442,10 +428,6 @@ class Error(
# `StreamOverrun` # `StreamOverrun`
sender: tuple[str, str]|None = None sender: tuple[str, str]|None = None
# for the `MsgTypeError` case where the receiver side
# decodes the underlying original `Msg`-subtype
_msg_dict: dict|None = None
# TODO: should be make a msg version of `ContextCancelled?` # TODO: should be make a msg version of `ContextCancelled?`
# and/or with a scope field or a full `ActorCancelled`? # and/or with a scope field or a full `ActorCancelled`?
@ -504,11 +486,6 @@ __msg_types__: list[Msg] = (
_payload_msgs _payload_msgs
) )
# TODO: use new type declaration syntax for msg-type-spec
# https://docs.python.org/3/library/typing.html#type-aliases
# https://docs.python.org/3/reference/simple_stmts.html#type
MsgType: TypeAlias = Union[*__msg_types__]
def mk_msg_spec( def mk_msg_spec(
payload_type_union: Union[Type] = Any, payload_type_union: Union[Type] = Any,