Push every new change; not just the last trade
parent
62bac7b2cd
commit
29be7f58c9
|
@ -301,7 +301,7 @@ async def poll_tickers(
|
||||||
client: Client, tickers: [str],
|
client: Client, tickers: [str],
|
||||||
q: trio.Queue,
|
q: trio.Queue,
|
||||||
rate: int = 5, # 200ms delay between quotes
|
rate: int = 5, # 200ms delay between quotes
|
||||||
time_cached: bool = True, # only deliver "new" quotes to the queue
|
diff_cached: bool = True, # only deliver "new" quotes to the queue
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Stream quotes for a sequence of tickers at the given ``rate``
|
"""Stream quotes for a sequence of tickers at the given ``rate``
|
||||||
per second.
|
per second.
|
||||||
|
@ -323,14 +323,14 @@ async def poll_tickers(
|
||||||
if quote['delay'] > 0:
|
if quote['delay'] > 0:
|
||||||
log.warning(f"Delayed quote:\n{quote}")
|
log.warning(f"Delayed quote:\n{quote}")
|
||||||
|
|
||||||
if time_cached: # if cache is enabled then only deliver "new" changes
|
if diff_cached:
|
||||||
|
# if cache is enabled then only deliver "new" changes
|
||||||
symbol = quote['symbol']
|
symbol = quote['symbol']
|
||||||
last = _cache.setdefault(symbol, {})
|
last = _cache.setdefault(symbol, {})
|
||||||
timekey = 'lastTradeTime'
|
new = set(quote.items()) - set(last.items())
|
||||||
if quote[timekey] != last.get(timekey):
|
if new:
|
||||||
new = set(quote.items()) - set(last.items())
|
|
||||||
log.info(
|
log.info(
|
||||||
f"New quote {quote['symbol']} @ {quote[timekey]}:\n{new}")
|
f"New quote {quote['symbol']}:\n{new}")
|
||||||
_cache[symbol] = quote
|
_cache[symbol] = quote
|
||||||
payload.append(quote)
|
payload.append(quote)
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Reference in New Issue