Only cancel channel spawned rpc tasks when explicitly notified

asyncgen_closing_fix
Tyler Goodlet 2018-07-05 15:27:02 -04:00
parent 56d3f6cffb
commit b1ad909c54
1 changed files with 6 additions and 2 deletions

View File

@ -234,6 +234,10 @@ class Actor:
assert actorid, f"`actorid` can't be {actorid}" assert actorid, f"`actorid` can't be {actorid}"
q = self.get_waitq(actorid, cid) q = self.get_waitq(actorid, cid)
log.debug(f"Delivering {msg} from {actorid} to caller {cid}") log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
waiters = q.statistics().tasks_waiting_get
if not waiters:
log.warn(
f"No tasks are currently waiting for results from call {cid}?")
q.put_nowait(msg) q.put_nowait(msg)
def get_waitq(self, actorid, cid): def get_waitq(self, actorid, cid):
@ -264,6 +268,8 @@ class Actor:
try: try:
async for msg in chan.aiter_recv(): async for msg in chan.aiter_recv():
if msg is None: # terminate sentinel if msg is None: # terminate sentinel
log.debug(f"Cancelling all tasks for {chan}")
nursery.cancel_scope.cancel()
log.debug(f"Terminating msg loop for {chan}") log.debug(f"Terminating msg loop for {chan}")
break break
log.debug(f"Received msg {msg}") log.debug(f"Received msg {msg}")
@ -312,8 +318,6 @@ class Actor:
except trio.ClosedStreamError: except trio.ClosedStreamError:
log.error(f"{chan} broke") log.error(f"{chan} broke")
log.debug(f"Cancelling all tasks for {chan}")
nursery.cancel_scope.cancel()
log.debug(f"Exiting msg loop for {chan}") log.debug(f"Exiting msg loop for {chan}")
def _fork_main(self, accept_addr, parent_addr=None, loglevel=None): def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):