Re-route errors from spawn tasks and mngr task to handler

zombie_lord_infinite
Tyler Goodlet 2021-10-10 11:54:19 -04:00
parent 0488f5e57e
commit 8a59713d48
1 changed files with 107 additions and 64 deletions

View File

@ -34,14 +34,12 @@ class ActorNursery:
def __init__( def __init__(
self, self,
actor: Actor, actor: Actor,
ria_nursery: trio.Nursery, spawn_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: Dict[Tuple[str, str], Exception], errors: Dict[Tuple[str, str], Exception],
) -> None: ) -> None:
# self.supervisor = supervisor # TODO # self.supervisor = supervisor # TODO
self._actor: Actor = actor self._actor: Actor = actor
self._ria_nursery = ria_nursery self._spawn_n = spawn_nursery
self._da_nursery = da_nursery
self._children: Dict[ self._children: Dict[
Tuple[str, str], Tuple[str, str],
Tuple[Actor, mp.Process, Optional[Portal]] Tuple[Actor, mp.Process, Optional[Portal]]
@ -99,7 +97,7 @@ class ActorNursery:
# start a task to spawn a process # start a task to spawn a process
# blocks until process has been started and a portal setup # blocks until process has been started and a portal setup
nursery = nursery or self._da_nursery nursery = nursery or self._spawn_n
# XXX: the type ignore is actually due to a `mypy` bug # XXX: the type ignore is actually due to a `mypy` bug
return await nursery.start( # type: ignore return await nursery.start( # type: ignore
@ -149,7 +147,7 @@ class ActorNursery:
bind_addr=bind_addr, bind_addr=bind_addr,
loglevel=loglevel, loglevel=loglevel,
# use the run_in_actor nursery # use the run_in_actor nursery
nursery=self._ria_nursery, nursery=self._spawn_n,
infect_asyncio=infect_asyncio, infect_asyncio=infect_asyncio,
) )
@ -182,19 +180,59 @@ class ActorNursery:
""" """
self.cancelled = True self.cancelled = True
childs = tuple(self._children.keys()) # entries may be popped by the spawning backend as
# actors cancel individually
childs = self._children.copy()
log.cancel( log.cancel(
f"Cancelling nursery in {self._actor.uid} with children\n{childs}" f'Cancelling nursery in {self._actor.uid} with children\n'
f'{childs.keys()}'
) )
# wake up all spawn tasks so they can move on, since the nursery
# has ``__aexit__()``-ed
self._join_procs.set()
await maybe_wait_for_debugger() await maybe_wait_for_debugger()
# wake up all spawn tasks # one-cancels-all strat
self._join_procs.set() async with trio.open_nursery() as cancel_sender:
for subactor, proc, portal in childs.values():
cancel_sender.start_soon(portal.cancel_actor)
# cancel all spawner nurseries # cancel all spawner tasks
self._ria_nursery.cancel_scope.cancel() # self._spawn_n.cancel_scope.cancel()
self._da_nursery.cancel_scope.cancel()
async def _handle_err(
    self,
    err: BaseException,
    portal: Optional[Portal] = None,
) -> None:
    """Log ``err`` and then cancel this nursery, all inside a shield.

    The error is classified as a "cancel" when its exact type is
    ``trio.Cancelled`` or ``KeyboardInterrupt`` (a ``type()`` match,
    not ``isinstance``) or when ``is_multi_cancelled(err)`` is truthy;
    in that case it is reported via ``log.cancel()``.  Anything else is
    reported via ``log.exception()``.  In both cases ``self.cancel()``
    is awaited afterwards.

    Parameters:
        err: the exception being handled.
        portal: accepted but not used anywhere in this body.

    """
    # XXX: hypothetically a cancel signal could arrive slightly after
    # the error was raised, in which case the logging and/or the
    # cancel request below might not complete; for now shield both.
    with trio.CancelScope(shield=True):
        err_type = type(err)
        was_cancelled = err_type in (
            trio.Cancelled,
            KeyboardInterrupt,
        ) or is_multi_cancelled(err)

        if not was_cancelled:
            log.exception(
                f"Nursery for {current_actor().uid} "
                f"errored with {err}, ")
        else:
            log.cancel(
                f"Nursery for {current_actor().uid} "
                f"was cancelled with {err_type}")

        # cancel all subactors
        await self.cancel()
@asynccontextmanager @asynccontextmanager
@ -211,11 +249,11 @@ async def _open_and_supervise_one_cancels_all_nursery(
# a supervisor strategy **before** blocking indefinitely to wait for # a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using # actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``). # ``ActorNursery.start_actor()``).
original_err = None src_err: Optional[BaseException] = None
# errors from this daemon actor nursery bubble up to caller # errors from this daemon actor nursery bubble up to caller
try: try:
async with trio.open_nursery() as da_nursery: async with trio.open_nursery() as spawn_n:
# try: # try:
# This is the inner level "run in actor" nursery. It is # This is the inner level "run in actor" nursery. It is
@ -226,71 +264,76 @@ async def _open_and_supervise_one_cancels_all_nursery(
# immediately raised for handling by a supervisor strategy. # immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards # As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified. # the above "daemon actor" nursery will be notified.
anursery = ActorNursery(
actor,
spawn_n,
errors
)
# spawning of actors happens in the caller's scope
# after we yield upwards
try: try:
async with trio.open_nursery() as ria_nursery: yield anursery
anursery = ActorNursery( log.runtime(
actor, f"Waiting on subactors {anursery._children} "
ria_nursery, "to complete"
da_nursery, )
errors
)
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.runtime( # signal all process monitor tasks to conduct
f"Waiting on subactors {anursery._children} " # hard join phase.
"to complete" # await maybe_wait_for_debugger()
) # log.error('joing trigger NORMAL')
anursery._join_procs.set()
# signal all process monitor tasks to conduct # NOTE: there are 2 cases for error propagation:
# hard join phase. # - an actor which is ``.run_in_actor()`` invoked
# await maybe_wait_for_debugger() # runs a single task and reports the error upwards
# log.error('joing trigger NORMAL') # - the top level task which opened this nursery (in the
anursery._join_procs.set() # parent actor) raises. In this case the raise can come
# from a variety of places:
# - user task code unrelated to the nursery/child actors
# - a ``RemoteActorError`` propagated up through the
# portal api from a child actor which will look the exact
# same as a user code failure.
except BaseException as err: except BaseException as err:
original_err = err print('ERROR')
# anursery._join_procs.set()
src_err = err
# XXX: hypothetically an error could be # with trio.CancelScope(shield=True):
# raised and then a cancel signal shows up await anursery._handle_err(err)
# slightly after in which case the `else:` raise
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in ( except BaseException as err:
trio.Cancelled, # nursery bubble up
KeyboardInterrupt nurse_err = err
) or (
is_multi_cancelled(err)
):
log.cancel(
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
# cancel all subactors # do not double cancel subactors
await anursery.cancel() if not anursery.cancelled:
await anursery._handle_err(err)
# ria_nursery scope end - nursery checkpoint raise
# after daemon nursery exit
finally: finally:
log.cancel(f'Waiting on remaining children {anursery._children}') if anursery._children:
with trio.CancelScope(shield=True): log.cancel(f'Waiting on remaining children {anursery._children}')
await anursery._all_children_reaped.wait() with trio.CancelScope(shield=True):
await anursery._all_children_reaped.wait()
log.cancel(f'All children complete for {anursery}')
# No errors were raised while awaiting ".run_in_actor()" # No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as # actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed # results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are # those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize # collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise. # all errors and re-raise.
if src_err and src_err not in errors.values():
errors[actor.uid] = src_err
if errors: if errors:
if anursery._children: if anursery._children:
raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!") raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!")
@ -306,8 +349,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
log.cancel(f'{anursery} terminated gracefully') log.cancel(f'{anursery} terminated gracefully')
# XXX: honestly no idea why this is needed but sure.. # XXX: honestly no idea why this is needed but sure..
if isinstance(original_err, KeyboardInterrupt) and anursery.cancelled: if isinstance(src_err, KeyboardInterrupt) and anursery.cancelled:
raise original_err raise src_err
@asynccontextmanager @asynccontextmanager