Proto `MsgCodec`, an interchange fmt modify API

Fitting in line with the issues outstanding:
- #36: (msg)spec-ing out our SCIPP (structured-con-inter-proc-prot).
  (https://github.com/goodboy/tractor/issues/36)

- #196: adding strictly typed IPC msg dialog schemas, more or less
  better described as "dialog/transaction scoped message specs"
  using `msgspec`'s tagged unions and custom codecs.
  (https://github.com/goodboy/tractor/issues/196)

- #365: using modern static type-annots to drive capability based
  messaging and RPC.
  (statically https://github.com/goodboy/tractor/issues/365)

This is a first draft of a new API for dynamically overriding IPC msg
codecs for a given interchange lib from any task in the runtime. Right
now we obviously only support `msgspec` but ideally this API holds
general enough to be used for other backends eventually (like
`capnproto`, and apache arrow).

Impl is in a new `tractor.msg._codec` with:
- a new `MsgCodec` type for encapsing `msgspec.msgpack.Encoder/Decoder`
  pairs and configuring any custom enc/dec_hooks or typed decoding.
- factory `mk_codec()` for creating new codecs ad-hoc from a task.
- `contextvars` support for a new `trio.Task` scoped
  `_ctxvar_MsgCodec: ContextVar[MsgCodec]` named 'msgspec_codec'.
- `apply_codec()` for temporarily modifying the above per task
  as needed around `.open_context()` / `.open_stream()` operation.

A new test (suite) in `test_caps_msging.py`:
- verify a parent and its child can enable the same custom codec (in
  this case to transmit `NamespacePath`s) with tons of pedantic ctx-vars
  checks.
- ToDo: still need to implement #36 msg types in order to be able to get
  decodes working (as in `MsgStream.receive()` will deliver an already
  created `NamespacePath` obj) since currently all msgs come packed in `dict`-msg
  wrapper packets..
  -> use the proto from PR #35 to get nested `msgspec.Raw` processing up
  and running Bo
old_msg_types
Tyler Goodlet 2024-03-26 15:50:47 -04:00
parent 496dce57a8
commit 0a69829ec5
4 changed files with 519 additions and 16 deletions

View File

@ -0,0 +1,198 @@
'''
Functional audits for our "capability based messaging (schema)" feats.
B~)
'''
from typing import (
Any,
Type,
)
from contextvars import (
Context,
)
import tractor
from tractor.msg import (
_def_msgspec_codec,
_ctxvar_MsgCodec,
NamespacePath,
MsgCodec,
mk_codec,
apply_codec,
current_msgspec_codec,
)
import trio
# TODO: wrap these into `._codec` such that user can just pass
# a type table of some sort?
def enc_hook(obj: Any) -> Any:
if isinstance(obj, NamespacePath):
return str(obj)
else:
raise NotImplementedError(
f'Objects of type {type(obj)} are not supported'
)
def dec_hook(type: Type, obj: Any) -> Any:
print(f'type is: {type}')
if type is NamespacePath:
return NamespacePath(obj)
else:
raise NotImplementedError(
f'Objects of type {type(obj)} are not supported'
)
def ex_func(*args):
print(f'ex_func({args})')
def mk_custom_codec() -> MsgCodec:
# apply custom hooks and set a `Decoder` which only
# loads `NamespacePath` types.
nsp_codec: MsgCodec = mk_codec(
dec_types=NamespacePath,
enc_hook=enc_hook,
dec_hook=dec_hook,
)
# TODO: validate `MsgCodec` interface/semantics?
# -[ ] simple field tests to ensure caching + reset is workin?
# -[ ] custom / changing `.decoder()` calls?
#
# dec = nsp_codec.decoder(
# types=NamespacePath,
# )
# assert nsp_codec.dec is dec
return nsp_codec
@tractor.context
async def send_back_nsp(
ctx: tractor.Context,
) -> None:
'''
Setup up a custom codec to load instances of `NamespacePath`
and ensure we can round trip a func ref with our parent.
'''
task: trio.Task = trio.lowlevel.current_task()
task_ctx: Context = task.context
assert _ctxvar_MsgCodec not in task_ctx
nsp_codec: MsgCodec = mk_custom_codec()
with apply_codec(nsp_codec) as codec:
chk_codec_applied(
custom_codec=nsp_codec,
enter_value=codec,
)
nsp = NamespacePath.from_ref(ex_func)
await ctx.started(nsp)
async with ctx.open_stream() as ipc:
async for msg in ipc:
assert msg == f'{__name__}:ex_func'
# TODO: as per below
# assert isinstance(msg, NamespacePath)
assert isinstance(msg, str)
def chk_codec_applied(
custom_codec: MsgCodec,
enter_value: MsgCodec,
) -> MsgCodec:
task: trio.Task = trio.lowlevel.current_task()
task_ctx: Context = task.context
assert _ctxvar_MsgCodec in task_ctx
curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec]
assert (
# returned from `mk_codec()`
custom_codec is
# yielded value from `apply_codec()`
enter_value is
# read from current task's `contextvars.Context`
curr_codec is
# public API for all of the above
current_msgspec_codec()
# the default `msgspec` settings
is not _def_msgspec_codec
)
def test_codec_hooks_mod():
'''
Audit the `.msg.MsgCodec` override apis details given our impl
uses `contextvars` to accomplish per `trio` task codec
application around an inter-proc-task-comms context.
'''
async def main():
task: trio.Task = trio.lowlevel.current_task()
task_ctx: Context = task.context
assert _ctxvar_MsgCodec not in task_ctx
async with tractor.open_nursery() as an:
p: tractor.Portal = await an.start_actor(
'sub',
enable_modules=[__name__],
)
# TODO: 2 cases:
# - codec not modified -> decode nsp as `str`
# - codec modified with hooks -> decode nsp as
# `NamespacePath`
nsp_codec: MsgCodec = mk_custom_codec()
with apply_codec(nsp_codec) as codec:
chk_codec_applied(
custom_codec=nsp_codec,
enter_value=codec,
)
async with (
p.open_context(
send_back_nsp,
) as (ctx, first),
ctx.open_stream() as ipc,
):
# ensure codec is still applied across
# `tractor.Context` + its embedded nursery.
chk_codec_applied(
custom_codec=nsp_codec,
enter_value=codec,
)
assert first == f'{__name__}:ex_func'
# TODO: actually get the decoder loading
# to native once we spec our SCIPP msgspec
# (structurred-conc-inter-proc-protocol)
# implemented as per,
# https://github.com/goodboy/tractor/issues/36
#
# assert isinstance(first, NamespacePath)
assert isinstance(first, str)
await ipc.send(first)
with trio.move_on_after(1):
async for msg in ipc:
# TODO: as per above
# assert isinstance(msg, NamespacePath)
assert isinstance(msg, str)
await p.cancel_actor()
trio.run(main)

View File

@ -23,7 +23,10 @@ from collections.abc import (
AsyncGenerator, AsyncGenerator,
AsyncIterator, AsyncIterator,
) )
from contextlib import asynccontextmanager as acm from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
import platform import platform
from pprint import pformat from pprint import pformat
import struct import struct
@ -37,12 +40,15 @@ from typing import (
TypeVar, TypeVar,
) )
import msgspec
from tricycle import BufferedReceiveStream from tricycle import BufferedReceiveStream
import trio import trio
from tractor.log import get_logger from tractor.log import get_logger
from tractor._exceptions import TransportClosed from tractor._exceptions import TransportClosed
from tractor.msg import (
_ctxvar_MsgCodec,
MsgCodec,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -154,13 +160,9 @@ class MsgpackTCPStream(MsgTransport):
) )
self.prefix_size = prefix_size self.prefix_size = prefix_size
# TODO: struct aware messaging coders # allow for custom IPC msg interchange format
self.encode = msgspec.msgpack.Encoder( # dynamic override Bo
enc_hook=codec[0] if codec else None, self.codec: MsgCodec = codec or MsgCodec()
).encode
self.decode = msgspec.msgpack.Decoder(
dec_hook=codec[1] if codec else None,
).decode
async def _iter_packets(self) -> AsyncGenerator[dict, None]: async def _iter_packets(self) -> AsyncGenerator[dict, None]:
'''Yield packets from the underlying stream. '''Yield packets from the underlying stream.
@ -199,7 +201,23 @@ class MsgpackTCPStream(MsgTransport):
log.transport(f"received {msg_bytes}") # type: ignore log.transport(f"received {msg_bytes}") # type: ignore
try: try:
yield self.decode(msg_bytes) # NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`.
yield _ctxvar_MsgCodec.get().decode(msg_bytes)
# TODO: remove, was only for orig draft impl
# testing.
#
# curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
# obj = curr_codec.decode(msg_bytes)
# if (
# curr_codec is not
# _codec._def_msgspec_codec
# ):
# print(f'OBJ: {obj}\n')
#
# yield obj
except ( except (
msgspec.DecodeError, msgspec.DecodeError,
UnicodeDecodeError, UnicodeDecodeError,
@ -235,7 +253,10 @@ class MsgpackTCPStream(MsgTransport):
# __tracebackhide__: bool = hide_tb # __tracebackhide__: bool = hide_tb
async with self._send_lock: async with self._send_lock:
bytes_data: bytes = self.encode(msg) # NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`.
bytes_data: bytes = _ctxvar_MsgCodec.get().encode(msg)
# bytes_data: bytes = self.codec.encode(msg)
# supposedly the fastest says, # supposedly the fastest says,
# https://stackoverflow.com/a/54027962 # https://stackoverflow.com/a/54027962
@ -335,7 +356,9 @@ class Channel:
@property @property
def msgstream(self) -> MsgTransport: def msgstream(self) -> MsgTransport:
log.info('`Channel.msgstream` is an old name, use `._transport`') log.info(
'`Channel.msgstream` is an old name, use `._transport`'
)
return self._transport return self._transport
@property @property
@ -368,10 +391,7 @@ class Channel:
# XXX optionally provided codec pair for `msgspec`: # XXX optionally provided codec pair for `msgspec`:
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
codec: tuple[ codec: MsgCodec|None = None,
Callable[[Any], Any], # coder
Callable[[type, Any], Any], # decoder
]|None = None,
) -> MsgTransport: ) -> MsgTransport:
type_key = ( type_key = (
@ -379,14 +399,36 @@ class Channel:
or or
self._transport_key self._transport_key
) )
# get transport type, then
self._transport = get_msg_transport( self._transport = get_msg_transport(
type_key type_key
# instantiate an instance of the msg-transport
)( )(
stream, stream,
codec=codec, codec=codec,
) )
return self._transport return self._transport
# TODO: something simliar at the IPC-`Context`
# level so as to support
@cm
def apply_codec(
self,
codec: MsgCodec,
) -> None:
'''
Temporarily override the underlying IPC msg codec for
dynamic enforcement of messaging schema.
'''
orig: MsgCodec = self._transport.codec
try:
self._transport.codec = codec
yield
finally:
self._transport.codec = orig
def __repr__(self) -> str: def __repr__(self) -> str:
if not self._transport: if not self._transport:
return '<Channel with inactive transport?>' return '<Channel with inactive transport?>'

View File

@ -24,3 +24,13 @@ from .ptr import (
from .types import ( from .types import (
Struct as Struct, Struct as Struct,
) )
from ._codec import (
_def_msgspec_codec as _def_msgspec_codec,
_ctxvar_MsgCodec as _ctxvar_MsgCodec,
apply_codec as apply_codec,
mk_codec as mk_codec,
MsgCodec as MsgCodec,
current_msgspec_codec as current_msgspec_codec,
)

View File

@ -0,0 +1,253 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
IPC msg interchange codec management.
Supported backend libs:
- `msgspec.msgpack`
ToDo: backends we prolly should offer:
- see project/lib list throughout GH issue discussion comments:
https://github.com/goodboy/tractor/issues/196
- `capnproto`: https://capnproto.org/rpc.html
- https://capnproto.org/language.html#language-reference
'''
from contextvars import (
ContextVar,
Token,
)
from contextlib import (
contextmanager as cm,
)
from typing import (
Any,
Callable,
Type,
Union,
)
from types import ModuleType
import msgspec
from msgspec import msgpack
from .types import Struct
# TODO: API changes towards being interchange lib agnostic!
# -[ ] capnproto has pre-compiled schema for eg..
# * https://capnproto.org/language.html
# * http://capnproto.github.io/pycapnp/quickstart.html
# * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp
class MsgCodec(Struct):
'''
A IPC msg interchange format lib's encoder + decoder pair.
'''
lib: ModuleType = msgspec
# ad-hoc type extensions
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
enc_hook: Callable[[Any], Any]|None = None # coder
dec_hook: Callable[[type, Any], Any]|None = None # decoder
# struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions
types: Union[Type[Struct]]|Any = Any
# post-configure cached props
_enc: msgpack.Encoder|None = None
_dec: msgpack.Decoder|None = None
# TODO: use `functools.cached_property` for these ?
# https://docs.python.org/3/library/functools.html#functools.cached_property
@property
def enc(self) -> msgpack.Encoder:
return self._enc or self.encoder()
def encoder(
self,
enc_hook: Callable|None = None,
reset: bool = False,
# TODO: what's the default for this?
# write_buffer_size: int
**kwargs,
) -> msgpack.Encoder:
'''
Set or get the maybe-cached `msgspec.msgpack.Encoder`
instance configured for this codec.
When `reset=True` any previously configured encoder will
be recreated and then cached with the new settings passed
as input.
'''
if (
self._enc is None
or reset
):
self._enc = self.lib.msgpack.Encoder(
enc_hook=enc_hook or self.enc_hook,
# write_buffer_size=write_buffer_size,
)
return self._enc
def encode(
self,
py_obj: Any,
) -> bytes:
'''
Encode input python objects to `msgpack` bytes for transfer
on a tranport protocol connection.
'''
return self.enc.encode(py_obj)
@property
def dec(self) -> msgpack.Decoder:
return self._dec or self.decoder()
def decoder(
self,
types: Union[Type[Struct]]|None = None,
dec_hook: Callable|None = None,
reset: bool = False,
**kwargs,
# ext_hook: ext_hook_sig
) -> msgpack.Decoder:
'''
Set or get the maybe-cached `msgspec.msgpack.Decoder`
instance configured for this codec.
When `reset=True` any previously configured decoder will
be recreated and then cached with the new settings passed
as input.
'''
if (
self._dec is None
or reset
):
self._dec = self.lib.msgpack.Decoder(
types or self.types,
dec_hook=dec_hook or self.dec_hook,
**kwargs,
)
return self._dec
def decode(
self,
msg: bytes,
) -> Any:
'''
Decode received `msgpack` bytes into a local python object
with special `msgspec.Struct` (or other type) handling
determined by the
'''
return self.dec.decode(msg)
# TODO: struct aware messaging coders as per:
# - https://github.com/goodboy/tractor/issues/36
# - https://github.com/goodboy/tractor/issues/196
# - https://github.com/goodboy/tractor/issues/365
def mk_codec(
libname: str = 'msgspec',
# struct type unions set for `Decoder`
# https://jcristharif.com/msgspec/structs.html#tagged-unions
dec_types: Union[Type[Struct]]|Any = Any,
cache_now: bool = True,
# proxy to the `Struct.__init__()`
**kwargs,
) -> MsgCodec:
'''
Convenience factory for creating codecs eventually meant
to be interchange lib agnostic (i.e. once we support more then just
`msgspec` ;).
'''
codec = MsgCodec(
types=dec_types,
**kwargs,
)
assert codec.lib.__name__ == libname
# by default config and cache the codec pair for given
# input settings.
if cache_now:
assert codec.enc
assert codec.dec
return codec
# instance of the default `msgspec.msgpack` codec settings, i.e.
# no custom structs, hooks or other special types.
_def_msgspec_codec: MsgCodec = mk_codec()
# NOTE: provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing
# `Channel.send()/.recv()` of wire data.
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
'msgspec_codec',
default=_def_msgspec_codec,
)
@cm
def apply_codec(
codec: MsgCodec,
) -> MsgCodec:
'''
Dynamically apply a `MsgCodec` to the current task's
runtime context such that all IPC msgs are processed
with it for that task.
'''
token: Token = _ctxvar_MsgCodec.set(codec)
try:
yield _ctxvar_MsgCodec.get()
finally:
_ctxvar_MsgCodec.reset(token)
def current_msgspec_codec() -> MsgCodec:
'''
Return the current `trio.Task.context`'s value
for `msgspec_codec` used by `Channel.send/.recv()`
for wire serialization.
'''
return _ctxvar_MsgCodec.get()