Drop sampler consumers that overrun 6x
parent
0cb05ef868
commit
4aaf5a1f8b
|
@ -20,6 +20,7 @@ financial data flows.
|
|||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from collections import Counter
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
|
@ -191,6 +192,8 @@ async def sample_and_broadcast(
|
|||
|
||||
log.info("Started shared mem bar writer")
|
||||
|
||||
overruns = Counter()
|
||||
|
||||
# iterate stream delivered by broker
|
||||
async for quotes in quote_stream:
|
||||
# TODO: ``numba`` this!
|
||||
|
@ -265,8 +268,8 @@ async def sample_and_broadcast(
|
|||
# should?) so we have to manually generate the correct
|
||||
# key here.
|
||||
bsym = f'{broker_symbol}.{brokername}'
|
||||
lags: int = 0
|
||||
|
||||
lags = 0
|
||||
for (stream, tick_throttle) in subs:
|
||||
|
||||
try:
|
||||
|
@ -286,10 +289,18 @@ async def sample_and_broadcast(
|
|||
f'{ctx.channel.uid} !!!'
|
||||
)
|
||||
else:
|
||||
key = id(stream)
|
||||
overruns[key] += 1
|
||||
log.warning(
|
||||
f'Feed overrun {bus.brokername} -> '
|
||||
f'feed @ {tick_throttle} Hz'
|
||||
)
|
||||
if overruns[key] > 6:
|
||||
log.warning(
|
||||
f'Dropping consumer {stream}'
|
||||
)
|
||||
await stream.aclose()
|
||||
raise trio.BrokenResourceError
|
||||
else:
|
||||
await stream.send(
|
||||
{bsym: quote}
|
||||
|
@ -312,7 +323,7 @@ async def sample_and_broadcast(
|
|||
'`brokerd`-quotes-feed connection'
|
||||
)
|
||||
if tick_throttle:
|
||||
assert stream.closed()
|
||||
assert stream._closed
|
||||
|
||||
# XXX: do we need to deregister here
|
||||
# if it's done in the fee bus code?
|
||||
|
|
Loading…
Reference in New Issue