forked from goodboy/tractor
1
0
Fork 0

Wake all broadcast consumers on EOC

Without this wakeup you can have tasks which re-enter `.receive()`
and get stuck waiting on the wakeup event indefinitely. Whenever
a ``trio.EndOfChannel`` arrives we want to make sure all consumers
at least know about it and don't block. This previous behaviour was
basically a bug.

Add some state flags for tracking if the broadcaster was either
cancelled or terminated via EOC mostly for testing and debugging
purposes though this info might be useful if we decide to offer
a `.statistics()` like API in the future.
end_of_channel_fixes
Tyler Goodlet 2021-12-15 14:27:49 -05:00
parent 61e134dc5d
commit 3deb1b91e6
1 changed files with 24 additions and 2 deletions

View File

@ -100,6 +100,15 @@ class BroadcastState:
# on a newly produced value from the sender. # on a newly produced value from the sender.
recv_ready: Optional[tuple[int, trio.Event]] = None recv_ready: Optional[tuple[int, trio.Event]] = None
# if a ``trio.EndOfChannel`` is received on any
# consumer all consumers should be placed in this state
# such that the group is notified of the end-of-broadcast.
# For now, this is solely for testing/debugging purposes.
eoc: bool = False
# If the broadcaster was cancelled, we might as well track it
cancelled: bool = False
class BroadcastReceiver(ReceiveChannel): class BroadcastReceiver(ReceiveChannel):
'''A memory receive channel broadcaster which is non-lossy for the '''A memory receive channel broadcaster which is non-lossy for the
@ -222,13 +231,23 @@ class BroadcastReceiver(ReceiveChannel):
event.set() event.set()
return value return value
except trio.EndOfChannel:
# if any one consumer gets an EOC from the underlying
# receiver we need to unblock and send that signal to
# all other consumers.
self._state.eoc = True
if event.statistics().tasks_waiting:
event.set()
raise
except ( except (
trio.Cancelled, trio.Cancelled,
trio.EndOfChannel,
): ):
# handle cancelled specially otherwise sibling # handle cancelled specially otherwise sibling
# consumers will be awoken with a sequence of -1 # consumers will be awoken with a sequence of -1
# state.recv_ready = trio.Cancelled # and will potentially try to rewait the underlying
# receiver instead of just cancelling immediately.
self._state.cancelled = True
if event.statistics().tasks_waiting: if event.statistics().tasks_waiting:
event.set() event.set()
raise raise
@ -305,7 +324,10 @@ class BroadcastReceiver(ReceiveChannel):
async def aclose( async def aclose(
self, self,
) -> None: ) -> None:
'''
Close this receiver without affecting other consumers.
'''
if self._closed: if self._closed:
return return