Recurse and get the last value when in warn mode

breceiver_internals
Tyler Goodlet 2023-01-13 16:52:19 -05:00
parent 2707a0e971
commit 6ba29f8d56
1 changed files with 6 additions and 2 deletions

View File

@ -215,19 +215,23 @@ class BroadcastReceiver(ReceiveChannel):
# return this value." # return this value."
# https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging # https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging
mxln = state.maxlen
lost = seq - mxln
# decrement to the last value and expect # decrement to the last value and expect
# consumer to either handle the ``Lagged`` and come back # consumer to either handle the ``Lagged`` and come back
# or bail out on its own (thus un-subscribing) # or bail out on its own (thus un-subscribing)
state.subs[key] = state.maxlen - 1 state.subs[key] = mxln - 1
# this task was overrun by the producer side # this task was overrun by the producer side
task: Task = current_task() task: Task = current_task()
msg = f'Task {task.name} was overrun' msg = f'Task `{task.name}` overrun and dropped `{lost}` values'
if self._raise_on_lag: if self._raise_on_lag:
raise Lagged(msg) raise Lagged(msg)
else: else:
log.warning(msg) log.warning(msg)
return self.receive_nowait(_key, _state)
state.subs[key] -= 1 state.subs[key] -= 1
return value return value