From 224451f44ab49b870becc6de62f09b29a5d32fd1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 8 Feb 2018 18:42:19 -0500 Subject: [PATCH] Make ticker stream caching optional Push all ticker quotes to the queue regardless of duplicate content. That is, don't worry about only pushing new quote changes (turns out it is useful when coloring a watchlist where multiple of the same quote may indicate multiple similar trades and we only want to quickly "pulse" color changes on value changes). If it is desired to only push new changes, the ``cache`` flag enables the old behaviour. Also add `Client.symbols()` for returning symbol data from a sequence of tickers. --- piker/brokers/questrade.py | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index b56efb0d..950b7070 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -158,7 +158,18 @@ class Client: """ t2ids = await self.tickers2ids(tickers) ids = ','.join(map(str, t2ids.values())) - return await self.api.quotes(ids=ids) + return (await self.api.quotes(ids=ids))['quotes'] + + async def symbols(self, tickers): + """Return quotes for each ticker in ``tickers``. + """ + t2ids = await self.tickers2ids(tickers) + ids = ','.join(map(str, t2ids.values())) + symbols = {} + for pkt in (await self.api.symbols(ids=ids))['symbols']: + symbols[pkt['symbol']] = pkt + + return symbols class _API: @@ -282,7 +293,10 @@ async def serve_forever(tasks) -> None: async def poll_tickers( - client: Client, tickers: [str], q: trio.Queue, rate: int = 2, + client: Client, tickers: [str], + q: trio.Queue, + rate: int = 3, + cache: bool = False, # only deliver "new" changes to the queue ) -> None: """Stream quotes for a sequence of tickers at the given ``rate`` per second. @@ -298,15 +312,21 @@ async def poll_tickers( quotes = quotes_resp['quotes'] # log.trace(quotes) - # only push quotes with "new" data payload = [] for quote in quotes: - symbol = quote['symbol'] - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.debug(f"New quote {symbol} data:\n{new}") - _cache[symbol] = quote + + if quote['delay'] > 0: + log.warning(f"Delayed quote:\n{quote}") + + if cache: # if cache is enabled then only deliver "new" changes + symbol = quote['symbol'] + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.debug(f"New quote {symbol} data:\n{new}") + _cache[symbol] = quote + payload.append(quote) + else: payload.append(quote) if payload: