Support re-entrant calls to `get_arbiter()`

It gets called more then once when using `tractor.run()` and
then `find_actor()`. Also, allow passing in the log level to each
new spawned subactor.
kivy_mainline_and_py3.8
Tyler Goodlet 2018-06-25 17:41:30 -04:00
parent 8c5af7fd97
commit c0d8d4fd99
1 changed files with 49 additions and 36 deletions

View File

@ -16,7 +16,6 @@ from async_generator import asynccontextmanager
from .ipc import Channel from .ipc import Channel
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
ctx = mp.get_context("forkserver") ctx = mp.get_context("forkserver")
log = get_logger('tractor') log = get_logger('tractor')
@ -279,12 +278,14 @@ class Actor:
) )
log.debug(f"Exiting msg loop for {chan}") log.debug(f"Exiting msg loop for {chan}")
def _fork_main(self, accept_addr, parent_addr=None): def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):
# after fork routine which invokes a fresh ``trio.run`` # after fork routine which invokes a fresh ``trio.run``
log.info( log.info(
f"Started new {ctx.current_process()} for actor {self.uid}") f"Started new {ctx.current_process()} for actor {self.uid}")
global _current_actor global _current_actor
_current_actor = self _current_actor = self
if loglevel:
get_console_log(loglevel)
log.debug(f"parent_addr is {parent_addr}") log.debug(f"parent_addr is {parent_addr}")
trio.run( trio.run(
partial(self._async_main, accept_addr, parent_addr=parent_addr)) partial(self._async_main, accept_addr, parent_addr=parent_addr))
@ -421,7 +422,7 @@ class Actor:
return Portal(self._parent_chan) return Portal(self._parent_chan)
def get_chan(self, actorid): def get_chan(self, actorid):
return self._peers[actorid] return self._peers.get(actorid)
class Arbiter(Actor): class Arbiter(Actor):
@ -494,7 +495,7 @@ class Portal:
async def yield_from_q(): async def yield_from_q():
yield first_msg['yield'] yield first_msg['yield']
for msg in q: async for msg in q:
try: try:
yield msg['yield'] yield msg['yield']
except KeyError: except KeyError:
@ -539,12 +540,14 @@ class ActorNursery:
return self return self
async def start_actor( async def start_actor(
self, name, self,
name: str,
main=None,
bind_addr=('127.0.0.1', 0), bind_addr=('127.0.0.1', 0),
statespace=None, statespace=None,
rpc_module_paths=None, rpc_module_paths=None,
main=None,
outlive_main=False, # sub-actors die when their main task completes outlive_main=False, # sub-actors die when their main task completes
loglevel=None, # set console logging per subactor
): ):
actor = Actor( actor = Actor(
name, name,
@ -557,7 +560,7 @@ class ActorNursery:
parent_addr = self._actor.accept_addr parent_addr = self._actor.accept_addr
proc = ctx.Process( proc = ctx.Process(
target=actor._fork_main, target=actor._fork_main,
args=(bind_addr, parent_addr), args=(bind_addr, parent_addr, loglevel),
daemon=True, daemon=True,
name=name, name=name,
) )
@ -606,12 +609,16 @@ class ActorNursery:
else: else:
# send cancel cmd - likely no response from subactor # send cancel cmd - likely no response from subactor
actor = self._actor actor = self._actor
chan = actor.get_chan(subactor.uid)
if chan:
cid, q = await actor.send_cmd( cid, q = await actor.send_cmd(
actor.get_chan(subactor.uid), # channel lookup chan, # channel lookup
'self', 'self',
'cancel', 'cancel',
{}, {},
) )
else:
log.warn(f"Channel for {subactor.uid} is already down?")
log.debug(f"Waiting on all subactors to complete") log.debug(f"Waiting on all subactors to complete")
await self.wait() await self.wait()
log.debug(f"All subactors for {self} have terminated") log.debug(f"All subactors for {self} have terminated")
@ -680,7 +687,7 @@ class NoArbiterFound:
@asynccontextmanager @asynccontextmanager
async def get_arbiter(host='127.0.0.1', port=1616, main=None): async def get_arbiter(host='127.0.0.1', port=1616, main=None, **kwargs):
actor = current_actor() actor = current_actor()
if actor and not actor.is_arbiter: if actor and not actor.is_arbiter:
try: try:
@ -690,12 +697,16 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None):
except OSError as err: except OSError as err:
raise NoArbiterFound(err) raise NoArbiterFound(err)
else: else:
# no arbiter found on this host so start one in-process if actor and actor.is_arbiter:
# we're already the arbiter (re-entrant call from the arbiter actor)
yield LocalPortal(actor)
else:
arbiter = Arbiter( arbiter = Arbiter(
'arbiter', 'arbiter',
rpc_module_paths=[], # the arbiter doesn't allow module rpc # rpc_module_paths=[], # the arbiter doesn't allow module rpc
statespace={}, # global proc state vars # statespace={}, # global proc state vars
main=main, # main coroutine to be invoked main=main, # main coroutine to be invoked
**kwargs,
) )
# assign process-local actor # assign process-local actor
global _current_actor global _current_actor
@ -714,6 +725,7 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None):
# XXX: If spawned locally, the arbiter is cancelled when this # XXX: If spawned locally, the arbiter is cancelled when this
# context is complete given that there are no more active # context is complete given that there are no more active
# peer channels connected to it. # peer channels connected to it.
if not arbiter._outlive_main:
arbiter.cancel_server() arbiter.cancel_server()
@ -735,7 +747,7 @@ async def find_actor(name):
async def _main(async_fn, args, kwargs, name): async def _main(async_fn, args, kwargs, name):
main = partial(async_fn, *args) main = partial(async_fn, *args) if async_fn else None
# Creates an internal nursery which shouldn't be cancelled even if # Creates an internal nursery which shouldn't be cancelled even if
# the one opened below is (this is desirable because the arbitter should # the one opened below is (this is desirable because the arbitter should
# stay up until a re-election process has taken place - which is not # stay up until a re-election process has taken place - which is not
@ -744,6 +756,7 @@ async def _main(async_fn, args, kwargs, name):
host=kwargs.pop('arbiter_host', '127.0.0.1'), host=kwargs.pop('arbiter_host', '127.0.0.1'),
port=kwargs.pop('arbiter_port', 1616), port=kwargs.pop('arbiter_port', 1616),
main=main, main=main,
**kwargs,
) as portal: ) as portal:
if not current_actor().is_arbiter: if not current_actor().is_arbiter:
# create a local actor and start it up its main routine # create a local actor and start it up its main routine