Compare commits
No commits in common. "cb728e3bd67333530951328dac97e83d61ff46b3" and "995af130cf200d7e12731b00eb334f8c44197307" have entirely different histories.
cb728e3bd6
...
995af130cf
|
@ -1,6 +1,5 @@
|
|||
'''
|
||||
Low-level functional audits for our
|
||||
"capability based messaging"-spec feats.
|
||||
Functional audits for our "capability based messaging (schema)" feats.
|
||||
|
||||
B~)
|
||||
|
||||
|
@ -23,7 +22,6 @@ from msgspec import (
|
|||
Struct,
|
||||
ValidationError,
|
||||
)
|
||||
import pytest
|
||||
import tractor
|
||||
from tractor.msg import (
|
||||
_def_msgspec_codec,
|
||||
|
@ -35,31 +33,14 @@ from tractor.msg import (
|
|||
apply_codec,
|
||||
current_msgspec_codec,
|
||||
)
|
||||
from tractor.msg import types
|
||||
from tractor.msg.types import (
|
||||
# PayloadT,
|
||||
PayloadT,
|
||||
Msg,
|
||||
# Started,
|
||||
mk_msg_spec,
|
||||
)
|
||||
import trio
|
||||
|
||||
|
||||
def test_msg_spec_xor_pld_spec():
|
||||
'''
|
||||
If the `.msg.types.Msg`-set is overridden, we
|
||||
can't also support a `Msg.pld` spec.
|
||||
|
||||
'''
|
||||
# apply custom hooks and set a `Decoder` which only
|
||||
# loads `NamespacePath` types.
|
||||
with pytest.raises(RuntimeError):
|
||||
mk_codec(
|
||||
ipc_msg_spec=Any,
|
||||
ipc_pld_spec=NamespacePath,
|
||||
)
|
||||
|
||||
|
||||
# TODO: wrap these into `._codec` such that user can just pass
|
||||
# a type table of some sort?
|
||||
def enc_hook(obj: Any) -> Any:
|
||||
|
@ -85,13 +66,11 @@ def ex_func(*args):
|
|||
print(f'ex_func({args})')
|
||||
|
||||
|
||||
def mk_custom_codec(
|
||||
ipc_msg_spec: Type[Any] = Any,
|
||||
) -> MsgCodec:
|
||||
def mk_custom_codec() -> MsgCodec:
|
||||
# apply custom hooks and set a `Decoder` which only
|
||||
# loads `NamespacePath` types.
|
||||
nsp_codec: MsgCodec = mk_codec(
|
||||
ipc_msg_spec=ipc_msg_spec,
|
||||
ipc_msg_spec=NamespacePath,
|
||||
enc_hook=enc_hook,
|
||||
dec_hook=dec_hook,
|
||||
)
|
||||
|
@ -236,154 +215,115 @@ def test_codec_hooks_mod():
|
|||
|
||||
|
||||
def chk_pld_type(
|
||||
payload_spec: Type[Struct]|Any,
|
||||
generic: Msg|_GenericAlias,
|
||||
payload_type: Type[Struct]|Any,
|
||||
pld: Any,
|
||||
|
||||
expect_roundtrip: bool|None = None,
|
||||
|
||||
) -> bool:
|
||||
|
||||
roundtrip: bool = False
|
||||
pld_val_type: Type = type(pld)
|
||||
|
||||
# gen_paramed: _GenericAlias = generic[payload_type]
|
||||
# TODO: verify that the overridden subtypes
|
||||
# DO NOT have modified type-annots from original!
|
||||
# 'Start', .pld: FuncSpec
|
||||
# 'StartAck', .pld: IpcCtxSpec
|
||||
# 'Stop', .pld: UNSEt
|
||||
# 'Error', .pld: ErrorData
|
||||
# for typedef in (
|
||||
# [gen_paramed]
|
||||
# +
|
||||
|
||||
codec: MsgCodec = mk_codec(
|
||||
# NOTE: this ONLY accepts `Msg.pld` fields of a specified
|
||||
# type union.
|
||||
ipc_pld_spec=payload_spec,
|
||||
)
|
||||
# # type-var should always be set for these sub-types
|
||||
# # as well!
|
||||
# Msg.__subclasses__()
|
||||
# ):
|
||||
# if typedef.__name__ not in [
|
||||
# 'Msg',
|
||||
# 'Started',
|
||||
# 'Yield',
|
||||
# 'Return',
|
||||
# ]:
|
||||
# continue
|
||||
# payload_type: Type[Struct] = CustomPayload
|
||||
|
||||
# make a one-off dec to compare with our `MsgCodec` instance
|
||||
# which does the below `mk_msg_spec()` call internally
|
||||
ipc_msg_spec: Union[Type[Struct]]
|
||||
msg_types: list[Msg[payload_spec]]
|
||||
# TODO: can remove all this right!?
|
||||
#
|
||||
# when parameterized (like `Msg[Any]`) then
|
||||
# we expect an alias as input.
|
||||
# if isinstance(generic, _GenericAlias):
|
||||
# assert payload_type in generic.__args__
|
||||
# else:
|
||||
# assert PayloadType in generic.__parameters__
|
||||
# pld_param: Parameter = generic.__signature__.parameters['pld']
|
||||
# assert pld_param.annotation is PayloadType
|
||||
|
||||
type_spec: Union[Type[Struct]]
|
||||
msg_types: list[Msg[payload_type]]
|
||||
(
|
||||
ipc_msg_spec,
|
||||
type_spec,
|
||||
msg_types,
|
||||
) = mk_msg_spec(
|
||||
payload_type_union=payload_spec,
|
||||
payload_type=payload_type,
|
||||
)
|
||||
_enc = msgpack.Encoder()
|
||||
_dec = msgpack.Decoder(
|
||||
type=ipc_msg_spec or Any, # like `Msg[Any]`
|
||||
)
|
||||
|
||||
assert (
|
||||
payload_spec
|
||||
==
|
||||
codec.pld_spec
|
||||
)
|
||||
|
||||
# assert codec.dec == dec
|
||||
#
|
||||
# ^-XXX-^ not sure why these aren't "equal" but when cast
|
||||
# to `str` they seem to match ?? .. kk
|
||||
|
||||
assert (
|
||||
str(ipc_msg_spec)
|
||||
==
|
||||
str(codec.msg_spec)
|
||||
==
|
||||
str(_dec.type)
|
||||
==
|
||||
str(codec.dec.type)
|
||||
enc = msgpack.Encoder()
|
||||
dec = msgpack.Decoder(
|
||||
type=type_spec, # like `Msg[Any]`
|
||||
)
|
||||
|
||||
# verify the boxed-type for all variable payload-type msgs.
|
||||
if not msg_types:
|
||||
breakpoint()
|
||||
|
||||
roundtrip: bool|None = None
|
||||
pld_spec_msg_names: list[str] = [
|
||||
td.__name__ for td in types._payload_spec_msgs
|
||||
]
|
||||
for typedef in msg_types:
|
||||
skip_runtime_msg: bool = typedef.__name__ not in pld_spec_msg_names
|
||||
if skip_runtime_msg:
|
||||
continue
|
||||
|
||||
pld_field = structs.fields(typedef)[1]
|
||||
assert pld_field.type is payload_spec # TODO-^ does this need to work to get all subtypes to adhere?
|
||||
assert pld_field.type in {payload_type, PayloadT}
|
||||
# TODO: does this need to work to get all subtypes to
|
||||
# adhere?
|
||||
assert pld_field.type is payload_type
|
||||
|
||||
kwargs: dict[str, Any] = {
|
||||
'cid': '666',
|
||||
'pld': pld,
|
||||
}
|
||||
enc_msg: Msg = typedef(**kwargs)
|
||||
enc_msg = typedef(**kwargs)
|
||||
|
||||
_wire_bytes: bytes = _enc.encode(enc_msg)
|
||||
wire_bytes: bytes = codec.enc.encode(enc_msg)
|
||||
assert _wire_bytes == wire_bytes
|
||||
wire_bytes: bytes = enc.encode(enc_msg)
|
||||
|
||||
ve: ValidationError|None = None
|
||||
try:
|
||||
dec_msg = codec.dec.decode(wire_bytes)
|
||||
_dec_msg = _dec.decode(wire_bytes)
|
||||
dec_msg = dec.decode(wire_bytes)
|
||||
assert dec_msg.pld == pld
|
||||
assert (roundtrip := (dec_msg == enc_msg))
|
||||
|
||||
# decoded msg and thus payload should be exactly same!
|
||||
assert (roundtrip := (
|
||||
_dec_msg
|
||||
==
|
||||
dec_msg
|
||||
==
|
||||
enc_msg
|
||||
))
|
||||
|
||||
if (
|
||||
expect_roundtrip is not None
|
||||
and expect_roundtrip != roundtrip
|
||||
):
|
||||
breakpoint()
|
||||
|
||||
assert (
|
||||
pld
|
||||
==
|
||||
dec_msg.pld
|
||||
==
|
||||
enc_msg.pld
|
||||
)
|
||||
# assert (roundtrip := (_dec_msg == enc_msg))
|
||||
|
||||
except ValidationError as _ve:
|
||||
ve = _ve
|
||||
roundtrip: bool = False
|
||||
if pld_val_type is payload_spec:
|
||||
except ValidationError as ve:
|
||||
# breakpoint()
|
||||
if pld_val_type is payload_type:
|
||||
raise ValueError(
|
||||
'Got `ValidationError` despite type-var match!?\n'
|
||||
f'pld_val_type: {pld_val_type}\n'
|
||||
f'payload_type: {payload_spec}\n'
|
||||
f'payload_type: {payload_type}\n'
|
||||
) from ve
|
||||
|
||||
else:
|
||||
# ow we good cuz the pld spec mismatched.
|
||||
print(
|
||||
'Got expected `ValidationError` since,\n'
|
||||
f'{pld_val_type} is not {payload_spec}\n'
|
||||
f'{pld_val_type} is not {payload_type}\n'
|
||||
)
|
||||
else:
|
||||
if (
|
||||
payload_spec is not Any
|
||||
and
|
||||
pld_val_type is not payload_spec
|
||||
pld_val_type is not payload_type
|
||||
and payload_type is not Any
|
||||
):
|
||||
raise ValueError(
|
||||
'DID NOT `ValidationError` despite expected type match!?\n'
|
||||
f'pld_val_type: {pld_val_type}\n'
|
||||
f'payload_type: {payload_spec}\n'
|
||||
f'payload_type: {payload_type}\n'
|
||||
)
|
||||
|
||||
# full code decode should always be attempted!
|
||||
if roundtrip is None:
|
||||
breakpoint()
|
||||
|
||||
return roundtrip
|
||||
|
||||
|
||||
|
||||
def test_limit_msgspec():
|
||||
|
||||
async def main():
|
||||
|
@ -393,10 +333,9 @@ def test_limit_msgspec():
|
|||
|
||||
# ensure we can round-trip a boxing `Msg`
|
||||
assert chk_pld_type(
|
||||
# Msg,
|
||||
Msg,
|
||||
Any,
|
||||
None,
|
||||
expect_roundtrip=True,
|
||||
)
|
||||
|
||||
# TODO: don't need this any more right since
|
||||
|
@ -408,7 +347,7 @@ def test_limit_msgspec():
|
|||
|
||||
# verify that a mis-typed payload value won't decode
|
||||
assert not chk_pld_type(
|
||||
# Msg,
|
||||
Msg,
|
||||
int,
|
||||
pld='doggy',
|
||||
)
|
||||
|
@ -421,13 +360,13 @@ def test_limit_msgspec():
|
|||
value: Any
|
||||
|
||||
assert not chk_pld_type(
|
||||
# Msg,
|
||||
Msg,
|
||||
CustomPayload,
|
||||
pld='doggy',
|
||||
)
|
||||
|
||||
assert chk_pld_type(
|
||||
# Msg,
|
||||
Msg,
|
||||
CustomPayload,
|
||||
pld=CustomPayload(name='doggy', value='urmom')
|
||||
)
|
|
@ -536,9 +536,7 @@ def pack_error(
|
|||
# content's `.msgdata`).
|
||||
error_msg['tb_str'] = tb_str
|
||||
|
||||
pkt: dict = {
|
||||
'error': error_msg,
|
||||
}
|
||||
pkt: dict = {'error': error_msg}
|
||||
if cid:
|
||||
pkt['cid'] = cid
|
||||
|
||||
|
|
|
@ -48,7 +48,6 @@ from tractor._exceptions import TransportClosed
|
|||
from tractor.msg import (
|
||||
_ctxvar_MsgCodec,
|
||||
MsgCodec,
|
||||
mk_codec,
|
||||
)
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
@ -163,7 +162,7 @@ class MsgpackTCPStream(MsgTransport):
|
|||
|
||||
# allow for custom IPC msg interchange format
|
||||
# dynamic override Bo
|
||||
self.codec: MsgCodec = codec or mk_codec()
|
||||
self.codec: MsgCodec = codec or MsgCodec()
|
||||
|
||||
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
|
||||
'''
|
||||
|
|
|
@ -89,10 +89,7 @@ async def _invoke_non_context(
|
|||
|
||||
# TODO: can we unify this with the `context=True` impl below?
|
||||
if inspect.isasyncgen(coro):
|
||||
await chan.send({
|
||||
'cid': cid,
|
||||
'functype': 'asyncgen',
|
||||
})
|
||||
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
||||
# XXX: massive gotcha! If the containing scope
|
||||
# is cancelled and we execute the below line,
|
||||
# any ``ActorNursery.__aexit__()`` WON'T be
|
||||
|
@ -112,27 +109,18 @@ async def _invoke_non_context(
|
|||
# to_send = await chan.recv_nowait()
|
||||
# if to_send is not None:
|
||||
# to_yield = await coro.asend(to_send)
|
||||
await chan.send({
|
||||
'yield': item,
|
||||
'cid': cid,
|
||||
})
|
||||
await chan.send({'yield': item, 'cid': cid})
|
||||
|
||||
log.runtime(f"Finished iterating {coro}")
|
||||
# TODO: we should really support a proper
|
||||
# `StopAsyncIteration` system here for returning a final
|
||||
# value if desired
|
||||
await chan.send({
|
||||
'stop': True,
|
||||
'cid': cid,
|
||||
})
|
||||
await chan.send({'stop': True, 'cid': cid})
|
||||
|
||||
# one way @stream func that gets treated like an async gen
|
||||
# TODO: can we unify this with the `context=True` impl below?
|
||||
elif treat_as_gen:
|
||||
await chan.send({
|
||||
'cid': cid,
|
||||
'functype': 'asyncgen',
|
||||
})
|
||||
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
||||
# XXX: the async-func may spawn further tasks which push
|
||||
# back values like an async-generator would but must
|
||||
# manualy construct the response dict-packet-responses as
|
||||
|
@ -145,10 +133,7 @@ async def _invoke_non_context(
|
|||
if not cs.cancelled_caught:
|
||||
# task was not cancelled so we can instruct the
|
||||
# far end async gen to tear down
|
||||
await chan.send({
|
||||
'stop': True,
|
||||
'cid': cid
|
||||
})
|
||||
await chan.send({'stop': True, 'cid': cid})
|
||||
else:
|
||||
# regular async function/method
|
||||
# XXX: possibly just a scheduled `Actor._cancel_task()`
|
||||
|
@ -197,10 +182,10 @@ async def _invoke_non_context(
|
|||
and chan.connected()
|
||||
):
|
||||
try:
|
||||
await chan.send({
|
||||
'return': result,
|
||||
'cid': cid,
|
||||
})
|
||||
await chan.send(
|
||||
{'return': result,
|
||||
'cid': cid}
|
||||
)
|
||||
except (
|
||||
BrokenPipeError,
|
||||
trio.BrokenResourceError,
|
||||
|
@ -494,8 +479,8 @@ async def _invoke(
|
|||
# "least sugary" type of RPC ep with support for
|
||||
# bi-dir streaming B)
|
||||
await chan.send({
|
||||
'cid': cid,
|
||||
'functype': 'context',
|
||||
'cid': cid
|
||||
})
|
||||
|
||||
# TODO: should we also use an `.open_context()` equiv
|
||||
|
|
|
@ -33,24 +33,3 @@ from ._codec import (
|
|||
MsgCodec as MsgCodec,
|
||||
current_msgspec_codec as current_msgspec_codec,
|
||||
)
|
||||
|
||||
from .types import (
|
||||
Msg as Msg,
|
||||
|
||||
Start as Start, # with pld
|
||||
FuncSpec as FuncSpec,
|
||||
|
||||
StartAck as StartAck, # with pld
|
||||
IpcCtxSpec as IpcCtxSpec,
|
||||
|
||||
Started as Started,
|
||||
Yield as Yield,
|
||||
Stop as Stop,
|
||||
Return as Return,
|
||||
|
||||
Error as Error, # with pld
|
||||
ErrorData as ErrorData,
|
||||
|
||||
# full msg spec set
|
||||
__spec__ as __spec__,
|
||||
)
|
||||
|
|
|
@ -29,7 +29,6 @@ ToDo: backends we prolly should offer:
|
|||
- https://capnproto.org/language.html#language-reference
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from contextvars import (
|
||||
ContextVar,
|
||||
Token,
|
||||
|
@ -55,35 +54,18 @@ from tractor.msg.types import (
|
|||
)
|
||||
|
||||
|
||||
# TODO: overall IPC msg-spec features (i.e. in this mod)!
|
||||
# TODO: API changes towards being interchange lib agnostic!
|
||||
#
|
||||
# -[ ] API changes towards being interchange lib agnostic!
|
||||
# -[ ] capnproto has pre-compiled schema for eg..
|
||||
# * https://capnproto.org/language.html
|
||||
# * http://capnproto.github.io/pycapnp/quickstart.html
|
||||
# * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp
|
||||
#
|
||||
# -[ ] struct aware messaging coders as per:
|
||||
# -[x] https://github.com/goodboy/tractor/issues/36
|
||||
# -[ ] https://github.com/goodboy/tractor/issues/196
|
||||
# -[ ] https://github.com/goodboy/tractor/issues/365
|
||||
#
|
||||
class MsgCodec(Struct):
|
||||
'''
|
||||
A IPC msg interchange format lib's encoder + decoder pair.
|
||||
|
||||
'''
|
||||
_enc: msgpack.Encoder
|
||||
_dec: msgpack.Decoder
|
||||
|
||||
pld_spec: Union[Type[Struct]]|None
|
||||
|
||||
# struct type unions
|
||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
@property
|
||||
def msg_spec(self) -> Union[Type[Struct]]:
|
||||
return self._dec.type
|
||||
|
||||
lib: ModuleType = msgspec
|
||||
|
||||
# ad-hoc type extensions
|
||||
|
@ -91,8 +73,16 @@ class MsgCodec(Struct):
|
|||
enc_hook: Callable[[Any], Any]|None = None # coder
|
||||
dec_hook: Callable[[type, Any], Any]|None = None # decoder
|
||||
|
||||
# struct type unions
|
||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
ipc_msg_spec: Union[Type[Struct]]|Any = Any
|
||||
payload_msg_spec: Union[Type[Struct]] = Any
|
||||
|
||||
# post-configure cached props
|
||||
_enc: msgpack.Encoder|None = None
|
||||
_dec: msgpack.Decoder|None = None
|
||||
|
||||
# TODO: a sub-decoder system as well?
|
||||
# payload_msg_specs: Union[Type[Struct]] = Any
|
||||
# see related comments in `.msg.types`
|
||||
# _payload_decs: (
|
||||
# dict[
|
||||
|
@ -101,18 +91,42 @@ class MsgCodec(Struct):
|
|||
# ]
|
||||
# |None
|
||||
# ) = None
|
||||
# OR
|
||||
# ) = {
|
||||
# # pre-seed decoders for std-py-type-set for use when
|
||||
# # `Msg.pld == None|Any`.
|
||||
# None: msgpack.Decoder(Any),
|
||||
# Any: msgpack.Decoder(Any),
|
||||
# }
|
||||
|
||||
# TODO: use `functools.cached_property` for these ?
|
||||
# https://docs.python.org/3/library/functools.html#functools.cached_property
|
||||
@property
|
||||
def enc(self) -> msgpack.Encoder:
|
||||
return self._enc or self.encoder()
|
||||
|
||||
def encoder(
|
||||
self,
|
||||
enc_hook: Callable|None = None,
|
||||
reset: bool = False,
|
||||
|
||||
# TODO: what's the default for this, and do we care?
|
||||
# write_buffer_size: int
|
||||
#
|
||||
**kwargs,
|
||||
|
||||
) -> msgpack.Encoder:
|
||||
'''
|
||||
Set or get the maybe-cached `msgspec.msgpack.Encoder`
|
||||
instance configured for this codec.
|
||||
|
||||
When `reset=True` any previously configured encoder will
|
||||
be recreated and then cached with the new settings passed
|
||||
as input.
|
||||
|
||||
'''
|
||||
if (
|
||||
self._enc is None
|
||||
or reset
|
||||
):
|
||||
self._enc = self.lib.msgpack.Encoder(
|
||||
enc_hook=enc_hook or self.enc_hook,
|
||||
# write_buffer_size=write_buffer_size,
|
||||
)
|
||||
|
||||
return self._enc
|
||||
|
||||
def encode(
|
||||
|
@ -125,10 +139,40 @@ class MsgCodec(Struct):
|
|||
on a tranport protocol connection.
|
||||
|
||||
'''
|
||||
return self._enc.encode(py_obj)
|
||||
return self.enc.encode(py_obj)
|
||||
|
||||
@property
|
||||
def dec(self) -> msgpack.Decoder:
|
||||
return self._dec or self.decoder()
|
||||
|
||||
def decoder(
|
||||
self,
|
||||
ipc_msg_spec: Union[Type[Struct]]|None = None,
|
||||
dec_hook: Callable|None = None,
|
||||
reset: bool = False,
|
||||
**kwargs,
|
||||
# ext_hook: ext_hook_sig
|
||||
|
||||
) -> msgpack.Decoder:
|
||||
'''
|
||||
Set or get the maybe-cached `msgspec.msgpack.Decoder`
|
||||
instance configured for this codec.
|
||||
|
||||
When `reset=True` any previously configured decoder will
|
||||
be recreated and then cached with the new settings passed
|
||||
as input.
|
||||
|
||||
'''
|
||||
if (
|
||||
self._dec is None
|
||||
or reset
|
||||
):
|
||||
self._dec = self.lib.msgpack.Decoder(
|
||||
type=ipc_msg_spec or self.ipc_msg_spec,
|
||||
dec_hook=dec_hook or self.dec_hook,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
return self._dec
|
||||
|
||||
def decode(
|
||||
|
@ -141,181 +185,60 @@ class MsgCodec(Struct):
|
|||
determined by the
|
||||
|
||||
'''
|
||||
# https://jcristharif.com/msgspec/usage.html#typed-decoding
|
||||
return self._dec.decode(msg)
|
||||
|
||||
# TODO: do we still want to try and support the sub-decoder with
|
||||
# `.Raw` technique in the case that the `Generic` approach gives
|
||||
# future grief?
|
||||
#
|
||||
# -[ ] <NEW-ISSUE-FOR-ThIS-HERE>
|
||||
# -> https://jcristharif.com/msgspec/api.html#raw
|
||||
#
|
||||
#def mk_pld_subdec(
|
||||
# self,
|
||||
# payload_types: Union[Type[Struct]],
|
||||
|
||||
#) -> msgpack.Decoder:
|
||||
# # TODO: sub-decoder suppor for `.pld: Raw`?
|
||||
# # => see similar notes inside `.msg.types`..
|
||||
# #
|
||||
# # not sure we'll end up needing this though it might have
|
||||
# # unforeseen advantages in terms of enabling encrypted
|
||||
# # appliciation layer (only) payloads?
|
||||
# #
|
||||
# # register sub-payload decoders to load `.pld: Raw`
|
||||
# # decoded `Msg`-packets using a dynamic lookup (table)
|
||||
# # instead of a pre-defined msg-spec via `Generic`
|
||||
# # parameterization.
|
||||
# #
|
||||
# (
|
||||
# tags,
|
||||
# payload_dec,
|
||||
# ) = mk_tagged_union_dec(
|
||||
# tagged_structs=list(payload_types.__args__),
|
||||
# )
|
||||
# # register sub-decoders by tag
|
||||
# subdecs: dict[str, msgpack.Decoder]|None = self._payload_decs
|
||||
# for name in tags:
|
||||
# subdecs.setdefault(
|
||||
# name,
|
||||
# payload_dec,
|
||||
# )
|
||||
|
||||
# return payload_dec
|
||||
|
||||
# sub-decoders for retreiving embedded
|
||||
# payload data and decoding to a sender
|
||||
# side defined (struct) type.
|
||||
# def dec_payload(
|
||||
# codec: MsgCodec,
|
||||
# msg: Msg,
|
||||
|
||||
# ) -> Any|Struct:
|
||||
|
||||
# msg: Msg = codec.dec.decode(msg)
|
||||
# payload_tag: str = msg.header.payload_tag
|
||||
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
|
||||
# return payload_dec.decode(msg.pld)
|
||||
|
||||
# def enc_payload(
|
||||
# codec: MsgCodec,
|
||||
# payload: Any,
|
||||
# cid: str,
|
||||
|
||||
# ) -> bytes:
|
||||
|
||||
# # tag_field: str|None = None
|
||||
|
||||
# plbytes = codec.enc.encode(payload)
|
||||
# if b'msg_type' in plbytes:
|
||||
# assert isinstance(payload, Struct)
|
||||
|
||||
# # tag_field: str = type(payload).__name__
|
||||
# payload = msgspec.Raw(plbytes)
|
||||
|
||||
# msg = Msg(
|
||||
# cid=cid,
|
||||
# pld=payload,
|
||||
# # Header(
|
||||
# # payload_tag=tag_field,
|
||||
# # # dialog_id,
|
||||
# # ),
|
||||
# )
|
||||
# return codec.enc.encode(msg)
|
||||
return self.dec.decode(msg)
|
||||
|
||||
|
||||
def mk_tagged_union_dec(
|
||||
tagged_structs: list[Struct],
|
||||
|
||||
# TODO: sub-decoded `Raw` fields?
|
||||
# -[ ] see `MsgCodec._payload_decs` notes
|
||||
#
|
||||
# XXX if we wanted something more complex then field name str-keys
|
||||
# we might need a header field type to describe the lookup sys?
|
||||
# class Header(Struct, tag=True):
|
||||
# '''
|
||||
# A msg header which defines payload properties
|
||||
) -> tuple[
|
||||
list[str],
|
||||
msgpack.Decoder,
|
||||
]:
|
||||
# See "tagged unions" docs:
|
||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
|
||||
# '''
|
||||
# payload_tag: str|None = None
|
||||
# "The quickest way to enable tagged unions is to set tag=True when
|
||||
# defining every struct type in the union. In this case tag_field
|
||||
# defaults to "type", and tag defaults to the struct class name
|
||||
# (e.g. "Get")."
|
||||
first: Struct = tagged_structs[0]
|
||||
types_union: Union[Type[Struct]] = Union[
|
||||
first
|
||||
]|Any
|
||||
tags: list[str] = [first.__name__]
|
||||
|
||||
for struct in tagged_structs[1:]:
|
||||
types_union |= struct
|
||||
tags.append(struct.__name__)
|
||||
|
||||
#def mk_tagged_union_dec(
|
||||
# tagged_structs: list[Struct],
|
||||
|
||||
#) -> tuple[
|
||||
# list[str],
|
||||
# msgpack.Decoder,
|
||||
#]:
|
||||
# '''
|
||||
# Create a `msgpack.Decoder` for an input `list[msgspec.Struct]`
|
||||
# and return a `list[str]` of each struct's `tag_field: str` value
|
||||
# which can be used to "map to" the initialized dec.
|
||||
|
||||
# '''
|
||||
# # See "tagged unions" docs:
|
||||
# # https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
|
||||
# # "The quickest way to enable tagged unions is to set tag=True when
|
||||
# # defining every struct type in the union. In this case tag_field
|
||||
# # defaults to "type", and tag defaults to the struct class name
|
||||
# # (e.g. "Get")."
|
||||
# first: Struct = tagged_structs[0]
|
||||
# types_union: Union[Type[Struct]] = Union[
|
||||
# first
|
||||
# ]|Any
|
||||
# tags: list[str] = [first.__name__]
|
||||
|
||||
# for struct in tagged_structs[1:]:
|
||||
# types_union |= struct
|
||||
# tags.append(
|
||||
# getattr(
|
||||
# struct,
|
||||
# struct.__struct_config__.tag_field,
|
||||
# struct.__name__,
|
||||
# )
|
||||
# )
|
||||
|
||||
# dec = msgpack.Decoder(types_union)
|
||||
# return (
|
||||
# tags,
|
||||
# dec,
|
||||
# )
|
||||
dec = msgpack.Decoder(types_union)
|
||||
return (
|
||||
tags,
|
||||
dec,
|
||||
)
|
||||
|
||||
# TODO: struct aware messaging coders as per:
|
||||
# - https://github.com/goodboy/tractor/issues/36
|
||||
# - https://github.com/goodboy/tractor/issues/196
|
||||
# - https://github.com/goodboy/tractor/issues/365
|
||||
|
||||
def mk_codec(
|
||||
ipc_msg_spec: Union[Type[Struct]]|Any|None = None,
|
||||
#
|
||||
# ^TODO^: in the long run, do we want to allow using a diff IPC `Msg`-set?
|
||||
# it would break the runtime, but maybe say if you wanted
|
||||
# to add some kinda field-specific or wholesale `.pld` ecryption?
|
||||
|
||||
# struct type unions set for `Decoder`
|
||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
ipc_pld_spec: Union[Type[Struct]]|Any|None = None,
|
||||
|
||||
# TODO: offering a per-msg(-field) type-spec such that
|
||||
# the fields can be dynamically NOT decoded and left as `Raw`
|
||||
# values which are later loaded by a sub-decoder specified
|
||||
# by `tag_field: str` value key?
|
||||
# payload_msg_specs: dict[
|
||||
# str, # tag_field value as sub-decoder key
|
||||
# Union[Type[Struct]] # `Msg.pld` type spec
|
||||
# ]|None = None,
|
||||
|
||||
libname: str = 'msgspec',
|
||||
|
||||
# for codec-ing boxed `Msg`-with-payload msgs
|
||||
payload_types: Union[Type[Struct]]|None = None,
|
||||
|
||||
# TODO: do we want to allow NOT/using a diff `Msg`-set?
|
||||
#
|
||||
# struct type unions set for `Decoder`
|
||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
ipc_msg_spec: Union[Type[Struct]]|Any = Any,
|
||||
|
||||
cache_now: bool = True,
|
||||
|
||||
# proxy as `Struct(**kwargs)`
|
||||
# ------ - ------
|
||||
dec_hook: Callable|None = None,
|
||||
enc_hook: Callable|None = None,
|
||||
# ------ - ------
|
||||
**kwargs,
|
||||
#
|
||||
# Encoder:
|
||||
# write_buffer_size=write_buffer_size,
|
||||
#
|
||||
# Decoder:
|
||||
# ext_hook: ext_hook_sig
|
||||
|
||||
) -> MsgCodec:
|
||||
'''
|
||||
|
@ -324,78 +247,75 @@ def mk_codec(
|
|||
`msgspec` ;).
|
||||
|
||||
'''
|
||||
if (
|
||||
ipc_msg_spec is not None
|
||||
and ipc_pld_spec
|
||||
):
|
||||
raise RuntimeError(
|
||||
f'If a payload spec is provided,\n'
|
||||
"the builtin SC-shuttle-protocol's msg set\n"
|
||||
f'(i.e. `{Msg}`) MUST be used!\n\n'
|
||||
f'However both values were passed as => mk_codec(\n'
|
||||
f' ipc_msg_spec={ipc_msg_spec}`\n'
|
||||
f' ipc_pld_spec={ipc_pld_spec}`\n)\n'
|
||||
)
|
||||
|
||||
elif (
|
||||
ipc_pld_spec
|
||||
and
|
||||
|
||||
# XXX required for now (or maybe forever?) until
|
||||
# we can dream up a way to allow parameterizing and/or
|
||||
# custom overrides to the `Msg`-spec protocol itself?
|
||||
ipc_msg_spec is None
|
||||
):
|
||||
# (manually) generate a msg-payload-spec for all relevant
|
||||
# god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT`
|
||||
# for the decoder such that all sub-type msgs in our SCIPP
|
||||
# will automatically decode to a type-"limited" payload (`Struct`)
|
||||
# object (set).
|
||||
payload_type_spec: Union[Type[Msg]]|None = None
|
||||
if payload_types:
|
||||
(
|
||||
ipc_msg_spec,
|
||||
payload_type_spec,
|
||||
msg_types,
|
||||
) = mk_msg_spec(
|
||||
payload_type_union=ipc_pld_spec,
|
||||
payload_type=payload_types,
|
||||
)
|
||||
assert len(ipc_msg_spec.__args__) == len(msg_types)
|
||||
assert ipc_msg_spec
|
||||
assert len(payload_type_spec.__args__) == len(msg_types)
|
||||
|
||||
else:
|
||||
ipc_msg_spec = ipc_msg_spec or Any
|
||||
|
||||
enc = msgpack.Encoder(
|
||||
enc_hook=enc_hook,
|
||||
)
|
||||
dec = msgpack.Decoder(
|
||||
type=ipc_msg_spec, # like `Msg[Any]`
|
||||
dec_hook=dec_hook,
|
||||
)
|
||||
# TODO: sub-decode `.pld: Raw`?
|
||||
# see similar notes inside `.msg.types`..
|
||||
#
|
||||
# not sure we'll end up wanting/needing this
|
||||
# though it might have unforeseen advantages in terms
|
||||
# of enabling encrypted appliciation layer (only)
|
||||
# payloads?
|
||||
#
|
||||
# register sub-payload decoders to load `.pld: Raw`
|
||||
# decoded `Msg`-packets using a dynamic lookup (table)
|
||||
# instead of a pre-defined msg-spec via `Generic`
|
||||
# parameterization.
|
||||
#
|
||||
# (
|
||||
# tags,
|
||||
# payload_dec,
|
||||
# ) = mk_tagged_union_dec(
|
||||
# tagged_structs=list(payload_types.__args__),
|
||||
# )
|
||||
# _payload_decs: (
|
||||
# dict[str, msgpack.Decoder]|None
|
||||
# ) = {
|
||||
# # pre-seed decoders for std-py-type-set for use when
|
||||
# # `Msg.pld == None|Any`.
|
||||
# None: msgpack.Decoder(Any),
|
||||
# Any: msgpack.Decoder(Any),
|
||||
# }
|
||||
# for name in tags:
|
||||
# _payload_decs[name] = payload_dec
|
||||
|
||||
codec = MsgCodec(
|
||||
_enc=enc,
|
||||
_dec=dec,
|
||||
pld_spec=ipc_pld_spec,
|
||||
# payload_msg_specs=payload_msg_specs,
|
||||
# **kwargs,
|
||||
ipc_msg_spec=ipc_msg_spec,
|
||||
payload_msg_spec=payload_type_spec,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
# sanity on expected backend support
|
||||
assert codec.lib.__name__ == libname
|
||||
|
||||
# by default, config-n-cache the codec pair from input settings.
|
||||
if cache_now:
|
||||
assert codec.enc
|
||||
assert codec.dec
|
||||
|
||||
return codec
|
||||
|
||||
|
||||
# instance of the default `msgspec.msgpack` codec settings, i.e.
|
||||
# no custom structs, hooks or other special types.
|
||||
_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
|
||||
_def_msgspec_codec: MsgCodec = mk_codec()
|
||||
|
||||
# NOTE: provides for per-`trio.Task` specificity of the
|
||||
# IPC msging codec used by the transport layer when doing
|
||||
# `Channel.send()/.recv()` of wire data.
|
||||
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
|
||||
'msgspec_codec',
|
||||
|
||||
# TODO: move this to our new `Msg`-spec!
|
||||
default=_def_msgspec_codec,
|
||||
)
|
||||
|
||||
|
@ -433,7 +353,7 @@ def limit_msg_spec(
|
|||
payload_types: Union[Type[Struct]],
|
||||
|
||||
# TODO: don't need this approach right?
|
||||
# -> related to the `MsgCodec._payload_decs` stuff above..
|
||||
#
|
||||
# tagged_structs: list[Struct]|None = None,
|
||||
|
||||
**codec_kwargs,
|
||||
|
|
|
@ -22,7 +22,9 @@ that is,
|
|||
the "Structurred-Concurrency-Inter-Process-(dialog)-(un)Protocol".
|
||||
|
||||
'''
|
||||
|
||||
from __future__ import annotations
|
||||
# from contextlib import contextmanager as cm
|
||||
import types
|
||||
from typing import (
|
||||
Any,
|
||||
|
@ -34,13 +36,22 @@ from typing import (
|
|||
)
|
||||
|
||||
from msgspec import (
|
||||
defstruct,
|
||||
# field,
|
||||
msgpack,
|
||||
Raw,
|
||||
Struct,
|
||||
UNSET,
|
||||
UnsetType,
|
||||
)
|
||||
|
||||
|
||||
# TODO: can also remove yah?
|
||||
#
|
||||
# class Header(Struct, tag=True):
|
||||
# '''
|
||||
# A msg header which defines payload properties
|
||||
|
||||
# '''
|
||||
# payload_tag: str|None = None
|
||||
|
||||
# type variable for the boxed payload field `.pld`
|
||||
PayloadT = TypeVar('PayloadT')
|
||||
|
||||
|
@ -50,9 +61,6 @@ class Msg(
|
|||
Generic[PayloadT],
|
||||
tag=True,
|
||||
tag_field='msg_type',
|
||||
|
||||
# eq=True,
|
||||
# order=True,
|
||||
):
|
||||
'''
|
||||
The "god" boxing msg type.
|
||||
|
@ -62,13 +70,9 @@ class Msg(
|
|||
tree.
|
||||
|
||||
'''
|
||||
# header: Header
|
||||
# TODO: use UNSET here?
|
||||
cid: str|None # call/context-id
|
||||
# ^-TODO-^: more explicit type?
|
||||
# -[ ] use UNSET here?
|
||||
# https://jcristharif.com/msgspec/supported-types.html#unset
|
||||
#
|
||||
# -[ ] `uuid.UUID` which has multi-protocol support
|
||||
# https://jcristharif.com/msgspec/supported-types.html#uuid
|
||||
|
||||
# The msgs "payload" (spelled without vowels):
|
||||
# https://en.wikipedia.org/wiki/Payload_(computing)
|
||||
|
@ -90,24 +94,9 @@ class Msg(
|
|||
pld: PayloadT
|
||||
|
||||
|
||||
# TODO: caps based RPC support in the payload?
|
||||
#
|
||||
# -[ ] integration with our ``enable_modules: list[str]`` caps sys.
|
||||
# ``pkgutil.resolve_name()`` internally uses
|
||||
# ``importlib.import_module()`` which can be filtered by
|
||||
# inserting a ``MetaPathFinder`` into ``sys.meta_path`` (which
|
||||
# we could do before entering the ``Actor._process_messages()``
|
||||
# loop)?
|
||||
# - https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645
|
||||
# - https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules
|
||||
# - https://stackoverflow.com/a/63320902
|
||||
# - https://docs.python.org/3/library/sys.html#sys.meta_path
|
||||
#
|
||||
# -[ ] can we combine .ns + .func into a native `NamespacePath` field?
|
||||
#
|
||||
# -[ ]better name, like `Call/TaskInput`?
|
||||
#
|
||||
# TODO: better name, like `Call/TaskInput`?
|
||||
class FuncSpec(Struct):
|
||||
# TODO: can we combine these 2 into a `NamespacePath` field?
|
||||
ns: str
|
||||
func: str
|
||||
|
||||
|
@ -137,18 +126,19 @@ class Start(
|
|||
pld: FuncSpec
|
||||
|
||||
|
||||
FuncType: Literal[
|
||||
'asyncfunc',
|
||||
'asyncgen',
|
||||
'context', # TODO: the only one eventually?
|
||||
] = 'context'
|
||||
|
||||
|
||||
class IpcCtxSpec(Struct):
|
||||
'''
|
||||
An inter-actor-`trio.Task`-comms `Context` spec.
|
||||
|
||||
'''
|
||||
# TODO: maybe better names for all these?
|
||||
# -[ ] obvi ^ would need sync with `._rpc`
|
||||
functype: Literal[
|
||||
'asyncfunc',
|
||||
'asyncgen',
|
||||
'context', # TODO: the only one eventually?
|
||||
]
|
||||
functype: FuncType
|
||||
|
||||
# TODO: as part of the reponse we should report our allowed
|
||||
# msg spec which should be generated from the type-annots as
|
||||
|
@ -182,7 +172,6 @@ class Started(
|
|||
decorated IPC endpoint.
|
||||
|
||||
'''
|
||||
pld: PayloadT
|
||||
|
||||
|
||||
# TODO: instead of using our existing `Start`
|
||||
|
@ -199,7 +188,6 @@ class Yield(
|
|||
Per IPC transmission of a value from `await MsgStream.send(<value>)`.
|
||||
|
||||
'''
|
||||
pld: PayloadT
|
||||
|
||||
|
||||
class Stop(Msg):
|
||||
|
@ -208,7 +196,7 @@ class Stop(Msg):
|
|||
of `StopAsyncIteration`.
|
||||
|
||||
'''
|
||||
pld: UnsetType = UNSET
|
||||
pld: UNSET
|
||||
|
||||
|
||||
class Return(
|
||||
|
@ -220,7 +208,6 @@ class Return(
|
|||
func-as-`trio.Task`.
|
||||
|
||||
'''
|
||||
pld: PayloadT
|
||||
|
||||
|
||||
class ErrorData(Struct):
|
||||
|
@ -261,115 +248,46 @@ class Error(Msg):
|
|||
# cid: str
|
||||
|
||||
|
||||
# built-in SC shuttle protocol msg type set in
|
||||
# approx order of the IPC txn-state spaces.
|
||||
__spec__: list[Msg] = [
|
||||
|
||||
# inter-actor RPC initiation
|
||||
Start,
|
||||
StartAck,
|
||||
|
||||
# no-outcome-yet IAC (inter-actor-communication)
|
||||
Started,
|
||||
Yield,
|
||||
Stop,
|
||||
|
||||
# termination outcomes
|
||||
Return,
|
||||
Error,
|
||||
]
|
||||
|
||||
_runtime_spec_msgs: list[Msg] = [
|
||||
Start,
|
||||
StartAck,
|
||||
Stop,
|
||||
Error,
|
||||
]
|
||||
_payload_spec_msgs: list[Msg] = [
|
||||
Started,
|
||||
Yield,
|
||||
Return,
|
||||
]
|
||||
|
||||
|
||||
def mk_msg_spec(
|
||||
payload_type_union: Union[Type] = Any,
|
||||
|
||||
# boxing_msg_set: list[Msg] = _payload_spec_msgs,
|
||||
spec_build_method: Literal[
|
||||
'indexed_generics', # works
|
||||
'defstruct',
|
||||
'types_new_class',
|
||||
|
||||
] = 'indexed_generics',
|
||||
payload_type: Union[Type] = Any,
|
||||
boxing_msg_set: set[Msg] = {
|
||||
Started,
|
||||
Yield,
|
||||
Return,
|
||||
},
|
||||
|
||||
) -> tuple[
|
||||
Union[Type[Msg]],
|
||||
list[Type[Msg]],
|
||||
]:
|
||||
'''
|
||||
Create a payload-(data-)type-parameterized IPC message specification.
|
||||
|
||||
Allows generating IPC msg types from the above builtin set
|
||||
with a payload (field) restricted data-type via the `Msg.pld:
|
||||
PayloadT` type var. This allows runtime-task contexts to use
|
||||
the python type system to limit/filter payload values as
|
||||
determined by the input `payload_type_union: Union[Type]`.
|
||||
Generate a payload-type-parameterized `Msg` specification such
|
||||
that IPC msgs which can be `Msg.pld` (payload) type
|
||||
limited/filterd are specified given an input `payload_type:
|
||||
Union[Type]`.
|
||||
|
||||
'''
|
||||
submsg_types: list[Type[Msg]] = Msg.__subclasses__()
|
||||
bases: tuple = (
|
||||
# XXX NOTE XXX the below generic-parameterization seems to
|
||||
# be THE ONLY way to get this to work correctly in terms
|
||||
# of getting ValidationError on a roundtrip?
|
||||
Msg[payload_type_union],
|
||||
Generic[PayloadT],
|
||||
)
|
||||
defstruct_bases: tuple = (
|
||||
Msg, # [payload_type_union],
|
||||
# Generic[PayloadT],
|
||||
# ^-XXX-^: not allowed? lul..
|
||||
)
|
||||
ipc_msg_types: list[Msg] = []
|
||||
|
||||
idx_msg_types: list[Msg] = []
|
||||
defs_msg_types: list[Msg] = []
|
||||
nc_msg_types: list[Msg] = []
|
||||
|
||||
for msgtype in __spec__:
|
||||
|
||||
# for the NON-payload (user api) type specify-able
|
||||
# msgs types, we simply aggregate the def as is
|
||||
# for inclusion in the output type `Union`.
|
||||
if msgtype not in _payload_spec_msgs:
|
||||
ipc_msg_types.append(msgtype)
|
||||
continue
|
||||
# TODO: see below as well,
|
||||
# => union building approach with `.__class_getitem__()`
|
||||
# doesn't seem to work..?
|
||||
#
|
||||
# payload_type_spec: Union[Type[Msg]]
|
||||
#
|
||||
msg_types: list[Msg] = []
|
||||
for msgtype in boxing_msg_set:
|
||||
|
||||
# check inheritance sanity
|
||||
assert msgtype in submsg_types
|
||||
|
||||
# TODO: wait why do we need the dynamic version here?
|
||||
# XXX ANSWER XXX -> BC INHERITANCE.. don't work w generics..
|
||||
#
|
||||
# NOTE previously bc msgtypes WERE NOT inheritting
|
||||
# directly the `Generic[PayloadT]` type, the manual method
|
||||
# of generic-paraming with `.__class_getitem__()` wasn't
|
||||
# working..
|
||||
#
|
||||
# XXX but bc i changed that to make every subtype inherit
|
||||
# it, this manual "indexed parameterization" method seems
|
||||
# to work?
|
||||
#
|
||||
# -[x] paraming the `PayloadT` values via `Generic[T]`
|
||||
# does work it seems but WITHOUT inheritance of generics
|
||||
#
|
||||
# -[-] is there a way to get it to work at module level
|
||||
# -[ ] paraming the `PayloadT` values via `Generic[T]`
|
||||
# doesn't seem to work at all?
|
||||
# -[ ] is there a way to get it to work at module level
|
||||
# just using inheritance or maybe a metaclass?
|
||||
# => thot that `defstruct` might work, but NOPE, see
|
||||
# below..
|
||||
#
|
||||
idxed_msg_type: Msg = msgtype[payload_type_union]
|
||||
idx_msg_types.append(idxed_msg_type)
|
||||
# index_paramed_msg_type: Msg = msgtype[payload_type]
|
||||
|
||||
# TODO: WHY do we need to dynamically generate the
|
||||
# subtype-msgs here to ensure the `.pld` parameterization
|
||||
|
@ -377,69 +295,98 @@ def mk_msg_spec(
|
|||
# `msgpack.Decoder()`..?
|
||||
#
|
||||
# dynamically create the payload type-spec-limited msg set.
|
||||
newclass_msgtype: Type = types.new_class(
|
||||
name=msgtype.__name__,
|
||||
bases=bases,
|
||||
kwds={},
|
||||
)
|
||||
nc_msg_types.append(
|
||||
newclass_msgtype[payload_type_union]
|
||||
manual_paramed_msg_subtype: Type = types.new_class(
|
||||
msgtype.__name__,
|
||||
(
|
||||
# XXX NOTE XXX this seems to be THE ONLY
|
||||
# way to get this to work correctly!?!
|
||||
Msg[payload_type],
|
||||
Generic[PayloadT],
|
||||
),
|
||||
{},
|
||||
)
|
||||
|
||||
# with `msgspec.structs.defstruct`
|
||||
# XXX ALSO DOESN'T WORK
|
||||
defstruct_msgtype = defstruct(
|
||||
name=msgtype.__name__,
|
||||
fields=[
|
||||
('cid', str),
|
||||
|
||||
# XXX doesn't seem to work..
|
||||
# ('pld', PayloadT),
|
||||
|
||||
('pld', payload_type_union),
|
||||
],
|
||||
bases=defstruct_bases,
|
||||
)
|
||||
defs_msg_types.append(defstruct_msgtype)
|
||||
|
||||
# TODO: grok the diff here better..
|
||||
# assert index_paramed_msg_type == manual_paramed_msg_subtype
|
||||
|
||||
# paramed_msg_type = manual_paramed_msg_subtype
|
||||
# XXX TODO: why does the manual method work but not the
|
||||
# `.__class_getitem__()` one!?!
|
||||
paramed_msg_type = manual_paramed_msg_subtype
|
||||
|
||||
# ipc_payload_msgs_type_union |= index_paramed_msg_type
|
||||
# payload_type_spec |= paramed_msg_type
|
||||
msg_types.append(paramed_msg_type)
|
||||
|
||||
idx_spec: Union[Type[Msg]] = Union[*idx_msg_types]
|
||||
def_spec: Union[Type[Msg]] = Union[*defs_msg_types]
|
||||
nc_spec: Union[Type[Msg]] = Union[*nc_msg_types]
|
||||
|
||||
specs: dict[str, Union[Type[Msg]]] = {
|
||||
'indexed_generics': idx_spec,
|
||||
'defstruct': def_spec,
|
||||
'types_new_class': nc_spec,
|
||||
}
|
||||
msgtypes_table: dict[str, list[Msg]] = {
|
||||
'indexed_generics': idx_msg_types,
|
||||
'defstruct': defs_msg_types,
|
||||
'types_new_class': nc_msg_types,
|
||||
}
|
||||
|
||||
# XXX lol apparently type unions can't ever
|
||||
# be equal eh?
|
||||
# TODO: grok the diff here better..
|
||||
#
|
||||
# assert (
|
||||
# idx_spec
|
||||
# ==
|
||||
# nc_spec
|
||||
# ==
|
||||
# def_spec
|
||||
# )
|
||||
# breakpoint()
|
||||
|
||||
pld_spec: Union[Type] = specs[spec_build_method]
|
||||
runtime_spec: Union[Type] = Union[*ipc_msg_types]
|
||||
|
||||
payload_type_spec: Union[Type[Msg]] = Union[*msg_types]
|
||||
return (
|
||||
pld_spec | runtime_spec,
|
||||
msgtypes_table[spec_build_method] + ipc_msg_types,
|
||||
payload_type_spec,
|
||||
msg_types,
|
||||
)
|
||||
|
||||
|
||||
# TODO: integration with our ``enable_modules: list[str]`` caps sys.
|
||||
#
|
||||
# ``pkgutil.resolve_name()`` internally uses
|
||||
# ``importlib.import_module()`` which can be filtered by inserting
|
||||
# a ``MetaPathFinder`` into ``sys.meta_path`` (which we could do before
|
||||
# entering the ``Actor._process_messages()`` loop).
|
||||
# https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645
|
||||
# https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules
|
||||
# - https://stackoverflow.com/a/63320902
|
||||
# - https://docs.python.org/3/library/sys.html#sys.meta_path
|
||||
|
||||
# TODO: do we still want to try and support the sub-decoder with
|
||||
# `Raw` technique in the case that the `Generic` approach gives
|
||||
# future grief?
|
||||
#
|
||||
# sub-decoders for retreiving embedded
|
||||
# payload data and decoding to a sender
|
||||
# side defined (struct) type.
|
||||
_payload_decs: dict[
|
||||
str|None,
|
||||
msgpack.Decoder,
|
||||
] = {
|
||||
# default decoder is used when `Header.payload_tag == None`
|
||||
None: msgpack.Decoder(Any),
|
||||
}
|
||||
|
||||
|
||||
def dec_payload(
|
||||
msg: Msg,
|
||||
msg_dec: msgpack.Decoder = msgpack.Decoder(
|
||||
type=Msg[Any]
|
||||
),
|
||||
|
||||
) -> Any|Struct:
|
||||
|
||||
msg: Msg = msg_dec.decode(msg)
|
||||
payload_tag: str = msg.header.payload_tag
|
||||
payload_dec: msgpack.Decoder = _payload_decs[payload_tag]
|
||||
return payload_dec.decode(msg.pld)
|
||||
|
||||
|
||||
def enc_payload(
|
||||
enc: msgpack.Encoder,
|
||||
payload: Any,
|
||||
cid: str,
|
||||
|
||||
) -> bytes:
|
||||
|
||||
# tag_field: str|None = None
|
||||
|
||||
plbytes = enc.encode(payload)
|
||||
if b'msg_type' in plbytes:
|
||||
assert isinstance(payload, Struct)
|
||||
|
||||
# tag_field: str = type(payload).__name__
|
||||
payload = Raw(plbytes)
|
||||
|
||||
msg = Msg(
|
||||
cid=cid,
|
||||
pld=payload,
|
||||
# Header(
|
||||
# payload_tag=tag_field,
|
||||
# # dialog_id,
|
||||
# ),
|
||||
)
|
||||
return enc.encode(msg)
|
||||
|
|
Loading…
Reference in New Issue