diff --git a/tractor/__init__.py b/tractor/__init__.py index f5146ad..569760f 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -3,6 +3,7 @@ tractor: An actor model micro-framework built on ``trio`` and ``multiprocessing``. """ from functools import partial +import typing import trio @@ -32,7 +33,13 @@ _default_arbiter_host = '127.0.0.1' _default_arbiter_port = 1616 -async def _main(async_fn, args, kwargs, name, arbiter_addr): +async def _main( + async_fn: typing.Callable[..., typing.Awaitable], + args: tuple, + kwargs: dict, + name: str, + arbiter_addr: (str, int) +) -> typing.Any: """Async entry point for ``tractor``. """ log = get_logger('tractor') @@ -73,11 +80,11 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): def run( - async_fn, - *args, - name=None, - arbiter_addr=(_default_arbiter_host, _default_arbiter_port), - **kwargs + async_fn: typing.Callable[..., typing.Awaitable], + *args: ..., + name: str = None, + arbiter_addr: (str, int) = (_default_arbiter_host, _default_arbiter_port), + **kwargs: ... ): """Run a trio-actor async function in process. diff --git a/tractor/_actor.py b/tractor/_actor.py index a5e833f..4be6381 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -1,13 +1,14 @@ """ Actor primitives and helpers """ -import inspect -import importlib from collections import defaultdict from functools import partial -import traceback -import uuid from itertools import chain +import importlib +import inspect +import traceback +import typing +import uuid import trio from async_generator import asynccontextmanager, aclosing @@ -36,8 +37,11 @@ class InternalActorError(RuntimeError): async def _invoke( - actor, cid, chan, func, kwargs, - treat_as_gen=False, + actor: 'Actor', + cid: str, + chan: Channel, + func: typing.Callable, + kwargs: dict, task_status=trio.TASK_STATUS_IGNORED ): """Invoke local func and return results over provided channel. @@ -169,7 +173,7 @@ class Actor: self._accept_host = None self._forkserver_info = None - async def wait_for_peer(self, uid): + async def wait_for_peer(self, uid: (str, str)) -> (trio.Event, Channel): """Wait for a connection back from a spawned actor with a given ``uid``. """ @@ -179,7 +183,7 @@ class Actor: log.debug(f"{uid} successfully connected back to us") return event, self._peers[uid][-1] - def load_namespaces(self): + def load_namespaces(self) -> None: # We load namespaces after fork since this actor may # be spawned on a different machine from the original nursery # and we need to try and load the local module code (if it @@ -198,7 +202,7 @@ class Actor: async def _stream_handler( self, stream: trio.SocketStream, - ): + ) -> None: """Entry point for new inbound connections to the channel server. """ self._no_more_peers.clear() @@ -256,19 +260,21 @@ class Actor: await chan.send(None) await chan.aclose() - async def _push_result(self, actorid, cid, msg): + async def _push_result(self, actorid, cid: str, msg: dict) -> None: assert actorid, f"`actorid` can't be {actorid}" q = self.get_waitq(actorid, cid) log.debug(f"Delivering {msg} from {actorid} to caller {cid}") # maintain backpressure await q.put(msg) - def get_waitq(self, actorid, cid): + def get_waitq(self, actorid: (str, str), cid: str) -> trio.Queue: log.debug(f"Getting result queue for {actorid} cid {cid}") cids2qs = self._actors2calls.setdefault(actorid, {}) return cids2qs.setdefault(cid, trio.Queue(1000)) - async def send_cmd(self, chan, ns, func, kwargs): + async def send_cmd( + self, chan: Channel, ns: str, func: str, kwargs: dict + ) -> (str, trio.Queue): """Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. @@ -279,7 +285,9 @@ class Actor: await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) return cid, q - async def _process_messages(self, chan, treat_as_gen=False): + async def _process_messages( + self, chan: Channel, treat_as_gen: bool = False + ) -> None: """Process messages async-RPC style. Process rpc requests and deliver retrieved responses from channels. @@ -371,7 +379,11 @@ class Actor: finally: log.debug(f"Exiting msg loop for {chan} from {chan.uid}") - def _fork_main(self, accept_addr, forkserver_info, parent_addr=None): + def _fork_main( + self, accept_addr: (str, int), + forkserver_info: tuple, + parent_addr: (str, int) = None + ) -> None: # after fork routine which invokes a fresh ``trio.run`` # log.warn("Log level after fork is {self.loglevel}") self._forkserver_info = forkserver_info @@ -391,22 +403,17 @@ class Actor: async def _async_main( self, - accept_addr, - arbiter_addr=None, - parent_addr=None, - _main_coro=None, - task_status=trio.TASK_STATUS_IGNORED, - ): + accept_addr: (str, int), + arbiter_addr: (str, int) = None, + parent_addr: (str, int) = None, + task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + ) -> None: """Start the channel server, maybe connect back to the parent, and start the main task. A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. """ - # if this is the `MainProcess` then we get a ref to the main - # task's coroutine object for tossing errors into - self._main_coro = _main_coro - arbiter_addr = arbiter_addr or self._arb_addr registered_with_arbiter = False try: @@ -422,12 +429,6 @@ class Actor: self._serve_forever, accept_host=host, accept_port=port) ) - # XXX: I wonder if a better name is maybe "requester" - # since I don't think the notion of a "parent" actor - # necessarily sticks given that eventually we want - # ``'MainProcess'`` (the actor who initially starts the - # forkserver) to eventually be the only one who is - # allowed to spawn new processes per Python program. if parent_addr is not None: try: # Connect back to the parent actor and conduct initial @@ -502,10 +503,10 @@ class Actor: self, *, # (host, port) to bind for channel server - accept_host=None, - accept_port=0, - task_status=trio.TASK_STATUS_IGNORED - ): + accept_host: (str, int) = None, + accept_port: int = 0, + task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + ) -> None: """Start the channel server, begin listening for new connections. This will cause an actor to continue living (blocking) until @@ -531,7 +532,7 @@ class Actor: self._listeners.extend(listeners) task_status.started() - async def _do_unreg(self, arbiter_addr): + async def _do_unreg(self, arbiter_addr: (str, int)) -> None: # UNregister actor from the arbiter try: if arbiter_addr is not None: @@ -541,7 +542,7 @@ class Actor: except OSError: log.warn(f"Unable to unregister {self.name} from arbiter") - async def cancel(self): + async def cancel(self) -> None: """Cancel this actor. The sequence in order is: @@ -554,7 +555,7 @@ class Actor: self.cancel_server() self._root_nursery.cancel_scope.cancel() - async def cancel_rpc_tasks(self): + async def cancel_rpc_tasks(self) -> None: """Cancel all existing RPC responder tasks using the cancel scope registered for each. """ @@ -570,7 +571,7 @@ class Actor: f"Waiting for remaining rpc tasks to complete {scopes}") await self._no_more_rpc_tasks.wait() - def cancel_server(self): + def cancel_server(self) -> None: """Cancel the internal channel server nursery thereby preventing any new inbound connections from being established. """ @@ -578,7 +579,7 @@ class Actor: self._server_nursery.cancel_scope.cancel() @property - def accept_addr(self): + def accept_addr(self) -> (str, int): """Primary address to which the channel server is bound. """ try: @@ -586,11 +587,13 @@ class Actor: except OSError: return - def get_parent(self): + def get_parent(self) -> Portal: + """Return a portal to our parent actor.""" return Portal(self._parent_chan) - def get_chans(self, actorid): - return self._peers[actorid] + def get_chans(self, uid: (str, str)) -> [Channel]: + """Return all channels to the actor with provided uid.""" + return self._peers[uid] class Arbiter(Actor): @@ -609,12 +612,12 @@ class Arbiter(Actor): self._waiters = {} super().__init__(*args, **kwargs) - def find_actor(self, name): + def find_actor(self, name: str) -> (str, int): for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr - async def wait_for_actor(self, name): + async def wait_for_actor(self, name: str) -> [(str, int)]: """Wait for a particular actor to register. This is a blocking call if no actor by the provided name is currently @@ -635,7 +638,7 @@ class Arbiter(Actor): return sockaddrs - def register_actor(self, uid, sockaddr): + def register_actor(self, uid: (str, str), sockaddr: (str, int)) -> None: name, uuid = uid self._registry[uid] = sockaddr @@ -645,13 +648,20 @@ class Arbiter(Actor): for event in events: event.set() - def unregister_actor(self, uid): + def unregister_actor(self, uid: (str, str)) -> None: self._registry.pop(uid, None) -async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): - """Spawn a local actor by starting a task to execute it's main - async function. +async def _start_actor( + actor: Actor, + main: typing.Coroutine, + host: str, + port: int, + arbiter_addr: (str, int), + nursery: trio._core._run.Nursery = None +): + """Spawn a local actor by starting a task to execute it's main async + function. Blocks if no nursery is provided, in which case it is expected the nursery provider is responsible for waiting on the task to complete. @@ -664,29 +674,22 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): log.info(f"Starting local {actor} @ {host}:{port}") async with trio.open_nursery() as nursery: - - if main is not None: - main_coro = main() - await nursery.start( partial( actor._async_main, accept_addr=(host, port), parent_addr=None, arbiter_addr=arbiter_addr, - _main_coro=main_coro ) ) if main is not None: - result = await main_coro + result = await main() # XXX: If spawned with a dedicated "main function", # the actor is cancelled when this context is complete # given that there are no more active peer channels connected actor.cancel_server() - # block on actor to complete - # unset module state _state._current_actor = None log.info("Completed async main") @@ -695,7 +698,7 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): @asynccontextmanager -async def get_arbiter(host, port): +async def get_arbiter(host: str, port: int) -> Portal: """Return a portal instance connected to a local or remote arbiter. """ @@ -714,10 +717,7 @@ async def get_arbiter(host, port): @asynccontextmanager -async def find_actor( - name, - arbiter_sockaddr=None, -): +async def find_actor(name: str, arbiter_sockaddr: (str, int) = None) -> Portal: """Ask the arbiter to find actor(s) by name. Returns a connected portal to the last registered matching actor @@ -738,9 +738,9 @@ async def find_actor( @asynccontextmanager async def wait_for_actor( - name, - arbiter_sockaddr=None, -): + name: str, + arbiter_sockaddr: (str, int) = None +) -> Portal: """Wait on an actor to register with the arbiter. A portal to the first actor which registered is be returned. diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 2d86e56..87ef4e3 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,7 +1,7 @@ """ Inter-process comms abstractions """ -from typing import Coroutine, Tuple +import typing import msgpack import trio @@ -14,14 +14,14 @@ log = get_logger('ipc') class StreamQueue: """Stream wrapped as a queue that delivers ``msgpack`` serialized objects. """ - def __init__(self, stream): + def __init__(self, stream: trio.SocketStream): self.stream = stream self._agen = self._iter_packets() self._laddr = self.stream.socket.getsockname()[:2] self._raddr = self.stream.socket.getpeername()[:2] self._send_lock = trio.Lock() - async def _iter_packets(self): + async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. """ unpacker = msgpack.Unpacker(raw=False, use_list=False) @@ -42,25 +42,25 @@ class StreamQueue: yield packet @property - def laddr(self): + def laddr(self) -> (str, int): return self._laddr @property - def raddr(self): + def raddr(self) -> (str, int): return self._raddr - async def put(self, data): + async def put(self, data: typing.Any) -> int: async with self._send_lock: return await self.stream.send_all( msgpack.dumps(data, use_bin_type=True)) - async def get(self): + async def get(self) -> typing.Any: return await self._agen.asend(None) def __aiter__(self): return self._agen - def connected(self): + def connected(self) -> bool: return self.stream.socket.fileno() != -1 @@ -73,7 +73,7 @@ class Channel: def __init__( self, destaddr: tuple = None, - on_reconnect: Coroutine = None, + on_reconnect: typing.Coroutine = None, auto_reconnect: bool = False, stream: trio.SocketStream = None, # expected to be active ) -> None: @@ -89,7 +89,7 @@ class Channel: self.uid = None self._agen = self._aiter_recv() - def __repr__(self): + def __repr__(self) -> str: if self.squeue: return repr( self.squeue.stream.socket._sock).replace( @@ -97,14 +97,16 @@ class Channel: return object.__repr__(self) @property - def laddr(self): + def laddr(self) -> (str, int): return self.squeue.laddr if self.squeue else (None, None) @property - def raddr(self): + def raddr(self) -> (str, int): return self.squeue.raddr if self.squeue else (None, None) - async def connect(self, destaddr: Tuple[str, int] = None, **kwargs): + async def connect( + self, destaddr: typing.Tuple[str, int] = None, **kwargs + ) -> trio.SocketStream: if self.connected(): raise RuntimeError("channel is already connected?") destaddr = destaddr or self._destaddr @@ -112,11 +114,11 @@ class Channel: self.squeue = StreamQueue(stream) return stream - async def send(self, item): + async def send(self, item: typing.Any) -> None: log.trace(f"send `{item}`") await self.squeue.put(item) - async def recv(self): + async def recv(self) -> typing.Any: try: return await self.squeue.get() except trio.BrokenStreamError: @@ -124,7 +126,7 @@ class Channel: await self._reconnect() return await self.recv() - async def aclose(self, *args): + async def aclose(self) -> None: log.debug(f"Closing {self}") await self.squeue.stream.aclose() @@ -138,7 +140,7 @@ class Channel: def __aiter__(self): return self._agen - async def _reconnect(self): + async def _reconnect(self) -> None: """Handle connection failures by polling until a reconnect can be established. """ @@ -167,7 +169,9 @@ class Channel: " for re-establishment") await trio.sleep(1) - async def _aiter_recv(self): + async def _aiter_recv( + self + ) -> typing.AsyncGenerator[typing.Any, None]: """Async iterate items from underlying stream. """ while True: @@ -189,14 +193,16 @@ class Channel: else: return - def connected(self): + def connected(self) -> bool: return self.squeue.connected() if self.squeue else False @asynccontextmanager -async def _connect_chan(host, port): - """Create and connect a channel with disconnect on - context manager teardown. +async def _connect_chan( + host: str, port: int +) -> typing.AsyncContextManager[Channel]: + """Create and connect a channel with disconnect on context manager + teardown. """ chan = Channel((host, port)) await chan.connect() diff --git a/tractor/_portal.py b/tractor/_portal.py index 835f918..6087509 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -2,6 +2,7 @@ Portal api """ import importlib +import typing import trio from async_generator import asynccontextmanager @@ -18,7 +19,7 @@ class RemoteActorError(RuntimeError): @asynccontextmanager -async def maybe_open_nursery(nursery=None): +async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): """Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. @@ -30,7 +31,7 @@ async def maybe_open_nursery(nursery=None): yield nursery -async def _do_handshake(actor, chan): +async def _do_handshake(actor: 'Actor', chan: 'Channel') -> (str, str): await chan.send(actor.uid) uid = await chan.recv() @@ -51,7 +52,7 @@ class Portal: Think of this like an native async IPC API. """ - def __init__(self, channel): + def __init__(self, channel: 'Channel'): self.channel = channel # when this is set to a tuple returned from ``_submit()`` then # it is expected that ``result()`` will be awaited at some point @@ -60,13 +61,15 @@ class Portal: self._exc = None self._expect_result = None - async def aclose(self): + async def aclose(self) -> None: log.debug(f"Closing {self}") # XXX: won't work until https://github.com/python-trio/trio/pull/460 # gets in! await self.channel.aclose() - async def _submit(self, ns, func, **kwargs): + async def _submit( + self, ns: str, func: str, **kwargs + ) -> (str, trio.Queue, str, dict): """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. @@ -93,12 +96,12 @@ class Portal: return cid, q, resp_type, first_msg - async def _submit_for_result(self, ns, func, **kwargs): + async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: assert self._expect_result is None, \ "A pending main result has already been submitted" self._expect_result = await self._submit(ns, func, **kwargs) - async def run(self, ns, func, **kwargs): + async def run(self, ns: str, func: str, **kwargs) -> typing.Any: """Submit a function to be scheduled and run by actor, wrap and return its (stream of) result(s). @@ -108,7 +111,9 @@ class Portal: *(await self._submit(ns, func, **kwargs)) ) - async def _return_from_resptype(self, cid, q, resptype, first_msg): + async def _return_from_resptype( + self, cid: str, q: trio.Queue, resptype: str, first_msg: dict + ) -> typing.Any: # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) @@ -145,7 +150,7 @@ class Portal: else: raise ValueError(f"Unknown msg response type: {first_msg}") - async def result(self): + async def result(self) -> typing.Any: """Return the result(s) from the remote actor's "main" task. """ if self._expect_result is None: @@ -165,13 +170,13 @@ class Portal: ) return self._result - async def close(self): + async def close(self) -> None: # trigger remote msg loop `break` chan = self.channel log.debug(f"Closing portal for {chan} to {chan.uid}") await self.channel.send(None) - async def cancel_actor(self): + async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. """ log.warn( @@ -198,10 +203,10 @@ class LocalPortal: A compatibility shim for normal portals but for invoking functions using an in process actor instance. """ - def __init__(self, actor): + def __init__(self, actor: 'Actor'): self.actor = actor - async def run(self, ns, func, **kwargs): + async def run(self, ns: str, func: str, **kwargs) -> typing.Any: """Run a requested function locally and return it's result. """ obj = self.actor if ns == 'self' else importlib.import_module(ns) @@ -210,7 +215,10 @@ class LocalPortal: @asynccontextmanager -async def open_portal(channel, nursery=None): +async def open_portal( + channel: 'Channel', + nursery: trio._core._run.Nursery = None +) -> typing.AsyncContextManager[Portal]: """Open a ``Portal`` through the provided ``channel``. Spawns a background task to handle message processing. diff --git a/tractor/_trionics.py b/tractor/_trionics.py index d78cbdd..310fea9 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -4,6 +4,7 @@ import multiprocessing as mp import inspect from multiprocessing import forkserver, semaphore_tracker +import typing import trio from async_generator import asynccontextmanager, aclosing @@ -23,8 +24,8 @@ log = get_logger('tractor') class ActorNursery: """Spawn scoped subprocess actors. """ - def __init__(self, actor, supervisor=None): - self.supervisor = supervisor # TODO + def __init__(self, actor: Actor): + # self.supervisor = supervisor # TODO self._actor = actor self._children = {} # portals spawned with ``run_in_actor()`` @@ -38,11 +39,11 @@ class ActorNursery: async def start_actor( self, name: str, - bind_addr=('127.0.0.1', 0), - statespace=None, - rpc_module_paths=None, - loglevel=None, # set log level per subactor - ): + bind_addr: (str, int) = ('127.0.0.1', 0), + statespace: dict = None, + rpc_module_paths: [str] = None, + loglevel: str = None, # set log level per subactor + ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() actor = Actor( name, @@ -104,14 +105,14 @@ class ActorNursery: async def run_in_actor( self, - name, - fn, - bind_addr=('127.0.0.1', 0), - rpc_module_paths=None, - statespace=None, - loglevel=None, # set log level per subactor + name: str, + fn: typing.Callable, + bind_addr: (str, int) = ('127.0.0.1', 0), + rpc_module_paths: [str] = None, + statespace: dict = None, + loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` - ): + ) -> Portal: """Spawn a new actor, run a lone task, then terminate the actor and return its result. @@ -134,7 +135,7 @@ class ActorNursery: ) return portal - async def wait(self): + async def wait(self) -> None: """Wait for all subactors to complete. """ async def maybe_consume_result(portal, actor): @@ -193,7 +194,7 @@ class ActorNursery: cs = await nursery.start(wait_for_actor, portal, subactor) nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) - async def cancel(self, hard_kill=False): + async def cancel(self, hard_kill: bool = False) -> None: """Cancel this nursery by instructing each subactor to cancel iteslf and wait for all subprocesses to terminate. @@ -274,7 +275,7 @@ class ActorNursery: @asynccontextmanager -async def open_nursery(supervisor=None): +async def open_nursery() -> typing.AsyncContextManager[ActorNursery]: """Create and yield a new ``ActorNursery``. """ actor = current_actor() @@ -282,5 +283,5 @@ async def open_nursery(supervisor=None): raise RuntimeError("No actor instance has been defined yet?") # TODO: figure out supervisors from erlang - async with ActorNursery(current_actor(), supervisor) as nursery: + async with ActorNursery(current_actor()) as nursery: yield nursery diff --git a/tractor/log.py b/tractor/log.py index 3b013d1..d3e2a7f 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -92,5 +92,5 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: return log -def get_loglevel(): +def get_loglevel() -> str: return _default_loglevel