Compare commits
5 Commits
9b4f580470
...
a51632ffa6
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | a51632ffa6 | |
Tyler Goodlet | 0df7d557db | |
Tyler Goodlet | 7b020c42cc | |
Tyler Goodlet | d18cf32e28 | |
Tyler Goodlet | dd6a4d49d8 |
|
@ -24,6 +24,7 @@ import importlib
|
|||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
@ -32,8 +33,11 @@ import traceback
|
|||
|
||||
import trio
|
||||
from msgspec import (
|
||||
structs,
|
||||
defstruct,
|
||||
msgpack,
|
||||
Raw,
|
||||
structs,
|
||||
ValidationError,
|
||||
)
|
||||
|
||||
from tractor._state import current_actor
|
||||
|
@ -44,6 +48,8 @@ from tractor.msg import (
|
|||
Stop,
|
||||
Yield,
|
||||
types as msgtypes,
|
||||
MsgCodec,
|
||||
MsgDec,
|
||||
)
|
||||
from tractor.msg.pretty_struct import (
|
||||
iter_fields,
|
||||
|
@ -932,7 +938,7 @@ def is_multi_cancelled(exc: BaseException) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
def _raise_from_no_key_in_msg(
|
||||
def _raise_from_unexpected_msg(
|
||||
ctx: Context,
|
||||
msg: MsgType,
|
||||
src_err: AttributeError,
|
||||
|
@ -972,8 +978,6 @@ def _raise_from_no_key_in_msg(
|
|||
# an internal error should never get here
|
||||
try:
|
||||
cid: str = msg.cid
|
||||
# cid: str = msg['cid']
|
||||
# except KeyError as src_err:
|
||||
except AttributeError as src_err:
|
||||
raise MessagingError(
|
||||
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
|
||||
|
@ -985,7 +989,6 @@ def _raise_from_no_key_in_msg(
|
|||
# TODO: test that shows stream raising an expected error!!!
|
||||
|
||||
# raise the error message in a boxed exception type!
|
||||
# if msg.get('error'):
|
||||
if isinstance(msg, Error):
|
||||
# match msg:
|
||||
# case Error():
|
||||
|
@ -1001,7 +1004,6 @@ def _raise_from_no_key_in_msg(
|
|||
# the stream._eoc outside this in the calleer always?
|
||||
# case Stop():
|
||||
elif (
|
||||
# msg.get('stop')
|
||||
isinstance(msg, Stop)
|
||||
or (
|
||||
stream
|
||||
|
@ -1036,7 +1038,6 @@ def _raise_from_no_key_in_msg(
|
|||
# that arrived which is probably the source of this stream
|
||||
# closure
|
||||
ctx.maybe_raise()
|
||||
|
||||
raise eoc from src_err
|
||||
|
||||
if (
|
||||
|
@ -1056,3 +1057,128 @@ def _raise_from_no_key_in_msg(
|
|||
" BUT received a non-error msg:\n"
|
||||
f'{pformat(msg)}'
|
||||
) from src_err
|
||||
|
||||
|
||||
_raise_from_no_key_in_msg = _raise_from_unexpected_msg
|
||||
|
||||
|
||||
def _mk_msg_type_err(
|
||||
msg: Any|bytes|Raw,
|
||||
codec: MsgCodec|MsgDec,
|
||||
|
||||
message: str|None = None,
|
||||
verb_header: str = '',
|
||||
|
||||
src_validation_error: ValidationError|None = None,
|
||||
src_type_error: TypeError|None = None,
|
||||
|
||||
) -> MsgTypeError:
|
||||
'''
|
||||
Compose a `MsgTypeError` from an input runtime context.
|
||||
|
||||
'''
|
||||
# `Channel.send()` case
|
||||
if src_validation_error is None:
|
||||
|
||||
if isinstance(codec, MsgDec):
|
||||
raise RuntimeError(
|
||||
'`codec` must be a `MsgCodec` for send-side errors?'
|
||||
)
|
||||
|
||||
# no src error from `msgspec.msgpack.Decoder.decode()` so
|
||||
# prolly a manual type-check on our part.
|
||||
if message is None:
|
||||
fmt_spec: str = codec.pformat_msg_spec()
|
||||
fmt_stack: str = (
|
||||
'\n'.join(traceback.format_stack(limit=3))
|
||||
)
|
||||
tb_fmt: str = pformat_boxed_tb(
|
||||
tb_str=fmt_stack,
|
||||
# fields_str=header,
|
||||
field_prefix=' ',
|
||||
indent='',
|
||||
)
|
||||
message: str = (
|
||||
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
||||
f'{tb_fmt}\n'
|
||||
f'Valid IPC msgs are:\n\n'
|
||||
# f' ------ - ------\n'
|
||||
f'{fmt_spec}\n',
|
||||
)
|
||||
elif src_type_error:
|
||||
src_message: str = str(src_type_error)
|
||||
patt: str = 'type '
|
||||
type_idx: int = src_message.find('type ')
|
||||
invalid_type: str = src_message[type_idx + len(patt):].split()[0]
|
||||
|
||||
enc_hook: Callable|None = codec.enc.enc_hook
|
||||
if enc_hook is None:
|
||||
message += (
|
||||
'\n\n'
|
||||
|
||||
f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
|
||||
f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
|
||||
|
||||
f'Check the `msgspec` docs for ad-hoc type extending:\n'
|
||||
'|_ https://jcristharif.com/msgspec/extending.html\n'
|
||||
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
|
||||
)
|
||||
|
||||
|
||||
msgtyperr = MsgTypeError(
|
||||
message=message,
|
||||
ipc_msg=msg,
|
||||
)
|
||||
# ya, might be `None`
|
||||
msgtyperr.__cause__ = src_type_error
|
||||
return msgtyperr
|
||||
|
||||
# `Channel.recv()` case
|
||||
else:
|
||||
# decode the msg-bytes using the std msgpack
|
||||
# interchange-prot (i.e. without any
|
||||
# `msgspec.Struct` handling) so that we can
|
||||
# determine what `.msg.types.Msg` is the culprit
|
||||
# by reporting the received value.
|
||||
msg_dict: dict = msgpack.decode(msg)
|
||||
msg_type_name: str = msg_dict['msg_type']
|
||||
msg_type = getattr(msgtypes, msg_type_name)
|
||||
message: str = (
|
||||
f'invalid `{msg_type_name}` IPC msg\n\n'
|
||||
)
|
||||
if verb_header:
|
||||
message = f'{verb_header} ' + message
|
||||
|
||||
# XXX see if we can determine the exact invalid field
|
||||
# such that we can comprehensively report the
|
||||
# specific field's type problem
|
||||
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
|
||||
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
||||
obj = object()
|
||||
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
|
||||
field_name_expr: str = (
|
||||
f' |_{maybe_field}: {codec.pld_spec_str} = '
|
||||
)
|
||||
fmt_val_lines: list[str] = pformat(field_val).splitlines()
|
||||
fmt_val: str = (
|
||||
f'{fmt_val_lines[0]}\n'
|
||||
+
|
||||
textwrap.indent(
|
||||
'\n'.join(fmt_val_lines[1:]),
|
||||
prefix=' '*len(field_name_expr),
|
||||
)
|
||||
)
|
||||
message += (
|
||||
f'{msg.rstrip("`")}\n\n'
|
||||
f'<{msg_type.__qualname__}(\n'
|
||||
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
|
||||
f'{field_name_expr}{fmt_val}\n'
|
||||
f')>'
|
||||
)
|
||||
|
||||
msgtyperr = MsgTypeError.from_decode(
|
||||
message=message,
|
||||
msgdict=msg_dict,
|
||||
)
|
||||
msgtyperr.__cause__ = src_validation_error
|
||||
return msgtyperr
|
||||
|
|
130
tractor/_ipc.py
130
tractor/_ipc.py
|
@ -49,6 +49,7 @@ from tractor._exceptions import (
|
|||
MsgTypeError,
|
||||
pack_from_raise,
|
||||
TransportClosed,
|
||||
_mk_msg_type_err,
|
||||
)
|
||||
from tractor.msg import (
|
||||
_ctxvar_MsgCodec,
|
||||
|
@ -118,127 +119,6 @@ class MsgTransport(Protocol[MsgType]):
|
|||
...
|
||||
|
||||
|
||||
def _mk_msg_type_err(
|
||||
msg: Any|bytes,
|
||||
codec: MsgCodec,
|
||||
|
||||
message: str|None = None,
|
||||
verb_header: str = '',
|
||||
|
||||
src_validation_error: msgspec.ValidationError|None = None,
|
||||
src_type_error: TypeError|None = None,
|
||||
|
||||
) -> MsgTypeError:
|
||||
|
||||
import textwrap
|
||||
|
||||
# `Channel.send()` case
|
||||
if src_validation_error is None: # send-side
|
||||
|
||||
# no src error from `msgspec.msgpack.Decoder.decode()` so
|
||||
# prolly a manual type-check on our part.
|
||||
if message is None:
|
||||
import traceback
|
||||
from tractor._exceptions import pformat_boxed_tb
|
||||
|
||||
fmt_spec: str = '\n'.join(
|
||||
map(str, codec.msg_spec.__args__)
|
||||
)
|
||||
fmt_stack: str = (
|
||||
'\n'.join(traceback.format_stack(limit=3))
|
||||
)
|
||||
tb_fmt: str = pformat_boxed_tb(
|
||||
tb_str=fmt_stack,
|
||||
# fields_str=header,
|
||||
field_prefix=' ',
|
||||
indent='',
|
||||
)
|
||||
message: str = (
|
||||
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
||||
f'{tb_fmt}\n'
|
||||
f'Valid IPC msgs are:\n\n'
|
||||
# f' ------ - ------\n'
|
||||
f'{fmt_spec}\n',
|
||||
)
|
||||
elif src_type_error:
|
||||
src_message: str = str(src_type_error)
|
||||
patt: str = 'type '
|
||||
type_idx: int = src_message.find('type ')
|
||||
invalid_type: str = src_message[type_idx + len(patt):].split()[0]
|
||||
|
||||
enc_hook: Callable|None = codec.enc.enc_hook
|
||||
if enc_hook is None:
|
||||
message += (
|
||||
'\n\n'
|
||||
|
||||
f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
|
||||
f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
|
||||
|
||||
f'Check the `msgspec` docs for ad-hoc type extending:\n'
|
||||
'|_ https://jcristharif.com/msgspec/extending.html\n'
|
||||
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
|
||||
)
|
||||
|
||||
|
||||
msgtyperr = MsgTypeError(
|
||||
message=message,
|
||||
ipc_msg=msg,
|
||||
)
|
||||
# ya, might be `None`
|
||||
msgtyperr.__cause__ = src_type_error
|
||||
return msgtyperr
|
||||
|
||||
# `Channel.recv()` case
|
||||
else:
|
||||
# decode the msg-bytes using the std msgpack
|
||||
# interchange-prot (i.e. without any
|
||||
# `msgspec.Struct` handling) so that we can
|
||||
# determine what `.msg.types.Msg` is the culprit
|
||||
# by reporting the received value.
|
||||
msg_dict: dict = msgspec.msgpack.decode(msg)
|
||||
msg_type_name: str = msg_dict['msg_type']
|
||||
msg_type = getattr(msgtypes, msg_type_name)
|
||||
message: str = (
|
||||
f'invalid `{msg_type_name}` IPC msg\n\n'
|
||||
)
|
||||
if verb_header:
|
||||
message = f'{verb_header} ' + message
|
||||
|
||||
# XXX see if we can determine the exact invalid field
|
||||
# such that we can comprehensively report the
|
||||
# specific field's type problem
|
||||
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
|
||||
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
||||
obj = object()
|
||||
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
|
||||
field_name_expr: str = (
|
||||
f' |_{maybe_field}: {codec.pld_spec_str} = '
|
||||
)
|
||||
fmt_val_lines: list[str] = pformat(field_val).splitlines()
|
||||
fmt_val: str = (
|
||||
f'{fmt_val_lines[0]}\n'
|
||||
+
|
||||
textwrap.indent(
|
||||
'\n'.join(fmt_val_lines[1:]),
|
||||
prefix=' '*len(field_name_expr),
|
||||
)
|
||||
)
|
||||
message += (
|
||||
f'{msg.rstrip("`")}\n\n'
|
||||
f'<{msg_type.__qualname__}(\n'
|
||||
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
|
||||
f'{field_name_expr}{fmt_val}\n'
|
||||
f')>'
|
||||
)
|
||||
|
||||
msgtyperr = MsgTypeError.from_decode(
|
||||
message=message,
|
||||
msgdict=msg_dict,
|
||||
)
|
||||
msgtyperr.__cause__ = src_validation_error
|
||||
return msgtyperr
|
||||
|
||||
|
||||
# TODO: not sure why we have to inherit here, but it seems to be an
|
||||
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
|
||||
# probably should make a `mypy` issue?
|
||||
|
@ -299,10 +179,10 @@ class MsgpackTCPStream(MsgTransport):
|
|||
_codec._ctxvar_MsgCodec.get()
|
||||
)
|
||||
# TODO: mask out before release?
|
||||
log.runtime(
|
||||
f'New {self} created with codec\n'
|
||||
f'codec: {self._codec}\n'
|
||||
)
|
||||
# log.runtime(
|
||||
# f'New {self} created with codec\n'
|
||||
# f'codec: {self._codec}\n'
|
||||
# )
|
||||
|
||||
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
|
||||
'''
|
||||
|
|
|
@ -34,6 +34,7 @@ from ._codec import (
|
|||
apply_codec as apply_codec,
|
||||
mk_codec as mk_codec,
|
||||
MsgCodec as MsgCodec,
|
||||
MsgDec as MsgDec,
|
||||
current_codec as current_codec,
|
||||
)
|
||||
|
||||
|
@ -50,6 +51,7 @@ from .types import (
|
|||
Yield as Yield,
|
||||
Stop as Stop,
|
||||
Return as Return,
|
||||
CancelAck as CancelAck,
|
||||
|
||||
Error as Error,
|
||||
|
||||
|
|
|
@ -33,25 +33,29 @@ from __future__ import annotations
|
|||
from contextlib import (
|
||||
contextmanager as cm,
|
||||
)
|
||||
# from contextvars import (
|
||||
# ContextVar,
|
||||
# Token,
|
||||
# )
|
||||
from contextvars import (
|
||||
ContextVar,
|
||||
Token,
|
||||
)
|
||||
import textwrap
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
Union,
|
||||
)
|
||||
from types import ModuleType
|
||||
|
||||
import msgspec
|
||||
from msgspec import msgpack
|
||||
from trio.lowlevel import (
|
||||
RunVar,
|
||||
RunVarToken,
|
||||
from msgspec import (
|
||||
msgpack,
|
||||
Raw,
|
||||
)
|
||||
# from trio.lowlevel import (
|
||||
# RunVar,
|
||||
# RunVarToken,
|
||||
# )
|
||||
# TODO: see notes below from @mikenerone..
|
||||
# from tricycle import TreeVar
|
||||
|
||||
|
@ -62,8 +66,113 @@ from tractor.msg.types import (
|
|||
)
|
||||
from tractor.log import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tractor._context import Context
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
# TODO: unify with `MsgCodec` by making `._dec` part this?
|
||||
class MsgDec(Struct):
|
||||
'''
|
||||
An IPC msg decoder.
|
||||
|
||||
Normally used to decode only a payload: `MsgType.pld:
|
||||
PayloadT` field before delivery to IPC consumer code.
|
||||
|
||||
'''
|
||||
_dec: msgpack.Decoder
|
||||
|
||||
@property
|
||||
def dec(self) -> msgpack.Decoder:
|
||||
return self._dec
|
||||
|
||||
# struct type unions
|
||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
#
|
||||
# ^-TODO-^: make a wrapper type for this such that alt
|
||||
# backends can be represented easily without a `Union` needed,
|
||||
# AND so that we have better support for wire transport.
|
||||
#
|
||||
# -[ ] maybe `FieldSpec` is a good name since msg-spec
|
||||
# better applies to a `MsgType[FieldSpec]`?
|
||||
#
|
||||
# -[ ] both as part of the `.open_context()` call AND as part of the
|
||||
# immediate ack-reponse (see similar below)
|
||||
# we should do spec matching and fail if anything is awry?
|
||||
#
|
||||
# -[ ] eventually spec should be generated/parsed from the
|
||||
# type-annots as # desired in GH issue:
|
||||
# https://github.com/goodboy/tractor/issues/365
|
||||
#
|
||||
# -[ ] semantics of the mismatch case
|
||||
# - when caller-callee specs we should raise
|
||||
# a `MsgTypeError` or `MsgSpecError` or similar?
|
||||
#
|
||||
# -[ ] wrapper types for both spec types such that we can easily
|
||||
# IPC transport them?
|
||||
# - `TypeSpec: Union[Type]`
|
||||
# * also a `.__contains__()` for doing `None in
|
||||
# TypeSpec[None|int]` since rn you need to do it on
|
||||
# `.__args__` for unions..
|
||||
# - `MsgSpec: Union[Type[Msg]]
|
||||
#
|
||||
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
|
||||
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
|
||||
# |_ historical pep 695: https://peps.python.org/pep-0695/
|
||||
# |_ full lang spec: https://typing.readthedocs.io/en/latest/spec/
|
||||
# |_ on annotation scopes:
|
||||
# https://docs.python.org/3/reference/executionmodel.html#annotation-scopes
|
||||
# |_ 3.13 will have subscriptable funcs Bo
|
||||
# https://peps.python.org/pep-0718/
|
||||
@property
|
||||
def spec(self) -> Union[Type[Struct]]:
|
||||
# NOTE: defined and applied inside `mk_codec()`
|
||||
return self._dec.type
|
||||
|
||||
# no difference, as compared to a `MsgCodec` which defines the
|
||||
# `MsgType.pld: PayloadT` part of its spec separately
|
||||
pld_spec = spec
|
||||
|
||||
# TODO: would get moved into `FieldSpec.__str__()` right?
|
||||
@property
|
||||
def spec_str(self) -> str:
|
||||
|
||||
# TODO: could also use match: instead?
|
||||
spec: Union[Type]|Type = self.spec
|
||||
|
||||
# `typing.Union` case
|
||||
if getattr(spec, '__args__', False):
|
||||
return str(spec)
|
||||
|
||||
# just a single type
|
||||
else:
|
||||
return spec.__name__
|
||||
|
||||
pld_spec_str = spec_str
|
||||
|
||||
def decode(
|
||||
self,
|
||||
raw: Raw|bytes,
|
||||
) -> Any:
|
||||
return self._dec.decode(raw)
|
||||
|
||||
@property
|
||||
def hook(self) -> Callable|None:
|
||||
return self._dec.dec_hook
|
||||
|
||||
|
||||
def mk_dec(
|
||||
spec: Union[Type[Struct]]|Any = Any,
|
||||
dec_hook: Callable|None = None,
|
||||
|
||||
) -> MsgDec:
|
||||
|
||||
return msgpack.Decoder(
|
||||
type=spec, # like `Msg[Any]`
|
||||
dec_hook=dec_hook,
|
||||
)
|
||||
|
||||
# TODO: overall IPC msg-spec features (i.e. in this mod)!
|
||||
#
|
||||
# -[ ] API changes towards being interchange lib agnostic!
|
||||
|
@ -87,8 +196,7 @@ class MsgCodec(Struct):
|
|||
'''
|
||||
_enc: msgpack.Encoder
|
||||
_dec: msgpack.Decoder
|
||||
|
||||
pld_spec: Union[Type[Struct]]|None
|
||||
_pld_spec: Type[Struct]|Raw|Any
|
||||
|
||||
def __repr__(self) -> str:
|
||||
speclines: str = textwrap.indent(
|
||||
|
@ -111,14 +219,21 @@ class MsgCodec(Struct):
|
|||
')>'
|
||||
)
|
||||
|
||||
@property
|
||||
def pld_spec(self) -> Type[Struct]|Raw|Any:
|
||||
return self._pld_spec
|
||||
|
||||
@property
|
||||
def pld_spec_str(self) -> str:
|
||||
spec: Union[Type]|Type = self.pld_spec
|
||||
|
||||
# TODO: could also use match: instead?
|
||||
if getattr(spec, '__args__', False):
|
||||
spec: Union[Type]|Type = self.pld_spec
|
||||
|
||||
# `typing.Union` case
|
||||
if getattr(spec, '__args__', False):
|
||||
return str(spec)
|
||||
|
||||
# just a single type
|
||||
else:
|
||||
return spec.__name__
|
||||
|
||||
|
@ -126,6 +241,7 @@ class MsgCodec(Struct):
|
|||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
@property
|
||||
def msg_spec(self) -> Union[Type[Struct]]:
|
||||
# NOTE: defined and applied inside `mk_codec()`
|
||||
return self._dec.type
|
||||
|
||||
def msg_spec_items(
|
||||
|
@ -150,31 +266,14 @@ class MsgCodec(Struct):
|
|||
def pformat_msg_spec(
|
||||
self,
|
||||
msg: MsgType|None = None,
|
||||
join_char: str = '\n',
|
||||
) -> str:
|
||||
return '\n'.join(
|
||||
return join_char.join(
|
||||
self.msg_spec_items(msg=msg).values()
|
||||
)
|
||||
|
||||
lib: ModuleType = msgspec
|
||||
|
||||
# TODO: a sub-decoder system as well?
|
||||
# payload_msg_specs: Union[Type[Struct]] = Any
|
||||
# see related comments in `.msg.types`
|
||||
# _payload_decs: (
|
||||
# dict[
|
||||
# str,
|
||||
# msgpack.Decoder,
|
||||
# ]
|
||||
# |None
|
||||
# ) = None
|
||||
# OR
|
||||
# ) = {
|
||||
# # pre-seed decoders for std-py-type-set for use when
|
||||
# # `MsgType.pld == None|Any`.
|
||||
# None: msgpack.Decoder(Any),
|
||||
# Any: msgpack.Decoder(Any),
|
||||
# }
|
||||
|
||||
# TODO: use `functools.cached_property` for these ?
|
||||
# https://docs.python.org/3/library/functools.html#functools.cached_property
|
||||
@property
|
||||
|
@ -210,7 +309,25 @@ class MsgCodec(Struct):
|
|||
# https://jcristharif.com/msgspec/usage.html#typed-decoding
|
||||
return self._dec.decode(msg)
|
||||
|
||||
# TODO: do we still want to try and support the sub-decoder with
|
||||
# TODO: a sub-decoder system as well?
|
||||
# payload_msg_specs: Union[Type[Struct]] = Any
|
||||
# see related comments in `.msg.types`
|
||||
# _payload_decs: (
|
||||
# dict[
|
||||
# str,
|
||||
# msgpack.Decoder,
|
||||
# ]
|
||||
# |None
|
||||
# ) = None
|
||||
# OR
|
||||
# ) = {
|
||||
# # pre-seed decoders for std-py-type-set for use when
|
||||
# # `MsgType.pld == None|Any`.
|
||||
# None: msgpack.Decoder(Any),
|
||||
# Any: msgpack.Decoder(Any),
|
||||
# }
|
||||
#
|
||||
# -[ ] do we still want to try and support the sub-decoder with
|
||||
# `.Raw` technique in the case that the `Generic` approach gives
|
||||
# future grief?
|
||||
#
|
||||
|
@ -398,18 +515,25 @@ def mk_codec(
|
|||
assert len(ipc_msg_spec.__args__) == len(msg_types)
|
||||
assert ipc_msg_spec
|
||||
|
||||
# TODO: use this shim instead?
|
||||
# bc.. unification, err somethin?
|
||||
# dec: MsgDec = mk_dec(
|
||||
# spec=ipc_msg_spec,
|
||||
# dec_hook=dec_hook,
|
||||
# )
|
||||
|
||||
dec = msgpack.Decoder(
|
||||
type=ipc_msg_spec,
|
||||
dec_hook=dec_hook,
|
||||
)
|
||||
enc = msgpack.Encoder(
|
||||
enc_hook=enc_hook,
|
||||
)
|
||||
dec = msgpack.Decoder(
|
||||
type=ipc_msg_spec, # like `Msg[Any]`
|
||||
dec_hook=dec_hook,
|
||||
)
|
||||
|
||||
codec = MsgCodec(
|
||||
_enc=enc,
|
||||
_dec=dec,
|
||||
pld_spec=ipc_pld_spec,
|
||||
_pld_spec=ipc_pld_spec,
|
||||
)
|
||||
|
||||
# sanity on expected backend support
|
||||
|
@ -428,7 +552,9 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
|
|||
# https://jcristharif.com/msgspec/supported-types.html
|
||||
#
|
||||
_def_tractor_codec: MsgCodec = mk_codec(
|
||||
ipc_pld_spec=Any,
|
||||
# TODO: use this for debug mode locking prot?
|
||||
# ipc_pld_spec=Any,
|
||||
ipc_pld_spec=Raw,
|
||||
)
|
||||
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
|
||||
# IPC msging codec used by the transport layer when doing
|
||||
|
@ -462,11 +588,9 @@ _def_tractor_codec: MsgCodec = mk_codec(
|
|||
|
||||
# TODO: STOP USING THIS, since it's basically a global and won't
|
||||
# allow sub-IPC-ctxs to limit the msg-spec however desired..
|
||||
_ctxvar_MsgCodec: MsgCodec = RunVar(
|
||||
# _ctxvar_MsgCodec: MsgCodec = RunVar(
|
||||
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
|
||||
'msgspec_codec',
|
||||
|
||||
# TODO: move this to our new `Msg`-spec!
|
||||
# default=_def_msgspec_codec,
|
||||
default=_def_tractor_codec,
|
||||
)
|
||||
|
||||
|
@ -475,23 +599,36 @@ _ctxvar_MsgCodec: MsgCodec = RunVar(
|
|||
def apply_codec(
|
||||
codec: MsgCodec,
|
||||
|
||||
ctx: Context|None = None,
|
||||
|
||||
) -> MsgCodec:
|
||||
'''
|
||||
Dynamically apply a `MsgCodec` to the current task's
|
||||
runtime context such that all IPC msgs are processed
|
||||
with it for that task.
|
||||
Dynamically apply a `MsgCodec` to the current task's runtime
|
||||
context such that all (of a certain class of payload
|
||||
containing i.e. `MsgType.pld: PayloadT`) IPC msgs are
|
||||
processed with it for that task.
|
||||
|
||||
Uses a `contextvars.ContextVar` to ensure the scope of any
|
||||
codec setting matches the current `Context` or
|
||||
`._rpc.process_messages()` feeder task's prior setting without
|
||||
mutating any surrounding scope.
|
||||
|
||||
When a `ctx` is supplied, only mod its `Context.pld_codec`.
|
||||
|
||||
Uses a `tricycle.TreeVar` to ensure the scope of the codec
|
||||
matches the `@cm` block and DOES NOT change to the original
|
||||
(default) value in new tasks (as it does for `ContextVar`).
|
||||
|
||||
See the docs:
|
||||
- https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
|
||||
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = True
|
||||
orig: MsgCodec = _ctxvar_MsgCodec.get()
|
||||
|
||||
if ctx is not None:
|
||||
var: ContextVar = ctx._var_pld_codec
|
||||
else:
|
||||
# use IPC channel-connection "global" codec
|
||||
var: ContextVar = _ctxvar_MsgCodec
|
||||
|
||||
orig: MsgCodec = var.get()
|
||||
|
||||
assert orig is not codec
|
||||
if codec.pld_spec is None:
|
||||
breakpoint()
|
||||
|
@ -500,26 +637,29 @@ def apply_codec(
|
|||
'Applying new msg-spec codec\n\n'
|
||||
f'{codec}\n'
|
||||
)
|
||||
token: RunVarToken = _ctxvar_MsgCodec.set(codec)
|
||||
token: Token = var.set(codec)
|
||||
|
||||
# TODO: for TreeVar approach, see docs for @cm `.being()` API:
|
||||
# https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
|
||||
# try:
|
||||
# ?TODO? for TreeVar approach which copies from the
|
||||
# cancel-scope of the prior value, NOT the prior task
|
||||
# See the docs:
|
||||
# - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
|
||||
# - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
|
||||
# ^- see docs for @cm `.being()` API
|
||||
# with _ctxvar_MsgCodec.being(codec):
|
||||
# new = _ctxvar_MsgCodec.get()
|
||||
# assert new is codec
|
||||
# yield codec
|
||||
|
||||
try:
|
||||
yield _ctxvar_MsgCodec.get()
|
||||
yield var.get()
|
||||
finally:
|
||||
_ctxvar_MsgCodec.reset(token)
|
||||
|
||||
assert _ctxvar_MsgCodec.get() is orig
|
||||
var.reset(token)
|
||||
log.info(
|
||||
'Reverted to last msg-spec codec\n\n'
|
||||
f'{orig}\n'
|
||||
)
|
||||
assert var.get() is orig
|
||||
|
||||
|
||||
def current_codec() -> MsgCodec:
|
||||
'''
|
||||
|
@ -550,7 +690,7 @@ def limit_msg_spec(
|
|||
|
||||
'''
|
||||
__tracebackhide__: bool = True
|
||||
curr_codec = current_codec()
|
||||
curr_codec: MsgCodec = current_codec()
|
||||
msgspec_codec: MsgCodec = mk_codec(
|
||||
ipc_pld_spec=payload_spec,
|
||||
**codec_kwargs,
|
||||
|
|
|
@ -38,6 +38,7 @@ from typing import (
|
|||
from msgspec import (
|
||||
defstruct,
|
||||
# field,
|
||||
Raw,
|
||||
Struct,
|
||||
# UNSET,
|
||||
# UnsetType,
|
||||
|
@ -105,7 +106,7 @@ class Msg(
|
|||
# TODO: could also be set to `msgspec.Raw` if the sub-decoders
|
||||
# approach is preferred over the generic parameterization
|
||||
# approach as take by `mk_msg_spec()` below.
|
||||
pld: PayloadT
|
||||
pld: PayloadT|Raw
|
||||
|
||||
|
||||
class Aid(
|
||||
|
@ -265,35 +266,7 @@ class Start(
|
|||
|
||||
# TODO: enforcing a msg-spec in terms `Msg.pld`
|
||||
# parameterizable msgs to be used in the appls IPC dialog.
|
||||
#
|
||||
# -[ ] both as part of the `.open_context()` call AND as part of the
|
||||
# immediate ack-reponse (see similar below)
|
||||
# we should do spec matching and fail if anything is awry?
|
||||
#
|
||||
# -[ ] eventually spec should be generated/parsed from the
|
||||
# type-annots as # desired in GH issue:
|
||||
# https://github.com/goodboy/tractor/issues/365
|
||||
#
|
||||
# -[ ] semantics of the mismatch case
|
||||
# - when caller-callee specs we should raise
|
||||
# a `MsgTypeError` or `MsgSpecError` or similar?
|
||||
#
|
||||
# -[ ] wrapper types for both spec types such that we can easily
|
||||
# IPC transport them?
|
||||
# - `TypeSpec: Union[Type]`
|
||||
# * also a `.__contains__()` for doing `None in
|
||||
# TypeSpec[None|int]` since rn you need to do it on
|
||||
# `.__args__` for unions..
|
||||
# - `MsgSpec: Union[Type[Msg]]
|
||||
#
|
||||
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
|
||||
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
|
||||
# |_ historical pep 695: https://peps.python.org/pep-0695/
|
||||
# |_ full lang spec: https://typing.readthedocs.io/en/latest/spec/
|
||||
# |_ on annotation scopes:
|
||||
# https://docs.python.org/3/reference/executionmodel.html#annotation-scopes
|
||||
# |_ 3.13 will have subscriptable funcs Bo
|
||||
# https://peps.python.org/pep-0718/
|
||||
# => SEE `._codec.MsgDec` for more <=
|
||||
pld_spec: str = str(Any)
|
||||
|
||||
|
||||
|
@ -332,7 +305,7 @@ class Started(
|
|||
decorated IPC endpoint.
|
||||
|
||||
'''
|
||||
pld: PayloadT
|
||||
pld: PayloadT|Raw
|
||||
|
||||
|
||||
# TODO: instead of using our existing `Start`
|
||||
|
@ -349,7 +322,7 @@ class Yield(
|
|||
Per IPC transmission of a value from `await MsgStream.send(<value>)`.
|
||||
|
||||
'''
|
||||
pld: PayloadT
|
||||
pld: PayloadT|Raw
|
||||
|
||||
|
||||
class Stop(
|
||||
|
@ -377,11 +350,12 @@ class Return(
|
|||
func-as-`trio.Task`.
|
||||
|
||||
'''
|
||||
pld: PayloadT
|
||||
pld: PayloadT|Raw
|
||||
|
||||
|
||||
class CancelAck(
|
||||
Return,
|
||||
Msg,
|
||||
Generic[PayloadT],
|
||||
):
|
||||
'''
|
||||
Deliver the `bool` return-value from a cancellation `Actor`
|
||||
|
@ -710,7 +684,9 @@ def mk_msg_spec(
|
|||
)
|
||||
return (
|
||||
ipc_spec,
|
||||
msgtypes_table[spec_build_method] + ipc_msg_types,
|
||||
msgtypes_table[spec_build_method]
|
||||
+
|
||||
ipc_msg_types,
|
||||
)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue