forked from goodboy/tractor
1
0
Fork 0

Merge pull request #35 from tgoodlet/type_annotations

Add type annotations to most functions
tests_reorg
goodboy 2018-08-31 20:04:39 -04:00 committed by GitHub
commit 22ac567230
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 300 additions and 223 deletions

View File

@ -12,4 +12,5 @@ install:
- pip install -U . -r requirements-test.txt - pip install -U . -r requirements-test.txt
script: script:
- mypy tractor/ --ignore-missing-imports
- pytest tests/ --no-print-logs - pytest tests/ --no-print-logs

View File

@ -1,3 +1,4 @@
pytest pytest
pytest-trio pytest-trio
pdbpp pdbpp
mypy

View File

@ -3,8 +3,9 @@ tractor: An actor model micro-framework built on
``trio`` and ``multiprocessing``. ``trio`` and ``multiprocessing``.
""" """
from functools import partial from functools import partial
import typing
import trio import trio # type: ignore
from .log import get_console_log, get_logger, get_loglevel from .log import get_console_log, get_logger, get_loglevel
from ._ipc import _connect_chan, Channel from ._ipc import _connect_chan, Channel
@ -32,7 +33,13 @@ _default_arbiter_host = '127.0.0.1'
_default_arbiter_port = 1616 _default_arbiter_port = 1616
async def _main(async_fn, args, kwargs, name, arbiter_addr): async def _main(
async_fn: typing.Callable[..., typing.Awaitable],
args: typing.Tuple,
kwargs: typing.Dict[str, typing.Any],
name: str,
arbiter_addr: typing.Tuple[str, int]
) -> typing.Any:
"""Async entry point for ``tractor``. """Async entry point for ``tractor``.
""" """
log = get_logger('tractor') log = get_logger('tractor')
@ -73,11 +80,11 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
def run( def run(
async_fn, async_fn: typing.Callable[..., typing.Awaitable],
*args, *args: typing.Tuple,
name=None, name: str = None,
arbiter_addr=(_default_arbiter_host, _default_arbiter_port), arbiter_addr: typing.Tuple[str, int] = (_default_arbiter_host, _default_arbiter_port),
**kwargs **kwargs: typing.Dict[str, typing.Any],
): ):
"""Run a trio-actor async function in process. """Run a trio-actor async function in process.

View File

@ -1,15 +1,17 @@
""" """
Actor primitives and helpers Actor primitives and helpers
""" """
import inspect
import importlib
from collections import defaultdict from collections import defaultdict
from functools import partial from functools import partial
from itertools import chain
import importlib
import inspect
import traceback import traceback
import uuid import uuid
from itertools import chain import typing
from typing import Dict, List, Tuple, Any, Optional, Union
import trio import trio # type: ignore
from async_generator import asynccontextmanager, aclosing from async_generator import asynccontextmanager, aclosing
from ._ipc import Channel, _connect_chan from ._ipc import Channel, _connect_chan
@ -36,12 +38,28 @@ class InternalActorError(RuntimeError):
async def _invoke( async def _invoke(
actor, cid, chan, func, kwargs, actor: 'Actor',
treat_as_gen=False, cid: str,
chan: Channel,
func: typing.Callable,
kwargs: Dict[str, Any],
task_status=trio.TASK_STATUS_IGNORED task_status=trio.TASK_STATUS_IGNORED
): ):
"""Invoke local func and return results over provided channel. """Invoke local func and return results over provided channel.
""" """
sig = inspect.signature(func)
treat_as_gen = False
if 'chan' in sig.parameters:
assert 'cid' in sig.parameters, \
f"{func} must accept a `cid` (caller id) kwarg"
kwargs['chan'] = chan
kwargs['cid'] = cid
# TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator.
# Right now both actual async gens and any async
# function which declares a `chan` kwarg in its
# signature will be treated as one.
treat_as_gen = True
try: try:
is_async_partial = False is_async_partial = False
is_async_gen_partial = False is_async_gen_partial = False
@ -135,41 +153,45 @@ class Actor:
def __init__( def __init__(
self, self,
name: str, name: str,
rpc_module_paths: [str] = [], rpc_module_paths: List[str] = [],
statespace: dict = {}, statespace: Optional[Dict[str, Any]] = None,
uid: str = None, uid: str = None,
loglevel: str = None, loglevel: str = None,
arbiter_addr: (str, int) = None, arbiter_addr: Optional[Tuple[str, int]] = None,
): ) -> None:
self.name = name self.name = name
self.uid = (name, uid or str(uuid.uuid1())) self.uid = (name, uid or str(uuid.uuid1()))
self.rpc_module_paths = rpc_module_paths self.rpc_module_paths = rpc_module_paths
self._mods = {} self._mods: dict = {}
# TODO: consider making this a dynamically defined # TODO: consider making this a dynamically defined
# @dataclass once we get py3.7 # @dataclass once we get py3.7
self.statespace = statespace self.statespace = statespace or {}
self.loglevel = loglevel self.loglevel = loglevel
self._arb_addr = arbiter_addr self._arb_addr = arbiter_addr
# filled in by `_async_main` after fork # filled in by `_async_main` after fork
self._root_nursery = None self._root_nursery: trio._core._run.Nursery = None
self._server_nursery = None self._server_nursery: trio._core._run.Nursery = None
self._peers = defaultdict(list) self._peers: defaultdict = defaultdict(list)
self._peer_connected = {} self._peer_connected: dict = {}
self._no_more_peers = trio.Event() self._no_more_peers = trio.Event()
self._no_more_peers.set() self._no_more_peers.set()
self._no_more_rpc_tasks = trio.Event() self._no_more_rpc_tasks = trio.Event()
self._no_more_rpc_tasks.set() self._no_more_rpc_tasks.set()
self._rpc_tasks = {} self._rpc_tasks: Dict[
Channel,
List[Tuple[trio._core._run.CancelScope, typing.Callable]]
] = {}
# map {uids -> {callids -> waiter queues}}
self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {}
self._listeners: List[trio.abc.Listener] = []
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None
self._actors2calls = {} # map {uids -> {callids -> waiter queues}} async def wait_for_peer(
self._listeners = [] self, uid: Tuple[str, str]
self._parent_chan = None ) -> Tuple[trio.Event, Channel]:
self._accept_host = None
self._forkserver_info = None
async def wait_for_peer(self, uid):
"""Wait for a connection back from a spawned actor with a given """Wait for a connection back from a spawned actor with a given
``uid``. ``uid``.
""" """
@ -179,11 +201,13 @@ class Actor:
log.debug(f"{uid} successfully connected back to us") log.debug(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1] return event, self._peers[uid][-1]
def load_namespaces(self): def load_modules(self) -> None:
# We load namespaces after fork since this actor may """Load allowed RPC modules locally (after fork).
# be spawned on a different machine from the original nursery
# and we need to try and load the local module code (if it Since this actor may be spawned on a different machine from
# exists) the original nursery we need to try and load the local module
code (if it exists).
"""
for path in self.rpc_module_paths: for path in self.rpc_module_paths:
self._mods[path] = importlib.import_module(path) self._mods[path] = importlib.import_module(path)
@ -198,7 +222,7 @@ class Actor:
async def _stream_handler( async def _stream_handler(
self, self,
stream: trio.SocketStream, stream: trio.SocketStream,
): ) -> None:
"""Entry point for new inbound connections to the channel server. """Entry point for new inbound connections to the channel server.
""" """
self._no_more_peers.clear() self._no_more_peers.clear()
@ -256,33 +280,44 @@ class Actor:
await chan.send(None) await chan.send(None)
await chan.aclose() await chan.aclose()
async def _push_result(self, actorid, cid, msg): async def _push_result(self, actorid, cid: str, msg: dict) -> None:
"""Push an RPC result to the local consumer's queue.
"""
assert actorid, f"`actorid` can't be {actorid}" assert actorid, f"`actorid` can't be {actorid}"
q = self.get_waitq(actorid, cid) q = self.get_waitq(actorid, cid)
log.debug(f"Delivering {msg} from {actorid} to caller {cid}") log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
# maintain backpressure # maintain backpressure
await q.put(msg) await q.put(msg)
def get_waitq(self, actorid, cid): def get_waitq(
self,
actorid: Tuple[str, str],
cid: str
) -> trio.Queue:
log.debug(f"Getting result queue for {actorid} cid {cid}") log.debug(f"Getting result queue for {actorid} cid {cid}")
cids2qs = self._actors2calls.setdefault(actorid, {}) cids2qs = self._actors2calls.setdefault(actorid, {})
return cids2qs.setdefault(cid, trio.Queue(1000)) return cids2qs.setdefault(cid, trio.Queue(1000))
async def send_cmd(self, chan, ns, func, kwargs): async def send_cmd(
self, chan: Channel, ns: str, func: str, kwargs: dict
) -> Tuple[str, trio.Queue]:
"""Send a ``'cmd'`` message to a remote actor and return a """Send a ``'cmd'`` message to a remote actor and return a
caller id and a ``trio.Queue`` that can be used to wait for caller id and a ``trio.Queue`` that can be used to wait for
responses delivered by the local message processing loop. responses delivered by the local message processing loop.
""" """
cid = str(uuid.uuid1()) cid = str(uuid.uuid1())
assert chan.uid
q = self.get_waitq(chan.uid, cid) q = self.get_waitq(chan.uid, cid)
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
return cid, q return cid, q
async def _process_messages(self, chan, treat_as_gen=False): async def _process_messages(
"""Process messages async-RPC style. self, chan: Channel, treat_as_gen: bool = False
) -> None:
"""Process messages for the channel async-RPC style.
Process rpc requests and deliver retrieved responses from channels. Receive multiplexed RPC requests and deliver responses over ``chan``.
""" """
# TODO: once https://github.com/python-trio/trio/issues/467 gets # TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that! # worked out we'll likely want to use that!
@ -315,6 +350,7 @@ class Actor:
# push any non-rpc-response error to all local consumers # push any non-rpc-response error to all local consumers
# and mark the channel as errored # and mark the channel as errored
chan._exc = err = msg['error'] chan._exc = err = msg['error']
assert chan.uid
for cid in self._actors2calls[chan.uid]: for cid in self._actors2calls[chan.uid]:
await self._push_result(chan.uid, cid, msg) await self._push_result(chan.uid, cid, msg)
raise InternalActorError(f"{chan.uid}\n" + err) raise InternalActorError(f"{chan.uid}\n" + err)
@ -328,23 +364,9 @@ class Actor:
func = getattr(self._mods[ns], funcname) func = getattr(self._mods[ns], funcname)
# spin up a task for the requested function # spin up a task for the requested function
sig = inspect.signature(func)
treat_as_gen = False
if 'chan' in sig.parameters:
assert 'cid' in sig.parameters, \
f"{func} must accept a `cid` (caller id) kwarg"
kwargs['chan'] = chan
kwargs['cid'] = cid
# TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator.
# Right now both actual async gens and any async
# function which declares a `chan` kwarg in its
# signature will be treated as one.
treat_as_gen = True
log.debug(f"Spawning task for {func}") log.debug(f"Spawning task for {func}")
cs = await self._root_nursery.start( cs = await self._root_nursery.start(
_invoke, self, cid, chan, func, kwargs, treat_as_gen, _invoke, self, cid, chan, func, kwargs,
name=funcname name=funcname
) )
# never allow cancelling cancel requests (results in # never allow cancelling cancel requests (results in
@ -371,7 +393,12 @@ class Actor:
finally: finally:
log.debug(f"Exiting msg loop for {chan} from {chan.uid}") log.debug(f"Exiting msg loop for {chan} from {chan.uid}")
def _fork_main(self, accept_addr, forkserver_info, parent_addr=None): def _fork_main(
self,
accept_addr: Tuple[str, int],
forkserver_info: Tuple[Any, Any, Any, Any, Any],
parent_addr: Tuple[str, int] = None
) -> None:
# after fork routine which invokes a fresh ``trio.run`` # after fork routine which invokes a fresh ``trio.run``
# log.warn("Log level after fork is {self.loglevel}") # log.warn("Log level after fork is {self.loglevel}")
self._forkserver_info = forkserver_info self._forkserver_info = forkserver_info
@ -391,22 +418,17 @@ class Actor:
async def _async_main( async def _async_main(
self, self,
accept_addr, accept_addr: Tuple[str, int],
arbiter_addr=None, arbiter_addr: Optional[Tuple[str, int]] = None,
parent_addr=None, parent_addr: Optional[Tuple[str, int]] = None,
_main_coro=None, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
task_status=trio.TASK_STATUS_IGNORED, ) -> None:
):
"""Start the channel server, maybe connect back to the parent, and """Start the channel server, maybe connect back to the parent, and
start the main task. start the main task.
A "root-most" (or "top-level") nursery for this actor is opened here A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor. and when cancelled effectively cancels the actor.
""" """
# if this is the `MainProcess` then we get a ref to the main
# task's coroutine object for tossing errors into
self._main_coro = _main_coro
arbiter_addr = arbiter_addr or self._arb_addr arbiter_addr = arbiter_addr or self._arb_addr
registered_with_arbiter = False registered_with_arbiter = False
try: try:
@ -414,7 +436,7 @@ class Actor:
self._root_nursery = nursery self._root_nursery = nursery
# load allowed RPC module # load allowed RPC module
self.load_namespaces() self.load_modules()
# Startup up channel server # Startup up channel server
host, port = accept_addr host, port = accept_addr
@ -422,12 +444,6 @@ class Actor:
self._serve_forever, accept_host=host, accept_port=port) self._serve_forever, accept_host=host, accept_port=port)
) )
# XXX: I wonder if a better name is maybe "requester"
# since I don't think the notion of a "parent" actor
# necessarily sticks given that eventually we want
# ``'MainProcess'`` (the actor who initially starts the
# forkserver) to eventually be the only one who is
# allowed to spawn new processes per Python program.
if parent_addr is not None: if parent_addr is not None:
try: try:
# Connect back to the parent actor and conduct initial # Connect back to the parent actor and conduct initial
@ -502,10 +518,10 @@ class Actor:
self, self,
*, *,
# (host, port) to bind for channel server # (host, port) to bind for channel server
accept_host=None, accept_host: Tuple[str, int] = None,
accept_port=0, accept_port: int = 0,
task_status=trio.TASK_STATUS_IGNORED task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
): ) -> None:
"""Start the channel server, begin listening for new connections. """Start the channel server, begin listening for new connections.
This will cause an actor to continue living (blocking) until This will cause an actor to continue living (blocking) until
@ -531,7 +547,7 @@ class Actor:
self._listeners.extend(listeners) self._listeners.extend(listeners)
task_status.started() task_status.started()
async def _do_unreg(self, arbiter_addr): async def _do_unreg(self, arbiter_addr: Optional[Tuple[str, int]]) -> None:
# UNregister actor from the arbiter # UNregister actor from the arbiter
try: try:
if arbiter_addr is not None: if arbiter_addr is not None:
@ -541,7 +557,7 @@ class Actor:
except OSError: except OSError:
log.warn(f"Unable to unregister {self.name} from arbiter") log.warn(f"Unable to unregister {self.name} from arbiter")
async def cancel(self): async def cancel(self) -> None:
"""Cancel this actor. """Cancel this actor.
The sequence in order is: The sequence in order is:
@ -554,23 +570,23 @@ class Actor:
self.cancel_server() self.cancel_server()
self._root_nursery.cancel_scope.cancel() self._root_nursery.cancel_scope.cancel()
async def cancel_rpc_tasks(self): async def cancel_rpc_tasks(self) -> None:
"""Cancel all existing RPC responder tasks using the cancel scope """Cancel all existing RPC responder tasks using the cancel scope
registered for each. registered for each.
""" """
scopes = self._rpc_tasks tasks = self._rpc_tasks
log.info(f"Cancelling all {len(scopes)} rpc tasks:\n{scopes}") log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks}")
for chan, scopes in scopes.items(): for chan, scopes in tasks.items():
log.debug(f"Cancelling all tasks for {chan.uid}") log.debug(f"Cancelling all tasks for {chan.uid}")
for scope, func in scopes: for scope, func in scopes:
log.debug(f"Cancelling task for {func}") log.debug(f"Cancelling task for {func}")
scope.cancel() scope.cancel()
if scopes: if tasks:
log.info( log.info(
f"Waiting for remaining rpc tasks to complete {scopes}") f"Waiting for remaining rpc tasks to complete {tasks}")
await self._no_more_rpc_tasks.wait() await self._no_more_rpc_tasks.wait()
def cancel_server(self): def cancel_server(self) -> None:
"""Cancel the internal channel server nursery thereby """Cancel the internal channel server nursery thereby
preventing any new inbound connections from being established. preventing any new inbound connections from being established.
""" """
@ -578,19 +594,22 @@ class Actor:
self._server_nursery.cancel_scope.cancel() self._server_nursery.cancel_scope.cancel()
@property @property
def accept_addr(self): def accept_addr(self) -> Optional[Tuple[str, int]]:
"""Primary address to which the channel server is bound. """Primary address to which the channel server is bound.
""" """
try: try:
return self._listeners[0].socket.getsockname() return self._listeners[0].socket.getsockname()
except OSError: except OSError:
return return None
def get_parent(self): def get_parent(self) -> Portal:
"""Return a portal to our parent actor."""
assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan) return Portal(self._parent_chan)
def get_chans(self, actorid): def get_chans(self, uid: Tuple[str, str]) -> List[Channel]:
return self._peers[actorid] """Return all channels to the actor with provided uid."""
return self._peers[uid]
class Arbiter(Actor): class Arbiter(Actor):
@ -609,12 +628,16 @@ class Arbiter(Actor):
self._waiters = {} self._waiters = {}
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def find_actor(self, name): def find_actor(self, name: str) -> Optional[Tuple[str, int]]:
for uid, sockaddr in self._registry.items(): for uid, sockaddr in self._registry.items():
if name in uid: if name in uid:
return sockaddr return sockaddr
async def wait_for_actor(self, name): return None
async def wait_for_actor(
self, name: str
) -> List[Tuple[str, int]]:
"""Wait for a particular actor to register. """Wait for a particular actor to register.
This is a blocking call if no actor by the provided name is currently This is a blocking call if no actor by the provided name is currently
@ -635,7 +658,9 @@ class Arbiter(Actor):
return sockaddrs return sockaddrs
def register_actor(self, uid, sockaddr): def register_actor(
self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
) -> None:
name, uuid = uid name, uuid = uid
self._registry[uid] = sockaddr self._registry[uid] = sockaddr
@ -645,13 +670,20 @@ class Arbiter(Actor):
for event in events: for event in events:
event.set() event.set()
def unregister_actor(self, uid): def unregister_actor(self, uid: Tuple[str, str]) -> None:
self._registry.pop(uid, None) self._registry.pop(uid, None)
async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): async def _start_actor(
"""Spawn a local actor by starting a task to execute it's main actor: Actor,
async function. main: typing.Callable[..., typing.Awaitable],
host: str,
port: int,
arbiter_addr: Tuple[str, int],
nursery: trio._core._run.Nursery = None
):
"""Spawn a local actor by starting a task to execute it's main async
function.
Blocks if no nursery is provided, in which case it is expected the nursery Blocks if no nursery is provided, in which case it is expected the nursery
provider is responsible for waiting on the task to complete. provider is responsible for waiting on the task to complete.
@ -664,29 +696,22 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
log.info(f"Starting local {actor} @ {host}:{port}") log.info(f"Starting local {actor} @ {host}:{port}")
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
if main is not None:
main_coro = main()
await nursery.start( await nursery.start(
partial( partial(
actor._async_main, actor._async_main,
accept_addr=(host, port), accept_addr=(host, port),
parent_addr=None, parent_addr=None,
arbiter_addr=arbiter_addr, arbiter_addr=arbiter_addr,
_main_coro=main_coro
) )
) )
if main is not None: if main is not None:
result = await main_coro result = await main()
# XXX: If spawned with a dedicated "main function", # XXX: If spawned with a dedicated "main function",
# the actor is cancelled when this context is complete # the actor is cancelled when this context is complete
# given that there are no more active peer channels connected # given that there are no more active peer channels connected
actor.cancel_server() actor.cancel_server()
# block on actor to complete
# unset module state # unset module state
_state._current_actor = None _state._current_actor = None
log.info("Completed async main") log.info("Completed async main")
@ -695,7 +720,9 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
@asynccontextmanager @asynccontextmanager
async def get_arbiter(host, port): async def get_arbiter(
host: str, port: int
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
"""Return a portal instance connected to a local or remote """Return a portal instance connected to a local or remote
arbiter. arbiter.
""" """
@ -715,9 +742,8 @@ async def get_arbiter(host, port):
@asynccontextmanager @asynccontextmanager
async def find_actor( async def find_actor(
name, name: str, arbiter_sockaddr: Tuple[str, int] = None
arbiter_sockaddr=None, ) -> typing.AsyncGenerator[Optional[Portal], None]:
):
"""Ask the arbiter to find actor(s) by name. """Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor Returns a connected portal to the last registered matching actor
@ -738,9 +764,9 @@ async def find_actor(
@asynccontextmanager @asynccontextmanager
async def wait_for_actor( async def wait_for_actor(
name, name: str,
arbiter_sockaddr=None, arbiter_sockaddr: Tuple[str, int] = None
): ) -> typing.AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter. """Wait on an actor to register with the arbiter.
A portal to the first actor which registered is be returned. A portal to the first actor which registered is be returned.

View File

@ -2,6 +2,9 @@
This is near-copy of the 3.8 stdlib's ``multiprocessing.forkserver.py`` This is near-copy of the 3.8 stdlib's ``multiprocessing.forkserver.py``
with some hackery to prevent any more then a single forkserver and with some hackery to prevent any more then a single forkserver and
semaphore tracker per ``MainProcess``. semaphore tracker per ``MainProcess``.
.. note:: There is no type hinting in this code base (yet) to remain as
a close as possible to upstream.
""" """
import os import os
import socket import socket
@ -12,15 +15,12 @@ import errno
import selectors import selectors
import warnings import warnings
from multiprocessing import ( from multiprocessing import semaphore_tracker, spawn, process # type: ignore
forkserver, semaphore_tracker, spawn, process, util, from multiprocessing import forkserver, util, connection # type: ignore
connection
)
from multiprocessing.forkserver import ( from multiprocessing.forkserver import (
ForkServer, MAXFDS_TO_SEND ForkServer, MAXFDS_TO_SEND
# _serve_one,
) )
from multiprocessing.context import reduction from multiprocessing.context import reduction # type: ignore
# taken from 3.8 # taken from 3.8

View File

@ -1,7 +1,8 @@
""" """
Inter-process comms abstractions Inter-process comms abstractions
""" """
from typing import Coroutine, Tuple import typing
from typing import Any, Tuple, Optional
import msgpack import msgpack
import trio import trio
@ -14,21 +15,21 @@ log = get_logger('ipc')
class StreamQueue: class StreamQueue:
"""Stream wrapped as a queue that delivers ``msgpack`` serialized objects. """Stream wrapped as a queue that delivers ``msgpack`` serialized objects.
""" """
def __init__(self, stream): def __init__(self, stream: trio.SocketStream) -> None:
self.stream = stream self.stream = stream
self._agen = self._iter_packets() self._agen = self._iter_packets()
self._laddr = self.stream.socket.getsockname()[:2] self._laddr = self.stream.socket.getsockname()[:2]
self._raddr = self.stream.socket.getpeername()[:2] self._raddr = self.stream.socket.getpeername()[:2]
self._send_lock = trio.Lock() self._send_lock = trio.Lock()
async def _iter_packets(self): async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
"""Yield packets from the underlying stream. """Yield packets from the underlying stream.
""" """
unpacker = msgpack.Unpacker(raw=False, use_list=False) unpacker = msgpack.Unpacker(raw=False, use_list=False)
while True: while True:
try: try:
data = await self.stream.receive_some(2**10) data = await self.stream.receive_some(2**10)
log.trace(f"received {data}") log.trace(f"received {data}") # type: ignore
except trio.BrokenStreamError: except trio.BrokenStreamError:
log.error(f"Stream connection {self.raddr} broke") log.error(f"Stream connection {self.raddr} broke")
return return
@ -42,25 +43,25 @@ class StreamQueue:
yield packet yield packet
@property @property
def laddr(self): def laddr(self) -> Tuple[str, int]:
return self._laddr return self._laddr
@property @property
def raddr(self): def raddr(self) -> Tuple[str, int]:
return self._raddr return self._raddr
async def put(self, data): async def put(self, data: Any) -> int:
async with self._send_lock: async with self._send_lock:
return await self.stream.send_all( return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True)) msgpack.dumps(data, use_bin_type=True))
async def get(self): async def get(self) -> Any:
return await self._agen.asend(None) return await self._agen.asend(None)
def __aiter__(self): def __aiter__(self):
return self._agen return self._agen
def connected(self): def connected(self) -> bool:
return self.stream.socket.fileno() != -1 return self.stream.socket.fileno() != -1
@ -72,24 +73,27 @@ class Channel:
""" """
def __init__( def __init__(
self, self,
destaddr: tuple = None, destaddr: Optional[Tuple[str, int]] = None,
on_reconnect: Coroutine = None, on_reconnect: typing.Callable[..., typing.Awaitable] = None,
auto_reconnect: bool = False, auto_reconnect: bool = False,
stream: trio.SocketStream = None, # expected to be active stream: trio.SocketStream = None, # expected to be active
) -> None: ) -> None:
self._recon_seq = on_reconnect self._recon_seq = on_reconnect
self._autorecon = auto_reconnect self._autorecon = auto_reconnect
self.squeue = StreamQueue(stream) if stream else None self.squeue: Optional[StreamQueue] = StreamQueue(
stream) if stream else None
if self.squeue and destaddr: if self.squeue and destaddr:
raise ValueError( raise ValueError(
f"A stream was provided with local addr {self.laddr}" f"A stream was provided with local addr {self.laddr}"
) )
self._destaddr = destaddr or self.squeue.raddr self._destaddr = self.squeue.raddr if self.squeue else destaddr
# set after handshake - always uid of far end # set after handshake - always uid of far end
self.uid = None self.uid: Optional[Tuple[str, str]] = None
# set if far end actor errors internally
self._exc: Optional[Exception] = None
self._agen = self._aiter_recv() self._agen = self._aiter_recv()
def __repr__(self): def __repr__(self) -> str:
if self.squeue: if self.squeue:
return repr( return repr(
self.squeue.stream.socket._sock).replace( self.squeue.stream.socket._sock).replace(
@ -97,14 +101,16 @@ class Channel:
return object.__repr__(self) return object.__repr__(self)
@property @property
def laddr(self): def laddr(self) -> Optional[Tuple[str, int]]:
return self.squeue.laddr if self.squeue else (None, None) return self.squeue.laddr if self.squeue else None
@property @property
def raddr(self): def raddr(self) -> Optional[Tuple[str, int]]:
return self.squeue.raddr if self.squeue else (None, None) return self.squeue.raddr if self.squeue else None
async def connect(self, destaddr: Tuple[str, int] = None, **kwargs): async def connect(
self, destaddr: Tuple[str, int] = None, **kwargs
) -> trio.SocketStream:
if self.connected(): if self.connected():
raise RuntimeError("channel is already connected?") raise RuntimeError("channel is already connected?")
destaddr = destaddr or self._destaddr destaddr = destaddr or self._destaddr
@ -112,11 +118,13 @@ class Channel:
self.squeue = StreamQueue(stream) self.squeue = StreamQueue(stream)
return stream return stream
async def send(self, item): async def send(self, item: Any) -> None:
log.trace(f"send `{item}`") log.trace(f"send `{item}`") # type: ignore
assert self.squeue
await self.squeue.put(item) await self.squeue.put(item)
async def recv(self): async def recv(self) -> Any:
assert self.squeue
try: try:
return await self.squeue.get() return await self.squeue.get()
except trio.BrokenStreamError: except trio.BrokenStreamError:
@ -124,8 +132,9 @@ class Channel:
await self._reconnect() await self._reconnect()
return await self.recv() return await self.recv()
async def aclose(self, *args): async def aclose(self) -> None:
log.debug(f"Closing {self}") log.debug(f"Closing {self}")
assert self.squeue
await self.squeue.stream.aclose() await self.squeue.stream.aclose()
async def __aenter__(self): async def __aenter__(self):
@ -138,7 +147,7 @@ class Channel:
def __aiter__(self): def __aiter__(self):
return self._agen return self._agen
async def _reconnect(self): async def _reconnect(self) -> None:
"""Handle connection failures by polling until a reconnect can be """Handle connection failures by polling until a reconnect can be
established. established.
""" """
@ -167,9 +176,12 @@ class Channel:
" for re-establishment") " for re-establishment")
await trio.sleep(1) await trio.sleep(1)
async def _aiter_recv(self): async def _aiter_recv(
self
) -> typing.AsyncGenerator[Any, None]:
"""Async iterate items from underlying stream. """Async iterate items from underlying stream.
""" """
assert self.squeue
while True: while True:
try: try:
async for item in self.squeue: async for item in self.squeue:
@ -189,14 +201,16 @@ class Channel:
else: else:
return return
def connected(self): def connected(self) -> bool:
return self.squeue.connected() if self.squeue else False return self.squeue.connected() if self.squeue else False
@asynccontextmanager @asynccontextmanager
async def _connect_chan(host, port): async def _connect_chan(
"""Create and connect a channel with disconnect on host: str, port: int
context manager teardown. ) -> typing.AsyncGenerator[Channel, None]:
"""Create and connect a channel with disconnect on context manager
teardown.
""" """
chan = Channel((host, port)) chan = Channel((host, port))
await chan.connect() await chan.connect()

View File

@ -2,11 +2,14 @@
Portal api Portal api
""" """
import importlib import importlib
import typing
from typing import Tuple, Any, Dict, Optional
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from ._state import current_actor from ._state import current_actor
from ._ipc import Channel
from .log import get_logger from .log import get_logger
@ -18,7 +21,7 @@ class RemoteActorError(RuntimeError):
@asynccontextmanager @asynccontextmanager
async def maybe_open_nursery(nursery=None): async def maybe_open_nursery(nursery: trio._core._run.Nursery = None):
"""Create a new nursery if None provided. """Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided. Blocks on exit as expected if no input nursery is provided.
@ -30,9 +33,12 @@ async def maybe_open_nursery(nursery=None):
yield nursery yield nursery
async def _do_handshake(actor, chan): async def _do_handshake(
actor: 'Actor', # type: ignore
chan: Channel
)-> Any:
await chan.send(actor.uid) await chan.send(actor.uid)
uid = await chan.recv() uid: Tuple[str, str] = await chan.recv()
if not isinstance(uid, tuple): if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!") raise ValueError(f"{uid} is not a valid uid?!")
@ -51,22 +57,26 @@ class Portal:
Think of this like an native async IPC API. Think of this like an native async IPC API.
""" """
def __init__(self, channel): def __init__(self, channel: Channel) -> None:
self.channel = channel self.channel = channel
# when this is set to a tuple returned from ``_submit()`` then # when this is set to a tuple returned from ``_submit()`` then
# it is expected that ``result()`` will be awaited at some point # it is expected that ``result()`` will be awaited at some point
# during the portal's lifetime # during the portal's lifetime
self._result = None self._result = None
self._exc = None self._exc: Optional[RemoteActorError] = None
self._expect_result = None self._expect_result: Optional[
Tuple[str, Any, str, Dict[str, Any]]
] = None
async def aclose(self): async def aclose(self) -> None:
log.debug(f"Closing {self}") log.debug(f"Closing {self}")
# XXX: won't work until https://github.com/python-trio/trio/pull/460 # XXX: won't work until https://github.com/python-trio/trio/pull/460
# gets in! # gets in!
await self.channel.aclose() await self.channel.aclose()
async def _submit(self, ns, func, **kwargs): async def _submit(
self, ns: str, func: str, **kwargs
) -> Tuple[str, trio.Queue, str, Dict[str, Any]]:
"""Submit a function to be scheduled and run by actor, return the """Submit a function to be scheduled and run by actor, return the
associated caller id, response queue, response type str, associated caller id, response queue, response type str,
first message packet as a tuple. first message packet as a tuple.
@ -93,12 +103,12 @@ class Portal:
return cid, q, resp_type, first_msg return cid, q, resp_type, first_msg
async def _submit_for_result(self, ns, func, **kwargs): async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None:
assert self._expect_result is None, \ assert self._expect_result is None, \
"A pending main result has already been submitted" "A pending main result has already been submitted"
self._expect_result = await self._submit(ns, func, **kwargs) self._expect_result = await self._submit(ns, func, **kwargs)
async def run(self, ns, func, **kwargs): async def run(self, ns: str, func: str, **kwargs) -> Any:
"""Submit a function to be scheduled and run by actor, wrap and return """Submit a function to be scheduled and run by actor, wrap and return
its (stream of) result(s). its (stream of) result(s).
@ -108,7 +118,9 @@ class Portal:
*(await self._submit(ns, func, **kwargs)) *(await self._submit(ns, func, **kwargs))
) )
async def _return_from_resptype(self, cid, q, resptype, first_msg): async def _return_from_resptype(
self, cid: str, q: trio.Queue, resptype: str, first_msg: dict
) -> Any:
# TODO: not this needs some serious work and thinking about how # TODO: not this needs some serious work and thinking about how
# to make async-generators the fundamental IPC API over channels! # to make async-generators the fundamental IPC API over channels!
# (think `yield from`, `gen.send()`, and functional reactive stuff) # (think `yield from`, `gen.send()`, and functional reactive stuff)
@ -145,7 +157,7 @@ class Portal:
else: else:
raise ValueError(f"Unknown msg response type: {first_msg}") raise ValueError(f"Unknown msg response type: {first_msg}")
async def result(self): async def result(self) -> Any:
"""Return the result(s) from the remote actor's "main" task. """Return the result(s) from the remote actor's "main" task.
""" """
if self._expect_result is None: if self._expect_result is None:
@ -153,7 +165,7 @@ class Portal:
# teardown can reraise them # teardown can reraise them
exc = self.channel._exc exc = self.channel._exc
if exc: if exc:
raise RemoteActorError(f"{self.channel.uid}\n" + exc) raise RemoteActorError(f"{self.channel.uid}\n{exc}")
else: else:
raise RuntimeError( raise RuntimeError(
f"Portal for {self.channel.uid} is not expecting a final" f"Portal for {self.channel.uid} is not expecting a final"
@ -165,13 +177,13 @@ class Portal:
) )
return self._result return self._result
async def close(self): async def close(self) -> None:
# trigger remote msg loop `break` # trigger remote msg loop `break`
chan = self.channel chan = self.channel
log.debug(f"Closing portal for {chan} to {chan.uid}") log.debug(f"Closing portal for {chan} to {chan.uid}")
await self.channel.send(None) await self.channel.send(None)
async def cancel_actor(self): async def cancel_actor(self) -> bool:
"""Cancel the actor on the other end of this portal. """Cancel the actor on the other end of this portal.
""" """
log.warn( log.warn(
@ -198,19 +210,24 @@ class LocalPortal:
A compatibility shim for normal portals but for invoking functions A compatibility shim for normal portals but for invoking functions
using an in process actor instance. using an in process actor instance.
""" """
def __init__(self, actor): def __init__(
self,
actor: 'Actor' # type: ignore
) -> None:
self.actor = actor self.actor = actor
async def run(self, ns, func, **kwargs): async def run(self, ns: str, func: str, **kwargs) -> Any:
"""Run a requested function locally and return it's result. """Run a requested function locally and return it's result.
""" """
obj = self.actor if ns == 'self' else importlib.import_module(ns) obj = self.actor if ns == 'self' else importlib.import_module(ns)
func = getattr(obj, func) return getattr(obj, func)(**kwargs)
return func(**kwargs)
@asynccontextmanager @asynccontextmanager
async def open_portal(channel, nursery=None): async def open_portal(
channel: Channel,
nursery: trio._core._run.Nursery = None
) -> typing.AsyncGenerator[Portal, None]:
"""Open a ``Portal`` through the provided ``channel``. """Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle message processing. Spawns a background task to handle message processing.

View File

@ -1,10 +1,13 @@
""" """
Per process state Per process state
""" """
_current_actor = None from typing import Optional
def current_actor() -> 'Actor': _current_actor: Optional['Actor'] = None # type: ignore
def current_actor() -> 'Actor': # type: ignore
"""Get the process-local actor instance. """Get the process-local actor instance.
""" """
if not _current_actor: if not _current_actor:

View File

@ -3,7 +3,9 @@
""" """
import multiprocessing as mp import multiprocessing as mp
import inspect import inspect
from multiprocessing import forkserver, semaphore_tracker from multiprocessing import forkserver, semaphore_tracker # type: ignore
from typing import Tuple, List, Dict, Optional, Any
import typing
import trio import trio
from async_generator import asynccontextmanager, aclosing from async_generator import asynccontextmanager, aclosing
@ -23,14 +25,17 @@ log = get_logger('tractor')
class ActorNursery: class ActorNursery:
"""Spawn scoped subprocess actors. """Spawn scoped subprocess actors.
""" """
def __init__(self, actor, supervisor=None): def __init__(self, actor: Actor) -> None:
self.supervisor = supervisor # TODO # self.supervisor = supervisor # TODO
self._actor = actor self._actor: Actor = actor
self._children = {} self._children: Dict[
Tuple[str, str],
Tuple[Actor, mp.Process, Optional[Portal]]
] = {}
# portals spawned with ``run_in_actor()`` # portals spawned with ``run_in_actor()``
self._cancel_after_result_on_exit = set() self._cancel_after_result_on_exit: set = set()
self.cancelled = False self.cancelled: bool = False
self._forkserver = None self._forkserver: forkserver.ForkServer = None
async def __aenter__(self): async def __aenter__(self):
return self return self
@ -38,11 +43,11 @@ class ActorNursery:
async def start_actor( async def start_actor(
self, self,
name: str, name: str,
bind_addr=('127.0.0.1', 0), bind_addr: Tuple[str, int] = ('127.0.0.1', 0),
statespace=None, statespace: Optional[Dict[str, Any]] = None,
rpc_module_paths=None, rpc_module_paths: List[str] = None,
loglevel=None, # set log level per subactor loglevel: str = None, # set log level per subactor
): ) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel() loglevel = loglevel or self._actor.loglevel or get_loglevel()
actor = Actor( actor = Actor(
name, name,
@ -70,6 +75,7 @@ class ActorNursery:
semaphore_tracker._semaphore_tracker._fd, semaphore_tracker._semaphore_tracker._fd,
) )
else: else:
assert self._actor._forkserver_info
fs_info = ( fs_info = (
fs._forkserver_address, fs._forkserver_address,
fs._forkserver_alive_fd, fs._forkserver_alive_fd,
@ -87,7 +93,7 @@ class ActorNursery:
# register the process before start in case we get a cancel # register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait # request before the actor has fully spawned - then we can wait
# for it to fully come up before sending a cancel request # for it to fully come up before sending a cancel request
self._children[actor.uid] = [actor, proc, None] self._children[actor.uid] = (actor, proc, None)
proc.start() proc.start()
if not proc.is_alive(): if not proc.is_alive():
@ -99,19 +105,19 @@ class ActorNursery:
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await self._actor.wait_for_peer(actor.uid) event, chan = await self._actor.wait_for_peer(actor.uid)
portal = Portal(chan) portal = Portal(chan)
self._children[actor.uid][2] = portal self._children[actor.uid] = (actor, proc, portal)
return portal return portal
async def run_in_actor( async def run_in_actor(
self, self,
name, name: str,
fn, fn: typing.Callable,
bind_addr=('127.0.0.1', 0), bind_addr: Tuple[str, int] = ('127.0.0.1', 0),
rpc_module_paths=None, rpc_module_paths: List[str] = None,
statespace=None, statespace: dict = None,
loglevel=None, # set log level per subactor loglevel: str = None, # set log level per subactor
**kwargs, # explicit args to ``fn`` **kwargs, # explicit args to ``fn``
): ) -> Portal:
"""Spawn a new actor, run a lone task, then terminate the actor and """Spawn a new actor, run a lone task, then terminate the actor and
return its result. return its result.
@ -134,7 +140,7 @@ class ActorNursery:
) )
return portal return portal
async def wait(self): async def wait(self) -> None:
"""Wait for all subactors to complete. """Wait for all subactors to complete.
""" """
async def maybe_consume_result(portal, actor): async def maybe_consume_result(portal, actor):
@ -154,7 +160,12 @@ class ActorNursery:
async for item in agen: async for item in agen:
log.debug(f"Consuming item {item}") log.debug(f"Consuming item {item}")
async def wait_for_proc(proc, actor, portal, cancel_scope): async def wait_for_proc(
proc: mp.Process,
actor: Actor,
portal: Portal,
cancel_scope: trio._core._run.CancelScope,
) -> None:
# TODO: timeout block here? # TODO: timeout block here?
if proc.is_alive(): if proc.is_alive():
await trio.hazmat.wait_readable(proc.sentinel) await trio.hazmat.wait_readable(proc.sentinel)
@ -171,9 +182,10 @@ class ActorNursery:
cancel_scope.cancel() cancel_scope.cancel()
async def wait_for_actor( async def wait_for_actor(
portal, actor, portal: Portal,
actor: Actor,
task_status=trio.TASK_STATUS_IGNORED, task_status=trio.TASK_STATUS_IGNORED,
): ) -> None:
# cancel the actor gracefully # cancel the actor gracefully
with trio.open_cancel_scope() as cs: with trio.open_cancel_scope() as cs:
task_status.started(cs) task_status.started(cs)
@ -193,7 +205,7 @@ class ActorNursery:
cs = await nursery.start(wait_for_actor, portal, subactor) cs = await nursery.start(wait_for_actor, portal, subactor)
nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
async def cancel(self, hard_kill=False): async def cancel(self, hard_kill: bool = False) -> None:
"""Cancel this nursery by instructing each subactor to cancel """Cancel this nursery by instructing each subactor to cancel
iteslf and wait for all subprocesses to terminate. iteslf and wait for all subprocesses to terminate.
@ -230,6 +242,7 @@ class ActorNursery:
do_hard_kill(proc) do_hard_kill(proc)
# spawn cancel tasks async # spawn cancel tasks async
assert portal
n.start_soon(portal.cancel_actor) n.start_soon(portal.cancel_actor)
log.debug(f"Waiting on all subactors to complete") log.debug(f"Waiting on all subactors to complete")
@ -274,7 +287,7 @@ class ActorNursery:
@asynccontextmanager @asynccontextmanager
async def open_nursery(supervisor=None): async def open_nursery() -> typing.AsyncGenerator[None, ActorNursery]:
"""Create and yield a new ``ActorNursery``. """Create and yield a new ``ActorNursery``.
""" """
actor = current_actor() actor = current_actor()
@ -282,5 +295,5 @@ async def open_nursery(supervisor=None):
raise RuntimeError("No actor instance has been defined yet?") raise RuntimeError("No actor instance has been defined yet?")
# TODO: figure out supervisors from erlang # TODO: figure out supervisors from erlang
async with ActorNursery(current_actor(), supervisor) as nursery: async with ActorNursery(current_actor()) as nursery:
yield nursery yield nursery

View File

@ -2,9 +2,10 @@
Log like a forester! Log like a forester!
""" """
from functools import partial from functools import partial
import sys
import logging import logging
import colorlog import colorlog # type: ignore
from typing import Optional
_proj_name = 'tractor' _proj_name = 'tractor'
_default_loglevel = None _default_loglevel = None
@ -69,28 +70,22 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger:
log = get_logger(name) # our root logger log = get_logger(name) # our root logger
if not level: if not level:
return return log
log.setLevel(level.upper() if not isinstance(level, int) else level) log.setLevel(level.upper() if not isinstance(level, int) else level)
handler = logging.StreamHandler()
if not any( formatter = colorlog.ColoredFormatter(
handler.stream == sys.stderr for handler in log.handlers LOG_FORMAT,
if getattr(handler, 'stream', None) datefmt=DATE_FORMAT,
): log_colors=STD_PALETTE,
handler = logging.StreamHandler() secondary_log_colors=BOLD_PALETTE,
style='{',
formatter = colorlog.ColoredFormatter( )
LOG_FORMAT, handler.setFormatter(formatter)
datefmt=DATE_FORMAT, log.addHandler(handler)
log_colors=STD_PALETTE,
secondary_log_colors=BOLD_PALETTE,
style='{',
)
handler.setFormatter(formatter)
log.addHandler(handler)
return log return log
def get_loglevel(): def get_loglevel() -> Optional[str]:
return _default_loglevel return _default_loglevel