diff --git a/tractor/_actor.py b/tractor/_actor.py index c75647e..5b5e7ff 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -144,7 +144,7 @@ async def _invoke( if not actor._rpc_tasks: log.info(f"All RPC tasks have completed") - actor._no_more_rpc_tasks.set() + actor._ongoing_rpc_tasks.set() class Actor: @@ -183,9 +183,8 @@ class Actor: self._peer_connected: dict = {} self._no_more_peers = trio.Event() self._no_more_peers.set() - - self._no_more_rpc_tasks = trio.Event() - self._no_more_rpc_tasks.set() + self._ongoing_rpc_tasks = trio.Event() + self._ongoing_rpc_tasks.set() # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: Dict[ Tuple[Channel, str], @@ -234,7 +233,7 @@ class Actor: ) -> None: """Entry point for new inbound connections to the channel server. """ - self._no_more_peers.clear() + self._no_more_peers = trio.Event() chan = Channel(stream=stream) log.info(f"New connection to us {chan}") @@ -448,7 +447,7 @@ class Actor: f"{cs}") else: # mark that we have ongoing rpc tasks - self._no_more_rpc_tasks.clear() + self._ongoing_rpc_tasks = trio.Event() log.info(f"RPC func is {func}") # store cancel scope such that the rpc task can be # cancelled gracefully if requested @@ -709,10 +708,9 @@ class Actor: for (chan, cid) in tasks.copy(): # TODO: this should really done in a nursery batch await self._cancel_task(cid, chan) - # if tasks: log.info( f"Waiting for remaining rpc tasks to complete {tasks}") - await self._no_more_rpc_tasks.wait() + await self._ongoing_rpc_tasks.wait() def cancel_server(self) -> None: """Cancel the internal channel server nursery thereby