From 81cd696ec818cbfa80319d90fd5e666128cfef26 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Mar 2022 13:29:07 -0400 Subject: [PATCH] Drop sampler consumers that overrun 6x --- piker/data/_sampling.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 16c6b017..d31bf7b1 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -19,6 +19,8 @@ Sampling and broadcast machinery for (soft) real-time delivery of financial data flows. """ +from __future__ import annotations +from collections import Counter import time import tractor @@ -188,6 +190,8 @@ async def sample_and_broadcast( log.info("Started shared mem bar writer") + overruns = Counter() + # iterate stream delivered by broker async for quotes in quote_stream: # TODO: ``numba`` this! @@ -262,8 +266,8 @@ async def sample_and_broadcast( # should?) so we have to manually generate the correct # key here. bsym = f'{broker_symbol}.{brokername}' + lags: int = 0 - lags = 0 for (stream, tick_throttle) in subs: try: @@ -283,10 +287,18 @@ async def sample_and_broadcast( f'{ctx.channel.uid} !!!' ) else: + key = id(stream) + overruns[key] += 1 log.warning( f'Feed overrun {bus.brokername} -> ' f'feed @ {tick_throttle} Hz' ) + if overruns[key] > 6: + log.warning( + f'Dropping consumer {stream}' + ) + await stream.aclose() + raise trio.BrokenResourceError else: await stream.send( {bsym: quote} @@ -309,7 +321,7 @@ async def sample_and_broadcast( '`brokerd`-quotes-feed connection' ) if tick_throttle: - assert stream.closed() + assert stream._closed # XXX: do we need to deregister here # if it's done in the fee bus code?