Add type annotations to most functions

This is purely for documentation purposes for now as it should be
obvious a bunch of the signatures aren't using the correct "generics"
syntax (i.e. the use of `(str, int)` instead of `typing.Tuple[str, int])`)
in a bunch of places. We're also not using a type checker yet and besides,
`trio` doesn't really expose a lot of its internal types very well.

2SQASH
type_annotations
Tyler Goodlet 2018-08-19 22:13:13 -04:00
parent d65cdc0153
commit b0ceb308ba
6 changed files with 148 additions and 126 deletions

View File

@ -3,6 +3,7 @@ tractor: An actor model micro-framework built on
``trio`` and ``multiprocessing``.
"""
from functools import partial
import typing
import trio
@ -32,7 +33,13 @@ _default_arbiter_host = '127.0.0.1'
_default_arbiter_port = 1616
async def _main(async_fn, args, kwargs, name, arbiter_addr):
async def _main(
async_fn: typing.Callable[..., typing.Awaitable],
args: tuple,
kwargs: dict,
name: str,
arbiter_addr: (str, int)
) -> typing.Any:
"""Async entry point for ``tractor``.
"""
log = get_logger('tractor')
@ -73,11 +80,11 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
def run(
async_fn,
*args,
name=None,
arbiter_addr=(_default_arbiter_host, _default_arbiter_port),
**kwargs
async_fn: typing.Callable[..., typing.Awaitable],
*args: ...,
name: str = None,
arbiter_addr: (str, int) = (_default_arbiter_host, _default_arbiter_port),
**kwargs: ...
):
"""Run a trio-actor async function in process.

View File

@ -1,13 +1,14 @@
"""
Actor primitives and helpers
"""
import inspect
import importlib
from collections import defaultdict
from functools import partial
import traceback
import uuid
from itertools import chain
import importlib
import inspect
import traceback
import typing
import uuid
import trio
from async_generator import asynccontextmanager, aclosing
@ -36,8 +37,11 @@ class InternalActorError(RuntimeError):
async def _invoke(
actor, cid, chan, func, kwargs,
treat_as_gen=False,
actor: 'Actor',
cid: str,
chan: Channel,
func: typing.Callable,
kwargs: dict,
task_status=trio.TASK_STATUS_IGNORED
):
"""Invoke local func and return results over provided channel.
@ -169,7 +173,7 @@ class Actor:
self._accept_host = None
self._forkserver_info = None
async def wait_for_peer(self, uid):
async def wait_for_peer(self, uid: (str, str)) -> (trio.Event, Channel):
"""Wait for a connection back from a spawned actor with a given
``uid``.
"""
@ -179,7 +183,7 @@ class Actor:
log.debug(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1]
def load_namespaces(self):
def load_namespaces(self) -> None:
# We load namespaces after fork since this actor may
# be spawned on a different machine from the original nursery
# and we need to try and load the local module code (if it
@ -198,7 +202,7 @@ class Actor:
async def _stream_handler(
self,
stream: trio.SocketStream,
):
) -> None:
"""Entry point for new inbound connections to the channel server.
"""
self._no_more_peers.clear()
@ -256,19 +260,21 @@ class Actor:
await chan.send(None)
await chan.aclose()
async def _push_result(self, actorid, cid, msg):
async def _push_result(self, actorid, cid: str, msg: dict) -> None:
assert actorid, f"`actorid` can't be {actorid}"
q = self.get_waitq(actorid, cid)
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
# maintain backpressure
await q.put(msg)
def get_waitq(self, actorid, cid):
def get_waitq(self, actorid: (str, str), cid: str) -> trio.Queue:
log.debug(f"Getting result queue for {actorid} cid {cid}")
cids2qs = self._actors2calls.setdefault(actorid, {})
return cids2qs.setdefault(cid, trio.Queue(1000))
async def send_cmd(self, chan, ns, func, kwargs):
async def send_cmd(
self, chan: Channel, ns: str, func: str, kwargs: dict
) -> (str, trio.Queue):
"""Send a ``'cmd'`` message to a remote actor and return a
caller id and a ``trio.Queue`` that can be used to wait for
responses delivered by the local message processing loop.
@ -279,7 +285,9 @@ class Actor:
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
return cid, q
async def _process_messages(self, chan, treat_as_gen=False):
async def _process_messages(
self, chan: Channel, treat_as_gen: bool = False
) -> None:
"""Process messages async-RPC style.
Process rpc requests and deliver retrieved responses from channels.
@ -371,7 +379,11 @@ class Actor:
finally:
log.debug(f"Exiting msg loop for {chan} from {chan.uid}")
def _fork_main(self, accept_addr, forkserver_info, parent_addr=None):
def _fork_main(
self, accept_addr: (str, int),
forkserver_info: tuple,
parent_addr: (str, int) = None
) -> None:
# after fork routine which invokes a fresh ``trio.run``
# log.warn("Log level after fork is {self.loglevel}")
self._forkserver_info = forkserver_info
@ -391,22 +403,17 @@ class Actor:
async def _async_main(
self,
accept_addr,
arbiter_addr=None,
parent_addr=None,
_main_coro=None,
task_status=trio.TASK_STATUS_IGNORED,
):
accept_addr: (str, int),
arbiter_addr: (str, int) = None,
parent_addr: (str, int) = None,
task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None:
"""Start the channel server, maybe connect back to the parent, and
start the main task.
A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor.
"""
# if this is the `MainProcess` then we get a ref to the main
# task's coroutine object for tossing errors into
self._main_coro = _main_coro
arbiter_addr = arbiter_addr or self._arb_addr
registered_with_arbiter = False
try:
@ -422,12 +429,6 @@ class Actor:
self._serve_forever, accept_host=host, accept_port=port)
)
# XXX: I wonder if a better name is maybe "requester"
# since I don't think the notion of a "parent" actor
# necessarily sticks given that eventually we want
# ``'MainProcess'`` (the actor who initially starts the
# forkserver) to eventually be the only one who is
# allowed to spawn new processes per Python program.
if parent_addr is not None:
try:
# Connect back to the parent actor and conduct initial
@ -502,10 +503,10 @@ class Actor:
self,
*,
# (host, port) to bind for channel server
accept_host=None,
accept_port=0,
task_status=trio.TASK_STATUS_IGNORED
):
accept_host: (str, int) = None,
accept_port: int = 0,
task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None:
"""Start the channel server, begin listening for new connections.
This will cause an actor to continue living (blocking) until
@ -531,7 +532,7 @@ class Actor:
self._listeners.extend(listeners)
task_status.started()
async def _do_unreg(self, arbiter_addr):
async def _do_unreg(self, arbiter_addr: (str, int)) -> None:
# UNregister actor from the arbiter
try:
if arbiter_addr is not None:
@ -541,7 +542,7 @@ class Actor:
except OSError:
log.warn(f"Unable to unregister {self.name} from arbiter")
async def cancel(self):
async def cancel(self) -> None:
"""Cancel this actor.
The sequence in order is:
@ -554,7 +555,7 @@ class Actor:
self.cancel_server()
self._root_nursery.cancel_scope.cancel()
async def cancel_rpc_tasks(self):
async def cancel_rpc_tasks(self) -> None:
"""Cancel all existing RPC responder tasks using the cancel scope
registered for each.
"""
@ -570,7 +571,7 @@ class Actor:
f"Waiting for remaining rpc tasks to complete {scopes}")
await self._no_more_rpc_tasks.wait()
def cancel_server(self):
def cancel_server(self) -> None:
"""Cancel the internal channel server nursery thereby
preventing any new inbound connections from being established.
"""
@ -578,7 +579,7 @@ class Actor:
self._server_nursery.cancel_scope.cancel()
@property
def accept_addr(self):
def accept_addr(self) -> (str, int):
"""Primary address to which the channel server is bound.
"""
try:
@ -586,11 +587,13 @@ class Actor:
except OSError:
return
def get_parent(self):
def get_parent(self) -> Portal:
"""Return a portal to our parent actor."""
return Portal(self._parent_chan)
def get_chans(self, actorid):
return self._peers[actorid]
def get_chans(self, uid: (str, str)) -> [Channel]:
"""Return all channels to the actor with provided uid."""
return self._peers[uid]
class Arbiter(Actor):
@ -609,12 +612,12 @@ class Arbiter(Actor):
self._waiters = {}
super().__init__(*args, **kwargs)
def find_actor(self, name):
def find_actor(self, name: str) -> (str, int):
for uid, sockaddr in self._registry.items():
if name in uid:
return sockaddr
async def wait_for_actor(self, name):
async def wait_for_actor(self, name: str) -> [(str, int)]:
"""Wait for a particular actor to register.
This is a blocking call if no actor by the provided name is currently
@ -635,7 +638,7 @@ class Arbiter(Actor):
return sockaddrs
def register_actor(self, uid, sockaddr):
def register_actor(self, uid: (str, str), sockaddr: (str, int)) -> None:
name, uuid = uid
self._registry[uid] = sockaddr
@ -645,13 +648,20 @@ class Arbiter(Actor):
for event in events:
event.set()
def unregister_actor(self, uid):
def unregister_actor(self, uid: (str, str)) -> None:
self._registry.pop(uid, None)
async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
"""Spawn a local actor by starting a task to execute it's main
async function.
async def _start_actor(
actor: Actor,
main: typing.Coroutine,
host: str,
port: int,
arbiter_addr: (str, int),
nursery: trio._core._run.Nursery = None
):
"""Spawn a local actor by starting a task to execute it's main async
function.
Blocks if no nursery is provided, in which case it is expected the nursery
provider is responsible for waiting on the task to complete.
@ -664,29 +674,22 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
log.info(f"Starting local {actor} @ {host}:{port}")
async with trio.open_nursery() as nursery:
if main is not None:
main_coro = main()
await nursery.start(
partial(
actor._async_main,
accept_addr=(host, port),
parent_addr=None,
arbiter_addr=arbiter_addr,
_main_coro=main_coro
)
)
if main is not None:
result = await main_coro
result = await main()
# XXX: If spawned with a dedicated "main function",
# the actor is cancelled when this context is complete
# given that there are no more active peer channels connected
actor.cancel_server()
# block on actor to complete
# unset module state
_state._current_actor = None
log.info("Completed async main")
@ -695,7 +698,7 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
@asynccontextmanager
async def get_arbiter(host, port):
async def get_arbiter(host: str, port: int) -> Portal:
"""Return a portal instance connected to a local or remote
arbiter.
"""
@ -714,10 +717,7 @@ async def get_arbiter(host, port):
@asynccontextmanager
async def find_actor(
name,
arbiter_sockaddr=None,
):
async def find_actor(name: str, arbiter_sockaddr: (str, int) = None) -> Portal:
"""Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor
@ -738,9 +738,9 @@ async def find_actor(
@asynccontextmanager
async def wait_for_actor(
name,
arbiter_sockaddr=None,
):
name: str,
arbiter_sockaddr: (str, int) = None
) -> Portal:
"""Wait on an actor to register with the arbiter.
A portal to the first actor which registered is be returned.

View File

@ -1,7 +1,7 @@
"""
Inter-process comms abstractions
"""
from typing import Coroutine, Tuple
import typing
import msgpack
import trio
@ -14,14 +14,14 @@ log = get_logger('ipc')
class StreamQueue:
"""Stream wrapped as a queue that delivers ``msgpack`` serialized objects.
"""
def __init__(self, stream):
def __init__(self, stream: trio.SocketStream):
self.stream = stream
self._agen = self._iter_packets()
self._laddr = self.stream.socket.getsockname()[:2]
self._raddr = self.stream.socket.getpeername()[:2]
self._send_lock = trio.Lock()
async def _iter_packets(self):
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
"""Yield packets from the underlying stream.
"""
unpacker = msgpack.Unpacker(raw=False, use_list=False)
@ -42,25 +42,25 @@ class StreamQueue:
yield packet
@property
def laddr(self):
def laddr(self) -> (str, int):
return self._laddr
@property
def raddr(self):
def raddr(self) -> (str, int):
return self._raddr
async def put(self, data):
async def put(self, data: typing.Any) -> int:
async with self._send_lock:
return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True))
async def get(self):
async def get(self) -> typing.Any:
return await self._agen.asend(None)
def __aiter__(self):
return self._agen
def connected(self):
def connected(self) -> bool:
return self.stream.socket.fileno() != -1
@ -73,7 +73,7 @@ class Channel:
def __init__(
self,
destaddr: tuple = None,
on_reconnect: Coroutine = None,
on_reconnect: typing.Coroutine = None,
auto_reconnect: bool = False,
stream: trio.SocketStream = None, # expected to be active
) -> None:
@ -89,7 +89,7 @@ class Channel:
self.uid = None
self._agen = self._aiter_recv()
def __repr__(self):
def __repr__(self) -> str:
if self.squeue:
return repr(
self.squeue.stream.socket._sock).replace(
@ -97,14 +97,16 @@ class Channel:
return object.__repr__(self)
@property
def laddr(self):
def laddr(self) -> (str, int):
return self.squeue.laddr if self.squeue else (None, None)
@property
def raddr(self):
def raddr(self) -> (str, int):
return self.squeue.raddr if self.squeue else (None, None)
async def connect(self, destaddr: Tuple[str, int] = None, **kwargs):
async def connect(
self, destaddr: typing.Tuple[str, int] = None, **kwargs
) -> trio.SocketStream:
if self.connected():
raise RuntimeError("channel is already connected?")
destaddr = destaddr or self._destaddr
@ -112,11 +114,11 @@ class Channel:
self.squeue = StreamQueue(stream)
return stream
async def send(self, item):
async def send(self, item: typing.Any) -> None:
log.trace(f"send `{item}`")
await self.squeue.put(item)
async def recv(self):
async def recv(self) -> typing.Any:
try:
return await self.squeue.get()
except trio.BrokenStreamError:
@ -124,7 +126,7 @@ class Channel:
await self._reconnect()
return await self.recv()
async def aclose(self, *args):
async def aclose(self) -> None:
log.debug(f"Closing {self}")
await self.squeue.stream.aclose()
@ -138,7 +140,7 @@ class Channel:
def __aiter__(self):
return self._agen
async def _reconnect(self):
async def _reconnect(self) -> None:
"""Handle connection failures by polling until a reconnect can be
established.
"""
@ -167,7 +169,9 @@ class Channel:
" for re-establishment")
await trio.sleep(1)
async def _aiter_recv(self):
async def _aiter_recv(
self
) -> typing.AsyncGenerator[typing.Any, None]:
"""Async iterate items from underlying stream.
"""
while True:
@ -189,14 +193,16 @@ class Channel:
else:
return
def connected(self):
def connected(self) -> bool:
return self.squeue.connected() if self.squeue else False
@asynccontextmanager
async def _connect_chan(host, port):
"""Create and connect a channel with disconnect on
context manager teardown.
async def _connect_chan(
host: str, port: int
) -> typing.AsyncContextManager[Channel]:
"""Create and connect a channel with disconnect on context manager
teardown.
"""
chan = Channel((host, port))
await chan.connect()

View File

@ -2,6 +2,7 @@
Portal api
"""
import importlib
import typing
import trio
from async_generator import asynccontextmanager
@ -18,7 +19,7 @@ class RemoteActorError(RuntimeError):
@asynccontextmanager
async def maybe_open_nursery(nursery=None):
async def maybe_open_nursery(nursery: trio._core._run.Nursery = None):
"""Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided.
@ -30,7 +31,7 @@ async def maybe_open_nursery(nursery=None):
yield nursery
async def _do_handshake(actor, chan):
async def _do_handshake(actor: 'Actor', chan: 'Channel') -> (str, str):
await chan.send(actor.uid)
uid = await chan.recv()
@ -51,7 +52,7 @@ class Portal:
Think of this like an native async IPC API.
"""
def __init__(self, channel):
def __init__(self, channel: 'Channel'):
self.channel = channel
# when this is set to a tuple returned from ``_submit()`` then
# it is expected that ``result()`` will be awaited at some point
@ -60,13 +61,15 @@ class Portal:
self._exc = None
self._expect_result = None
async def aclose(self):
async def aclose(self) -> None:
log.debug(f"Closing {self}")
# XXX: won't work until https://github.com/python-trio/trio/pull/460
# gets in!
await self.channel.aclose()
async def _submit(self, ns, func, **kwargs):
async def _submit(
self, ns: str, func: str, **kwargs
) -> (str, trio.Queue, str, dict):
"""Submit a function to be scheduled and run by actor, return the
associated caller id, response queue, response type str,
first message packet as a tuple.
@ -93,12 +96,12 @@ class Portal:
return cid, q, resp_type, first_msg
async def _submit_for_result(self, ns, func, **kwargs):
async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None:
assert self._expect_result is None, \
"A pending main result has already been submitted"
self._expect_result = await self._submit(ns, func, **kwargs)
async def run(self, ns, func, **kwargs):
async def run(self, ns: str, func: str, **kwargs) -> typing.Any:
"""Submit a function to be scheduled and run by actor, wrap and return
its (stream of) result(s).
@ -108,7 +111,9 @@ class Portal:
*(await self._submit(ns, func, **kwargs))
)
async def _return_from_resptype(self, cid, q, resptype, first_msg):
async def _return_from_resptype(
self, cid: str, q: trio.Queue, resptype: str, first_msg: dict
) -> typing.Any:
# TODO: not this needs some serious work and thinking about how
# to make async-generators the fundamental IPC API over channels!
# (think `yield from`, `gen.send()`, and functional reactive stuff)
@ -145,7 +150,7 @@ class Portal:
else:
raise ValueError(f"Unknown msg response type: {first_msg}")
async def result(self):
async def result(self) -> typing.Any:
"""Return the result(s) from the remote actor's "main" task.
"""
if self._expect_result is None:
@ -165,13 +170,13 @@ class Portal:
)
return self._result
async def close(self):
async def close(self) -> None:
# trigger remote msg loop `break`
chan = self.channel
log.debug(f"Closing portal for {chan} to {chan.uid}")
await self.channel.send(None)
async def cancel_actor(self):
async def cancel_actor(self) -> bool:
"""Cancel the actor on the other end of this portal.
"""
log.warn(
@ -198,10 +203,10 @@ class LocalPortal:
A compatibility shim for normal portals but for invoking functions
using an in process actor instance.
"""
def __init__(self, actor):
def __init__(self, actor: 'Actor'):
self.actor = actor
async def run(self, ns, func, **kwargs):
async def run(self, ns: str, func: str, **kwargs) -> typing.Any:
"""Run a requested function locally and return it's result.
"""
obj = self.actor if ns == 'self' else importlib.import_module(ns)
@ -210,7 +215,10 @@ class LocalPortal:
@asynccontextmanager
async def open_portal(channel, nursery=None):
async def open_portal(
channel: 'Channel',
nursery: trio._core._run.Nursery = None
) -> typing.AsyncContextManager[Portal]:
"""Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle message processing.

View File

@ -4,6 +4,7 @@
import multiprocessing as mp
import inspect
from multiprocessing import forkserver, semaphore_tracker
import typing
import trio
from async_generator import asynccontextmanager, aclosing
@ -23,8 +24,8 @@ log = get_logger('tractor')
class ActorNursery:
"""Spawn scoped subprocess actors.
"""
def __init__(self, actor, supervisor=None):
self.supervisor = supervisor # TODO
def __init__(self, actor: Actor):
# self.supervisor = supervisor # TODO
self._actor = actor
self._children = {}
# portals spawned with ``run_in_actor()``
@ -38,11 +39,11 @@ class ActorNursery:
async def start_actor(
self,
name: str,
bind_addr=('127.0.0.1', 0),
statespace=None,
rpc_module_paths=None,
loglevel=None, # set log level per subactor
):
bind_addr: (str, int) = ('127.0.0.1', 0),
statespace: dict = None,
rpc_module_paths: [str] = None,
loglevel: str = None, # set log level per subactor
) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel()
actor = Actor(
name,
@ -104,14 +105,14 @@ class ActorNursery:
async def run_in_actor(
self,
name,
fn,
bind_addr=('127.0.0.1', 0),
rpc_module_paths=None,
statespace=None,
loglevel=None, # set log level per subactor
name: str,
fn: typing.Callable,
bind_addr: (str, int) = ('127.0.0.1', 0),
rpc_module_paths: [str] = None,
statespace: dict = None,
loglevel: str = None, # set log level per subactor
**kwargs, # explicit args to ``fn``
):
) -> Portal:
"""Spawn a new actor, run a lone task, then terminate the actor and
return its result.
@ -134,7 +135,7 @@ class ActorNursery:
)
return portal
async def wait(self):
async def wait(self) -> None:
"""Wait for all subactors to complete.
"""
async def maybe_consume_result(portal, actor):
@ -193,7 +194,7 @@ class ActorNursery:
cs = await nursery.start(wait_for_actor, portal, subactor)
nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
async def cancel(self, hard_kill=False):
async def cancel(self, hard_kill: bool = False) -> None:
"""Cancel this nursery by instructing each subactor to cancel
iteslf and wait for all subprocesses to terminate.
@ -274,7 +275,7 @@ class ActorNursery:
@asynccontextmanager
async def open_nursery(supervisor=None):
async def open_nursery() -> typing.AsyncContextManager[ActorNursery]:
"""Create and yield a new ``ActorNursery``.
"""
actor = current_actor()
@ -282,5 +283,5 @@ async def open_nursery(supervisor=None):
raise RuntimeError("No actor instance has been defined yet?")
# TODO: figure out supervisors from erlang
async with ActorNursery(current_actor(), supervisor) as nursery:
async with ActorNursery(current_actor()) as nursery:
yield nursery

View File

@ -92,5 +92,5 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger:
return log
def get_loglevel():
def get_loglevel() -> str:
return _default_loglevel