Go back to `ContextVar` for codec mgmt
Turns out we do want per-task inheritance particularly if there's to be per `Context` dynamic mutation of the spec; we don't want mutation in some task to affect any parent/global setting. Turns out since we use a common "feeder task" in the rpc loop, we need to offer a per `Context` payload decoder sys anyway in order to enable per-task controls for inter-actor multi-task-ctx scenarios.
parent
d51be2a36a
commit
9b4f580470
|
@ -33,25 +33,29 @@ from __future__ import annotations
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
contextmanager as cm,
|
contextmanager as cm,
|
||||||
)
|
)
|
||||||
# from contextvars import (
|
from contextvars import (
|
||||||
# ContextVar,
|
ContextVar,
|
||||||
# Token,
|
Token,
|
||||||
# )
|
)
|
||||||
import textwrap
|
import textwrap
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
Type,
|
Type,
|
||||||
|
TYPE_CHECKING,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
|
||||||
import msgspec
|
import msgspec
|
||||||
from msgspec import msgpack
|
from msgspec import (
|
||||||
from trio.lowlevel import (
|
msgpack,
|
||||||
RunVar,
|
Raw,
|
||||||
RunVarToken,
|
|
||||||
)
|
)
|
||||||
|
# from trio.lowlevel import (
|
||||||
|
# RunVar,
|
||||||
|
# RunVarToken,
|
||||||
|
# )
|
||||||
# TODO: see notes below from @mikenerone..
|
# TODO: see notes below from @mikenerone..
|
||||||
# from tricycle import TreeVar
|
# from tricycle import TreeVar
|
||||||
|
|
||||||
|
@ -62,6 +66,9 @@ from tractor.msg.types import (
|
||||||
)
|
)
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from tractor._context import Context
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
# TODO: overall IPC msg-spec features (i.e. in this mod)!
|
# TODO: overall IPC msg-spec features (i.e. in this mod)!
|
||||||
|
@ -157,24 +164,6 @@ class MsgCodec(Struct):
|
||||||
|
|
||||||
lib: ModuleType = msgspec
|
lib: ModuleType = msgspec
|
||||||
|
|
||||||
# TODO: a sub-decoder system as well?
|
|
||||||
# payload_msg_specs: Union[Type[Struct]] = Any
|
|
||||||
# see related comments in `.msg.types`
|
|
||||||
# _payload_decs: (
|
|
||||||
# dict[
|
|
||||||
# str,
|
|
||||||
# msgpack.Decoder,
|
|
||||||
# ]
|
|
||||||
# |None
|
|
||||||
# ) = None
|
|
||||||
# OR
|
|
||||||
# ) = {
|
|
||||||
# # pre-seed decoders for std-py-type-set for use when
|
|
||||||
# # `MsgType.pld == None|Any`.
|
|
||||||
# None: msgpack.Decoder(Any),
|
|
||||||
# Any: msgpack.Decoder(Any),
|
|
||||||
# }
|
|
||||||
|
|
||||||
# TODO: use `functools.cached_property` for these ?
|
# TODO: use `functools.cached_property` for these ?
|
||||||
# https://docs.python.org/3/library/functools.html#functools.cached_property
|
# https://docs.python.org/3/library/functools.html#functools.cached_property
|
||||||
@property
|
@property
|
||||||
|
@ -210,7 +199,25 @@ class MsgCodec(Struct):
|
||||||
# https://jcristharif.com/msgspec/usage.html#typed-decoding
|
# https://jcristharif.com/msgspec/usage.html#typed-decoding
|
||||||
return self._dec.decode(msg)
|
return self._dec.decode(msg)
|
||||||
|
|
||||||
# TODO: do we still want to try and support the sub-decoder with
|
# TODO: a sub-decoder system as well?
|
||||||
|
# payload_msg_specs: Union[Type[Struct]] = Any
|
||||||
|
# see related comments in `.msg.types`
|
||||||
|
# _payload_decs: (
|
||||||
|
# dict[
|
||||||
|
# str,
|
||||||
|
# msgpack.Decoder,
|
||||||
|
# ]
|
||||||
|
# |None
|
||||||
|
# ) = None
|
||||||
|
# OR
|
||||||
|
# ) = {
|
||||||
|
# # pre-seed decoders for std-py-type-set for use when
|
||||||
|
# # `MsgType.pld == None|Any`.
|
||||||
|
# None: msgpack.Decoder(Any),
|
||||||
|
# Any: msgpack.Decoder(Any),
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# -[ ] do we still want to try and support the sub-decoder with
|
||||||
# `.Raw` technique in the case that the `Generic` approach gives
|
# `.Raw` technique in the case that the `Generic` approach gives
|
||||||
# future grief?
|
# future grief?
|
||||||
#
|
#
|
||||||
|
@ -429,6 +436,9 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
|
||||||
#
|
#
|
||||||
_def_tractor_codec: MsgCodec = mk_codec(
|
_def_tractor_codec: MsgCodec = mk_codec(
|
||||||
ipc_pld_spec=Any,
|
ipc_pld_spec=Any,
|
||||||
|
|
||||||
|
# TODO: use this for debug mode locking prot?
|
||||||
|
# ipc_pld_spec=Raw,
|
||||||
)
|
)
|
||||||
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
|
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
|
||||||
# IPC msging codec used by the transport layer when doing
|
# IPC msging codec used by the transport layer when doing
|
||||||
|
@ -462,11 +472,9 @@ _def_tractor_codec: MsgCodec = mk_codec(
|
||||||
|
|
||||||
# TODO: STOP USING THIS, since it's basically a global and won't
|
# TODO: STOP USING THIS, since it's basically a global and won't
|
||||||
# allow sub-IPC-ctxs to limit the msg-spec however desired..
|
# allow sub-IPC-ctxs to limit the msg-spec however desired..
|
||||||
_ctxvar_MsgCodec: MsgCodec = RunVar(
|
# _ctxvar_MsgCodec: MsgCodec = RunVar(
|
||||||
|
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
|
||||||
'msgspec_codec',
|
'msgspec_codec',
|
||||||
|
|
||||||
# TODO: move this to our new `Msg`-spec!
|
|
||||||
# default=_def_msgspec_codec,
|
|
||||||
default=_def_tractor_codec,
|
default=_def_tractor_codec,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -475,23 +483,36 @@ _ctxvar_MsgCodec: MsgCodec = RunVar(
|
||||||
def apply_codec(
|
def apply_codec(
|
||||||
codec: MsgCodec,
|
codec: MsgCodec,
|
||||||
|
|
||||||
|
ctx: Context|None = None,
|
||||||
|
|
||||||
) -> MsgCodec:
|
) -> MsgCodec:
|
||||||
'''
|
'''
|
||||||
Dynamically apply a `MsgCodec` to the current task's
|
Dynamically apply a `MsgCodec` to the current task's runtime
|
||||||
runtime context such that all IPC msgs are processed
|
context such that all (of a certain class of payload
|
||||||
with it for that task.
|
containing i.e. `MsgType.pld: PayloadT`) IPC msgs are
|
||||||
|
processed with it for that task.
|
||||||
|
|
||||||
|
Uses a `contextvars.ContextVar` to ensure the scope of any
|
||||||
|
codec setting matches the current `Context` or
|
||||||
|
`._rpc.process_messages()` feeder task's prior setting without
|
||||||
|
mutating any surrounding scope.
|
||||||
|
|
||||||
|
When a `ctx` is supplied, only mod its `Context.pld_codec`.
|
||||||
|
|
||||||
Uses a `tricycle.TreeVar` to ensure the scope of the codec
|
|
||||||
matches the `@cm` block and DOES NOT change to the original
|
matches the `@cm` block and DOES NOT change to the original
|
||||||
(default) value in new tasks (as it does for `ContextVar`).
|
(default) value in new tasks (as it does for `ContextVar`).
|
||||||
|
|
||||||
See the docs:
|
|
||||||
- https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
|
|
||||||
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = True
|
||||||
orig: MsgCodec = _ctxvar_MsgCodec.get()
|
|
||||||
|
if ctx is not None:
|
||||||
|
var: ContextVar = ctx._var_pld_codec
|
||||||
|
else:
|
||||||
|
# use IPC channel-connection "global" codec
|
||||||
|
var: ContextVar = _ctxvar_MsgCodec
|
||||||
|
|
||||||
|
orig: MsgCodec = var.get()
|
||||||
|
|
||||||
assert orig is not codec
|
assert orig is not codec
|
||||||
if codec.pld_spec is None:
|
if codec.pld_spec is None:
|
||||||
breakpoint()
|
breakpoint()
|
||||||
|
@ -500,22 +521,25 @@ def apply_codec(
|
||||||
'Applying new msg-spec codec\n\n'
|
'Applying new msg-spec codec\n\n'
|
||||||
f'{codec}\n'
|
f'{codec}\n'
|
||||||
)
|
)
|
||||||
token: RunVarToken = _ctxvar_MsgCodec.set(codec)
|
token: Token = var.set(codec)
|
||||||
|
|
||||||
# TODO: for TreeVar approach, see docs for @cm `.being()` API:
|
# ?TODO? for TreeVar approach which copies from the
|
||||||
# https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
|
# cancel-scope of the prior value, NOT the prior task
|
||||||
# try:
|
# See the docs:
|
||||||
|
# - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
|
||||||
|
# - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
|
||||||
|
# ^- see docs for @cm `.being()` API
|
||||||
# with _ctxvar_MsgCodec.being(codec):
|
# with _ctxvar_MsgCodec.being(codec):
|
||||||
# new = _ctxvar_MsgCodec.get()
|
# new = _ctxvar_MsgCodec.get()
|
||||||
# assert new is codec
|
# assert new is codec
|
||||||
# yield codec
|
# yield codec
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield _ctxvar_MsgCodec.get()
|
yield var.get()
|
||||||
finally:
|
finally:
|
||||||
_ctxvar_MsgCodec.reset(token)
|
var.reset(token)
|
||||||
|
|
||||||
assert _ctxvar_MsgCodec.get() is orig
|
assert var.get() is orig
|
||||||
log.info(
|
log.info(
|
||||||
'Reverted to last msg-spec codec\n\n'
|
'Reverted to last msg-spec codec\n\n'
|
||||||
f'{orig}\n'
|
f'{orig}\n'
|
||||||
|
|
Loading…
Reference in New Issue