From 11cbf9ea5542f700932da876b102c8fab64ba407 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Aug 2018 13:12:29 -0400 Subject: [PATCH] Use proper `typing` annotations --- tractor/__init__.py | 14 ++++---- tractor/_actor.py | 58 +++++++++++++++++++-------------- tractor/_forkserver_hackzorz.py | 3 ++ tractor/_ipc.py | 8 ++--- tractor/_portal.py | 6 ++-- 5 files changed, 52 insertions(+), 37 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 569760f..eccd5f7 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -5,7 +5,7 @@ tractor: An actor model micro-framework built on from functools import partial import typing -import trio +import trio # type: ignore from .log import get_console_log, get_logger, get_loglevel from ._ipc import _connect_chan, Channel @@ -35,10 +35,10 @@ _default_arbiter_port = 1616 async def _main( async_fn: typing.Callable[..., typing.Awaitable], - args: tuple, - kwargs: dict, + args: typing.Tuple, + kwargs: typing.Dict[str, typing.Any], name: str, - arbiter_addr: (str, int) + arbiter_addr: typing.Tuple[str, int] ) -> typing.Any: """Async entry point for ``tractor``. """ @@ -81,10 +81,10 @@ async def _main( def run( async_fn: typing.Callable[..., typing.Awaitable], - *args: ..., + *args: typing.Tuple, name: str = None, - arbiter_addr: (str, int) = (_default_arbiter_host, _default_arbiter_port), - **kwargs: ... + arbiter_addr: typing.Tuple[str, int] = (_default_arbiter_host, _default_arbiter_port), + **kwargs: typing.Dict[str, typing.Any], ): """Run a trio-actor async function in process. diff --git a/tractor/_actor.py b/tractor/_actor.py index 270a339..febd07e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -10,7 +10,7 @@ import traceback import typing import uuid -import trio +import trio # type: ignore from async_generator import asynccontextmanager, aclosing from ._ipc import Channel, _connect_chan @@ -41,7 +41,7 @@ async def _invoke( cid: str, chan: Channel, func: typing.Callable, - kwargs: dict, + kwargs: typing.Dict[str, typing.Any], task_status=trio.TASK_STATUS_IGNORED ): """Invoke local func and return results over provided channel. @@ -152,11 +152,11 @@ class Actor: def __init__( self, name: str, - rpc_module_paths: [str] = [], - statespace: dict = {}, + rpc_module_paths: typing.List[str] = [], + statespace: typing.Dict[str, typing.Any] = {}, uid: str = None, loglevel: str = None, - arbiter_addr: (str, int) = None, + arbiter_addr: typing.Tuple[str, int] = None, ): self.name = name self.uid = (name, uid or str(uuid.uuid1())) @@ -186,7 +186,9 @@ class Actor: self._accept_host = None self._forkserver_info = None - async def wait_for_peer(self, uid: (str, str)) -> (trio.Event, Channel): + async def wait_for_peer( + self, uid: typing.Tuple[str, str] + ) -> (trio.Event, Channel): """Wait for a connection back from a spawned actor with a given ``uid``. """ @@ -284,14 +286,16 @@ class Actor: # maintain backpressure await q.put(msg) - def get_waitq(self, actorid: (str, str), cid: str) -> trio.Queue: + def get_waitq( + self, actorid: typing.Tuple[str, str], cid: str + ) -> trio.Queue: log.debug(f"Getting result queue for {actorid} cid {cid}") cids2qs = self._actors2calls.setdefault(actorid, {}) return cids2qs.setdefault(cid, trio.Queue(1000)) async def send_cmd( self, chan: Channel, ns: str, func: str, kwargs: dict - ) -> (str, trio.Queue): + ) -> typing.Tuple[str, trio.Queue]: """Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. @@ -383,9 +387,9 @@ class Actor: log.debug(f"Exiting msg loop for {chan} from {chan.uid}") def _fork_main( - self, accept_addr: (str, int), + self, accept_addr: typing.Tuple[str, int], forkserver_info: tuple, - parent_addr: (str, int) = None + parent_addr: typing.Tuple[str, int] = None ) -> None: # after fork routine which invokes a fresh ``trio.run`` # log.warn("Log level after fork is {self.loglevel}") @@ -406,9 +410,9 @@ class Actor: async def _async_main( self, - accept_addr: (str, int), - arbiter_addr: (str, int) = None, - parent_addr: (str, int) = None, + accept_addr: typing.Tuple[str, int], + arbiter_addr: typing.Tuple[str, int] = None, + parent_addr: typing.Tuple[str, int] = None, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, maybe connect back to the parent, and @@ -506,7 +510,7 @@ class Actor: self, *, # (host, port) to bind for channel server - accept_host: (str, int) = None, + accept_host: typing.Tuple[str, int] = None, accept_port: int = 0, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, ) -> None: @@ -535,7 +539,7 @@ class Actor: self._listeners.extend(listeners) task_status.started() - async def _do_unreg(self, arbiter_addr: (str, int)) -> None: + async def _do_unreg(self, arbiter_addr: typing.Tuple[str, int]) -> None: # UNregister actor from the arbiter try: if arbiter_addr is not None: @@ -582,7 +586,7 @@ class Actor: self._server_nursery.cancel_scope.cancel() @property - def accept_addr(self) -> (str, int): + def accept_addr(self) -> typing.Tuple[str, int]: """Primary address to which the channel server is bound. """ try: @@ -594,7 +598,7 @@ class Actor: """Return a portal to our parent actor.""" return Portal(self._parent_chan) - def get_chans(self, uid: (str, str)) -> [Channel]: + def get_chans(self, uid: typing.Tuple[str, str]) -> typing.List[Channel]: """Return all channels to the actor with provided uid.""" return self._peers[uid] @@ -615,12 +619,14 @@ class Arbiter(Actor): self._waiters = {} super().__init__(*args, **kwargs) - def find_actor(self, name: str) -> (str, int): + def find_actor(self, name: str) -> typing.Tuple[str, int]: for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr - async def wait_for_actor(self, name: str) -> [(str, int)]: + async def wait_for_actor( + self, name: str + ) -> typing.List[typing.Tuple[str, int]]: """Wait for a particular actor to register. This is a blocking call if no actor by the provided name is currently @@ -641,7 +647,9 @@ class Arbiter(Actor): return sockaddrs - def register_actor(self, uid: (str, str), sockaddr: (str, int)) -> None: + def register_actor( + self, uid: typing.Tuple[str, str], sockaddr: typing.Tuple[str, int] + ) -> None: name, uuid = uid self._registry[uid] = sockaddr @@ -651,7 +659,7 @@ class Arbiter(Actor): for event in events: event.set() - def unregister_actor(self, uid: (str, str)) -> None: + def unregister_actor(self, uid: typing.Tuple[str, str]) -> None: self._registry.pop(uid, None) @@ -660,7 +668,7 @@ async def _start_actor( main: typing.Coroutine, host: str, port: int, - arbiter_addr: (str, int), + arbiter_addr: typing.Tuple[str, int], nursery: trio._core._run.Nursery = None ): """Spawn a local actor by starting a task to execute it's main async @@ -720,7 +728,9 @@ async def get_arbiter(host: str, port: int) -> Portal: @asynccontextmanager -async def find_actor(name: str, arbiter_sockaddr: (str, int) = None) -> Portal: +async def find_actor( + name: str, arbiter_sockaddr: typing.Tuple[str, int] = None +) -> Portal: """Ask the arbiter to find actor(s) by name. Returns a connected portal to the last registered matching actor @@ -742,7 +752,7 @@ async def find_actor(name: str, arbiter_sockaddr: (str, int) = None) -> Portal: @asynccontextmanager async def wait_for_actor( name: str, - arbiter_sockaddr: (str, int) = None + arbiter_sockaddr: typing.Tuple[str, int] = None ) -> Portal: """Wait on an actor to register with the arbiter. diff --git a/tractor/_forkserver_hackzorz.py b/tractor/_forkserver_hackzorz.py index edd282d..827f744 100644 --- a/tractor/_forkserver_hackzorz.py +++ b/tractor/_forkserver_hackzorz.py @@ -2,6 +2,9 @@ This is near-copy of the 3.8 stdlib's ``multiprocessing.forkserver.py`` with some hackery to prevent any more then a single forkserver and semaphore tracker per ``MainProcess``. + +.. note:: There is no type hinting in this code base (yet) to remain as + a close as possible to upstream. """ import os import socket diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 87ef4e3..24b84a8 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -42,11 +42,11 @@ class StreamQueue: yield packet @property - def laddr(self) -> (str, int): + def laddr(self) -> typing.Tuple[str, int]: return self._laddr @property - def raddr(self) -> (str, int): + def raddr(self) -> typing.Tuple[str, int]: return self._raddr async def put(self, data: typing.Any) -> int: @@ -97,11 +97,11 @@ class Channel: return object.__repr__(self) @property - def laddr(self) -> (str, int): + def laddr(self) -> typing.Tuple[str, int]: return self.squeue.laddr if self.squeue else (None, None) @property - def raddr(self) -> (str, int): + def raddr(self) -> typing.Tuple[str, int]: return self.squeue.raddr if self.squeue else (None, None) async def connect( diff --git a/tractor/_portal.py b/tractor/_portal.py index 6087509..a42d5f8 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -31,7 +31,9 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): yield nursery -async def _do_handshake(actor: 'Actor', chan: 'Channel') -> (str, str): +async def _do_handshake( + actor: 'Actor', chan: 'Channel' +)-> typing.Tuple[str, str]: await chan.send(actor.uid) uid = await chan.recv() @@ -69,7 +71,7 @@ class Portal: async def _submit( self, ns: str, func: str, **kwargs - ) -> (str, trio.Queue, str, dict): + ) -> typing.Tuple[str, trio.Queue, str, typing.Dict[str, typing.Any]]: """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple.