diff --git a/tractor/msg/__init__.py b/tractor/msg/__init__.py index 8f13f5f..d968f6c 100644 --- a/tractor/msg/__init__.py +++ b/tractor/msg/__init__.py @@ -34,6 +34,7 @@ from ._codec import ( apply_codec as apply_codec, mk_codec as mk_codec, MsgCodec as MsgCodec, + MsgDec as MsgDec, current_codec as current_codec, ) @@ -50,6 +51,7 @@ from .types import ( Yield as Yield, Stop as Stop, Return as Return, + CancelAck as CancelAck, Error as Error, diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py index 766a297..104f7d9 100644 --- a/tractor/msg/_codec.py +++ b/tractor/msg/_codec.py @@ -50,7 +50,7 @@ from types import ModuleType import msgspec from msgspec import ( msgpack, - # Raw, + Raw, ) # from trio.lowlevel import ( # RunVar, @@ -71,6 +71,108 @@ if TYPE_CHECKING: log = get_logger(__name__) + +# TODO: unify with `MsgCodec` by making `._dec` part this? +class MsgDec(Struct): + ''' + An IPC msg decoder. + + Normally used to decode only a payload: `MsgType.pld: + PayloadT` field before delivery to IPC consumer code. + + ''' + _dec: msgpack.Decoder + + @property + def dec(self) -> msgpack.Decoder: + return self._dec + + # struct type unions + # https://jcristharif.com/msgspec/structs.html#tagged-unions + # + # ^-TODO-^: make a wrapper type for this such that alt + # backends can be represented easily without a `Union` needed, + # AND so that we have better support for wire transport. + # + # -[ ] maybe `FieldSpec` is a good name since msg-spec + # better applies to a `MsgType[FieldSpec]`? + # + # -[ ] both as part of the `.open_context()` call AND as part of the + # immediate ack-reponse (see similar below) + # we should do spec matching and fail if anything is awry? + # + # -[ ] eventually spec should be generated/parsed from the + # type-annots as # desired in GH issue: + # https://github.com/goodboy/tractor/issues/365 + # + # -[ ] semantics of the mismatch case + # - when caller-callee specs we should raise + # a `MsgTypeError` or `MsgSpecError` or similar? + # + # -[ ] wrapper types for both spec types such that we can easily + # IPC transport them? + # - `TypeSpec: Union[Type]` + # * also a `.__contains__()` for doing `None in + # TypeSpec[None|int]` since rn you need to do it on + # `.__args__` for unions.. + # - `MsgSpec: Union[Type[Msg]] + # + # -[ ] auto-genning this from new (in 3.12) type parameter lists Bo + # |_ https://docs.python.org/3/reference/compound_stmts.html#type-params + # |_ historical pep 695: https://peps.python.org/pep-0695/ + # |_ full lang spec: https://typing.readthedocs.io/en/latest/spec/ + # |_ on annotation scopes: + # https://docs.python.org/3/reference/executionmodel.html#annotation-scopes + # |_ 3.13 will have subscriptable funcs Bo + # https://peps.python.org/pep-0718/ + @property + def spec(self) -> Union[Type[Struct]]: + # NOTE: defined and applied inside `mk_codec()` + return self._dec.type + + # no difference, as compared to a `MsgCodec` which defines the + # `MsgType.pld: PayloadT` part of its spec separately + pld_spec = spec + + # TODO: would get moved into `FieldSpec.__str__()` right? + @property + def spec_str(self) -> str: + + # TODO: could also use match: instead? + spec: Union[Type]|Type = self.spec + + # `typing.Union` case + if getattr(spec, '__args__', False): + return str(spec) + + # just a single type + else: + return spec.__name__ + + pld_spec_str = spec_str + + def decode( + self, + raw: Raw|bytes, + ) -> Any: + return self._dec.decode(raw) + + @property + def hook(self) -> Callable|None: + return self._dec.dec_hook + + +def mk_dec( + spec: Union[Type[Struct]]|Any = Any, + dec_hook: Callable|None = None, + +) -> MsgDec: + + return msgpack.Decoder( + type=spec, # like `Msg[Any]` + dec_hook=dec_hook, + ) + # TODO: overall IPC msg-spec features (i.e. in this mod)! # # -[ ] API changes towards being interchange lib agnostic! @@ -94,8 +196,7 @@ class MsgCodec(Struct): ''' _enc: msgpack.Encoder _dec: msgpack.Decoder - - pld_spec: Union[Type[Struct]]|None + _pld_spec: Type[Struct]|Raw|Any def __repr__(self) -> str: speclines: str = textwrap.indent( @@ -118,14 +219,21 @@ class MsgCodec(Struct): ')>' ) + @property + def pld_spec(self) -> Type[Struct]|Raw|Any: + return self._pld_spec + @property def pld_spec_str(self) -> str: - spec: Union[Type]|Type = self.pld_spec # TODO: could also use match: instead? + spec: Union[Type]|Type = self.pld_spec + + # `typing.Union` case if getattr(spec, '__args__', False): - # `typing.Union` case return str(spec) + + # just a single type else: return spec.__name__ @@ -133,6 +241,7 @@ class MsgCodec(Struct): # https://jcristharif.com/msgspec/structs.html#tagged-unions @property def msg_spec(self) -> Union[Type[Struct]]: + # NOTE: defined and applied inside `mk_codec()` return self._dec.type def msg_spec_items( @@ -157,8 +266,9 @@ class MsgCodec(Struct): def pformat_msg_spec( self, msg: MsgType|None = None, + join_char: str = '\n', ) -> str: - return '\n'.join( + return join_char.join( self.msg_spec_items(msg=msg).values() ) @@ -405,18 +515,25 @@ def mk_codec( assert len(ipc_msg_spec.__args__) == len(msg_types) assert ipc_msg_spec + # TODO: use this shim instead? + # bc.. unification, err somethin? + # dec: MsgDec = mk_dec( + # spec=ipc_msg_spec, + # dec_hook=dec_hook, + # ) + + dec = msgpack.Decoder( + type=ipc_msg_spec, + dec_hook=dec_hook, + ) enc = msgpack.Encoder( enc_hook=enc_hook, ) - dec = msgpack.Decoder( - type=ipc_msg_spec, # like `Msg[Any]` - dec_hook=dec_hook, - ) codec = MsgCodec( _enc=enc, _dec=dec, - pld_spec=ipc_pld_spec, + _pld_spec=ipc_pld_spec, ) # sanity on expected backend support @@ -435,10 +552,9 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any) # https://jcristharif.com/msgspec/supported-types.html # _def_tractor_codec: MsgCodec = mk_codec( - ipc_pld_spec=Any, - # TODO: use this for debug mode locking prot? - # ipc_pld_spec=Raw, + # ipc_pld_spec=Any, + ipc_pld_spec=Raw, ) # TODO: IDEALLY provides for per-`trio.Task` specificity of the # IPC msging codec used by the transport layer when doing @@ -538,12 +654,12 @@ def apply_codec( yield var.get() finally: var.reset(token) + log.info( + 'Reverted to last msg-spec codec\n\n' + f'{orig}\n' + ) + assert var.get() is orig - assert var.get() is orig - log.info( - 'Reverted to last msg-spec codec\n\n' - f'{orig}\n' - ) def current_codec() -> MsgCodec: ''' @@ -574,7 +690,7 @@ def limit_msg_spec( ''' __tracebackhide__: bool = True - curr_codec = current_codec() + curr_codec: MsgCodec = current_codec() msgspec_codec: MsgCodec = mk_codec( ipc_pld_spec=payload_spec, **codec_kwargs, diff --git a/tractor/msg/types.py b/tractor/msg/types.py index 59ec2a4..cb12432 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -266,35 +266,7 @@ class Start( # TODO: enforcing a msg-spec in terms `Msg.pld` # parameterizable msgs to be used in the appls IPC dialog. - # - # -[ ] both as part of the `.open_context()` call AND as part of the - # immediate ack-reponse (see similar below) - # we should do spec matching and fail if anything is awry? - # - # -[ ] eventually spec should be generated/parsed from the - # type-annots as # desired in GH issue: - # https://github.com/goodboy/tractor/issues/365 - # - # -[ ] semantics of the mismatch case - # - when caller-callee specs we should raise - # a `MsgTypeError` or `MsgSpecError` or similar? - # - # -[ ] wrapper types for both spec types such that we can easily - # IPC transport them? - # - `TypeSpec: Union[Type]` - # * also a `.__contains__()` for doing `None in - # TypeSpec[None|int]` since rn you need to do it on - # `.__args__` for unions.. - # - `MsgSpec: Union[Type[Msg]] - # - # -[ ] auto-genning this from new (in 3.12) type parameter lists Bo - # |_ https://docs.python.org/3/reference/compound_stmts.html#type-params - # |_ historical pep 695: https://peps.python.org/pep-0695/ - # |_ full lang spec: https://typing.readthedocs.io/en/latest/spec/ - # |_ on annotation scopes: - # https://docs.python.org/3/reference/executionmodel.html#annotation-scopes - # |_ 3.13 will have subscriptable funcs Bo - # https://peps.python.org/pep-0718/ + # => SEE `._codec.MsgDec` for more <= pld_spec: str = str(Any) @@ -382,7 +354,8 @@ class Return( class CancelAck( - Return, + Msg, + Generic[PayloadT], ): ''' Deliver the `bool` return-value from a cancellation `Actor`