From 0ac564dbf3e48075fd23ca6c2bec23c72c581857 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 5 Jul 2018 15:27:02 -0400 Subject: [PATCH] Only cancel channel spawned rpc tasks when explicitly notified --- piker/tractor.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/piker/tractor.py b/piker/tractor.py index 361e9a07..cb7e121d 100644 --- a/piker/tractor.py +++ b/piker/tractor.py @@ -234,6 +234,10 @@ class Actor: assert actorid, f"`actorid` can't be {actorid}" q = self.get_waitq(actorid, cid) log.debug(f"Delivering {msg} from {actorid} to caller {cid}") + waiters = q.statistics().tasks_waiting_get + if not waiters: + log.warn( + f"No tasks are currently waiting for results from call {cid}?") q.put_nowait(msg) def get_waitq(self, actorid, cid): @@ -264,6 +268,8 @@ class Actor: try: async for msg in chan.aiter_recv(): if msg is None: # terminate sentinel + log.debug(f"Cancelling all tasks for {chan}") + nursery.cancel_scope.cancel() log.debug(f"Terminating msg loop for {chan}") break log.debug(f"Received msg {msg}") @@ -312,8 +318,6 @@ class Actor: except trio.ClosedStreamError: log.error(f"{chan} broke") - log.debug(f"Cancelling all tasks for {chan}") - nursery.cancel_scope.cancel() log.debug(f"Exiting msg loop for {chan}") def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):