diff --git a/piker/tractor.py b/piker/tractor.py index 361e9a0..cb7e121 100644 --- a/piker/tractor.py +++ b/piker/tractor.py @@ -234,6 +234,10 @@ class Actor: assert actorid, f"`actorid` can't be {actorid}" q = self.get_waitq(actorid, cid) log.debug(f"Delivering {msg} from {actorid} to caller {cid}") + waiters = q.statistics().tasks_waiting_get + if not waiters: + log.warn( + f"No tasks are currently waiting for results from call {cid}?") q.put_nowait(msg) def get_waitq(self, actorid, cid): @@ -264,6 +268,8 @@ class Actor: try: async for msg in chan.aiter_recv(): if msg is None: # terminate sentinel + log.debug(f"Cancelling all tasks for {chan}") + nursery.cancel_scope.cancel() log.debug(f"Terminating msg loop for {chan}") break log.debug(f"Received msg {msg}") @@ -312,8 +318,6 @@ class Actor: except trio.ClosedStreamError: log.error(f"{chan} broke") - log.debug(f"Cancelling all tasks for {chan}") - nursery.cancel_scope.cancel() log.debug(f"Exiting msg loop for {chan}") def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):