Go back to `ContextVar` for codec mgmt

Turns out we do want per-task inheritance particularly if there's to be
per `Context` dynamic mutation of the spec; we don't want mutation in
some task to affect any parent/global setting.

Turns out since we use a common "feeder task" in the rpc loop, we need to
offer a per `Context` payload decoder sys anyway in order to enable
per-task controls for inter-actor multi-task-ctx scenarios.
runtime_to_msgspec
Tyler Goodlet 2024-04-18 16:24:59 -04:00
parent d51be2a36a
commit dd6a4d49d8
1 changed files with 75 additions and 51 deletions

View File

@ -33,25 +33,29 @@ from __future__ import annotations
from contextlib import (
contextmanager as cm,
)
# from contextvars import (
# ContextVar,
# Token,
# )
from contextvars import (
ContextVar,
Token,
)
import textwrap
from typing import (
Any,
Callable,
Type,
TYPE_CHECKING,
Union,
)
from types import ModuleType
import msgspec
from msgspec import msgpack
from trio.lowlevel import (
RunVar,
RunVarToken,
from msgspec import (
msgpack,
# Raw,
)
# from trio.lowlevel import (
# RunVar,
# RunVarToken,
# )
# TODO: see notes below from @mikenerone..
# from tricycle import TreeVar
@ -62,6 +66,9 @@ from tractor.msg.types import (
)
from tractor.log import get_logger
if TYPE_CHECKING:
from tractor._context import Context
log = get_logger(__name__)
# TODO: overall IPC msg-spec features (i.e. in this mod)!
@ -157,24 +164,6 @@ class MsgCodec(Struct):
lib: ModuleType = msgspec
# TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
# TODO: use `functools.cached_property` for these ?
# https://docs.python.org/3/library/functools.html#functools.cached_property
@property
@ -210,7 +199,25 @@ class MsgCodec(Struct):
# https://jcristharif.com/msgspec/usage.html#typed-decoding
return self._dec.decode(msg)
# TODO: do we still want to try and support the sub-decoder with
# TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
#
# -[ ] do we still want to try and support the sub-decoder with
# `.Raw` technique in the case that the `Generic` approach gives
# future grief?
#
@ -429,6 +436,9 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
#
_def_tractor_codec: MsgCodec = mk_codec(
ipc_pld_spec=Any,
# TODO: use this for debug mode locking prot?
# ipc_pld_spec=Raw,
)
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing
@ -462,11 +472,9 @@ _def_tractor_codec: MsgCodec = mk_codec(
# TODO: STOP USING THIS, since it's basically a global and won't
# allow sub-IPC-ctxs to limit the msg-spec however desired..
_ctxvar_MsgCodec: MsgCodec = RunVar(
# _ctxvar_MsgCodec: MsgCodec = RunVar(
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
'msgspec_codec',
# TODO: move this to our new `Msg`-spec!
# default=_def_msgspec_codec,
default=_def_tractor_codec,
)
@ -475,23 +483,36 @@ _ctxvar_MsgCodec: MsgCodec = RunVar(
def apply_codec(
codec: MsgCodec,
ctx: Context|None = None,
) -> MsgCodec:
'''
Dynamically apply a `MsgCodec` to the current task's
runtime context such that all IPC msgs are processed
with it for that task.
Dynamically apply a `MsgCodec` to the current task's runtime
context such that all (of a certain class of payload
containing i.e. `MsgType.pld: PayloadT`) IPC msgs are
processed with it for that task.
Uses a `contextvars.ContextVar` to ensure the scope of any
codec setting matches the current `Context` or
`._rpc.process_messages()` feeder task's prior setting without
mutating any surrounding scope.
When a `ctx` is supplied, only mod its `Context.pld_codec`.
Uses a `tricycle.TreeVar` to ensure the scope of the codec
matches the `@cm` block and DOES NOT change to the original
(default) value in new tasks (as it does for `ContextVar`).
See the docs:
- https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
'''
__tracebackhide__: bool = True
orig: MsgCodec = _ctxvar_MsgCodec.get()
if ctx is not None:
var: ContextVar = ctx._var_pld_codec
else:
# use IPC channel-connection "global" codec
var: ContextVar = _ctxvar_MsgCodec
orig: MsgCodec = var.get()
assert orig is not codec
if codec.pld_spec is None:
breakpoint()
@ -500,22 +521,25 @@ def apply_codec(
'Applying new msg-spec codec\n\n'
f'{codec}\n'
)
token: RunVarToken = _ctxvar_MsgCodec.set(codec)
token: Token = var.set(codec)
# TODO: for TreeVar approach, see docs for @cm `.being()` API:
# https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# try:
# with _ctxvar_MsgCodec.being(codec):
# new = _ctxvar_MsgCodec.get()
# assert new is codec
# yield codec
# ?TODO? for TreeVar approach which copies from the
# cancel-scope of the prior value, NOT the prior task
# See the docs:
# - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
# ^- see docs for @cm `.being()` API
# with _ctxvar_MsgCodec.being(codec):
# new = _ctxvar_MsgCodec.get()
# assert new is codec
# yield codec
try:
yield _ctxvar_MsgCodec.get()
yield var.get()
finally:
_ctxvar_MsgCodec.reset(token)
var.reset(token)
assert _ctxvar_MsgCodec.get() is orig
assert var.get() is orig
log.info(
'Reverted to last msg-spec codec\n\n'
f'{orig}\n'