From 27a39ac3ada57565c2909cef57d20bbcd50b5e28 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 26 Jan 2018 14:25:53 -0500 Subject: [PATCH] More client improvements - colorize json response data in logs - support ``refresh_token`` retrieval from user if the token for some reason expires while the client is live - extend api method support for markets, search, symbols, and quotes - support "proxying" through api calls via an ``api`` coro for one off client queries (useful for cli testing) --- piker/brokers/questrade.py | 143 ++++++++++++++++++++++++++----------- 1 file changed, 101 insertions(+), 42 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 2595b818..b7ad9d7c 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -1,14 +1,16 @@ """ Questrade API backend. """ -import trio -from . import config -from ..log import get_logger -from pprint import pformat +import json import time import datetime + +import trio from async_generator import asynccontextmanager +from . import config +from ..log import get_logger, colorize_json + # TODO: move to urllib3/requests once supported import asks asks.init('trio') @@ -31,40 +33,32 @@ def resproc( Raise the appropriate error on non-200 OK responses. """ - data = resp.json() - log.debug(f"Received json contents:\n{pformat(data)}\n") - if not resp.status_code == 200: raise QuestradeError(resp.body) + try: + data = resp.json() + except json.decoder.JSONDecodeError: + log.exception(f"Failed to process {resp}") + else: + log.debug(f"Received json contents:\n{colorize_json(data)}") + return data if return_json else resp -class API: - """Questrade API at its finest. - """ - def __init__(self, session: asks.Session): - self._sess = session - - async def _request(self, path: str) -> dict: - resp = await self._sess.get(path=f'/{path}') - return resproc(resp) - - async def accounts(self): - return await self._request('accounts') - - async def time(self): - return await self._request('time') - - class Client: """API client suitable for use as a long running broker daemon. """ - def __init__(self, config: dict): + def __init__(self, config: 'configparser.ConfigParser'): self._sess = asks.Session() self.api = API(self._sess) - self.access_data = config + self._conf = config + self.access_data = {} self.user_data = {} + self._apply_config(config) + + def _apply_config(self, config): + self.access_data = dict(self._conf['questrade']) async def _new_auth_token(self) -> dict: """Request a new api authorization ``refresh_token``. @@ -113,7 +107,8 @@ class Client: (based on a ``expires_at`` time stamp stored in the brokers.ini config) expired (normally has a lifetime of 3 days). If ``false is set then and refreshs token if necessary using the ``refresh_token``. If the - ``refresh_token`` has expired a new one needs to be provided by the user. + ``refresh_token`` has expired a new one needs to be provided by the + user. """ access_token = self.access_data.get('access_token') expires = float(self.access_data.get('expires_at', 0)) @@ -122,11 +117,20 @@ class Client: if not access_token or (expires < time.time()) or force_refresh: log.info(f"Refreshing access token {access_token} which expired at" f" {expires_stamp}") - data = await self._new_auth_token() + try: + data = await self._new_auth_token() + except QuestradeError as qterr: + # likely config ``refresh_token`` is expired + if qterr.args[0].decode() == 'Bad Request': + _token_from_user(self._conf) + self._apply_config(self._conf) + data = await self._new_auth_token() # store absolute token expiry time self.access_data['expires_at'] = time.time() + float( data['expires_in']) + # write to config on disk + write_conf(self) else: log.info(f"\nCurrent access token {access_token} expires at" f" {expires_stamp}\n") @@ -135,6 +139,53 @@ class Client: return self.access_data +class API: + """Questrade API at its finest. + """ + def __init__(self, session: asks.Session): + self._sess = session + + async def _request(self, path: str, params=None) -> dict: + resp = await self._sess.get(path=f'/{path}', params=params) + return resproc(resp) + + async def accounts(self) -> dict: + return await self._request('accounts') + + async def time(self) -> dict: + return await self._request('time') + + async def markets(self) -> dict: + return await self._request('markets') + + async def search(self, prefix: str) -> dict: + return await self._request( + 'symbols/search', params={'prefix': prefix}) + + async def symbols(self, ids: str = '', names: str = '') -> dict: + log.debug(f"Symbol lookup for {ids}") + return await self._request( + 'symbols', params={'ids': ids, 'names': names}) + + async def quotes(self, ids: str) -> dict: + return await self._request('markets/quotes', params={'ids': ids}) + + +async def token_refresher(client): + """Coninually refresh the ``access_token`` near its expiry time. + """ + while True: + await trio.sleep( + float(client.access_data['expires_at']) - time.time() - .1) + await client.ensure_access(force_refresh=True) + + +def _token_from_user(conf: 'configparser.ConfigParser') -> None: + # get from user + refresh_token = input("Please provide your Questrade access token: ") + conf['questrade'] = {'refresh_token': refresh_token} + + def get_config() -> "configparser.ConfigParser": conf, path = config.load() if not conf.has_section('questrade') or ( @@ -142,19 +193,16 @@ def get_config() -> "configparser.ConfigParser": ): log.warn( f"No valid refresh token could be found in {path}") - # get from user - refresh_token = input("Please provide your Questrade access token: ") - conf['questrade'] = {'refresh_token': refresh_token} + _token_from_user(conf) return conf -async def token_refresher(client): - """Coninually refresh the ``access_token`` near its expiry time. +def write_conf(client): + """Save access creds to config file. """ - while True: - await trio.sleep(float(client.access_data['expires_at']) - time.time() - .1) - await client.ensure_access() + client._conf['questrade'] = client.access_data + config.write(client._conf) @asynccontextmanager @@ -162,8 +210,8 @@ async def get_client() -> Client: """Spawn a broker client. """ conf = get_config() - log.debug(f"Loaded config:\n{pformat(dict(conf['questrade']))}\n") - client = Client(dict(conf['questrade'])) + log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}") + client = Client(conf) await client.ensure_access() try: @@ -178,12 +226,10 @@ async def get_client() -> Client: await client.api.time() accounts = await client.api.accounts() - log.info(f"Available accounts:\n{pformat(accounts)}\n") + log.info(f"Available accounts:\n{colorize_json(accounts)}") yield client finally: - # save access creds for next run - conf['questrade'] = client.access_data - config.write(conf) + write_conf(client) async def serve_forever() -> None: @@ -194,3 +240,16 @@ async def serve_forever() -> None: # await client._revoke_auth_token() async with trio.open_nursery() as nursery: nursery.start_soon(token_refresher, client) + + +async def api(methname, **kwargs) -> dict: + async with get_client() as client: + meth = getattr(client.api, methname, None) + if meth is None: + log.error(f"No api method `{methname}` could be found?") + else: + arg = kwargs.get('arg') + if arg: + return await meth(arg) + else: + return await meth(**kwargs)