From b6ed26589a3e38b45bfc78cf36ab8b3f5d92fe07 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 29 Mar 2024 12:46:59 -0400 Subject: [PATCH] Drop `MsgCodec.decoder()/.encoder()` design Instead just instantiate `msgpack.Encoder/Decoder` instances inside `mk_codec()` and assign them directly as `._enc/._dec` fields. Explicitly take in named-args to both and proxy to the coder/decoder instantiation calls directly. Shuffling some codec internals: - rename `mk_codec()` inputs as `ipc_msg_spec` and `ipc_pld_spec`, make them mutex such that a payload type spec can't be passed if the built-in msg-spec isn't used. => expose `MsgCodec.ipc_pld_spec` directly from `._dec.type` => presume input `ipc_msg_spec` is `Any` by default when no `ipc_pld_spec` is passed since we have no way atm to enable a similar type-restricted-payload feature without a wrapping "shuttle protocol" ;) - move all the payload-sub-decoders stuff prototyped in GH#311 (inside `.types`) to `._codec` as commented-for-later-maybe `MsgCodec` methods including: - `.mk_pld_subdec()` for registering - `.enc/dec_payload()` for sub-codec field loading. - also comment out `._codec.mk_tagged_union_dec()` as the orig tag-to-decoder table factory, now mostly superseded by `.types.mk_msg_spec()` which takes the generic parameterizing approach instead. - change naming to `types.mk_msg_spec(payload_type_union)` input, making it more explicit that it expects a `Union[Type]`. Oh right, and start exposing all the `.types.Msg` subtypes in the `.msg` subpkg in prep for usage throughout the runtime B) --- tractor/msg/__init__.py | 37 ++++ tractor/msg/_codec.py | 394 +++++++++++++++++++++++----------------- tractor/msg/types.py | 113 +++--------- 3 files changed, 297 insertions(+), 247 deletions(-) diff --git a/tractor/msg/__init__.py b/tractor/msg/__init__.py index b5c261c..a93fa88 100644 --- a/tractor/msg/__init__.py +++ b/tractor/msg/__init__.py @@ -33,3 +33,40 @@ from ._codec import ( MsgCodec as MsgCodec, current_msgspec_codec as current_msgspec_codec, ) + +from .types import ( + Msg as Msg, + + Start, # with pld + FuncSpec as FuncSpec, + + StartAck, # with pld + IpcCtxSpec as IpcCtxSpec, + + Started, + Yield, + Stop, + Return, + + Error, # with pld + ErrorData as ErrorData +) + + +# built-in SC shuttle protocol msg type set in +# approx order of the IPC txn-state spaces. +__spec__: list[Msg] = [ + + # inter-actor RPC initiation + Start, + StartAck, + + # no-outcome-yet IAC (inter-actor-communication) + Started, + Yield, + Stop, + + # termination outcomes + Return, + Error, +] diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py index 5ce0205..e6cb4f1 100644 --- a/tractor/msg/_codec.py +++ b/tractor/msg/_codec.py @@ -29,6 +29,7 @@ ToDo: backends we prolly should offer: - https://capnproto.org/language.html#language-reference ''' +from __future__ import annotations from contextvars import ( ContextVar, Token, @@ -54,18 +55,36 @@ from tractor.msg.types import ( ) -# TODO: API changes towards being interchange lib agnostic! +# TODO: overall IPC msg-spec features (i.e. in this mod)! # -# -[ ] capnproto has pre-compiled schema for eg.. -# * https://capnproto.org/language.html -# * http://capnproto.github.io/pycapnp/quickstart.html -# * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp +# -[ ] API changes towards being interchange lib agnostic! +# -[ ] capnproto has pre-compiled schema for eg.. +# * https://capnproto.org/language.html +# * http://capnproto.github.io/pycapnp/quickstart.html +# * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp +# +# -[ ] struct aware messaging coders as per: +# -[x] https://github.com/goodboy/tractor/issues/36 +# -[ ] https://github.com/goodboy/tractor/issues/196 +# -[ ] https://github.com/goodboy/tractor/issues/365 # class MsgCodec(Struct): ''' A IPC msg interchange format lib's encoder + decoder pair. ''' + # post-configure-cached when prop-accessed (see `mk_codec()` + # OR can be passed directly as, + # `MsgCodec(_enc=, _dec=)` + _enc: msgpack.Encoder|None = None + _dec: msgpack.Decoder|None = None + + # struct type unions + # https://jcristharif.com/msgspec/structs.html#tagged-unions + @property + def ipc_pld_spec(self) -> Union[Type[Struct]]: + return self._dec.type + lib: ModuleType = msgspec # ad-hoc type extensions @@ -73,16 +92,8 @@ class MsgCodec(Struct): enc_hook: Callable[[Any], Any]|None = None # coder dec_hook: Callable[[type, Any], Any]|None = None # decoder - # struct type unions - # https://jcristharif.com/msgspec/structs.html#tagged-unions - ipc_msg_spec: Union[Type[Struct]]|Any = Any - payload_msg_spec: Union[Type[Struct]] = Any - - # post-configure cached props - _enc: msgpack.Encoder|None = None - _dec: msgpack.Decoder|None = None - # TODO: a sub-decoder system as well? + # payload_msg_specs: Union[Type[Struct]] = Any # see related comments in `.msg.types` # _payload_decs: ( # dict[ @@ -91,42 +102,18 @@ class MsgCodec(Struct): # ] # |None # ) = None + # OR + # ) = { + # # pre-seed decoders for std-py-type-set for use when + # # `Msg.pld == None|Any`. + # None: msgpack.Decoder(Any), + # Any: msgpack.Decoder(Any), + # } # TODO: use `functools.cached_property` for these ? # https://docs.python.org/3/library/functools.html#functools.cached_property @property def enc(self) -> msgpack.Encoder: - return self._enc or self.encoder() - - def encoder( - self, - enc_hook: Callable|None = None, - reset: bool = False, - - # TODO: what's the default for this, and do we care? - # write_buffer_size: int - # - **kwargs, - - ) -> msgpack.Encoder: - ''' - Set or get the maybe-cached `msgspec.msgpack.Encoder` - instance configured for this codec. - - When `reset=True` any previously configured encoder will - be recreated and then cached with the new settings passed - as input. - - ''' - if ( - self._enc is None - or reset - ): - self._enc = self.lib.msgpack.Encoder( - enc_hook=enc_hook or self.enc_hook, - # write_buffer_size=write_buffer_size, - ) - return self._enc def encode( @@ -139,40 +126,10 @@ class MsgCodec(Struct): on a tranport protocol connection. ''' - return self.enc.encode(py_obj) + return self._enc.encode(py_obj) @property def dec(self) -> msgpack.Decoder: - return self._dec or self.decoder() - - def decoder( - self, - ipc_msg_spec: Union[Type[Struct]]|None = None, - dec_hook: Callable|None = None, - reset: bool = False, - **kwargs, - # ext_hook: ext_hook_sig - - ) -> msgpack.Decoder: - ''' - Set or get the maybe-cached `msgspec.msgpack.Decoder` - instance configured for this codec. - - When `reset=True` any previously configured decoder will - be recreated and then cached with the new settings passed - as input. - - ''' - if ( - self._dec is None - or reset - ): - self._dec = self.lib.msgpack.Decoder( - type=ipc_msg_spec or self.ipc_msg_spec, - dec_hook=dec_hook or self.dec_hook, - **kwargs, - ) - return self._dec def decode( @@ -185,60 +142,165 @@ class MsgCodec(Struct): determined by the ''' - return self.dec.decode(msg) + return self._dec.decode(msg) + + # TODO: do we still want to try and support the sub-decoder with + # `.Raw` technique in the case that the `Generic` approach gives + # future grief? + # + # -[ ] + # + #def mk_pld_subdec( + # self, + # payload_types: Union[Type[Struct]], + + #) -> msgpack.Decoder: + # # TODO: sub-decoder suppor for `.pld: Raw`? + # # => see similar notes inside `.msg.types`.. + # # + # # not sure we'll end up needing this though it might have + # # unforeseen advantages in terms of enabling encrypted + # # appliciation layer (only) payloads? + # # + # # register sub-payload decoders to load `.pld: Raw` + # # decoded `Msg`-packets using a dynamic lookup (table) + # # instead of a pre-defined msg-spec via `Generic` + # # parameterization. + # # + # ( + # tags, + # payload_dec, + # ) = mk_tagged_union_dec( + # tagged_structs=list(payload_types.__args__), + # ) + # # register sub-decoders by tag + # subdecs: dict[str, msgpack.Decoder]|None = self._payload_decs + # for name in tags: + # subdecs.setdefault( + # name, + # payload_dec, + # ) + + # return payload_dec + + # sub-decoders for retreiving embedded + # payload data and decoding to a sender + # side defined (struct) type. + # def dec_payload( + # codec: MsgCodec, + # msg: Msg, + + # ) -> Any|Struct: + + # msg: Msg = codec.dec.decode(msg) + # payload_tag: str = msg.header.payload_tag + # payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag] + # return payload_dec.decode(msg.pld) + + # def enc_payload( + # codec: MsgCodec, + # payload: Any, + # cid: str, + + # ) -> bytes: + + # # tag_field: str|None = None + + # plbytes = codec.enc.encode(payload) + # if b'msg_type' in plbytes: + # assert isinstance(payload, Struct) + + # # tag_field: str = type(payload).__name__ + # payload = msgspec.Raw(plbytes) + + # msg = Msg( + # cid=cid, + # pld=payload, + # # Header( + # # payload_tag=tag_field, + # # # dialog_id, + # # ), + # ) + # return codec.enc.encode(msg) -def mk_tagged_union_dec( - tagged_structs: list[Struct], + #def mk_tagged_union_dec( + # tagged_structs: list[Struct], -) -> tuple[ - list[str], - msgpack.Decoder, -]: - # See "tagged unions" docs: - # https://jcristharif.com/msgspec/structs.html#tagged-unions + #) -> tuple[ + # list[str], + # msgpack.Decoder, + #]: + # ''' + # Create a `msgpack.Decoder` for an input `list[msgspec.Struct]` + # and return a `list[str]` of each struct's `tag_field: str` value + # which can be used to "map to" the initialized dec. - # "The quickest way to enable tagged unions is to set tag=True when - # defining every struct type in the union. In this case tag_field - # defaults to "type", and tag defaults to the struct class name - # (e.g. "Get")." - first: Struct = tagged_structs[0] - types_union: Union[Type[Struct]] = Union[ - first - ]|Any - tags: list[str] = [first.__name__] + # ''' + # # See "tagged unions" docs: + # # https://jcristharif.com/msgspec/structs.html#tagged-unions - for struct in tagged_structs[1:]: - types_union |= struct - tags.append(struct.__name__) + # # "The quickest way to enable tagged unions is to set tag=True when + # # defining every struct type in the union. In this case tag_field + # # defaults to "type", and tag defaults to the struct class name + # # (e.g. "Get")." + # first: Struct = tagged_structs[0] + # types_union: Union[Type[Struct]] = Union[ + # first + # ]|Any + # tags: list[str] = [first.__name__] - dec = msgpack.Decoder(types_union) - return ( - tags, - dec, - ) + # for struct in tagged_structs[1:]: + # types_union |= struct + # tags.append( + # getattr( + # struct, + # struct.__struct_config__.tag_field, + # struct.__name__, + # ) + # ) + + # dec = msgpack.Decoder(types_union) + # return ( + # tags, + # dec, + # ) -# TODO: struct aware messaging coders as per: -# - https://github.com/goodboy/tractor/issues/36 -# - https://github.com/goodboy/tractor/issues/196 -# - https://github.com/goodboy/tractor/issues/365 def mk_codec( - libname: str = 'msgspec', - - # for codec-ing boxed `Msg`-with-payload msgs - payload_types: Union[Type[Struct]]|None = None, - - # TODO: do we want to allow NOT/using a diff `Msg`-set? + ipc_msg_spec: Union[Type[Struct]]|Any|None = None, # + # ^TODO^: in the long run, do we want to allow using a diff IPC `Msg`-set? + # it would break the runtime, but maybe say if you wanted + # to add some kinda field-specific or wholesale `.pld` ecryption? + # struct type unions set for `Decoder` # https://jcristharif.com/msgspec/structs.html#tagged-unions - ipc_msg_spec: Union[Type[Struct]]|Any = Any, + ipc_pld_spec: Union[Type[Struct]]|Any|None = None, - cache_now: bool = True, + # TODO: offering a per-msg(-field) type-spec such that + # the fields can be dynamically NOT decoded and left as `Raw` + # values which are later loaded by a sub-decoder specified + # by `tag_field: str` value key? + # payload_msg_specs: dict[ + # str, # tag_field value as sub-decoder key + # Union[Type[Struct]] # `Msg.pld` type spec + # ]|None = None, + + libname: str = 'msgspec', # proxy as `Struct(**kwargs)` + # ------ - ------ + dec_hook: Callable|None = None, + enc_hook: Callable|None = None, + # ------ - ------ **kwargs, + # + # Encoder: + # write_buffer_size=write_buffer_size, + # + # Decoder: + # ext_hook: ext_hook_sig ) -> MsgCodec: ''' @@ -247,75 +309,81 @@ def mk_codec( `msgspec` ;). ''' - # (manually) generate a msg-payload-spec for all relevant - # god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT` - # for the decoder such that all sub-type msgs in our SCIPP - # will automatically decode to a type-"limited" payload (`Struct`) - # object (set). - payload_type_spec: Union[Type[Msg]]|None = None - if payload_types: + if ( + ipc_msg_spec is not None + and ipc_pld_spec + ): + raise RuntimeError( + f'If a payload spec is provided,\n' + "the builtin SC-shuttle-protocol's msg set\n" + f'(i.e. `{Msg}`) MUST be used!\n\n' + f'However both values were passed as => mk_codec(\n' + f' ipc_msg_spec={ipc_msg_spec}`\n' + f' ipc_pld_spec={ipc_pld_spec}`\n)\n' + ) + + elif ( + ipc_pld_spec + and + + # XXX required for now (or maybe forever?) until + # we can dream up a way to allow parameterizing and/or + # custom overrides to the `Msg`-spec protocol itself? + ipc_msg_spec is None + ): + # (manually) generate a msg-payload-spec for all relevant + # god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT` + # for the decoder such that all sub-type msgs in our SCIPP + # will automatically decode to a type-"limited" payload (`Struct`) + # object (set). ( - payload_type_spec, + ipc_msg_spec, msg_types, ) = mk_msg_spec( - payload_type=payload_types, + payload_type_union=ipc_pld_spec, ) - assert len(payload_type_spec.__args__) == len(msg_types) + assert len(ipc_msg_spec.__args__) == len(msg_types) + assert ipc_msg_spec - # TODO: sub-decode `.pld: Raw`? - # see similar notes inside `.msg.types`.. - # - # not sure we'll end up wanting/needing this - # though it might have unforeseen advantages in terms - # of enabling encrypted appliciation layer (only) - # payloads? - # - # register sub-payload decoders to load `.pld: Raw` - # decoded `Msg`-packets using a dynamic lookup (table) - # instead of a pre-defined msg-spec via `Generic` - # parameterization. - # - # ( - # tags, - # payload_dec, - # ) = mk_tagged_union_dec( - # tagged_structs=list(payload_types.__args__), - # ) - # _payload_decs: ( - # dict[str, msgpack.Decoder]|None - # ) = { - # # pre-seed decoders for std-py-type-set for use when - # # `Msg.pld == None|Any`. - # None: msgpack.Decoder(Any), - # Any: msgpack.Decoder(Any), - # } - # for name in tags: - # _payload_decs[name] = payload_dec + dec = msgpack.Decoder( + type=ipc_msg_spec, # like `Msg[Any]` + ) + + else: + ipc_msg_spec = ipc_msg_spec or Any + + enc = msgpack.Encoder( + enc_hook=enc_hook, + ) + dec = msgpack.Decoder( + type=ipc_msg_spec, # like `Msg[Any]` + dec_hook=dec_hook, + ) codec = MsgCodec( - ipc_msg_spec=ipc_msg_spec, - payload_msg_spec=payload_type_spec, - **kwargs, + _enc=enc, + _dec=dec, + # payload_msg_specs=payload_msg_specs, + # **kwargs, ) - assert codec.lib.__name__ == libname - # by default, config-n-cache the codec pair from input settings. - if cache_now: - assert codec.enc - assert codec.dec + # sanity on expected backend support + assert codec.lib.__name__ == libname return codec # instance of the default `msgspec.msgpack` codec settings, i.e. # no custom structs, hooks or other special types. -_def_msgspec_codec: MsgCodec = mk_codec() +_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any) # NOTE: provides for per-`trio.Task` specificity of the # IPC msging codec used by the transport layer when doing # `Channel.send()/.recv()` of wire data. _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( 'msgspec_codec', + + # TODO: move this to our new `Msg`-spec! default=_def_msgspec_codec, ) @@ -353,7 +421,7 @@ def limit_msg_spec( payload_types: Union[Type[Struct]], # TODO: don't need this approach right? - # + # -> related to the `MsgCodec._payload_decs` stuff above.. # tagged_structs: list[Struct]|None = None, **codec_kwargs, diff --git a/tractor/msg/types.py b/tractor/msg/types.py index 732a0f5..7d64e76 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -22,9 +22,7 @@ that is, the "Structurred-Concurrency-Inter-Process-(dialog)-(un)Protocol". ''' - from __future__ import annotations -# from contextlib import contextmanager as cm import types from typing import ( Any, @@ -36,14 +34,12 @@ from typing import ( ) from msgspec import ( - msgpack, - Raw, Struct, UNSET, ) - -# TODO: can also remove yah? +# TODO: sub-decoded `Raw` fields? +# -[ ] see `MsgCodec._payload_decs` notes # # class Header(Struct, tag=True): # ''' @@ -70,7 +66,6 @@ class Msg( tree. ''' - # header: Header # TODO: use UNSET here? cid: str|None # call/context-id @@ -94,9 +89,24 @@ class Msg( pld: PayloadT -# TODO: better name, like `Call/TaskInput`? +# TODO: caps based RPC support in the payload? +# +# -[ ] integration with our ``enable_modules: list[str]`` caps sys. +# ``pkgutil.resolve_name()`` internally uses +# ``importlib.import_module()`` which can be filtered by +# inserting a ``MetaPathFinder`` into ``sys.meta_path`` (which +# we could do before entering the ``Actor._process_messages()`` +# loop)? +# - https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645 +# - https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules +# - https://stackoverflow.com/a/63320902 +# - https://docs.python.org/3/library/sys.html#sys.meta_path +# +# -[ ] can we combine .ns + .func into a native `NamespacePath` field? +# +# -[ ]better name, like `Call/TaskInput`? +# class FuncSpec(Struct): - # TODO: can we combine these 2 into a `NamespacePath` field? ns: str func: str @@ -249,7 +259,7 @@ class Error(Msg): def mk_msg_spec( - payload_type: Union[Type] = Any, + payload_type_union: Union[Type] = Any, boxing_msg_set: set[Msg] = { Started, Yield, @@ -261,10 +271,13 @@ def mk_msg_spec( list[Type[Msg]], ]: ''' - Generate a payload-type-parameterized `Msg` specification such - that IPC msgs which can be `Msg.pld` (payload) type - limited/filterd are specified given an input `payload_type: - Union[Type]`. + Create a payload-(data-)type-parameterized IPC message specification. + + Allows generating IPC msg types from the above builtin set + with a payload (field) restricted data-type via the `Msg.pld: + PayloadT` type var. This allows runtime-task contexts to use + the python type system to limit/filter payload values as + determined by the input `payload_type_union: Union[Type]`. ''' submsg_types: list[Type[Msg]] = Msg.__subclasses__() @@ -287,7 +300,7 @@ def mk_msg_spec( # -[ ] is there a way to get it to work at module level # just using inheritance or maybe a metaclass? # - # index_paramed_msg_type: Msg = msgtype[payload_type] + # index_paramed_msg_type: Msg = msgtype[payload_type_union] # TODO: WHY do we need to dynamically generate the # subtype-msgs here to ensure the `.pld` parameterization @@ -300,7 +313,7 @@ def mk_msg_spec( ( # XXX NOTE XXX this seems to be THE ONLY # way to get this to work correctly!?! - Msg[payload_type], + Msg[payload_type_union], Generic[PayloadT], ), {}, @@ -322,71 +335,3 @@ def mk_msg_spec( payload_type_spec, msg_types, ) - - -# TODO: integration with our ``enable_modules: list[str]`` caps sys. -# -# ``pkgutil.resolve_name()`` internally uses -# ``importlib.import_module()`` which can be filtered by inserting -# a ``MetaPathFinder`` into ``sys.meta_path`` (which we could do before -# entering the ``Actor._process_messages()`` loop). -# https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645 -# https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules -# - https://stackoverflow.com/a/63320902 -# - https://docs.python.org/3/library/sys.html#sys.meta_path - -# TODO: do we still want to try and support the sub-decoder with -# `Raw` technique in the case that the `Generic` approach gives -# future grief? -# -# sub-decoders for retreiving embedded -# payload data and decoding to a sender -# side defined (struct) type. -_payload_decs: dict[ - str|None, - msgpack.Decoder, -] = { - # default decoder is used when `Header.payload_tag == None` - None: msgpack.Decoder(Any), -} - - -def dec_payload( - msg: Msg, - msg_dec: msgpack.Decoder = msgpack.Decoder( - type=Msg[Any] - ), - -) -> Any|Struct: - - msg: Msg = msg_dec.decode(msg) - payload_tag: str = msg.header.payload_tag - payload_dec: msgpack.Decoder = _payload_decs[payload_tag] - return payload_dec.decode(msg.pld) - - -def enc_payload( - enc: msgpack.Encoder, - payload: Any, - cid: str, - -) -> bytes: - - # tag_field: str|None = None - - plbytes = enc.encode(payload) - if b'msg_type' in plbytes: - assert isinstance(payload, Struct) - - # tag_field: str = type(payload).__name__ - payload = Raw(plbytes) - - msg = Msg( - cid=cid, - pld=payload, - # Header( - # payload_tag=tag_field, - # # dialog_id, - # ), - ) - return enc.encode(msg)