From 7250deb30fa480cc708370659166b48e7ed38148 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Oct 2021 18:18:00 -0400 Subject: [PATCH] Make OCA nursery **not** a multiplexed mindfuck --- tractor/_supervise.py | 254 ++++++++++++++---------------------------- 1 file changed, 85 insertions(+), 169 deletions(-) diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 0a400ea..921c121 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -12,6 +12,7 @@ import trio from async_generator import asynccontextmanager from . import _debug +from ._debug import maybe_wait_for_debugger from ._state import current_actor, is_main_process, is_root_process from .log import get_logger, get_loglevel from ._actor import Actor @@ -50,6 +51,7 @@ class ActorNursery: self._cancel_after_result_on_exit: set = set() self.cancelled: bool = False self._join_procs = trio.Event() + self._all_children_reaped = trio.Event() self.errors = errors async def start_actor( @@ -168,8 +170,11 @@ class ActorNursery: ) return portal - async def cancel(self, hard_kill: bool = False) -> None: - """Cancel this nursery by instructing each subactor to cancel + async def cancel( + self, + ) -> None: + """ + Cancel this nursery by instructing each subactor to cancel itself and wait for all subactors to terminate. If ``hard_killl`` is set to ``True`` then kill the processes @@ -177,58 +182,20 @@ class ActorNursery: """ self.cancelled = True - log.cancel(f"Cancelling nursery in {self._actor.uid}") - with trio.move_on_after(3) as cs: + childs = tuple(self._children.keys()) + log.cancel( + f"Cancelling nursery in {self._actor.uid} with children\n{childs}" + ) - async with trio.open_nursery() as nursery: + await maybe_wait_for_debugger() - for subactor, proc, portal in self._children.values(): - - # TODO: are we ever even going to use this or - # is the spawning backend responsible for such - # things? I'm thinking latter. - if hard_kill: - proc.terminate() - - else: - if portal is None: # actor hasn't fully spawned yet - event = self._actor._peer_connected[subactor.uid] - log.warning( - f"{subactor.uid} wasn't finished spawning?") - - await event.wait() - - # channel/portal should now be up - _, _, portal = self._children[subactor.uid] - - # XXX should be impossible to get here - # unless method was called from within - # shielded cancel scope. - if portal is None: - # cancelled while waiting on the event - # to arrive - chan = self._actor._peers[subactor.uid][-1] - if chan: - portal = Portal(chan) - else: # there's no other choice left - proc.terminate() - - # spawn cancel tasks for each sub-actor - assert portal - nursery.start_soon(portal.cancel_actor) - - # if we cancelled the cancel (we hung cancelling remote actors) - # then hard kill all sub-processes - if cs.cancelled_caught: - log.error( - f"Failed to cancel {self}\nHard killing process tree!") - for subactor, proc, portal in self._children.values(): - log.warning(f"Hard killing process {proc}") - proc.terminate() - - # mark ourselves as having (tried to have) cancelled all subactors + # wake up all spawn tasks self._join_procs.set() + # cancel all spawner nurseries + self._ria_nursery.cancel_scope.cancel() + self._da_nursery.cancel_scope.cancel() + @asynccontextmanager async def _open_and_supervise_one_cancels_all_nursery( @@ -244,10 +211,13 @@ async def _open_and_supervise_one_cancels_all_nursery( # a supervisor strategy **before** blocking indefinitely to wait for # actors spawned in "daemon mode" (aka started using # ``ActorNursery.start_actor()``). + original_err = None # errors from this daemon actor nursery bubble up to caller - async with trio.open_nursery() as da_nursery: - try: + try: + async with trio.open_nursery() as da_nursery: + # try: + # This is the inner level "run in actor" nursery. It is # awaited first since actors spawned in this way (using # ``ActorNusery.run_in_actor()``) are expected to only @@ -256,15 +226,15 @@ async def _open_and_supervise_one_cancels_all_nursery( # immediately raised for handling by a supervisor strategy. # As such if the strategy propagates any error(s) upwards # the above "daemon actor" nursery will be notified. - async with trio.open_nursery() as ria_nursery: + try: + async with trio.open_nursery() as ria_nursery: - anursery = ActorNursery( - actor, - ria_nursery, - da_nursery, - errors - ) - try: + anursery = ActorNursery( + actor, + ria_nursery, + da_nursery, + errors + ) # spawning of actors happens in the caller's scope # after we yield upwards yield anursery @@ -274,131 +244,76 @@ async def _open_and_supervise_one_cancels_all_nursery( "to complete" ) - # Last bit before first nursery block ends in the case - # where we didn't error in the caller's scope - # signal all process monitor tasks to conduct # hard join phase. + # await maybe_wait_for_debugger() + # log.error('joing trigger NORMAL') anursery._join_procs.set() - except BaseException as err: + except BaseException as err: + original_err = err - # If we error in the root but the debugger is - # engaged we don't want to prematurely kill (and - # thus clobber access to) the local tty since it - # will make the pdb repl unusable. - # Instead try to wait for pdb to be released before - # tearing down. - if is_root_process(): - - # TODO: could this make things more deterministic? - # wait to see if a sub-actor task will be - # scheduled and grab the tty lock on the next - # tick? - # await trio.testing.wait_all_tasks_blocked() - - debug_complete = _debug._no_remote_has_tty - if ( - debug_complete and - not debug_complete.is_set() - ): - log.warning( - 'Root has errored but pdb is in use by ' - f'child {_debug._global_actor_in_debug}\n' - 'Waiting on tty lock to release..') - - with trio.CancelScope(shield=True): - await debug_complete.wait() - - # if the caller's scope errored then we activate our - # one-cancels-all supervisor strategy (don't - # worry more are coming). - anursery._join_procs.set() - - try: - # XXX: hypothetically an error could be - # raised and then a cancel signal shows up - # slightly after in which case the `else:` - # block here might not complete? For now, - # shield both. - with trio.CancelScope(shield=True): - etype = type(err) - if etype in ( - trio.Cancelled, - KeyboardInterrupt - ) or ( - is_multi_cancelled(err) - ): - log.cancel( - f"Nursery for {current_actor().uid} " - f"was cancelled with {etype}") - else: - log.exception( - f"Nursery for {current_actor().uid} " - f"errored with {err}, ") - - # cancel all subactors - await anursery.cancel() - - except trio.MultiError as merr: - # If we receive additional errors while waiting on - # remaining subactors that were cancelled, - # aggregate those errors with the original error - # that triggered this teardown. - if err not in merr.exceptions: - raise trio.MultiError(merr.exceptions + [err]) - else: - raise - - # ria_nursery scope end - - # XXX: do we need a `trio.Cancelled` catch here as well? - # this is the catch around the ``.run_in_actor()`` nursery - except ( - - Exception, - trio.MultiError, - trio.Cancelled - - ) as err: - - # If actor-local error was raised while waiting on - # ".run_in_actor()" actors then we also want to cancel all - # remaining sub-actors (due to our lone strategy: - # one-cancels-all). - log.cancel(f"Nursery cancelling due to {err}") - if anursery._children: + # XXX: hypothetically an error could be + # raised and then a cancel signal shows up + # slightly after in which case the `else:` + # block here might not complete? For now, + # shield both. with trio.CancelScope(shield=True): + etype = type(err) + + if etype in ( + trio.Cancelled, + KeyboardInterrupt + ) or ( + is_multi_cancelled(err) + ): + log.cancel( + f"Nursery for {current_actor().uid} " + f"was cancelled with {etype}") + else: + log.exception( + f"Nursery for {current_actor().uid} " + f"errored with {err}, ") + + # cancel all subactors await anursery.cancel() - raise - finally: - # No errors were raised while awaiting ".run_in_actor()" - # actors but those actors may have returned remote errors as - # results (meaning they errored remotely and have relayed - # those errors back to this parent actor). The errors are - # collected in ``errors`` so cancel all actors, summarize - # all errors and re-raise. - if errors: - if anursery._children: - with trio.CancelScope(shield=True): - await anursery.cancel() - # use `MultiError` as needed - if len(errors) > 1: - raise trio.MultiError(tuple(errors.values())) - else: - raise list(errors.values())[0] + # ria_nursery scope end - nursery checkpoint - # ria_nursery scope end - nursery checkpoint + # after daemon nursery exit + finally: + with trio.CancelScope(shield=True): + await anursery._all_children_reaped.wait() + # No errors were raised while awaiting ".run_in_actor()" + # actors but those actors may have returned remote errors as + # results (meaning they errored remotely and have relayed + # those errors back to this parent actor). The errors are + # collected in ``errors`` so cancel all actors, summarize + # all errors and re-raise. + if errors: + if anursery._children: + raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!") + # with trio.CancelScope(shield=True): + # await anursery.cancel() - # after nursery exit + # use `MultiError` as needed + if len(errors) > 1: + raise trio.MultiError(tuple(errors.values())) + else: + raise list(errors.values())[0] + + elif original_err: + raise original_err + + log.cancel(f'{anursery} terminated gracefully') @asynccontextmanager async def open_nursery( **kwargs, ) -> typing.AsyncGenerator[ActorNursery, None]: - """Create and yield a new ``ActorNursery`` to be used for spawning + """ + Create and yield a new ``ActorNursery`` to be used for spawning structured concurrent subactors. When an actor is spawned a new trio task is started which @@ -410,6 +325,7 @@ async def open_nursery( close it. It turns out this approach is probably more correct anyway since it is more clear from the following nested nurseries which cancellation scopes correspond to each spawned subactor set. + """ implicit_runtime = False