From a7e7c9d1c0053dc0571bf5f739abfd23f30901e3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 21:02:48 -0400 Subject: [PATCH] Store array `maxlen` in state singleton The `collections.deque` takes care of array length truncation of values for us implicitly but in the future we'll likely want this value exposed to alternate array implementations. This patch is to provide for that as well as make `mypy` happy since the `dequeu.maxlen` can also be `None`. --- tractor/_broadcast.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index e375ef2..b60f45c 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -74,6 +74,7 @@ class BroadcastState: ''' queue: deque + maxlen: int # map of underlying instance id keys to receiver instances which # must be provided as a singleton per broadcaster set. @@ -81,7 +82,7 @@ class BroadcastState: # broadcast event to wake up all sleeping consumer tasks # on a newly produced value from the sender. - recv_ready: Optional[tuple[str, trio.Event]] = None + recv_ready: Optional[tuple[int, trio.Event]] = None class BroadcastReceiver(ReceiveChannel): @@ -150,7 +151,7 @@ class BroadcastReceiver(ReceiveChannel): # decrement to the last value and expect # consumer to either handle the ``Lagged`` and come back # or bail out on its own (thus un-subscribing) - state.subs[key] = state.queue.maxlen - 1 + state.subs[key] = state.maxlen - 1 # this task was overrun by the producer side task: Task = current_task() @@ -174,7 +175,12 @@ class BroadcastReceiver(ReceiveChannel): # right? try: value = await self._recv() + # items with lower indices are "newer" + # NOTE: ``collections.deque`` implicitly takes care of + # trucating values outside our ``state.maxlen``. In the + # alt-backend-array-case we'll need to make sure this is + # implemented in similar ringer-buffer-ish style. state.queue.appendleft(value) # broadcast new value to all subscribers by increasing @@ -295,6 +301,7 @@ def broadcast_receiver( recv_chan, state=BroadcastState( queue=deque(maxlen=max_buffer_size), + maxlen=max_buffer_size, subs={}, ), **kwargs,