# piker: trading gear for hackers # Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """ Questrade API backend. """ from __future__ import annotations import inspect import time from datetime import datetime from functools import partial import itertools import configparser from pprint import pformat from typing import ( List, Tuple, Dict, Any, Iterator, NamedTuple, AsyncGenerator, Callable, ) import pendulum import trio import tractor from async_generator import asynccontextmanager import numpy as np import wrapt import asks from ..calc import humanize, percent_change from .._cacheables import open_cached_client, async_lifo_cache from .. 
import config from ._util import resproc, BrokerError, SymbolNotFound from ..log import get_logger, colorize_json, get_console_log log = get_logger(__name__) _use_practice_account = False _refresh_token_ep = 'https://{}login.questrade.com/oauth2/' _version = 'v1' # stock queries/sec # it seems 4 rps is best we can do total _rate_limit = 4 _time_frames = { '1m': 'OneMinute', '2m': 'TwoMinutes', '3m': 'ThreeMinutes', '4m': 'FourMinutes', '5m': 'FiveMinutes', '10m': 'TenMinutes', '15m': 'FifteenMinutes', '20m': 'TwentyMinutes', '30m': 'HalfHour', '1h': 'OneHour', '2h': 'TwoHours', '4h': 'FourHours', 'D': 'OneDay', 'W': 'OneWeek', 'M': 'OneMonth', 'Y': 'OneYear', } class QuestradeError(Exception): "Non-200 OK response code" class ContractsKey(NamedTuple): symbol: str id: int expiry: datetime def refresh_token_on_err(tries=3): """`_API` method decorator which locks the client and refreshes tokens before unlocking access to the API again. QT's service end can't handle concurrent requests to multiple endpoints reliably without choking up and confusing their interal servers. """ @wrapt.decorator async def wrapper(wrapped, api, args, kwargs): assert inspect.iscoroutinefunction(wrapped) client = api.client if not client._has_access.is_set(): log.warning("Waiting on access lock") await client._has_access.wait() for i in range(1, tries): try: try: client._request_not_in_progress = trio.Event() return await wrapped(*args, **kwargs) finally: client._request_not_in_progress.set() except (QuestradeError, BrokerError) as qterr: if "Access token is invalid" not in str(qterr.args[0]): raise # TODO: this will crash when run from a sub-actor since # STDIN can't be acquired (ONLY WITH MP). The right way # to handle this is to make a request to the parent # actor (i.e. 
spawner of this) to call this # `client.ensure_access()` locally thus blocking until # the user provides an API key on the "client side" log.warning(f"Tokens are invalid refreshing try {i}..") await client.ensure_access(force_refresh=True) if i == tries - 1: raise return wrapper class _API: """Questrade API endpoints exposed as methods and wrapped with an http session. """ def __init__( self, client: Client, ): self.client = client self._sess: asks.Session = client._sess @refresh_token_on_err() async def _get(self, path: str, params=None) -> dict: """Get an endpoint "reliably" by ensuring access on failure. """ resp = await self._sess.get(path=f'/{path}', params=params) return resproc(resp, log) async def _new_auth_token( self, refresh_token: str, ) -> dict: """Request a new api authorization ``refresh_token``. Gain api access using either a user provided or existing token. See the instructions:: http://www.questrade.com/api/documentation/getting-started http://www.questrade.com/api/documentation/security """ resp = await self._sess.get( self.client._auth_ep + 'token', params={'grant_type': 'refresh_token', 'refresh_token': refresh_token} ) return resproc(resp, log) async def _revoke_auth_token( self, practise: bool = False, ) -> None: """Revoke api access for the current token. 
""" token = self.access_data['refresh_token'] log.debug(f"Revoking token {token}") resp = await asks.post( self.client._auth_ep + 'revoke', headers={'token': token} ) return resp # accounts end points async def accounts(self) -> dict: return await self._get('accounts') async def time(self) -> dict: return await self._get('time') async def balances(self, id: str) -> dict: return await self._get(f'accounts/{id}/balances') async def postions(self, id: str) -> dict: return await self._get(f'accounts/{id}/positions') # market end points async def markets(self) -> dict: return await self._get('markets') async def search(self, prefix: str) -> dict: return await self._get( 'symbols/search', params={'prefix': prefix}) async def symbols(self, ids: str = '', names: str = '') -> dict: log.debug(f"Symbol lookup for {ids or names}") return await self._get( 'symbols', params={'ids': ids, 'names': names}) async def quotes(self, ids: str) -> dict: quotes = (await self._get( 'markets/quotes', params={'ids': ids}))['quotes'] for quote in quotes: quote['key'] = quote['symbol'] return quotes async def candles( self, symbol_id: str, start: str, end: str, interval: str ) -> List[Dict[str, float]]: """Retrieve historical candles for provided date range. """ return (await self._get( f'markets/candles/{symbol_id}', params={'startTime': start, 'endTime': end, 'interval': interval}, ))['candles'] async def option_contracts(self, symbol_id: str) -> dict: "Retrieve all option contract API ids with expiry -> strike prices." contracts = await self._get(f'symbols/{symbol_id}/options') return contracts['optionChain'] @refresh_token_on_err() async def option_quotes( self, contracts: Dict[ContractsKey, Dict[int, dict]] = {}, option_ids: List[int] = [], # if you don't want them all ) -> dict: """Retrieve option chain quotes for all option ids or by filter(s). 
""" filters = [ { "underlyingId": int(symbol_id), "expiryDate": str(expiry), } # every expiry per symbol id for (symbol, symbol_id, expiry), bystrike in contracts.items() ] resp = await self._sess.post( path='/markets/quotes/options', # XXX: b'{"code":1024,"message":"The size of the array requested # is not valid: optionIds"}' # ^ what I get when trying to use too many ids manually... json={'filters': filters, 'optionIds': option_ids} ) return resproc(resp, log)['optionQuotes'] class Client: """API client suitable for use as a long running broker daemon or single api requests. Provides a high-level api which wraps the underlying endpoint calls. """ def __init__( self, config: dict, ): # use 2 connections per streaming endpoint (stocks, opts) # TODO: when we have more then one account key then this should scale # linearly with that. self._sess = asks.Session(connections=4) self.api = _API(self) self._conf = config self._is_practice = _use_practice_account or ( config['questrade'].get('is_practice', False) ) self._auth_ep = _refresh_token_ep.format( 'practice' if self._is_practice else '') self.access_data = {} self._reload_config(config=config) self._symbol_cache: Dict[str, int] = {} self._optids2contractinfo = {} self._contract2ids = {} # for blocking during token refresh self._has_access = trio.Event() self._has_access.set() self._request_not_in_progress = trio.Event() self._request_not_in_progress.set() self._mutex = trio.StrictFIFOLock() def _reload_config(self, config=None, **kwargs): if config: self._conf = config else: self._conf, _ = get_config(**kwargs) self.access_data = dict(self._conf['questrade']) def write_config(self): """Save access creds to config file. """ self._conf['questrade'] = self.access_data config.write(self._conf) async def ensure_access( self, force_refresh: bool = False, ask_user: bool = True, ) -> dict: """Acquire a new token set (``access_token`` and ``refresh_token``). 
Checks if the locally cached (file system) ``access_token`` has expired (based on a ``expires_at`` time stamp stored in the brokers.ini config) expired (normally has a lifetime of 3 days). If ``false is set then and refreshs token if necessary using the ``refresh_token``. If the ``refresh_token`` has expired a new one needs to be provided by the user. """ # wait for ongoing requests to clear (API can't handle # concurrent endpoint requests alongside a token refresh) await self._request_not_in_progress.wait() # block api access to tall other tasks # XXX: this is limitation of the API when using a single # token whereby their service can't handle concurrent requests # to differnet end points (particularly the auth ep) which # causes hangs and premature token invalidation issues. self._has_access = trio.Event() try: # don't allow simultaneous token refresh requests async with self._mutex: access_token = self.access_data.get('access_token') expires = float(self.access_data.get('expires_at', 0)) expires_stamp = datetime.fromtimestamp( expires).strftime('%Y-%m-%d %H:%M:%S') if not access_token or ( expires < time.time() ) or force_refresh: log.info("Refreshing API tokens") log.debug( f"Refreshing access token {access_token} which expired" f" at {expires_stamp}") try: data = await self.api._new_auth_token( self.access_data['refresh_token']) except BrokerError as qterr: def get_err_msg(err): # handle str and bytes... 
msg = err.args[0] return msg.decode() if msg.isascii() else msg msg = get_err_msg(qterr) if "We're making some changes" in msg: # API service is down raise QuestradeError("API is down for maintenance") elif msg == 'Bad Request': # likely config ``refresh_token`` is expired but # may be updated in the config file via # another actor self._reload_config() try: data = await self.api._new_auth_token( self.access_data['refresh_token']) except BrokerError as qterr: if get_err_msg(qterr) == 'Bad Request' and ( ask_user ): # actually expired; get new from user self._reload_config(force_from_user=True) data = await self.api._new_auth_token( self.access_data['refresh_token']) else: raise QuestradeError(qterr) else: raise qterr self.access_data.update(data) log.debug(f"Updated tokens:\n{data}") # store an absolute access token expiry time self.access_data['expires_at'] = time.time() + float( data['expires_in']) # write to config to disk self.write_config() else: log.debug( f"\nCurrent access token {access_token} expires at" f" {expires_stamp}\n") # set access token header for the session data = self.access_data self._sess.headers.update({ 'Authorization': (f"{data['token_type']} {data['access_token']}")} ) # set base API url (asks shorthand) self._sess.base_location = data['api_server'] + _version finally: self._has_access.set() return data async def tickers2ids( self, tickers: Iterator[str] ) -> Dict[str, int]: """Helper routine that take a sequence of ticker symbols and returns their corresponding QT numeric symbol ids. Cache any symbol to id lookups for later use. 
""" cache = self._symbol_cache symbols2ids = {} for symbol in tickers: id = cache.get(symbol) if id is not None: symbols2ids[symbol] = id # still missing uncached values - hit the api server to_lookup = list(set(tickers) - set(symbols2ids)) if to_lookup: data = await self.api.symbols(names=','.join(to_lookup)) for symbol in data['symbols']: name = symbol['symbol'] cache[name] = symbols2ids[name] = str(symbol['symbolId']) return symbols2ids async def symbol_info(self, symbols: List[str]): """Return symbol data for ``symbols``. """ t2ids = await self.tickers2ids(symbols) ids = ','.join(t2ids.values()) symbols = {} for pkt in (await self.api.symbols(ids=ids))['symbols']: symbols[pkt['symbol']] = pkt return symbols # TODO: deprecate symbol_data = symbol_info async def quote(self, tickers: [str]): """Return stock quotes for each ticker in ``tickers``. """ t2ids = await self.tickers2ids(tickers) quotes = [] if t2ids: ids = ','.join(t2ids.values()) quotes = (await self.api.quotes(ids=ids)) return quotes async def symbol2contracts( self, symbol: str ) -> Dict[Tuple[str, int, datetime], dict]: """Return option contract for the given symbol. The most useful part is the expiries which can be passed to the option chain endpoint but specifc contract ids can be pulled here as well. """ id = int((await self.tickers2ids([symbol]))[symbol]) contracts = await self.api.option_contracts(id) return { ContractsKey( symbol=symbol, id=id, # convert to native datetime objs for sorting expiry=datetime.fromisoformat(item['expiryDate'])): item for item in contracts } async def get_all_contracts( self, symbols: Iterator[str], # {symbol_id: {dt_iso_contract: {strike_price: {contract_id: id}}}} ) -> Dict[int, Dict[str, Dict[int, Any]]]: """Look up all contracts for each symbol in ``symbols`` and return the of symbol ids to contracts by further organized by expiry and strike price. 
This routine is a bit slow doing all the contract lookups (a request per symbol) and thus the return values should be cached for use with ``option_chains()``. """ by_key = {} for symbol in symbols: contracts = await self.symbol2contracts(symbol) # FIXME: chainPerRoot here is probably why in some UIs # you see a second chain with a (1) suffixed; should # probably handle this eventually. for key, byroot in sorted( # sort by datetime contracts.items(), key=lambda item: item[0].expiry ): for chain in byroot['chainPerRoot']: optroot = chain['optionRoot'] # handle QTs "adjusted contracts" (aka adjusted for # the underlying in some way; usually has a '(1)' in # the expiry key in their UI) adjusted_contracts = optroot not in key.symbol tail = optroot[len(key.symbol):] suffix = '-' + tail if adjusted_contracts else '' by_key[ ContractsKey( key.symbol + suffix, key.id, # converting back - maybe just do this initially? key.expiry.isoformat(timespec='microseconds'), ) ] = { item['strikePrice']: item for item in chain['chainPerStrikePrice'] } # fill out contract id to strike expiry map for tup, bystrikes in by_key.items(): for strike, ids in bystrikes.items(): for key, contract_type in ( ('callSymbolId', 'call'), ('putSymbolId', 'put') ): contract_int_id = ids[key] self._optids2contractinfo[contract_int_id] = { 'strike': strike, 'expiry': tup.expiry, 'contract_type': contract_type, 'contract_key': tup, } # store ids per contract self._contract2ids.setdefault( tup, set()).add(contract_int_id) return by_key async def option_chains( self, # see dict output from ``get_all_contracts()`` contracts: dict, ) -> Dict[str, Dict[str, Dict[str, Any]]]: """Return option chain snap quote for each ticker in ``symbols``. 
""" quotes = await self.api.option_quotes(contracts=contracts) # XXX the below doesn't work so well due to the symbol count # limit per quote request # quotes = await self.api.option_quotes(option_ids=list(contract_ids)) for quote in quotes: id = quote['symbolId'] contract_info = self._optids2contractinfo[id].copy() key = contract_info.pop('contract_key') # XXX TODO: this currently doesn't handle adjusted contracts # (i.e. ones that we stick a '(1)' after) # index by .symbol, .expiry since that's what # a subscriber (currently) sends initially quote['key'] = (key.symbol, key.expiry) # update with expiry and strike (Obviously the # QT api designers are using some kind of severely # stupid disparate table system where they keep # contract info in a separate table from the quote format # keys. I'm really not surprised though - windows shop..) # quote.update(self._optids2contractinfo[quote['symbolId']]) quote.update(contract_info) return quotes async def bars( self, symbol: str, # EST in ISO 8601 format is required... below is EPOCH start_date: str = "1970-01-01T00:00:00.000000-05:00", time_frame: str = '1m', count: float = 20e3, is_paid_feed: bool = False, ) -> List[Dict[str, Any]]: """Retreive OHLCV bars for a symbol over a range to the present. .. note:: The candles endpoint only allows "2000" points per query however tests here show that it is 20k candles per query. 
""" # fix case if symbol.islower(): symbol = symbol.swapcase() sids = await self.tickers2ids([symbol]) if not sids: raise SymbolNotFound(symbol) sid = sids[symbol] # get last market open end time est_end = now = pendulum.now('UTC').in_timezoe( 'America/New_York').start_of('minute') # on non-paid feeds we can't retreive the first 15 mins wd = now.isoweekday() if wd > 5: quotes = await self.quote([symbol]) est_end = pendulum.parse( quotes[0]['lastTradeTime'] ) if est_end.hour == 0: # XXX don't bother figuring out extended hours for now est_end = est_end.replace(hour=17) if not is_paid_feed: est_end = est_end.shift(minutes=-15) est_start = est_end.shift(minutes=-count) start = time.time() bars = await self.api.candles( sid, start=est_start.isoformat(), end=est_end.isoformat(), interval=_time_frames[time_frame], ) log.debug( f"Took {time.time() - start} seconds to retreive {len(bars)} bars") return bars async def search_symbols( self, pattern: str, # how many contracts to return upto: int = 10, ) -> Dict[str, str]: details = {} results = await self.api.search(prefix=pattern) for result in results['symbols']: sym = result['symbol'] if '.' not in sym: sym = f"{sym}.{result['listingExchange']}" details[sym] = result if len(details) == upto: return details # marketstore TSD compatible numpy dtype for bar _qt_bars_dt = [ ('Epoch', 'i8'), # ('start', 'S40'), # ('end', 'S40'), ('low', 'f4'), ('high', 'f4'), ('open', 'f4'), ('close', 'f4'), ('volume', 'i8'), # ('VWAP', 'f4') ] def get_OHLCV( bar: Dict[str, Any] ) -> Tuple[str, Any]: """Return a marketstore key-compatible OHCLV dictionary. """ del bar['end'] del bar['VWAP'] bar['start'] = pendulum.from_timestamp(bar['start']) / 10**9 return tuple(bar.values()) def bars_to_marketstore_structarray( bars: List[Dict[str, Any]] ) -> np.array: """Return marketstore writeable recarray from sequence of bars retrieved via the ``candles`` endpoint. 
""" return np.array(list(map(get_OHLCV, bars)), dtype=_qt_bars_dt) async def token_refresher(client): """Coninually refresh the ``access_token`` near its expiry time. """ while True: await trio.sleep( float(client.access_data['expires_at']) - time.time() - .1) await client.ensure_access(force_refresh=True) def _token_from_user(conf: 'configparser.ConfigParser') -> None: """Get API token from the user on the console. """ refresh_token = input("Please provide your Questrade access token: ") conf['questrade'] = {'refresh_token': refresh_token} def get_config( config_path: str = None, force_from_user: bool = False, ask_user_on_failure: bool = False, ) -> "configparser.ConfigParser": """Load the broker config from disk. By default this is the file: ~/.config/piker/brokers.ini though may be different depending on your OS. """ log.debug("Reloading access config data") conf, path = config.load(config_path) # check if the current config has a token section = conf.get('questrade') has_token = section.get('refresh_token') if section else False if force_from_user or ask_user_on_failure and not (section or has_token): log.warn("Forcing manual token auth from user") _token_from_user(conf) else: if not section: raise ValueError(f"No `questrade` section found in {path}") if not has_token: raise ValueError(f"No refresh token found in {path}") return conf, path @asynccontextmanager async def get_client( config_path: str = None, ask_user: bool = True ) -> Client: """Spawn a broker client for making requests to the API service. """ conf, path = get_config(config_path, ask_user_on_failure=ask_user) log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}") client = Client(conf) await client.ensure_access(ask_user=ask_user) try: log.debug("Check time to ensure access token is valid") # XXX: the `time()` end point requires acc_read Oauth access. # In order to use a client you need at least one key with this # access enabled in order to do symbol searches and id lookups. 
await client.api.time() except Exception: raise # access token is likely no good log.warn(f"Access tokens {client.access_data} seem" f" expired, forcing refresh") await client.ensure_access(force_refresh=True, ask_user=ask_user) await client.api.time() try: yield client except trio.Cancelled: # only write config if we didn't bail out client.write_config() raise async def stock_quoter(client: Client, tickers: List[str]): """Stock quoter context. Yeah so fun times..QT has this symbol to ``int`` id lookup system that you have to use to get any quotes. That means we try to be smart and maintain a cache of this map lazily as requests from in for new tickers/symbols. Most of the closure variables here are to deal with that. """ @async_lifo_cache(maxsize=128) async def get_symbol_id_seq(symbols: Tuple[str]): """For each tuple ``(symbol_1, symbol_2, ... , symbol_n)`` return a symbol id sequence string ``'id_1,id_2, ... , id_n'``. """ return ','.join(map(str, (await client.tickers2ids(symbols)).values())) async def get_quote(tickers): """Query for quotes using cached symbol ids. """ if not tickers: # don't hit the network return {} ids = await get_symbol_id_seq(tuple(tickers)) quotes_resp = await client.api.quotes(ids=ids) # post-processing for quote in quotes_resp: if quote.get('delay', 0) > 0: log.warn(f"Delayed quote:\n{quote}") return quotes_resp return get_quote async def option_quoter(client: Client, tickers: List[str]): """Option quoter context. """ # sanity if isinstance(tickers[0], tuple): datetime.fromisoformat(tickers[0][1]) else: raise ValueError('Option subscription format is (symbol, expiry)') @async_lifo_cache(maxsize=128) async def get_contract_by_date( sym_date_pairs: Tuple[Tuple[str, str]], ): """For each tuple, ``(symbol_date_1, symbol_date_2, ... , symbol_date_n)`` return a contract dict. 
""" symbols, dates = zip(*sym_date_pairs) contracts = await client.get_all_contracts(symbols) selected = {} for key, val in contracts.items(): if key.expiry in dates: selected[key] = val return selected async def get_quote(symbol_date_pairs): """Query for quotes using cached symbol ids. """ contracts = await get_contract_by_date( tuple(symbol_date_pairs)) return await client.option_chains(contracts) return get_quote # Questrade column order / value conversion # XXX: keys-values in this map define the final column values which will # be "displayable" but not necessarily used for "data processing" # (i.e. comparisons for sorting purposes or other calculations). _qt_stock_keys = { 'symbol': 'symbol', # done manually in qtconvert '%': '%', 'lastTradePrice': 'last', 'askPrice': 'ask', 'bidPrice': 'bid', 'lastTradeSize': 'size', 'bidSize': 'bsize', 'askSize': 'asize', 'VWAP': ('VWAP', partial(round, ndigits=3)), 'MC': ('MC', humanize), '$ vol': ('$ vol', humanize), 'volume': ('volume', humanize), # 'close': 'close', # 'openPrice': 'open', 'lowPrice': 'low', 'highPrice': 'high', # 'low52w': 'low52w', # put in info widget # 'high52w': 'high52w', # "lastTradePriceTrHrs": 7.99, # 'lastTradeTime': ('fill_time', datetime.fromisoformat), 'lastTradeTime': 'fill_time', "lastTradeTick": 'tick', # ("Equal", "Up", "Down") # "symbolId": 3575753, # "tier": "", # 'isHalted': 'halted', # as subscript 'h' # 'delay': 'delay', # as subscript 'p' } # BidAskLayout columns which will contain three cells the first stacked on top # of the other 2 (this is a UI layout instruction) _stock_bidasks = { 'last': ['bid', 'ask'], 'size': ['bsize', 'asize'], 'VWAP': ['low', 'high'], 'volume': ['MC', '$ vol'], } def format_stock_quote( quote: dict, symbol_data: dict, keymap: dict = _qt_stock_keys, ) -> Tuple[dict, dict]: """Remap a list of quote dicts ``quotes`` using the mapping of old keys -> new keys ``keymap`` returning 2 dicts: one with raw data and the other for display. 
Returns 2 dicts: first is the original values mapped by new keys, and the second is the same but with all values converted to a "display-friendly" string format. """ symbol = quote['symbol'] previous = symbol_data[symbol]['prevDayClosePrice'] computed = {'symbol': symbol} last = quote.get('lastTradePrice') if last: change = percent_change(previous, last) share_count = symbol_data[symbol].get('outstandingShares', None) mktcap = share_count * last if (last and share_count) else 0 computed.update({ # 'symbol': quote['symbol'], '%': round(change, 3), 'MC': mktcap, # why questrade do you have to be shipping null values!!! # '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3), 'close': previous, }) vwap = quote.get('VWAP') volume = quote.get('volume') if volume is not None: # could be 0 # why questrade do you have to be an asshole shipping null values!!! computed['$ vol'] = round((vwap or 0) * (volume or 0), 3) new = {} displayable = {} for key, value in itertools.chain(quote.items(), computed.items()): new_key = keymap.get(key) if not new_key: continue # API servers can return `None` vals when markets are closed (weekend) value = 0 if value is None else value display_value = value # convert values to a displayble format using available formatting func if isinstance(new_key, tuple): new_key, func = new_key display_value = func(value) if value else value new[new_key] = value displayable[new_key] = display_value new['displayable'] = displayable return new, displayable _qt_option_keys = { "lastTradePrice": 'last', "askPrice": 'ask', "bidPrice": 'bid', "lastTradeSize": 'size', "bidSize": 'bsize', "askSize": 'asize', 'VWAP': ('VWAP', partial(round, ndigits=3)), "lowPrice": 'low', "highPrice": 'high', # "expiry": "expiry", # "delay": 0, "delta": ('delta', partial(round, ndigits=3)), # "gamma": ('gama', partial(round, ndigits=3)), # "rho": ('rho', partial(round, ndigits=3)), # "theta": ('theta', partial(round, ndigits=3)), # "vega": ('vega', partial(round, 
ndigits=3)), '$ vol': ('$ vol', humanize), # XXX: required key to trigger trade execution datum msg 'volume': ('volume', humanize), # "2021-01-15T00:00:00.000000-05:00", # "isHalted": false, # "key": [ # "APHA.TO", # "2021-01-15T00:00:00.000000-05:00" # ], # "lastTradePriceTrHrs": null, # "lastTradeTick": 'tick', "lastTradeTime": 'time', "openInterest": 'oi', "openPrice": 'open', # "strike": 'strike', # "symbol": "APHA15Jan21P8.00.MX", # "symbolId": 23881868, # "underlying": "APHA.TO", # "underlyingId": 8297492, "symbol": 'symbol', "contract_type": 'contract_type', "volatility": ( 'IV %', lambda v: '{}'.format(round(v, ndigits=2)) ), "strike": 'strike', } _option_bidasks = { 'last': ['bid', 'ask'], 'size': ['bsize', 'asize'], 'VWAP': ['low', 'high'], 'vol': ['oi', '$ vol'], } def format_option_quote( quote: dict, symbol_data: dict, keymap: dict = _qt_option_keys, include_displayables: bool = True, ) -> Tuple[dict, dict]: """Remap a list of quote dicts ``quotes`` using the mapping of old keys -> new keys ``keymap`` returning 2 dicts: one with raw data and the other for display. Returns 2 dicts: first is the original values mapped by new keys, and the second is the same but with all values converted to a "display-friendly" string format. """ # TODO: need historical data.. # (cause why would questrade keep their quote structure consistent across # assets..) # previous = symbol_data[symbol]['prevDayClosePrice'] # change = percent_change(previous, last) computed = { # why QT do you have to be an asshole shipping null values!!! # '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3), # '%': round(change, 3), # 'close': previous, } new = {} displayable = {} vwap = quote.get('VWAP') volume = quote.get('volume') if volume is not None: # could be 0 # why questrade do you have to be an asshole shipping null values!!! 
computed['$ vol'] = round((vwap or 0) * (volume or 0), 3) # structuring and normalization for key, new_key in keymap.items(): display_value = value = computed.get(key) or quote.get(key) # API servers can return `None` vals when markets are closed (weekend) value = 0 if value is None else value # convert values to a displayble format using available formatting func if isinstance(new_key, tuple): new_key, func = new_key display_value = func(value) if value else value new[new_key] = value displayable[new_key] = display_value return new, displayable async def smoke_quote( get_quotes, tickers ): """Do an initial "smoke" request for symbols in ``tickers`` filtering out any symbols not supported by the broker queried in the call to ``get_quotes()``. """ from operator import itemgetter # TODO: trim out with #37 ################################################# # get a single quote filtering out any bad tickers # NOTE: this code is always run for every new client # subscription even when a broker quoter task is already running # since the new client needs to know what symbols are accepted log.warn(f"Retrieving smoke quote for symbols {tickers}") quotes = await get_quotes(tickers) # report any tickers that aren't returned in the first quote invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes)) for symbol in invalid_tickers: tickers.remove(symbol) log.warn( f"Symbol `{symbol}` not found") # by broker `{broker}`" # ) # pop any tickers that return "empty" quotes payload = {} for quote in quotes: symbol = quote['symbol'] if quote is None: log.warn( f"Symbol `{symbol}` not found") # XXX: not this mutates the input list (for now) tickers.remove(symbol) continue # report any unknown/invalid symbols (QT specific) if quote.get('low52w', False) is None: log.error( f"{symbol} seems to be defunct") quote['symbol'] = symbol payload[symbol] = quote return payload # end of section to be trimmed out with #37 ########################################### # unbounded, shared 
# between streaming tasks
_symbol_info_cache = {}


# function to format packets delivered to subscribers
def packetizer(
    topic: str,
    quotes: Dict[str, Any],
) -> Dict[str, Any]:
    """Normalize quotes by name into dicts using broker-specific
    processing.

    ``topic`` is unused here; the quotes are simply re-keyed by their
    ``'symbol'`` entry.

    """
    # repack into symbol keyed dict
    return {q['symbol']: q for q in quotes}


def normalize(
    quotes: Dict[str, Any],
    _cache: Dict[str, Any],  # dict held in scope of the streaming loop
    formatter: Callable,
) -> Dict[str, Any]:
    """Deliver normalized quotes by name into dicts using
    broker-specific processing; only emit changes differeing from the
    last quote sample creating a psuedo-tick type datum.

    ``_cache`` maps each symbol to the last quote seen and is updated in
    place. ``formatter`` is called as ``formatter(payload,
    _symbol_info_cache)`` and its first return value is emitted. Returns
    the formatted changes keyed by symbol (empty if nothing changed).

    """
    new = {}
    # XXX: this is effectively emitting "sampled ticks"
    # useful for polling setups but obviously should be
    # disabled if you're already rx-ing per-tick data.
    for quote in quotes:
        symbol = quote['symbol']

        # look up last quote from cache
        last = _cache.setdefault(symbol, {})
        _cache[symbol] = quote

        # compute volume difference; the API can ship null volumes
        # (i.e. when markets are closed) so count those as zero
        last_volume = last.get('volume') or 0
        current_volume = quote.get('volume') or 0
        volume_diff = current_volume - last_volume

        # find all keys that have match to a new value compared
        # to the last quote received
        changed = set(quote.items()) - set(last.items())
        if changed:
            log.info(f"New quote {symbol}:\n{changed}")

            # TODO: can we reduce the # of iterations here and in
            # called funcs?
            payload = dict(changed)
            payload['symbol'] = symbol  # required by formatter

            # TODO: we should probaby do the "computed" fields
            # processing found inside this func in a downstream actor?
            fquote, _ = formatter(payload, _symbol_info_cache)
            fquote['key'] = fquote['symbol'] = symbol

            # if there was volume likely the last size of
            # shares traded is useful info and it's possible
            # that the set difference from above will disregard
            # a "size" value since the same # of shares were traded
            # volume = payload.get('volume')
            if volume_diff:
                if volume_diff < 0:
                    log.error(f"Uhhh {symbol} volume: {volume_diff} ?")

                fquote['volume_delta'] = volume_diff

                # TODO: We can emit 2 ticks here:
                # - one for the volume differential
                # - one for the last known trade size
                # The first in theory can be unwound and
                # interpolated assuming the broker passes an
                # accurate daily VWAP value.
                # To make this work we need a universal ``size``
                # field that is normalized before hitting this logic.
                fquote['size'] = quote.get('lastTradeSize', 0)
                if 'last' not in fquote:
                    fquote['last'] = quote.get('lastTradePrice', float('nan'))

            new[symbol] = fquote

    if new:
        log.info(f"New quotes:\n{pformat(new)}")

    return new


# TODO: currently this backend uses entirely different
# data feed machinery that was written earlier then the
# existing stuff used in other backends. This needs to
# be ported eventually and should *just work* despite
# being a multi-symbol, poll-style feed system.
@tractor.stream
async def stream_quotes(
    ctx: tractor.Context,  # marks this as a streaming func
    symbols: List[str],
    feed_type: str = 'stock',
    rate: int = 3,
    loglevel: str = None,
    # feed_type: str = 'stock',
) -> AsyncGenerator[str, Dict[str, Any]]:
    """Stream formatted quotes for ``symbols`` to the ``tractor`` context.

    A first batch of quotes is fetched, formatted and sent with
    ``ctx.send_yield()``; after that ``stream_poll_requests()`` takes
    over with the same quoter at the requested ``rate``. A
    ``feed_type`` of ``'stock'`` selects the stock quoter/formatter, any
    other value the option ones.

    """
    # XXX: required to propagate ``tractor`` loglevel to piker logging
    get_console_log(loglevel)

    async with open_cached_client('questrade') as client:
        if feed_type != 'stock':
            formatter = format_option_quote
            get_quotes = await option_quoter(client, symbols)

            # packetize
            initial_quotes = await get_quotes(symbols)
            first_quotes = {}
            for quote in initial_quotes:
                first_quotes[quote['symbol']] = quote
        else:
            formatter = format_stock_quote
            get_quotes = await stock_quoter(client, symbols)

            # do a smoke quote (note this mutates the input list and filters
            # out bad symbols for now)
            first_quotes = await smoke_quote(get_quotes, list(symbols))

        # update global symbol data state
        sd = await client.symbol_info(symbols)
        _symbol_info_cache.update(sd)

        # pre-process first set of quotes, keeping only the "raw" dict
        # of each formatter result
        payload = {
            sym: formatter(quote, sd)[0]
            for sym, quote in first_quotes.items()
        }

        # push initial smoke quote response for client initialization
        await ctx.send_yield(payload)

        from .data import stream_poll_requests

        poll_kwargs = dict(
            # ``msg.pub`` required kwargs
            task_name=feed_type,
            ctx=ctx,
            topics=symbols,
            packetizer=packetizer,

            # actual target "streaming func" args
            get_quotes=get_quotes,
            normalizer=partial(normalize, formatter=formatter),
            rate=rate,
        )
        await stream_poll_requests(**poll_kwargs)
        log.info("Terminating stream quoter task")