diff --git a/tractor/_supervise.py b/tractor/_supervise.py index af83aa5..86a317d 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -21,6 +21,7 @@ from contextlib import asynccontextmanager as acm from functools import partial import inspect +from pprint import pformat from typing import TYPE_CHECKING import typing import warnings @@ -189,14 +190,16 @@ class ActorNursery: **kwargs, # explicit args to ``fn`` ) -> Portal: - """Spawn a new actor, run a lone task, then terminate the actor and + ''' + Spawn a new actor, run a lone task, then terminate the actor and return its result. Actors spawned using this method are kept alive at nursery teardown until the task spawned by executing ``fn`` completes at which point the actor is terminated. - """ - mod_path = fn.__module__ + + ''' + mod_path: str = fn.__module__ if name is None: # use the explicit function name if not provided @@ -231,7 +234,11 @@ class ActorNursery: ) return portal - async def cancel(self, hard_kill: bool = False) -> None: + async def cancel( + self, + hard_kill: bool = False, + + ) -> None: ''' Cancel this nursery by instructing each subactor to cancel itself and wait for all subactors to terminate. @@ -242,10 +249,12 @@ class ActorNursery: ''' self.cancelled = True - log.cancel(f"Cancelling nursery in {self._actor.uid}") + log.cancel( + 'Cancelling actor nursery\n' + f'|_{self._children}\n' + ) with trio.move_on_after(3) as cs: - - async with trio.open_nursery() as nursery: + async with trio.open_nursery() as tn: subactor: Actor proc: trio.Process @@ -288,7 +297,7 @@ class ActorNursery: # spawn cancel tasks for each sub-actor assert portal if portal.channel.connected(): - nursery.start_soon(portal.cancel_actor) + tn.start_soon(portal.cancel_actor) # if we cancelled the cancel (we hung cancelling remote actors) # then hard kill all sub-processes @@ -343,7 +352,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # the above "daemon actor" nursery will be notified. async with trio.open_nursery() as ria_nursery: - anursery = ActorNursery( + an = ActorNursery( actor, ria_nursery, da_nursery, @@ -352,16 +361,16 @@ async def _open_and_supervise_one_cancels_all_nursery( try: # spawning of actors happens in the caller's scope # after we yield upwards - yield anursery + yield an # When we didn't error in the caller's scope, # signal all process-monitor-tasks to conduct # the "hard join phase". log.runtime( - f"Waiting on subactors {anursery._children} " - "to complete" + 'Waiting on subactors to complete:\n' + f'{pformat(an._children)}\n' ) - anursery._join_procs.set() + an._join_procs.set() except BaseException as inner_err: errors[actor.uid] = inner_err @@ -373,13 +382,13 @@ async def _open_and_supervise_one_cancels_all_nursery( # Instead try to wait for pdb to be released before # tearing down. await maybe_wait_for_debugger( - child_in_debug=anursery._at_least_one_child_in_debug + child_in_debug=an._at_least_one_child_in_debug ) # if the caller's scope errored then we activate our # one-cancels-all supervisor strategy (don't # worry more are coming). - anursery._join_procs.set() + an._join_procs.set() # XXX: hypothetically an error could be # raised and then a cancel signal shows up @@ -413,7 +422,7 @@ async def _open_and_supervise_one_cancels_all_nursery( ) # cancel all subactors - await anursery.cancel() + await an.cancel() # ria_nursery scope end @@ -434,7 +443,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # XXX: yet another guard before allowing the cancel # sequence in case a (single) child is in debug. await maybe_wait_for_debugger( - child_in_debug=anursery._at_least_one_child_in_debug + child_in_debug=an._at_least_one_child_in_debug ) # If actor-local error was raised while waiting on @@ -442,9 +451,9 @@ async def _open_and_supervise_one_cancels_all_nursery( # remaining sub-actors (due to our lone strategy: # one-cancels-all). log.cancel(f"Nursery cancelling due to {err}") - if anursery._children: + if an._children: with trio.CancelScope(shield=True): - await anursery.cancel() + await an.cancel() raise finally: # No errors were raised while awaiting ".run_in_actor()" @@ -454,9 +463,9 @@ async def _open_and_supervise_one_cancels_all_nursery( # collected in ``errors`` so cancel all actors, summarize # all errors and re-raise. if errors: - if anursery._children: + if an._children: with trio.CancelScope(shield=True): - await anursery.cancel() + await an.cancel() # use `BaseExceptionGroup` as needed if len(errors) > 1: @@ -511,20 +520,20 @@ async def open_nursery( try: async with _open_and_supervise_one_cancels_all_nursery( actor - ) as anursery: - yield anursery + ) as an: + yield an finally: - anursery.exited.set() + an.exited.set() else: # sub-nursery case try: async with _open_and_supervise_one_cancels_all_nursery( actor - ) as anursery: - yield anursery + ) as an: + yield an finally: - anursery.exited.set() + an.exited.set() finally: log.debug("Nursery teardown complete")