From 573b8fef73fb515695093354fe0d470ab5182814 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 13 Oct 2020 11:48:52 -0400 Subject: [PATCH] Add better actor cancellation tracking Add `Actor._cancel_called` and `._cancel_complete` making it possible to determine whether the actor has started the cancellation sequence and whether that sequence has fully completed. This allows for blocking in internal machinery tasks as necessary. Also, always trigger the end of ongoing rpc tasks even if the last task errors; there's no guarantee the trio cancellation semantics will guarantee us a nice internal "state" without this. --- tractor/_actor.py | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 80a7dd5..f4b9795 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -152,10 +152,10 @@ async def _invoke( # cancel scope will not have been inserted yet log.warn( f"Task {func} likely errored or cancelled before it started") - - if not actor._rpc_tasks: - log.info("All RPC tasks have completed") - actor._ongoing_rpc_tasks.set() + finally: + if not actor._rpc_tasks: + log.info("All RPC tasks have completed") + actor._ongoing_rpc_tasks.set() def _get_mod_abspath(module): @@ -198,7 +198,9 @@ class Actor: """ self.name = name self.uid = (name, uid or str(uuid.uuid4())) - self._is_cancelled: bool = False + + self._cancel_complete = trio.Event() + self._cancel_called: bool = False # retreive and store parent `__main__` data which # will be passed to children @@ -531,7 +533,10 @@ class Actor: else: # self.cancel() was called so kill this msg loop # and break out into ``_async_main()`` - log.warning(f"{self.uid} was remotely cancelled") + log.warning( + f"{self.uid} was remotely cancelled; " + "waiting on cancellation completion..") + await self._cancel_complete.wait() loop_cs.cancel() break @@ -540,8 +545,9 @@ class Actor: else: # channel disconnect log.debug( - f"{chan} from {chan.uid} disconnected, cancelling all rpc tasks") - await self.cancel_rpc_tasks(chan) + f"{chan} for {chan.uid} disconnected, cancelling tasks" + ) + self.cancel_rpc_tasks(chan) except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") @@ -833,7 +839,8 @@ class Actor: spawning new rpc tasks - return control the parent channel message loop """ - self._is_cancelled = True + log.warning(f"{self.uid} is trying to cancel") + self._cancel_called = True # cancel all ongoing rpc tasks with trio.CancelScope(shield=True): @@ -848,14 +855,16 @@ class Actor: # kill all ongoing tasks await self.cancel_rpc_tasks() + # cancel all rpc tasks permanently + if self._service_n: + self._service_n.cancel_scope.cancel() + # stop channel server self.cancel_server() await self._server_down.wait() - # rekt all channel loops - if self._service_n: - self._service_n.cancel_scope.cancel() - + log.warning(f"{self.uid} was sucessfullly cancelled") + self._cancel_complete.set() return True # XXX: hard kill logic if needed? @@ -895,6 +904,9 @@ class Actor: scope.cancel() # wait for _invoke to mark the task complete + log.debug( + f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" + f"peer: {chan.uid}\n") await is_complete.wait() log.debug( f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"