From 79c152fe381ce30c45e45688f49835d35eeca894 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Dec 2019 00:55:03 -0500 Subject: [PATCH] Make latest mpypy happy --- tractor/__init__.py | 17 ++++++------ tractor/_actor.py | 66 +++++++++++++++++++++++++------------------- tractor/_ipc.py | 28 +++++++++++++------ tractor/_portal.py | 15 ++++++---- tractor/_trionics.py | 12 ++++---- 5 files changed, 81 insertions(+), 57 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index fb9b095..10bb895 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,7 +4,7 @@ tractor: An actor model micro-framework built on """ import importlib from functools import partial -from typing import Tuple, Any +from typing import Tuple, Any, Optional import typing import trio # type: ignore @@ -47,8 +47,8 @@ async def _main( async_fn: typing.Callable[..., typing.Awaitable], args: Tuple, kwargs: typing.Dict[str, typing.Any], - name: str, - arbiter_addr: Tuple[str, int] + arbiter_addr: Tuple[str, int], + name: Optional[str] = None, ) -> typing.Any: """Async entry point for ``tractor``. """ @@ -89,26 +89,27 @@ async def _main( # for it to stay up indefinitely until a re-election process has # taken place - which is not implemented yet FYI). return await _start_actor( - actor, main, host, port, arbiter_addr=arbiter_addr) + actor, main, host, port, arbiter_addr=arbiter_addr + ) def run( async_fn: typing.Callable[..., typing.Awaitable], - *args: Tuple, - name: str = None, + *args, + name: Optional[str] = None, arbiter_addr: Tuple[str, int] = ( _default_arbiter_host, _default_arbiter_port), # the `multiprocessing` start method: # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods start_method: str = 'forkserver', - **kwargs: typing.Dict[str, typing.Any], + **kwargs, ) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ _spawn.try_set_start_method(start_method) - return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr) + return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name) def run_daemon( diff --git a/tractor/_actor.py b/tractor/_actor.py index 5b5e7ff..6755b0b 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -11,6 +11,7 @@ import typing from typing import Dict, List, Tuple, Any, Optional import trio # type: ignore +from trio_typing import TaskStatus from async_generator import aclosing from ._ipc import Channel @@ -155,7 +156,11 @@ class Actor: with other actors through "portals" which provide a native async API around "channels". """ - is_arbiter = False + is_arbiter: bool = False + + # placeholders filled in by `_async_main` after fork + _root_nursery: trio.Nursery + _server_nursery: trio.Nursery def __init__( self, @@ -170,15 +175,13 @@ class Actor: self.uid = (name, uid or str(uuid.uuid4())) self.rpc_module_paths = rpc_module_paths self._mods: dict = {} + # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.statespace = statespace or {} self.loglevel = loglevel self._arb_addr = arbiter_addr - # filled in by `_async_main` after fork - self._root_nursery: trio._core._run.Nursery = None - self._server_nursery: trio._core._run.Nursery = None self._peers: defaultdict = defaultdict(list) self._peer_connected: dict = {} self._no_more_peers = trio.Event() @@ -188,15 +191,20 @@ class Actor: # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: Dict[ Tuple[Channel, str], - Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event] + Tuple[trio.CancelScope, typing.Callable, trio.Event] ] = {} # map {uids -> {callids -> waiter queues}} self._cids2qs: Dict[ Tuple[Tuple[str, str], str], - trio.abc.SendChannel[Any]] = {} + Tuple[ + trio.abc.SendChannel[Any], + trio.abc.ReceiveChannel[Any] + ] + ] = {} self._listeners: List[trio.abc.Listener] = [] self._parent_chan: Optional[Channel] = None - self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None + self._forkserver_info: Optional[ + Tuple[Any, Any, Any, Any, Any]] = None async def wait_for_peer( self, uid: Tuple[str, str] @@ -303,8 +311,8 @@ class Actor: actorid = chan.uid assert actorid, f"`actorid` can't be {actorid}" cid = msg['cid'] - send_chan = self._cids2qs[(actorid, cid)] - assert send_chan.cid == cid + send_chan, recv_chan = self._cids2qs[(actorid, cid)] + assert send_chan.cid == cid # type: ignore if 'stop' in msg: log.debug(f"{send_chan} was terminated at remote end") return await send_chan.aclose() @@ -321,16 +329,17 @@ class Actor: self, actorid: Tuple[str, str], cid: str - ) -> trio.abc.ReceiveChannel: + ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: log.debug(f"Getting result queue for {actorid} cid {cid}") try: - recv_chan = self._cids2qs[(actorid, cid)] + send_chan, recv_chan = self._cids2qs[(actorid, cid)] except KeyError: send_chan, recv_chan = trio.open_memory_channel(1000) - send_chan.cid = cid - self._cids2qs[(actorid, cid)] = send_chan + send_chan.cid = cid # type: ignore + recv_chan.cid = cid # type: ignore + self._cids2qs[(actorid, cid)] = send_chan, recv_chan - return recv_chan + return send_chan, recv_chan async def send_cmd( self, @@ -345,7 +354,7 @@ class Actor: """ cid = str(uuid.uuid4()) assert chan.uid - recv_chan = self.get_memchans(chan.uid, cid) + send_chan, recv_chan = self.get_memchans(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) return cid, recv_chan @@ -355,7 +364,7 @@ class Actor: chan: Channel, treat_as_gen: bool = False, shield: bool = False, - task_status=trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: """Process messages for the channel async-RPC style. @@ -511,7 +520,7 @@ class Actor: accept_addr: Tuple[str, int], arbiter_addr: Optional[Tuple[str, int]] = None, parent_addr: Optional[Tuple[str, int]] = None, - task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, maybe connect back to the parent, and start the main task. @@ -548,10 +557,12 @@ class Actor: " closing server") await self.cancel() self._parent_chan = None - - # handle new connection back to parent - nursery.start_soon( - self._process_messages, self._parent_chan) + raise + else: + # handle new connection back to parent + assert self._parent_chan + nursery.start_soon( + self._process_messages, self._parent_chan) # load exposed/allowed RPC modules # XXX: do this **after** establishing connection to parent @@ -560,6 +571,7 @@ class Actor: # register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") + assert isinstance(arbiter_addr, tuple) async with get_arbiter(*arbiter_addr) as arb_portal: await arb_portal.run( 'self', 'register_actor', @@ -615,7 +627,7 @@ class Actor: # (host, port) to bind for channel server accept_host: Tuple[str, int] = None, accept_port: int = 0, - task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, begin listening for new connections. @@ -720,13 +732,11 @@ class Actor: self._server_nursery.cancel_scope.cancel() @property - def accept_addr(self) -> Optional[Tuple[str, int]]: + def accept_addr(self) -> Tuple[str, int]: """Primary address to which the channel server is bound. """ - try: - return self._listeners[0].socket.getsockname() - except OSError: - return None + # throws OSError on failure + return self._listeners[0].socket.getsockname() # type: ignore def get_parent(self) -> Portal: """Return a portal to our parent actor.""" @@ -826,7 +836,7 @@ async def _start_actor( host: str, port: int, arbiter_addr: Tuple[str, int], - nursery: trio._core._run.Nursery = None + nursery: trio.Nursery = None ): """Spawn a local actor by starting a task to execute it's main async function. diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 94f978f..0d24307 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -17,9 +17,16 @@ class MsgpackStream: """ def __init__(self, stream: trio.SocketStream) -> None: self.stream = stream + assert self.stream.socket + # should both be IP sockets + lsockname = stream.socket.getsockname() + assert isinstance(lsockname, tuple) + self._laddr = lsockname[:2] + rsockname = stream.socket.getpeername() + assert isinstance(rsockname, tuple) + self._raddr = rsockname[:2] + self._agen = self._iter_packets() - self._laddr = self.stream.socket.getsockname()[:2] - self._raddr = self.stream.socket.getpeername()[:2] self._send_lock = trio.StrictFIFOLock() async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: @@ -43,14 +50,15 @@ class MsgpackStream: yield packet @property - def laddr(self) -> Tuple[str, int]: + def laddr(self) -> Tuple[Any, ...]: return self._laddr @property - def raddr(self) -> Tuple[str, int]: + def raddr(self) -> Tuple[Any, ...]: return self._raddr - async def send(self, data: Any) -> int: + # XXX: should this instead be called `.sendall()`? + async def send(self, data: Any) -> None: async with self._send_lock: return await self.stream.send_all( msgpack.dumps(data, use_bin_type=True)) @@ -95,24 +103,26 @@ class Channel: def __repr__(self) -> str: if self.msgstream: return repr( - self.msgstream.stream.socket._sock).replace( + self.msgstream.stream.socket._sock).replace( # type: ignore "socket.socket", "Channel") return object.__repr__(self) @property - def laddr(self) -> Optional[Tuple[str, int]]: + def laddr(self) -> Optional[Tuple[Any, ...]]: return self.msgstream.laddr if self.msgstream else None @property - def raddr(self) -> Optional[Tuple[str, int]]: + def raddr(self) -> Optional[Tuple[Any, ...]]: return self.msgstream.raddr if self.msgstream else None async def connect( - self, destaddr: Tuple[str, int] = None, **kwargs + self, destaddr: Tuple[Any, ...] = None, + **kwargs ) -> trio.SocketStream: if self.connected(): raise RuntimeError("channel is already connected?") destaddr = destaddr or self._destaddr + assert isinstance(destaddr, tuple) stream = await trio.open_tcp_stream(*destaddr, **kwargs) self.msgstream = MsgpackStream(stream) return stream diff --git a/tractor/_portal.py b/tractor/_portal.py index 925e3ad..bd49170 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -21,7 +21,9 @@ log = get_logger('tractor') @asynccontextmanager -async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): +async def maybe_open_nursery( + nursery: trio.Nursery = None +) -> typing.AsyncGenerator[trio.Nursery, Any]: """Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. @@ -252,14 +254,14 @@ class Portal: for stream in self._streams.copy(): await stream.aclose() - async def aclose(self) -> None: + async def aclose(self): log.debug(f"Closing {self}") # TODO: once we move to implementing our own `ReceiveChannel` # (including remote task cancellation inside its `.aclose()`) # we'll need to .aclose all those channels here await self._cancel_streams() - async def cancel_actor(self) -> bool: + async def cancel_actor(self): """Cancel the actor on the other end of this portal. """ if not self.channel.connected(): @@ -279,7 +281,9 @@ class Portal: return True if cancel_scope.cancelled_caught: log.warning(f"May have failed to cancel {self.channel.uid}") - return False + + # if we get here some weird cancellation case happened + return False except trio.ClosedResourceError: log.warning( f"{self.channel} for {self.channel.uid} was already closed?") @@ -309,7 +313,7 @@ class LocalPortal: @asynccontextmanager async def open_portal( channel: Channel, - nursery: trio._core._run.Nursery = None + nursery: Optional[trio.Nursery] = None ) -> typing.AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. @@ -320,7 +324,6 @@ async def open_portal( was_connected = False async with maybe_open_nursery(nursery) as nursery: - if not channel.connected(): await channel.connect() was_connected = True diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 6954367..5250f81 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -64,7 +64,6 @@ class ActorNursery: arbiter_addr=current_actor()._arb_addr, ) parent_addr = self._actor.accept_addr - assert parent_addr proc = _spawn.new_proc( name, actor, @@ -192,8 +191,7 @@ class ActorNursery: async def wait_for_proc( proc: mp.Process, actor: Actor, - portal: Portal, - cancel_scope: Optional[trio._core._run.CancelScope] = None, + cancel_scope: Optional[trio.CancelScope] = None, ) -> None: # TODO: timeout block here? if proc.is_alive(): @@ -207,7 +205,7 @@ class ActorNursery: # proc terminated, cancel result waiter that may have # been spawned in tandem if not done already - if cancel_scope: # and not portal._cancelled: + if cancel_scope: log.warning( f"Cancelling existing result waiter task for {actor.uid}") cancel_scope.cancel() @@ -223,11 +221,12 @@ class ActorNursery: cs = None # portal from ``run_in_actor()`` if portal in self._cancel_after_result_on_exit: + assert portal cs = await nursery.start( cancel_on_completion, portal, subactor) # TODO: how do we handle remote host spawned actors? nursery.start_soon( - wait_for_proc, proc, subactor, portal, cs) + wait_for_proc, proc, subactor, cs) if errors: if not self.cancelled: @@ -247,7 +246,8 @@ class ActorNursery: async with trio.open_nursery() as nursery: for subactor, proc, portal in children.values(): # TODO: how do we handle remote host spawned actors? - nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) + assert portal + nursery.start_soon(wait_for_proc, proc, subactor, cs) log.debug(f"All subactors for {self} have terminated") if errors: