WIP, actor-nursery non-graceful-cancel raises EG
Attempting a rework of the post-cancellation "raising semantics" such
that subactors which are `ActorCancelled` as a result of a non-graceful
in-scope error, are acked via a re-raised
`ExceptionGroup[ActorCancelled*N, Exception]`
*outside the an-block*. Eventually, the idea is to have `ActorCancelled`
be relayed from each subactor in response to any
`Actor.cancel()/Portal.cancel_actor()` request much like
`Context.cancel()/ContextCancelled`.
This is a WIP bc it does break a few tests and requires related
`_spawn`-mod-machinery changes to match some of which I'm not yet sure
are required; need to dig into to the details of the currently failing
suites first.
`._supervise` patch deats,
- add `ActorNursery.maybe_error` which delivers the maybe-EG or
  `._scope_error` depending on `.errors` (now `._errors`, a mapping from
  `Aid`-keys) has entries seet for subs.
- raise ^ if non-null in a new outer-`finally` in
  `_open_and_supervise_one_cancels_all_nursery()`; an "outer" block is
  added to ensure all sub-actor-excs are emited/captured as part of
  `ActorNursery.cancel()` being called (as prior) as well as the
  `da_nursery` being explicitly cancelled alongside it (to unblock the
  tn-block, but still not sure why this is necessary yet?..).
- (now masked) tried injecting actorcs from `.cancel()` loop, but (again
  per more explanation in section below) seems to be suffering a race
  issue with RAE relay?
- left in buncha notes obvi for all this..
`._spawn` patch deats,
- as above, expect `errors: dict` to map from `Aid`-keys.
- pass `errors: dict` into `soft_kill()` since it seemed like we'd want
  to (for now) inject `ActoreCancelled` in some cases (but now i'm not
  sure XD).
- tried out a couple spots (which are now masked) to inject
  `ActorCancelled` after calling `Portal.cancel()` in various
  subactor-supervision routines whenev an RAE is not set..
  - oddly seems to be overwriting actual errors (likely due to racing
    with RAE receive and/or actorc-request timeout?) despite the guard
    logic..which clearly doesn't resolve the issue..
- buncha `tn`-style renaming.
			
			
			
		
							parent
							
								
									1f269d8c32
								
							
						
					
					
						commit
						39d2a3ee3d
					
				| 
						 | 
				
			
			@ -50,7 +50,11 @@ from tractor._addr import UnwrappedAddress
 | 
			
		|||
from tractor._portal import Portal
 | 
			
		||||
from tractor._runtime import Actor
 | 
			
		||||
from tractor._entry import _mp_main
 | 
			
		||||
from tractor._exceptions import ActorFailure
 | 
			
		||||
from tractor._exceptions import (
 | 
			
		||||
    ActorCancelled,
 | 
			
		||||
    ActorFailure,
 | 
			
		||||
    # NoResult,
 | 
			
		||||
)
 | 
			
		||||
from tractor.msg import (
 | 
			
		||||
    types as msgtypes,
 | 
			
		||||
    pretty_struct,
 | 
			
		||||
| 
						 | 
				
			
			@ -137,7 +141,6 @@ def try_set_start_method(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
async def exhaust_portal(
 | 
			
		||||
 | 
			
		||||
    portal: Portal,
 | 
			
		||||
    actor: Actor
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -185,10 +188,12 @@ async def exhaust_portal(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
async def cancel_on_completion(
 | 
			
		||||
 | 
			
		||||
    portal: Portal,
 | 
			
		||||
    actor: Actor,
 | 
			
		||||
    errors: dict[tuple[str, str], Exception],
 | 
			
		||||
    errors: dict[
 | 
			
		||||
        msgtypes.Aid,
 | 
			
		||||
        Exception,
 | 
			
		||||
    ],
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
| 
						 | 
				
			
			@ -209,24 +214,57 @@ async def cancel_on_completion(
 | 
			
		|||
        portal,
 | 
			
		||||
        actor,
 | 
			
		||||
    )
 | 
			
		||||
    aid: msgtypes.Aid = actor.aid
 | 
			
		||||
    repr_aid: str = aid.reprol(sin_uuid=False)
 | 
			
		||||
 | 
			
		||||
    if isinstance(result, Exception):
 | 
			
		||||
        errors[actor.uid]: Exception = result
 | 
			
		||||
        errors[aid]: Exception = result
 | 
			
		||||
        log.cancel(
 | 
			
		||||
            'Cancelling subactor runtime due to error:\n\n'
 | 
			
		||||
            f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
 | 
			
		||||
            f'error: {result}\n'
 | 
			
		||||
            'Cancelling subactor {repr_aid!r} runtime due to error\n'
 | 
			
		||||
            f'\n'
 | 
			
		||||
            f'Portal.cancel_actor() => {portal.channel.uid}\n'
 | 
			
		||||
            f'\n'
 | 
			
		||||
            f'{result!r}\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    else:
 | 
			
		||||
        log.runtime(
 | 
			
		||||
            'Cancelling subactor gracefully:\n\n'
 | 
			
		||||
            f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
 | 
			
		||||
            f'result: {result}\n'
 | 
			
		||||
        report: str = (
 | 
			
		||||
            f'Cancelling subactor {repr_aid!r} gracefully..\n'
 | 
			
		||||
            f'\n'
 | 
			
		||||
        )
 | 
			
		||||
        canc_info: str = (
 | 
			
		||||
            f'Portal.cancel_actor() => {portal.chan.uid}\n'
 | 
			
		||||
            f'\n'
 | 
			
		||||
            f'final-result => {result!r}\n'
 | 
			
		||||
        )
 | 
			
		||||
        log.cancel(
 | 
			
		||||
            report
 | 
			
		||||
            +
 | 
			
		||||
            canc_info
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    # cancel the process now that we have a final result
 | 
			
		||||
    await portal.cancel_actor()
 | 
			
		||||
 | 
			
		||||
    if (
 | 
			
		||||
        not errors.get(aid)
 | 
			
		||||
        # and
 | 
			
		||||
        # result is NoResult
 | 
			
		||||
    ):
 | 
			
		||||
        pass
 | 
			
		||||
        # await debug.pause(shield=True)
 | 
			
		||||
 | 
			
		||||
        # errors[aid] = ActorCancelled(
 | 
			
		||||
        #     message=(
 | 
			
		||||
        #         f'Cancelled subactor {repr_aid!r}\n'
 | 
			
		||||
        #         f'{canc_info}\n'
 | 
			
		||||
        #     ),
 | 
			
		||||
        #     canceller=current_actor().aid,
 | 
			
		||||
        #     # TODO? should we have a ack-msg?
 | 
			
		||||
        #     # ipc_msg=??
 | 
			
		||||
        #     # boxed_type=trio.Cancelled,
 | 
			
		||||
        # )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def hard_kill(
 | 
			
		||||
    proc: trio.Process,
 | 
			
		||||
| 
						 | 
				
			
			@ -314,6 +352,10 @@ async def soft_kill(
 | 
			
		|||
        Awaitable,
 | 
			
		||||
    ],
 | 
			
		||||
    portal: Portal,
 | 
			
		||||
    errors: dict[
 | 
			
		||||
        msgtypes.Aid,
 | 
			
		||||
        Exception,
 | 
			
		||||
    ],
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
| 
						 | 
				
			
			@ -357,8 +399,8 @@ async def soft_kill(
 | 
			
		|||
        # below. This means we try to do a graceful teardown
 | 
			
		||||
        # via sending a cancel message before getting out
 | 
			
		||||
        # zombie killing tools.
 | 
			
		||||
        async with trio.open_nursery() as n:
 | 
			
		||||
            n.cancel_scope.shield = True
 | 
			
		||||
        async with trio.open_nursery() as tn:
 | 
			
		||||
            tn.cancel_scope.shield = True
 | 
			
		||||
 | 
			
		||||
            async def cancel_on_proc_deth():
 | 
			
		||||
                '''
 | 
			
		||||
| 
						 | 
				
			
			@ -368,24 +410,35 @@ async def soft_kill(
 | 
			
		|||
 | 
			
		||||
                '''
 | 
			
		||||
                await wait_func(proc)
 | 
			
		||||
                n.cancel_scope.cancel()
 | 
			
		||||
                tn.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
            # start a task to wait on the termination of the
 | 
			
		||||
            # process by itself waiting on a (caller provided) wait
 | 
			
		||||
            # function which should unblock when the target process
 | 
			
		||||
            # has terminated.
 | 
			
		||||
            n.start_soon(cancel_on_proc_deth)
 | 
			
		||||
            tn.start_soon(cancel_on_proc_deth)
 | 
			
		||||
 | 
			
		||||
            # send the actor-runtime a cancel request.
 | 
			
		||||
            await portal.cancel_actor()
 | 
			
		||||
 | 
			
		||||
            # if not errors.get(peer_aid):
 | 
			
		||||
            #     errors[peer_aid] = ActorCancelled(
 | 
			
		||||
            #         message=(
 | 
			
		||||
            #             'Sub-actor cancelled gracefully by parent\n'
 | 
			
		||||
            #         ),
 | 
			
		||||
            #         canceller=current_actor().aid,
 | 
			
		||||
            #         # TODO? should we have a ack-msg?
 | 
			
		||||
            #         # ipc_msg=??
 | 
			
		||||
            #         # boxed_type=trio.Cancelled,
 | 
			
		||||
            #     )
 | 
			
		||||
 | 
			
		||||
            if proc.poll() is None:  # type: ignore
 | 
			
		||||
                log.warning(
 | 
			
		||||
                    'Subactor still alive after cancel request?\n\n'
 | 
			
		||||
                    f'uid: {peer_aid}\n'
 | 
			
		||||
                    f'|_{proc}\n'
 | 
			
		||||
                )
 | 
			
		||||
                n.cancel_scope.cancel()
 | 
			
		||||
                tn.cancel_scope.cancel()
 | 
			
		||||
        raise
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -393,7 +446,10 @@ async def new_proc(
 | 
			
		|||
    name: str,
 | 
			
		||||
    actor_nursery: ActorNursery,
 | 
			
		||||
    subactor: Actor,
 | 
			
		||||
    errors: dict[tuple[str, str], Exception],
 | 
			
		||||
    errors: dict[
 | 
			
		||||
        msgtypes.Aid,
 | 
			
		||||
        Exception,
 | 
			
		||||
    ],
 | 
			
		||||
 | 
			
		||||
    # passed through to actor main
 | 
			
		||||
    bind_addrs: list[UnwrappedAddress],
 | 
			
		||||
| 
						 | 
				
			
			@ -432,7 +488,10 @@ async def trio_proc(
 | 
			
		|||
    name: str,
 | 
			
		||||
    actor_nursery: ActorNursery,
 | 
			
		||||
    subactor: Actor,
 | 
			
		||||
    errors: dict[tuple[str, str], Exception],
 | 
			
		||||
    errors: dict[
 | 
			
		||||
        msgtypes.Aid,
 | 
			
		||||
        Exception,
 | 
			
		||||
    ],
 | 
			
		||||
 | 
			
		||||
    # passed through to actor main
 | 
			
		||||
    bind_addrs: list[UnwrappedAddress],
 | 
			
		||||
| 
						 | 
				
			
			@ -555,9 +614,9 @@ async def trio_proc(
 | 
			
		|||
        with trio.CancelScope(shield=True):
 | 
			
		||||
            await actor_nursery._join_procs.wait()
 | 
			
		||||
 | 
			
		||||
        async with trio.open_nursery() as nursery:
 | 
			
		||||
        async with trio.open_nursery() as ptl_reaper_tn:
 | 
			
		||||
            if portal in actor_nursery._cancel_after_result_on_exit:
 | 
			
		||||
                nursery.start_soon(
 | 
			
		||||
                ptl_reaper_tn.start_soon(
 | 
			
		||||
                    cancel_on_completion,
 | 
			
		||||
                    portal,
 | 
			
		||||
                    subactor,
 | 
			
		||||
| 
						 | 
				
			
			@ -570,7 +629,8 @@ async def trio_proc(
 | 
			
		|||
            await soft_kill(
 | 
			
		||||
                proc,
 | 
			
		||||
                trio.Process.wait,  # XXX, uses `pidfd_open()` below.
 | 
			
		||||
                portal
 | 
			
		||||
                portal,
 | 
			
		||||
                errors,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # cancel result waiter that may have been spawned in
 | 
			
		||||
| 
						 | 
				
			
			@ -579,7 +639,7 @@ async def trio_proc(
 | 
			
		|||
                'Cancelling portal result reaper task\n'
 | 
			
		||||
                f'c)> {subactor.aid.reprol()!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
            nursery.cancel_scope.cancel()
 | 
			
		||||
            ptl_reaper_tn.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
    finally:
 | 
			
		||||
        # XXX NOTE XXX: The "hard" reap since no actor zombies are
 | 
			
		||||
| 
						 | 
				
			
			@ -652,7 +712,10 @@ async def mp_proc(
 | 
			
		|||
    name: str,
 | 
			
		||||
    actor_nursery: ActorNursery,  # type: ignore  # noqa
 | 
			
		||||
    subactor: Actor,
 | 
			
		||||
    errors: dict[tuple[str, str], Exception],
 | 
			
		||||
    errors: dict[
 | 
			
		||||
        msgtypes.Aid,
 | 
			
		||||
        Exception,
 | 
			
		||||
    ],
 | 
			
		||||
    # passed through to actor main
 | 
			
		||||
    bind_addrs: list[UnwrappedAddress],
 | 
			
		||||
    parent_addr: UnwrappedAddress,
 | 
			
		||||
| 
						 | 
				
			
			@ -777,7 +840,7 @@ async def mp_proc(
 | 
			
		|||
                    cancel_on_completion,
 | 
			
		||||
                    portal,
 | 
			
		||||
                    subactor,
 | 
			
		||||
                    errors
 | 
			
		||||
                    errors,
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            # This is a "soft" (cancellable) join/reap which
 | 
			
		||||
| 
						 | 
				
			
			@ -786,7 +849,8 @@ async def mp_proc(
 | 
			
		|||
            await soft_kill(
 | 
			
		||||
                proc,
 | 
			
		||||
                proc_waiter,
 | 
			
		||||
                portal
 | 
			
		||||
                portal,
 | 
			
		||||
                errors,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # cancel result waiter that may have been spawned in
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -30,6 +30,9 @@ import warnings
 | 
			
		|||
import trio
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
from .msg import (
 | 
			
		||||
    types as msgtypes,
 | 
			
		||||
)
 | 
			
		||||
from .devx import (
 | 
			
		||||
    debug,
 | 
			
		||||
    pformat as _pformat,
 | 
			
		||||
| 
						 | 
				
			
			@ -48,6 +51,7 @@ from .trionics import (
 | 
			
		|||
)
 | 
			
		||||
from ._exceptions import (
 | 
			
		||||
    ContextCancelled,
 | 
			
		||||
    ActorCancelled,
 | 
			
		||||
)
 | 
			
		||||
from ._root import (
 | 
			
		||||
    open_root_actor,
 | 
			
		||||
| 
						 | 
				
			
			@ -99,7 +103,10 @@ class ActorNursery:
 | 
			
		|||
        actor: Actor,
 | 
			
		||||
        ria_nursery: trio.Nursery,
 | 
			
		||||
        da_nursery: trio.Nursery,
 | 
			
		||||
        errors: dict[tuple[str, str], BaseException],
 | 
			
		||||
        errors: dict[
 | 
			
		||||
            msgtypes.Aid,
 | 
			
		||||
            BaseException,
 | 
			
		||||
        ],
 | 
			
		||||
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        # self.supervisor = supervisor  # TODO
 | 
			
		||||
| 
						 | 
				
			
			@ -117,9 +124,11 @@ class ActorNursery:
 | 
			
		|||
            ]
 | 
			
		||||
        ] = {}
 | 
			
		||||
 | 
			
		||||
        # signals when it is ok to start waiting o subactor procs
 | 
			
		||||
        # for termination.
 | 
			
		||||
        self._join_procs = trio.Event()
 | 
			
		||||
        self._at_least_one_child_in_debug: bool = False
 | 
			
		||||
        self.errors = errors
 | 
			
		||||
        self._errors = errors
 | 
			
		||||
        self._scope_error: BaseException|None = None
 | 
			
		||||
        self.exited = trio.Event()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -260,7 +269,7 @@ class ActorNursery:
 | 
			
		|||
                name,
 | 
			
		||||
                self,
 | 
			
		||||
                subactor,
 | 
			
		||||
                self.errors,
 | 
			
		||||
                self._errors,
 | 
			
		||||
                bind_addrs,
 | 
			
		||||
                parent_addr,
 | 
			
		||||
                _rtv,  # run time vars
 | 
			
		||||
| 
						 | 
				
			
			@ -364,7 +373,9 @@ class ActorNursery:
 | 
			
		|||
        # then `._children`..
 | 
			
		||||
        children: dict = self._children
 | 
			
		||||
        child_count: int = len(children)
 | 
			
		||||
        msg: str = f'Cancelling actor nursery with {child_count} children\n'
 | 
			
		||||
        msg: str = (
 | 
			
		||||
            f'Cancelling actor-nursery with {child_count} children\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        server: IPCServer = self._actor.ipc_server
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -391,7 +402,9 @@ class ActorNursery:
 | 
			
		|||
 | 
			
		||||
                    else:
 | 
			
		||||
                        if portal is None:  # actor hasn't fully spawned yet
 | 
			
		||||
                            event: trio.Event = server._peer_connected[subactor.uid]
 | 
			
		||||
                            event: trio.Event = server._peer_connected[
 | 
			
		||||
                                subactor.uid
 | 
			
		||||
                            ]
 | 
			
		||||
                            log.warning(
 | 
			
		||||
                                f"{subactor.uid} never 't finished spawning?"
 | 
			
		||||
                            )
 | 
			
		||||
| 
						 | 
				
			
			@ -416,7 +429,20 @@ class ActorNursery:
 | 
			
		|||
                        # spawn cancel tasks for each sub-actor
 | 
			
		||||
                        assert portal
 | 
			
		||||
                        if portal.channel.connected():
 | 
			
		||||
                            tn.start_soon(portal.cancel_actor)
 | 
			
		||||
 | 
			
		||||
                            async def canc_subactor():
 | 
			
		||||
                                await portal.cancel_actor()
 | 
			
		||||
                                # aid: msgtypes.Aid = subactor.aid
 | 
			
		||||
                                # reprol: str = aid.reprol(sin_uuid=False)
 | 
			
		||||
                                # if not self._errors.get(aid):
 | 
			
		||||
                                #     self._errors[aid] = ActorCancelled(
 | 
			
		||||
                                #         message=(
 | 
			
		||||
                                #             f'Sub-actor {reprol!r} cancelled gracefully by parent nursery\n'
 | 
			
		||||
                                #         ),
 | 
			
		||||
                                #         canceller=self._actor.aid,
 | 
			
		||||
                                #     )
 | 
			
		||||
 | 
			
		||||
                            tn.start_soon(canc_subactor)
 | 
			
		||||
 | 
			
		||||
                log.cancel(msg)
 | 
			
		||||
        # if we cancelled the cancel (we hung cancelling remote actors)
 | 
			
		||||
| 
						 | 
				
			
			@ -442,6 +468,47 @@ class ActorNursery:
 | 
			
		|||
        # mark ourselves as having (tried to have) cancelled all subactors
 | 
			
		||||
        self._join_procs.set()
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def maybe_error(self) -> (
 | 
			
		||||
        BaseException|
 | 
			
		||||
        BaseExceptionGroup|
 | 
			
		||||
        None
 | 
			
		||||
    ):
 | 
			
		||||
        '''
 | 
			
		||||
        Deliver any captured scope errors including those relayed
 | 
			
		||||
        from subactors such as `ActorCancelled` during a non-graceful
 | 
			
		||||
        cancellation scenario.
 | 
			
		||||
 | 
			
		||||
        When more then a "graceful cancel" occurrs wrap all collected
 | 
			
		||||
        sub-exceptions in a raised `ExceptionGroup`.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        scope_exc: BaseException|None = self._scope_error
 | 
			
		||||
 | 
			
		||||
        # XXX NOTE, only pack an eg if there i at least one
 | 
			
		||||
        # non-actorc exception received from a subactor, OR
 | 
			
		||||
        # return `._scope_error` verbatim.
 | 
			
		||||
        if (errors := self._errors):
 | 
			
		||||
            # use `BaseExceptionGroup` as needed
 | 
			
		||||
            excs: list[BaseException] = list(errors.values())
 | 
			
		||||
            if (
 | 
			
		||||
                len(excs) > 1
 | 
			
		||||
                and
 | 
			
		||||
                any(
 | 
			
		||||
                    type(exc) not in {ActorCancelled,}
 | 
			
		||||
                    for exc in excs
 | 
			
		||||
                )
 | 
			
		||||
            ):
 | 
			
		||||
                return ExceptionGroup(
 | 
			
		||||
                    'ActorNursery multi-errored with',
 | 
			
		||||
                    tuple(excs),
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            # raise the lone subactor exc
 | 
			
		||||
            return list(excs)[0]
 | 
			
		||||
 | 
			
		||||
        return scope_exc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		||||
| 
						 | 
				
			
			@ -457,7 +524,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		|||
    inner_err: BaseException|None = None
 | 
			
		||||
 | 
			
		||||
    # the collection of errors retreived from spawned sub-actors
 | 
			
		||||
    errors: dict[tuple[str, str], BaseException] = {}
 | 
			
		||||
    errors: dict[
 | 
			
		||||
        msgtypes.Aid,
 | 
			
		||||
        BaseException,
 | 
			
		||||
    ] = {}
 | 
			
		||||
 | 
			
		||||
    # This is the outermost level "deamon actor" nursery. It is awaited
 | 
			
		||||
    # **after** the below inner "run in actor nursery". This allows for
 | 
			
		||||
| 
						 | 
				
			
			@ -467,176 +537,212 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		|||
    # `ActorNursery.start_actor()`).
 | 
			
		||||
 | 
			
		||||
    # errors from this daemon actor nursery bubble up to caller
 | 
			
		||||
    async with (
 | 
			
		||||
        collapse_eg(),
 | 
			
		||||
        trio.open_nursery() as da_nursery,
 | 
			
		||||
    ):
 | 
			
		||||
        try:
 | 
			
		||||
            # This is the inner level "run in actor" nursery. It is
 | 
			
		||||
            # awaited first since actors spawned in this way (using
 | 
			
		||||
            # `ActorNusery.run_in_actor()`) are expected to only
 | 
			
		||||
            # return a single result and then complete (i.e. be canclled
 | 
			
		||||
            # gracefully). Errors collected from these actors are
 | 
			
		||||
            # immediately raised for handling by a supervisor strategy.
 | 
			
		||||
            # As such if the strategy propagates any error(s) upwards
 | 
			
		||||
            # the above "daemon actor" nursery will be notified.
 | 
			
		||||
            async with (
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as ria_nursery,
 | 
			
		||||
            ):
 | 
			
		||||
                an = ActorNursery(
 | 
			
		||||
                    actor,
 | 
			
		||||
                    ria_nursery,
 | 
			
		||||
                    da_nursery,
 | 
			
		||||
                    errors
 | 
			
		||||
                )
 | 
			
		||||
                try:
 | 
			
		||||
                    # spawning of actors happens in the caller's scope
 | 
			
		||||
                    # after we yield upwards
 | 
			
		||||
                    yield an
 | 
			
		||||
 | 
			
		||||
                    # When we didn't error in the caller's scope,
 | 
			
		||||
                    # signal all process-monitor-tasks to conduct
 | 
			
		||||
                    # the "hard join phase".
 | 
			
		||||
                    log.runtime(
 | 
			
		||||
                        'Waiting on subactors to complete:\n'
 | 
			
		||||
                        f'>}} {len(an._children)}\n'
 | 
			
		||||
    try:
 | 
			
		||||
        async with (
 | 
			
		||||
            collapse_eg(),
 | 
			
		||||
            trio.open_nursery() as da_nursery,
 | 
			
		||||
        ):
 | 
			
		||||
            try:
 | 
			
		||||
                # This is the inner level "run in actor" nursery. It is
 | 
			
		||||
                # awaited first since actors spawned in this way (using
 | 
			
		||||
                # `ActorNusery.run_in_actor()`) are expected to only
 | 
			
		||||
                # return a single result and then complete (i.e. be canclled
 | 
			
		||||
                # gracefully). Errors collected from these actors are
 | 
			
		||||
                # immediately raised for handling by a supervisor strategy.
 | 
			
		||||
                # As such if the strategy propagates any error(s) upwards
 | 
			
		||||
                # the above "daemon actor" nursery will be notified.
 | 
			
		||||
                async with (
 | 
			
		||||
                    collapse_eg(),
 | 
			
		||||
                    trio.open_nursery() as ria_nursery,
 | 
			
		||||
                ):
 | 
			
		||||
                    an = ActorNursery(
 | 
			
		||||
                        actor,
 | 
			
		||||
                        ria_nursery,
 | 
			
		||||
                        da_nursery,
 | 
			
		||||
                        errors
 | 
			
		||||
                    )
 | 
			
		||||
                    an._join_procs.set()
 | 
			
		||||
                    try:
 | 
			
		||||
                        # spawning of actors happens in the caller's scope
 | 
			
		||||
                        # after we yield upwards
 | 
			
		||||
                        yield an
 | 
			
		||||
 | 
			
		||||
                except BaseException as _inner_err:
 | 
			
		||||
                    inner_err = _inner_err
 | 
			
		||||
                    errors[actor.uid] = inner_err
 | 
			
		||||
                        # When we didn't error in the caller's scope,
 | 
			
		||||
                        # signal all process-monitor-tasks to conduct
 | 
			
		||||
                        # the "hard join phase".
 | 
			
		||||
                        log.runtime(
 | 
			
		||||
                            'Waiting on subactors to complete:\n'
 | 
			
		||||
                            f'>}} {len(an._children)}\n'
 | 
			
		||||
                        )
 | 
			
		||||
                        an._join_procs.set()
 | 
			
		||||
 | 
			
		||||
                    # If we error in the root but the debugger is
 | 
			
		||||
                    # engaged we don't want to prematurely kill (and
 | 
			
		||||
                    # thus clobber access to) the local tty since it
 | 
			
		||||
                    # will make the pdb repl unusable.
 | 
			
		||||
                    # Instead try to wait for pdb to be released before
 | 
			
		||||
                    # tearing down.
 | 
			
		||||
                    await debug.maybe_wait_for_debugger(
 | 
			
		||||
                        child_in_debug=an._at_least_one_child_in_debug
 | 
			
		||||
                    )
 | 
			
		||||
                    except BaseException as _inner_err:
 | 
			
		||||
                        inner_err = _inner_err
 | 
			
		||||
                        # errors[actor.aid] = inner_err
 | 
			
		||||
 | 
			
		||||
                    # if the caller's scope errored then we activate our
 | 
			
		||||
                    # one-cancels-all supervisor strategy (don't
 | 
			
		||||
                    # worry more are coming).
 | 
			
		||||
                    an._join_procs.set()
 | 
			
		||||
                        # If we error in the root but the debugger is
 | 
			
		||||
                        # engaged we don't want to prematurely kill (and
 | 
			
		||||
                        # thus clobber access to) the local tty since it
 | 
			
		||||
                        # will make the pdb repl unusable.
 | 
			
		||||
                        # Instead try to wait for pdb to be released before
 | 
			
		||||
                        # tearing down.
 | 
			
		||||
                        await debug.maybe_wait_for_debugger(
 | 
			
		||||
                            child_in_debug=an._at_least_one_child_in_debug
 | 
			
		||||
                        )
 | 
			
		||||
 | 
			
		||||
                    # XXX NOTE XXX: hypothetically an error could
 | 
			
		||||
                    # be raised and then a cancel signal shows up
 | 
			
		||||
                    # slightly after in which case the `else:`
 | 
			
		||||
                    # block here might not complete?  For now,
 | 
			
		||||
                    # shield both.
 | 
			
		||||
                    with trio.CancelScope(shield=True):
 | 
			
		||||
                        etype: type = type(inner_err)
 | 
			
		||||
                        if etype in (
 | 
			
		||||
                            trio.Cancelled,
 | 
			
		||||
                            KeyboardInterrupt,
 | 
			
		||||
                        ) or (
 | 
			
		||||
                            is_multi_cancelled(inner_err)
 | 
			
		||||
                        ):
 | 
			
		||||
                            log.cancel(
 | 
			
		||||
                                f'Actor-nursery cancelled by {etype}\n\n'
 | 
			
		||||
                        # if the caller's scope errored then we activate our
 | 
			
		||||
                        # one-cancels-all supervisor strategy (don't
 | 
			
		||||
                        # worry more are coming).
 | 
			
		||||
                        an._join_procs.set()
 | 
			
		||||
 | 
			
		||||
                                f'{current_actor().uid}\n'
 | 
			
		||||
                                f' |_{an}\n\n'
 | 
			
		||||
                        # XXX NOTE XXX: hypothetically an error could
 | 
			
		||||
                        # be raised and then a cancel signal shows up
 | 
			
		||||
                        # slightly after in which case the `else:`
 | 
			
		||||
                        # block here might not complete?  For now,
 | 
			
		||||
                        # shield both.
 | 
			
		||||
                        with trio.CancelScope(shield=True):
 | 
			
		||||
                            etype: type = type(inner_err)
 | 
			
		||||
                            if etype in (
 | 
			
		||||
                                trio.Cancelled,
 | 
			
		||||
                                KeyboardInterrupt,
 | 
			
		||||
                            ) or (
 | 
			
		||||
                                is_multi_cancelled(inner_err)
 | 
			
		||||
                            ):
 | 
			
		||||
                                log.cancel(
 | 
			
		||||
                                    f'Actor-nursery cancelled by {etype}\n\n'
 | 
			
		||||
 | 
			
		||||
                                # TODO: show tb str?
 | 
			
		||||
                                # f'{tb_str}'
 | 
			
		||||
                            )
 | 
			
		||||
                        elif etype in {
 | 
			
		||||
                            ContextCancelled,
 | 
			
		||||
                        }:
 | 
			
		||||
                            log.cancel(
 | 
			
		||||
                                'Actor-nursery caught remote cancellation\n'
 | 
			
		||||
                                '\n'
 | 
			
		||||
                                f'{inner_err.tb_str}'
 | 
			
		||||
                            )
 | 
			
		||||
                        else:
 | 
			
		||||
                            log.exception(
 | 
			
		||||
                                'Nursery errored with:\n'
 | 
			
		||||
                                    f'{current_actor().uid}\n'
 | 
			
		||||
                                    f' |_{an}\n\n'
 | 
			
		||||
 | 
			
		||||
                                # TODO: same thing as in
 | 
			
		||||
                                # `._invoke()` to compute how to
 | 
			
		||||
                                # place this div-line in the
 | 
			
		||||
                                # middle of the above msg
 | 
			
		||||
                                # content..
 | 
			
		||||
                                # -[ ] prolly helper-func it too
 | 
			
		||||
                                #   in our `.log` module..
 | 
			
		||||
                                # '------ - ------'
 | 
			
		||||
                            )
 | 
			
		||||
                                    # TODO: show tb str?
 | 
			
		||||
                                    # f'{tb_str}'
 | 
			
		||||
                                )
 | 
			
		||||
                            elif etype in {
 | 
			
		||||
                                ContextCancelled,
 | 
			
		||||
                            }:
 | 
			
		||||
                                log.cancel(
 | 
			
		||||
                                    'Actor-nursery caught remote cancellation\n'
 | 
			
		||||
                                    '\n'
 | 
			
		||||
                                    f'{inner_err.tb_str}'
 | 
			
		||||
                                )
 | 
			
		||||
                            else:
 | 
			
		||||
                                log.exception(
 | 
			
		||||
                                    'Nursery errored with:\n'
 | 
			
		||||
 | 
			
		||||
                        # cancel all subactors
 | 
			
		||||
                        await an.cancel()
 | 
			
		||||
                                    # TODO: same thing as in
 | 
			
		||||
                                    # `._invoke()` to compute how to
 | 
			
		||||
                                    # place this div-line in the
 | 
			
		||||
                                    # middle of the above msg
 | 
			
		||||
                                    # content..
 | 
			
		||||
                                    # -[ ] prolly helper-func it too
 | 
			
		||||
                                    #   in our `.log` module..
 | 
			
		||||
                                    # '------ - ------'
 | 
			
		||||
                                )
 | 
			
		||||
 | 
			
		||||
            # ria_nursery scope end
 | 
			
		||||
                            # cancel all subactors
 | 
			
		||||
                            await an.cancel()
 | 
			
		||||
 | 
			
		||||
        # TODO: this is the handler around the ``.run_in_actor()``
 | 
			
		||||
        # nursery. Ideally we can drop this entirely in the future as
 | 
			
		||||
        # the whole ``.run_in_actor()`` API should be built "on top of"
 | 
			
		||||
        # this lower level spawn-request-cancel "daemon actor" API where
 | 
			
		||||
        # a local in-actor task nursery is used with one-to-one task
 | 
			
		||||
        # + `await Portal.run()` calls and the results/errors are
 | 
			
		||||
        # handled directly (inline) and errors by the local nursery.
 | 
			
		||||
        except (
 | 
			
		||||
            Exception,
 | 
			
		||||
            BaseExceptionGroup,
 | 
			
		||||
            trio.Cancelled
 | 
			
		||||
        ) as _outer_err:
 | 
			
		||||
            outer_err = _outer_err
 | 
			
		||||
                # ria_nursery scope end
 | 
			
		||||
 | 
			
		||||
            an._scope_error = outer_err or inner_err
 | 
			
		||||
            # TODO: this is the handler around the ``.run_in_actor()``
 | 
			
		||||
            # nursery. Ideally we can drop this entirely in the future as
 | 
			
		||||
            # the whole ``.run_in_actor()`` API should be built "on top of"
 | 
			
		||||
            # this lower level spawn-request-cancel "daemon actor" API where
 | 
			
		||||
            # a local in-actor task nursery is used with one-to-one task
 | 
			
		||||
            # + `await Portal.run()` calls and the results/errors are
 | 
			
		||||
            # handled directly (inline) and errors by the local nursery.
 | 
			
		||||
            except (
 | 
			
		||||
                Exception,
 | 
			
		||||
                BaseExceptionGroup,
 | 
			
		||||
                trio.Cancelled
 | 
			
		||||
            ) as _outer_err:
 | 
			
		||||
                outer_err = _outer_err
 | 
			
		||||
 | 
			
		||||
            # XXX: yet another guard before allowing the cancel
 | 
			
		||||
            # sequence in case a (single) child is in debug.
 | 
			
		||||
            await debug.maybe_wait_for_debugger(
 | 
			
		||||
                child_in_debug=an._at_least_one_child_in_debug
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # If actor-local error was raised while waiting on
 | 
			
		||||
            # ".run_in_actor()" actors then we also want to cancel all
 | 
			
		||||
            # remaining sub-actors (due to our lone strategy:
 | 
			
		||||
            # one-cancels-all).
 | 
			
		||||
            if an._children:
 | 
			
		||||
                log.cancel(
 | 
			
		||||
                    'Actor-nursery cancelling due error type:\n'
 | 
			
		||||
                    f'{outer_err}\n'
 | 
			
		||||
                # XXX: yet another guard before allowing the cancel
 | 
			
		||||
                # sequence in case a (single) child is in debug.
 | 
			
		||||
                await debug.maybe_wait_for_debugger(
 | 
			
		||||
                    child_in_debug=an._at_least_one_child_in_debug
 | 
			
		||||
                )
 | 
			
		||||
                with trio.CancelScope(shield=True):
 | 
			
		||||
                    await an.cancel()
 | 
			
		||||
            raise
 | 
			
		||||
 | 
			
		||||
        finally:
 | 
			
		||||
            # No errors were raised while awaiting ".run_in_actor()"
 | 
			
		||||
            # actors but those actors may have returned remote errors as
 | 
			
		||||
            # results (meaning they errored remotely and have relayed
 | 
			
		||||
            # those errors back to this parent actor). The errors are
 | 
			
		||||
            # collected in ``errors`` so cancel all actors, summarize
 | 
			
		||||
            # all errors and re-raise.
 | 
			
		||||
            if errors:
 | 
			
		||||
                # If actor-local error was raised while waiting on
 | 
			
		||||
                # ".run_in_actor()" actors then we also want to cancel all
 | 
			
		||||
                # remaining sub-actors (due to our lone strategy:
 | 
			
		||||
                # one-cancels-all).
 | 
			
		||||
                if an._children:
 | 
			
		||||
                    log.cancel(
 | 
			
		||||
                        'Actor-nursery cancelling due error type:\n'
 | 
			
		||||
                        f'{outer_err}\n'
 | 
			
		||||
                    )
 | 
			
		||||
                    with trio.CancelScope(shield=True):
 | 
			
		||||
                        await an.cancel()
 | 
			
		||||
 | 
			
		||||
                # use `BaseExceptionGroup` as needed
 | 
			
		||||
                if len(errors) > 1:
 | 
			
		||||
                    raise BaseExceptionGroup(
 | 
			
		||||
                        'tractor.ActorNursery errored with',
 | 
			
		||||
                        tuple(errors.values()),
 | 
			
		||||
                    )
 | 
			
		||||
                else:
 | 
			
		||||
                    raise list(errors.values())[0]
 | 
			
		||||
                raise
 | 
			
		||||
 | 
			
		||||
            # show frame on any (likely) internal error
 | 
			
		||||
            if (
 | 
			
		||||
                not an.cancelled
 | 
			
		||||
                and an._scope_error
 | 
			
		||||
            ):
 | 
			
		||||
                __tracebackhide__: bool = False
 | 
			
		||||
            finally:
 | 
			
		||||
                scope_exc = an._scope_error = outer_err or inner_err
 | 
			
		||||
                # await debug.pause(shield=True)
 | 
			
		||||
                # if scope_exc:
 | 
			
		||||
                #     errors[actor.aid] = scope_exc
 | 
			
		||||
 | 
			
		||||
        # da_nursery scope end - nursery checkpoint
 | 
			
		||||
    # final exit
 | 
			
		||||
                # show this frame on any internal error
 | 
			
		||||
                if (
 | 
			
		||||
                    not an.cancelled
 | 
			
		||||
                    and
 | 
			
		||||
                    scope_exc
 | 
			
		||||
                ):
 | 
			
		||||
                    __tracebackhide__: bool = False
 | 
			
		||||
 | 
			
		||||
                # NOTE, it's possible no errors were raised while
 | 
			
		||||
                # awaiting ".run_in_actor()" actors but those
 | 
			
		||||
                # sub-actors may have delivered remote errors as
 | 
			
		||||
                # results, normally captured via machinery in
 | 
			
		||||
                # `._spawn.cancel_on_completion()`.
 | 
			
		||||
                #
 | 
			
		||||
                # Any such remote errors are collected in `an._errors`
 | 
			
		||||
                # which is summarized via `ActorNursery.maybe_error`
 | 
			
		||||
                # which is maybe re-raised in an outer block (below).
 | 
			
		||||
                #
 | 
			
		||||
                # So here we first cancel all subactors the summarize
 | 
			
		||||
                # all errors and then later (in that outer block)
 | 
			
		||||
                # maybe-raise on a "non-graceful" cancellation
 | 
			
		||||
                # outcome, normally as a summary EG.
 | 
			
		||||
                if (
 | 
			
		||||
                    scope_exc
 | 
			
		||||
                    or
 | 
			
		||||
                    errors
 | 
			
		||||
                ):
 | 
			
		||||
 | 
			
		||||
                    if an._children:
 | 
			
		||||
                        with trio.CancelScope(shield=True):
 | 
			
		||||
                            await an.cancel()
 | 
			
		||||
 | 
			
		||||
                    # cancel outer tn so we unblock outside this
 | 
			
		||||
                    # finally!
 | 
			
		||||
                    da_nursery.cance_scope.cancel()
 | 
			
		||||
                    #
 | 
			
		||||
                    # ^TODO? still don't get why needed?
 | 
			
		||||
                    # - an.cancel() should cause all spawn-subtasks
 | 
			
		||||
                    #   to eventually exit?
 | 
			
		||||
                    # - also, could (instead) we sync to an event here before
 | 
			
		||||
                    #   (ever) calling `an.cancel()`??
 | 
			
		||||
 | 
			
		||||
        # `da_nursery` scope end, thus a checkpoint.
 | 
			
		||||
    finally:
 | 
			
		||||
 | 
			
		||||
            # raise any eg compiled from all subs
 | 
			
		||||
            # ??TODO should we also adopt strict-egs here like
 | 
			
		||||
            # `trio.Nursery`??
 | 
			
		||||
            #
 | 
			
		||||
            # XXX justification notes,
 | 
			
		||||
            # docs: https://trio.readthedocs.io/en/stable/reference-core.html#historical-note-non-strict-exceptiongroups
 | 
			
		||||
            # anthropic: https://discuss.python.org/t/using-exceptiongroup-at-anthropic-experience-report/20888
 | 
			
		||||
            # gh: https://github.com/python-trio/trio/issues/611
 | 
			
		||||
        if an_exc := an.maybe_error:
 | 
			
		||||
            raise an_exc
 | 
			
		||||
 | 
			
		||||
        if scope_exc := an._scope_error:
 | 
			
		||||
            raise scope_exc
 | 
			
		||||
 | 
			
		||||
    # @acm-fn scope exit
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
_shutdown_msg: str = (
 | 
			
		||||
| 
						 | 
				
			
			@ -647,7 +753,7 @@ _shutdown_msg: str = (
 | 
			
		|||
@acm
 | 
			
		||||
async def open_nursery(
 | 
			
		||||
    *,  # named params only!
 | 
			
		||||
    hide_tb: bool = True,
 | 
			
		||||
    hide_tb: bool = False,
 | 
			
		||||
    **kwargs,
 | 
			
		||||
    # ^TODO, paramspec for `open_root_actor()`
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -683,16 +789,21 @@ async def open_nursery(
 | 
			
		|||
            # mark us for teardown on exit
 | 
			
		||||
            implicit_runtime: bool = True
 | 
			
		||||
 | 
			
		||||
            async with open_root_actor(
 | 
			
		||||
                hide_tb=hide_tb,
 | 
			
		||||
                **kwargs,
 | 
			
		||||
            ) as actor:
 | 
			
		||||
            async with (
 | 
			
		||||
                # collapse_eg(hide_tb=hide_tb),
 | 
			
		||||
                open_root_actor(
 | 
			
		||||
                    hide_tb=hide_tb,
 | 
			
		||||
                    **kwargs,
 | 
			
		||||
                ) as actor,
 | 
			
		||||
            ):
 | 
			
		||||
                assert actor is current_actor()
 | 
			
		||||
 | 
			
		||||
                try:
 | 
			
		||||
                    async with _open_and_supervise_one_cancels_all_nursery(
 | 
			
		||||
                        actor
 | 
			
		||||
                    ) as an:
 | 
			
		||||
                    async with (
 | 
			
		||||
                        _open_and_supervise_one_cancels_all_nursery(
 | 
			
		||||
                            actor
 | 
			
		||||
                        ) as an
 | 
			
		||||
                    ):
 | 
			
		||||
 | 
			
		||||
                        # NOTE: mark this nursery as having
 | 
			
		||||
                        # implicitly started the root actor so
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue