diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py index 82fd201..766a297 100644 --- a/tractor/msg/_codec.py +++ b/tractor/msg/_codec.py @@ -33,25 +33,29 @@ from __future__ import annotations from contextlib import ( contextmanager as cm, ) -# from contextvars import ( -# ContextVar, -# Token, -# ) +from contextvars import ( + ContextVar, + Token, +) import textwrap from typing import ( Any, Callable, Type, + TYPE_CHECKING, Union, ) from types import ModuleType import msgspec -from msgspec import msgpack -from trio.lowlevel import ( - RunVar, - RunVarToken, +from msgspec import ( + msgpack, + # Raw, ) +# from trio.lowlevel import ( +# RunVar, +# RunVarToken, +# ) # TODO: see notes below from @mikenerone.. # from tricycle import TreeVar @@ -62,6 +66,9 @@ from tractor.msg.types import ( ) from tractor.log import get_logger +if TYPE_CHECKING: + from tractor._context import Context + log = get_logger(__name__) # TODO: overall IPC msg-spec features (i.e. in this mod)! @@ -157,24 +164,6 @@ class MsgCodec(Struct): lib: ModuleType = msgspec - # TODO: a sub-decoder system as well? - # payload_msg_specs: Union[Type[Struct]] = Any - # see related comments in `.msg.types` - # _payload_decs: ( - # dict[ - # str, - # msgpack.Decoder, - # ] - # |None - # ) = None - # OR - # ) = { - # # pre-seed decoders for std-py-type-set for use when - # # `MsgType.pld == None|Any`. - # None: msgpack.Decoder(Any), - # Any: msgpack.Decoder(Any), - # } - # TODO: use `functools.cached_property` for these ? # https://docs.python.org/3/library/functools.html#functools.cached_property @property @@ -210,7 +199,25 @@ class MsgCodec(Struct): # https://jcristharif.com/msgspec/usage.html#typed-decoding return self._dec.decode(msg) - # TODO: do we still want to try and support the sub-decoder with + # TODO: a sub-decoder system as well? + # payload_msg_specs: Union[Type[Struct]] = Any + # see related comments in `.msg.types` + # _payload_decs: ( + # dict[ + # str, + # msgpack.Decoder, + # ] + # |None + # ) = None + # OR + # ) = { + # # pre-seed decoders for std-py-type-set for use when + # # `MsgType.pld == None|Any`. + # None: msgpack.Decoder(Any), + # Any: msgpack.Decoder(Any), + # } + # + # -[ ] do we still want to try and support the sub-decoder with # `.Raw` technique in the case that the `Generic` approach gives # future grief? # @@ -429,6 +436,9 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any) # _def_tractor_codec: MsgCodec = mk_codec( ipc_pld_spec=Any, + + # TODO: use this for debug mode locking prot? + # ipc_pld_spec=Raw, ) # TODO: IDEALLY provides for per-`trio.Task` specificity of the # IPC msging codec used by the transport layer when doing @@ -462,11 +472,9 @@ _def_tractor_codec: MsgCodec = mk_codec( # TODO: STOP USING THIS, since it's basically a global and won't # allow sub-IPC-ctxs to limit the msg-spec however desired.. -_ctxvar_MsgCodec: MsgCodec = RunVar( +# _ctxvar_MsgCodec: MsgCodec = RunVar( +_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( 'msgspec_codec', - - # TODO: move this to our new `Msg`-spec! - # default=_def_msgspec_codec, default=_def_tractor_codec, ) @@ -475,23 +483,36 @@ _ctxvar_MsgCodec: MsgCodec = RunVar( def apply_codec( codec: MsgCodec, + ctx: Context|None = None, + ) -> MsgCodec: ''' - Dynamically apply a `MsgCodec` to the current task's - runtime context such that all IPC msgs are processed - with it for that task. + Dynamically apply a `MsgCodec` to the current task's runtime + context such that all (of a certain class of payload + containing i.e. `MsgType.pld: PayloadT`) IPC msgs are + processed with it for that task. + + Uses a `contextvars.ContextVar` to ensure the scope of any + codec setting matches the current `Context` or + `._rpc.process_messages()` feeder task's prior setting without + mutating any surrounding scope. + + When a `ctx` is supplied, only mod its `Context.pld_codec`. - Uses a `tricycle.TreeVar` to ensure the scope of the codec matches the `@cm` block and DOES NOT change to the original (default) value in new tasks (as it does for `ContextVar`). - See the docs: - - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables - - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py - ''' __tracebackhide__: bool = True - orig: MsgCodec = _ctxvar_MsgCodec.get() + + if ctx is not None: + var: ContextVar = ctx._var_pld_codec + else: + # use IPC channel-connection "global" codec + var: ContextVar = _ctxvar_MsgCodec + + orig: MsgCodec = var.get() + assert orig is not codec if codec.pld_spec is None: breakpoint() @@ -500,22 +521,25 @@ def apply_codec( 'Applying new msg-spec codec\n\n' f'{codec}\n' ) - token: RunVarToken = _ctxvar_MsgCodec.set(codec) + token: Token = var.set(codec) - # TODO: for TreeVar approach, see docs for @cm `.being()` API: - # https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables - # try: - # with _ctxvar_MsgCodec.being(codec): - # new = _ctxvar_MsgCodec.get() - # assert new is codec - # yield codec + # ?TODO? for TreeVar approach which copies from the + # cancel-scope of the prior value, NOT the prior task + # See the docs: + # - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables + # - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py + # ^- see docs for @cm `.being()` API + # with _ctxvar_MsgCodec.being(codec): + # new = _ctxvar_MsgCodec.get() + # assert new is codec + # yield codec try: - yield _ctxvar_MsgCodec.get() + yield var.get() finally: - _ctxvar_MsgCodec.reset(token) + var.reset(token) - assert _ctxvar_MsgCodec.get() is orig + assert var.get() is orig log.info( 'Reverted to last msg-spec codec\n\n' f'{orig}\n'