From 336db8425e8e14085ac94778d4e40082499d378c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 7 Jul 2022 15:48:16 -0400 Subject: [PATCH] Re-think, `msgspec`-multi-typed msg dialogs The greasy details are strewn throughout a `msgspec` issue: https://github.com/jcrist/msgspec/issues/140 and specifically this code was mostly written as part of POC example in this comment: https://github.com/jcrist/msgspec/issues/140#issuecomment-1177850792 This work obviously pertains to our desire and prep for typed messaging and capabilities aware msg-oriented-protocols in #196. I added a "wants to have" method to `Context` showing how I think we could offer a pretty neat msg-type-set-as-capability-for-protocol system. XXX NOTE XXX: this commit was rewritten during a rebase from a very old version as per the prior commit. --- tractor/_streaming.py | 11 +++++ tractor/msg/_old_msg.py | 95 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 102 insertions(+), 4 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index e0015fe..90c33d3 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -533,6 +533,17 @@ class MsgStream(trio.abc.Channel): else: raise + # TODO: msg capability context api1 + # @acm + # async def enable_msg_caps( + # self, + # msg_subtypes: Union[ + # list[list[Struct]], + # Protocol, # hypothetical type that wraps a msg set + # ], + # ) -> tuple[Callable, Callable]: # payload enc, dec pair + # ... + def stream(func: Callable) -> Callable: ''' diff --git a/tractor/msg/_old_msg.py b/tractor/msg/_old_msg.py index 823228a..240b2ec 100644 --- a/tractor/msg/_old_msg.py +++ b/tractor/msg/_old_msg.py @@ -48,11 +48,12 @@ and APIs for applying custom msgspec-sets for implementing un-protocol state mac from __future__ import annotations from contextlib import contextmanager as cm from typing import ( - Union, Any, + Optional, + Union, ) -from msgspec import Struct +from msgspec import Struct, Raw from msgspec.msgpack import ( Encoder, Decoder, @@ -95,8 +96,6 @@ def configure_native_msgs( in all IPC transports and pop the codec on exit. ''' - global _lifo_codecs - # See "tagged unions" docs: # https://jcristharif.com/msgspec/structs.html#tagged-unions @@ -119,3 +118,91 @@ def configure_native_msgs( finally: print("NONONONONON") _lifo_codecs.pop() + + +class Header(Struct, tag=True): + ''' + A msg header which defines payload properties + + ''' + uid: str + msgtype: Optional[str] = None + + +class Msg(Struct, tag=True): + ''' + The "god" msg type, a box for task level msg types. + + ''' + header: Header + payload: Raw + + +_root_dec = Decoder(Msg) +_root_enc = Encoder() + +# sub-decoders for retreiving embedded +# payload data and decoding to a sender +# side defined (struct) type. +_subdecs: dict[ + Optional[str], + Decoder] = { + None: Decoder(Any), +} + + +@cm +def enable_context( + msg_subtypes: list[list[Struct]] +) -> Decoder: + + for types in msg_subtypes: + first = types[0] + + # register using the default tag_field of "type" + # which seems to map to the class "name". + tags = [first.__name__] + + # create a tagged union decoder for this type set + type_union = Union[first] + for typ in types[1:]: + type_union |= typ + tags.append(typ.__name__) + + dec = Decoder(type_union) + + # register all tags for this union sub-decoder + for tag in tags: + _subdecs[tag] = dec + try: + yield dec + finally: + for tag in tags: + _subdecs.pop(tag) + + +def decmsg(msg: Msg) -> Any: + msg = _root_dec.decode(msg) + tag_field = msg.header.msgtype + dec = _subdecs[tag_field] + return dec.decode(msg.payload) + + +def encmsg( + dialog_id: str | int, + payload: Any, +) -> Msg: + + tag_field = None + + plbytes = _root_enc.encode(payload) + if b'type' in plbytes: + assert isinstance(payload, Struct) + tag_field = type(payload).__name__ + payload = Raw(plbytes) + + msg = Msg( + Header(dialog_id, tag_field), + payload, + ) + return _root_enc.encode(msg)