Implement candles retrieval from Questrade

There's some expected limitations with the number of sticks allowed in
a single query (they say 2k but I've been able to pull 20k). Also note
without a paid data sub there's a 15m delay on 1m sticks (we'll hack
around that shortly, don't worry).
questrade_candles
Tyler Goodlet 2020-05-23 14:00:53 -04:00
parent 595f79c632
commit c11946988e
2 changed files with 121 additions and 14 deletions

View File

@ -9,14 +9,17 @@ from functools import partial
import configparser import configparser
from typing import List, Tuple, Dict, Any, Iterator, NamedTuple from typing import List, Tuple, Dict, Any, Iterator, NamedTuple
import arrow
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
import pandas as pd
import numpy as np
import wrapt import wrapt
import asks import asks
from ..calc import humanize, percent_change from ..calc import humanize, percent_change
from . import config from . import config
from ._util import resproc, BrokerError from ._util import resproc, BrokerError, SymbolNotFound
from ..log import get_logger, colorize_json from ..log import get_logger, colorize_json
from .._async_utils import async_lifo_cache from .._async_utils import async_lifo_cache
@ -30,6 +33,25 @@ _version = 'v1'
# it seems 4 rps is best we can do total # it seems 4 rps is best we can do total
_rate_limit = 4 _rate_limit = 4
_time_frames = {
'1m': 'OneMinute',
'2m': 'TwoMinutes',
'3m': 'ThreeMinutes',
'4m': 'FourMinutes',
'5m': 'FiveMinutes',
'10m': 'TenMinutes',
'15m': 'FifteenMinutes',
'20m': 'TwentyMinutes',
'30m': 'HalfHour',
'1h': 'OneHour',
'2h': 'TwoHours',
'4h': 'FourHours',
'D': 'OneDay',
'W': 'OneWeek',
'M': 'OneMonth',
'Y': 'OneYear',
}
class QuestradeError(Exception): class QuestradeError(Exception):
"Non-200 OK response code" "Non-200 OK response code"
@ -70,9 +92,9 @@ def refresh_token_on_err(tries=3):
if "Access token is invalid" not in str(qterr.args[0]): if "Access token is invalid" not in str(qterr.args[0]):
raise raise
# TODO: this will crash when run from a sub-actor since # TODO: this will crash when run from a sub-actor since
# STDIN can't be acquired. The right way to handle this # STDIN can't be acquired (ONLY WITH MP). The right way
# is to make a request to the parent actor (i.e. # to handle this is to make a request to the parent
# spawner of this) to call this # actor (i.e. spawner of this) to call this
# `client.ensure_access()` locally thus blocking until # `client.ensure_access()` locally thus blocking until
# the user provides an API key on the "client side" # the user provides an API key on the "client side"
log.warning(f"Tokens are invalid refreshing try {i}..") log.warning(f"Tokens are invalid refreshing try {i}..")
@ -168,8 +190,18 @@ class _API:
quote['key'] = quote['symbol'] quote['key'] = quote['symbol']
return quotes return quotes
async def candles(self, id: str, start: str, end, interval) -> dict: async def candles(
return await self._get(f'markets/candles/{id}', params={}) self, symbol_id:
str, start: str,
end: str,
interval: str
) -> List[Dict[str, float]]:
"""Retrieve historical candles for provided date range.
"""
return (await self._get(
f'markets/candles/{symbol_id}',
params={'startTime': start, 'endTime': end, 'interval': interval},
))['candles']
async def option_contracts(self, symbol_id: str) -> dict: async def option_contracts(self, symbol_id: str) -> dict:
"Retrieve all option contract API ids with expiry -> strike prices." "Retrieve all option contract API ids with expiry -> strike prices."
@ -193,7 +225,7 @@ class _API:
for (symbol, symbol_id, expiry), bystrike in contracts.items() for (symbol, symbol_id, expiry), bystrike in contracts.items()
] ]
resp = await self._sess.post( resp = await self._sess.post(
path=f'/markets/quotes/options', path='/markets/quotes/options',
# XXX: b'{"code":1024,"message":"The size of the array requested # XXX: b'{"code":1024,"message":"The size of the array requested
# is not valid: optionIds"}' # is not valid: optionIds"}'
# ^ what I get when trying to use too many ids manually... # ^ what I get when trying to use too many ids manually...
@ -349,7 +381,10 @@ class Client:
return data return data
async def tickers2ids(self, tickers): async def tickers2ids(
self,
tickers: Iterator[str]
) -> Dict[str, int]:
"""Helper routine that take a sequence of ticker symbols and returns """Helper routine that take a sequence of ticker symbols and returns
their corresponding QT numeric symbol ids. their corresponding QT numeric symbol ids.
@ -362,7 +397,7 @@ class Client:
if id is not None: if id is not None:
symbols2ids[symbol] = id symbols2ids[symbol] = id
# still missing uncached values - hit the server # still missing uncached values - hit the api server
to_lookup = list(set(tickers) - set(symbols2ids)) to_lookup = list(set(tickers) - set(symbols2ids))
if to_lookup: if to_lookup:
data = await self.api.symbols(names=','.join(to_lookup)) data = await self.api.symbols(names=','.join(to_lookup))
@ -511,6 +546,78 @@ class Client:
return quotes return quotes
async def bars(
self,
symbol: str,
# EST in ISO 8601 format is required...
# start_date: str = "1970-01-01T00:00:00.000000-05:00",
start_date: str = "2020-03-24T16:01:00.000000-04:00",
time_frame='1m',
count=20e3,
) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present.
.. note::
The candles endpoint only allows "2000" points per query
however tests here show that it is 20k candles per query.
"""
# fix case
if symbol.islower():
symbol = symbol.swapcase()
sids = await self.tickers2ids([symbol])
if not sids:
raise SymbolNotFound(symbol)
sid = sids[symbol]
est_now = arrow.utcnow().to('US/Eastern').floor('minute')
est_start = est_now.shift(minutes=-count)
start = time.time()
bars = await self.api.candles(
sid,
start=est_start.isoformat(),
end=est_now.isoformat(),
interval=_time_frames[time_frame],
)
log.debug(
f"Took {time.time() - start} seconds to retreive {len(bars)} bars")
return bars
# marketstore TSD compatible numpy dtype for bar
_qt_bars_dt = [
# ('start', 'S40'),
('Epoch', 'i8'),
# ('end', 'S40'),
('low', 'f4'),
('high', 'f4'),
('open', 'f4'),
('close', 'f4'),
('volume', 'i8'),
# ('VWAP', 'f4')
]
def get_OHLCV(
bar: Dict[str, Any]
) -> Tuple[str, Any]:
"""Return a marketstore key-compatible OHCLV dictionary.
"""
del bar['end']
del bar['VWAP']
bar['start'] = pd.Timestamp(bar['start']).value/10**9
return tuple(bar.values())
def to_marketstore_structarray(
bars: List[Dict[str, Any]]
) -> np.array:
"""Return marketstore writeable recarray from sequence of bars
retrieved via the ``candles`` endpoint.
"""
return np.array(list(map(get_OHLCV, bars)), dtype=_qt_bars_dt)
async def token_refresher(client): async def token_refresher(client):
"""Coninually refresh the ``access_token`` near its expiry time. """Coninually refresh the ``access_token`` near its expiry time.
@ -549,7 +656,7 @@ def get_config(
has_token = section.get('refresh_token') if section else False has_token = section.get('refresh_token') if section else False
if force_from_user or ask_user_on_failure and not (section or has_token): if force_from_user or ask_user_on_failure and not (section or has_token):
log.warn(f"Forcing manual token auth from user") log.warn("Forcing manual token auth from user")
_token_from_user(conf) _token_from_user(conf)
else: else:
if not section: if not section:
@ -634,7 +741,7 @@ async def option_quoter(client: Client, tickers: List[str]):
if isinstance(tickers[0], tuple): if isinstance(tickers[0], tuple):
datetime.fromisoformat(tickers[0][1]) datetime.fromisoformat(tickers[0][1])
else: else:
raise ValueError(f'Option subscription format is (symbol, expiry)') raise ValueError('Option subscription format is (symbol, expiry)')
@async_lifo_cache(maxsize=128) @async_lifo_cache(maxsize=128)
async def get_contract_by_date( async def get_contract_by_date(
@ -687,8 +794,8 @@ _qt_stock_keys = {
# 'low52w': 'low52w', # put in info widget # 'low52w': 'low52w', # put in info widget
# 'high52w': 'high52w', # 'high52w': 'high52w',
# "lastTradePriceTrHrs": 7.99, # "lastTradePriceTrHrs": 7.99,
# 'lastTradeTime': ('time', datetime.fromisoformat), 'lastTradeTime': ('fill_time', datetime.fromisoformat),
# "lastTradeTick": "Equal", "lastTradeTick": 'tick', # ("Equal", "Up", "Down")
# "symbolId": 3575753, # "symbolId": 3575753,
# "tier": "", # "tier": "",
# 'isHalted': 'halted', # as subscript 'h' # 'isHalted': 'halted', # as subscript 'h'
@ -696,7 +803,7 @@ _qt_stock_keys = {
} }
# BidAskLayout columns which will contain three cells the first stacked on top # BidAskLayout columns which will contain three cells the first stacked on top
# of the other 2 # of the other 2 (this is a UI layout instruction)
_stock_bidasks = { _stock_bidasks = {
'last': ['bid', 'ask'], 'last': ['bid', 'ask'],
'size': ['bsize', 'asize'], 'size': ['bsize', 'asize'],