forked from goodboy/tractor

Reorg everything into private modules
parent f636bfdf83
commit 64cbb922dc
conftest.py
@@ -12,7 +12,7 @@ def pytest_addoption(parser):
 
 @pytest.fixture(scope='session', autouse=True)
 def loglevel(request):
-    orig = tractor._default_loglevel
-    level = tractor._default_loglevel = request.config.option.loglevel
+    orig = tractor.log._default_loglevel
+    level = tractor.log._default_loglevel = request.config.option.loglevel
     yield level
-    tractor._default_loglevel = orig
+    tractor.log._default_loglevel = orig
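For context, a minimal sketch of the option hook this fixture reads through
``request.config.option.loglevel``; only the hunk's tail is shown above, so
the flag name here is an assumption:

def pytest_addoption(parser):
    # hypothetical flag; only ``dest='loglevel'`` is implied by the fixture
    parser.addoption(
        "--ll", action="store", dest='loglevel', default=None,
        help="logging level to set when testing")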
tractor/__init__.py (1019 changed lines)
File diff suppressed because it is too large
tractor/_actor.py (new file)
@@ -0,0 +1,647 @@
"""
 | 
			
		||||
Actor primitives and helpers
 | 
			
		||||
"""
 | 
			
		||||
import inspect
 | 
			
		||||
import importlib
 | 
			
		||||
from collections import defaultdict
 | 
			
		||||
from functools import partial
 | 
			
		||||
from typing import Coroutine
 | 
			
		||||
import traceback
 | 
			
		||||
import uuid
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
from async_generator import asynccontextmanager, aclosing
 | 
			
		||||
 | 
			
		||||
from ._ipc import Channel, _connect_chan
 | 
			
		||||
from .log import get_console_log, get_logger
 | 
			
		||||
from ._portal import (Portal, open_portal, _do_handshake, LocalPortal,
 | 
			
		||||
                      maybe_open_nursery)
 | 
			
		||||
from . import _state
 | 
			
		||||
from ._state import current_actor
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger('tractor')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ActorFailure(Exception):
 | 
			
		||||
    "General actor failure"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _invoke(
    cid, chan, func, kwargs,
    treat_as_gen=False, raise_errs=False,
    task_status=trio.TASK_STATUS_IGNORED
):
    """Invoke local func and return results over provided channel.
    """
    try:
        is_async_partial = False
        is_async_gen_partial = False
        if isinstance(func, partial):
            is_async_partial = inspect.iscoroutinefunction(func.func)
            is_async_gen_partial = inspect.isasyncgenfunction(func.func)

        if (
            not inspect.iscoroutinefunction(func) and
            not inspect.isasyncgenfunction(func) and
            not is_async_partial and
            not is_async_gen_partial
        ):
            await chan.send({'return': func(**kwargs), 'cid': cid})
        else:
            coro = func(**kwargs)

            if inspect.isasyncgen(coro):
                # XXX: massive gotcha! If the containing scope
                # is cancelled and we execute the below line,
                # any ``ActorNursery.__aexit__()`` WON'T be
                # triggered in the underlying async gen! So we
                # have to properly handle the closing (aclosing)
                # of the async gen in order to be sure the cancel
                # is propagated!
                async with aclosing(coro) as agen:
                    async for item in agen:
                        # TODO: can we send values back in here?
                        # it's gonna require a `while True:` and
                        # some non-blocking way to retrieve new `asend()`
                        # values from the channel:
                        # to_send = await chan.recv_nowait()
                        # if to_send is not None:
                        #     to_yield = await coro.asend(to_send)
                        await chan.send({'yield': item, 'cid': cid})

                    log.debug(f"Finished iterating {coro}")
                    # TODO: we should really support a proper
                    # `StopAsyncIteration` system here for returning a final
                    # value if desired
                    await chan.send({'stop': None, 'cid': cid})
            else:
                if treat_as_gen:
                    # XXX: the async-func may spawn further tasks which push
                    # back values like an async-generator would but must
                    # manually construct the response dict-packet-responses as
                    # above
                    await coro
                else:
                    await chan.send({'return': await coro, 'cid': cid})

    except Exception:
        log.exception("Actor errored:")
        if not raise_errs:
            await chan.send({'error': traceback.format_exc(), 'cid': cid})
        else:
            raise

    task_status.started()


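The ``aclosing()`` gotcha flagged in ``_invoke()`` deserves a standalone
illustration. This is a hedged sketch, not part of the diff: if the
surrounding scope is cancelled while the consumer is blocked in the loop
body, the generator is left suspended at its ``yield``; ``aclosing()``
guarantees ``aclose()`` runs on exit so the generator's cleanup executes.

import trio
from async_generator import aclosing

async def agen():
    try:
        while True:
            yield 'item'
    finally:
        # without aclosing() this may only run (if ever) at garbage
        # collection time, long after the cancellation
        print("async gen cleanup ran")

async def demo():
    with trio.move_on_after(0.1):
        async with aclosing(agen()) as ag:
            async for item in ag:
                await trio.sleep(1)  # cancelled here, gen parked at yield

trio.run(demo)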
class Actor:
    """The fundamental concurrency primitive.

    An *actor* is the combination of a regular Python process or
    ``multiprocessing.Process`` executing a ``trio`` task tree, communicating
    with other actors through "portals" which provide a native async API
    around "channels".
    """
    is_arbiter = False

    def __init__(
        self,
        name: str,
        main: Coroutine = None,
        rpc_module_paths: [str] = [],
        statespace: dict = {},
        uid: str = None,
        allow_rpc: bool = True,
        outlive_main: bool = False,
        loglevel: str = None,
        arbiter_addr: (str, int) = None,
    ):
        self.name = name
        self.uid = (name, uid or str(uuid.uuid1()))
        self.rpc_module_paths = rpc_module_paths
        self._mods = {}
        self.main = main
        # TODO: consider making this a dynamically defined
        # @dataclass once we get py3.7
        self.statespace = statespace
        self._allow_rpc = allow_rpc
        self._outlive_main = outlive_main
        self.loglevel = loglevel
        self._arb_addr = arbiter_addr

        # filled in by `_async_main` after fork
        self._peers = defaultdict(list)
        self._peer_connected = {}
        self._no_more_peers = trio.Event()
        self._main_complete = trio.Event()
        self._main_scope = None
        self._no_more_peers.set()
        self._actors2calls = {}  # map {uids -> {callids -> waiter queues}}
        self._listeners = []
        self._parent_chan = None
        self._accept_host = None

    async def wait_for_peer(self, uid):
        """Wait for a connection back from a spawned actor with a given
        ``uid``.
        """
        log.debug(f"Waiting for peer {uid} to connect")
        event = self._peer_connected.setdefault(uid, trio.Event())
        await event.wait()
        log.debug(f"{uid} successfully connected back to us")
        return event, self._peers[uid][-1]

    def load_namespaces(self):
        # We load namespaces after fork since this actor may
        # be spawned on a different machine from the original nursery
        # and we need to try and load the local module code (if it
        # exists)
        for path in self.rpc_module_paths:
            self._mods[path] = importlib.import_module(path)

    async def _stream_handler(
        self,
        stream: trio.SocketStream,
    ):
        """
        Entry point for new inbound connections to the channel server.
        """
        self._no_more_peers.clear()
        chan = Channel(stream=stream)
        log.info(f"New connection to us {chan}")

        # send/receive initial handshake response
        try:
            uid = await _do_handshake(self, chan)
        except StopAsyncIteration:
            log.warn(f"Channel {chan} failed to handshake")
            return

        # channel tracking
        event = self._peer_connected.pop(uid, None)
        if event:
            # Instructing connection: this is likely a new channel to
            # a recently spawned actor which we'd like to control via
            # async-rpc calls.
            log.debug(f"Waking channel waiters {event.statistics()}")
            # Alert any task waiting on this connection to come up
            event.set()

        chans = self._peers[uid]
        if chans:
            log.warn(
                f"already have channel(s) for {uid}:{chans}?"
            )
        log.debug(f"Registered {chan} for {uid}")
        # append new channel
        self._peers[uid].append(chan)

        # Begin channel management - respond to remote requests and
        # process received responses.
        try:
            await self._process_messages(chan)
        finally:
            # Drop ref to channel so it can be gc-ed and disconnected
            log.debug(f"Releasing channel {chan} from {chan.uid}")
            chans = self._peers.get(chan.uid)
            chans.remove(chan)
            if not chans:
                log.debug(f"No more channels for {chan.uid}")
                self._peers.pop(chan.uid, None)
                if not self._actors2calls.get(chan.uid, {}).get('main'):
                    # fake a "main task" result for any waiting
                    # nurseries/portals
                    log.debug(f"Faking result for {chan} from {chan.uid}")
                    q = self.get_waitq(chan.uid, 'main')
                    q.put_nowait({'return': None, 'cid': 'main'})

            log.debug(f"Peers is {self._peers}")

            if not self._peers:  # no more channels connected
                self._no_more_peers.set()
                log.debug(f"Signalling no more peer channels")

            # XXX: is this necessary?
            if chan.connected():
                log.debug(f"Disconnecting channel {chan}")
                await chan.send(None)
                await chan.aclose()

    async def _push_result(self, actorid, cid, msg):
        assert actorid, f"`actorid` can't be {actorid}"
        q = self.get_waitq(actorid, cid)
        log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
        # maintain backpressure
        await q.put(msg)

    def get_waitq(self, actorid, cid):
        log.debug(f"Getting result queue for {actorid} cid {cid}")
        cids2qs = self._actors2calls.setdefault(actorid, {})
        return cids2qs.setdefault(cid, trio.Queue(1000))

    async def send_cmd(self, chan, ns, func, kwargs):
        """Send a ``'cmd'`` message to a remote actor and return a
        caller id and a ``trio.Queue`` that can be used to wait for
        responses delivered by the local message processing loop.
        """
        cid = str(uuid.uuid1())
        q = self.get_waitq(chan.uid, cid)
        log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
        await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
        return cid, q

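For reference, the full set of wire-message shapes used by this protocol
(each appears verbatim in this file or in ``_portal.py``; placeholder
values are illustrative):

# request: (namespace, func name, kwargs, caller uid, caller id)
{'cmd': ('some.module', 'func_name', {'x': 1}, ('name', '<uuid>'), '<cid>')}
# responses, each tagged with the originating caller id:
{'return': 42, 'cid': '<cid>'}               # final result of an async func
{'yield': 'item', 'cid': '<cid>'}            # one value from an async gen
{'stop': None, 'cid': '<cid>'}               # async gen exhausted
{'error': 'Traceback ...', 'cid': '<cid>'}   # remote traceback, formatted
# plus a bare ``None`` sentinel which tells the remote msg loop to exit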
    async def _process_messages(self, chan, treat_as_gen=False):
        """Process messages async-RPC style.

        Process rpc requests and deliver retrieved responses from channels.
        """
        # TODO: once https://github.com/python-trio/trio/issues/467 gets
        # worked out we'll likely want to use that!
        log.debug(f"Entering msg loop for {chan} from {chan.uid}")
        async with trio.open_nursery() as nursery:
            try:
                async for msg in chan.aiter_recv():
                    if msg is None:  # terminate sentinel
                        log.debug(
                            f"Cancelling all tasks for {chan} from {chan.uid}")
                        nursery.cancel_scope.cancel()
                        log.debug(
                                f"Msg loop signalled to terminate for"
                                f" {chan} from {chan.uid}")
                        break
                    log.debug(f"Received msg {msg} from {chan.uid}")
                    cid = msg.get('cid')
                    if cid:  # deliver response to local caller/waiter
                        await self._push_result(chan.uid, cid, msg)
                        log.debug(
                            f"Waiting on next msg for {chan} from {chan.uid}")
                        continue
                    else:
                        ns, funcname, kwargs, actorid, cid = msg['cmd']

                    log.debug(
                        f"Processing request from {actorid}\n"
                        f"{ns}.{funcname}({kwargs})")
                    if ns == 'self':
                        func = getattr(self, funcname)
                    else:
                        func = getattr(self._mods[ns], funcname)

                    # spin up a task for the requested function
                    sig = inspect.signature(func)
                    treat_as_gen = False
                    if 'chan' in sig.parameters:
                        assert 'cid' in sig.parameters, \
                            f"{func} must accept a `cid` (caller id) kwarg"
                        kwargs['chan'] = chan
                        kwargs['cid'] = cid
                        # TODO: eventually we want to be more stringent
                        # about what is considered a far-end async-generator.
                        # Right now both actual async gens and any async
                        # function which declares a `chan` kwarg in its
                        # signature will be treated as one.
                        treat_as_gen = True

                    log.debug(f"Spawning task for {func}")
                    nursery.start_soon(
                        _invoke, cid, chan, func, kwargs, treat_as_gen,
                        name=funcname
                    )
                    log.debug(
                        f"Waiting on next msg for {chan} from {chan.uid}")
                else:  # channel disconnect
                    log.debug(f"{chan} from {chan.uid} disconnected")
            except trio.ClosedStreamError:
                log.error(f"{chan} from {chan.uid} broke")

        log.debug(f"Exiting msg loop for {chan} from {chan.uid}")

    def _fork_main(self, accept_addr, parent_addr=None):
        # after fork routine which invokes a fresh ``trio.run``
        # log.warn("Log level after fork is {self.loglevel}")
        from ._trionics import ctx
        if self.loglevel is not None:
            get_console_log(self.loglevel)
        log.info(
            f"Started new {ctx.current_process()} for actor {self.uid}")
        _state._current_actor = self
        log.debug(f"parent_addr is {parent_addr}")
        try:
            trio.run(partial(
                self._async_main, accept_addr, parent_addr=parent_addr))
        except KeyboardInterrupt:
            pass  # handle it the same way trio does?
        log.debug(f"Actor {self.uid} terminated")

    async def _async_main(
        self,
        accept_addr,
        arbiter_addr=None,
        parent_addr=None,
        nursery=None
    ):
        """Start the channel server, maybe connect back to the parent, and
        start the main task.

        A "root-most" (or "top-level") nursery for this actor is opened here
        and when cancelled effectively cancels the actor.
        """
        result = None
        arbiter_addr = arbiter_addr or self._arb_addr
        registered_with_arbiter = False
        try:
            async with maybe_open_nursery(nursery) as nursery:
                self._root_nursery = nursery

                # Start up channel server
                host, port = accept_addr
                await nursery.start(partial(
                    self._serve_forever, accept_host=host, accept_port=port)
                )

                # XXX: I wonder if a better name is maybe "requester"
                # since I don't think the notion of a "parent" actor
                # necessarily sticks given that eventually we want
                # ``'MainProcess'`` (the actor who initially starts the
                # forkserver) to eventually be the only one who is
                # allowed to spawn new processes per Python program.
                if parent_addr is not None:
                    try:
                        # Connect back to the parent actor and conduct initial
                        # handshake (From this point on if we error ship the
                        # exception back to the parent actor)
                        chan = self._parent_chan = Channel(
                            destaddr=parent_addr,
                            on_reconnect=self.main
                        )
                        await chan.connect()
                        # initial handshake, report who we are, who they are
                        await _do_handshake(self, chan)
                    except OSError:  # failed to connect
                        log.warn(
                            f"Failed to connect to parent @ {parent_addr},"
                            " closing server")
                        self.cancel_server()
                        self._parent_chan = None

                # register with the arbiter if we're told its addr
                log.debug(f"Registering {self} for role `{self.name}`")
                async with get_arbiter(*arbiter_addr) as arb_portal:
                    await arb_portal.run(
                        'self', 'register_actor',
                        name=self.name, sockaddr=self.accept_addr)
                    registered_with_arbiter = True

                # handle new connection back to parent optionally
                # begin responding to RPC
                if self._allow_rpc:
                    self.load_namespaces()
                    if self._parent_chan:
                        nursery.start_soon(
                            self._process_messages, self._parent_chan)

                if self.main:
                    try:
                        if self._parent_chan:
                            async with trio.open_nursery() as n:
                                self._main_scope = n.cancel_scope
                                log.debug(f"Starting main task `{self.main}`")
                                # spawned subactor so deliver "main"
                                # task result(s) back to parent
                                await n.start(
                                    _invoke, 'main',
                                    self._parent_chan, self.main, {},
                                    # treat_as_gen, raise_errs params
                                    False, True
                                )
                        else:
                            with trio.open_cancel_scope() as main_scope:
                                self._main_scope = main_scope
                                # run directly; we are an "unspawned actor"
                                log.debug(f"Running `{self.main}` directly")
                                result = await self.main()
                    finally:
                        # tear down channel server in order to ensure
                        # we exit normally when the main task is done
                        if not self._outlive_main:
                            log.debug(f"Shutting down channel server")
                            self.cancel_server()
                            log.debug(f"Shutting down root nursery")
                            nursery.cancel_scope.cancel()
                        self._main_complete.set()

                    if self._main_scope.cancelled_caught:
                        log.debug("Main task was cancelled successfully")
                log.debug("Waiting on root nursery to complete")
            # blocks here as expected if no nursery was provided until
            # the channel server is killed (i.e. this actor is
            # cancelled or signalled by the parent actor)
        except Exception:
            if self._parent_chan:
                try:
                    await self._parent_chan.send(
                        {'error': traceback.format_exc(), 'cid': 'main'})
                except trio.ClosedStreamError:
                    log.error(
                        f"Failed to ship error to parent "
                        f"{self._parent_chan.uid}, channel was closed")
                    log.exception("Actor errored:")

            if not registered_with_arbiter:
                log.exception(
                    f"Failed to register with arbiter @ {arbiter_addr}")
            else:
                raise
        finally:
            await self._do_unreg(arbiter_addr)
            # terminate actor once all its peers (actors that connected
            # to it as clients) have disappeared
            if not self._no_more_peers.is_set():
                log.debug(
                    f"Waiting for remaining peers {self._peers} to clear")
                await self._no_more_peers.wait()
            log.debug(f"All peer channels are complete")

            # tear down channel server no matter what since we errored
            # or completed
            log.debug(f"Shutting down channel server")
            self.cancel_server()

        return result

    async def _serve_forever(
        self,
        *,
        # (host, port) to bind for channel server
        accept_host=None,
        accept_port=0,
        task_status=trio.TASK_STATUS_IGNORED
    ):
        """Start the channel server, begin listening for new connections.

        This will cause an actor to continue living (blocking) until
        ``cancel_server()`` is called.
        """
        async with trio.open_nursery() as nursery:
            self._server_nursery = nursery
            # TODO: might want to consider having a separate nursery
            # for the stream handler such that the server can be cancelled
            # whilst leaving existing channels up
            listeners = await nursery.start(
                partial(
                    trio.serve_tcp,
                    self._stream_handler,
                    # new connections will stay alive even if this server
                    # is cancelled
                    handler_nursery=self._root_nursery,
                    port=accept_port, host=accept_host,
                )
            )
            log.debug(
                f"Started tcp server(s) on {[l.socket for l in listeners]}")
            self._listeners.extend(listeners)
            task_status.started()

    async def _do_unreg(self, arbiter_addr):
        # UNregister actor from the arbiter
        try:
            if arbiter_addr is not None:
                async with get_arbiter(*arbiter_addr) as arb_portal:
                    await arb_portal.run(
                        'self', 'unregister_actor', name=self.name)
        except OSError:
            log.warn(f"Unable to unregister {self.name} from arbiter")

    async def cancel(self):
        """This cancels the internal root-most nursery thereby gracefully
        cancelling (for all intents and purposes) this actor.
        """
        self.cancel_server()
        if self._main_scope:
            self._main_scope.cancel()
            log.debug("Waiting on main task to complete")
            await self._main_complete.wait()
        self._root_nursery.cancel_scope.cancel()

    def cancel_server(self):
        """Cancel the internal channel server nursery thereby
        preventing any new inbound connections from being established.
        """
        self._server_nursery.cancel_scope.cancel()

    @property
    def accept_addr(self):
        """Primary address to which the channel server is bound.
        """
        try:
            return self._listeners[0].socket.getsockname()
        except OSError:
            return

    def get_parent(self):
        return Portal(self._parent_chan)

    def get_chans(self, actorid):
        return self._peers[actorid]


class Arbiter(Actor):
    """A special actor who knows all the other actors and always has
    access to the top level nursery.

    The arbiter is by default the first actor spawned on each host
    and is responsible for keeping track of all other actors for
    coordination purposes. If a new main process is launched and an
    arbiter is already running that arbiter will be used.
    """
    _registry = defaultdict(list)
    is_arbiter = True

    def find_actor(self, name):
        return self._registry[name]

    def register_actor(self, name, sockaddr):
        self._registry[name].append(sockaddr)

    def unregister_actor(self, name):
        self._registry.pop(name, None)


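The arbiter's registry is a plain name -> socket-address multimap, so its
semantics can be shown locally (a sketch, not part of the diff); remote
callers reach the same methods through a portal, e.g.
``await arb_portal.run('self', 'register_actor', ...)`` as in
``_async_main()`` above:

arb = Arbiter('arbiter')
arb.register_actor('worker', ('127.0.0.1', 61000))
assert arb.find_actor('worker') == [('127.0.0.1', 61000)]
arb.unregister_actor('worker')
assert arb.find_actor('worker') == []
# note: ``_registry`` is a class attribute and is therefore shared by
# every ``Arbiter`` instance in the process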
async def _start_actor(actor, host, port, arbiter_addr, nursery=None):
    """Spawn a local actor by starting a task to execute its main
    async function.

    Blocks if no nursery is provided, in which case it is expected the nursery
    provider is responsible for waiting on the task to complete.
    """
    # assign process-local actor
    _state._current_actor = actor

    # start local channel-server and fake the portal API
    # NOTE: this won't block since we provide the nursery
    log.info(f"Starting local {actor} @ {host}:{port}")

    result = await actor._async_main(
        accept_addr=(host, port),
        parent_addr=None,
        arbiter_addr=arbiter_addr,
        nursery=nursery,
    )
    # XXX: If spawned locally, the actor is cancelled when this
    # context is complete given that there are no more active
    # peer channels connected to it.
    if not actor._outlive_main:
        actor.cancel_server()

    # unset module state
    _state._current_actor = None
    log.info("Completed async main")

    return result


@asynccontextmanager
async def get_arbiter(host, port):
    """Return a portal instance connected to a local or remote
    arbiter.
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    if actor.is_arbiter:
        # we're already the arbiter
        # (likely a re-entrant call from the arbiter actor)
        yield LocalPortal(actor)
    else:
        async with _connect_chan(host, port) as chan:
            async with open_portal(chan) as arb_portal:
                yield arb_portal


@asynccontextmanager
async def find_actor(
    name,
    arbiter_sockaddr=None,
):
    """Ask the arbiter to find actor(s) by name.

    Returns a connected portal to the last registered matching actor
    known to the arbiter.
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
        sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
        # TODO: return portals to all available actors - for now just
        # the last one that registered
        if sockaddrs:
            sockaddr = sockaddrs[-1]
            async with _connect_chan(*sockaddr) as chan:
                async with open_portal(chan) as portal:
                    yield portal
        else:
            yield None
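A hedged usage sketch for the discovery API above; the actor and module
names are invented for illustration, and this must run inside an existing
actor (i.e. where ``current_actor()`` is non-None):

async def client_task():
    # yields a connected ``Portal``, or ``None`` if no actor of that
    # name is registered with the arbiter
    async with find_actor('data-feed') as portal:
        if portal is not None:
            result = await portal.run('datamod', 'fetch', key='BTC')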
tractor/_portal.py (new file)
@@ -0,0 +1,205 @@
"""
 | 
			
		||||
Portal api
 | 
			
		||||
"""
 | 
			
		||||
import importlib
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
from async_generator import asynccontextmanager
 | 
			
		||||
 | 
			
		||||
from ._state import current_actor
 | 
			
		||||
from .log import get_logger
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger('tractor')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class RemoteActorError(RuntimeError):
 | 
			
		||||
    "Remote actor exception bundled locally"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@asynccontextmanager
 | 
			
		||||
async def maybe_open_nursery(nursery=None):
 | 
			
		||||
    """Create a new nursery if None provided.
 | 
			
		||||
 | 
			
		||||
    Blocks on exit as expected if no input nursery is provided.
 | 
			
		||||
    """
 | 
			
		||||
    if nursery is not None:
 | 
			
		||||
        yield nursery
 | 
			
		||||
    else:
 | 
			
		||||
        async with trio.open_nursery() as nursery:
 | 
			
		||||
            yield nursery
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _do_handshake(actor, chan):
    await chan.send(actor.uid)
    uid = await chan.recv()

    if not isinstance(uid, tuple):
        raise ValueError(f"{uid} is not a valid uid?!")

    chan.uid = uid
    log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
    return uid


async def result_from_q(q, chan):
    """Process a msg from a remote actor.
    """
    first_msg = await q.get()
    if 'return' in first_msg:
        return 'return', first_msg, q
    elif 'yield' in first_msg:
        return 'yield', first_msg, q
    elif 'error' in first_msg:
        raise RemoteActorError(f"{chan.uid}\n" + first_msg['error'])
    else:
        raise ValueError(f"{first_msg} is an invalid response packet?")


class Portal:
    """A 'portal' to a(n) (remote) ``Actor``.

    Allows for invoking remote routines and receiving results through an
    underlying ``tractor.Channel`` as though the remote (async)
    function / generator was invoked locally.

    Think of this like a native async IPC API.
    """
    def __init__(self, channel):
        self.channel = channel
        self._result = None

    async def aclose(self):
        log.debug(f"Closing {self}")
        # XXX: won't work until https://github.com/python-trio/trio/pull/460
        # gets in!
        await self.channel.aclose()

    async def run(self, ns, func, **kwargs):
        """Submit a function to be scheduled and run by actor, return its
        (stream of) result(s).
        """
        # TODO: note this needs some serious work and thinking about how
        # to make async-generators the fundamental IPC API over channels!
        # (think `yield from`, `gen.send()`, and functional reactive stuff)
        actor = current_actor()
        # ship a function call request to the remote actor
        cid, q = await actor.send_cmd(self.channel, ns, func, kwargs)
        # wait on first response msg and handle
        return await self._return_from_resptype(
            cid, *(await result_from_q(q, self.channel)))

    async def _return_from_resptype(self, cid, resptype, first_msg, q):

        if resptype == 'yield':

            async def yield_from_q():
                yield first_msg['yield']
                try:
                    async for msg in q:
                        try:
                            yield msg['yield']
                        except KeyError:
                            if 'stop' in msg:
                                break  # far end async gen terminated
                            else:
                                raise RemoteActorError(msg['error'])
                except GeneratorExit:
                    log.debug(
                        f"Cancelling async gen call {cid} to "
                        f"{self.channel.uid}")
                    raise

            return yield_from_q()

        elif resptype == 'return':
            return first_msg['return']
        else:
            raise ValueError(f"Unknown msg response type: {first_msg}")

    async def result(self):
        """Return the result(s) from the remote actor's "main" task.
        """
        if self._result is None:
            q = current_actor().get_waitq(self.channel.uid, 'main')
            resptype, first_msg, q = (await result_from_q(q, self.channel))
            self._result = await self._return_from_resptype(
                'main', resptype, first_msg, q)
            log.warn(
                f"Retrieved first result `{self._result}` "
                f"for {self.channel.uid}")
        # await q.put(first_msg)  # for next consumer (e.g. nursery)
        return self._result

    async def close(self):
        # trigger remote msg loop `break`
        chan = self.channel
        log.debug(f"Closing portal for {chan} to {chan.uid}")
        await self.channel.send(None)

    async def cancel_actor(self):
        """Cancel the actor on the other end of this portal.
        """
        log.warn(
            f"Sending cancel request to {self.channel.uid} on "
            f"{self.channel}")
        try:
            with trio.move_on_after(0.1) as cancel_scope:
                cancel_scope.shield = True
                # send cancel cmd - might not get response
                await self.run('self', 'cancel')
                return True
        except trio.ClosedStreamError:
            log.warn(
                f"{self.channel} for {self.channel.uid} was already closed?")
            return False


class LocalPortal:
    """A 'portal' to a local ``Actor``.

    A compatibility shim for normal portals but for invoking functions
    using an in process actor instance.
    """
    def __init__(self, actor):
        self.actor = actor

    async def run(self, ns, func, **kwargs):
        """Run a requested function locally and return its result.
        """
        obj = self.actor if ns == 'self' else importlib.import_module(ns)
        func = getattr(obj, func)
        return func(**kwargs)


@asynccontextmanager
async def open_portal(channel, nursery=None):
    """Open a ``Portal`` through the provided ``channel``.

    Spawns a background task to handle message processing.
    """
    actor = current_actor()
    assert actor
    was_connected = False

    async with maybe_open_nursery(nursery) as nursery:

        if not channel.connected():
            await channel.connect()
            was_connected = True

        if channel.uid is None:
            await _do_handshake(actor, channel)

        nursery.start_soon(actor._process_messages, channel)
        portal = Portal(channel)
        yield portal

        # cancel remote channel-msg loop
        if channel.connected():
            await portal.close()

        # cancel background msg loop task
        nursery.cancel_scope.cancel()
        if was_connected:
            await channel.aclose()
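A hedged sketch of using ``open_portal()`` over an explicit channel, the
same pattern ``get_arbiter()`` in ``_actor.py`` uses; the address is
illustrative and a listening actor is assumed to be reachable there:

from tractor._ipc import _connect_chan

async def connect_and_call():
    async with _connect_chan('127.0.0.1', 1616) as chan:
        async with open_portal(chan) as portal:
            # runs ``somemod.some_func(x=1)`` in the remote actor
            result = await portal.run('somemod', 'some_func', x=1)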
tractor/_state.py (new file)
@@ -0,0 +1,10 @@
"""
 | 
			
		||||
Per process state
 | 
			
		||||
"""
 | 
			
		||||
_current_actor = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def current_actor() -> 'Actor':
 | 
			
		||||
    """Get the process-local actor instance.
 | 
			
		||||
    """
 | 
			
		||||
    return _current_actor
 | 
			
		||||
| 
						 | 
				
			
tractor/_trionics.py (new file)
@@ -0,0 +1,175 @@
"""
 | 
			
		||||
``trio`` inspired apis and helpers
 | 
			
		||||
"""
 | 
			
		||||
import multiprocessing as mp
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
from async_generator import asynccontextmanager
 | 
			
		||||
 | 
			
		||||
from ._state import current_actor
 | 
			
		||||
from .log import get_logger, get_loglevel
 | 
			
		||||
from ._actor import Actor, ActorFailure
 | 
			
		||||
from ._portal import Portal
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
ctx = mp.get_context("forkserver")
 | 
			
		||||
log = get_logger('tractor')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ActorNursery:
 | 
			
		||||
    """Spawn scoped subprocess actors.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, actor, supervisor=None):
 | 
			
		||||
        self.supervisor = supervisor  # TODO
 | 
			
		||||
        self._actor = actor
 | 
			
		||||
        # We'll likely want some way to cancel all sub-actors eventually
 | 
			
		||||
        # self.cancel_scope = cancel_scope
 | 
			
		||||
        self._children = {}
 | 
			
		||||
        self.cancelled = False
 | 
			
		||||
 | 
			
		||||
    async def __aenter__(self):
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    async def start_actor(
        self,
        name: str,
        main=None,
        bind_addr=('127.0.0.1', 0),
        statespace=None,
        rpc_module_paths=None,
        outlive_main=False,  # sub-actors die when their main task completes
        loglevel=None,  # set log level per subactor
    ):
        loglevel = loglevel or self._actor.loglevel or get_loglevel()
        actor = Actor(
            name,
            # modules allowed to invoke funcs from
            rpc_module_paths=rpc_module_paths or [],
            statespace=statespace,  # global proc state vars
            main=main,  # main coroutine to be invoked
            outlive_main=outlive_main,
            loglevel=loglevel,
            arbiter_addr=current_actor()._arb_addr,
        )
        parent_addr = self._actor.accept_addr
        assert parent_addr
        proc = ctx.Process(
            target=actor._fork_main,
            args=(bind_addr, parent_addr),
            # daemon=True,
            name=name,
        )
        proc.start()
        if not proc.is_alive():
            raise ActorFailure("Couldn't start sub-actor?")

        log.info(f"Started {proc}")
        # wait for actor to spawn and connect back to us
        # channel should have handshake completed by the
        # local actor by the time we get a ref to it
        event, chan = await self._actor.wait_for_peer(actor.uid)
        portal = Portal(chan)
        self._children[(name, proc.pid)] = (actor, proc, portal)
        return portal

    async def wait(self):
        """Wait for all subactors to complete.
        """
        async def wait_for_proc(proc, actor, portal):
            # TODO: timeout block here?
            if proc.is_alive():
                await trio.hazmat.wait_readable(proc.sentinel)
            # please god don't hang
            proc.join()
            log.debug(f"Joined {proc}")
            event = self._actor._peers.get(actor.uid)
            if isinstance(event, trio.Event):
                event.set()
                log.warn(
                    f"Cancelled `wait_for_peer()` call since {actor.uid}"
                    f" is already dead!")
            if not portal._result:
                log.debug(f"Faking result for {actor.uid}")
                q = self._actor.get_waitq(actor.uid, 'main')
                q.put_nowait({'return': None, 'cid': 'main'})

        async def wait_for_result(portal):
            if portal.channel.connected():
                log.debug(f"Waiting on final result from {subactor.uid}")
                await portal.result()

        # unblocks when all waiter tasks have completed
        async with trio.open_nursery() as nursery:
            for subactor, proc, portal in self._children.values():
                nursery.start_soon(wait_for_proc, proc, subactor, portal)
                nursery.start_soon(wait_for_result, portal)

    async def cancel(self, hard_kill=False):
        """Cancel this nursery by instructing each subactor to cancel
        itself and wait for all subprocesses to terminate.

        If ``hard_kill`` is set to ``True`` then kill the processes
        directly without any far end graceful ``trio`` cancellation.
        """
        log.debug(f"Cancelling nursery")
        for subactor, proc, portal in self._children.values():
            if proc is mp.current_process():
                # XXX: does this even make sense?
                await subactor.cancel()
            else:
                if hard_kill:
                    log.warn(f"Hard killing subactors {self._children}")
                    proc.terminate()
                    # XXX: doesn't seem to work?
                    # send KeyboardInterrupt (trio abort signal) to sub-actors
                    # os.kill(proc.pid, signal.SIGINT)
                else:
                    await portal.cancel_actor()

        log.debug(f"Waiting on all subactors to complete")
        await self.wait()
        self.cancelled = True
        log.debug(f"All subactors for {self} have terminated")

    async def __aexit__(self, etype, value, tb):
        """Wait on all subactors' main routines to complete.
        """
        if etype is not None:
            # XXX: hypothetically an error could be raised and then
            # a cancel signal shows up slightly after in which case the
            # else block here might not complete? Should both be shielded?
            if etype is trio.Cancelled:
                with trio.open_cancel_scope(shield=True):
                    log.warn(
                        f"{current_actor().uid} was cancelled with {etype}"
                        ", cancelling actor nursery")
                    await self.cancel()
            else:
                log.exception(
                    f"{current_actor().uid} errored with {etype}, "
                    "cancelling actor nursery")
                await self.cancel()
        else:
            # XXX: this is effectively the lone cancellation/supervisor
            # strategy which exactly mimics trio's behaviour
            log.debug(f"Waiting on subactors {self._children} to complete")
            try:
                await self.wait()
            except Exception as err:
                log.warn(f"Nursery caught {err}, cancelling")
                await self.cancel()
                raise
            log.debug(f"Nursery teardown complete")


@asynccontextmanager
async def open_nursery(supervisor=None):
    """Create and yield a new ``ActorNursery``.
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    # TODO: figure out supervisors from erlang
    async with ActorNursery(current_actor(), supervisor) as nursery:
        yield nursery
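A hedged end-to-end sketch of the nursery API above. The public entry point
lives in ``tractor/__init__.py`` whose diff is suppressed earlier, so
``tractor.run`` plus the module and function names here are assumptions:

import tractor

async def main():
    async with tractor.open_nursery() as nursery:
        portal = await nursery.start_actor(
            'worker', rpc_module_paths=['mymod'])
        # runs ``mymod.compute(x=1)`` in the spawned subprocess
        result = await portal.run('mymod', 'compute', x=1)
    # __aexit__ waits on (or cancels) all subactors before returning

tractor.run(main)  # assumed entry point from the suppressed __init__.py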
tractor/log.py
@@ -7,6 +7,7 @@ import logging
 import colorlog
 
 _proj_name = 'tractor'
+_default_loglevel = None
 
 # Super sexy formatting thanks to ``colorlog``.
 # (NOTE: we use the '{' format style)
@@ -89,3 +90,7 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger:
         log.addHandler(handler)
 
     return log
+
+
+def get_loglevel():
+    return _default_loglevel
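Illustrative tie-in (a sketch, not part of the diff): subactors spawned via
``ActorNursery.start_actor()`` fall back to this module-level default when
given no explicit ``loglevel``, and the updated ``conftest.py`` fixture is
what sets it during test runs:

from tractor.log import get_loglevel, get_console_log

level = get_loglevel()  # None unless set (e.g. by the test fixture)
if level:
    log = get_console_log(level)  # attach a colorlog console handler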