From 66137030d9f8850a0750a4b923bb735a11d237f9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 10 Oct 2021 16:36:19 -0400 Subject: [PATCH] Try out always delivering `ContextCancelled` Previously, on a task cancel request there was no real response other than the `None` returned from the `Actor._cancel_task()` method and sometimes this might get lost if the cancel task was cancelled by a runtime cancel request (i.e. an "actor cancel"). Instead let's try always checking if the task's cancel scope is cancelled and if so relay back to the caller a `ContextCancelled` which can then be explicitly handled by actor nursery machinery as well as individual cancel APIs (`Portal.cancel_actor()`, and maybe later if we decide to expose the `tractor.Context` on every `Portal.run()` call). Also, - fix up a bunch of cancellation-related logging - add an `Actor.cancel_called` flag much like `trio`'s cancel scope --- tractor/_actor.py | 84 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 26 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 6279832..31ef862 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -194,7 +194,17 @@ async def _invoke( task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) - except (Exception, trio.MultiError) as err: + if cs.cancelled_caught: + raise ContextCancelled( + 'cancelled', + suberror_type=trio.Cancelled, + ) + + except ( + Exception, + trio.MultiError, + # trio.Cancelled, + ) as err: if not is_multi_cancelled(err): @@ -248,7 +258,7 @@ async def _invoke( # If we're cancelled before the task returns then the # cancel scope will not have been inserted yet log.warning( - f"Task {func} likely errored or cancelled before it started") + f"Task {func} likely errored or cancelled before start") finally: if not actor._rpc_tasks: log.runtime("All RPC tasks have completed") @@ -358,6 +368,14 @@ class Actor: Tuple[Any, Any, Any, Any, Any]] = None self._actoruid2nursery: 
Dict[str, 'ActorNursery'] = {} # type: ignore # noqa + @property + def cancel_called(self) -> bool: + ''' + Same principle as ``trio.CancelScope.cancel_called``. + + ''' + return self._cancel_called + async def wait_for_peer( self, uid: Tuple[str, str] ) -> Tuple[trio.Event, Channel]: @@ -618,6 +636,7 @@ class Actor: # worked out we'll likely want to use that! msg = None nursery_cancelled_before_task: bool = False + uid = chan.uid log.runtime(f"Entering msg loop for {chan} from {chan.uid}") try: @@ -641,7 +660,7 @@ class Actor: log.runtime( f"Msg loop signalled to terminate for" - f" {chan} from {chan.uid}") + f" {chan} from {uid}") break @@ -676,38 +695,46 @@ class Actor: f"{ns}.{funcname}({kwargs})") if ns == 'self': func = getattr(self, funcname) - if funcname == 'cancel': - # don't start entire actor runtime cancellation if this - # actor is in debug mode + if funcname == 'cancel': + # self.cancel() was called so kill this + # msg loop and break out into + # ``_async_main()`` + + log.cancel( + f"{self.uid} remote cancel msg from {uid}") + + # don't start entire actor runtime + # cancellation if this actor is in debug + # mode pdb_complete = _debug._local_pdb_complete if pdb_complete: + log.cancel( + f'{self.uid} is in debug, wait for unlock') await pdb_complete.wait() - # we immediately start the runtime machinery shutdown + # we immediately start the runtime machinery + # shutdown with trio.CancelScope(shield=True): - # self.cancel() was called so kill this msg loop - # and break out into ``_async_main()`` - log.cancel( - f"Actor {self.uid} was remotely cancelled; " - "waiting on cancellation completion..") - await _invoke(self, cid, chan, func, kwargs, is_rpc=False) - # await self._cancel_complete.wait() + await _invoke( + self, cid, chan, func, kwargs, is_rpc=False + ) loop_cs.cancel() - break + continue if funcname == '_cancel_task': + task_cid = kwargs['cid'] + log.cancel( + f'Actor {uid} requests cancel for {task_cid}') - # we immediately start the 
runtime machinery shutdown + # we immediately start the runtime machinery + # shutdown with trio.CancelScope(shield=True): - # self.cancel() was called so kill this msg loop - # and break out into ``_async_main()`` kwargs['chan'] = chan - log.cancel( - f"Actor {self.uid} was remotely cancelled; " - "waiting on cancellation completion..") - await _invoke(self, cid, chan, func, kwargs, is_rpc=False) + await _invoke( + self, cid, chan, func, kwargs, is_rpc=False + ) continue else: # complain to client about restricted modules @@ -752,7 +779,8 @@ class Actor: log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") - # end of async for, channel disconnect vis ``trio.EndOfChannel`` + # end of async for, channel disconnect vis + # ``trio.EndOfChannel`` log.runtime( f"{chan} for {chan.uid} disconnected, cancelling tasks" ) @@ -1193,7 +1221,10 @@ class Actor: tasks = self._rpc_tasks if tasks: log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") - for (chan, cid), (scope, func, is_complete) in tasks.copy().items(): + for ( + (chan, cid), + (scope, func, is_complete), + ) in tasks.copy().items(): if only_chan is not None: if only_chan != chan: continue @@ -1202,8 +1233,9 @@ class Actor: if func != self._cancel_task: await self._cancel_task(cid, chan) - log.cancel( - f"Waiting for remaining rpc tasks to complete {tasks}") + if tasks: + log.cancel( + f"Waiting for remaining rpc tasks to complete {tasks}") await self._ongoing_rpc_tasks.wait() def cancel_server(self) -> None: