From 77ddc073e8086ef2f7b91308e3b6555df6276e8a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Feb 2022 12:47:50 -0500 Subject: [PATCH 1/9] Use lists by default like `msgspec` --- tractor/_ipc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index af33338..cf4c392 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -96,7 +96,8 @@ class MsgTransport(Protocol[MsgType]): class MsgpackTCPStream: - '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data + ''' + A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgpack-python``. ''' @@ -124,7 +125,6 @@ class MsgpackTCPStream: """ unpacker = msgpack.Unpacker( raw=False, - use_list=False, strict_map_key=False ) while True: From 17bfa120ccb17c5640c3601d054d75eb3b0aaa8e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Feb 2022 14:05:32 -0500 Subject: [PATCH 2/9] Port to msgpec `0.4.0` imports --- tractor/_ipc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index cf4c392..9e0ced1 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -222,8 +222,8 @@ class MsgspecTCPStream(MsgpackTCPStream): self.prefix_size = prefix_size # TODO: struct aware messaging coders - self.encode = msgspec.Encoder().encode - self.decode = msgspec.Decoder().decode # dict[str, Any]) + self.encode = msgspec.msgpack.Encoder().encode + self.decode = msgspec.msgpack.Decoder().decode # dict[str, Any]) async def _iter_packets(self) -> AsyncGenerator[dict, None]: '''Yield packets from the underlying stream. From 927decc88da689440ab76d417491e9980e0a5a3d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Feb 2022 14:14:05 -0500 Subject: [PATCH 3/9] Pin to latest `msgspec` version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 77b43f1..631d151 100755 --- a/setup.py +++ b/setup.py @@ -62,7 +62,7 @@ setup( extras_require={ # serialization - 'msgspec': ["msgspec >= 0.3.2'; python_version >= '3.9'"], + 'msgspec': ['msgspec >= "0.4.0"'], }, tests_require=['pytest'], From c65756ed804ac0525bf2fa3baa05d57aab98137d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Feb 2022 15:34:21 -0500 Subject: [PATCH 4/9] Add nooz --- nooz/300.misc.rst | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 nooz/300.misc.rst diff --git a/nooz/300.misc.rst b/nooz/300.misc.rst new file mode 100644 index 0000000..bd5e217 --- /dev/null +++ b/nooz/300.misc.rst @@ -0,0 +1,3 @@ +Update to and pin latest `msgpack-python` (0.5.6) and `msgspec` (0.4.0) +both of which required adjustments for backwards imcompatible API +tweaks. From 17e195aacfc9ef101a1c8857294a9aecb5ca412e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Feb 2022 16:00:29 -0500 Subject: [PATCH 5/9] They renamed to `msgpack` and the version is 1.0.3 --- nooz/300.misc.rst | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nooz/300.misc.rst b/nooz/300.misc.rst index bd5e217..0fe5c38 100644 --- a/nooz/300.misc.rst +++ b/nooz/300.misc.rst @@ -1,3 +1,3 @@ -Update to and pin latest `msgpack-python` (0.5.6) and `msgspec` (0.4.0) +Update to and pin latest `msgpack` (1.0.3) and `msgspec` (0.4.0) both of which required adjustments for backwards imcompatible API tweaks. diff --git a/setup.py b/setup.py index 631d151..dd00715 100755 --- a/setup.py +++ b/setup.py @@ -56,7 +56,7 @@ setup( 'pdbpp', # serialization - 'msgpack', + 'msgpack>=1.0.3', ], extras_require={ From c5acc3b969c2b6687c2252711c68fc3dc7ace2b2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Feb 2022 08:48:07 -0500 Subject: [PATCH 6/9] Pack tuple keys as . delim strs in registry tests --- tests/test_discovery.py | 25 ++++++++++---- tractor/_actor.py | 75 +++++++++++++++++++++++------------------ 2 files changed, 61 insertions(+), 39 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 2b2996f..8ba4ebe 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -116,11 +116,26 @@ async def stream_from(portal): print(value) +async def unpack_reg(actor_or_portal): + ''' + Get and unpack a "registry" RPC request from the "arbiter" registry + system. + + ''' + if getattr(actor_or_portal, 'get_registry', None): + msg = await actor_or_portal.get_registry() + else: + msg = await actor_or_portal.run_from_ns('self', 'get_registry') + + return {tuple(key.split('.')): val for key, val in msg.items()} + + async def spawn_and_check_registry( arb_addr: tuple, use_signal: bool, remote_arbiter: bool = False, with_streaming: bool = False, + ) -> None: async with tractor.open_root_actor( @@ -134,13 +149,11 @@ async def spawn_and_check_registry( assert not actor.is_arbiter if actor.is_arbiter: - - async def get_reg(): - return await actor.get_registry() - extra = 1 # arbiter is local root actor + get_reg = partial(unpack_reg, actor) + else: - get_reg = partial(portal.run_from_ns, 'self', 'get_registry') + get_reg = partial(unpack_reg, portal) extra = 2 # local root actor + remote arbiter # ensure current actor is registered @@ -266,7 +279,7 @@ async def close_chans_before_nursery( ): async with tractor.get_arbiter(*arb_addr) as aportal: try: - get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') + get_reg = partial(unpack_reg, aportal) async with tractor.open_nursery() as tn: portal1 = await tn.start_actor( diff --git a/tractor/_actor.py b/tractor/_actor.py index 8e5d548..faa6412 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -27,7 +27,7 @@ import importlib.util import inspect import uuid import typing -from typing import List, Tuple, Any, Optional, Union +from typing import Any, Optional, Union from types import ModuleType import sys import os @@ -199,7 +199,9 @@ async def _invoke( assert chan.uid ctx = actor._contexts.pop((chan.uid, cid)) if ctx: - log.runtime(f'Context entrypoint for {func} was terminated:\n{ctx}') + log.runtime( + f'Context entrypoint for {func} was terminated:\n{ctx}' + ) assert cs if cs.cancelled_caught: @@ -368,10 +370,10 @@ class Actor: self, name: str, *, - enable_modules: List[str] = [], + enable_modules: list[str] = [], uid: str = None, loglevel: str = None, - arbiter_addr: Optional[Tuple[str, int]] = None, + arbiter_addr: Optional[tuple[str, int]] = None, spawn_method: Optional[str] = None ) -> None: """This constructor is called in the parent actor **before** the spawning @@ -421,25 +423,25 @@ class Actor: # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: dict[ - Tuple[Channel, str], - Tuple[trio.CancelScope, typing.Callable, trio.Event] + tuple[Channel, str], + tuple[trio.CancelScope, typing.Callable, trio.Event] ] = {} # map {actor uids -> Context} self._contexts: dict[ - Tuple[Tuple[str, str], str], + tuple[tuple[str, str], str], Context ] = {} - self._listeners: List[trio.abc.Listener] = [] + self._listeners: list[trio.abc.Listener] = [] self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ - Tuple[Any, Any, Any, Any, Any]] = None + tuple[Any, Any, Any, Any, Any]] = None self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa async def wait_for_peer( - self, uid: Tuple[str, str] - ) -> Tuple[trio.Event, Channel]: + self, uid: tuple[str, str] + ) -> tuple[trio.Event, Channel]: """Wait for a connection back from a spawned actor with a given ``uid``. """ @@ -1010,8 +1012,8 @@ class Actor: async def _from_parent( self, - parent_addr: Optional[Tuple[str, int]], - ) -> Tuple[Channel, Optional[Tuple[str, int]]]: + parent_addr: Optional[tuple[str, int]], + ) -> tuple[Channel, Optional[tuple[str, int]]]: try: # Connect back to the parent actor and conduct initial # handshake. From this point on if we error, we @@ -1024,7 +1026,7 @@ class Actor: # Initial handshake: swap names. await self._do_handshake(chan) - accept_addr: Optional[Tuple[str, int]] = None + accept_addr: Optional[tuple[str, int]] = None if self._spawn_method == "trio": # Receive runtime state from our parent @@ -1066,7 +1068,7 @@ class Actor: async def _async_main( self, - accept_addr: Optional[Tuple[str, int]] = None, + accept_addr: Optional[tuple[str, int]] = None, # XXX: currently ``parent_addr`` is only needed for the # ``multiprocessing`` backend (which pickles state sent to @@ -1075,7 +1077,7 @@ class Actor: # change this to a simple ``is_subactor: bool`` which will # be False when running as root actor and True when as # a subactor. - parent_addr: Optional[Tuple[str, int]] = None, + parent_addr: Optional[tuple[str, int]] = None, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -1261,7 +1263,7 @@ class Actor: handler_nursery: trio.Nursery, *, # (host, port) to bind for channel server - accept_host: Tuple[str, int] = None, + accept_host: tuple[str, int] = None, accept_port: int = 0, task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -1273,7 +1275,7 @@ class Actor: self._server_down = trio.Event() try: async with trio.open_nursery() as server_n: - l: List[trio.abc.Listener] = await server_n.start( + l: list[trio.abc.Listener] = await server_n.start( partial( trio.serve_tcp, self._stream_handler, @@ -1427,7 +1429,7 @@ class Actor: self._server_n.cancel_scope.cancel() @property - def accept_addr(self) -> Optional[Tuple[str, int]]: + def accept_addr(self) -> Optional[tuple[str, int]]: """Primary address to which the channel server is bound. """ # throws OSError on failure @@ -1438,7 +1440,7 @@ class Actor: assert self._parent_chan, "No parent channel for this actor?" return Portal(self._parent_chan) - def get_chans(self, uid: Tuple[str, str]) -> List[Channel]: + def get_chans(self, uid: tuple[str, str]) -> list[Channel]: """Return all channels to the actor with provided uid.""" return self._peers[uid] @@ -1446,7 +1448,7 @@ class Actor: self, chan: Channel - ) -> Tuple[str, str]: + ) -> tuple[str, str]: """Exchange (name, UUIDs) identifiers as the first communication step. These are essentially the "mailbox addresses" found in actor model @@ -1454,7 +1456,7 @@ class Actor: """ await chan.send(self.uid) value = await chan.recv() - uid: Tuple[str, str] = (str(value[0]), str(value[1])) + uid: tuple[str, str] = (str(value[0]), str(value[1])) if not isinstance(uid, tuple): raise ValueError(f"{uid} is not a valid uid?!") @@ -1483,14 +1485,14 @@ class Arbiter(Actor): def __init__(self, *args, **kwargs): self._registry: dict[ - Tuple[str, str], - Tuple[str, int], + tuple[str, str], + tuple[str, int], ] = {} self._waiters = {} super().__init__(*args, **kwargs) - async def find_actor(self, name: str) -> Optional[Tuple[str, int]]: + async def find_actor(self, name: str) -> Optional[tuple[str, int]]: for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr @@ -1499,25 +1501,31 @@ class Arbiter(Actor): async def get_registry( self - ) -> dict[Tuple[str, str], Tuple[str, int]]: - '''Return current name registry. + + ) -> dict[tuple[str, str], tuple[str, int]]: + ''' + Return current name registry. This method is async to allow for cross-actor invocation. + ''' # NOTE: requires ``strict_map_key=False`` to the msgpack # unpacker since we have tuples as keys (not this makes the # arbiter suscetible to hashdos): # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 - return self._registry + return {'.'.join(key): val for key, val in self._registry.items()} async def wait_for_actor( self, name: str, - ) -> List[Tuple[str, int]]: - '''Wait for a particular actor to register. + + ) -> list[tuple[str, int]]: + ''' + Wait for a particular actor to register. This is a blocking call if no actor by the provided name is currently registered. + ''' sockaddrs = [] @@ -1536,8 +1544,8 @@ class Arbiter(Actor): async def register_actor( self, - uid: Tuple[str, str], - sockaddr: Tuple[str, int] + uid: tuple[str, str], + sockaddr: tuple[str, int] ) -> None: uid = name, uuid = (str(uid[0]), str(uid[1])) @@ -1552,7 +1560,8 @@ class Arbiter(Actor): async def unregister_actor( self, - uid: Tuple[str, str] + uid: tuple[str, str] + ) -> None: uid = (str(uid[0]), str(uid[1])) self._registry.pop(uid) From 0edc6a26bc28f13c80746b57d6ee67c33aa907d0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Feb 2022 08:48:43 -0500 Subject: [PATCH 7/9] Go back to strict map keys --- tractor/_ipc.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 9e0ced1..b996c5f 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -121,11 +121,12 @@ class MsgpackTCPStream: self.drained: list[dict] = [] async def _iter_packets(self) -> AsyncGenerator[dict, None]: - """Yield packets from the underlying stream. - """ + ''' + Yield packets from the underlying stream. + + ''' unpacker = msgpack.Unpacker( raw=False, - strict_map_key=False ) while True: try: From 4eab4a0213ea1a253c646a3f574575f7c0a09fb4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Feb 2022 10:04:55 -0500 Subject: [PATCH 8/9] Type fix --- tractor/trionics/_mngrs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 76f6467..d9e392a 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -71,7 +71,7 @@ async def gather_contexts( mngrs: Sequence[AsyncContextManager[T]], -) -> AsyncGenerator[tuple[T, ...], None]: +) -> AsyncGenerator[tuple[Optional[T], ...], None]: ''' Concurrently enter a sequence of async context managers, each in a separate ``trio`` task and deliver the unwrapped values in the @@ -84,7 +84,7 @@ async def gather_contexts( entered and exited cancellation just works. ''' - unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs) + unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs) all_entered = trio.Event() parent_exit = trio.Event() From 76a0492028ed16efbc7f636162efcda6c08cbdf4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Feb 2022 08:52:04 -0500 Subject: [PATCH 9/9] Fix type annot --- tractor/_actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index faa6412..0991ed2 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -1502,7 +1502,7 @@ class Arbiter(Actor): async def get_registry( self - ) -> dict[tuple[str, str], tuple[str, int]]: + ) -> dict[str, tuple[str, int]]: ''' Return current name registry.