diff --git a/piker/tractor.py b/piker/tractor.py index da9dfa8..5a1fa65 100644 --- a/piker/tractor.py +++ b/piker/tractor.py @@ -1,13 +1,14 @@ """ tracor: An actor model micro-framework. """ -import uuid -import inspect -import importlib -from functools import partial -import multiprocessing as mp -from typing import Coroutine from collections import defaultdict +from functools import partial +from typing import Coroutine +import importlib +import inspect +import multiprocessing as mp +import traceback +import uuid import trio from async_generator import asynccontextmanager @@ -19,6 +20,9 @@ from .log import get_console_log, get_logger ctx = mp.get_context("forkserver") log = get_logger('tractor') +# set at startup and after forks +_current_actor = None + # for debugging log = get_console_log('debug') @@ -27,8 +31,8 @@ class ActorFailure(Exception): "General actor failure" -# set at startup and after forks -_current_actor = None +class RemoteActorError(ActorFailure): + "Remote actor exception bundled locally" @asynccontextmanager @@ -44,6 +48,58 @@ async def maybe_open_nursery(nursery=None): yield nursery +async def _invoke( + cid, chan, func, kwargs, + treat_as_gen=False, raise_errs=False): + """Invoke local func and return results over provided channel. + """ + try: + is_async_func = False + if isinstance(func, partial): + is_async_func = inspect.iscoroutinefunction(func.func) + + if not inspect.iscoroutinefunction(func) and not is_async_func: + await chan.send({'return': func(**kwargs), 'cid': cid}) + else: + coro = func(**kwargs) + + if inspect.isasyncgen(coro): + # await chan.send('gen') + async for item in coro: + # TODO: can we send values back in here? + # How do we do it, spawn another task? + # to_send = await chan.recv() + # if to_send is not None: + # await coro.send(to_send) + await chan.send({'yield': item, 'cid': cid}) + else: + if treat_as_gen: + # XXX: the async-func may spawn further tasks which push + # back values like an async-generator would but must + # manualy construct the response dict-packet-responses as above + await coro + else: + await chan.send({'return': await coro, 'cid': cid}) + except Exception: + if not raise_errs: + await chan.send({'error': traceback.format_exc(), 'cid': cid}) + else: + raise + +async def get_result(q): + """Process a msg from a remote actor. + """ + first_msg = await q.get() + if 'return' in first_msg: + return 'return', first_msg, q + elif 'yield' in first_msg: + return 'yield', first_msg, q + elif 'error' in first_msg: + raise RemoteActorError(first_msg['error']) + else: + raise ValueError(f"{first_msg} is an invalid response packet?") + + class Actor: """The fundamental concurrency primitive. @@ -56,14 +112,14 @@ class Actor: def __init__( self, name: str, - namespaces: [str], - main: Coroutine, - statespace: dict, + main: Coroutine = None, + rpc_module_paths: [str] = [], + statespace: dict = {}, uid: str = None, allow_rpc: bool = True, ): self.uid = (name, uid or str(uuid.uuid1())) - self.namespaces = namespaces + self.rpc_module_paths = rpc_module_paths self._mods = {} self.main = main # TODO: consider making this a dynamically defined @@ -73,6 +129,7 @@ class Actor: # filled in by `_async_main` after fork self._peers = {} + self._actors2calls = {} # map {uids -> {callids -> waiter queues}} self._listeners = [] self._parent_chan = None self._accept_host = None @@ -91,33 +148,29 @@ class Actor: # be spawned on a different machine from the original nursery # and we need to try and load the local module code (if it # exists) - for path in self.namespaces: + for path in self.rpc_module_paths: self._mods[path] = importlib.import_module(path) async def _stream_handler( self, stream: trio.SocketStream, ): - """Receive requests and deliver responses spinning up new - channels where necessary. - - Basically RPC with an async twist ;) + """ + Entry point for new inbound connections to the channel server. """ chan = Channel(stream=stream) log.info(f"New {chan} connected to us") # send/receive initial handshake response await chan.send(self.uid) uid = await chan.recv() + chan.uid = uid log.info(f"Handshake with actor {uid}@{chan.raddr} complete") # XXX WTF!?!! THIS BLOCKS RANDOMLY? # assert tuple(raddr) == chan.laddr - # execute main coroutine provided by spawner - if self.main: - await self.main(actor=self) - event = self._peers.pop(uid, None) + chan.event = event self._peers[uid] = chan log.info(f"Registered {chan} for {uid}") log.debug(f"Retrieved event {event}") @@ -126,59 +179,63 @@ class Actor: # a recently spawned actor which we'd like to control via # async-rpc calls. if event and getattr(event, 'set', None): - log.info(f"Waking waiters of {event.statistics()}") # Alert any task waiting on this connection to come up - # and don't manage channel messages as some external task is - # waiting to use the channel - # (usually an actor nursery) event.set() - event.clear() + event.clear() # now consumer can wait on this channel to close - # wait for channel consumer (usually a portal) to be - # done with the channel - await event.wait() - - # Drop ref to channel so it can be gc-ed - self._peers.pop(self._uid, None) - - # Remote controlled connection, we are likely a subactor - # being told what to do so manage the channel with async-rpc - else: + # Begin channel management - respond to remote requests and + # process received reponses. + try: await self._process_messages(chan) + finally: + # Drop ref to channel so it can be gc-ed + self._peers.pop(chan.uid, None) + chan.event.set() + log.debug(f"Releasing channel {chan}") + + def _push_result(self, actorid, cid, msg): + q = self.get_waitq(actorid, cid) + log.debug(f"Delivering {msg} from {actorid} to caller {cid}") + q.put_nowait(msg) + + def get_waitq(self, actorid, cid): + if actorid not in self._actors2calls: + log.warn(f"Caller id {cid} is not yet registered?") + cids2qs = self._actors2calls.setdefault(actorid, {}) + if cid not in cids2qs: + log.warn(f"Caller id {cid} is not yet registered?") + return cids2qs.setdefault(cid, trio.Queue(1000)) + + async def invoke_cmd(self, chan, ns, func, kwargs): + """Invoke a remote command by sending a `cmd` message and waiting + on the msg processing loop for its response(s). + """ + cid = uuid.uuid1() + q = self.get_waitq(chan.uid, cid) + await chan.send((ns, func, kwargs, self.uid, cid)) + return await get_result(q) async def _process_messages(self, chan, treat_as_gen=False): - """Process inbound messages async-RPC style. + """Process messages async-RPC style. + + Process rpc requests and deliver retrieved responses from channels. """ - async def invoke(func, kwargs): - if not inspect.iscoroutinefunction(func): - await chan.send('func') - await chan.send(func(**kwargs)) - else: - coro = func(**kwargs) - - if inspect.isasyncgen(coro): - await chan.send('gen') - async for item in coro: - # TODO: can we send values back in here? - # How do we do it, spawn another task? - # to_send = await chan.recv() - # if to_send is not None: - # await coro.send(to_send) - await chan.send(item) - else: - if treat_as_gen: - await chan.send('gen') - else: - await chan.send('func') - - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would - await chan.send(await coro) - - log.debug(f"Entering async-rpc loop for {chan.laddr}->{chan.raddr}") + log.debug(f"Entering async-rpc loop for {chan}") async with trio.open_nursery() as nursery: - async for ns, funcname, kwargs, actorid in chan.aiter_recv(): + async for msg in chan.aiter_recv(): + log.debug(f"Received msg {msg}") + # try: + cid = msg.get('cid') + if cid: # deliver response to local caller/waiter + self._push_result(chan.uid, cid, msg) + continue + else: + ns, funcname, kwargs, actorid, cid = msg['cmd'] + # except Exception: + # await chan.send({'error': traceback.format_exc()}) + # break + log.debug( f"Processing request from {actorid}\n" f"{ns}.{funcname}({kwargs})") @@ -200,7 +257,11 @@ class Actor: # signature will be treated as one. treat_as_gen = True - nursery.start_soon(invoke, func, kwargs, name=funcname) + nursery.start_soon( + _invoke, cid, chan, func, kwargs, treat_as_gen, + name=funcname + ) + log.debug(f"Exiting msg loop for {chan}") def _fork_main(self, accept_addr, parent_addr=None): # after fork routine which invokes a fresh ``trio.run`` @@ -220,23 +281,67 @@ class Actor: A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. """ - async with maybe_open_nursery(nursery) as nursery: - self._root_nursery = nursery + result = None + try: + async with maybe_open_nursery(nursery) as nursery: + self._root_nursery = nursery - # Startup up channel server, optionally begin serving RPC - # requests from the parent. - host, port = accept_addr - await self._serve_forever( - nursery, accept_host=host, accept_port=port, - parent_addr=parent_addr - ) + # Startup up channel server, optionally begin serving RPC + # requests from the parent. + host, port = accept_addr + await self._serve_forever( + nursery, accept_host=host, accept_port=port, + ) - # start "main" routine in a task - if self.main: - await self.main(self) + if parent_addr is not None: + # Connect back to the parent actor and conduct initial + # handshake (From this point on if we error ship the + # exception back to the parent actor) + chan = self._parent_chan = Channel( + destaddr=parent_addr, + on_reconnect=self.main + ) + await chan.connect() - # blocks here as expected if no nursery was provided until - # the channel server is killed + # initial handshake, report who we are, figure out who they are + await chan.send(self.uid) + uid = await chan.recv() + if uid in self._peers: + log.warn( + f"already have channel for {uid} registered?" + ) + else: + self._peers[uid] = chan + + # handle new connection back to parent + if self._allow_rpc: + self.load_namespaces() + nursery.start_soon(self._process_messages, chan) + + if self.main: + log.debug(f"Starting main task `{self.main}`") + if self._parent_chan: + # start "main" routine in a task + nursery.start_soon( + _invoke, 'main', self._parent_chan, self.main, {}, + False, True # treat_as_gen, raise_errs params + ) + else: + # run directly + result = await self.main() + + # blocks here as expected if no nursery was provided until + # the channel server is killed (i.e. this actor is + # cancelled or signalled by the parent actor) + except Exception: + if self._parent_chan: + log.exception("Actor errored:") + await self._parent_chan.send( + {'error': traceback.format_exc(), 'cid': 'main'}) + else: + raise + + return result async def _serve_forever( self, @@ -245,7 +350,6 @@ class Actor: # (host, port) to bind for channel server accept_host=None, accept_port=0, - parent_addr=None, task_status=trio.TASK_STATUS_IGNORED ): """Main coroutine: connect back to the parent, spawn main task, begin @@ -264,31 +368,6 @@ class Actor: self._listeners.extend(listeners) log.debug(f"Spawned {listeners}") - if parent_addr is not None: - # Connect back to the parent actor and conduct initial - # handshake (From this point on if we error ship the - # exception back to the parent actor) - chan = self._parent_chan = Channel( - destaddr=parent_addr, - on_reconnect=self.main - ) - await chan.connect() - - # initial handshake, report who we are, figure out who they are - await chan.send(self.uid) - uid = await chan.recv() - if uid in self._peers: - log.warn( - f"already have channel for {uid} registered?" - ) - else: - self._peers[uid] = chan - - # handle new connection back to parent - if self._allow_rpc: - self.load_namespaces() - nursery.start_soon(self._process_messages, chan) - # when launched in-process, trigger awaiter's completion task_status.started() @@ -339,7 +418,7 @@ class Portal: """ def __init__(self, channel, event=None): self.channel = channel - self._uid = None + self._uid = channel.uid self._event = event async def __aenter__(self): @@ -369,13 +448,26 @@ class Portal: # (think `yield from`, `gen.send()`, and functional reactive stuff) chan = self.channel # ship a function call request to the remote actor - await chan.send((ns, func, kwargs, _current_actor.uid)) - # get expected response type - functype = await chan.recv() - if functype == 'gen': - return chan.aiter_recv() + actor = current_actor() + + resptype, first_msg, q = await actor.invoke_cmd(chan, ns, func, kwargs) + + if resptype == 'yield': + + async def yield_from_q(): + yield first + for msg in q: + try: + yield msg['yield'] + except KeyError: + raise RemoteActorError(msg['error']) + + return yield_from_q() + + elif resptype == 'return': + return first_msg['return'] else: - return await chan.recv() + raise ValueError(f"Unknown msg response type: {first_msg}") class LocalPortal: @@ -398,9 +490,9 @@ class LocalPortal: class ActorNursery: """Spawn scoped subprocess actors. """ - def __init__(self, parent_actor, supervisor=None): + def __init__(self, actor, supervisor=None): self.supervisor = supervisor - self._parent_actor = parent_actor + self._actor = actor # We'll likely want some way to cancel all sub-actors eventually # self.cancel_scope = cancel_scope self._children = {} @@ -409,38 +501,43 @@ class ActorNursery: return self async def start_actor( - self, name, module_paths, + self, name, bind_addr=('127.0.0.1', 0), statespace=None, + rpc_module_paths=None, main=None, ): actor = Actor( name, - module_paths, # modules allowed to invoked funcs from + # modules allowed to invoked funcs from + rpc_module_paths=rpc_module_paths, statespace=statespace, # global proc state vars main=main, # main coroutine to be invoked ) - parent_addr = self._parent_actor.accept_addr + parent_addr = self._actor.accept_addr proc = ctx.Process( target=actor._fork_main, args=(bind_addr, parent_addr), daemon=True, name=name, ) - self._children[(name, proc.pid)] = (actor, proc) proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") + # wait for actor to spawn and connect back to us # channel should have handshake completed by the # local actor by the time we get a ref to it - if proc.is_alive(): - event, chan = await self._parent_actor.wait_for_peer(actor.uid) - else: - raise ActorFailure("Couldn't start sub-actor?") + event, chan = await self._actor.wait_for_peer(actor.uid) + # channel is up, get queue which delivers result from main routine + main_q = self._actor.get_waitq(actor.uid, 'main') + self._children[(name, proc.pid)] = (actor, proc, main_q) - return Portal(chan) + return Portal(chan, event=event) + + async def wait(self): - async def cancel(self): async def wait_for_proc(proc): # TODO: timeout block here? if proc.is_alive(): @@ -452,17 +549,26 @@ class ActorNursery: # unblocks when all waiter tasks have completed async with trio.open_nursery() as nursery: for actor, proc in self._children.values(): - if proc is mp.current_process(): - actor.cancel() - else: - # send KeyBoardInterrupt (trio abort signal) to underlying - # sub-actors - proc.terminate() - # os.kill(proc.pid, signal.SIGINT) - nursery.start_soon(wait_for_proc, proc) + nursery.start_soon(wait_for_proc, proc) + + async def cancel(self): + for actor, proc in self._children.values(): + if proc is mp.current_process(): + actor.cancel() + else: + # send KeyBoardInterrupt (trio abort signal) to underlying + # sub-actors + proc.terminate() + # os.kill(proc.pid, signal.SIGINT) + + await self.wait() async def __aexit__(self, etype, value, tb): - await self.cancel() + """Wait on all subactor's main routines to complete. + """ + async with trio.open_nursery() as nursery: + for subactor, proc, q in self._children.values(): + nursery.start_soon(get_result, q) def current_actor() -> Actor: @@ -517,7 +623,7 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None): # no arbiter found on this host so start one in-process arbiter = Arbiter( 'arbiter', - namespaces=[], # the arbiter doesn't allow module rpc + rpc_module_paths=[], # the arbiter doesn't allow module rpc statespace={}, # global proc state vars main=main, # main coroutine to be invoked ) @@ -538,7 +644,7 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None): # If spawned locally, the arbiter is cancelled when this context # is complete (i.e the underlying context manager block completes) - nursery.cancel_scope.cancel() + # nursery.cancel_scope.cancel() @asynccontextmanager @@ -559,27 +665,26 @@ async def find_actor(name): async def _main(async_fn, args, kwargs, name): + main = partial(async_fn, *args) # Creates an internal nursery which shouldn't be cancelled even if # the one opened below is (this is desirable because the arbitter should # stay up until a re-election process has taken place - which is not # implemented yet FYI). async with get_arbiter( - host=kwargs.get('arbiter_host', '127.0.0.1'), - port=kwargs.get('arbiter_port', 1616), - main=partial(async_fn, *args, **kwargs) + host=kwargs.pop('arbiter_host', '127.0.0.1'), + port=kwargs.pop('arbiter_port', 1616), + main=main, ) as portal: if not current_actor().is_arbiter: # create a local actor and start it up its main routine actor = Actor( name or 'anonymous', - # namespaces=kwargs.get('namespaces'), - # statespace=kwargs.get('statespace'), - # main=async_fn, # main coroutine to be invoked + main=main, # main coroutine to be invoked **kwargs ) # this will block and yield control to the `trio` run loop await serve_local_actor( - actor, accept_addr=kwargs.get('accept_addr', (None, 0))) + actor, accept_addr=kwargs.pop('accept_addr', (None, 0))) log.info("Completed async main") else: # block waiting for the arbiter main task to complete