From 97b79064f5f952c56732a1299b2fd342805312cc Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 5 Jan 2022 09:50:37 -0500
Subject: [PATCH] Attempt to collect tardy errors after proc death

In an effort to support `.run_in_actor()` error raising by our nursery
we ideally collect as many child errors as possible during nursery
teardown and error collection/propagation. Here we try a couple of
things:
- factor the per-actor error retrieval into a new
  `pack_and_report_errors()`
- when a result retrieval via `exhaust_portal()` is cancelled, pack the
  `trio.Cancelled` into the `errors: dict`, expecting to rescan any such
  entries for late errors after process termination
- at the end of the spawn task, conduct a timed-out 2nd retrieval of any
  late-delivered error from the child task for each entry in `errors`
  containing a `trio.Cancelled`

This still causes a bunch of cancellation tests to fail, seemingly due
to a race where the OCA (one-cancels-all) nursery may have requested
cancellation of children *before* they can deliver a remote error, and
thus the `MultiError` matching expectations aren't (always) going to be
correct. Previously we always waited for all `.run_in_actor()` results
to arrive and did **not** raise any errors early (which in turn
triggers local cancellation).
---
 tractor/_spawn.py | 96 +++++++++++++++++++++++++++++++++++------------
 1 file changed, 72 insertions(+), 24 deletions(-)

diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 08d3afa..fa7137f 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -57,7 +57,7 @@ from .log import get_logger
 from ._portal import Portal
 from ._actor import Actor
 from ._entry import _mp_main
-from ._exceptions import ActorFailure
+from ._exceptions import ActorFailure, NoResult
 
 log = get_logger('tractor')
 
@@ -136,7 +136,10 @@ async def exhaust_portal(
         # always be established and shutdown using a context manager api
         final = await portal.result()
 
-    except (Exception, trio.MultiError) as err:
+    except (
+        Exception,
+        trio.MultiError
+    ) as err:
         # we reraise in the parent task via a ``trio.MultiError``
         return err
 
@@ -151,36 +154,56 @@ async def exhaust_portal(
     return final
 
 
+async def pack_and_report_errors(
+    portal: Portal,
+    subactor: Actor,
+    errors: Dict[Tuple[str, str], Exception],
+
+) -> Any:
+
+    # if this call errors we store the exception for later
+    # in ``errors`` which will be reraised inside
+    # a MultiError and we still send out a cancel request
+    result = await exhaust_portal(portal, subactor)
+
+    uid = portal.channel.uid
+    if (
+        isinstance(result, Exception)
+        # or isinstance(result, trio.MultiError)
+    ):
+        errors[subactor.uid] = result
+        log.warning(f"{uid} received remote error:\n{result}")
+        raise result
+
+    elif isinstance(result, trio.Cancelled):
+        errors[subactor.uid] = result
+        log.runtime(f"{uid} was cancelled before result")
+
+    else:
+        log.runtime(f"{uid} received final result:\n{result}")
+
+    return result
+
+
 async def cancel_on_completion(
     portal: Portal,
-    actor: Actor,
+    subactor: Actor,
     errors: Dict[Tuple[str, str], Exception],
 
 ) -> None:
     '''
-    Cancel actor gracefully once it's "main" portal's
+    Cancel subactor gracefully once its "main" portal's
     result arrives.
 
     Should only be called for actors spawned with
     `run_in_actor()`.
 
     '''
-    # if this call errors we store the exception for later
-    # in ``errors`` which will be reraised inside
-    # a MultiError and we still send out a cancel request
-    result = await exhaust_portal(portal, actor)
-
-    if isinstance(result, Exception):
-        errors[actor.uid] = result
-        log.warning(
-            f"Cancelling {portal.channel.uid} after error {result}"
-        )
-        raise result
-
-    else:
-        log.runtime(
-            f"Cancelling {portal.channel.uid} gracefully "
-            f"after result {result}")
+    await pack_and_report_errors(
+        portal,
+        subactor,
+        errors,
+    )
 
     # cancel the process now that we have a final result
     await portal.cancel_actor()
@@ -348,8 +371,9 @@ async def new_proc(
             with trio.CancelScope(shield=True):
                 await actor_nursery._join_procs.wait()
 
+            cancel_on_complete = portal in actor_nursery._cancel_after_result_on_exit
             async with trio.open_nursery() as nursery:
-                if portal in actor_nursery._cancel_after_result_on_exit:
+                if cancel_on_complete:
                     nursery.start_soon(
                         cancel_on_completion,
                         portal,
@@ -373,6 +397,11 @@ async def new_proc(
                         f"{subactor.uid}")
                     nursery.cancel_scope.cancel()
 
+                # if errors:
+                #     log.warning(
+                #         f'Remote errors retrieved from child: {subactor.uid}')
+                #     actor_nursery._ria_nursery.cancel_scope.cancel()
+
         finally:
             # The "hard" reap since no actor zombies are allowed!
             # XXX: do this **after** cancellation/tearfown to avoid
@@ -402,11 +431,30 @@ async def new_proc(
         else:
             log.warning('Nursery cancelled before sub-proc started')
 
+        uid = subactor.uid
         if not cancelled_during_spawn:
-            # pop child entry to indicate we no longer managing this
-            # subactor
-            actor_nursery._children.pop(subactor.uid)
+            subactor, _, portal = actor_nursery._children.pop(uid)
+            # check for a late delivery of an error from
+            # the target remote task and overwrite any cancel
+            # that was captured as part of teardown.
+            if cancel_on_complete:
+                error = errors.get(uid)
+                if type(error) is trio.Cancelled:
+                    # actor was cancelled before its final result was
+                    # retrieved so check now for any result and pack as
+                    # an error to be raised in the surrounding
+                    # nursery's multierror handling.
+                    errors.pop(uid)
+                    with trio.move_on_after(0.001) as cs:
+                        cs.shield = True
+                        err = await pack_and_report_errors(
+                            portal,
+                            subactor,
+                            errors,
+                        )
+                        if type(err) is trio.Cancelled:
+                            errors.pop(uid)
 
     else:
         # `multiprocessing`
         # async with trio.open_nursery() as nursery:
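
Note: below is a minimal, self-contained sketch of the shielded, timed
"2nd retrieval" pattern from the last hunk. The memory channel and the
`fetch_result()` helper are hypothetical stand-ins for the portal
machinery (`exhaust_portal()`/`pack_and_report_errors()`), not
tractor's actual API:

    import trio


    async def fetch_result(recv: trio.MemoryReceiveChannel):
        # stand-in for ``exhaust_portal()``: wait for the child's
        # final result (or error) to be delivered
        return await recv.receive()


    async def main():
        send, recv = trio.open_memory_channel(1)
        result = None

        async def child():
            # simulate a "tardy" error which lands only after the
            # first retrieval attempt was already cancelled
            await trio.sleep(0.05)
            await send.send(RuntimeError('tardy error'))

        async with trio.open_nursery() as tn:
            tn.start_soon(child)

            # 1st retrieval: cancelled (here via a timeout) before the
            # child reports back, mirroring the OCA-nursery race
            with trio.move_on_after(0.01):
                result = await fetch_result(recv)

            if result is None:
                # 2nd, shielded and timed, retrieval after "proc
                # death": the shield keeps an already-cancelled
                # surrounding scope from aborting this checkpoint,
                # while the timeout bounds the wait so a child that
                # never reports can't hang teardown
                with trio.move_on_after(0.1) as cs:
                    cs.shield = True
                    result = await fetch_result(recv)

            print('retrieved:', result)


    trio.run(main)

The shield matters because in the patch this re-poll runs during
teardown, when the surrounding cancel scope may already be cancelled;
without it the ``await`` would immediately re-raise `trio.Cancelled`
instead of giving any late-delivered error a chance to arrive.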