Compare commits

...

5 Commits

Author SHA1 Message Date
Tyler Goodlet a51632ffa6 Add a `MsgDec` for receive-only decoding
In prep for a "payload receiver" abstraction that will wrap
`MsgType.pld`-IO delivery from `Context` and `MsgStream`, adds a small
`msgspec.msgpack.Decoder` shim which delegates an API similar to
`MsgCodec` and is offered via a `.msg._codec.mk_dec()` factory.

Detalles:
- move over the TODOs/comments from `.msg.types.Start` to to
  `MsgDec.spec` since it's probably the ideal spot to start thinking
  about it from a consumer code PoV.
- move codec reversion assert and log emit into `finally:` block.
- flip default `.types._tractor_codec = mk_codec_ipc_pld(ipc_pld_spec=Raw)`
  in prep for always doing payload-delayed decodes.
- make `MsgCodec._dec` private with public property getter.
- change `CancelAck` to NOT derive from `Return` so it's mutex in
  `match/case:` handling.
2024-04-22 19:29:30 -04:00
Tyler Goodlet 0df7d557db Move `MsgTypeError` maker func to `._exceptions`
Since it's going to be used from the IPC primitive APIs
(`Context`/`MsgStream`) for similarly handling payload type spec
validation errors and bc it's really not well situation in the IPC
module XD

Summary of (impl) tweaks:
- obvi move `_mk_msg_type_err()` and import and use it in `._ipc`; ends
  up avoiding a lot of ad-hoc imports we had from `._exceptions` anyway!
- mask out "new codec" runtime log emission from `MsgpackTCPStream`.
- allow passing a (coming in next commit) `codec: MsgDec` (message
  decoder) which supports the same required `.pld_spec_str: str` attr.
- for send side logging use existing `MsgCodec..pformat_msg_spec()`.
- rename `_raise_from_no_key_in_msg()` to the now more appropriate
  `_raise_from_unexpected_msg()`, but leaving alias for now.
2024-04-22 18:24:02 -04:00
Tyler Goodlet 7b020c42cc Drop more `dict`-msg cruft from `._exceptions` 2024-04-21 17:08:27 -04:00
Tyler Goodlet d18cf32e28 Mark `.pld` msgs as also taking `msgspec.Raw` 2024-04-21 17:02:39 -04:00
Tyler Goodlet dd6a4d49d8 Go back to `ContextVar` for codec mgmt
Turns out we do want per-task inheritance particularly if there's to be
per `Context` dynamic mutation of the spec; we don't want mutation in
some task to affect any parent/global setting.

Turns out since we use a common "feeder task" in the rpc loop, we need to
offer a per `Context` payload decoder sys anyway in order to enable
per-task controls for inter-actor multi-task-ctx scenarios.
2024-04-18 16:34:14 -04:00
5 changed files with 359 additions and 235 deletions

View File

@ -24,6 +24,7 @@ import importlib
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable,
Type, Type,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -32,8 +33,11 @@ import traceback
import trio import trio
from msgspec import ( from msgspec import (
structs,
defstruct, defstruct,
msgpack,
Raw,
structs,
ValidationError,
) )
from tractor._state import current_actor from tractor._state import current_actor
@ -44,6 +48,8 @@ from tractor.msg import (
Stop, Stop,
Yield, Yield,
types as msgtypes, types as msgtypes,
MsgCodec,
MsgDec,
) )
from tractor.msg.pretty_struct import ( from tractor.msg.pretty_struct import (
iter_fields, iter_fields,
@ -170,7 +176,7 @@ def pformat_boxed_tb(
f' ------ - ------\n\n' f' ------ - ------\n\n'
# f'{tb_str}\n' # f'{tb_str}\n'
f'{tb_body}' f'{tb_body}'
f' ------ - ------\n' f' ------ - ------\n'
f'_|\n' f'_|\n'
) )
tb_box_indent: str = ( tb_box_indent: str = (
@ -932,7 +938,7 @@ def is_multi_cancelled(exc: BaseException) -> bool:
return False return False
def _raise_from_no_key_in_msg( def _raise_from_unexpected_msg(
ctx: Context, ctx: Context,
msg: MsgType, msg: MsgType,
src_err: AttributeError, src_err: AttributeError,
@ -972,8 +978,6 @@ def _raise_from_no_key_in_msg(
# an internal error should never get here # an internal error should never get here
try: try:
cid: str = msg.cid cid: str = msg.cid
# cid: str = msg['cid']
# except KeyError as src_err:
except AttributeError as src_err: except AttributeError as src_err:
raise MessagingError( raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n' f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
@ -985,7 +989,6 @@ def _raise_from_no_key_in_msg(
# TODO: test that shows stream raising an expected error!!! # TODO: test that shows stream raising an expected error!!!
# raise the error message in a boxed exception type! # raise the error message in a boxed exception type!
# if msg.get('error'):
if isinstance(msg, Error): if isinstance(msg, Error):
# match msg: # match msg:
# case Error(): # case Error():
@ -1001,7 +1004,6 @@ def _raise_from_no_key_in_msg(
# the stream._eoc outside this in the calleer always? # the stream._eoc outside this in the calleer always?
# case Stop(): # case Stop():
elif ( elif (
# msg.get('stop')
isinstance(msg, Stop) isinstance(msg, Stop)
or ( or (
stream stream
@ -1036,7 +1038,6 @@ def _raise_from_no_key_in_msg(
# that arrived which is probably the source of this stream # that arrived which is probably the source of this stream
# closure # closure
ctx.maybe_raise() ctx.maybe_raise()
raise eoc from src_err raise eoc from src_err
if ( if (
@ -1056,3 +1057,128 @@ def _raise_from_no_key_in_msg(
" BUT received a non-error msg:\n" " BUT received a non-error msg:\n"
f'{pformat(msg)}' f'{pformat(msg)}'
) from src_err ) from src_err
_raise_from_no_key_in_msg = _raise_from_unexpected_msg
def _mk_msg_type_err(
msg: Any|bytes|Raw,
codec: MsgCodec|MsgDec,
message: str|None = None,
verb_header: str = '',
src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None,
) -> MsgTypeError:
'''
Compose a `MsgTypeError` from an input runtime context.
'''
# `Channel.send()` case
if src_validation_error is None:
if isinstance(codec, MsgDec):
raise RuntimeError(
'`codec` must be a `MsgCodec` for send-side errors?'
)
# no src error from `msgspec.msgpack.Decoder.decode()` so
# prolly a manual type-check on our part.
if message is None:
fmt_spec: str = codec.pformat_msg_spec()
fmt_stack: str = (
'\n'.join(traceback.format_stack(limit=3))
)
tb_fmt: str = pformat_boxed_tb(
tb_str=fmt_stack,
# fields_str=header,
field_prefix=' ',
indent='',
)
message: str = (
f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n'
f'Valid IPC msgs are:\n\n'
# f' ------ - ------\n'
f'{fmt_spec}\n',
)
elif src_type_error:
src_message: str = str(src_type_error)
patt: str = 'type '
type_idx: int = src_message.find('type ')
invalid_type: str = src_message[type_idx + len(patt):].split()[0]
enc_hook: Callable|None = codec.enc.enc_hook
if enc_hook is None:
message += (
'\n\n'
f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
f'Check the `msgspec` docs for ad-hoc type extending:\n'
'|_ https://jcristharif.com/msgspec/extending.html\n'
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
)
msgtyperr = MsgTypeError(
message=message,
ipc_msg=msg,
)
# ya, might be `None`
msgtyperr.__cause__ = src_type_error
return msgtyperr
# `Channel.recv()` case
else:
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name)
message: str = (
f'invalid `{msg_type_name}` IPC msg\n\n'
)
if verb_header:
message = f'{verb_header} ' + message
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
field_name_expr: str = (
f' |_{maybe_field}: {codec.pld_spec_str} = '
)
fmt_val_lines: list[str] = pformat(field_val).splitlines()
fmt_val: str = (
f'{fmt_val_lines[0]}\n'
+
textwrap.indent(
'\n'.join(fmt_val_lines[1:]),
prefix=' '*len(field_name_expr),
)
)
message += (
f'{msg.rstrip("`")}\n\n'
f'<{msg_type.__qualname__}(\n'
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
f'{field_name_expr}{fmt_val}\n'
f')>'
)
msgtyperr = MsgTypeError.from_decode(
message=message,
msgdict=msg_dict,
)
msgtyperr.__cause__ = src_validation_error
return msgtyperr

View File

@ -49,6 +49,7 @@ from tractor._exceptions import (
MsgTypeError, MsgTypeError,
pack_from_raise, pack_from_raise,
TransportClosed, TransportClosed,
_mk_msg_type_err,
) )
from tractor.msg import ( from tractor.msg import (
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
@ -118,127 +119,6 @@ class MsgTransport(Protocol[MsgType]):
... ...
def _mk_msg_type_err(
msg: Any|bytes,
codec: MsgCodec,
message: str|None = None,
verb_header: str = '',
src_validation_error: msgspec.ValidationError|None = None,
src_type_error: TypeError|None = None,
) -> MsgTypeError:
import textwrap
# `Channel.send()` case
if src_validation_error is None: # send-side
# no src error from `msgspec.msgpack.Decoder.decode()` so
# prolly a manual type-check on our part.
if message is None:
import traceback
from tractor._exceptions import pformat_boxed_tb
fmt_spec: str = '\n'.join(
map(str, codec.msg_spec.__args__)
)
fmt_stack: str = (
'\n'.join(traceback.format_stack(limit=3))
)
tb_fmt: str = pformat_boxed_tb(
tb_str=fmt_stack,
# fields_str=header,
field_prefix=' ',
indent='',
)
message: str = (
f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n'
f'Valid IPC msgs are:\n\n'
# f' ------ - ------\n'
f'{fmt_spec}\n',
)
elif src_type_error:
src_message: str = str(src_type_error)
patt: str = 'type '
type_idx: int = src_message.find('type ')
invalid_type: str = src_message[type_idx + len(patt):].split()[0]
enc_hook: Callable|None = codec.enc.enc_hook
if enc_hook is None:
message += (
'\n\n'
f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
f'Check the `msgspec` docs for ad-hoc type extending:\n'
'|_ https://jcristharif.com/msgspec/extending.html\n'
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
)
msgtyperr = MsgTypeError(
message=message,
ipc_msg=msg,
)
# ya, might be `None`
msgtyperr.__cause__ = src_type_error
return msgtyperr
# `Channel.recv()` case
else:
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgspec.msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name)
message: str = (
f'invalid `{msg_type_name}` IPC msg\n\n'
)
if verb_header:
message = f'{verb_header} ' + message
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
field_name_expr: str = (
f' |_{maybe_field}: {codec.pld_spec_str} = '
)
fmt_val_lines: list[str] = pformat(field_val).splitlines()
fmt_val: str = (
f'{fmt_val_lines[0]}\n'
+
textwrap.indent(
'\n'.join(fmt_val_lines[1:]),
prefix=' '*len(field_name_expr),
)
)
message += (
f'{msg.rstrip("`")}\n\n'
f'<{msg_type.__qualname__}(\n'
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
f'{field_name_expr}{fmt_val}\n'
f')>'
)
msgtyperr = MsgTypeError.from_decode(
message=message,
msgdict=msg_dict,
)
msgtyperr.__cause__ = src_validation_error
return msgtyperr
# TODO: not sure why we have to inherit here, but it seems to be an # TODO: not sure why we have to inherit here, but it seems to be an
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; # issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
# probably should make a `mypy` issue? # probably should make a `mypy` issue?
@ -299,10 +179,10 @@ class MsgpackTCPStream(MsgTransport):
_codec._ctxvar_MsgCodec.get() _codec._ctxvar_MsgCodec.get()
) )
# TODO: mask out before release? # TODO: mask out before release?
log.runtime( # log.runtime(
f'New {self} created with codec\n' # f'New {self} created with codec\n'
f'codec: {self._codec}\n' # f'codec: {self._codec}\n'
) # )
async def _iter_packets(self) -> AsyncGenerator[dict, None]: async def _iter_packets(self) -> AsyncGenerator[dict, None]:
''' '''

View File

@ -34,6 +34,7 @@ from ._codec import (
apply_codec as apply_codec, apply_codec as apply_codec,
mk_codec as mk_codec, mk_codec as mk_codec,
MsgCodec as MsgCodec, MsgCodec as MsgCodec,
MsgDec as MsgDec,
current_codec as current_codec, current_codec as current_codec,
) )
@ -50,6 +51,7 @@ from .types import (
Yield as Yield, Yield as Yield,
Stop as Stop, Stop as Stop,
Return as Return, Return as Return,
CancelAck as CancelAck,
Error as Error, Error as Error,

View File

@ -33,25 +33,29 @@ from __future__ import annotations
from contextlib import ( from contextlib import (
contextmanager as cm, contextmanager as cm,
) )
# from contextvars import ( from contextvars import (
# ContextVar, ContextVar,
# Token, Token,
# ) )
import textwrap import textwrap
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Type, Type,
TYPE_CHECKING,
Union, Union,
) )
from types import ModuleType from types import ModuleType
import msgspec import msgspec
from msgspec import msgpack from msgspec import (
from trio.lowlevel import ( msgpack,
RunVar, Raw,
RunVarToken,
) )
# from trio.lowlevel import (
# RunVar,
# RunVarToken,
# )
# TODO: see notes below from @mikenerone.. # TODO: see notes below from @mikenerone..
# from tricycle import TreeVar # from tricycle import TreeVar
@ -62,8 +66,113 @@ from tractor.msg.types import (
) )
from tractor.log import get_logger from tractor.log import get_logger
if TYPE_CHECKING:
from tractor._context import Context
log = get_logger(__name__) log = get_logger(__name__)
# TODO: unify with `MsgCodec` by making `._dec` part this?
class MsgDec(Struct):
'''
An IPC msg decoder.
Normally used to decode only a payload: `MsgType.pld:
PayloadT` field before delivery to IPC consumer code.
'''
_dec: msgpack.Decoder
@property
def dec(self) -> msgpack.Decoder:
return self._dec
# struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions
#
# ^-TODO-^: make a wrapper type for this such that alt
# backends can be represented easily without a `Union` needed,
# AND so that we have better support for wire transport.
#
# -[ ] maybe `FieldSpec` is a good name since msg-spec
# better applies to a `MsgType[FieldSpec]`?
#
# -[ ] both as part of the `.open_context()` call AND as part of the
# immediate ack-reponse (see similar below)
# we should do spec matching and fail if anything is awry?
#
# -[ ] eventually spec should be generated/parsed from the
# type-annots as # desired in GH issue:
# https://github.com/goodboy/tractor/issues/365
#
# -[ ] semantics of the mismatch case
# - when caller-callee specs we should raise
# a `MsgTypeError` or `MsgSpecError` or similar?
#
# -[ ] wrapper types for both spec types such that we can easily
# IPC transport them?
# - `TypeSpec: Union[Type]`
# * also a `.__contains__()` for doing `None in
# TypeSpec[None|int]` since rn you need to do it on
# `.__args__` for unions..
# - `MsgSpec: Union[Type[Msg]]
#
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
# |_ historical pep 695: https://peps.python.org/pep-0695/
# |_ full lang spec: https://typing.readthedocs.io/en/latest/spec/
# |_ on annotation scopes:
# https://docs.python.org/3/reference/executionmodel.html#annotation-scopes
# |_ 3.13 will have subscriptable funcs Bo
# https://peps.python.org/pep-0718/
@property
def spec(self) -> Union[Type[Struct]]:
# NOTE: defined and applied inside `mk_codec()`
return self._dec.type
# no difference, as compared to a `MsgCodec` which defines the
# `MsgType.pld: PayloadT` part of its spec separately
pld_spec = spec
# TODO: would get moved into `FieldSpec.__str__()` right?
@property
def spec_str(self) -> str:
# TODO: could also use match: instead?
spec: Union[Type]|Type = self.spec
# `typing.Union` case
if getattr(spec, '__args__', False):
return str(spec)
# just a single type
else:
return spec.__name__
pld_spec_str = spec_str
def decode(
self,
raw: Raw|bytes,
) -> Any:
return self._dec.decode(raw)
@property
def hook(self) -> Callable|None:
return self._dec.dec_hook
def mk_dec(
spec: Union[Type[Struct]]|Any = Any,
dec_hook: Callable|None = None,
) -> MsgDec:
return msgpack.Decoder(
type=spec, # like `Msg[Any]`
dec_hook=dec_hook,
)
# TODO: overall IPC msg-spec features (i.e. in this mod)! # TODO: overall IPC msg-spec features (i.e. in this mod)!
# #
# -[ ] API changes towards being interchange lib agnostic! # -[ ] API changes towards being interchange lib agnostic!
@ -87,8 +196,7 @@ class MsgCodec(Struct):
''' '''
_enc: msgpack.Encoder _enc: msgpack.Encoder
_dec: msgpack.Decoder _dec: msgpack.Decoder
_pld_spec: Type[Struct]|Raw|Any
pld_spec: Union[Type[Struct]]|None
def __repr__(self) -> str: def __repr__(self) -> str:
speclines: str = textwrap.indent( speclines: str = textwrap.indent(
@ -111,14 +219,21 @@ class MsgCodec(Struct):
')>' ')>'
) )
@property
def pld_spec(self) -> Type[Struct]|Raw|Any:
return self._pld_spec
@property @property
def pld_spec_str(self) -> str: def pld_spec_str(self) -> str:
spec: Union[Type]|Type = self.pld_spec
# TODO: could also use match: instead? # TODO: could also use match: instead?
spec: Union[Type]|Type = self.pld_spec
# `typing.Union` case
if getattr(spec, '__args__', False): if getattr(spec, '__args__', False):
# `typing.Union` case
return str(spec) return str(spec)
# just a single type
else: else:
return spec.__name__ return spec.__name__
@ -126,6 +241,7 @@ class MsgCodec(Struct):
# https://jcristharif.com/msgspec/structs.html#tagged-unions # https://jcristharif.com/msgspec/structs.html#tagged-unions
@property @property
def msg_spec(self) -> Union[Type[Struct]]: def msg_spec(self) -> Union[Type[Struct]]:
# NOTE: defined and applied inside `mk_codec()`
return self._dec.type return self._dec.type
def msg_spec_items( def msg_spec_items(
@ -150,31 +266,14 @@ class MsgCodec(Struct):
def pformat_msg_spec( def pformat_msg_spec(
self, self,
msg: MsgType|None = None, msg: MsgType|None = None,
join_char: str = '\n',
) -> str: ) -> str:
return '\n'.join( return join_char.join(
self.msg_spec_items(msg=msg).values() self.msg_spec_items(msg=msg).values()
) )
lib: ModuleType = msgspec lib: ModuleType = msgspec
# TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
# TODO: use `functools.cached_property` for these ? # TODO: use `functools.cached_property` for these ?
# https://docs.python.org/3/library/functools.html#functools.cached_property # https://docs.python.org/3/library/functools.html#functools.cached_property
@property @property
@ -210,7 +309,25 @@ class MsgCodec(Struct):
# https://jcristharif.com/msgspec/usage.html#typed-decoding # https://jcristharif.com/msgspec/usage.html#typed-decoding
return self._dec.decode(msg) return self._dec.decode(msg)
# TODO: do we still want to try and support the sub-decoder with # TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
#
# -[ ] do we still want to try and support the sub-decoder with
# `.Raw` technique in the case that the `Generic` approach gives # `.Raw` technique in the case that the `Generic` approach gives
# future grief? # future grief?
# #
@ -398,18 +515,25 @@ def mk_codec(
assert len(ipc_msg_spec.__args__) == len(msg_types) assert len(ipc_msg_spec.__args__) == len(msg_types)
assert ipc_msg_spec assert ipc_msg_spec
# TODO: use this shim instead?
# bc.. unification, err somethin?
# dec: MsgDec = mk_dec(
# spec=ipc_msg_spec,
# dec_hook=dec_hook,
# )
dec = msgpack.Decoder(
type=ipc_msg_spec,
dec_hook=dec_hook,
)
enc = msgpack.Encoder( enc = msgpack.Encoder(
enc_hook=enc_hook, enc_hook=enc_hook,
) )
dec = msgpack.Decoder(
type=ipc_msg_spec, # like `Msg[Any]`
dec_hook=dec_hook,
)
codec = MsgCodec( codec = MsgCodec(
_enc=enc, _enc=enc,
_dec=dec, _dec=dec,
pld_spec=ipc_pld_spec, _pld_spec=ipc_pld_spec,
) )
# sanity on expected backend support # sanity on expected backend support
@ -428,7 +552,9 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
# https://jcristharif.com/msgspec/supported-types.html # https://jcristharif.com/msgspec/supported-types.html
# #
_def_tractor_codec: MsgCodec = mk_codec( _def_tractor_codec: MsgCodec = mk_codec(
ipc_pld_spec=Any, # TODO: use this for debug mode locking prot?
# ipc_pld_spec=Any,
ipc_pld_spec=Raw,
) )
# TODO: IDEALLY provides for per-`trio.Task` specificity of the # TODO: IDEALLY provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing # IPC msging codec used by the transport layer when doing
@ -462,11 +588,9 @@ _def_tractor_codec: MsgCodec = mk_codec(
# TODO: STOP USING THIS, since it's basically a global and won't # TODO: STOP USING THIS, since it's basically a global and won't
# allow sub-IPC-ctxs to limit the msg-spec however desired.. # allow sub-IPC-ctxs to limit the msg-spec however desired..
_ctxvar_MsgCodec: MsgCodec = RunVar( # _ctxvar_MsgCodec: MsgCodec = RunVar(
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
'msgspec_codec', 'msgspec_codec',
# TODO: move this to our new `Msg`-spec!
# default=_def_msgspec_codec,
default=_def_tractor_codec, default=_def_tractor_codec,
) )
@ -475,23 +599,36 @@ _ctxvar_MsgCodec: MsgCodec = RunVar(
def apply_codec( def apply_codec(
codec: MsgCodec, codec: MsgCodec,
ctx: Context|None = None,
) -> MsgCodec: ) -> MsgCodec:
''' '''
Dynamically apply a `MsgCodec` to the current task's Dynamically apply a `MsgCodec` to the current task's runtime
runtime context such that all IPC msgs are processed context such that all (of a certain class of payload
with it for that task. containing i.e. `MsgType.pld: PayloadT`) IPC msgs are
processed with it for that task.
Uses a `contextvars.ContextVar` to ensure the scope of any
codec setting matches the current `Context` or
`._rpc.process_messages()` feeder task's prior setting without
mutating any surrounding scope.
When a `ctx` is supplied, only mod its `Context.pld_codec`.
Uses a `tricycle.TreeVar` to ensure the scope of the codec
matches the `@cm` block and DOES NOT change to the original matches the `@cm` block and DOES NOT change to the original
(default) value in new tasks (as it does for `ContextVar`). (default) value in new tasks (as it does for `ContextVar`).
See the docs:
- https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = True
orig: MsgCodec = _ctxvar_MsgCodec.get()
if ctx is not None:
var: ContextVar = ctx._var_pld_codec
else:
# use IPC channel-connection "global" codec
var: ContextVar = _ctxvar_MsgCodec
orig: MsgCodec = var.get()
assert orig is not codec assert orig is not codec
if codec.pld_spec is None: if codec.pld_spec is None:
breakpoint() breakpoint()
@ -500,26 +637,29 @@ def apply_codec(
'Applying new msg-spec codec\n\n' 'Applying new msg-spec codec\n\n'
f'{codec}\n' f'{codec}\n'
) )
token: RunVarToken = _ctxvar_MsgCodec.set(codec) token: Token = var.set(codec)
# TODO: for TreeVar approach, see docs for @cm `.being()` API: # ?TODO? for TreeVar approach which copies from the
# https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables # cancel-scope of the prior value, NOT the prior task
# try: # See the docs:
# with _ctxvar_MsgCodec.being(codec): # - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# new = _ctxvar_MsgCodec.get() # - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
# assert new is codec # ^- see docs for @cm `.being()` API
# yield codec # with _ctxvar_MsgCodec.being(codec):
# new = _ctxvar_MsgCodec.get()
# assert new is codec
# yield codec
try: try:
yield _ctxvar_MsgCodec.get() yield var.get()
finally: finally:
_ctxvar_MsgCodec.reset(token) var.reset(token)
log.info(
'Reverted to last msg-spec codec\n\n'
f'{orig}\n'
)
assert var.get() is orig
assert _ctxvar_MsgCodec.get() is orig
log.info(
'Reverted to last msg-spec codec\n\n'
f'{orig}\n'
)
def current_codec() -> MsgCodec: def current_codec() -> MsgCodec:
''' '''
@ -550,7 +690,7 @@ def limit_msg_spec(
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = True
curr_codec = current_codec() curr_codec: MsgCodec = current_codec()
msgspec_codec: MsgCodec = mk_codec( msgspec_codec: MsgCodec = mk_codec(
ipc_pld_spec=payload_spec, ipc_pld_spec=payload_spec,
**codec_kwargs, **codec_kwargs,

View File

@ -38,6 +38,7 @@ from typing import (
from msgspec import ( from msgspec import (
defstruct, defstruct,
# field, # field,
Raw,
Struct, Struct,
# UNSET, # UNSET,
# UnsetType, # UnsetType,
@ -105,7 +106,7 @@ class Msg(
# TODO: could also be set to `msgspec.Raw` if the sub-decoders # TODO: could also be set to `msgspec.Raw` if the sub-decoders
# approach is preferred over the generic parameterization # approach is preferred over the generic parameterization
# approach as take by `mk_msg_spec()` below. # approach as take by `mk_msg_spec()` below.
pld: PayloadT pld: PayloadT|Raw
class Aid( class Aid(
@ -265,35 +266,7 @@ class Start(
# TODO: enforcing a msg-spec in terms `Msg.pld` # TODO: enforcing a msg-spec in terms `Msg.pld`
# parameterizable msgs to be used in the appls IPC dialog. # parameterizable msgs to be used in the appls IPC dialog.
# # => SEE `._codec.MsgDec` for more <=
# -[ ] both as part of the `.open_context()` call AND as part of the
# immediate ack-reponse (see similar below)
# we should do spec matching and fail if anything is awry?
#
# -[ ] eventually spec should be generated/parsed from the
# type-annots as # desired in GH issue:
# https://github.com/goodboy/tractor/issues/365
#
# -[ ] semantics of the mismatch case
# - when caller-callee specs we should raise
# a `MsgTypeError` or `MsgSpecError` or similar?
#
# -[ ] wrapper types for both spec types such that we can easily
# IPC transport them?
# - `TypeSpec: Union[Type]`
# * also a `.__contains__()` for doing `None in
# TypeSpec[None|int]` since rn you need to do it on
# `.__args__` for unions..
# - `MsgSpec: Union[Type[Msg]]
#
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
# |_ historical pep 695: https://peps.python.org/pep-0695/
# |_ full lang spec: https://typing.readthedocs.io/en/latest/spec/
# |_ on annotation scopes:
# https://docs.python.org/3/reference/executionmodel.html#annotation-scopes
# |_ 3.13 will have subscriptable funcs Bo
# https://peps.python.org/pep-0718/
pld_spec: str = str(Any) pld_spec: str = str(Any)
@ -332,7 +305,7 @@ class Started(
decorated IPC endpoint. decorated IPC endpoint.
''' '''
pld: PayloadT pld: PayloadT|Raw
# TODO: instead of using our existing `Start` # TODO: instead of using our existing `Start`
@ -349,7 +322,7 @@ class Yield(
Per IPC transmission of a value from `await MsgStream.send(<value>)`. Per IPC transmission of a value from `await MsgStream.send(<value>)`.
''' '''
pld: PayloadT pld: PayloadT|Raw
class Stop( class Stop(
@ -377,11 +350,12 @@ class Return(
func-as-`trio.Task`. func-as-`trio.Task`.
''' '''
pld: PayloadT pld: PayloadT|Raw
class CancelAck( class CancelAck(
Return, Msg,
Generic[PayloadT],
): ):
''' '''
Deliver the `bool` return-value from a cancellation `Actor` Deliver the `bool` return-value from a cancellation `Actor`
@ -710,7 +684,9 @@ def mk_msg_spec(
) )
return ( return (
ipc_spec, ipc_spec,
msgtypes_table[spec_build_method] + ipc_msg_types, msgtypes_table[spec_build_method]
+
ipc_msg_types,
) )