diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 88198c6..35711b2 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -100,6 +100,15 @@ class BroadcastState: # on a newly produced value from the sender. recv_ready: Optional[tuple[int, trio.Event]] = None + # if a ``trio.EndOfChannel`` is received on any + # consumer all consumers should be placed in this state + # such that the group is notified of the end-of-broadcast. + # For now, this is solely for testing/debugging purposes. + eoc: bool = False + + # If the broadcaster was cancelled, we might as well track it + cancelled: bool = False + class BroadcastReceiver(ReceiveChannel): '''A memory receive channel broadcaster which is non-lossy for the @@ -222,13 +231,23 @@ class BroadcastReceiver(ReceiveChannel): event.set() return value + except trio.EndOfChannel: + # if any one consumer gets an EOC from the underlying + # receiver we need to unblock and send that signal to + # all other consumers. + self._state.eoc = True + if event.statistics().tasks_waiting: + event.set() + raise + except ( trio.Cancelled, - trio.EndOfChannel, ): # handle cancelled specially otherwise sibling # consumers will be awoken with a sequence of -1 - # state.recv_ready = trio.Cancelled + # and will potentially try to rewait the underlying + # receiver instead of just cancelling immediately. + self._state.cancelled = True if event.statistics().tasks_waiting: event.set() raise @@ -305,7 +324,10 @@ class BroadcastReceiver(ReceiveChannel): async def aclose( self, ) -> None: + ''' + Close this receiver without affecting other consumers. + ''' if self._closed: return