Don't wake sibling bcast consumers on a cancelled call

live_on_air_from_tokio
Tyler Goodlet 2021-08-31 18:30:06 -04:00
parent 5c6355062c
commit 9258f79510
1 changed files with 37 additions and 5 deletions

View File

@ -111,7 +111,7 @@ class BroadcastReceiver(ReceiveChannel):
self._recv = receive_afunc or rx_chan.receive self._recv = receive_afunc or rx_chan.receive
self._closed: bool = False self._closed: bool = False
async def receive(self): async def receive(self) -> ReceiveType:
key = self.key key = self.key
state = self._state state = self._state
@ -169,9 +169,11 @@ class BroadcastReceiver(ReceiveChannel):
event = trio.Event() event = trio.Event()
state.recv_ready = key, event state.recv_ready = key, event
# if we're cancelled here it should be
# fine to bail without affecting any other consumers
# right?
try: try:
value = await self._recv() value = await self._recv()
# items with lower indices are "newer" # items with lower indices are "newer"
state.queue.appendleft(value) state.queue.appendleft(value)
@ -193,21 +195,51 @@ class BroadcastReceiver(ReceiveChannel):
): ):
state.subs[sub_key] += 1 state.subs[sub_key] += 1
# NOTE: this should ONLY be set if the above task was *NOT*
# cancelled on the `._recv()` call otherwise sibling
# consumers will be awoken with a sequence of -1
event.set()
return value return value
finally: finally:
# reset receiver waiter task event for next blocking condition # Reset receiver waiter task event for next blocking condition.
event.set() # this MUST be reset even if the above ``.recv()`` call
# was cancelled to avoid the next consumer from blocking on
# an event that won't be set!
state.recv_ready = None state.recv_ready = None
# This task is all caught up and ready to receive the latest # This task is all caught up and ready to receive the latest
# value, so queue sched it on the internal event. # value, so queue sched it on the internal event.
else: else:
seq = state.subs[key]
assert seq == -1 # sanity
_, ev = state.recv_ready _, ev = state.recv_ready
await ev.wait() await ev.wait()
seq = state.subs[key] seq = state.subs[key]
assert seq > -1, f'Invalid sequence {seq}!?'
value = state.queue[seq]
state.subs[key] -= 1 state.subs[key] -= 1
return state.queue[seq] return value
# NOTE: if we ever would like the behaviour where if the
# first task to recv on the underlying is cancelled but it
# still DOES trigger the ``.recv_ready``, event we'll likely need
# this logic:
# if seq > -1:
# # stuff from above..
# elif seq == -1:
# # XXX: In the case where the first task to allocate the
# # ``.recv_ready`` event is cancelled we will be woken with
# # a non-incremented sequence number and thus will read the
# # oldest value if we use that. Instead we need to detect if
# # we have not been incremented and then receive again.
# return await self.receive()
# else:
# raise ValueError(f'Invalid sequence {seq}!?')
@asynccontextmanager @asynccontextmanager
async def subscribe( async def subscribe(