forked from goodboy/tractor
1
0
Fork 0

Fix actor nursery __exit__ handling

When an error is raised inside a nursery block (in the local actor)
cancel all spawned children and ensure the error is properly
unsuppressed.

Also,
- change `invoke_cmd` to `send_cmd` and expect the caller to use
  `result_from_q` (avoids implicit blocking for responses that might
  never arrive)
- `nursery.start()` the channel server block such that we wait for the
  underlying listener to spawn before making outbound connections
- cancel the channel server when an actor's main task completes
  (given that `outlive_main == False`)
- raise subactor errors directly in the local actors's msg loop
- enforce that `treat_as_gen` async functions respond with a caller id
  (`cid`) in each yield packet
asyncgen_closing_fix
Tyler Goodlet 2018-06-23 13:48:04 -04:00
parent 2c637db5b7
commit e9422fa001
1 changed files with 58 additions and 34 deletions

View File

@ -24,7 +24,7 @@ log = get_logger('tractor')
_current_actor = None _current_actor = None
# for debugging # for debugging
log = get_console_log('trace') log = get_console_log('info')
class ActorFailure(Exception): class ActorFailure(Exception):
@ -219,14 +219,16 @@ class Actor:
log.debug(f"Registering for result from call id {cid}") log.debug(f"Registering for result from call id {cid}")
return cids2qs.setdefault(cid, trio.Queue(1000)) return cids2qs.setdefault(cid, trio.Queue(1000))
async def invoke_cmd(self, chan, ns, func, kwargs): async def send_cmd(self, chan, ns, func, kwargs):
"""Invoke a remote command by sending a `cmd` message and waiting """Send a ``'cmd'`` message to a remote actor and return a
on the msg processing loop for its response(s). caller id and a ``trio.Queue`` that can be used to wait for
responses delivered by the local message processing loop.
""" """
cid = str(uuid.uuid1()) cid = str(uuid.uuid1())
q = self.get_waitq(chan.uid, cid) q = self.get_waitq(chan.uid, cid)
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
return await result_from_q(q) return cid, q
async def _process_messages(self, chan, treat_as_gen=False): async def _process_messages(self, chan, treat_as_gen=False):
"""Process messages async-RPC style. """Process messages async-RPC style.
@ -240,16 +242,15 @@ class Actor:
log.debug(f"Terminating msg loop for {chan}") log.debug(f"Terminating msg loop for {chan}")
break break
log.debug(f"Received msg {msg}") log.debug(f"Received msg {msg}")
# try:
cid = msg.get('cid') cid = msg.get('cid')
if cid: # deliver response to local caller/waiter if cid: # deliver response to local caller/waiter
self._push_result(chan.uid, cid, msg) self._push_result(chan.uid, cid, msg)
if 'error' in msg:
# TODO: need something better then this slop
raise RemoteActorError(msg['error'])
continue continue
else: else:
ns, funcname, kwargs, actorid, cid = msg['cmd'] ns, funcname, kwargs, actorid, cid = msg['cmd']
# except Exception:
# await chan.send({'error': traceback.format_exc()})
# break
log.debug( log.debug(
f"Processing request from {actorid}\n" f"Processing request from {actorid}\n"
@ -264,7 +265,10 @@ class Actor:
sig = inspect.signature(func) sig = inspect.signature(func)
treat_as_gen = False treat_as_gen = False
if 'chan' in sig.parameters: if 'chan' in sig.parameters:
assert 'cid' in sig.parameters, \
f"{func} must accept a `cid` (caller id) kwarg"
kwargs['chan'] = chan kwargs['chan'] = chan
kwargs['cid'] = cid
# TODO: eventually we want to be more stringent # TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator. # about what is considered a far-end async-generator.
# Right now both actual async gens and any async # Right now both actual async gens and any async
@ -302,7 +306,7 @@ class Actor:
# Startup up channel server # Startup up channel server
host, port = accept_addr host, port = accept_addr
nursery.start_soon(partial( await nursery.start(partial(
self._serve_forever, accept_host=host, accept_port=port) self._serve_forever, accept_host=host, accept_port=port)
) )
@ -323,8 +327,6 @@ class Actor:
log.warn( log.warn(
f"already have channel for {uid} registered?" f"already have channel for {uid} registered?"
) )
# else:
# self._peers[uid] = chan
# handle new connection back to parent optionally # handle new connection back to parent optionally
# begin responding to RPC # begin responding to RPC
@ -354,7 +356,7 @@ class Actor:
# tear down channel server # tear down channel server
if not self._outlive_main: if not self._outlive_main:
log.debug(f"Shutting down channel server") log.debug(f"Shutting down channel server")
self._server_nursery.cancel_scope.cancel() self.cancel_server()
# blocks here as expected if no nursery was provided until # blocks here as expected if no nursery was provided until
# the channel server is killed (i.e. this actor is # the channel server is killed (i.e. this actor is
@ -397,8 +399,6 @@ class Actor:
) )
self._listeners.extend(listeners) self._listeners.extend(listeners)
log.debug(f"Spawned {listeners}") log.debug(f"Spawned {listeners}")
# when launched in-process, trigger awaiter's completion
task_status.started() task_status.started()
def cancel(self): def cancel(self):
@ -407,6 +407,12 @@ class Actor:
""" """
self._root_nursery.cancel_scope.cancel() self._root_nursery.cancel_scope.cancel()
def cancel_server(self):
"""Cancel the internal channel server nursery thereby
preventing any new inbound connections from being established.
"""
self._server_nursery.cancel_scope.cancel()
@property @property
def accept_addr(self): def accept_addr(self):
"""Primary address to which the channel server is bound. """Primary address to which the channel server is bound.
@ -417,6 +423,9 @@ class Actor:
def get_parent(self): def get_parent(self):
return Portal(self._parent_chan) return Portal(self._parent_chan)
def get_chan(self, actorid):
return self._peers[actorid]
class Arbiter(Actor): class Arbiter(Actor):
"""A special actor who knows all the other actors and always has """A special actor who knows all the other actors and always has
@ -480,12 +489,14 @@ class Portal:
# ship a function call request to the remote actor # ship a function call request to the remote actor
actor = current_actor() actor = current_actor()
resptype, first_msg, q = await actor.invoke_cmd(chan, ns, func, kwargs) cid, q = await actor.send_cmd(chan, ns, func, kwargs)
# wait on first response msg
resptype, first_msg, q = await result_from_q(q)
if resptype == 'yield': if resptype == 'yield':
async def yield_from_q(): async def yield_from_q():
yield first_msg yield first_msg['yield']
for msg in q: for msg in q:
try: try:
yield msg['yield'] yield msg['yield']
@ -583,26 +594,27 @@ class ActorNursery:
nursery.start_soon(wait_for_proc, proc) nursery.start_soon(wait_for_proc, proc)
async def cancel(self, hard_kill=False): async def cancel(self, hard_kill=False):
log.debug(f"Cancelling nursery")
for subactor, proc, main_q in self._children.values(): for subactor, proc, main_q in self._children.values():
if proc is mp.current_process(): if proc is mp.current_process():
# XXX: does this even make sense? # XXX: does this even make sense?
subactor.cancel() await subactor.cancel()
else: else:
if hard_kill: if hard_kill:
log.warn(f"Hard killing subactors {self._children}")
proc.terminate() proc.terminate()
# send KeyBoardInterrupt (trio abort signal) to underlying # send KeyBoardInterrupt (trio abort signal) to underlying
# sub-actors # sub-actors
# os.kill(proc.pid, signal.SIGINT) # os.kill(proc.pid, signal.SIGINT)
else: else:
# invoke cancel cmd # send cancel cmd - likely no response from subactor
actor = self._actor actor = self._actor
resptype, first_msg, q = await actor.invoke_cmd( cid, q = await actor.send_cmd(
actor._peers[subactor.uid], # channel actor.get_chan(subactor.uid), # channel lookup
'self', 'self',
'cancel', 'cancel',
{}, {},
) )
log.debug(f"Waiting on all subactors to complete") log.debug(f"Waiting on all subactors to complete")
await self.wait() await self.wait()
log.debug(f"All subactors for {self} have terminated") log.debug(f"All subactors for {self} have terminated")
@ -611,16 +623,25 @@ class ActorNursery:
"""Wait on all subactor's main routines to complete. """Wait on all subactor's main routines to complete.
""" """
async def wait_for_actor(actor, proc, q): async def wait_for_actor(actor, proc, q):
ret_type, msg, q = await result_from_q(q) if proc.is_alive():
log.info(f"{actor.uid} main task completed with {msg}") ret_type, msg, q = await result_from_q(q)
if not actor._outlive_main: log.info(f"{actor.uid} main task completed with {msg}")
# trigger msg loop to break if not actor._outlive_main:
log.info(f"Signalling msg loop exit for {actor.uid}") # trigger msg loop to break
await self._actor._peers[actor.uid].send(None) log.info(f"Signalling msg loop exit for {actor.uid}")
await self._actor.get_chan(actor.uid).send(None)
async with trio.open_nursery() as nursery: if etype is not None:
for subactor, proc, main_q in self._children.values(): log.warn(f"{current_actor().uid} errored with {etype}, "
nursery.start_soon(wait_for_actor, subactor, proc, main_q) "cancelling nursery")
await self.cancel()
else:
log.debug(f"Waiting on subactors to complete")
async with trio.open_nursery() as nursery:
for subactor, proc, main_q in self._children.values():
nursery.start_soon(wait_for_actor, subactor, proc, main_q)
log.debug(f"Nursery teardown complete")
def current_actor() -> Actor: def current_actor() -> Actor:
@ -693,9 +714,10 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None):
yield LocalPortal(arbiter) yield LocalPortal(arbiter)
# If spawned locally, the arbiter is cancelled when this context # XXX: If spawned locally, the arbiter is cancelled when this
# is complete? # context is complete given that there are no more active
# nursery.cancel_scope.cancel() # peer channels connected to it.
arbiter.cancel_server()
@asynccontextmanager @asynccontextmanager
@ -737,6 +759,8 @@ async def _main(async_fn, args, kwargs, name):
await serve_local_actor( await serve_local_actor(
actor, accept_addr=kwargs.pop('accept_addr', (None, 0))) actor, accept_addr=kwargs.pop('accept_addr', (None, 0)))
log.info("Completed async main") log.info("Completed async main")
# TODO: when the local actor's main has completed we cancel?
# actor.cancel()
else: else:
# block waiting for the arbiter main task to complete # block waiting for the arbiter main task to complete
pass pass