Use proper `typing` annotations

type_annotations
Tyler Goodlet 2018-08-26 13:12:29 -04:00
parent c3eee1f228
commit 11cbf9ea55
5 changed files with 52 additions and 37 deletions

View File

@ -5,7 +5,7 @@ tractor: An actor model micro-framework built on
from functools import partial from functools import partial
import typing import typing
import trio import trio # type: ignore
from .log import get_console_log, get_logger, get_loglevel from .log import get_console_log, get_logger, get_loglevel
from ._ipc import _connect_chan, Channel from ._ipc import _connect_chan, Channel
@ -35,10 +35,10 @@ _default_arbiter_port = 1616
async def _main( async def _main(
async_fn: typing.Callable[..., typing.Awaitable], async_fn: typing.Callable[..., typing.Awaitable],
args: tuple, args: typing.Tuple,
kwargs: dict, kwargs: typing.Dict[str, typing.Any],
name: str, name: str,
arbiter_addr: (str, int) arbiter_addr: typing.Tuple[str, int]
) -> typing.Any: ) -> typing.Any:
"""Async entry point for ``tractor``. """Async entry point for ``tractor``.
""" """
@ -81,10 +81,10 @@ async def _main(
def run( def run(
async_fn: typing.Callable[..., typing.Awaitable], async_fn: typing.Callable[..., typing.Awaitable],
*args: ..., *args: typing.Tuple,
name: str = None, name: str = None,
arbiter_addr: (str, int) = (_default_arbiter_host, _default_arbiter_port), arbiter_addr: typing.Tuple[str, int] = (_default_arbiter_host, _default_arbiter_port),
**kwargs: ... **kwargs: typing.Dict[str, typing.Any],
): ):
"""Run a trio-actor async function in process. """Run a trio-actor async function in process.

View File

@ -10,7 +10,7 @@ import traceback
import typing import typing
import uuid import uuid
import trio import trio # type: ignore
from async_generator import asynccontextmanager, aclosing from async_generator import asynccontextmanager, aclosing
from ._ipc import Channel, _connect_chan from ._ipc import Channel, _connect_chan
@ -41,7 +41,7 @@ async def _invoke(
cid: str, cid: str,
chan: Channel, chan: Channel,
func: typing.Callable, func: typing.Callable,
kwargs: dict, kwargs: typing.Dict[str, typing.Any],
task_status=trio.TASK_STATUS_IGNORED task_status=trio.TASK_STATUS_IGNORED
): ):
"""Invoke local func and return results over provided channel. """Invoke local func and return results over provided channel.
@ -152,11 +152,11 @@ class Actor:
def __init__( def __init__(
self, self,
name: str, name: str,
rpc_module_paths: [str] = [], rpc_module_paths: typing.List[str] = [],
statespace: dict = {}, statespace: typing.Dict[str, typing.Any] = {},
uid: str = None, uid: str = None,
loglevel: str = None, loglevel: str = None,
arbiter_addr: (str, int) = None, arbiter_addr: typing.Tuple[str, int] = None,
): ):
self.name = name self.name = name
self.uid = (name, uid or str(uuid.uuid1())) self.uid = (name, uid or str(uuid.uuid1()))
@ -186,7 +186,9 @@ class Actor:
self._accept_host = None self._accept_host = None
self._forkserver_info = None self._forkserver_info = None
async def wait_for_peer(self, uid: (str, str)) -> (trio.Event, Channel): async def wait_for_peer(
self, uid: typing.Tuple[str, str]
) -> (trio.Event, Channel):
"""Wait for a connection back from a spawned actor with a given """Wait for a connection back from a spawned actor with a given
``uid``. ``uid``.
""" """
@ -284,14 +286,16 @@ class Actor:
# maintain backpressure # maintain backpressure
await q.put(msg) await q.put(msg)
def get_waitq(self, actorid: (str, str), cid: str) -> trio.Queue: def get_waitq(
self, actorid: typing.Tuple[str, str], cid: str
) -> trio.Queue:
log.debug(f"Getting result queue for {actorid} cid {cid}") log.debug(f"Getting result queue for {actorid} cid {cid}")
cids2qs = self._actors2calls.setdefault(actorid, {}) cids2qs = self._actors2calls.setdefault(actorid, {})
return cids2qs.setdefault(cid, trio.Queue(1000)) return cids2qs.setdefault(cid, trio.Queue(1000))
async def send_cmd( async def send_cmd(
self, chan: Channel, ns: str, func: str, kwargs: dict self, chan: Channel, ns: str, func: str, kwargs: dict
) -> (str, trio.Queue): ) -> typing.Tuple[str, trio.Queue]:
"""Send a ``'cmd'`` message to a remote actor and return a """Send a ``'cmd'`` message to a remote actor and return a
caller id and a ``trio.Queue`` that can be used to wait for caller id and a ``trio.Queue`` that can be used to wait for
responses delivered by the local message processing loop. responses delivered by the local message processing loop.
@ -383,9 +387,9 @@ class Actor:
log.debug(f"Exiting msg loop for {chan} from {chan.uid}") log.debug(f"Exiting msg loop for {chan} from {chan.uid}")
def _fork_main( def _fork_main(
self, accept_addr: (str, int), self, accept_addr: typing.Tuple[str, int],
forkserver_info: tuple, forkserver_info: tuple,
parent_addr: (str, int) = None parent_addr: typing.Tuple[str, int] = None
) -> None: ) -> None:
# after fork routine which invokes a fresh ``trio.run`` # after fork routine which invokes a fresh ``trio.run``
# log.warn("Log level after fork is {self.loglevel}") # log.warn("Log level after fork is {self.loglevel}")
@ -406,9 +410,9 @@ class Actor:
async def _async_main( async def _async_main(
self, self,
accept_addr: (str, int), accept_addr: typing.Tuple[str, int],
arbiter_addr: (str, int) = None, arbiter_addr: typing.Tuple[str, int] = None,
parent_addr: (str, int) = None, parent_addr: typing.Tuple[str, int] = None,
task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Start the channel server, maybe connect back to the parent, and """Start the channel server, maybe connect back to the parent, and
@ -506,7 +510,7 @@ class Actor:
self, self,
*, *,
# (host, port) to bind for channel server # (host, port) to bind for channel server
accept_host: (str, int) = None, accept_host: typing.Tuple[str, int] = None,
accept_port: int = 0, accept_port: int = 0,
task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -535,7 +539,7 @@ class Actor:
self._listeners.extend(listeners) self._listeners.extend(listeners)
task_status.started() task_status.started()
async def _do_unreg(self, arbiter_addr: (str, int)) -> None: async def _do_unreg(self, arbiter_addr: typing.Tuple[str, int]) -> None:
# UNregister actor from the arbiter # UNregister actor from the arbiter
try: try:
if arbiter_addr is not None: if arbiter_addr is not None:
@ -582,7 +586,7 @@ class Actor:
self._server_nursery.cancel_scope.cancel() self._server_nursery.cancel_scope.cancel()
@property @property
def accept_addr(self) -> (str, int): def accept_addr(self) -> typing.Tuple[str, int]:
"""Primary address to which the channel server is bound. """Primary address to which the channel server is bound.
""" """
try: try:
@ -594,7 +598,7 @@ class Actor:
"""Return a portal to our parent actor.""" """Return a portal to our parent actor."""
return Portal(self._parent_chan) return Portal(self._parent_chan)
def get_chans(self, uid: (str, str)) -> [Channel]: def get_chans(self, uid: typing.Tuple[str, str]) -> typing.List[Channel]:
"""Return all channels to the actor with provided uid.""" """Return all channels to the actor with provided uid."""
return self._peers[uid] return self._peers[uid]
@ -615,12 +619,14 @@ class Arbiter(Actor):
self._waiters = {} self._waiters = {}
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def find_actor(self, name: str) -> (str, int): def find_actor(self, name: str) -> typing.Tuple[str, int]:
for uid, sockaddr in self._registry.items(): for uid, sockaddr in self._registry.items():
if name in uid: if name in uid:
return sockaddr return sockaddr
async def wait_for_actor(self, name: str) -> [(str, int)]: async def wait_for_actor(
self, name: str
) -> typing.List[typing.Tuple[str, int]]:
"""Wait for a particular actor to register. """Wait for a particular actor to register.
This is a blocking call if no actor by the provided name is currently This is a blocking call if no actor by the provided name is currently
@ -641,7 +647,9 @@ class Arbiter(Actor):
return sockaddrs return sockaddrs
def register_actor(self, uid: (str, str), sockaddr: (str, int)) -> None: def register_actor(
self, uid: typing.Tuple[str, str], sockaddr: typing.Tuple[str, int]
) -> None:
name, uuid = uid name, uuid = uid
self._registry[uid] = sockaddr self._registry[uid] = sockaddr
@ -651,7 +659,7 @@ class Arbiter(Actor):
for event in events: for event in events:
event.set() event.set()
def unregister_actor(self, uid: (str, str)) -> None: def unregister_actor(self, uid: typing.Tuple[str, str]) -> None:
self._registry.pop(uid, None) self._registry.pop(uid, None)
@ -660,7 +668,7 @@ async def _start_actor(
main: typing.Coroutine, main: typing.Coroutine,
host: str, host: str,
port: int, port: int,
arbiter_addr: (str, int), arbiter_addr: typing.Tuple[str, int],
nursery: trio._core._run.Nursery = None nursery: trio._core._run.Nursery = None
): ):
"""Spawn a local actor by starting a task to execute it's main async """Spawn a local actor by starting a task to execute it's main async
@ -720,7 +728,9 @@ async def get_arbiter(host: str, port: int) -> Portal:
@asynccontextmanager @asynccontextmanager
async def find_actor(name: str, arbiter_sockaddr: (str, int) = None) -> Portal: async def find_actor(
name: str, arbiter_sockaddr: typing.Tuple[str, int] = None
) -> Portal:
"""Ask the arbiter to find actor(s) by name. """Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor Returns a connected portal to the last registered matching actor
@ -742,7 +752,7 @@ async def find_actor(name: str, arbiter_sockaddr: (str, int) = None) -> Portal:
@asynccontextmanager @asynccontextmanager
async def wait_for_actor( async def wait_for_actor(
name: str, name: str,
arbiter_sockaddr: (str, int) = None arbiter_sockaddr: typing.Tuple[str, int] = None
) -> Portal: ) -> Portal:
"""Wait on an actor to register with the arbiter. """Wait on an actor to register with the arbiter.

View File

@ -2,6 +2,9 @@
This is near-copy of the 3.8 stdlib's ``multiprocessing.forkserver.py`` This is near-copy of the 3.8 stdlib's ``multiprocessing.forkserver.py``
with some hackery to prevent any more then a single forkserver and with some hackery to prevent any more then a single forkserver and
semaphore tracker per ``MainProcess``. semaphore tracker per ``MainProcess``.
.. note:: There is no type hinting in this code base (yet) to remain as
a close as possible to upstream.
""" """
import os import os
import socket import socket

View File

@ -42,11 +42,11 @@ class StreamQueue:
yield packet yield packet
@property @property
def laddr(self) -> (str, int): def laddr(self) -> typing.Tuple[str, int]:
return self._laddr return self._laddr
@property @property
def raddr(self) -> (str, int): def raddr(self) -> typing.Tuple[str, int]:
return self._raddr return self._raddr
async def put(self, data: typing.Any) -> int: async def put(self, data: typing.Any) -> int:
@ -97,11 +97,11 @@ class Channel:
return object.__repr__(self) return object.__repr__(self)
@property @property
def laddr(self) -> (str, int): def laddr(self) -> typing.Tuple[str, int]:
return self.squeue.laddr if self.squeue else (None, None) return self.squeue.laddr if self.squeue else (None, None)
@property @property
def raddr(self) -> (str, int): def raddr(self) -> typing.Tuple[str, int]:
return self.squeue.raddr if self.squeue else (None, None) return self.squeue.raddr if self.squeue else (None, None)
async def connect( async def connect(

View File

@ -31,7 +31,9 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None):
yield nursery yield nursery
async def _do_handshake(actor: 'Actor', chan: 'Channel') -> (str, str): async def _do_handshake(
actor: 'Actor', chan: 'Channel'
)-> typing.Tuple[str, str]:
await chan.send(actor.uid) await chan.send(actor.uid)
uid = await chan.recv() uid = await chan.recv()
@ -69,7 +71,7 @@ class Portal:
async def _submit( async def _submit(
self, ns: str, func: str, **kwargs self, ns: str, func: str, **kwargs
) -> (str, trio.Queue, str, dict): ) -> typing.Tuple[str, trio.Queue, str, typing.Dict[str, typing.Any]]:
"""Submit a function to be scheduled and run by actor, return the """Submit a function to be scheduled and run by actor, return the
associated caller id, response queue, response type str, associated caller id, response queue, response type str,
first message packet as a tuple. first message packet as a tuple.