Wake all sleeping consumers on bcaster closure

context_caching
Tyler Goodlet 2021-12-15 16:20:57 -05:00
parent 213447008b
commit 11e64426f6
1 changed files with 7 additions and 1 deletions

View File

@ -331,10 +331,16 @@ class BroadcastReceiver(ReceiveChannel):
if self._closed:
return
# if there are sleeping consumers wake
# them on closure.
rr = self._state.recv_ready
if rr:
_, event = rr
event.set()
# XXX: leaving it like this consumers can still get values
# up to the last received that still reside in the queue.
self._state.subs.pop(self.key)
self._closed = True