diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 2b2996f..8ba4ebe 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -116,11 +116,26 @@ async def stream_from(portal): print(value) +async def unpack_reg(actor_or_portal): + ''' + Get and unpack a "registry" RPC request from the "arbiter" registry + system. + + ''' + if getattr(actor_or_portal, 'get_registry', None): + msg = await actor_or_portal.get_registry() + else: + msg = await actor_or_portal.run_from_ns('self', 'get_registry') + + return {tuple(key.split('.')): val for key, val in msg.items()} + + async def spawn_and_check_registry( arb_addr: tuple, use_signal: bool, remote_arbiter: bool = False, with_streaming: bool = False, + ) -> None: async with tractor.open_root_actor( @@ -134,13 +149,11 @@ async def spawn_and_check_registry( assert not actor.is_arbiter if actor.is_arbiter: - - async def get_reg(): - return await actor.get_registry() - extra = 1 # arbiter is local root actor + get_reg = partial(unpack_reg, actor) + else: - get_reg = partial(portal.run_from_ns, 'self', 'get_registry') + get_reg = partial(unpack_reg, portal) extra = 2 # local root actor + remote arbiter # ensure current actor is registered @@ -266,7 +279,7 @@ async def close_chans_before_nursery( ): async with tractor.get_arbiter(*arb_addr) as aportal: try: - get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') + get_reg = partial(unpack_reg, aportal) async with tractor.open_nursery() as tn: portal1 = await tn.start_actor( diff --git a/tractor/_actor.py b/tractor/_actor.py index 8e5d548..faa6412 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -27,7 +27,7 @@ import importlib.util import inspect import uuid import typing -from typing import List, Tuple, Any, Optional, Union +from typing import Any, Optional, Union from types import ModuleType import sys import os @@ -199,7 +199,9 @@ async def _invoke( assert chan.uid ctx = actor._contexts.pop((chan.uid, cid)) if ctx: - log.runtime(f'Context entrypoint for {func} was terminated:\n{ctx}') + log.runtime( + f'Context entrypoint for {func} was terminated:\n{ctx}' + ) assert cs if cs.cancelled_caught: @@ -368,10 +370,10 @@ class Actor: self, name: str, *, - enable_modules: List[str] = [], + enable_modules: list[str] = [], uid: str = None, loglevel: str = None, - arbiter_addr: Optional[Tuple[str, int]] = None, + arbiter_addr: Optional[tuple[str, int]] = None, spawn_method: Optional[str] = None ) -> None: """This constructor is called in the parent actor **before** the spawning @@ -421,25 +423,25 @@ class Actor: # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: dict[ - Tuple[Channel, str], - Tuple[trio.CancelScope, typing.Callable, trio.Event] + tuple[Channel, str], + tuple[trio.CancelScope, typing.Callable, trio.Event] ] = {} # map {actor uids -> Context} self._contexts: dict[ - Tuple[Tuple[str, str], str], + tuple[tuple[str, str], str], Context ] = {} - self._listeners: List[trio.abc.Listener] = [] + self._listeners: list[trio.abc.Listener] = [] self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ - Tuple[Any, Any, Any, Any, Any]] = None + tuple[Any, Any, Any, Any, Any]] = None self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa async def wait_for_peer( - self, uid: Tuple[str, str] - ) -> Tuple[trio.Event, Channel]: + self, uid: tuple[str, str] + ) -> tuple[trio.Event, Channel]: """Wait for a connection back from a spawned actor with a given ``uid``. """ @@ -1010,8 +1012,8 @@ class Actor: async def _from_parent( self, - parent_addr: Optional[Tuple[str, int]], - ) -> Tuple[Channel, Optional[Tuple[str, int]]]: + parent_addr: Optional[tuple[str, int]], + ) -> tuple[Channel, Optional[tuple[str, int]]]: try: # Connect back to the parent actor and conduct initial # handshake. From this point on if we error, we @@ -1024,7 +1026,7 @@ class Actor: # Initial handshake: swap names. await self._do_handshake(chan) - accept_addr: Optional[Tuple[str, int]] = None + accept_addr: Optional[tuple[str, int]] = None if self._spawn_method == "trio": # Receive runtime state from our parent @@ -1066,7 +1068,7 @@ class Actor: async def _async_main( self, - accept_addr: Optional[Tuple[str, int]] = None, + accept_addr: Optional[tuple[str, int]] = None, # XXX: currently ``parent_addr`` is only needed for the # ``multiprocessing`` backend (which pickles state sent to @@ -1075,7 +1077,7 @@ class Actor: # change this to a simple ``is_subactor: bool`` which will # be False when running as root actor and True when as # a subactor. - parent_addr: Optional[Tuple[str, int]] = None, + parent_addr: Optional[tuple[str, int]] = None, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -1261,7 +1263,7 @@ class Actor: handler_nursery: trio.Nursery, *, # (host, port) to bind for channel server - accept_host: Tuple[str, int] = None, + accept_host: tuple[str, int] = None, accept_port: int = 0, task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -1273,7 +1275,7 @@ class Actor: self._server_down = trio.Event() try: async with trio.open_nursery() as server_n: - l: List[trio.abc.Listener] = await server_n.start( + l: list[trio.abc.Listener] = await server_n.start( partial( trio.serve_tcp, self._stream_handler, @@ -1427,7 +1429,7 @@ class Actor: self._server_n.cancel_scope.cancel() @property - def accept_addr(self) -> Optional[Tuple[str, int]]: + def accept_addr(self) -> Optional[tuple[str, int]]: """Primary address to which the channel server is bound. """ # throws OSError on failure @@ -1438,7 +1440,7 @@ class Actor: assert self._parent_chan, "No parent channel for this actor?" return Portal(self._parent_chan) - def get_chans(self, uid: Tuple[str, str]) -> List[Channel]: + def get_chans(self, uid: tuple[str, str]) -> list[Channel]: """Return all channels to the actor with provided uid.""" return self._peers[uid] @@ -1446,7 +1448,7 @@ class Actor: self, chan: Channel - ) -> Tuple[str, str]: + ) -> tuple[str, str]: """Exchange (name, UUIDs) identifiers as the first communication step. These are essentially the "mailbox addresses" found in actor model @@ -1454,7 +1456,7 @@ class Actor: """ await chan.send(self.uid) value = await chan.recv() - uid: Tuple[str, str] = (str(value[0]), str(value[1])) + uid: tuple[str, str] = (str(value[0]), str(value[1])) if not isinstance(uid, tuple): raise ValueError(f"{uid} is not a valid uid?!") @@ -1483,14 +1485,14 @@ class Arbiter(Actor): def __init__(self, *args, **kwargs): self._registry: dict[ - Tuple[str, str], - Tuple[str, int], + tuple[str, str], + tuple[str, int], ] = {} self._waiters = {} super().__init__(*args, **kwargs) - async def find_actor(self, name: str) -> Optional[Tuple[str, int]]: + async def find_actor(self, name: str) -> Optional[tuple[str, int]]: for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr @@ -1499,25 +1501,31 @@ class Arbiter(Actor): async def get_registry( self - ) -> dict[Tuple[str, str], Tuple[str, int]]: - '''Return current name registry. + + ) -> dict[tuple[str, str], tuple[str, int]]: + ''' + Return current name registry. This method is async to allow for cross-actor invocation. + ''' # NOTE: requires ``strict_map_key=False`` to the msgpack # unpacker since we have tuples as keys (not this makes the # arbiter suscetible to hashdos): # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 - return self._registry + return {'.'.join(key): val for key, val in self._registry.items()} async def wait_for_actor( self, name: str, - ) -> List[Tuple[str, int]]: - '''Wait for a particular actor to register. + + ) -> list[tuple[str, int]]: + ''' + Wait for a particular actor to register. This is a blocking call if no actor by the provided name is currently registered. + ''' sockaddrs = [] @@ -1536,8 +1544,8 @@ class Arbiter(Actor): async def register_actor( self, - uid: Tuple[str, str], - sockaddr: Tuple[str, int] + uid: tuple[str, str], + sockaddr: tuple[str, int] ) -> None: uid = name, uuid = (str(uid[0]), str(uid[1])) @@ -1552,7 +1560,8 @@ class Arbiter(Actor): async def unregister_actor( self, - uid: Tuple[str, str] + uid: tuple[str, str] + ) -> None: uid = (str(uid[0]), str(uid[1])) self._registry.pop(uid)