Only cancel channel spawned rpc tasks when explicitly notified
parent
e395845ddb
commit
0ac564dbf3
|
@ -234,6 +234,10 @@ class Actor:
|
||||||
assert actorid, f"`actorid` can't be {actorid}"
|
assert actorid, f"`actorid` can't be {actorid}"
|
||||||
q = self.get_waitq(actorid, cid)
|
q = self.get_waitq(actorid, cid)
|
||||||
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
|
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
|
||||||
|
waiters = q.statistics().tasks_waiting_get
|
||||||
|
if not waiters:
|
||||||
|
log.warn(
|
||||||
|
f"No tasks are currently waiting for results from call {cid}?")
|
||||||
q.put_nowait(msg)
|
q.put_nowait(msg)
|
||||||
|
|
||||||
def get_waitq(self, actorid, cid):
|
def get_waitq(self, actorid, cid):
|
||||||
|
@ -264,6 +268,8 @@ class Actor:
|
||||||
try:
|
try:
|
||||||
async for msg in chan.aiter_recv():
|
async for msg in chan.aiter_recv():
|
||||||
if msg is None: # terminate sentinel
|
if msg is None: # terminate sentinel
|
||||||
|
log.debug(f"Cancelling all tasks for {chan}")
|
||||||
|
nursery.cancel_scope.cancel()
|
||||||
log.debug(f"Terminating msg loop for {chan}")
|
log.debug(f"Terminating msg loop for {chan}")
|
||||||
break
|
break
|
||||||
log.debug(f"Received msg {msg}")
|
log.debug(f"Received msg {msg}")
|
||||||
|
@ -312,8 +318,6 @@ class Actor:
|
||||||
except trio.ClosedStreamError:
|
except trio.ClosedStreamError:
|
||||||
log.error(f"{chan} broke")
|
log.error(f"{chan} broke")
|
||||||
|
|
||||||
log.debug(f"Cancelling all tasks for {chan}")
|
|
||||||
nursery.cancel_scope.cancel()
|
|
||||||
log.debug(f"Exiting msg loop for {chan}")
|
log.debug(f"Exiting msg loop for {chan}")
|
||||||
|
|
||||||
def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):
|
def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):
|
||||||
|
|
Loading…
Reference in New Issue