diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index e375ef2..b60f45c 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -74,6 +74,7 @@ class BroadcastState: ''' queue: deque + maxlen: int # map of underlying instance id keys to receiver instances which # must be provided as a singleton per broadcaster set. @@ -81,7 +82,7 @@ class BroadcastState: # broadcast event to wake up all sleeping consumer tasks # on a newly produced value from the sender. - recv_ready: Optional[tuple[str, trio.Event]] = None + recv_ready: Optional[tuple[int, trio.Event]] = None class BroadcastReceiver(ReceiveChannel): @@ -150,7 +151,7 @@ class BroadcastReceiver(ReceiveChannel): # decrement to the last value and expect # consumer to either handle the ``Lagged`` and come back # or bail out on its own (thus un-subscribing) - state.subs[key] = state.queue.maxlen - 1 + state.subs[key] = state.maxlen - 1 # this task was overrun by the producer side task: Task = current_task() @@ -174,7 +175,12 @@ class BroadcastReceiver(ReceiveChannel): # right? try: value = await self._recv() + # items with lower indices are "newer" + # NOTE: ``collections.deque`` implicitly takes care of + # trucating values outside our ``state.maxlen``. In the + # alt-backend-array-case we'll need to make sure this is + # implemented in similar ringer-buffer-ish style. state.queue.appendleft(value) # broadcast new value to all subscribers by increasing @@ -295,6 +301,7 @@ def broadcast_receiver( recv_chan, state=BroadcastState( queue=deque(maxlen=max_buffer_size), + maxlen=max_buffer_size, subs={}, ), **kwargs,