diff --git a/piker/tractor.py b/piker/tractor.py index 6c6f395..b660251 100644 --- a/piker/tractor.py +++ b/piker/tractor.py @@ -24,7 +24,7 @@ log = get_logger('tractor') _current_actor = None # for debugging -log = get_console_log('trace') +log = get_console_log('info') class ActorFailure(Exception): @@ -219,14 +219,16 @@ class Actor: log.debug(f"Registering for result from call id {cid}") return cids2qs.setdefault(cid, trio.Queue(1000)) - async def invoke_cmd(self, chan, ns, func, kwargs): - """Invoke a remote command by sending a `cmd` message and waiting - on the msg processing loop for its response(s). + async def send_cmd(self, chan, ns, func, kwargs): + """Send a ``'cmd'`` message to a remote actor and return a + caller id and a ``trio.Queue`` that can be used to wait for + responses delivered by the local message processing loop. """ cid = str(uuid.uuid1()) q = self.get_waitq(chan.uid, cid) + log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) - return await result_from_q(q) + return cid, q async def _process_messages(self, chan, treat_as_gen=False): """Process messages async-RPC style. @@ -240,16 +242,15 @@ class Actor: log.debug(f"Terminating msg loop for {chan}") break log.debug(f"Received msg {msg}") - # try: cid = msg.get('cid') if cid: # deliver response to local caller/waiter self._push_result(chan.uid, cid, msg) + if 'error' in msg: + # TODO: need something better then this slop + raise RemoteActorError(msg['error']) continue else: ns, funcname, kwargs, actorid, cid = msg['cmd'] - # except Exception: - # await chan.send({'error': traceback.format_exc()}) - # break log.debug( f"Processing request from {actorid}\n" @@ -264,7 +265,10 @@ class Actor: sig = inspect.signature(func) treat_as_gen = False if 'chan' in sig.parameters: + assert 'cid' in sig.parameters, \ + f"{func} must accept a `cid` (caller id) kwarg" kwargs['chan'] = chan + kwargs['cid'] = cid # TODO: eventually we want to be more stringent # about what is considered a far-end async-generator. # Right now both actual async gens and any async @@ -302,7 +306,7 @@ class Actor: # Startup up channel server host, port = accept_addr - nursery.start_soon(partial( + await nursery.start(partial( self._serve_forever, accept_host=host, accept_port=port) ) @@ -323,8 +327,6 @@ class Actor: log.warn( f"already have channel for {uid} registered?" ) - # else: - # self._peers[uid] = chan # handle new connection back to parent optionally # begin responding to RPC @@ -354,7 +356,7 @@ class Actor: # tear down channel server if not self._outlive_main: log.debug(f"Shutting down channel server") - self._server_nursery.cancel_scope.cancel() + self.cancel_server() # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. this actor is @@ -397,8 +399,6 @@ class Actor: ) self._listeners.extend(listeners) log.debug(f"Spawned {listeners}") - - # when launched in-process, trigger awaiter's completion task_status.started() def cancel(self): @@ -407,6 +407,12 @@ class Actor: """ self._root_nursery.cancel_scope.cancel() + def cancel_server(self): + """Cancel the internal channel server nursery thereby + preventing any new inbound connections from being established. + """ + self._server_nursery.cancel_scope.cancel() + @property def accept_addr(self): """Primary address to which the channel server is bound. @@ -417,6 +423,9 @@ class Actor: def get_parent(self): return Portal(self._parent_chan) + def get_chan(self, actorid): + return self._peers[actorid] + class Arbiter(Actor): """A special actor who knows all the other actors and always has @@ -480,12 +489,14 @@ class Portal: # ship a function call request to the remote actor actor = current_actor() - resptype, first_msg, q = await actor.invoke_cmd(chan, ns, func, kwargs) + cid, q = await actor.send_cmd(chan, ns, func, kwargs) + # wait on first response msg + resptype, first_msg, q = await result_from_q(q) if resptype == 'yield': async def yield_from_q(): - yield first_msg + yield first_msg['yield'] for msg in q: try: yield msg['yield'] @@ -583,26 +594,27 @@ class ActorNursery: nursery.start_soon(wait_for_proc, proc) async def cancel(self, hard_kill=False): + log.debug(f"Cancelling nursery") for subactor, proc, main_q in self._children.values(): if proc is mp.current_process(): # XXX: does this even make sense? - subactor.cancel() + await subactor.cancel() else: if hard_kill: + log.warn(f"Hard killing subactors {self._children}") proc.terminate() # send KeyBoardInterrupt (trio abort signal) to underlying # sub-actors # os.kill(proc.pid, signal.SIGINT) else: - # invoke cancel cmd + # send cancel cmd - likely no response from subactor actor = self._actor - resptype, first_msg, q = await actor.invoke_cmd( - actor._peers[subactor.uid], # channel + cid, q = await actor.send_cmd( + actor.get_chan(subactor.uid), # channel lookup 'self', 'cancel', {}, ) - log.debug(f"Waiting on all subactors to complete") await self.wait() log.debug(f"All subactors for {self} have terminated") @@ -611,16 +623,25 @@ class ActorNursery: """Wait on all subactor's main routines to complete. """ async def wait_for_actor(actor, proc, q): - ret_type, msg, q = await result_from_q(q) - log.info(f"{actor.uid} main task completed with {msg}") - if not actor._outlive_main: - # trigger msg loop to break - log.info(f"Signalling msg loop exit for {actor.uid}") - await self._actor._peers[actor.uid].send(None) + if proc.is_alive(): + ret_type, msg, q = await result_from_q(q) + log.info(f"{actor.uid} main task completed with {msg}") + if not actor._outlive_main: + # trigger msg loop to break + log.info(f"Signalling msg loop exit for {actor.uid}") + await self._actor.get_chan(actor.uid).send(None) - async with trio.open_nursery() as nursery: - for subactor, proc, main_q in self._children.values(): - nursery.start_soon(wait_for_actor, subactor, proc, main_q) + if etype is not None: + log.warn(f"{current_actor().uid} errored with {etype}, " + "cancelling nursery") + await self.cancel() + else: + log.debug(f"Waiting on subactors to complete") + async with trio.open_nursery() as nursery: + for subactor, proc, main_q in self._children.values(): + nursery.start_soon(wait_for_actor, subactor, proc, main_q) + + log.debug(f"Nursery teardown complete") def current_actor() -> Actor: @@ -693,9 +714,10 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None): yield LocalPortal(arbiter) - # If spawned locally, the arbiter is cancelled when this context - # is complete? - # nursery.cancel_scope.cancel() + # XXX: If spawned locally, the arbiter is cancelled when this + # context is complete given that there are no more active + # peer channels connected to it. + arbiter.cancel_server() @asynccontextmanager @@ -737,6 +759,8 @@ async def _main(async_fn, args, kwargs, name): await serve_local_actor( actor, accept_addr=kwargs.pop('accept_addr', (None, 0))) log.info("Completed async main") + # TODO: when the local actor's main has completed we cancel? + # actor.cancel() else: # block waiting for the arbiter main task to complete pass