diff --git a/.travis.yml b/.travis.yml index 4d69048..8837171 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,4 +12,5 @@ install: - pip install -U . -r requirements-test.txt script: + - mypy tractor/ --ignore-missing-imports - pytest tests/ --no-print-logs diff --git a/requirements-test.txt b/requirements-test.txt index 285c77f..447b18c 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,4 @@ pytest pytest-trio pdbpp +mypy diff --git a/tractor/__init__.py b/tractor/__init__.py index f5146ad..eccd5f7 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -3,8 +3,9 @@ tractor: An actor model micro-framework built on ``trio`` and ``multiprocessing``. """ from functools import partial +import typing -import trio +import trio # type: ignore from .log import get_console_log, get_logger, get_loglevel from ._ipc import _connect_chan, Channel @@ -32,7 +33,13 @@ _default_arbiter_host = '127.0.0.1' _default_arbiter_port = 1616 -async def _main(async_fn, args, kwargs, name, arbiter_addr): +async def _main( + async_fn: typing.Callable[..., typing.Awaitable], + args: typing.Tuple, + kwargs: typing.Dict[str, typing.Any], + name: str, + arbiter_addr: typing.Tuple[str, int] +) -> typing.Any: """Async entry point for ``tractor``. """ log = get_logger('tractor') @@ -73,11 +80,11 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): def run( - async_fn, - *args, - name=None, - arbiter_addr=(_default_arbiter_host, _default_arbiter_port), - **kwargs + async_fn: typing.Callable[..., typing.Awaitable], + *args: typing.Tuple, + name: str = None, + arbiter_addr: typing.Tuple[str, int] = (_default_arbiter_host, _default_arbiter_port), + **kwargs: typing.Dict[str, typing.Any], ): """Run a trio-actor async function in process. diff --git a/tractor/_actor.py b/tractor/_actor.py index a5e833f..2e00e99 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -1,15 +1,17 @@ """ Actor primitives and helpers """ -import inspect -import importlib from collections import defaultdict from functools import partial +from itertools import chain +import importlib +import inspect import traceback import uuid -from itertools import chain +import typing +from typing import Dict, List, Tuple, Any, Optional, Union -import trio +import trio # type: ignore from async_generator import asynccontextmanager, aclosing from ._ipc import Channel, _connect_chan @@ -36,12 +38,28 @@ class InternalActorError(RuntimeError): async def _invoke( - actor, cid, chan, func, kwargs, - treat_as_gen=False, + actor: 'Actor', + cid: str, + chan: Channel, + func: typing.Callable, + kwargs: Dict[str, Any], task_status=trio.TASK_STATUS_IGNORED ): """Invoke local func and return results over provided channel. """ + sig = inspect.signature(func) + treat_as_gen = False + if 'chan' in sig.parameters: + assert 'cid' in sig.parameters, \ + f"{func} must accept a `cid` (caller id) kwarg" + kwargs['chan'] = chan + kwargs['cid'] = cid + # TODO: eventually we want to be more stringent + # about what is considered a far-end async-generator. + # Right now both actual async gens and any async + # function which declares a `chan` kwarg in its + # signature will be treated as one. + treat_as_gen = True try: is_async_partial = False is_async_gen_partial = False @@ -135,41 +153,45 @@ class Actor: def __init__( self, name: str, - rpc_module_paths: [str] = [], - statespace: dict = {}, + rpc_module_paths: List[str] = [], + statespace: Optional[Dict[str, Any]] = None, uid: str = None, loglevel: str = None, - arbiter_addr: (str, int) = None, - ): + arbiter_addr: Optional[Tuple[str, int]] = None, + ) -> None: self.name = name self.uid = (name, uid or str(uuid.uuid1())) self.rpc_module_paths = rpc_module_paths - self._mods = {} + self._mods: dict = {} # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 - self.statespace = statespace + self.statespace = statespace or {} self.loglevel = loglevel self._arb_addr = arbiter_addr # filled in by `_async_main` after fork - self._root_nursery = None - self._server_nursery = None - self._peers = defaultdict(list) - self._peer_connected = {} + self._root_nursery: trio._core._run.Nursery = None + self._server_nursery: trio._core._run.Nursery = None + self._peers: defaultdict = defaultdict(list) + self._peer_connected: dict = {} self._no_more_peers = trio.Event() self._no_more_peers.set() self._no_more_rpc_tasks = trio.Event() self._no_more_rpc_tasks.set() - self._rpc_tasks = {} + self._rpc_tasks: Dict[ + Channel, + List[Tuple[trio._core._run.CancelScope, typing.Callable]] + ] = {} + # map {uids -> {callids -> waiter queues}} + self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {} + self._listeners: List[trio.abc.Listener] = [] + self._parent_chan: Optional[Channel] = None + self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None - self._actors2calls = {} # map {uids -> {callids -> waiter queues}} - self._listeners = [] - self._parent_chan = None - self._accept_host = None - self._forkserver_info = None - - async def wait_for_peer(self, uid): + async def wait_for_peer( + self, uid: Tuple[str, str] + ) -> Tuple[trio.Event, Channel]: """Wait for a connection back from a spawned actor with a given ``uid``. """ @@ -179,11 +201,13 @@ class Actor: log.debug(f"{uid} successfully connected back to us") return event, self._peers[uid][-1] - def load_namespaces(self): - # We load namespaces after fork since this actor may - # be spawned on a different machine from the original nursery - # and we need to try and load the local module code (if it - # exists) + def load_modules(self) -> None: + """Load allowed RPC modules locally (after fork). + + Since this actor may be spawned on a different machine from + the original nursery we need to try and load the local module + code (if it exists). + """ for path in self.rpc_module_paths: self._mods[path] = importlib.import_module(path) @@ -198,7 +222,7 @@ class Actor: async def _stream_handler( self, stream: trio.SocketStream, - ): + ) -> None: """Entry point for new inbound connections to the channel server. """ self._no_more_peers.clear() @@ -256,33 +280,44 @@ class Actor: await chan.send(None) await chan.aclose() - async def _push_result(self, actorid, cid, msg): + async def _push_result(self, actorid, cid: str, msg: dict) -> None: + """Push an RPC result to the local consumer's queue. + """ assert actorid, f"`actorid` can't be {actorid}" q = self.get_waitq(actorid, cid) log.debug(f"Delivering {msg} from {actorid} to caller {cid}") # maintain backpressure await q.put(msg) - def get_waitq(self, actorid, cid): + def get_waitq( + self, + actorid: Tuple[str, str], + cid: str + ) -> trio.Queue: log.debug(f"Getting result queue for {actorid} cid {cid}") cids2qs = self._actors2calls.setdefault(actorid, {}) return cids2qs.setdefault(cid, trio.Queue(1000)) - async def send_cmd(self, chan, ns, func, kwargs): + async def send_cmd( + self, chan: Channel, ns: str, func: str, kwargs: dict + ) -> Tuple[str, trio.Queue]: """Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. """ cid = str(uuid.uuid1()) + assert chan.uid q = self.get_waitq(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) return cid, q - async def _process_messages(self, chan, treat_as_gen=False): - """Process messages async-RPC style. + async def _process_messages( + self, chan: Channel, treat_as_gen: bool = False + ) -> None: + """Process messages for the channel async-RPC style. - Process rpc requests and deliver retrieved responses from channels. + Receive multiplexed RPC requests and deliver responses over ``chan``. """ # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! @@ -315,6 +350,7 @@ class Actor: # push any non-rpc-response error to all local consumers # and mark the channel as errored chan._exc = err = msg['error'] + assert chan.uid for cid in self._actors2calls[chan.uid]: await self._push_result(chan.uid, cid, msg) raise InternalActorError(f"{chan.uid}\n" + err) @@ -328,23 +364,9 @@ class Actor: func = getattr(self._mods[ns], funcname) # spin up a task for the requested function - sig = inspect.signature(func) - treat_as_gen = False - if 'chan' in sig.parameters: - assert 'cid' in sig.parameters, \ - f"{func} must accept a `cid` (caller id) kwarg" - kwargs['chan'] = chan - kwargs['cid'] = cid - # TODO: eventually we want to be more stringent - # about what is considered a far-end async-generator. - # Right now both actual async gens and any async - # function which declares a `chan` kwarg in its - # signature will be treated as one. - treat_as_gen = True - log.debug(f"Spawning task for {func}") cs = await self._root_nursery.start( - _invoke, self, cid, chan, func, kwargs, treat_as_gen, + _invoke, self, cid, chan, func, kwargs, name=funcname ) # never allow cancelling cancel requests (results in @@ -371,7 +393,12 @@ class Actor: finally: log.debug(f"Exiting msg loop for {chan} from {chan.uid}") - def _fork_main(self, accept_addr, forkserver_info, parent_addr=None): + def _fork_main( + self, + accept_addr: Tuple[str, int], + forkserver_info: Tuple[Any, Any, Any, Any, Any], + parent_addr: Tuple[str, int] = None + ) -> None: # after fork routine which invokes a fresh ``trio.run`` # log.warn("Log level after fork is {self.loglevel}") self._forkserver_info = forkserver_info @@ -391,22 +418,17 @@ class Actor: async def _async_main( self, - accept_addr, - arbiter_addr=None, - parent_addr=None, - _main_coro=None, - task_status=trio.TASK_STATUS_IGNORED, - ): + accept_addr: Tuple[str, int], + arbiter_addr: Optional[Tuple[str, int]] = None, + parent_addr: Optional[Tuple[str, int]] = None, + task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + ) -> None: """Start the channel server, maybe connect back to the parent, and start the main task. A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. """ - # if this is the `MainProcess` then we get a ref to the main - # task's coroutine object for tossing errors into - self._main_coro = _main_coro - arbiter_addr = arbiter_addr or self._arb_addr registered_with_arbiter = False try: @@ -414,7 +436,7 @@ class Actor: self._root_nursery = nursery # load allowed RPC module - self.load_namespaces() + self.load_modules() # Startup up channel server host, port = accept_addr @@ -422,12 +444,6 @@ class Actor: self._serve_forever, accept_host=host, accept_port=port) ) - # XXX: I wonder if a better name is maybe "requester" - # since I don't think the notion of a "parent" actor - # necessarily sticks given that eventually we want - # ``'MainProcess'`` (the actor who initially starts the - # forkserver) to eventually be the only one who is - # allowed to spawn new processes per Python program. if parent_addr is not None: try: # Connect back to the parent actor and conduct initial @@ -502,10 +518,10 @@ class Actor: self, *, # (host, port) to bind for channel server - accept_host=None, - accept_port=0, - task_status=trio.TASK_STATUS_IGNORED - ): + accept_host: Tuple[str, int] = None, + accept_port: int = 0, + task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + ) -> None: """Start the channel server, begin listening for new connections. This will cause an actor to continue living (blocking) until @@ -531,7 +547,7 @@ class Actor: self._listeners.extend(listeners) task_status.started() - async def _do_unreg(self, arbiter_addr): + async def _do_unreg(self, arbiter_addr: Optional[Tuple[str, int]]) -> None: # UNregister actor from the arbiter try: if arbiter_addr is not None: @@ -541,7 +557,7 @@ class Actor: except OSError: log.warn(f"Unable to unregister {self.name} from arbiter") - async def cancel(self): + async def cancel(self) -> None: """Cancel this actor. The sequence in order is: @@ -554,23 +570,23 @@ class Actor: self.cancel_server() self._root_nursery.cancel_scope.cancel() - async def cancel_rpc_tasks(self): + async def cancel_rpc_tasks(self) -> None: """Cancel all existing RPC responder tasks using the cancel scope registered for each. """ - scopes = self._rpc_tasks - log.info(f"Cancelling all {len(scopes)} rpc tasks:\n{scopes}") - for chan, scopes in scopes.items(): + tasks = self._rpc_tasks + log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks}") + for chan, scopes in tasks.items(): log.debug(f"Cancelling all tasks for {chan.uid}") for scope, func in scopes: log.debug(f"Cancelling task for {func}") scope.cancel() - if scopes: + if tasks: log.info( - f"Waiting for remaining rpc tasks to complete {scopes}") + f"Waiting for remaining rpc tasks to complete {tasks}") await self._no_more_rpc_tasks.wait() - def cancel_server(self): + def cancel_server(self) -> None: """Cancel the internal channel server nursery thereby preventing any new inbound connections from being established. """ @@ -578,19 +594,22 @@ class Actor: self._server_nursery.cancel_scope.cancel() @property - def accept_addr(self): + def accept_addr(self) -> Optional[Tuple[str, int]]: """Primary address to which the channel server is bound. """ try: return self._listeners[0].socket.getsockname() except OSError: - return + return None - def get_parent(self): + def get_parent(self) -> Portal: + """Return a portal to our parent actor.""" + assert self._parent_chan, "No parent channel for this actor?" return Portal(self._parent_chan) - def get_chans(self, actorid): - return self._peers[actorid] + def get_chans(self, uid: Tuple[str, str]) -> List[Channel]: + """Return all channels to the actor with provided uid.""" + return self._peers[uid] class Arbiter(Actor): @@ -609,12 +628,16 @@ class Arbiter(Actor): self._waiters = {} super().__init__(*args, **kwargs) - def find_actor(self, name): + def find_actor(self, name: str) -> Optional[Tuple[str, int]]: for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr - async def wait_for_actor(self, name): + return None + + async def wait_for_actor( + self, name: str + ) -> List[Tuple[str, int]]: """Wait for a particular actor to register. This is a blocking call if no actor by the provided name is currently @@ -635,7 +658,9 @@ class Arbiter(Actor): return sockaddrs - def register_actor(self, uid, sockaddr): + def register_actor( + self, uid: Tuple[str, str], sockaddr: Tuple[str, int] + ) -> None: name, uuid = uid self._registry[uid] = sockaddr @@ -645,13 +670,20 @@ class Arbiter(Actor): for event in events: event.set() - def unregister_actor(self, uid): + def unregister_actor(self, uid: Tuple[str, str]) -> None: self._registry.pop(uid, None) -async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): - """Spawn a local actor by starting a task to execute it's main - async function. +async def _start_actor( + actor: Actor, + main: typing.Callable[..., typing.Awaitable], + host: str, + port: int, + arbiter_addr: Tuple[str, int], + nursery: trio._core._run.Nursery = None +): + """Spawn a local actor by starting a task to execute it's main async + function. Blocks if no nursery is provided, in which case it is expected the nursery provider is responsible for waiting on the task to complete. @@ -664,29 +696,22 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): log.info(f"Starting local {actor} @ {host}:{port}") async with trio.open_nursery() as nursery: - - if main is not None: - main_coro = main() - await nursery.start( partial( actor._async_main, accept_addr=(host, port), parent_addr=None, arbiter_addr=arbiter_addr, - _main_coro=main_coro ) ) if main is not None: - result = await main_coro + result = await main() # XXX: If spawned with a dedicated "main function", # the actor is cancelled when this context is complete # given that there are no more active peer channels connected actor.cancel_server() - # block on actor to complete - # unset module state _state._current_actor = None log.info("Completed async main") @@ -695,7 +720,9 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): @asynccontextmanager -async def get_arbiter(host, port): +async def get_arbiter( + host: str, port: int +) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: """Return a portal instance connected to a local or remote arbiter. """ @@ -715,9 +742,8 @@ async def get_arbiter(host, port): @asynccontextmanager async def find_actor( - name, - arbiter_sockaddr=None, -): + name: str, arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Optional[Portal], None]: """Ask the arbiter to find actor(s) by name. Returns a connected portal to the last registered matching actor @@ -738,9 +764,9 @@ async def find_actor( @asynccontextmanager async def wait_for_actor( - name, - arbiter_sockaddr=None, -): + name: str, + arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Portal, None]: """Wait on an actor to register with the arbiter. A portal to the first actor which registered is be returned. diff --git a/tractor/_forkserver_hackzorz.py b/tractor/_forkserver_hackzorz.py index edd282d..411d190 100644 --- a/tractor/_forkserver_hackzorz.py +++ b/tractor/_forkserver_hackzorz.py @@ -2,6 +2,9 @@ This is near-copy of the 3.8 stdlib's ``multiprocessing.forkserver.py`` with some hackery to prevent any more then a single forkserver and semaphore tracker per ``MainProcess``. + +.. note:: There is no type hinting in this code base (yet) to remain as + a close as possible to upstream. """ import os import socket @@ -12,15 +15,12 @@ import errno import selectors import warnings -from multiprocessing import ( - forkserver, semaphore_tracker, spawn, process, util, - connection -) +from multiprocessing import semaphore_tracker, spawn, process # type: ignore +from multiprocessing import forkserver, util, connection # type: ignore from multiprocessing.forkserver import ( ForkServer, MAXFDS_TO_SEND - # _serve_one, ) -from multiprocessing.context import reduction +from multiprocessing.context import reduction # type: ignore # taken from 3.8 diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 2d86e56..6162150 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,7 +1,8 @@ """ Inter-process comms abstractions """ -from typing import Coroutine, Tuple +import typing +from typing import Any, Tuple, Optional import msgpack import trio @@ -14,21 +15,21 @@ log = get_logger('ipc') class StreamQueue: """Stream wrapped as a queue that delivers ``msgpack`` serialized objects. """ - def __init__(self, stream): + def __init__(self, stream: trio.SocketStream) -> None: self.stream = stream self._agen = self._iter_packets() self._laddr = self.stream.socket.getsockname()[:2] self._raddr = self.stream.socket.getpeername()[:2] self._send_lock = trio.Lock() - async def _iter_packets(self): + async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. """ unpacker = msgpack.Unpacker(raw=False, use_list=False) while True: try: data = await self.stream.receive_some(2**10) - log.trace(f"received {data}") + log.trace(f"received {data}") # type: ignore except trio.BrokenStreamError: log.error(f"Stream connection {self.raddr} broke") return @@ -42,25 +43,25 @@ class StreamQueue: yield packet @property - def laddr(self): + def laddr(self) -> Tuple[str, int]: return self._laddr @property - def raddr(self): + def raddr(self) -> Tuple[str, int]: return self._raddr - async def put(self, data): + async def put(self, data: Any) -> int: async with self._send_lock: return await self.stream.send_all( msgpack.dumps(data, use_bin_type=True)) - async def get(self): + async def get(self) -> Any: return await self._agen.asend(None) def __aiter__(self): return self._agen - def connected(self): + def connected(self) -> bool: return self.stream.socket.fileno() != -1 @@ -72,24 +73,27 @@ class Channel: """ def __init__( self, - destaddr: tuple = None, - on_reconnect: Coroutine = None, + destaddr: Optional[Tuple[str, int]] = None, + on_reconnect: typing.Callable[..., typing.Awaitable] = None, auto_reconnect: bool = False, stream: trio.SocketStream = None, # expected to be active ) -> None: self._recon_seq = on_reconnect self._autorecon = auto_reconnect - self.squeue = StreamQueue(stream) if stream else None + self.squeue: Optional[StreamQueue] = StreamQueue( + stream) if stream else None if self.squeue and destaddr: raise ValueError( f"A stream was provided with local addr {self.laddr}" ) - self._destaddr = destaddr or self.squeue.raddr + self._destaddr = self.squeue.raddr if self.squeue else destaddr # set after handshake - always uid of far end - self.uid = None + self.uid: Optional[Tuple[str, str]] = None + # set if far end actor errors internally + self._exc: Optional[Exception] = None self._agen = self._aiter_recv() - def __repr__(self): + def __repr__(self) -> str: if self.squeue: return repr( self.squeue.stream.socket._sock).replace( @@ -97,14 +101,16 @@ class Channel: return object.__repr__(self) @property - def laddr(self): - return self.squeue.laddr if self.squeue else (None, None) + def laddr(self) -> Optional[Tuple[str, int]]: + return self.squeue.laddr if self.squeue else None @property - def raddr(self): - return self.squeue.raddr if self.squeue else (None, None) + def raddr(self) -> Optional[Tuple[str, int]]: + return self.squeue.raddr if self.squeue else None - async def connect(self, destaddr: Tuple[str, int] = None, **kwargs): + async def connect( + self, destaddr: Tuple[str, int] = None, **kwargs + ) -> trio.SocketStream: if self.connected(): raise RuntimeError("channel is already connected?") destaddr = destaddr or self._destaddr @@ -112,11 +118,13 @@ class Channel: self.squeue = StreamQueue(stream) return stream - async def send(self, item): - log.trace(f"send `{item}`") + async def send(self, item: Any) -> None: + log.trace(f"send `{item}`") # type: ignore + assert self.squeue await self.squeue.put(item) - async def recv(self): + async def recv(self) -> Any: + assert self.squeue try: return await self.squeue.get() except trio.BrokenStreamError: @@ -124,8 +132,9 @@ class Channel: await self._reconnect() return await self.recv() - async def aclose(self, *args): + async def aclose(self) -> None: log.debug(f"Closing {self}") + assert self.squeue await self.squeue.stream.aclose() async def __aenter__(self): @@ -138,7 +147,7 @@ class Channel: def __aiter__(self): return self._agen - async def _reconnect(self): + async def _reconnect(self) -> None: """Handle connection failures by polling until a reconnect can be established. """ @@ -167,9 +176,12 @@ class Channel: " for re-establishment") await trio.sleep(1) - async def _aiter_recv(self): + async def _aiter_recv( + self + ) -> typing.AsyncGenerator[Any, None]: """Async iterate items from underlying stream. """ + assert self.squeue while True: try: async for item in self.squeue: @@ -189,14 +201,16 @@ class Channel: else: return - def connected(self): + def connected(self) -> bool: return self.squeue.connected() if self.squeue else False @asynccontextmanager -async def _connect_chan(host, port): - """Create and connect a channel with disconnect on - context manager teardown. +async def _connect_chan( + host: str, port: int +) -> typing.AsyncGenerator[Channel, None]: + """Create and connect a channel with disconnect on context manager + teardown. """ chan = Channel((host, port)) await chan.connect() diff --git a/tractor/_portal.py b/tractor/_portal.py index 835f918..ddbea76 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -2,11 +2,14 @@ Portal api """ import importlib +import typing +from typing import Tuple, Any, Dict, Optional import trio from async_generator import asynccontextmanager from ._state import current_actor +from ._ipc import Channel from .log import get_logger @@ -18,7 +21,7 @@ class RemoteActorError(RuntimeError): @asynccontextmanager -async def maybe_open_nursery(nursery=None): +async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): """Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. @@ -30,9 +33,12 @@ async def maybe_open_nursery(nursery=None): yield nursery -async def _do_handshake(actor, chan): +async def _do_handshake( + actor: 'Actor', # type: ignore + chan: Channel +)-> Any: await chan.send(actor.uid) - uid = await chan.recv() + uid: Tuple[str, str] = await chan.recv() if not isinstance(uid, tuple): raise ValueError(f"{uid} is not a valid uid?!") @@ -51,22 +57,26 @@ class Portal: Think of this like an native async IPC API. """ - def __init__(self, channel): + def __init__(self, channel: Channel) -> None: self.channel = channel # when this is set to a tuple returned from ``_submit()`` then # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime self._result = None - self._exc = None - self._expect_result = None + self._exc: Optional[RemoteActorError] = None + self._expect_result: Optional[ + Tuple[str, Any, str, Dict[str, Any]] + ] = None - async def aclose(self): + async def aclose(self) -> None: log.debug(f"Closing {self}") # XXX: won't work until https://github.com/python-trio/trio/pull/460 # gets in! await self.channel.aclose() - async def _submit(self, ns, func, **kwargs): + async def _submit( + self, ns: str, func: str, **kwargs + ) -> Tuple[str, trio.Queue, str, Dict[str, Any]]: """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. @@ -93,12 +103,12 @@ class Portal: return cid, q, resp_type, first_msg - async def _submit_for_result(self, ns, func, **kwargs): + async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: assert self._expect_result is None, \ "A pending main result has already been submitted" self._expect_result = await self._submit(ns, func, **kwargs) - async def run(self, ns, func, **kwargs): + async def run(self, ns: str, func: str, **kwargs) -> Any: """Submit a function to be scheduled and run by actor, wrap and return its (stream of) result(s). @@ -108,7 +118,9 @@ class Portal: *(await self._submit(ns, func, **kwargs)) ) - async def _return_from_resptype(self, cid, q, resptype, first_msg): + async def _return_from_resptype( + self, cid: str, q: trio.Queue, resptype: str, first_msg: dict + ) -> Any: # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) @@ -145,7 +157,7 @@ class Portal: else: raise ValueError(f"Unknown msg response type: {first_msg}") - async def result(self): + async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. """ if self._expect_result is None: @@ -153,7 +165,7 @@ class Portal: # teardown can reraise them exc = self.channel._exc if exc: - raise RemoteActorError(f"{self.channel.uid}\n" + exc) + raise RemoteActorError(f"{self.channel.uid}\n{exc}") else: raise RuntimeError( f"Portal for {self.channel.uid} is not expecting a final" @@ -165,13 +177,13 @@ class Portal: ) return self._result - async def close(self): + async def close(self) -> None: # trigger remote msg loop `break` chan = self.channel log.debug(f"Closing portal for {chan} to {chan.uid}") await self.channel.send(None) - async def cancel_actor(self): + async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. """ log.warn( @@ -198,19 +210,24 @@ class LocalPortal: A compatibility shim for normal portals but for invoking functions using an in process actor instance. """ - def __init__(self, actor): + def __init__( + self, + actor: 'Actor' # type: ignore + ) -> None: self.actor = actor - async def run(self, ns, func, **kwargs): + async def run(self, ns: str, func: str, **kwargs) -> Any: """Run a requested function locally and return it's result. """ obj = self.actor if ns == 'self' else importlib.import_module(ns) - func = getattr(obj, func) - return func(**kwargs) + return getattr(obj, func)(**kwargs) @asynccontextmanager -async def open_portal(channel, nursery=None): +async def open_portal( + channel: Channel, + nursery: trio._core._run.Nursery = None +) -> typing.AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. Spawns a background task to handle message processing. diff --git a/tractor/_state.py b/tractor/_state.py index 767bb27..704fae7 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -1,10 +1,13 @@ """ Per process state """ -_current_actor = None +from typing import Optional -def current_actor() -> 'Actor': +_current_actor: Optional['Actor'] = None # type: ignore + + +def current_actor() -> 'Actor': # type: ignore """Get the process-local actor instance. """ if not _current_actor: diff --git a/tractor/_trionics.py b/tractor/_trionics.py index d78cbdd..c25137e 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -3,7 +3,9 @@ """ import multiprocessing as mp import inspect -from multiprocessing import forkserver, semaphore_tracker +from multiprocessing import forkserver, semaphore_tracker # type: ignore +from typing import Tuple, List, Dict, Optional, Any +import typing import trio from async_generator import asynccontextmanager, aclosing @@ -23,14 +25,17 @@ log = get_logger('tractor') class ActorNursery: """Spawn scoped subprocess actors. """ - def __init__(self, actor, supervisor=None): - self.supervisor = supervisor # TODO - self._actor = actor - self._children = {} + def __init__(self, actor: Actor) -> None: + # self.supervisor = supervisor # TODO + self._actor: Actor = actor + self._children: Dict[ + Tuple[str, str], + Tuple[Actor, mp.Process, Optional[Portal]] + ] = {} # portals spawned with ``run_in_actor()`` - self._cancel_after_result_on_exit = set() - self.cancelled = False - self._forkserver = None + self._cancel_after_result_on_exit: set = set() + self.cancelled: bool = False + self._forkserver: forkserver.ForkServer = None async def __aenter__(self): return self @@ -38,11 +43,11 @@ class ActorNursery: async def start_actor( self, name: str, - bind_addr=('127.0.0.1', 0), - statespace=None, - rpc_module_paths=None, - loglevel=None, # set log level per subactor - ): + bind_addr: Tuple[str, int] = ('127.0.0.1', 0), + statespace: Optional[Dict[str, Any]] = None, + rpc_module_paths: List[str] = None, + loglevel: str = None, # set log level per subactor + ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() actor = Actor( name, @@ -70,6 +75,7 @@ class ActorNursery: semaphore_tracker._semaphore_tracker._fd, ) else: + assert self._actor._forkserver_info fs_info = ( fs._forkserver_address, fs._forkserver_alive_fd, @@ -87,7 +93,7 @@ class ActorNursery: # register the process before start in case we get a cancel # request before the actor has fully spawned - then we can wait # for it to fully come up before sending a cancel request - self._children[actor.uid] = [actor, proc, None] + self._children[actor.uid] = (actor, proc, None) proc.start() if not proc.is_alive(): @@ -99,19 +105,19 @@ class ActorNursery: # local actor by the time we get a ref to it event, chan = await self._actor.wait_for_peer(actor.uid) portal = Portal(chan) - self._children[actor.uid][2] = portal + self._children[actor.uid] = (actor, proc, portal) return portal async def run_in_actor( self, - name, - fn, - bind_addr=('127.0.0.1', 0), - rpc_module_paths=None, - statespace=None, - loglevel=None, # set log level per subactor + name: str, + fn: typing.Callable, + bind_addr: Tuple[str, int] = ('127.0.0.1', 0), + rpc_module_paths: List[str] = None, + statespace: dict = None, + loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` - ): + ) -> Portal: """Spawn a new actor, run a lone task, then terminate the actor and return its result. @@ -134,7 +140,7 @@ class ActorNursery: ) return portal - async def wait(self): + async def wait(self) -> None: """Wait for all subactors to complete. """ async def maybe_consume_result(portal, actor): @@ -154,7 +160,12 @@ class ActorNursery: async for item in agen: log.debug(f"Consuming item {item}") - async def wait_for_proc(proc, actor, portal, cancel_scope): + async def wait_for_proc( + proc: mp.Process, + actor: Actor, + portal: Portal, + cancel_scope: trio._core._run.CancelScope, + ) -> None: # TODO: timeout block here? if proc.is_alive(): await trio.hazmat.wait_readable(proc.sentinel) @@ -171,9 +182,10 @@ class ActorNursery: cancel_scope.cancel() async def wait_for_actor( - portal, actor, + portal: Portal, + actor: Actor, task_status=trio.TASK_STATUS_IGNORED, - ): + ) -> None: # cancel the actor gracefully with trio.open_cancel_scope() as cs: task_status.started(cs) @@ -193,7 +205,7 @@ class ActorNursery: cs = await nursery.start(wait_for_actor, portal, subactor) nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) - async def cancel(self, hard_kill=False): + async def cancel(self, hard_kill: bool = False) -> None: """Cancel this nursery by instructing each subactor to cancel iteslf and wait for all subprocesses to terminate. @@ -230,6 +242,7 @@ class ActorNursery: do_hard_kill(proc) # spawn cancel tasks async + assert portal n.start_soon(portal.cancel_actor) log.debug(f"Waiting on all subactors to complete") @@ -274,7 +287,7 @@ class ActorNursery: @asynccontextmanager -async def open_nursery(supervisor=None): +async def open_nursery() -> typing.AsyncGenerator[None, ActorNursery]: """Create and yield a new ``ActorNursery``. """ actor = current_actor() @@ -282,5 +295,5 @@ async def open_nursery(supervisor=None): raise RuntimeError("No actor instance has been defined yet?") # TODO: figure out supervisors from erlang - async with ActorNursery(current_actor(), supervisor) as nursery: + async with ActorNursery(current_actor()) as nursery: yield nursery diff --git a/tractor/log.py b/tractor/log.py index 3b013d1..5a65603 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -2,9 +2,10 @@ Log like a forester! """ from functools import partial -import sys import logging -import colorlog +import colorlog # type: ignore +from typing import Optional + _proj_name = 'tractor' _default_loglevel = None @@ -69,28 +70,22 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: log = get_logger(name) # our root logger if not level: - return + return log log.setLevel(level.upper() if not isinstance(level, int) else level) - - if not any( - handler.stream == sys.stderr for handler in log.handlers - if getattr(handler, 'stream', None) - ): - handler = logging.StreamHandler() - - formatter = colorlog.ColoredFormatter( - LOG_FORMAT, - datefmt=DATE_FORMAT, - log_colors=STD_PALETTE, - secondary_log_colors=BOLD_PALETTE, - style='{', - ) - handler.setFormatter(formatter) - log.addHandler(handler) + handler = logging.StreamHandler() + formatter = colorlog.ColoredFormatter( + LOG_FORMAT, + datefmt=DATE_FORMAT, + log_colors=STD_PALETTE, + secondary_log_colors=BOLD_PALETTE, + style='{', + ) + handler.setFormatter(formatter) + log.addHandler(handler) return log -def get_loglevel(): +def get_loglevel() -> Optional[str]: return _default_loglevel