Support re-entrant calls to `get_arbiter()`

It gets called more than once when using `tractor.run()` and
then `find_actor()`. Also allow passing the log level down to each
newly spawned subactor.
branch: asyncgen_closing_fix
Tyler Goodlet 2018-06-25 17:41:30 -04:00
parent 597546cf7b
commit 0aa49dcbdf
1 changed file with 49 additions and 36 deletions
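For context, the re-entrant pattern this commit supports looks roughly like the sketch below: `tractor.run()` enters `get_arbiter()` once to bootstrap the in-process arbiter, then `find_actor()` enters it again from that same actor. This is a hedged sketch against the 0.0.x-era API; the service name and the `None` check are illustrative, not taken from the diff.

    import tractor

    async def main():
        # `tractor.run()` already entered `get_arbiter()` once to set up
        # the local arbiter; `find_actor()` enters it a second time from
        # inside the same (arbiter) actor: the re-entrant case fixed here.
        async with tractor.find_actor('some_service') as portal:
            if portal is None:
                print("no actor named 'some_service' is registered")

    tractor.run(main, name='re_entrant_demo')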


@@ -16,7 +16,6 @@ from async_generator import asynccontextmanager
 from .ipc import Channel
 from .log import get_console_log, get_logger
 ctx = mp.get_context("forkserver")
 log = get_logger('tractor')
@@ -279,12 +278,14 @@ class Actor:
             )
         log.debug(f"Exiting msg loop for {chan}")

-    def _fork_main(self, accept_addr, parent_addr=None):
+    def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):
         # after fork routine which invokes a fresh ``trio.run``
         log.info(
             f"Started new {ctx.current_process()} for actor {self.uid}")
         global _current_actor
         _current_actor = self
+        if loglevel:
+            get_console_log(loglevel)
         log.debug(f"parent_addr is {parent_addr}")
         trio.run(
             partial(self._async_main, accept_addr, parent_addr=parent_addr))
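The child calls `get_console_log()` itself because a "forkserver" child does not inherit logging configuration applied at runtime in the parent; per-actor verbosity has to be re-established after the fork, before `trio.run()` starts the actor. A stdlib-only illustration of the same idea, with `logging.basicConfig` standing in for tractor's `get_console_log` helper:

    import logging
    import multiprocessing as mp

    def child_main(loglevel=None):
        # a forkserver (or spawn) child starts from fresh interpreter
        # state, so logging must be configured again inside the child
        if loglevel:
            logging.basicConfig(level=getattr(logging, loglevel.upper()))
        logging.getLogger('demo').debug("child logging configured")

    if __name__ == '__main__':
        ctx = mp.get_context('forkserver')
        proc = ctx.Process(target=child_main, kwargs={'loglevel': 'debug'})
        proc.start()
        proc.join()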
@@ -421,7 +422,7 @@ class Actor:
         return Portal(self._parent_chan)

     def get_chan(self, actorid):
-        return self._peers[actorid]
+        return self._peers.get(actorid)


 class Arbiter(Actor):
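Switching from `self._peers[actorid]` to `self._peers.get(actorid)` turns a `KeyError` on an already-disconnected peer into a `None` return that callers can branch on, which is exactly what the cancellation path further down relies on:

    peers = {('worker', '1234'): '<Channel>'}

    # indexing raises if the peer already dropped its connection:
    #   peers[('gone', '5678')]  ->  KeyError
    # .get() returns None instead, so callers can test for it:
    chan = peers.get(('gone', '5678'))
    if chan is None:
        print("peer already disconnected; skip sending cancel")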
@@ -494,7 +495,7 @@ class Portal:
             async def yield_from_q():
                 yield first_msg['yield']
-                for msg in q:
+                async for msg in q:
                     try:
                         yield msg['yield']
                     except KeyError:
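A plain `for` over an async message source raises `TypeError` because it never awaits the next item; `async for` is required. A self-contained sketch of the consumer side using trio's memory channels, a stand-in here for the queue object being iterated in the diff:

    import trio

    async def producer(send_chan):
        async with send_chan:  # closing the channel ends the async-for
            for i in range(3):
                await send_chan.send({'yield': i})

    async def main():
        send_chan, recv_chan = trio.open_memory_channel(0)
        async with trio.open_nursery() as nursery:
            nursery.start_soon(producer, send_chan)
            async for msg in recv_chan:  # awaits each message as it arrives
                print(msg['yield'])

    trio.run(main)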
@@ -539,12 +540,14 @@ class ActorNursery:
         return self

     async def start_actor(
-        self, name,
+        self,
+        name: str,
+        main=None,
         bind_addr=('127.0.0.1', 0),
         statespace=None,
         rpc_module_paths=None,
-        main=None,
         outlive_main=False,  # sub-actors die when their main task completes
+        loglevel=None,  # set console logging per subactor
     ):
         actor = Actor(
             name,
@@ -557,7 +560,7 @@ class ActorNursery:
         parent_addr = self._actor.accept_addr
         proc = ctx.Process(
             target=actor._fork_main,
-            args=(bind_addr, parent_addr),
+            args=(bind_addr, parent_addr, loglevel),
             daemon=True,
             name=name,
         )
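Together with the `_fork_main()` hunk above, this threads `loglevel` from the public `start_actor()` call through `ctx.Process(args=...)` into the child process. A hedged usage sketch against this era's API; the child coroutine and actor names are made up:

    import tractor

    async def say_hello():
        print("hello from the child")  # child console also shows debug logs

    async def main():
        async with tractor.open_nursery() as nursery:
            await nursery.start_actor(
                'noisy_child',
                main=say_hello,
                loglevel='debug',  # the new per-subactor console verbosity
            )

    tractor.run(main, name='parent')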
@@ -606,12 +609,16 @@ class ActorNursery:
                 else:
                     # send cancel cmd - likely no response from subactor
                     actor = self._actor
-                    cid, q = await actor.send_cmd(
-                        actor.get_chan(subactor.uid),  # channel lookup
-                        'self',
-                        'cancel',
-                        {},
-                    )
+                    chan = actor.get_chan(subactor.uid)
+                    if chan:
+                        cid, q = await actor.send_cmd(
+                            chan,  # channel lookup
+                            'self',
+                            'cancel',
+                            {},
+                        )
+                    else:
+                        log.warn(f"Channel for {subactor.uid} is already down?")
         log.debug(f"Waiting on all subactors to complete")
         await self.wait()
         log.debug(f"All subactors for {self} have terminated")
@@ -680,7 +687,7 @@ class NoArbiterFound:
 @asynccontextmanager
-async def get_arbiter(host='127.0.0.1', port=1616, main=None):
+async def get_arbiter(host='127.0.0.1', port=1616, main=None, **kwargs):
     actor = current_actor()
     if actor and not actor.is_arbiter:
         try:
@@ -690,31 +697,36 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None):
         except OSError as err:
             raise NoArbiterFound(err)
     else:
-        # no arbiter found on this host so start one in-process
-        arbiter = Arbiter(
-            'arbiter',
-            rpc_module_paths=[],  # the arbiter doesn't allow module rpc
-            statespace={},  # global proc state vars
-            main=main,  # main coroutine to be invoked
-        )
-        # assign process-local actor
-        global _current_actor
-        _current_actor = arbiter
-
-        # start the arbiter in process in a new task
-        async with trio.open_nursery() as nursery:
-            # start local channel-server and fake the portal API
-            # NOTE: this won't block since we provide the nursery
-            await serve_local_actor(
-                arbiter, nursery=nursery, accept_addr=(host, port))
-            yield LocalPortal(arbiter)
-            # XXX: If spawned locally, the arbiter is cancelled when this
-            # context is complete given that there are no more active
-            # peer channels connected to it.
-            arbiter.cancel_server()
+        if actor and actor.is_arbiter:
+            # we're already the arbiter (re-entrant call from the arbiter actor)
+            yield LocalPortal(actor)
+        else:
+            arbiter = Arbiter(
+                'arbiter',
+                # rpc_module_paths=[],  # the arbiter doesn't allow module rpc
+                # statespace={},  # global proc state vars
+                main=main,  # main coroutine to be invoked
+                **kwargs,
+            )
+            # assign process-local actor
+            global _current_actor
+            _current_actor = arbiter
+
+            # start the arbiter in process in a new task
+            async with trio.open_nursery() as nursery:
+                # start local channel-server and fake the portal API
+                # NOTE: this won't block since we provide the nursery
+                await serve_local_actor(
+                    arbiter, nursery=nursery, accept_addr=(host, port))
+                yield LocalPortal(arbiter)
+                # XXX: If spawned locally, the arbiter is cancelled when this
+                # context is complete given that there are no more active
+                # peer channels connected to it.
+                if not arbiter._outlive_main:
+                    arbiter.cancel_server()


 @asynccontextmanager
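The shape of the fix generalizes: an async context manager that can be re-entered by the process that already owns the resource should detect that case and yield the existing handle instead of building a second one. A stripped-down sketch of the pattern with stand-in names, using the stdlib `asynccontextmanager` rather than the `async_generator` backport the project used at the time:

    from contextlib import asynccontextmanager

    _owner = None  # process-local singleton, like tractor's _current_actor

    @asynccontextmanager
    async def get_resource():
        global _owner
        if _owner is not None:
            # re-entrant call: hand back the existing resource untouched
            yield _owner
        else:
            _owner = object()  # stand-in for spawning an Arbiter
            try:
                yield _owner
            finally:
                _owner = None  # only the creating caller tears it down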
@@ -735,7 +747,7 @@ async def find_actor(name):
 async def _main(async_fn, args, kwargs, name):
-    main = partial(async_fn, *args)
+    main = partial(async_fn, *args) if async_fn else None
     # Creates an internal nursery which shouldn't be cancelled even if
     # the one opened below is (this is desirable because the arbitter should
     # stay up until a re-election process has taken place - which is not
@@ -744,6 +756,7 @@ async def _main(async_fn, args, kwargs, name):
         host=kwargs.pop('arbiter_host', '127.0.0.1'),
         port=kwargs.pop('arbiter_port', 1616),
         main=main,
+        **kwargs,
     ) as portal:
         if not current_actor().is_arbiter:
             # create a local actor and start it up its main routine