From 3deb1b91e63bc83aa3a26cf4f5c41d1497aadd52 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Dec 2021 14:27:49 -0500 Subject: [PATCH] Wake all broadcast consumers on EOC Without this wakeup you can have tasks which re-enter `.receive()` and get stuck waiting on the wakeup event indefinitely. Whenever a ``trio.EndOfChannel`` arrives we want to make sure all consumers at least know about it and don't block. This previous behaviour was basically a bug. Add some state flags for tracking if the broadcaster was either cancelled or terminated via EOC mostly for testing and debugging purposes though this info might be useful if we decide to offer a `.statistics()` like API in the future. --- tractor/trionics/_broadcast.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 88198c6..35711b2 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -100,6 +100,15 @@ class BroadcastState: # on a newly produced value from the sender. recv_ready: Optional[tuple[int, trio.Event]] = None + # if a ``trio.EndOfChannel`` is received on any + # consumer all consumers should be placed in this state + # such that the group is notified of the end-of-broadcast. + # For now, this is solely for testing/debugging purposes. + eoc: bool = False + + # If the broadcaster was cancelled, we might as well track it + cancelled: bool = False + class BroadcastReceiver(ReceiveChannel): '''A memory receive channel broadcaster which is non-lossy for the @@ -222,13 +231,23 @@ class BroadcastReceiver(ReceiveChannel): event.set() return value + except trio.EndOfChannel: + # if any one consumer gets an EOC from the underlying + # receiver we need to unblock and send that signal to + # all other consumers. + self._state.eoc = True + if event.statistics().tasks_waiting: + event.set() + raise + except ( trio.Cancelled, - trio.EndOfChannel, ): # handle cancelled specially otherwise sibling # consumers will be awoken with a sequence of -1 - # state.recv_ready = trio.Cancelled + # and will potentially try to rewait the underlying + # receiver instead of just cancelling immediately. + self._state.cancelled = True if event.statistics().tasks_waiting: event.set() raise @@ -305,7 +324,10 @@ class BroadcastReceiver(ReceiveChannel): async def aclose( self, ) -> None: + ''' + Close this receiver without affecting other consumers. + ''' if self._closed: return