diff --git a/setup.py b/setup.py
index 50ee92e..a221937 100755
--- a/setup.py
+++ b/setup.py
@@ -60,7 +60,7 @@ setup(
'wrapt',
# IPC serialization
- 'msgspec',
+ 'msgspec>=0.18.5',
# debug mode REPL
'pdbp',
diff --git a/tests/test_caps_msging.py b/tests/test_caps_msging.py
index f659cb1..b101c1e 100644
--- a/tests/test_caps_msging.py
+++ b/tests/test_caps_msging.py
@@ -6,12 +6,22 @@ B~)
'''
from typing import (
Any,
+ _GenericAlias,
Type,
+ Union,
)
from contextvars import (
Context,
)
+# from inspect import Parameter
+from msgspec import (
+ structs,
+ msgpack,
+ # defstruct,
+ Struct,
+ ValidationError,
+)
import tractor
from tractor.msg import (
_def_msgspec_codec,
@@ -23,6 +33,12 @@ from tractor.msg import (
apply_codec,
current_msgspec_codec,
)
+from tractor.msg.types import (
+ PayloadT,
+ Msg,
+ # Started,
+ mk_msg_spec,
+)
import trio
# TODO: wrap these into `._codec` such that user can just pass
@@ -54,7 +70,7 @@ def mk_custom_codec() -> MsgCodec:
# apply custom hooks and set a `Decoder` which only
# loads `NamespacePath` types.
nsp_codec: MsgCodec = mk_codec(
- dec_types=NamespacePath,
+ ipc_msg_spec=NamespacePath,
enc_hook=enc_hook,
dec_hook=dec_hook,
)
@@ -196,3 +212,166 @@ def test_codec_hooks_mod():
await p.cancel_actor()
trio.run(main)
+
+
+def chk_pld_type(
+ generic: Msg|_GenericAlias,
+ payload_type: Type[Struct]|Any,
+ pld: Any,
+
+) -> bool:
+
+ roundtrip: bool = False
+ pld_val_type: Type = type(pld)
+
+ # gen_paramed: _GenericAlias = generic[payload_type]
+ # TODO: verify that the overridden subtypes
+ # DO NOT have modified type-annots from original!
+ # 'Start', .pld: FuncSpec
+ # 'StartAck', .pld: IpcCtxSpec
+ # 'Stop', .pld: UNSEt
+ # 'Error', .pld: ErrorData
+ # for typedef in (
+ # [gen_paramed]
+ # +
+
+ # # type-var should always be set for these sub-types
+ # # as well!
+ # Msg.__subclasses__()
+ # ):
+ # if typedef.__name__ not in [
+ # 'Msg',
+ # 'Started',
+ # 'Yield',
+ # 'Return',
+ # ]:
+ # continue
+ # payload_type: Type[Struct] = CustomPayload
+
+ # TODO: can remove all this right!?
+ #
+ # when parameterized (like `Msg[Any]`) then
+ # we expect an alias as input.
+ # if isinstance(generic, _GenericAlias):
+ # assert payload_type in generic.__args__
+ # else:
+ # assert PayloadType in generic.__parameters__
+ # pld_param: Parameter = generic.__signature__.parameters['pld']
+ # assert pld_param.annotation is PayloadType
+
+ type_spec: Union[Type[Struct]]
+ msg_types: list[Msg[payload_type]]
+ (
+ type_spec,
+ msg_types,
+ ) = mk_msg_spec(
+ payload_type=payload_type,
+ )
+ enc = msgpack.Encoder()
+ dec = msgpack.Decoder(
+ type=type_spec, # like `Msg[Any]`
+ )
+
+ # verify the boxed-type for all variable payload-type msgs.
+ for typedef in msg_types:
+
+ pld_field = structs.fields(typedef)[1]
+ assert pld_field.type in {payload_type, PayloadT}
+ # TODO: does this need to work to get all subtypes to
+ # adhere?
+ assert pld_field.type is payload_type
+
+ kwargs: dict[str, Any] = {
+ 'cid': '666',
+ 'pld': pld,
+ }
+ enc_msg = typedef(**kwargs)
+
+ wire_bytes: bytes = enc.encode(enc_msg)
+
+ try:
+ dec_msg = dec.decode(wire_bytes)
+ assert dec_msg.pld == pld
+ assert (roundtrip := (dec_msg == enc_msg))
+
+ except ValidationError as ve:
+ # breakpoint()
+ if pld_val_type is payload_type:
+ raise ValueError(
+ 'Got `ValidationError` despite type-var match!?\n'
+ f'pld_val_type: {pld_val_type}\n'
+ f'payload_type: {payload_type}\n'
+ ) from ve
+
+ else:
+ # ow we good cuz the pld spec mismatched.
+ print(
+ 'Got expected `ValidationError` since,\n'
+ f'{pld_val_type} is not {payload_type}\n'
+ )
+ else:
+ if (
+ pld_val_type is not payload_type
+ and payload_type is not Any
+ ):
+ raise ValueError(
+ 'DID NOT `ValidationError` despite expected type match!?\n'
+ f'pld_val_type: {pld_val_type}\n'
+ f'payload_type: {payload_type}\n'
+ )
+
+ return roundtrip
+
+
+
+def test_limit_msgspec():
+
+ async def main():
+ async with tractor.open_root_actor(
+ debug_mode=True
+ ):
+
+ # ensure we can round-trip a boxing `Msg`
+ assert chk_pld_type(
+ Msg,
+ Any,
+ None,
+ )
+
+ # TODO: don't need this any more right since
+ # `msgspec>=0.15` has the nice generics stuff yah??
+ #
+ # manually override the type annot of the payload
+ # field and ensure it propagates to all msg-subtypes.
+ # Msg.__annotations__['pld'] = Any
+
+ # verify that a mis-typed payload value won't decode
+ assert not chk_pld_type(
+ Msg,
+ int,
+ pld='doggy',
+ )
+
+ # parametrize the boxed `.pld` type as a custom-struct
+ # and ensure that parametrization propagates
+ # to all payload-msg-spec-able subtypes!
+ class CustomPayload(Struct):
+ name: str
+ value: Any
+
+ assert not chk_pld_type(
+ Msg,
+ CustomPayload,
+ pld='doggy',
+ )
+
+ assert chk_pld_type(
+ Msg,
+ CustomPayload,
+ pld=CustomPayload(name='doggy', value='urmom')
+ )
+
+ # uhh bc we can `.pause_from_sync()` now! :surfer:
+ # breakpoint()
+
+ trio.run(main)
diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py
index c26de8d..5ce0205 100644
--- a/tractor/msg/_codec.py
+++ b/tractor/msg/_codec.py
@@ -47,20 +47,25 @@ from types import ModuleType
import msgspec
from msgspec import msgpack
-from .pretty_struct import Struct
+from tractor.msg.pretty_struct import Struct
+from tractor.msg.types import (
+ mk_msg_spec,
+ Msg,
+)
# TODO: API changes towards being interchange lib agnostic!
+#
# -[ ] capnproto has pre-compiled schema for eg..
# * https://capnproto.org/language.html
# * http://capnproto.github.io/pycapnp/quickstart.html
# * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp
+#
class MsgCodec(Struct):
'''
A IPC msg interchange format lib's encoder + decoder pair.
'''
-
lib: ModuleType = msgspec
# ad-hoc type extensions
@@ -70,12 +75,22 @@ class MsgCodec(Struct):
# struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions
- types: Union[Type[Struct]]|Any = Any
+ ipc_msg_spec: Union[Type[Struct]]|Any = Any
+ payload_msg_spec: Union[Type[Struct]] = Any
# post-configure cached props
_enc: msgpack.Encoder|None = None
_dec: msgpack.Decoder|None = None
+ # TODO: a sub-decoder system as well?
+ # see related comments in `.msg.types`
+ # _payload_decs: (
+ # dict[
+ # str,
+ # msgpack.Decoder,
+ # ]
+ # |None
+ # ) = None
# TODO: use `functools.cached_property` for these ?
# https://docs.python.org/3/library/functools.html#functools.cached_property
@@ -88,8 +103,9 @@ class MsgCodec(Struct):
enc_hook: Callable|None = None,
reset: bool = False,
- # TODO: what's the default for this?
+ # TODO: what's the default for this, and do we care?
# write_buffer_size: int
+ #
**kwargs,
) -> msgpack.Encoder:
@@ -131,7 +147,7 @@ class MsgCodec(Struct):
def decoder(
self,
- types: Union[Type[Struct]]|None = None,
+ ipc_msg_spec: Union[Type[Struct]]|None = None,
dec_hook: Callable|None = None,
reset: bool = False,
**kwargs,
@@ -152,7 +168,7 @@ class MsgCodec(Struct):
or reset
):
self._dec = self.lib.msgpack.Decoder(
- types or self.types,
+ type=ipc_msg_spec or self.ipc_msg_spec,
dec_hook=dec_hook or self.dec_hook,
**kwargs,
)
@@ -169,10 +185,39 @@ class MsgCodec(Struct):
determined by the
'''
-
return self.dec.decode(msg)
+def mk_tagged_union_dec(
+ tagged_structs: list[Struct],
+
+) -> tuple[
+ list[str],
+ msgpack.Decoder,
+]:
+ # See "tagged unions" docs:
+ # https://jcristharif.com/msgspec/structs.html#tagged-unions
+
+ # "The quickest way to enable tagged unions is to set tag=True when
+ # defining every struct type in the union. In this case tag_field
+ # defaults to "type", and tag defaults to the struct class name
+ # (e.g. "Get")."
+ first: Struct = tagged_structs[0]
+ types_union: Union[Type[Struct]] = Union[
+ first
+ ]|Any
+ tags: list[str] = [first.__name__]
+
+ for struct in tagged_structs[1:]:
+ types_union |= struct
+ tags.append(struct.__name__)
+
+ dec = msgpack.Decoder(types_union)
+ return (
+ tags,
+ dec,
+ )
+
# TODO: struct aware messaging coders as per:
# - https://github.com/goodboy/tractor/issues/36
# - https://github.com/goodboy/tractor/issues/196
@@ -181,13 +226,18 @@ class MsgCodec(Struct):
def mk_codec(
libname: str = 'msgspec',
+ # for codec-ing boxed `Msg`-with-payload msgs
+ payload_types: Union[Type[Struct]]|None = None,
+
+ # TODO: do we want to allow NOT/using a diff `Msg`-set?
+ #
# struct type unions set for `Decoder`
# https://jcristharif.com/msgspec/structs.html#tagged-unions
- dec_types: Union[Type[Struct]]|Any = Any,
+ ipc_msg_spec: Union[Type[Struct]]|Any = Any,
cache_now: bool = True,
- # proxy to the `Struct.__init__()`
+ # proxy as `Struct(**kwargs)`
**kwargs,
) -> MsgCodec:
@@ -197,14 +247,59 @@ def mk_codec(
`msgspec` ;).
'''
+ # (manually) generate a msg-payload-spec for all relevant
+ # god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT`
+ # for the decoder such that all sub-type msgs in our SCIPP
+ # will automatically decode to a type-"limited" payload (`Struct`)
+ # object (set).
+ payload_type_spec: Union[Type[Msg]]|None = None
+ if payload_types:
+ (
+ payload_type_spec,
+ msg_types,
+ ) = mk_msg_spec(
+ payload_type=payload_types,
+ )
+ assert len(payload_type_spec.__args__) == len(msg_types)
+
+ # TODO: sub-decode `.pld: Raw`?
+ # see similar notes inside `.msg.types`..
+ #
+ # not sure we'll end up wanting/needing this
+ # though it might have unforeseen advantages in terms
+ # of enabling encrypted appliciation layer (only)
+ # payloads?
+ #
+ # register sub-payload decoders to load `.pld: Raw`
+ # decoded `Msg`-packets using a dynamic lookup (table)
+ # instead of a pre-defined msg-spec via `Generic`
+ # parameterization.
+ #
+ # (
+ # tags,
+ # payload_dec,
+ # ) = mk_tagged_union_dec(
+ # tagged_structs=list(payload_types.__args__),
+ # )
+ # _payload_decs: (
+ # dict[str, msgpack.Decoder]|None
+ # ) = {
+ # # pre-seed decoders for std-py-type-set for use when
+ # # `Msg.pld == None|Any`.
+ # None: msgpack.Decoder(Any),
+ # Any: msgpack.Decoder(Any),
+ # }
+ # for name in tags:
+ # _payload_decs[name] = payload_dec
+
codec = MsgCodec(
- types=dec_types,
+ ipc_msg_spec=ipc_msg_spec,
+ payload_msg_spec=payload_type_spec,
**kwargs,
)
assert codec.lib.__name__ == libname
- # by default config and cache the codec pair for given
- # input settings.
+ # by default, config-n-cache the codec pair from input settings.
if cache_now:
assert codec.enc
assert codec.dec
@@ -251,3 +346,28 @@ def current_msgspec_codec() -> MsgCodec:
'''
return _ctxvar_MsgCodec.get()
+
+
+@cm
+def limit_msg_spec(
+ payload_types: Union[Type[Struct]],
+
+ # TODO: don't need this approach right?
+ #
+ # tagged_structs: list[Struct]|None = None,
+
+ **codec_kwargs,
+):
+ '''
+ Apply a `MsgCodec` that will natively decode the SC-msg set's
+ `Msg.pld: Union[Type[Struct]]` payload fields using
+ tagged-unions of `msgspec.Struct`s from the `payload_types`
+ for all IPC contexts in use by the current `trio.Task`.
+
+ '''
+ msgspec_codec: MsgCodec = mk_codec(
+ payload_types=payload_types,
+ **codec_kwargs,
+ )
+ with apply_codec(msgspec_codec):
+ yield msgspec_codec
diff --git a/tractor/msg/types.py b/tractor/msg/types.py
index d2fb087..732a0f5 100644
--- a/tractor/msg/types.py
+++ b/tractor/msg/types.py
@@ -15,23 +15,315 @@
# along with this program. If not, see .
'''
-Extensions to built-in or (heavily used but 3rd party) friend-lib
-types.
+Define our strictly typed IPC message spec for the SCIPP:
+
+that is,
+
+the "Structurred-Concurrency-Inter-Process-(dialog)-(un)Protocol".
'''
+
from __future__ import annotations
-from contextlib import contextmanager as cm
+# from contextlib import contextmanager as cm
+import types
from typing import (
Any,
+ Generic,
+ Literal,
+ Type,
+ TypeVar,
Union,
)
from msgspec import (
msgpack,
Raw,
- Struct as _Struct,
+ Struct,
+ UNSET,
)
+
+# TODO: can also remove yah?
+#
+# class Header(Struct, tag=True):
+# '''
+# A msg header which defines payload properties
+
+# '''
+# payload_tag: str|None = None
+
+# type variable for the boxed payload field `.pld`
+PayloadT = TypeVar('PayloadT')
+
+
+class Msg(
+ Struct,
+ Generic[PayloadT],
+ tag=True,
+ tag_field='msg_type',
+):
+ '''
+ The "god" boxing msg type.
+
+ Boxes user data-msgs in a `.pld` and uses `msgspec`'s tagged
+ unions support to enable a spec from a common msg inheritance
+ tree.
+
+ '''
+ # header: Header
+ # TODO: use UNSET here?
+ cid: str|None # call/context-id
+
+ # The msgs "payload" (spelled without vowels):
+ # https://en.wikipedia.org/wiki/Payload_(computing)
+ #
+ # NOTE: inherited from any `Msg` (and maybe overriden
+ # by use of `limit_msg_spec()`), but by default is
+ # parameterized to be `Any`.
+ #
+ # XXX this `Union` must strictly NOT contain `Any` if
+ # a limited msg-type-spec is intended, such that when
+ # creating and applying a new `MsgCodec` its
+ # `.decoder: Decoder` is configured with a `Union[Type[Struct]]` which
+ # restricts the allowed payload content (this `.pld` field)
+ # by type system defined loading constraints B)
+ #
+ # TODO: could also be set to `msgspec.Raw` if the sub-decoders
+ # approach is preferred over the generic parameterization
+ # approach as take by `mk_msg_spec()` below.
+ pld: PayloadT
+
+
+# TODO: better name, like `Call/TaskInput`?
+class FuncSpec(Struct):
+ # TODO: can we combine these 2 into a `NamespacePath` field?
+ ns: str
+ func: str
+
+ kwargs: dict
+ uid: str # (calling) actor-id
+
+
+class Start(
+ Msg,
+):
+ '''
+ Initial request to remotely schedule an RPC `trio.Task` via
+ `Actor.start_remote_task()`.
+
+ It is called by all the following public APIs:
+
+ - `ActorNursery.run_in_actor()`
+
+ - `Portal.run()`
+ `|_.run_from_ns()`
+ `|_.open_stream_from()`
+ `|_._submit_for_result()`
+
+ - `Context.open_context()`
+
+ '''
+ pld: FuncSpec
+
+
+FuncType: Literal[
+ 'asyncfunc',
+ 'asyncgen',
+ 'context', # TODO: the only one eventually?
+] = 'context'
+
+
+class IpcCtxSpec(Struct):
+ '''
+ An inter-actor-`trio.Task`-comms `Context` spec.
+
+ '''
+ functype: FuncType
+
+ # TODO: as part of the reponse we should report our allowed
+ # msg spec which should be generated from the type-annots as
+ # desired in # https://github.com/goodboy/tractor/issues/365
+ # When this does not match what the starter/caller side
+ # expects we of course raise a `TypeError` just like if
+ # a function had been called using an invalid signature.
+ #
+ # msgspec: MsgSpec
+
+
+class StartAck(
+ Msg,
+ Generic[PayloadT],
+):
+ '''
+ Init response to a `Cmd` request indicating the far
+ end's RPC callable "type".
+
+ '''
+ pld: IpcCtxSpec
+
+
+class Started(
+ Msg,
+ Generic[PayloadT],
+):
+ '''
+ Packet to shuttle the "first value" delivered by
+ `Context.started(value: Any)` from a `@tractor.context`
+ decorated IPC endpoint.
+
+ '''
+
+
+# TODO: instead of using our existing `Start`
+# for this (as we did with the original `{'cmd': ..}` style)
+# class Cancel(Msg):
+# cid: str
+
+
+class Yield(
+ Msg,
+ Generic[PayloadT],
+):
+ '''
+ Per IPC transmission of a value from `await MsgStream.send()`.
+
+ '''
+
+
+class Stop(Msg):
+ '''
+ Stream termination signal much like an IPC version
+ of `StopAsyncIteration`.
+
+ '''
+ pld: UNSET
+
+
+class Return(
+ Msg,
+ Generic[PayloadT],
+):
+ '''
+ Final `return ` from a remotely scheduled
+ func-as-`trio.Task`.
+
+ '''
+
+
+class ErrorData(Struct):
+ '''
+ Remote actor error meta-data as needed originally by
+ `RemoteActorError.msgdata: dict`.
+
+ '''
+ src_uid: str
+ src_type_str: str
+ boxed_type_str: str
+
+ relay_path: list[str]
+ tb_str: str
+
+ # `ContextCancelled`
+ canceller: str|None = None
+
+ # `StreamOverrun`
+ sender: str|None = None
+
+
+class Error(Msg):
+ '''
+ A pkt that wraps `RemoteActorError`s for relay.
+
+ '''
+ pld: ErrorData
+
+
+# TODO: should be make a msg version of `ContextCancelled?`
+# and/or with a scope field or a full `ActorCancelled`?
+# class Cancelled(Msg):
+# cid: str
+
+# TODO what about overruns?
+# class Overrun(Msg):
+# cid: str
+
+
+def mk_msg_spec(
+ payload_type: Union[Type] = Any,
+ boxing_msg_set: set[Msg] = {
+ Started,
+ Yield,
+ Return,
+ },
+
+) -> tuple[
+ Union[Type[Msg]],
+ list[Type[Msg]],
+]:
+ '''
+ Generate a payload-type-parameterized `Msg` specification such
+ that IPC msgs which can be `Msg.pld` (payload) type
+ limited/filterd are specified given an input `payload_type:
+ Union[Type]`.
+
+ '''
+ submsg_types: list[Type[Msg]] = Msg.__subclasses__()
+
+ # TODO: see below as well,
+ # => union building approach with `.__class_getitem__()`
+ # doesn't seem to work..?
+ #
+ # payload_type_spec: Union[Type[Msg]]
+ #
+ msg_types: list[Msg] = []
+ for msgtype in boxing_msg_set:
+
+ # check inheritance sanity
+ assert msgtype in submsg_types
+
+ # TODO: wait why do we need the dynamic version here?
+ # -[ ] paraming the `PayloadT` values via `Generic[T]`
+ # doesn't seem to work at all?
+ # -[ ] is there a way to get it to work at module level
+ # just using inheritance or maybe a metaclass?
+ #
+ # index_paramed_msg_type: Msg = msgtype[payload_type]
+
+ # TODO: WHY do we need to dynamically generate the
+ # subtype-msgs here to ensure the `.pld` parameterization
+ # propagates as well as works at all in terms of the
+ # `msgpack.Decoder()`..?
+ #
+ # dynamically create the payload type-spec-limited msg set.
+ manual_paramed_msg_subtype: Type = types.new_class(
+ msgtype.__name__,
+ (
+ # XXX NOTE XXX this seems to be THE ONLY
+ # way to get this to work correctly!?!
+ Msg[payload_type],
+ Generic[PayloadT],
+ ),
+ {},
+ )
+
+ # TODO: grok the diff here better..
+ # assert index_paramed_msg_type == manual_paramed_msg_subtype
+
+ # XXX TODO: why does the manual method work but not the
+ # `.__class_getitem__()` one!?!
+ paramed_msg_type = manual_paramed_msg_subtype
+
+ # payload_type_spec |= paramed_msg_type
+ msg_types.append(paramed_msg_type)
+
+
+ payload_type_spec: Union[Type[Msg]] = Union[*msg_types]
+ return (
+ payload_type_spec,
+ msg_types,
+ )
+
+
# TODO: integration with our ``enable_modules: list[str]`` caps sys.
#
# ``pkgutil.resolve_name()`` internally uses
@@ -43,160 +335,58 @@ from msgspec import (
# - https://stackoverflow.com/a/63320902
# - https://docs.python.org/3/library/sys.html#sys.meta_path
-# the new "Implicit Namespace Packages" might be relevant?
-# - https://www.python.org/dev/peps/pep-0420/
-
-# add implicit serialized message type support so that paths can be
-# handed directly to IPC primitives such as streams and `Portal.run()`
-# calls:
-# - via ``msgspec``:
-# - https://jcristharif.com/msgspec/api.html#struct
-# - https://jcristharif.com/msgspec/extending.html
-# via ``msgpack-python``:
-# https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type
-# LIFO codec stack that is appended when the user opens the
-# ``configure_native_msgs()`` cm below to configure a new codec set
-# which will be applied to all new (msgspec relevant) IPC transports
-# that are spawned **after** the configure call is made.
-_lifo_codecs: list[
- tuple[
- msgpack.Encoder,
- msgpack.Decoder,
- ],
-] = [(msgpack.Encoder(), msgpack.Decoder())]
-
-
-def get_msg_codecs() -> tuple[
- msgpack.Encoder,
- msgpack.Decoder,
-]:
- '''
- Return the currently configured ``msgspec`` codec set.
-
- The defaults are defined above.
-
- '''
- global _lifo_codecs
- return _lifo_codecs[-1]
-
-
-@cm
-def configure_native_msgs(
- tagged_structs: list[_Struct],
-):
- '''
- Push a codec set that will natively decode
- tagged structs provied in ``tagged_structs``
- in all IPC transports and pop the codec on exit.
-
- '''
- # See "tagged unions" docs:
- # https://jcristharif.com/msgspec/structs.html#tagged-unions
-
- # "The quickest way to enable tagged unions is to set tag=True when
- # defining every struct type in the union. In this case tag_field
- # defaults to "type", and tag defaults to the struct class name
- # (e.g. "Get")."
- enc = msgpack.Encoder()
-
- types_union = Union[tagged_structs[0]] | Any
- for struct in tagged_structs[1:]:
- types_union |= struct
-
- dec = msgpack.Decoder(types_union)
-
- _lifo_codecs.append((enc, dec))
- try:
- print("YOYOYOOYOYOYOY")
- yield enc, dec
- finally:
- print("NONONONONON")
- _lifo_codecs.pop()
-
-
-class Header(_Struct, tag=True):
- '''
- A msg header which defines payload properties
-
- '''
- uid: str
- msgtype: str|None = None
-
-
-class Msg(_Struct, tag=True):
- '''
- The "god" msg type, a box for task level msg types.
-
- '''
- header: Header
- payload: Raw
-
-
-_root_dec = msgpack.Decoder(Msg)
-_root_enc = msgpack.Encoder()
-
+# TODO: do we still want to try and support the sub-decoder with
+# `Raw` technique in the case that the `Generic` approach gives
+# future grief?
+#
# sub-decoders for retreiving embedded
# payload data and decoding to a sender
# side defined (struct) type.
-_subdecs: dict[
+_payload_decs: dict[
str|None,
- msgpack.Decoder] = {
+ msgpack.Decoder,
+] = {
+ # default decoder is used when `Header.payload_tag == None`
None: msgpack.Decoder(Any),
}
-@cm
-def enable_context(
- msg_subtypes: list[list[_Struct]]
-) -> msgpack.Decoder:
+def dec_payload(
+ msg: Msg,
+ msg_dec: msgpack.Decoder = msgpack.Decoder(
+ type=Msg[Any]
+ ),
- for types in msg_subtypes:
- first = types[0]
+) -> Any|Struct:
- # register using the default tag_field of "type"
- # which seems to map to the class "name".
- tags = [first.__name__]
-
- # create a tagged union decoder for this type set
- type_union = Union[first]
- for typ in types[1:]:
- type_union |= typ
- tags.append(typ.__name__)
-
- dec = msgpack.Decoder(type_union)
-
- # register all tags for this union sub-decoder
- for tag in tags:
- _subdecs[tag] = dec
- try:
- yield dec
- finally:
- for tag in tags:
- _subdecs.pop(tag)
+ msg: Msg = msg_dec.decode(msg)
+ payload_tag: str = msg.header.payload_tag
+ payload_dec: msgpack.Decoder = _payload_decs[payload_tag]
+ return payload_dec.decode(msg.pld)
-def decmsg(msg: Msg) -> Any:
- msg = _root_dec.decode(msg)
- tag_field = msg.header.msgtype
- dec = _subdecs[tag_field]
- return dec.decode(msg.payload)
-
-
-def encmsg(
- dialog_id: str | int,
+def enc_payload(
+ enc: msgpack.Encoder,
payload: Any,
-) -> Msg:
+ cid: str,
- tag_field = None
+) -> bytes:
- plbytes = _root_enc.encode(payload)
- if b'type' in plbytes:
- assert isinstance(payload, _Struct)
- tag_field = type(payload).__name__
+ # tag_field: str|None = None
+
+ plbytes = enc.encode(payload)
+ if b'msg_type' in plbytes:
+ assert isinstance(payload, Struct)
+
+ # tag_field: str = type(payload).__name__
payload = Raw(plbytes)
msg = Msg(
- Header(dialog_id, tag_field),
- payload,
+ cid=cid,
+ pld=payload,
+ # Header(
+ # payload_tag=tag_field,
+ # # dialog_id,
+ # ),
)
- return _root_enc.encode(msg)
+ return enc.encode(msg)