diff --git a/piker/tractor.py b/piker/tractor.py index 5a1fa653..6c6f395d 100644 --- a/piker/tractor.py +++ b/piker/tractor.py @@ -24,7 +24,7 @@ log = get_logger('tractor') _current_actor = None # for debugging -log = get_console_log('debug') +log = get_console_log('trace') class ActorFailure(Exception): @@ -50,7 +50,9 @@ async def maybe_open_nursery(nursery=None): async def _invoke( cid, chan, func, kwargs, - treat_as_gen=False, raise_errs=False): + treat_as_gen=False, raise_errs=False, + task_status=trio.TASK_STATUS_IGNORED +): """Invoke local func and return results over provided channel. """ try: @@ -76,17 +78,21 @@ async def _invoke( if treat_as_gen: # XXX: the async-func may spawn further tasks which push # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as above + # manualy construct the response dict-packet-responses as + # above await coro else: await chan.send({'return': await coro, 'cid': cid}) + + task_status.started() except Exception: if not raise_errs: await chan.send({'error': traceback.format_exc(), 'cid': cid}) else: raise -async def get_result(q): + +async def result_from_q(q): """Process a msg from a remote actor. """ first_msg = await q.get() @@ -117,6 +123,7 @@ class Actor: statespace: dict = {}, uid: str = None, allow_rpc: bool = True, + outlive_main: bool = False, ): self.uid = (name, uid or str(uuid.uuid1())) self.rpc_module_paths = rpc_module_paths @@ -126,9 +133,12 @@ class Actor: # @dataclass once we get py3.7 self.statespace = statespace self._allow_rpc = allow_rpc + self._outlive_main = outlive_main # filled in by `_async_main` after fork self._peers = {} + self._no_more_peers = trio.Event() + self._no_more_peers.set() self._actors2calls = {} # map {uids -> {callids -> waiter queues}} self._listeners = [] self._parent_chan = None @@ -158,28 +168,26 @@ class Actor: """ Entry point for new inbound connections to the channel server. """ + self._no_more_peers.clear() chan = Channel(stream=stream) - log.info(f"New {chan} connected to us") + log.info(f"New connection to us {chan}") # send/receive initial handshake response await chan.send(self.uid) uid = await chan.recv() chan.uid = uid log.info(f"Handshake with actor {uid}@{chan.raddr} complete") - # XXX WTF!?!! THIS BLOCKS RANDOMLY? - # assert tuple(raddr) == chan.laddr - + # channel tracking event = self._peers.pop(uid, None) chan.event = event self._peers[uid] = chan - log.info(f"Registered {chan} for {uid}") - log.debug(f"Retrieved event {event}") + log.debug(f"Registered {chan} for {uid}") # Instructing connection: this is likely a new channel to # a recently spawned actor which we'd like to control via # async-rpc calls. if event and getattr(event, 'set', None): - log.info(f"Waking waiters of {event.statistics()}") + log.debug(f"Waking channel waiters {event.statistics()}") # Alert any task waiting on this connection to come up event.set() event.clear() # now consumer can wait on this channel to close @@ -189,10 +197,14 @@ class Actor: try: await self._process_messages(chan) finally: - # Drop ref to channel so it can be gc-ed - self._peers.pop(chan.uid, None) - chan.event.set() - log.debug(f"Releasing channel {chan}") + # Drop ref to channel so it can be gc-ed and disconnected + if chan is not self._parent_chan: + log.debug(f"Releasing channel {chan}") + self._peers.pop(chan.uid, None) + chan.event.set() # signal teardown/disconnection + if not self._peers: # no more channels connected + self._no_more_peers.set() + log.debug(f"No more peer channels") def _push_result(self, actorid, cid, msg): q = self.get_waitq(actorid, cid) @@ -201,20 +213,20 @@ class Actor: def get_waitq(self, actorid, cid): if actorid not in self._actors2calls: - log.warn(f"Caller id {cid} is not yet registered?") + log.debug(f"Registering for results from {actorid}") cids2qs = self._actors2calls.setdefault(actorid, {}) if cid not in cids2qs: - log.warn(f"Caller id {cid} is not yet registered?") + log.debug(f"Registering for result from call id {cid}") return cids2qs.setdefault(cid, trio.Queue(1000)) async def invoke_cmd(self, chan, ns, func, kwargs): """Invoke a remote command by sending a `cmd` message and waiting on the msg processing loop for its response(s). """ - cid = uuid.uuid1() + cid = str(uuid.uuid1()) q = self.get_waitq(chan.uid, cid) - await chan.send((ns, func, kwargs, self.uid, cid)) - return await get_result(q) + await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) + return await result_from_q(q) async def _process_messages(self, chan, treat_as_gen=False): """Process messages async-RPC style. @@ -224,6 +236,9 @@ class Actor: log.debug(f"Entering async-rpc loop for {chan}") async with trio.open_nursery() as nursery: async for msg in chan.aiter_recv(): + if msg is None: # terminate sentinel + log.debug(f"Terminating msg loop for {chan}") + break log.debug(f"Received msg {msg}") # try: cid = msg.get('cid') @@ -265,7 +280,6 @@ class Actor: def _fork_main(self, accept_addr, parent_addr=None): # after fork routine which invokes a fresh ``trio.run`` - log.info(f"self._peers are {self._peers}") log.info( f"Started new {ctx.current_process()} for actor {self.uid}") global _current_actor @@ -286,11 +300,10 @@ class Actor: async with maybe_open_nursery(nursery) as nursery: self._root_nursery = nursery - # Startup up channel server, optionally begin serving RPC - # requests from the parent. + # Startup up channel server host, port = accept_addr - await self._serve_forever( - nursery, accept_host=host, accept_port=port, + nursery.start_soon(partial( + self._serve_forever, accept_host=host, accept_port=port) ) if parent_addr is not None: @@ -303,36 +316,49 @@ class Actor: ) await chan.connect() - # initial handshake, report who we are, figure out who they are + # initial handshake, report who we are, who they are await chan.send(self.uid) uid = await chan.recv() if uid in self._peers: log.warn( f"already have channel for {uid} registered?" ) - else: - self._peers[uid] = chan + # else: + # self._peers[uid] = chan - # handle new connection back to parent + # handle new connection back to parent optionally + # begin responding to RPC if self._allow_rpc: self.load_namespaces() nursery.start_soon(self._process_messages, chan) if self.main: - log.debug(f"Starting main task `{self.main}`") if self._parent_chan: + log.debug(f"Starting main task `{self.main}`") # start "main" routine in a task - nursery.start_soon( + await nursery.start( _invoke, 'main', self._parent_chan, self.main, {}, False, True # treat_as_gen, raise_errs params ) else: # run directly + log.debug(f"Running `{self.main}` directly") result = await self.main() - # blocks here as expected if no nursery was provided until - # the channel server is killed (i.e. this actor is - # cancelled or signalled by the parent actor) + # terminate local in-proc once its main completes + log.debug( + f"Waiting for remaining peers {self._peers} to clear") + await self._no_more_peers.wait() + log.debug(f"All peer channels are complete") + + # tear down channel server + if not self._outlive_main: + log.debug(f"Shutting down channel server") + self._server_nursery.cancel_scope.cancel() + + # blocks here as expected if no nursery was provided until + # the channel server is killed (i.e. this actor is + # cancelled or signalled by the parent actor) except Exception: if self._parent_chan: log.exception("Actor errored:") @@ -345,7 +371,6 @@ class Actor: async def _serve_forever( self, - nursery, # spawns main func and channel server *, # (host, port) to bind for channel server accept_host=None, @@ -357,19 +382,24 @@ class Actor: """ log.debug(f"Starting tcp server on {accept_host}:{accept_port}") - listeners = await nursery.start( - partial( - trio.serve_tcp, - self._stream_handler, - handler_nursery=nursery, - port=accept_port, host=accept_host, + async with trio.open_nursery() as nursery: + self._server_nursery = nursery + # TODO: might want to consider having a separate nursery + # for the stream handler such that the server can be cancelled + # whilst leaving existing channels up + listeners = await nursery.start( + partial( + trio.serve_tcp, + self._stream_handler, + handler_nursery=self._root_nursery, + port=accept_port, host=accept_host, + ) ) - ) - self._listeners.extend(listeners) - log.debug(f"Spawned {listeners}") + self._listeners.extend(listeners) + log.debug(f"Spawned {listeners}") - # when launched in-process, trigger awaiter's completion - task_status.started() + # when launched in-process, trigger awaiter's completion + task_status.started() def cancel(self): """This cancels the internal root-most nursery thereby gracefully @@ -455,7 +485,7 @@ class Portal: if resptype == 'yield': async def yield_from_q(): - yield first + yield first_msg for msg in q: try: yield msg['yield'] @@ -506,6 +536,7 @@ class ActorNursery: statespace=None, rpc_module_paths=None, main=None, + outlive_main=False, # sub-actors die when their main task completes ): actor = Actor( name, @@ -513,6 +544,7 @@ class ActorNursery: rpc_module_paths=rpc_module_paths, statespace=statespace, # global proc state vars main=main, # main coroutine to be invoked + outlive_main=outlive_main, ) parent_addr = self._actor.accept_addr proc = ctx.Process( @@ -522,7 +554,6 @@ class ActorNursery: name=name, ) proc.start() - if not proc.is_alive(): raise ActorFailure("Couldn't start sub-actor?") @@ -548,27 +579,48 @@ class ActorNursery: # unblocks when all waiter tasks have completed async with trio.open_nursery() as nursery: - for actor, proc in self._children.values(): + for subactor, proc, main_q in self._children.values(): nursery.start_soon(wait_for_proc, proc) - async def cancel(self): - for actor, proc in self._children.values(): + async def cancel(self, hard_kill=False): + for subactor, proc, main_q in self._children.values(): if proc is mp.current_process(): - actor.cancel() + # XXX: does this even make sense? + subactor.cancel() else: - # send KeyBoardInterrupt (trio abort signal) to underlying - # sub-actors - proc.terminate() - # os.kill(proc.pid, signal.SIGINT) + if hard_kill: + proc.terminate() + # send KeyBoardInterrupt (trio abort signal) to underlying + # sub-actors + # os.kill(proc.pid, signal.SIGINT) + else: + # invoke cancel cmd + actor = self._actor + resptype, first_msg, q = await actor.invoke_cmd( + actor._peers[subactor.uid], # channel + 'self', + 'cancel', + {}, + ) + log.debug(f"Waiting on all subactors to complete") await self.wait() + log.debug(f"All subactors for {self} have terminated") async def __aexit__(self, etype, value, tb): """Wait on all subactor's main routines to complete. """ + async def wait_for_actor(actor, proc, q): + ret_type, msg, q = await result_from_q(q) + log.info(f"{actor.uid} main task completed with {msg}") + if not actor._outlive_main: + # trigger msg loop to break + log.info(f"Signalling msg loop exit for {actor.uid}") + await self._actor._peers[actor.uid].send(None) + async with trio.open_nursery() as nursery: - for subactor, proc, q in self._children.values(): - nursery.start_soon(get_result, q) + for subactor, proc, main_q in self._children.values(): + nursery.start_soon(wait_for_actor, subactor, proc, main_q) def current_actor() -> Actor: @@ -627,7 +679,6 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None): statespace={}, # global proc state vars main=main, # main coroutine to be invoked ) - # assign process-local actor global _current_actor _current_actor = arbiter @@ -643,7 +694,7 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None): yield LocalPortal(arbiter) # If spawned locally, the arbiter is cancelled when this context - # is complete (i.e the underlying context manager block completes) + # is complete? # nursery.cancel_scope.cancel()