From f3ca8608d5e86285f01aa696a8abaf0e6cf6ae07 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 2 Apr 2024 09:21:30 -0400 Subject: [PATCH] Get msg spec type limiting working with a `RunVar` Since `contextvars.ContextVar` seems to reset to the default in every new task, switching to using `trio.lowlevel.RunVar` kinda gets close to what we'd like where a child scope can override what's in the rent but ideally without modifying the rent's. I tried `tricycle.TreeVar` as well but it also seems to reset across (embedded) nurseries in our runtime; need to try it again bc apparently that's not how it's suppose to work? NOTE that for now i'm keeping the `.msg.types._ctxvar_MsgCodec` set to the `msgspec` default (`Any` types) so that the test suite will still pass until the runtime is ported to the new msg-spec + codec. Surrounding and in support of all this the `Msg`-set impl deats changed a bit as well as various stuff in `.msg` sub-mods: - drop the `.pld` struct types for `Error`, `Start`, `StartAck` since we don't really need the `.pld` payload field in those cases since they're runtime control msgs for starting RPC tasks and handling remote errors; we can just put the fields directly on each msg since the user will never want/need to override the `.pld` field type. - add a couple new runtime msgs and include them in `msg.__spec__` and make them NOT inherit from `Msg` since they are runtime-specific and thus have no need for `.pld` type constraints: - `Aid` the actor-id identity handshake msg. - `SpawnSpec`: the spawn data passed from a parent actor down to a a child in `Actor._from_parent()` for which we need a shuttle protocol msg, so might as well make it a pendatic one ;) - fix some `Actor.uid` field types that were type-borked on `Error` - add notes about how we need built-in `debug_mode` msgs in order to avoid msg-type errors when using the TTY lock machinery and a different `.pld` spec then the default `Any` is in use.. -> since `devx._debug.lock_tty_for_child()` and it's client side `wait_for_parent_stdin_hijack()` use `Context.started('Locked')` and `MsgStream.send('pdb_unlock')` string values as their `.pld` contents we'd need to either always do a `ipc_pld_spec | str` or pre-define some dedicated `Msg` types which get `Union`-ed in for this? - break out `msg.pretty_struct.Struct._sin_props()` into a helper func `iter_fields()` since the impl doesn't require a struct instance. - as mentioned above since `ContextVar` didn't work as anticipated I next tried `tricycle.TreeVar` but that too didn't seem to keep the `apply_codec()` setting intact across `Portal.open_context()`/`Context.open_stream()` (it kept reverting to the default `.pld: Any` default setting) so I finalized on a trio.lowlevel.RunVar` for now despite it basically being a `global`.. -> will probably come back to test this with `TreeVar` and some hot tips i picked up from @mikenerone in the `trio` gitter, which i put in comments surrounding proto-code. --- tractor/msg/__init__.py | 13 +- tractor/msg/_codec.py | 95 ++++++++++-- tractor/msg/pretty_struct.py | 43 +++--- tractor/msg/types.py | 270 +++++++++++++++++++++++++++++------ 4 files changed, 337 insertions(+), 84 deletions(-) diff --git a/tractor/msg/__init__.py b/tractor/msg/__init__.py index 0c8809a..d8f3747 100644 --- a/tractor/msg/__init__.py +++ b/tractor/msg/__init__.py @@ -31,25 +31,24 @@ from ._codec import ( apply_codec as apply_codec, mk_codec as mk_codec, MsgCodec as MsgCodec, - current_msgspec_codec as current_msgspec_codec, + current_codec as current_codec, ) from .types import ( Msg as Msg, - Start as Start, # with pld - FuncSpec as FuncSpec, + Aid as Aid, + SpawnSpec as SpawnSpec, - StartAck as StartAck, # with pld - IpcCtxSpec as IpcCtxSpec, + Start as Start, + StartAck as StartAck, Started as Started, Yield as Yield, Stop as Stop, Return as Return, - Error as Error, # with pld - ErrorData as ErrorData, + Error as Error, # full msg spec set __spec__ as __spec__, diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py index 4477d39..32a58a5 100644 --- a/tractor/msg/_codec.py +++ b/tractor/msg/_codec.py @@ -30,13 +30,13 @@ ToDo: backends we prolly should offer: ''' from __future__ import annotations -from contextvars import ( - ContextVar, - Token, -) from contextlib import ( contextmanager as cm, ) +# from contextvars import ( +# ContextVar, +# Token, +# ) from typing import ( Any, Callable, @@ -47,6 +47,12 @@ from types import ModuleType import msgspec from msgspec import msgpack +from trio.lowlevel import ( + RunVar, + RunVarToken, +) +# TODO: see notes below from @mikenerone.. +# from tricycle import TreeVar from tractor.msg.pretty_struct import Struct from tractor.msg.types import ( @@ -72,6 +78,9 @@ class MsgCodec(Struct): ''' A IPC msg interchange format lib's encoder + decoder pair. + Pretty much nothing more then delegation to underlying + `msgspec..Encoder/Decoder`s for now. + ''' _enc: msgpack.Encoder _dec: msgpack.Decoder @@ -86,11 +95,6 @@ class MsgCodec(Struct): lib: ModuleType = msgspec - # ad-hoc type extensions - # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types - enc_hook: Callable[[Any], Any]|None = None # coder - dec_hook: Callable[[type, Any], Any]|None = None # decoder - # TODO: a sub-decoder system as well? # payload_msg_specs: Union[Type[Struct]] = Any # see related comments in `.msg.types` @@ -304,7 +308,8 @@ def mk_codec( libname: str = 'msgspec', - # proxy as `Struct(**kwargs)` + # proxy as `Struct(**kwargs)` for ad-hoc type extensions + # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types # ------ - ------ dec_hook: Callable|None = None, enc_hook: Callable|None = None, @@ -389,14 +394,52 @@ def mk_codec( # no custom structs, hooks or other special types. _def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any) -# NOTE: provides for per-`trio.Task` specificity of the +# The built-in IPC `Msg` spec. +# Our composing "shuttle" protocol which allows `tractor`-app code +# to use any `msgspec` supported type as the `Msg.pld` payload, +# https://jcristharif.com/msgspec/supported-types.html +# +_def_tractor_codec: MsgCodec = mk_codec( + ipc_pld_spec=Any, +) +# TODO: IDEALLY provides for per-`trio.Task` specificity of the # IPC msging codec used by the transport layer when doing # `Channel.send()/.recv()` of wire data. -_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( + +# ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!? +# _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( + +# TreeVar-TODO: DIDN'T WORK, kept resetting in every new embedded nursery +# even though it's supposed to inherit from a parent context ??? +# +# _ctxvar_MsgCodec: TreeVar[MsgCodec] = TreeVar( +# +# ^-NOTE-^: for this to work see the mods by @mikenerone from `trio` gitter: +# +# 22:02:54 even for regular contextvars, all you have to do is: +# `task: Task = trio.lowlevel.current_task()` +# `task.parent_nursery.parent_task.context.run(my_ctx_var.set, new_value)` +# +# From a comment in his prop code he couldn't share outright: +# 1. For every TreeVar set in the current task (which covers what +# we need from SynchronizerFacade), walk up the tree until the +# root or finding one where the TreeVar is already set, setting +# it in all of the contexts along the way. +# 2. For each of those, we also forcibly set the values that are +# pending for child nurseries that have not yet accessed the +# TreeVar. +# 3. We similarly set the pending values for the child nurseries +# of the *current* task. +# + +# TODO: STOP USING THIS, since it's basically a global and won't +# allow sub-IPC-ctxs to limit the msg-spec however desired.. +_ctxvar_MsgCodec: MsgCodec = RunVar( 'msgspec_codec', # TODO: move this to our new `Msg`-spec! default=_def_msgspec_codec, + # default=_def_tractor_codec, ) @@ -410,15 +453,36 @@ def apply_codec( runtime context such that all IPC msgs are processed with it for that task. + Uses a `tricycle.TreeVar` to ensure the scope of the codec + matches the `@cm` block and DOES NOT change to the original + (default) value in new tasks (as it does for `ContextVar`). + + See the docs: + - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables + - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py + ''' - token: Token = _ctxvar_MsgCodec.set(codec) + orig: MsgCodec = _ctxvar_MsgCodec.get() + assert orig is not codec + token: RunVarToken = _ctxvar_MsgCodec.set(codec) + + # TODO: for TreeVar approach, see docs for @cm `.being()` API: + # https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables + # try: + # with _ctxvar_MsgCodec.being(codec): + # new = _ctxvar_MsgCodec.get() + # assert new is codec + # yield codec + try: yield _ctxvar_MsgCodec.get() finally: _ctxvar_MsgCodec.reset(token) + assert _ctxvar_MsgCodec.get() is orig -def current_msgspec_codec() -> MsgCodec: + +def current_codec() -> MsgCodec: ''' Return the current `trio.Task.context`'s value for `msgspec_codec` used by `Channel.send/.recv()` @@ -449,5 +513,6 @@ def limit_msg_spec( payload_types=payload_types, **codec_kwargs, ) - with apply_codec(msgspec_codec): + with apply_codec(msgspec_codec) as applied_codec: + assert applied_codec is msgspec_codec yield msgspec_codec diff --git a/tractor/msg/pretty_struct.py b/tractor/msg/pretty_struct.py index 143fc7a..412b6ed 100644 --- a/tractor/msg/pretty_struct.py +++ b/tractor/msg/pretty_struct.py @@ -80,6 +80,28 @@ class DiffDump(UserList): return repstr +def iter_fields(struct: Struct) -> Iterator[ + tuple[ + structs.FieldIinfo, + str, + Any, + ] +]: + ''' + Iterate over all non-@property fields of this struct. + + ''' + fi: structs.FieldInfo + for fi in structs.fields(struct): + key: str = fi.name + val: Any = getattr(struct, key) + yield ( + fi, + key, + val, + ) + + class Struct( _Struct, @@ -91,23 +113,6 @@ class Struct( A "human friendlier" (aka repl buddy) struct subtype. ''' - def _sin_props(self) -> Iterator[ - tuple[ - structs.FieldIinfo, - str, - Any, - ] - ]: - ''' - Iterate over all non-@property fields of this struct. - - ''' - fi: structs.FieldInfo - for fi in structs.fields(self): - key: str = fi.name - val: Any = getattr(self, key) - yield fi, key, val - def to_dict( self, include_non_members: bool = True, @@ -130,7 +135,7 @@ class Struct( # added as type-defined `@property` methods! sin_props: dict = {} fi: structs.FieldInfo - for fi, k, v in self._sin_props(): + for fi, k, v in iter_fields(self): sin_props[k] = asdict[k] return sin_props @@ -159,7 +164,7 @@ class Struct( fi: structs.FieldInfo k: str v: Any - for fi, k, v in self._sin_props(): + for fi, k, v in iter_fields(self): # TODO: how can we prefer `Literal['option1', 'option2, # ..]` over .__name__ == `Literal` but still get only the diff --git a/tractor/msg/types.py b/tractor/msg/types.py index 2411f0f..a81473d 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -26,6 +26,7 @@ from __future__ import annotations import types from typing import ( Any, + Callable, Generic, Literal, Type, @@ -37,8 +38,12 @@ from msgspec import ( defstruct, # field, Struct, - UNSET, - UnsetType, + # UNSET, + # UnsetType, +) + +from tractor.msg import ( + pretty_struct, ) # type variable for the boxed payload field `.pld` @@ -48,11 +53,19 @@ PayloadT = TypeVar('PayloadT') class Msg( Struct, Generic[PayloadT], + + # https://jcristharif.com/msgspec/structs.html#tagged-unions tag=True, tag_field='msg_type', - # eq=True, + # https://jcristharif.com/msgspec/structs.html#field-ordering + # kw_only=True, + + # https://jcristharif.com/msgspec/structs.html#equality-and-order # order=True, + + # https://jcristharif.com/msgspec/structs.html#encoding-decoding-as-arrays + # as_array=True, ): ''' The "god" boxing msg type. @@ -90,6 +103,53 @@ class Msg( pld: PayloadT +class Aid( + Struct, + tag=True, + tag_field='msg_type', +): + ''' + Actor-identity msg. + + Initial contact exchange enabling an actor "mailbox handshake" + delivering the peer identity (and maybe eventually contact) + info. + + Used by discovery protocol to register actors as well as + conduct the initial comms (capability) filtering. + + ''' + name: str + uuid: str + # TODO: use built-in support for UUIDs? + # -[ ] `uuid.UUID` which has multi-protocol support + # https://jcristharif.com/msgspec/supported-types.html#uuid + + +class SpawnSpec( + pretty_struct.Struct, + tag=True, + tag_field='msg_type', +): + ''' + Initial runtime spec handed down from a spawning parent to its + child subactor immediately following first contact via an + `Aid` msg. + + ''' + _parent_main_data: dict + _runtime_vars: dict[str, Any] + + # module import capability + enable_modules: dict[str, str] + + # TODO: not just sockaddr pairs? + # -[ ] abstract into a `TransportAddr` type? + reg_addrs: list[tuple[str, int]] + bind_addrs: list[tuple[str, int]] + + + # TODO: caps based RPC support in the payload? # # -[ ] integration with our ``enable_modules: list[str]`` caps sys. @@ -105,18 +165,31 @@ class Msg( # # -[ ] can we combine .ns + .func into a native `NamespacePath` field? # -# -[ ]better name, like `Call/TaskInput`? +# -[ ] better name, like `Call/TaskInput`? # -class FuncSpec(Struct): - ns: str - func: str - - kwargs: dict - uid: str # (calling) actor-id +# -[ ] XXX a debugger lock msg transaction with payloads like, +# child -> `.pld: DebugLock` -> root +# child <- `.pld: DebugLocked` <- root +# child -> `.pld: DebugRelease` -> root +# +# WHY => when a pld spec is provided it might not allow for +# debug mode msgs as they currently are (using plain old `pld. +# str` payloads) so we only when debug_mode=True we need to +# union in this debugger payload set? +# +# mk_msg_spec( +# MyPldSpec, +# debug_mode=True, +# ) -> ( +# Union[MyPldSpec] +# | Union[DebugLock, DebugLocked, DebugRelease] +# ) class Start( - Msg, + Struct, + tag=True, + tag_field='msg_type', ): ''' Initial request to remotely schedule an RPC `trio.Task` via @@ -134,14 +207,26 @@ class Start( - `Context.open_context()` ''' - pld: FuncSpec + cid: str + + ns: str + func: str + + kwargs: dict + uid: tuple[str, str] # (calling) actor-id -class IpcCtxSpec(Struct): +class StartAck( + Struct, + tag=True, + tag_field='msg_type', +): ''' - An inter-actor-`trio.Task`-comms `Context` spec. + Init response to a `Cmd` request indicating the far + end's RPC spec, namely its callable "type". ''' + cid: str # TODO: maybe better names for all these? # -[ ] obvi ^ would need sync with `._rpc` functype: Literal[ @@ -160,18 +245,6 @@ class IpcCtxSpec(Struct): # msgspec: MsgSpec -class StartAck( - Msg, - Generic[PayloadT], -): - ''' - Init response to a `Cmd` request indicating the far - end's RPC callable "type". - - ''' - pld: IpcCtxSpec - - class Started( Msg, Generic[PayloadT], @@ -202,13 +275,19 @@ class Yield( pld: PayloadT -class Stop(Msg): +class Stop( + Struct, + tag=True, + tag_field='msg_type', +): ''' Stream termination signal much like an IPC version of `StopAsyncIteration`. ''' - pld: UnsetType = UNSET + cid: str + # TODO: do we want to support a payload on stop? + # pld: UnsetType = UNSET class Return( @@ -223,32 +302,33 @@ class Return( pld: PayloadT -class ErrorData(Struct): +class Error( + Struct, + tag=True, + tag_field='msg_type', +): ''' - Remote actor error meta-data as needed originally by + A pkt that wraps `RemoteActorError`s for relay and raising. + + Fields are 1-to-1 meta-data as needed originally by `RemoteActorError.msgdata: dict`. ''' - src_uid: str + src_uid: tuple[str, str] src_type_str: str boxed_type_str: str - - relay_path: list[str] + relay_path: list[tuple[str, str]] tb_str: str + cid: str|None = None + + # TODO: use UNSET or don't include them via + # # `ContextCancelled` - canceller: str|None = None + canceller: tuple[str, str]|None = None # `StreamOverrun` - sender: str|None = None - - -class Error(Msg): - ''' - A pkt that wraps `RemoteActorError`s for relay. - - ''' - pld: ErrorData + sender: tuple[str, str]|None = None # TODO: should be make a msg version of `ContextCancelled?` @@ -265,6 +345,12 @@ class Error(Msg): # approx order of the IPC txn-state spaces. __spec__: list[Msg] = [ + # identity handshake + Aid, + + # spawn specification from parent + SpawnSpec, + # inter-actor RPC initiation Start, StartAck, @@ -280,6 +366,8 @@ __spec__: list[Msg] = [ ] _runtime_spec_msgs: list[Msg] = [ + Aid, + SpawnSpec, Start, StartAck, Stop, @@ -443,3 +531,99 @@ def mk_msg_spec( pld_spec | runtime_spec, msgtypes_table[spec_build_method] + ipc_msg_types, ) + + +# TODO: make something similar to this inside `._codec` such that +# user can just pass a type table of some sort? +# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]: +# ''' +# Deliver a `enc_hook()`/`dec_hook()` pair which does +# manual convertion from our above native `Msg` set +# to `dict` equivalent (wire msgs) in order to keep legacy compat +# with the original runtime implementation. + +# Note: this is is/was primarly used while moving the core +# runtime over to using native `Msg`-struct types wherein we +# start with the send side emitting without loading +# a typed-decoder and then later flipping the switch over to +# load to the native struct types once all runtime usage has +# been adjusted appropriately. + +# ''' +# def enc_to_dict(msg: Any) -> Any: +# ''' +# Encode `Msg`-structs to `dict` msgs instead +# of using `msgspec.msgpack.Decoder.type`-ed +# features. + +# ''' +# match msg: +# case Start(): +# dctmsg: dict = pretty_struct.Struct.to_dict( +# msg +# )['pld'] + +# case Error(): +# dctmsg: dict = pretty_struct.Struct.to_dict( +# msg +# )['pld'] +# return {'error': dctmsg} + + +# def dec_from_dict( +# type: Type, +# obj: Any, +# ) -> Any: +# ''' +# Decode to `Msg`-structs from `dict` msgs instead +# of using `msgspec.msgpack.Decoder.type`-ed +# features. + +# ''' +# cid: str = obj.get('cid') +# match obj: +# case {'cmd': pld}: +# return Start( +# cid=cid, +# pld=pld, +# ) +# case {'functype': pld}: +# return StartAck( +# cid=cid, +# functype=pld, +# # pld=IpcCtxSpec( +# # functype=pld, +# # ), +# ) +# case {'started': pld}: +# return Started( +# cid=cid, +# pld=pld, +# ) +# case {'yield': pld}: +# return Yield( +# cid=obj['cid'], +# pld=pld, +# ) +# case {'stop': pld}: +# return Stop( +# cid=cid, +# ) +# case {'return': pld}: +# return Return( +# cid=cid, +# pld=pld, +# ) + +# case {'error': pld}: +# return Error( +# cid=cid, +# pld=ErrorData( +# **pld +# ), +# ) + +# return ( +# # enc_to_dict, +# dec_from_dict, +# )