Drop sampler consumers that overrun 6x

fqsns
Tyler Goodlet 2022-03-24 13:29:07 -04:00
parent a6e32e7530
commit 81cd696ec8
1 changed files with 14 additions and 2 deletions

View File

@ -19,6 +19,8 @@ Sampling and broadcast machinery for (soft) real-time delivery of
financial data flows. financial data flows.
""" """
from __future__ import annotations
from collections import Counter
import time import time
import tractor import tractor
@ -188,6 +190,8 @@ async def sample_and_broadcast(
log.info("Started shared mem bar writer") log.info("Started shared mem bar writer")
overruns = Counter()
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
# TODO: ``numba`` this! # TODO: ``numba`` this!
@ -262,8 +266,8 @@ async def sample_and_broadcast(
# should?) so we have to manually generate the correct # should?) so we have to manually generate the correct
# key here. # key here.
bsym = f'{broker_symbol}.{brokername}' bsym = f'{broker_symbol}.{brokername}'
lags: int = 0
lags = 0
for (stream, tick_throttle) in subs: for (stream, tick_throttle) in subs:
try: try:
@ -283,10 +287,18 @@ async def sample_and_broadcast(
f'{ctx.channel.uid} !!!' f'{ctx.channel.uid} !!!'
) )
else: else:
key = id(stream)
overruns[key] += 1
log.warning( log.warning(
f'Feed overrun {bus.brokername} -> ' f'Feed overrun {bus.brokername} -> '
f'feed @ {tick_throttle} Hz' f'feed @ {tick_throttle} Hz'
) )
if overruns[key] > 6:
log.warning(
f'Dropping consumer {stream}'
)
await stream.aclose()
raise trio.BrokenResourceError
else: else:
await stream.send( await stream.send(
{bsym: quote} {bsym: quote}
@ -309,7 +321,7 @@ async def sample_and_broadcast(
'`brokerd`-quotes-feed connection' '`brokerd`-quotes-feed connection'
) )
if tick_throttle: if tick_throttle:
assert stream.closed() assert stream._closed
# XXX: do we need to deregister here # XXX: do we need to deregister here
# if it's done in the fee bus code? # if it's done in the fee bus code?