From 835d1fa07a37a413ef0d0e7f0462205e2d25a654 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 19 Nov 2018 04:12:54 -0500
Subject: [PATCH] Vastly improve error triggered cancellation

At the expense of a bit more complexity in `ActorNursery.wait()`
(which I commented the heck out of fwiw) this adds far superior and
correct cancellation semantics for when a nursery is cancelled due to
(remote) errors in subactors.

This includes:
- `wait()` will now raise a `trio.MultiError` if multiple subactors
  error with the same semantics as in `trio`.
- in `wait()`, portals which are paired with `run_in_actor()` spawned
  subactors (versus `start_actor()`) are waited on separately, and if
  the nursery **hasn't** been cancelled but there are errors, those are
  raised immediately before waiting on `start_actor()` subactors, which
  will block indefinitely if they haven't been explicitly cancelled.
- if `wait()` does raise when the nursery hasn't yet been cancelled,
  it's expected that it will be called again depending on the actor
  supervision strategy (i.e. right now we operate with a
  one-cancels-all strategy, the same as `trio`, so
  `ActorNursery.__aexit__()` calls `cancel()` if any error is raised
  by `wait()`).

Oh and I added an `is_main_process()` helper; can't remember why..
---
 tractor/_trionics.py | 182 ++++++++++++++++++++++++++++---------------
 1 file changed, 121 insertions(+), 61 deletions(-)

diff --git a/tractor/_trionics.py b/tractor/_trionics.py
index dc607c7..a6c4b73 100644
--- a/tractor/_trionics.py
+++ b/tractor/_trionics.py
@@ -32,7 +32,8 @@ class ActorNursery:
             Tuple[str, str],
             Tuple[Actor, mp.Process, Optional[Portal]]
         ] = {}
-        # portals spawned with ``run_in_actor()``
+        # portals spawned with ``run_in_actor()`` are
+        # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()
         self.cancelled: bool = False
         self._forkserver: forkserver.ForkServer = None
@@ -132,6 +133,8 @@ class ActorNursery:
             bind_addr=bind_addr,
             statespace=statespace,
         )
+        # this marks the actor to be cancelled after its portal result
+        # is retrieved, see ``wait()`` below.
         self._cancel_after_result_on_exit.add(portal)
         await portal._submit_for_result(
             mod_path,
@@ -142,29 +145,65 @@ class ActorNursery:
 
     async def wait(self) -> None:
         """Wait for all subactors to complete.
+
+        This is probably the most complicated (and confusing, sorry)
+        function that does all the clever crap to deal with cancellation,
+        error propagation, and graceful subprocess tear down.
         """
-        async def maybe_consume_result(portal, actor):
-            if (
-                portal in self._cancel_after_result_on_exit and
-                (portal._result is None and portal._exc is None)
-            ):
-                log.debug(f"Waiting on final result from {subactor.uid}")
-                res = await portal.result()
-                # if it's an async-gen then we should alert the user
-                # that we're cancelling it
+        async def exhaust_portal(portal, actor):
+            """Pull final result from portal (assuming it was one).
+
+            If the main task is an async generator do our best to consume
+            what's left of it.
+ """ + try: + log.debug(f"Waiting on final result from {actor.uid}") + final = res = await portal.result() + # if it's an async-gen then alert that we're cancelling it if inspect.isasyncgen(res): + final = [] log.warning( f"Blindly consuming asyncgen for {actor.uid}") with trio.fail_after(1): async with aclosing(res) as agen: async for item in agen: log.debug(f"Consuming item {item}") + final.append(item) + except Exception as err: + # we reraise in the parent task via a ``trio.MultiError`` + return err + else: + return final + + async def cancel_on_completion( + portal: Portal, + actor: Actor, + task_status=trio.TASK_STATUS_IGNORED, + ) -> None: + """Cancel actor gracefully once it's "main" portal's + result arrives. + + Should only be called for actors spawned with `run_in_actor()`. + """ + with trio.open_cancel_scope() as cs: + task_status.started(cs) + # this may error in which case we expect the far end + # actor to have already terminated itself + result = await exhaust_portal(portal, actor) + if isinstance(result, Exception): + errors.append(result) + log.info(f"Cancelling {portal.channel.uid} gracefully") + await portal.cancel_actor() + + if cs.cancelled_caught: + log.warning( + "Result waiter was cancelled, process may have died") async def wait_for_proc( proc: mp.Process, actor: Actor, portal: Portal, - cancel_scope: trio._core._run.CancelScope, + cancel_scope: Optional[trio._core._run.CancelScope] = None, ) -> None: # TODO: timeout block here? if proc.is_alive(): @@ -172,42 +211,57 @@ class ActorNursery: # please god don't hang proc.join() log.debug(f"Joined {proc}") - await maybe_consume_result(portal, actor) - self._children.pop(actor.uid) - # proc terminated, cancel result waiter + + # proc terminated, cancel result waiter that may have + # been spawned in tandem if cancel_scope: log.warning( f"Cancelling existing result waiter task for {actor.uid}") cancel_scope.cancel() - async def wait_for_actor( - portal: Portal, - actor: Actor, - task_status=trio.TASK_STATUS_IGNORED, - ) -> None: - # cancel the actor gracefully - with trio.open_cancel_scope() as cs: - task_status.started(cs) - await maybe_consume_result(portal, actor) - log.info(f"Cancelling {portal.channel.uid} gracefully") - await portal.cancel_actor() - - if cs.cancelled_caught: - log.warning("Result waiter was cancelled") - - # unblocks when all waiter tasks have completed + log.debug(f"Waiting on all subactors to complete") children = self._children.copy() + errors = [] + # wait on run_in_actor() tasks, unblocks when all complete async with trio.open_nursery() as nursery: for subactor, proc, portal in children.values(): cs = None + # portal from ``run_in_actor()`` if portal in self._cancel_after_result_on_exit: - cs = await nursery.start(wait_for_actor, portal, subactor) + cs = await nursery.start( + cancel_on_completion, portal, subactor) + # TODO: how do we handle remote host spawned actors? + nursery.start_soon( + wait_for_proc, proc, subactor, portal, cs) + + if errors: + if not self.cancelled: + # halt here and expect to be called again once the nursery + # has been cancelled externally (ex. from within __aexit__() + # if an error is captured from ``wait()`` then ``cancel()`` + # is called immediately after which in turn calls ``wait()`` + # again.) 
+                raise trio.MultiError(errors)
+
+        # wait on all `start_actor()` subactors to complete
+        # if errors were captured above and we have not been cancelled
+        # then these ``start_actor()`` spawned actors will block until
+        # cancelled externally
+        children = self._children.copy()
+        async with trio.open_nursery() as nursery:
+            for subactor, proc, portal in children.values():
+                # TODO: how do we handle remote host spawned actors?
                 nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
 
+        log.debug(f"All subactors for {self} have terminated")
+        if errors:
+            # always raise any error if we're also cancelled
+            raise trio.MultiError(errors)
+
     async def cancel(self, hard_kill: bool = False) -> None:
         """Cancel this nursery by instructing each subactor to cancel
-        iteslf and wait for all subprocesses to terminate.
+        itself and wait for all subactors to terminate.
 
         If ``hard_killl`` is set to ``True`` then kill the processes
         directly without any far end graceful ``trio`` cancellation.
@@ -234,56 +288,57 @@ class ActorNursery:
                             # channel/portal should now be up
                             _, _, portal = self._children[subactor.uid]
                             if portal is None:
-                                # cancelled while waiting on the event?
+                                # cancelled while waiting on the event
+                                # to arrive
                                 chan = self._actor._peers[subactor.uid][-1]
                                 if chan:
                                     portal = Portal(chan)
                                 else:  # there's no other choice left
                                     do_hard_kill(proc)
 
-                        # spawn cancel tasks async
+                        # spawn cancel tasks
                         assert portal
                         n.start_soon(portal.cancel_actor)
 
-        log.debug(f"Waiting on all subactors to complete")
-        await self.wait()
+        # mark ourselves as having (tried to have) cancelled all subactors
         self.cancelled = True
-        log.debug(f"All subactors for {self} have terminated")
+        await self.wait()
 
     async def __aexit__(self, etype, value, tb):
         """Wait on all subactor's main routines to complete.
         """
-        try:
-            if etype is not None:
+        if etype is not None:
+            try:
                 # XXX: hypothetically an error could be raised and then
-                # a cancel signal shows up slightly after in which case the
-                # else block here might not complete? Should both be shielded?
+                # a cancel signal shows up slightly after in which case
+                # the `else:` block here might not complete?
+                # For now, shield both.
                 with trio.open_cancel_scope(shield=True):
                     if etype is trio.Cancelled:
                         log.warning(
-                            f"{current_actor().uid} was cancelled with {etype}"
-                            ", cancelling actor nursery")
-                        await self.cancel()
+                            f"Nursery for {current_actor().uid} was "
+                            f"cancelled with {etype}")
                     else:
                         log.exception(
-                            f"{current_actor().uid} errored with {etype}, "
-                            "cancelling actor nursery")
-                        await self.cancel()
-            else:
-                # XXX: this is effectively the lone cancellation/supervisor
-                # strategy which exactly mimicks trio's behaviour
-                log.debug(f"Waiting on subactors {self._children} to complete")
-                try:
-                    await self.wait()
-                except Exception as err:
-                    log.warning(f"Nursery caught {err}, cancelling")
+                            f"Nursery for {current_actor().uid} "
+                            f"errored with {etype}, ")
                     await self.cancel()
-                    raise
-            log.debug(f"Nursery teardown complete")
-        except Exception:
-            log.exception("Error on nursery exit:")
-            await self.wait()
-            raise
+            except trio.MultiError as merr:
+                if value not in merr.exceptions:
+                    raise trio.MultiError(merr.exceptions + [value])
+                raise
+        else:
+            # XXX: this is effectively the (for now) lone
+            # cancellation/supervisor strategy which exactly
+            # mimics trio's behaviour
+            log.debug(f"Waiting on subactors {self._children} to complete")
+            try:
+                await self.wait()
+            except Exception as err:
+                log.warning(f"Nursery caught {err}, cancelling")
+                await self.cancel()
+                raise
+            log.debug(f"Nursery teardown complete")
 
 
 @asynccontextmanager
@@ -297,3 +352,8 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
     # TODO: figure out supervisors from erlang
     async with ActorNursery(current_actor()) as nursery:
         yield nursery
+
+
+def is_main_process():
+    "Bool determining if this actor is running in the top-most process."
+    return mp.current_process().name == 'MainProcess'
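
Reviewer note (not part of the patch): a rough sketch of how the new one-cancels-all
error semantics described in the commit message are expected to surface to user code.
The `tractor.run()` entrypoint, the `run_in_actor()` call signature, the actor names
and the exact exception wrapping are assumptions based on the tractor API around this
commit, not something this diff guarantees.

import tractor
import trio


async def boom(msg: str) -> None:
    # runs remotely in a subactor and errors immediately
    raise RuntimeError(msg)


async def main() -> None:
    try:
        async with tractor.open_nursery() as nursery:
            # both subactors error; ``ActorNursery.wait()`` collects the
            # remote errors and ``__aexit__()`` cancels the whole nursery
            await nursery.run_in_actor('bad1', boom, msg='woops')
            await nursery.run_in_actor('bad2', boom, msg='woops again')
    except trio.MultiError as merr:
        # one error per failed subactor, mirroring ``trio``'s semantics
        print(f"caught {len(merr.exceptions)} subactor errors")
    except Exception as err:
        # a single failure may propagate un-wrapped
        print(f"caught lone subactor error: {err}")


if __name__ == '__main__':
    tractor.run(main)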