From b285db4c58a9406026b964892bf6e6461432aba9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Feb 2021 12:59:43 -0500 Subject: [PATCH] Factor OCA supervisor into new func --- tractor/_root.py | 5 +- tractor/_trionics.py | 237 +++++++++++++++++++++++-------------------- 2 files changed, 130 insertions(+), 112 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index e48ca98..1206864 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -164,7 +164,10 @@ async def open_root_actor( ) try: yield actor - # result = await main() + + # except BaseException as err: + # breakpoint() + except (Exception, trio.MultiError) as err: logger.exception("Actor crashed:") await _debug._maybe_enter_pm(err) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 490778d..dfe4fba 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -200,6 +200,128 @@ class ActorNursery: self._join_procs.set() +@asynccontextmanager +async def _open_and_supervise_one_cancels_all_nursery( + actor: Actor, +) -> typing.AsyncGenerator[ActorNursery, None]: + + # the collection of errors retreived from spawned sub-actors + errors: Dict[Tuple[str, str], Exception] = {} + + # This is the outermost level "deamon actor" nursery. It is awaited + # **after** the below inner "run in actor nursery". This allows for + # handling errors that are generated by the inner nursery in + # a supervisor strategy **before** blocking indefinitely to wait for + # actors spawned in "daemon mode" (aka started using + # ``ActorNursery.start_actor()``). + + # errors from this daemon actor nursery bubble up to caller + async with trio.open_nursery() as da_nursery: + try: + # This is the inner level "run in actor" nursery. It is + # awaited first since actors spawned in this way (using + # ``ActorNusery.run_in_actor()``) are expected to only + # return a single result and then complete (i.e. be canclled + # gracefully). Errors collected from these actors are + # immediately raised for handling by a supervisor strategy. + # As such if the strategy propagates any error(s) upwards + # the above "daemon actor" nursery will be notified. + async with trio.open_nursery() as ria_nursery: + anursery = ActorNursery( + actor, + ria_nursery, + da_nursery, + errors + ) + try: + # spawning of actors happens in the caller's scope + # after we yield upwards + yield anursery + log.debug( + f"Waiting on subactors {anursery._children} " + "to complete" + ) + except BaseException as err: + # if the caller's scope errored then we activate our + # one-cancels-all supervisor strategy (don't + # worry more are coming). + anursery._join_procs.set() + try: + # XXX: hypothetically an error could be + # raised and then a cancel signal shows up + # slightly after in which case the `else:` + # block here might not complete? For now, + # shield both. + with trio.CancelScope(shield=True): + etype = type(err) + if etype in ( + trio.Cancelled, + KeyboardInterrupt + ) or ( + is_multi_cancelled(err) + ): + log.warning( + f"Nursery for {current_actor().uid} " + f"was cancelled with {etype}") + else: + log.exception( + f"Nursery for {current_actor().uid} " + f"errored with {err}, ") + + # cancel all subactors + await anursery.cancel() + + except trio.MultiError as merr: + # If we receive additional errors while waiting on + # remaining subactors that were cancelled, + # aggregate those errors with the original error + # that triggered this teardown. + if err not in merr.exceptions: + raise trio.MultiError(merr.exceptions + [err]) + else: + raise + + # Last bit before first nursery block ends in the case + # where we didn't error in the caller's scope + log.debug("Waiting on all subactors to complete") + anursery._join_procs.set() + + # ria_nursery scope end + + # XXX: do we need a `trio.Cancelled` catch here as well? + except (Exception, trio.MultiError, trio.Cancelled) as err: + # If actor-local error was raised while waiting on + # ".run_in_actor()" actors then we also want to cancel all + # remaining sub-actors (due to our lone strategy: + # one-cancels-all). + log.warning(f"Nursery cancelling due to {err}") + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() + raise + finally: + # No errors were raised while awaiting ".run_in_actor()" + # actors but those actors may have returned remote errors as + # results (meaning they errored remotely and have relayed + # those errors back to this parent actor). The errors are + # collected in ``errors`` so cancel all actors, summarize + # all errors and re-raise. + if errors: + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() + + # use `MultiError` as needed + if len(errors) > 1: + raise trio.MultiError(tuple(errors.values())) + else: + raise list(errors.values())[0] + + # ria_nursery scope end - nursery checkpoint + + # after nursery exit + + @asynccontextmanager async def open_nursery( **kwargs, @@ -234,120 +356,13 @@ async def open_nursery( # mark us for teardown on exit implicit_runtime = True - # the collection of errors retreived from spawned sub-actors - errors: Dict[Tuple[str, str], Exception] = {} - - # This is the outermost level "deamon actor" nursery. It is awaited - # **after** the below inner "run in actor nursery". This allows for - # handling errors that are generated by the inner nursery in - # a supervisor strategy **before** blocking indefinitely to wait for - # actors spawned in "daemon mode" (aka started using - # ``ActorNursery.start_actor()``). try: - async with trio.open_nursery() as da_nursery: - try: - # This is the inner level "run in actor" nursery. It is - # awaited first since actors spawned in this way (using - # ``ActorNusery.run_in_actor()``) are expected to only - # return a single result and then complete (i.e. be canclled - # gracefully). Errors collected from these actors are - # immediately raised for handling by a supervisor strategy. - # As such if the strategy propagates any error(s) upwards - # the above "daemon actor" nursery will be notified. - async with trio.open_nursery() as ria_nursery: - anursery = ActorNursery( - actor, - ria_nursery, - da_nursery, - errors - ) - try: - # spawning of actors happens in the caller's scope - # after we yield upwards - yield anursery - log.debug( - f"Waiting on subactors {anursery._children} " - "to complete" - ) - except BaseException as err: - # if the caller's scope errored then we activate our - # one-cancels-all supervisor strategy (don't - # worry more are coming). - anursery._join_procs.set() - try: - # XXX: hypothetically an error could be - # raised and then a cancel signal shows up - # slightly after in which case the `else:` - # block here might not complete? For now, - # shield both. - with trio.CancelScope(shield=True): - etype = type(err) - if etype in ( - trio.Cancelled, - KeyboardInterrupt - ) or ( - is_multi_cancelled(err) - ): - log.warning( - f"Nursery for {current_actor().uid} " - f"was cancelled with {etype}") - else: - log.exception( - f"Nursery for {current_actor().uid} " - f"errored with {err}, ") + async with _open_and_supervise_one_cancels_all_nursery( + actor + ) as anursery: - # cancel all subactors - await anursery.cancel() + yield anursery - except trio.MultiError as merr: - # If we receive additional errors while waiting on - # remaining subactors that were cancelled, - # aggregate those errors with the original error - # that triggered this teardown. - if err not in merr.exceptions: - raise trio.MultiError(merr.exceptions + [err]) - else: - raise - - # Last bit before first nursery block ends in the case - # where we didn't error in the caller's scope - log.debug("Waiting on all subactors to complete") - anursery._join_procs.set() - - # ria_nursery scope end - - # XXX: do we need a `trio.Cancelled` catch here as well? - except (Exception, trio.MultiError, trio.Cancelled) as err: - # If actor-local error was raised while waiting on - # ".run_in_actor()" actors then we also want to cancel all - # remaining sub-actors (due to our lone strategy: - # one-cancels-all). - log.warning(f"Nursery cancelling due to {err}") - if anursery._children: - with trio.CancelScope(shield=True): - await anursery.cancel() - raise - finally: - # No errors were raised while awaiting ".run_in_actor()" - # actors but those actors may have returned remote errors as - # results (meaning they errored remotely and have relayed - # those errors back to this parent actor). The errors are - # collected in ``errors`` so cancel all actors, summarize - # all errors and re-raise. - if errors: - if anursery._children: - with trio.CancelScope(shield=True): - await anursery.cancel() - - # use `MultiError` as needed - if len(errors) > 1: - raise trio.MultiError(tuple(errors.values())) - else: - raise list(errors.values())[0] - - # ria_nursery scope end - nursery checkpoint - - # after nursery exit finally: log.debug("Nursery teardown complete")