From b0ceb308bafa5b11adf09d9744313ed349ccfd32 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 19 Aug 2018 22:13:13 -0400 Subject: [PATCH 1/6] Add type annotations to most functions This is purely for documentation purposes for now as it should be obvious a bunch of the signatures aren't using the correct "generics" syntax (i.e. the use of `(str, int)` instead of `typing.Tuple[str, int])`) in a bunch of places. We're also not using a type checker yet and besides, `trio` doesn't really expose a lot of its internal types very well. 2SQASH --- tractor/__init__.py | 19 +++++-- tractor/_actor.py | 130 +++++++++++++++++++++---------------------- tractor/_ipc.py | 50 +++++++++-------- tractor/_portal.py | 36 +++++++----- tractor/_trionics.py | 37 ++++++------ tractor/log.py | 2 +- 6 files changed, 148 insertions(+), 126 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index f5146ad..569760f 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -3,6 +3,7 @@ tractor: An actor model micro-framework built on ``trio`` and ``multiprocessing``. """ from functools import partial +import typing import trio @@ -32,7 +33,13 @@ _default_arbiter_host = '127.0.0.1' _default_arbiter_port = 1616 -async def _main(async_fn, args, kwargs, name, arbiter_addr): +async def _main( + async_fn: typing.Callable[..., typing.Awaitable], + args: tuple, + kwargs: dict, + name: str, + arbiter_addr: (str, int) +) -> typing.Any: """Async entry point for ``tractor``. """ log = get_logger('tractor') @@ -73,11 +80,11 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): def run( - async_fn, - *args, - name=None, - arbiter_addr=(_default_arbiter_host, _default_arbiter_port), - **kwargs + async_fn: typing.Callable[..., typing.Awaitable], + *args: ..., + name: str = None, + arbiter_addr: (str, int) = (_default_arbiter_host, _default_arbiter_port), + **kwargs: ... ): """Run a trio-actor async function in process. diff --git a/tractor/_actor.py b/tractor/_actor.py index a5e833f..4be6381 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -1,13 +1,14 @@ """ Actor primitives and helpers """ -import inspect -import importlib from collections import defaultdict from functools import partial -import traceback -import uuid from itertools import chain +import importlib +import inspect +import traceback +import typing +import uuid import trio from async_generator import asynccontextmanager, aclosing @@ -36,8 +37,11 @@ class InternalActorError(RuntimeError): async def _invoke( - actor, cid, chan, func, kwargs, - treat_as_gen=False, + actor: 'Actor', + cid: str, + chan: Channel, + func: typing.Callable, + kwargs: dict, task_status=trio.TASK_STATUS_IGNORED ): """Invoke local func and return results over provided channel. @@ -169,7 +173,7 @@ class Actor: self._accept_host = None self._forkserver_info = None - async def wait_for_peer(self, uid): + async def wait_for_peer(self, uid: (str, str)) -> (trio.Event, Channel): """Wait for a connection back from a spawned actor with a given ``uid``. """ @@ -179,7 +183,7 @@ class Actor: log.debug(f"{uid} successfully connected back to us") return event, self._peers[uid][-1] - def load_namespaces(self): + def load_namespaces(self) -> None: # We load namespaces after fork since this actor may # be spawned on a different machine from the original nursery # and we need to try and load the local module code (if it @@ -198,7 +202,7 @@ class Actor: async def _stream_handler( self, stream: trio.SocketStream, - ): + ) -> None: """Entry point for new inbound connections to the channel server. """ self._no_more_peers.clear() @@ -256,19 +260,21 @@ class Actor: await chan.send(None) await chan.aclose() - async def _push_result(self, actorid, cid, msg): + async def _push_result(self, actorid, cid: str, msg: dict) -> None: assert actorid, f"`actorid` can't be {actorid}" q = self.get_waitq(actorid, cid) log.debug(f"Delivering {msg} from {actorid} to caller {cid}") # maintain backpressure await q.put(msg) - def get_waitq(self, actorid, cid): + def get_waitq(self, actorid: (str, str), cid: str) -> trio.Queue: log.debug(f"Getting result queue for {actorid} cid {cid}") cids2qs = self._actors2calls.setdefault(actorid, {}) return cids2qs.setdefault(cid, trio.Queue(1000)) - async def send_cmd(self, chan, ns, func, kwargs): + async def send_cmd( + self, chan: Channel, ns: str, func: str, kwargs: dict + ) -> (str, trio.Queue): """Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. @@ -279,7 +285,9 @@ class Actor: await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) return cid, q - async def _process_messages(self, chan, treat_as_gen=False): + async def _process_messages( + self, chan: Channel, treat_as_gen: bool = False + ) -> None: """Process messages async-RPC style. Process rpc requests and deliver retrieved responses from channels. @@ -371,7 +379,11 @@ class Actor: finally: log.debug(f"Exiting msg loop for {chan} from {chan.uid}") - def _fork_main(self, accept_addr, forkserver_info, parent_addr=None): + def _fork_main( + self, accept_addr: (str, int), + forkserver_info: tuple, + parent_addr: (str, int) = None + ) -> None: # after fork routine which invokes a fresh ``trio.run`` # log.warn("Log level after fork is {self.loglevel}") self._forkserver_info = forkserver_info @@ -391,22 +403,17 @@ class Actor: async def _async_main( self, - accept_addr, - arbiter_addr=None, - parent_addr=None, - _main_coro=None, - task_status=trio.TASK_STATUS_IGNORED, - ): + accept_addr: (str, int), + arbiter_addr: (str, int) = None, + parent_addr: (str, int) = None, + task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + ) -> None: """Start the channel server, maybe connect back to the parent, and start the main task. A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. """ - # if this is the `MainProcess` then we get a ref to the main - # task's coroutine object for tossing errors into - self._main_coro = _main_coro - arbiter_addr = arbiter_addr or self._arb_addr registered_with_arbiter = False try: @@ -422,12 +429,6 @@ class Actor: self._serve_forever, accept_host=host, accept_port=port) ) - # XXX: I wonder if a better name is maybe "requester" - # since I don't think the notion of a "parent" actor - # necessarily sticks given that eventually we want - # ``'MainProcess'`` (the actor who initially starts the - # forkserver) to eventually be the only one who is - # allowed to spawn new processes per Python program. if parent_addr is not None: try: # Connect back to the parent actor and conduct initial @@ -502,10 +503,10 @@ class Actor: self, *, # (host, port) to bind for channel server - accept_host=None, - accept_port=0, - task_status=trio.TASK_STATUS_IGNORED - ): + accept_host: (str, int) = None, + accept_port: int = 0, + task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + ) -> None: """Start the channel server, begin listening for new connections. This will cause an actor to continue living (blocking) until @@ -531,7 +532,7 @@ class Actor: self._listeners.extend(listeners) task_status.started() - async def _do_unreg(self, arbiter_addr): + async def _do_unreg(self, arbiter_addr: (str, int)) -> None: # UNregister actor from the arbiter try: if arbiter_addr is not None: @@ -541,7 +542,7 @@ class Actor: except OSError: log.warn(f"Unable to unregister {self.name} from arbiter") - async def cancel(self): + async def cancel(self) -> None: """Cancel this actor. The sequence in order is: @@ -554,7 +555,7 @@ class Actor: self.cancel_server() self._root_nursery.cancel_scope.cancel() - async def cancel_rpc_tasks(self): + async def cancel_rpc_tasks(self) -> None: """Cancel all existing RPC responder tasks using the cancel scope registered for each. """ @@ -570,7 +571,7 @@ class Actor: f"Waiting for remaining rpc tasks to complete {scopes}") await self._no_more_rpc_tasks.wait() - def cancel_server(self): + def cancel_server(self) -> None: """Cancel the internal channel server nursery thereby preventing any new inbound connections from being established. """ @@ -578,7 +579,7 @@ class Actor: self._server_nursery.cancel_scope.cancel() @property - def accept_addr(self): + def accept_addr(self) -> (str, int): """Primary address to which the channel server is bound. """ try: @@ -586,11 +587,13 @@ class Actor: except OSError: return - def get_parent(self): + def get_parent(self) -> Portal: + """Return a portal to our parent actor.""" return Portal(self._parent_chan) - def get_chans(self, actorid): - return self._peers[actorid] + def get_chans(self, uid: (str, str)) -> [Channel]: + """Return all channels to the actor with provided uid.""" + return self._peers[uid] class Arbiter(Actor): @@ -609,12 +612,12 @@ class Arbiter(Actor): self._waiters = {} super().__init__(*args, **kwargs) - def find_actor(self, name): + def find_actor(self, name: str) -> (str, int): for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr - async def wait_for_actor(self, name): + async def wait_for_actor(self, name: str) -> [(str, int)]: """Wait for a particular actor to register. This is a blocking call if no actor by the provided name is currently @@ -635,7 +638,7 @@ class Arbiter(Actor): return sockaddrs - def register_actor(self, uid, sockaddr): + def register_actor(self, uid: (str, str), sockaddr: (str, int)) -> None: name, uuid = uid self._registry[uid] = sockaddr @@ -645,13 +648,20 @@ class Arbiter(Actor): for event in events: event.set() - def unregister_actor(self, uid): + def unregister_actor(self, uid: (str, str)) -> None: self._registry.pop(uid, None) -async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): - """Spawn a local actor by starting a task to execute it's main - async function. +async def _start_actor( + actor: Actor, + main: typing.Coroutine, + host: str, + port: int, + arbiter_addr: (str, int), + nursery: trio._core._run.Nursery = None +): + """Spawn a local actor by starting a task to execute it's main async + function. Blocks if no nursery is provided, in which case it is expected the nursery provider is responsible for waiting on the task to complete. @@ -664,29 +674,22 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): log.info(f"Starting local {actor} @ {host}:{port}") async with trio.open_nursery() as nursery: - - if main is not None: - main_coro = main() - await nursery.start( partial( actor._async_main, accept_addr=(host, port), parent_addr=None, arbiter_addr=arbiter_addr, - _main_coro=main_coro ) ) if main is not None: - result = await main_coro + result = await main() # XXX: If spawned with a dedicated "main function", # the actor is cancelled when this context is complete # given that there are no more active peer channels connected actor.cancel_server() - # block on actor to complete - # unset module state _state._current_actor = None log.info("Completed async main") @@ -695,7 +698,7 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): @asynccontextmanager -async def get_arbiter(host, port): +async def get_arbiter(host: str, port: int) -> Portal: """Return a portal instance connected to a local or remote arbiter. """ @@ -714,10 +717,7 @@ async def get_arbiter(host, port): @asynccontextmanager -async def find_actor( - name, - arbiter_sockaddr=None, -): +async def find_actor(name: str, arbiter_sockaddr: (str, int) = None) -> Portal: """Ask the arbiter to find actor(s) by name. Returns a connected portal to the last registered matching actor @@ -738,9 +738,9 @@ async def find_actor( @asynccontextmanager async def wait_for_actor( - name, - arbiter_sockaddr=None, -): + name: str, + arbiter_sockaddr: (str, int) = None +) -> Portal: """Wait on an actor to register with the arbiter. A portal to the first actor which registered is be returned. diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 2d86e56..87ef4e3 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,7 +1,7 @@ """ Inter-process comms abstractions """ -from typing import Coroutine, Tuple +import typing import msgpack import trio @@ -14,14 +14,14 @@ log = get_logger('ipc') class StreamQueue: """Stream wrapped as a queue that delivers ``msgpack`` serialized objects. """ - def __init__(self, stream): + def __init__(self, stream: trio.SocketStream): self.stream = stream self._agen = self._iter_packets() self._laddr = self.stream.socket.getsockname()[:2] self._raddr = self.stream.socket.getpeername()[:2] self._send_lock = trio.Lock() - async def _iter_packets(self): + async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. """ unpacker = msgpack.Unpacker(raw=False, use_list=False) @@ -42,25 +42,25 @@ class StreamQueue: yield packet @property - def laddr(self): + def laddr(self) -> (str, int): return self._laddr @property - def raddr(self): + def raddr(self) -> (str, int): return self._raddr - async def put(self, data): + async def put(self, data: typing.Any) -> int: async with self._send_lock: return await self.stream.send_all( msgpack.dumps(data, use_bin_type=True)) - async def get(self): + async def get(self) -> typing.Any: return await self._agen.asend(None) def __aiter__(self): return self._agen - def connected(self): + def connected(self) -> bool: return self.stream.socket.fileno() != -1 @@ -73,7 +73,7 @@ class Channel: def __init__( self, destaddr: tuple = None, - on_reconnect: Coroutine = None, + on_reconnect: typing.Coroutine = None, auto_reconnect: bool = False, stream: trio.SocketStream = None, # expected to be active ) -> None: @@ -89,7 +89,7 @@ class Channel: self.uid = None self._agen = self._aiter_recv() - def __repr__(self): + def __repr__(self) -> str: if self.squeue: return repr( self.squeue.stream.socket._sock).replace( @@ -97,14 +97,16 @@ class Channel: return object.__repr__(self) @property - def laddr(self): + def laddr(self) -> (str, int): return self.squeue.laddr if self.squeue else (None, None) @property - def raddr(self): + def raddr(self) -> (str, int): return self.squeue.raddr if self.squeue else (None, None) - async def connect(self, destaddr: Tuple[str, int] = None, **kwargs): + async def connect( + self, destaddr: typing.Tuple[str, int] = None, **kwargs + ) -> trio.SocketStream: if self.connected(): raise RuntimeError("channel is already connected?") destaddr = destaddr or self._destaddr @@ -112,11 +114,11 @@ class Channel: self.squeue = StreamQueue(stream) return stream - async def send(self, item): + async def send(self, item: typing.Any) -> None: log.trace(f"send `{item}`") await self.squeue.put(item) - async def recv(self): + async def recv(self) -> typing.Any: try: return await self.squeue.get() except trio.BrokenStreamError: @@ -124,7 +126,7 @@ class Channel: await self._reconnect() return await self.recv() - async def aclose(self, *args): + async def aclose(self) -> None: log.debug(f"Closing {self}") await self.squeue.stream.aclose() @@ -138,7 +140,7 @@ class Channel: def __aiter__(self): return self._agen - async def _reconnect(self): + async def _reconnect(self) -> None: """Handle connection failures by polling until a reconnect can be established. """ @@ -167,7 +169,9 @@ class Channel: " for re-establishment") await trio.sleep(1) - async def _aiter_recv(self): + async def _aiter_recv( + self + ) -> typing.AsyncGenerator[typing.Any, None]: """Async iterate items from underlying stream. """ while True: @@ -189,14 +193,16 @@ class Channel: else: return - def connected(self): + def connected(self) -> bool: return self.squeue.connected() if self.squeue else False @asynccontextmanager -async def _connect_chan(host, port): - """Create and connect a channel with disconnect on - context manager teardown. +async def _connect_chan( + host: str, port: int +) -> typing.AsyncContextManager[Channel]: + """Create and connect a channel with disconnect on context manager + teardown. """ chan = Channel((host, port)) await chan.connect() diff --git a/tractor/_portal.py b/tractor/_portal.py index 835f918..6087509 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -2,6 +2,7 @@ Portal api """ import importlib +import typing import trio from async_generator import asynccontextmanager @@ -18,7 +19,7 @@ class RemoteActorError(RuntimeError): @asynccontextmanager -async def maybe_open_nursery(nursery=None): +async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): """Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. @@ -30,7 +31,7 @@ async def maybe_open_nursery(nursery=None): yield nursery -async def _do_handshake(actor, chan): +async def _do_handshake(actor: 'Actor', chan: 'Channel') -> (str, str): await chan.send(actor.uid) uid = await chan.recv() @@ -51,7 +52,7 @@ class Portal: Think of this like an native async IPC API. """ - def __init__(self, channel): + def __init__(self, channel: 'Channel'): self.channel = channel # when this is set to a tuple returned from ``_submit()`` then # it is expected that ``result()`` will be awaited at some point @@ -60,13 +61,15 @@ class Portal: self._exc = None self._expect_result = None - async def aclose(self): + async def aclose(self) -> None: log.debug(f"Closing {self}") # XXX: won't work until https://github.com/python-trio/trio/pull/460 # gets in! await self.channel.aclose() - async def _submit(self, ns, func, **kwargs): + async def _submit( + self, ns: str, func: str, **kwargs + ) -> (str, trio.Queue, str, dict): """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. @@ -93,12 +96,12 @@ class Portal: return cid, q, resp_type, first_msg - async def _submit_for_result(self, ns, func, **kwargs): + async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: assert self._expect_result is None, \ "A pending main result has already been submitted" self._expect_result = await self._submit(ns, func, **kwargs) - async def run(self, ns, func, **kwargs): + async def run(self, ns: str, func: str, **kwargs) -> typing.Any: """Submit a function to be scheduled and run by actor, wrap and return its (stream of) result(s). @@ -108,7 +111,9 @@ class Portal: *(await self._submit(ns, func, **kwargs)) ) - async def _return_from_resptype(self, cid, q, resptype, first_msg): + async def _return_from_resptype( + self, cid: str, q: trio.Queue, resptype: str, first_msg: dict + ) -> typing.Any: # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) @@ -145,7 +150,7 @@ class Portal: else: raise ValueError(f"Unknown msg response type: {first_msg}") - async def result(self): + async def result(self) -> typing.Any: """Return the result(s) from the remote actor's "main" task. """ if self._expect_result is None: @@ -165,13 +170,13 @@ class Portal: ) return self._result - async def close(self): + async def close(self) -> None: # trigger remote msg loop `break` chan = self.channel log.debug(f"Closing portal for {chan} to {chan.uid}") await self.channel.send(None) - async def cancel_actor(self): + async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. """ log.warn( @@ -198,10 +203,10 @@ class LocalPortal: A compatibility shim for normal portals but for invoking functions using an in process actor instance. """ - def __init__(self, actor): + def __init__(self, actor: 'Actor'): self.actor = actor - async def run(self, ns, func, **kwargs): + async def run(self, ns: str, func: str, **kwargs) -> typing.Any: """Run a requested function locally and return it's result. """ obj = self.actor if ns == 'self' else importlib.import_module(ns) @@ -210,7 +215,10 @@ class LocalPortal: @asynccontextmanager -async def open_portal(channel, nursery=None): +async def open_portal( + channel: 'Channel', + nursery: trio._core._run.Nursery = None +) -> typing.AsyncContextManager[Portal]: """Open a ``Portal`` through the provided ``channel``. Spawns a background task to handle message processing. diff --git a/tractor/_trionics.py b/tractor/_trionics.py index d78cbdd..310fea9 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -4,6 +4,7 @@ import multiprocessing as mp import inspect from multiprocessing import forkserver, semaphore_tracker +import typing import trio from async_generator import asynccontextmanager, aclosing @@ -23,8 +24,8 @@ log = get_logger('tractor') class ActorNursery: """Spawn scoped subprocess actors. """ - def __init__(self, actor, supervisor=None): - self.supervisor = supervisor # TODO + def __init__(self, actor: Actor): + # self.supervisor = supervisor # TODO self._actor = actor self._children = {} # portals spawned with ``run_in_actor()`` @@ -38,11 +39,11 @@ class ActorNursery: async def start_actor( self, name: str, - bind_addr=('127.0.0.1', 0), - statespace=None, - rpc_module_paths=None, - loglevel=None, # set log level per subactor - ): + bind_addr: (str, int) = ('127.0.0.1', 0), + statespace: dict = None, + rpc_module_paths: [str] = None, + loglevel: str = None, # set log level per subactor + ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() actor = Actor( name, @@ -104,14 +105,14 @@ class ActorNursery: async def run_in_actor( self, - name, - fn, - bind_addr=('127.0.0.1', 0), - rpc_module_paths=None, - statespace=None, - loglevel=None, # set log level per subactor + name: str, + fn: typing.Callable, + bind_addr: (str, int) = ('127.0.0.1', 0), + rpc_module_paths: [str] = None, + statespace: dict = None, + loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` - ): + ) -> Portal: """Spawn a new actor, run a lone task, then terminate the actor and return its result. @@ -134,7 +135,7 @@ class ActorNursery: ) return portal - async def wait(self): + async def wait(self) -> None: """Wait for all subactors to complete. """ async def maybe_consume_result(portal, actor): @@ -193,7 +194,7 @@ class ActorNursery: cs = await nursery.start(wait_for_actor, portal, subactor) nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) - async def cancel(self, hard_kill=False): + async def cancel(self, hard_kill: bool = False) -> None: """Cancel this nursery by instructing each subactor to cancel iteslf and wait for all subprocesses to terminate. @@ -274,7 +275,7 @@ class ActorNursery: @asynccontextmanager -async def open_nursery(supervisor=None): +async def open_nursery() -> typing.AsyncContextManager[ActorNursery]: """Create and yield a new ``ActorNursery``. """ actor = current_actor() @@ -282,5 +283,5 @@ async def open_nursery(supervisor=None): raise RuntimeError("No actor instance has been defined yet?") # TODO: figure out supervisors from erlang - async with ActorNursery(current_actor(), supervisor) as nursery: + async with ActorNursery(current_actor()) as nursery: yield nursery diff --git a/tractor/log.py b/tractor/log.py index 3b013d1..d3e2a7f 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -92,5 +92,5 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: return log -def get_loglevel(): +def get_loglevel() -> str: return _default_loglevel From c3eee1f228bc4eeaf2ce86e1a9046a3cde5b379a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 21 Aug 2018 00:10:24 -0400 Subject: [PATCH 2/6] Move "treat_as_gen" detection into `_invoke()` --- tractor/_actor.py | 49 +++++++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 4be6381..270a339 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -46,6 +46,19 @@ async def _invoke( ): """Invoke local func and return results over provided channel. """ + sig = inspect.signature(func) + treat_as_gen = False + if 'chan' in sig.parameters: + assert 'cid' in sig.parameters, \ + f"{func} must accept a `cid` (caller id) kwarg" + kwargs['chan'] = chan + kwargs['cid'] = cid + # TODO: eventually we want to be more stringent + # about what is considered a far-end async-generator. + # Right now both actual async gens and any async + # function which declares a `chan` kwarg in its + # signature will be treated as one. + treat_as_gen = True try: is_async_partial = False is_async_gen_partial = False @@ -183,11 +196,13 @@ class Actor: log.debug(f"{uid} successfully connected back to us") return event, self._peers[uid][-1] - def load_namespaces(self) -> None: - # We load namespaces after fork since this actor may - # be spawned on a different machine from the original nursery - # and we need to try and load the local module code (if it - # exists) + def load_modules(self) -> None: + """Load allowed RPC modules locally (after fork). + + Since this actor may be spawned on a different machine from + the original nursery we need to try and load the local module + code (if it exists). + """ for path in self.rpc_module_paths: self._mods[path] = importlib.import_module(path) @@ -261,6 +276,8 @@ class Actor: await chan.aclose() async def _push_result(self, actorid, cid: str, msg: dict) -> None: + """Push an RPC result to the local consumer's queue. + """ assert actorid, f"`actorid` can't be {actorid}" q = self.get_waitq(actorid, cid) log.debug(f"Delivering {msg} from {actorid} to caller {cid}") @@ -288,9 +305,9 @@ class Actor: async def _process_messages( self, chan: Channel, treat_as_gen: bool = False ) -> None: - """Process messages async-RPC style. + """Process messages for the channel async-RPC style. - Process rpc requests and deliver retrieved responses from channels. + Receive multiplexed RPC requests and deliver responses over ``chan``. """ # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! @@ -336,23 +353,9 @@ class Actor: func = getattr(self._mods[ns], funcname) # spin up a task for the requested function - sig = inspect.signature(func) - treat_as_gen = False - if 'chan' in sig.parameters: - assert 'cid' in sig.parameters, \ - f"{func} must accept a `cid` (caller id) kwarg" - kwargs['chan'] = chan - kwargs['cid'] = cid - # TODO: eventually we want to be more stringent - # about what is considered a far-end async-generator. - # Right now both actual async gens and any async - # function which declares a `chan` kwarg in its - # signature will be treated as one. - treat_as_gen = True - log.debug(f"Spawning task for {func}") cs = await self._root_nursery.start( - _invoke, self, cid, chan, func, kwargs, treat_as_gen, + _invoke, self, cid, chan, func, kwargs, name=funcname ) # never allow cancelling cancel requests (results in @@ -421,7 +424,7 @@ class Actor: self._root_nursery = nursery # load allowed RPC module - self.load_namespaces() + self.load_modules() # Startup up channel server host, port = accept_addr From 11cbf9ea5542f700932da876b102c8fab64ba407 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Aug 2018 13:12:29 -0400 Subject: [PATCH 3/6] Use proper `typing` annotations --- tractor/__init__.py | 14 ++++---- tractor/_actor.py | 58 +++++++++++++++++++-------------- tractor/_forkserver_hackzorz.py | 3 ++ tractor/_ipc.py | 8 ++--- tractor/_portal.py | 6 ++-- 5 files changed, 52 insertions(+), 37 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 569760f..eccd5f7 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -5,7 +5,7 @@ tractor: An actor model micro-framework built on from functools import partial import typing -import trio +import trio # type: ignore from .log import get_console_log, get_logger, get_loglevel from ._ipc import _connect_chan, Channel @@ -35,10 +35,10 @@ _default_arbiter_port = 1616 async def _main( async_fn: typing.Callable[..., typing.Awaitable], - args: tuple, - kwargs: dict, + args: typing.Tuple, + kwargs: typing.Dict[str, typing.Any], name: str, - arbiter_addr: (str, int) + arbiter_addr: typing.Tuple[str, int] ) -> typing.Any: """Async entry point for ``tractor``. """ @@ -81,10 +81,10 @@ async def _main( def run( async_fn: typing.Callable[..., typing.Awaitable], - *args: ..., + *args: typing.Tuple, name: str = None, - arbiter_addr: (str, int) = (_default_arbiter_host, _default_arbiter_port), - **kwargs: ... + arbiter_addr: typing.Tuple[str, int] = (_default_arbiter_host, _default_arbiter_port), + **kwargs: typing.Dict[str, typing.Any], ): """Run a trio-actor async function in process. diff --git a/tractor/_actor.py b/tractor/_actor.py index 270a339..febd07e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -10,7 +10,7 @@ import traceback import typing import uuid -import trio +import trio # type: ignore from async_generator import asynccontextmanager, aclosing from ._ipc import Channel, _connect_chan @@ -41,7 +41,7 @@ async def _invoke( cid: str, chan: Channel, func: typing.Callable, - kwargs: dict, + kwargs: typing.Dict[str, typing.Any], task_status=trio.TASK_STATUS_IGNORED ): """Invoke local func and return results over provided channel. @@ -152,11 +152,11 @@ class Actor: def __init__( self, name: str, - rpc_module_paths: [str] = [], - statespace: dict = {}, + rpc_module_paths: typing.List[str] = [], + statespace: typing.Dict[str, typing.Any] = {}, uid: str = None, loglevel: str = None, - arbiter_addr: (str, int) = None, + arbiter_addr: typing.Tuple[str, int] = None, ): self.name = name self.uid = (name, uid or str(uuid.uuid1())) @@ -186,7 +186,9 @@ class Actor: self._accept_host = None self._forkserver_info = None - async def wait_for_peer(self, uid: (str, str)) -> (trio.Event, Channel): + async def wait_for_peer( + self, uid: typing.Tuple[str, str] + ) -> (trio.Event, Channel): """Wait for a connection back from a spawned actor with a given ``uid``. """ @@ -284,14 +286,16 @@ class Actor: # maintain backpressure await q.put(msg) - def get_waitq(self, actorid: (str, str), cid: str) -> trio.Queue: + def get_waitq( + self, actorid: typing.Tuple[str, str], cid: str + ) -> trio.Queue: log.debug(f"Getting result queue for {actorid} cid {cid}") cids2qs = self._actors2calls.setdefault(actorid, {}) return cids2qs.setdefault(cid, trio.Queue(1000)) async def send_cmd( self, chan: Channel, ns: str, func: str, kwargs: dict - ) -> (str, trio.Queue): + ) -> typing.Tuple[str, trio.Queue]: """Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. @@ -383,9 +387,9 @@ class Actor: log.debug(f"Exiting msg loop for {chan} from {chan.uid}") def _fork_main( - self, accept_addr: (str, int), + self, accept_addr: typing.Tuple[str, int], forkserver_info: tuple, - parent_addr: (str, int) = None + parent_addr: typing.Tuple[str, int] = None ) -> None: # after fork routine which invokes a fresh ``trio.run`` # log.warn("Log level after fork is {self.loglevel}") @@ -406,9 +410,9 @@ class Actor: async def _async_main( self, - accept_addr: (str, int), - arbiter_addr: (str, int) = None, - parent_addr: (str, int) = None, + accept_addr: typing.Tuple[str, int], + arbiter_addr: typing.Tuple[str, int] = None, + parent_addr: typing.Tuple[str, int] = None, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, maybe connect back to the parent, and @@ -506,7 +510,7 @@ class Actor: self, *, # (host, port) to bind for channel server - accept_host: (str, int) = None, + accept_host: typing.Tuple[str, int] = None, accept_port: int = 0, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, ) -> None: @@ -535,7 +539,7 @@ class Actor: self._listeners.extend(listeners) task_status.started() - async def _do_unreg(self, arbiter_addr: (str, int)) -> None: + async def _do_unreg(self, arbiter_addr: typing.Tuple[str, int]) -> None: # UNregister actor from the arbiter try: if arbiter_addr is not None: @@ -582,7 +586,7 @@ class Actor: self._server_nursery.cancel_scope.cancel() @property - def accept_addr(self) -> (str, int): + def accept_addr(self) -> typing.Tuple[str, int]: """Primary address to which the channel server is bound. """ try: @@ -594,7 +598,7 @@ class Actor: """Return a portal to our parent actor.""" return Portal(self._parent_chan) - def get_chans(self, uid: (str, str)) -> [Channel]: + def get_chans(self, uid: typing.Tuple[str, str]) -> typing.List[Channel]: """Return all channels to the actor with provided uid.""" return self._peers[uid] @@ -615,12 +619,14 @@ class Arbiter(Actor): self._waiters = {} super().__init__(*args, **kwargs) - def find_actor(self, name: str) -> (str, int): + def find_actor(self, name: str) -> typing.Tuple[str, int]: for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr - async def wait_for_actor(self, name: str) -> [(str, int)]: + async def wait_for_actor( + self, name: str + ) -> typing.List[typing.Tuple[str, int]]: """Wait for a particular actor to register. This is a blocking call if no actor by the provided name is currently @@ -641,7 +647,9 @@ class Arbiter(Actor): return sockaddrs - def register_actor(self, uid: (str, str), sockaddr: (str, int)) -> None: + def register_actor( + self, uid: typing.Tuple[str, str], sockaddr: typing.Tuple[str, int] + ) -> None: name, uuid = uid self._registry[uid] = sockaddr @@ -651,7 +659,7 @@ class Arbiter(Actor): for event in events: event.set() - def unregister_actor(self, uid: (str, str)) -> None: + def unregister_actor(self, uid: typing.Tuple[str, str]) -> None: self._registry.pop(uid, None) @@ -660,7 +668,7 @@ async def _start_actor( main: typing.Coroutine, host: str, port: int, - arbiter_addr: (str, int), + arbiter_addr: typing.Tuple[str, int], nursery: trio._core._run.Nursery = None ): """Spawn a local actor by starting a task to execute it's main async @@ -720,7 +728,9 @@ async def get_arbiter(host: str, port: int) -> Portal: @asynccontextmanager -async def find_actor(name: str, arbiter_sockaddr: (str, int) = None) -> Portal: +async def find_actor( + name: str, arbiter_sockaddr: typing.Tuple[str, int] = None +) -> Portal: """Ask the arbiter to find actor(s) by name. Returns a connected portal to the last registered matching actor @@ -742,7 +752,7 @@ async def find_actor(name: str, arbiter_sockaddr: (str, int) = None) -> Portal: @asynccontextmanager async def wait_for_actor( name: str, - arbiter_sockaddr: (str, int) = None + arbiter_sockaddr: typing.Tuple[str, int] = None ) -> Portal: """Wait on an actor to register with the arbiter. diff --git a/tractor/_forkserver_hackzorz.py b/tractor/_forkserver_hackzorz.py index edd282d..827f744 100644 --- a/tractor/_forkserver_hackzorz.py +++ b/tractor/_forkserver_hackzorz.py @@ -2,6 +2,9 @@ This is near-copy of the 3.8 stdlib's ``multiprocessing.forkserver.py`` with some hackery to prevent any more then a single forkserver and semaphore tracker per ``MainProcess``. + +.. note:: There is no type hinting in this code base (yet) to remain as + a close as possible to upstream. """ import os import socket diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 87ef4e3..24b84a8 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -42,11 +42,11 @@ class StreamQueue: yield packet @property - def laddr(self) -> (str, int): + def laddr(self) -> typing.Tuple[str, int]: return self._laddr @property - def raddr(self) -> (str, int): + def raddr(self) -> typing.Tuple[str, int]: return self._raddr async def put(self, data: typing.Any) -> int: @@ -97,11 +97,11 @@ class Channel: return object.__repr__(self) @property - def laddr(self) -> (str, int): + def laddr(self) -> typing.Tuple[str, int]: return self.squeue.laddr if self.squeue else (None, None) @property - def raddr(self) -> (str, int): + def raddr(self) -> typing.Tuple[str, int]: return self.squeue.raddr if self.squeue else (None, None) async def connect( diff --git a/tractor/_portal.py b/tractor/_portal.py index 6087509..a42d5f8 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -31,7 +31,9 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): yield nursery -async def _do_handshake(actor: 'Actor', chan: 'Channel') -> (str, str): +async def _do_handshake( + actor: 'Actor', chan: 'Channel' +)-> typing.Tuple[str, str]: await chan.send(actor.uid) uid = await chan.recv() @@ -69,7 +71,7 @@ class Portal: async def _submit( self, ns: str, func: str, **kwargs - ) -> (str, trio.Queue, str, dict): + ) -> typing.Tuple[str, trio.Queue, str, typing.Dict[str, typing.Any]]: """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. From 18c55e2b5f1df940fcb917979884319cd3825018 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Aug 2018 13:12:59 -0400 Subject: [PATCH 4/6] Type igore `colorlog` --- tractor/log.py | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/tractor/log.py b/tractor/log.py index d3e2a7f..8d52d1b 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -4,7 +4,7 @@ Log like a forester! from functools import partial import sys import logging -import colorlog +import colorlog # type: ignore _proj_name = 'tractor' _default_loglevel = None @@ -69,25 +69,19 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: log = get_logger(name) # our root logger if not level: - return + return log log.setLevel(level.upper() if not isinstance(level, int) else level) - - if not any( - handler.stream == sys.stderr for handler in log.handlers - if getattr(handler, 'stream', None) - ): - handler = logging.StreamHandler() - - formatter = colorlog.ColoredFormatter( - LOG_FORMAT, - datefmt=DATE_FORMAT, - log_colors=STD_PALETTE, - secondary_log_colors=BOLD_PALETTE, - style='{', - ) - handler.setFormatter(formatter) - log.addHandler(handler) + handler = logging.StreamHandler() + formatter = colorlog.ColoredFormatter( + LOG_FORMAT, + datefmt=DATE_FORMAT, + log_colors=STD_PALETTE, + secondary_log_colors=BOLD_PALETTE, + style='{', + ) + handler.setFormatter(formatter) + log.addHandler(handler) return log From 086df43b59decff2007b652c8cb1458840256969 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 31 Aug 2018 17:16:24 -0400 Subject: [PATCH 5/6] Woot! mypy run is clean! --- tractor/_actor.py | 113 ++++++++++++++++++-------------- tractor/_forkserver_hackzorz.py | 9 +-- tractor/_ipc.py | 50 ++++++++------ tractor/_portal.py | 41 +++++++----- tractor/_state.py | 7 +- tractor/_trionics.py | 48 +++++++++----- tractor/log.py | 5 +- 7 files changed, 157 insertions(+), 116 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index febd07e..2e00e99 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -7,8 +7,9 @@ from itertools import chain import importlib import inspect import traceback -import typing import uuid +import typing +from typing import Dict, List, Tuple, Any, Optional, Union import trio # type: ignore from async_generator import asynccontextmanager, aclosing @@ -41,7 +42,7 @@ async def _invoke( cid: str, chan: Channel, func: typing.Callable, - kwargs: typing.Dict[str, typing.Any], + kwargs: Dict[str, Any], task_status=trio.TASK_STATUS_IGNORED ): """Invoke local func and return results over provided channel. @@ -152,43 +153,45 @@ class Actor: def __init__( self, name: str, - rpc_module_paths: typing.List[str] = [], - statespace: typing.Dict[str, typing.Any] = {}, + rpc_module_paths: List[str] = [], + statespace: Optional[Dict[str, Any]] = None, uid: str = None, loglevel: str = None, - arbiter_addr: typing.Tuple[str, int] = None, - ): + arbiter_addr: Optional[Tuple[str, int]] = None, + ) -> None: self.name = name self.uid = (name, uid or str(uuid.uuid1())) self.rpc_module_paths = rpc_module_paths - self._mods = {} + self._mods: dict = {} # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 - self.statespace = statespace + self.statespace = statespace or {} self.loglevel = loglevel self._arb_addr = arbiter_addr # filled in by `_async_main` after fork - self._root_nursery = None - self._server_nursery = None - self._peers = defaultdict(list) - self._peer_connected = {} + self._root_nursery: trio._core._run.Nursery = None + self._server_nursery: trio._core._run.Nursery = None + self._peers: defaultdict = defaultdict(list) + self._peer_connected: dict = {} self._no_more_peers = trio.Event() self._no_more_peers.set() self._no_more_rpc_tasks = trio.Event() self._no_more_rpc_tasks.set() - self._rpc_tasks = {} - - self._actors2calls = {} # map {uids -> {callids -> waiter queues}} - self._listeners = [] - self._parent_chan = None - self._accept_host = None - self._forkserver_info = None + self._rpc_tasks: Dict[ + Channel, + List[Tuple[trio._core._run.CancelScope, typing.Callable]] + ] = {} + # map {uids -> {callids -> waiter queues}} + self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {} + self._listeners: List[trio.abc.Listener] = [] + self._parent_chan: Optional[Channel] = None + self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None async def wait_for_peer( - self, uid: typing.Tuple[str, str] - ) -> (trio.Event, Channel): + self, uid: Tuple[str, str] + ) -> Tuple[trio.Event, Channel]: """Wait for a connection back from a spawned actor with a given ``uid``. """ @@ -287,7 +290,9 @@ class Actor: await q.put(msg) def get_waitq( - self, actorid: typing.Tuple[str, str], cid: str + self, + actorid: Tuple[str, str], + cid: str ) -> trio.Queue: log.debug(f"Getting result queue for {actorid} cid {cid}") cids2qs = self._actors2calls.setdefault(actorid, {}) @@ -295,12 +300,13 @@ class Actor: async def send_cmd( self, chan: Channel, ns: str, func: str, kwargs: dict - ) -> typing.Tuple[str, trio.Queue]: + ) -> Tuple[str, trio.Queue]: """Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. """ cid = str(uuid.uuid1()) + assert chan.uid q = self.get_waitq(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) @@ -344,6 +350,7 @@ class Actor: # push any non-rpc-response error to all local consumers # and mark the channel as errored chan._exc = err = msg['error'] + assert chan.uid for cid in self._actors2calls[chan.uid]: await self._push_result(chan.uid, cid, msg) raise InternalActorError(f"{chan.uid}\n" + err) @@ -387,9 +394,10 @@ class Actor: log.debug(f"Exiting msg loop for {chan} from {chan.uid}") def _fork_main( - self, accept_addr: typing.Tuple[str, int], - forkserver_info: tuple, - parent_addr: typing.Tuple[str, int] = None + self, + accept_addr: Tuple[str, int], + forkserver_info: Tuple[Any, Any, Any, Any, Any], + parent_addr: Tuple[str, int] = None ) -> None: # after fork routine which invokes a fresh ``trio.run`` # log.warn("Log level after fork is {self.loglevel}") @@ -410,9 +418,9 @@ class Actor: async def _async_main( self, - accept_addr: typing.Tuple[str, int], - arbiter_addr: typing.Tuple[str, int] = None, - parent_addr: typing.Tuple[str, int] = None, + accept_addr: Tuple[str, int], + arbiter_addr: Optional[Tuple[str, int]] = None, + parent_addr: Optional[Tuple[str, int]] = None, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, maybe connect back to the parent, and @@ -510,7 +518,7 @@ class Actor: self, *, # (host, port) to bind for channel server - accept_host: typing.Tuple[str, int] = None, + accept_host: Tuple[str, int] = None, accept_port: int = 0, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, ) -> None: @@ -539,7 +547,7 @@ class Actor: self._listeners.extend(listeners) task_status.started() - async def _do_unreg(self, arbiter_addr: typing.Tuple[str, int]) -> None: + async def _do_unreg(self, arbiter_addr: Optional[Tuple[str, int]]) -> None: # UNregister actor from the arbiter try: if arbiter_addr is not None: @@ -566,16 +574,16 @@ class Actor: """Cancel all existing RPC responder tasks using the cancel scope registered for each. """ - scopes = self._rpc_tasks - log.info(f"Cancelling all {len(scopes)} rpc tasks:\n{scopes}") - for chan, scopes in scopes.items(): + tasks = self._rpc_tasks + log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks}") + for chan, scopes in tasks.items(): log.debug(f"Cancelling all tasks for {chan.uid}") for scope, func in scopes: log.debug(f"Cancelling task for {func}") scope.cancel() - if scopes: + if tasks: log.info( - f"Waiting for remaining rpc tasks to complete {scopes}") + f"Waiting for remaining rpc tasks to complete {tasks}") await self._no_more_rpc_tasks.wait() def cancel_server(self) -> None: @@ -586,19 +594,20 @@ class Actor: self._server_nursery.cancel_scope.cancel() @property - def accept_addr(self) -> typing.Tuple[str, int]: + def accept_addr(self) -> Optional[Tuple[str, int]]: """Primary address to which the channel server is bound. """ try: return self._listeners[0].socket.getsockname() except OSError: - return + return None def get_parent(self) -> Portal: """Return a portal to our parent actor.""" + assert self._parent_chan, "No parent channel for this actor?" return Portal(self._parent_chan) - def get_chans(self, uid: typing.Tuple[str, str]) -> typing.List[Channel]: + def get_chans(self, uid: Tuple[str, str]) -> List[Channel]: """Return all channels to the actor with provided uid.""" return self._peers[uid] @@ -619,14 +628,16 @@ class Arbiter(Actor): self._waiters = {} super().__init__(*args, **kwargs) - def find_actor(self, name: str) -> typing.Tuple[str, int]: + def find_actor(self, name: str) -> Optional[Tuple[str, int]]: for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr + return None + async def wait_for_actor( self, name: str - ) -> typing.List[typing.Tuple[str, int]]: + ) -> List[Tuple[str, int]]: """Wait for a particular actor to register. This is a blocking call if no actor by the provided name is currently @@ -648,7 +659,7 @@ class Arbiter(Actor): return sockaddrs def register_actor( - self, uid: typing.Tuple[str, str], sockaddr: typing.Tuple[str, int] + self, uid: Tuple[str, str], sockaddr: Tuple[str, int] ) -> None: name, uuid = uid self._registry[uid] = sockaddr @@ -659,16 +670,16 @@ class Arbiter(Actor): for event in events: event.set() - def unregister_actor(self, uid: typing.Tuple[str, str]) -> None: + def unregister_actor(self, uid: Tuple[str, str]) -> None: self._registry.pop(uid, None) async def _start_actor( actor: Actor, - main: typing.Coroutine, + main: typing.Callable[..., typing.Awaitable], host: str, port: int, - arbiter_addr: typing.Tuple[str, int], + arbiter_addr: Tuple[str, int], nursery: trio._core._run.Nursery = None ): """Spawn a local actor by starting a task to execute it's main async @@ -709,7 +720,9 @@ async def _start_actor( @asynccontextmanager -async def get_arbiter(host: str, port: int) -> Portal: +async def get_arbiter( + host: str, port: int +) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: """Return a portal instance connected to a local or remote arbiter. """ @@ -729,8 +742,8 @@ async def get_arbiter(host: str, port: int) -> Portal: @asynccontextmanager async def find_actor( - name: str, arbiter_sockaddr: typing.Tuple[str, int] = None -) -> Portal: + name: str, arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Optional[Portal], None]: """Ask the arbiter to find actor(s) by name. Returns a connected portal to the last registered matching actor @@ -752,8 +765,8 @@ async def find_actor( @asynccontextmanager async def wait_for_actor( name: str, - arbiter_sockaddr: typing.Tuple[str, int] = None -) -> Portal: + arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Portal, None]: """Wait on an actor to register with the arbiter. A portal to the first actor which registered is be returned. diff --git a/tractor/_forkserver_hackzorz.py b/tractor/_forkserver_hackzorz.py index 827f744..411d190 100644 --- a/tractor/_forkserver_hackzorz.py +++ b/tractor/_forkserver_hackzorz.py @@ -15,15 +15,12 @@ import errno import selectors import warnings -from multiprocessing import ( - forkserver, semaphore_tracker, spawn, process, util, - connection -) +from multiprocessing import semaphore_tracker, spawn, process # type: ignore +from multiprocessing import forkserver, util, connection # type: ignore from multiprocessing.forkserver import ( ForkServer, MAXFDS_TO_SEND - # _serve_one, ) -from multiprocessing.context import reduction +from multiprocessing.context import reduction # type: ignore # taken from 3.8 diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 24b84a8..6162150 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -2,6 +2,7 @@ Inter-process comms abstractions """ import typing +from typing import Any, Tuple, Optional import msgpack import trio @@ -14,7 +15,7 @@ log = get_logger('ipc') class StreamQueue: """Stream wrapped as a queue that delivers ``msgpack`` serialized objects. """ - def __init__(self, stream: trio.SocketStream): + def __init__(self, stream: trio.SocketStream) -> None: self.stream = stream self._agen = self._iter_packets() self._laddr = self.stream.socket.getsockname()[:2] @@ -28,7 +29,7 @@ class StreamQueue: while True: try: data = await self.stream.receive_some(2**10) - log.trace(f"received {data}") + log.trace(f"received {data}") # type: ignore except trio.BrokenStreamError: log.error(f"Stream connection {self.raddr} broke") return @@ -42,19 +43,19 @@ class StreamQueue: yield packet @property - def laddr(self) -> typing.Tuple[str, int]: + def laddr(self) -> Tuple[str, int]: return self._laddr @property - def raddr(self) -> typing.Tuple[str, int]: + def raddr(self) -> Tuple[str, int]: return self._raddr - async def put(self, data: typing.Any) -> int: + async def put(self, data: Any) -> int: async with self._send_lock: return await self.stream.send_all( msgpack.dumps(data, use_bin_type=True)) - async def get(self) -> typing.Any: + async def get(self) -> Any: return await self._agen.asend(None) def __aiter__(self): @@ -72,21 +73,24 @@ class Channel: """ def __init__( self, - destaddr: tuple = None, - on_reconnect: typing.Coroutine = None, + destaddr: Optional[Tuple[str, int]] = None, + on_reconnect: typing.Callable[..., typing.Awaitable] = None, auto_reconnect: bool = False, stream: trio.SocketStream = None, # expected to be active ) -> None: self._recon_seq = on_reconnect self._autorecon = auto_reconnect - self.squeue = StreamQueue(stream) if stream else None + self.squeue: Optional[StreamQueue] = StreamQueue( + stream) if stream else None if self.squeue and destaddr: raise ValueError( f"A stream was provided with local addr {self.laddr}" ) - self._destaddr = destaddr or self.squeue.raddr + self._destaddr = self.squeue.raddr if self.squeue else destaddr # set after handshake - always uid of far end - self.uid = None + self.uid: Optional[Tuple[str, str]] = None + # set if far end actor errors internally + self._exc: Optional[Exception] = None self._agen = self._aiter_recv() def __repr__(self) -> str: @@ -97,15 +101,15 @@ class Channel: return object.__repr__(self) @property - def laddr(self) -> typing.Tuple[str, int]: - return self.squeue.laddr if self.squeue else (None, None) + def laddr(self) -> Optional[Tuple[str, int]]: + return self.squeue.laddr if self.squeue else None @property - def raddr(self) -> typing.Tuple[str, int]: - return self.squeue.raddr if self.squeue else (None, None) + def raddr(self) -> Optional[Tuple[str, int]]: + return self.squeue.raddr if self.squeue else None async def connect( - self, destaddr: typing.Tuple[str, int] = None, **kwargs + self, destaddr: Tuple[str, int] = None, **kwargs ) -> trio.SocketStream: if self.connected(): raise RuntimeError("channel is already connected?") @@ -114,11 +118,13 @@ class Channel: self.squeue = StreamQueue(stream) return stream - async def send(self, item: typing.Any) -> None: - log.trace(f"send `{item}`") + async def send(self, item: Any) -> None: + log.trace(f"send `{item}`") # type: ignore + assert self.squeue await self.squeue.put(item) - async def recv(self) -> typing.Any: + async def recv(self) -> Any: + assert self.squeue try: return await self.squeue.get() except trio.BrokenStreamError: @@ -128,6 +134,7 @@ class Channel: async def aclose(self) -> None: log.debug(f"Closing {self}") + assert self.squeue await self.squeue.stream.aclose() async def __aenter__(self): @@ -171,9 +178,10 @@ class Channel: async def _aiter_recv( self - ) -> typing.AsyncGenerator[typing.Any, None]: + ) -> typing.AsyncGenerator[Any, None]: """Async iterate items from underlying stream. """ + assert self.squeue while True: try: async for item in self.squeue: @@ -200,7 +208,7 @@ class Channel: @asynccontextmanager async def _connect_chan( host: str, port: int -) -> typing.AsyncContextManager[Channel]: +) -> typing.AsyncGenerator[Channel, None]: """Create and connect a channel with disconnect on context manager teardown. """ diff --git a/tractor/_portal.py b/tractor/_portal.py index a42d5f8..ddbea76 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -3,11 +3,13 @@ Portal api """ import importlib import typing +from typing import Tuple, Any, Dict, Optional import trio from async_generator import asynccontextmanager from ._state import current_actor +from ._ipc import Channel from .log import get_logger @@ -32,10 +34,11 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): async def _do_handshake( - actor: 'Actor', chan: 'Channel' -)-> typing.Tuple[str, str]: + actor: 'Actor', # type: ignore + chan: Channel +)-> Any: await chan.send(actor.uid) - uid = await chan.recv() + uid: Tuple[str, str] = await chan.recv() if not isinstance(uid, tuple): raise ValueError(f"{uid} is not a valid uid?!") @@ -54,14 +57,16 @@ class Portal: Think of this like an native async IPC API. """ - def __init__(self, channel: 'Channel'): + def __init__(self, channel: Channel) -> None: self.channel = channel # when this is set to a tuple returned from ``_submit()`` then # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime self._result = None - self._exc = None - self._expect_result = None + self._exc: Optional[RemoteActorError] = None + self._expect_result: Optional[ + Tuple[str, Any, str, Dict[str, Any]] + ] = None async def aclose(self) -> None: log.debug(f"Closing {self}") @@ -71,7 +76,7 @@ class Portal: async def _submit( self, ns: str, func: str, **kwargs - ) -> typing.Tuple[str, trio.Queue, str, typing.Dict[str, typing.Any]]: + ) -> Tuple[str, trio.Queue, str, Dict[str, Any]]: """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. @@ -103,7 +108,7 @@ class Portal: "A pending main result has already been submitted" self._expect_result = await self._submit(ns, func, **kwargs) - async def run(self, ns: str, func: str, **kwargs) -> typing.Any: + async def run(self, ns: str, func: str, **kwargs) -> Any: """Submit a function to be scheduled and run by actor, wrap and return its (stream of) result(s). @@ -115,7 +120,7 @@ class Portal: async def _return_from_resptype( self, cid: str, q: trio.Queue, resptype: str, first_msg: dict - ) -> typing.Any: + ) -> Any: # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) @@ -152,7 +157,7 @@ class Portal: else: raise ValueError(f"Unknown msg response type: {first_msg}") - async def result(self) -> typing.Any: + async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. """ if self._expect_result is None: @@ -160,7 +165,7 @@ class Portal: # teardown can reraise them exc = self.channel._exc if exc: - raise RemoteActorError(f"{self.channel.uid}\n" + exc) + raise RemoteActorError(f"{self.channel.uid}\n{exc}") else: raise RuntimeError( f"Portal for {self.channel.uid} is not expecting a final" @@ -205,22 +210,24 @@ class LocalPortal: A compatibility shim for normal portals but for invoking functions using an in process actor instance. """ - def __init__(self, actor: 'Actor'): + def __init__( + self, + actor: 'Actor' # type: ignore + ) -> None: self.actor = actor - async def run(self, ns: str, func: str, **kwargs) -> typing.Any: + async def run(self, ns: str, func: str, **kwargs) -> Any: """Run a requested function locally and return it's result. """ obj = self.actor if ns == 'self' else importlib.import_module(ns) - func = getattr(obj, func) - return func(**kwargs) + return getattr(obj, func)(**kwargs) @asynccontextmanager async def open_portal( - channel: 'Channel', + channel: Channel, nursery: trio._core._run.Nursery = None -) -> typing.AsyncContextManager[Portal]: +) -> typing.AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. Spawns a background task to handle message processing. diff --git a/tractor/_state.py b/tractor/_state.py index 767bb27..704fae7 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -1,10 +1,13 @@ """ Per process state """ -_current_actor = None +from typing import Optional -def current_actor() -> 'Actor': +_current_actor: Optional['Actor'] = None # type: ignore + + +def current_actor() -> 'Actor': # type: ignore """Get the process-local actor instance. """ if not _current_actor: diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 310fea9..c25137e 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -3,7 +3,8 @@ """ import multiprocessing as mp import inspect -from multiprocessing import forkserver, semaphore_tracker +from multiprocessing import forkserver, semaphore_tracker # type: ignore +from typing import Tuple, List, Dict, Optional, Any import typing import trio @@ -24,14 +25,17 @@ log = get_logger('tractor') class ActorNursery: """Spawn scoped subprocess actors. """ - def __init__(self, actor: Actor): + def __init__(self, actor: Actor) -> None: # self.supervisor = supervisor # TODO - self._actor = actor - self._children = {} + self._actor: Actor = actor + self._children: Dict[ + Tuple[str, str], + Tuple[Actor, mp.Process, Optional[Portal]] + ] = {} # portals spawned with ``run_in_actor()`` - self._cancel_after_result_on_exit = set() - self.cancelled = False - self._forkserver = None + self._cancel_after_result_on_exit: set = set() + self.cancelled: bool = False + self._forkserver: forkserver.ForkServer = None async def __aenter__(self): return self @@ -39,9 +43,9 @@ class ActorNursery: async def start_actor( self, name: str, - bind_addr: (str, int) = ('127.0.0.1', 0), - statespace: dict = None, - rpc_module_paths: [str] = None, + bind_addr: Tuple[str, int] = ('127.0.0.1', 0), + statespace: Optional[Dict[str, Any]] = None, + rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() @@ -71,6 +75,7 @@ class ActorNursery: semaphore_tracker._semaphore_tracker._fd, ) else: + assert self._actor._forkserver_info fs_info = ( fs._forkserver_address, fs._forkserver_alive_fd, @@ -88,7 +93,7 @@ class ActorNursery: # register the process before start in case we get a cancel # request before the actor has fully spawned - then we can wait # for it to fully come up before sending a cancel request - self._children[actor.uid] = [actor, proc, None] + self._children[actor.uid] = (actor, proc, None) proc.start() if not proc.is_alive(): @@ -100,15 +105,15 @@ class ActorNursery: # local actor by the time we get a ref to it event, chan = await self._actor.wait_for_peer(actor.uid) portal = Portal(chan) - self._children[actor.uid][2] = portal + self._children[actor.uid] = (actor, proc, portal) return portal async def run_in_actor( self, name: str, fn: typing.Callable, - bind_addr: (str, int) = ('127.0.0.1', 0), - rpc_module_paths: [str] = None, + bind_addr: Tuple[str, int] = ('127.0.0.1', 0), + rpc_module_paths: List[str] = None, statespace: dict = None, loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` @@ -155,7 +160,12 @@ class ActorNursery: async for item in agen: log.debug(f"Consuming item {item}") - async def wait_for_proc(proc, actor, portal, cancel_scope): + async def wait_for_proc( + proc: mp.Process, + actor: Actor, + portal: Portal, + cancel_scope: trio._core._run.CancelScope, + ) -> None: # TODO: timeout block here? if proc.is_alive(): await trio.hazmat.wait_readable(proc.sentinel) @@ -172,9 +182,10 @@ class ActorNursery: cancel_scope.cancel() async def wait_for_actor( - portal, actor, + portal: Portal, + actor: Actor, task_status=trio.TASK_STATUS_IGNORED, - ): + ) -> None: # cancel the actor gracefully with trio.open_cancel_scope() as cs: task_status.started(cs) @@ -231,6 +242,7 @@ class ActorNursery: do_hard_kill(proc) # spawn cancel tasks async + assert portal n.start_soon(portal.cancel_actor) log.debug(f"Waiting on all subactors to complete") @@ -275,7 +287,7 @@ class ActorNursery: @asynccontextmanager -async def open_nursery() -> typing.AsyncContextManager[ActorNursery]: +async def open_nursery() -> typing.AsyncGenerator[None, ActorNursery]: """Create and yield a new ``ActorNursery``. """ actor = current_actor() diff --git a/tractor/log.py b/tractor/log.py index 8d52d1b..5a65603 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -2,9 +2,10 @@ Log like a forester! """ from functools import partial -import sys import logging import colorlog # type: ignore +from typing import Optional + _proj_name = 'tractor' _default_loglevel = None @@ -86,5 +87,5 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: return log -def get_loglevel() -> str: +def get_loglevel() -> Optional[str]: return _default_loglevel From 4d63125a3c3735e7b7be6e6345bbae094904e423 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 31 Aug 2018 18:03:21 -0400 Subject: [PATCH 6/6] Add mypy checking to CI! --- .travis.yml | 1 + requirements-test.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 4d69048..8837171 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,4 +12,5 @@ install: - pip install -U . -r requirements-test.txt script: + - mypy tractor/ --ignore-missing-imports - pytest tests/ --no-print-logs diff --git a/requirements-test.txt b/requirements-test.txt index 285c77f..447b18c 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,4 @@ pytest pytest-trio pdbpp +mypy