forked from goodboy/tractor
1
0
Fork 0

Let `ActorNursery` choose whether to raise remote errors

- Don't raise inside `result_from_portal()` and instead return a flag that
  indicates whether the error was remote or not.
- Stick the soft reap sequence outside a `finally:`.
- do error tracking in `ActorNursery._handle_err() -> bool:` to avoid duplicate
  raises on close.
- add `ActorNursery.cancel_called: bool`
- accept a cancelled soft reap and toss in some logging for now to begin
  figuring out races with the spawner nursery vs. the enter block being
  the source of an error that causes actor nursery cancellation.
- cancel the spawn nursery if all procs complete but the nursery hasn't
  been closed (pretty sure this isn't correct nor working.. the nursery
  should always be closed in order for the join procs event to have
  arrived).
- tossed in some code for the mp backend but none of it works (or is
  tested) and needs to be rewritten like the trio spawner likely.

  All still very WIP in case that wasn't clear XD
zombie_lord_infinite
Tyler Goodlet 2021-10-12 11:38:19 -04:00
parent 5eb7c4c857
commit 348423ece7
2 changed files with 358 additions and 222 deletions

View File

@ -96,6 +96,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
async def result_from_portal(
portal: Portal,
actor: Actor,
@ -103,7 +104,7 @@ async def result_from_portal(
cancel_on_result: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
) -> tuple[Optional[Any], Optional[BaseException]]:
"""
Cancel actor gracefully once it's "main" portal's
result arrives.
@ -111,9 +112,11 @@ async def result_from_portal(
Should only be called for actors spawned with `run_in_actor()`.
"""
__tracebackhide__ = True
# __tracebackhide__ = True
uid = portal.channel.uid
remote_result = None
is_remote_result = None
# cancel control is explicityl done by the caller
with trio.CancelScope() as cs:
@ -129,45 +132,37 @@ async def result_from_portal(
# XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api
result = await portal.result()
is_remote_result = True
log.info(f"Returning final result: {result}")
except RemoteActorError as rerr:
# this includes real remote errors as well as
# `ContextCancelled`
is_remote_result = True
result = rerr
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
result = err
errors[actor.uid] = err
raise
except trio.Cancelled as err:
# lol, of course we need this too ;P
# TODO: merge with above?
log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
is_remote_result = False
result = err
# errors[actor.uid] = err
# raise
if cancel_on_result:
if isinstance(result, Exception):
# errors[actor.uid] = result
log.warning(
f"Cancelling single-task-run {uid} after error {result}"
)
# raise result
if cs.cancelled_caught:
log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
else:
log.runtime(
f"Cancelling {uid} gracefully "
f"after one-time-task result {result}")
return result, is_remote_result
# an actor that was `.run_in_actor()` executes a single task
# and delivers the result, then we cancel it.
# TODO: likely in the future we should just implement this using
# the new `open_context()` IPC api, since it's the more general
# api and can represent this form.
# XXX: do we need this?
# await maybe_wait_for_debugger()
await portal.cancel_actor()
# except trio.Cancelled as err:
# # lol, of course we need this too ;P
# # TODO: merge with above?
# log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
# result = err
# # errors[actor.uid] = err
# raise
return result
# return result
async def do_hard_kill(
@ -209,17 +204,17 @@ async def reap_proc(
proc: trio.Process,
uid: tuple[str, str],
terminate_after: float = float('inf'),
terminate_after: Optional[float] = None,
hard_kill_after: int = 0.1,
) -> None:
with trio.move_on_after(terminate_after) as cs:
with trio.move_on_after(terminate_after or float('inf')) as cs:
# Wait for proc termination but **dont' yet** do
# any out-of-ipc-land termination / process
# killing. This is a "light" (cancellable) join,
# the hard join is below after timeout
await proc.wait()
log.info(f'{uid} terminated gracefully')
log.info(f'Proc for {uid} terminated gracefully')
if cs.cancelled_caught and terminate_after is not float('inf'):
# Always "hard" join lingering sub procs since no
@ -248,6 +243,7 @@ async def new_proc(
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
graceful_kill_timeout: int = 3,
@ -357,6 +353,7 @@ async def new_proc(
) as cerr:
log.exception(f'Relaying unexpected {cerr} to nursery')
await breakpoint()
# sending IPC-msg level cancel requests is expected to be
# managed by the nursery.
@ -373,161 +370,230 @@ async def new_proc(
# True, # cancel_on_result
)
finally:
# 2 cases:
# - actor nursery was cancelled in which case
# we want to try a soft reap of the actor via
# ipc cancellation and then failing that do a hard
# reap.
# - this is normal termination and we must wait indefinitely
# for ria and daemon actors
reaping_cancelled: bool = False
ria = portal in actor_nursery._cancel_after_result_on_exit
# Graceful reap attempt - 2 cases:
# - actor nursery was cancelled in which case
# we want to try a soft reap of the actor via
# ipc cancellation and then failing that do a hard
# reap.
# - this is normal termination and we must wait indefinitely
# for ria to return and daemon actors to be cancelled
reaping_cancelled: bool = False
ria = portal in actor_nursery._cancel_after_result_on_exit
result = None
# this is the soft reap sequence. we can
# either collect results:
# - ria actors get them them via ``Portal.result()``
# - we wait forever on daemon actors until they're
# cancelled by user code via ``Portal.cancel_actor()``
# or ``ActorNursery.cancel(). in the latter case
# we have to expect another cancel here since
# the task spawning nurseries will both be cacelled
# by ``ActorNursery.cancel()``.
# this is the soft reap sequence. we can
# either collect results:
# - ria actors get them them via ``Portal.result()``
# - we wait forever on daemon actors until they're
# cancelled by user code via ``Portal.cancel_actor()``
# or ``ActorNursery.cancel(). in the latter case
# we have to expect another cancel here since
# the task spawning nurseries will both be cacelled
# by ``ActorNursery.cancel()``.
# OR, we're cancelled while collecting results, which
# case we need to try another soft cancel and reap attempt.
try:
log.cancel(f'Starting soft actor reap for {uid}')
cancel_scope = None
# async with trio.open_nursery() as nursery:
# OR, we're cancelled while collecting results, which
# case we need to try another soft cancel and reap attempt.
try:
log.cancel(f'Starting soft actor reap for {uid}')
cancel_scope = None
reap_timeout = None
if portal.channel.connected() and ria:
if portal.channel.connected() and ria:
# we wait for result and cancel on completion
# if uid[0] == 'odds':
# await breakpoint()
await result_from_portal(
portal,
subactor,
errors,
True, # cancel_on_result
)
# # collect any expected ``.run_in_actor()`` results
# cancel_scope = await nursery.start(
# result_from_portal,
# portal,
# subactor,
# errors,
# True, # cancel_on_result
# )
result, is_remote = await result_from_portal(
portal,
subactor,
errors,
# True, # cancel_on_result
)
if is_remote:
if isinstance(result, RemoteActorError):
# errors[actor.uid] = result
if (
portal.cancel_called and
isinstance(result, ContextCancelled)
):
log.cancel(f'{uid} received expected cancel')
errors[uid] = result
# soft & cancellable
await reap_proc(proc, uid)
# fall through to below soft proc reap
reap_timeout = 0.5
# # if proc terminates before portal result
# if cancel_scope:
# cancel_scope.cancel()
except (
ContextCancelled,
) as err:
if portal.cancel_called:
log.cancel('{uid} received expected cancel')
else:
log.warning(
f"Cancelling single-task-run {uid} after remote error {result}"
)
# soft & cancellable
await reap_proc(proc, uid, terminate_after=0.1)
# likely a real remote error propagation
# so pass up to nursery strat
should_raise = await actor_nursery._handle_err(
result,
portal=portal,
)
except (
RemoteActorError,
) as err:
reaping_cancelled = err
log.exception(f'{uid} remote error')
await actor_nursery._handle_err(err, portal=portal)
# propagate up to spawn nursery to be
# grouped into any multierror.
# if should_raise:
# raise result
else:
log.runtime(
f"Cancelling {uid} gracefully "
f"after one-time-task result {result}")
# an actor that was `.run_in_actor()` executes a single task
# and delivers the result, then we cancel it.
# TODO: likely in the future we should just implement this using
# the new `open_context()` IPC api, since it's the more general
# api and can represent this form.
# XXX: do we need this?
# await maybe_wait_for_debugger()
await portal.cancel_actor()
except (
trio.Cancelled,
) as err:
reaping_cancelled = err
if actor_nursery.cancelled:
log.cancel(f'{uid} wait cancelled by nursery')
else:
log.exception(f'{uid} soft wait error?')
log.exception(
f"Cancelling single-task-run {uid} after local error"
)
raise result
except (
BaseException
) as err:
reaping_cancelled = err
log.exception(f'{uid} soft reap local error')
# soft & cancellable
await reap_proc(proc, uid, terminate_after=reap_timeout)
finally:
if reaping_cancelled:
if actor_nursery.cancelled:
log.cancel(f'Nursery cancelled during soft wait for {uid}')
# except (
# ContextCancelled,
# ) as err:
# if portal.cancel_called:
# log.cancel('{uid} received expected cancel')
# # soft & cancellable
# await reap_proc(proc, uid, terminate_after=0.1)
# except (
# RemoteActorError,
# ) as err:
# reaping_cancelled = err
# log.exception(f'{uid} remote error')
# await actor_nursery._handle_err(err, portal=portal)
except (
trio.Cancelled,
) as err:
# NOTE: for now we pack the cancelleds and expect the actor
# nursery to re-raise them in a multierror but we could
# have also let them bubble up through the spawn nursery.
# in theory it's more correct to raise any
# `ContextCancelled` errors we get back from the
# `Portal.cancel_actor()` call and in that error
# have meta-data about whether we timeout out or
# actually got a cancel message back from the remote task.
# IF INSTEAD we raised *here* then this logic has to be
# handled inside the oca supervisor block and the spawn_n
# task cancelleds would have to be replaced with the remote
# task `ContextCancelled`s, *if* they ever arrive.
errors[uid] = err
# with trio.CancelScope(shield=True):
# await breakpoint()
if actor_nursery.cancel_called:
log.cancel(f'{uid} soft reap cancelled by nursery')
else:
if not actor_nursery._spawn_n.cancel_scope.cancel_called:
# this would be pretty weird and unexpected
await breakpoint()
# actor nursery wasn't cancelled before the spawn
# nursery was which likely means that there was
# an error in the actor nursery enter and the
# spawn nursery cancellation "beat" the call to
# .cancel()? that's a bug right?
# saw this with settings bugs in the ordermode pane in
# piker.
log.exception(f'{uid} soft wait error?')
raise RuntimeError(
'Task spawn nursery cancelled before actor nursery?')
finally:
if reaping_cancelled:
assert actor_nursery.cancel_called
if actor_nursery.cancelled:
log.cancel(f'Nursery cancelled during soft wait for {uid}')
with trio.CancelScope(shield=True):
await maybe_wait_for_debugger()
# XXX: we should probably just
# check for a `ContextCancelled` on portals
# here and fill them in over `trio.Cancelled` right?
# hard reap sequence with timeouts
if proc.poll() is None:
log.cancel(f'Attempting hard reap for {uid}')
with trio.CancelScope(shield=True):
await maybe_wait_for_debugger()
# XXX: can't do this, it'll hang some tests.. no
# idea why yet.
# with trio.CancelScope(shield=True):
# await actor_nursery._handle_err(
# reaping_cancelled,
# portal=portal
# )
# hard reap sequence
# ``Portal.cancel_actor()`` is expected to have
# been called by the supervising nursery so we
# do **not** call it here.
# hard reap sequence with timeouts
if proc.poll() is None:
log.cancel(f'Attempting hard reap for {uid}')
with trio.CancelScope(shield=True):
# hard reap sequence
# ``Portal.cancel_actor()`` is expected to have
# been called by the supervising nursery so we
# do **not** call it here.
await reap_proc(
proc,
uid,
# this is the same as previous timeout
# setting before rewriting this spawn
# section
terminate_after=3,
)
# if somehow the hard reap didn't collect the child then
# we send in the big gunz.
while proc.poll() is None:
log.critical(
f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n'
f'{proc}'
await reap_proc(
proc,
uid,
# this is the same as previous timeout
# setting before rewriting this spawn
# section
terminate_after=3,
)
with trio.CancelScope(shield=True):
await reap_proc(
proc,
uid,
terminate_after=0.1,
)
log.info(f"Joined {proc}")
# 2 cases:
# - the actor terminated gracefully
# - we're cancelled and likely need to re-raise
# if somehow the hard reap didn't collect the child then
# we send in the big gunz.
while proc.poll() is None:
log.critical(
f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n'
f'{proc}'
)
with trio.CancelScope(shield=True):
await reap_proc(
proc,
uid,
terminate_after=0.1,
)
# pop child entry to indicate we no longer managing this
# subactor
subactor, proc, portal = actor_nursery._children.pop(
subactor.uid)
if not actor_nursery._children:
log.cancel(f"{uid} reports all children complete!")
actor_nursery._all_children_reaped.set()
log.info(f"Joined {proc}")
# not entirely sure why we need this.. but without it
# the reaping cancelled error is never reported upwards
# to the spawn nursery?
if reaping_cancelled:
raise reaping_cancelled
# 2 cases:
# - the actor terminated gracefully
# - we're cancelled and likely need to re-raise
# pop child entry to indicate we no longer managing this
# subactor
subactor, proc, portal = actor_nursery._children.pop(
subactor.uid)
if not actor_nursery._children:
# all subactor children have completed
log.cancel(f"{uid} reports all children complete!")
actor_nursery._all_children_reaped.set()
spawn_n = actor_nursery._spawn_n
# with trio.CancelScope(shield=True):
# await breakpoint()
if not spawn_n._closed:
# the parent task that opened the actor nursery
# hasn't yet closed it so we cancel that task now.
spawn_n.cancel_scope.cancel()
# not entirely sure why we need this.. but without it
# the reaping cancelled error is never reported upwards
# to the spawn nursery?
# if reaping_cancelled:
# raise reaping_cancelled
else:
# `multiprocessing`
@ -644,7 +710,8 @@ async def mp_new_proc(
# no shield is required here (vs. above on the trio backend)
# since debug mode is not supported on mp.
await actor_nursery._join_procs.wait()
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
finally:
# XXX: in the case we were cancelled before the sub-proc
@ -659,13 +726,23 @@ async def mp_new_proc(
try:
# async with trio.open_nursery() as n:
# n.cancel_scope.shield = True
cancel_scope = await nursery.start(
result_from_portal,
print('soft mp reap')
# cancel_scope = await nursery.start(
result = await result_from_portal(
portal,
subactor,
errors
errors,
# True,
)
except trio.Cancelled as err:
# except trio.Cancelled as err:
except BaseException as err:
log.exception('hard mp reap')
with trio.CancelScope(shield=True):
await actor_nursery._handle_err(err, portal=portal)
print('sent to nursery')
cancel_exc = err
# if the reaping task was cancelled we may have hit
@ -675,31 +752,43 @@ async def mp_new_proc(
reaping_cancelled = True
if proc.is_alive():
with trio.move_on_after(0.1) as cs:
cs.shield = True
await proc_waiter(proc)
with trio.CancelScope(shield=True):
print('hard reaping')
with trio.move_on_after(0.1) as cs:
cs.shield = True
await proc_waiter(proc)
if cs.cancelled_caught:
print('pwning mp proc')
proc.terminate()
finally:
if not reaping_cancelled and proc.is_alive():
await proc_waiter(proc)
# if not reaping_cancelled and proc.is_alive():
# await proc_waiter(proc)
# TODO: timeout block here?
proc.join()
# TODO: timeout block here?
proc.join()
log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
log.debug(f"Joined {proc}")
# cancel result waiter that may have been spawned in
# tandem if not done already
if cancel_scope:
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
cancel_scope.cancel()
# pop child entry to indicate we are no longer managing subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
elif reaping_cancelled: # let the cancellation bubble up
assert cancel_exc
raise cancel_exc
if not actor_nursery._children:
# all subactor children have completed
# log.cancel(f"{uid} reports all children complete!")
actor_nursery._all_children_reaped.set()
# cancel result waiter that may have been spawned in
# tandem if not done already
if cancel_scope:
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
cancel_scope.cancel()
if reaping_cancelled: # let the cancellation bubble up
print('raising')
assert cancel_exc
raise cancel_exc

View File

@ -12,7 +12,7 @@ import trio
from async_generator import asynccontextmanager
from . import _debug
from ._debug import maybe_wait_for_debugger
from ._debug import maybe_wait_for_debugger, breakpoint
from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel
from ._actor import Actor
@ -48,10 +48,19 @@ class ActorNursery:
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False
self._cancel_called: bool = False
self._join_procs = trio.Event()
self._all_children_reaped = trio.Event()
self.errors = errors
@property
def cancel_called(self) -> bool:
'''
Same principle as ``trio.CancelScope.cancel_called``.
'''
return self._cancel_called
async def start_actor(
self,
name: str,
@ -177,17 +186,22 @@ class ActorNursery:
If ``hard_killl`` is set to ``True`` then kill the processes
directly without any far end graceful ``trio`` cancellation.
"""
self.cancelled = True
"""
# entries may be poppsed by the spawning backend as
# actors cancel individually
childs = self._children.copy()
if self.cancel_called:
log.warning(
f'Nursery with children {len(childs)} already cancelled')
return
log.cancel(
f'Cancelling nursery in {self._actor.uid} with children\n'
f'{childs.keys()}'
)
self._cancel_called = True
# wake up all spawn tasks to move on as those nursery
# has ``__aexit__()``-ed
@ -196,44 +210,69 @@ class ActorNursery:
await maybe_wait_for_debugger()
# one-cancels-all strat
async with trio.open_nursery() as cancel_sender:
for subactor, proc, portal in childs.values():
if proc.poll() is None and not portal.cancel_called:
cancel_sender.start_soon(portal.cancel_actor)
try:
async with trio.open_nursery() as cancel_sender:
for subactor, proc, portal in childs.values():
if not portal.cancel_called and portal.channel.connected():
cancel_sender.start_soon(portal.cancel_actor)
except trio.MultiError as err:
_err = err
log.exception(f'{self} errors during cancel')
# await breakpoint()
# # LOL, ok so multiprocessing requires this for some reason..
# with trio.CancelScope(shield=True):
# await trio.lowlevel.checkpoint()
# cancel all spawner tasks
# self._spawn_n.cancel_scope.cancel()
self.cancelled = True
async def _handle_err(
self,
err: BaseException,
portal: Optional[Portal] = None,
is_ctx_error: bool = False,
) -> None:
) -> bool:
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if is_ctx_error:
assert not portal
uid = self._actor.uid
else:
uid = portal.channel.uid
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
):
log.cancel(
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
if err not in self.errors.values():
self.errors[uid] = err
# cancel all subactors
await self.cancel()
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
):
log.cancel(
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.error(
f"Nursery for {current_actor().uid} "
f"errored from {uid} with\n{err}")
# cancel all subactors
await self.cancel()
return True
log.warning(f'Skipping duplicate error for {uid}')
return False
@asynccontextmanager
@ -251,6 +290,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
src_err: Optional[BaseException] = None
nurse_err: Optional[BaseException] = None
# errors from this daemon actor nursery bubble up to caller
try:
@ -303,9 +343,16 @@ async def _open_and_supervise_one_cancels_all_nursery(
src_err = err
# with trio.CancelScope(shield=True):
await anursery._handle_err(err)
raise
should_raise = await anursery._handle_err(err, is_ctx_error=True)
# XXX: raising here causes some cancellation
# / multierror tests to fail because of what appears to
# be double raise? we probably need to see how `trio`
# does this case..
if should_raise:
raise
# except trio.MultiError as err:
except BaseException as err:
# nursery bubble up
nurse_err = err
@ -331,15 +378,15 @@ async def _open_and_supervise_one_cancels_all_nursery(
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if src_err and src_err not in errors.values():
errors[actor.uid] = src_err
# await breakpoint()
if errors:
# if nurse_err or src_err:
if anursery._children:
raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!")
# with trio.CancelScope(shield=True):
# await anursery.cancel()
# use `MultiError` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))