From 995af130cf200d7e12731b00eb334f8c44197307 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 28 Mar 2024 10:45:01 -0400 Subject: [PATCH] Init def of "SC shuttle prot" with "msg-spec-limiting" As per the long-outstanding GH issue this starts our rigorous journey into an attempt at a type-safe, cross-actor SC, IPC protocol Bo boop -> https://github.com/goodboy/tractor/issues/36 The idea is to "formally" define our SC "shuttle (dialog) protocol" by specifying a new `.msg.types.Msg` subtype-set which can fully encapsulate all IPC msg schemas needed in order to accomplish cross-process SC! The msg set deviates a little in terms of (type) names from the existing `dict`-msgs currently used in the runtime impl but I think the name changes are much better in terms of explicitly representing the internal semantics of the actor runtime machinery/subsystems and the IPC-msg-dialog required for SC enforced RPC. ------ - ------ In brief, the new formal msgs-spec includes the following msg-subtypes of a new top-level `Msg` boxing type (that holds the base field schema for all msgs): - `Start` to request RPC task scheduling by passing a `FuncSpec` payload (to replace the currently used `{'cmd': ... }` dict msg impl) - `StartAck` to allow the RPC task callee-side to report an `IpcCtxSpec` payload immediately back to the caller (currently responded naively via a `{'functype': ... }` msg) - `Started` to deliver the first value from `Context.started()` (instead of the existing `{'started': ... }`) - `Yield` to shuttle `MsgStream.send()`-ed values (instead of our `{'yield': ... }`) - `Stop` to terminate a `Context.open_stream()` session/block (over `{'stop': True }`) - `Return` to deliver the final value from the `Actor.start_remote_task()` (which is a `{'return': ... 
}`) - `Error` to box `RemoteActorError` exceptions via a `.pld: ErrorData` payload, planned to replace/extend the current `RemoteActorError.msgdata` mechanism internal to `._exceptions.pack/unpack_error()` The new `tractor.msg.types` includes all the above msg defs as well as an API for rendering a "payload type specification" using a `payload_type_spec: Union[Type]` that can be passed to `msgspec.msgpack.Decoder(type=payload_type_spec)`. This ensures that (for a subset of the above msg set) `Msg.pld: PayloadT` data is type-parameterized using `msgspec`'s new `Generic[PayloadT]` field support and thus enables providing for an API where IPC `Context` dialogs can strictly define the allowed payload-datatype-set via type union! Iow, this is the foundation for supporting `Channel`/`Context`/`MsgStream` IPC primitives which are type checked/safe as desired in GH issue: - https://github.com/goodboy/tractor/issues/365 Misc notes on current impl(s) status: ------ - ------ - add a `.msg.types.mk_msg_spec()` which uses the new `msgspec` support for `class MyStruct(Struct, Generic[T])` parameterize-able fields and delivers our boxing SC-msg-(sub)set with the desired `payload_types` applied to `.pld`: - https://jcristharif.com/msgspec/supported-types.html#generic-types - as a note this impl seems to need to use `types.new_class()` dynamic subtype generation, though i don't really get *why* still.. but without that the `msgspec.msgpack.Decoder` doesn't seem to reject `.pld` limited `Msg` subtypes as demonstrated in the new test. - around this ^ add a `.msg._codec.limit_msg_spec()` cm which exposes this payload type limiting API such that it can be applied per task via a `MsgCodec` in app code. - the orig approach in https://github.com/goodboy/tractor/pull/311 was the idea of making payload fields `.pld: Raw` wherein we could have per-field/sub-msg decoders dynamically loaded depending on the particular application-layer schema in use. 
I don't want to lose the idea of this since I think it might be useful for an idea I have about capability-based-fields(-sharing, maybe using field-subset encryption?), and as such i've kept the (ostensibly) working impls in TODO-comments in `.msg._codec` wherein maybe we can add a `MsgCodec._payload_decs: dict` table for this later on. |_ also left in the `.msg.types.enc/decmsg()` impls but renamed as `enc/dec_payload()` (but reworked to not rely on the lifo codec stack tables; now removed) such that we can probably move them to `MsgCodec` methods in the future. - add an unused `._codec.mk_tagged_union_dec()` helper which was originally factored out of the #311 proto-code but didn't end up working as desired with the new parameterized generic fields approach (now in `.msg.types.mk_msg_spec()`) Testing/deps work: ------ - ------ - new `test_limit_msgspec()` which ensures all the `.types` content is correct but without using the wrapping APIs in `._codec`; i.e. using an in-line `Decoder` instead of a `MsgCodec`. - pin us to `msgspec>=0.18.5` which has the needed generic-types support (which took me way too long yesterday to figure out when implementing all this XD)! 
--- setup.py | 2 +- tests/test_caps_msging.py | 181 ++++++++++++++- tractor/msg/_codec.py | 144 +++++++++++- tractor/msg/types.py | 474 ++++++++++++++++++++++++++------------ 4 files changed, 645 insertions(+), 156 deletions(-) diff --git a/setup.py b/setup.py index 50ee92e..a221937 100755 --- a/setup.py +++ b/setup.py @@ -60,7 +60,7 @@ setup( 'wrapt', # IPC serialization - 'msgspec', + 'msgspec>=0.18.5', # debug mode REPL 'pdbp', diff --git a/tests/test_caps_msging.py b/tests/test_caps_msging.py index f659cb1..b101c1e 100644 --- a/tests/test_caps_msging.py +++ b/tests/test_caps_msging.py @@ -6,12 +6,22 @@ B~) ''' from typing import ( Any, + _GenericAlias, Type, + Union, ) from contextvars import ( Context, ) +# from inspect import Parameter +from msgspec import ( + structs, + msgpack, + # defstruct, + Struct, + ValidationError, +) import tractor from tractor.msg import ( _def_msgspec_codec, @@ -23,6 +33,12 @@ from tractor.msg import ( apply_codec, current_msgspec_codec, ) +from tractor.msg.types import ( + PayloadT, + Msg, + # Started, + mk_msg_spec, +) import trio # TODO: wrap these into `._codec` such that user can just pass @@ -54,7 +70,7 @@ def mk_custom_codec() -> MsgCodec: # apply custom hooks and set a `Decoder` which only # loads `NamespacePath` types. nsp_codec: MsgCodec = mk_codec( - dec_types=NamespacePath, + ipc_msg_spec=NamespacePath, enc_hook=enc_hook, dec_hook=dec_hook, ) @@ -196,3 +212,166 @@ def test_codec_hooks_mod(): await p.cancel_actor() trio.run(main) + + +def chk_pld_type( + generic: Msg|_GenericAlias, + payload_type: Type[Struct]|Any, + pld: Any, + +) -> bool: + + roundtrip: bool = False + pld_val_type: Type = type(pld) + + # gen_paramed: _GenericAlias = generic[payload_type] + # TODO: verify that the overridden subtypes + # DO NOT have modified type-annots from original! 
+ # 'Start', .pld: FuncSpec + # 'StartAck', .pld: IpcCtxSpec + # 'Stop', .pld: UNSEt + # 'Error', .pld: ErrorData + # for typedef in ( + # [gen_paramed] + # + + + # # type-var should always be set for these sub-types + # # as well! + # Msg.__subclasses__() + # ): + # if typedef.__name__ not in [ + # 'Msg', + # 'Started', + # 'Yield', + # 'Return', + # ]: + # continue + # payload_type: Type[Struct] = CustomPayload + + # TODO: can remove all this right!? + # + # when parameterized (like `Msg[Any]`) then + # we expect an alias as input. + # if isinstance(generic, _GenericAlias): + # assert payload_type in generic.__args__ + # else: + # assert PayloadType in generic.__parameters__ + # pld_param: Parameter = generic.__signature__.parameters['pld'] + # assert pld_param.annotation is PayloadType + + type_spec: Union[Type[Struct]] + msg_types: list[Msg[payload_type]] + ( + type_spec, + msg_types, + ) = mk_msg_spec( + payload_type=payload_type, + ) + enc = msgpack.Encoder() + dec = msgpack.Decoder( + type=type_spec, # like `Msg[Any]` + ) + + # verify the boxed-type for all variable payload-type msgs. + for typedef in msg_types: + + pld_field = structs.fields(typedef)[1] + assert pld_field.type in {payload_type, PayloadT} + # TODO: does this need to work to get all subtypes to + # adhere? + assert pld_field.type is payload_type + + kwargs: dict[str, Any] = { + 'cid': '666', + 'pld': pld, + } + enc_msg = typedef(**kwargs) + + wire_bytes: bytes = enc.encode(enc_msg) + + try: + dec_msg = dec.decode(wire_bytes) + assert dec_msg.pld == pld + assert (roundtrip := (dec_msg == enc_msg)) + + except ValidationError as ve: + # breakpoint() + if pld_val_type is payload_type: + raise ValueError( + 'Got `ValidationError` despite type-var match!?\n' + f'pld_val_type: {pld_val_type}\n' + f'payload_type: {payload_type}\n' + ) from ve + + else: + # ow we good cuz the pld spec mismatched. 
+ print( + 'Got expected `ValidationError` since,\n' + f'{pld_val_type} is not {payload_type}\n' + ) + else: + if ( + pld_val_type is not payload_type + and payload_type is not Any + ): + raise ValueError( + 'DID NOT `ValidationError` despite expected type match!?\n' + f'pld_val_type: {pld_val_type}\n' + f'payload_type: {payload_type}\n' + ) + + return roundtrip + + + +def test_limit_msgspec(): + + async def main(): + async with tractor.open_root_actor( + debug_mode=True + ): + + # ensure we can round-trip a boxing `Msg` + assert chk_pld_type( + Msg, + Any, + None, + ) + + # TODO: don't need this any more right since + # `msgspec>=0.15` has the nice generics stuff yah?? + # + # manually override the type annot of the payload + # field and ensure it propagates to all msg-subtypes. + # Msg.__annotations__['pld'] = Any + + # verify that a mis-typed payload value won't decode + assert not chk_pld_type( + Msg, + int, + pld='doggy', + ) + + # parametrize the boxed `.pld` type as a custom-struct + # and ensure that parametrization propagates + # to all payload-msg-spec-able subtypes! + class CustomPayload(Struct): + name: str + value: Any + + assert not chk_pld_type( + Msg, + CustomPayload, + pld='doggy', + ) + + assert chk_pld_type( + Msg, + CustomPayload, + pld=CustomPayload(name='doggy', value='urmom') + ) + + # uhh bc we can `.pause_from_sync()` now! :surfer: + # breakpoint() + + trio.run(main) diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py index c26de8d..5ce0205 100644 --- a/tractor/msg/_codec.py +++ b/tractor/msg/_codec.py @@ -47,20 +47,25 @@ from types import ModuleType import msgspec from msgspec import msgpack -from .pretty_struct import Struct +from tractor.msg.pretty_struct import Struct +from tractor.msg.types import ( + mk_msg_spec, + Msg, +) # TODO: API changes towards being interchange lib agnostic! +# # -[ ] capnproto has pre-compiled schema for eg.. 
# * https://capnproto.org/language.html # * http://capnproto.github.io/pycapnp/quickstart.html # * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp +# class MsgCodec(Struct): ''' A IPC msg interchange format lib's encoder + decoder pair. ''' - lib: ModuleType = msgspec # ad-hoc type extensions @@ -70,12 +75,22 @@ class MsgCodec(Struct): # struct type unions # https://jcristharif.com/msgspec/structs.html#tagged-unions - types: Union[Type[Struct]]|Any = Any + ipc_msg_spec: Union[Type[Struct]]|Any = Any + payload_msg_spec: Union[Type[Struct]] = Any # post-configure cached props _enc: msgpack.Encoder|None = None _dec: msgpack.Decoder|None = None + # TODO: a sub-decoder system as well? + # see related comments in `.msg.types` + # _payload_decs: ( + # dict[ + # str, + # msgpack.Decoder, + # ] + # |None + # ) = None # TODO: use `functools.cached_property` for these ? # https://docs.python.org/3/library/functools.html#functools.cached_property @@ -88,8 +103,9 @@ class MsgCodec(Struct): enc_hook: Callable|None = None, reset: bool = False, - # TODO: what's the default for this? + # TODO: what's the default for this, and do we care? 
# write_buffer_size: int + # **kwargs, ) -> msgpack.Encoder: @@ -131,7 +147,7 @@ class MsgCodec(Struct): def decoder( self, - types: Union[Type[Struct]]|None = None, + ipc_msg_spec: Union[Type[Struct]]|None = None, dec_hook: Callable|None = None, reset: bool = False, **kwargs, @@ -152,7 +168,7 @@ class MsgCodec(Struct): or reset ): self._dec = self.lib.msgpack.Decoder( - types or self.types, + type=ipc_msg_spec or self.ipc_msg_spec, dec_hook=dec_hook or self.dec_hook, **kwargs, ) @@ -169,10 +185,39 @@ class MsgCodec(Struct): determined by the ''' - return self.dec.decode(msg) +def mk_tagged_union_dec( + tagged_structs: list[Struct], + +) -> tuple[ + list[str], + msgpack.Decoder, +]: + # See "tagged unions" docs: + # https://jcristharif.com/msgspec/structs.html#tagged-unions + + # "The quickest way to enable tagged unions is to set tag=True when + # defining every struct type in the union. In this case tag_field + # defaults to "type", and tag defaults to the struct class name + # (e.g. "Get")." + first: Struct = tagged_structs[0] + types_union: Union[Type[Struct]] = Union[ + first + ]|Any + tags: list[str] = [first.__name__] + + for struct in tagged_structs[1:]: + types_union |= struct + tags.append(struct.__name__) + + dec = msgpack.Decoder(types_union) + return ( + tags, + dec, + ) + # TODO: struct aware messaging coders as per: # - https://github.com/goodboy/tractor/issues/36 # - https://github.com/goodboy/tractor/issues/196 @@ -181,13 +226,18 @@ class MsgCodec(Struct): def mk_codec( libname: str = 'msgspec', + # for codec-ing boxed `Msg`-with-payload msgs + payload_types: Union[Type[Struct]]|None = None, + + # TODO: do we want to allow NOT/using a diff `Msg`-set? 
+ # # struct type unions set for `Decoder` # https://jcristharif.com/msgspec/structs.html#tagged-unions - dec_types: Union[Type[Struct]]|Any = Any, + ipc_msg_spec: Union[Type[Struct]]|Any = Any, cache_now: bool = True, - # proxy to the `Struct.__init__()` + # proxy as `Struct(**kwargs)` **kwargs, ) -> MsgCodec: @@ -197,14 +247,59 @@ def mk_codec( `msgspec` ;). ''' + # (manually) generate a msg-payload-spec for all relevant + # god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT` + # for the decoder such that all sub-type msgs in our SCIPP + # will automatically decode to a type-"limited" payload (`Struct`) + # object (set). + payload_type_spec: Union[Type[Msg]]|None = None + if payload_types: + ( + payload_type_spec, + msg_types, + ) = mk_msg_spec( + payload_type=payload_types, + ) + assert len(payload_type_spec.__args__) == len(msg_types) + + # TODO: sub-decode `.pld: Raw`? + # see similar notes inside `.msg.types`.. + # + # not sure we'll end up wanting/needing this + # though it might have unforeseen advantages in terms + # of enabling encrypted appliciation layer (only) + # payloads? + # + # register sub-payload decoders to load `.pld: Raw` + # decoded `Msg`-packets using a dynamic lookup (table) + # instead of a pre-defined msg-spec via `Generic` + # parameterization. + # + # ( + # tags, + # payload_dec, + # ) = mk_tagged_union_dec( + # tagged_structs=list(payload_types.__args__), + # ) + # _payload_decs: ( + # dict[str, msgpack.Decoder]|None + # ) = { + # # pre-seed decoders for std-py-type-set for use when + # # `Msg.pld == None|Any`. + # None: msgpack.Decoder(Any), + # Any: msgpack.Decoder(Any), + # } + # for name in tags: + # _payload_decs[name] = payload_dec + codec = MsgCodec( - types=dec_types, + ipc_msg_spec=ipc_msg_spec, + payload_msg_spec=payload_type_spec, **kwargs, ) assert codec.lib.__name__ == libname - # by default config and cache the codec pair for given - # input settings. 
+ # by default, config-n-cache the codec pair from input settings. if cache_now: assert codec.enc assert codec.dec @@ -251,3 +346,28 @@ def current_msgspec_codec() -> MsgCodec: ''' return _ctxvar_MsgCodec.get() + + +@cm +def limit_msg_spec( + payload_types: Union[Type[Struct]], + + # TODO: don't need this approach right? + # + # tagged_structs: list[Struct]|None = None, + + **codec_kwargs, +): + ''' + Apply a `MsgCodec` that will natively decode the SC-msg set's + `Msg.pld: Union[Type[Struct]]` payload fields using + tagged-unions of `msgspec.Struct`s from the `payload_types` + for all IPC contexts in use by the current `trio.Task`. + + ''' + msgspec_codec: MsgCodec = mk_codec( + payload_types=payload_types, + **codec_kwargs, + ) + with apply_codec(msgspec_codec): + yield msgspec_codec diff --git a/tractor/msg/types.py b/tractor/msg/types.py index d2fb087..732a0f5 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -15,23 +15,315 @@ # along with this program. If not, see . ''' -Extensions to built-in or (heavily used but 3rd party) friend-lib -types. +Define our strictly typed IPC message spec for the SCIPP: + +that is, + +the "Structurred-Concurrency-Inter-Process-(dialog)-(un)Protocol". ''' + from __future__ import annotations -from contextlib import contextmanager as cm +# from contextlib import contextmanager as cm +import types from typing import ( Any, + Generic, + Literal, + Type, + TypeVar, Union, ) from msgspec import ( msgpack, Raw, - Struct as _Struct, + Struct, + UNSET, ) + +# TODO: can also remove yah? +# +# class Header(Struct, tag=True): +# ''' +# A msg header which defines payload properties + +# ''' +# payload_tag: str|None = None + +# type variable for the boxed payload field `.pld` +PayloadT = TypeVar('PayloadT') + + +class Msg( + Struct, + Generic[PayloadT], + tag=True, + tag_field='msg_type', +): + ''' + The "god" boxing msg type. 
+ + Boxes user data-msgs in a `.pld` and uses `msgspec`'s tagged + unions support to enable a spec from a common msg inheritance + tree. + + ''' + # header: Header + # TODO: use UNSET here? + cid: str|None # call/context-id + + # The msgs "payload" (spelled without vowels): + # https://en.wikipedia.org/wiki/Payload_(computing) + # + # NOTE: inherited from any `Msg` (and maybe overriden + # by use of `limit_msg_spec()`), but by default is + # parameterized to be `Any`. + # + # XXX this `Union` must strictly NOT contain `Any` if + # a limited msg-type-spec is intended, such that when + # creating and applying a new `MsgCodec` its + # `.decoder: Decoder` is configured with a `Union[Type[Struct]]` which + # restricts the allowed payload content (this `.pld` field) + # by type system defined loading constraints B) + # + # TODO: could also be set to `msgspec.Raw` if the sub-decoders + # approach is preferred over the generic parameterization + # approach as take by `mk_msg_spec()` below. + pld: PayloadT + + +# TODO: better name, like `Call/TaskInput`? +class FuncSpec(Struct): + # TODO: can we combine these 2 into a `NamespacePath` field? + ns: str + func: str + + kwargs: dict + uid: str # (calling) actor-id + + +class Start( + Msg, +): + ''' + Initial request to remotely schedule an RPC `trio.Task` via + `Actor.start_remote_task()`. + + It is called by all the following public APIs: + + - `ActorNursery.run_in_actor()` + + - `Portal.run()` + `|_.run_from_ns()` + `|_.open_stream_from()` + `|_._submit_for_result()` + + - `Context.open_context()` + + ''' + pld: FuncSpec + + +FuncType: Literal[ + 'asyncfunc', + 'asyncgen', + 'context', # TODO: the only one eventually? +] = 'context' + + +class IpcCtxSpec(Struct): + ''' + An inter-actor-`trio.Task`-comms `Context` spec. 
+ + ''' + functype: FuncType + + # TODO: as part of the reponse we should report our allowed + # msg spec which should be generated from the type-annots as + # desired in # https://github.com/goodboy/tractor/issues/365 + # When this does not match what the starter/caller side + # expects we of course raise a `TypeError` just like if + # a function had been called using an invalid signature. + # + # msgspec: MsgSpec + + +class StartAck( + Msg, + Generic[PayloadT], +): + ''' + Init response to a `Cmd` request indicating the far + end's RPC callable "type". + + ''' + pld: IpcCtxSpec + + +class Started( + Msg, + Generic[PayloadT], +): + ''' + Packet to shuttle the "first value" delivered by + `Context.started(value: Any)` from a `@tractor.context` + decorated IPC endpoint. + + ''' + + +# TODO: instead of using our existing `Start` +# for this (as we did with the original `{'cmd': ..}` style) +# class Cancel(Msg): +# cid: str + + +class Yield( + Msg, + Generic[PayloadT], +): + ''' + Per IPC transmission of a value from `await MsgStream.send()`. + + ''' + + +class Stop(Msg): + ''' + Stream termination signal much like an IPC version + of `StopAsyncIteration`. + + ''' + pld: UNSET + + +class Return( + Msg, + Generic[PayloadT], +): + ''' + Final `return ` from a remotely scheduled + func-as-`trio.Task`. + + ''' + + +class ErrorData(Struct): + ''' + Remote actor error meta-data as needed originally by + `RemoteActorError.msgdata: dict`. + + ''' + src_uid: str + src_type_str: str + boxed_type_str: str + + relay_path: list[str] + tb_str: str + + # `ContextCancelled` + canceller: str|None = None + + # `StreamOverrun` + sender: str|None = None + + +class Error(Msg): + ''' + A pkt that wraps `RemoteActorError`s for relay. + + ''' + pld: ErrorData + + +# TODO: should be make a msg version of `ContextCancelled?` +# and/or with a scope field or a full `ActorCancelled`? +# class Cancelled(Msg): +# cid: str + +# TODO what about overruns? 
+# class Overrun(Msg): +# cid: str + + +def mk_msg_spec( + payload_type: Union[Type] = Any, + boxing_msg_set: set[Msg] = { + Started, + Yield, + Return, + }, + +) -> tuple[ + Union[Type[Msg]], + list[Type[Msg]], +]: + ''' + Generate a payload-type-parameterized `Msg` specification such + that IPC msgs which can be `Msg.pld` (payload) type + limited/filterd are specified given an input `payload_type: + Union[Type]`. + + ''' + submsg_types: list[Type[Msg]] = Msg.__subclasses__() + + # TODO: see below as well, + # => union building approach with `.__class_getitem__()` + # doesn't seem to work..? + # + # payload_type_spec: Union[Type[Msg]] + # + msg_types: list[Msg] = [] + for msgtype in boxing_msg_set: + + # check inheritance sanity + assert msgtype in submsg_types + + # TODO: wait why do we need the dynamic version here? + # -[ ] paraming the `PayloadT` values via `Generic[T]` + # doesn't seem to work at all? + # -[ ] is there a way to get it to work at module level + # just using inheritance or maybe a metaclass? + # + # index_paramed_msg_type: Msg = msgtype[payload_type] + + # TODO: WHY do we need to dynamically generate the + # subtype-msgs here to ensure the `.pld` parameterization + # propagates as well as works at all in terms of the + # `msgpack.Decoder()`..? + # + # dynamically create the payload type-spec-limited msg set. + manual_paramed_msg_subtype: Type = types.new_class( + msgtype.__name__, + ( + # XXX NOTE XXX this seems to be THE ONLY + # way to get this to work correctly!?! + Msg[payload_type], + Generic[PayloadT], + ), + {}, + ) + + # TODO: grok the diff here better.. + # assert index_paramed_msg_type == manual_paramed_msg_subtype + + # XXX TODO: why does the manual method work but not the + # `.__class_getitem__()` one!?! 
+ paramed_msg_type = manual_paramed_msg_subtype + + # payload_type_spec |= paramed_msg_type + msg_types.append(paramed_msg_type) + + + payload_type_spec: Union[Type[Msg]] = Union[*msg_types] + return ( + payload_type_spec, + msg_types, + ) + + # TODO: integration with our ``enable_modules: list[str]`` caps sys. # # ``pkgutil.resolve_name()`` internally uses @@ -43,160 +335,58 @@ from msgspec import ( # - https://stackoverflow.com/a/63320902 # - https://docs.python.org/3/library/sys.html#sys.meta_path -# the new "Implicit Namespace Packages" might be relevant? -# - https://www.python.org/dev/peps/pep-0420/ - -# add implicit serialized message type support so that paths can be -# handed directly to IPC primitives such as streams and `Portal.run()` -# calls: -# - via ``msgspec``: -# - https://jcristharif.com/msgspec/api.html#struct -# - https://jcristharif.com/msgspec/extending.html -# via ``msgpack-python``: -# https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type -# LIFO codec stack that is appended when the user opens the -# ``configure_native_msgs()`` cm below to configure a new codec set -# which will be applied to all new (msgspec relevant) IPC transports -# that are spawned **after** the configure call is made. -_lifo_codecs: list[ - tuple[ - msgpack.Encoder, - msgpack.Decoder, - ], -] = [(msgpack.Encoder(), msgpack.Decoder())] - - -def get_msg_codecs() -> tuple[ - msgpack.Encoder, - msgpack.Decoder, -]: - ''' - Return the currently configured ``msgspec`` codec set. - - The defaults are defined above. - - ''' - global _lifo_codecs - return _lifo_codecs[-1] - - -@cm -def configure_native_msgs( - tagged_structs: list[_Struct], -): - ''' - Push a codec set that will natively decode - tagged structs provied in ``tagged_structs`` - in all IPC transports and pop the codec on exit. 
- - ''' - # See "tagged unions" docs: - # https://jcristharif.com/msgspec/structs.html#tagged-unions - - # "The quickest way to enable tagged unions is to set tag=True when - # defining every struct type in the union. In this case tag_field - # defaults to "type", and tag defaults to the struct class name - # (e.g. "Get")." - enc = msgpack.Encoder() - - types_union = Union[tagged_structs[0]] | Any - for struct in tagged_structs[1:]: - types_union |= struct - - dec = msgpack.Decoder(types_union) - - _lifo_codecs.append((enc, dec)) - try: - print("YOYOYOOYOYOYOY") - yield enc, dec - finally: - print("NONONONONON") - _lifo_codecs.pop() - - -class Header(_Struct, tag=True): - ''' - A msg header which defines payload properties - - ''' - uid: str - msgtype: str|None = None - - -class Msg(_Struct, tag=True): - ''' - The "god" msg type, a box for task level msg types. - - ''' - header: Header - payload: Raw - - -_root_dec = msgpack.Decoder(Msg) -_root_enc = msgpack.Encoder() - +# TODO: do we still want to try and support the sub-decoder with +# `Raw` technique in the case that the `Generic` approach gives +# future grief? +# # sub-decoders for retreiving embedded # payload data and decoding to a sender # side defined (struct) type. -_subdecs: dict[ +_payload_decs: dict[ str|None, - msgpack.Decoder] = { + msgpack.Decoder, +] = { + # default decoder is used when `Header.payload_tag == None` None: msgpack.Decoder(Any), } -@cm -def enable_context( - msg_subtypes: list[list[_Struct]] -) -> msgpack.Decoder: +def dec_payload( + msg: Msg, + msg_dec: msgpack.Decoder = msgpack.Decoder( + type=Msg[Any] + ), - for types in msg_subtypes: - first = types[0] +) -> Any|Struct: - # register using the default tag_field of "type" - # which seems to map to the class "name". 
- tags = [first.__name__] - - # create a tagged union decoder for this type set - type_union = Union[first] - for typ in types[1:]: - type_union |= typ - tags.append(typ.__name__) - - dec = msgpack.Decoder(type_union) - - # register all tags for this union sub-decoder - for tag in tags: - _subdecs[tag] = dec - try: - yield dec - finally: - for tag in tags: - _subdecs.pop(tag) + msg: Msg = msg_dec.decode(msg) + payload_tag: str = msg.header.payload_tag + payload_dec: msgpack.Decoder = _payload_decs[payload_tag] + return payload_dec.decode(msg.pld) -def decmsg(msg: Msg) -> Any: - msg = _root_dec.decode(msg) - tag_field = msg.header.msgtype - dec = _subdecs[tag_field] - return dec.decode(msg.payload) - - -def encmsg( - dialog_id: str | int, +def enc_payload( + enc: msgpack.Encoder, payload: Any, -) -> Msg: + cid: str, - tag_field = None +) -> bytes: - plbytes = _root_enc.encode(payload) - if b'type' in plbytes: - assert isinstance(payload, _Struct) - tag_field = type(payload).__name__ + # tag_field: str|None = None + + plbytes = enc.encode(payload) + if b'msg_type' in plbytes: + assert isinstance(payload, Struct) + + # tag_field: str = type(payload).__name__ payload = Raw(plbytes) msg = Msg( - Header(dialog_id, tag_field), - payload, + cid=cid, + pld=payload, + # Header( + # payload_tag=tag_field, + # # dialog_id, + # ), ) - return _root_enc.encode(msg) + return enc.encode(msg)