From f977d37ceec0f12b36bfdb22835b0c6eb285a509 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Nov 2019 17:11:48 -0500 Subject: [PATCH] Add nursery self-destruct logic on cancel failure If a nursery fails to cancel (some sub-actors presumably) then hard kill the whole process tree to avoid hangs during a catastrophic failure. This logic may get factored out (and changed) as we introduce custom supervisor strategies. --- tractor/_trionics.py | 61 +++++++++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 7ebe37a..6954367 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -87,6 +87,7 @@ class ActorNursery: event, chan = await self._actor.wait_for_peer(actor.uid) portal = Portal(chan) self._children[actor.uid] = (actor, proc, portal) + return portal async def run_in_actor( @@ -174,12 +175,19 @@ class ActorNursery: result = await exhaust_portal(portal, actor) if isinstance(result, Exception): errors.append(result) - log.info(f"Cancelling {portal.channel.uid} gracefully") + log.warning( + f"Cancelling {portal.channel.uid} after error {result}" + ) + else: + log.info(f"Cancelling {portal.channel.uid} gracefully") + + # cancel the process now that we have a final result await portal.cancel_actor() - if cs.cancelled_caught: - log.warning( - "Result waiter was cancelled, process may have died") + # XXX: lol, this will never get run without a shield above.. + # if cs.cancelled_caught: + # log.warning( + # "Result waiter was cancelled, process may have died") async def wait_for_proc( proc: mp.Process, @@ -194,11 +202,12 @@ class ActorNursery: # please god don't hang proc.join() log.debug(f"Joined {proc}") + # indicate we are no longer managing this subactor self._children.pop(actor.uid) # proc terminated, cancel result waiter that may have - # been spawned in tandem - if cancel_scope: + # been spawned in tandem if not done already + if cancel_scope: # and not portal._cancelled: log.warning( f"Cancelling existing result waiter task for {actor.uid}") cancel_scope.cancel() @@ -222,11 +231,12 @@ class ActorNursery: if errors: if not self.cancelled: - # halt here and expect to be called again once the nursery - # has been cancelled externally (ex. from within __aexit__() - # if an error is captured from ``wait()`` then ``cancel()`` - # is called immediately after which in turn calls ``wait()`` - # again.) + # bubble up error(s) here and expect to be called again + # once the nursery has been cancelled externally (ex. + # from within __aexit__() if an error is caught around + # ``self.wait()`` then, ``self.cancel()`` is called + # immediately, in the default supervisor strat, after + # which in turn ``self.wait()`` is called again.) raise trio.MultiError(errors) # wait on all `start_actor()` subactors to complete @@ -259,7 +269,7 @@ class ActorNursery: # os.kill(proc.pid, signal.SIGINT) log.debug(f"Cancelling nursery") - with trio.fail_after(3): + with trio.move_on_after(3) as cs: async with trio.open_nursery() as n: for subactor, proc, portal in self._children.values(): if hard_kill: @@ -272,6 +282,10 @@ class ActorNursery: await event.wait() # channel/portal should now be up _, _, portal = self._children[subactor.uid] + + # XXX should be impossible to get here + # unless method was called from within + # shielded cancel scope. if portal is None: # cancelled while waiting on the event # to arrive @@ -281,10 +295,18 @@ class ActorNursery: else: # there's no other choice left do_hard_kill(proc) - # spawn cancel tasks + # spawn cancel tasks for each sub-actor assert portal n.start_soon(portal.cancel_actor) + # if we cancelled the cancel (we hung cancelling remote actors) + # then hard kill all sub-processes + if cs.cancelled_caught: + log.error(f"Failed to gracefully cancel {self}, hard killing!") + async with trio.open_nursery() as n: + for subactor, proc, portal in self._children.values(): + n.start_soon(do_hard_kill, proc) + # mark ourselves as having (tried to have) cancelled all subactors self.cancelled = True await self.wait() @@ -292,6 +314,9 @@ class ActorNursery: async def __aexit__(self, etype, value, tb): """Wait on all subactor's main routines to complete. """ + # XXX: this is effectively the (for now) lone + # cancellation/supervisor strategy (one-cancels-all) + # which exactly mimicks trio's behaviour if etype is not None: try: # XXX: hypothetically an error could be raised and then @@ -313,16 +338,16 @@ class ActorNursery: raise trio.MultiError(merr.exceptions + [value]) raise else: - # XXX: this is effectively the (for now) lone - # cancellation/supervisor strategy which exactly - # mimicks trio's behaviour log.debug(f"Waiting on subactors {self._children} to complete") try: await self.wait() except (Exception, trio.MultiError) as err: - log.warning(f"Nursery caught {err}, cancelling") - await self.cancel() + log.warning(f"Nursery cancelling due to {err}") + if self._children: + with trio.CancelScope(shield=True): + await self.cancel() raise + log.debug(f"Nursery teardown complete")