diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 746f23df..2ea0de8f 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -2,9 +2,10 @@ Questrade API backend. """ import time -import datetime +from datetime import datetime from functools import partial import configparser +from typing import List, Tuple, Dict, Any import trio from async_generator import asynccontextmanager @@ -29,6 +30,73 @@ class QuestradeError(Exception): "Non-200 OK response code" +class _API: + """Questrade API endpoints exposed as methods and wrapped with an + http session. + """ + def __init__(self, session: asks.Session): + self._sess = session + + async def _request(self, path: str, params=None) -> dict: + resp = await self._sess.get(path=f'/{path}', params=params) + return resproc(resp, log) + + async def accounts(self) -> dict: + return await self._request('accounts') + + async def time(self) -> dict: + return await self._request('time') + + async def markets(self) -> dict: + return await self._request('markets') + + async def search(self, prefix: str) -> dict: + return await self._request( + 'symbols/search', params={'prefix': prefix}) + + async def symbols(self, ids: str = '', names: str = '') -> dict: + log.debug(f"Symbol lookup for {ids or names}") + return await self._request( + 'symbols', params={'ids': ids, 'names': names}) + + async def quotes(self, ids: str) -> dict: + return await self._request('markets/quotes', params={'ids': ids}) + + async def candles(self, id: str, start: str, end, interval) -> dict: + return await self._request(f'markets/candles/{id}', params={}) + + async def balances(self, id: str) -> dict: + return await self._request(f'accounts/{id}/balances') + + async def postions(self, id: str) -> dict: + return await self._request(f'accounts/{id}/positions') + + async def option_contracts(self, symbol_id: str) -> dict: + "Retrieve all option contract API ids with expiry -> strike prices." + contracts = await self._request(f'symbols/{symbol_id}/options') + return contracts['optionChain'] + + async def option_quotes( + self, + ids: List[int], + expiry: str, + option_ids: List[int] = [], # if you don't want them all + ) -> dict: + "Retrieve option chain quotes for all option ids or by filter(s)." + filters = [ + { + "underlyingId": int(symbol_id), + "expiryDate": str(expiry), + } for symbol_id in ids + ] + + resp = await self._sess.post( + path=f'/markets/quotes/options', + json={'filters': filters, 'optionIds': option_ids} + ) + return resproc(resp, log) + + class Client: """API client suitable for use as a long running broker daemon or single api requests. @@ -100,7 +168,7 @@ class Client: """ access_token = self.access_data.get('access_token') expires = float(self.access_data.get('expires_at', 0)) - expires_stamp = datetime.datetime.fromtimestamp( + expires_stamp = datetime.fromtimestamp( expires).strftime('%Y-%m-%d %H:%M:%S') if not access_token or (expires < time.time()) or force_refresh: log.debug( @@ -147,15 +215,26 @@ class Client: data = await self.api.symbols(names=','.join(tickers)) symbols2ids = {} for ticker, symbol in zip(tickers, data['symbols']): - symbols2ids[symbol['symbol']] = symbol['symbolId'] + symbols2ids[symbol['symbol']] = str(symbol['symbolId']) return symbols2ids - async def quote(self, tickers: [str]): - """Return quotes for each ticker in ``tickers``. + async def symbol_data(self, tickers: List[str]): + """Return symbol data for ``tickers``. """ t2ids = await self.tickers2ids(tickers) - ids = ','.join(map(str, t2ids.values())) + ids = ','.join(t2ids.values()) + symbols = {} + for pkt in (await self.api.symbols(ids=ids))['symbols']: + symbols[pkt['symbol']] = pkt + + return symbols + + async def quote(self, tickers: [str]): + """Return stock quotes for each ticker in ``tickers``. + """ + t2ids = await self.tickers2ids(tickers) + ids = ','.join(t2ids.values()) results = (await self.api.quotes(ids=ids))['quotes'] quotes = {quote['symbol']: quote for quote in results} @@ -167,58 +246,59 @@ class Client: return quotes - async def symbol_data(self, tickers: [str]): - """Return symbol data for ``tickers``. + async def option_contracts( + self, + symbol: str + ) -> Tuple[int, Dict[datetime, dict]]: + """Return option contract dat for the given symbol. + + The most useful part is the expiries which can be passed to the option + chain endpoint but specifc contract ids can be pulled here as well. """ - t2ids = await self.tickers2ids(tickers) - ids = ','.join(map(str, t2ids.values())) - symbols = {} - for pkt in (await self.api.symbols(ids=ids))['symbols']: - symbols[pkt['symbol']] = pkt + id = int((await self.tickers2ids([symbol]))[symbol]) + contracts = await self.api.option_contracts(id) + return id, { + # convert to native datetime objs for sorting + datetime.fromisoformat(item['expiryDate']): + item for item in contracts + } - return symbols + async def max_contract_expiry( + self, + symbols: List[str] + ) -> Tuple[List[int], datetime]: + """Look up all contracts for each symbol in ``symbols`` and return the + list of symbol ids as well as the maximum possible option contract + expiry out of the bunch. + This routine is a bit slow doing all the contract lookups (a request + per symbol) and thus the return values should be cached for use with + ``option_chains()``. + """ + batch = {} + for symbol in symbols: + id, contracts = await self.option_contracts(symbol) + batch[id] = max(contracts) -class _API: - """Questrade API endpoints exposed as methods and wrapped with an - http session. - """ - def __init__(self, session: asks.Session): - self._sess = session + return tuple(batch.keys()), max(batch.values()) - async def _request(self, path: str, params=None) -> dict: - resp = await self._sess.get(path=f'/{path}', params=params) - return resproc(resp, log) + async def option_chains( + self, + symbol_ids: List[int], + max_expiry: str # iso format datetime (microseconds) + ) -> Dict[str, Dict[str, Dict[str, Any]]]: + """Return option chain snap quote for each ticker in ``symbols``. + """ + quotes = (await self.api.option_quotes( + ids=symbol_ids, + expiry=max_expiry.isoformat(timespec='microseconds') + ))['optionQuotes'] - async def accounts(self) -> dict: - return await self._request('accounts') + batch = {} + for quote in quotes: + batch.setdefault(quote['underlying'], {})[quote['symbol']] = quote - async def time(self) -> dict: - return await self._request('time') - - async def markets(self) -> dict: - return await self._request('markets') - - async def search(self, prefix: str) -> dict: - return await self._request( - 'symbols/search', params={'prefix': prefix}) - - async def symbols(self, ids: str = '', names: str = '') -> dict: - log.debug(f"Symbol lookup for {ids or names}") - return await self._request( - 'symbols', params={'ids': ids, 'names': names}) - - async def quotes(self, ids: str) -> dict: - return await self._request('markets/quotes', params={'ids': ids}) - - async def candles(self, id: str, start: str, end, interval) -> dict: - return await self._request(f'markets/candles/{id}', params={}) - - async def balances(self, id: str) -> dict: - return await self._request(f'accounts/{id}/balances') - - async def postions(self, id: str) -> dict: - return await self._request(f'accounts/{id}/positions') + return batch async def token_refresher(client): @@ -286,13 +366,18 @@ async def get_client() -> Client: write_conf(client) -async def quoter(client: Client, tickers: [str]): +async def quoter(client: Client, tickers: List[str]): """Quoter context. + + Yeah so fun times..QT has this symbol to ``int`` id lookup system that you + have to use to get any quotes. That means we try to be smart and maintain + a cache of this map lazily as requests from in for new tickers/symbols. + Most of the closure variables here are to deal with that. """ t2ids = {} ids = '' - def filter_symbols(quotes_dict): + def filter_symbols(quotes_dict: dict): nonlocal t2ids for symbol, quote in quotes_dict.items(): if quote['low52w'] is None: @@ -311,6 +396,7 @@ async def quoter(client: Client, tickers: [str]): # update ticker ids cache log.debug(f"Tickers set changed {new - current}") t2ids = await client.tickers2ids(tickers) + # re-save symbol -> ids cache ids = ','.join(map(str, t2ids.values())) try: @@ -323,6 +409,12 @@ async def quoter(client: Client, tickers: [str]): quotes_resp = await client.api.quotes(ids=ids) except BrokerError as qterr: if "Access token is invalid" in str(qterr.args[0]): + # TODO: this will crash when run from a sub-actor since + # STDIN can't be acquired. The right way to handle this + # is to make a request to the parent actor (i.e. + # spawner of this) to call this + # `client.ensure_access()` locally thus blocking until + # the user provides an API key on the "client side" await client.ensure_access(force_refresh=True) else: raise @@ -340,7 +432,7 @@ async def quoter(client: Client, tickers: [str]): first_quotes_dict = await get_quote(tickers) filter_symbols(first_quotes_dict) - # re-save symbol ids cache + # re-save symbol -> ids cache ids = ','.join(map(str, t2ids.values())) return get_quote @@ -357,6 +449,7 @@ _qt_keys = { 'askPrice': 'ask', 'bidPrice': 'bid', 'lastTradeSize': 'size', + 'lastTradeTime': ('time', datetime.fromisoformat), 'bidSize': 'bsize', 'askSize': 'asize', 'VWAP': ('VWAP', partial(round, ndigits=3)), @@ -371,7 +464,6 @@ _qt_keys = { # 'high52w': 'high52w', # "lastTradePriceTrHrs": 7.99, # "lastTradeTick": "Equal", - # "lastTradeTime": "2018-01-30T18:28:23.434000-05:00", # "symbolId": 3575753, # "tier": "", # 'isHalted': 'halted', # as subscript 'h' @@ -389,7 +481,7 @@ def format_quote( quote: dict, symbol_data: dict, keymap: dict = _qt_keys, -) -> (dict, dict): +) -> Tuple[dict, dict]: """Remap a list of quote dicts ``quotes`` using the mapping of old keys -> new keys ``keymap`` returning 2 dicts: one with raw data and the other for display.