Add reliable subactor lifetime control
parent
ef90d7f106
commit
84cd29644e
171
piker/tractor.py
171
piker/tractor.py
|
@ -24,7 +24,7 @@ log = get_logger('tractor')
|
|||
_current_actor = None
|
||||
|
||||
# for debugging
|
||||
log = get_console_log('debug')
|
||||
log = get_console_log('trace')
|
||||
|
||||
|
||||
class ActorFailure(Exception):
|
||||
|
@ -50,7 +50,9 @@ async def maybe_open_nursery(nursery=None):
|
|||
|
||||
async def _invoke(
|
||||
cid, chan, func, kwargs,
|
||||
treat_as_gen=False, raise_errs=False):
|
||||
treat_as_gen=False, raise_errs=False,
|
||||
task_status=trio.TASK_STATUS_IGNORED
|
||||
):
|
||||
"""Invoke local func and return results over provided channel.
|
||||
"""
|
||||
try:
|
||||
|
@ -76,17 +78,21 @@ async def _invoke(
|
|||
if treat_as_gen:
|
||||
# XXX: the async-func may spawn further tasks which push
|
||||
# back values like an async-generator would but must
|
||||
# manualy construct the response dict-packet-responses as above
|
||||
# manualy construct the response dict-packet-responses as
|
||||
# above
|
||||
await coro
|
||||
else:
|
||||
await chan.send({'return': await coro, 'cid': cid})
|
||||
|
||||
task_status.started()
|
||||
except Exception:
|
||||
if not raise_errs:
|
||||
await chan.send({'error': traceback.format_exc(), 'cid': cid})
|
||||
else:
|
||||
raise
|
||||
|
||||
async def get_result(q):
|
||||
|
||||
async def result_from_q(q):
|
||||
"""Process a msg from a remote actor.
|
||||
"""
|
||||
first_msg = await q.get()
|
||||
|
@ -117,6 +123,7 @@ class Actor:
|
|||
statespace: dict = {},
|
||||
uid: str = None,
|
||||
allow_rpc: bool = True,
|
||||
outlive_main: bool = False,
|
||||
):
|
||||
self.uid = (name, uid or str(uuid.uuid1()))
|
||||
self.rpc_module_paths = rpc_module_paths
|
||||
|
@ -126,9 +133,12 @@ class Actor:
|
|||
# @dataclass once we get py3.7
|
||||
self.statespace = statespace
|
||||
self._allow_rpc = allow_rpc
|
||||
self._outlive_main = outlive_main
|
||||
|
||||
# filled in by `_async_main` after fork
|
||||
self._peers = {}
|
||||
self._no_more_peers = trio.Event()
|
||||
self._no_more_peers.set()
|
||||
self._actors2calls = {} # map {uids -> {callids -> waiter queues}}
|
||||
self._listeners = []
|
||||
self._parent_chan = None
|
||||
|
@ -158,28 +168,26 @@ class Actor:
|
|||
"""
|
||||
Entry point for new inbound connections to the channel server.
|
||||
"""
|
||||
self._no_more_peers.clear()
|
||||
chan = Channel(stream=stream)
|
||||
log.info(f"New {chan} connected to us")
|
||||
log.info(f"New connection to us {chan}")
|
||||
# send/receive initial handshake response
|
||||
await chan.send(self.uid)
|
||||
uid = await chan.recv()
|
||||
chan.uid = uid
|
||||
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
|
||||
|
||||
# XXX WTF!?!! THIS BLOCKS RANDOMLY?
|
||||
# assert tuple(raddr) == chan.laddr
|
||||
|
||||
# channel tracking
|
||||
event = self._peers.pop(uid, None)
|
||||
chan.event = event
|
||||
self._peers[uid] = chan
|
||||
log.info(f"Registered {chan} for {uid}")
|
||||
log.debug(f"Retrieved event {event}")
|
||||
log.debug(f"Registered {chan} for {uid}")
|
||||
|
||||
# Instructing connection: this is likely a new channel to
|
||||
# a recently spawned actor which we'd like to control via
|
||||
# async-rpc calls.
|
||||
if event and getattr(event, 'set', None):
|
||||
log.info(f"Waking waiters of {event.statistics()}")
|
||||
log.debug(f"Waking channel waiters {event.statistics()}")
|
||||
# Alert any task waiting on this connection to come up
|
||||
event.set()
|
||||
event.clear() # now consumer can wait on this channel to close
|
||||
|
@ -189,10 +197,14 @@ class Actor:
|
|||
try:
|
||||
await self._process_messages(chan)
|
||||
finally:
|
||||
# Drop ref to channel so it can be gc-ed
|
||||
self._peers.pop(chan.uid, None)
|
||||
chan.event.set()
|
||||
log.debug(f"Releasing channel {chan}")
|
||||
# Drop ref to channel so it can be gc-ed and disconnected
|
||||
if chan is not self._parent_chan:
|
||||
log.debug(f"Releasing channel {chan}")
|
||||
self._peers.pop(chan.uid, None)
|
||||
chan.event.set() # signal teardown/disconnection
|
||||
if not self._peers: # no more channels connected
|
||||
self._no_more_peers.set()
|
||||
log.debug(f"No more peer channels")
|
||||
|
||||
def _push_result(self, actorid, cid, msg):
|
||||
q = self.get_waitq(actorid, cid)
|
||||
|
@ -201,20 +213,20 @@ class Actor:
|
|||
|
||||
def get_waitq(self, actorid, cid):
|
||||
if actorid not in self._actors2calls:
|
||||
log.warn(f"Caller id {cid} is not yet registered?")
|
||||
log.debug(f"Registering for results from {actorid}")
|
||||
cids2qs = self._actors2calls.setdefault(actorid, {})
|
||||
if cid not in cids2qs:
|
||||
log.warn(f"Caller id {cid} is not yet registered?")
|
||||
log.debug(f"Registering for result from call id {cid}")
|
||||
return cids2qs.setdefault(cid, trio.Queue(1000))
|
||||
|
||||
async def invoke_cmd(self, chan, ns, func, kwargs):
|
||||
"""Invoke a remote command by sending a `cmd` message and waiting
|
||||
on the msg processing loop for its response(s).
|
||||
"""
|
||||
cid = uuid.uuid1()
|
||||
cid = str(uuid.uuid1())
|
||||
q = self.get_waitq(chan.uid, cid)
|
||||
await chan.send((ns, func, kwargs, self.uid, cid))
|
||||
return await get_result(q)
|
||||
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
|
||||
return await result_from_q(q)
|
||||
|
||||
async def _process_messages(self, chan, treat_as_gen=False):
|
||||
"""Process messages async-RPC style.
|
||||
|
@ -224,6 +236,9 @@ class Actor:
|
|||
log.debug(f"Entering async-rpc loop for {chan}")
|
||||
async with trio.open_nursery() as nursery:
|
||||
async for msg in chan.aiter_recv():
|
||||
if msg is None: # terminate sentinel
|
||||
log.debug(f"Terminating msg loop for {chan}")
|
||||
break
|
||||
log.debug(f"Received msg {msg}")
|
||||
# try:
|
||||
cid = msg.get('cid')
|
||||
|
@ -265,7 +280,6 @@ class Actor:
|
|||
|
||||
def _fork_main(self, accept_addr, parent_addr=None):
|
||||
# after fork routine which invokes a fresh ``trio.run``
|
||||
log.info(f"self._peers are {self._peers}")
|
||||
log.info(
|
||||
f"Started new {ctx.current_process()} for actor {self.uid}")
|
||||
global _current_actor
|
||||
|
@ -286,11 +300,10 @@ class Actor:
|
|||
async with maybe_open_nursery(nursery) as nursery:
|
||||
self._root_nursery = nursery
|
||||
|
||||
# Startup up channel server, optionally begin serving RPC
|
||||
# requests from the parent.
|
||||
# Startup up channel server
|
||||
host, port = accept_addr
|
||||
await self._serve_forever(
|
||||
nursery, accept_host=host, accept_port=port,
|
||||
nursery.start_soon(partial(
|
||||
self._serve_forever, accept_host=host, accept_port=port)
|
||||
)
|
||||
|
||||
if parent_addr is not None:
|
||||
|
@ -303,36 +316,49 @@ class Actor:
|
|||
)
|
||||
await chan.connect()
|
||||
|
||||
# initial handshake, report who we are, figure out who they are
|
||||
# initial handshake, report who we are, who they are
|
||||
await chan.send(self.uid)
|
||||
uid = await chan.recv()
|
||||
if uid in self._peers:
|
||||
log.warn(
|
||||
f"already have channel for {uid} registered?"
|
||||
)
|
||||
else:
|
||||
self._peers[uid] = chan
|
||||
# else:
|
||||
# self._peers[uid] = chan
|
||||
|
||||
# handle new connection back to parent
|
||||
# handle new connection back to parent optionally
|
||||
# begin responding to RPC
|
||||
if self._allow_rpc:
|
||||
self.load_namespaces()
|
||||
nursery.start_soon(self._process_messages, chan)
|
||||
|
||||
if self.main:
|
||||
log.debug(f"Starting main task `{self.main}`")
|
||||
if self._parent_chan:
|
||||
log.debug(f"Starting main task `{self.main}`")
|
||||
# start "main" routine in a task
|
||||
nursery.start_soon(
|
||||
await nursery.start(
|
||||
_invoke, 'main', self._parent_chan, self.main, {},
|
||||
False, True # treat_as_gen, raise_errs params
|
||||
)
|
||||
else:
|
||||
# run directly
|
||||
log.debug(f"Running `{self.main}` directly")
|
||||
result = await self.main()
|
||||
|
||||
# blocks here as expected if no nursery was provided until
|
||||
# the channel server is killed (i.e. this actor is
|
||||
# cancelled or signalled by the parent actor)
|
||||
# terminate local in-proc once its main completes
|
||||
log.debug(
|
||||
f"Waiting for remaining peers {self._peers} to clear")
|
||||
await self._no_more_peers.wait()
|
||||
log.debug(f"All peer channels are complete")
|
||||
|
||||
# tear down channel server
|
||||
if not self._outlive_main:
|
||||
log.debug(f"Shutting down channel server")
|
||||
self._server_nursery.cancel_scope.cancel()
|
||||
|
||||
# blocks here as expected if no nursery was provided until
|
||||
# the channel server is killed (i.e. this actor is
|
||||
# cancelled or signalled by the parent actor)
|
||||
except Exception:
|
||||
if self._parent_chan:
|
||||
log.exception("Actor errored:")
|
||||
|
@ -345,7 +371,6 @@ class Actor:
|
|||
|
||||
async def _serve_forever(
|
||||
self,
|
||||
nursery, # spawns main func and channel server
|
||||
*,
|
||||
# (host, port) to bind for channel server
|
||||
accept_host=None,
|
||||
|
@ -357,19 +382,24 @@ class Actor:
|
|||
|
||||
"""
|
||||
log.debug(f"Starting tcp server on {accept_host}:{accept_port}")
|
||||
listeners = await nursery.start(
|
||||
partial(
|
||||
trio.serve_tcp,
|
||||
self._stream_handler,
|
||||
handler_nursery=nursery,
|
||||
port=accept_port, host=accept_host,
|
||||
async with trio.open_nursery() as nursery:
|
||||
self._server_nursery = nursery
|
||||
# TODO: might want to consider having a separate nursery
|
||||
# for the stream handler such that the server can be cancelled
|
||||
# whilst leaving existing channels up
|
||||
listeners = await nursery.start(
|
||||
partial(
|
||||
trio.serve_tcp,
|
||||
self._stream_handler,
|
||||
handler_nursery=self._root_nursery,
|
||||
port=accept_port, host=accept_host,
|
||||
)
|
||||
)
|
||||
)
|
||||
self._listeners.extend(listeners)
|
||||
log.debug(f"Spawned {listeners}")
|
||||
self._listeners.extend(listeners)
|
||||
log.debug(f"Spawned {listeners}")
|
||||
|
||||
# when launched in-process, trigger awaiter's completion
|
||||
task_status.started()
|
||||
# when launched in-process, trigger awaiter's completion
|
||||
task_status.started()
|
||||
|
||||
def cancel(self):
|
||||
"""This cancels the internal root-most nursery thereby gracefully
|
||||
|
@ -455,7 +485,7 @@ class Portal:
|
|||
if resptype == 'yield':
|
||||
|
||||
async def yield_from_q():
|
||||
yield first
|
||||
yield first_msg
|
||||
for msg in q:
|
||||
try:
|
||||
yield msg['yield']
|
||||
|
@ -506,6 +536,7 @@ class ActorNursery:
|
|||
statespace=None,
|
||||
rpc_module_paths=None,
|
||||
main=None,
|
||||
outlive_main=False, # sub-actors die when their main task completes
|
||||
):
|
||||
actor = Actor(
|
||||
name,
|
||||
|
@ -513,6 +544,7 @@ class ActorNursery:
|
|||
rpc_module_paths=rpc_module_paths,
|
||||
statespace=statespace, # global proc state vars
|
||||
main=main, # main coroutine to be invoked
|
||||
outlive_main=outlive_main,
|
||||
)
|
||||
parent_addr = self._actor.accept_addr
|
||||
proc = ctx.Process(
|
||||
|
@ -522,7 +554,6 @@ class ActorNursery:
|
|||
name=name,
|
||||
)
|
||||
proc.start()
|
||||
|
||||
if not proc.is_alive():
|
||||
raise ActorFailure("Couldn't start sub-actor?")
|
||||
|
||||
|
@ -548,27 +579,48 @@ class ActorNursery:
|
|||
|
||||
# unblocks when all waiter tasks have completed
|
||||
async with trio.open_nursery() as nursery:
|
||||
for actor, proc in self._children.values():
|
||||
for subactor, proc, main_q in self._children.values():
|
||||
nursery.start_soon(wait_for_proc, proc)
|
||||
|
||||
async def cancel(self):
|
||||
for actor, proc in self._children.values():
|
||||
async def cancel(self, hard_kill=False):
|
||||
for subactor, proc, main_q in self._children.values():
|
||||
if proc is mp.current_process():
|
||||
actor.cancel()
|
||||
# XXX: does this even make sense?
|
||||
subactor.cancel()
|
||||
else:
|
||||
# send KeyBoardInterrupt (trio abort signal) to underlying
|
||||
# sub-actors
|
||||
proc.terminate()
|
||||
# os.kill(proc.pid, signal.SIGINT)
|
||||
if hard_kill:
|
||||
proc.terminate()
|
||||
# send KeyBoardInterrupt (trio abort signal) to underlying
|
||||
# sub-actors
|
||||
# os.kill(proc.pid, signal.SIGINT)
|
||||
else:
|
||||
# invoke cancel cmd
|
||||
actor = self._actor
|
||||
resptype, first_msg, q = await actor.invoke_cmd(
|
||||
actor._peers[subactor.uid], # channel
|
||||
'self',
|
||||
'cancel',
|
||||
{},
|
||||
)
|
||||
|
||||
log.debug(f"Waiting on all subactors to complete")
|
||||
await self.wait()
|
||||
log.debug(f"All subactors for {self} have terminated")
|
||||
|
||||
async def __aexit__(self, etype, value, tb):
|
||||
"""Wait on all subactor's main routines to complete.
|
||||
"""
|
||||
async def wait_for_actor(actor, proc, q):
|
||||
ret_type, msg, q = await result_from_q(q)
|
||||
log.info(f"{actor.uid} main task completed with {msg}")
|
||||
if not actor._outlive_main:
|
||||
# trigger msg loop to break
|
||||
log.info(f"Signalling msg loop exit for {actor.uid}")
|
||||
await self._actor._peers[actor.uid].send(None)
|
||||
|
||||
async with trio.open_nursery() as nursery:
|
||||
for subactor, proc, q in self._children.values():
|
||||
nursery.start_soon(get_result, q)
|
||||
for subactor, proc, main_q in self._children.values():
|
||||
nursery.start_soon(wait_for_actor, subactor, proc, main_q)
|
||||
|
||||
|
||||
def current_actor() -> Actor:
|
||||
|
@ -627,7 +679,6 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None):
|
|||
statespace={}, # global proc state vars
|
||||
main=main, # main coroutine to be invoked
|
||||
)
|
||||
|
||||
# assign process-local actor
|
||||
global _current_actor
|
||||
_current_actor = arbiter
|
||||
|
@ -643,7 +694,7 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None):
|
|||
yield LocalPortal(arbiter)
|
||||
|
||||
# If spawned locally, the arbiter is cancelled when this context
|
||||
# is complete (i.e the underlying context manager block completes)
|
||||
# is complete?
|
||||
# nursery.cancel_scope.cancel()
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue