From 61f66902613887f2406b37a7c5c83204f4c342f4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 Mar 2025 14:13:36 -0500 Subject: [PATCH] Finally get type-extended `msgspec` fields working By using our new `PldRx` design we can, - pass through the pld-spec & a `dec_hook()` to our `MsgDec` which is used to configure the underlying `.dec: msgspec.msgpack.Decoder` - pass through a `enc_hook()` to `mk_codec()` and use it to conf the equiv `MsgCodec.enc` such that sent msg-plds are converted prior to transport. The trick ended up being just to always union the `mk_dec()` extension-types spec with the normal `msgspec.Raw` pld-spec such that the `dec_hook()` is only invoked for payload types tagged by the encoder/sender side B) A variety of impl tweaks to make it all happen as well as various cleanups in the `.msg._codec` mod include, - `mk_dec()` no default `spec` arg, better doc string, accept the new `ext_types` arg, doing the union of that with `msgspec.Raw`. - proto-ed a now unused `mk_boxed_ext_struct()` which will likely get removed since it ended up that our `PayloadMsg` structs already cover the ext-type-hook requirement that the decoder is passed a `.type=msgspec.Struct` of some sort in order for `.dec_hook` to be used. - add a `unpack_spec_types()` util fn for getting the `set[Type]` from a `Union[Type]` annotation instance. - mk the default `mk_codec(ipc_pld_spec = Raw,)` since the `PldRx` design was already passing/overriding it and it doesn't make much sense to use `Any` anymore for the same reason; it will cause various `Context` apis to now break. |_ also accept a `enc_hook()` and `ext_types` which are used to maybe config the `.msgpack.Encoder` - generally tweak a bunch of comments-as-docs and todos namely the ones that are completed after the pld-rx design was implemented. Also, - mask the non-functioning `'defstruct'` approach inside `.msg.types.mk_msg_spec()` to prep for its removal. 
Adjust the test suite (rn called `test_caps_based_msging`), - add a new suite `test_custom_extension_types` and move and use the `enc/dec_nsp()` hooks to the mod level for its use. - prolly planning to drop the `test_limit_msgspec` suite since it's mostly replaced by the `test_pldrx_limiting` mod's version? - originally was tweaking a bunch in `test_codec_hooks_mod` but likely it will get mostly rewritten to be simpler and simply verify that ext-typed fields can be used over IPC `Context`s between actors (as originally intended for this sub-suite). --- tests/test_caps_based_msging.py | 359 +++++++++++++++++++++++--------- tractor/msg/__init__.py | 1 + tractor/msg/_codec.py | 273 +++++++++++++++++++----- tractor/msg/_ops.py | 10 +- tractor/msg/types.py | 45 ++-- 5 files changed, 513 insertions(+), 175 deletions(-) diff --git a/tests/test_caps_based_msging.py b/tests/test_caps_based_msging.py index ba2bb10..3c2a73c 100644 --- a/tests/test_caps_based_msging.py +++ b/tests/test_caps_based_msging.py @@ -15,13 +15,16 @@ from typing import ( from msgspec import ( structs, msgpack, + Raw, Struct, ValidationError, ) import pytest +import trio import tractor from tractor import ( + Actor, _state, MsgTypeError, Context, @@ -32,7 +35,9 @@ from tractor.msg import ( NamespacePath, MsgCodec, + MsgDec, mk_codec, + mk_dec, apply_codec, current_codec, ) @@ -43,101 +48,34 @@ from tractor.msg.types import ( Started, mk_msg_spec, ) -import trio +from tractor.msg._ops import ( + limit_plds, +) def mk_custom_codec( - pld_spec: Union[Type]|Any, add_hooks: bool, -) -> MsgCodec: +) -> tuple[ + MsgCodec, # encode to send + MsgDec, # pld receive-n-decode +]: ''' Create custom `msgpack` enc/dec-hooks and set a `Decoder` which only loads `pld_spec` (like `NamespacePath`) types. 
''' - uid: tuple[str, str] = tractor.current_actor().uid # XXX NOTE XXX: despite defining `NamespacePath` as a type # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair # to cast to/from that type on the wire. See the docs: # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types - def enc_nsp(obj: Any) -> Any: - print(f'{uid} ENC HOOK') - match obj: - case NamespacePath(): - print( - f'{uid}: `NamespacePath`-Only ENCODE?\n' - f'obj-> `{obj}`: {type(obj)}\n' - ) - # if type(obj) != NamespacePath: - # breakpoint() - return str(obj) - - print( - f'{uid}\n' - 'CUSTOM ENCODE\n' - f'obj-arg-> `{obj}`: {type(obj)}\n' - ) - logmsg: str = ( - f'{uid}\n' - 'FAILED ENCODE\n' - f'obj-> `{obj}: {type(obj)}`\n' - ) - raise NotImplementedError(logmsg) - - def dec_nsp( - obj_type: Type, - obj: Any, - - ) -> Any: - print( - f'{uid}\n' - 'CUSTOM DECODE\n' - f'type-arg-> {obj_type}\n' - f'obj-arg-> `{obj}`: {type(obj)}\n' - ) - nsp = None - - if ( - obj_type is NamespacePath - and isinstance(obj, str) - and ':' in obj - ): - nsp = NamespacePath(obj) - # TODO: we could built a generic handler using - # JUST matching the obj_type part? - # nsp = obj_type(obj) - - if nsp: - print(f'Returning NSP instance: {nsp}') - return nsp - - logmsg: str = ( - f'{uid}\n' - 'FAILED DECODE\n' - f'type-> {obj_type}\n' - f'obj-arg-> `{obj}`: {type(obj)}\n\n' - f'current codec:\n' - f'{current_codec()}\n' - ) - # TODO: figure out the ignore subsys for this! - # -[ ] option whether to defense-relay backc the msg - # inside an `Invalid`/`Ignore` - # -[ ] how to make this handling pluggable such that a - # `Channel`/`MsgTransport` can intercept and process - # back msgs either via exception handling or some other - # signal? - log.warning(logmsg) - # NOTE: this delivers the invalid - # value up to `msgspec`'s decoding - # machinery for error raising. 
- return obj - # raise NotImplementedError(logmsg) + # if pld_spec is Any: + # pld_spec = Raw nsp_codec: MsgCodec = mk_codec( - ipc_pld_spec=pld_spec, + # ipc_pld_spec=Raw, # default! # NOTE XXX: the encode hook MUST be used no matter what since # our `NamespacePath` is not any of a `Any` native type nor @@ -153,8 +91,9 @@ def mk_custom_codec( # XXX NOTE: pretty sure this is mutex with the `type=` to # `Decoder`? so it won't work in tandem with the # `ipc_pld_spec` passed above? - dec_hook=dec_nsp if add_hooks else None, + ext_types=[NamespacePath], ) + # dec_hook=dec_nsp if add_hooks else None, return nsp_codec @@ -365,7 +304,7 @@ async def send_back_values( expect_debug: bool, pld_spec_type_strs: list[str], add_hooks: bool, - started_msg_bytes: bytes, + # started_msg_bytes: bytes, expect_ipc_send: dict[str, tuple[Any, bool]], ) -> None: @@ -392,24 +331,36 @@ async def send_back_values( # same as on parent side config. nsp_codec: MsgCodec = mk_custom_codec( - pld_spec=ipc_pld_spec, add_hooks=add_hooks, ) with ( apply_codec(nsp_codec) as codec, + limit_plds(ipc_pld_spec) as codec, ): + # we SHOULD NOT be swapping the global codec since it breaks + # `Context.starte()` roundtripping checks! chk_codec_applied( expect_codec=nsp_codec, - enter_value=codec, ) + # XXX SO NOT THIS! + # chk_codec_applied( + # expect_codec=nsp_codec, + # enter_value=codec, + # ) print( f'{uid}: attempting `Started`-bytes DECODE..\n' ) try: - msg: Started = nsp_codec.decode(started_msg_bytes) - expected_pld_spec_str: str = msg.pld - assert pld_spec_str == expected_pld_spec_str + # msg: Started = nsp_codec.decode(started_msg_bytes) + + ipc_spec: Type = ctx._pld_rx._pld_dec.spec + expected_pld_spec_str: str = str(ipc_spec) + assert ( + pld_spec_str == expected_pld_spec_str + and + ipc_pld_spec == ipc_spec + ) # TODO: maybe we should add our own wrapper error so as to # be interchange-lib agnostic? 
@@ -427,12 +378,15 @@ async def send_back_values( else: print( f'{uid}: (correctly) unable to DECODE `Started`-bytes\n' - f'{started_msg_bytes}\n' + # f'{started_msg_bytes}\n' ) iter_send_val_items = iter(expect_ipc_send.values()) sent: list[Any] = [] - for send_value, expect_send in iter_send_val_items: + for ( + send_value, + expect_send, + ) in iter_send_val_items: try: print( f'{uid}: attempting to `.started({send_value})`\n' @@ -457,12 +411,13 @@ async def send_back_values( break # move on to streaming block.. - except tractor.MsgTypeError: - await tractor.pause() + except tractor.MsgTypeError as _mte: + mte = _mte + # await tractor.pause() if expect_send: raise RuntimeError( - f'EXPECTED to `.started()` value given spec:\n' + f'EXPECTED to `.started()` value given spec ??\n\n' f'ipc_pld_spec -> {ipc_pld_spec}\n' f'value -> {send_value}: {type(send_value)}\n' ) @@ -530,10 +485,6 @@ async def send_back_values( # ) -def ex_func(*args): - print(f'ex_func({args})') - - @pytest.mark.parametrize( 'ipc_pld_spec', [ @@ -593,7 +544,6 @@ def test_codec_hooks_mod( # - codec modified with hooks -> decode nsp as # `NamespacePath` nsp_codec: MsgCodec = mk_custom_codec( - pld_spec=ipc_pld_spec, add_hooks=add_codec_hooks, ) with apply_codec(nsp_codec) as codec: @@ -609,7 +559,11 @@ def test_codec_hooks_mod( f'ipc_pld_spec: {ipc_pld_spec}\n' ' ------ - ------\n' ) - for val_type_str, val, expect_send in iter_maybe_sends( + for ( + val_type_str, + val, + expect_send, + )in iter_maybe_sends( send_items, ipc_pld_spec, add_codec_hooks=add_codec_hooks, @@ -618,7 +572,10 @@ def test_codec_hooks_mod( f'send_value: {val}: {type(val)} ' f'=> expect_send: {expect_send}\n' ) - expect_ipc_send[val_type_str] = (val, expect_send) + expect_ipc_send[val_type_str] = ( + val, + expect_send, + ) print( report + @@ -627,9 +584,24 @@ def test_codec_hooks_mod( assert len(expect_ipc_send) == len(send_items) # now try over real IPC with a the subactor # expect_ipc_rountrip: bool = True + + if ( + 
subtypes := getattr( + ipc_pld_spec, '__args__', False + ) + ): + pld_types_str: str = '|'.join(subtypes) + breakpoint() + else: + pld_types_str: str = ipc_pld_spec.__name__ + expected_started = Started( cid='cid', - pld=str(ipc_pld_spec), + # pld=str(pld_types_str), + pld=ipc_pld_spec, + ) + started_msg_bytes: bytes = nsp_codec.encode( + expected_started, ) # build list of values we expect to receive from # the subactor. @@ -655,7 +627,7 @@ def test_codec_hooks_mod( expect_debug=debug_mode, pld_spec_type_strs=pld_spec_type_strs, add_hooks=add_codec_hooks, - started_msg_bytes=nsp_codec.encode(expected_started), + started_msg_bytes=started_msg_bytes, # XXX NOTE bc we send a `NamespacePath` in this kwarg expect_ipc_send=expect_ipc_send, @@ -673,6 +645,8 @@ def test_codec_hooks_mod( # test with `limit_msg_spec()` above? # await tractor.pause() print('PARENT opening IPC ctx!\n') + ctx: tractor.Context + ipc: tractor.MsgStream async with ( # XXX should raise an mte (`MsgTypeError`) @@ -877,6 +851,10 @@ def chk_pld_type( def test_limit_msgspec( debug_mode: bool, ): + ''' + Verify that type-limiting the + + ''' async def main(): async with tractor.open_root_actor( debug_mode=debug_mode, @@ -915,3 +893,188 @@ def test_limit_msgspec( # breakpoint() trio.run(main) + + +def enc_nsp(obj: Any) -> Any: + actor: Actor = tractor.current_actor( + err_on_no_runtime=False, + ) + uid: tuple[str, str]|None = None if not actor else actor.uid + print(f'{uid} ENC HOOK') + + match obj: + # case NamespacePath()|str(): + case NamespacePath(): + encoded: str = str(obj) + print( + f'----- ENCODING `NamespacePath` as `str` ------\n' + f'|_obj:{type(obj)!r} = {obj!r}\n' + f'|_encoded: str = {encoded!r}\n' + ) + # if type(obj) != NamespacePath: + # breakpoint() + return encoded + case _: + logmsg: str = ( + f'{uid}\n' + 'FAILED ENCODE\n' + f'obj-> `{obj}: {type(obj)}`\n' + ) + raise NotImplementedError(logmsg) + + +def dec_nsp( + obj_type: Type, + obj: Any, + +) -> Any: + # breakpoint() + actor: 
Actor = tractor.current_actor( + err_on_no_runtime=False, + ) + uid: tuple[str, str]|None = None if not actor else actor.uid + print( + f'{uid}\n' + 'CUSTOM DECODE\n' + f'type-arg-> {obj_type}\n' + f'obj-arg-> `{obj}`: {type(obj)}\n' + ) + nsp = None + # XXX, never happens right? + if obj_type is Raw: + breakpoint() + + if ( + obj_type is NamespacePath + and isinstance(obj, str) + and ':' in obj + ): + nsp = NamespacePath(obj) + # TODO: we could built a generic handler using + # JUST matching the obj_type part? + # nsp = obj_type(obj) + + if nsp: + print(f'Returning NSP instance: {nsp}') + return nsp + + logmsg: str = ( + f'{uid}\n' + 'FAILED DECODE\n' + f'type-> {obj_type}\n' + f'obj-arg-> `{obj}`: {type(obj)}\n\n' + f'current codec:\n' + f'{current_codec()}\n' + ) + # TODO: figure out the ignore subsys for this! + # -[ ] option whether to defense-relay backc the msg + # inside an `Invalid`/`Ignore` + # -[ ] how to make this handling pluggable such that a + # `Channel`/`MsgTransport` can intercept and process + # back msgs either via exception handling or some other + # signal? + log.warning(logmsg) + # NOTE: this delivers the invalid + # value up to `msgspec`'s decoding + # machinery for error raising. + return obj + # raise NotImplementedError(logmsg) + + +def ex_func(*args): + ''' + A mod level func we can ref and load via our `NamespacePath` + python-object pointer `str` subtype. + + ''' + print(f'ex_func({args})') + + +@pytest.mark.parametrize( + 'add_codec_hooks', + [ + True, + False, + ], + ids=['use_codec_hooks', 'no_codec_hooks'], +) +def test_custom_extension_types( + debug_mode: bool, + add_codec_hooks: bool +): + ''' + Verify that a `MsgCodec` (used for encoding all outbound IPC msgs + and decoding all inbound `PayloadMsg`s) and a paired `MsgDec` + (used for decoding the `PayloadMsg.pld: Raw` received within a given + task's ipc `Context` scope) can both send and receive "extension types" + as supported via custom converter hooks passed to `msgspec`. 
+ + ''' + nsp_pld_dec: MsgDec = mk_dec( + spec=None, # ONLY support the ext type + dec_hook=dec_nsp if add_codec_hooks else None, + ext_types=[NamespacePath], + ) + nsp_codec: MsgCodec = mk_codec( + # ipc_pld_spec=Raw, # default! + + # NOTE XXX: the encode hook MUST be used no matter what since + # our `NamespacePath` is not any of a `Any` native type nor + # a `msgspec.Struct` subtype - so `msgspec` has no way to know + # how to encode it unless we provide the custom hook. + # + # AGAIN that is, regardless of whether we spec an + # `Any`-decoded-pld the enc has no knowledge (by default) + # how to enc `NamespacePath` (nsp), so we add a custom + # hook to do that ALWAYS. + enc_hook=enc_nsp if add_codec_hooks else None, + + # XXX NOTE: pretty sure this is mutex with the `type=` to + # `Decoder`? so it won't work in tandem with the + # `ipc_pld_spec` passed above? + ext_types=[NamespacePath], + + # TODO? is it useful to have the `.pld` decoded *prior* to + # the `PldRx`?? like perf or mem related? + # ext_dec=nsp_pld_dec, + ) + if add_codec_hooks: + assert nsp_codec.dec.dec_hook is None + + # TODO? if we pass `ext_dec` above? + # assert nsp_codec.dec.dec_hook is dec_nsp + + assert nsp_codec.enc.enc_hook is enc_nsp + + nsp = NamespacePath.from_ref(ex_func) + + try: + nsp_bytes: bytes = nsp_codec.encode(nsp) + nsp_rt_sin_msg = nsp_pld_dec.decode(nsp_bytes) + nsp_rt_sin_msg.load_ref() is ex_func + except TypeError: + if not add_codec_hooks: + pass + + try: + msg_bytes: bytes = nsp_codec.encode( + Started( + cid='cid', + pld=nsp, + ) + ) + # since the ext-type obj should also be set as the msg.pld + assert nsp_bytes in msg_bytes + started_rt: Started = nsp_codec.decode(msg_bytes) + pld: Raw = started_rt.pld + assert isinstance(pld, Raw) + nsp_rt: NamespacePath = nsp_pld_dec.decode(pld) + assert isinstance(nsp_rt, NamespacePath) + # in obj comparison terms they should be the same + assert nsp_rt == nsp + # ensure we've decoded to ext type! 
+ assert nsp_rt.load_ref() is ex_func + + except TypeError: + if not add_codec_hooks: + pass diff --git a/tractor/msg/__init__.py b/tractor/msg/__init__.py index 44586f2..8822005 100644 --- a/tractor/msg/__init__.py +++ b/tractor/msg/__init__.py @@ -33,6 +33,7 @@ from ._codec import ( apply_codec as apply_codec, mk_codec as mk_codec, + mk_dec as mk_dec, MsgCodec as MsgCodec, MsgDec as MsgDec, current_codec as current_codec, diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py index 32f690f..46716d4 100644 --- a/tractor/msg/_codec.py +++ b/tractor/msg/_codec.py @@ -61,6 +61,7 @@ from tractor.msg.pretty_struct import Struct from tractor.msg.types import ( mk_msg_spec, MsgType, + PayloadMsg, ) from tractor.log import get_logger @@ -80,6 +81,7 @@ class MsgDec(Struct): ''' _dec: msgpack.Decoder + # _ext_types_box: Struct|None = None @property def dec(self) -> msgpack.Decoder: @@ -179,23 +181,122 @@ class MsgDec(Struct): def mk_dec( - spec: Union[Type[Struct]]|Any = Any, + spec: Union[Type[Struct]]|Type|None, + + # NOTE, required for ad-hoc type extensions to the underlying + # serialization proto (which is default `msgpack`), + # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types dec_hook: Callable|None = None, + ext_types: list[Type]|None = None, ) -> MsgDec: ''' - Create an IPC msg decoder, normally used as the - `PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`. + Create an IPC msg decoder, a slightly higher level wrapper around + a `msgspec.msgpack.Decoder` which provides, + + - easier introspection of the underlying type spec via + the `.spec` and `.spec_str` attrs, + - `.hook` access to the `Decoder.dec_hook()`, + - automatic custom extension-types decode support when + `dec_hook()` is provided such that any `PayloadMsg.pld` tagged + as a type from from `ext_types` (presuming the `MsgCodec.encode()` also used + a `.enc_hook()`) is processed and constructed by a `PldRx` implicitily. 
+ + NOTE, as mentioned a `MsgDec` is normally used for `PayloadMsg.pld: PayloadT` field + decoding inside an IPC-ctx-oriented `PldRx`. ''' + if ( + spec is None + and + ext_types is None + ): + raise ValueError( + f'You must provide a type-spec for a msg decoder!\n' + f'The only time `spec=None` is permitted is if custom extension types ' + f'are expected to be supported, in which case `ext_types` must be non-`None`' + f'and it is presumed that only the `ext_types` (supported by the paired `dec_hook()`) ' + f'will be permitted within the type-`spec`!\n' + + f'tpec = {spec!r}\n' + f'dec_hook = {dec_hook!r}\n' + f'ext_types = {ext_types!r}\n' + ) + + if dec_hook: + if ext_types is None: + raise ValueError( + f'If extending the serializable types with a custom decoder hook, ' + f'you must also provide the expected type set `dec_hook()` will handle ' + f'via the `ext_types: Union[Type]|None = None` argument!\n' + f'dec_hook = {dec_hook!r}\n' + f'ext_types = {ext_types!r}\n' + ) + + # XXX, i *thought* we would require a boxing struct as per docs, + # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types + # |_ see comment, + # > Note that typed deserialization is required for + # > successful roundtripping here, so we pass `MyMessage` to + # > `Decoder`. + # + # BUT, turns out as long as you spec a union with `Raw` it + # will work? kk B) + # + # maybe_box_struct = mk_boxed_ext_struct(ext_types) + spec = Raw | Union[*ext_types] + return MsgDec( _dec=msgpack.Decoder( type=spec, # like `MsgType[Any]` dec_hook=dec_hook, - ) + ), ) +# TODO? remove since didn't end up needing this? +def mk_boxed_ext_struct( + ext_types: list[Type], +) -> Struct: + # NOTE, originally was to wrap non-msgpack-supported "extension + # types" in a field-typed boxing struct, see notes around the + # `dec_hook()` branch in `mk_dec()`. 
+ ext_types_union = Union[*ext_types] + repr_ext_types_union: str = ( + str(ext_types_union) + or + "|".join(ext_types) + ) + BoxedExtType = msgspec.defstruct( + f'BoxedExts[{repr_ext_types_union}]', + fields=[ + ('boxed', ext_types_union), + ], + ) + return BoxedExtType + + +def unpack_spec_types( + spec: Union[Type]|Type, +) -> set[Type]: + ''' + Given an input type-`spec`, either a lone type + or a `Union` of types (like `str|int|MyThing`), + return a set of individual types. + + When `spec` is not a type-union returns `{spec,}`. + + ''' + spec_subtypes: set[Union[Type]] = ( + getattr( + spec, + '__args__', + {spec,}, + ) + ) + return spec_subtypes + + def mk_msgspec_table( dec: msgpack.Decoder, msg: MsgType|None = None, @@ -273,6 +374,8 @@ class MsgCodec(Struct): _dec: msgpack.Decoder _pld_spec: Type[Struct]|Raw|Any + # _ext_types_box: Struct|None = None + def __repr__(self) -> str: speclines: str = textwrap.indent( pformat_msgspec(codec=self), @@ -339,12 +442,14 @@ class MsgCodec(Struct): def encode( self, - py_obj: Any, + py_obj: Any|PayloadMsg, use_buf: bool = False, # ^-XXX-^ uhh why am i getting this? # |_BufferError: Existing exports of data: object cannot be re-sized + as_ext_type: bool = False, + ) -> bytes: ''' Encode input python objects to `msgpack` bytes for @@ -357,8 +462,33 @@ class MsgCodec(Struct): if use_buf: self._enc.encode_into(py_obj, self._buf) return self._buf - else: - return self._enc.encode(py_obj) + + return self._enc.encode(py_obj) + # TODO! REMOVE once i'm confident we won't ever need it! 
+ # + # box: Struct = self._ext_types_box + # if ( + # as_ext_type + # or + # ( + # # XXX NOTE, auto-detect if the input type + # box + # and + # (ext_types := unpack_spec_types( + # spec=box.__annotations__['boxed']) + # ) + # ) + # ): + # match py_obj: + # # case PayloadMsg(pld=pld) if ( + # # type(pld) in ext_types + # # ): + # # py_obj.pld = box(boxed=py_obj) + # # breakpoint() + # case _ if ( + # type(py_obj) in ext_types + # ): + # py_obj = box(boxed=py_obj) @property def dec(self) -> msgpack.Decoder: @@ -378,21 +508,30 @@ class MsgCodec(Struct): return self._dec.decode(msg) -# [x] TODO: a sub-decoder system as well? => No! +# ?TODO? time to remove this finally? +# +# -[x] TODO: a sub-decoder system as well? +# => No! already re-architected to include a "payload-receiver" +# now found in `._ops`. # # -[x] do we still want to try and support the sub-decoder with # `.Raw` technique in the case that the `Generic` approach gives # future grief? -# => NO, since we went with the `PldRx` approach instead B) +# => well YES but NO, since we went with the `PldRx` approach +# instead! # # IF however you want to see the code that was staged for this # from wayyy back, see the pure removal commit. def mk_codec( - # struct type unions set for `Decoder` - # https://jcristharif.com/msgspec/structs.html#tagged-unions - ipc_pld_spec: Union[Type[Struct]]|Any = Any, + ipc_pld_spec: Union[Type[Struct]]|Any|Raw = Raw, + # tagged-struct-types-union set for `Decoder`ing of payloads, as + # per https://jcristharif.com/msgspec/structs.html#tagged-unions. + # NOTE that the default `Raw` here **is very intentional** since + # the `PldRx._pld_dec: MsgDec` is responsible for per ipc-ctx-task + # decoding of msg-specs defined by the user as part of **their** + # `tractor` "app's" type-limited IPC msg-spec. 
# TODO: offering a per-msg(-field) type-spec such that # the fields can be dynamically NOT decoded and left as `Raw` @@ -405,13 +544,18 @@ def mk_codec( libname: str = 'msgspec', - # proxy as `Struct(**kwargs)` for ad-hoc type extensions + # settings for encoding-to-send extension-types, # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types - # ------ - ------ - dec_hook: Callable|None = None, + # dec_hook: Callable|None = None, enc_hook: Callable|None = None, - # ------ - ------ + ext_types: list[Type]|None = None, + + # optionally provided msg-decoder from which we pull its, + # |_.dec_hook() + # |_.type + ext_dec: MsgDec|None = None # + # ?TODO? other params we might want to support # Encoder: # write_buffer_size=write_buffer_size, # @@ -425,26 +569,43 @@ def mk_codec( `msgspec` ;). ''' - # (manually) generate a msg-payload-spec for all relevant - # god-boxing-msg subtypes, parameterizing the `PayloadMsg.pld: PayloadT` - # for the decoder such that all sub-type msgs in our SCIPP - # will automatically decode to a type-"limited" payload (`Struct`) - # object (set). + pld_spec = ipc_pld_spec + if enc_hook: + if not ext_types: + raise ValueError( + f'If extending the serializable types with a custom decoder hook, ' + f'you must also provide the expected type set `enc_hook()` will handle ' + f'via the `ext_types: Union[Type]|None = None` argument!\n' + f'enc_hook = {enc_hook!r}\n' + f'ext_types = {ext_types!r}\n' + ) + + dec_hook: Callable|None = None + if ext_dec: + dec: msgspec.Decoder = ext_dec.dec + dec_hook = dec.dec_hook + pld_spec |= dec.type + if ext_types: + pld_spec |= Union[*ext_types] + + # (manually) generate a msg-spec (how appropes) for all relevant + # payload-boxing-struct-msg-types, parameterizing the + # `PayloadMsg.pld: PayloadT` for the decoder such that all msgs + # in our SC-RPC-protocol will automatically decode to + # a type-"limited" payload (`Struct`) object (set). 
( ipc_msg_spec, msg_types, ) = mk_msg_spec( - payload_type_union=ipc_pld_spec, + payload_type_union=pld_spec, ) - assert len(ipc_msg_spec.__args__) == len(msg_types) - assert ipc_msg_spec - # TODO: use this shim instead? - # bc.. unification, err somethin? - # dec: MsgDec = mk_dec( - # spec=ipc_msg_spec, - # dec_hook=dec_hook, - # ) + msg_spec_types: set[Type] = unpack_spec_types(ipc_msg_spec) + assert ( + len(ipc_msg_spec.__args__) == len(msg_types) + and + len(msg_spec_types) == len(msg_types) + ) dec = msgpack.Decoder( type=ipc_msg_spec, @@ -453,22 +614,29 @@ def mk_codec( enc = msgpack.Encoder( enc_hook=enc_hook, ) - codec = MsgCodec( _enc=enc, _dec=dec, - _pld_spec=ipc_pld_spec, + _pld_spec=pld_spec, ) - # sanity on expected backend support assert codec.lib.__name__ == libname - return codec # instance of the default `msgspec.msgpack` codec settings, i.e. # no custom structs, hooks or other special types. -_def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any) +# +# XXX NOTE XXX, this will break our `Context.start()` call! +# +# * by default we roundtrip the started pld-`value` and if you apply +# this codec (globally anyway with `apply_codec()`) then the +# `roundtripped` value will include a non-`.pld: Raw` which will +# then type-error on the consequent `._ops.validte_payload_msg()`.. +# +_def_msgspec_codec: MsgCodec = mk_codec( + ipc_pld_spec=Any, +) # The built-in IPC `Msg` spec. # Our composing "shuttle" protocol which allows `tractor`-app code @@ -476,13 +644,13 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any) # https://jcristharif.com/msgspec/supported-types.html # _def_tractor_codec: MsgCodec = mk_codec( - # TODO: use this for debug mode locking prot? - # ipc_pld_spec=Any, - ipc_pld_spec=Raw, + ipc_pld_spec=Raw, # XXX should be default righ!? 
) -# TODO: IDEALLY provides for per-`trio.Task` specificity of the + +# -[x] TODO, IDEALLY provides for per-`trio.Task` specificity of the # IPC msging codec used by the transport layer when doing # `Channel.send()/.recv()` of wire data. +# => impled as our `PldRx` which is `Context` scoped B) # ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!? # _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( @@ -559,17 +727,6 @@ def apply_codec( ) token: Token = var.set(codec) - # ?TODO? for TreeVar approach which copies from the - # cancel-scope of the prior value, NOT the prior task - # See the docs: - # - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables - # - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py - # ^- see docs for @cm `.being()` API - # with _ctxvar_MsgCodec.being(codec): - # new = _ctxvar_MsgCodec.get() - # assert new is codec - # yield codec - try: yield var.get() finally: @@ -580,6 +737,19 @@ def apply_codec( ) assert var.get() is orig + # ?TODO? for TreeVar approach which copies from the + # cancel-scope of the prior value, NOT the prior task + # + # See the docs: + # - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables + # - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py + # ^- see docs for @cm `.being()` API + # + # with _ctxvar_MsgCodec.being(codec): + # new = _ctxvar_MsgCodec.get() + # assert new is codec + # yield codec + def current_codec() -> MsgCodec: ''' @@ -599,6 +769,7 @@ def limit_msg_spec( # -> related to the `MsgCodec._payload_decs` stuff above.. # tagged_structs: list[Struct]|None = None, + hide_tb: bool = True, **codec_kwargs, ) -> MsgCodec: @@ -609,7 +780,7 @@ def limit_msg_spec( for all IPC contexts in use by the current `trio.Task`. 
''' - __tracebackhide__: bool = True + __tracebackhide__: bool = hide_tb curr_codec: MsgCodec = current_codec() msgspec_codec: MsgCodec = mk_codec( ipc_pld_spec=payload_spec, diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index dc63221..6f178ba 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -50,7 +50,9 @@ from tractor._exceptions import ( _mk_recv_mte, pack_error, ) -from tractor._state import current_ipc_ctx +from tractor._state import ( + current_ipc_ctx, +) from ._codec import ( mk_dec, MsgDec, @@ -78,7 +80,7 @@ if TYPE_CHECKING: log = get_logger(__name__) -_def_any_pldec: MsgDec[Any] = mk_dec() +_def_any_pldec: MsgDec[Any] = mk_dec(spec=Any) class PldRx(Struct): @@ -148,6 +150,10 @@ class PldRx(Struct): exit. ''' + # TODO, ensure we pull the current `MsgCodec`'s custom + # dec/enc_hook settings as well ? + # -[ ] see `._codec.mk_codec()` inputs + # orig_dec: MsgDec = self._pld_dec limit_dec: MsgDec = mk_dec( spec=spec, diff --git a/tractor/msg/types.py b/tractor/msg/types.py index 0904411..1cc8b78 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -599,15 +599,15 @@ def mk_msg_spec( Msg[payload_type_union], Generic[PayloadT], ) - defstruct_bases: tuple = ( - Msg, # [payload_type_union], - # Generic[PayloadT], - # ^-XXX-^: not allowed? lul.. - ) + # defstruct_bases: tuple = ( + # Msg, # [payload_type_union], + # # Generic[PayloadT], + # # ^-XXX-^: not allowed? lul.. + # ) ipc_msg_types: list[Msg] = [] idx_msg_types: list[Msg] = [] - defs_msg_types: list[Msg] = [] + # defs_msg_types: list[Msg] = [] nc_msg_types: list[Msg] = [] for msgtype in __msg_types__: @@ -625,7 +625,7 @@ def mk_msg_spec( # TODO: wait why do we need the dynamic version here? # XXX ANSWER XXX -> BC INHERITANCE.. don't work w generics.. 
# - # NOTE previously bc msgtypes WERE NOT inheritting + # NOTE previously bc msgtypes WERE NOT inheriting # directly the `Generic[PayloadT]` type, the manual method # of generic-paraming with `.__class_getitem__()` wasn't # working.. @@ -662,38 +662,35 @@ def mk_msg_spec( # with `msgspec.structs.defstruct` # XXX ALSO DOESN'T WORK - defstruct_msgtype = defstruct( - name=msgtype.__name__, - fields=[ - ('cid', str), + # defstruct_msgtype = defstruct( + # name=msgtype.__name__, + # fields=[ + # ('cid', str), - # XXX doesn't seem to work.. - # ('pld', PayloadT), - - ('pld', payload_type_union), - ], - bases=defstruct_bases, - ) - defs_msg_types.append(defstruct_msgtype) + # # XXX doesn't seem to work.. + # # ('pld', PayloadT), + # ('pld', payload_type_union), + # ], + # bases=defstruct_bases, + # ) + # defs_msg_types.append(defstruct_msgtype) # assert index_paramed_msg_type == manual_paramed_msg_subtype - # paramed_msg_type = manual_paramed_msg_subtype - # ipc_payload_msgs_type_union |= index_paramed_msg_type idx_spec: Union[Type[Msg]] = Union[*idx_msg_types] - def_spec: Union[Type[Msg]] = Union[*defs_msg_types] + # def_spec: Union[Type[Msg]] = Union[*defs_msg_types] nc_spec: Union[Type[Msg]] = Union[*nc_msg_types] specs: dict[str, Union[Type[Msg]]] = { 'indexed_generics': idx_spec, - 'defstruct': def_spec, + # 'defstruct': def_spec, 'types_new_class': nc_spec, } msgtypes_table: dict[str, list[Msg]] = { 'indexed_generics': idx_msg_types, - 'defstruct': defs_msg_types, + # 'defstruct': defs_msg_types, 'types_new_class': nc_msg_types, }