From c11946988ef9c194be30db7ebefce6477c0e3d38 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 23 May 2020 14:00:53 -0400 Subject: [PATCH] Implement candles retrieval from Questrade There's some expected limitations with the number of sticks allowed in a single query (they say 2k but I've been able to pull 20k). Also note without a paid data sub there's a 15m delay on 1m sticks (we'll hack around that shortly, don't worry). --- piker/{ => brokers}/cli.py | 0 piker/brokers/questrade.py | 135 +++++++++++++++++++++++++++++++++---- 2 files changed, 121 insertions(+), 14 deletions(-) rename piker/{ => brokers}/cli.py (100%) diff --git a/piker/cli.py b/piker/brokers/cli.py similarity index 100% rename from piker/cli.py rename to piker/brokers/cli.py diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 67e3988f..c89b1c0d 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -9,14 +9,17 @@ from functools import partial import configparser from typing import List, Tuple, Dict, Any, Iterator, NamedTuple +import arrow import trio from async_generator import asynccontextmanager +import pandas as pd +import numpy as np import wrapt import asks from ..calc import humanize, percent_change from . import config -from ._util import resproc, BrokerError +from ._util import resproc, BrokerError, SymbolNotFound from ..log import get_logger, colorize_json from .._async_utils import async_lifo_cache @@ -30,6 +33,25 @@ _version = 'v1' # it seems 4 rps is best we can do total _rate_limit = 4 +_time_frames = { + '1m': 'OneMinute', + '2m': 'TwoMinutes', + '3m': 'ThreeMinutes', + '4m': 'FourMinutes', + '5m': 'FiveMinutes', + '10m': 'TenMinutes', + '15m': 'FifteenMinutes', + '20m': 'TwentyMinutes', + '30m': 'HalfHour', + '1h': 'OneHour', + '2h': 'TwoHours', + '4h': 'FourHours', + 'D': 'OneDay', + 'W': 'OneWeek', + 'M': 'OneMonth', + 'Y': 'OneYear', +} + class QuestradeError(Exception): "Non-200 OK response code" @@ -70,9 +92,9 @@ def refresh_token_on_err(tries=3): if "Access token is invalid" not in str(qterr.args[0]): raise # TODO: this will crash when run from a sub-actor since - # STDIN can't be acquired. The right way to handle this - # is to make a request to the parent actor (i.e. - # spawner of this) to call this + # STDIN can't be acquired (ONLY WITH MP). The right way + # to handle this is to make a request to the parent + # actor (i.e. spawner of this) to call this # `client.ensure_access()` locally thus blocking until # the user provides an API key on the "client side" log.warning(f"Tokens are invalid refreshing try {i}..") @@ -168,8 +190,18 @@ class _API: quote['key'] = quote['symbol'] return quotes - async def candles(self, id: str, start: str, end, interval) -> dict: - return await self._get(f'markets/candles/{id}', params={}) + async def candles( + self, symbol_id: + str, start: str, + end: str, + interval: str + ) -> List[Dict[str, float]]: + """Retrieve historical candles for provided date range. + """ + return (await self._get( + f'markets/candles/{symbol_id}', + params={'startTime': start, 'endTime': end, 'interval': interval}, + ))['candles'] async def option_contracts(self, symbol_id: str) -> dict: "Retrieve all option contract API ids with expiry -> strike prices." @@ -193,7 +225,7 @@ class _API: for (symbol, symbol_id, expiry), bystrike in contracts.items() ] resp = await self._sess.post( - path=f'/markets/quotes/options', + path='/markets/quotes/options', # XXX: b'{"code":1024,"message":"The size of the array requested # is not valid: optionIds"}' # ^ what I get when trying to use too many ids manually... @@ -349,7 +381,10 @@ class Client: return data - async def tickers2ids(self, tickers): + async def tickers2ids( + self, + tickers: Iterator[str] + ) -> Dict[str, int]: """Helper routine that take a sequence of ticker symbols and returns their corresponding QT numeric symbol ids. @@ -362,7 +397,7 @@ class Client: if id is not None: symbols2ids[symbol] = id - # still missing uncached values - hit the server + # still missing uncached values - hit the api server to_lookup = list(set(tickers) - set(symbols2ids)) if to_lookup: data = await self.api.symbols(names=','.join(to_lookup)) @@ -511,6 +546,78 @@ class Client: return quotes + async def bars( + self, + symbol: str, + # EST in ISO 8601 format is required... + # start_date: str = "1970-01-01T00:00:00.000000-05:00", + start_date: str = "2020-03-24T16:01:00.000000-04:00", + time_frame='1m', + count=20e3, + ) -> List[Dict[str, Any]]: + """Retreive OHLCV bars for a symbol over a range to the present. + + .. note:: + The candles endpoint only allows "2000" points per query + however tests here show that it is 20k candles per query. + """ + # fix case + if symbol.islower(): + symbol = symbol.swapcase() + + sids = await self.tickers2ids([symbol]) + if not sids: + raise SymbolNotFound(symbol) + + sid = sids[symbol] + est_now = arrow.utcnow().to('US/Eastern').floor('minute') + est_start = est_now.shift(minutes=-count) + + start = time.time() + bars = await self.api.candles( + sid, + start=est_start.isoformat(), + end=est_now.isoformat(), + interval=_time_frames[time_frame], + ) + log.debug( + f"Took {time.time() - start} seconds to retreive {len(bars)} bars") + return bars + + +# marketstore TSD compatible numpy dtype for bar +_qt_bars_dt = [ + # ('start', 'S40'), + ('Epoch', 'i8'), + # ('end', 'S40'), + ('low', 'f4'), + ('high', 'f4'), + ('open', 'f4'), + ('close', 'f4'), + ('volume', 'i8'), + # ('VWAP', 'f4') +] + + +def get_OHLCV( + bar: Dict[str, Any] +) -> Tuple[str, Any]: + """Return a marketstore key-compatible OHCLV dictionary. + """ + del bar['end'] + del bar['VWAP'] + bar['start'] = pd.Timestamp(bar['start']).value/10**9 + return tuple(bar.values()) + + +def to_marketstore_structarray( + bars: List[Dict[str, Any]] +) -> np.array: + """Return marketstore writeable recarray from sequence of bars + retrieved via the ``candles`` endpoint. + """ + return np.array(list(map(get_OHLCV, bars)), dtype=_qt_bars_dt) + async def token_refresher(client): """Coninually refresh the ``access_token`` near its expiry time. @@ -549,7 +656,7 @@ def get_config( has_token = section.get('refresh_token') if section else False if force_from_user or ask_user_on_failure and not (section or has_token): - log.warn(f"Forcing manual token auth from user") + log.warn("Forcing manual token auth from user") _token_from_user(conf) else: if not section: @@ -634,7 +741,7 @@ async def option_quoter(client: Client, tickers: List[str]): if isinstance(tickers[0], tuple): datetime.fromisoformat(tickers[0][1]) else: - raise ValueError(f'Option subscription format is (symbol, expiry)') + raise ValueError('Option subscription format is (symbol, expiry)') @async_lifo_cache(maxsize=128) async def get_contract_by_date( @@ -687,8 +794,8 @@ _qt_stock_keys = { # 'low52w': 'low52w', # put in info widget # 'high52w': 'high52w', # "lastTradePriceTrHrs": 7.99, - # 'lastTradeTime': ('time', datetime.fromisoformat), - # "lastTradeTick": "Equal", + 'lastTradeTime': ('fill_time', datetime.fromisoformat), + "lastTradeTick": 'tick', # ("Equal", "Up", "Down") # "symbolId": 3575753, # "tier": "", # 'isHalted': 'halted', # as subscript 'h' @@ -696,7 +803,7 @@ _qt_stock_keys = { } # BidAskLayout columns which will contain three cells the first stacked on top -# of the other 2 +# of the other 2 (this is a UI layout instruction) _stock_bidasks = { 'last': ['bid', 'ask'], 'size': ['bsize', 'asize'],