Compare commits

...

5 Commits

Author SHA1 Message Date
Tyler Goodlet cb728e3bd6 Be mega pedantic with msg-spec building
Turns out the generics based payload speccing API, as in
https://jcristharif.com/msgspec/supported-types.html#generic-types,
DOES WORK properly as long as we don't rely on inheritance from `Msg`
a parent `Generic`..

So let's get real pedantic in the `mk_msg_spec()` internals as well as
verification in the test suite!

Fixes in `.msg.types`:
- implement (as part of tinker testing) multiple spec union building
  methods via a `spec_build_method: str` to `mk_msg_spec()` and leave a
  buncha notes around what did and didn't work:
  - 'indexed_generics' is the only method THAT WORKS and the one that
    you'd expect being closest to the `msgspec` docs (link above).
  - 'defstruct' using dynamically defined msgs => doesn't work!
  - 'types_new_class' using dynamically defined msgs but with
    `types.new_clas()` => ALSO doesn't work..

- explicitly separate the `.pld` type-constrainable by user code msg
  set into `types._payload_spec_msgs` putting the others in
  a `types._runtime_spec_msgs` and the full set defined as `.__spec__`
  (moving it out of the pkg-mod and back to `.types` as well).

- for the `_payload_spec_msgs` msgs manually make them inherit `Generic[PayloadT]`
  and (redunantly) define a `.pld: PayloadT` field.

- make `IpcCtxSpec.functype` an in line `Literal`.

- toss in some TODO notes about choosing a better `Msg.cid` type.

Fixes/tweaks around `.msg._codec`:
- rename `MsgCodec.ipc/pld_msg_spec` -> `.msg/pld_spec`
- make `._enc/._dec` non optional fields
- wow, ^facepalm^ , make sure `._ipc.MsgpackTCPStream.__init__()` uses
  `mk_codec()` since `MsgCodec` can't be (easily) constructed directly.

Get more detailed in testing:
- inside the `chk_pld_type()` helper ensure `roundtrip` is always set to
  some value, `None` by default but a bool depending on legit outcome.
  - drop input `generic`; no longer used.
  - drop the masked `typedef` loop from `Msg.__subclasses__()`.
  - for add an `expect_roundtrip: bool` and use to jump into debugger
    when any expectation doesn't match the outcome.
- use new `MsgCodec` field names (as per first section above).
- ensure the encoded msg matches the decoded one from both the ad-hoc
  decoder and codec loaded values.
- ensure the pld checking is only applied to msgs in the
  `types._payload_spec_msgs` set by `typef.__name__` filtering
  since `mk_msg_spec()` now returns the full `.types.Msg` set.
2024-03-29 19:15:00 -04:00
Tyler Goodlet fb8196e354 Tweak msging tests to match codec api changes
Mostly adjusting input args/logic to various spec/codec signatures and
new runtime semantics:

- `test_msg_spec_xor_pld_spec()` to verify that a shuttle prot spec and
  payload spec are necessarily mutex and that `mk_codec()` enforces it.
- switch to `ipc_msg_spec` input in `mk_custom_codec()` helper.
- drop buncha commented cruft from `test_limit_msgspec()` including no
  longer needed type union instance checks in dunder attributes.
2024-03-29 13:48:08 -04:00
Tyler Goodlet b6ed26589a Drop `MsgCodec.decoder()/.encoder()` design
Instead just instantiate `msgpack.Encoder/Decoder` instances inside
`mk_codec()` and assign them directly as `._enc/._dec` fields.
Explicitly take in named-args to both and proxy to the coder/decoder
instantiation calls directly.

Shuffling some codec internals:
- rename `mk_codec()` inputs as `ipc_msg_spec` and `ipc_pld_spec`, make
  them mutex such that a payload type spec can't be passed if the
  built-in msg-spec isn't used.
  => expose `MsgCodec.ipc_pld_spec` directly from `._dec.type`
  => presume input `ipc_msg_spec` is `Any` by default when no
    `ipc_pld_spec` is passed since we have no way atm to enable
    a similar type-restricted-payload feature without a wrapping
    "shuttle protocol" ;)

- move all the payload-sub-decoders stuff prototyped in GH#311
  (inside `.types`) to `._codec` as commented-for-later-maybe `MsgCodec`
  methods including:
  - `.mk_pld_subdec()` for registering
  - `.enc/dec_payload()` for sub-codec field loading.

- also comment out `._codec.mk_tagged_union_dec()` as the orig
  tag-to-decoder table factory, now mostly superseded by
  `.types.mk_msg_spec()` which takes the generic parameterizing approach
  instead.

- change naming to `types.mk_msg_spec(payload_type_union)` input, making
  it more explicit that it expects a `Union[Type]`.

Oh right, and start exposing all the `.types.Msg` subtypes in the `.msg`
subpkg in prep for usage throughout the runtime B)
2024-03-29 12:46:59 -04:00
Tyler Goodlet 8ff18739be Change to multi-line-static-`dict` style msgs
Re-arranging such that element-orders are line-arranged to our new
IPC `.msg.types.Msg` fields spec in prep for replacing the current
`dict`-as-msg impls with the `msgspec.Struct` native versions!
2024-03-28 13:08:18 -04:00
Tyler Goodlet 456979dd12 Tweak msg-spec test suite mod name 2024-03-28 13:07:03 -04:00
7 changed files with 607 additions and 374 deletions

View File

@ -1,5 +1,6 @@
''' '''
Functional audits for our "capability based messaging (schema)" feats. Low-level functional audits for our
"capability based messaging"-spec feats.
B~) B~)
@ -22,6 +23,7 @@ from msgspec import (
Struct, Struct,
ValidationError, ValidationError,
) )
import pytest
import tractor import tractor
from tractor.msg import ( from tractor.msg import (
_def_msgspec_codec, _def_msgspec_codec,
@ -33,14 +35,31 @@ from tractor.msg import (
apply_codec, apply_codec,
current_msgspec_codec, current_msgspec_codec,
) )
from tractor.msg import types
from tractor.msg.types import ( from tractor.msg.types import (
PayloadT, # PayloadT,
Msg, Msg,
# Started, # Started,
mk_msg_spec, mk_msg_spec,
) )
import trio import trio
def test_msg_spec_xor_pld_spec():
'''
If the `.msg.types.Msg`-set is overridden, we
can't also support a `Msg.pld` spec.
'''
# apply custom hooks and set a `Decoder` which only
# loads `NamespacePath` types.
with pytest.raises(RuntimeError):
mk_codec(
ipc_msg_spec=Any,
ipc_pld_spec=NamespacePath,
)
# TODO: wrap these into `._codec` such that user can just pass # TODO: wrap these into `._codec` such that user can just pass
# a type table of some sort? # a type table of some sort?
def enc_hook(obj: Any) -> Any: def enc_hook(obj: Any) -> Any:
@ -66,11 +85,13 @@ def ex_func(*args):
print(f'ex_func({args})') print(f'ex_func({args})')
def mk_custom_codec() -> MsgCodec: def mk_custom_codec(
ipc_msg_spec: Type[Any] = Any,
) -> MsgCodec:
# apply custom hooks and set a `Decoder` which only # apply custom hooks and set a `Decoder` which only
# loads `NamespacePath` types. # loads `NamespacePath` types.
nsp_codec: MsgCodec = mk_codec( nsp_codec: MsgCodec = mk_codec(
ipc_msg_spec=NamespacePath, ipc_msg_spec=ipc_msg_spec,
enc_hook=enc_hook, enc_hook=enc_hook,
dec_hook=dec_hook, dec_hook=dec_hook,
) )
@ -215,113 +236,152 @@ def test_codec_hooks_mod():
def chk_pld_type( def chk_pld_type(
generic: Msg|_GenericAlias, payload_spec: Type[Struct]|Any,
payload_type: Type[Struct]|Any,
pld: Any, pld: Any,
expect_roundtrip: bool|None = None,
) -> bool: ) -> bool:
roundtrip: bool = False
pld_val_type: Type = type(pld) pld_val_type: Type = type(pld)
# gen_paramed: _GenericAlias = generic[payload_type]
# TODO: verify that the overridden subtypes # TODO: verify that the overridden subtypes
# DO NOT have modified type-annots from original! # DO NOT have modified type-annots from original!
# 'Start', .pld: FuncSpec # 'Start', .pld: FuncSpec
# 'StartAck', .pld: IpcCtxSpec # 'StartAck', .pld: IpcCtxSpec
# 'Stop', .pld: UNSEt # 'Stop', .pld: UNSEt
# 'Error', .pld: ErrorData # 'Error', .pld: ErrorData
# for typedef in (
# [gen_paramed]
# +
# # type-var should always be set for these sub-types codec: MsgCodec = mk_codec(
# # as well! # NOTE: this ONLY accepts `Msg.pld` fields of a specified
# Msg.__subclasses__() # type union.
# ): ipc_pld_spec=payload_spec,
# if typedef.__name__ not in [ )
# 'Msg',
# 'Started',
# 'Yield',
# 'Return',
# ]:
# continue
# payload_type: Type[Struct] = CustomPayload
# TODO: can remove all this right!? # make a one-off dec to compare with our `MsgCodec` instance
# # which does the below `mk_msg_spec()` call internally
# when parameterized (like `Msg[Any]`) then ipc_msg_spec: Union[Type[Struct]]
# we expect an alias as input. msg_types: list[Msg[payload_spec]]
# if isinstance(generic, _GenericAlias):
# assert payload_type in generic.__args__
# else:
# assert PayloadType in generic.__parameters__
# pld_param: Parameter = generic.__signature__.parameters['pld']
# assert pld_param.annotation is PayloadType
type_spec: Union[Type[Struct]]
msg_types: list[Msg[payload_type]]
( (
type_spec, ipc_msg_spec,
msg_types, msg_types,
) = mk_msg_spec( ) = mk_msg_spec(
payload_type=payload_type, payload_type_union=payload_spec,
) )
enc = msgpack.Encoder() _enc = msgpack.Encoder()
dec = msgpack.Decoder( _dec = msgpack.Decoder(
type=type_spec, # like `Msg[Any]` type=ipc_msg_spec or Any, # like `Msg[Any]`
)
assert (
payload_spec
==
codec.pld_spec
)
# assert codec.dec == dec
#
# ^-XXX-^ not sure why these aren't "equal" but when cast
# to `str` they seem to match ?? .. kk
assert (
str(ipc_msg_spec)
==
str(codec.msg_spec)
==
str(_dec.type)
==
str(codec.dec.type)
) )
# verify the boxed-type for all variable payload-type msgs. # verify the boxed-type for all variable payload-type msgs.
if not msg_types:
breakpoint()
roundtrip: bool|None = None
pld_spec_msg_names: list[str] = [
td.__name__ for td in types._payload_spec_msgs
]
for typedef in msg_types: for typedef in msg_types:
skip_runtime_msg: bool = typedef.__name__ not in pld_spec_msg_names
if skip_runtime_msg:
continue
pld_field = structs.fields(typedef)[1] pld_field = structs.fields(typedef)[1]
assert pld_field.type in {payload_type, PayloadT} assert pld_field.type is payload_spec # TODO-^ does this need to work to get all subtypes to adhere?
# TODO: does this need to work to get all subtypes to
# adhere?
assert pld_field.type is payload_type
kwargs: dict[str, Any] = { kwargs: dict[str, Any] = {
'cid': '666', 'cid': '666',
'pld': pld, 'pld': pld,
} }
enc_msg = typedef(**kwargs) enc_msg: Msg = typedef(**kwargs)
wire_bytes: bytes = enc.encode(enc_msg) _wire_bytes: bytes = _enc.encode(enc_msg)
wire_bytes: bytes = codec.enc.encode(enc_msg)
assert _wire_bytes == wire_bytes
ve: ValidationError|None = None
try: try:
dec_msg = dec.decode(wire_bytes) dec_msg = codec.dec.decode(wire_bytes)
assert dec_msg.pld == pld _dec_msg = _dec.decode(wire_bytes)
assert (roundtrip := (dec_msg == enc_msg))
except ValidationError as ve: # decoded msg and thus payload should be exactly same!
# breakpoint() assert (roundtrip := (
if pld_val_type is payload_type: _dec_msg
==
dec_msg
==
enc_msg
))
if (
expect_roundtrip is not None
and expect_roundtrip != roundtrip
):
breakpoint()
assert (
pld
==
dec_msg.pld
==
enc_msg.pld
)
# assert (roundtrip := (_dec_msg == enc_msg))
except ValidationError as _ve:
ve = _ve
roundtrip: bool = False
if pld_val_type is payload_spec:
raise ValueError( raise ValueError(
'Got `ValidationError` despite type-var match!?\n' 'Got `ValidationError` despite type-var match!?\n'
f'pld_val_type: {pld_val_type}\n' f'pld_val_type: {pld_val_type}\n'
f'payload_type: {payload_type}\n' f'payload_type: {payload_spec}\n'
) from ve ) from ve
else: else:
# ow we good cuz the pld spec mismatched. # ow we good cuz the pld spec mismatched.
print( print(
'Got expected `ValidationError` since,\n' 'Got expected `ValidationError` since,\n'
f'{pld_val_type} is not {payload_type}\n' f'{pld_val_type} is not {payload_spec}\n'
) )
else: else:
if ( if (
pld_val_type is not payload_type payload_spec is not Any
and payload_type is not Any and
pld_val_type is not payload_spec
): ):
raise ValueError( raise ValueError(
'DID NOT `ValidationError` despite expected type match!?\n' 'DID NOT `ValidationError` despite expected type match!?\n'
f'pld_val_type: {pld_val_type}\n' f'pld_val_type: {pld_val_type}\n'
f'payload_type: {payload_type}\n' f'payload_type: {payload_spec}\n'
) )
return roundtrip # full code decode should always be attempted!
if roundtrip is None:
breakpoint()
return roundtrip
def test_limit_msgspec(): def test_limit_msgspec():
@ -333,9 +393,10 @@ def test_limit_msgspec():
# ensure we can round-trip a boxing `Msg` # ensure we can round-trip a boxing `Msg`
assert chk_pld_type( assert chk_pld_type(
Msg, # Msg,
Any, Any,
None, None,
expect_roundtrip=True,
) )
# TODO: don't need this any more right since # TODO: don't need this any more right since
@ -347,7 +408,7 @@ def test_limit_msgspec():
# verify that a mis-typed payload value won't decode # verify that a mis-typed payload value won't decode
assert not chk_pld_type( assert not chk_pld_type(
Msg, # Msg,
int, int,
pld='doggy', pld='doggy',
) )
@ -360,13 +421,13 @@ def test_limit_msgspec():
value: Any value: Any
assert not chk_pld_type( assert not chk_pld_type(
Msg, # Msg,
CustomPayload, CustomPayload,
pld='doggy', pld='doggy',
) )
assert chk_pld_type( assert chk_pld_type(
Msg, # Msg,
CustomPayload, CustomPayload,
pld=CustomPayload(name='doggy', value='urmom') pld=CustomPayload(name='doggy', value='urmom')
) )

View File

@ -536,7 +536,9 @@ def pack_error(
# content's `.msgdata`). # content's `.msgdata`).
error_msg['tb_str'] = tb_str error_msg['tb_str'] = tb_str
pkt: dict = {'error': error_msg} pkt: dict = {
'error': error_msg,
}
if cid: if cid:
pkt['cid'] = cid pkt['cid'] = cid

View File

@ -48,6 +48,7 @@ from tractor._exceptions import TransportClosed
from tractor.msg import ( from tractor.msg import (
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
MsgCodec, MsgCodec,
mk_codec,
) )
log = get_logger(__name__) log = get_logger(__name__)
@ -162,7 +163,7 @@ class MsgpackTCPStream(MsgTransport):
# allow for custom IPC msg interchange format # allow for custom IPC msg interchange format
# dynamic override Bo # dynamic override Bo
self.codec: MsgCodec = codec or MsgCodec() self.codec: MsgCodec = codec or mk_codec()
async def _iter_packets(self) -> AsyncGenerator[dict, None]: async def _iter_packets(self) -> AsyncGenerator[dict, None]:
''' '''

View File

@ -89,7 +89,10 @@ async def _invoke_non_context(
# TODO: can we unify this with the `context=True` impl below? # TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro):
await chan.send({'functype': 'asyncgen', 'cid': cid}) await chan.send({
'cid': cid,
'functype': 'asyncgen',
})
# XXX: massive gotcha! If the containing scope # XXX: massive gotcha! If the containing scope
# is cancelled and we execute the below line, # is cancelled and we execute the below line,
# any ``ActorNursery.__aexit__()`` WON'T be # any ``ActorNursery.__aexit__()`` WON'T be
@ -109,18 +112,27 @@ async def _invoke_non_context(
# to_send = await chan.recv_nowait() # to_send = await chan.recv_nowait()
# if to_send is not None: # if to_send is not None:
# to_yield = await coro.asend(to_send) # to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid}) await chan.send({
'yield': item,
'cid': cid,
})
log.runtime(f"Finished iterating {coro}") log.runtime(f"Finished iterating {coro}")
# TODO: we should really support a proper # TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final # `StopAsyncIteration` system here for returning a final
# value if desired # value if desired
await chan.send({'stop': True, 'cid': cid}) await chan.send({
'stop': True,
'cid': cid,
})
# one way @stream func that gets treated like an async gen # one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below? # TODO: can we unify this with the `context=True` impl below?
elif treat_as_gen: elif treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid}) await chan.send({
'cid': cid,
'functype': 'asyncgen',
})
# XXX: the async-func may spawn further tasks which push # XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must # back values like an async-generator would but must
# manualy construct the response dict-packet-responses as # manualy construct the response dict-packet-responses as
@ -133,7 +145,10 @@ async def _invoke_non_context(
if not cs.cancelled_caught: if not cs.cancelled_caught:
# task was not cancelled so we can instruct the # task was not cancelled so we can instruct the
# far end async gen to tear down # far end async gen to tear down
await chan.send({'stop': True, 'cid': cid}) await chan.send({
'stop': True,
'cid': cid
})
else: else:
# regular async function/method # regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()` # XXX: possibly just a scheduled `Actor._cancel_task()`
@ -182,10 +197,10 @@ async def _invoke_non_context(
and chan.connected() and chan.connected()
): ):
try: try:
await chan.send( await chan.send({
{'return': result, 'return': result,
'cid': cid} 'cid': cid,
) })
except ( except (
BrokenPipeError, BrokenPipeError,
trio.BrokenResourceError, trio.BrokenResourceError,
@ -479,8 +494,8 @@ async def _invoke(
# "least sugary" type of RPC ep with support for # "least sugary" type of RPC ep with support for
# bi-dir streaming B) # bi-dir streaming B)
await chan.send({ await chan.send({
'cid': cid,
'functype': 'context', 'functype': 'context',
'cid': cid
}) })
# TODO: should we also use an `.open_context()` equiv # TODO: should we also use an `.open_context()` equiv

View File

@ -33,3 +33,24 @@ from ._codec import (
MsgCodec as MsgCodec, MsgCodec as MsgCodec,
current_msgspec_codec as current_msgspec_codec, current_msgspec_codec as current_msgspec_codec,
) )
from .types import (
Msg as Msg,
Start as Start, # with pld
FuncSpec as FuncSpec,
StartAck as StartAck, # with pld
IpcCtxSpec as IpcCtxSpec,
Started as Started,
Yield as Yield,
Stop as Stop,
Return as Return,
Error as Error, # with pld
ErrorData as ErrorData,
# full msg spec set
__spec__ as __spec__,
)

View File

@ -29,6 +29,7 @@ ToDo: backends we prolly should offer:
- https://capnproto.org/language.html#language-reference - https://capnproto.org/language.html#language-reference
''' '''
from __future__ import annotations
from contextvars import ( from contextvars import (
ContextVar, ContextVar,
Token, Token,
@ -54,18 +55,35 @@ from tractor.msg.types import (
) )
# TODO: API changes towards being interchange lib agnostic! # TODO: overall IPC msg-spec features (i.e. in this mod)!
# #
# -[ ] API changes towards being interchange lib agnostic!
# -[ ] capnproto has pre-compiled schema for eg.. # -[ ] capnproto has pre-compiled schema for eg..
# * https://capnproto.org/language.html # * https://capnproto.org/language.html
# * http://capnproto.github.io/pycapnp/quickstart.html # * http://capnproto.github.io/pycapnp/quickstart.html
# * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp # * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp
# #
# -[ ] struct aware messaging coders as per:
# -[x] https://github.com/goodboy/tractor/issues/36
# -[ ] https://github.com/goodboy/tractor/issues/196
# -[ ] https://github.com/goodboy/tractor/issues/365
#
class MsgCodec(Struct): class MsgCodec(Struct):
''' '''
A IPC msg interchange format lib's encoder + decoder pair. A IPC msg interchange format lib's encoder + decoder pair.
''' '''
_enc: msgpack.Encoder
_dec: msgpack.Decoder
pld_spec: Union[Type[Struct]]|None
# struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions
@property
def msg_spec(self) -> Union[Type[Struct]]:
return self._dec.type
lib: ModuleType = msgspec lib: ModuleType = msgspec
# ad-hoc type extensions # ad-hoc type extensions
@ -73,16 +91,8 @@ class MsgCodec(Struct):
enc_hook: Callable[[Any], Any]|None = None # coder enc_hook: Callable[[Any], Any]|None = None # coder
dec_hook: Callable[[type, Any], Any]|None = None # decoder dec_hook: Callable[[type, Any], Any]|None = None # decoder
# struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions
ipc_msg_spec: Union[Type[Struct]]|Any = Any
payload_msg_spec: Union[Type[Struct]] = Any
# post-configure cached props
_enc: msgpack.Encoder|None = None
_dec: msgpack.Decoder|None = None
# TODO: a sub-decoder system as well? # TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types` # see related comments in `.msg.types`
# _payload_decs: ( # _payload_decs: (
# dict[ # dict[
@ -91,42 +101,18 @@ class MsgCodec(Struct):
# ] # ]
# |None # |None
# ) = None # ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `Msg.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
# TODO: use `functools.cached_property` for these ? # TODO: use `functools.cached_property` for these ?
# https://docs.python.org/3/library/functools.html#functools.cached_property # https://docs.python.org/3/library/functools.html#functools.cached_property
@property @property
def enc(self) -> msgpack.Encoder: def enc(self) -> msgpack.Encoder:
return self._enc or self.encoder()
def encoder(
self,
enc_hook: Callable|None = None,
reset: bool = False,
# TODO: what's the default for this, and do we care?
# write_buffer_size: int
#
**kwargs,
) -> msgpack.Encoder:
'''
Set or get the maybe-cached `msgspec.msgpack.Encoder`
instance configured for this codec.
When `reset=True` any previously configured encoder will
be recreated and then cached with the new settings passed
as input.
'''
if (
self._enc is None
or reset
):
self._enc = self.lib.msgpack.Encoder(
enc_hook=enc_hook or self.enc_hook,
# write_buffer_size=write_buffer_size,
)
return self._enc return self._enc
def encode( def encode(
@ -139,40 +125,10 @@ class MsgCodec(Struct):
on a tranport protocol connection. on a tranport protocol connection.
''' '''
return self.enc.encode(py_obj) return self._enc.encode(py_obj)
@property @property
def dec(self) -> msgpack.Decoder: def dec(self) -> msgpack.Decoder:
return self._dec or self.decoder()
def decoder(
self,
ipc_msg_spec: Union[Type[Struct]]|None = None,
dec_hook: Callable|None = None,
reset: bool = False,
**kwargs,
# ext_hook: ext_hook_sig
) -> msgpack.Decoder:
'''
Set or get the maybe-cached `msgspec.msgpack.Decoder`
instance configured for this codec.
When `reset=True` any previously configured decoder will
be recreated and then cached with the new settings passed
as input.
'''
if (
self._dec is None
or reset
):
self._dec = self.lib.msgpack.Decoder(
type=ipc_msg_spec or self.ipc_msg_spec,
dec_hook=dec_hook or self.dec_hook,
**kwargs,
)
return self._dec return self._dec
def decode( def decode(
@ -185,60 +141,181 @@ class MsgCodec(Struct):
determined by the determined by the
''' '''
return self.dec.decode(msg) # https://jcristharif.com/msgspec/usage.html#typed-decoding
return self._dec.decode(msg)
# TODO: do we still want to try and support the sub-decoder with
# `.Raw` technique in the case that the `Generic` approach gives
# future grief?
#
# -[ ] <NEW-ISSUE-FOR-ThIS-HERE>
# -> https://jcristharif.com/msgspec/api.html#raw
#
#def mk_pld_subdec(
# self,
# payload_types: Union[Type[Struct]],
#) -> msgpack.Decoder:
# # TODO: sub-decoder suppor for `.pld: Raw`?
# # => see similar notes inside `.msg.types`..
# #
# # not sure we'll end up needing this though it might have
# # unforeseen advantages in terms of enabling encrypted
# # appliciation layer (only) payloads?
# #
# # register sub-payload decoders to load `.pld: Raw`
# # decoded `Msg`-packets using a dynamic lookup (table)
# # instead of a pre-defined msg-spec via `Generic`
# # parameterization.
# #
# (
# tags,
# payload_dec,
# ) = mk_tagged_union_dec(
# tagged_structs=list(payload_types.__args__),
# )
# # register sub-decoders by tag
# subdecs: dict[str, msgpack.Decoder]|None = self._payload_decs
# for name in tags:
# subdecs.setdefault(
# name,
# payload_dec,
# )
# return payload_dec
# sub-decoders for retreiving embedded
# payload data and decoding to a sender
# side defined (struct) type.
# def dec_payload(
# codec: MsgCodec,
# msg: Msg,
# ) -> Any|Struct:
# msg: Msg = codec.dec.decode(msg)
# payload_tag: str = msg.header.payload_tag
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
# return payload_dec.decode(msg.pld)
# def enc_payload(
# codec: MsgCodec,
# payload: Any,
# cid: str,
# ) -> bytes:
# # tag_field: str|None = None
# plbytes = codec.enc.encode(payload)
# if b'msg_type' in plbytes:
# assert isinstance(payload, Struct)
# # tag_field: str = type(payload).__name__
# payload = msgspec.Raw(plbytes)
# msg = Msg(
# cid=cid,
# pld=payload,
# # Header(
# # payload_tag=tag_field,
# # # dialog_id,
# # ),
# )
# return codec.enc.encode(msg)
def mk_tagged_union_dec(
tagged_structs: list[Struct],
) -> tuple[ # TODO: sub-decoded `Raw` fields?
list[str], # -[ ] see `MsgCodec._payload_decs` notes
msgpack.Decoder, #
]: # XXX if we wanted something more complex then field name str-keys
# See "tagged unions" docs: # we might need a header field type to describe the lookup sys?
# https://jcristharif.com/msgspec/structs.html#tagged-unions # class Header(Struct, tag=True):
# '''
# A msg header which defines payload properties
# "The quickest way to enable tagged unions is to set tag=True when # '''
# defining every struct type in the union. In this case tag_field # payload_tag: str|None = None
# defaults to "type", and tag defaults to the struct class name
# (e.g. "Get")."
first: Struct = tagged_structs[0]
types_union: Union[Type[Struct]] = Union[
first
]|Any
tags: list[str] = [first.__name__]
for struct in tagged_structs[1:]:
types_union |= struct
tags.append(struct.__name__)
dec = msgpack.Decoder(types_union) #def mk_tagged_union_dec(
return ( # tagged_structs: list[Struct],
tags,
dec, #) -> tuple[
) # list[str],
# msgpack.Decoder,
#]:
# '''
# Create a `msgpack.Decoder` for an input `list[msgspec.Struct]`
# and return a `list[str]` of each struct's `tag_field: str` value
# which can be used to "map to" the initialized dec.
# '''
# # See "tagged unions" docs:
# # https://jcristharif.com/msgspec/structs.html#tagged-unions
# # "The quickest way to enable tagged unions is to set tag=True when
# # defining every struct type in the union. In this case tag_field
# # defaults to "type", and tag defaults to the struct class name
# # (e.g. "Get")."
# first: Struct = tagged_structs[0]
# types_union: Union[Type[Struct]] = Union[
# first
# ]|Any
# tags: list[str] = [first.__name__]
# for struct in tagged_structs[1:]:
# types_union |= struct
# tags.append(
# getattr(
# struct,
# struct.__struct_config__.tag_field,
# struct.__name__,
# )
# )
# dec = msgpack.Decoder(types_union)
# return (
# tags,
# dec,
# )
# TODO: struct aware messaging coders as per:
# - https://github.com/goodboy/tractor/issues/36
# - https://github.com/goodboy/tractor/issues/196
# - https://github.com/goodboy/tractor/issues/365
def mk_codec( def mk_codec(
libname: str = 'msgspec', ipc_msg_spec: Union[Type[Struct]]|Any|None = None,
# for codec-ing boxed `Msg`-with-payload msgs
payload_types: Union[Type[Struct]]|None = None,
# TODO: do we want to allow NOT/using a diff `Msg`-set?
# #
# ^TODO^: in the long run, do we want to allow using a diff IPC `Msg`-set?
# it would break the runtime, but maybe say if you wanted
# to add some kinda field-specific or wholesale `.pld` ecryption?
# struct type unions set for `Decoder` # struct type unions set for `Decoder`
# https://jcristharif.com/msgspec/structs.html#tagged-unions # https://jcristharif.com/msgspec/structs.html#tagged-unions
ipc_msg_spec: Union[Type[Struct]]|Any = Any, ipc_pld_spec: Union[Type[Struct]]|Any|None = None,
cache_now: bool = True, # TODO: offering a per-msg(-field) type-spec such that
# the fields can be dynamically NOT decoded and left as `Raw`
# values which are later loaded by a sub-decoder specified
# by `tag_field: str` value key?
# payload_msg_specs: dict[
# str, # tag_field value as sub-decoder key
# Union[Type[Struct]] # `Msg.pld` type spec
# ]|None = None,
libname: str = 'msgspec',
# proxy as `Struct(**kwargs)` # proxy as `Struct(**kwargs)`
# ------ - ------
dec_hook: Callable|None = None,
enc_hook: Callable|None = None,
# ------ - ------
**kwargs, **kwargs,
#
# Encoder:
# write_buffer_size=write_buffer_size,
#
# Decoder:
# ext_hook: ext_hook_sig
) -> MsgCodec: ) -> MsgCodec:
''' '''
@ -247,75 +324,78 @@ def mk_codec(
`msgspec` ;). `msgspec` ;).
''' '''
if (
ipc_msg_spec is not None
and ipc_pld_spec
):
raise RuntimeError(
f'If a payload spec is provided,\n'
"the builtin SC-shuttle-protocol's msg set\n"
f'(i.e. `{Msg}`) MUST be used!\n\n'
f'However both values were passed as => mk_codec(\n'
f' ipc_msg_spec={ipc_msg_spec}`\n'
f' ipc_pld_spec={ipc_pld_spec}`\n)\n'
)
elif (
ipc_pld_spec
and
# XXX required for now (or maybe forever?) until
# we can dream up a way to allow parameterizing and/or
# custom overrides to the `Msg`-spec protocol itself?
ipc_msg_spec is None
):
# (manually) generate a msg-payload-spec for all relevant # (manually) generate a msg-payload-spec for all relevant
# god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT` # god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT`
# for the decoder such that all sub-type msgs in our SCIPP # for the decoder such that all sub-type msgs in our SCIPP
# will automatically decode to a type-"limited" payload (`Struct`) # will automatically decode to a type-"limited" payload (`Struct`)
# object (set). # object (set).
payload_type_spec: Union[Type[Msg]]|None = None
if payload_types:
( (
payload_type_spec, ipc_msg_spec,
msg_types, msg_types,
) = mk_msg_spec( ) = mk_msg_spec(
payload_type=payload_types, payload_type_union=ipc_pld_spec,
) )
assert len(payload_type_spec.__args__) == len(msg_types) assert len(ipc_msg_spec.__args__) == len(msg_types)
assert ipc_msg_spec
# TODO: sub-decode `.pld: Raw`? else:
# see similar notes inside `.msg.types`.. ipc_msg_spec = ipc_msg_spec or Any
#
# not sure we'll end up wanting/needing this enc = msgpack.Encoder(
# though it might have unforeseen advantages in terms enc_hook=enc_hook,
# of enabling encrypted appliciation layer (only) )
# payloads? dec = msgpack.Decoder(
# type=ipc_msg_spec, # like `Msg[Any]`
# register sub-payload decoders to load `.pld: Raw` dec_hook=dec_hook,
# decoded `Msg`-packets using a dynamic lookup (table) )
# instead of a pre-defined msg-spec via `Generic`
# parameterization.
#
# (
# tags,
# payload_dec,
# ) = mk_tagged_union_dec(
# tagged_structs=list(payload_types.__args__),
# )
# _payload_decs: (
# dict[str, msgpack.Decoder]|None
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `Msg.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
# for name in tags:
# _payload_decs[name] = payload_dec
codec = MsgCodec( codec = MsgCodec(
ipc_msg_spec=ipc_msg_spec, _enc=enc,
payload_msg_spec=payload_type_spec, _dec=dec,
**kwargs, pld_spec=ipc_pld_spec,
# payload_msg_specs=payload_msg_specs,
# **kwargs,
) )
assert codec.lib.__name__ == libname
# by default, config-n-cache the codec pair from input settings. # sanity on expected backend support
if cache_now: assert codec.lib.__name__ == libname
assert codec.enc
assert codec.dec
return codec return codec
# instance of the default `msgspec.msgpack` codec settings, i.e. # instance of the default `msgspec.msgpack` codec settings, i.e.
# no custom structs, hooks or other special types. # no custom structs, hooks or other special types.
_def_msgspec_codec: MsgCodec = mk_codec() _def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
# NOTE: provides for per-`trio.Task` specificity of the # NOTE: provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing # IPC msging codec used by the transport layer when doing
# `Channel.send()/.recv()` of wire data. # `Channel.send()/.recv()` of wire data.
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
'msgspec_codec', 'msgspec_codec',
# TODO: move this to our new `Msg`-spec!
default=_def_msgspec_codec, default=_def_msgspec_codec,
) )
@ -353,7 +433,7 @@ def limit_msg_spec(
payload_types: Union[Type[Struct]], payload_types: Union[Type[Struct]],
# TODO: don't need this approach right? # TODO: don't need this approach right?
# # -> related to the `MsgCodec._payload_decs` stuff above..
# tagged_structs: list[Struct]|None = None, # tagged_structs: list[Struct]|None = None,
**codec_kwargs, **codec_kwargs,

View File

@ -22,9 +22,7 @@ that is,
the "Structurred-Concurrency-Inter-Process-(dialog)-(un)Protocol". the "Structurred-Concurrency-Inter-Process-(dialog)-(un)Protocol".
''' '''
from __future__ import annotations from __future__ import annotations
# from contextlib import contextmanager as cm
import types import types
from typing import ( from typing import (
Any, Any,
@ -36,22 +34,13 @@ from typing import (
) )
from msgspec import ( from msgspec import (
msgpack, defstruct,
Raw, # field,
Struct, Struct,
UNSET, UNSET,
UnsetType,
) )
# TODO: can also remove yah?
#
# class Header(Struct, tag=True):
# '''
# A msg header which defines payload properties
# '''
# payload_tag: str|None = None
# type variable for the boxed payload field `.pld` # type variable for the boxed payload field `.pld`
PayloadT = TypeVar('PayloadT') PayloadT = TypeVar('PayloadT')
@ -61,6 +50,9 @@ class Msg(
Generic[PayloadT], Generic[PayloadT],
tag=True, tag=True,
tag_field='msg_type', tag_field='msg_type',
# eq=True,
# order=True,
): ):
''' '''
The "god" boxing msg type. The "god" boxing msg type.
@ -70,9 +62,13 @@ class Msg(
tree. tree.
''' '''
# header: Header
# TODO: use UNSET here?
cid: str|None # call/context-id cid: str|None # call/context-id
# ^-TODO-^: more explicit type?
# -[ ] use UNSET here?
# https://jcristharif.com/msgspec/supported-types.html#unset
#
# -[ ] `uuid.UUID` which has multi-protocol support
# https://jcristharif.com/msgspec/supported-types.html#uuid
# The msgs "payload" (spelled without vowels): # The msgs "payload" (spelled without vowels):
# https://en.wikipedia.org/wiki/Payload_(computing) # https://en.wikipedia.org/wiki/Payload_(computing)
@ -94,9 +90,24 @@ class Msg(
pld: PayloadT pld: PayloadT
# TODO: better name, like `Call/TaskInput`? # TODO: caps based RPC support in the payload?
#
# -[ ] integration with our ``enable_modules: list[str]`` caps sys.
# ``pkgutil.resolve_name()`` internally uses
# ``importlib.import_module()`` which can be filtered by
# inserting a ``MetaPathFinder`` into ``sys.meta_path`` (which
# we could do before entering the ``Actor._process_messages()``
# loop)?
# - https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645
# - https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules
# - https://stackoverflow.com/a/63320902
# - https://docs.python.org/3/library/sys.html#sys.meta_path
#
# -[ ] can we combine .ns + .func into a native `NamespacePath` field?
#
# -[ ]better name, like `Call/TaskInput`?
#
class FuncSpec(Struct): class FuncSpec(Struct):
# TODO: can we combine these 2 into a `NamespacePath` field?
ns: str ns: str
func: str func: str
@ -126,19 +137,18 @@ class Start(
pld: FuncSpec pld: FuncSpec
FuncType: Literal[
'asyncfunc',
'asyncgen',
'context', # TODO: the only one eventually?
] = 'context'
class IpcCtxSpec(Struct): class IpcCtxSpec(Struct):
''' '''
An inter-actor-`trio.Task`-comms `Context` spec. An inter-actor-`trio.Task`-comms `Context` spec.
''' '''
functype: FuncType # TODO: maybe better names for all these?
# -[ ] obvi ^ would need sync with `._rpc`
functype: Literal[
'asyncfunc',
'asyncgen',
'context', # TODO: the only one eventually?
]
# TODO: as part of the reponse we should report our allowed # TODO: as part of the reponse we should report our allowed
# msg spec which should be generated from the type-annots as # msg spec which should be generated from the type-annots as
@ -172,6 +182,7 @@ class Started(
decorated IPC endpoint. decorated IPC endpoint.
''' '''
pld: PayloadT
# TODO: instead of using our existing `Start` # TODO: instead of using our existing `Start`
@ -188,6 +199,7 @@ class Yield(
Per IPC transmission of a value from `await MsgStream.send(<value>)`. Per IPC transmission of a value from `await MsgStream.send(<value>)`.
''' '''
pld: PayloadT
class Stop(Msg): class Stop(Msg):
@ -196,7 +208,7 @@ class Stop(Msg):
of `StopAsyncIteration`. of `StopAsyncIteration`.
''' '''
pld: UNSET pld: UnsetType = UNSET
class Return( class Return(
@ -208,6 +220,7 @@ class Return(
func-as-`trio.Task`. func-as-`trio.Task`.
''' '''
pld: PayloadT
class ErrorData(Struct): class ErrorData(Struct):
@ -248,46 +261,115 @@ class Error(Msg):
# cid: str # cid: str
def mk_msg_spec( # built-in SC shuttle protocol msg type set in
payload_type: Union[Type] = Any, # approx order of the IPC txn-state spaces.
boxing_msg_set: set[Msg] = { __spec__: list[Msg] = [
# inter-actor RPC initiation
Start,
StartAck,
# no-outcome-yet IAC (inter-actor-communication)
Started,
Yield,
Stop,
# termination outcomes
Return,
Error,
]
_runtime_spec_msgs: list[Msg] = [
Start,
StartAck,
Stop,
Error,
]
_payload_spec_msgs: list[Msg] = [
Started, Started,
Yield, Yield,
Return, Return,
}, ]
def mk_msg_spec(
payload_type_union: Union[Type] = Any,
# boxing_msg_set: list[Msg] = _payload_spec_msgs,
spec_build_method: Literal[
'indexed_generics', # works
'defstruct',
'types_new_class',
] = 'indexed_generics',
) -> tuple[ ) -> tuple[
Union[Type[Msg]], Union[Type[Msg]],
list[Type[Msg]], list[Type[Msg]],
]: ]:
''' '''
Generate a payload-type-parameterized `Msg` specification such Create a payload-(data-)type-parameterized IPC message specification.
that IPC msgs which can be `Msg.pld` (payload) type
limited/filterd are specified given an input `payload_type: Allows generating IPC msg types from the above builtin set
Union[Type]`. with a payload (field) restricted data-type via the `Msg.pld:
PayloadT` type var. This allows runtime-task contexts to use
the python type system to limit/filter payload values as
determined by the input `payload_type_union: Union[Type]`.
''' '''
submsg_types: list[Type[Msg]] = Msg.__subclasses__() submsg_types: list[Type[Msg]] = Msg.__subclasses__()
bases: tuple = (
# XXX NOTE XXX the below generic-parameterization seems to
# be THE ONLY way to get this to work correctly in terms
# of getting ValidationError on a roundtrip?
Msg[payload_type_union],
Generic[PayloadT],
)
defstruct_bases: tuple = (
Msg, # [payload_type_union],
# Generic[PayloadT],
# ^-XXX-^: not allowed? lul..
)
ipc_msg_types: list[Msg] = []
# TODO: see below as well, idx_msg_types: list[Msg] = []
# => union building approach with `.__class_getitem__()` defs_msg_types: list[Msg] = []
# doesn't seem to work..? nc_msg_types: list[Msg] = []
#
# payload_type_spec: Union[Type[Msg]] for msgtype in __spec__:
#
msg_types: list[Msg] = [] # for the NON-payload (user api) type specify-able
for msgtype in boxing_msg_set: # msgs types, we simply aggregate the def as is
# for inclusion in the output type `Union`.
if msgtype not in _payload_spec_msgs:
ipc_msg_types.append(msgtype)
continue
# check inheritance sanity # check inheritance sanity
assert msgtype in submsg_types assert msgtype in submsg_types
# TODO: wait why do we need the dynamic version here? # TODO: wait why do we need the dynamic version here?
# -[ ] paraming the `PayloadT` values via `Generic[T]` # XXX ANSWER XXX -> BC INHERITANCE.. don't work w generics..
# doesn't seem to work at all?
# -[ ] is there a way to get it to work at module level
# just using inheritance or maybe a metaclass?
# #
# index_paramed_msg_type: Msg = msgtype[payload_type] # NOTE previously bc msgtypes WERE NOT inheritting
# directly the `Generic[PayloadT]` type, the manual method
# of generic-paraming with `.__class_getitem__()` wasn't
# working..
#
# XXX but bc i changed that to make every subtype inherit
# it, this manual "indexed parameterization" method seems
# to work?
#
# -[x] paraming the `PayloadT` values via `Generic[T]`
# does work it seems but WITHOUT inheritance of generics
#
# -[-] is there a way to get it to work at module level
# just using inheritance or maybe a metaclass?
# => thot that `defstruct` might work, but NOPE, see
# below..
#
idxed_msg_type: Msg = msgtype[payload_type_union]
idx_msg_types.append(idxed_msg_type)
# TODO: WHY do we need to dynamically generate the # TODO: WHY do we need to dynamically generate the
# subtype-msgs here to ensure the `.pld` parameterization # subtype-msgs here to ensure the `.pld` parameterization
@ -295,98 +377,69 @@ def mk_msg_spec(
# `msgpack.Decoder()`..? # `msgpack.Decoder()`..?
# #
# dynamically create the payload type-spec-limited msg set. # dynamically create the payload type-spec-limited msg set.
manual_paramed_msg_subtype: Type = types.new_class( newclass_msgtype: Type = types.new_class(
msgtype.__name__, name=msgtype.__name__,
( bases=bases,
# XXX NOTE XXX this seems to be THE ONLY kwds={},
# way to get this to work correctly!?! )
Msg[payload_type], nc_msg_types.append(
Generic[PayloadT], newclass_msgtype[payload_type_union]
),
{},
) )
# TODO: grok the diff here better.. # with `msgspec.structs.defstruct`
# XXX ALSO DOESN'T WORK
defstruct_msgtype = defstruct(
name=msgtype.__name__,
fields=[
('cid', str),
# XXX doesn't seem to work..
# ('pld', PayloadT),
('pld', payload_type_union),
],
bases=defstruct_bases,
)
defs_msg_types.append(defstruct_msgtype)
# assert index_paramed_msg_type == manual_paramed_msg_subtype # assert index_paramed_msg_type == manual_paramed_msg_subtype
# XXX TODO: why does the manual method work but not the # paramed_msg_type = manual_paramed_msg_subtype
# `.__class_getitem__()` one!?!
paramed_msg_type = manual_paramed_msg_subtype
# payload_type_spec |= paramed_msg_type # ipc_payload_msgs_type_union |= index_paramed_msg_type
msg_types.append(paramed_msg_type)
idx_spec: Union[Type[Msg]] = Union[*idx_msg_types]
def_spec: Union[Type[Msg]] = Union[*defs_msg_types]
nc_spec: Union[Type[Msg]] = Union[*nc_msg_types]
payload_type_spec: Union[Type[Msg]] = Union[*msg_types] specs: dict[str, Union[Type[Msg]]] = {
return ( 'indexed_generics': idx_spec,
payload_type_spec, 'defstruct': def_spec,
msg_types, 'types_new_class': nc_spec,
) }
msgtypes_table: dict[str, list[Msg]] = {
'indexed_generics': idx_msg_types,
# TODO: integration with our ``enable_modules: list[str]`` caps sys. 'defstruct': defs_msg_types,
# 'types_new_class': nc_msg_types,
# ``pkgutil.resolve_name()`` internally uses
# ``importlib.import_module()`` which can be filtered by inserting
# a ``MetaPathFinder`` into ``sys.meta_path`` (which we could do before
# entering the ``Actor._process_messages()`` loop).
# https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645
# https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules
# - https://stackoverflow.com/a/63320902
# - https://docs.python.org/3/library/sys.html#sys.meta_path
# TODO: do we still want to try and support the sub-decoder with
# `Raw` technique in the case that the `Generic` approach gives
# future grief?
#
# sub-decoders for retreiving embedded
# payload data and decoding to a sender
# side defined (struct) type.
_payload_decs: dict[
str|None,
msgpack.Decoder,
] = {
# default decoder is used when `Header.payload_tag == None`
None: msgpack.Decoder(Any),
} }
# XXX lol apparently type unions can't ever
# be equal eh?
# TODO: grok the diff here better..
#
# assert (
# idx_spec
# ==
# nc_spec
# ==
# def_spec
# )
# breakpoint()
def dec_payload( pld_spec: Union[Type] = specs[spec_build_method]
msg: Msg, runtime_spec: Union[Type] = Union[*ipc_msg_types]
msg_dec: msgpack.Decoder = msgpack.Decoder(
type=Msg[Any]
),
) -> Any|Struct: return (
pld_spec | runtime_spec,
msg: Msg = msg_dec.decode(msg) msgtypes_table[spec_build_method] + ipc_msg_types,
payload_tag: str = msg.header.payload_tag
payload_dec: msgpack.Decoder = _payload_decs[payload_tag]
return payload_dec.decode(msg.pld)
def enc_payload(
enc: msgpack.Encoder,
payload: Any,
cid: str,
) -> bytes:
# tag_field: str|None = None
plbytes = enc.encode(payload)
if b'msg_type' in plbytes:
assert isinstance(payload, Struct)
# tag_field: str = type(payload).__name__
payload = Raw(plbytes)
msg = Msg(
cid=cid,
pld=payload,
# Header(
# payload_tag=tag_field,
# # dialog_id,
# ),
) )
return enc.encode(msg)