forked from goodboy/tractor
WIP tagged union message type API
parent
e5ee2e3de8
commit
ec226463e3
|
@ -40,10 +40,19 @@ Built-in messaging patterns, types, APIs and helpers.
|
||||||
# - https://jcristharif.com/msgspec/api.html#struct
|
# - https://jcristharif.com/msgspec/api.html#struct
|
||||||
# - https://jcristharif.com/msgspec/extending.html
|
# - https://jcristharif.com/msgspec/extending.html
|
||||||
# via ``msgpack-python``:
|
# via ``msgpack-python``:
|
||||||
# - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type
|
# https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
from contextlib import contextmanager as cm
|
||||||
from pkgutil import resolve_name
|
from pkgutil import resolve_name
|
||||||
|
from typing import Union, Any
|
||||||
|
|
||||||
|
|
||||||
|
from msgspec import Struct
|
||||||
|
from msgspec.msgpack import (
|
||||||
|
Encoder,
|
||||||
|
Decoder,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class NamespacePath(str):
|
class NamespacePath(str):
|
||||||
|
@ -78,3 +87,65 @@ class NamespacePath(str):
|
||||||
(ref.__module__,
|
(ref.__module__,
|
||||||
getattr(ref, '__name__', ''))
|
getattr(ref, '__name__', ''))
|
||||||
))
|
))
|
||||||
|
|
||||||
|
|
||||||
|
# LIFO codec stack that is appended when the user opens the
|
||||||
|
# ``configure_native_msgs()`` cm below to configure a new codec set
|
||||||
|
# which will be applied to all new (msgspec relevant) IPC transports
|
||||||
|
# that are spawned **after** the configure call is made.
|
||||||
|
_lifo_codecs: list[
|
||||||
|
tuple[
|
||||||
|
Encoder,
|
||||||
|
Decoder,
|
||||||
|
],
|
||||||
|
] = [(Encoder(), Decoder())]
|
||||||
|
|
||||||
|
|
||||||
|
def get_msg_codecs() -> tuple[
|
||||||
|
Encoder,
|
||||||
|
Decoder,
|
||||||
|
]:
|
||||||
|
'''
|
||||||
|
Return the currently configured ``msgspec`` codec set.
|
||||||
|
|
||||||
|
The defaults are defined above.
|
||||||
|
|
||||||
|
'''
|
||||||
|
global _lifo_codecs
|
||||||
|
return _lifo_codecs[-1]
|
||||||
|
|
||||||
|
|
||||||
|
@cm
|
||||||
|
def configure_native_msgs(
|
||||||
|
tagged_structs: list[Struct],
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Push a codec set that will natively decode
|
||||||
|
tagged structs provied in ``tagged_structs``
|
||||||
|
in all IPC transports and pop the codec on exit.
|
||||||
|
|
||||||
|
'''
|
||||||
|
global _lifo_codecs
|
||||||
|
|
||||||
|
# See "tagged unions" docs:
|
||||||
|
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||||
|
|
||||||
|
# "The quickest way to enable tagged unions is to set tag=True when
|
||||||
|
# defining every struct type in the union. In this case tag_field
|
||||||
|
# defaults to "type", and tag defaults to the struct class name
|
||||||
|
# (e.g. "Get")."
|
||||||
|
enc = Encoder()
|
||||||
|
|
||||||
|
types_union = Union[tagged_structs[0]] | Any
|
||||||
|
for struct in tagged_structs[1:]:
|
||||||
|
types_union |= struct
|
||||||
|
|
||||||
|
dec = Decoder(types_union)
|
||||||
|
|
||||||
|
_lifo_codecs.append((enc, dec))
|
||||||
|
try:
|
||||||
|
print("YOYOYOOYOYOYOY")
|
||||||
|
yield enc, dec
|
||||||
|
finally:
|
||||||
|
print("NONONONONON")
|
||||||
|
_lifo_codecs.pop()
|
||||||
|
|
Loading…
Reference in New Issue