diff --git a/tractor/_actor.py b/tractor/_actor.py index 80a7dd5..f4b9795 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -152,10 +152,10 @@ async def _invoke( # cancel scope will not have been inserted yet log.warn( f"Task {func} likely errored or cancelled before it started") - - if not actor._rpc_tasks: - log.info("All RPC tasks have completed") - actor._ongoing_rpc_tasks.set() + finally: + if not actor._rpc_tasks: + log.info("All RPC tasks have completed") + actor._ongoing_rpc_tasks.set() def _get_mod_abspath(module): @@ -198,7 +198,9 @@ class Actor: """ self.name = name self.uid = (name, uid or str(uuid.uuid4())) - self._is_cancelled: bool = False + + self._cancel_complete = trio.Event() + self._cancel_called: bool = False # retreive and store parent `__main__` data which # will be passed to children @@ -531,7 +533,10 @@ class Actor: else: # self.cancel() was called so kill this msg loop # and break out into ``_async_main()`` - log.warning(f"{self.uid} was remotely cancelled") + log.warning( + f"{self.uid} was remotely cancelled; " + "waiting on cancellation completion..") + await self._cancel_complete.wait() loop_cs.cancel() break @@ -540,8 +545,9 @@ class Actor: else: # channel disconnect log.debug( - f"{chan} from {chan.uid} disconnected, cancelling all rpc tasks") - await self.cancel_rpc_tasks(chan) + f"{chan} for {chan.uid} disconnected, cancelling tasks" + ) + self.cancel_rpc_tasks(chan) except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") @@ -833,7 +839,8 @@ class Actor: spawning new rpc tasks - return control the parent channel message loop """ - self._is_cancelled = True + log.warning(f"{self.uid} is trying to cancel") + self._cancel_called = True # cancel all ongoing rpc tasks with trio.CancelScope(shield=True): @@ -848,14 +855,16 @@ class Actor: # kill all ongoing tasks await self.cancel_rpc_tasks() + # cancel all rpc tasks permanently + if self._service_n: + self._service_n.cancel_scope.cancel() + # stop channel server self.cancel_server() await self._server_down.wait() - # rekt all channel loops - if self._service_n: - self._service_n.cancel_scope.cancel() - + log.warning(f"{self.uid} was sucessfullly cancelled") + self._cancel_complete.set() return True # XXX: hard kill logic if needed? @@ -895,6 +904,9 @@ class Actor: scope.cancel() # wait for _invoke to mark the task complete + log.debug( + f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" + f"peer: {chan.uid}\n") await is_complete.wait() log.debug( f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"