From bb9d9c74b1b9f9ae053f75831dd4716ae2addb84 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 10 Oct 2021 11:42:32 -0400 Subject: [PATCH] Do immediate remote task cancels As for `Actor.cancel()` requests, do the same for `Actor._cancel_task()` but use `_invoke()` to ensure correct msg transactions with caller. Don't cancel task cancels on a cancel-all-tasks operation in attempt at more determinism. --- tractor/_actor.py | 63 +++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 9416260..e85b16b 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -682,29 +682,30 @@ class Actor: await pdb_complete.wait() # we immediately start the runtime machinery shutdown - await _invoke(self, cid, chan, func, kwargs, is_rpc=False) + with trio.CancelScope(shield=True): + # self.cancel() was called so kill this msg loop + # and break out into ``_async_main()`` + log.cancel( + f"Actor {self.uid} was remotely cancelled; " + "waiting on cancellation completion..") + await _invoke(self, cid, chan, func, kwargs, is_rpc=False) + # await self._cancel_complete.wait() - # self.cancel() was called so kill this msg loop - # and break out into ``_async_main()`` - log.cancel( - f"Actor {self.uid} was remotely cancelled; " - "waiting on cancellation completion..") - await self._cancel_complete.wait() loop_cs.cancel() break - elif funcname == '_cancel_task': - - # XXX: a special case is made here for - # remote calls since we don't want the - # remote actor have to know which channel - # the task is associated with and we can't - # pass non-primitive types between actors. - # This means you can use: - # Portal.run('self', '_cancel_task, cid=did) - # without passing the `chan` arg. - kwargs['chan'] = chan + if funcname == '_cancel_task': + # we immediately start the runtime machinery shutdown + with trio.CancelScope(shield=True): + # self.cancel() was called so kill this msg loop + # and break out into ``_async_main()`` + kwargs['chan'] = chan + log.cancel( + f"Actor {self.uid} was remotely cancelled; " + "waiting on cancellation completion..") + await _invoke(self, cid, chan, func, kwargs, is_rpc=False) + continue else: # complain to client about restricted modules try: @@ -995,7 +996,7 @@ class Actor: raise finally: - log.info("Root nursery complete") + log.info("Runtime nursery complete") # tear down all lifetime contexts if not in guest mode # XXX: should this just be in the entrypoint? @@ -1094,7 +1095,7 @@ class Actor: self._service_n.start_soon(self.cancel) async def cancel(self) -> bool: - """Cancel this actor. + """Cancel this actor's runtime. The "deterministic" teardown sequence in order is: - cancel all ongoing rpc tasks by cancel scope @@ -1187,18 +1188,20 @@ class Actor: registered for each. """ tasks = self._rpc_tasks - log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") - for (chan, cid) in tasks.copy(): - if only_chan is not None: - if only_chan != chan: - continue + if tasks: + log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") + for (chan, cid), (scope, func, is_complete) in tasks.copy().items(): + if only_chan is not None: + if only_chan != chan: + continue - # TODO: this should really done in a nursery batch - await self._cancel_task(cid, chan) + # TODO: this should really done in a nursery batch + if func != self._cancel_task: + await self._cancel_task(cid, chan) - log.cancel( - f"Waiting for remaining rpc tasks to complete {tasks}") - await self._ongoing_rpc_tasks.wait() + log.cancel( + f"Waiting for remaining rpc tasks to complete {tasks}") + await self._ongoing_rpc_tasks.wait() def cancel_server(self) -> None: """Cancel the internal channel server nursery thereby