Make ticker stream caching optional
Push all ticker quotes to the queue regardless of duplicate content. That is, don't worry about only pushing new quote changes (turns out it is useful when coloring a watchlist where multiple of the same quote may indicate multiple similar trades and we only want to quickly "pulse" color changes on value changes). If it is desired to only push new changes, the ``cache`` flag enables the old behaviour. Also add `Client.symbols()` for returning symbol data from a sequence of tickers.kivy_mainline_and_py3.8
parent
13342c459a
commit
224451f44a
|
@ -158,7 +158,18 @@ class Client:
|
||||||
"""
|
"""
|
||||||
t2ids = await self.tickers2ids(tickers)
|
t2ids = await self.tickers2ids(tickers)
|
||||||
ids = ','.join(map(str, t2ids.values()))
|
ids = ','.join(map(str, t2ids.values()))
|
||||||
return await self.api.quotes(ids=ids)
|
return (await self.api.quotes(ids=ids))['quotes']
|
||||||
|
|
||||||
|
async def symbols(self, tickers):
|
||||||
|
"""Return quotes for each ticker in ``tickers``.
|
||||||
|
"""
|
||||||
|
t2ids = await self.tickers2ids(tickers)
|
||||||
|
ids = ','.join(map(str, t2ids.values()))
|
||||||
|
symbols = {}
|
||||||
|
for pkt in (await self.api.symbols(ids=ids))['symbols']:
|
||||||
|
symbols[pkt['symbol']] = pkt
|
||||||
|
|
||||||
|
return symbols
|
||||||
|
|
||||||
|
|
||||||
class _API:
|
class _API:
|
||||||
|
@ -282,7 +293,10 @@ async def serve_forever(tasks) -> None:
|
||||||
|
|
||||||
|
|
||||||
async def poll_tickers(
|
async def poll_tickers(
|
||||||
client: Client, tickers: [str], q: trio.Queue, rate: int = 2,
|
client: Client, tickers: [str],
|
||||||
|
q: trio.Queue,
|
||||||
|
rate: int = 3,
|
||||||
|
cache: bool = False, # only deliver "new" changes to the queue
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Stream quotes for a sequence of tickers at the given ``rate``
|
"""Stream quotes for a sequence of tickers at the given ``rate``
|
||||||
per second.
|
per second.
|
||||||
|
@ -298,15 +312,21 @@ async def poll_tickers(
|
||||||
quotes = quotes_resp['quotes']
|
quotes = quotes_resp['quotes']
|
||||||
# log.trace(quotes)
|
# log.trace(quotes)
|
||||||
|
|
||||||
# only push quotes with "new" data
|
|
||||||
payload = []
|
payload = []
|
||||||
for quote in quotes:
|
for quote in quotes:
|
||||||
symbol = quote['symbol']
|
|
||||||
last = _cache.setdefault(symbol, {})
|
if quote['delay'] > 0:
|
||||||
new = set(quote.items()) - set(last.items())
|
log.warning(f"Delayed quote:\n{quote}")
|
||||||
if new:
|
|
||||||
log.debug(f"New quote {symbol} data:\n{new}")
|
if cache: # if cache is enabled then only deliver "new" changes
|
||||||
_cache[symbol] = quote
|
symbol = quote['symbol']
|
||||||
|
last = _cache.setdefault(symbol, {})
|
||||||
|
new = set(quote.items()) - set(last.items())
|
||||||
|
if new:
|
||||||
|
log.debug(f"New quote {symbol} data:\n{new}")
|
||||||
|
_cache[symbol] = quote
|
||||||
|
payload.append(quote)
|
||||||
|
else:
|
||||||
payload.append(quote)
|
payload.append(quote)
|
||||||
|
|
||||||
if payload:
|
if payload:
|
||||||
|
|
Loading…
Reference in New Issue