Flip default codec to our `Msg`-spec

Yes, this is "the switch" and will likely cause the test suite to bail
until a few more fixes some in.

Tweaked a couple `.msg` pkg exports:
- remove `__spec__` (used by modules) and change it to `__msg_types:
  lists[Msg]` as well as add a new `__msg_spec__: TypeAlias`, being the
  default `Any` paramed spec.
- tweak the naming of `msg.types` lists of runtime vs payload msgs to:
  `._runtime_msgs` and `._payload_msgs`.
- just build `__msg_types__` out of the above 2 lists.
runtime_to_msgspec
Tyler Goodlet 2024-04-03 09:45:03 -04:00
parent a65e1e7a88
commit 70ab60ce7c
3 changed files with 47 additions and 30 deletions

View File

@ -18,6 +18,10 @@
Built-in messaging patterns, types, APIs and helpers. Built-in messaging patterns, types, APIs and helpers.
''' '''
from typing import (
Union,
TypeAlias,
)
from .ptr import ( from .ptr import (
NamespacePath as NamespacePath, NamespacePath as NamespacePath,
) )
@ -50,6 +54,10 @@ from .types import (
Error as Error, Error as Error,
# full msg spec set # full msg class set from above as list
__spec__ as __spec__, __msg_types__ as __msg_types__,
) )
# TODO: use new type declaration syntax for msg-type-spec
# https://docs.python.org/3/library/typing.html#type-aliases
# https://docs.python.org/3/reference/simple_stmts.html#type
__msg_spec__: TypeAlias = Union[*__msg_types__]

View File

@ -438,8 +438,8 @@ _ctxvar_MsgCodec: MsgCodec = RunVar(
'msgspec_codec', 'msgspec_codec',
# TODO: move this to our new `Msg`-spec! # TODO: move this to our new `Msg`-spec!
default=_def_msgspec_codec, # default=_def_msgspec_codec,
# default=_def_tractor_codec, default=_def_tractor_codec,
) )

View File

@ -26,7 +26,7 @@ from __future__ import annotations
import types import types
from typing import ( from typing import (
Any, Any,
Callable, # Callable,
Generic, Generic,
Literal, Literal,
Type, Type,
@ -340,50 +340,54 @@ class Error(
# class Overrun(Msg): # class Overrun(Msg):
# cid: str # cid: str
_runtime_msgs: list[Msg] = [
# built-in SC shuttle protocol msg type set in # identity handshake on first IPC `Channel` contact.
# approx order of the IPC txn-state spaces.
__spec__: list[Msg] = [
# identity handshake
Aid, Aid,
# spawn specification from parent # parent-to-child spawn specification passed as 2nd msg after
# handshake ONLY after child connects back to parent.
SpawnSpec, SpawnSpec,
# inter-actor RPC initiation # inter-actor RPC initiation
Start, Start, # schedule remote task-as-func
StartAck, StartAck, # ack the schedule request
# no-outcome-yet IAC (inter-actor-communication) # emission from `MsgStream.aclose()`
Started,
Yield,
Stop, Stop,
# termination outcomes # box remote errors, normally subtypes
Return, # of `RemoteActorError`.
Error, Error,
] ]
_runtime_spec_msgs: list[Msg] = [ # the no-outcome-yet IAC (inter-actor-communication) sub-set which
Aid, # can be `Msg.pld` payload field type-limited by application code
SpawnSpec, # using `apply_codec()` and `limit_msg_spec()`.
Start, _payload_msgs: list[Msg] = [
StartAck, # first <value> from `Context.started(<value>)`
Stop,
Error,
]
_payload_spec_msgs: list[Msg] = [
Started, Started,
# any <value> sent via `MsgStream.send(<value>)`
Yield, Yield,
# the final value returned from a `@context` decorated
# IPC endpoint.
Return, Return,
] ]
# built-in SC shuttle protocol msg type set in
# approx order of the IPC txn-state spaces.
__msg_types__: list[Msg] = (
_runtime_msgs
+
_payload_msgs
)
def mk_msg_spec( def mk_msg_spec(
payload_type_union: Union[Type] = Any, payload_type_union: Union[Type] = Any,
# boxing_msg_set: list[Msg] = _payload_spec_msgs,
spec_build_method: Literal[ spec_build_method: Literal[
'indexed_generics', # works 'indexed_generics', # works
'defstruct', 'defstruct',
@ -424,12 +428,12 @@ def mk_msg_spec(
defs_msg_types: list[Msg] = [] defs_msg_types: list[Msg] = []
nc_msg_types: list[Msg] = [] nc_msg_types: list[Msg] = []
for msgtype in __spec__: for msgtype in __msg_types__:
# for the NON-payload (user api) type specify-able # for the NON-payload (user api) type specify-able
# msgs types, we simply aggregate the def as is # msgs types, we simply aggregate the def as is
# for inclusion in the output type `Union`. # for inclusion in the output type `Union`.
if msgtype not in _payload_spec_msgs: if msgtype not in _payload_msgs:
ipc_msg_types.append(msgtype) ipc_msg_types.append(msgtype)
continue continue
@ -535,6 +539,11 @@ def mk_msg_spec(
# TODO: make something similar to this inside `._codec` such that # TODO: make something similar to this inside `._codec` such that
# user can just pass a type table of some sort? # user can just pass a type table of some sort?
# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
# and then call `.to_dict()` on them?
# -[ ] we're going to need to re-impl all the stuff changed in the
# runtime port such that it can handle dicts or `Msg`s?
#
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]: # def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
# ''' # '''
# Deliver a `enc_hook()`/`dec_hook()` pair which does # Deliver a `enc_hook()`/`dec_hook()` pair which does