From b3d348ee6ae4e71639fdfa7e66a1df73c7e69f3d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Aug 2025 12:56:17 -0400 Subject: [PATCH] WIP, actor-nursery non-graceful-cancel raises EG Attempting a rework of the post-cancellation "raising semantics" such that subactors which are `ActorCancelled` as a result of a non-graceful in-scope error, are acked via a re-raised `ExceptionGroup[ActorCancelled*N, Exception]` *outside the an-block*. Eventually, the idea is to have `ActorCancelled` be relayed from each subactor in response to any `Actor.cancel()/Portal.cancel_actor()` request much like `Context.cancel()/ContextCancelled`. This is a WIP bc it does break a few tests and requires related `_spawn`-mod-machinery changes to match some of which I'm not yet sure are required; need to dig into to the details of the currently failing suites first. `._supervise` patch deats, - add `ActorNursery.maybe_error` which delivers the maybe-EG or `._scope_error` depending on `.errors` (now `._errors`, a mapping from `Aid`-keys) has entries seet for subs. - raise ^ if non-null in a new outer-`finally` in `_open_and_supervise_one_cancels_all_nursery()`; an "outer" block is added to ensure all sub-actor-excs are emited/captured as part of `ActorNursery.cancel()` being called (as prior) as well as the `da_nursery` being explicitly cancelled alongside it (to unblock the tn-block, but still not sure why this is necessary yet?..). - (now masked) tried injecting actorcs from `.cancel()` loop, but (again per more explanation in section below) seems to be suffering a race issue with RAE relay? - left in buncha notes obvi for all this.. `._spawn` patch deats, - as above, expect `errors: dict` to map from `Aid`-keys. - pass `errors: dict` into `soft_kill()` since it seemed like we'd want to (for now) inject `ActoreCancelled` in some cases (but now i'm not sure XD). - tried out a couple spots (which are now masked) to inject `ActorCancelled` after calling `Portal.cancel()` in various subactor-supervision routines whenev an RAE is not set.. - oddly seems to be overwriting actual errors (likely due to racing with RAE receive and/or actorc-request timeout?) despite the guard logic..which clearly doesn't resolve the issue.. - buncha `tn`-style renaming. --- tractor/_spawn.py | 116 ++++++++--- tractor/_supervise.py | 439 ++++++++++++++++++++++++++---------------- 2 files changed, 365 insertions(+), 190 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 8d3c2cf6..ec601d8d 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -50,7 +50,11 @@ from tractor._addr import UnwrappedAddress from tractor._portal import Portal from tractor._runtime import Actor from tractor._entry import _mp_main -from tractor._exceptions import ActorFailure +from tractor._exceptions import ( + ActorCancelled, + ActorFailure, + # NoResult, +) from tractor.msg import ( types as msgtypes, pretty_struct, @@ -137,7 +141,6 @@ def try_set_start_method( async def exhaust_portal( - portal: Portal, actor: Actor @@ -185,10 +188,12 @@ async def exhaust_portal( async def cancel_on_completion( - portal: Portal, actor: Actor, - errors: dict[tuple[str, str], Exception], + errors: dict[ + msgtypes.Aid, + Exception, + ], ) -> None: ''' @@ -209,24 +214,57 @@ async def cancel_on_completion( portal, actor, ) + aid: msgtypes.Aid = actor.aid + repr_aid: str = aid.reprol(sin_uuid=False) + if isinstance(result, Exception): - errors[actor.uid]: Exception = result + errors[aid]: Exception = result log.cancel( - 'Cancelling subactor runtime due to error:\n\n' - f'Portal.cancel_actor() => {portal.channel.uid}\n\n' - f'error: {result}\n' + 'Cancelling subactor {repr_aid!r} runtime due to error\n' + f'\n' + f'Portal.cancel_actor() => {portal.channel.uid}\n' + f'\n' + f'{result!r}\n' ) else: - log.runtime( - 'Cancelling subactor gracefully:\n\n' - f'Portal.cancel_actor() => {portal.channel.uid}\n\n' - f'result: {result}\n' + report: str = ( + f'Cancelling subactor {repr_aid!r} gracefully..\n' + f'\n' + ) + canc_info: str = ( + f'Portal.cancel_actor() => {portal.chan.uid}\n' + f'\n' + f'final-result => {result!r}\n' + ) + log.cancel( + report + + + canc_info ) # cancel the process now that we have a final result await portal.cancel_actor() + if ( + not errors.get(aid) + # and + # result is NoResult + ): + pass + # await debug.pause(shield=True) + + # errors[aid] = ActorCancelled( + # message=( + # f'Cancelled subactor {repr_aid!r}\n' + # f'{canc_info}\n' + # ), + # canceller=current_actor().aid, + # # TODO? should we have a ack-msg? + # # ipc_msg=?? + # # boxed_type=trio.Cancelled, + # ) + async def hard_kill( proc: trio.Process, @@ -331,6 +369,10 @@ async def soft_kill( Awaitable, ], portal: Portal, + errors: dict[ + msgtypes.Aid, + Exception, + ], ) -> None: ''' @@ -374,8 +416,8 @@ async def soft_kill( # below. This means we try to do a graceful teardown # via sending a cancel message before getting out # zombie killing tools. - async with trio.open_nursery() as n: - n.cancel_scope.shield = True + async with trio.open_nursery() as tn: + tn.cancel_scope.shield = True async def cancel_on_proc_deth(): ''' @@ -385,24 +427,35 @@ async def soft_kill( ''' await wait_func(proc) - n.cancel_scope.cancel() + tn.cancel_scope.cancel() # start a task to wait on the termination of the # process by itself waiting on a (caller provided) wait # function which should unblock when the target process # has terminated. - n.start_soon(cancel_on_proc_deth) + tn.start_soon(cancel_on_proc_deth) # send the actor-runtime a cancel request. await portal.cancel_actor() + # if not errors.get(peer_aid): + # errors[peer_aid] = ActorCancelled( + # message=( + # 'Sub-actor cancelled gracefully by parent\n' + # ), + # canceller=current_actor().aid, + # # TODO? should we have a ack-msg? + # # ipc_msg=?? + # # boxed_type=trio.Cancelled, + # ) + if proc.poll() is None: # type: ignore log.warning( 'Subactor still alive after cancel request?\n\n' f'uid: {peer_aid}\n' f'|_{proc}\n' ) - n.cancel_scope.cancel() + tn.cancel_scope.cancel() raise @@ -410,7 +463,10 @@ async def new_proc( name: str, actor_nursery: ActorNursery, subactor: Actor, - errors: dict[tuple[str, str], Exception], + errors: dict[ + msgtypes.Aid, + Exception, + ], # passed through to actor main bind_addrs: list[UnwrappedAddress], @@ -449,7 +505,10 @@ async def trio_proc( name: str, actor_nursery: ActorNursery, subactor: Actor, - errors: dict[tuple[str, str], Exception], + errors: dict[ + msgtypes.Aid, + Exception, + ], # passed through to actor main bind_addrs: list[UnwrappedAddress], @@ -572,9 +631,9 @@ async def trio_proc( with trio.CancelScope(shield=True): await actor_nursery._join_procs.wait() - async with trio.open_nursery() as nursery: + async with trio.open_nursery() as ptl_reaper_tn: if portal in actor_nursery._cancel_after_result_on_exit: - nursery.start_soon( + ptl_reaper_tn.start_soon( cancel_on_completion, portal, subactor, @@ -587,7 +646,8 @@ async def trio_proc( await soft_kill( proc, trio.Process.wait, # XXX, uses `pidfd_open()` below. - portal + portal, + errors, ) # cancel result waiter that may have been spawned in @@ -596,7 +656,7 @@ async def trio_proc( 'Cancelling portal result reaper task\n' f'c)> {subactor.aid.reprol()!r}\n' ) - nursery.cancel_scope.cancel() + ptl_reaper_tn.cancel_scope.cancel() finally: # XXX NOTE XXX: The "hard" reap since no actor zombies are @@ -669,7 +729,10 @@ async def mp_proc( name: str, actor_nursery: ActorNursery, # type: ignore # noqa subactor: Actor, - errors: dict[tuple[str, str], Exception], + errors: dict[ + msgtypes.Aid, + Exception, + ], # passed through to actor main bind_addrs: list[UnwrappedAddress], parent_addr: UnwrappedAddress, @@ -794,7 +857,7 @@ async def mp_proc( cancel_on_completion, portal, subactor, - errors + errors, ) # This is a "soft" (cancellable) join/reap which @@ -803,7 +866,8 @@ async def mp_proc( await soft_kill( proc, proc_waiter, - portal + portal, + errors, ) # cancel result waiter that may have been spawned in diff --git a/tractor/_supervise.py b/tractor/_supervise.py index be89c4cb..5c943cce 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -30,6 +30,9 @@ import warnings import trio +from .msg import ( + types as msgtypes, +) from .devx import ( debug, pformat as _pformat, @@ -48,6 +51,7 @@ from .trionics import ( ) from ._exceptions import ( ContextCancelled, + ActorCancelled, ) from ._root import ( open_root_actor, @@ -99,7 +103,10 @@ class ActorNursery: actor: Actor, ria_nursery: trio.Nursery, da_nursery: trio.Nursery, - errors: dict[tuple[str, str], BaseException], + errors: dict[ + msgtypes.Aid, + BaseException, + ], ) -> None: # self.supervisor = supervisor # TODO @@ -117,9 +124,11 @@ class ActorNursery: ] ] = {} + # signals when it is ok to start waiting o subactor procs + # for termination. self._join_procs = trio.Event() self._at_least_one_child_in_debug: bool = False - self.errors = errors + self._errors = errors self._scope_error: BaseException|None = None self.exited = trio.Event() @@ -260,7 +269,7 @@ class ActorNursery: name, self, subactor, - self.errors, + self._errors, bind_addrs, parent_addr, _rtv, # run time vars @@ -364,7 +373,9 @@ class ActorNursery: # then `._children`.. children: dict = self._children child_count: int = len(children) - msg: str = f'Cancelling actor nursery with {child_count} children\n' + msg: str = ( + f'Cancelling actor-nursery with {child_count} children\n' + ) server: IPCServer = self._actor.ipc_server @@ -391,7 +402,9 @@ class ActorNursery: else: if portal is None: # actor hasn't fully spawned yet - event: trio.Event = server._peer_connected[subactor.uid] + event: trio.Event = server._peer_connected[ + subactor.uid + ] log.warning( f"{subactor.uid} never 't finished spawning?" ) @@ -416,7 +429,20 @@ class ActorNursery: # spawn cancel tasks for each sub-actor assert portal if portal.channel.connected(): - tn.start_soon(portal.cancel_actor) + + async def canc_subactor(): + await portal.cancel_actor() + # aid: msgtypes.Aid = subactor.aid + # reprol: str = aid.reprol(sin_uuid=False) + # if not self._errors.get(aid): + # self._errors[aid] = ActorCancelled( + # message=( + # f'Sub-actor {reprol!r} cancelled gracefully by parent nursery\n' + # ), + # canceller=self._actor.aid, + # ) + + tn.start_soon(canc_subactor) log.cancel(msg) # if we cancelled the cancel (we hung cancelling remote actors) @@ -442,6 +468,47 @@ class ActorNursery: # mark ourselves as having (tried to have) cancelled all subactors self._join_procs.set() + @property + def maybe_error(self) -> ( + BaseException| + BaseExceptionGroup| + None + ): + ''' + Deliver any captured scope errors including those relayed + from subactors such as `ActorCancelled` during a non-graceful + cancellation scenario. + + When more then a "graceful cancel" occurrs wrap all collected + sub-exceptions in a raised `ExceptionGroup`. + + ''' + scope_exc: BaseException|None = self._scope_error + + # XXX NOTE, only pack an eg if there i at least one + # non-actorc exception received from a subactor, OR + # return `._scope_error` verbatim. + if (errors := self._errors): + # use `BaseExceptionGroup` as needed + excs: list[BaseException] = list(errors.values()) + if ( + len(excs) > 1 + and + any( + type(exc) not in {ActorCancelled,} + for exc in excs + ) + ): + return ExceptionGroup( + 'ActorNursery multi-errored with', + tuple(excs), + ) + + # raise the lone subactor exc + return list(excs)[0] + + return scope_exc + @acm async def _open_and_supervise_one_cancels_all_nursery( @@ -457,7 +524,10 @@ async def _open_and_supervise_one_cancels_all_nursery( inner_err: BaseException|None = None # the collection of errors retreived from spawned sub-actors - errors: dict[tuple[str, str], BaseException] = {} + errors: dict[ + msgtypes.Aid, + BaseException, + ] = {} # This is the outermost level "deamon actor" nursery. It is awaited # **after** the below inner "run in actor nursery". This allows for @@ -467,176 +537,212 @@ async def _open_and_supervise_one_cancels_all_nursery( # `ActorNursery.start_actor()`). # errors from this daemon actor nursery bubble up to caller - async with ( - collapse_eg(), - trio.open_nursery() as da_nursery, - ): - try: - # This is the inner level "run in actor" nursery. It is - # awaited first since actors spawned in this way (using - # `ActorNusery.run_in_actor()`) are expected to only - # return a single result and then complete (i.e. be canclled - # gracefully). Errors collected from these actors are - # immediately raised for handling by a supervisor strategy. - # As such if the strategy propagates any error(s) upwards - # the above "daemon actor" nursery will be notified. - async with ( - collapse_eg(), - trio.open_nursery() as ria_nursery, - ): - an = ActorNursery( - actor, - ria_nursery, - da_nursery, - errors - ) - try: - # spawning of actors happens in the caller's scope - # after we yield upwards - yield an - - # When we didn't error in the caller's scope, - # signal all process-monitor-tasks to conduct - # the "hard join phase". - log.runtime( - 'Waiting on subactors to complete:\n' - f'>}} {len(an._children)}\n' + try: + async with ( + collapse_eg(), + trio.open_nursery() as da_nursery, + ): + try: + # This is the inner level "run in actor" nursery. It is + # awaited first since actors spawned in this way (using + # `ActorNusery.run_in_actor()`) are expected to only + # return a single result and then complete (i.e. be canclled + # gracefully). Errors collected from these actors are + # immediately raised for handling by a supervisor strategy. + # As such if the strategy propagates any error(s) upwards + # the above "daemon actor" nursery will be notified. + async with ( + collapse_eg(), + trio.open_nursery() as ria_nursery, + ): + an = ActorNursery( + actor, + ria_nursery, + da_nursery, + errors ) - an._join_procs.set() + try: + # spawning of actors happens in the caller's scope + # after we yield upwards + yield an - except BaseException as _inner_err: - inner_err = _inner_err - errors[actor.uid] = inner_err + # When we didn't error in the caller's scope, + # signal all process-monitor-tasks to conduct + # the "hard join phase". + log.runtime( + 'Waiting on subactors to complete:\n' + f'>}} {len(an._children)}\n' + ) + an._join_procs.set() - # If we error in the root but the debugger is - # engaged we don't want to prematurely kill (and - # thus clobber access to) the local tty since it - # will make the pdb repl unusable. - # Instead try to wait for pdb to be released before - # tearing down. - await debug.maybe_wait_for_debugger( - child_in_debug=an._at_least_one_child_in_debug - ) + except BaseException as _inner_err: + inner_err = _inner_err + # errors[actor.aid] = inner_err - # if the caller's scope errored then we activate our - # one-cancels-all supervisor strategy (don't - # worry more are coming). - an._join_procs.set() + # If we error in the root but the debugger is + # engaged we don't want to prematurely kill (and + # thus clobber access to) the local tty since it + # will make the pdb repl unusable. + # Instead try to wait for pdb to be released before + # tearing down. + await debug.maybe_wait_for_debugger( + child_in_debug=an._at_least_one_child_in_debug + ) - # XXX NOTE XXX: hypothetically an error could - # be raised and then a cancel signal shows up - # slightly after in which case the `else:` - # block here might not complete? For now, - # shield both. - with trio.CancelScope(shield=True): - etype: type = type(inner_err) - if etype in ( - trio.Cancelled, - KeyboardInterrupt, - ) or ( - is_multi_cancelled(inner_err) - ): - log.cancel( - f'Actor-nursery cancelled by {etype}\n\n' + # if the caller's scope errored then we activate our + # one-cancels-all supervisor strategy (don't + # worry more are coming). + an._join_procs.set() - f'{current_actor().uid}\n' - f' |_{an}\n\n' + # XXX NOTE XXX: hypothetically an error could + # be raised and then a cancel signal shows up + # slightly after in which case the `else:` + # block here might not complete? For now, + # shield both. + with trio.CancelScope(shield=True): + etype: type = type(inner_err) + if etype in ( + trio.Cancelled, + KeyboardInterrupt, + ) or ( + is_multi_cancelled(inner_err) + ): + log.cancel( + f'Actor-nursery cancelled by {etype}\n\n' - # TODO: show tb str? - # f'{tb_str}' - ) - elif etype in { - ContextCancelled, - }: - log.cancel( - 'Actor-nursery caught remote cancellation\n' - '\n' - f'{inner_err.tb_str}' - ) - else: - log.exception( - 'Nursery errored with:\n' + f'{current_actor().uid}\n' + f' |_{an}\n\n' - # TODO: same thing as in - # `._invoke()` to compute how to - # place this div-line in the - # middle of the above msg - # content.. - # -[ ] prolly helper-func it too - # in our `.log` module.. - # '------ - ------' - ) + # TODO: show tb str? + # f'{tb_str}' + ) + elif etype in { + ContextCancelled, + }: + log.cancel( + 'Actor-nursery caught remote cancellation\n' + '\n' + f'{inner_err.tb_str}' + ) + else: + log.exception( + 'Nursery errored with:\n' - # cancel all subactors - await an.cancel() + # TODO: same thing as in + # `._invoke()` to compute how to + # place this div-line in the + # middle of the above msg + # content.. + # -[ ] prolly helper-func it too + # in our `.log` module.. + # '------ - ------' + ) - # ria_nursery scope end + # cancel all subactors + await an.cancel() - # TODO: this is the handler around the ``.run_in_actor()`` - # nursery. Ideally we can drop this entirely in the future as - # the whole ``.run_in_actor()`` API should be built "on top of" - # this lower level spawn-request-cancel "daemon actor" API where - # a local in-actor task nursery is used with one-to-one task - # + `await Portal.run()` calls and the results/errors are - # handled directly (inline) and errors by the local nursery. - except ( - Exception, - BaseExceptionGroup, - trio.Cancelled - ) as _outer_err: - outer_err = _outer_err + # ria_nursery scope end - an._scope_error = outer_err or inner_err + # TODO: this is the handler around the ``.run_in_actor()`` + # nursery. Ideally we can drop this entirely in the future as + # the whole ``.run_in_actor()`` API should be built "on top of" + # this lower level spawn-request-cancel "daemon actor" API where + # a local in-actor task nursery is used with one-to-one task + # + `await Portal.run()` calls and the results/errors are + # handled directly (inline) and errors by the local nursery. + except ( + Exception, + BaseExceptionGroup, + trio.Cancelled + ) as _outer_err: + outer_err = _outer_err - # XXX: yet another guard before allowing the cancel - # sequence in case a (single) child is in debug. - await debug.maybe_wait_for_debugger( - child_in_debug=an._at_least_one_child_in_debug - ) - - # If actor-local error was raised while waiting on - # ".run_in_actor()" actors then we also want to cancel all - # remaining sub-actors (due to our lone strategy: - # one-cancels-all). - if an._children: - log.cancel( - 'Actor-nursery cancelling due error type:\n' - f'{outer_err}\n' + # XXX: yet another guard before allowing the cancel + # sequence in case a (single) child is in debug. + await debug.maybe_wait_for_debugger( + child_in_debug=an._at_least_one_child_in_debug ) - with trio.CancelScope(shield=True): - await an.cancel() - raise - finally: - # No errors were raised while awaiting ".run_in_actor()" - # actors but those actors may have returned remote errors as - # results (meaning they errored remotely and have relayed - # those errors back to this parent actor). The errors are - # collected in ``errors`` so cancel all actors, summarize - # all errors and re-raise. - if errors: + # If actor-local error was raised while waiting on + # ".run_in_actor()" actors then we also want to cancel all + # remaining sub-actors (due to our lone strategy: + # one-cancels-all). if an._children: + log.cancel( + 'Actor-nursery cancelling due error type:\n' + f'{outer_err}\n' + ) with trio.CancelScope(shield=True): await an.cancel() - # use `BaseExceptionGroup` as needed - if len(errors) > 1: - raise BaseExceptionGroup( - 'tractor.ActorNursery errored with', - tuple(errors.values()), - ) - else: - raise list(errors.values())[0] + raise - # show frame on any (likely) internal error - if ( - not an.cancelled - and an._scope_error - ): - __tracebackhide__: bool = False + finally: + scope_exc = an._scope_error = outer_err or inner_err + # await debug.pause(shield=True) + # if scope_exc: + # errors[actor.aid] = scope_exc - # da_nursery scope end - nursery checkpoint - # final exit + # show this frame on any internal error + if ( + not an.cancelled + and + scope_exc + ): + __tracebackhide__: bool = False + + # NOTE, it's possible no errors were raised while + # awaiting ".run_in_actor()" actors but those + # sub-actors may have delivered remote errors as + # results, normally captured via machinery in + # `._spawn.cancel_on_completion()`. + # + # Any such remote errors are collected in `an._errors` + # which is summarized via `ActorNursery.maybe_error` + # which is maybe re-raised in an outer block (below). + # + # So here we first cancel all subactors the summarize + # all errors and then later (in that outer block) + # maybe-raise on a "non-graceful" cancellation + # outcome, normally as a summary EG. + if ( + scope_exc + or + errors + ): + + if an._children: + with trio.CancelScope(shield=True): + await an.cancel() + + # cancel outer tn so we unblock outside this + # finally! + da_nursery.cance_scope.cancel() + # + # ^TODO? still don't get why needed? + # - an.cancel() should cause all spawn-subtasks + # to eventually exit? + # - also, could (instead) we sync to an event here before + # (ever) calling `an.cancel()`?? + + # `da_nursery` scope end, thus a checkpoint. + finally: + + # raise any eg compiled from all subs + # ??TODO should we also adopt strict-egs here like + # `trio.Nursery`?? + # + # XXX justification notes, + # docs: https://trio.readthedocs.io/en/stable/reference-core.html#historical-note-non-strict-exceptiongroups + # anthropic: https://discuss.python.org/t/using-exceptiongroup-at-anthropic-experience-report/20888 + # gh: https://github.com/python-trio/trio/issues/611 + if an_exc := an.maybe_error: + raise an_exc + + if scope_exc := an._scope_error: + raise scope_exc + + # @acm-fn scope exit _shutdown_msg: str = ( @@ -648,7 +754,7 @@ _shutdown_msg: str = ( # @api_frame async def open_nursery( *, # named params only! - hide_tb: bool = True, + hide_tb: bool = False, **kwargs, # ^TODO, paramspec for `open_root_actor()` @@ -684,16 +790,21 @@ async def open_nursery( # mark us for teardown on exit implicit_runtime: bool = True - async with open_root_actor( - hide_tb=hide_tb, - **kwargs, - ) as actor: + async with ( + # collapse_eg(hide_tb=hide_tb), + open_root_actor( + hide_tb=hide_tb, + **kwargs, + ) as actor, + ): assert actor is current_actor() try: - async with _open_and_supervise_one_cancels_all_nursery( - actor - ) as an: + async with ( + _open_and_supervise_one_cancels_all_nursery( + actor + ) as an + ): # NOTE: mark this nursery as having # implicitly started the root actor so