Add reliable subactor lifetime control

asyncgen_closing_fix
Tyler Goodlet 2018-06-22 23:30:55 -04:00
parent 5c70b4824f
commit 2c637db5b7
1 changed files with 111 additions and 60 deletions

View File

@ -24,7 +24,7 @@ log = get_logger('tractor')
_current_actor = None _current_actor = None
# for debugging # for debugging
log = get_console_log('debug') log = get_console_log('trace')
class ActorFailure(Exception): class ActorFailure(Exception):
@ -50,7 +50,9 @@ async def maybe_open_nursery(nursery=None):
async def _invoke( async def _invoke(
cid, chan, func, kwargs, cid, chan, func, kwargs,
treat_as_gen=False, raise_errs=False): treat_as_gen=False, raise_errs=False,
task_status=trio.TASK_STATUS_IGNORED
):
"""Invoke local func and return results over provided channel. """Invoke local func and return results over provided channel.
""" """
try: try:
@ -76,17 +78,21 @@ async def _invoke(
if treat_as_gen: if treat_as_gen:
# XXX: the async-func may spawn further tasks which push # XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must # back values like an async-generator would but must
# manualy construct the response dict-packet-responses as above # manualy construct the response dict-packet-responses as
# above
await coro await coro
else: else:
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
task_status.started()
except Exception: except Exception:
if not raise_errs: if not raise_errs:
await chan.send({'error': traceback.format_exc(), 'cid': cid}) await chan.send({'error': traceback.format_exc(), 'cid': cid})
else: else:
raise raise
async def get_result(q):
async def result_from_q(q):
"""Process a msg from a remote actor. """Process a msg from a remote actor.
""" """
first_msg = await q.get() first_msg = await q.get()
@ -117,6 +123,7 @@ class Actor:
statespace: dict = {}, statespace: dict = {},
uid: str = None, uid: str = None,
allow_rpc: bool = True, allow_rpc: bool = True,
outlive_main: bool = False,
): ):
self.uid = (name, uid or str(uuid.uuid1())) self.uid = (name, uid or str(uuid.uuid1()))
self.rpc_module_paths = rpc_module_paths self.rpc_module_paths = rpc_module_paths
@ -126,9 +133,12 @@ class Actor:
# @dataclass once we get py3.7 # @dataclass once we get py3.7
self.statespace = statespace self.statespace = statespace
self._allow_rpc = allow_rpc self._allow_rpc = allow_rpc
self._outlive_main = outlive_main
# filled in by `_async_main` after fork # filled in by `_async_main` after fork
self._peers = {} self._peers = {}
self._no_more_peers = trio.Event()
self._no_more_peers.set()
self._actors2calls = {} # map {uids -> {callids -> waiter queues}} self._actors2calls = {} # map {uids -> {callids -> waiter queues}}
self._listeners = [] self._listeners = []
self._parent_chan = None self._parent_chan = None
@ -158,28 +168,26 @@ class Actor:
""" """
Entry point for new inbound connections to the channel server. Entry point for new inbound connections to the channel server.
""" """
self._no_more_peers.clear()
chan = Channel(stream=stream) chan = Channel(stream=stream)
log.info(f"New {chan} connected to us") log.info(f"New connection to us {chan}")
# send/receive initial handshake response # send/receive initial handshake response
await chan.send(self.uid) await chan.send(self.uid)
uid = await chan.recv() uid = await chan.recv()
chan.uid = uid chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete") log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
# XXX WTF!?!! THIS BLOCKS RANDOMLY? # channel tracking
# assert tuple(raddr) == chan.laddr
event = self._peers.pop(uid, None) event = self._peers.pop(uid, None)
chan.event = event chan.event = event
self._peers[uid] = chan self._peers[uid] = chan
log.info(f"Registered {chan} for {uid}") log.debug(f"Registered {chan} for {uid}")
log.debug(f"Retrieved event {event}")
# Instructing connection: this is likely a new channel to # Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via # a recently spawned actor which we'd like to control via
# async-rpc calls. # async-rpc calls.
if event and getattr(event, 'set', None): if event and getattr(event, 'set', None):
log.info(f"Waking waiters of {event.statistics()}") log.debug(f"Waking channel waiters {event.statistics()}")
# Alert any task waiting on this connection to come up # Alert any task waiting on this connection to come up
event.set() event.set()
event.clear() # now consumer can wait on this channel to close event.clear() # now consumer can wait on this channel to close
@ -189,10 +197,14 @@ class Actor:
try: try:
await self._process_messages(chan) await self._process_messages(chan)
finally: finally:
# Drop ref to channel so it can be gc-ed # Drop ref to channel so it can be gc-ed and disconnected
self._peers.pop(chan.uid, None) if chan is not self._parent_chan:
chan.event.set() log.debug(f"Releasing channel {chan}")
log.debug(f"Releasing channel {chan}") self._peers.pop(chan.uid, None)
chan.event.set() # signal teardown/disconnection
if not self._peers: # no more channels connected
self._no_more_peers.set()
log.debug(f"No more peer channels")
def _push_result(self, actorid, cid, msg): def _push_result(self, actorid, cid, msg):
q = self.get_waitq(actorid, cid) q = self.get_waitq(actorid, cid)
@ -201,20 +213,20 @@ class Actor:
def get_waitq(self, actorid, cid): def get_waitq(self, actorid, cid):
if actorid not in self._actors2calls: if actorid not in self._actors2calls:
log.warn(f"Caller id {cid} is not yet registered?") log.debug(f"Registering for results from {actorid}")
cids2qs = self._actors2calls.setdefault(actorid, {}) cids2qs = self._actors2calls.setdefault(actorid, {})
if cid not in cids2qs: if cid not in cids2qs:
log.warn(f"Caller id {cid} is not yet registered?") log.debug(f"Registering for result from call id {cid}")
return cids2qs.setdefault(cid, trio.Queue(1000)) return cids2qs.setdefault(cid, trio.Queue(1000))
async def invoke_cmd(self, chan, ns, func, kwargs): async def invoke_cmd(self, chan, ns, func, kwargs):
"""Invoke a remote command by sending a `cmd` message and waiting """Invoke a remote command by sending a `cmd` message and waiting
on the msg processing loop for its response(s). on the msg processing loop for its response(s).
""" """
cid = uuid.uuid1() cid = str(uuid.uuid1())
q = self.get_waitq(chan.uid, cid) q = self.get_waitq(chan.uid, cid)
await chan.send((ns, func, kwargs, self.uid, cid)) await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
return await get_result(q) return await result_from_q(q)
async def _process_messages(self, chan, treat_as_gen=False): async def _process_messages(self, chan, treat_as_gen=False):
"""Process messages async-RPC style. """Process messages async-RPC style.
@ -224,6 +236,9 @@ class Actor:
log.debug(f"Entering async-rpc loop for {chan}") log.debug(f"Entering async-rpc loop for {chan}")
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
async for msg in chan.aiter_recv(): async for msg in chan.aiter_recv():
if msg is None: # terminate sentinel
log.debug(f"Terminating msg loop for {chan}")
break
log.debug(f"Received msg {msg}") log.debug(f"Received msg {msg}")
# try: # try:
cid = msg.get('cid') cid = msg.get('cid')
@ -265,7 +280,6 @@ class Actor:
def _fork_main(self, accept_addr, parent_addr=None): def _fork_main(self, accept_addr, parent_addr=None):
# after fork routine which invokes a fresh ``trio.run`` # after fork routine which invokes a fresh ``trio.run``
log.info(f"self._peers are {self._peers}")
log.info( log.info(
f"Started new {ctx.current_process()} for actor {self.uid}") f"Started new {ctx.current_process()} for actor {self.uid}")
global _current_actor global _current_actor
@ -286,11 +300,10 @@ class Actor:
async with maybe_open_nursery(nursery) as nursery: async with maybe_open_nursery(nursery) as nursery:
self._root_nursery = nursery self._root_nursery = nursery
# Startup up channel server, optionally begin serving RPC # Startup up channel server
# requests from the parent.
host, port = accept_addr host, port = accept_addr
await self._serve_forever( nursery.start_soon(partial(
nursery, accept_host=host, accept_port=port, self._serve_forever, accept_host=host, accept_port=port)
) )
if parent_addr is not None: if parent_addr is not None:
@ -303,36 +316,49 @@ class Actor:
) )
await chan.connect() await chan.connect()
# initial handshake, report who we are, figure out who they are # initial handshake, report who we are, who they are
await chan.send(self.uid) await chan.send(self.uid)
uid = await chan.recv() uid = await chan.recv()
if uid in self._peers: if uid in self._peers:
log.warn( log.warn(
f"already have channel for {uid} registered?" f"already have channel for {uid} registered?"
) )
else: # else:
self._peers[uid] = chan # self._peers[uid] = chan
# handle new connection back to parent # handle new connection back to parent optionally
# begin responding to RPC
if self._allow_rpc: if self._allow_rpc:
self.load_namespaces() self.load_namespaces()
nursery.start_soon(self._process_messages, chan) nursery.start_soon(self._process_messages, chan)
if self.main: if self.main:
log.debug(f"Starting main task `{self.main}`")
if self._parent_chan: if self._parent_chan:
log.debug(f"Starting main task `{self.main}`")
# start "main" routine in a task # start "main" routine in a task
nursery.start_soon( await nursery.start(
_invoke, 'main', self._parent_chan, self.main, {}, _invoke, 'main', self._parent_chan, self.main, {},
False, True # treat_as_gen, raise_errs params False, True # treat_as_gen, raise_errs params
) )
else: else:
# run directly # run directly
log.debug(f"Running `{self.main}` directly")
result = await self.main() result = await self.main()
# blocks here as expected if no nursery was provided until # terminate local in-proc once its main completes
# the channel server is killed (i.e. this actor is log.debug(
# cancelled or signalled by the parent actor) f"Waiting for remaining peers {self._peers} to clear")
await self._no_more_peers.wait()
log.debug(f"All peer channels are complete")
# tear down channel server
if not self._outlive_main:
log.debug(f"Shutting down channel server")
self._server_nursery.cancel_scope.cancel()
# blocks here as expected if no nursery was provided until
# the channel server is killed (i.e. this actor is
# cancelled or signalled by the parent actor)
except Exception: except Exception:
if self._parent_chan: if self._parent_chan:
log.exception("Actor errored:") log.exception("Actor errored:")
@ -345,7 +371,6 @@ class Actor:
async def _serve_forever( async def _serve_forever(
self, self,
nursery, # spawns main func and channel server
*, *,
# (host, port) to bind for channel server # (host, port) to bind for channel server
accept_host=None, accept_host=None,
@ -357,19 +382,24 @@ class Actor:
""" """
log.debug(f"Starting tcp server on {accept_host}:{accept_port}") log.debug(f"Starting tcp server on {accept_host}:{accept_port}")
listeners = await nursery.start( async with trio.open_nursery() as nursery:
partial( self._server_nursery = nursery
trio.serve_tcp, # TODO: might want to consider having a separate nursery
self._stream_handler, # for the stream handler such that the server can be cancelled
handler_nursery=nursery, # whilst leaving existing channels up
port=accept_port, host=accept_host, listeners = await nursery.start(
partial(
trio.serve_tcp,
self._stream_handler,
handler_nursery=self._root_nursery,
port=accept_port, host=accept_host,
)
) )
) self._listeners.extend(listeners)
self._listeners.extend(listeners) log.debug(f"Spawned {listeners}")
log.debug(f"Spawned {listeners}")
# when launched in-process, trigger awaiter's completion # when launched in-process, trigger awaiter's completion
task_status.started() task_status.started()
def cancel(self): def cancel(self):
"""This cancels the internal root-most nursery thereby gracefully """This cancels the internal root-most nursery thereby gracefully
@ -455,7 +485,7 @@ class Portal:
if resptype == 'yield': if resptype == 'yield':
async def yield_from_q(): async def yield_from_q():
yield first yield first_msg
for msg in q: for msg in q:
try: try:
yield msg['yield'] yield msg['yield']
@ -506,6 +536,7 @@ class ActorNursery:
statespace=None, statespace=None,
rpc_module_paths=None, rpc_module_paths=None,
main=None, main=None,
outlive_main=False, # sub-actors die when their main task completes
): ):
actor = Actor( actor = Actor(
name, name,
@ -513,6 +544,7 @@ class ActorNursery:
rpc_module_paths=rpc_module_paths, rpc_module_paths=rpc_module_paths,
statespace=statespace, # global proc state vars statespace=statespace, # global proc state vars
main=main, # main coroutine to be invoked main=main, # main coroutine to be invoked
outlive_main=outlive_main,
) )
parent_addr = self._actor.accept_addr parent_addr = self._actor.accept_addr
proc = ctx.Process( proc = ctx.Process(
@ -522,7 +554,6 @@ class ActorNursery:
name=name, name=name,
) )
proc.start() proc.start()
if not proc.is_alive(): if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?") raise ActorFailure("Couldn't start sub-actor?")
@ -548,27 +579,48 @@ class ActorNursery:
# unblocks when all waiter tasks have completed # unblocks when all waiter tasks have completed
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
for actor, proc in self._children.values(): for subactor, proc, main_q in self._children.values():
nursery.start_soon(wait_for_proc, proc) nursery.start_soon(wait_for_proc, proc)
async def cancel(self): async def cancel(self, hard_kill=False):
for actor, proc in self._children.values(): for subactor, proc, main_q in self._children.values():
if proc is mp.current_process(): if proc is mp.current_process():
actor.cancel() # XXX: does this even make sense?
subactor.cancel()
else: else:
# send KeyBoardInterrupt (trio abort signal) to underlying if hard_kill:
# sub-actors proc.terminate()
proc.terminate() # send KeyBoardInterrupt (trio abort signal) to underlying
# os.kill(proc.pid, signal.SIGINT) # sub-actors
# os.kill(proc.pid, signal.SIGINT)
else:
# invoke cancel cmd
actor = self._actor
resptype, first_msg, q = await actor.invoke_cmd(
actor._peers[subactor.uid], # channel
'self',
'cancel',
{},
)
log.debug(f"Waiting on all subactors to complete")
await self.wait() await self.wait()
log.debug(f"All subactors for {self} have terminated")
async def __aexit__(self, etype, value, tb): async def __aexit__(self, etype, value, tb):
"""Wait on all subactor's main routines to complete. """Wait on all subactor's main routines to complete.
""" """
async def wait_for_actor(actor, proc, q):
ret_type, msg, q = await result_from_q(q)
log.info(f"{actor.uid} main task completed with {msg}")
if not actor._outlive_main:
# trigger msg loop to break
log.info(f"Signalling msg loop exit for {actor.uid}")
await self._actor._peers[actor.uid].send(None)
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
for subactor, proc, q in self._children.values(): for subactor, proc, main_q in self._children.values():
nursery.start_soon(get_result, q) nursery.start_soon(wait_for_actor, subactor, proc, main_q)
def current_actor() -> Actor: def current_actor() -> Actor:
@ -627,7 +679,6 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None):
statespace={}, # global proc state vars statespace={}, # global proc state vars
main=main, # main coroutine to be invoked main=main, # main coroutine to be invoked
) )
# assign process-local actor # assign process-local actor
global _current_actor global _current_actor
_current_actor = arbiter _current_actor = arbiter
@ -643,7 +694,7 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None):
yield LocalPortal(arbiter) yield LocalPortal(arbiter)
# If spawned locally, the arbiter is cancelled when this context # If spawned locally, the arbiter is cancelled when this context
# is complete (i.e the underlying context manager block completes) # is complete?
# nursery.cancel_scope.cancel() # nursery.cancel_scope.cancel()