diff --git a/tractor/_supervise.py b/tractor/_supervise.py index c4532b9..4b9f733 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -34,14 +34,12 @@ class ActorNursery: def __init__( self, actor: Actor, - ria_nursery: trio.Nursery, - da_nursery: trio.Nursery, + spawn_nursery: trio.Nursery, errors: Dict[Tuple[str, str], Exception], ) -> None: # self.supervisor = supervisor # TODO self._actor: Actor = actor - self._ria_nursery = ria_nursery - self._da_nursery = da_nursery + self._spawn_n = spawn_nursery self._children: Dict[ Tuple[str, str], Tuple[Actor, mp.Process, Optional[Portal]] @@ -99,7 +97,7 @@ class ActorNursery: # start a task to spawn a process # blocks until process has been started and a portal setup - nursery = nursery or self._da_nursery + nursery = nursery or self._spawn_n # XXX: the type ignore is actually due to a `mypy` bug return await nursery.start( # type: ignore @@ -149,7 +147,7 @@ class ActorNursery: bind_addr=bind_addr, loglevel=loglevel, # use the run_in_actor nursery - nursery=self._ria_nursery, + nursery=self._spawn_n, infect_asyncio=infect_asyncio, ) @@ -182,19 +180,59 @@ class ActorNursery: """ self.cancelled = True - childs = tuple(self._children.keys()) + # entries may be poppsed by the spawning backend as + # actors cancel individually + childs = self._children.copy() + log.cancel( - f"Cancelling nursery in {self._actor.uid} with children\n{childs}" + f'Cancelling nursery in {self._actor.uid} with children\n' + f'{childs.keys()}' ) + # wake up all spawn tasks to move on as those nursery + # has ``__aexit__()``-ed + self._join_procs.set() + await maybe_wait_for_debugger() - # wake up all spawn tasks - self._join_procs.set() + # one-cancels-all strat + async with trio.open_nursery() as cancel_sender: + for subactor, proc, portal in childs.values(): + cancel_sender.start_soon(portal.cancel_actor) - # cancel all spawner nurseries - self._ria_nursery.cancel_scope.cancel() - self._da_nursery.cancel_scope.cancel() + # cancel all spawner tasks + # self._spawn_n.cancel_scope.cancel() + + async def _handle_err( + self, + err: BaseException, + portal: Optional[Portal] = None, + + ) -> None: + # XXX: hypothetically an error could be + # raised and then a cancel signal shows up + # slightly after in which case the `else:` + # block here might not complete? For now, + # shield both. + with trio.CancelScope(shield=True): + etype = type(err) + + if etype in ( + trio.Cancelled, + KeyboardInterrupt + ) or ( + is_multi_cancelled(err) + ): + log.cancel( + f"Nursery for {current_actor().uid} " + f"was cancelled with {etype}") + else: + log.exception( + f"Nursery for {current_actor().uid} " + f"errored with {err}, ") + + # cancel all subactors + await self.cancel() @asynccontextmanager @@ -211,11 +249,11 @@ async def _open_and_supervise_one_cancels_all_nursery( # a supervisor strategy **before** blocking indefinitely to wait for # actors spawned in "daemon mode" (aka started using # ``ActorNursery.start_actor()``). - original_err = None + src_err: Optional[BaseException] = None # errors from this daemon actor nursery bubble up to caller try: - async with trio.open_nursery() as da_nursery: + async with trio.open_nursery() as spawn_n: # try: # This is the inner level "run in actor" nursery. It is @@ -226,71 +264,76 @@ async def _open_and_supervise_one_cancels_all_nursery( # immediately raised for handling by a supervisor strategy. # As such if the strategy propagates any error(s) upwards # the above "daemon actor" nursery will be notified. + + anursery = ActorNursery( + actor, + spawn_n, + errors + ) + # spawning of actors happens in the caller's scope + # after we yield upwards try: - async with trio.open_nursery() as ria_nursery: + yield anursery - anursery = ActorNursery( - actor, - ria_nursery, - da_nursery, - errors - ) - # spawning of actors happens in the caller's scope - # after we yield upwards - yield anursery + log.runtime( + f"Waiting on subactors {anursery._children} " + "to complete" + ) - log.runtime( - f"Waiting on subactors {anursery._children} " - "to complete" - ) + # signal all process monitor tasks to conduct + # hard join phase. + # await maybe_wait_for_debugger() + # log.error('joing trigger NORMAL') + anursery._join_procs.set() - # signal all process monitor tasks to conduct - # hard join phase. - # await maybe_wait_for_debugger() - # log.error('joing trigger NORMAL') - anursery._join_procs.set() + # NOTE: there are 2 cases for error propagation: + # - an actor which is ``.run_in_actor()`` invoked + # runs a single task and reports the error upwards + # - the top level task which opened this nursery (in the + # parent actor) raises. In this case the raise can come + # from a variety of places: + # - user task code unrelated to the nursery/child actors + # - a ``RemoteActorError`` propagated up through the + # portal api from a child actor which will look the exact + # same as a user code failure. except BaseException as err: - original_err = err + print('ERROR') + # anursery._join_procs.set() + src_err = err - # XXX: hypothetically an error could be - # raised and then a cancel signal shows up - # slightly after in which case the `else:` - # block here might not complete? For now, - # shield both. - with trio.CancelScope(shield=True): - etype = type(err) + # with trio.CancelScope(shield=True): + await anursery._handle_err(err) + raise - if etype in ( - trio.Cancelled, - KeyboardInterrupt - ) or ( - is_multi_cancelled(err) - ): - log.cancel( - f"Nursery for {current_actor().uid} " - f"was cancelled with {etype}") - else: - log.exception( - f"Nursery for {current_actor().uid} " - f"errored with {err}, ") + except BaseException as err: + # nursery bubble up + nurse_err = err - # cancel all subactors - await anursery.cancel() + # do not double cancel subactors + if not anursery.cancelled: + await anursery._handle_err(err) - # ria_nursery scope end - nursery checkpoint + raise - # after daemon nursery exit finally: - log.cancel(f'Waiting on remaining children {anursery._children}') - with trio.CancelScope(shield=True): - await anursery._all_children_reaped.wait() + if anursery._children: + log.cancel(f'Waiting on remaining children {anursery._children}') + with trio.CancelScope(shield=True): + await anursery._all_children_reaped.wait() + + log.cancel(f'All children complete for {anursery}') + # No errors were raised while awaiting ".run_in_actor()" # actors but those actors may have returned remote errors as # results (meaning they errored remotely and have relayed # those errors back to this parent actor). The errors are # collected in ``errors`` so cancel all actors, summarize # all errors and re-raise. + + if src_err and src_err not in errors.values(): + errors[actor.uid] = src_err + if errors: if anursery._children: raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!") @@ -306,8 +349,8 @@ async def _open_and_supervise_one_cancels_all_nursery( log.cancel(f'{anursery} terminated gracefully') # XXX" honestly no idea why this is needed but sure.. - if isinstance(original_err, KeyboardInterrupt) and anursery.cancelled: - raise original_err + if isinstance(src_err, KeyboardInterrupt) and anursery.cancelled: + raise src_err @asynccontextmanager