Get msg spec type limiting working with a `RunVar`

Since `contextvars.ContextVar` seems to reset to the default in every
new task, switching to using `trio.lowlevel.RunVar` kinda gets close to
what we'd like where a child scope can override what's in the rent but
ideally without modifying the rent's. I tried `tricycle.TreeVar` as well
but it also seems to reset across (embedded) nurseries in our runtime;
need to try it again bc apparently that's not how it's suppose to work?

Surrounding and in support of all this the `Msg`-set impl deats changed
a bit as well as various stuff in `.msg` sub-mods:

- drop the `.pld` struct types for `Error`, `Start`, `StartAck` since we
  don't really need the `.pld` payload field in those cases since
  they're runtime control msgs for starting RPC tasks and handling
  remote errors; we can just put the fields directly on each msg since
  the user will never want/need to override the `.pld` field type.

- add a couple new runtime msgs and include them in `msg.__spec__`
  and make them NOT inherit from `Msg` since they are runtime-specific
  and thus have no need for `.pld` type constraints:
  - `Aid` the actor-id identity handshake msg.
  - `SpawnSpec`: the spawn data passed from a parent actor down to a
    a child in `Actor._from_parent()` for which we need a shuttle
    protocol msg, so might as well make it a pendatic one ;)

- fix some `Actor.uid` field types that were type-borked on `Error`

- add notes about how we need built-in `debug_mode` msgs in order to
  avoid msg-type errors when using the TTY lock machinery and
  a different `.pld` spec then the default `Any` is in use..
  -> since `devx._debug.lock_tty_for_child()` and it's client side
  `wait_for_parent_stdin_hijack()` use `Context.started('Locked')`
  and `MsgStream.send('pdb_unlock')` string values as their `.pld`
  contents we'd need to either always do a `ipc_pld_spec | str` or
  pre-define some dedicated `Msg` types which get `Union`-ed in
  for this?

- break out `msg.pretty_struct.Struct._sin_props()` into a helper func
  `iter_fields()` since the impl doesn't require a struct instance.

- as mentioned above since `ContextVar` didn't work as anticipated
  I next tried `tricycle.TreeVar` but that too didn't seem to keep
  the `apply_codec()` setting intact across
  `Portal.open_context()`/`Context.open_stream()` (it kept reverting to
  the default `.pld: Any` default setting) so I finalized on
  a trio.lowlevel.RunVar` for now despite it basically being
  a `global`..
  -> will probably come back to test this with `TreeVar` and some hot
  tips i picked up from @mikenerone in the `trio` gitter, which i put in
  comments surrounding proto-code.
Tyler Goodlet 2024-04-02 09:21:30 -04:00
parent 25ffdedc06
commit a315f01acc
4 changed files with 338 additions and 85 deletions

View File

@ -31,25 +31,24 @@ from ._codec import (
apply_codec as apply_codec,
mk_codec as mk_codec,
MsgCodec as MsgCodec,
current_msgspec_codec as current_msgspec_codec,
current_codec as current_codec,
)
from .types import (
Msg as Msg,
Start as Start, # with pld
FuncSpec as FuncSpec,
Aid as Aid,
SpawnSpec as SpawnSpec,
StartAck as StartAck, # with pld
IpcCtxSpec as IpcCtxSpec,
Start as Start,
StartAck as StartAck,
Started as Started,
Yield as Yield,
Stop as Stop,
Return as Return,
Error as Error, # with pld
ErrorData as ErrorData,
Error as Error,
# full msg spec set
__spec__ as __spec__,

View File

@ -30,13 +30,13 @@ ToDo: backends we prolly should offer:
'''
from __future__ import annotations
from contextvars import (
ContextVar,
Token,
)
from contextlib import (
contextmanager as cm,
)
# from contextvars import (
# ContextVar,
# Token,
# )
from typing import (
Any,
Callable,
@ -47,6 +47,12 @@ from types import ModuleType
import msgspec
from msgspec import msgpack
from trio.lowlevel import (
RunVar,
RunVarToken,
)
# TODO: see notes below from @mikenerone..
# from tricycle import TreeVar
from tractor.msg.pretty_struct import Struct
from tractor.msg.types import (
@ -72,6 +78,9 @@ class MsgCodec(Struct):
'''
A IPC msg interchange format lib's encoder + decoder pair.
Pretty much nothing more then delegation to underlying
`msgspec.<interchange-protocol>.Encoder/Decoder`s for now.
'''
_enc: msgpack.Encoder
_dec: msgpack.Decoder
@ -86,11 +95,6 @@ class MsgCodec(Struct):
lib: ModuleType = msgspec
# ad-hoc type extensions
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
enc_hook: Callable[[Any], Any]|None = None # coder
dec_hook: Callable[[type, Any], Any]|None = None # decoder
# TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
@ -304,7 +308,8 @@ def mk_codec(
libname: str = 'msgspec',
# proxy as `Struct(**kwargs)`
# proxy as `Struct(**kwargs)` for ad-hoc type extensions
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
# ------ - ------
dec_hook: Callable|None = None,
enc_hook: Callable|None = None,
@ -389,14 +394,52 @@ def mk_codec(
# no custom structs, hooks or other special types.
_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
# NOTE: provides for per-`trio.Task` specificity of the
# The built-in IPC `Msg` spec.
# Our composing "shuttle" protocol which allows `tractor`-app code
# to use any `msgspec` supported type as the `Msg.pld` payload,
# https://jcristharif.com/msgspec/supported-types.html
#
_def_tractor_codec: MsgCodec = mk_codec(
ipc_pld_spec=Any,
)
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing
# `Channel.send()/.recv()` of wire data.
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
# ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!?
# _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
# TreeVar-TODO: DIDN'T WORK, kept resetting in every new embedded nursery
# even though it's supposed to inherit from a parent context ???
#
# _ctxvar_MsgCodec: TreeVar[MsgCodec] = TreeVar(
#
# ^-NOTE-^: for this to work see the mods by @mikenerone from `trio` gitter:
#
# 22:02:54 <mikenerone> even for regular contextvars, all you have to do is:
# `task: Task = trio.lowlevel.current_task()`
# `task.parent_nursery.parent_task.context.run(my_ctx_var.set, new_value)`
#
# From a comment in his prop code he couldn't share outright:
# 1. For every TreeVar set in the current task (which covers what
# we need from SynchronizerFacade), walk up the tree until the
# root or finding one where the TreeVar is already set, setting
# it in all of the contexts along the way.
# 2. For each of those, we also forcibly set the values that are
# pending for child nurseries that have not yet accessed the
# TreeVar.
# 3. We similarly set the pending values for the child nurseries
# of the *current* task.
#
# TODO: STOP USING THIS, since it's basically a global and won't
# allow sub-IPC-ctxs to limit the msg-spec however desired..
_ctxvar_MsgCodec: MsgCodec = RunVar(
'msgspec_codec',
# TODO: move this to our new `Msg`-spec!
default=_def_msgspec_codec,
# default=_def_msgspec_codec,
default=_def_tractor_codec,
)
@ -410,15 +453,36 @@ def apply_codec(
runtime context such that all IPC msgs are processed
with it for that task.
Uses a `tricycle.TreeVar` to ensure the scope of the codec
matches the `@cm` block and DOES NOT change to the original
(default) value in new tasks (as it does for `ContextVar`).
See the docs:
- https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
'''
token: Token = _ctxvar_MsgCodec.set(codec)
orig: MsgCodec = _ctxvar_MsgCodec.get()
assert orig is not codec
token: RunVarToken = _ctxvar_MsgCodec.set(codec)
# TODO: for TreeVar approach, see docs for @cm `.being()` API:
# https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# try:
# with _ctxvar_MsgCodec.being(codec):
# new = _ctxvar_MsgCodec.get()
# assert new is codec
# yield codec
try:
yield _ctxvar_MsgCodec.get()
finally:
_ctxvar_MsgCodec.reset(token)
assert _ctxvar_MsgCodec.get() is orig
def current_msgspec_codec() -> MsgCodec:
def current_codec() -> MsgCodec:
'''
Return the current `trio.Task.context`'s value
for `msgspec_codec` used by `Channel.send/.recv()`
@ -449,5 +513,6 @@ def limit_msg_spec(
payload_types=payload_types,
**codec_kwargs,
)
with apply_codec(msgspec_codec):
with apply_codec(msgspec_codec) as applied_codec:
assert applied_codec is msgspec_codec
yield msgspec_codec

View File

@ -80,6 +80,28 @@ class DiffDump(UserList):
return repstr
def iter_fields(struct: Struct) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(struct):
key: str = fi.name
val: Any = getattr(struct, key)
yield (
fi,
key,
val,
)
class Struct(
_Struct,
@ -91,23 +113,6 @@ class Struct(
A "human friendlier" (aka repl buddy) struct subtype.
'''
def _sin_props(self) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(self):
key: str = fi.name
val: Any = getattr(self, key)
yield fi, key, val
def to_dict(
self,
include_non_members: bool = True,
@ -130,7 +135,7 @@ class Struct(
# added as type-defined `@property` methods!
sin_props: dict = {}
fi: structs.FieldInfo
for fi, k, v in self._sin_props():
for fi, k, v in iter_fields(self):
sin_props[k] = asdict[k]
return sin_props
@ -159,7 +164,7 @@ class Struct(
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in self._sin_props():
for fi, k, v in iter_fields(self):
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the

View File

@ -26,6 +26,7 @@ from __future__ import annotations
import types
from typing import (
Any,
Callable,
Generic,
Literal,
Type,
@ -37,8 +38,12 @@ from msgspec import (
defstruct,
# field,
Struct,
UNSET,
UnsetType,
# UNSET,
# UnsetType,
)
from tractor.msg import (
pretty_struct,
)
# type variable for the boxed payload field `.pld`
@ -48,11 +53,19 @@ PayloadT = TypeVar('PayloadT')
class Msg(
Struct,
Generic[PayloadT],
# https://jcristharif.com/msgspec/structs.html#tagged-unions
tag=True,
tag_field='msg_type',
# eq=True,
# https://jcristharif.com/msgspec/structs.html#field-ordering
# kw_only=True,
# https://jcristharif.com/msgspec/structs.html#equality-and-order
# order=True,
# https://jcristharif.com/msgspec/structs.html#encoding-decoding-as-arrays
# as_array=True,
):
'''
The "god" boxing msg type.
@ -90,6 +103,53 @@ class Msg(
pld: PayloadT
class Aid(
Struct,
tag=True,
tag_field='msg_type',
):
'''
Actor-identity msg.
Initial contact exchange enabling an actor "mailbox handshake"
delivering the peer identity (and maybe eventually contact)
info.
Used by discovery protocol to register actors as well as
conduct the initial comms (capability) filtering.
'''
name: str
uuid: str
# TODO: use built-in support for UUIDs?
# -[ ] `uuid.UUID` which has multi-protocol support
# https://jcristharif.com/msgspec/supported-types.html#uuid
class SpawnSpec(
pretty_struct.Struct,
tag=True,
tag_field='msg_type',
):
'''
Initial runtime spec handed down from a spawning parent to its
child subactor immediately following first contact via an
`Aid` msg.
'''
_parent_main_data: dict
_runtime_vars: dict[str, Any]
# module import capability
enable_modules: dict[str, str]
# TODO: not just sockaddr pairs?
# -[ ] abstract into a `TransportAddr` type?
reg_addrs: list[tuple[str, int]]
bind_addrs: list[tuple[str, int]]
# TODO: caps based RPC support in the payload?
#
# -[ ] integration with our ``enable_modules: list[str]`` caps sys.
@ -105,18 +165,31 @@ class Msg(
#
# -[ ] can we combine .ns + .func into a native `NamespacePath` field?
#
# -[ ]better name, like `Call/TaskInput`?
# -[ ] better name, like `Call/TaskInput`?
#
class FuncSpec(Struct):
ns: str
func: str
kwargs: dict
uid: str # (calling) actor-id
# -[ ] XXX a debugger lock msg transaction with payloads like,
# child -> `.pld: DebugLock` -> root
# child <- `.pld: DebugLocked` <- root
# child -> `.pld: DebugRelease` -> root
#
# WHY => when a pld spec is provided it might not allow for
# debug mode msgs as they currently are (using plain old `pld.
# str` payloads) so we only when debug_mode=True we need to
# union in this debugger payload set?
#
# mk_msg_spec(
# MyPldSpec,
# debug_mode=True,
# ) -> (
# Union[MyPldSpec]
# | Union[DebugLock, DebugLocked, DebugRelease]
# )
class Start(
Msg,
Struct,
tag=True,
tag_field='msg_type',
):
'''
Initial request to remotely schedule an RPC `trio.Task` via
@ -134,14 +207,26 @@ class Start(
- `Context.open_context()`
'''
pld: FuncSpec
cid: str
ns: str
func: str
kwargs: dict
uid: tuple[str, str] # (calling) actor-id
class IpcCtxSpec(Struct):
class StartAck(
Struct,
tag=True,
tag_field='msg_type',
):
'''
An inter-actor-`trio.Task`-comms `Context` spec.
Init response to a `Cmd` request indicating the far
end's RPC spec, namely its callable "type".
'''
cid: str
# TODO: maybe better names for all these?
# -[ ] obvi ^ would need sync with `._rpc`
functype: Literal[
@ -160,18 +245,6 @@ class IpcCtxSpec(Struct):
# msgspec: MsgSpec
class StartAck(
Msg,
Generic[PayloadT],
):
'''
Init response to a `Cmd` request indicating the far
end's RPC callable "type".
'''
pld: IpcCtxSpec
class Started(
Msg,
Generic[PayloadT],
@ -202,13 +275,19 @@ class Yield(
pld: PayloadT
class Stop(Msg):
class Stop(
Struct,
tag=True,
tag_field='msg_type',
):
'''
Stream termination signal much like an IPC version
of `StopAsyncIteration`.
'''
pld: UnsetType = UNSET
cid: str
# TODO: do we want to support a payload on stop?
# pld: UnsetType = UNSET
class Return(
@ -223,32 +302,33 @@ class Return(
pld: PayloadT
class ErrorData(Struct):
class Error(
Struct,
tag=True,
tag_field='msg_type',
):
'''
Remote actor error meta-data as needed originally by
A pkt that wraps `RemoteActorError`s for relay and raising.
Fields are 1-to-1 meta-data as needed originally by
`RemoteActorError.msgdata: dict`.
'''
src_uid: str
src_uid: tuple[str, str]
src_type_str: str
boxed_type_str: str
relay_path: list[str]
relay_path: list[tuple[str, str]]
tb_str: str
cid: str|None = None
# TODO: use UNSET or don't include them via
#
# `ContextCancelled`
canceller: str|None = None
canceller: tuple[str, str]|None = None
# `StreamOverrun`
sender: str|None = None
class Error(Msg):
'''
A pkt that wraps `RemoteActorError`s for relay.
'''
pld: ErrorData
sender: tuple[str, str]|None = None
# TODO: should be make a msg version of `ContextCancelled?`
@ -265,6 +345,12 @@ class Error(Msg):
# approx order of the IPC txn-state spaces.
__spec__: list[Msg] = [
# identity handshake
Aid,
# spawn specification from parent
SpawnSpec,
# inter-actor RPC initiation
Start,
StartAck,
@ -280,6 +366,8 @@ __spec__: list[Msg] = [
]
_runtime_spec_msgs: list[Msg] = [
Aid,
SpawnSpec,
Start,
StartAck,
Stop,
@ -443,3 +531,99 @@ def mk_msg_spec(
pld_spec | runtime_spec,
msgtypes_table[spec_build_method] + ipc_msg_types,
)
# TODO: make something similar to this inside `._codec` such that
# user can just pass a type table of some sort?
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
# '''
# Deliver a `enc_hook()`/`dec_hook()` pair which does
# manual convertion from our above native `Msg` set
# to `dict` equivalent (wire msgs) in order to keep legacy compat
# with the original runtime implementation.
# Note: this is is/was primarly used while moving the core
# runtime over to using native `Msg`-struct types wherein we
# start with the send side emitting without loading
# a typed-decoder and then later flipping the switch over to
# load to the native struct types once all runtime usage has
# been adjusted appropriately.
# '''
# def enc_to_dict(msg: Any) -> Any:
# '''
# Encode `Msg`-structs to `dict` msgs instead
# of using `msgspec.msgpack.Decoder.type`-ed
# features.
# '''
# match msg:
# case Start():
# dctmsg: dict = pretty_struct.Struct.to_dict(
# msg
# )['pld']
# case Error():
# dctmsg: dict = pretty_struct.Struct.to_dict(
# msg
# )['pld']
# return {'error': dctmsg}
# def dec_from_dict(
# type: Type,
# obj: Any,
# ) -> Any:
# '''
# Decode to `Msg`-structs from `dict` msgs instead
# of using `msgspec.msgpack.Decoder.type`-ed
# features.
# '''
# cid: str = obj.get('cid')
# match obj:
# case {'cmd': pld}:
# return Start(
# cid=cid,
# pld=pld,
# )
# case {'functype': pld}:
# return StartAck(
# cid=cid,
# functype=pld,
# # pld=IpcCtxSpec(
# # functype=pld,
# # ),
# )
# case {'started': pld}:
# return Started(
# cid=cid,
# pld=pld,
# )
# case {'yield': pld}:
# return Yield(
# cid=obj['cid'],
# pld=pld,
# )
# case {'stop': pld}:
# return Stop(
# cid=cid,
# )
# case {'return': pld}:
# return Return(
# cid=cid,
# pld=pld,
# )
# case {'error': pld}:
# return Error(
# cid=cid,
# pld=ErrorData(
# **pld
# ),
# )
# return (
# # enc_to_dict,
# dec_from_dict,
# )