diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 12ab382..9bec416 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -215,19 +215,23 @@ class BroadcastReceiver(ReceiveChannel): # return this value." # https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging + mxln = state.maxlen + lost = seq - mxln + # decrement to the last value and expect # consumer to either handle the ``Lagged`` and come back # or bail out on its own (thus un-subscribing) - state.subs[key] = state.maxlen - 1 + state.subs[key] = mxln - 1 # this task was overrun by the producer side task: Task = current_task() - msg = f'Task {task.name} was overrun' + msg = f'Task `{task.name}` overrun and dropped `{lost}` values' if self._raise_on_lag: raise Lagged(msg) else: log.warning(msg) + return self.receive_nowait(_key, _state) state.subs[key] -= 1 return value