forked from goodboy/tractor
Add nursery self-destruct logic on cancel failure
If a nursery fails to cancel (some sub-actors presumably) then hard kill the whole process tree to avoid hangs during a catastrophic failure. This logic may get factored out (and changed) as we introduce custom supervisor strategies.more_thorough_super_tests
parent
42978bf9ac
commit
f977d37cee
|
@ -87,6 +87,7 @@ class ActorNursery:
|
||||||
event, chan = await self._actor.wait_for_peer(actor.uid)
|
event, chan = await self._actor.wait_for_peer(actor.uid)
|
||||||
portal = Portal(chan)
|
portal = Portal(chan)
|
||||||
self._children[actor.uid] = (actor, proc, portal)
|
self._children[actor.uid] = (actor, proc, portal)
|
||||||
|
|
||||||
return portal
|
return portal
|
||||||
|
|
||||||
async def run_in_actor(
|
async def run_in_actor(
|
||||||
|
@ -174,12 +175,19 @@ class ActorNursery:
|
||||||
result = await exhaust_portal(portal, actor)
|
result = await exhaust_portal(portal, actor)
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
errors.append(result)
|
errors.append(result)
|
||||||
|
log.warning(
|
||||||
|
f"Cancelling {portal.channel.uid} after error {result}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
log.info(f"Cancelling {portal.channel.uid} gracefully")
|
log.info(f"Cancelling {portal.channel.uid} gracefully")
|
||||||
|
|
||||||
|
# cancel the process now that we have a final result
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
# XXX: lol, this will never get run without a shield above..
|
||||||
log.warning(
|
# if cs.cancelled_caught:
|
||||||
"Result waiter was cancelled, process may have died")
|
# log.warning(
|
||||||
|
# "Result waiter was cancelled, process may have died")
|
||||||
|
|
||||||
async def wait_for_proc(
|
async def wait_for_proc(
|
||||||
proc: mp.Process,
|
proc: mp.Process,
|
||||||
|
@ -194,11 +202,12 @@ class ActorNursery:
|
||||||
# please god don't hang
|
# please god don't hang
|
||||||
proc.join()
|
proc.join()
|
||||||
log.debug(f"Joined {proc}")
|
log.debug(f"Joined {proc}")
|
||||||
|
# indicate we are no longer managing this subactor
|
||||||
self._children.pop(actor.uid)
|
self._children.pop(actor.uid)
|
||||||
|
|
||||||
# proc terminated, cancel result waiter that may have
|
# proc terminated, cancel result waiter that may have
|
||||||
# been spawned in tandem
|
# been spawned in tandem if not done already
|
||||||
if cancel_scope:
|
if cancel_scope: # and not portal._cancelled:
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Cancelling existing result waiter task for {actor.uid}")
|
f"Cancelling existing result waiter task for {actor.uid}")
|
||||||
cancel_scope.cancel()
|
cancel_scope.cancel()
|
||||||
|
@ -222,11 +231,12 @@ class ActorNursery:
|
||||||
|
|
||||||
if errors:
|
if errors:
|
||||||
if not self.cancelled:
|
if not self.cancelled:
|
||||||
# halt here and expect to be called again once the nursery
|
# bubble up error(s) here and expect to be called again
|
||||||
# has been cancelled externally (ex. from within __aexit__()
|
# once the nursery has been cancelled externally (ex.
|
||||||
# if an error is captured from ``wait()`` then ``cancel()``
|
# from within __aexit__() if an error is caught around
|
||||||
# is called immediately after which in turn calls ``wait()``
|
# ``self.wait()`` then, ``self.cancel()`` is called
|
||||||
# again.)
|
# immediately, in the default supervisor strat, after
|
||||||
|
# which in turn ``self.wait()`` is called again.)
|
||||||
raise trio.MultiError(errors)
|
raise trio.MultiError(errors)
|
||||||
|
|
||||||
# wait on all `start_actor()` subactors to complete
|
# wait on all `start_actor()` subactors to complete
|
||||||
|
@ -259,7 +269,7 @@ class ActorNursery:
|
||||||
# os.kill(proc.pid, signal.SIGINT)
|
# os.kill(proc.pid, signal.SIGINT)
|
||||||
|
|
||||||
log.debug(f"Cancelling nursery")
|
log.debug(f"Cancelling nursery")
|
||||||
with trio.fail_after(3):
|
with trio.move_on_after(3) as cs:
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
for subactor, proc, portal in self._children.values():
|
for subactor, proc, portal in self._children.values():
|
||||||
if hard_kill:
|
if hard_kill:
|
||||||
|
@ -272,6 +282,10 @@ class ActorNursery:
|
||||||
await event.wait()
|
await event.wait()
|
||||||
# channel/portal should now be up
|
# channel/portal should now be up
|
||||||
_, _, portal = self._children[subactor.uid]
|
_, _, portal = self._children[subactor.uid]
|
||||||
|
|
||||||
|
# XXX should be impossible to get here
|
||||||
|
# unless method was called from within
|
||||||
|
# shielded cancel scope.
|
||||||
if portal is None:
|
if portal is None:
|
||||||
# cancelled while waiting on the event
|
# cancelled while waiting on the event
|
||||||
# to arrive
|
# to arrive
|
||||||
|
@ -281,10 +295,18 @@ class ActorNursery:
|
||||||
else: # there's no other choice left
|
else: # there's no other choice left
|
||||||
do_hard_kill(proc)
|
do_hard_kill(proc)
|
||||||
|
|
||||||
# spawn cancel tasks
|
# spawn cancel tasks for each sub-actor
|
||||||
assert portal
|
assert portal
|
||||||
n.start_soon(portal.cancel_actor)
|
n.start_soon(portal.cancel_actor)
|
||||||
|
|
||||||
|
# if we cancelled the cancel (we hung cancelling remote actors)
|
||||||
|
# then hard kill all sub-processes
|
||||||
|
if cs.cancelled_caught:
|
||||||
|
log.error(f"Failed to gracefully cancel {self}, hard killing!")
|
||||||
|
async with trio.open_nursery() as n:
|
||||||
|
for subactor, proc, portal in self._children.values():
|
||||||
|
n.start_soon(do_hard_kill, proc)
|
||||||
|
|
||||||
# mark ourselves as having (tried to have) cancelled all subactors
|
# mark ourselves as having (tried to have) cancelled all subactors
|
||||||
self.cancelled = True
|
self.cancelled = True
|
||||||
await self.wait()
|
await self.wait()
|
||||||
|
@ -292,6 +314,9 @@ class ActorNursery:
|
||||||
async def __aexit__(self, etype, value, tb):
|
async def __aexit__(self, etype, value, tb):
|
||||||
"""Wait on all subactor's main routines to complete.
|
"""Wait on all subactor's main routines to complete.
|
||||||
"""
|
"""
|
||||||
|
# XXX: this is effectively the (for now) lone
|
||||||
|
# cancellation/supervisor strategy (one-cancels-all)
|
||||||
|
# which exactly mimicks trio's behaviour
|
||||||
if etype is not None:
|
if etype is not None:
|
||||||
try:
|
try:
|
||||||
# XXX: hypothetically an error could be raised and then
|
# XXX: hypothetically an error could be raised and then
|
||||||
|
@ -313,16 +338,16 @@ class ActorNursery:
|
||||||
raise trio.MultiError(merr.exceptions + [value])
|
raise trio.MultiError(merr.exceptions + [value])
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
# XXX: this is effectively the (for now) lone
|
|
||||||
# cancellation/supervisor strategy which exactly
|
|
||||||
# mimicks trio's behaviour
|
|
||||||
log.debug(f"Waiting on subactors {self._children} to complete")
|
log.debug(f"Waiting on subactors {self._children} to complete")
|
||||||
try:
|
try:
|
||||||
await self.wait()
|
await self.wait()
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
log.warning(f"Nursery caught {err}, cancelling")
|
log.warning(f"Nursery cancelling due to {err}")
|
||||||
|
if self._children:
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
await self.cancel()
|
await self.cancel()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
log.debug(f"Nursery teardown complete")
|
log.debug(f"Nursery teardown complete")
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue