diff --git a/tractor/_supervise.py b/tractor/_supervise.py
index 86a317d..c27e0e4 100644
--- a/tractor/_supervise.py
+++ b/tractor/_supervise.py
@@ -34,7 +34,10 @@ from ._state import current_actor, is_main_process
 from .log import get_logger, get_loglevel
 from ._runtime import Actor
 from ._portal import Portal
-from ._exceptions import is_multi_cancelled
+from ._exceptions import (
+    is_multi_cancelled,
+    ContextCancelled,
+)
 from ._root import open_root_actor
 from . import _state
 from . import _spawn
@@ -104,6 +107,14 @@ class ActorNursery:
         self.errors = errors
         self.exited = trio.Event()
 
+        # NOTE: when no explicit call is made to
+        # `.open_root_actor()` by application code,
+        # `.open_nursery()` will implicitly call it to start the
+        # actor-tree runtime. In that case we mark ourselves as
+        # such so that runtime components know, for logging and
+        # syncing purposes, which nursery started the runtime.
+        self._implicit_runtime_started: bool = False
+
     async def start_actor(
         self,
         name: str,
@@ -249,10 +260,11 @@
         '''
         self.cancelled = True
 
-        log.cancel(
-            'Cancelling actor nursery\n'
-            f'|_{self._children}\n'
-        )
+        # TODO: impl a more compact repr for the
+        # spawned children than `._children`..
+        children: dict = self._children
+        child_count: int = len(children)
+        msg: str = f'Cancelling actor nursery with {child_count} children\n'
         with trio.move_on_after(3) as cs:
             async with trio.open_nursery() as tn:
 
@@ -263,7 +275,7 @@
                     subactor,
                     proc,
                     portal,
-                ) in self._children.values():
+                ) in children.values():
 
                     # TODO: are we ever even going to use this or
                     # is the spawning backend responsible for such
@@ -275,12 +287,13 @@
                     if portal is None:  # actor hasn't fully spawned yet
                         event = self._actor._peer_connected[subactor.uid]
                         log.warning(
-                            f"{subactor.uid} wasn't finished spawning?")
+                            f"{subactor.uid} never finished spawning?"
+                        )
                         await event.wait()
 
                         # channel/portal should now be up
-                        _, _, portal = self._children[subactor.uid]
+                        _, _, portal = children[subactor.uid]
 
                         # XXX should be impossible to get here
                         # unless method was called from within
@@ -299,11 +312,13 @@
                         if portal.channel.connected():
                             tn.start_soon(portal.cancel_actor)
 
+        log.cancel(msg)
         # if we cancelled the cancel (we hung cancelling remote actors)
         # then hard kill all sub-processes
         if cs.cancelled_caught:
             log.error(
-                f'Failed to cancel {self}\nHard killing process tree!'
+                f'Failed to cancel {self}?\n'
+                'Hard killing underlying subprocess tree!\n'
             )
             subactor: Actor
             proc: trio.Process
@@ -312,7 +327,7 @@
                 subactor,
                 proc,
                 portal,
-            ) in self._children.values():
+            ) in children.values():
                 log.warning(f"Hard killing process {proc}")
                 proc.terminate()
 
@@ -390,26 +405,39 @@ async def _open_and_supervise_one_cancels_all_nursery(
            # worry more are coming).
            an._join_procs.set()
 
-            # XXX: hypothetically an error could be
-            # raised and then a cancel signal shows up
+            # XXX NOTE XXX: hypothetically an error could
+            # be raised and then a cancel signal shows up
            # slightly after in which case the `else:`
            # block here might not complete? For now,
            # shield both.
            with trio.CancelScope(shield=True):
-                etype = type(inner_err)
+                etype: type = type(inner_err)
                if etype in (
                    trio.Cancelled,
-                    KeyboardInterrupt
+                    KeyboardInterrupt,
                ) or (
                    is_multi_cancelled(inner_err)
                ):
                    log.cancel(
-                        f"Nursery for {current_actor().uid} "
-                        f"was cancelled with {etype}")
+                        f'Actor-nursery cancelled by {etype}\n\n'
+
+                        f'{current_actor().uid}\n'
+                        f' |_{an}\n\n'
+
+                        # TODO: show tb str?
+                        # f'{tb_str}'
+                    )
+                elif etype in {
+                    ContextCancelled,
+                }:
+                    log.cancel(
+                        'Actor-nursery caught remote cancellation\n\n'
+
+                        f'{inner_err.tb_str}'
+                    )
                else:
                    log.exception(
-                        f"Nursery for {current_actor().uid} "
-                        "errored with:"
+                        'Nursery errored with:\n'
 
                        # TODO: same thing as in
                        # `._invoke()` to compute how to
@@ -450,11 +478,15 @@
        # ".run_in_actor()" actors then we also want to cancel all
        # remaining sub-actors (due to our lone strategy:
        # one-cancels-all).
-        log.cancel(f"Nursery cancelling due to {err}")
        if an._children:
+            log.cancel(
+                'Actor-nursery cancelling due to error type:\n'
+                f'{err}\n'
+            )
            with trio.CancelScope(shield=True):
                await an.cancel()
        raise
+
    finally:
        # No errors were raised while awaiting ".run_in_actor()"
        # actors but those actors may have returned remote errors as
@@ -500,7 +532,7 @@ async def open_nursery(
    which cancellation scopes correspond to each spawned subactor set.
 
    '''
-    implicit_runtime = False
+    implicit_runtime: bool = False
 
    actor = current_actor(err_on_no_runtime=False)
 
@@ -512,7 +544,7 @@
                log.info("Starting actor runtime!")
 
                # mark us for teardown on exit
-                implicit_runtime = True
+                implicit_runtime: bool = True
 
                async with open_root_actor(**kwargs) as actor:
                    assert actor is current_actor()
@@ -521,8 +553,21 @@
                        async with _open_and_supervise_one_cancels_all_nursery(
                            actor
                        ) as an:
+
+                            # NOTE: mark this nursery as having
+                            # implicitly started the root actor so
+                            # that `._runtime` machinery can avoid
+                            # certain teardown synchronization
+                            # blocking/waits and any associated (warn)
+                            # logging when it's known that this
+                            # nursery shouldn't be exited before the
+                            # root actor is.
+                            an._implicit_runtime_started = True
                            yield an
                    finally:
+                        # XXX: this event will be set after the root actor
+                        # runtime is already torn down, so we want to
+                        # avoid any blocking on it.
                        an.exited.set()
 
        else:  # sub-nursery case
@@ -536,8 +581,13 @@
                    an.exited.set()
 
    finally:
-        log.debug("Nursery teardown complete")
+        msg: str = (
+            'Actor-nursery exited\n'
+            f'|_{an}\n\n'
+        )
 
        # shutdown runtime if it was started
        if implicit_runtime:
-            log.info("Shutting down actor tree")
+            msg += '=> Shutting down actor runtime <=\n'
+
+        log.info(msg)
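
For reviewers, a minimal usage sketch of the path this patch touches: with no prior `open_root_actor()` call, `open_nursery()` starts the runtime implicitly (now flagged via `_implicit_runtime_started`) and subactor cancellation flows through the reworked `ActorNursery.cancel()` logging. This sketch assumes the public `tractor` API (`open_nursery()`, `ActorNursery.start_actor()`, `Portal.cancel_actor()`); the `'child'` actor name is purely illustrative:

    import trio
    import tractor


    async def main() -> None:
        # no explicit `tractor.open_root_actor()` call is made here, so
        # `open_nursery()` implicitly starts the actor-tree runtime and,
        # with this patch, records that via `_implicit_runtime_started`
        async with tractor.open_nursery() as an:
            portal = await an.start_actor(
                'child',  # hypothetical subactor name
                enable_modules=[__name__],
            )
            # request remote cancellation of the subactor; any children
            # still alive at nursery exit are cancelled one-cancels-all
            # style via the `ActorNursery.cancel()` path patched above
            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)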