diff --git a/tests/conftest.py b/tests/conftest.py index e7b63da..164dc8b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,7 +12,7 @@ def pytest_addoption(parser): @pytest.fixture(scope='session', autouse=True) def loglevel(request): - orig = tractor._default_loglevel - level = tractor._default_loglevel = request.config.option.loglevel + orig = tractor.log._default_loglevel + level = tractor.log._default_loglevel = request.config.option.loglevel yield level - tractor._default_loglevel = orig + tractor.log._default_loglevel = orig diff --git a/tractor/__init__.py b/tractor/__init__.py index 1b10913..ec055ad 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -2,1024 +2,35 @@ tractor: An actor model micro-framework built on ``trio`` and ``multiprocessing``. """ -from collections import defaultdict from functools import partial -from typing import Coroutine -import importlib -import inspect -import multiprocessing as mp -import traceback -import uuid import trio -from async_generator import asynccontextmanager, aclosing -from .ipc import Channel, _connect_chan -from .log import get_console_log, get_logger +from .log import get_console_log, get_logger, get_loglevel +from ._ipc import _connect_chan +from ._actor import ( + Actor, _start_actor, Arbiter, get_arbiter, find_actor +) +from ._trionics import open_nursery +from ._state import current_actor +from ._portal import RemoteActorError + + +__all__ = [ + 'current_actor', 'find_actor', 'get_arbiter', 'open_nursery', + 'RemoteActorError', +] -ctx = mp.get_context("forkserver") -log = get_logger('tractor') # set at startup and after forks -_current_actor = None _default_arbiter_host = '127.0.0.1' _default_arbiter_port = 1616 -_default_loglevel = None - - -def get_loglevel(): - return _default_loglevel - - -class ActorFailure(Exception): - "General actor failure" - - -class RemoteActorError(ActorFailure): - "Remote actor exception bundled locally" - - -@asynccontextmanager -async def maybe_open_nursery(nursery=None): - """Create a new nursery if None provided. - - Blocks on exit as expected if no input nursery is provided. - """ - if nursery is not None: - yield nursery - else: - async with trio.open_nursery() as nursery: - yield nursery - - -async def _invoke( - cid, chan, func, kwargs, - treat_as_gen=False, raise_errs=False, - task_status=trio.TASK_STATUS_IGNORED -): - """Invoke local func and return results over provided channel. - """ - try: - is_async_partial = False - is_async_gen_partial = False - if isinstance(func, partial): - is_async_partial = inspect.iscoroutinefunction(func.func) - is_async_gen_partial = inspect.isasyncgenfunction(func.func) - - if ( - not inspect.iscoroutinefunction(func) and - not inspect.isasyncgenfunction(func) and - not is_async_partial and - not is_async_gen_partial - ): - await chan.send({'return': func(**kwargs), 'cid': cid}) - else: - coro = func(**kwargs) - - if inspect.isasyncgen(coro): - # XXX: massive gotcha! If the containing scope - # is cancelled and we execute the below line, - # any ``ActorNursery.__aexit__()`` WON'T be - # triggered in the underlying async gen! So we - # have to properly handle the closing (aclosing) - # of the async gen in order to be sure the cancel - # is propagated! - async with aclosing(coro) as agen: - async for item in agen: - # TODO: can we send values back in here? - # it's gonna require a `while True:` and - # some non-blocking way to retrieve new `asend()` - # values from the channel: - # to_send = await chan.recv_nowait() - # if to_send is not None: - # to_yield = await coro.asend(to_send) - await chan.send({'yield': item, 'cid': cid}) - - log.debug(f"Finished iterating {coro}") - # TODO: we should really support a proper - # `StopAsyncIteration` system here for returning a final - # value if desired - await chan.send({'stop': None, 'cid': cid}) - else: - if treat_as_gen: - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as - # above - await coro - else: - await chan.send({'return': await coro, 'cid': cid}) - - except Exception: - log.exception("Actor errored:") - if not raise_errs: - await chan.send({'error': traceback.format_exc(), 'cid': cid}) - else: - raise - - task_status.started() - - -async def result_from_q(q, chan): - """Process a msg from a remote actor. - """ - first_msg = await q.get() - if 'return' in first_msg: - return 'return', first_msg, q - elif 'yield' in first_msg: - return 'yield', first_msg, q - elif 'error' in first_msg: - raise RemoteActorError(f"{chan.uid}\n" + first_msg['error']) - else: - raise ValueError(f"{first_msg} is an invalid response packet?") - - -async def _do_handshake(actor, chan): - await chan.send(actor.uid) - uid = await chan.recv() - - if not isinstance(uid, tuple): - raise ValueError(f"{uid} is not a valid uid?!") - - chan.uid = uid - log.info(f"Handshake with actor {uid}@{chan.raddr} complete") - return uid - - -class Actor: - """The fundamental concurrency primitive. - - An *actor* is the combination of a regular Python or - ``multiprocessing.Process`` executing a ``trio`` task tree, communicating - with other actors through "portals" which provide a native async API - around "channels". - """ - is_arbiter = False - - def __init__( - self, - name: str, - main: Coroutine = None, - rpc_module_paths: [str] = [], - statespace: dict = {}, - uid: str = None, - allow_rpc: bool = True, - outlive_main: bool = False, - loglevel: str = None, - arbiter_addr: (str, int) = None, - ): - self.name = name - self.uid = (name, uid or str(uuid.uuid1())) - self.rpc_module_paths = rpc_module_paths - self._mods = {} - self.main = main - # TODO: consider making this a dynamically defined - # @dataclass once we get py3.7 - self.statespace = statespace - self._allow_rpc = allow_rpc - self._outlive_main = outlive_main - self.loglevel = loglevel - self._arb_addr = arbiter_addr - - # filled in by `_async_main` after fork - self._peers = defaultdict(list) - self._peer_connected = {} - self._no_more_peers = trio.Event() - self._main_complete = trio.Event() - self._main_scope = None - self._no_more_peers.set() - self._actors2calls = {} # map {uids -> {callids -> waiter queues}} - self._listeners = [] - self._parent_chan = None - self._accept_host = None - - async def wait_for_peer(self, uid): - """Wait for a connection back from a spawned actor with a given - ``uid``. - """ - log.debug(f"Waiting for peer {uid} to connect") - event = self._peer_connected.setdefault(uid, trio.Event()) - await event.wait() - log.debug(f"{uid} successfully connected back to us") - return event, self._peers[uid][-1] - - def load_namespaces(self): - # We load namespaces after fork since this actor may - # be spawned on a different machine from the original nursery - # and we need to try and load the local module code (if it - # exists) - for path in self.rpc_module_paths: - self._mods[path] = importlib.import_module(path) - - async def _stream_handler( - self, - stream: trio.SocketStream, - ): - """ - Entry point for new inbound connections to the channel server. - """ - self._no_more_peers.clear() - chan = Channel(stream=stream) - log.info(f"New connection to us {chan}") - - # send/receive initial handshake response - try: - uid = await _do_handshake(self, chan) - except StopAsyncIteration: - log.warn(f"Channel {chan} failed to handshake") - return - - # channel tracking - event = self._peer_connected.pop(uid, None) - if event: - # Instructing connection: this is likely a new channel to - # a recently spawned actor which we'd like to control via - # async-rpc calls. - log.debug(f"Waking channel waiters {event.statistics()}") - # Alert any task waiting on this connection to come up - event.set() - - chans = self._peers[uid] - if chans: - log.warn( - f"already have channel(s) for {uid}:{chans}?" - ) - log.debug(f"Registered {chan} for {uid}") - # append new channel - self._peers[uid].append(chan) - - # Begin channel management - respond to remote requests and - # process received reponses. - try: - await self._process_messages(chan) - finally: - # Drop ref to channel so it can be gc-ed and disconnected - log.debug(f"Releasing channel {chan} from {chan.uid}") - chans = self._peers.get(chan.uid) - chans.remove(chan) - if not chans: - log.debug(f"No more channels for {chan.uid}") - self._peers.pop(chan.uid, None) - if not self._actors2calls.get(chan.uid, {}).get('main'): - # fake a "main task" result for any waiting - # nurseries/portals - log.debug(f"Faking result for {chan} from {chan.uid}") - q = self.get_waitq(chan.uid, 'main') - q.put_nowait({'return': None, 'cid': 'main'}) - - log.debug(f"Peers is {self._peers}") - - if not self._peers: # no more channels connected - self._no_more_peers.set() - log.debug(f"Signalling no more peer channels") - - # XXX: is this necessary? - if chan.connected(): - log.debug(f"Disconnecting channel {chan}") - await chan.send(None) - await chan.aclose() - - async def _push_result(self, actorid, cid, msg): - assert actorid, f"`actorid` can't be {actorid}" - q = self.get_waitq(actorid, cid) - log.debug(f"Delivering {msg} from {actorid} to caller {cid}") - # maintain backpressure - await q.put(msg) - - def get_waitq(self, actorid, cid): - log.debug(f"Getting result queue for {actorid} cid {cid}") - cids2qs = self._actors2calls.setdefault(actorid, {}) - return cids2qs.setdefault(cid, trio.Queue(1000)) - - async def send_cmd(self, chan, ns, func, kwargs): - """Send a ``'cmd'`` message to a remote actor and return a - caller id and a ``trio.Queue`` that can be used to wait for - responses delivered by the local message processing loop. - """ - cid = str(uuid.uuid1()) - q = self.get_waitq(chan.uid, cid) - log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") - await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) - return cid, q - - async def _process_messages(self, chan, treat_as_gen=False): - """Process messages async-RPC style. - - Process rpc requests and deliver retrieved responses from channels. - """ - # TODO: once https://github.com/python-trio/trio/issues/467 gets - # worked out we'll likely want to use that! - log.debug(f"Entering msg loop for {chan} from {chan.uid}") - async with trio.open_nursery() as nursery: - try: - async for msg in chan.aiter_recv(): - if msg is None: # terminate sentinel - log.debug( - f"Cancelling all tasks for {chan} from {chan.uid}") - nursery.cancel_scope.cancel() - log.debug( - f"Msg loop signalled to terminate for" - f" {chan} from {chan.uid}") - break - log.debug(f"Received msg {msg} from {chan.uid}") - cid = msg.get('cid') - if cid: # deliver response to local caller/waiter - await self._push_result(chan.uid, cid, msg) - log.debug( - f"Waiting on next msg for {chan} from {chan.uid}") - continue - else: - ns, funcname, kwargs, actorid, cid = msg['cmd'] - - log.debug( - f"Processing request from {actorid}\n" - f"{ns}.{funcname}({kwargs})") - if ns == 'self': - func = getattr(self, funcname) - else: - func = getattr(self._mods[ns], funcname) - - # spin up a task for the requested function - sig = inspect.signature(func) - treat_as_gen = False - if 'chan' in sig.parameters: - assert 'cid' in sig.parameters, \ - f"{func} must accept a `cid` (caller id) kwarg" - kwargs['chan'] = chan - kwargs['cid'] = cid - # TODO: eventually we want to be more stringent - # about what is considered a far-end async-generator. - # Right now both actual async gens and any async - # function which declares a `chan` kwarg in its - # signature will be treated as one. - treat_as_gen = True - - log.debug(f"Spawning task for {func}") - nursery.start_soon( - _invoke, cid, chan, func, kwargs, treat_as_gen, - name=funcname - ) - log.debug( - f"Waiting on next msg for {chan} from {chan.uid}") - else: # channel disconnect - log.debug(f"{chan} from {chan.uid} disconnected") - except trio.ClosedStreamError: - log.error(f"{chan} form {chan.uid} broke") - - log.debug(f"Exiting msg loop for {chan} from {chan.uid}") - - def _fork_main(self, accept_addr, parent_addr=None): - # after fork routine which invokes a fresh ``trio.run`` - # log.warn("Log level after fork is {self.loglevel}") - if self.loglevel is not None: - get_console_log(self.loglevel) - log.info( - f"Started new {ctx.current_process()} for actor {self.uid}") - global _current_actor - _current_actor = self - log.debug(f"parent_addr is {parent_addr}") - try: - trio.run(partial( - self._async_main, accept_addr, parent_addr=parent_addr)) - except KeyboardInterrupt: - pass # handle it the same way trio does? - log.debug(f"Actor {self.uid} terminated") - - async def _async_main( - self, - accept_addr, - arbiter_addr=None, - parent_addr=None, - nursery=None - ): - """Start the channel server, maybe connect back to the parent, and - start the main task. - - A "root-most" (or "top-level") nursery for this actor is opened here - and when cancelled effectively cancels the actor. - """ - result = None - arbiter_addr = arbiter_addr or self._arb_addr - registered_with_arbiter = False - try: - async with maybe_open_nursery(nursery) as nursery: - self._root_nursery = nursery - - # Startup up channel server - host, port = accept_addr - await nursery.start(partial( - self._serve_forever, accept_host=host, accept_port=port) - ) - - # XXX: I wonder if a better name is maybe "requester" - # since I don't think the notion of a "parent" actor - # necessarily sticks given that eventually we want - # ``'MainProcess'`` (the actor who initially starts the - # forkserver) to eventually be the only one who is - # allowed to spawn new processes per Python program. - if parent_addr is not None: - try: - # Connect back to the parent actor and conduct initial - # handshake (From this point on if we error ship the - # exception back to the parent actor) - chan = self._parent_chan = Channel( - destaddr=parent_addr, - on_reconnect=self.main - ) - await chan.connect() - # initial handshake, report who we are, who they are - await _do_handshake(self, chan) - except OSError: # failed to connect - log.warn( - f"Failed to connect to parent @ {parent_addr}," - " closing server") - self.cancel_server() - self._parent_chan = None - - # register with the arbiter if we're told its addr - log.debug(f"Registering {self} for role `{self.name}`") - async with get_arbiter(*arbiter_addr) as arb_portal: - await arb_portal.run( - 'self', 'register_actor', - name=self.name, sockaddr=self.accept_addr) - registered_with_arbiter = True - - # handle new connection back to parent optionally - # begin responding to RPC - if self._allow_rpc: - self.load_namespaces() - if self._parent_chan: - nursery.start_soon( - self._process_messages, self._parent_chan) - - if self.main: - try: - if self._parent_chan: - async with trio.open_nursery() as n: - self._main_scope = n.cancel_scope - log.debug(f"Starting main task `{self.main}`") - # spawned subactor so deliver "main" - # task result(s) back to parent - await n.start( - _invoke, 'main', - self._parent_chan, self.main, {}, - # treat_as_gen, raise_errs params - False, True - ) - else: - with trio.open_cancel_scope() as main_scope: - self._main_scope = main_scope - # run directly we are an "unspawned actor" - log.debug(f"Running `{self.main}` directly") - result = await self.main() - finally: - # tear down channel server in order to ensure - # we exit normally when the main task is done - if not self._outlive_main: - log.debug(f"Shutting down channel server") - self.cancel_server() - log.debug(f"Shutting down root nursery") - nursery.cancel_scope.cancel() - self._main_complete.set() - - if self._main_scope.cancelled_caught: - log.debug("Main task was cancelled sucessfully") - log.debug("Waiting on root nursery to complete") - # blocks here as expected if no nursery was provided until - # the channel server is killed (i.e. this actor is - # cancelled or signalled by the parent actor) - except Exception: - if self._parent_chan: - try: - await self._parent_chan.send( - {'error': traceback.format_exc(), 'cid': 'main'}) - except trio.ClosedStreamError: - log.error( - f"Failed to ship error to parent " - f"{self._parent_chan.uid}, channel was closed") - log.exception("Actor errored:") - - if not registered_with_arbiter: - log.exception( - f"Failed to register with arbiter @ {arbiter_addr}") - else: - raise - finally: - await self._do_unreg(arbiter_addr) - # terminate actor once all it's peers (actors that connected - # to it as clients) have disappeared - if not self._no_more_peers.is_set(): - log.debug( - f"Waiting for remaining peers {self._peers} to clear") - await self._no_more_peers.wait() - log.debug(f"All peer channels are complete") - - # tear down channel server no matter what since we errored - # or completed - log.debug(f"Shutting down channel server") - self.cancel_server() - - return result - - async def _serve_forever( - self, - *, - # (host, port) to bind for channel server - accept_host=None, - accept_port=0, - task_status=trio.TASK_STATUS_IGNORED - ): - """Start the channel server, begin listening for new connections. - - This will cause an actor to continue living (blocking) until - ``cancel_server()`` is called. - """ - async with trio.open_nursery() as nursery: - self._server_nursery = nursery - # TODO: might want to consider having a separate nursery - # for the stream handler such that the server can be cancelled - # whilst leaving existing channels up - listeners = await nursery.start( - partial( - trio.serve_tcp, - self._stream_handler, - # new connections will stay alive even if this server - # is cancelled - handler_nursery=self._root_nursery, - port=accept_port, host=accept_host, - ) - ) - log.debug( - f"Started tcp server(s) on {[l.socket for l in listeners]}") - self._listeners.extend(listeners) - task_status.started() - - async def _do_unreg(self, arbiter_addr): - # UNregister actor from the arbiter - try: - if arbiter_addr is not None: - async with get_arbiter(*arbiter_addr) as arb_portal: - await arb_portal.run( - 'self', 'unregister_actor', name=self.name) - except OSError: - log.warn(f"Unable to unregister {self.name} from arbiter") - - async def cancel(self): - """This cancels the internal root-most nursery thereby gracefully - cancelling (for all intents and purposes) this actor. - """ - self.cancel_server() - if self._main_scope: - self._main_scope.cancel() - log.debug("Waiting on main task to complete") - await self._main_complete.wait() - self._root_nursery.cancel_scope.cancel() - - def cancel_server(self): - """Cancel the internal channel server nursery thereby - preventing any new inbound connections from being established. - """ - self._server_nursery.cancel_scope.cancel() - - @property - def accept_addr(self): - """Primary address to which the channel server is bound. - """ - try: - return self._listeners[0].socket.getsockname() - except OSError: - return - - def get_parent(self): - return Portal(self._parent_chan) - - def get_chans(self, actorid): - return self._peers[actorid] - - -class Arbiter(Actor): - """A special actor who knows all the other actors and always has - access to the top level nursery. - - The arbiter is by default the first actor spawned on each host - and is responsible for keeping track of all other actors for - coordination purposes. If a new main process is launched and an - arbiter is already running that arbiter will be used. - """ - _registry = defaultdict(list) - is_arbiter = True - - def find_actor(self, name): - return self._registry[name] - - def register_actor(self, name, sockaddr): - self._registry[name].append(sockaddr) - - def unregister_actor(self, name): - self._registry.pop(name, None) - - -class Portal: - """A 'portal' to a(n) (remote) ``Actor``. - - Allows for invoking remote routines and receiving results through an - underlying ``tractor.Channel`` as though the remote (async) - function / generator was invoked locally. - - Think of this like an native async IPC API. - """ - def __init__(self, channel): - self.channel = channel - self._result = None - - async def aclose(self): - log.debug(f"Closing {self}") - # XXX: won't work until https://github.com/python-trio/trio/pull/460 - # gets in! - await self.channel.aclose() - - async def run(self, ns, func, **kwargs): - """Submit a function to be scheduled and run by actor, return its - (stream of) result(s). - """ - # TODO: not this needs some serious work and thinking about how - # to make async-generators the fundamental IPC API over channels! - # (think `yield from`, `gen.send()`, and functional reactive stuff) - actor = current_actor() - # ship a function call request to the remote actor - cid, q = await actor.send_cmd(self.channel, ns, func, kwargs) - # wait on first response msg and handle - return await self._return_from_resptype( - cid, *(await result_from_q(q, self.channel))) - - async def _return_from_resptype(self, cid, resptype, first_msg, q): - - if resptype == 'yield': - - async def yield_from_q(): - yield first_msg['yield'] - try: - async for msg in q: - try: - yield msg['yield'] - except KeyError: - if 'stop' in msg: - break # far end async gen terminated - else: - raise RemoteActorError(msg['error']) - except GeneratorExit: - log.debug( - f"Cancelling async gen call {cid} to " - f"{self.channel.uid}") - raise - - return yield_from_q() - - elif resptype == 'return': - return first_msg['return'] - else: - raise ValueError(f"Unknown msg response type: {first_msg}") - - async def result(self): - """Return the result(s) from the remote actor's "main" task. - """ - if self._result is None: - q = current_actor().get_waitq(self.channel.uid, 'main') - resptype, first_msg, q = (await result_from_q(q, self.channel)) - self._result = await self._return_from_resptype( - 'main', resptype, first_msg, q) - log.warn( - f"Retrieved first result `{self._result}` " - f"for {self.channel.uid}") - # await q.put(first_msg) # for next consumer (e.g. nursery) - return self._result - - async def close(self): - # trigger remote msg loop `break` - chan = self.channel - log.debug(f"Closing portal for {chan} to {chan.uid}") - await self.channel.send(None) - - async def cancel_actor(self): - """Cancel the actor on the other end of this portal. - """ - log.warn( - f"Sending cancel request to {self.channel.uid} on " - f"{self.channel}") - try: - with trio.move_on_after(0.1) as cancel_scope: - cancel_scope.shield = True - # send cancel cmd - might not get response - await self.run('self', 'cancel') - return True - except trio.ClosedStreamError: - log.warn( - f"{self.channel} for {self.channel.uid} was already closed?") - return False - - -@asynccontextmanager -async def open_portal(channel, nursery=None): - """Open a ``Portal`` through the provided ``channel``. - - Spawns a background task to handle message processing. - """ - actor = current_actor() - assert actor - was_connected = False - - async with maybe_open_nursery(nursery) as nursery: - - if not channel.connected(): - await channel.connect() - was_connected = True - - if channel.uid is None: - await _do_handshake(actor, channel) - - nursery.start_soon(actor._process_messages, channel) - portal = Portal(channel) - yield portal - - # cancel remote channel-msg loop - if channel.connected(): - await portal.close() - - # cancel background msg loop task - nursery.cancel_scope.cancel() - if was_connected: - await channel.aclose() - - -class LocalPortal: - """A 'portal' to a local ``Actor``. - - A compatibility shim for normal portals but for invoking functions - using an in process actor instance. - """ - def __init__(self, actor): - self.actor = actor - - async def run(self, ns, func, **kwargs): - """Run a requested function locally and return it's result. - """ - obj = self.actor if ns == 'self' else importlib.import_module(ns) - func = getattr(obj, func) - return func(**kwargs) - - -class ActorNursery: - """Spawn scoped subprocess actors. - """ - def __init__(self, actor, supervisor=None): - self.supervisor = supervisor # TODO - self._actor = actor - # We'll likely want some way to cancel all sub-actors eventually - # self.cancel_scope = cancel_scope - self._children = {} - self.cancelled = False - - async def __aenter__(self): - return self - - async def start_actor( - self, - name: str, - main=None, - bind_addr=('127.0.0.1', 0), - statespace=None, - rpc_module_paths=None, - outlive_main=False, # sub-actors die when their main task completes - loglevel=None, # set log level per subactor - ): - loglevel = loglevel or self._actor.loglevel or get_loglevel() - actor = Actor( - name, - # modules allowed to invoked funcs from - rpc_module_paths=rpc_module_paths or [], - statespace=statespace, # global proc state vars - main=main, # main coroutine to be invoked - outlive_main=outlive_main, - loglevel=loglevel, - arbiter_addr=current_actor()._arb_addr, - ) - parent_addr = self._actor.accept_addr - assert parent_addr - proc = ctx.Process( - target=actor._fork_main, - args=(bind_addr, parent_addr), - # daemon=True, - name=name, - ) - proc.start() - if not proc.is_alive(): - raise ActorFailure("Couldn't start sub-actor?") - - log.info(f"Started {proc}") - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it - event, chan = await self._actor.wait_for_peer(actor.uid) - portal = Portal(chan) - self._children[(name, proc.pid)] = (actor, proc, portal) - return portal - - async def wait(self): - """Wait for all subactors to complete. - """ - async def wait_for_proc(proc, actor, portal): - # TODO: timeout block here? - if proc.is_alive(): - await trio.hazmat.wait_readable(proc.sentinel) - # please god don't hang - proc.join() - log.debug(f"Joined {proc}") - event = self._actor._peers.get(actor.uid) - if isinstance(event, trio.Event): - event.set() - log.warn( - f"Cancelled `wait_for_peer()` call since {actor.uid}" - f" is already dead!") - if not portal._result: - log.debug(f"Faking result for {actor.uid}") - q = self._actor.get_waitq(actor.uid, 'main') - q.put_nowait({'return': None, 'cid': 'main'}) - - async def wait_for_result(portal): - if portal.channel.connected(): - log.debug(f"Waiting on final result from {subactor.uid}") - await portal.result() - - # unblocks when all waiter tasks have completed - async with trio.open_nursery() as nursery: - for subactor, proc, portal in self._children.values(): - nursery.start_soon(wait_for_proc, proc, subactor, portal) - nursery.start_soon(wait_for_result, portal) - - async def cancel(self, hard_kill=False): - """Cancel this nursery by instructing each subactor to cancel - iteslf and wait for all subprocesses to terminate. - - If ``hard_killl`` is set to ``True`` then kill the processes - directly without any far end graceful ``trio`` cancellation. - """ - log.debug(f"Cancelling nursery") - for subactor, proc, portal in self._children.values(): - if proc is mp.current_process(): - # XXX: does this even make sense? - await subactor.cancel() - else: - if hard_kill: - log.warn(f"Hard killing subactors {self._children}") - proc.terminate() - # XXX: doesn't seem to work? - # send KeyBoardInterrupt (trio abort signal) to sub-actors - # os.kill(proc.pid, signal.SIGINT) - else: - await portal.cancel_actor() - - log.debug(f"Waiting on all subactors to complete") - await self.wait() - self.cancelled = True - log.debug(f"All subactors for {self} have terminated") - - async def __aexit__(self, etype, value, tb): - """Wait on all subactor's main routines to complete. - """ - if etype is not None: - # XXX: hypothetically an error could be raised and then - # a cancel signal shows up slightly after in which case the - # else block here might not complete? Should both be shielded? - if etype is trio.Cancelled: - with trio.open_cancel_scope(shield=True): - log.warn( - f"{current_actor().uid} was cancelled with {etype}" - ", cancelling actor nursery") - await self.cancel() - else: - log.exception( - f"{current_actor().uid} errored with {etype}, " - "cancelling actor nursery") - await self.cancel() - else: - # XXX: this is effectively the lone cancellation/supervisor - # strategy which exactly mimicks trio's behaviour - log.debug(f"Waiting on subactors {self._children} to complete") - try: - await self.wait() - except Exception as err: - log.warn(f"Nursery caught {err}, cancelling") - await self.cancel() - raise - log.debug(f"Nursery teardown complete") - - -def current_actor() -> Actor: - """Get the process-local actor instance. - """ - return _current_actor - - -@asynccontextmanager -async def open_nursery(supervisor=None): - """Create and yield a new ``ActorNursery``. - """ - actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - - # TODO: figure out supervisors from erlang - async with ActorNursery(current_actor(), supervisor) as nursery: - yield nursery - - -class NoArbiterFound(Exception): - "Couldn't find the arbiter?" - - -async def _start_actor(actor, host, port, arbiter_addr, nursery=None): - """Spawn a local actor by starting a task to execute it's main - async function. - - Blocks if no nursery is provided, in which case it is expected the nursery - provider is responsible for waiting on the task to complete. - """ - # assign process-local actor - global _current_actor - _current_actor = actor - - # start local channel-server and fake the portal API - # NOTE: this won't block since we provide the nursery - log.info(f"Starting local {actor} @ {host}:{port}") - - result = await actor._async_main( - accept_addr=(host, port), - parent_addr=None, - arbiter_addr=arbiter_addr, - nursery=nursery, - ) - # XXX: If spawned locally, the actor is cancelled when this - # context is complete given that there are no more active - # peer channels connected to it. - if not actor._outlive_main: - actor.cancel_server() - - # unset module state - _current_actor = None - log.info("Completed async main") - - return result - - -@asynccontextmanager -async def get_arbiter(host, port): - """Return a portal instance connected to a local or remote - arbiter. - """ - actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - - if actor.is_arbiter: - # we're already the arbiter - # (likely a re-entrant call from the arbiter actor) - yield LocalPortal(actor) - else: - async with _connect_chan(host, port) as chan: - async with open_portal(chan) as arb_portal: - yield arb_portal - - -@asynccontextmanager -async def find_actor( - name, - arbiter_sockaddr=None, -): - """Ask the arbiter to find actor(s) by name. - - Returns a connected portal to the last registered matching actor - known to the arbiter. - """ - actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - sockaddrs = await arb_portal.run('self', 'find_actor', name=name) - # TODO: return portals to all available actors - for now just - # the last one that registered - if sockaddrs: - sockaddr = sockaddrs[-1] - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal - else: - yield None async def _main(async_fn, args, kwargs, name, arbiter_addr): """Async entry point for ``tractor``. """ + log = get_logger('tractor') main = partial(async_fn, *args) if async_fn else None arbiter_addr = (host, port) = arbiter_addr or ( _default_arbiter_host, _default_arbiter_port) diff --git a/tractor/_actor.py b/tractor/_actor.py new file mode 100644 index 0000000..3c4acb2 --- /dev/null +++ b/tractor/_actor.py @@ -0,0 +1,647 @@ +""" +Actor primitives and helpers +""" +import inspect +import importlib +from collections import defaultdict +from functools import partial +from typing import Coroutine +import traceback +import uuid + +import trio +from async_generator import asynccontextmanager, aclosing + +from ._ipc import Channel, _connect_chan +from .log import get_console_log, get_logger +from ._portal import (Portal, open_portal, _do_handshake, LocalPortal, + maybe_open_nursery) +from . import _state +from ._state import current_actor + + +log = get_logger('tractor') + + +class ActorFailure(Exception): + "General actor failure" + + +async def _invoke( + cid, chan, func, kwargs, + treat_as_gen=False, raise_errs=False, + task_status=trio.TASK_STATUS_IGNORED +): + """Invoke local func and return results over provided channel. + """ + try: + is_async_partial = False + is_async_gen_partial = False + if isinstance(func, partial): + is_async_partial = inspect.iscoroutinefunction(func.func) + is_async_gen_partial = inspect.isasyncgenfunction(func.func) + + if ( + not inspect.iscoroutinefunction(func) and + not inspect.isasyncgenfunction(func) and + not is_async_partial and + not is_async_gen_partial + ): + await chan.send({'return': func(**kwargs), 'cid': cid}) + else: + coro = func(**kwargs) + + if inspect.isasyncgen(coro): + # XXX: massive gotcha! If the containing scope + # is cancelled and we execute the below line, + # any ``ActorNursery.__aexit__()`` WON'T be + # triggered in the underlying async gen! So we + # have to properly handle the closing (aclosing) + # of the async gen in order to be sure the cancel + # is propagated! + async with aclosing(coro) as agen: + async for item in agen: + # TODO: can we send values back in here? + # it's gonna require a `while True:` and + # some non-blocking way to retrieve new `asend()` + # values from the channel: + # to_send = await chan.recv_nowait() + # if to_send is not None: + # to_yield = await coro.asend(to_send) + await chan.send({'yield': item, 'cid': cid}) + + log.debug(f"Finished iterating {coro}") + # TODO: we should really support a proper + # `StopAsyncIteration` system here for returning a final + # value if desired + await chan.send({'stop': None, 'cid': cid}) + else: + if treat_as_gen: + # XXX: the async-func may spawn further tasks which push + # back values like an async-generator would but must + # manualy construct the response dict-packet-responses as + # above + await coro + else: + await chan.send({'return': await coro, 'cid': cid}) + + except Exception: + log.exception("Actor errored:") + if not raise_errs: + await chan.send({'error': traceback.format_exc(), 'cid': cid}) + else: + raise + + task_status.started() + + +class Actor: + """The fundamental concurrency primitive. + + An *actor* is the combination of a regular Python or + ``multiprocessing.Process`` executing a ``trio`` task tree, communicating + with other actors through "portals" which provide a native async API + around "channels". + """ + is_arbiter = False + + def __init__( + self, + name: str, + main: Coroutine = None, + rpc_module_paths: [str] = [], + statespace: dict = {}, + uid: str = None, + allow_rpc: bool = True, + outlive_main: bool = False, + loglevel: str = None, + arbiter_addr: (str, int) = None, + ): + self.name = name + self.uid = (name, uid or str(uuid.uuid1())) + self.rpc_module_paths = rpc_module_paths + self._mods = {} + self.main = main + # TODO: consider making this a dynamically defined + # @dataclass once we get py3.7 + self.statespace = statespace + self._allow_rpc = allow_rpc + self._outlive_main = outlive_main + self.loglevel = loglevel + self._arb_addr = arbiter_addr + + # filled in by `_async_main` after fork + self._peers = defaultdict(list) + self._peer_connected = {} + self._no_more_peers = trio.Event() + self._main_complete = trio.Event() + self._main_scope = None + self._no_more_peers.set() + self._actors2calls = {} # map {uids -> {callids -> waiter queues}} + self._listeners = [] + self._parent_chan = None + self._accept_host = None + + async def wait_for_peer(self, uid): + """Wait for a connection back from a spawned actor with a given + ``uid``. + """ + log.debug(f"Waiting for peer {uid} to connect") + event = self._peer_connected.setdefault(uid, trio.Event()) + await event.wait() + log.debug(f"{uid} successfully connected back to us") + return event, self._peers[uid][-1] + + def load_namespaces(self): + # We load namespaces after fork since this actor may + # be spawned on a different machine from the original nursery + # and we need to try and load the local module code (if it + # exists) + for path in self.rpc_module_paths: + self._mods[path] = importlib.import_module(path) + + async def _stream_handler( + self, + stream: trio.SocketStream, + ): + """ + Entry point for new inbound connections to the channel server. + """ + self._no_more_peers.clear() + chan = Channel(stream=stream) + log.info(f"New connection to us {chan}") + + # send/receive initial handshake response + try: + uid = await _do_handshake(self, chan) + except StopAsyncIteration: + log.warn(f"Channel {chan} failed to handshake") + return + + # channel tracking + event = self._peer_connected.pop(uid, None) + if event: + # Instructing connection: this is likely a new channel to + # a recently spawned actor which we'd like to control via + # async-rpc calls. + log.debug(f"Waking channel waiters {event.statistics()}") + # Alert any task waiting on this connection to come up + event.set() + + chans = self._peers[uid] + if chans: + log.warn( + f"already have channel(s) for {uid}:{chans}?" + ) + log.debug(f"Registered {chan} for {uid}") + # append new channel + self._peers[uid].append(chan) + + # Begin channel management - respond to remote requests and + # process received reponses. + try: + await self._process_messages(chan) + finally: + # Drop ref to channel so it can be gc-ed and disconnected + log.debug(f"Releasing channel {chan} from {chan.uid}") + chans = self._peers.get(chan.uid) + chans.remove(chan) + if not chans: + log.debug(f"No more channels for {chan.uid}") + self._peers.pop(chan.uid, None) + if not self._actors2calls.get(chan.uid, {}).get('main'): + # fake a "main task" result for any waiting + # nurseries/portals + log.debug(f"Faking result for {chan} from {chan.uid}") + q = self.get_waitq(chan.uid, 'main') + q.put_nowait({'return': None, 'cid': 'main'}) + + log.debug(f"Peers is {self._peers}") + + if not self._peers: # no more channels connected + self._no_more_peers.set() + log.debug(f"Signalling no more peer channels") + + # XXX: is this necessary? + if chan.connected(): + log.debug(f"Disconnecting channel {chan}") + await chan.send(None) + await chan.aclose() + + async def _push_result(self, actorid, cid, msg): + assert actorid, f"`actorid` can't be {actorid}" + q = self.get_waitq(actorid, cid) + log.debug(f"Delivering {msg} from {actorid} to caller {cid}") + # maintain backpressure + await q.put(msg) + + def get_waitq(self, actorid, cid): + log.debug(f"Getting result queue for {actorid} cid {cid}") + cids2qs = self._actors2calls.setdefault(actorid, {}) + return cids2qs.setdefault(cid, trio.Queue(1000)) + + async def send_cmd(self, chan, ns, func, kwargs): + """Send a ``'cmd'`` message to a remote actor and return a + caller id and a ``trio.Queue`` that can be used to wait for + responses delivered by the local message processing loop. + """ + cid = str(uuid.uuid1()) + q = self.get_waitq(chan.uid, cid) + log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") + await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) + return cid, q + + async def _process_messages(self, chan, treat_as_gen=False): + """Process messages async-RPC style. + + Process rpc requests and deliver retrieved responses from channels. + """ + # TODO: once https://github.com/python-trio/trio/issues/467 gets + # worked out we'll likely want to use that! + log.debug(f"Entering msg loop for {chan} from {chan.uid}") + async with trio.open_nursery() as nursery: + try: + async for msg in chan.aiter_recv(): + if msg is None: # terminate sentinel + log.debug( + f"Cancelling all tasks for {chan} from {chan.uid}") + nursery.cancel_scope.cancel() + log.debug( + f"Msg loop signalled to terminate for" + f" {chan} from {chan.uid}") + break + log.debug(f"Received msg {msg} from {chan.uid}") + cid = msg.get('cid') + if cid: # deliver response to local caller/waiter + await self._push_result(chan.uid, cid, msg) + log.debug( + f"Waiting on next msg for {chan} from {chan.uid}") + continue + else: + ns, funcname, kwargs, actorid, cid = msg['cmd'] + + log.debug( + f"Processing request from {actorid}\n" + f"{ns}.{funcname}({kwargs})") + if ns == 'self': + func = getattr(self, funcname) + else: + func = getattr(self._mods[ns], funcname) + + # spin up a task for the requested function + sig = inspect.signature(func) + treat_as_gen = False + if 'chan' in sig.parameters: + assert 'cid' in sig.parameters, \ + f"{func} must accept a `cid` (caller id) kwarg" + kwargs['chan'] = chan + kwargs['cid'] = cid + # TODO: eventually we want to be more stringent + # about what is considered a far-end async-generator. + # Right now both actual async gens and any async + # function which declares a `chan` kwarg in its + # signature will be treated as one. + treat_as_gen = True + + log.debug(f"Spawning task for {func}") + nursery.start_soon( + _invoke, cid, chan, func, kwargs, treat_as_gen, + name=funcname + ) + log.debug( + f"Waiting on next msg for {chan} from {chan.uid}") + else: # channel disconnect + log.debug(f"{chan} from {chan.uid} disconnected") + except trio.ClosedStreamError: + log.error(f"{chan} form {chan.uid} broke") + + log.debug(f"Exiting msg loop for {chan} from {chan.uid}") + + def _fork_main(self, accept_addr, parent_addr=None): + # after fork routine which invokes a fresh ``trio.run`` + # log.warn("Log level after fork is {self.loglevel}") + from ._trionics import ctx + if self.loglevel is not None: + get_console_log(self.loglevel) + log.info( + f"Started new {ctx.current_process()} for actor {self.uid}") + _state._current_actor = self + log.debug(f"parent_addr is {parent_addr}") + try: + trio.run(partial( + self._async_main, accept_addr, parent_addr=parent_addr)) + except KeyboardInterrupt: + pass # handle it the same way trio does? + log.debug(f"Actor {self.uid} terminated") + + async def _async_main( + self, + accept_addr, + arbiter_addr=None, + parent_addr=None, + nursery=None + ): + """Start the channel server, maybe connect back to the parent, and + start the main task. + + A "root-most" (or "top-level") nursery for this actor is opened here + and when cancelled effectively cancels the actor. + """ + result = None + arbiter_addr = arbiter_addr or self._arb_addr + registered_with_arbiter = False + try: + async with maybe_open_nursery(nursery) as nursery: + self._root_nursery = nursery + + # Startup up channel server + host, port = accept_addr + await nursery.start(partial( + self._serve_forever, accept_host=host, accept_port=port) + ) + + # XXX: I wonder if a better name is maybe "requester" + # since I don't think the notion of a "parent" actor + # necessarily sticks given that eventually we want + # ``'MainProcess'`` (the actor who initially starts the + # forkserver) to eventually be the only one who is + # allowed to spawn new processes per Python program. + if parent_addr is not None: + try: + # Connect back to the parent actor and conduct initial + # handshake (From this point on if we error ship the + # exception back to the parent actor) + chan = self._parent_chan = Channel( + destaddr=parent_addr, + on_reconnect=self.main + ) + await chan.connect() + # initial handshake, report who we are, who they are + await _do_handshake(self, chan) + except OSError: # failed to connect + log.warn( + f"Failed to connect to parent @ {parent_addr}," + " closing server") + self.cancel_server() + self._parent_chan = None + + # register with the arbiter if we're told its addr + log.debug(f"Registering {self} for role `{self.name}`") + async with get_arbiter(*arbiter_addr) as arb_portal: + await arb_portal.run( + 'self', 'register_actor', + name=self.name, sockaddr=self.accept_addr) + registered_with_arbiter = True + + # handle new connection back to parent optionally + # begin responding to RPC + if self._allow_rpc: + self.load_namespaces() + if self._parent_chan: + nursery.start_soon( + self._process_messages, self._parent_chan) + + if self.main: + try: + if self._parent_chan: + async with trio.open_nursery() as n: + self._main_scope = n.cancel_scope + log.debug(f"Starting main task `{self.main}`") + # spawned subactor so deliver "main" + # task result(s) back to parent + await n.start( + _invoke, 'main', + self._parent_chan, self.main, {}, + # treat_as_gen, raise_errs params + False, True + ) + else: + with trio.open_cancel_scope() as main_scope: + self._main_scope = main_scope + # run directly we are an "unspawned actor" + log.debug(f"Running `{self.main}` directly") + result = await self.main() + finally: + # tear down channel server in order to ensure + # we exit normally when the main task is done + if not self._outlive_main: + log.debug(f"Shutting down channel server") + self.cancel_server() + log.debug(f"Shutting down root nursery") + nursery.cancel_scope.cancel() + self._main_complete.set() + + if self._main_scope.cancelled_caught: + log.debug("Main task was cancelled sucessfully") + log.debug("Waiting on root nursery to complete") + # blocks here as expected if no nursery was provided until + # the channel server is killed (i.e. this actor is + # cancelled or signalled by the parent actor) + except Exception: + if self._parent_chan: + try: + await self._parent_chan.send( + {'error': traceback.format_exc(), 'cid': 'main'}) + except trio.ClosedStreamError: + log.error( + f"Failed to ship error to parent " + f"{self._parent_chan.uid}, channel was closed") + log.exception("Actor errored:") + + if not registered_with_arbiter: + log.exception( + f"Failed to register with arbiter @ {arbiter_addr}") + else: + raise + finally: + await self._do_unreg(arbiter_addr) + # terminate actor once all it's peers (actors that connected + # to it as clients) have disappeared + if not self._no_more_peers.is_set(): + log.debug( + f"Waiting for remaining peers {self._peers} to clear") + await self._no_more_peers.wait() + log.debug(f"All peer channels are complete") + + # tear down channel server no matter what since we errored + # or completed + log.debug(f"Shutting down channel server") + self.cancel_server() + + return result + + async def _serve_forever( + self, + *, + # (host, port) to bind for channel server + accept_host=None, + accept_port=0, + task_status=trio.TASK_STATUS_IGNORED + ): + """Start the channel server, begin listening for new connections. + + This will cause an actor to continue living (blocking) until + ``cancel_server()`` is called. + """ + async with trio.open_nursery() as nursery: + self._server_nursery = nursery + # TODO: might want to consider having a separate nursery + # for the stream handler such that the server can be cancelled + # whilst leaving existing channels up + listeners = await nursery.start( + partial( + trio.serve_tcp, + self._stream_handler, + # new connections will stay alive even if this server + # is cancelled + handler_nursery=self._root_nursery, + port=accept_port, host=accept_host, + ) + ) + log.debug( + f"Started tcp server(s) on {[l.socket for l in listeners]}") + self._listeners.extend(listeners) + task_status.started() + + async def _do_unreg(self, arbiter_addr): + # UNregister actor from the arbiter + try: + if arbiter_addr is not None: + async with get_arbiter(*arbiter_addr) as arb_portal: + await arb_portal.run( + 'self', 'unregister_actor', name=self.name) + except OSError: + log.warn(f"Unable to unregister {self.name} from arbiter") + + async def cancel(self): + """This cancels the internal root-most nursery thereby gracefully + cancelling (for all intents and purposes) this actor. + """ + self.cancel_server() + if self._main_scope: + self._main_scope.cancel() + log.debug("Waiting on main task to complete") + await self._main_complete.wait() + self._root_nursery.cancel_scope.cancel() + + def cancel_server(self): + """Cancel the internal channel server nursery thereby + preventing any new inbound connections from being established. + """ + self._server_nursery.cancel_scope.cancel() + + @property + def accept_addr(self): + """Primary address to which the channel server is bound. + """ + try: + return self._listeners[0].socket.getsockname() + except OSError: + return + + def get_parent(self): + return Portal(self._parent_chan) + + def get_chans(self, actorid): + return self._peers[actorid] + + +class Arbiter(Actor): + """A special actor who knows all the other actors and always has + access to the top level nursery. + + The arbiter is by default the first actor spawned on each host + and is responsible for keeping track of all other actors for + coordination purposes. If a new main process is launched and an + arbiter is already running that arbiter will be used. + """ + _registry = defaultdict(list) + is_arbiter = True + + def find_actor(self, name): + return self._registry[name] + + def register_actor(self, name, sockaddr): + self._registry[name].append(sockaddr) + + def unregister_actor(self, name): + self._registry.pop(name, None) + + +async def _start_actor(actor, host, port, arbiter_addr, nursery=None): + """Spawn a local actor by starting a task to execute it's main + async function. + + Blocks if no nursery is provided, in which case it is expected the nursery + provider is responsible for waiting on the task to complete. + """ + # assign process-local actor + _state._current_actor = actor + + # start local channel-server and fake the portal API + # NOTE: this won't block since we provide the nursery + log.info(f"Starting local {actor} @ {host}:{port}") + + result = await actor._async_main( + accept_addr=(host, port), + parent_addr=None, + arbiter_addr=arbiter_addr, + nursery=nursery, + ) + # XXX: If spawned locally, the actor is cancelled when this + # context is complete given that there are no more active + # peer channels connected to it. + if not actor._outlive_main: + actor.cancel_server() + + # unset module state + _state._current_actor = None + log.info("Completed async main") + + return result + + +@asynccontextmanager +async def get_arbiter(host, port): + """Return a portal instance connected to a local or remote + arbiter. + """ + actor = current_actor() + if not actor: + raise RuntimeError("No actor instance has been defined yet?") + + if actor.is_arbiter: + # we're already the arbiter + # (likely a re-entrant call from the arbiter actor) + yield LocalPortal(actor) + else: + async with _connect_chan(host, port) as chan: + async with open_portal(chan) as arb_portal: + yield arb_portal + + +@asynccontextmanager +async def find_actor( + name, + arbiter_sockaddr=None, +): + """Ask the arbiter to find actor(s) by name. + + Returns a connected portal to the last registered matching actor + known to the arbiter. + """ + actor = current_actor() + if not actor: + raise RuntimeError("No actor instance has been defined yet?") + + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddrs = await arb_portal.run('self', 'find_actor', name=name) + # TODO: return portals to all available actors - for now just + # the last one that registered + if sockaddrs: + sockaddr = sockaddrs[-1] + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal + else: + yield None diff --git a/tractor/ipc.py b/tractor/_ipc.py similarity index 100% rename from tractor/ipc.py rename to tractor/_ipc.py diff --git a/tractor/_portal.py b/tractor/_portal.py new file mode 100644 index 0000000..b324069 --- /dev/null +++ b/tractor/_portal.py @@ -0,0 +1,205 @@ +""" +Portal api +""" +import importlib + +import trio +from async_generator import asynccontextmanager + +from ._state import current_actor +from .log import get_logger + + +log = get_logger('tractor') + + +class RemoteActorError(RuntimeError): + "Remote actor exception bundled locally" + + +@asynccontextmanager +async def maybe_open_nursery(nursery=None): + """Create a new nursery if None provided. + + Blocks on exit as expected if no input nursery is provided. + """ + if nursery is not None: + yield nursery + else: + async with trio.open_nursery() as nursery: + yield nursery + + +async def _do_handshake(actor, chan): + await chan.send(actor.uid) + uid = await chan.recv() + + if not isinstance(uid, tuple): + raise ValueError(f"{uid} is not a valid uid?!") + + chan.uid = uid + log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + return uid + + +async def result_from_q(q, chan): + """Process a msg from a remote actor. + """ + first_msg = await q.get() + if 'return' in first_msg: + return 'return', first_msg, q + elif 'yield' in first_msg: + return 'yield', first_msg, q + elif 'error' in first_msg: + raise RemoteActorError(f"{chan.uid}\n" + first_msg['error']) + else: + raise ValueError(f"{first_msg} is an invalid response packet?") + + +class Portal: + """A 'portal' to a(n) (remote) ``Actor``. + + Allows for invoking remote routines and receiving results through an + underlying ``tractor.Channel`` as though the remote (async) + function / generator was invoked locally. + + Think of this like an native async IPC API. + """ + def __init__(self, channel): + self.channel = channel + self._result = None + + async def aclose(self): + log.debug(f"Closing {self}") + # XXX: won't work until https://github.com/python-trio/trio/pull/460 + # gets in! + await self.channel.aclose() + + async def run(self, ns, func, **kwargs): + """Submit a function to be scheduled and run by actor, return its + (stream of) result(s). + """ + # TODO: not this needs some serious work and thinking about how + # to make async-generators the fundamental IPC API over channels! + # (think `yield from`, `gen.send()`, and functional reactive stuff) + actor = current_actor() + # ship a function call request to the remote actor + cid, q = await actor.send_cmd(self.channel, ns, func, kwargs) + # wait on first response msg and handle + return await self._return_from_resptype( + cid, *(await result_from_q(q, self.channel))) + + async def _return_from_resptype(self, cid, resptype, first_msg, q): + + if resptype == 'yield': + + async def yield_from_q(): + yield first_msg['yield'] + try: + async for msg in q: + try: + yield msg['yield'] + except KeyError: + if 'stop' in msg: + break # far end async gen terminated + else: + raise RemoteActorError(msg['error']) + except GeneratorExit: + log.debug( + f"Cancelling async gen call {cid} to " + f"{self.channel.uid}") + raise + + return yield_from_q() + + elif resptype == 'return': + return first_msg['return'] + else: + raise ValueError(f"Unknown msg response type: {first_msg}") + + async def result(self): + """Return the result(s) from the remote actor's "main" task. + """ + if self._result is None: + q = current_actor().get_waitq(self.channel.uid, 'main') + resptype, first_msg, q = (await result_from_q(q, self.channel)) + self._result = await self._return_from_resptype( + 'main', resptype, first_msg, q) + log.warn( + f"Retrieved first result `{self._result}` " + f"for {self.channel.uid}") + # await q.put(first_msg) # for next consumer (e.g. nursery) + return self._result + + async def close(self): + # trigger remote msg loop `break` + chan = self.channel + log.debug(f"Closing portal for {chan} to {chan.uid}") + await self.channel.send(None) + + async def cancel_actor(self): + """Cancel the actor on the other end of this portal. + """ + log.warn( + f"Sending cancel request to {self.channel.uid} on " + f"{self.channel}") + try: + with trio.move_on_after(0.1) as cancel_scope: + cancel_scope.shield = True + # send cancel cmd - might not get response + await self.run('self', 'cancel') + return True + except trio.ClosedStreamError: + log.warn( + f"{self.channel} for {self.channel.uid} was already closed?") + return False + + +class LocalPortal: + """A 'portal' to a local ``Actor``. + + A compatibility shim for normal portals but for invoking functions + using an in process actor instance. + """ + def __init__(self, actor): + self.actor = actor + + async def run(self, ns, func, **kwargs): + """Run a requested function locally and return it's result. + """ + obj = self.actor if ns == 'self' else importlib.import_module(ns) + func = getattr(obj, func) + return func(**kwargs) + + +@asynccontextmanager +async def open_portal(channel, nursery=None): + """Open a ``Portal`` through the provided ``channel``. + + Spawns a background task to handle message processing. + """ + actor = current_actor() + assert actor + was_connected = False + + async with maybe_open_nursery(nursery) as nursery: + + if not channel.connected(): + await channel.connect() + was_connected = True + + if channel.uid is None: + await _do_handshake(actor, channel) + + nursery.start_soon(actor._process_messages, channel) + portal = Portal(channel) + yield portal + + # cancel remote channel-msg loop + if channel.connected(): + await portal.close() + + # cancel background msg loop task + nursery.cancel_scope.cancel() + if was_connected: + await channel.aclose() diff --git a/tractor/_state.py b/tractor/_state.py new file mode 100644 index 0000000..a31b1cc --- /dev/null +++ b/tractor/_state.py @@ -0,0 +1,10 @@ +""" +Per process state +""" +_current_actor = None + + +def current_actor() -> 'Actor': + """Get the process-local actor instance. + """ + return _current_actor diff --git a/tractor/_trionics.py b/tractor/_trionics.py new file mode 100644 index 0000000..08856f9 --- /dev/null +++ b/tractor/_trionics.py @@ -0,0 +1,175 @@ +""" +``trio`` inspired apis and helpers +""" +import multiprocessing as mp + +import trio +from async_generator import asynccontextmanager + +from ._state import current_actor +from .log import get_logger, get_loglevel +from ._actor import Actor, ActorFailure +from ._portal import Portal + + +ctx = mp.get_context("forkserver") +log = get_logger('tractor') + + +class ActorNursery: + """Spawn scoped subprocess actors. + """ + def __init__(self, actor, supervisor=None): + self.supervisor = supervisor # TODO + self._actor = actor + # We'll likely want some way to cancel all sub-actors eventually + # self.cancel_scope = cancel_scope + self._children = {} + self.cancelled = False + + async def __aenter__(self): + return self + + async def start_actor( + self, + name: str, + main=None, + bind_addr=('127.0.0.1', 0), + statespace=None, + rpc_module_paths=None, + outlive_main=False, # sub-actors die when their main task completes + loglevel=None, # set log level per subactor + ): + loglevel = loglevel or self._actor.loglevel or get_loglevel() + actor = Actor( + name, + # modules allowed to invoked funcs from + rpc_module_paths=rpc_module_paths or [], + statespace=statespace, # global proc state vars + main=main, # main coroutine to be invoked + outlive_main=outlive_main, + loglevel=loglevel, + arbiter_addr=current_actor()._arb_addr, + ) + parent_addr = self._actor.accept_addr + assert parent_addr + proc = ctx.Process( + target=actor._fork_main, + args=(bind_addr, parent_addr), + # daemon=True, + name=name, + ) + proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") + + log.info(f"Started {proc}") + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await self._actor.wait_for_peer(actor.uid) + portal = Portal(chan) + self._children[(name, proc.pid)] = (actor, proc, portal) + return portal + + async def wait(self): + """Wait for all subactors to complete. + """ + async def wait_for_proc(proc, actor, portal): + # TODO: timeout block here? + if proc.is_alive(): + await trio.hazmat.wait_readable(proc.sentinel) + # please god don't hang + proc.join() + log.debug(f"Joined {proc}") + event = self._actor._peers.get(actor.uid) + if isinstance(event, trio.Event): + event.set() + log.warn( + f"Cancelled `wait_for_peer()` call since {actor.uid}" + f" is already dead!") + if not portal._result: + log.debug(f"Faking result for {actor.uid}") + q = self._actor.get_waitq(actor.uid, 'main') + q.put_nowait({'return': None, 'cid': 'main'}) + + async def wait_for_result(portal): + if portal.channel.connected(): + log.debug(f"Waiting on final result from {subactor.uid}") + await portal.result() + + # unblocks when all waiter tasks have completed + async with trio.open_nursery() as nursery: + for subactor, proc, portal in self._children.values(): + nursery.start_soon(wait_for_proc, proc, subactor, portal) + nursery.start_soon(wait_for_result, portal) + + async def cancel(self, hard_kill=False): + """Cancel this nursery by instructing each subactor to cancel + iteslf and wait for all subprocesses to terminate. + + If ``hard_killl`` is set to ``True`` then kill the processes + directly without any far end graceful ``trio`` cancellation. + """ + log.debug(f"Cancelling nursery") + for subactor, proc, portal in self._children.values(): + if proc is mp.current_process(): + # XXX: does this even make sense? + await subactor.cancel() + else: + if hard_kill: + log.warn(f"Hard killing subactors {self._children}") + proc.terminate() + # XXX: doesn't seem to work? + # send KeyBoardInterrupt (trio abort signal) to sub-actors + # os.kill(proc.pid, signal.SIGINT) + else: + await portal.cancel_actor() + + log.debug(f"Waiting on all subactors to complete") + await self.wait() + self.cancelled = True + log.debug(f"All subactors for {self} have terminated") + + async def __aexit__(self, etype, value, tb): + """Wait on all subactor's main routines to complete. + """ + if etype is not None: + # XXX: hypothetically an error could be raised and then + # a cancel signal shows up slightly after in which case the + # else block here might not complete? Should both be shielded? + if etype is trio.Cancelled: + with trio.open_cancel_scope(shield=True): + log.warn( + f"{current_actor().uid} was cancelled with {etype}" + ", cancelling actor nursery") + await self.cancel() + else: + log.exception( + f"{current_actor().uid} errored with {etype}, " + "cancelling actor nursery") + await self.cancel() + else: + # XXX: this is effectively the lone cancellation/supervisor + # strategy which exactly mimicks trio's behaviour + log.debug(f"Waiting on subactors {self._children} to complete") + try: + await self.wait() + except Exception as err: + log.warn(f"Nursery caught {err}, cancelling") + await self.cancel() + raise + log.debug(f"Nursery teardown complete") + + +@asynccontextmanager +async def open_nursery(supervisor=None): + """Create and yield a new ``ActorNursery``. + """ + actor = current_actor() + if not actor: + raise RuntimeError("No actor instance has been defined yet?") + + # TODO: figure out supervisors from erlang + async with ActorNursery(current_actor(), supervisor) as nursery: + yield nursery diff --git a/tractor/log.py b/tractor/log.py index 026716b..3b013d1 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -7,6 +7,7 @@ import logging import colorlog _proj_name = 'tractor' +_default_loglevel = None # Super sexy formatting thanks to ``colorlog``. # (NOTE: we use the '{' format style) @@ -89,3 +90,7 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: log.addHandler(handler) return log + + +def get_loglevel(): + return _default_loglevel