1249 lines
42 KiB
Python
1249 lines
42 KiB
Python
# piker: trading gear for hackers
|
|
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
"""
|
|
Questrade API backend.
|
|
"""
|
|
from __future__ import annotations
|
|
import inspect
|
|
import time
|
|
from datetime import datetime
|
|
from functools import partial
|
|
import itertools
|
|
import configparser
|
|
from pprint import pformat
|
|
from typing import (
|
|
List, Tuple, Dict, Any, Iterator, NamedTuple,
|
|
AsyncGenerator,
|
|
Callable,
|
|
)
|
|
|
|
import pendulum
|
|
import trio
|
|
import tractor
|
|
from async_generator import asynccontextmanager
|
|
import numpy as np
|
|
import wrapt
|
|
import asks
|
|
|
|
from ..calc import humanize, percent_change
|
|
from .._cacheables import open_cached_client, async_lifo_cache
|
|
from .. import config
|
|
from ._util import resproc, BrokerError, SymbolNotFound
|
|
from ..log import (
|
|
colorize_json,
|
|
)
|
|
from ._util import (
|
|
log,
|
|
get_console_log,
|
|
)
|
|
|
|
_use_practice_account = False
|
|
_refresh_token_ep = 'https://{}login.questrade.com/oauth2/'
|
|
_version = 'v1'
|
|
|
|
# stock queries/sec
|
|
# it seems 4 rps is best we can do total
|
|
_rate_limit = 4
|
|
|
|
_time_frames = {
|
|
'1m': 'OneMinute',
|
|
'2m': 'TwoMinutes',
|
|
'3m': 'ThreeMinutes',
|
|
'4m': 'FourMinutes',
|
|
'5m': 'FiveMinutes',
|
|
'10m': 'TenMinutes',
|
|
'15m': 'FifteenMinutes',
|
|
'20m': 'TwentyMinutes',
|
|
'30m': 'HalfHour',
|
|
'1h': 'OneHour',
|
|
'2h': 'TwoHours',
|
|
'4h': 'FourHours',
|
|
'D': 'OneDay',
|
|
'W': 'OneWeek',
|
|
'M': 'OneMonth',
|
|
'Y': 'OneYear',
|
|
}
|
|
|
|
|
|
class QuestradeError(Exception):
|
|
"Non-200 OK response code"
|
|
|
|
|
|
class ContractsKey(NamedTuple):
|
|
symbol: str
|
|
id: int
|
|
expiry: datetime
|
|
|
|
|
|
def refresh_token_on_err(tries=3):
|
|
"""`_API` method decorator which locks the client and refreshes tokens
|
|
before unlocking access to the API again.
|
|
|
|
QT's service end can't handle concurrent requests to multiple
|
|
endpoints reliably without choking up and confusing their interal
|
|
servers.
|
|
"""
|
|
|
|
@wrapt.decorator
|
|
async def wrapper(wrapped, api, args, kwargs):
|
|
assert inspect.iscoroutinefunction(wrapped)
|
|
client = api.client
|
|
|
|
if not client._has_access.is_set():
|
|
log.warning("Waiting on access lock")
|
|
await client._has_access.wait()
|
|
|
|
for i in range(1, tries):
|
|
try:
|
|
try:
|
|
client._request_not_in_progress = trio.Event()
|
|
return await wrapped(*args, **kwargs)
|
|
finally:
|
|
client._request_not_in_progress.set()
|
|
except (QuestradeError, BrokerError) as qterr:
|
|
if "Access token is invalid" not in str(qterr.args[0]):
|
|
raise
|
|
# TODO: this will crash when run from a sub-actor since
|
|
# STDIN can't be acquired (ONLY WITH MP). The right way
|
|
# to handle this is to make a request to the parent
|
|
# actor (i.e. spawner of this) to call this
|
|
# `client.ensure_access()` locally thus blocking until
|
|
# the user provides an API key on the "client side"
|
|
log.warning(f"Tokens are invalid refreshing try {i}..")
|
|
await client.ensure_access(force_refresh=True)
|
|
if i == tries - 1:
|
|
raise
|
|
return wrapper
|
|
|
|
|
|
class _API:
|
|
"""Questrade API endpoints exposed as methods and wrapped with an
|
|
http session.
|
|
"""
|
|
def __init__(
|
|
self,
|
|
client: Client,
|
|
):
|
|
self.client = client
|
|
self._sess: asks.Session = client._sess
|
|
|
|
@refresh_token_on_err()
|
|
async def _get(self, path: str, params=None) -> dict:
|
|
"""Get an endpoint "reliably" by ensuring access on failure.
|
|
"""
|
|
resp = await self._sess.get(path=f'/{path}', params=params)
|
|
return resproc(resp, log)
|
|
|
|
async def _new_auth_token(
|
|
self,
|
|
refresh_token: str,
|
|
) -> dict:
|
|
"""Request a new api authorization ``refresh_token``.
|
|
|
|
Gain api access using either a user provided or existing token.
|
|
See the instructions::
|
|
|
|
http://www.questrade.com/api/documentation/getting-started
|
|
http://www.questrade.com/api/documentation/security
|
|
"""
|
|
resp = await self._sess.get(
|
|
self.client._auth_ep + 'token',
|
|
params={'grant_type': 'refresh_token',
|
|
'refresh_token': refresh_token}
|
|
)
|
|
return resproc(resp, log)
|
|
|
|
async def _revoke_auth_token(
|
|
self,
|
|
practise: bool = False,
|
|
) -> None:
|
|
"""Revoke api access for the current token.
|
|
"""
|
|
token = self.access_data['refresh_token']
|
|
log.debug(f"Revoking token {token}")
|
|
resp = await asks.post(
|
|
self.client._auth_ep + 'revoke',
|
|
headers={'token': token}
|
|
)
|
|
return resp
|
|
|
|
# accounts end points
|
|
|
|
async def accounts(self) -> dict:
|
|
return await self._get('accounts')
|
|
|
|
async def time(self) -> dict:
|
|
return await self._get('time')
|
|
|
|
async def balances(self, id: str) -> dict:
|
|
return await self._get(f'accounts/{id}/balances')
|
|
|
|
async def postions(self, id: str) -> dict:
|
|
return await self._get(f'accounts/{id}/positions')
|
|
|
|
# market end points
|
|
|
|
async def markets(self) -> dict:
|
|
return await self._get('markets')
|
|
|
|
async def search(self, prefix: str) -> dict:
|
|
return await self._get(
|
|
'symbols/search', params={'prefix': prefix})
|
|
|
|
async def symbols(self, ids: str = '', names: str = '') -> dict:
|
|
log.debug(f"Symbol lookup for {ids or names}")
|
|
return await self._get(
|
|
'symbols', params={'ids': ids, 'names': names})
|
|
|
|
async def quotes(self, ids: str) -> dict:
|
|
quotes = (await self._get(
|
|
'markets/quotes', params={'ids': ids}))['quotes']
|
|
for quote in quotes:
|
|
quote['key'] = quote['symbol']
|
|
return quotes
|
|
|
|
async def candles(
|
|
self, symbol_id:
|
|
str, start: str,
|
|
end: str,
|
|
interval: str
|
|
) -> List[Dict[str, float]]:
|
|
"""Retrieve historical candles for provided date range.
|
|
"""
|
|
return (await self._get(
|
|
f'markets/candles/{symbol_id}',
|
|
params={'startTime': start, 'endTime': end, 'interval': interval},
|
|
))['candles']
|
|
|
|
async def option_contracts(self, symbol_id: str) -> dict:
|
|
"Retrieve all option contract API ids with expiry -> strike prices."
|
|
contracts = await self._get(f'symbols/{symbol_id}/options')
|
|
return contracts['optionChain']
|
|
|
|
@refresh_token_on_err()
|
|
async def option_quotes(
|
|
self,
|
|
contracts: Dict[ContractsKey, Dict[int, dict]] = {},
|
|
option_ids: List[int] = [], # if you don't want them all
|
|
) -> dict:
|
|
"""Retrieve option chain quotes for all option ids or by filter(s).
|
|
"""
|
|
filters = [
|
|
{
|
|
"underlyingId": int(symbol_id),
|
|
"expiryDate": str(expiry),
|
|
}
|
|
# every expiry per symbol id
|
|
for (symbol, symbol_id, expiry), bystrike in contracts.items()
|
|
]
|
|
resp = await self._sess.post(
|
|
path='/markets/quotes/options',
|
|
# XXX: b'{"code":1024,"message":"The size of the array requested
|
|
# is not valid: optionIds"}'
|
|
# ^ what I get when trying to use too many ids manually...
|
|
json={'filters': filters, 'optionIds': option_ids}
|
|
)
|
|
return resproc(resp, log)['optionQuotes']
|
|
|
|
|
|
class Client:
|
|
"""API client suitable for use as a long running broker daemon or
|
|
single api requests.
|
|
|
|
Provides a high-level api which wraps the underlying endpoint calls.
|
|
"""
|
|
def __init__(
|
|
self,
|
|
config: dict,
|
|
):
|
|
# use 2 connections per streaming endpoint (stocks, opts)
|
|
# TODO: when we have more then one account key then this should scale
|
|
# linearly with that.
|
|
self._sess = asks.Session(connections=4)
|
|
self.api = _API(self)
|
|
self._conf = config
|
|
self._is_practice = _use_practice_account or (
|
|
config['questrade'].get('is_practice', False)
|
|
)
|
|
self._auth_ep = _refresh_token_ep.format(
|
|
'practice' if self._is_practice else '')
|
|
self.access_data = {}
|
|
self._reload_config(config=config)
|
|
self._symbol_cache: Dict[str, int] = {}
|
|
self._optids2contractinfo = {}
|
|
self._contract2ids = {}
|
|
# for blocking during token refresh
|
|
self._has_access = trio.Event()
|
|
self._has_access.set()
|
|
self._request_not_in_progress = trio.Event()
|
|
self._request_not_in_progress.set()
|
|
self._mutex = trio.StrictFIFOLock()
|
|
|
|
def _reload_config(self, config=None, **kwargs):
|
|
if config:
|
|
self._conf = config
|
|
else:
|
|
self._conf, _ = get_config(**kwargs)
|
|
self.access_data = dict(self._conf['questrade'])
|
|
|
|
def write_config(self):
|
|
"""Save access creds to config file.
|
|
"""
|
|
self._conf['questrade'] = self.access_data
|
|
config.write(self._conf)
|
|
|
|
async def ensure_access(
|
|
self,
|
|
force_refresh: bool = False,
|
|
ask_user: bool = True,
|
|
) -> dict:
|
|
"""Acquire a new token set (``access_token`` and ``refresh_token``).
|
|
|
|
Checks if the locally cached (file system) ``access_token`` has expired
|
|
(based on a ``expires_at`` time stamp stored in the brokers.ini config)
|
|
expired (normally has a lifetime of 3 days). If ``false is set then
|
|
and refreshs token if necessary using the ``refresh_token``. If the
|
|
``refresh_token`` has expired a new one needs to be provided by the
|
|
user.
|
|
"""
|
|
# wait for ongoing requests to clear (API can't handle
|
|
# concurrent endpoint requests alongside a token refresh)
|
|
await self._request_not_in_progress.wait()
|
|
|
|
# block api access to tall other tasks
|
|
# XXX: this is limitation of the API when using a single
|
|
# token whereby their service can't handle concurrent requests
|
|
# to differnet end points (particularly the auth ep) which
|
|
# causes hangs and premature token invalidation issues.
|
|
self._has_access = trio.Event()
|
|
try:
|
|
# don't allow simultaneous token refresh requests
|
|
async with self._mutex:
|
|
access_token = self.access_data.get('access_token')
|
|
expires = float(self.access_data.get('expires_at', 0))
|
|
expires_stamp = datetime.fromtimestamp(
|
|
expires).strftime('%Y-%m-%d %H:%M:%S')
|
|
if not access_token or (
|
|
expires < time.time()
|
|
) or force_refresh:
|
|
log.info("Refreshing API tokens")
|
|
log.debug(
|
|
f"Refreshing access token {access_token} which expired"
|
|
f" at {expires_stamp}")
|
|
try:
|
|
data = await self.api._new_auth_token(
|
|
self.access_data['refresh_token'])
|
|
except BrokerError as qterr:
|
|
|
|
def get_err_msg(err):
|
|
# handle str and bytes...
|
|
msg = err.args[0]
|
|
return msg.decode() if msg.isascii() else msg
|
|
|
|
msg = get_err_msg(qterr)
|
|
|
|
if "We're making some changes" in msg:
|
|
# API service is down
|
|
raise QuestradeError("API is down for maintenance")
|
|
|
|
elif msg == 'Bad Request':
|
|
# likely config ``refresh_token`` is expired but
|
|
# may be updated in the config file via
|
|
# another actor
|
|
self._reload_config()
|
|
try:
|
|
data = await self.api._new_auth_token(
|
|
self.access_data['refresh_token'])
|
|
except BrokerError as qterr:
|
|
if get_err_msg(qterr) == 'Bad Request' and (
|
|
ask_user
|
|
):
|
|
# actually expired; get new from user
|
|
self._reload_config(force_from_user=True)
|
|
data = await self.api._new_auth_token(
|
|
self.access_data['refresh_token'])
|
|
else:
|
|
raise QuestradeError(qterr)
|
|
else:
|
|
raise qterr
|
|
|
|
self.access_data.update(data)
|
|
log.debug(f"Updated tokens:\n{data}")
|
|
# store an absolute access token expiry time
|
|
self.access_data['expires_at'] = time.time() + float(
|
|
data['expires_in'])
|
|
|
|
# write to config to disk
|
|
self.write_config()
|
|
else:
|
|
log.debug(
|
|
f"\nCurrent access token {access_token} expires at"
|
|
f" {expires_stamp}\n")
|
|
|
|
# set access token header for the session
|
|
data = self.access_data
|
|
self._sess.headers.update({
|
|
'Authorization':
|
|
(f"{data['token_type']} {data['access_token']}")}
|
|
)
|
|
# set base API url (asks shorthand)
|
|
self._sess.base_location = data['api_server'] + _version
|
|
finally:
|
|
self._has_access.set()
|
|
|
|
return data
|
|
|
|
async def tickers2ids(
|
|
self,
|
|
tickers: Iterator[str]
|
|
) -> Dict[str, int]:
|
|
"""Helper routine that take a sequence of ticker symbols and returns
|
|
their corresponding QT numeric symbol ids.
|
|
|
|
Cache any symbol to id lookups for later use.
|
|
"""
|
|
cache = self._symbol_cache
|
|
symbols2ids = {}
|
|
for symbol in tickers:
|
|
id = cache.get(symbol)
|
|
if id is not None:
|
|
symbols2ids[symbol] = id
|
|
|
|
# still missing uncached values - hit the api server
|
|
to_lookup = list(set(tickers) - set(symbols2ids))
|
|
if to_lookup:
|
|
data = await self.api.symbols(names=','.join(to_lookup))
|
|
for symbol in data['symbols']:
|
|
name = symbol['symbol']
|
|
cache[name] = symbols2ids[name] = str(symbol['symbolId'])
|
|
|
|
return symbols2ids
|
|
|
|
async def symbol_info(self, symbols: List[str]):
|
|
"""Return symbol data for ``symbols``.
|
|
"""
|
|
t2ids = await self.tickers2ids(symbols)
|
|
ids = ','.join(t2ids.values())
|
|
symbols = {}
|
|
for pkt in (await self.api.symbols(ids=ids))['symbols']:
|
|
symbols[pkt['symbol']] = pkt
|
|
|
|
return symbols
|
|
|
|
# TODO: deprecate
|
|
symbol_data = symbol_info
|
|
|
|
async def quote(self, tickers: [str]):
|
|
"""Return stock quotes for each ticker in ``tickers``.
|
|
"""
|
|
t2ids = await self.tickers2ids(tickers)
|
|
quotes = []
|
|
if t2ids:
|
|
ids = ','.join(t2ids.values())
|
|
quotes = (await self.api.quotes(ids=ids))
|
|
|
|
return quotes
|
|
|
|
async def symbol2contracts(
|
|
self,
|
|
symbol: str
|
|
) -> Dict[Tuple[str, int, datetime], dict]:
|
|
"""Return option contract for the given symbol.
|
|
|
|
The most useful part is the expiries which can be passed to the option
|
|
chain endpoint but specifc contract ids can be pulled here as well.
|
|
"""
|
|
id = int((await self.tickers2ids([symbol]))[symbol])
|
|
contracts = await self.api.option_contracts(id)
|
|
return {
|
|
ContractsKey(
|
|
symbol=symbol,
|
|
id=id,
|
|
# convert to native datetime objs for sorting
|
|
expiry=datetime.fromisoformat(item['expiryDate'])):
|
|
item for item in contracts
|
|
}
|
|
|
|
async def get_all_contracts(
|
|
self,
|
|
symbols: Iterator[str],
|
|
# {symbol_id: {dt_iso_contract: {strike_price: {contract_id: id}}}}
|
|
) -> Dict[int, Dict[str, Dict[int, Any]]]:
|
|
"""Look up all contracts for each symbol in ``symbols`` and return the
|
|
of symbol ids to contracts by further organized by expiry and strike
|
|
price.
|
|
|
|
This routine is a bit slow doing all the contract lookups (a request
|
|
per symbol) and thus the return values should be cached for use with
|
|
``option_chains()``.
|
|
"""
|
|
by_key = {}
|
|
for symbol in symbols:
|
|
contracts = await self.symbol2contracts(symbol)
|
|
# FIXME: chainPerRoot here is probably why in some UIs
|
|
# you see a second chain with a (1) suffixed; should
|
|
# probably handle this eventually.
|
|
for key, byroot in sorted(
|
|
# sort by datetime
|
|
contracts.items(),
|
|
key=lambda item: item[0].expiry
|
|
):
|
|
for chain in byroot['chainPerRoot']:
|
|
optroot = chain['optionRoot']
|
|
|
|
# handle QTs "adjusted contracts" (aka adjusted for
|
|
# the underlying in some way; usually has a '(1)' in
|
|
# the expiry key in their UI)
|
|
adjusted_contracts = optroot not in key.symbol
|
|
tail = optroot[len(key.symbol):]
|
|
suffix = '-' + tail if adjusted_contracts else ''
|
|
|
|
by_key[
|
|
ContractsKey(
|
|
key.symbol + suffix,
|
|
key.id,
|
|
# converting back - maybe just do this initially?
|
|
key.expiry.isoformat(timespec='microseconds'),
|
|
)
|
|
] = {
|
|
item['strikePrice']: item for item in
|
|
chain['chainPerStrikePrice']
|
|
}
|
|
|
|
# fill out contract id to strike expiry map
|
|
for tup, bystrikes in by_key.items():
|
|
for strike, ids in bystrikes.items():
|
|
for key, contract_type in (
|
|
('callSymbolId', 'call'), ('putSymbolId', 'put')
|
|
):
|
|
contract_int_id = ids[key]
|
|
self._optids2contractinfo[contract_int_id] = {
|
|
'strike': strike,
|
|
'expiry': tup.expiry,
|
|
'contract_type': contract_type,
|
|
'contract_key': tup,
|
|
}
|
|
# store ids per contract
|
|
self._contract2ids.setdefault(
|
|
tup, set()).add(contract_int_id)
|
|
return by_key
|
|
|
|
async def option_chains(
|
|
self,
|
|
# see dict output from ``get_all_contracts()``
|
|
contracts: dict,
|
|
) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
|
"""Return option chain snap quote for each ticker in ``symbols``.
|
|
"""
|
|
quotes = await self.api.option_quotes(contracts=contracts)
|
|
# XXX the below doesn't work so well due to the symbol count
|
|
# limit per quote request
|
|
# quotes = await self.api.option_quotes(option_ids=list(contract_ids))
|
|
for quote in quotes:
|
|
id = quote['symbolId']
|
|
contract_info = self._optids2contractinfo[id].copy()
|
|
key = contract_info.pop('contract_key')
|
|
|
|
# XXX TODO: this currently doesn't handle adjusted contracts
|
|
# (i.e. ones that we stick a '(1)' after)
|
|
|
|
# index by .symbol, .expiry since that's what
|
|
# a subscriber (currently) sends initially
|
|
quote['key'] = (key.symbol, key.expiry)
|
|
|
|
# update with expiry and strike (Obviously the
|
|
# QT api designers are using some kind of severely
|
|
# stupid disparate table system where they keep
|
|
# contract info in a separate table from the quote format
|
|
# keys. I'm really not surprised though - windows shop..)
|
|
# quote.update(self._optids2contractinfo[quote['symbolId']])
|
|
quote.update(contract_info)
|
|
|
|
return quotes
|
|
|
|
async def bars(
|
|
self,
|
|
symbol: str,
|
|
# EST in ISO 8601 format is required... below is EPOCH
|
|
start_date: str = "1970-01-01T00:00:00.000000-05:00",
|
|
time_frame: str = '1m',
|
|
count: float = 20e3,
|
|
is_paid_feed: bool = False,
|
|
) -> List[Dict[str, Any]]:
|
|
"""Retreive OHLCV bars for a symbol over a range to the present.
|
|
|
|
.. note::
|
|
The candles endpoint only allows "2000" points per query
|
|
however tests here show that it is 20k candles per query.
|
|
"""
|
|
# fix case
|
|
if symbol.islower():
|
|
symbol = symbol.swapcase()
|
|
|
|
sids = await self.tickers2ids([symbol])
|
|
if not sids:
|
|
raise SymbolNotFound(symbol)
|
|
|
|
sid = sids[symbol]
|
|
|
|
# get last market open end time
|
|
est_end = now = pendulum.now('UTC').in_timezoe(
|
|
'America/New_York').start_of('minute')
|
|
|
|
# on non-paid feeds we can't retreive the first 15 mins
|
|
wd = now.isoweekday()
|
|
if wd > 5:
|
|
quotes = await self.quote([symbol])
|
|
est_end = pendulum.parse(
|
|
quotes[0]['lastTradeTime']
|
|
)
|
|
if est_end.hour == 0:
|
|
# XXX don't bother figuring out extended hours for now
|
|
est_end = est_end.replace(hour=17)
|
|
|
|
if not is_paid_feed:
|
|
est_end = est_end.shift(minutes=-15)
|
|
|
|
est_start = est_end.shift(minutes=-count)
|
|
|
|
start = time.time()
|
|
bars = await self.api.candles(
|
|
sid,
|
|
start=est_start.isoformat(),
|
|
end=est_end.isoformat(),
|
|
interval=_time_frames[time_frame],
|
|
)
|
|
log.debug(
|
|
f"Took {time.time() - start} seconds to retreive {len(bars)} bars")
|
|
return bars
|
|
|
|
async def search_symbols(
|
|
self,
|
|
pattern: str,
|
|
# how many contracts to return
|
|
upto: int = 10,
|
|
) -> Dict[str, str]:
|
|
details = {}
|
|
results = await self.api.search(prefix=pattern)
|
|
for result in results['symbols']:
|
|
sym = result['symbol']
|
|
if '.' not in sym:
|
|
sym = f"{sym}.{result['listingExchange']}"
|
|
|
|
details[sym] = result
|
|
|
|
if len(details) == upto:
|
|
return details
|
|
|
|
|
|
# marketstore TSD compatible numpy dtype for bar
|
|
_qt_bars_dt = [
|
|
('Epoch', 'i8'),
|
|
# ('start', 'S40'),
|
|
# ('end', 'S40'),
|
|
('low', 'f4'),
|
|
('high', 'f4'),
|
|
('open', 'f4'),
|
|
('close', 'f4'),
|
|
('volume', 'i8'),
|
|
# ('VWAP', 'f4')
|
|
]
|
|
|
|
|
|
def get_OHLCV(
|
|
bar: Dict[str, Any]
|
|
) -> Tuple[str, Any]:
|
|
"""Return a marketstore key-compatible OHCLV dictionary.
|
|
"""
|
|
del bar['end']
|
|
del bar['VWAP']
|
|
bar['start'] = pendulum.from_timestamp(bar['start']) / 10**9
|
|
return tuple(bar.values())
|
|
|
|
|
|
def bars_to_marketstore_structarray(
|
|
bars: List[Dict[str, Any]]
|
|
) -> np.array:
|
|
"""Return marketstore writeable recarray from sequence of bars
|
|
retrieved via the ``candles`` endpoint.
|
|
"""
|
|
return np.array(list(map(get_OHLCV, bars)), dtype=_qt_bars_dt)
|
|
|
|
|
|
async def token_refresher(client):
|
|
"""Coninually refresh the ``access_token`` near its expiry time.
|
|
"""
|
|
while True:
|
|
await trio.sleep(
|
|
float(client.access_data['expires_at']) - time.time() - .1)
|
|
await client.ensure_access(force_refresh=True)
|
|
|
|
|
|
def _token_from_user(conf: 'configparser.ConfigParser') -> None:
|
|
"""Get API token from the user on the console.
|
|
"""
|
|
refresh_token = input("Please provide your Questrade access token: ")
|
|
conf['questrade'] = {'refresh_token': refresh_token}
|
|
|
|
|
|
def get_config(
|
|
config_path: str = None,
|
|
force_from_user: bool = False,
|
|
ask_user_on_failure: bool = False,
|
|
) -> "configparser.ConfigParser":
|
|
"""Load the broker config from disk.
|
|
|
|
By default this is the file:
|
|
|
|
~/.config/piker/brokers.ini
|
|
|
|
though may be different depending on your OS.
|
|
"""
|
|
log.debug("Reloading access config data")
|
|
conf, path = config.load(config_path)
|
|
|
|
# check if the current config has a token
|
|
section = conf.get('questrade')
|
|
has_token = section.get('refresh_token') if section else False
|
|
|
|
if force_from_user or ask_user_on_failure and not (section or has_token):
|
|
log.warn("Forcing manual token auth from user")
|
|
_token_from_user(conf)
|
|
else:
|
|
if not section:
|
|
raise ValueError(f"No `questrade` section found in {path}")
|
|
if not has_token:
|
|
raise ValueError(f"No refresh token found in {path}")
|
|
|
|
return conf, path
|
|
|
|
|
|
@asynccontextmanager
|
|
async def get_client(
|
|
config_path: str = None,
|
|
ask_user: bool = True
|
|
) -> Client:
|
|
"""Spawn a broker client for making requests to the API service.
|
|
"""
|
|
conf, path = get_config(config_path, ask_user_on_failure=ask_user)
|
|
log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}")
|
|
client = Client(conf)
|
|
await client.ensure_access(ask_user=ask_user)
|
|
try:
|
|
log.debug("Check time to ensure access token is valid")
|
|
# XXX: the `time()` end point requires acc_read Oauth access.
|
|
# In order to use a client you need at least one key with this
|
|
# access enabled in order to do symbol searches and id lookups.
|
|
await client.api.time()
|
|
except Exception:
|
|
raise
|
|
# access token is likely no good
|
|
log.warn(f"Access tokens {client.access_data} seem"
|
|
f" expired, forcing refresh")
|
|
await client.ensure_access(force_refresh=True, ask_user=ask_user)
|
|
await client.api.time()
|
|
try:
|
|
yield client
|
|
except trio.Cancelled:
|
|
# only write config if we didn't bail out
|
|
client.write_config()
|
|
raise
|
|
|
|
|
|
async def stock_quoter(client: Client, tickers: List[str]):
|
|
"""Stock quoter context.
|
|
|
|
Yeah so fun times..QT has this symbol to ``int`` id lookup system that you
|
|
have to use to get any quotes. That means we try to be smart and maintain
|
|
a cache of this map lazily as requests from in for new tickers/symbols.
|
|
Most of the closure variables here are to deal with that.
|
|
"""
|
|
@async_lifo_cache(maxsize=128)
|
|
async def get_symbol_id_seq(symbols: Tuple[str]):
|
|
"""For each tuple ``(symbol_1, symbol_2, ... , symbol_n)``
|
|
return a symbol id sequence string ``'id_1,id_2, ... , id_n'``.
|
|
"""
|
|
return ','.join(map(str, (await client.tickers2ids(symbols)).values()))
|
|
|
|
async def get_quote(tickers):
|
|
"""Query for quotes using cached symbol ids.
|
|
"""
|
|
if not tickers:
|
|
# don't hit the network
|
|
return {}
|
|
|
|
ids = await get_symbol_id_seq(tuple(tickers))
|
|
quotes_resp = await client.api.quotes(ids=ids)
|
|
|
|
# post-processing
|
|
for quote in quotes_resp:
|
|
if quote.get('delay', 0) > 0:
|
|
log.warn(f"Delayed quote:\n{quote}")
|
|
|
|
return quotes_resp
|
|
|
|
return get_quote
|
|
|
|
|
|
async def option_quoter(client: Client, tickers: List[str]):
|
|
"""Option quoter context.
|
|
"""
|
|
# sanity
|
|
if isinstance(tickers[0], tuple):
|
|
datetime.fromisoformat(tickers[0][1])
|
|
else:
|
|
raise ValueError('Option subscription format is (symbol, expiry)')
|
|
|
|
@async_lifo_cache(maxsize=128)
|
|
async def get_contract_by_date(
|
|
sym_date_pairs: Tuple[Tuple[str, str]],
|
|
):
|
|
"""For each tuple,
|
|
``(symbol_date_1, symbol_date_2, ... , symbol_date_n)``
|
|
return a contract dict.
|
|
"""
|
|
symbols, dates = zip(*sym_date_pairs)
|
|
contracts = await client.get_all_contracts(symbols)
|
|
selected = {}
|
|
for key, val in contracts.items():
|
|
if key.expiry in dates:
|
|
selected[key] = val
|
|
|
|
return selected
|
|
|
|
async def get_quote(symbol_date_pairs):
|
|
"""Query for quotes using cached symbol ids.
|
|
"""
|
|
contracts = await get_contract_by_date(
|
|
tuple(symbol_date_pairs))
|
|
return await client.option_chains(contracts)
|
|
|
|
return get_quote
|
|
|
|
|
|
# Questrade column order / value conversion
|
|
# XXX: keys-values in this map define the final column values which will
|
|
# be "displayable" but not necessarily used for "data processing"
|
|
# (i.e. comparisons for sorting purposes or other calculations).
|
|
_qt_stock_keys = {
|
|
'symbol': 'symbol', # done manually in qtconvert
|
|
'%': '%',
|
|
'lastTradePrice': 'last',
|
|
'askPrice': 'ask',
|
|
'bidPrice': 'bid',
|
|
'lastTradeSize': 'size',
|
|
'bidSize': 'bsize',
|
|
'askSize': 'asize',
|
|
'VWAP': ('VWAP', partial(round, ndigits=3)),
|
|
'MC': ('MC', humanize),
|
|
'$ vol': ('$ vol', humanize),
|
|
'volume': ('volume', humanize),
|
|
# 'close': 'close',
|
|
# 'openPrice': 'open',
|
|
'lowPrice': 'low',
|
|
'highPrice': 'high',
|
|
# 'low52w': 'low52w', # put in info widget
|
|
# 'high52w': 'high52w',
|
|
# "lastTradePriceTrHrs": 7.99,
|
|
# 'lastTradeTime': ('fill_time', datetime.fromisoformat),
|
|
'lastTradeTime': 'fill_time',
|
|
"lastTradeTick": 'tick', # ("Equal", "Up", "Down")
|
|
# "symbolId": 3575753,
|
|
# "tier": "",
|
|
# 'isHalted': 'halted', # as subscript 'h'
|
|
# 'delay': 'delay', # as subscript 'p'
|
|
}
|
|
|
|
# BidAskLayout columns which will contain three cells the first stacked on top
|
|
# of the other 2 (this is a UI layout instruction)
|
|
_stock_bidasks = {
|
|
'last': ['bid', 'ask'],
|
|
'size': ['bsize', 'asize'],
|
|
'VWAP': ['low', 'high'],
|
|
'volume': ['MC', '$ vol'],
|
|
}
|
|
|
|
|
|
def format_stock_quote(
|
|
quote: dict,
|
|
symbol_data: dict,
|
|
keymap: dict = _qt_stock_keys,
|
|
) -> Tuple[dict, dict]:
|
|
"""Remap a list of quote dicts ``quotes`` using the mapping of old keys
|
|
-> new keys ``keymap`` returning 2 dicts: one with raw data and the other
|
|
for display.
|
|
|
|
Returns 2 dicts: first is the original values mapped by new keys,
|
|
and the second is the same but with all values converted to a
|
|
"display-friendly" string format.
|
|
"""
|
|
symbol = quote['symbol']
|
|
previous = symbol_data[symbol]['prevDayClosePrice']
|
|
|
|
computed = {'symbol': symbol}
|
|
last = quote.get('lastTradePrice')
|
|
if last:
|
|
change = percent_change(previous, last)
|
|
share_count = symbol_data[symbol].get('outstandingShares', None)
|
|
mktcap = share_count * last if (last and share_count) else 0
|
|
computed.update({
|
|
# 'symbol': quote['symbol'],
|
|
'%': round(change, 3),
|
|
'MC': mktcap,
|
|
# why questrade do you have to be shipping null values!!!
|
|
# '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3),
|
|
'close': previous,
|
|
})
|
|
|
|
vwap = quote.get('VWAP')
|
|
volume = quote.get('volume')
|
|
if volume is not None: # could be 0
|
|
# why questrade do you have to be an asshole shipping null values!!!
|
|
computed['$ vol'] = round((vwap or 0) * (volume or 0), 3)
|
|
|
|
new = {}
|
|
displayable = {}
|
|
|
|
for key, value in itertools.chain(quote.items(), computed.items()):
|
|
new_key = keymap.get(key)
|
|
if not new_key:
|
|
continue
|
|
|
|
# API servers can return `None` vals when markets are closed (weekend)
|
|
value = 0 if value is None else value
|
|
|
|
display_value = value
|
|
|
|
# convert values to a displayble format using available formatting func
|
|
if isinstance(new_key, tuple):
|
|
new_key, func = new_key
|
|
display_value = func(value) if value else value
|
|
|
|
new[new_key] = value
|
|
displayable[new_key] = display_value
|
|
|
|
new['displayable'] = displayable
|
|
return new, displayable
|
|
|
|
|
|
_qt_option_keys = {
|
|
"lastTradePrice": 'last',
|
|
"askPrice": 'ask',
|
|
"bidPrice": 'bid',
|
|
"lastTradeSize": 'size',
|
|
"bidSize": 'bsize',
|
|
"askSize": 'asize',
|
|
'VWAP': ('VWAP', partial(round, ndigits=3)),
|
|
"lowPrice": 'low',
|
|
"highPrice": 'high',
|
|
# "expiry": "expiry",
|
|
# "delay": 0,
|
|
"delta": ('delta', partial(round, ndigits=3)),
|
|
# "gamma": ('gama', partial(round, ndigits=3)),
|
|
# "rho": ('rho', partial(round, ndigits=3)),
|
|
# "theta": ('theta', partial(round, ndigits=3)),
|
|
# "vega": ('vega', partial(round, ndigits=3)),
|
|
'$ vol': ('$ vol', humanize),
|
|
# XXX: required key to trigger trade execution datum msg
|
|
'volume': ('volume', humanize),
|
|
# "2021-01-15T00:00:00.000000-05:00",
|
|
# "isHalted": false,
|
|
# "key": [
|
|
# "APHA.TO",
|
|
# "2021-01-15T00:00:00.000000-05:00"
|
|
# ],
|
|
# "lastTradePriceTrHrs": null,
|
|
# "lastTradeTick": 'tick',
|
|
"lastTradeTime": 'time',
|
|
"openInterest": 'oi',
|
|
"openPrice": 'open',
|
|
# "strike": 'strike',
|
|
# "symbol": "APHA15Jan21P8.00.MX",
|
|
# "symbolId": 23881868,
|
|
# "underlying": "APHA.TO",
|
|
# "underlyingId": 8297492,
|
|
"symbol": 'symbol',
|
|
"contract_type": 'contract_type',
|
|
"volatility": (
|
|
'IV %',
|
|
lambda v: '{}'.format(round(v, ndigits=2))
|
|
),
|
|
"strike": 'strike',
|
|
}
|
|
|
|
_option_bidasks = {
|
|
'last': ['bid', 'ask'],
|
|
'size': ['bsize', 'asize'],
|
|
'VWAP': ['low', 'high'],
|
|
'vol': ['oi', '$ vol'],
|
|
}
|
|
|
|
|
|
def format_option_quote(
|
|
quote: dict,
|
|
symbol_data: dict,
|
|
keymap: dict = _qt_option_keys,
|
|
include_displayables: bool = True,
|
|
) -> Tuple[dict, dict]:
|
|
"""Remap a list of quote dicts ``quotes`` using the mapping of old keys
|
|
-> new keys ``keymap`` returning 2 dicts: one with raw data and the other
|
|
for display.
|
|
|
|
Returns 2 dicts: first is the original values mapped by new keys,
|
|
and the second is the same but with all values converted to a
|
|
"display-friendly" string format.
|
|
"""
|
|
# TODO: need historical data..
|
|
# (cause why would questrade keep their quote structure consistent across
|
|
# assets..)
|
|
# previous = symbol_data[symbol]['prevDayClosePrice']
|
|
# change = percent_change(previous, last)
|
|
computed = {
|
|
# why QT do you have to be an asshole shipping null values!!!
|
|
# '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3),
|
|
# '%': round(change, 3),
|
|
# 'close': previous,
|
|
}
|
|
new = {}
|
|
displayable = {}
|
|
|
|
vwap = quote.get('VWAP')
|
|
volume = quote.get('volume')
|
|
if volume is not None: # could be 0
|
|
# why questrade do you have to be an asshole shipping null values!!!
|
|
computed['$ vol'] = round((vwap or 0) * (volume or 0), 3)
|
|
|
|
# structuring and normalization
|
|
for key, new_key in keymap.items():
|
|
display_value = value = computed.get(key) or quote.get(key)
|
|
|
|
# API servers can return `None` vals when markets are closed (weekend)
|
|
value = 0 if value is None else value
|
|
|
|
# convert values to a displayble format using available formatting func
|
|
if isinstance(new_key, tuple):
|
|
new_key, func = new_key
|
|
display_value = func(value) if value else value
|
|
|
|
new[new_key] = value
|
|
displayable[new_key] = display_value
|
|
|
|
return new, displayable
|
|
|
|
|
|
async def smoke_quote(
|
|
get_quotes,
|
|
tickers
|
|
):
|
|
"""Do an initial "smoke" request for symbols in ``tickers`` filtering
|
|
out any symbols not supported by the broker queried in the call to
|
|
``get_quotes()``.
|
|
"""
|
|
from operator import itemgetter
|
|
# TODO: trim out with #37
|
|
#################################################
|
|
# get a single quote filtering out any bad tickers
|
|
# NOTE: this code is always run for every new client
|
|
# subscription even when a broker quoter task is already running
|
|
# since the new client needs to know what symbols are accepted
|
|
log.warn(f"Retrieving smoke quote for symbols {tickers}")
|
|
quotes = await get_quotes(tickers)
|
|
|
|
# report any tickers that aren't returned in the first quote
|
|
invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes))
|
|
for symbol in invalid_tickers:
|
|
tickers.remove(symbol)
|
|
log.warn(
|
|
f"Symbol `{symbol}` not found") # by broker `{broker}`"
|
|
# )
|
|
|
|
# pop any tickers that return "empty" quotes
|
|
payload = {}
|
|
for quote in quotes:
|
|
symbol = quote['symbol']
|
|
if quote is None:
|
|
log.warn(
|
|
f"Symbol `{symbol}` not found")
|
|
# XXX: not this mutates the input list (for now)
|
|
tickers.remove(symbol)
|
|
continue
|
|
|
|
# report any unknown/invalid symbols (QT specific)
|
|
if quote.get('low52w', False) is None:
|
|
log.error(
|
|
f"{symbol} seems to be defunct")
|
|
|
|
quote['symbol'] = symbol
|
|
payload[symbol] = quote
|
|
|
|
return payload
|
|
|
|
# end of section to be trimmed out with #37
|
|
###########################################
|
|
|
|
|
|
# unbounded, shared between streaming tasks
|
|
_symbol_info_cache = {}
|
|
|
|
|
|
# function to format packets delivered to subscribers
|
|
def packetizer(
|
|
topic: str,
|
|
quotes: Dict[str, Any],
|
|
) -> Dict[str, Any]:
|
|
"""Normalize quotes by name into dicts using broker-specific
|
|
processing.
|
|
"""
|
|
# repack into symbol keyed dict
|
|
return {q['symbol']: q for q in quotes}
|
|
|
|
|
|
def normalize(
|
|
quotes: Dict[str, Any],
|
|
_cache: Dict[str, Any], # dict held in scope of the streaming loop
|
|
formatter: Callable,
|
|
) -> Dict[str, Any]:
|
|
"""Deliver normalized quotes by name into dicts using
|
|
broker-specific processing; only emit changes differeing from the
|
|
last quote sample creating a psuedo-tick type datum.
|
|
"""
|
|
new = {}
|
|
# XXX: this is effectively emitting "sampled ticks"
|
|
# useful for polling setups but obviously should be
|
|
# disabled if you're already rx-ing per-tick data.
|
|
for quote in quotes:
|
|
symbol = quote['symbol']
|
|
|
|
# look up last quote from cache
|
|
last = _cache.setdefault(symbol, {})
|
|
_cache[symbol] = quote
|
|
|
|
# compute volume difference
|
|
last_volume = last.get('volume', 0)
|
|
current_volume = quote['volume']
|
|
volume_diff = current_volume - last_volume
|
|
|
|
# find all keys that have match to a new value compared
|
|
# to the last quote received
|
|
changed = set(quote.items()) - set(last.items())
|
|
if changed:
|
|
log.info(f"New quote {symbol}:\n{changed}")
|
|
|
|
# TODO: can we reduce the # of iterations here and in
|
|
# called funcs?
|
|
payload = {k: quote[k] for k, v in changed}
|
|
payload['symbol'] = symbol # required by formatter
|
|
|
|
# TODO: we should probaby do the "computed" fields
|
|
# processing found inside this func in a downstream actor?
|
|
fquote, _ = formatter(payload, _symbol_info_cache)
|
|
fquote['key'] = fquote['symbol'] = symbol
|
|
|
|
# if there was volume likely the last size of
|
|
# shares traded is useful info and it's possible
|
|
# that the set difference from above will disregard
|
|
# a "size" value since the same # of shares were traded
|
|
# volume = payload.get('volume')
|
|
if volume_diff:
|
|
if volume_diff < 0:
|
|
log.error(f"Uhhh {symbol} volume: {volume_diff} ?")
|
|
|
|
fquote['volume_delta'] = volume_diff
|
|
|
|
# TODO: We can emit 2 ticks here:
|
|
# - one for the volume differential
|
|
# - one for the last known trade size
|
|
# The first in theory can be unwound and
|
|
# interpolated assuming the broker passes an
|
|
# accurate daily VWAP value.
|
|
# To make this work we need a universal ``size``
|
|
# field that is normalized before hitting this logic.
|
|
fquote['size'] = quote.get('lastTradeSize', 0)
|
|
if 'last' not in fquote:
|
|
fquote['last'] = quote.get('lastTradePrice', float('nan'))
|
|
|
|
new[symbol] = fquote
|
|
|
|
if new:
|
|
log.info(f"New quotes:\n{pformat(new)}")
|
|
return new
|
|
|
|
|
|
# TODO: currently this backend uses entirely different
|
|
# data feed machinery that was written earlier then the
|
|
# existing stuff used in other backends. This needs to
|
|
# be ported eventually and should *just work* despite
|
|
# being a multi-symbol, poll-style feed system.
|
|
@tractor.stream
|
|
async def stream_quotes(
|
|
ctx: tractor.Context, # marks this as a streaming func
|
|
symbols: List[str],
|
|
feed_type: str = 'stock',
|
|
rate: int = 3,
|
|
loglevel: str = None,
|
|
# feed_type: str = 'stock',
|
|
) -> AsyncGenerator[str, Dict[str, Any]]:
|
|
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
|
get_console_log(loglevel)
|
|
|
|
async with open_cached_client('questrade') as client:
|
|
if feed_type == 'stock':
|
|
formatter = format_stock_quote
|
|
get_quotes = await stock_quoter(client, symbols)
|
|
|
|
# do a smoke quote (note this mutates the input list and filters
|
|
# out bad symbols for now)
|
|
first_quotes = await smoke_quote(get_quotes, list(symbols))
|
|
else:
|
|
formatter = format_option_quote
|
|
get_quotes = await option_quoter(client, symbols)
|
|
# packetize
|
|
first_quotes = {
|
|
quote['symbol']: quote
|
|
for quote in await get_quotes(symbols)
|
|
}
|
|
|
|
# update global symbol data state
|
|
sd = await client.symbol_info(symbols)
|
|
_symbol_info_cache.update(sd)
|
|
|
|
# pre-process first set of quotes
|
|
payload = {}
|
|
for sym, quote in first_quotes.items():
|
|
fquote, _ = formatter(quote, sd)
|
|
payload[sym] = fquote
|
|
|
|
# push initial smoke quote response for client initialization
|
|
await ctx.send_yield(payload)
|
|
|
|
from .data import stream_poll_requests
|
|
|
|
await stream_poll_requests(
|
|
|
|
# ``msg.pub`` required kwargs
|
|
task_name=feed_type,
|
|
ctx=ctx,
|
|
topics=symbols,
|
|
packetizer=packetizer,
|
|
|
|
# actual target "streaming func" args
|
|
get_quotes=get_quotes,
|
|
normalizer=partial(normalize, formatter=formatter),
|
|
rate=rate,
|
|
)
|
|
log.info("Terminating stream quoter task")
|