Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet bc9faac218 Init def of "SC shuttle prot" with "msg-spec-limiting"
As per the long outstanding GH issue this starts our rigorous journey
into an attempt at a type-safe, cross-actor SC, IPC protocol Bo

boop -> https://github.com/goodboy/tractor/issues/36

The idea is to "formally" define our SC "shuttle (dialog) protocol" by
specifying a new `.msg.types.Msg` subtype-set which can fully
encapsulate all IPC msg schemas needed in order to accomplish
cross-process SC!

------ - ------

The msg set deviated a little in terms of (type) names from the existing
`dict`-msgs currently used in the runtime impl but, I think the msg name
changes are much better in terms of explicitly representing the internal
semantics of the actor runtime machinery subsystems and IPC-msg-dialog
required for SC enforced RPC. In cursory, the new formal msgs-spec
includes the following msg-subtypes of a new top-level `Msg` boxing type
(that holds the base field schema for all msgs):

- `Start` to request RPC task scheduling by passing a `FuncSpec`
  payload (implemented currently with a `{'cmd': ... }` dict msg)

- `StartAck` to allow the RPC task callee-side to report a `IpcCtxSpec`
  payload immediately back to the caller (currently done naively via
  a `{'functype': ... }` initial response msg)

- `Started` to deliver the first value from `Context.started()`
  (instead of existing `{'started': ... }`)

- `Yield` to shuttle `MsgStream.send()`-ed values (currently
  `{'yield': ... }`s)

- `Stop` to terminate a `Context.open_stream()` session/block (currently
  a `{'stop': True }`)

- `Return` to deliver the final value from the `Actor.start_remote_task()`
  remote task (instead of `{'return': ... }`)

- `Error` to box `RemoteActorError` exceptions via a `.pld: ErrorData`
  payload, also planned to replace/extend the current
  `RemoteActorError.msgdata` mechanism internal to `pack/unpack_error()`

The new `tractor.msg.types` includes all these msg defs as well an API
for rendering a "payload msg specification" using a `payload_type_spec:
Union[Type]` that can be passed to
`msgspec.msgpack.Decoder(type=payload_type_spec)`. This ensures that
(per msg relevant) `Msg.pld: PayloadT` data is type-parameterized using
`msgspec`'s new `Generic[PayloadT]` field support and thus enables
providing for an API where IPC `Context` dialogs can strictly define the
allowed payload-msg-set by a type union!

Iow, this is the foundation for supporting `Channel`/`Context`/`MsgStream`
IPC primitives which are type checked/safe as desired in GH issue:
- https://github.com/goodboy/tractor/issues/365

Misc notes on current impl(s) status:
------ - ------
- add a `.msg.types.mk_msg_spec()` which uses the new `msgspec` support
  for `class MyStruct[Struct, Generic[T]]` parameterize-able fields and
  delivers our boxing SC-msg-(sub)set with the desired `payload_types`
  applied to `.pld`:
  - https://jcristharif.com/msgspec/supported-types.html#generic-types
  - as a note this impl seems to need to use `type.new_class()` dynamic
    subtype generation, though i don't really get *why* still.. but
    without that the `msgspec.msgpack.Decoder` doesn't seem to reject
    `.pld` limited `Msg` subtypes as demonstrated in the new test.

- around this ^ add a `.msg._codec.limit_msg_spec()` cm which exposes
  this payload type limiting API such that it can be applied per task
  via a `MsgCodec` in app code.

- the orig approach in https://github.com/goodboy/tractor/pull/311 was
  the idea of making payload fields `.pld: Raw` wherein we could have
  per-field/sub-msg decoders dynamically loaded depending on the
  particular application-layer schema in use. I don't want to lose the
  idea of this since I think it might be useful for an idea I have about
  capability-based-fields(-sharing, maybe using field-subset
  encryption?), and as such i've kept the (ostensibly) working impls in
  TODO-comments in `.msg._codec` wherein maybe we can add
  a `MsgCodec._payload_decs: dict` table for this later on.
  |_ also left in the `.msg.types.enc/decmsg()` impls but renamed as
    `enc/dec_payload()` (but reworked to not rely on the lifo codec
    stack tables; now removed) such that we can prolly move them to
    `MsgCodec` methods in the future.

- add an unused `._codec.mk_tagged_union_dec()` helper which was
  originally factored out the #311 proto-code but didn't end up working
  as desired with the new parameterized generic fields approach (now
  in `msg.types.mk_msg_spec()`)

Testing/deps work:
------ - ------
- new `test_limit_msgspec()` which ensures all the `.types` content is
  correct but without using the wrapping APIs in `._codec`; i.e. using
  a in-line `Decoder` instead of a `MsgCodec`.

- pin us to `msgspec>=0.18.5` which has the needed generic-types support
  (which took me way too long yester to figure out when implementing all
  this XD)!
2024-03-28 10:45:01 -04:00
Tyler Goodlet d55266f4a2 Move the pretty-`Struct` stuff to a `.pretty_struct`
Leave all the proto native struct-msg stuff in `.types` since i'm
thinking it's the right name for the mod that will hold all the built-in
SCIPP msgspecs longer run. Obvi the naive codec stack stuff needs to be
cleaned out/up and anything useful moved into `._codec` ;)
2024-03-26 18:27:57 -04:00
Tyler Goodlet 79211eab9a Merge original content from PR #311 into `.msg.types` for now 2024-03-26 17:47:55 -04:00
7 changed files with 903 additions and 422 deletions

View File

@ -60,7 +60,7 @@ setup(
'wrapt',
# IPC serialization
'msgspec',
'msgspec>=0.18.5',
# debug mode REPL
'pdbp',

View File

@ -6,12 +6,22 @@ B~)
'''
from typing import (
Any,
_GenericAlias,
Type,
Union,
)
from contextvars import (
Context,
)
# from inspect import Parameter
from msgspec import (
structs,
msgpack,
# defstruct,
Struct,
ValidationError,
)
import tractor
from tractor.msg import (
_def_msgspec_codec,
@ -23,6 +33,12 @@ from tractor.msg import (
apply_codec,
current_msgspec_codec,
)
from tractor.msg.types import (
PayloadT,
Msg,
# Started,
mk_msg_spec,
)
import trio
# TODO: wrap these into `._codec` such that user can just pass
@ -54,7 +70,7 @@ def mk_custom_codec() -> MsgCodec:
# apply custom hooks and set a `Decoder` which only
# loads `NamespacePath` types.
nsp_codec: MsgCodec = mk_codec(
dec_types=NamespacePath,
ipc_msg_spec=NamespacePath,
enc_hook=enc_hook,
dec_hook=dec_hook,
)
@ -196,3 +212,166 @@ def test_codec_hooks_mod():
await p.cancel_actor()
trio.run(main)
def chk_pld_type(
generic: Msg|_GenericAlias,
payload_type: Type[Struct]|Any,
pld: Any,
) -> bool:
roundtrip: bool = False
pld_val_type: Type = type(pld)
# gen_paramed: _GenericAlias = generic[payload_type]
# TODO: verify that the overridden subtypes
# DO NOT have modified type-annots from original!
# 'Start', .pld: FuncSpec
# 'StartAck', .pld: IpcCtxSpec
# 'Stop', .pld: UNSEt
# 'Error', .pld: ErrorData
# for typedef in (
# [gen_paramed]
# +
# # type-var should always be set for these sub-types
# # as well!
# Msg.__subclasses__()
# ):
# if typedef.__name__ not in [
# 'Msg',
# 'Started',
# 'Yield',
# 'Return',
# ]:
# continue
# payload_type: Type[Struct] = CustomPayload
# TODO: can remove all this right!?
#
# when parameterized (like `Msg[Any]`) then
# we expect an alias as input.
# if isinstance(generic, _GenericAlias):
# assert payload_type in generic.__args__
# else:
# assert PayloadType in generic.__parameters__
# pld_param: Parameter = generic.__signature__.parameters['pld']
# assert pld_param.annotation is PayloadType
type_spec: Union[Type[Struct]]
msg_types: list[Msg[payload_type]]
(
type_spec,
msg_types,
) = mk_msg_spec(
payload_type=payload_type,
)
enc = msgpack.Encoder()
dec = msgpack.Decoder(
type=type_spec, # like `Msg[Any]`
)
# verify the boxed-type for all variable payload-type msgs.
for typedef in msg_types:
pld_field = structs.fields(typedef)[1]
assert pld_field.type in {payload_type, PayloadT}
# TODO: does this need to work to get all subtypes to
# adhere?
assert pld_field.type is payload_type
kwargs: dict[str, Any] = {
'cid': '666',
'pld': pld,
}
enc_msg = typedef(**kwargs)
wire_bytes: bytes = enc.encode(enc_msg)
try:
dec_msg = dec.decode(wire_bytes)
assert dec_msg.pld == pld
assert (roundtrip := (dec_msg == enc_msg))
except ValidationError as ve:
# breakpoint()
if pld_val_type is payload_type:
raise ValueError(
'Got `ValidationError` despite type-var match!?\n'
f'pld_val_type: {pld_val_type}\n'
f'payload_type: {payload_type}\n'
) from ve
else:
# ow we good cuz the pld spec mismatched.
print(
'Got expected `ValidationError` since,\n'
f'{pld_val_type} is not {payload_type}\n'
)
else:
if (
pld_val_type is not payload_type
and payload_type is not Any
):
raise ValueError(
'DID NOT `ValidationError` despite expected type match!?\n'
f'pld_val_type: {pld_val_type}\n'
f'payload_type: {payload_type}\n'
)
return roundtrip
def test_limit_msgspec():
async def main():
async with tractor.open_root_actor(
debug_mode=True
):
# ensure we can round-trip a boxing `Msg`
assert chk_pld_type(
Msg,
Any,
None,
)
# TODO: don't need this any more right since
# `msgspec>=0.15` has the nice generics stuff yah??
#
# manually override the type annot of the payload
# field and ensure it propagates to all msg-subtypes.
# Msg.__annotations__['pld'] = Any
# verify that a mis-typed payload value won't decode
assert not chk_pld_type(
Msg,
int,
pld='doggy',
)
# parametrize the boxed `.pld` type as a custom-struct
# and ensure that parametrization propagates
# to all payload-msg-spec-able subtypes!
class CustomPayload(Struct):
name: str
value: Any
assert not chk_pld_type(
Msg,
CustomPayload,
pld='doggy',
)
assert chk_pld_type(
Msg,
CustomPayload,
pld=CustomPayload(name='doggy', value='urmom')
)
# uhh bc we can `.pause_from_sync()` now! :surfer:
# breakpoint()
trio.run(main)

View File

@ -21,11 +21,10 @@ Built-in messaging patterns, types, APIs and helpers.
from .ptr import (
NamespacePath as NamespacePath,
)
from .types import (
from .pretty_struct import (
Struct as Struct,
)
from ._codec import (
_def_msgspec_codec as _def_msgspec_codec,
_ctxvar_MsgCodec as _ctxvar_MsgCodec,

View File

@ -47,20 +47,25 @@ from types import ModuleType
import msgspec
from msgspec import msgpack
from .types import Struct
from tractor.msg.pretty_struct import Struct
from tractor.msg.types import (
mk_msg_spec,
Msg,
)
# TODO: API changes towards being interchange lib agnostic!
#
# -[ ] capnproto has pre-compiled schema for eg..
# * https://capnproto.org/language.html
# * http://capnproto.github.io/pycapnp/quickstart.html
# * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp
#
class MsgCodec(Struct):
'''
A IPC msg interchange format lib's encoder + decoder pair.
'''
lib: ModuleType = msgspec
# ad-hoc type extensions
@ -70,12 +75,22 @@ class MsgCodec(Struct):
# struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions
types: Union[Type[Struct]]|Any = Any
ipc_msg_spec: Union[Type[Struct]]|Any = Any
payload_msg_spec: Union[Type[Struct]] = Any
# post-configure cached props
_enc: msgpack.Encoder|None = None
_dec: msgpack.Decoder|None = None
# TODO: a sub-decoder system as well?
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# TODO: use `functools.cached_property` for these ?
# https://docs.python.org/3/library/functools.html#functools.cached_property
@ -88,8 +103,9 @@ class MsgCodec(Struct):
enc_hook: Callable|None = None,
reset: bool = False,
# TODO: what's the default for this?
# TODO: what's the default for this, and do we care?
# write_buffer_size: int
#
**kwargs,
) -> msgpack.Encoder:
@ -131,7 +147,7 @@ class MsgCodec(Struct):
def decoder(
self,
types: Union[Type[Struct]]|None = None,
ipc_msg_spec: Union[Type[Struct]]|None = None,
dec_hook: Callable|None = None,
reset: bool = False,
**kwargs,
@ -152,7 +168,7 @@ class MsgCodec(Struct):
or reset
):
self._dec = self.lib.msgpack.Decoder(
types or self.types,
type=ipc_msg_spec or self.ipc_msg_spec,
dec_hook=dec_hook or self.dec_hook,
**kwargs,
)
@ -169,10 +185,39 @@ class MsgCodec(Struct):
determined by the
'''
return self.dec.decode(msg)
def mk_tagged_union_dec(
tagged_structs: list[Struct],
) -> tuple[
list[str],
msgpack.Decoder,
]:
# See "tagged unions" docs:
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# "The quickest way to enable tagged unions is to set tag=True when
# defining every struct type in the union. In this case tag_field
# defaults to "type", and tag defaults to the struct class name
# (e.g. "Get")."
first: Struct = tagged_structs[0]
types_union: Union[Type[Struct]] = Union[
first
]|Any
tags: list[str] = [first.__name__]
for struct in tagged_structs[1:]:
types_union |= struct
tags.append(struct.__name__)
dec = msgpack.Decoder(types_union)
return (
tags,
dec,
)
# TODO: struct aware messaging coders as per:
# - https://github.com/goodboy/tractor/issues/36
# - https://github.com/goodboy/tractor/issues/196
@ -181,13 +226,18 @@ class MsgCodec(Struct):
def mk_codec(
libname: str = 'msgspec',
# for codec-ing boxed `Msg`-with-payload msgs
payload_types: Union[Type[Struct]]|None = None,
# TODO: do we want to allow NOT/using a diff `Msg`-set?
#
# struct type unions set for `Decoder`
# https://jcristharif.com/msgspec/structs.html#tagged-unions
dec_types: Union[Type[Struct]]|Any = Any,
ipc_msg_spec: Union[Type[Struct]]|Any = Any,
cache_now: bool = True,
# proxy to the `Struct.__init__()`
# proxy as `Struct(**kwargs)`
**kwargs,
) -> MsgCodec:
@ -197,14 +247,59 @@ def mk_codec(
`msgspec` ;).
'''
# (manually) generate a msg-payload-spec for all relevant
# god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT`
# for the decoder such that all sub-type msgs in our SCIPP
# will automatically decode to a type-"limited" payload (`Struct`)
# object (set).
payload_type_spec: Union[Type[Msg]]|None = None
if payload_types:
(
payload_type_spec,
msg_types,
) = mk_msg_spec(
payload_type=payload_types,
)
assert len(payload_type_spec.__args__) == len(msg_types)
# TODO: sub-decode `.pld: Raw`?
# see similar notes inside `.msg.types`..
#
# not sure we'll end up wanting/needing this
# though it might have unforeseen advantages in terms
# of enabling encrypted appliciation layer (only)
# payloads?
#
# register sub-payload decoders to load `.pld: Raw`
# decoded `Msg`-packets using a dynamic lookup (table)
# instead of a pre-defined msg-spec via `Generic`
# parameterization.
#
# (
# tags,
# payload_dec,
# ) = mk_tagged_union_dec(
# tagged_structs=list(payload_types.__args__),
# )
# _payload_decs: (
# dict[str, msgpack.Decoder]|None
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `Msg.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
# for name in tags:
# _payload_decs[name] = payload_dec
codec = MsgCodec(
types=dec_types,
ipc_msg_spec=ipc_msg_spec,
payload_msg_spec=payload_type_spec,
**kwargs,
)
assert codec.lib.__name__ == libname
# by default config and cache the codec pair for given
# input settings.
# by default, config-n-cache the codec pair from input settings.
if cache_now:
assert codec.enc
assert codec.dec
@ -251,3 +346,28 @@ def current_msgspec_codec() -> MsgCodec:
'''
return _ctxvar_MsgCodec.get()
@cm
def limit_msg_spec(
payload_types: Union[Type[Struct]],
# TODO: don't need this approach right?
#
# tagged_structs: list[Struct]|None = None,
**codec_kwargs,
):
'''
Apply a `MsgCodec` that will natively decode the SC-msg set's
`Msg.pld: Union[Type[Struct]]` payload fields using
tagged-unions of `msgspec.Struct`s from the `payload_types`
for all IPC contexts in use by the current `trio.Task`.
'''
msgspec_codec: MsgCodec = mk_codec(
payload_types=payload_types,
**codec_kwargs,
)
with apply_codec(msgspec_codec):
yield msgspec_codec

View File

@ -1,208 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Capability-based messaging specifications: or colloquially as "msgspecs".
Includes our SCIPP (structured-con-inter-process-protocol) message type defs
and APIs for applying custom msgspec-sets for implementing un-protocol state machines.
'''
# TODO: integration with our ``enable_modules: list[str]`` caps sys.
# ``pkgutil.resolve_name()`` internally uses
# ``importlib.import_module()`` which can be filtered by inserting
# a ``MetaPathFinder`` into ``sys.meta_path`` (which we could do before
# entering the ``Actor._process_messages()`` loop).
# https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645
# https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules
# - https://stackoverflow.com/a/63320902
# - https://docs.python.org/3/library/sys.html#sys.meta_path
# the new "Implicit Namespace Packages" might be relevant?
# - https://www.python.org/dev/peps/pep-0420/
# add implicit serialized message type support so that paths can be
# handed directly to IPC primitives such as streams and `Portal.run()`
# calls:
# - via ``msgspec``:
# - https://jcristharif.com/msgspec/api.html#struct
# - https://jcristharif.com/msgspec/extending.html
# via ``msgpack-python``:
# https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type
from __future__ import annotations
from contextlib import contextmanager as cm
from typing import (
Any,
Optional,
Union,
)
from msgspec import Struct, Raw
from msgspec.msgpack import (
Encoder,
Decoder,
)
# LIFO codec stack that is appended when the user opens the
# ``configure_native_msgs()`` cm below to configure a new codec set
# which will be applied to all new (msgspec relevant) IPC transports
# that are spawned **after** the configure call is made.
_lifo_codecs: list[
tuple[
Encoder,
Decoder,
],
] = [(Encoder(), Decoder())]
def get_msg_codecs() -> tuple[
Encoder,
Decoder,
]:
'''
Return the currently configured ``msgspec`` codec set.
The defaults are defined above.
'''
global _lifo_codecs
return _lifo_codecs[-1]
@cm
def configure_native_msgs(
tagged_structs: list[Struct],
):
'''
Push a codec set that will natively decode
tagged structs provied in ``tagged_structs``
in all IPC transports and pop the codec on exit.
'''
# See "tagged unions" docs:
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# "The quickest way to enable tagged unions is to set tag=True when
# defining every struct type in the union. In this case tag_field
# defaults to "type", and tag defaults to the struct class name
# (e.g. "Get")."
enc = Encoder()
types_union = Union[tagged_structs[0]] | Any
for struct in tagged_structs[1:]:
types_union |= struct
dec = Decoder(types_union)
_lifo_codecs.append((enc, dec))
try:
print("YOYOYOOYOYOYOY")
yield enc, dec
finally:
print("NONONONONON")
_lifo_codecs.pop()
class Header(Struct, tag=True):
'''
A msg header which defines payload properties
'''
uid: str
msgtype: Optional[str] = None
class Msg(Struct, tag=True):
'''
The "god" msg type, a box for task level msg types.
'''
header: Header
payload: Raw
_root_dec = Decoder(Msg)
_root_enc = Encoder()
# sub-decoders for retreiving embedded
# payload data and decoding to a sender
# side defined (struct) type.
_subdecs: dict[
Optional[str],
Decoder] = {
None: Decoder(Any),
}
@cm
def enable_context(
msg_subtypes: list[list[Struct]]
) -> Decoder:
for types in msg_subtypes:
first = types[0]
# register using the default tag_field of "type"
# which seems to map to the class "name".
tags = [first.__name__]
# create a tagged union decoder for this type set
type_union = Union[first]
for typ in types[1:]:
type_union |= typ
tags.append(typ.__name__)
dec = Decoder(type_union)
# register all tags for this union sub-decoder
for tag in tags:
_subdecs[tag] = dec
try:
yield dec
finally:
for tag in tags:
_subdecs.pop(tag)
def decmsg(msg: Msg) -> Any:
msg = _root_dec.decode(msg)
tag_field = msg.header.msgtype
dec = _subdecs[tag_field]
return dec.decode(msg.payload)
def encmsg(
dialog_id: str | int,
payload: Any,
) -> Msg:
tag_field = None
plbytes = _root_enc.encode(payload)
if b'type' in plbytes:
assert isinstance(payload, Struct)
tag_field = type(payload).__name__
payload = Raw(plbytes)
msg = Msg(
Header(dialog_id, tag_field),
payload,
)
return _root_enc.encode(msg)

View File

@ -0,0 +1,269 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Prettified version of `msgspec.Struct` for easier console grokin.
'''
from __future__ import annotations
from collections import UserList
from typing import (
Any,
Iterator,
)
from msgspec import (
msgpack,
Struct as _Struct,
structs,
)
from pprint import (
saferepr,
)
# TODO: auto-gen type sig for input func both for
# type-msgs and logging of RPC tasks?
# taken and modified from:
# https://stackoverflow.com/a/57110117
# import inspect
# from typing import List
# def my_function(input_1: str, input_2: int) -> list[int]:
# pass
# def types_of(func):
# specs = inspect.getfullargspec(func)
# return_type = specs.annotations['return']
# input_types = [t.__name__ for s, t in specs.annotations.items() if s != 'return']
# return f'{func.__name__}({": ".join(input_types)}) -> {return_type}'
# types_of(my_function)
class DiffDump(UserList):
'''
Very simple list delegator that repr() dumps (presumed) tuple
elements of the form `tuple[str, Any, Any]` in a nice
multi-line readable form for analyzing `Struct` diffs.
'''
def __repr__(self) -> str:
if not len(self):
return super().__repr__()
# format by displaying item pair's ``repr()`` on multiple,
# indented lines such that they are more easily visually
# comparable when printed to console when printed to
# console.
repstr: str = '[\n'
for k, left, right in self:
repstr += (
f'({k},\n'
f'\t{repr(left)},\n'
f'\t{repr(right)},\n'
')\n'
)
repstr += ']\n'
return repstr
class Struct(
_Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def _sin_props(self) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(self):
key: str = fi.name
val: Any = getattr(self, key)
yield fi, key, val
def to_dict(
self,
include_non_members: bool = True,
) -> dict:
'''
Like it sounds.. direct delegation to:
https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict
BUT, by default we pop all non-member (aka not defined as
struct fields) fields by default.
'''
asdict: dict = structs.asdict(self)
if include_non_members:
return asdict
# only return a dict of the struct members
# which were provided as input, NOT anything
# added as type-defined `@property` methods!
sin_props: dict = {}
fi: structs.FieldInfo
for fi, k, v in self._sin_props():
sin_props[k] = asdict[k]
return sin_props
def pformat(
self,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + self.__class__.__qualname__
qtn: str = self.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in self._sin_props():
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
# __str__ = __repr__ = pformat
__repr__ = pformat
def copy(
self,
update: dict | None = None,
) -> Struct:
'''
Validate-typecast all self defined fields, return a copy of
us with all such fields.
NOTE: This is kinda like the default behaviour in
`pydantic.BaseModel` except a copy of the object is
returned making it compat with `frozen=True`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# NOTE: roundtrip serialize to validate
# - enode to msgpack binary format,
# - decode that back to a struct.
return msgpack.Decoder(type=type(self)).decode(
msgpack.Encoder().encode(self)
)
def typecast(
self,
# TODO: allow only casting a named subset?
# fields: set[str] | None = None,
) -> None:
'''
Cast all fields using their declared type annotations
(kinda like what `pydantic` does by default).
NOTE: this of course won't work on frozen types, use
``.copy()`` above in such cases.
'''
# https://jcristharif.com/msgspec/api.html#msgspec.structs.fields
fi: structs.FieldInfo
for fi in structs.fields(self):
setattr(
self,
fi.name,
fi.type(getattr(self, fi.name)),
)
def __sub__(
self,
other: Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Compare fields/items key-wise and return a ``DiffDump``
for easy visual REPL comparison B)
'''
diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
for fi in structs.fields(self):
attr_name: str = fi.name
ours: Any = getattr(self, attr_name)
theirs: Any = getattr(other, attr_name)
if ours != theirs:
diffs.append((
attr_name,
ours,
theirs,
))
return diffs

View File

@ -15,256 +15,378 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Extensions to built-in or (heavily used but 3rd party) friend-lib
types.
Define our strictly typed IPC message spec for the SCIPP:
that is,
the "Structurred-Concurrency-Inter-Process-(dialog)-(un)Protocol".
'''
from __future__ import annotations
from collections import UserList
from pprint import (
saferepr,
)
# from contextlib import contextmanager as cm
import types
from typing import (
Any,
Iterator,
Generic,
Literal,
Type,
TypeVar,
Union,
)
from msgspec import (
msgpack,
Struct as _Struct,
structs,
Raw,
Struct,
UNSET,
)
# TODO: auto-gen type sig for input func both for
# type-msgs and logging of RPC tasks?
# taken and modified from:
# https://stackoverflow.com/a/57110117
# import inspect
# from typing import List
# def my_function(input_1: str, input_2: int) -> list[int]:
# pass
# TODO: can also remove yah?
#
# class Header(Struct, tag=True):
# '''
# A msg header which defines payload properties
# def types_of(func):
# specs = inspect.getfullargspec(func)
# return_type = specs.annotations['return']
# input_types = [t.__name__ for s, t in specs.annotations.items() if s != 'return']
# return f'{func.__name__}({": ".join(input_types)}) -> {return_type}'
# '''
# payload_tag: str|None = None
# types_of(my_function)
# type variable for the boxed payload field `.pld`
PayloadT = TypeVar('PayloadT')
class DiffDump(UserList):
'''
Very simple list delegator that repr() dumps (presumed) tuple
elements of the form `tuple[str, Any, Any]` in a nice
multi-line readable form for analyzing `Struct` diffs.
'''
def __repr__(self) -> str:
if not len(self):
return super().__repr__()
# format by displaying item pair's ``repr()`` on multiple,
# indented lines such that they are more easily visually
# comparable when printed to console when printed to
# console.
repstr: str = '[\n'
for k, left, right in self:
repstr += (
f'({k},\n'
f'\t{repr(left)},\n'
f'\t{repr(right)},\n'
')\n'
)
repstr += ']\n'
return repstr
class Struct(
_Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
class Msg(
Struct,
Generic[PayloadT],
tag=True,
tag_field='msg_type',
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
The "god" boxing msg type.
Boxes user data-msgs in a `.pld` and uses `msgspec`'s tagged
unions support to enable a spec from a common msg inheritance
tree.
'''
def _sin_props(self) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
# header: Header
# TODO: use UNSET here?
cid: str|None # call/context-id
'''
fi: structs.FieldInfo
for fi in structs.fields(self):
key: str = fi.name
val: Any = getattr(self, key)
yield fi, key, val
# The msgs "payload" (spelled without vowels):
# https://en.wikipedia.org/wiki/Payload_(computing)
#
# NOTE: inherited from any `Msg` (and maybe overriden
# by use of `limit_msg_spec()`), but by default is
# parameterized to be `Any`.
#
# XXX this `Union` must strictly NOT contain `Any` if
# a limited msg-type-spec is intended, such that when
# creating and applying a new `MsgCodec` its
# `.decoder: Decoder` is configured with a `Union[Type[Struct]]` which
# restricts the allowed payload content (this `.pld` field)
# by type system defined loading constraints B)
#
# TODO: could also be set to `msgspec.Raw` if the sub-decoders
# approach is preferred over the generic parameterization
# approach as take by `mk_msg_spec()` below.
pld: PayloadT
def to_dict(
self,
include_non_members: bool = True,
) -> dict:
'''
Like it sounds.. direct delegation to:
https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict
# TODO: better name, like `Call/TaskInput`?
class FuncSpec(Struct):
# TODO: can we combine these 2 into a `NamespacePath` field?
ns: str
func: str
BUT, by default we pop all non-member (aka not defined as
struct fields) fields by default.
kwargs: dict
uid: str # (calling) actor-id
'''
asdict: dict = structs.asdict(self)
if include_non_members:
return asdict
# only return a dict of the struct members
# which were provided as input, NOT anything
# added as type-defined `@property` methods!
sin_props: dict = {}
fi: structs.FieldInfo
for fi, k, v in self._sin_props():
sin_props[k] = asdict[k]
class Start(
Msg,
):
'''
Initial request to remotely schedule an RPC `trio.Task` via
`Actor.start_remote_task()`.
return sin_props
It is called by all the following public APIs:
def pformat(
self,
field_indent: int = 2,
indent: int = 0,
- `ActorNursery.run_in_actor()`
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
- `Portal.run()`
`|_.run_from_ns()`
`|_.open_stream_from()`
`|_._submit_for_result()`
'''
# global whitespace indent
ws: str = ' '*indent
- `Context.open_context()`
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
'''
pld: FuncSpec
# qtn: str = ws + self.__class__.__qualname__
qtn: str = self.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in self._sin_props():
FuncType: Literal[
'asyncfunc',
'asyncgen',
'context', # TODO: the only one eventually?
] = 'context'
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
class IpcCtxSpec(Struct):
'''
An inter-actor-`trio.Task`-comms `Context` spec.
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
'''
functype: FuncType
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
# TODO: as part of the reponse we should report our allowed
# msg spec which should be generated from the type-annots as
# desired in # https://github.com/goodboy/tractor/issues/365
# When this does not match what the starter/caller side
# expects we of course raise a `TypeError` just like if
# a function had been called using an invalid signature.
#
# msgspec: MsgSpec
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
class StartAck(
Msg,
Generic[PayloadT],
):
'''
Init response to a `Cmd` request indicating the far
end's RPC callable "type".
'''
pld: IpcCtxSpec
class Started(
Msg,
Generic[PayloadT],
):
'''
Packet to shuttle the "first value" delivered by
`Context.started(value: Any)` from a `@tractor.context`
decorated IPC endpoint.
'''
# TODO: instead of using our existing `Start`
# for this (as we did with the original `{'cmd': ..}` style)
# class Cancel(Msg):
# cid: str
class Yield(
Msg,
Generic[PayloadT],
):
'''
Per IPC transmission of a value from `await MsgStream.send(<value>)`.
'''
class Stop(Msg):
'''
Stream termination signal much like an IPC version
of `StopAsyncIteration`.
'''
pld: UNSET
class Return(
Msg,
Generic[PayloadT],
):
'''
Final `return <value>` from a remotely scheduled
func-as-`trio.Task`.
'''
class ErrorData(Struct):
'''
Remote actor error meta-data as needed originally by
`RemoteActorError.msgdata: dict`.
'''
src_uid: str
src_type_str: str
boxed_type_str: str
relay_path: list[str]
tb_str: str
# `ContextCancelled`
canceller: str|None = None
# `StreamOverrun`
sender: str|None = None
class Error(Msg):
'''
A pkt that wraps `RemoteActorError`s for relay.
'''
pld: ErrorData
# TODO: should be make a msg version of `ContextCancelled?`
# and/or with a scope field or a full `ActorCancelled`?
# class Cancelled(Msg):
# cid: str
# TODO what about overruns?
# class Overrun(Msg):
# cid: str
def mk_msg_spec(
payload_type: Union[Type] = Any,
boxing_msg_set: set[Msg] = {
Started,
Yield,
Return,
},
) -> tuple[
Union[Type[Msg]],
list[Type[Msg]],
]:
'''
Generate a payload-type-parameterized `Msg` specification such
that IPC msgs which can be `Msg.pld` (payload) type
limited/filterd are specified given an input `payload_type:
Union[Type]`.
'''
submsg_types: list[Type[Msg]] = Msg.__subclasses__()
# TODO: see below as well,
# => union building approach with `.__class_getitem__()`
# doesn't seem to work..?
#
# payload_type_spec: Union[Type[Msg]]
#
msg_types: list[Msg] = []
for msgtype in boxing_msg_set:
# check inheritance sanity
assert msgtype in submsg_types
# TODO: wait why do we need the dynamic version here?
# -[ ] paraming the `PayloadT` values via `Generic[T]`
# doesn't seem to work at all?
# -[ ] is there a way to get it to work at module level
# just using inheritance or maybe a metaclass?
#
# index_paramed_msg_type: Msg = msgtype[payload_type]
# TODO: WHY do we need to dynamically generate the
# subtype-msgs here to ensure the `.pld` parameterization
# propagates as well as works at all in terms of the
# `msgpack.Decoder()`..?
#
# dynamically create the payload type-spec-limited msg set.
manual_paramed_msg_subtype: Type = types.new_class(
msgtype.__name__,
(
# XXX NOTE XXX this seems to be THE ONLY
# way to get this to work correctly!?!
Msg[payload_type],
Generic[PayloadT],
),
{},
)
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
# TODO: grok the diff here better..
# assert index_paramed_msg_type == manual_paramed_msg_subtype
# __str__ = __repr__ = pformat
__repr__ = pformat
# XXX TODO: why does the manual method work but not the
# `.__class_getitem__()` one!?!
paramed_msg_type = manual_paramed_msg_subtype
def copy(
self,
update: dict | None = None,
# payload_type_spec |= paramed_msg_type
msg_types.append(paramed_msg_type)
) -> Struct:
'''
Validate-typecast all self defined fields, return a copy of
us with all such fields.
NOTE: This is kinda like the default behaviour in
`pydantic.BaseModel` except a copy of the object is
returned making it compat with `frozen=True`.
payload_type_spec: Union[Type[Msg]] = Union[*msg_types]
return (
payload_type_spec,
msg_types,
)
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# NOTE: roundtrip serialize to validate
# - enode to msgpack binary format,
# - decode that back to a struct.
return msgpack.Decoder(type=type(self)).decode(
msgpack.Encoder().encode(self)
)
# TODO: integration with our ``enable_modules: list[str]`` caps sys.
#
# ``pkgutil.resolve_name()`` internally uses
# ``importlib.import_module()`` which can be filtered by inserting
# a ``MetaPathFinder`` into ``sys.meta_path`` (which we could do before
# entering the ``Actor._process_messages()`` loop).
# https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645
# https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules
# - https://stackoverflow.com/a/63320902
# - https://docs.python.org/3/library/sys.html#sys.meta_path
def typecast(
self,
# TODO: do we still want to try and support the sub-decoder with
# `Raw` technique in the case that the `Generic` approach gives
# future grief?
#
# sub-decoders for retreiving embedded
# payload data and decoding to a sender
# side defined (struct) type.
_payload_decs: dict[
str|None,
msgpack.Decoder,
] = {
# default decoder is used when `Header.payload_tag == None`
None: msgpack.Decoder(Any),
}
# TODO: allow only casting a named subset?
# fields: set[str] | None = None,
) -> None:
'''
Cast all fields using their declared type annotations
(kinda like what `pydantic` does by default).
def dec_payload(
msg: Msg,
msg_dec: msgpack.Decoder = msgpack.Decoder(
type=Msg[Any]
),
NOTE: this of course won't work on frozen types, use
``.copy()`` above in such cases.
) -> Any|Struct:
'''
# https://jcristharif.com/msgspec/api.html#msgspec.structs.fields
fi: structs.FieldInfo
for fi in structs.fields(self):
setattr(
self,
fi.name,
fi.type(getattr(self, fi.name)),
)
msg: Msg = msg_dec.decode(msg)
payload_tag: str = msg.header.payload_tag
payload_dec: msgpack.Decoder = _payload_decs[payload_tag]
return payload_dec.decode(msg.pld)
def __sub__(
self,
other: Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Compare fields/items key-wise and return a ``DiffDump``
for easy visual REPL comparison B)
def enc_payload(
enc: msgpack.Encoder,
payload: Any,
cid: str,
'''
diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
for fi in structs.fields(self):
attr_name: str = fi.name
ours: Any = getattr(self, attr_name)
theirs: Any = getattr(other, attr_name)
if ours != theirs:
diffs.append((
attr_name,
ours,
theirs,
))
) -> bytes:
return diffs
# tag_field: str|None = None
plbytes = enc.encode(payload)
if b'msg_type' in plbytes:
assert isinstance(payload, Struct)
# tag_field: str = type(payload).__name__
payload = Raw(plbytes)
msg = Msg(
cid=cid,
pld=payload,
# Header(
# payload_tag=tag_field,
# # dialog_id,
# ),
)
return enc.encode(msg)