diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 2f78dec9..a3e00691 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -4,6 +4,7 @@ Core broker-daemon tasks and API. import time import inspect from types import ModuleType +from typing import AsyncContextManager import trio @@ -41,6 +42,7 @@ async def quote(brokermod: ModuleType, tickers: [str]) -> dict: async def poll_tickers( client: 'Client', + quoter: AsyncContextManager, tickers: [str], q: trio.Queue, rate: int = 3, # delay between quote requests @@ -49,56 +51,45 @@ async def poll_tickers( """Stream quotes for a sequence of tickers at the given ``rate`` per second. """ - t2ids = await client.tickers2ids(tickers) - ids = ','.join(map(str, t2ids.values())) sleeptime = round(1. / rate, 3) _cache = {} - while True: # use an event here to trigger exit? - prequote_start = time.time() - try: - quotes_resp = await client.api.quotes(ids=ids) - except QuestradeError as qterr: - if "Access token is invalid" in str(qterr.args[0]): - # out-of-process piker may have renewed already - client._reload_config() - quotes_resp = await client.api.quotes(ids=ids) - else: - raise + async with quoter(client, tickers) as get_quotes: + while True: # use an event here to trigger exit? + prequote_start = time.time() + quotes = await get_quotes(tickers) + postquote_start = time.time() + payload = [] + for quote in quotes: - postquote_start = time.time() - quotes = quotes_resp['quotes'] - payload = [] - for quote in quotes: + if quote['delay'] > 0: + log.warning(f"Delayed quote:\n{quote}") - if quote['delay'] > 0: - log.warning(f"Delayed quote:\n{quote}") - - if diff_cached: - # if cache is enabled then only deliver "new" changes - symbol = quote['symbol'] - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote + if diff_cached: + # if cache is enabled then only deliver "new" changes + symbol = quote['symbol'] + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + payload.append(quote) + else: payload.append(quote) + + if payload: + q.put_nowait(payload) + + req_time = round(postquote_start - prequote_start, 3) + proc_time = round(time.time() - postquote_start, 3) + tot = req_time + proc_time + log.debug(f"Request + processing took {req_time + proc_time}") + delay = sleeptime - (req_time + proc_time) + if delay <= 0: + log.warn( + f"Took {req_time} (request) + {proc_time} (processing) = {tot}" + f" secs (> {sleeptime}) for processing quotes?") else: - payload.append(quote) - - if payload: - q.put_nowait(payload) - - req_time = round(postquote_start - prequote_start, 3) - proc_time = round(time.time() - postquote_start, 3) - tot = req_time + proc_time - log.debug(f"Request + processing took {req_time + proc_time}") - delay = sleeptime - (req_time + proc_time) - if delay <= 0: - log.warn( - f"Took {req_time} (request) + {proc_time} (processing) = {tot}" - f" secs (> {sleeptime}) for processing quotes?") - else: - log.debug(f"Sleeping for {delay}") - await trio.sleep(delay) + log.debug(f"Sleeping for {delay}") + await trio.sleep(delay) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index a66e1c50..a4cc3005 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -271,3 +271,26 @@ async def get_client() -> Client: yield client finally: write_conf(client) + + +@asynccontextmanager +async def quoter(client: Client, tickers: [str]): + t2ids = await client.tickers2ids(tickers) + ids = ','.join(map(str, t2ids.values())) + + async def get_quote(tickers): + """Query for quotes using cached symbol ids. + """ + try: + quotes_resp = await client.api.quotes(ids=ids) + except QuestradeError as qterr: + if "Access token is invalid" in str(qterr.args[0]): + # out-of-process piker may have renewed already + client._reload_config() + quotes_resp = await client.api.quotes(ids=ids) + else: + raise + + return quotes_resp['quotes'] + + yield get_quote