Add option quoter support for streaming

So much changed to get this working for both stocks and options:
- Index contracts by a new `ContractsKey` named tuple
- Move to pushing lists of quotes instead of dicts since option
  subscriptions are often not identified by their "symbol" key and
  this makes it difficult at fan out time to know how a quote should
  be indexed and delivered. Instead add a special `key` entry to each
  quote dict which is the quote's subscription key.
kivy_mainline_and_py3.8
Tyler Goodlet 2018-11-30 00:33:40 -05:00
parent 75d22c6058
commit cd7d8d024d
1 changed files with 136 additions and 67 deletions

View File

@ -5,7 +5,8 @@ import time
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
import configparser import configparser
from typing import List, Tuple, Dict, Any from operator import itemgetter
from typing import List, Tuple, Dict, Any, Iterator, NamedTuple
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
@ -24,13 +25,19 @@ log = get_logger(__name__)
_refresh_token_ep = 'https://login.questrade.com/oauth2/' _refresh_token_ep = 'https://login.questrade.com/oauth2/'
_version = 'v1' _version = 'v1'
_rate_limit = 3 # queries/sec _rate_limit = 4 # queries/sec
class QuestradeError(Exception): class QuestradeError(Exception):
"Non-200 OK response code" "Non-200 OK response code"
class ContractsKey(NamedTuple):
symbol: str
id: int
expiry: datetime
class _API: class _API:
"""Questrade API endpoints exposed as methods and wrapped with an """Questrade API endpoints exposed as methods and wrapped with an
http session. http session.
@ -61,7 +68,11 @@ class _API:
'symbols', params={'ids': ids, 'names': names}) 'symbols', params={'ids': ids, 'names': names})
async def quotes(self, ids: str) -> dict: async def quotes(self, ids: str) -> dict:
return await self._request('markets/quotes', params={'ids': ids}) quotes = (await self._request(
'markets/quotes', params={'ids': ids}))['quotes']
for quote in quotes:
quote['key'] = quote['symbol']
return quotes
async def candles(self, id: str, start: str, end, interval) -> dict: async def candles(self, id: str, start: str, end, interval) -> dict:
return await self._request(f'markets/candles/{id}', params={}) return await self._request(f'markets/candles/{id}', params={})
@ -79,20 +90,19 @@ class _API:
async def option_quotes( async def option_quotes(
self, self,
contracts: Dict[int, Dict[str, dict]], contracts: Dict[ContractsKey, Dict[int, dict]],
option_ids: List[int] = [], # if you don't want them all option_ids: List[int] = [], # if you don't want them all
) -> dict: ) -> dict:
"Retrieve option chain quotes for all option ids or by filter(s)." """Retrieve option chain quotes for all option ids or by filter(s).
"""
filters = [ filters = [
{ {
"underlyingId": int(symbol_id), "underlyingId": int(symbol_id),
"expiryDate": str(expiry), "expiryDate": str(expiry),
} }
# every expiry per symbol id # every expiry per symbol id
for symbol_id, expiries in contracts.items() for (symbol, symbol_id, expiry), bystrike in contracts.items()
for expiry in expiries
] ]
resp = await self._sess.post( resp = await self._sess.post(
path=f'/markets/quotes/options', path=f'/markets/quotes/options',
json={'filters': filters, 'optionIds': option_ids} json={'filters': filters, 'optionIds': option_ids}
@ -111,9 +121,9 @@ class Client:
self.api = _API(self._sess) self.api = _API(self._sess)
self._conf = config self._conf = config
self.access_data = {} self.access_data = {}
self.user_data = {}
self._reload_config(config) self._reload_config(config)
self._symbol_cache = {} self._symbol_cache: Dict[str, int] = {}
self._contracts2expiries = {}
def _reload_config(self, config=None, **kwargs): def _reload_config(self, config=None, **kwargs):
log.warn("Reloading access config data") log.warn("Reloading access config data")
@ -252,8 +262,7 @@ class Client:
""" """
t2ids = await self.tickers2ids(tickers) t2ids = await self.tickers2ids(tickers)
ids = ','.join(t2ids.values()) ids = ','.join(t2ids.values())
results = (await self.api.quotes(ids=ids))['quotes'] quotes = (await self.api.quotes(ids=ids))
quotes = {quote['symbol']: quote for quote in results}
# set None for all symbols not found # set None for all symbols not found
if len(t2ids) < len(tickers): if len(t2ids) < len(tickers):
@ -266,7 +275,7 @@ class Client:
async def symbol2contracts( async def symbol2contracts(
self, self,
symbol: str symbol: str
) -> Tuple[int, Dict[datetime, dict]]: ) -> Dict[Tuple[str, int, datetime], dict]:
"""Return option contract for the given symbol. """Return option contract for the given symbol.
The most useful part is the expiries which can be passed to the option The most useful part is the expiries which can be passed to the option
@ -274,15 +283,18 @@ class Client:
""" """
id = int((await self.tickers2ids([symbol]))[symbol]) id = int((await self.tickers2ids([symbol]))[symbol])
contracts = await self.api.option_contracts(id) contracts = await self.api.option_contracts(id)
return id, { return {
# convert to native datetime objs for sorting ContractsKey(
datetime.fromisoformat(item['expiryDate']): symbol=symbol,
item for item in contracts id=id,
# convert to native datetime objs for sorting
expiry=datetime.fromisoformat(item['expiryDate'])):
item for item in contracts
} }
async def get_all_contracts( async def get_all_contracts(
self, self,
symbols: List[str], symbols: Iterator[str],
# {symbol_id: {dt_iso_contract: {strike_price: {contract_id: id}}}} # {symbol_id: {dt_iso_contract: {strike_price: {contract_id: id}}}}
) -> Dict[int, Dict[str, Dict[int, Any]]]: ) -> Dict[int, Dict[str, Dict[int, Any]]]:
"""Look up all contracts for each symbol in ``symbols`` and return the """Look up all contracts for each symbol in ``symbols`` and return the
@ -293,21 +305,29 @@ class Client:
per symbol) and thus the return values should be cached for use with per symbol) and thus the return values should be cached for use with
``option_chains()``. ``option_chains()``.
""" """
by_id = {} by_key = {}
for symbol in symbols: for symbol in symbols:
id, contracts = await self.symbol2contracts(symbol) contracts = await self.symbol2contracts(symbol)
by_id[id] = { # FIXME: chainPerRoot here is probably why in some UIs
dt.isoformat(timespec='microseconds'): { # you see a second chain with a (1) suffixed; should
# probably handle this eventually.
for key, byroot in sorted(
# sort by datetime
contracts.items(),
key=lambda item: item[0].expiry
):
by_key[
ContractsKey(
key.symbol,
key.id,
# converting back - maybe just do this initially?
key.expiry.isoformat(timespec='microseconds'),
)
] = {
item['strikePrice']: item for item in item['strikePrice']: item for item in
byroot['chainPerRoot'][0]['chainPerStrikePrice'] byroot['chainPerRoot'][0]['chainPerStrikePrice']
} }
for dt, byroot in sorted( return by_key
# sort by datetime
contracts.items(),
key=lambda item: item[0]
)
}
return by_id
async def option_chains( async def option_chains(
self, self,
@ -316,12 +336,14 @@ class Client:
) -> Dict[str, Dict[str, Dict[str, Any]]]: ) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return option chain snap quote for each ticker in ``symbols``. """Return option chain snap quote for each ticker in ``symbols``.
""" """
quotes = await self.api.option_quotes(contracts) batch = []
batch = {} for key, bystrike in contracts.items():
for quote in quotes: quotes = await self.api.option_quotes({key: bystrike})
batch.setdefault( for quote in quotes:
quote['underlying'], {} # index by .symbol, .expiry since that's what
)[quote['symbol']] = quote # a subscriber (currently) sends initially
quote['key'] = (key[0], key[2])
batch.extend(quotes)
return batch return batch
@ -391,15 +413,14 @@ async def get_client() -> Client:
write_conf(client) write_conf(client)
async def quoter(client: Client, tickers: List[str]): async def stock_quoter(client: Client, tickers: List[str]):
"""Stock Quoter context. """Stock quoter context.
Yeah so fun times..QT has this symbol to ``int`` id lookup system that you Yeah so fun times..QT has this symbol to ``int`` id lookup system that you
have to use to get any quotes. That means we try to be smart and maintain have to use to get any quotes. That means we try to be smart and maintain
a cache of this map lazily as requests from in for new tickers/symbols. a cache of this map lazily as requests from in for new tickers/symbols.
Most of the closure variables here are to deal with that. Most of the closure variables here are to deal with that.
""" """
@async_lifo_cache(maxsize=128) @async_lifo_cache(maxsize=128)
async def get_symbol_id_seq(symbols: Tuple[str]): async def get_symbol_id_seq(symbols: Tuple[str]):
"""For each tuple ``(symbol_1, symbol_2, ... , symbol_n)`` """For each tuple ``(symbol_1, symbol_2, ... , symbol_n)``
@ -411,6 +432,7 @@ async def quoter(client: Client, tickers: List[str]):
"""Query for quotes using cached symbol ids. """Query for quotes using cached symbol ids.
""" """
if not tickers: if not tickers:
# don't hit the network
return {} return {}
ids = await get_symbol_id_seq(tuple(tickers)) ids = await get_symbol_id_seq(tuple(tickers))
@ -418,41 +440,88 @@ async def quoter(client: Client, tickers: List[str]):
try: try:
quotes_resp = await client.api.quotes(ids=ids) quotes_resp = await client.api.quotes(ids=ids)
except (QuestradeError, BrokerError) as qterr: except (QuestradeError, BrokerError) as qterr:
if "Access token is invalid" in str(qterr.args[0]): if "Access token is invalid" not in str(qterr.args[0]):
# out-of-process piker actor may have
# renewed already..
client._reload_config()
try:
quotes_resp = await client.api.quotes(ids=ids)
except BrokerError as qterr:
if "Access token is invalid" in str(qterr.args[0]):
# TODO: this will crash when run from a sub-actor since
# STDIN can't be acquired. The right way to handle this
# is to make a request to the parent actor (i.e.
# spawner of this) to call this
# `client.ensure_access()` locally thus blocking until
# the user provides an API key on the "client side"
await client.ensure_access(force_refresh=True)
quotes_resp = await client.api.quotes(ids=ids)
else:
raise raise
# out-of-process piker actor may have
# renewed already..
client._reload_config()
try:
quotes_resp = await client.api.quotes(ids=ids)
except BrokerError as qterr:
if "Access token is invalid" in str(qterr.args[0]):
# TODO: this will crash when run from a sub-actor since
# STDIN can't be acquired. The right way to handle this
# is to make a request to the parent actor (i.e.
# spawner of this) to call this
# `client.ensure_access()` locally thus blocking until
# the user provides an API key on the "client side"
await client.ensure_access(force_refresh=True)
quotes_resp = await client.api.quotes(ids=ids)
# dict packing and post-processing # post-processing
quotes = {} for quote in quotes_resp:
for quote in quotes_resp['quotes']:
quotes[quote['symbol']] = quote
if quote.get('delay', 0) > 0: if quote.get('delay', 0) > 0:
log.warn(f"Delayed quote:\n{quote}") log.warn(f"Delayed quote:\n{quote}")
return quotes return quotes_resp
# strip out unknown/invalid symbols return get_quote
first_quotes_dict = await get_quote(tickers)
for symbol, quote in first_quotes_dict.items():
if quote['low52w'] is None: async def option_quoter(client: Client, tickers: List[str]):
log.warn( """Option quoter context.
f"{symbol} seems to be defunct") """
# sanity
if isinstance(tickers[0], tuple):
datetime.fromisoformat(tickers[0][1])
else:
log.warn(f"Ignoring option quoter call with {tickers}")
# TODO make caller always check that a quoter has been set
return
@async_lifo_cache(maxsize=128)
async def get_contract_by_date(sym_date_pairs: Tuple[Tuple[str, str]]):
"""For each tuple,
``(symbol_date_1, symbol_date_2, ... , symbol_date_n)``
return a contract dict.
"""
symbols = map(itemgetter(0), sym_date_pairs)
dates = map(itemgetter(1), sym_date_pairs)
contracts = await client.get_all_contracts(symbols)
selected = {}
for key, val in contracts.items():
if key.expiry in dates:
selected[key] = val
return selected
async def get_quote(symbol_date_pairs):
"""Query for quotes using cached symbol ids.
"""
contracts = await get_contract_by_date(
tuple(symbol_date_pairs))
try:
quotes = await client.option_chains(contracts)
except (QuestradeError, BrokerError) as qterr:
if "Access token is invalid" not in str(qterr.args[0]):
raise
# out-of-process piker actor may have
# renewed already..
client._reload_config()
try:
quotes = await client.option_chains(contracts)
except BrokerError as qterr:
if "Access token is invalid" in str(qterr.args[0]):
# TODO: this will crash when run from a sub-actor since
# STDIN can't be acquired. The right way to handle this
# is to make a request to the parent actor (i.e.
# spawner of this) to call this
# `client.ensure_access()` locally thus blocking until
# the user provides an API key on the "client side"
await client.ensure_access(force_refresh=True)
quotes = await client.option_chains(contracts)
return quotes
return get_quote return get_quote