forked from goodboy/tractor
1
0
Fork 0

Add a `MsgDec` for receive-only decoding

In prep for a "payload receiver" abstraction that will wrap
`MsgType.pld`-IO delivery from `Context` and `MsgStream`, adds a small
`msgspec.msgpack.Decoder` shim which delegates an API similar to
`MsgCodec` and is offered via a `.msg._codec.mk_dec()` factory.

Detalles:
- move over the TODOs/comments from `.msg.types.Start` to to
  `MsgDec.spec` since it's probably the ideal spot to start thinking
  about it from a consumer code PoV.
- move codec reversion assert and log emit into `finally:` block.
- flip default `.types._tractor_codec = mk_codec_ipc_pld(ipc_pld_spec=Raw)`
  in prep for always doing payload-delayed decodes.
- make `MsgCodec._dec` private with public property getter.
- change `CancelAck` to NOT derive from `Return` so it's mutex in
  `match/case:` handling.
runtime_to_msgspec
Tyler Goodlet 2024-04-22 18:24:42 -04:00
parent 0df7d557db
commit a51632ffa6
3 changed files with 141 additions and 50 deletions

View File

@ -34,6 +34,7 @@ from ._codec import (
apply_codec as apply_codec, apply_codec as apply_codec,
mk_codec as mk_codec, mk_codec as mk_codec,
MsgCodec as MsgCodec, MsgCodec as MsgCodec,
MsgDec as MsgDec,
current_codec as current_codec, current_codec as current_codec,
) )
@ -50,6 +51,7 @@ from .types import (
Yield as Yield, Yield as Yield,
Stop as Stop, Stop as Stop,
Return as Return, Return as Return,
CancelAck as CancelAck,
Error as Error, Error as Error,

View File

@ -50,7 +50,7 @@ from types import ModuleType
import msgspec import msgspec
from msgspec import ( from msgspec import (
msgpack, msgpack,
# Raw, Raw,
) )
# from trio.lowlevel import ( # from trio.lowlevel import (
# RunVar, # RunVar,
@ -71,6 +71,108 @@ if TYPE_CHECKING:
log = get_logger(__name__) log = get_logger(__name__)
# TODO: unify with `MsgCodec` by making `._dec` part this?
class MsgDec(Struct):
'''
An IPC msg decoder.
Normally used to decode only a payload: `MsgType.pld:
PayloadT` field before delivery to IPC consumer code.
'''
_dec: msgpack.Decoder
@property
def dec(self) -> msgpack.Decoder:
return self._dec
# struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions
#
# ^-TODO-^: make a wrapper type for this such that alt
# backends can be represented easily without a `Union` needed,
# AND so that we have better support for wire transport.
#
# -[ ] maybe `FieldSpec` is a good name since msg-spec
# better applies to a `MsgType[FieldSpec]`?
#
# -[ ] both as part of the `.open_context()` call AND as part of the
# immediate ack-reponse (see similar below)
# we should do spec matching and fail if anything is awry?
#
# -[ ] eventually spec should be generated/parsed from the
# type-annots as # desired in GH issue:
# https://github.com/goodboy/tractor/issues/365
#
# -[ ] semantics of the mismatch case
# - when caller-callee specs we should raise
# a `MsgTypeError` or `MsgSpecError` or similar?
#
# -[ ] wrapper types for both spec types such that we can easily
# IPC transport them?
# - `TypeSpec: Union[Type]`
# * also a `.__contains__()` for doing `None in
# TypeSpec[None|int]` since rn you need to do it on
# `.__args__` for unions..
# - `MsgSpec: Union[Type[Msg]]
#
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
# |_ historical pep 695: https://peps.python.org/pep-0695/
# |_ full lang spec: https://typing.readthedocs.io/en/latest/spec/
# |_ on annotation scopes:
# https://docs.python.org/3/reference/executionmodel.html#annotation-scopes
# |_ 3.13 will have subscriptable funcs Bo
# https://peps.python.org/pep-0718/
@property
def spec(self) -> Union[Type[Struct]]:
# NOTE: defined and applied inside `mk_codec()`
return self._dec.type
# no difference, as compared to a `MsgCodec` which defines the
# `MsgType.pld: PayloadT` part of its spec separately
pld_spec = spec
# TODO: would get moved into `FieldSpec.__str__()` right?
@property
def spec_str(self) -> str:
# TODO: could also use match: instead?
spec: Union[Type]|Type = self.spec
# `typing.Union` case
if getattr(spec, '__args__', False):
return str(spec)
# just a single type
else:
return spec.__name__
pld_spec_str = spec_str
def decode(
self,
raw: Raw|bytes,
) -> Any:
return self._dec.decode(raw)
@property
def hook(self) -> Callable|None:
return self._dec.dec_hook
def mk_dec(
spec: Union[Type[Struct]]|Any = Any,
dec_hook: Callable|None = None,
) -> MsgDec:
return msgpack.Decoder(
type=spec, # like `Msg[Any]`
dec_hook=dec_hook,
)
# TODO: overall IPC msg-spec features (i.e. in this mod)! # TODO: overall IPC msg-spec features (i.e. in this mod)!
# #
# -[ ] API changes towards being interchange lib agnostic! # -[ ] API changes towards being interchange lib agnostic!
@ -94,8 +196,7 @@ class MsgCodec(Struct):
''' '''
_enc: msgpack.Encoder _enc: msgpack.Encoder
_dec: msgpack.Decoder _dec: msgpack.Decoder
_pld_spec: Type[Struct]|Raw|Any
pld_spec: Union[Type[Struct]]|None
def __repr__(self) -> str: def __repr__(self) -> str:
speclines: str = textwrap.indent( speclines: str = textwrap.indent(
@ -118,14 +219,21 @@ class MsgCodec(Struct):
')>' ')>'
) )
@property
def pld_spec(self) -> Type[Struct]|Raw|Any:
return self._pld_spec
@property @property
def pld_spec_str(self) -> str: def pld_spec_str(self) -> str:
spec: Union[Type]|Type = self.pld_spec
# TODO: could also use match: instead? # TODO: could also use match: instead?
spec: Union[Type]|Type = self.pld_spec
# `typing.Union` case
if getattr(spec, '__args__', False): if getattr(spec, '__args__', False):
# `typing.Union` case
return str(spec) return str(spec)
# just a single type
else: else:
return spec.__name__ return spec.__name__
@ -133,6 +241,7 @@ class MsgCodec(Struct):
# https://jcristharif.com/msgspec/structs.html#tagged-unions # https://jcristharif.com/msgspec/structs.html#tagged-unions
@property @property
def msg_spec(self) -> Union[Type[Struct]]: def msg_spec(self) -> Union[Type[Struct]]:
# NOTE: defined and applied inside `mk_codec()`
return self._dec.type return self._dec.type
def msg_spec_items( def msg_spec_items(
@ -157,8 +266,9 @@ class MsgCodec(Struct):
def pformat_msg_spec( def pformat_msg_spec(
self, self,
msg: MsgType|None = None, msg: MsgType|None = None,
join_char: str = '\n',
) -> str: ) -> str:
return '\n'.join( return join_char.join(
self.msg_spec_items(msg=msg).values() self.msg_spec_items(msg=msg).values()
) )
@ -405,18 +515,25 @@ def mk_codec(
assert len(ipc_msg_spec.__args__) == len(msg_types) assert len(ipc_msg_spec.__args__) == len(msg_types)
assert ipc_msg_spec assert ipc_msg_spec
# TODO: use this shim instead?
# bc.. unification, err somethin?
# dec: MsgDec = mk_dec(
# spec=ipc_msg_spec,
# dec_hook=dec_hook,
# )
dec = msgpack.Decoder(
type=ipc_msg_spec,
dec_hook=dec_hook,
)
enc = msgpack.Encoder( enc = msgpack.Encoder(
enc_hook=enc_hook, enc_hook=enc_hook,
) )
dec = msgpack.Decoder(
type=ipc_msg_spec, # like `Msg[Any]`
dec_hook=dec_hook,
)
codec = MsgCodec( codec = MsgCodec(
_enc=enc, _enc=enc,
_dec=dec, _dec=dec,
pld_spec=ipc_pld_spec, _pld_spec=ipc_pld_spec,
) )
# sanity on expected backend support # sanity on expected backend support
@ -435,10 +552,9 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
# https://jcristharif.com/msgspec/supported-types.html # https://jcristharif.com/msgspec/supported-types.html
# #
_def_tractor_codec: MsgCodec = mk_codec( _def_tractor_codec: MsgCodec = mk_codec(
ipc_pld_spec=Any,
# TODO: use this for debug mode locking prot? # TODO: use this for debug mode locking prot?
# ipc_pld_spec=Raw, # ipc_pld_spec=Any,
ipc_pld_spec=Raw,
) )
# TODO: IDEALLY provides for per-`trio.Task` specificity of the # TODO: IDEALLY provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing # IPC msging codec used by the transport layer when doing
@ -538,12 +654,12 @@ def apply_codec(
yield var.get() yield var.get()
finally: finally:
var.reset(token) var.reset(token)
log.info(
'Reverted to last msg-spec codec\n\n'
f'{orig}\n'
)
assert var.get() is orig
assert var.get() is orig
log.info(
'Reverted to last msg-spec codec\n\n'
f'{orig}\n'
)
def current_codec() -> MsgCodec: def current_codec() -> MsgCodec:
''' '''
@ -574,7 +690,7 @@ def limit_msg_spec(
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = True
curr_codec = current_codec() curr_codec: MsgCodec = current_codec()
msgspec_codec: MsgCodec = mk_codec( msgspec_codec: MsgCodec = mk_codec(
ipc_pld_spec=payload_spec, ipc_pld_spec=payload_spec,
**codec_kwargs, **codec_kwargs,

View File

@ -266,35 +266,7 @@ class Start(
# TODO: enforcing a msg-spec in terms `Msg.pld` # TODO: enforcing a msg-spec in terms `Msg.pld`
# parameterizable msgs to be used in the appls IPC dialog. # parameterizable msgs to be used in the appls IPC dialog.
# # => SEE `._codec.MsgDec` for more <=
# -[ ] both as part of the `.open_context()` call AND as part of the
# immediate ack-reponse (see similar below)
# we should do spec matching and fail if anything is awry?
#
# -[ ] eventually spec should be generated/parsed from the
# type-annots as # desired in GH issue:
# https://github.com/goodboy/tractor/issues/365
#
# -[ ] semantics of the mismatch case
# - when caller-callee specs we should raise
# a `MsgTypeError` or `MsgSpecError` or similar?
#
# -[ ] wrapper types for both spec types such that we can easily
# IPC transport them?
# - `TypeSpec: Union[Type]`
# * also a `.__contains__()` for doing `None in
# TypeSpec[None|int]` since rn you need to do it on
# `.__args__` for unions..
# - `MsgSpec: Union[Type[Msg]]
#
# -[ ] auto-genning this from new (in 3.12) type parameter lists Bo
# |_ https://docs.python.org/3/reference/compound_stmts.html#type-params
# |_ historical pep 695: https://peps.python.org/pep-0695/
# |_ full lang spec: https://typing.readthedocs.io/en/latest/spec/
# |_ on annotation scopes:
# https://docs.python.org/3/reference/executionmodel.html#annotation-scopes
# |_ 3.13 will have subscriptable funcs Bo
# https://peps.python.org/pep-0718/
pld_spec: str = str(Any) pld_spec: str = str(Any)
@ -382,7 +354,8 @@ class Return(
class CancelAck( class CancelAck(
Return, Msg,
Generic[PayloadT],
): ):
''' '''
Deliver the `bool` return-value from a cancellation `Actor` Deliver the `bool` return-value from a cancellation `Actor`