Drop `MsgCodec.decoder()/.encoder()` design

Instead just instantiate `msgpack.Encoder/Decoder` instances inside
`mk_codec()` and assign them directly as `._enc/._dec` fields.
Explicitly take in named-args to both and proxy to the coder/decoder
instantiation calls directly.

Shuffling some codec internals:
- rename `mk_codec()` inputs as `ipc_msg_spec` and `ipc_pld_spec`, make
  them mutex such that a payload type spec can't be passed if the
  built-in msg-spec isn't used.
  => expose `MsgCodec.ipc_pld_spec` directly from `._dec.type`
  => presume input `ipc_msg_spec` is `Any` by default when no
    `ipc_pld_spec` is passed since we have no way atm to enable
    a similar type-restricted-payload feature without a wrapping
    "shuttle protocol" ;)

- move all the payload-sub-decoders stuff prototyped in GH#311
  (inside `.types`) to `._codec` as commented-for-later-maybe `MsgCodec`
  methods including:
  - `.mk_pld_subdec()` for registering
  - `.enc/dec_payload()` for sub-codec field loading.

- also comment out `._codec.mk_tagged_union_dec()` as the orig
  tag-to-decoder table factory, now mostly superseded by
  `.types.mk_msg_spec()` which takes the generic parameterizing approach
  instead.

- change naming to `types.mk_msg_spec(payload_type_union)` input, making
  it more explicit that it expects a `Union[Type]`.

Oh right, and start exposing all the `.types.Msg` subtypes in the `.msg`
subpkg in prep for usage throughout the runtime B)
msg_codecs
Tyler Goodlet 2024-03-29 12:46:59 -04:00
parent 8ff18739be
commit b6ed26589a
3 changed files with 297 additions and 247 deletions

View File

@ -33,3 +33,40 @@ from ._codec import (
MsgCodec as MsgCodec, MsgCodec as MsgCodec,
current_msgspec_codec as current_msgspec_codec, current_msgspec_codec as current_msgspec_codec,
) )
from .types import (
Msg as Msg,
Start, # with pld
FuncSpec as FuncSpec,
StartAck, # with pld
IpcCtxSpec as IpcCtxSpec,
Started,
Yield,
Stop,
Return,
Error, # with pld
ErrorData as ErrorData
)
# built-in SC shuttle protocol msg type set in
# approx order of the IPC txn-state spaces.
__spec__: list[Msg] = [
# inter-actor RPC initiation
Start,
StartAck,
# no-outcome-yet IAC (inter-actor-communication)
Started,
Yield,
Stop,
# termination outcomes
Return,
Error,
]

View File

@ -29,6 +29,7 @@ ToDo: backends we prolly should offer:
- https://capnproto.org/language.html#language-reference - https://capnproto.org/language.html#language-reference
''' '''
from __future__ import annotations
from contextvars import ( from contextvars import (
ContextVar, ContextVar,
Token, Token,
@ -54,18 +55,36 @@ from tractor.msg.types import (
) )
# TODO: API changes towards being interchange lib agnostic! # TODO: overall IPC msg-spec features (i.e. in this mod)!
# #
# -[ ] capnproto has pre-compiled schema for eg.. # -[ ] API changes towards being interchange lib agnostic!
# * https://capnproto.org/language.html # -[ ] capnproto has pre-compiled schema for eg..
# * http://capnproto.github.io/pycapnp/quickstart.html # * https://capnproto.org/language.html
# * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp # * http://capnproto.github.io/pycapnp/quickstart.html
# * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp
#
# -[ ] struct aware messaging coders as per:
# -[x] https://github.com/goodboy/tractor/issues/36
# -[ ] https://github.com/goodboy/tractor/issues/196
# -[ ] https://github.com/goodboy/tractor/issues/365
# #
class MsgCodec(Struct): class MsgCodec(Struct):
''' '''
A IPC msg interchange format lib's encoder + decoder pair. A IPC msg interchange format lib's encoder + decoder pair.
''' '''
# post-configure-cached when prop-accessed (see `mk_codec()`
# OR can be passed directly as,
# `MsgCodec(_enc=<Encoder>, _dec=<Decoder>)`
_enc: msgpack.Encoder|None = None
_dec: msgpack.Decoder|None = None
# struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions
@property
def ipc_pld_spec(self) -> Union[Type[Struct]]:
return self._dec.type
lib: ModuleType = msgspec lib: ModuleType = msgspec
# ad-hoc type extensions # ad-hoc type extensions
@ -73,16 +92,8 @@ class MsgCodec(Struct):
enc_hook: Callable[[Any], Any]|None = None # coder enc_hook: Callable[[Any], Any]|None = None # coder
dec_hook: Callable[[type, Any], Any]|None = None # decoder dec_hook: Callable[[type, Any], Any]|None = None # decoder
# struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions
ipc_msg_spec: Union[Type[Struct]]|Any = Any
payload_msg_spec: Union[Type[Struct]] = Any
# post-configure cached props
_enc: msgpack.Encoder|None = None
_dec: msgpack.Decoder|None = None
# TODO: a sub-decoder system as well? # TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types` # see related comments in `.msg.types`
# _payload_decs: ( # _payload_decs: (
# dict[ # dict[
@ -91,42 +102,18 @@ class MsgCodec(Struct):
# ] # ]
# |None # |None
# ) = None # ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `Msg.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
# TODO: use `functools.cached_property` for these ? # TODO: use `functools.cached_property` for these ?
# https://docs.python.org/3/library/functools.html#functools.cached_property # https://docs.python.org/3/library/functools.html#functools.cached_property
@property @property
def enc(self) -> msgpack.Encoder: def enc(self) -> msgpack.Encoder:
return self._enc or self.encoder()
def encoder(
self,
enc_hook: Callable|None = None,
reset: bool = False,
# TODO: what's the default for this, and do we care?
# write_buffer_size: int
#
**kwargs,
) -> msgpack.Encoder:
'''
Set or get the maybe-cached `msgspec.msgpack.Encoder`
instance configured for this codec.
When `reset=True` any previously configured encoder will
be recreated and then cached with the new settings passed
as input.
'''
if (
self._enc is None
or reset
):
self._enc = self.lib.msgpack.Encoder(
enc_hook=enc_hook or self.enc_hook,
# write_buffer_size=write_buffer_size,
)
return self._enc return self._enc
def encode( def encode(
@ -139,40 +126,10 @@ class MsgCodec(Struct):
on a tranport protocol connection. on a tranport protocol connection.
''' '''
return self.enc.encode(py_obj) return self._enc.encode(py_obj)
@property @property
def dec(self) -> msgpack.Decoder: def dec(self) -> msgpack.Decoder:
return self._dec or self.decoder()
def decoder(
self,
ipc_msg_spec: Union[Type[Struct]]|None = None,
dec_hook: Callable|None = None,
reset: bool = False,
**kwargs,
# ext_hook: ext_hook_sig
) -> msgpack.Decoder:
'''
Set or get the maybe-cached `msgspec.msgpack.Decoder`
instance configured for this codec.
When `reset=True` any previously configured decoder will
be recreated and then cached with the new settings passed
as input.
'''
if (
self._dec is None
or reset
):
self._dec = self.lib.msgpack.Decoder(
type=ipc_msg_spec or self.ipc_msg_spec,
dec_hook=dec_hook or self.dec_hook,
**kwargs,
)
return self._dec return self._dec
def decode( def decode(
@ -185,60 +142,165 @@ class MsgCodec(Struct):
determined by the determined by the
''' '''
return self.dec.decode(msg) return self._dec.decode(msg)
# TODO: do we still want to try and support the sub-decoder with
# `.Raw` technique in the case that the `Generic` approach gives
# future grief?
#
# -[ ] <NEW-ISSUE-FOR-ThIS-HERE>
#
#def mk_pld_subdec(
# self,
# payload_types: Union[Type[Struct]],
#) -> msgpack.Decoder:
# # TODO: sub-decoder suppor for `.pld: Raw`?
# # => see similar notes inside `.msg.types`..
# #
# # not sure we'll end up needing this though it might have
# # unforeseen advantages in terms of enabling encrypted
# # appliciation layer (only) payloads?
# #
# # register sub-payload decoders to load `.pld: Raw`
# # decoded `Msg`-packets using a dynamic lookup (table)
# # instead of a pre-defined msg-spec via `Generic`
# # parameterization.
# #
# (
# tags,
# payload_dec,
# ) = mk_tagged_union_dec(
# tagged_structs=list(payload_types.__args__),
# )
# # register sub-decoders by tag
# subdecs: dict[str, msgpack.Decoder]|None = self._payload_decs
# for name in tags:
# subdecs.setdefault(
# name,
# payload_dec,
# )
# return payload_dec
# sub-decoders for retreiving embedded
# payload data and decoding to a sender
# side defined (struct) type.
# def dec_payload(
# codec: MsgCodec,
# msg: Msg,
# ) -> Any|Struct:
# msg: Msg = codec.dec.decode(msg)
# payload_tag: str = msg.header.payload_tag
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
# return payload_dec.decode(msg.pld)
# def enc_payload(
# codec: MsgCodec,
# payload: Any,
# cid: str,
# ) -> bytes:
# # tag_field: str|None = None
# plbytes = codec.enc.encode(payload)
# if b'msg_type' in plbytes:
# assert isinstance(payload, Struct)
# # tag_field: str = type(payload).__name__
# payload = msgspec.Raw(plbytes)
# msg = Msg(
# cid=cid,
# pld=payload,
# # Header(
# # payload_tag=tag_field,
# # # dialog_id,
# # ),
# )
# return codec.enc.encode(msg)
def mk_tagged_union_dec( #def mk_tagged_union_dec(
tagged_structs: list[Struct], # tagged_structs: list[Struct],
) -> tuple[ #) -> tuple[
list[str], # list[str],
msgpack.Decoder, # msgpack.Decoder,
]: #]:
# See "tagged unions" docs: # '''
# https://jcristharif.com/msgspec/structs.html#tagged-unions # Create a `msgpack.Decoder` for an input `list[msgspec.Struct]`
# and return a `list[str]` of each struct's `tag_field: str` value
# which can be used to "map to" the initialized dec.
# "The quickest way to enable tagged unions is to set tag=True when # '''
# defining every struct type in the union. In this case tag_field # # See "tagged unions" docs:
# defaults to "type", and tag defaults to the struct class name # # https://jcristharif.com/msgspec/structs.html#tagged-unions
# (e.g. "Get")."
first: Struct = tagged_structs[0]
types_union: Union[Type[Struct]] = Union[
first
]|Any
tags: list[str] = [first.__name__]
for struct in tagged_structs[1:]: # # "The quickest way to enable tagged unions is to set tag=True when
types_union |= struct # # defining every struct type in the union. In this case tag_field
tags.append(struct.__name__) # # defaults to "type", and tag defaults to the struct class name
# # (e.g. "Get")."
# first: Struct = tagged_structs[0]
# types_union: Union[Type[Struct]] = Union[
# first
# ]|Any
# tags: list[str] = [first.__name__]
dec = msgpack.Decoder(types_union) # for struct in tagged_structs[1:]:
return ( # types_union |= struct
tags, # tags.append(
dec, # getattr(
) # struct,
# struct.__struct_config__.tag_field,
# struct.__name__,
# )
# )
# dec = msgpack.Decoder(types_union)
# return (
# tags,
# dec,
# )
# TODO: struct aware messaging coders as per:
# - https://github.com/goodboy/tractor/issues/36
# - https://github.com/goodboy/tractor/issues/196
# - https://github.com/goodboy/tractor/issues/365
def mk_codec( def mk_codec(
libname: str = 'msgspec', ipc_msg_spec: Union[Type[Struct]]|Any|None = None,
# for codec-ing boxed `Msg`-with-payload msgs
payload_types: Union[Type[Struct]]|None = None,
# TODO: do we want to allow NOT/using a diff `Msg`-set?
# #
# ^TODO^: in the long run, do we want to allow using a diff IPC `Msg`-set?
# it would break the runtime, but maybe say if you wanted
# to add some kinda field-specific or wholesale `.pld` ecryption?
# struct type unions set for `Decoder` # struct type unions set for `Decoder`
# https://jcristharif.com/msgspec/structs.html#tagged-unions # https://jcristharif.com/msgspec/structs.html#tagged-unions
ipc_msg_spec: Union[Type[Struct]]|Any = Any, ipc_pld_spec: Union[Type[Struct]]|Any|None = None,
cache_now: bool = True, # TODO: offering a per-msg(-field) type-spec such that
# the fields can be dynamically NOT decoded and left as `Raw`
# values which are later loaded by a sub-decoder specified
# by `tag_field: str` value key?
# payload_msg_specs: dict[
# str, # tag_field value as sub-decoder key
# Union[Type[Struct]] # `Msg.pld` type spec
# ]|None = None,
libname: str = 'msgspec',
# proxy as `Struct(**kwargs)` # proxy as `Struct(**kwargs)`
# ------ - ------
dec_hook: Callable|None = None,
enc_hook: Callable|None = None,
# ------ - ------
**kwargs, **kwargs,
#
# Encoder:
# write_buffer_size=write_buffer_size,
#
# Decoder:
# ext_hook: ext_hook_sig
) -> MsgCodec: ) -> MsgCodec:
''' '''
@ -247,75 +309,81 @@ def mk_codec(
`msgspec` ;). `msgspec` ;).
''' '''
# (manually) generate a msg-payload-spec for all relevant if (
# god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT` ipc_msg_spec is not None
# for the decoder such that all sub-type msgs in our SCIPP and ipc_pld_spec
# will automatically decode to a type-"limited" payload (`Struct`) ):
# object (set). raise RuntimeError(
payload_type_spec: Union[Type[Msg]]|None = None f'If a payload spec is provided,\n'
if payload_types: "the builtin SC-shuttle-protocol's msg set\n"
f'(i.e. `{Msg}`) MUST be used!\n\n'
f'However both values were passed as => mk_codec(\n'
f' ipc_msg_spec={ipc_msg_spec}`\n'
f' ipc_pld_spec={ipc_pld_spec}`\n)\n'
)
elif (
ipc_pld_spec
and
# XXX required for now (or maybe forever?) until
# we can dream up a way to allow parameterizing and/or
# custom overrides to the `Msg`-spec protocol itself?
ipc_msg_spec is None
):
# (manually) generate a msg-payload-spec for all relevant
# god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT`
# for the decoder such that all sub-type msgs in our SCIPP
# will automatically decode to a type-"limited" payload (`Struct`)
# object (set).
( (
payload_type_spec, ipc_msg_spec,
msg_types, msg_types,
) = mk_msg_spec( ) = mk_msg_spec(
payload_type=payload_types, payload_type_union=ipc_pld_spec,
) )
assert len(payload_type_spec.__args__) == len(msg_types) assert len(ipc_msg_spec.__args__) == len(msg_types)
assert ipc_msg_spec
# TODO: sub-decode `.pld: Raw`? dec = msgpack.Decoder(
# see similar notes inside `.msg.types`.. type=ipc_msg_spec, # like `Msg[Any]`
# )
# not sure we'll end up wanting/needing this
# though it might have unforeseen advantages in terms else:
# of enabling encrypted appliciation layer (only) ipc_msg_spec = ipc_msg_spec or Any
# payloads?
# enc = msgpack.Encoder(
# register sub-payload decoders to load `.pld: Raw` enc_hook=enc_hook,
# decoded `Msg`-packets using a dynamic lookup (table) )
# instead of a pre-defined msg-spec via `Generic` dec = msgpack.Decoder(
# parameterization. type=ipc_msg_spec, # like `Msg[Any]`
# dec_hook=dec_hook,
# ( )
# tags,
# payload_dec,
# ) = mk_tagged_union_dec(
# tagged_structs=list(payload_types.__args__),
# )
# _payload_decs: (
# dict[str, msgpack.Decoder]|None
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `Msg.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
# for name in tags:
# _payload_decs[name] = payload_dec
codec = MsgCodec( codec = MsgCodec(
ipc_msg_spec=ipc_msg_spec, _enc=enc,
payload_msg_spec=payload_type_spec, _dec=dec,
**kwargs, # payload_msg_specs=payload_msg_specs,
# **kwargs,
) )
assert codec.lib.__name__ == libname
# by default, config-n-cache the codec pair from input settings. # sanity on expected backend support
if cache_now: assert codec.lib.__name__ == libname
assert codec.enc
assert codec.dec
return codec return codec
# instance of the default `msgspec.msgpack` codec settings, i.e. # instance of the default `msgspec.msgpack` codec settings, i.e.
# no custom structs, hooks or other special types. # no custom structs, hooks or other special types.
_def_msgspec_codec: MsgCodec = mk_codec() _def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
# NOTE: provides for per-`trio.Task` specificity of the # NOTE: provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing # IPC msging codec used by the transport layer when doing
# `Channel.send()/.recv()` of wire data. # `Channel.send()/.recv()` of wire data.
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
'msgspec_codec', 'msgspec_codec',
# TODO: move this to our new `Msg`-spec!
default=_def_msgspec_codec, default=_def_msgspec_codec,
) )
@ -353,7 +421,7 @@ def limit_msg_spec(
payload_types: Union[Type[Struct]], payload_types: Union[Type[Struct]],
# TODO: don't need this approach right? # TODO: don't need this approach right?
# # -> related to the `MsgCodec._payload_decs` stuff above..
# tagged_structs: list[Struct]|None = None, # tagged_structs: list[Struct]|None = None,
**codec_kwargs, **codec_kwargs,

View File

@ -22,9 +22,7 @@ that is,
the "Structurred-Concurrency-Inter-Process-(dialog)-(un)Protocol". the "Structurred-Concurrency-Inter-Process-(dialog)-(un)Protocol".
''' '''
from __future__ import annotations from __future__ import annotations
# from contextlib import contextmanager as cm
import types import types
from typing import ( from typing import (
Any, Any,
@ -36,14 +34,12 @@ from typing import (
) )
from msgspec import ( from msgspec import (
msgpack,
Raw,
Struct, Struct,
UNSET, UNSET,
) )
# TODO: sub-decoded `Raw` fields?
# TODO: can also remove yah? # -[ ] see `MsgCodec._payload_decs` notes
# #
# class Header(Struct, tag=True): # class Header(Struct, tag=True):
# ''' # '''
@ -70,7 +66,6 @@ class Msg(
tree. tree.
''' '''
# header: Header
# TODO: use UNSET here? # TODO: use UNSET here?
cid: str|None # call/context-id cid: str|None # call/context-id
@ -94,9 +89,24 @@ class Msg(
pld: PayloadT pld: PayloadT
# TODO: better name, like `Call/TaskInput`? # TODO: caps based RPC support in the payload?
#
# -[ ] integration with our ``enable_modules: list[str]`` caps sys.
# ``pkgutil.resolve_name()`` internally uses
# ``importlib.import_module()`` which can be filtered by
# inserting a ``MetaPathFinder`` into ``sys.meta_path`` (which
# we could do before entering the ``Actor._process_messages()``
# loop)?
# - https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645
# - https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules
# - https://stackoverflow.com/a/63320902
# - https://docs.python.org/3/library/sys.html#sys.meta_path
#
# -[ ] can we combine .ns + .func into a native `NamespacePath` field?
#
# -[ ]better name, like `Call/TaskInput`?
#
class FuncSpec(Struct): class FuncSpec(Struct):
# TODO: can we combine these 2 into a `NamespacePath` field?
ns: str ns: str
func: str func: str
@ -249,7 +259,7 @@ class Error(Msg):
def mk_msg_spec( def mk_msg_spec(
payload_type: Union[Type] = Any, payload_type_union: Union[Type] = Any,
boxing_msg_set: set[Msg] = { boxing_msg_set: set[Msg] = {
Started, Started,
Yield, Yield,
@ -261,10 +271,13 @@ def mk_msg_spec(
list[Type[Msg]], list[Type[Msg]],
]: ]:
''' '''
Generate a payload-type-parameterized `Msg` specification such Create a payload-(data-)type-parameterized IPC message specification.
that IPC msgs which can be `Msg.pld` (payload) type
limited/filterd are specified given an input `payload_type: Allows generating IPC msg types from the above builtin set
Union[Type]`. with a payload (field) restricted data-type via the `Msg.pld:
PayloadT` type var. This allows runtime-task contexts to use
the python type system to limit/filter payload values as
determined by the input `payload_type_union: Union[Type]`.
''' '''
submsg_types: list[Type[Msg]] = Msg.__subclasses__() submsg_types: list[Type[Msg]] = Msg.__subclasses__()
@ -287,7 +300,7 @@ def mk_msg_spec(
# -[ ] is there a way to get it to work at module level # -[ ] is there a way to get it to work at module level
# just using inheritance or maybe a metaclass? # just using inheritance or maybe a metaclass?
# #
# index_paramed_msg_type: Msg = msgtype[payload_type] # index_paramed_msg_type: Msg = msgtype[payload_type_union]
# TODO: WHY do we need to dynamically generate the # TODO: WHY do we need to dynamically generate the
# subtype-msgs here to ensure the `.pld` parameterization # subtype-msgs here to ensure the `.pld` parameterization
@ -300,7 +313,7 @@ def mk_msg_spec(
( (
# XXX NOTE XXX this seems to be THE ONLY # XXX NOTE XXX this seems to be THE ONLY
# way to get this to work correctly!?! # way to get this to work correctly!?!
Msg[payload_type], Msg[payload_type_union],
Generic[PayloadT], Generic[PayloadT],
), ),
{}, {},
@ -322,71 +335,3 @@ def mk_msg_spec(
payload_type_spec, payload_type_spec,
msg_types, msg_types,
) )
# TODO: integration with our ``enable_modules: list[str]`` caps sys.
#
# ``pkgutil.resolve_name()`` internally uses
# ``importlib.import_module()`` which can be filtered by inserting
# a ``MetaPathFinder`` into ``sys.meta_path`` (which we could do before
# entering the ``Actor._process_messages()`` loop).
# https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645
# https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules
# - https://stackoverflow.com/a/63320902
# - https://docs.python.org/3/library/sys.html#sys.meta_path
# TODO: do we still want to try and support the sub-decoder with
# `Raw` technique in the case that the `Generic` approach gives
# future grief?
#
# sub-decoders for retreiving embedded
# payload data and decoding to a sender
# side defined (struct) type.
_payload_decs: dict[
str|None,
msgpack.Decoder,
] = {
# default decoder is used when `Header.payload_tag == None`
None: msgpack.Decoder(Any),
}
def dec_payload(
msg: Msg,
msg_dec: msgpack.Decoder = msgpack.Decoder(
type=Msg[Any]
),
) -> Any|Struct:
msg: Msg = msg_dec.decode(msg)
payload_tag: str = msg.header.payload_tag
payload_dec: msgpack.Decoder = _payload_decs[payload_tag]
return payload_dec.decode(msg.pld)
def enc_payload(
enc: msgpack.Encoder,
payload: Any,
cid: str,
) -> bytes:
# tag_field: str|None = None
plbytes = enc.encode(payload)
if b'msg_type' in plbytes:
assert isinstance(payload, Struct)
# tag_field: str = type(payload).__name__
payload = Raw(plbytes)
msg = Msg(
cid=cid,
pld=payload,
# Header(
# payload_tag=tag_field,
# # dialog_id,
# ),
)
return enc.encode(msg)