From 0a69829ec576f1429f52ec9f3f2ca58607f0c5aa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 26 Mar 2024 15:50:47 -0400 Subject: [PATCH] Proto `MsgCodec`, an interchange fmt modify API Fitting in line with the issues outstanding: - #36: (msg)spec-ing out our SCIPP (structured-con-inter-proc-prot). (https://github.com/goodboy/tractor/issues/36) - #196: adding strictly typed IPC msg dialog schemas, more or less better described as "dialog/transaction scoped message specs" using `msgspec`'s tagged unions and custom codecs. (https://github.com/goodboy/tractor/issues/196) - #365: using modern static type-annots to drive capability based messaging and RPC. (statically https://github.com/goodboy/tractor/issues/365) This is a first draft of a new API for dynamically overriding IPC msg codecs for a given interchange lib from any task in the runtime. Right now we obviously only support `msgspec` but ideally this API holds general enough to be used for other backends eventually (like `capnproto`, and apache arrow). Impl is in a new `tractor.msg._codec` with: - a new `MsgCodec` type for encapsing `msgspec.msgpack.Encoder/Decoder` pairs and configuring any custom enc/dec_hooks or typed decoding. - factory `mk_codec()` for creating new codecs ad-hoc from a task. - `contextvars` support for a new `trio.Task` scoped `_ctxvar_MsgCodec: ContextVar[MsgCodec]` named 'msgspec_codec'. - `apply_codec()` for temporarily modifying the above per task as needed around `.open_context()` / `.open_stream()` operation. A new test (suite) in `test_caps_msging.py`: - verify a parent and its child can enable the same custom codec (in this case to transmit `NamespacePath`s) with tons of pedantic ctx-vars checks. - ToDo: still need to implement #36 msg types in order to be able to get decodes working (as in `MsgStream.receive()` will deliver an already created `NamespacePath` obj) since currently all msgs come packed in `dict`-msg wrapper packets.. -> use the proto from PR #35 to get nested `msgspec.Raw` processing up and running Bo --- tests/test_caps_msging.py | 198 +++++++++++++++++++++++++++++ tractor/_ipc.py | 74 ++++++++--- tractor/msg/__init__.py | 10 ++ tractor/msg/_codec.py | 253 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 519 insertions(+), 16 deletions(-) create mode 100644 tests/test_caps_msging.py create mode 100644 tractor/msg/_codec.py diff --git a/tests/test_caps_msging.py b/tests/test_caps_msging.py new file mode 100644 index 0000000..f659cb1 --- /dev/null +++ b/tests/test_caps_msging.py @@ -0,0 +1,198 @@ +''' +Functional audits for our "capability based messaging (schema)" feats. + +B~) + +''' +from typing import ( + Any, + Type, +) +from contextvars import ( + Context, +) + +import tractor +from tractor.msg import ( + _def_msgspec_codec, + _ctxvar_MsgCodec, + + NamespacePath, + MsgCodec, + mk_codec, + apply_codec, + current_msgspec_codec, +) +import trio + +# TODO: wrap these into `._codec` such that user can just pass +# a type table of some sort? +def enc_hook(obj: Any) -> Any: + if isinstance(obj, NamespacePath): + return str(obj) + else: + raise NotImplementedError( + f'Objects of type {type(obj)} are not supported' + ) + + +def dec_hook(type: Type, obj: Any) -> Any: + print(f'type is: {type}') + if type is NamespacePath: + return NamespacePath(obj) + else: + raise NotImplementedError( + f'Objects of type {type(obj)} are not supported' + ) + + +def ex_func(*args): + print(f'ex_func({args})') + + +def mk_custom_codec() -> MsgCodec: + # apply custom hooks and set a `Decoder` which only + # loads `NamespacePath` types. + nsp_codec: MsgCodec = mk_codec( + dec_types=NamespacePath, + enc_hook=enc_hook, + dec_hook=dec_hook, + ) + + # TODO: validate `MsgCodec` interface/semantics? + # -[ ] simple field tests to ensure caching + reset is workin? + # -[ ] custom / changing `.decoder()` calls? + # + # dec = nsp_codec.decoder( + # types=NamespacePath, + # ) + # assert nsp_codec.dec is dec + return nsp_codec + + +@tractor.context +async def send_back_nsp( + ctx: tractor.Context, + +) -> None: + ''' + Setup up a custom codec to load instances of `NamespacePath` + and ensure we can round trip a func ref with our parent. + + ''' + task: trio.Task = trio.lowlevel.current_task() + task_ctx: Context = task.context + assert _ctxvar_MsgCodec not in task_ctx + + nsp_codec: MsgCodec = mk_custom_codec() + with apply_codec(nsp_codec) as codec: + chk_codec_applied( + custom_codec=nsp_codec, + enter_value=codec, + ) + + nsp = NamespacePath.from_ref(ex_func) + await ctx.started(nsp) + + async with ctx.open_stream() as ipc: + async for msg in ipc: + + assert msg == f'{__name__}:ex_func' + + # TODO: as per below + # assert isinstance(msg, NamespacePath) + assert isinstance(msg, str) + + +def chk_codec_applied( + custom_codec: MsgCodec, + enter_value: MsgCodec, +) -> MsgCodec: + + task: trio.Task = trio.lowlevel.current_task() + task_ctx: Context = task.context + + assert _ctxvar_MsgCodec in task_ctx + curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec] + + assert ( + # returned from `mk_codec()` + custom_codec is + + # yielded value from `apply_codec()` + enter_value is + + # read from current task's `contextvars.Context` + curr_codec is + + # public API for all of the above + current_msgspec_codec() + + # the default `msgspec` settings + is not _def_msgspec_codec + ) + + +def test_codec_hooks_mod(): + ''' + Audit the `.msg.MsgCodec` override apis details given our impl + uses `contextvars` to accomplish per `trio` task codec + application around an inter-proc-task-comms context. + + ''' + async def main(): + task: trio.Task = trio.lowlevel.current_task() + task_ctx: Context = task.context + assert _ctxvar_MsgCodec not in task_ctx + + async with tractor.open_nursery() as an: + p: tractor.Portal = await an.start_actor( + 'sub', + enable_modules=[__name__], + ) + + # TODO: 2 cases: + # - codec not modified -> decode nsp as `str` + # - codec modified with hooks -> decode nsp as + # `NamespacePath` + nsp_codec: MsgCodec = mk_custom_codec() + with apply_codec(nsp_codec) as codec: + chk_codec_applied( + custom_codec=nsp_codec, + enter_value=codec, + ) + + async with ( + p.open_context( + send_back_nsp, + ) as (ctx, first), + ctx.open_stream() as ipc, + ): + # ensure codec is still applied across + # `tractor.Context` + its embedded nursery. + chk_codec_applied( + custom_codec=nsp_codec, + enter_value=codec, + ) + + assert first == f'{__name__}:ex_func' + # TODO: actually get the decoder loading + # to native once we spec our SCIPP msgspec + # (structurred-conc-inter-proc-protocol) + # implemented as per, + # https://github.com/goodboy/tractor/issues/36 + # + # assert isinstance(first, NamespacePath) + assert isinstance(first, str) + await ipc.send(first) + + with trio.move_on_after(1): + async for msg in ipc: + + # TODO: as per above + # assert isinstance(msg, NamespacePath) + assert isinstance(msg, str) + + await p.cancel_actor() + + trio.run(main) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 2b5df69..5aafda3 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -23,7 +23,10 @@ from collections.abc import ( AsyncGenerator, AsyncIterator, ) -from contextlib import asynccontextmanager as acm +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, +) import platform from pprint import pformat import struct @@ -37,12 +40,15 @@ from typing import ( TypeVar, ) -import msgspec from tricycle import BufferedReceiveStream import trio from tractor.log import get_logger from tractor._exceptions import TransportClosed +from tractor.msg import ( + _ctxvar_MsgCodec, + MsgCodec, +) log = get_logger(__name__) @@ -154,13 +160,9 @@ class MsgpackTCPStream(MsgTransport): ) self.prefix_size = prefix_size - # TODO: struct aware messaging coders - self.encode = msgspec.msgpack.Encoder( - enc_hook=codec[0] if codec else None, - ).encode - self.decode = msgspec.msgpack.Decoder( - dec_hook=codec[1] if codec else None, - ).decode + # allow for custom IPC msg interchange format + # dynamic override Bo + self.codec: MsgCodec = codec or MsgCodec() async def _iter_packets(self) -> AsyncGenerator[dict, None]: '''Yield packets from the underlying stream. @@ -199,7 +201,23 @@ class MsgpackTCPStream(MsgTransport): log.transport(f"received {msg_bytes}") # type: ignore try: - yield self.decode(msg_bytes) + # NOTE: lookup the `trio.Task.context`'s var for + # the current `MsgCodec`. + yield _ctxvar_MsgCodec.get().decode(msg_bytes) + + # TODO: remove, was only for orig draft impl + # testing. + # + # curr_codec: MsgCodec = _ctxvar_MsgCodec.get() + # obj = curr_codec.decode(msg_bytes) + # if ( + # curr_codec is not + # _codec._def_msgspec_codec + # ): + # print(f'OBJ: {obj}\n') + # + # yield obj + except ( msgspec.DecodeError, UnicodeDecodeError, @@ -235,7 +253,10 @@ class MsgpackTCPStream(MsgTransport): # __tracebackhide__: bool = hide_tb async with self._send_lock: - bytes_data: bytes = self.encode(msg) + # NOTE: lookup the `trio.Task.context`'s var for + # the current `MsgCodec`. + bytes_data: bytes = _ctxvar_MsgCodec.get().encode(msg) + # bytes_data: bytes = self.codec.encode(msg) # supposedly the fastest says, # https://stackoverflow.com/a/54027962 @@ -335,7 +356,9 @@ class Channel: @property def msgstream(self) -> MsgTransport: - log.info('`Channel.msgstream` is an old name, use `._transport`') + log.info( + '`Channel.msgstream` is an old name, use `._transport`' + ) return self._transport @property @@ -368,10 +391,7 @@ class Channel: # XXX optionally provided codec pair for `msgspec`: # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types - codec: tuple[ - Callable[[Any], Any], # coder - Callable[[type, Any], Any], # decoder - ]|None = None, + codec: MsgCodec|None = None, ) -> MsgTransport: type_key = ( @@ -379,14 +399,36 @@ class Channel: or self._transport_key ) + # get transport type, then self._transport = get_msg_transport( type_key + # instantiate an instance of the msg-transport )( stream, codec=codec, ) return self._transport + # TODO: something simliar at the IPC-`Context` + # level so as to support + @cm + def apply_codec( + self, + codec: MsgCodec, + + ) -> None: + ''' + Temporarily override the underlying IPC msg codec for + dynamic enforcement of messaging schema. + + ''' + orig: MsgCodec = self._transport.codec + try: + self._transport.codec = codec + yield + finally: + self._transport.codec = orig + def __repr__(self) -> str: if not self._transport: return '' diff --git a/tractor/msg/__init__.py b/tractor/msg/__init__.py index 906627c..e229678 100644 --- a/tractor/msg/__init__.py +++ b/tractor/msg/__init__.py @@ -24,3 +24,13 @@ from .ptr import ( from .types import ( Struct as Struct, ) +from ._codec import ( + + _def_msgspec_codec as _def_msgspec_codec, + _ctxvar_MsgCodec as _ctxvar_MsgCodec, + + apply_codec as apply_codec, + mk_codec as mk_codec, + MsgCodec as MsgCodec, + current_msgspec_codec as current_msgspec_codec, +) diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py new file mode 100644 index 0000000..0da454a --- /dev/null +++ b/tractor/msg/_codec.py @@ -0,0 +1,253 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +IPC msg interchange codec management. + +Supported backend libs: +- `msgspec.msgpack` + +ToDo: backends we prolly should offer: + +- see project/lib list throughout GH issue discussion comments: + https://github.com/goodboy/tractor/issues/196 + +- `capnproto`: https://capnproto.org/rpc.html + - https://capnproto.org/language.html#language-reference + +''' +from contextvars import ( + ContextVar, + Token, +) +from contextlib import ( + contextmanager as cm, +) +from typing import ( + Any, + Callable, + Type, + Union, +) +from types import ModuleType + +import msgspec +from msgspec import msgpack + +from .types import Struct + + +# TODO: API changes towards being interchange lib agnostic! +# -[ ] capnproto has pre-compiled schema for eg.. +# * https://capnproto.org/language.html +# * http://capnproto.github.io/pycapnp/quickstart.html +# * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp +class MsgCodec(Struct): + ''' + A IPC msg interchange format lib's encoder + decoder pair. + + ''' + + lib: ModuleType = msgspec + + # ad-hoc type extensions + # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types + enc_hook: Callable[[Any], Any]|None = None # coder + dec_hook: Callable[[type, Any], Any]|None = None # decoder + + # struct type unions + # https://jcristharif.com/msgspec/structs.html#tagged-unions + types: Union[Type[Struct]]|Any = Any + + # post-configure cached props + _enc: msgpack.Encoder|None = None + _dec: msgpack.Decoder|None = None + + + # TODO: use `functools.cached_property` for these ? + # https://docs.python.org/3/library/functools.html#functools.cached_property + @property + def enc(self) -> msgpack.Encoder: + return self._enc or self.encoder() + + def encoder( + self, + enc_hook: Callable|None = None, + reset: bool = False, + + # TODO: what's the default for this? + # write_buffer_size: int + **kwargs, + + ) -> msgpack.Encoder: + ''' + Set or get the maybe-cached `msgspec.msgpack.Encoder` + instance configured for this codec. + + When `reset=True` any previously configured encoder will + be recreated and then cached with the new settings passed + as input. + + ''' + if ( + self._enc is None + or reset + ): + self._enc = self.lib.msgpack.Encoder( + enc_hook=enc_hook or self.enc_hook, + # write_buffer_size=write_buffer_size, + ) + + return self._enc + + def encode( + self, + py_obj: Any, + + ) -> bytes: + ''' + Encode input python objects to `msgpack` bytes for transfer + on a tranport protocol connection. + + ''' + return self.enc.encode(py_obj) + + @property + def dec(self) -> msgpack.Decoder: + return self._dec or self.decoder() + + def decoder( + self, + types: Union[Type[Struct]]|None = None, + dec_hook: Callable|None = None, + reset: bool = False, + **kwargs, + # ext_hook: ext_hook_sig + + ) -> msgpack.Decoder: + ''' + Set or get the maybe-cached `msgspec.msgpack.Decoder` + instance configured for this codec. + + When `reset=True` any previously configured decoder will + be recreated and then cached with the new settings passed + as input. + + ''' + if ( + self._dec is None + or reset + ): + self._dec = self.lib.msgpack.Decoder( + types or self.types, + dec_hook=dec_hook or self.dec_hook, + **kwargs, + ) + + return self._dec + + def decode( + self, + msg: bytes, + ) -> Any: + ''' + Decode received `msgpack` bytes into a local python object + with special `msgspec.Struct` (or other type) handling + determined by the + + ''' + + return self.dec.decode(msg) + + +# TODO: struct aware messaging coders as per: +# - https://github.com/goodboy/tractor/issues/36 +# - https://github.com/goodboy/tractor/issues/196 +# - https://github.com/goodboy/tractor/issues/365 + +def mk_codec( + libname: str = 'msgspec', + + # struct type unions set for `Decoder` + # https://jcristharif.com/msgspec/structs.html#tagged-unions + dec_types: Union[Type[Struct]]|Any = Any, + + cache_now: bool = True, + + # proxy to the `Struct.__init__()` + **kwargs, + +) -> MsgCodec: + ''' + Convenience factory for creating codecs eventually meant + to be interchange lib agnostic (i.e. once we support more then just + `msgspec` ;). + + ''' + codec = MsgCodec( + types=dec_types, + **kwargs, + ) + assert codec.lib.__name__ == libname + + # by default config and cache the codec pair for given + # input settings. + if cache_now: + assert codec.enc + assert codec.dec + + return codec + + +# instance of the default `msgspec.msgpack` codec settings, i.e. +# no custom structs, hooks or other special types. +_def_msgspec_codec: MsgCodec = mk_codec() + +# NOTE: provides for per-`trio.Task` specificity of the +# IPC msging codec used by the transport layer when doing +# `Channel.send()/.recv()` of wire data. +_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( + 'msgspec_codec', + default=_def_msgspec_codec, +) + + +@cm +def apply_codec( + codec: MsgCodec, + +) -> MsgCodec: + ''' + Dynamically apply a `MsgCodec` to the current task's + runtime context such that all IPC msgs are processed + with it for that task. + + ''' + token: Token = _ctxvar_MsgCodec.set(codec) + try: + yield _ctxvar_MsgCodec.get() + finally: + _ctxvar_MsgCodec.reset(token) + + +def current_msgspec_codec() -> MsgCodec: + ''' + Return the current `trio.Task.context`'s value + for `msgspec_codec` used by `Channel.send/.recv()` + for wire serialization. + + ''' + return _ctxvar_MsgCodec.get()