WIP, actor-nursery non-graceful-cancel raises EG
Attempting a rework of the post-cancellation "raising semantics" such
that subactors which are `ActorCancelled` as a result of a non-graceful
in-scope error, are acked via a re-raised
`ExceptionGroup[ActorCancelled*N, Exception]`
*outside the an-block*. Eventually, the idea is to have `ActorCancelled`
be relayed from each subactor in response to any
`Actor.cancel()/Portal.cancel_actor()` request much like
`Context.cancel()/ContextCancelled`.
This is a WIP bc it does break a few tests and requires related
`_spawn`-mod-machinery changes to match some of which I'm not yet sure
are required; need to dig into the details of the currently failing
suites first.
`._supervise` patch deats,
- add `ActorNursery.maybe_error` which delivers the maybe-EG or
  `._scope_error` depending on whether `.errors` (now `._errors`,
  a mapping from `Aid`-keys) has entries set for subs.
- raise ^ if non-null in a new outer-`finally` in
  `_open_and_supervise_one_cancels_all_nursery()`; an "outer" block is
  added to ensure all sub-actor-excs are emitted/captured as part of
  `ActorNursery.cancel()` being called (as prior) as well as the
  `da_nursery` being explicitly cancelled alongside it (to unblock the
  tn-block, but still not sure why this is necessary yet?..).
- (now masked) tried injecting actorcs from `.cancel()` loop, but (again
  per more explanation in section below) seems to be suffering a race
  issue with RAE relay?
- left in buncha notes obvi for all this..
`._spawn` patch deats,
- as above, expect `errors: dict` to map from `Aid`-keys.
- pass `errors: dict` into `soft_kill()` since it seemed like we'd want
  to (for now) inject `ActorCancelled` in some cases (but now i'm not
  sure XD).
- tried out a couple spots (which are now masked) to inject
  `ActorCancelled` after calling `Portal.cancel()` in various
  subactor-supervision routines whenever an RAE is not set..
  - oddly seems to be overwriting actual errors (likely due to racing
    with RAE receive and/or actorc-request timeout?) despite the guard
    logic..which clearly doesn't resolve the issue..
- buncha `tn`-style renaming.
			
			
				actor_cancelled_exc_type
			
			
		
							parent
							
								
									f0adb0fb54
								
							
						
					
					
						commit
						4b4e5df2b7
					
				|  | @ -50,7 +50,11 @@ from tractor._addr import UnwrappedAddress | ||||||
| from tractor._portal import Portal | from tractor._portal import Portal | ||||||
| from tractor._runtime import Actor | from tractor._runtime import Actor | ||||||
| from tractor._entry import _mp_main | from tractor._entry import _mp_main | ||||||
| from tractor._exceptions import ActorFailure | from tractor._exceptions import ( | ||||||
|  |     ActorCancelled, | ||||||
|  |     ActorFailure, | ||||||
|  |     # NoResult, | ||||||
|  | ) | ||||||
| from tractor.msg import ( | from tractor.msg import ( | ||||||
|     types as msgtypes, |     types as msgtypes, | ||||||
|     pretty_struct, |     pretty_struct, | ||||||
|  | @ -137,7 +141,6 @@ def try_set_start_method( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def exhaust_portal( | async def exhaust_portal( | ||||||
| 
 |  | ||||||
|     portal: Portal, |     portal: Portal, | ||||||
|     actor: Actor |     actor: Actor | ||||||
| 
 | 
 | ||||||
|  | @ -185,10 +188,12 @@ async def exhaust_portal( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def cancel_on_completion( | async def cancel_on_completion( | ||||||
| 
 |  | ||||||
|     portal: Portal, |     portal: Portal, | ||||||
|     actor: Actor, |     actor: Actor, | ||||||
|     errors: dict[tuple[str, str], Exception], |     errors: dict[ | ||||||
|  |         msgtypes.Aid, | ||||||
|  |         Exception, | ||||||
|  |     ], | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|     ''' |     ''' | ||||||
|  | @ -209,24 +214,57 @@ async def cancel_on_completion( | ||||||
|         portal, |         portal, | ||||||
|         actor, |         actor, | ||||||
|     ) |     ) | ||||||
|  |     aid: msgtypes.Aid = actor.aid | ||||||
|  |     repr_aid: str = aid.reprol(sin_uuid=False) | ||||||
|  | 
 | ||||||
|     if isinstance(result, Exception): |     if isinstance(result, Exception): | ||||||
|         errors[actor.uid]: Exception = result |         errors[aid]: Exception = result | ||||||
|         log.cancel( |         log.cancel( | ||||||
|             'Cancelling subactor runtime due to error:\n\n' |             'Cancelling subactor {repr_aid!r} runtime due to error\n' | ||||||
|             f'Portal.cancel_actor() => {portal.channel.uid}\n\n' |             f'\n' | ||||||
|             f'error: {result}\n' |             f'Portal.cancel_actor() => {portal.channel.uid}\n' | ||||||
|  |             f'\n' | ||||||
|  |             f'{result!r}\n' | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     else: |     else: | ||||||
|         log.runtime( |         report: str = ( | ||||||
|             'Cancelling subactor gracefully:\n\n' |             f'Cancelling subactor {repr_aid!r} gracefully..\n' | ||||||
|             f'Portal.cancel_actor() => {portal.channel.uid}\n\n' |             f'\n' | ||||||
|             f'result: {result}\n' |         ) | ||||||
|  |         canc_info: str = ( | ||||||
|  |             f'Portal.cancel_actor() => {portal.chan.uid}\n' | ||||||
|  |             f'\n' | ||||||
|  |             f'final-result => {result!r}\n' | ||||||
|  |         ) | ||||||
|  |         log.cancel( | ||||||
|  |             report | ||||||
|  |             + | ||||||
|  |             canc_info | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     # cancel the process now that we have a final result |     # cancel the process now that we have a final result | ||||||
|     await portal.cancel_actor() |     await portal.cancel_actor() | ||||||
| 
 | 
 | ||||||
|  |     if ( | ||||||
|  |         not errors.get(aid) | ||||||
|  |         # and | ||||||
|  |         # result is NoResult | ||||||
|  |     ): | ||||||
|  |         pass | ||||||
|  |         # await debug.pause(shield=True) | ||||||
|  | 
 | ||||||
|  |         # errors[aid] = ActorCancelled( | ||||||
|  |         #     message=( | ||||||
|  |         #         f'Cancelled subactor {repr_aid!r}\n' | ||||||
|  |         #         f'{canc_info}\n' | ||||||
|  |         #     ), | ||||||
|  |         #     canceller=current_actor().aid, | ||||||
|  |         #     # TODO? should we have a ack-msg? | ||||||
|  |         #     # ipc_msg=?? | ||||||
|  |         #     # boxed_type=trio.Cancelled, | ||||||
|  |         # ) | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| async def hard_kill( | async def hard_kill( | ||||||
|     proc: trio.Process, |     proc: trio.Process, | ||||||
|  | @ -331,6 +369,10 @@ async def soft_kill( | ||||||
|         Awaitable, |         Awaitable, | ||||||
|     ], |     ], | ||||||
|     portal: Portal, |     portal: Portal, | ||||||
|  |     errors: dict[ | ||||||
|  |         msgtypes.Aid, | ||||||
|  |         Exception, | ||||||
|  |     ], | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|     ''' |     ''' | ||||||
|  | @ -374,8 +416,8 @@ async def soft_kill( | ||||||
|         # below. This means we try to do a graceful teardown |         # below. This means we try to do a graceful teardown | ||||||
|         # via sending a cancel message before getting out |         # via sending a cancel message before getting out | ||||||
|         # zombie killing tools. |         # zombie killing tools. | ||||||
|         async with trio.open_nursery() as n: |         async with trio.open_nursery() as tn: | ||||||
|             n.cancel_scope.shield = True |             tn.cancel_scope.shield = True | ||||||
| 
 | 
 | ||||||
|             async def cancel_on_proc_deth(): |             async def cancel_on_proc_deth(): | ||||||
|                 ''' |                 ''' | ||||||
|  | @ -385,24 +427,35 @@ async def soft_kill( | ||||||
| 
 | 
 | ||||||
|                 ''' |                 ''' | ||||||
|                 await wait_func(proc) |                 await wait_func(proc) | ||||||
|                 n.cancel_scope.cancel() |                 tn.cancel_scope.cancel() | ||||||
| 
 | 
 | ||||||
|             # start a task to wait on the termination of the |             # start a task to wait on the termination of the | ||||||
|             # process by itself waiting on a (caller provided) wait |             # process by itself waiting on a (caller provided) wait | ||||||
|             # function which should unblock when the target process |             # function which should unblock when the target process | ||||||
|             # has terminated. |             # has terminated. | ||||||
|             n.start_soon(cancel_on_proc_deth) |             tn.start_soon(cancel_on_proc_deth) | ||||||
| 
 | 
 | ||||||
|             # send the actor-runtime a cancel request. |             # send the actor-runtime a cancel request. | ||||||
|             await portal.cancel_actor() |             await portal.cancel_actor() | ||||||
| 
 | 
 | ||||||
|  |             # if not errors.get(peer_aid): | ||||||
|  |             #     errors[peer_aid] = ActorCancelled( | ||||||
|  |             #         message=( | ||||||
|  |             #             'Sub-actor cancelled gracefully by parent\n' | ||||||
|  |             #         ), | ||||||
|  |             #         canceller=current_actor().aid, | ||||||
|  |             #         # TODO? should we have a ack-msg? | ||||||
|  |             #         # ipc_msg=?? | ||||||
|  |             #         # boxed_type=trio.Cancelled, | ||||||
|  |             #     ) | ||||||
|  | 
 | ||||||
|             if proc.poll() is None:  # type: ignore |             if proc.poll() is None:  # type: ignore | ||||||
|                 log.warning( |                 log.warning( | ||||||
|                     'Subactor still alive after cancel request?\n\n' |                     'Subactor still alive after cancel request?\n\n' | ||||||
|                     f'uid: {peer_aid}\n' |                     f'uid: {peer_aid}\n' | ||||||
|                     f'|_{proc}\n' |                     f'|_{proc}\n' | ||||||
|                 ) |                 ) | ||||||
|                 n.cancel_scope.cancel() |                 tn.cancel_scope.cancel() | ||||||
|         raise |         raise | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -410,7 +463,10 @@ async def new_proc( | ||||||
|     name: str, |     name: str, | ||||||
|     actor_nursery: ActorNursery, |     actor_nursery: ActorNursery, | ||||||
|     subactor: Actor, |     subactor: Actor, | ||||||
|     errors: dict[tuple[str, str], Exception], |     errors: dict[ | ||||||
|  |         msgtypes.Aid, | ||||||
|  |         Exception, | ||||||
|  |     ], | ||||||
| 
 | 
 | ||||||
|     # passed through to actor main |     # passed through to actor main | ||||||
|     bind_addrs: list[UnwrappedAddress], |     bind_addrs: list[UnwrappedAddress], | ||||||
|  | @ -449,7 +505,10 @@ async def trio_proc( | ||||||
|     name: str, |     name: str, | ||||||
|     actor_nursery: ActorNursery, |     actor_nursery: ActorNursery, | ||||||
|     subactor: Actor, |     subactor: Actor, | ||||||
|     errors: dict[tuple[str, str], Exception], |     errors: dict[ | ||||||
|  |         msgtypes.Aid, | ||||||
|  |         Exception, | ||||||
|  |     ], | ||||||
| 
 | 
 | ||||||
|     # passed through to actor main |     # passed through to actor main | ||||||
|     bind_addrs: list[UnwrappedAddress], |     bind_addrs: list[UnwrappedAddress], | ||||||
|  | @ -572,9 +631,9 @@ async def trio_proc( | ||||||
|         with trio.CancelScope(shield=True): |         with trio.CancelScope(shield=True): | ||||||
|             await actor_nursery._join_procs.wait() |             await actor_nursery._join_procs.wait() | ||||||
| 
 | 
 | ||||||
|         async with trio.open_nursery() as nursery: |         async with trio.open_nursery() as ptl_reaper_tn: | ||||||
|             if portal in actor_nursery._cancel_after_result_on_exit: |             if portal in actor_nursery._cancel_after_result_on_exit: | ||||||
|                 nursery.start_soon( |                 ptl_reaper_tn.start_soon( | ||||||
|                     cancel_on_completion, |                     cancel_on_completion, | ||||||
|                     portal, |                     portal, | ||||||
|                     subactor, |                     subactor, | ||||||
|  | @ -587,7 +646,8 @@ async def trio_proc( | ||||||
|             await soft_kill( |             await soft_kill( | ||||||
|                 proc, |                 proc, | ||||||
|                 trio.Process.wait,  # XXX, uses `pidfd_open()` below. |                 trio.Process.wait,  # XXX, uses `pidfd_open()` below. | ||||||
|                 portal |                 portal, | ||||||
|  |                 errors, | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             # cancel result waiter that may have been spawned in |             # cancel result waiter that may have been spawned in | ||||||
|  | @ -596,7 +656,7 @@ async def trio_proc( | ||||||
|                 'Cancelling portal result reaper task\n' |                 'Cancelling portal result reaper task\n' | ||||||
|                 f'c)> {subactor.aid.reprol()!r}\n' |                 f'c)> {subactor.aid.reprol()!r}\n' | ||||||
|             ) |             ) | ||||||
|             nursery.cancel_scope.cancel() |             ptl_reaper_tn.cancel_scope.cancel() | ||||||
| 
 | 
 | ||||||
|     finally: |     finally: | ||||||
|         # XXX NOTE XXX: The "hard" reap since no actor zombies are |         # XXX NOTE XXX: The "hard" reap since no actor zombies are | ||||||
|  | @ -669,7 +729,10 @@ async def mp_proc( | ||||||
|     name: str, |     name: str, | ||||||
|     actor_nursery: ActorNursery,  # type: ignore  # noqa |     actor_nursery: ActorNursery,  # type: ignore  # noqa | ||||||
|     subactor: Actor, |     subactor: Actor, | ||||||
|     errors: dict[tuple[str, str], Exception], |     errors: dict[ | ||||||
|  |         msgtypes.Aid, | ||||||
|  |         Exception, | ||||||
|  |     ], | ||||||
|     # passed through to actor main |     # passed through to actor main | ||||||
|     bind_addrs: list[UnwrappedAddress], |     bind_addrs: list[UnwrappedAddress], | ||||||
|     parent_addr: UnwrappedAddress, |     parent_addr: UnwrappedAddress, | ||||||
|  | @ -794,7 +857,7 @@ async def mp_proc( | ||||||
|                     cancel_on_completion, |                     cancel_on_completion, | ||||||
|                     portal, |                     portal, | ||||||
|                     subactor, |                     subactor, | ||||||
|                     errors |                     errors, | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|             # This is a "soft" (cancellable) join/reap which |             # This is a "soft" (cancellable) join/reap which | ||||||
|  | @ -803,7 +866,8 @@ async def mp_proc( | ||||||
|             await soft_kill( |             await soft_kill( | ||||||
|                 proc, |                 proc, | ||||||
|                 proc_waiter, |                 proc_waiter, | ||||||
|                 portal |                 portal, | ||||||
|  |                 errors, | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             # cancel result waiter that may have been spawned in |             # cancel result waiter that may have been spawned in | ||||||
|  |  | ||||||
|  | @ -30,6 +30,9 @@ import warnings | ||||||
| import trio | import trio | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | from .msg import ( | ||||||
|  |     types as msgtypes, | ||||||
|  | ) | ||||||
| from .devx import ( | from .devx import ( | ||||||
|     debug, |     debug, | ||||||
|     pformat as _pformat, |     pformat as _pformat, | ||||||
|  | @ -48,6 +51,7 @@ from .trionics import ( | ||||||
| ) | ) | ||||||
| from ._exceptions import ( | from ._exceptions import ( | ||||||
|     ContextCancelled, |     ContextCancelled, | ||||||
|  |     ActorCancelled, | ||||||
| ) | ) | ||||||
| from ._root import ( | from ._root import ( | ||||||
|     open_root_actor, |     open_root_actor, | ||||||
|  | @ -99,7 +103,10 @@ class ActorNursery: | ||||||
|         actor: Actor, |         actor: Actor, | ||||||
|         ria_nursery: trio.Nursery, |         ria_nursery: trio.Nursery, | ||||||
|         da_nursery: trio.Nursery, |         da_nursery: trio.Nursery, | ||||||
|         errors: dict[tuple[str, str], BaseException], |         errors: dict[ | ||||||
|  |             msgtypes.Aid, | ||||||
|  |             BaseException, | ||||||
|  |         ], | ||||||
| 
 | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         # self.supervisor = supervisor  # TODO |         # self.supervisor = supervisor  # TODO | ||||||
|  | @ -117,9 +124,11 @@ class ActorNursery: | ||||||
|             ] |             ] | ||||||
|         ] = {} |         ] = {} | ||||||
| 
 | 
 | ||||||
|  |         # signals when it is ok to start waiting o subactor procs | ||||||
|  |         # for termination. | ||||||
|         self._join_procs = trio.Event() |         self._join_procs = trio.Event() | ||||||
|         self._at_least_one_child_in_debug: bool = False |         self._at_least_one_child_in_debug: bool = False | ||||||
|         self.errors = errors |         self._errors = errors | ||||||
|         self._scope_error: BaseException|None = None |         self._scope_error: BaseException|None = None | ||||||
|         self.exited = trio.Event() |         self.exited = trio.Event() | ||||||
| 
 | 
 | ||||||
|  | @ -260,7 +269,7 @@ class ActorNursery: | ||||||
|                 name, |                 name, | ||||||
|                 self, |                 self, | ||||||
|                 subactor, |                 subactor, | ||||||
|                 self.errors, |                 self._errors, | ||||||
|                 bind_addrs, |                 bind_addrs, | ||||||
|                 parent_addr, |                 parent_addr, | ||||||
|                 _rtv,  # run time vars |                 _rtv,  # run time vars | ||||||
|  | @ -364,7 +373,9 @@ class ActorNursery: | ||||||
|         # then `._children`.. |         # then `._children`.. | ||||||
|         children: dict = self._children |         children: dict = self._children | ||||||
|         child_count: int = len(children) |         child_count: int = len(children) | ||||||
|         msg: str = f'Cancelling actor nursery with {child_count} children\n' |         msg: str = ( | ||||||
|  |             f'Cancelling actor-nursery with {child_count} children\n' | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|         server: IPCServer = self._actor.ipc_server |         server: IPCServer = self._actor.ipc_server | ||||||
| 
 | 
 | ||||||
|  | @ -391,7 +402,9 @@ class ActorNursery: | ||||||
| 
 | 
 | ||||||
|                     else: |                     else: | ||||||
|                         if portal is None:  # actor hasn't fully spawned yet |                         if portal is None:  # actor hasn't fully spawned yet | ||||||
|                             event: trio.Event = server._peer_connected[subactor.uid] |                             event: trio.Event = server._peer_connected[ | ||||||
|  |                                 subactor.uid | ||||||
|  |                             ] | ||||||
|                             log.warning( |                             log.warning( | ||||||
|                                 f"{subactor.uid} never 't finished spawning?" |                                 f"{subactor.uid} never 't finished spawning?" | ||||||
|                             ) |                             ) | ||||||
|  | @ -416,7 +429,20 @@ class ActorNursery: | ||||||
|                         # spawn cancel tasks for each sub-actor |                         # spawn cancel tasks for each sub-actor | ||||||
|                         assert portal |                         assert portal | ||||||
|                         if portal.channel.connected(): |                         if portal.channel.connected(): | ||||||
|                             tn.start_soon(portal.cancel_actor) | 
 | ||||||
|  |                             async def canc_subactor(): | ||||||
|  |                                 await portal.cancel_actor() | ||||||
|  |                                 # aid: msgtypes.Aid = subactor.aid | ||||||
|  |                                 # reprol: str = aid.reprol(sin_uuid=False) | ||||||
|  |                                 # if not self._errors.get(aid): | ||||||
|  |                                 #     self._errors[aid] = ActorCancelled( | ||||||
|  |                                 #         message=( | ||||||
|  |                                 #             f'Sub-actor {reprol!r} cancelled gracefully by parent nursery\n' | ||||||
|  |                                 #         ), | ||||||
|  |                                 #         canceller=self._actor.aid, | ||||||
|  |                                 #     ) | ||||||
|  | 
 | ||||||
|  |                             tn.start_soon(canc_subactor) | ||||||
| 
 | 
 | ||||||
|                 log.cancel(msg) |                 log.cancel(msg) | ||||||
|         # if we cancelled the cancel (we hung cancelling remote actors) |         # if we cancelled the cancel (we hung cancelling remote actors) | ||||||
|  | @ -442,6 +468,47 @@ class ActorNursery: | ||||||
|         # mark ourselves as having (tried to have) cancelled all subactors |         # mark ourselves as having (tried to have) cancelled all subactors | ||||||
|         self._join_procs.set() |         self._join_procs.set() | ||||||
| 
 | 
 | ||||||
|  |     @property | ||||||
|  |     def maybe_error(self) -> ( | ||||||
|  |         BaseException| | ||||||
|  |         BaseExceptionGroup| | ||||||
|  |         None | ||||||
|  |     ): | ||||||
|  |         ''' | ||||||
|  |         Deliver any captured scope errors including those relayed | ||||||
|  |         from subactors such as `ActorCancelled` during a non-graceful | ||||||
|  |         cancellation scenario. | ||||||
|  | 
 | ||||||
|  |         When more then a "graceful cancel" occurrs wrap all collected | ||||||
|  |         sub-exceptions in a raised `ExceptionGroup`. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         scope_exc: BaseException|None = self._scope_error | ||||||
|  | 
 | ||||||
|  |         # XXX NOTE, only pack an eg if there i at least one | ||||||
|  |         # non-actorc exception received from a subactor, OR | ||||||
|  |         # return `._scope_error` verbatim. | ||||||
|  |         if (errors := self._errors): | ||||||
|  |             # use `BaseExceptionGroup` as needed | ||||||
|  |             excs: list[BaseException] = list(errors.values()) | ||||||
|  |             if ( | ||||||
|  |                 len(excs) > 1 | ||||||
|  |                 and | ||||||
|  |                 any( | ||||||
|  |                     type(exc) not in {ActorCancelled,} | ||||||
|  |                     for exc in excs | ||||||
|  |                 ) | ||||||
|  |             ): | ||||||
|  |                 return ExceptionGroup( | ||||||
|  |                     'ActorNursery multi-errored with', | ||||||
|  |                     tuple(excs), | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |             # raise the lone subactor exc | ||||||
|  |             return list(excs)[0] | ||||||
|  | 
 | ||||||
|  |         return scope_exc | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def _open_and_supervise_one_cancels_all_nursery( | async def _open_and_supervise_one_cancels_all_nursery( | ||||||
|  | @ -457,7 +524,10 @@ async def _open_and_supervise_one_cancels_all_nursery( | ||||||
|     inner_err: BaseException|None = None |     inner_err: BaseException|None = None | ||||||
| 
 | 
 | ||||||
|     # the collection of errors retreived from spawned sub-actors |     # the collection of errors retreived from spawned sub-actors | ||||||
|     errors: dict[tuple[str, str], BaseException] = {} |     errors: dict[ | ||||||
|  |         msgtypes.Aid, | ||||||
|  |         BaseException, | ||||||
|  |     ] = {} | ||||||
| 
 | 
 | ||||||
|     # This is the outermost level "deamon actor" nursery. It is awaited |     # This is the outermost level "deamon actor" nursery. It is awaited | ||||||
|     # **after** the below inner "run in actor nursery". This allows for |     # **after** the below inner "run in actor nursery". This allows for | ||||||
|  | @ -467,176 +537,212 @@ async def _open_and_supervise_one_cancels_all_nursery( | ||||||
|     # `ActorNursery.start_actor()`). |     # `ActorNursery.start_actor()`). | ||||||
| 
 | 
 | ||||||
|     # errors from this daemon actor nursery bubble up to caller |     # errors from this daemon actor nursery bubble up to caller | ||||||
|     async with ( |     try: | ||||||
|         collapse_eg(), |         async with ( | ||||||
|         trio.open_nursery() as da_nursery, |             collapse_eg(), | ||||||
|     ): |             trio.open_nursery() as da_nursery, | ||||||
|         try: |         ): | ||||||
|             # This is the inner level "run in actor" nursery. It is |             try: | ||||||
|             # awaited first since actors spawned in this way (using |                 # This is the inner level "run in actor" nursery. It is | ||||||
|             # `ActorNusery.run_in_actor()`) are expected to only |                 # awaited first since actors spawned in this way (using | ||||||
|             # return a single result and then complete (i.e. be canclled |                 # `ActorNusery.run_in_actor()`) are expected to only | ||||||
|             # gracefully). Errors collected from these actors are |                 # return a single result and then complete (i.e. be canclled | ||||||
|             # immediately raised for handling by a supervisor strategy. |                 # gracefully). Errors collected from these actors are | ||||||
|             # As such if the strategy propagates any error(s) upwards |                 # immediately raised for handling by a supervisor strategy. | ||||||
|             # the above "daemon actor" nursery will be notified. |                 # As such if the strategy propagates any error(s) upwards | ||||||
|             async with ( |                 # the above "daemon actor" nursery will be notified. | ||||||
|                 collapse_eg(), |                 async with ( | ||||||
|                 trio.open_nursery() as ria_nursery, |                     collapse_eg(), | ||||||
|             ): |                     trio.open_nursery() as ria_nursery, | ||||||
|                 an = ActorNursery( |                 ): | ||||||
|                     actor, |                     an = ActorNursery( | ||||||
|                     ria_nursery, |                         actor, | ||||||
|                     da_nursery, |                         ria_nursery, | ||||||
|                     errors |                         da_nursery, | ||||||
|                 ) |                         errors | ||||||
|                 try: |  | ||||||
|                     # spawning of actors happens in the caller's scope |  | ||||||
|                     # after we yield upwards |  | ||||||
|                     yield an |  | ||||||
| 
 |  | ||||||
|                     # When we didn't error in the caller's scope, |  | ||||||
|                     # signal all process-monitor-tasks to conduct |  | ||||||
|                     # the "hard join phase". |  | ||||||
|                     log.runtime( |  | ||||||
|                         'Waiting on subactors to complete:\n' |  | ||||||
|                         f'>}} {len(an._children)}\n' |  | ||||||
|                     ) |                     ) | ||||||
|                     an._join_procs.set() |                     try: | ||||||
|  |                         # spawning of actors happens in the caller's scope | ||||||
|  |                         # after we yield upwards | ||||||
|  |                         yield an | ||||||
| 
 | 
 | ||||||
|                 except BaseException as _inner_err: |                         # When we didn't error in the caller's scope, | ||||||
|                     inner_err = _inner_err |                         # signal all process-monitor-tasks to conduct | ||||||
|                     errors[actor.uid] = inner_err |                         # the "hard join phase". | ||||||
|  |                         log.runtime( | ||||||
|  |                             'Waiting on subactors to complete:\n' | ||||||
|  |                             f'>}} {len(an._children)}\n' | ||||||
|  |                         ) | ||||||
|  |                         an._join_procs.set() | ||||||
| 
 | 
 | ||||||
|                     # If we error in the root but the debugger is |                     except BaseException as _inner_err: | ||||||
|                     # engaged we don't want to prematurely kill (and |                         inner_err = _inner_err | ||||||
|                     # thus clobber access to) the local tty since it |                         # errors[actor.aid] = inner_err | ||||||
|                     # will make the pdb repl unusable. |  | ||||||
|                     # Instead try to wait for pdb to be released before |  | ||||||
|                     # tearing down. |  | ||||||
|                     await debug.maybe_wait_for_debugger( |  | ||||||
|                         child_in_debug=an._at_least_one_child_in_debug |  | ||||||
|                     ) |  | ||||||
| 
 | 
 | ||||||
|                     # if the caller's scope errored then we activate our |                         # If we error in the root but the debugger is | ||||||
|                     # one-cancels-all supervisor strategy (don't |                         # engaged we don't want to prematurely kill (and | ||||||
|                     # worry more are coming). |                         # thus clobber access to) the local tty since it | ||||||
|                     an._join_procs.set() |                         # will make the pdb repl unusable. | ||||||
|  |                         # Instead try to wait for pdb to be released before | ||||||
|  |                         # tearing down. | ||||||
|  |                         await debug.maybe_wait_for_debugger( | ||||||
|  |                             child_in_debug=an._at_least_one_child_in_debug | ||||||
|  |                         ) | ||||||
| 
 | 
 | ||||||
|                     # XXX NOTE XXX: hypothetically an error could |                         # if the caller's scope errored then we activate our | ||||||
|                     # be raised and then a cancel signal shows up |                         # one-cancels-all supervisor strategy (don't | ||||||
|                     # slightly after in which case the `else:` |                         # worry more are coming). | ||||||
|                     # block here might not complete?  For now, |                         an._join_procs.set() | ||||||
|                     # shield both. |  | ||||||
|                     with trio.CancelScope(shield=True): |  | ||||||
|                         etype: type = type(inner_err) |  | ||||||
|                         if etype in ( |  | ||||||
|                             trio.Cancelled, |  | ||||||
|                             KeyboardInterrupt, |  | ||||||
|                         ) or ( |  | ||||||
|                             is_multi_cancelled(inner_err) |  | ||||||
|                         ): |  | ||||||
|                             log.cancel( |  | ||||||
|                                 f'Actor-nursery cancelled by {etype}\n\n' |  | ||||||
| 
 | 
 | ||||||
|                                 f'{current_actor().uid}\n' |                         # XXX NOTE XXX: hypothetically an error could | ||||||
|                                 f' |_{an}\n\n' |                         # be raised and then a cancel signal shows up | ||||||
|  |                         # slightly after in which case the `else:` | ||||||
|  |                         # block here might not complete?  For now, | ||||||
|  |                         # shield both. | ||||||
|  |                         with trio.CancelScope(shield=True): | ||||||
|  |                             etype: type = type(inner_err) | ||||||
|  |                             if etype in ( | ||||||
|  |                                 trio.Cancelled, | ||||||
|  |                                 KeyboardInterrupt, | ||||||
|  |                             ) or ( | ||||||
|  |                                 is_multi_cancelled(inner_err) | ||||||
|  |                             ): | ||||||
|  |                                 log.cancel( | ||||||
|  |                                     f'Actor-nursery cancelled by {etype}\n\n' | ||||||
| 
 | 
 | ||||||
|                                 # TODO: show tb str? |                                     f'{current_actor().uid}\n' | ||||||
|                                 # f'{tb_str}' |                                     f' |_{an}\n\n' | ||||||
|                             ) |  | ||||||
|                         elif etype in { |  | ||||||
|                             ContextCancelled, |  | ||||||
|                         }: |  | ||||||
|                             log.cancel( |  | ||||||
|                                 'Actor-nursery caught remote cancellation\n' |  | ||||||
|                                 '\n' |  | ||||||
|                                 f'{inner_err.tb_str}' |  | ||||||
|                             ) |  | ||||||
|                         else: |  | ||||||
|                             log.exception( |  | ||||||
|                                 'Nursery errored with:\n' |  | ||||||
| 
 | 
 | ||||||
|                                 # TODO: same thing as in |                                     # TODO: show tb str? | ||||||
|                                 # `._invoke()` to compute how to |                                     # f'{tb_str}' | ||||||
|                                 # place this div-line in the |                                 ) | ||||||
|                                 # middle of the above msg |                             elif etype in { | ||||||
|                                 # content.. |                                 ContextCancelled, | ||||||
|                                 # -[ ] prolly helper-func it too |                             }: | ||||||
|                                 #   in our `.log` module.. |                                 log.cancel( | ||||||
|                                 # '------ - ------' |                                     'Actor-nursery caught remote cancellation\n' | ||||||
|                             ) |                                     '\n' | ||||||
|  |                                     f'{inner_err.tb_str}' | ||||||
|  |                                 ) | ||||||
|  |                             else: | ||||||
|  |                                 log.exception( | ||||||
|  |                                     'Nursery errored with:\n' | ||||||
| 
 | 
 | ||||||
|                         # cancel all subactors |                                     # TODO: same thing as in | ||||||
|                         await an.cancel() |                                     # `._invoke()` to compute how to | ||||||
|  |                                     # place this div-line in the | ||||||
|  |                                     # middle of the above msg | ||||||
|  |                                     # content.. | ||||||
|  |                                     # -[ ] prolly helper-func it too | ||||||
|  |                                     #   in our `.log` module.. | ||||||
|  |                                     # '------ - ------' | ||||||
|  |                                 ) | ||||||
| 
 | 
 | ||||||
|             # ria_nursery scope end |                             # cancel all subactors | ||||||
|  |                             await an.cancel() | ||||||
| 
 | 
 | ||||||
|         # TODO: this is the handler around the ``.run_in_actor()`` |                 # ria_nursery scope end | ||||||
|         # nursery. Ideally we can drop this entirely in the future as |  | ||||||
|         # the whole ``.run_in_actor()`` API should be built "on top of" |  | ||||||
|         # this lower level spawn-request-cancel "daemon actor" API where |  | ||||||
|         # a local in-actor task nursery is used with one-to-one task |  | ||||||
|         # + `await Portal.run()` calls and the results/errors are |  | ||||||
|         # handled directly (inline) and errors by the local nursery. |  | ||||||
|         except ( |  | ||||||
|             Exception, |  | ||||||
|             BaseExceptionGroup, |  | ||||||
|             trio.Cancelled |  | ||||||
|         ) as _outer_err: |  | ||||||
|             outer_err = _outer_err |  | ||||||
| 
 | 
 | ||||||
|             an._scope_error = outer_err or inner_err |             # TODO: this is the handler around the ``.run_in_actor()`` | ||||||
|  |             # nursery. Ideally we can drop this entirely in the future as | ||||||
|  |             # the whole ``.run_in_actor()`` API should be built "on top of" | ||||||
|  |             # this lower level spawn-request-cancel "daemon actor" API where | ||||||
|  |             # a local in-actor task nursery is used with one-to-one task | ||||||
|  |             # + `await Portal.run()` calls and the results/errors are | ||||||
|  |             # handled directly (inline) and errors by the local nursery. | ||||||
|  |             except ( | ||||||
|  |                 Exception, | ||||||
|  |                 BaseExceptionGroup, | ||||||
|  |                 trio.Cancelled | ||||||
|  |             ) as _outer_err: | ||||||
|  |                 outer_err = _outer_err | ||||||
| 
 | 
 | ||||||
|             # XXX: yet another guard before allowing the cancel |                 # XXX: yet another guard before allowing the cancel | ||||||
|             # sequence in case a (single) child is in debug. |                 # sequence in case a (single) child is in debug. | ||||||
|             await debug.maybe_wait_for_debugger( |                 await debug.maybe_wait_for_debugger( | ||||||
|                 child_in_debug=an._at_least_one_child_in_debug |                     child_in_debug=an._at_least_one_child_in_debug | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|             # If actor-local error was raised while waiting on |  | ||||||
|             # ".run_in_actor()" actors then we also want to cancel all |  | ||||||
|             # remaining sub-actors (due to our lone strategy: |  | ||||||
|             # one-cancels-all). |  | ||||||
|             if an._children: |  | ||||||
|                 log.cancel( |  | ||||||
|                     'Actor-nursery cancelling due error type:\n' |  | ||||||
|                     f'{outer_err}\n' |  | ||||||
|                 ) |                 ) | ||||||
|                 with trio.CancelScope(shield=True): |  | ||||||
|                     await an.cancel() |  | ||||||
|             raise |  | ||||||
| 
 | 
 | ||||||
|         finally: |                 # If actor-local error was raised while waiting on | ||||||
|             # No errors were raised while awaiting ".run_in_actor()" |                 # ".run_in_actor()" actors then we also want to cancel all | ||||||
|             # actors but those actors may have returned remote errors as |                 # remaining sub-actors (due to our lone strategy: | ||||||
|             # results (meaning they errored remotely and have relayed |                 # one-cancels-all). | ||||||
|             # those errors back to this parent actor). The errors are |  | ||||||
|             # collected in ``errors`` so cancel all actors, summarize |  | ||||||
|             # all errors and re-raise. |  | ||||||
|             if errors: |  | ||||||
|                 if an._children: |                 if an._children: | ||||||
|  |                     log.cancel( | ||||||
|  |                         'Actor-nursery cancelling due error type:\n' | ||||||
|  |                         f'{outer_err}\n' | ||||||
|  |                     ) | ||||||
|                     with trio.CancelScope(shield=True): |                     with trio.CancelScope(shield=True): | ||||||
|                         await an.cancel() |                         await an.cancel() | ||||||
| 
 | 
 | ||||||
|                 # use `BaseExceptionGroup` as needed |                 raise | ||||||
|                 if len(errors) > 1: |  | ||||||
|                     raise BaseExceptionGroup( |  | ||||||
|                         'tractor.ActorNursery errored with', |  | ||||||
|                         tuple(errors.values()), |  | ||||||
|                     ) |  | ||||||
|                 else: |  | ||||||
|                     raise list(errors.values())[0] |  | ||||||
| 
 | 
 | ||||||
|             # show frame on any (likely) internal error |             finally: | ||||||
|             if ( |                 scope_exc = an._scope_error = outer_err or inner_err | ||||||
|                 not an.cancelled |                 # await debug.pause(shield=True) | ||||||
|                 and an._scope_error |                 # if scope_exc: | ||||||
|             ): |                 #     errors[actor.aid] = scope_exc | ||||||
|                 __tracebackhide__: bool = False |  | ||||||
| 
 | 
 | ||||||
|         # da_nursery scope end - nursery checkpoint |                 # show this frame on any internal error | ||||||
|     # final exit |                 if ( | ||||||
|  |                     not an.cancelled | ||||||
|  |                     and | ||||||
|  |                     scope_exc | ||||||
|  |                 ): | ||||||
|  |                     __tracebackhide__: bool = False | ||||||
|  | 
 | ||||||
|  |                 # NOTE, it's possible no errors were raised while | ||||||
|  |                 # awaiting ".run_in_actor()" actors but those | ||||||
|  |                 # sub-actors may have delivered remote errors as | ||||||
|  |                 # results, normally captured via machinery in | ||||||
|  |                 # `._spawn.cancel_on_completion()`. | ||||||
|  |                 # | ||||||
|  |                 # Any such remote errors are collected in `an._errors` | ||||||
|  |                 # which is summarized via `ActorNursery.maybe_error` | ||||||
|  |                 # which is maybe re-raised in an outer block (below). | ||||||
|  |                 # | ||||||
|  |                 # So here we first cancel all subactors then summarize | ||||||
|  |                 # all errors and then later (in that outer block) | ||||||
|  |                 # maybe-raise on a "non-graceful" cancellation | ||||||
|  |                 # outcome, normally as a summary EG. | ||||||
|  |                 if ( | ||||||
|  |                     scope_exc | ||||||
|  |                     or | ||||||
|  |                     errors | ||||||
|  |                 ): | ||||||
|  | 
 | ||||||
|  |                     if an._children: | ||||||
|  |                         with trio.CancelScope(shield=True): | ||||||
|  |                             await an.cancel() | ||||||
|  | 
 | ||||||
|  |                     # cancel outer tn so we unblock outside this | ||||||
|  |                     # finally! | ||||||
|  |                     da_nursery.cance_scope.cancel() | ||||||
|  |                     # | ||||||
|  |                     # ^TODO? still don't get why needed? | ||||||
|  |                     # - an.cancel() should cause all spawn-subtasks | ||||||
|  |                     #   to eventually exit? | ||||||
|  |                     # - also, could (instead) we sync to an event here before | ||||||
|  |                     #   (ever) calling `an.cancel()`?? | ||||||
|  | 
 | ||||||
|  |         # `da_nursery` scope end, thus a checkpoint. | ||||||
|  |     finally: | ||||||
|  | 
 | ||||||
|  |             # raise any eg compiled from all subs | ||||||
|  |             # ??TODO should we also adopt strict-egs here like | ||||||
|  |             # `trio.Nursery`?? | ||||||
|  |             # | ||||||
|  |             # XXX justification notes, | ||||||
|  |             # docs: https://trio.readthedocs.io/en/stable/reference-core.html#historical-note-non-strict-exceptiongroups | ||||||
|  |             # anthropic: https://discuss.python.org/t/using-exceptiongroup-at-anthropic-experience-report/20888 | ||||||
|  |             # gh: https://github.com/python-trio/trio/issues/611 | ||||||
|  |         if an_exc := an.maybe_error: | ||||||
|  |             raise an_exc | ||||||
|  | 
 | ||||||
|  |         if scope_exc := an._scope_error: | ||||||
|  |             raise scope_exc | ||||||
|  | 
 | ||||||
|  |     # @acm-fn scope exit | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| _shutdown_msg: str = ( | _shutdown_msg: str = ( | ||||||
|  | @ -648,7 +754,7 @@ _shutdown_msg: str = ( | ||||||
| # @api_frame | # @api_frame | ||||||
| async def open_nursery( | async def open_nursery( | ||||||
|     *,  # named params only! |     *,  # named params only! | ||||||
|     hide_tb: bool = True, |     hide_tb: bool = False, | ||||||
|     **kwargs, |     **kwargs, | ||||||
|     # ^TODO, paramspec for `open_root_actor()` |     # ^TODO, paramspec for `open_root_actor()` | ||||||
| 
 | 
 | ||||||
|  | @ -684,16 +790,21 @@ async def open_nursery( | ||||||
|             # mark us for teardown on exit |             # mark us for teardown on exit | ||||||
|             implicit_runtime: bool = True |             implicit_runtime: bool = True | ||||||
| 
 | 
 | ||||||
|             async with open_root_actor( |             async with ( | ||||||
|                 hide_tb=hide_tb, |                 # collapse_eg(hide_tb=hide_tb), | ||||||
|                 **kwargs, |                 open_root_actor( | ||||||
|             ) as actor: |                     hide_tb=hide_tb, | ||||||
|  |                     **kwargs, | ||||||
|  |                 ) as actor, | ||||||
|  |             ): | ||||||
|                 assert actor is current_actor() |                 assert actor is current_actor() | ||||||
| 
 | 
 | ||||||
|                 try: |                 try: | ||||||
|                     async with _open_and_supervise_one_cancels_all_nursery( |                     async with ( | ||||||
|                         actor |                         _open_and_supervise_one_cancels_all_nursery( | ||||||
|                     ) as an: |                             actor | ||||||
|  |                         ) as an | ||||||
|  |                     ): | ||||||
| 
 | 
 | ||||||
|                         # NOTE: mark this nursery as having |                         # NOTE: mark this nursery as having | ||||||
|                         # implicitly started the root actor so |                         # implicitly started the root actor so | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue