From 151e7bf4fa76046306799c962aac1c0a139b2fd9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 8 Feb 2018 02:18:33 -0500 Subject: [PATCH] More client enhancements - Extend the qt api to include candles (not working yet), balances, positions. - Add a `quote()` method to the `Client` for batch ticker quotes and expose it through a CLI subcommand. - Make `poll_tickers` push new quotes to a `trio.Queue` --- piker/__init__.py | 3 ++ piker/brokers/questrade.py | 76 ++++++++++++++++++++++++++++++++------ 2 files changed, 67 insertions(+), 12 deletions(-) diff --git a/piker/__init__.py b/piker/__init__.py index e69de29b..ac6e86e3 100644 --- a/piker/__init__.py +++ b/piker/__init__.py @@ -0,0 +1,3 @@ +""" +piker: trading toolz for hackerz. +""" diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 3c30efea..b56efb0d 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -49,11 +49,13 @@ def resproc( class Client: """API client suitable for use as a long running broker daemon or - for single api requests. + single api requests. + + Provides a high-level api which wraps the underlying endpoint calls. """ def __init__(self, config: 'configparser.ConfigParser'): self._sess = asks.Session() - self.api = API(self._sess) + self.api = _API(self._sess) self._conf = config self.access_data = {} self.user_data = {} @@ -151,9 +153,17 @@ class Client: return symbols2ids + async def quote(self, tickers): + """Return quotes for each ticker in ``tickers``. + """ + t2ids = await self.tickers2ids(tickers) + ids = ','.join(map(str, t2ids.values())) + return await self.api.quotes(ids=ids) -class API: - """Questrade API at its finest. + +class _API: + """Questrade API endpoints exposed as methods and wrapped with an + http session. """ def __init__(self, session: asks.Session): self._sess = session @@ -183,6 +193,15 @@ class API: async def quotes(self, ids: str) -> dict: return await self._request('markets/quotes', params={'ids': ids}) + async def candles(self, id: str, start: str, end, interval) -> dict: + return await self._request(f'markets/candles/{id}', params={}) + + async def balances(self, id: str) -> dict: + return await self._request(f'accounts/{id}/balances') + + async def postions(self, id: str) -> dict: + return await self._request(f'accounts/{id}/positions') + async def token_refresher(client): """Coninually refresh the ``access_token`` near its expiry time. @@ -194,7 +213,8 @@ async def token_refresher(client): def _token_from_user(conf: 'configparser.ConfigParser') -> None: - # get from user + """Get API token from the user on the console. + """ refresh_token = input("Please provide your Questrade access token: ") conf['questrade'] = {'refresh_token': refresh_token} @@ -261,21 +281,46 @@ async def serve_forever(tasks) -> None: nursery.start_soon(task, client) -async def poll_tickers(client, tickers, rate=2): - """Auto-poll snap quotes for a sequence of tickers at the given ``rate`` +async def poll_tickers( + client: Client, tickers: [str], q: trio.Queue, rate: int = 2, +) -> None: + """Stream quotes for a sequence of tickers at the given ``rate`` per second. """ t2ids = await client.tickers2ids(tickers) - sleeptime = 1. / rate ids = ','.join(map(str, t2ids.values())) + sleeptime = 1. / rate + _cache = {} while True: # use an event here to trigger exit? - quote_data = await client.api.quotes(ids=ids) - await trio.sleep(sleeptime) + quotes_resp = await client.api.quotes(ids=ids) + start = time.time() + quotes = quotes_resp['quotes'] + # log.trace(quotes) + + # only push quotes with "new" data + payload = [] + for quote in quotes: + symbol = quote['symbol'] + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.debug(f"New quote {symbol} data:\n{new}") + _cache[symbol] = quote + payload.append(quote) + + if payload: + q.put_nowait(payload) + + proc_time = time.time() - start + delay = sleeptime - proc_time + if delay <= 0: + log.warn(f"Took {proc_time} seconds for processing quotes?") + await trio.sleep(delay) -async def api(methname, **kwargs) -> dict: - """Make (proxy) through an api call by name and return its result. +async def api(methname: str, **kwargs) -> dict: + """Make (proxy through) an api call by name and return its result. """ async with get_client() as client: meth = getattr(client.api, methname, None) @@ -292,3 +337,10 @@ async def api(methname, **kwargs) -> dict: return return await meth(**kwargs) + + +async def quote(tickers: [str]) -> dict: + """Return quotes dict for ``tickers``. + """ + async with get_client() as client: + return await client.quote(tickers)