forked from goodboy/tractor
				
			Woot! mypy run is clean!
							parent
							
								
									18c55e2b5f
								
							
						
					
					
						commit
						086df43b59
					
				| 
						 | 
				
			
			@ -7,8 +7,9 @@ from itertools import chain
 | 
			
		|||
import importlib
 | 
			
		||||
import inspect
 | 
			
		||||
import traceback
 | 
			
		||||
import typing
 | 
			
		||||
import uuid
 | 
			
		||||
import typing
 | 
			
		||||
from typing import Dict, List, Tuple, Any, Optional, Union
 | 
			
		||||
 | 
			
		||||
import trio  # type: ignore
 | 
			
		||||
from async_generator import asynccontextmanager, aclosing
 | 
			
		||||
| 
						 | 
				
			
			@ -41,7 +42,7 @@ async def _invoke(
 | 
			
		|||
    cid: str,
 | 
			
		||||
    chan: Channel,
 | 
			
		||||
    func: typing.Callable,
 | 
			
		||||
    kwargs: typing.Dict[str, typing.Any],
 | 
			
		||||
    kwargs: Dict[str, Any],
 | 
			
		||||
    task_status=trio.TASK_STATUS_IGNORED
 | 
			
		||||
):
 | 
			
		||||
    """Invoke local func and return results over provided channel.
 | 
			
		||||
| 
						 | 
				
			
			@ -152,43 +153,45 @@ class Actor:
 | 
			
		|||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        name: str,
 | 
			
		||||
        rpc_module_paths: typing.List[str] = [],
 | 
			
		||||
        statespace: typing.Dict[str, typing.Any] = {},
 | 
			
		||||
        rpc_module_paths: List[str] = [],
 | 
			
		||||
        statespace: Optional[Dict[str, Any]] = None,
 | 
			
		||||
        uid: str = None,
 | 
			
		||||
        loglevel: str = None,
 | 
			
		||||
        arbiter_addr: typing.Tuple[str, int] = None,
 | 
			
		||||
    ):
 | 
			
		||||
        arbiter_addr: Optional[Tuple[str, int]] = None,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        self.name = name
 | 
			
		||||
        self.uid = (name, uid or str(uuid.uuid1()))
 | 
			
		||||
        self.rpc_module_paths = rpc_module_paths
 | 
			
		||||
        self._mods = {}
 | 
			
		||||
        self._mods: dict = {}
 | 
			
		||||
        # TODO: consider making this a dynamically defined
 | 
			
		||||
        # @dataclass once we get py3.7
 | 
			
		||||
        self.statespace = statespace
 | 
			
		||||
        self.statespace = statespace or {}
 | 
			
		||||
        self.loglevel = loglevel
 | 
			
		||||
        self._arb_addr = arbiter_addr
 | 
			
		||||
 | 
			
		||||
        # filled in by `_async_main` after fork
 | 
			
		||||
        self._root_nursery = None
 | 
			
		||||
        self._server_nursery = None
 | 
			
		||||
        self._peers = defaultdict(list)
 | 
			
		||||
        self._peer_connected = {}
 | 
			
		||||
        self._root_nursery: trio._core._run.Nursery = None
 | 
			
		||||
        self._server_nursery: trio._core._run.Nursery = None
 | 
			
		||||
        self._peers: defaultdict = defaultdict(list)
 | 
			
		||||
        self._peer_connected: dict = {}
 | 
			
		||||
        self._no_more_peers = trio.Event()
 | 
			
		||||
        self._no_more_peers.set()
 | 
			
		||||
 | 
			
		||||
        self._no_more_rpc_tasks = trio.Event()
 | 
			
		||||
        self._no_more_rpc_tasks.set()
 | 
			
		||||
        self._rpc_tasks = {}
 | 
			
		||||
 | 
			
		||||
        self._actors2calls = {}  # map {uids -> {callids -> waiter queues}}
 | 
			
		||||
        self._listeners = []
 | 
			
		||||
        self._parent_chan = None
 | 
			
		||||
        self._accept_host = None
 | 
			
		||||
        self._forkserver_info = None
 | 
			
		||||
        self._rpc_tasks: Dict[
 | 
			
		||||
            Channel,
 | 
			
		||||
            List[Tuple[trio._core._run.CancelScope, typing.Callable]]
 | 
			
		||||
        ] = {}
 | 
			
		||||
        # map {uids -> {callids -> waiter queues}}
 | 
			
		||||
        self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {}
 | 
			
		||||
        self._listeners: List[trio.abc.Listener] = []
 | 
			
		||||
        self._parent_chan: Optional[Channel] = None
 | 
			
		||||
        self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None
 | 
			
		||||
 | 
			
		||||
    async def wait_for_peer(
 | 
			
		||||
        self, uid: typing.Tuple[str, str]
 | 
			
		||||
    ) -> (trio.Event, Channel):
 | 
			
		||||
        self, uid: Tuple[str, str]
 | 
			
		||||
    ) -> Tuple[trio.Event, Channel]:
 | 
			
		||||
        """Wait for a connection back from a spawned actor with a given
 | 
			
		||||
        ``uid``.
 | 
			
		||||
        """
 | 
			
		||||
| 
						 | 
				
			
			@ -287,7 +290,9 @@ class Actor:
 | 
			
		|||
        await q.put(msg)
 | 
			
		||||
 | 
			
		||||
    def get_waitq(
 | 
			
		||||
        self, actorid: typing.Tuple[str, str], cid: str
 | 
			
		||||
        self,
 | 
			
		||||
        actorid: Tuple[str, str],
 | 
			
		||||
        cid: str
 | 
			
		||||
    ) -> trio.Queue:
 | 
			
		||||
        log.debug(f"Getting result queue for {actorid} cid {cid}")
 | 
			
		||||
        cids2qs = self._actors2calls.setdefault(actorid, {})
 | 
			
		||||
| 
						 | 
				
			
			@ -295,12 +300,13 @@ class Actor:
 | 
			
		|||
 | 
			
		||||
    async def send_cmd(
 | 
			
		||||
        self, chan: Channel, ns: str, func: str, kwargs: dict
 | 
			
		||||
    ) -> typing.Tuple[str, trio.Queue]:
 | 
			
		||||
    ) -> Tuple[str, trio.Queue]:
 | 
			
		||||
        """Send a ``'cmd'`` message to a remote actor and return a
 | 
			
		||||
        caller id and a ``trio.Queue`` that can be used to wait for
 | 
			
		||||
        responses delivered by the local message processing loop.
 | 
			
		||||
        """
 | 
			
		||||
        cid = str(uuid.uuid1())
 | 
			
		||||
        assert chan.uid
 | 
			
		||||
        q = self.get_waitq(chan.uid, cid)
 | 
			
		||||
        log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
 | 
			
		||||
        await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
 | 
			
		||||
| 
						 | 
				
			
			@ -344,6 +350,7 @@ class Actor:
 | 
			
		|||
                    # push any non-rpc-response error to all local consumers
 | 
			
		||||
                    # and mark the channel as errored
 | 
			
		||||
                    chan._exc = err = msg['error']
 | 
			
		||||
                    assert chan.uid
 | 
			
		||||
                    for cid in self._actors2calls[chan.uid]:
 | 
			
		||||
                        await self._push_result(chan.uid, cid, msg)
 | 
			
		||||
                    raise InternalActorError(f"{chan.uid}\n" + err)
 | 
			
		||||
| 
						 | 
				
			
			@ -387,9 +394,10 @@ class Actor:
 | 
			
		|||
            log.debug(f"Exiting msg loop for {chan} from {chan.uid}")
 | 
			
		||||
 | 
			
		||||
    def _fork_main(
 | 
			
		||||
        self, accept_addr: typing.Tuple[str, int],
 | 
			
		||||
        forkserver_info: tuple,
 | 
			
		||||
        parent_addr: typing.Tuple[str, int] = None
 | 
			
		||||
        self,
 | 
			
		||||
        accept_addr: Tuple[str, int],
 | 
			
		||||
        forkserver_info: Tuple[Any, Any, Any, Any, Any],
 | 
			
		||||
        parent_addr: Tuple[str, int] = None
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        # after fork routine which invokes a fresh ``trio.run``
 | 
			
		||||
        # log.warn("Log level after fork is {self.loglevel}")
 | 
			
		||||
| 
						 | 
				
			
			@ -410,9 +418,9 @@ class Actor:
 | 
			
		|||
 | 
			
		||||
    async def _async_main(
 | 
			
		||||
        self,
 | 
			
		||||
        accept_addr: typing.Tuple[str, int],
 | 
			
		||||
        arbiter_addr: typing.Tuple[str, int] = None,
 | 
			
		||||
        parent_addr: typing.Tuple[str, int] = None,
 | 
			
		||||
        accept_addr: Tuple[str, int],
 | 
			
		||||
        arbiter_addr: Optional[Tuple[str, int]] = None,
 | 
			
		||||
        parent_addr: Optional[Tuple[str, int]] = None,
 | 
			
		||||
        task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        """Start the channel server, maybe connect back to the parent, and
 | 
			
		||||
| 
						 | 
				
			
			@ -510,7 +518,7 @@ class Actor:
 | 
			
		|||
        self,
 | 
			
		||||
        *,
 | 
			
		||||
        # (host, port) to bind for channel server
 | 
			
		||||
        accept_host: typing.Tuple[str, int] = None,
 | 
			
		||||
        accept_host: Tuple[str, int] = None,
 | 
			
		||||
        accept_port: int = 0,
 | 
			
		||||
        task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
| 
						 | 
				
			
			@ -539,7 +547,7 @@ class Actor:
 | 
			
		|||
            self._listeners.extend(listeners)
 | 
			
		||||
            task_status.started()
 | 
			
		||||
 | 
			
		||||
    async def _do_unreg(self, arbiter_addr: typing.Tuple[str, int]) -> None:
 | 
			
		||||
    async def _do_unreg(self, arbiter_addr: Optional[Tuple[str, int]]) -> None:
 | 
			
		||||
        # UNregister actor from the arbiter
 | 
			
		||||
        try:
 | 
			
		||||
            if arbiter_addr is not None:
 | 
			
		||||
| 
						 | 
				
			
			@ -566,16 +574,16 @@ class Actor:
 | 
			
		|||
        """Cancel all existing RPC responder tasks using the cancel scope
 | 
			
		||||
        registered for each.
 | 
			
		||||
        """
 | 
			
		||||
        scopes = self._rpc_tasks
 | 
			
		||||
        log.info(f"Cancelling all {len(scopes)} rpc tasks:\n{scopes}")
 | 
			
		||||
        for chan, scopes in scopes.items():
 | 
			
		||||
        tasks = self._rpc_tasks
 | 
			
		||||
        log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks}")
 | 
			
		||||
        for chan, scopes in tasks.items():
 | 
			
		||||
            log.debug(f"Cancelling all tasks for {chan.uid}")
 | 
			
		||||
            for scope, func in scopes:
 | 
			
		||||
                log.debug(f"Cancelling task for {func}")
 | 
			
		||||
                scope.cancel()
 | 
			
		||||
        if scopes:
 | 
			
		||||
        if tasks:
 | 
			
		||||
            log.info(
 | 
			
		||||
                f"Waiting for remaining rpc tasks to complete {scopes}")
 | 
			
		||||
                f"Waiting for remaining rpc tasks to complete {tasks}")
 | 
			
		||||
            await self._no_more_rpc_tasks.wait()
 | 
			
		||||
 | 
			
		||||
    def cancel_server(self) -> None:
 | 
			
		||||
| 
						 | 
				
			
			@ -586,19 +594,20 @@ class Actor:
 | 
			
		|||
        self._server_nursery.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def accept_addr(self) -> typing.Tuple[str, int]:
 | 
			
		||||
    def accept_addr(self) -> Optional[Tuple[str, int]]:
 | 
			
		||||
        """Primary address to which the channel server is bound.
 | 
			
		||||
        """
 | 
			
		||||
        try:
 | 
			
		||||
            return self._listeners[0].socket.getsockname()
 | 
			
		||||
        except OSError:
 | 
			
		||||
            return
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
    def get_parent(self) -> Portal:
 | 
			
		||||
        """Return a portal to our parent actor."""
 | 
			
		||||
        assert self._parent_chan, "No parent channel for this actor?"
 | 
			
		||||
        return Portal(self._parent_chan)
 | 
			
		||||
 | 
			
		||||
    def get_chans(self, uid: typing.Tuple[str, str]) -> typing.List[Channel]:
 | 
			
		||||
    def get_chans(self, uid: Tuple[str, str]) -> List[Channel]:
 | 
			
		||||
        """Return all channels to the actor with provided uid."""
 | 
			
		||||
        return self._peers[uid]
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -619,14 +628,16 @@ class Arbiter(Actor):
 | 
			
		|||
        self._waiters = {}
 | 
			
		||||
        super().__init__(*args, **kwargs)
 | 
			
		||||
 | 
			
		||||
    def find_actor(self, name: str) -> typing.Tuple[str, int]:
 | 
			
		||||
    def find_actor(self, name: str) -> Optional[Tuple[str, int]]:
 | 
			
		||||
        for uid, sockaddr in self._registry.items():
 | 
			
		||||
            if name in uid:
 | 
			
		||||
                return sockaddr
 | 
			
		||||
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    async def wait_for_actor(
 | 
			
		||||
        self, name: str
 | 
			
		||||
    ) -> typing.List[typing.Tuple[str, int]]:
 | 
			
		||||
    ) -> List[Tuple[str, int]]:
 | 
			
		||||
        """Wait for a particular actor to register.
 | 
			
		||||
 | 
			
		||||
        This is a blocking call if no actor by the provided name is currently
 | 
			
		||||
| 
						 | 
				
			
			@ -648,7 +659,7 @@ class Arbiter(Actor):
 | 
			
		|||
        return sockaddrs
 | 
			
		||||
 | 
			
		||||
    def register_actor(
 | 
			
		||||
        self, uid: typing.Tuple[str, str], sockaddr: typing.Tuple[str, int]
 | 
			
		||||
        self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        name, uuid = uid
 | 
			
		||||
        self._registry[uid] = sockaddr
 | 
			
		||||
| 
						 | 
				
			
			@ -659,16 +670,16 @@ class Arbiter(Actor):
 | 
			
		|||
        for event in events:
 | 
			
		||||
            event.set()
 | 
			
		||||
 | 
			
		||||
    def unregister_actor(self, uid: typing.Tuple[str, str]) -> None:
 | 
			
		||||
    def unregister_actor(self, uid: Tuple[str, str]) -> None:
 | 
			
		||||
        self._registry.pop(uid, None)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _start_actor(
 | 
			
		||||
    actor: Actor,
 | 
			
		||||
    main: typing.Coroutine,
 | 
			
		||||
    main: typing.Callable[..., typing.Awaitable],
 | 
			
		||||
    host: str,
 | 
			
		||||
    port: int,
 | 
			
		||||
    arbiter_addr: typing.Tuple[str, int],
 | 
			
		||||
    arbiter_addr: Tuple[str, int],
 | 
			
		||||
    nursery: trio._core._run.Nursery = None
 | 
			
		||||
):
 | 
			
		||||
    """Spawn a local actor by starting a task to execute it's main async
 | 
			
		||||
| 
						 | 
				
			
			@ -709,7 +720,9 @@ async def _start_actor(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
@asynccontextmanager
 | 
			
		||||
async def get_arbiter(host: str, port: int) -> Portal:
 | 
			
		||||
async def get_arbiter(
 | 
			
		||||
    host: str, port: int
 | 
			
		||||
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
 | 
			
		||||
    """Return a portal instance connected to a local or remote
 | 
			
		||||
    arbiter.
 | 
			
		||||
    """
 | 
			
		||||
| 
						 | 
				
			
			@ -729,8 +742,8 @@ async def get_arbiter(host: str, port: int) -> Portal:
 | 
			
		|||
 | 
			
		||||
@asynccontextmanager
 | 
			
		||||
async def find_actor(
 | 
			
		||||
    name: str, arbiter_sockaddr: typing.Tuple[str, int] = None
 | 
			
		||||
) -> Portal:
 | 
			
		||||
    name: str, arbiter_sockaddr: Tuple[str, int] = None
 | 
			
		||||
) -> typing.AsyncGenerator[Optional[Portal], None]:
 | 
			
		||||
    """Ask the arbiter to find actor(s) by name.
 | 
			
		||||
 | 
			
		||||
    Returns a connected portal to the last registered matching actor
 | 
			
		||||
| 
						 | 
				
			
			@ -752,8 +765,8 @@ async def find_actor(
 | 
			
		|||
@asynccontextmanager
 | 
			
		||||
async def wait_for_actor(
 | 
			
		||||
    name: str,
 | 
			
		||||
    arbiter_sockaddr: typing.Tuple[str, int] = None
 | 
			
		||||
) -> Portal:
 | 
			
		||||
    arbiter_sockaddr: Tuple[str, int] = None
 | 
			
		||||
) -> typing.AsyncGenerator[Portal, None]:
 | 
			
		||||
    """Wait on an actor to register with the arbiter.
 | 
			
		||||
 | 
			
		||||
    A portal to the first actor which registered is be returned.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,15 +15,12 @@ import errno
 | 
			
		|||
import selectors
 | 
			
		||||
import warnings
 | 
			
		||||
 | 
			
		||||
from multiprocessing import (
 | 
			
		||||
    forkserver, semaphore_tracker, spawn, process, util,
 | 
			
		||||
    connection
 | 
			
		||||
)
 | 
			
		||||
from multiprocessing import semaphore_tracker, spawn, process  # type: ignore
 | 
			
		||||
from multiprocessing import forkserver, util, connection  # type: ignore
 | 
			
		||||
from multiprocessing.forkserver import (
 | 
			
		||||
        ForkServer, MAXFDS_TO_SEND
 | 
			
		||||
        # _serve_one,
 | 
			
		||||
)
 | 
			
		||||
from multiprocessing.context import reduction
 | 
			
		||||
from multiprocessing.context import reduction  # type: ignore
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# taken from 3.8
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,6 +2,7 @@
 | 
			
		|||
Inter-process comms abstractions
 | 
			
		||||
"""
 | 
			
		||||
import typing
 | 
			
		||||
from typing import Any, Tuple, Optional
 | 
			
		||||
 | 
			
		||||
import msgpack
 | 
			
		||||
import trio
 | 
			
		||||
| 
						 | 
				
			
			@ -14,7 +15,7 @@ log = get_logger('ipc')
 | 
			
		|||
class StreamQueue:
 | 
			
		||||
    """Stream wrapped as a queue that delivers ``msgpack`` serialized objects.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, stream: trio.SocketStream):
 | 
			
		||||
    def __init__(self, stream: trio.SocketStream) -> None:
 | 
			
		||||
        self.stream = stream
 | 
			
		||||
        self._agen = self._iter_packets()
 | 
			
		||||
        self._laddr = self.stream.socket.getsockname()[:2]
 | 
			
		||||
| 
						 | 
				
			
			@ -28,7 +29,7 @@ class StreamQueue:
 | 
			
		|||
        while True:
 | 
			
		||||
            try:
 | 
			
		||||
                data = await self.stream.receive_some(2**10)
 | 
			
		||||
                log.trace(f"received {data}")
 | 
			
		||||
                log.trace(f"received {data}")  # type: ignore
 | 
			
		||||
            except trio.BrokenStreamError:
 | 
			
		||||
                log.error(f"Stream connection {self.raddr} broke")
 | 
			
		||||
                return
 | 
			
		||||
| 
						 | 
				
			
			@ -42,19 +43,19 @@ class StreamQueue:
 | 
			
		|||
                yield packet
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def laddr(self) -> typing.Tuple[str, int]:
 | 
			
		||||
    def laddr(self) -> Tuple[str, int]:
 | 
			
		||||
        return self._laddr
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def raddr(self) -> typing.Tuple[str, int]:
 | 
			
		||||
    def raddr(self) -> Tuple[str, int]:
 | 
			
		||||
        return self._raddr
 | 
			
		||||
 | 
			
		||||
    async def put(self, data: typing.Any) -> int:
 | 
			
		||||
    async def put(self, data: Any) -> int:
 | 
			
		||||
        async with self._send_lock:
 | 
			
		||||
            return await self.stream.send_all(
 | 
			
		||||
                msgpack.dumps(data, use_bin_type=True))
 | 
			
		||||
 | 
			
		||||
    async def get(self) -> typing.Any:
 | 
			
		||||
    async def get(self) -> Any:
 | 
			
		||||
        return await self._agen.asend(None)
 | 
			
		||||
 | 
			
		||||
    def __aiter__(self):
 | 
			
		||||
| 
						 | 
				
			
			@ -72,21 +73,24 @@ class Channel:
 | 
			
		|||
    """
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        destaddr: tuple = None,
 | 
			
		||||
        on_reconnect: typing.Coroutine = None,
 | 
			
		||||
        destaddr: Optional[Tuple[str, int]] = None,
 | 
			
		||||
        on_reconnect: typing.Callable[..., typing.Awaitable] = None,
 | 
			
		||||
        auto_reconnect: bool = False,
 | 
			
		||||
        stream: trio.SocketStream = None,  # expected to be active
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        self._recon_seq = on_reconnect
 | 
			
		||||
        self._autorecon = auto_reconnect
 | 
			
		||||
        self.squeue = StreamQueue(stream) if stream else None
 | 
			
		||||
        self.squeue: Optional[StreamQueue] = StreamQueue(
 | 
			
		||||
            stream) if stream else None
 | 
			
		||||
        if self.squeue and destaddr:
 | 
			
		||||
            raise ValueError(
 | 
			
		||||
                f"A stream was provided with local addr {self.laddr}"
 | 
			
		||||
            )
 | 
			
		||||
        self._destaddr = destaddr or self.squeue.raddr
 | 
			
		||||
        self._destaddr = self.squeue.raddr if self.squeue else destaddr
 | 
			
		||||
        # set after handshake - always uid of far end
 | 
			
		||||
        self.uid = None
 | 
			
		||||
        self.uid: Optional[Tuple[str, str]] = None
 | 
			
		||||
        # set if far end actor errors internally
 | 
			
		||||
        self._exc: Optional[Exception] = None
 | 
			
		||||
        self._agen = self._aiter_recv()
 | 
			
		||||
 | 
			
		||||
    def __repr__(self) -> str:
 | 
			
		||||
| 
						 | 
				
			
			@ -97,15 +101,15 @@ class Channel:
 | 
			
		|||
        return object.__repr__(self)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def laddr(self) -> typing.Tuple[str, int]:
 | 
			
		||||
        return self.squeue.laddr if self.squeue else (None, None)
 | 
			
		||||
    def laddr(self) -> Optional[Tuple[str, int]]:
 | 
			
		||||
        return self.squeue.laddr if self.squeue else None
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def raddr(self) -> typing.Tuple[str, int]:
 | 
			
		||||
        return self.squeue.raddr if self.squeue else (None, None)
 | 
			
		||||
    def raddr(self) -> Optional[Tuple[str, int]]:
 | 
			
		||||
        return self.squeue.raddr if self.squeue else None
 | 
			
		||||
 | 
			
		||||
    async def connect(
 | 
			
		||||
        self, destaddr: typing.Tuple[str, int] = None, **kwargs
 | 
			
		||||
        self, destaddr: Tuple[str, int] = None, **kwargs
 | 
			
		||||
    ) -> trio.SocketStream:
 | 
			
		||||
        if self.connected():
 | 
			
		||||
            raise RuntimeError("channel is already connected?")
 | 
			
		||||
| 
						 | 
				
			
			@ -114,11 +118,13 @@ class Channel:
 | 
			
		|||
        self.squeue = StreamQueue(stream)
 | 
			
		||||
        return stream
 | 
			
		||||
 | 
			
		||||
    async def send(self, item: typing.Any) -> None:
 | 
			
		||||
        log.trace(f"send `{item}`")
 | 
			
		||||
    async def send(self, item: Any) -> None:
 | 
			
		||||
        log.trace(f"send `{item}`")  # type: ignore
 | 
			
		||||
        assert self.squeue
 | 
			
		||||
        await self.squeue.put(item)
 | 
			
		||||
 | 
			
		||||
    async def recv(self) -> typing.Any:
 | 
			
		||||
    async def recv(self) -> Any:
 | 
			
		||||
        assert self.squeue
 | 
			
		||||
        try:
 | 
			
		||||
            return await self.squeue.get()
 | 
			
		||||
        except trio.BrokenStreamError:
 | 
			
		||||
| 
						 | 
				
			
			@ -128,6 +134,7 @@ class Channel:
 | 
			
		|||
 | 
			
		||||
    async def aclose(self) -> None:
 | 
			
		||||
        log.debug(f"Closing {self}")
 | 
			
		||||
        assert self.squeue
 | 
			
		||||
        await self.squeue.stream.aclose()
 | 
			
		||||
 | 
			
		||||
    async def __aenter__(self):
 | 
			
		||||
| 
						 | 
				
			
			@ -171,9 +178,10 @@ class Channel:
 | 
			
		|||
 | 
			
		||||
    async def _aiter_recv(
 | 
			
		||||
        self
 | 
			
		||||
    ) -> typing.AsyncGenerator[typing.Any, None]:
 | 
			
		||||
    ) -> typing.AsyncGenerator[Any, None]:
 | 
			
		||||
        """Async iterate items from underlying stream.
 | 
			
		||||
        """
 | 
			
		||||
        assert self.squeue
 | 
			
		||||
        while True:
 | 
			
		||||
            try:
 | 
			
		||||
                async for item in self.squeue:
 | 
			
		||||
| 
						 | 
				
			
			@ -200,7 +208,7 @@ class Channel:
 | 
			
		|||
@asynccontextmanager
 | 
			
		||||
async def _connect_chan(
 | 
			
		||||
    host: str, port: int
 | 
			
		||||
) -> typing.AsyncContextManager[Channel]:
 | 
			
		||||
) -> typing.AsyncGenerator[Channel, None]:
 | 
			
		||||
    """Create and connect a channel with disconnect on context manager
 | 
			
		||||
    teardown.
 | 
			
		||||
    """
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,11 +3,13 @@ Portal api
 | 
			
		|||
"""
 | 
			
		||||
import importlib
 | 
			
		||||
import typing
 | 
			
		||||
from typing import Tuple, Any, Dict, Optional
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
from async_generator import asynccontextmanager
 | 
			
		||||
 | 
			
		||||
from ._state import current_actor
 | 
			
		||||
from ._ipc import Channel
 | 
			
		||||
from .log import get_logger
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -32,10 +34,11 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None):
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
async def _do_handshake(
 | 
			
		||||
    actor: 'Actor', chan: 'Channel'
 | 
			
		||||
)-> typing.Tuple[str, str]:
 | 
			
		||||
    actor: 'Actor',  # type: ignore
 | 
			
		||||
    chan: Channel
 | 
			
		||||
)-> Any:
 | 
			
		||||
    await chan.send(actor.uid)
 | 
			
		||||
    uid = await chan.recv()
 | 
			
		||||
    uid: Tuple[str, str] = await chan.recv()
 | 
			
		||||
 | 
			
		||||
    if not isinstance(uid, tuple):
 | 
			
		||||
        raise ValueError(f"{uid} is not a valid uid?!")
 | 
			
		||||
| 
						 | 
				
			
			@ -54,14 +57,16 @@ class Portal:
 | 
			
		|||
 | 
			
		||||
    Think of this like an native async IPC API.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, channel: 'Channel'):
 | 
			
		||||
    def __init__(self, channel: Channel) -> None:
 | 
			
		||||
        self.channel = channel
 | 
			
		||||
        # when this is set to a tuple returned from ``_submit()`` then
 | 
			
		||||
        # it is expected that ``result()`` will be awaited at some point
 | 
			
		||||
        # during the portal's lifetime
 | 
			
		||||
        self._result = None
 | 
			
		||||
        self._exc = None
 | 
			
		||||
        self._expect_result = None
 | 
			
		||||
        self._exc: Optional[RemoteActorError] = None
 | 
			
		||||
        self._expect_result: Optional[
 | 
			
		||||
            Tuple[str, Any, str, Dict[str, Any]]
 | 
			
		||||
        ] = None
 | 
			
		||||
 | 
			
		||||
    async def aclose(self) -> None:
 | 
			
		||||
        log.debug(f"Closing {self}")
 | 
			
		||||
| 
						 | 
				
			
			@ -71,7 +76,7 @@ class Portal:
 | 
			
		|||
 | 
			
		||||
    async def _submit(
 | 
			
		||||
        self, ns: str, func: str, **kwargs
 | 
			
		||||
    ) -> typing.Tuple[str, trio.Queue, str, typing.Dict[str, typing.Any]]:
 | 
			
		||||
    ) -> Tuple[str, trio.Queue, str, Dict[str, Any]]:
 | 
			
		||||
        """Submit a function to be scheduled and run by actor, return the
 | 
			
		||||
        associated caller id, response queue, response type str,
 | 
			
		||||
        first message packet as a tuple.
 | 
			
		||||
| 
						 | 
				
			
			@ -103,7 +108,7 @@ class Portal:
 | 
			
		|||
                "A pending main result has already been submitted"
 | 
			
		||||
        self._expect_result = await self._submit(ns, func, **kwargs)
 | 
			
		||||
 | 
			
		||||
    async def run(self, ns: str, func: str, **kwargs) -> typing.Any:
 | 
			
		||||
    async def run(self, ns: str, func: str, **kwargs) -> Any:
 | 
			
		||||
        """Submit a function to be scheduled and run by actor, wrap and return
 | 
			
		||||
        its (stream of) result(s).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -115,7 +120,7 @@ class Portal:
 | 
			
		|||
 | 
			
		||||
    async def _return_from_resptype(
 | 
			
		||||
        self, cid: str, q: trio.Queue, resptype: str, first_msg: dict
 | 
			
		||||
    ) -> typing.Any:
 | 
			
		||||
    ) -> Any:
 | 
			
		||||
        # TODO: not this needs some serious work and thinking about how
 | 
			
		||||
        # to make async-generators the fundamental IPC API over channels!
 | 
			
		||||
        # (think `yield from`, `gen.send()`, and functional reactive stuff)
 | 
			
		||||
| 
						 | 
				
			
			@ -152,7 +157,7 @@ class Portal:
 | 
			
		|||
        else:
 | 
			
		||||
            raise ValueError(f"Unknown msg response type: {first_msg}")
 | 
			
		||||
 | 
			
		||||
    async def result(self) -> typing.Any:
 | 
			
		||||
    async def result(self) -> Any:
 | 
			
		||||
        """Return the result(s) from the remote actor's "main" task.
 | 
			
		||||
        """
 | 
			
		||||
        if self._expect_result is None:
 | 
			
		||||
| 
						 | 
				
			
			@ -160,7 +165,7 @@ class Portal:
 | 
			
		|||
            # teardown can reraise them
 | 
			
		||||
            exc = self.channel._exc
 | 
			
		||||
            if exc:
 | 
			
		||||
                raise RemoteActorError(f"{self.channel.uid}\n" + exc)
 | 
			
		||||
                raise RemoteActorError(f"{self.channel.uid}\n{exc}")
 | 
			
		||||
            else:
 | 
			
		||||
                raise RuntimeError(
 | 
			
		||||
                    f"Portal for {self.channel.uid} is not expecting a final"
 | 
			
		||||
| 
						 | 
				
			
			@ -205,22 +210,24 @@ class LocalPortal:
 | 
			
		|||
    A compatibility shim for normal portals but for invoking functions
 | 
			
		||||
    using an in process actor instance.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, actor: 'Actor'):
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        actor: 'Actor'  # type: ignore
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        self.actor = actor
 | 
			
		||||
 | 
			
		||||
    async def run(self, ns: str, func: str, **kwargs) -> typing.Any:
 | 
			
		||||
    async def run(self, ns: str, func: str, **kwargs) -> Any:
 | 
			
		||||
        """Run a requested function locally and return it's result.
 | 
			
		||||
        """
 | 
			
		||||
        obj = self.actor if ns == 'self' else importlib.import_module(ns)
 | 
			
		||||
        func = getattr(obj, func)
 | 
			
		||||
        return func(**kwargs)
 | 
			
		||||
        return getattr(obj, func)(**kwargs)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@asynccontextmanager
 | 
			
		||||
async def open_portal(
 | 
			
		||||
    channel: 'Channel',
 | 
			
		||||
    channel: Channel,
 | 
			
		||||
    nursery: trio._core._run.Nursery = None
 | 
			
		||||
) -> typing.AsyncContextManager[Portal]:
 | 
			
		||||
) -> typing.AsyncGenerator[Portal, None]:
 | 
			
		||||
    """Open a ``Portal`` through the provided ``channel``.
 | 
			
		||||
 | 
			
		||||
    Spawns a background task to handle message processing.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,10 +1,13 @@
 | 
			
		|||
"""
 | 
			
		||||
Per process state
 | 
			
		||||
"""
 | 
			
		||||
_current_actor = None
 | 
			
		||||
from typing import Optional
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def current_actor() -> 'Actor':
 | 
			
		||||
_current_actor: Optional['Actor'] = None  # type: ignore
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def current_actor() -> 'Actor':  # type: ignore
 | 
			
		||||
    """Get the process-local actor instance.
 | 
			
		||||
    """
 | 
			
		||||
    if not _current_actor:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,8 @@
 | 
			
		|||
"""
 | 
			
		||||
import multiprocessing as mp
 | 
			
		||||
import inspect
 | 
			
		||||
from multiprocessing import forkserver, semaphore_tracker
 | 
			
		||||
from multiprocessing import forkserver, semaphore_tracker  # type: ignore
 | 
			
		||||
from typing import Tuple, List, Dict, Optional, Any
 | 
			
		||||
import typing
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
| 
						 | 
				
			
			@ -24,14 +25,17 @@ log = get_logger('tractor')
 | 
			
		|||
class ActorNursery:
 | 
			
		||||
    """Spawn scoped subprocess actors.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, actor: Actor):
 | 
			
		||||
    def __init__(self, actor: Actor) -> None:
 | 
			
		||||
        # self.supervisor = supervisor  # TODO
 | 
			
		||||
        self._actor = actor
 | 
			
		||||
        self._children = {}
 | 
			
		||||
        self._actor: Actor = actor
 | 
			
		||||
        self._children: Dict[
 | 
			
		||||
            Tuple[str, str],
 | 
			
		||||
            Tuple[Actor, mp.Process, Optional[Portal]]
 | 
			
		||||
        ] = {}
 | 
			
		||||
        # portals spawned with ``run_in_actor()``
 | 
			
		||||
        self._cancel_after_result_on_exit = set()
 | 
			
		||||
        self.cancelled = False
 | 
			
		||||
        self._forkserver = None
 | 
			
		||||
        self._cancel_after_result_on_exit: set = set()
 | 
			
		||||
        self.cancelled: bool = False
 | 
			
		||||
        self._forkserver: forkserver.ForkServer = None
 | 
			
		||||
 | 
			
		||||
    async def __aenter__(self):
 | 
			
		||||
        return self
 | 
			
		||||
| 
						 | 
				
			
			@ -39,9 +43,9 @@ class ActorNursery:
 | 
			
		|||
    async def start_actor(
 | 
			
		||||
        self,
 | 
			
		||||
        name: str,
 | 
			
		||||
        bind_addr: (str, int) = ('127.0.0.1', 0),
 | 
			
		||||
        statespace: dict = None,
 | 
			
		||||
        rpc_module_paths: [str] = None,
 | 
			
		||||
        bind_addr: Tuple[str, int] = ('127.0.0.1', 0),
 | 
			
		||||
        statespace: Optional[Dict[str, Any]] = None,
 | 
			
		||||
        rpc_module_paths: List[str] = None,
 | 
			
		||||
        loglevel: str = None,  # set log level per subactor
 | 
			
		||||
    ) -> Portal:
 | 
			
		||||
        loglevel = loglevel or self._actor.loglevel or get_loglevel()
 | 
			
		||||
| 
						 | 
				
			
			@ -71,6 +75,7 @@ class ActorNursery:
 | 
			
		|||
                semaphore_tracker._semaphore_tracker._fd,
 | 
			
		||||
            )
 | 
			
		||||
        else:
 | 
			
		||||
            assert self._actor._forkserver_info
 | 
			
		||||
            fs_info = (
 | 
			
		||||
                fs._forkserver_address,
 | 
			
		||||
                fs._forkserver_alive_fd,
 | 
			
		||||
| 
						 | 
				
			
			@ -88,7 +93,7 @@ class ActorNursery:
 | 
			
		|||
        # register the process before start in case we get a cancel
 | 
			
		||||
        # request before the actor has fully spawned - then we can wait
 | 
			
		||||
        # for it to fully come up before sending a cancel request
 | 
			
		||||
        self._children[actor.uid] = [actor, proc, None]
 | 
			
		||||
        self._children[actor.uid] = (actor, proc, None)
 | 
			
		||||
 | 
			
		||||
        proc.start()
 | 
			
		||||
        if not proc.is_alive():
 | 
			
		||||
| 
						 | 
				
			
			@ -100,15 +105,15 @@ class ActorNursery:
 | 
			
		|||
        # local actor by the time we get a ref to it
 | 
			
		||||
        event, chan = await self._actor.wait_for_peer(actor.uid)
 | 
			
		||||
        portal = Portal(chan)
 | 
			
		||||
        self._children[actor.uid][2] = portal
 | 
			
		||||
        self._children[actor.uid] = (actor, proc, portal)
 | 
			
		||||
        return portal
 | 
			
		||||
 | 
			
		||||
    async def run_in_actor(
 | 
			
		||||
        self,
 | 
			
		||||
        name: str,
 | 
			
		||||
        fn: typing.Callable,
 | 
			
		||||
        bind_addr: (str, int) = ('127.0.0.1', 0),
 | 
			
		||||
        rpc_module_paths: [str] = None,
 | 
			
		||||
        bind_addr: Tuple[str, int] = ('127.0.0.1', 0),
 | 
			
		||||
        rpc_module_paths: List[str] = None,
 | 
			
		||||
        statespace: dict = None,
 | 
			
		||||
        loglevel: str = None,  # set log level per subactor
 | 
			
		||||
        **kwargs,  # explicit args to ``fn``
 | 
			
		||||
| 
						 | 
				
			
			@ -155,7 +160,12 @@ class ActorNursery:
 | 
			
		|||
                            async for item in agen:
 | 
			
		||||
                                log.debug(f"Consuming item {item}")
 | 
			
		||||
 | 
			
		||||
        async def wait_for_proc(proc, actor, portal, cancel_scope):
 | 
			
		||||
        async def wait_for_proc(
 | 
			
		||||
            proc: mp.Process,
 | 
			
		||||
            actor: Actor,
 | 
			
		||||
            portal: Portal,
 | 
			
		||||
            cancel_scope: trio._core._run.CancelScope,
 | 
			
		||||
        ) -> None:
 | 
			
		||||
            # TODO: timeout block here?
 | 
			
		||||
            if proc.is_alive():
 | 
			
		||||
                await trio.hazmat.wait_readable(proc.sentinel)
 | 
			
		||||
| 
						 | 
				
			
			@ -172,9 +182,10 @@ class ActorNursery:
 | 
			
		|||
                cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
        async def wait_for_actor(
 | 
			
		||||
            portal, actor,
 | 
			
		||||
            portal: Portal,
 | 
			
		||||
            actor: Actor,
 | 
			
		||||
            task_status=trio.TASK_STATUS_IGNORED,
 | 
			
		||||
        ):
 | 
			
		||||
        ) -> None:
 | 
			
		||||
            # cancel the actor gracefully
 | 
			
		||||
            with trio.open_cancel_scope() as cs:
 | 
			
		||||
                task_status.started(cs)
 | 
			
		||||
| 
						 | 
				
			
			@ -231,6 +242,7 @@ class ActorNursery:
 | 
			
		|||
                                    do_hard_kill(proc)
 | 
			
		||||
 | 
			
		||||
                        # spawn cancel tasks async
 | 
			
		||||
                        assert portal
 | 
			
		||||
                        n.start_soon(portal.cancel_actor)
 | 
			
		||||
 | 
			
		||||
        log.debug(f"Waiting on all subactors to complete")
 | 
			
		||||
| 
						 | 
				
			
			@ -275,7 +287,7 @@ class ActorNursery:
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
@asynccontextmanager
 | 
			
		||||
async def open_nursery() -> typing.AsyncContextManager[ActorNursery]:
 | 
			
		||||
async def open_nursery() -> typing.AsyncGenerator[None, ActorNursery]:
 | 
			
		||||
    """Create and yield a new ``ActorNursery``.
 | 
			
		||||
    """
 | 
			
		||||
    actor = current_actor()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,9 +2,10 @@
 | 
			
		|||
Log like a forester!
 | 
			
		||||
"""
 | 
			
		||||
from functools import partial
 | 
			
		||||
import sys
 | 
			
		||||
import logging
 | 
			
		||||
import colorlog  # type: ignore
 | 
			
		||||
from typing import Optional
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
_proj_name = 'tractor'
 | 
			
		||||
_default_loglevel = None
 | 
			
		||||
| 
						 | 
				
			
			@ -86,5 +87,5 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger:
 | 
			
		|||
    return log
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_loglevel() -> str:
 | 
			
		||||
def get_loglevel() -> Optional[str]:
 | 
			
		||||
    return _default_loglevel
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue