diff --git a/piker/tractor.py b/piker/tractor.py index 1a048633..7ead6045 100644 --- a/piker/tractor.py +++ b/piker/tractor.py @@ -16,7 +16,6 @@ from async_generator import asynccontextmanager from .ipc import Channel from .log import get_console_log, get_logger - ctx = mp.get_context("forkserver") log = get_logger('tractor') @@ -279,12 +278,14 @@ class Actor: ) log.debug(f"Exiting msg loop for {chan}") - def _fork_main(self, accept_addr, parent_addr=None): + def _fork_main(self, accept_addr, parent_addr=None, loglevel=None): # after fork routine which invokes a fresh ``trio.run`` log.info( f"Started new {ctx.current_process()} for actor {self.uid}") global _current_actor _current_actor = self + if loglevel: + get_console_log(loglevel) log.debug(f"parent_addr is {parent_addr}") trio.run( partial(self._async_main, accept_addr, parent_addr=parent_addr)) @@ -421,7 +422,7 @@ class Actor: return Portal(self._parent_chan) def get_chan(self, actorid): - return self._peers[actorid] + return self._peers.get(actorid) class Arbiter(Actor): @@ -494,7 +495,7 @@ class Portal: async def yield_from_q(): yield first_msg['yield'] - for msg in q: + async for msg in q: try: yield msg['yield'] except KeyError: @@ -539,12 +540,14 @@ class ActorNursery: return self async def start_actor( - self, name, + self, + name: str, + main=None, bind_addr=('127.0.0.1', 0), statespace=None, rpc_module_paths=None, - main=None, outlive_main=False, # sub-actors die when their main task completes + loglevel=None, # set console logging per subactor ): actor = Actor( name, @@ -557,7 +560,7 @@ class ActorNursery: parent_addr = self._actor.accept_addr proc = ctx.Process( target=actor._fork_main, - args=(bind_addr, parent_addr), + args=(bind_addr, parent_addr, loglevel), daemon=True, name=name, ) @@ -606,12 +609,16 @@ class ActorNursery: else: # send cancel cmd - likely no response from subactor actor = self._actor - cid, q = await actor.send_cmd( - actor.get_chan(subactor.uid), # channel lookup - 'self', - 'cancel', - {}, - ) + chan = actor.get_chan(subactor.uid) + if chan: + cid, q = await actor.send_cmd( + chan, # channel lookup + 'self', + 'cancel', + {}, + ) + else: + log.warn(f"Channel for {subactor.uid} is already down?") log.debug(f"Waiting on all subactors to complete") await self.wait() log.debug(f"All subactors for {self} have terminated") @@ -680,7 +687,7 @@ class NoArbiterFound: @asynccontextmanager -async def get_arbiter(host='127.0.0.1', port=1616, main=None): +async def get_arbiter(host='127.0.0.1', port=1616, main=None, **kwargs): actor = current_actor() if actor and not actor.is_arbiter: try: @@ -690,31 +697,36 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None): except OSError as err: raise NoArbiterFound(err) else: - # no arbiter found on this host so start one in-process - arbiter = Arbiter( - 'arbiter', - rpc_module_paths=[], # the arbiter doesn't allow module rpc - statespace={}, # global proc state vars - main=main, # main coroutine to be invoked - ) - # assign process-local actor - global _current_actor - _current_actor = arbiter + if actor and actor.is_arbiter: + # we're already the arbiter (re-entrant call from the arbiter actor) + yield LocalPortal(actor) + else: + arbiter = Arbiter( + 'arbiter', + # rpc_module_paths=[], # the arbiter doesn't allow module rpc + # statespace={}, # global proc state vars + main=main, # main coroutine to be invoked + **kwargs, + ) + # assign process-local actor + global _current_actor + _current_actor = arbiter - # start the arbiter in process in a new task - async with trio.open_nursery() as nursery: + # start the arbiter in process in a new task + async with trio.open_nursery() as nursery: - # start local channel-server and fake the portal API - # NOTE: this won't block since we provide the nursery - await serve_local_actor( - arbiter, nursery=nursery, accept_addr=(host, port)) + # start local channel-server and fake the portal API + # NOTE: this won't block since we provide the nursery + await serve_local_actor( + arbiter, nursery=nursery, accept_addr=(host, port)) - yield LocalPortal(arbiter) + yield LocalPortal(arbiter) - # XXX: If spawned locally, the arbiter is cancelled when this - # context is complete given that there are no more active - # peer channels connected to it. - arbiter.cancel_server() + # XXX: If spawned locally, the arbiter is cancelled when this + # context is complete given that there are no more active + # peer channels connected to it. + if not arbiter._outlive_main: + arbiter.cancel_server() @asynccontextmanager @@ -735,7 +747,7 @@ async def find_actor(name): async def _main(async_fn, args, kwargs, name): - main = partial(async_fn, *args) + main = partial(async_fn, *args) if async_fn else None # Creates an internal nursery which shouldn't be cancelled even if # the one opened below is (this is desirable because the arbitter should # stay up until a re-election process has taken place - which is not @@ -744,6 +756,7 @@ async def _main(async_fn, args, kwargs, name): host=kwargs.pop('arbiter_host', '127.0.0.1'), port=kwargs.pop('arbiter_port', 1616), main=main, + **kwargs, ) as portal: if not current_actor().is_arbiter: # create a local actor and start it up its main routine