Only push new quotes to the queue at 5 per sec
parent
f4fc3d099b
commit
b7b7abbc1f
|
@ -300,8 +300,8 @@ async def serve_forever(tasks) -> None:
|
||||||
async def poll_tickers(
|
async def poll_tickers(
|
||||||
client: Client, tickers: [str],
|
client: Client, tickers: [str],
|
||||||
q: trio.Queue,
|
q: trio.Queue,
|
||||||
rate: int = 3,
|
rate: int = 5, # 200ms delay between quotes
|
||||||
cache: bool = False, # only deliver "new" changes to the queue
|
time_cached: bool = True, # only deliver "new" quotes to the queue
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Stream quotes for a sequence of tickers at the given ``rate``
|
"""Stream quotes for a sequence of tickers at the given ``rate``
|
||||||
per second.
|
per second.
|
||||||
|
@ -323,12 +323,13 @@ async def poll_tickers(
|
||||||
if quote['delay'] > 0:
|
if quote['delay'] > 0:
|
||||||
log.warning(f"Delayed quote:\n{quote}")
|
log.warning(f"Delayed quote:\n{quote}")
|
||||||
|
|
||||||
if cache: # if cache is enabled then only deliver "new" changes
|
if time_cached: # if cache is enabled then only deliver "new" changes
|
||||||
symbol = quote['symbol']
|
symbol = quote['symbol']
|
||||||
last = _cache.setdefault(symbol, {})
|
last = _cache.setdefault(symbol, {})
|
||||||
new = set(quote.items()) - set(last.items())
|
timekey = 'lastTradeTime'
|
||||||
if new:
|
if quote[timekey] != last.get(timekey):
|
||||||
log.debug(f"New quote {symbol} data:\n{new}")
|
log.info(
|
||||||
|
f"New quote {quote['symbol']} @ {quote[timekey]}")
|
||||||
_cache[symbol] = quote
|
_cache[symbol] = quote
|
||||||
payload.append(quote)
|
payload.append(quote)
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Reference in New Issue