Init def of "SC shuttle prot" with "msg-spec-limiting"
As per the long outstanding GH issue, this starts our rigorous journey
into an attempt at a type-safe, cross-actor SC, IPC protocol Bo

boop -> https://github.com/goodboy/tractor/issues/36

The idea is to "formally" define our SC "shuttle (dialog) protocol" by
specifying a new `.msg.types.Msg` subtype-set which can fully
encapsulate all IPC msg schemas needed in order to accomplish
cross-process SC!

The msg set deviates a little in terms of (type) names from the existing
`dict`-msgs currently used in the runtime impl, but I think the name
changes are much better in terms of explicitly representing the internal
semantics of the actor runtime machinery/subsystems and the
IPC-msg-dialog required for SC enforced RPC.

------ - ------

In short, the new formal msgs-spec includes the following msg-subtypes
of a new top-level `Msg` boxing type (that holds the base field schema
for all msgs):

- `Start` to request RPC task scheduling by passing a `FuncSpec` payload
  (to replace the currently used `{'cmd': ... }` dict msg impl)

- `StartAck` to allow the RPC task callee-side to report an `IpcCtxSpec`
  payload immediately back to the caller (currently responded naively via
  a `{'functype': ... }` msg)

- `Started` to deliver the first value from `Context.started()`
  (instead of the existing `{'started': ... }`)

- `Yield` to shuttle `MsgStream.send()`-ed values (instead of our
  `{'yield': ... }`)

- `Stop` to terminate a `Context.open_stream()` session/block
  (over `{'stop': True }`)

- `Return` to deliver the final value from the
  `Actor.start_remote_task()` (currently a `{'return': ... }`)

- `Error` to box `RemoteActorError` exceptions via a `.pld: ErrorData`
  payload, planned to replace/extend the current
  `RemoteActorError.msgdata` mechanism internal to
  `._exceptions.pack/unpack_error()`

The new `tractor.msg.types` includes all the above msg defs as well as
an API for rendering a "payload type specification" using
a `payload_type_spec: Union[Type]` that can be passed to
`msgspec.msgpack.Decoder(type=payload_type_spec)`. This ensures that
(for a subset of the above msg set) `Msg.pld: PayloadT` data is
type-parameterized using `msgspec`'s new `Generic[PayloadT]` field
support and thus enables an API where IPC `Context` dialogs can strictly
define the allowed payload-datatype-set via type union!

Iow, this is the foundation for supporting `Channel`/`Context`/`MsgStream`
IPC primitives which are type checked/safe as desired in GH issue:
- https://github.com/goodboy/tractor/issues/365

Misc notes on current impl(s) status:
------ - ------
- add a `.msg.types.mk_msg_spec()` which uses the new `msgspec` support
  for `class MyStruct(Struct, Generic[T])` parameterize-able fields and
  delivers our boxing SC-msg-(sub)set with the desired `payload_types`
  applied to `.pld`:
  - https://jcristharif.com/msgspec/supported-types.html#generic-types
  - as a note this impl seems to need to use `types.new_class()` dynamic
    subtype generation, though i don't really get *why* still.. but
    without that the `msgspec.msgpack.Decoder` doesn't seem to reject
    `.pld` limited `Msg` subtypes as demonstrated in the new test.

- around this ^ add a `.msg._codec.limit_msg_spec()` cm which exposes
  this payload type limiting API such that it can be applied per task
  via a `MsgCodec` in app code.

- the orig approach in https://github.com/goodboy/tractor/pull/311 was
  the idea of making payload fields `.pld: Raw` wherein we could have
  per-field/sub-msg decoders dynamically loaded depending on the
  particular application-layer schema in use. I don't want to lose the
  idea of this since I think it might be useful for an idea I have about
  capability-based-fields(-sharing, maybe using field-subset
  encryption?), and as such i've kept the (ostensibly) working impls in
  TODO-comments in `.msg._codec` wherein maybe we can add
  a `MsgCodec._payload_decs: dict` table for this later on.
  |_ also left in the `.msg.types.enc/decmsg()` impls but renamed as
     `enc/dec_payload()` (but reworked to not rely on the lifo codec
     stack tables; now removed) such that we can prolly move them to
     `MsgCodec` methods in the future.

- add an unused `._codec.mk_tagged_union_dec()` helper which was
  originally factored out of the #311 proto-code but didn't end up
  working as desired with the new parameterized generic fields approach
  (now in `msg.types.mk_msg_spec()`)

Testing/deps work:
------ - ------
- new `test_limit_msgspec()` which ensures all the `.types` content is
  correct but without using the wrapping APIs in `._codec`; i.e. using
  an in-line `Decoder` instead of a `MsgCodec`.

- pin us to `msgspec>=0.18.5` which has the needed generic-types support
  (which took me way too long yesterday to figure out when implementing
  all this XD)!

msg_codecs
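The dynamic subtype generation mentioned in the notes above can be looked at in isolation, without `msgspec`. What follows is a minimal, stdlib-only sketch under stated assumptions: plain `typing.Generic` classes stand in for the real `Struct`-based msgs, `mk_spec_sketch()` is a hypothetical name, and nothing here validates payloads at decode time. It only shows the two mechanical steps, creating a payload-parameterized subtype per boxing msg with `types.new_class()` and then joining the results into one `Union`.

```python
import types
from typing import Any, Generic, TypeVar, Union, get_args

PayloadT = TypeVar('PayloadT')


class Msg(Generic[PayloadT]):
    '''Hypothetical stand-in for the boxing msg type (no msgspec here).'''
    def __init__(self, cid: str, pld: PayloadT) -> None:
        self.cid = cid
        self.pld = pld


class Started(Msg, Generic[PayloadT]): ...
class Yield(Msg, Generic[PayloadT]): ...
class Return(Msg, Generic[PayloadT]): ...


def mk_spec_sketch(
    payload_type: Any = Any,
    boxing_msg_set: tuple = (Started, Yield, Return),
):
    msg_types: list[type] = []
    for msgtype in boxing_msg_set:
        # sanity: only direct `Msg` subtypes are expected here
        assert msgtype in Msg.__subclasses__()

        # build a fresh class, named like the original, whose first
        # (original) base is the payload-parameterized `Msg[payload_type]`
        paramed = types.new_class(
            msgtype.__name__,
            (Msg[payload_type], Generic[PayloadT]),
            {},
        )
        msg_types.append(paramed)

    # `Union[(A, B, C)]` is the same subscription as `Union[A, B, C]`
    spec = Union[tuple(msg_types)]
    return spec, msg_types


spec, msg_types = mk_spec_sketch(payload_type=int)
# the parameterization is recorded on each new class
print([get_args(t.__orig_bases__[0]) for t in msg_types])
```

Building the union from a tuple avoids the `Union[*msg_types]` star-subscript syntax, which needs Python 3.11 or newer.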
parent d55266f4a2
commit 995af130cf

setup.py
@@ -60,7 +60,7 @@ setup(
         'wrapt',

         # IPC serialization
-        'msgspec',
+        'msgspec>=0.18.5',

         # debug mode REPL
         'pdbp',
|  |  | |||
|  | @ -6,12 +6,22 @@ B~) | |||
| ''' | ||||
| from typing import ( | ||||
|     Any, | ||||
|     _GenericAlias, | ||||
|     Type, | ||||
|     Union, | ||||
| ) | ||||
| from contextvars import ( | ||||
|     Context, | ||||
| ) | ||||
| # from inspect import Parameter | ||||
| 
 | ||||
| from msgspec import ( | ||||
|     structs, | ||||
|     msgpack, | ||||
|     # defstruct, | ||||
|     Struct, | ||||
|     ValidationError, | ||||
| ) | ||||
| import tractor | ||||
| from tractor.msg import ( | ||||
|     _def_msgspec_codec, | ||||
|  | @ -23,6 +33,12 @@ from tractor.msg import ( | |||
|     apply_codec, | ||||
|     current_msgspec_codec, | ||||
| ) | ||||
| from tractor.msg.types import ( | ||||
|     PayloadT, | ||||
|     Msg, | ||||
|     # Started, | ||||
|     mk_msg_spec, | ||||
| ) | ||||
| import trio | ||||
| 
 | ||||
| # TODO: wrap these into `._codec` such that user can just pass | ||||
|  | @ -54,7 +70,7 @@ def mk_custom_codec() -> MsgCodec: | |||
|     # apply custom hooks and set a `Decoder` which only | ||||
|     # loads `NamespacePath` types. | ||||
|     nsp_codec: MsgCodec = mk_codec( | ||||
| -        dec_types=NamespacePath, | ||||
| +        ipc_msg_spec=NamespacePath, | ||||
|         enc_hook=enc_hook, | ||||
|         dec_hook=dec_hook, | ||||
|     ) | ||||
|  | @ -196,3 +212,166 @@ def test_codec_hooks_mod(): | |||
|             await p.cancel_actor() | ||||
| 
 | ||||
|     trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| def chk_pld_type( | ||||
|     generic: Msg|_GenericAlias, | ||||
|     payload_type: Type[Struct]|Any, | ||||
|     pld: Any, | ||||
| 
 | ||||
| ) -> bool: | ||||
| 
 | ||||
|     roundtrip: bool = False | ||||
|     pld_val_type: Type = type(pld) | ||||
| 
 | ||||
|     # gen_paramed: _GenericAlias = generic[payload_type] | ||||
|     # TODO: verify that the overridden subtypes | ||||
|     # DO NOT have modified type-annots from original! | ||||
|     # 'Start',  .pld: FuncSpec | ||||
|     # 'StartAck',  .pld: IpcCtxSpec | ||||
|     # 'Stop',  .pld: UNSET | ||||
|     # 'Error',  .pld: ErrorData | ||||
|     # for typedef in ( | ||||
|     #     [gen_paramed] | ||||
|     #     + | ||||
| 
 | ||||
|     #     # type-var should always be set for these sub-types | ||||
|     #     # as well! | ||||
|     #     Msg.__subclasses__() | ||||
|     # ): | ||||
|     #     if typedef.__name__ not in [ | ||||
|     #         'Msg', | ||||
|     #         'Started', | ||||
|     #         'Yield', | ||||
|     #         'Return', | ||||
|     #     ]: | ||||
|     #         continue | ||||
|     # payload_type: Type[Struct] = CustomPayload | ||||
| 
 | ||||
|     # TODO: can remove all this right!? | ||||
|     # | ||||
|     # when parameterized (like `Msg[Any]`) then | ||||
|     # we expect an alias as input. | ||||
|     # if isinstance(generic, _GenericAlias): | ||||
|     #     assert payload_type in generic.__args__ | ||||
|     # else: | ||||
|         # assert PayloadType in generic.__parameters__ | ||||
|         # pld_param: Parameter = generic.__signature__.parameters['pld'] | ||||
|         # assert pld_param.annotation is PayloadType | ||||
| 
 | ||||
|     type_spec: Union[Type[Struct]] | ||||
|     msg_types: list[Msg[payload_type]] | ||||
|     ( | ||||
|         type_spec, | ||||
|         msg_types, | ||||
|     ) = mk_msg_spec( | ||||
|         payload_type=payload_type, | ||||
|     ) | ||||
|     enc = msgpack.Encoder() | ||||
|     dec = msgpack.Decoder( | ||||
|         type=type_spec,  # like `Msg[Any]` | ||||
|     ) | ||||
| 
 | ||||
|     # verify the boxed-type for all variable payload-type msgs. | ||||
|     for typedef in msg_types: | ||||
| 
 | ||||
|         pld_field = structs.fields(typedef)[1] | ||||
|         assert pld_field.type in {payload_type, PayloadT} | ||||
|         # TODO: does this need to work to get all subtypes to | ||||
|         # adhere? | ||||
|         assert pld_field.type is payload_type | ||||
| 
 | ||||
|         kwargs: dict[str, Any] = { | ||||
|             'cid': '666', | ||||
|             'pld': pld, | ||||
|         } | ||||
|         enc_msg = typedef(**kwargs) | ||||
| 
 | ||||
|         wire_bytes: bytes = enc.encode(enc_msg) | ||||
| 
 | ||||
|         try: | ||||
|             dec_msg = dec.decode(wire_bytes) | ||||
|             assert dec_msg.pld == pld | ||||
|             assert (roundtrip := (dec_msg == enc_msg)) | ||||
| 
 | ||||
|         except ValidationError as ve: | ||||
|             # breakpoint() | ||||
|             if pld_val_type is payload_type: | ||||
|                 raise ValueError( | ||||
|                    'Got `ValidationError` despite type-var match!?\n' | ||||
|                     f'pld_val_type: {pld_val_type}\n' | ||||
|                     f'payload_type: {payload_type}\n' | ||||
|                 ) from ve | ||||
| 
 | ||||
|             else: | ||||
|                 # ow we good cuz the pld spec mismatched. | ||||
|                 print( | ||||
|                     'Got expected `ValidationError` since,\n' | ||||
|                     f'{pld_val_type} is not {payload_type}\n' | ||||
|                 ) | ||||
|         else: | ||||
|             if ( | ||||
|                 pld_val_type is not payload_type | ||||
|                 and payload_type is not Any | ||||
|             ): | ||||
|                 raise ValueError( | ||||
|                    'DID NOT `ValidationError` despite expected type match!?\n' | ||||
|                     f'pld_val_type: {pld_val_type}\n' | ||||
|                     f'payload_type: {payload_type}\n' | ||||
|                 ) | ||||
| 
 | ||||
|     return roundtrip | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| def test_limit_msgspec(): | ||||
| 
 | ||||
|     async def main(): | ||||
|         async with tractor.open_root_actor( | ||||
|             debug_mode=True | ||||
|         ): | ||||
| 
 | ||||
|             # ensure we can round-trip a boxing `Msg` | ||||
|             assert chk_pld_type( | ||||
|                 Msg, | ||||
|                 Any, | ||||
|                 None, | ||||
|             ) | ||||
| 
 | ||||
|             # TODO: don't need this any more right since | ||||
|             # `msgspec>=0.15` has the nice generics stuff yah?? | ||||
|             # | ||||
|             # manually override the type annot of the payload | ||||
|             # field and ensure it propagates to all msg-subtypes. | ||||
|             # Msg.__annotations__['pld'] = Any | ||||
| 
 | ||||
|             # verify that a mis-typed payload value won't decode | ||||
|             assert not chk_pld_type( | ||||
|                 Msg, | ||||
|                 int, | ||||
|                 pld='doggy', | ||||
|             ) | ||||
| 
 | ||||
|             # parametrize the boxed `.pld` type as a custom-struct | ||||
|             # and ensure that parametrization propagates | ||||
|             # to all payload-msg-spec-able subtypes! | ||||
|             class CustomPayload(Struct): | ||||
|                 name: str | ||||
|                 value: Any | ||||
| 
 | ||||
|             assert not chk_pld_type( | ||||
|                 Msg, | ||||
|                 CustomPayload, | ||||
|                 pld='doggy', | ||||
|             ) | ||||
| 
 | ||||
|             assert chk_pld_type( | ||||
|                 Msg, | ||||
|                 CustomPayload, | ||||
|                 pld=CustomPayload(name='doggy', value='urmom') | ||||
|             ) | ||||
| 
 | ||||
|             # uhh bc we can `.pause_from_sync()` now! :surfer: | ||||
|             # breakpoint() | ||||
| 
 | ||||
|     trio.run(main) | ||||
|  |  | |||
|  | @ -47,20 +47,25 @@ from types import ModuleType | |||
| import msgspec | ||||
| from msgspec import msgpack | ||||
| 
 | ||||
| -from .pretty_struct import Struct | ||||
| +from tractor.msg.pretty_struct import Struct | ||||
| from tractor.msg.types import ( | ||||
|     mk_msg_spec, | ||||
|     Msg, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: API changes towards being interchange lib agnostic! | ||||
| # | ||||
| # -[ ] capnproto has pre-compiled schema for eg.. | ||||
| #  * https://capnproto.org/language.html | ||||
| #  * http://capnproto.github.io/pycapnp/quickstart.html | ||||
| #   * https://github.com/capnproto/pycapnp/blob/master/examples/addressbook.capnp | ||||
| # | ||||
| class MsgCodec(Struct): | ||||
|     ''' | ||||
|     An IPC msg interchange format lib's encoder + decoder pair. | ||||
| 
 | ||||
|     ''' | ||||
| 
 | ||||
|     lib: ModuleType = msgspec | ||||
| 
 | ||||
|     # ad-hoc type extensions | ||||
|  | @ -70,12 +75,22 @@ class MsgCodec(Struct): | |||
| 
 | ||||
|     # struct type unions | ||||
|     # https://jcristharif.com/msgspec/structs.html#tagged-unions | ||||
| -    types: Union[Type[Struct]]|Any = Any | ||||
| +    ipc_msg_spec: Union[Type[Struct]]|Any = Any | ||||
| +    payload_msg_spec: Union[Type[Struct]] = Any | ||||
| 
 | ||||
|     # post-configure cached props | ||||
|     _enc: msgpack.Encoder|None = None | ||||
|     _dec: msgpack.Decoder|None = None | ||||
| 
 | ||||
|     # TODO: a sub-decoder system as well? | ||||
|     # see related comments in `.msg.types` | ||||
|     # _payload_decs: ( | ||||
|     #     dict[ | ||||
|     #         str, | ||||
|     #         msgpack.Decoder, | ||||
|     #     ] | ||||
|     #     |None | ||||
|     # ) = None | ||||
| 
 | ||||
|     # TODO: use `functools.cached_property` for these ? | ||||
|     # https://docs.python.org/3/library/functools.html#functools.cached_property | ||||
|  | @ -88,8 +103,9 @@ class MsgCodec(Struct): | |||
|         enc_hook: Callable|None = None, | ||||
|         reset: bool = False, | ||||
| 
 | ||||
|         # TODO: what's the default for this? | ||||
|         # TODO: what's the default for this, and do we care? | ||||
|         # write_buffer_size: int | ||||
|         # | ||||
|         **kwargs, | ||||
| 
 | ||||
|     ) -> msgpack.Encoder: | ||||
|  | @ -131,7 +147,7 @@ class MsgCodec(Struct): | |||
| 
 | ||||
|     def decoder( | ||||
|         self, | ||||
| -        types: Union[Type[Struct]]|None = None, | ||||
| +        ipc_msg_spec: Union[Type[Struct]]|None = None, | ||||
|         dec_hook: Callable|None = None, | ||||
|         reset: bool = False, | ||||
|         **kwargs, | ||||
|  | @ -152,7 +168,7 @@ class MsgCodec(Struct): | |||
|             or reset | ||||
|         ): | ||||
|             self._dec = self.lib.msgpack.Decoder( | ||||
| -                types or self.types, | ||||
| +                type=ipc_msg_spec or self.ipc_msg_spec, | ||||
|                 dec_hook=dec_hook or self.dec_hook, | ||||
|                 **kwargs, | ||||
|             ) | ||||
|  | @ -169,10 +185,39 @@ class MsgCodec(Struct): | |||
|         determined by the  | ||||
| 
 | ||||
|         ''' | ||||
| 
 | ||||
|         return self.dec.decode(msg) | ||||
| 
 | ||||
| 
 | ||||
| def mk_tagged_union_dec( | ||||
|     tagged_structs: list[Struct], | ||||
| 
 | ||||
| ) -> tuple[ | ||||
|     list[str], | ||||
|     msgpack.Decoder, | ||||
| ]: | ||||
|     # See "tagged unions" docs: | ||||
|     # https://jcristharif.com/msgspec/structs.html#tagged-unions | ||||
| 
 | ||||
|     # "The quickest way to enable tagged unions is to set tag=True when | ||||
|     # defining every struct type in the union. In this case tag_field | ||||
|     # defaults to "type", and tag defaults to the struct class name | ||||
|     # (e.g. "Get")." | ||||
|     first: Struct = tagged_structs[0] | ||||
|     types_union: Union[Type[Struct]] = Union[ | ||||
|        first | ||||
|     ]|Any | ||||
|     tags: list[str] = [first.__name__] | ||||
| 
 | ||||
|     for struct in tagged_structs[1:]: | ||||
|         types_union |= struct | ||||
|         tags.append(struct.__name__) | ||||
| 
 | ||||
|     dec = msgpack.Decoder(types_union) | ||||
|     return ( | ||||
|         tags, | ||||
|         dec, | ||||
|     ) | ||||
| 
 | ||||
| # TODO: struct aware messaging coders as per: | ||||
| # - https://github.com/goodboy/tractor/issues/36 | ||||
| # - https://github.com/goodboy/tractor/issues/196 | ||||
|  | @ -181,13 +226,18 @@ class MsgCodec(Struct): | |||
| def mk_codec( | ||||
|     libname: str = 'msgspec', | ||||
| 
 | ||||
|     # for codec-ing boxed `Msg`-with-payload msgs | ||||
|     payload_types: Union[Type[Struct]]|None = None, | ||||
| 
 | ||||
|     # TODO: do we want to allow NOT/using a diff `Msg`-set? | ||||
|     # | ||||
|     # struct type unions set for `Decoder` | ||||
|     # https://jcristharif.com/msgspec/structs.html#tagged-unions | ||||
| -    dec_types: Union[Type[Struct]]|Any = Any, | ||||
| +    ipc_msg_spec: Union[Type[Struct]]|Any = Any, | ||||
| 
 | ||||
|     cache_now: bool = True, | ||||
| 
 | ||||
|     # proxy to the `Struct.__init__()` | ||||
|     # proxy as `Struct(**kwargs)` | ||||
|     **kwargs, | ||||
| 
 | ||||
| ) -> MsgCodec: | ||||
|  | @ -197,14 +247,59 @@ def mk_codec( | |||
|     `msgspec` ;). | ||||
| 
 | ||||
|     ''' | ||||
|     # (manually) generate a msg-payload-spec for all relevant | ||||
|     # god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT` | ||||
|     # for the decoder such that all sub-type msgs in our SCIPP | ||||
|     # will automatically decode to a type-"limited" payload (`Struct`) | ||||
|     # object (set). | ||||
|     payload_type_spec: Union[Type[Msg]]|None = None | ||||
|     if payload_types: | ||||
|         ( | ||||
|             payload_type_spec, | ||||
|             msg_types, | ||||
|         ) = mk_msg_spec( | ||||
|             payload_type=payload_types, | ||||
|         ) | ||||
|         assert len(payload_type_spec.__args__) == len(msg_types) | ||||
| 
 | ||||
|         # TODO: sub-decode `.pld: Raw`? | ||||
|         # see similar notes inside `.msg.types`.. | ||||
|         # | ||||
|         # not sure we'll end up wanting/needing this | ||||
|         # though it might have unforeseen advantages in terms | ||||
|         # of enabling encrypted application layer (only) | ||||
|         # payloads? | ||||
|         # | ||||
|         # register sub-payload decoders to load `.pld: Raw` | ||||
|         # decoded `Msg`-packets using a dynamic lookup (table) | ||||
|         # instead of a pre-defined msg-spec via `Generic` | ||||
|         # parameterization. | ||||
|         # | ||||
|         # ( | ||||
|         #     tags, | ||||
|         #     payload_dec, | ||||
|         # ) = mk_tagged_union_dec( | ||||
|         #     tagged_structs=list(payload_types.__args__), | ||||
|         # ) | ||||
|         # _payload_decs: ( | ||||
|         #     dict[str, msgpack.Decoder]|None | ||||
|         # ) = { | ||||
|         #     # pre-seed decoders for std-py-type-set for use when | ||||
|         #     # `Msg.pld == None|Any`. | ||||
|         #     None: msgpack.Decoder(Any), | ||||
|         #     Any: msgpack.Decoder(Any), | ||||
|         # } | ||||
|         # for name in tags: | ||||
|         #     _payload_decs[name] = payload_dec | ||||
| 
 | ||||
|     codec = MsgCodec( | ||||
| -        types=dec_types, | ||||
| +        ipc_msg_spec=ipc_msg_spec, | ||||
| +        payload_msg_spec=payload_type_spec, | ||||
|         **kwargs, | ||||
|     ) | ||||
|     assert codec.lib.__name__ == libname | ||||
| 
 | ||||
|     # by default config and cache the codec pair for given | ||||
|     # input settings. | ||||
|     # by default, config-n-cache the codec pair from input settings. | ||||
|     if cache_now: | ||||
|         assert codec.enc | ||||
|         assert codec.dec | ||||
|  | @ -251,3 +346,28 @@ def current_msgspec_codec() -> MsgCodec: | |||
| 
 | ||||
|     ''' | ||||
|     return _ctxvar_MsgCodec.get() | ||||
| 
 | ||||
| 
 | ||||
| @cm | ||||
| def limit_msg_spec( | ||||
|     payload_types: Union[Type[Struct]], | ||||
| 
 | ||||
|     # TODO: don't need this approach right? | ||||
|     # | ||||
|     # tagged_structs: list[Struct]|None = None, | ||||
| 
 | ||||
|     **codec_kwargs, | ||||
| ): | ||||
|     ''' | ||||
|     Apply a `MsgCodec` that will natively decode the SC-msg set's | ||||
|     `Msg.pld: Union[Type[Struct]]` payload fields using | ||||
|     tagged-unions of `msgspec.Struct`s from the `payload_types` | ||||
|     for all IPC contexts in use by the current `trio.Task`. | ||||
| 
 | ||||
|     ''' | ||||
|     msgspec_codec: MsgCodec = mk_codec( | ||||
|         payload_types=payload_types, | ||||
|         **codec_kwargs, | ||||
|     ) | ||||
|     with apply_codec(msgspec_codec): | ||||
|         yield msgspec_codec | ||||
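`limit_msg_spec()` relies on `apply_codec()`, whose body is not part of this diff. As a rough illustration of how a task-scoped override like that can work, here is a minimal sketch assuming a `contextvars.ContextVar` holds the active codec (the diff does show `_ctxvar_MsgCodec.get()` being read). The names `_codec_var`, `apply_codec_sketch()` and `current_codec_sketch()` are hypothetical, and strings stand in for real `MsgCodec` instances.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# hypothetical stand-in for the runtime's codec context variable
_codec_var: ContextVar[str] = ContextVar(
    'codec_sketch',
    default='default-codec',
)


def current_codec_sketch() -> str:
    # read whichever codec is active in the current context
    return _codec_var.get()


@contextmanager
def apply_codec_sketch(codec: str):
    # swap in the new codec and remember how to undo the swap
    token = _codec_var.set(codec)
    try:
        yield codec
    finally:
        # restore the previous codec even if the block raised
        _codec_var.reset(token)


with apply_codec_sketch('limited-codec') as codec:
    print(current_codec_sketch())
print(current_codec_sketch())
```

Resetting via the token in a `finally` clause keeps nested overrides well ordered: each block restores exactly the value it replaced.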
|  |  | |||
|  | @ -15,23 +15,315 @@ | |||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| ''' | ||||
| -Extensions to built-in or (heavily used but 3rd party) friend-lib | ||||
| -types. | ||||
| +Define our strictly typed IPC message spec for the SCIPP: | ||||
| + | ||||
| +that is, | ||||
| + | ||||
| +the "Structured-Concurrency-Inter-Process-(dialog)-(un)Protocol". | ||||
| 
 | ||||
| ''' | ||||
| 
 | ||||
| from __future__ import annotations | ||||
| from contextlib import contextmanager as cm | ||||
| # from contextlib import contextmanager as cm | ||||
| import types | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Generic, | ||||
|     Literal, | ||||
|     Type, | ||||
|     TypeVar, | ||||
|     Union, | ||||
| ) | ||||
| 
 | ||||
| from msgspec import ( | ||||
|     msgpack, | ||||
|     Raw, | ||||
|     Struct as _Struct, | ||||
|     Struct, | ||||
|     UNSET, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: can also remove yah? | ||||
| # | ||||
| # class Header(Struct, tag=True): | ||||
| #     ''' | ||||
| #     A msg header which defines payload properties | ||||
| 
 | ||||
| #     ''' | ||||
| #     payload_tag: str|None = None | ||||
| 
 | ||||
| # type variable for the boxed payload field `.pld` | ||||
| PayloadT = TypeVar('PayloadT') | ||||
| 
 | ||||
| 
 | ||||
| class Msg( | ||||
|     Struct, | ||||
|     Generic[PayloadT], | ||||
|     tag=True, | ||||
|     tag_field='msg_type', | ||||
| ): | ||||
|     ''' | ||||
|     The "god" boxing msg type. | ||||
| 
 | ||||
|     Boxes user data-msgs in a `.pld` and uses `msgspec`'s tagged | ||||
|     unions support to enable a spec from a common msg inheritance | ||||
|     tree. | ||||
| 
 | ||||
|     ''' | ||||
|     # header: Header | ||||
|     # TODO: use UNSET here? | ||||
|     cid: str|None  # call/context-id | ||||
| 
 | ||||
|     # The msgs "payload" (spelled without vowels): | ||||
|     # https://en.wikipedia.org/wiki/Payload_(computing) | ||||
|     # | ||||
|     # NOTE: inherited from any `Msg` (and maybe overridden | ||||
|     # by use of `limit_msg_spec()`), but by default is | ||||
|     # parameterized to be `Any`. | ||||
|     # | ||||
|     # XXX this `Union` must strictly NOT contain `Any` if | ||||
|     # a limited msg-type-spec is intended, such that when | ||||
|     # creating and applying a new `MsgCodec` its  | ||||
|     # `.decoder: Decoder` is configured with a `Union[Type[Struct]]` which | ||||
|     # restricts the allowed payload content (this `.pld` field)  | ||||
|     # by type system defined loading constraints B) | ||||
|     # | ||||
|     # TODO: could also be set to `msgspec.Raw` if the sub-decoders | ||||
|     # approach is preferred over the generic parameterization  | ||||
|     # approach as taken by `mk_msg_spec()` below. | ||||
|     pld: PayloadT | ||||
| 
 | ||||
| 
 | ||||
| # TODO: better name, like `Call/TaskInput`? | ||||
| class FuncSpec(Struct): | ||||
|     # TODO: can we combine these 2 into a `NamespacePath` field? | ||||
|     ns: str | ||||
|     func: str | ||||
| 
 | ||||
|     kwargs: dict | ||||
|     uid: str  # (calling) actor-id | ||||
| 
 | ||||
| 
 | ||||
| class Start( | ||||
|     Msg, | ||||
| ): | ||||
|     ''' | ||||
|     Initial request to remotely schedule an RPC `trio.Task` via | ||||
|     `Actor.start_remote_task()`. | ||||
| 
 | ||||
|     It is called by all the following public APIs: | ||||
| 
 | ||||
|     - `ActorNursery.run_in_actor()` | ||||
| 
 | ||||
|     - `Portal.run()` | ||||
|           `|_.run_from_ns()` | ||||
|           `|_.open_stream_from()` | ||||
|           `|_._submit_for_result()` | ||||
| 
 | ||||
|     - `Context.open_context()` | ||||
| 
 | ||||
|     ''' | ||||
|     pld: FuncSpec | ||||
| 
 | ||||
| 
 | ||||
| FuncType: Literal[ | ||||
|     'asyncfunc', | ||||
|     'asyncgen', | ||||
|     'context',  # TODO: the only one eventually? | ||||
| ] = 'context' | ||||
| 
 | ||||
| 
 | ||||
| class IpcCtxSpec(Struct): | ||||
|     ''' | ||||
|     An inter-actor-`trio.Task`-comms `Context` spec. | ||||
| 
 | ||||
|     ''' | ||||
|     functype: FuncType | ||||
| 
 | ||||
|     # TODO: as part of the response we should report our allowed | ||||
|     # msg spec which should be generated from the type-annots as | ||||
|     # desired in # https://github.com/goodboy/tractor/issues/365 | ||||
|     # When this does not match what the starter/caller side | ||||
|     # expects we of course raise a `TypeError` just like if | ||||
|     # a function had been called using an invalid signature. | ||||
|     # | ||||
|     # msgspec: MsgSpec | ||||
| 
 | ||||
| 
 | ||||
| class StartAck( | ||||
|     Msg, | ||||
|     Generic[PayloadT], | ||||
| ): | ||||
|     ''' | ||||
|     Init response to a `Start` request indicating the far | ||||
|     end's RPC callable "type". | ||||
| 
 | ||||
|     ''' | ||||
|     pld: IpcCtxSpec | ||||
| 
 | ||||
| 
 | ||||
| class Started( | ||||
|     Msg, | ||||
|     Generic[PayloadT], | ||||
| ): | ||||
|     ''' | ||||
|     Packet to shuttle the "first value" delivered by | ||||
|     `Context.started(value: Any)` from a `@tractor.context` | ||||
|     decorated IPC endpoint. | ||||
| 
 | ||||
|     ''' | ||||
| 
 | ||||
| 
 | ||||
| # TODO: instead of using our existing `Start` | ||||
| # for this (as we did with the original `{'cmd': ..}` style) | ||||
| # class Cancel(Msg): | ||||
| #     cid: str | ||||
| 
 | ||||
| 
 | ||||
| class Yield( | ||||
|     Msg, | ||||
|     Generic[PayloadT], | ||||
| ): | ||||
|     ''' | ||||
|     Per IPC transmission of a value from `await MsgStream.send(<value>)`. | ||||
| 
 | ||||
|     ''' | ||||
| 
 | ||||
| 
 | ||||
| class Stop(Msg): | ||||
|     ''' | ||||
|     Stream termination signal much like an IPC version  | ||||
|     of `StopAsyncIteration`. | ||||
| 
 | ||||
|     ''' | ||||
|     pld: UNSET | ||||
| 
 | ||||
| 
 | ||||
| class Return( | ||||
|     Msg, | ||||
|     Generic[PayloadT], | ||||
| ): | ||||
|     ''' | ||||
|     Final `return <value>` from a remotely scheduled | ||||
|     func-as-`trio.Task`. | ||||
| 
 | ||||
|     ''' | ||||
| 
 | ||||
| 
 | ||||
| class ErrorData(Struct): | ||||
|     ''' | ||||
|     Remote actor error meta-data as needed originally by | ||||
|     `RemoteActorError.msgdata: dict`. | ||||
| 
 | ||||
|     ''' | ||||
|     src_uid: str | ||||
|     src_type_str: str | ||||
|     boxed_type_str: str | ||||
| 
 | ||||
|     relay_path: list[str] | ||||
|     tb_str: str | ||||
| 
 | ||||
|     # `ContextCancelled` | ||||
|     canceller: str|None = None | ||||
| 
 | ||||
|     # `StreamOverrun` | ||||
|     sender: str|None = None | ||||
| 
 | ||||
| 
 | ||||
| class Error(Msg): | ||||
|     ''' | ||||
|     A pkt that wraps `RemoteActorError`s for relay. | ||||
| 
 | ||||
|     ''' | ||||
|     pld: ErrorData | ||||
| 
 | ||||
| 
 | ||||
| # TODO: should we make a msg version of `ContextCancelled`? | ||||
| # and/or with a scope field or a full `ActorCancelled`? | ||||
| # class Cancelled(Msg): | ||||
| #     cid: str | ||||
| 
 | ||||
| # TODO what about overruns? | ||||
| # class Overrun(Msg): | ||||
| #     cid: str | ||||
| 
 | ||||
| 
 | ||||
| def mk_msg_spec( | ||||
|     payload_type: Union[Type] = Any, | ||||
|     boxing_msg_set: set[Msg] = { | ||||
|         Started, | ||||
|         Yield, | ||||
|         Return, | ||||
|     }, | ||||
| 
 | ||||
| ) -> tuple[ | ||||
|     Union[Type[Msg]], | ||||
|     list[Type[Msg]], | ||||
| ]: | ||||
|     ''' | ||||
|     Generate a payload-type-parameterized `Msg` specification such | ||||
|     that IPC msgs which can be `Msg.pld` (payload) type | ||||
|     limited/filtered are specified given an input `payload_type: | ||||
|     Union[Type]`. | ||||
| 
 | ||||
|     ''' | ||||
|     submsg_types: list[Type[Msg]] = Msg.__subclasses__() | ||||
| 
 | ||||
|     # TODO: see below as well, | ||||
|     # => union building approach with `.__class_getitem__()` | ||||
|     # doesn't seem to work..? | ||||
|     # | ||||
|     # payload_type_spec: Union[Type[Msg]] | ||||
|     # | ||||
|     msg_types: list[Msg] = [] | ||||
|     for msgtype in boxing_msg_set: | ||||
| 
 | ||||
|         # check inheritance sanity | ||||
|         assert msgtype in submsg_types | ||||
| 
 | ||||
|         # TODO: wait why do we need the dynamic version here? | ||||
|         # -[ ] paraming the `PayloadT` values via `Generic[T]` | ||||
|         #   doesn't seem to work at all? | ||||
|         # -[ ] is there a way to get it to work at module level | ||||
|         #   just using inheritance or maybe a metaclass? | ||||
|         # | ||||
|         # index_paramed_msg_type: Msg = msgtype[payload_type] | ||||
| 
 | ||||
|         # TODO: WHY do we need to dynamically generate the | ||||
|         # subtype-msgs here to ensure the `.pld` parameterization | ||||
|         # propagates as well as works at all in terms of the | ||||
|         # `msgpack.Decoder()`..? | ||||
|         # | ||||
|         # dynamically create the payload type-spec-limited msg set. | ||||
|         manual_paramed_msg_subtype: Type = types.new_class( | ||||
|             msgtype.__name__, | ||||
|             ( | ||||
|                 # XXX NOTE XXX this seems to be THE ONLY | ||||
|                 # way to get this to work correctly!?! | ||||
|                 Msg[payload_type], | ||||
|                 Generic[PayloadT], | ||||
|             ), | ||||
|             {}, | ||||
|         ) | ||||
| 
 | ||||
|         # TODO: grok the diff here better.. | ||||
|         # assert index_paramed_msg_type == manual_paramed_msg_subtype | ||||
| 
 | ||||
|         # XXX TODO: why does the manual method work but not the | ||||
|         # `.__class_getitem__()` one!?! | ||||
|         paramed_msg_type = manual_paramed_msg_subtype | ||||
| 
 | ||||
|         # payload_type_spec |= paramed_msg_type | ||||
|         msg_types.append(paramed_msg_type) | ||||
| 
 | ||||
| 
 | ||||
|     payload_type_spec: Union[Type[Msg]] = Union[*msg_types] | ||||
|     return ( | ||||
|         payload_type_spec, | ||||
|         msg_types, | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: integration with our ``enable_modules: list[str]`` caps sys. | ||||
| # | ||||
| # ``pkgutil.resolve_name()`` internally uses | ||||
|  | @ -43,160 +335,58 @@ from msgspec import ( | |||
| #   - https://stackoverflow.com/a/63320902 | ||||
| #   - https://docs.python.org/3/library/sys.html#sys.meta_path | ||||
| 
 | ||||
| # the new "Implicit Namespace Packages" might be relevant? | ||||
| # - https://www.python.org/dev/peps/pep-0420/ | ||||
| 
 | ||||
| # add implicit serialized message type support so that paths can be | ||||
| # handed directly to IPC primitives such as streams and `Portal.run()` | ||||
| # calls: | ||||
| # - via ``msgspec``: | ||||
| #   - https://jcristharif.com/msgspec/api.html#struct | ||||
| #   - https://jcristharif.com/msgspec/extending.html | ||||
| # via ``msgpack-python``: | ||||
| # https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type | ||||
| # LIFO codec stack that is appended when the user opens the | ||||
| # ``configure_native_msgs()`` cm below to configure a new codec set | ||||
| # which will be applied to all new (msgspec relevant) IPC transports | ||||
| # that are spawned **after** the configure call is made. | ||||
| _lifo_codecs: list[ | ||||
|     tuple[ | ||||
|         msgpack.Encoder, | ||||
|         msgpack.Decoder, | ||||
|     ], | ||||
| ] = [(msgpack.Encoder(), msgpack.Decoder())] | ||||
| 
 | ||||
| 
 | ||||
| def get_msg_codecs() -> tuple[ | ||||
|     msgpack.Encoder, | ||||
|     msgpack.Decoder, | ||||
| ]: | ||||
|     ''' | ||||
|     Return the currently configured ``msgspec`` codec set. | ||||
| 
 | ||||
|     The defaults are defined above. | ||||
| 
 | ||||
|     ''' | ||||
|     global _lifo_codecs | ||||
|     return _lifo_codecs[-1] | ||||
| 
 | ||||
| 
 | ||||
| @cm | ||||
| def configure_native_msgs( | ||||
|     tagged_structs: list[_Struct], | ||||
| ): | ||||
|     ''' | ||||
|     Push a codec set that will natively decode | ||||
|     tagged structs provided in ``tagged_structs`` | ||||
|     in all IPC transports and pop the codec on exit. | ||||
| 
 | ||||
|     ''' | ||||
|     # See "tagged unions" docs: | ||||
|     # https://jcristharif.com/msgspec/structs.html#tagged-unions | ||||
| 
 | ||||
|     # "The quickest way to enable tagged unions is to set tag=True when | ||||
|     # defining every struct type in the union. In this case tag_field | ||||
|     # defaults to "type", and tag defaults to the struct class name | ||||
|     # (e.g. "Get")." | ||||
|     enc = msgpack.Encoder() | ||||
| 
 | ||||
|     types_union = Union[tagged_structs[0]] | Any | ||||
|     for struct in tagged_structs[1:]: | ||||
|         types_union |= struct | ||||
| 
 | ||||
|     dec = msgpack.Decoder(types_union) | ||||
| 
 | ||||
|     _lifo_codecs.append((enc, dec)) | ||||
|     try: | ||||
|         print("YOYOYOOYOYOYOY") | ||||
|         yield enc, dec | ||||
|     finally: | ||||
|         print("NONONONONON") | ||||
|         _lifo_codecs.pop() | ||||
| 
 | ||||
| 
 | ||||
| class Header(_Struct, tag=True): | ||||
|     ''' | ||||
|     A msg header which defines payload properties | ||||
| 
 | ||||
|     ''' | ||||
|     uid: str | ||||
|     msgtype: str|None = None | ||||
| 
 | ||||
| 
 | ||||
| class Msg(_Struct, tag=True): | ||||
|     ''' | ||||
|     The "god" msg type, a box for task level msg types. | ||||
| 
 | ||||
|     ''' | ||||
|     header: Header | ||||
|     payload: Raw | ||||
| 
 | ||||
| 
 | ||||
| _root_dec = msgpack.Decoder(Msg) | ||||
| _root_enc = msgpack.Encoder() | ||||
| 
 | ||||
| # TODO: do we still want to try and support the sub-decoder with | ||||
| # `Raw` technique in the case that the `Generic` approach gives | ||||
| # future grief? | ||||
| # | ||||
| # sub-decoders for retrieving embedded | ||||
| # payload data and decoding to a sender | ||||
| # side defined (struct) type. | ||||
| _subdecs:  dict[ | ||||
| _payload_decs:  dict[ | ||||
|     str|None, | ||||
|     msgpack.Decoder] = { | ||||
|     msgpack.Decoder, | ||||
| ] = { | ||||
|     # default decoder is used when `Header.payload_tag == None` | ||||
|     None: msgpack.Decoder(Any), | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| @cm | ||||
| def enable_context( | ||||
|     msg_subtypes: list[list[_Struct]] | ||||
| ) -> msgpack.Decoder: | ||||
| def dec_payload( | ||||
|     msg: Msg, | ||||
|     msg_dec: msgpack.Decoder = msgpack.Decoder( | ||||
|         type=Msg[Any] | ||||
|     ), | ||||
| 
 | ||||
|     for types in msg_subtypes: | ||||
|         first = types[0] | ||||
| ) -> Any|Struct: | ||||
| 
 | ||||
|         # register using the default tag_field of "type" | ||||
|         # which seems to map to the class "name". | ||||
|         tags = [first.__name__] | ||||
| 
 | ||||
|         # create a tagged union decoder for this type set | ||||
|         type_union = Union[first] | ||||
|         for typ in types[1:]: | ||||
|             type_union |= typ | ||||
|             tags.append(typ.__name__) | ||||
| 
 | ||||
|         dec = msgpack.Decoder(type_union) | ||||
| 
 | ||||
|         # register all tags for this union sub-decoder | ||||
|         for tag in tags: | ||||
|             _subdecs[tag] = dec | ||||
|         try: | ||||
|             yield dec | ||||
|         finally: | ||||
|             for tag in tags: | ||||
|                 _subdecs.pop(tag) | ||||
|     msg: Msg = msg_dec.decode(msg) | ||||
|     payload_tag: str = msg.header.payload_tag | ||||
|     payload_dec: msgpack.Decoder = _payload_decs[payload_tag] | ||||
|     return payload_dec.decode(msg.pld) | ||||
| 
 | ||||
| 
 | ||||
| def decmsg(msg: Msg) -> Any: | ||||
|     msg = _root_dec.decode(msg) | ||||
|     tag_field = msg.header.msgtype | ||||
|     dec = _subdecs[tag_field] | ||||
|     return dec.decode(msg.payload) | ||||
| 
 | ||||
| 
 | ||||
| def encmsg( | ||||
|     dialog_id: str | int, | ||||
| def enc_payload( | ||||
|     enc: msgpack.Encoder, | ||||
|     payload: Any, | ||||
| ) -> Msg: | ||||
|     cid: str, | ||||
| 
 | ||||
|     tag_field = None | ||||
| ) -> bytes: | ||||
| 
 | ||||
|     plbytes = _root_enc.encode(payload) | ||||
|     if b'type' in plbytes: | ||||
|         assert isinstance(payload, _Struct) | ||||
|         tag_field = type(payload).__name__ | ||||
|     # tag_field: str|None = None | ||||
| 
 | ||||
|     plbytes = enc.encode(payload) | ||||
|     if b'msg_type' in plbytes: | ||||
|         assert isinstance(payload, Struct) | ||||
| 
 | ||||
|         # tag_field: str = type(payload).__name__ | ||||
|         payload = Raw(plbytes) | ||||
| 
 | ||||
|     msg = Msg( | ||||
|         Header(dialog_id, tag_field), | ||||
|         payload, | ||||
|         cid=cid, | ||||
|         pld=payload, | ||||
|         # Header( | ||||
|         #     payload_tag=tag_field, | ||||
|         #     # dialog_id, | ||||
|         # ), | ||||
|     ) | ||||
|     return _root_enc.encode(msg) | ||||
|     return enc.encode(msg) | ||||
|  |  | |||
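The dynamic subtype generation in `mk_msg_spec()` above can be explored in isolation with the stdlib alone. The following is a minimal sketch, assuming plain `typing.Generic` classes as stand-ins for the real `msgspec.Struct` based `Msg` types; the helper name `mk_paramed_subtypes()` and the two toy subtypes are hypothetical and not part of `tractor`. It only shows how a subscripted alias such as `Started[int]` differs from a class built by `types.new_class()`, and says nothing about how `msgspec.msgpack.Decoder` treats either one.

```python
import types
from typing import Generic, TypeVar, Union, get_args

PayloadT = TypeVar('PayloadT')


class Msg(Generic[PayloadT]):
    # Hypothetical stand-in for the boxing msg type: a plain
    # generic class, NOT a `msgspec.Struct`.
    cid: str
    pld: PayloadT


class Started(Msg[PayloadT]):
    pass


class Yield(Msg[PayloadT]):
    pass


def mk_paramed_subtypes(boxing_msg_set, payload_type):
    # Build one real class per msg type, named after the original
    # and with `Msg[payload_type]` recorded as its original base.
    msg_types = []
    for msgtype in boxing_msg_set:
        paramed = types.new_class(
            msgtype.__name__,
            (Msg[payload_type], Generic[PayloadT]),
            {},
        )
        msg_types.append(paramed)

    # Indexing `Union` with a tuple is equivalent to
    # `Union[*msg_types]` and also works before Python 3.11.
    return Union[tuple(msg_types)], msg_types


spec, msg_types = mk_paramed_subtypes([Started, Yield], int)

# Subscripting yields a typing alias object, not a class.
alias = Started[int]
is_alias_a_class = isinstance(alias, type)

# The dynamically generated subtype is a real class that keeps
# the parameterized base around for later introspection.
paramed_started = msg_types[0]
orig_base = paramed_started.__orig_bases__[0]
payload_args = get_args(orig_base)
```

Note that in this sketch the generated classes derive from `Msg` directly (mirroring the bases passed in the diff), so they share only the name with the original `Started`/`Yield` classes and are not subclasses of them.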