More client improvements

- colorize json response data in logs
- support ``refresh_token`` retrieval from user if the token for some
  reason expires while the client is live
- extend api method support for markets, search, symbols, and quotes
- support "proxying" through api calls via an ``api`` coro for one off
  client queries (useful for cli testing)
kivy_mainline_and_py3.8
Tyler Goodlet 2018-01-26 14:25:53 -05:00
parent 534ba0b698
commit 27a39ac3ad
1 changed files with 101 additions and 42 deletions

View File

@ -1,14 +1,16 @@
""" """
Questrade API backend. Questrade API backend.
""" """
import trio import json
from . import config
from ..log import get_logger
from pprint import pformat
import time import time
import datetime import datetime
import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from . import config
from ..log import get_logger, colorize_json
# TODO: move to urllib3/requests once supported # TODO: move to urllib3/requests once supported
import asks import asks
asks.init('trio') asks.init('trio')
@ -31,40 +33,32 @@ def resproc(
Raise the appropriate error on non-200 OK responses. Raise the appropriate error on non-200 OK responses.
""" """
data = resp.json()
log.debug(f"Received json contents:\n{pformat(data)}\n")
if not resp.status_code == 200: if not resp.status_code == 200:
raise QuestradeError(resp.body) raise QuestradeError(resp.body)
try:
data = resp.json()
except json.decoder.JSONDecodeError:
log.exception(f"Failed to process {resp}")
else:
log.debug(f"Received json contents:\n{colorize_json(data)}")
return data if return_json else resp return data if return_json else resp
class API:
"""Questrade API at its finest.
"""
def __init__(self, session: asks.Session):
self._sess = session
async def _request(self, path: str) -> dict:
resp = await self._sess.get(path=f'/{path}')
return resproc(resp)
async def accounts(self):
return await self._request('accounts')
async def time(self):
return await self._request('time')
class Client: class Client:
"""API client suitable for use as a long running broker daemon. """API client suitable for use as a long running broker daemon.
""" """
def __init__(self, config: dict): def __init__(self, config: 'configparser.ConfigParser'):
self._sess = asks.Session() self._sess = asks.Session()
self.api = API(self._sess) self.api = API(self._sess)
self.access_data = config self._conf = config
self.access_data = {}
self.user_data = {} self.user_data = {}
self._apply_config(config)
def _apply_config(self, config):
self.access_data = dict(self._conf['questrade'])
async def _new_auth_token(self) -> dict: async def _new_auth_token(self) -> dict:
"""Request a new api authorization ``refresh_token``. """Request a new api authorization ``refresh_token``.
@ -113,7 +107,8 @@ class Client:
(based on a ``expires_at`` time stamp stored in the brokers.ini config) (based on a ``expires_at`` time stamp stored in the brokers.ini config)
expired (normally has a lifetime of 3 days). If ``false is set then expired (normally has a lifetime of 3 days). If ``false is set then
and refreshs token if necessary using the ``refresh_token``. If the and refreshs token if necessary using the ``refresh_token``. If the
``refresh_token`` has expired a new one needs to be provided by the user. ``refresh_token`` has expired a new one needs to be provided by the
user.
""" """
access_token = self.access_data.get('access_token') access_token = self.access_data.get('access_token')
expires = float(self.access_data.get('expires_at', 0)) expires = float(self.access_data.get('expires_at', 0))
@ -122,11 +117,20 @@ class Client:
if not access_token or (expires < time.time()) or force_refresh: if not access_token or (expires < time.time()) or force_refresh:
log.info(f"Refreshing access token {access_token} which expired at" log.info(f"Refreshing access token {access_token} which expired at"
f" {expires_stamp}") f" {expires_stamp}")
try:
data = await self._new_auth_token()
except QuestradeError as qterr:
# likely config ``refresh_token`` is expired
if qterr.args[0].decode() == 'Bad Request':
_token_from_user(self._conf)
self._apply_config(self._conf)
data = await self._new_auth_token() data = await self._new_auth_token()
# store absolute token expiry time # store absolute token expiry time
self.access_data['expires_at'] = time.time() + float( self.access_data['expires_at'] = time.time() + float(
data['expires_in']) data['expires_in'])
# write to config on disk
write_conf(self)
else: else:
log.info(f"\nCurrent access token {access_token} expires at" log.info(f"\nCurrent access token {access_token} expires at"
f" {expires_stamp}\n") f" {expires_stamp}\n")
@ -135,6 +139,53 @@ class Client:
return self.access_data return self.access_data
class API:
"""Questrade API at its finest.
"""
def __init__(self, session: asks.Session):
self._sess = session
async def _request(self, path: str, params=None) -> dict:
resp = await self._sess.get(path=f'/{path}', params=params)
return resproc(resp)
async def accounts(self) -> dict:
return await self._request('accounts')
async def time(self) -> dict:
return await self._request('time')
async def markets(self) -> dict:
return await self._request('markets')
async def search(self, prefix: str) -> dict:
return await self._request(
'symbols/search', params={'prefix': prefix})
async def symbols(self, ids: str = '', names: str = '') -> dict:
log.debug(f"Symbol lookup for {ids}")
return await self._request(
'symbols', params={'ids': ids, 'names': names})
async def quotes(self, ids: str) -> dict:
return await self._request('markets/quotes', params={'ids': ids})
async def token_refresher(client):
"""Coninually refresh the ``access_token`` near its expiry time.
"""
while True:
await trio.sleep(
float(client.access_data['expires_at']) - time.time() - .1)
await client.ensure_access(force_refresh=True)
def _token_from_user(conf: 'configparser.ConfigParser') -> None:
# get from user
refresh_token = input("Please provide your Questrade access token: ")
conf['questrade'] = {'refresh_token': refresh_token}
def get_config() -> "configparser.ConfigParser": def get_config() -> "configparser.ConfigParser":
conf, path = config.load() conf, path = config.load()
if not conf.has_section('questrade') or ( if not conf.has_section('questrade') or (
@ -142,19 +193,16 @@ def get_config() -> "configparser.ConfigParser":
): ):
log.warn( log.warn(
f"No valid refresh token could be found in {path}") f"No valid refresh token could be found in {path}")
# get from user _token_from_user(conf)
refresh_token = input("Please provide your Questrade access token: ")
conf['questrade'] = {'refresh_token': refresh_token}
return conf return conf
async def token_refresher(client): def write_conf(client):
"""Coninually refresh the ``access_token`` near its expiry time. """Save access creds to config file.
""" """
while True: client._conf['questrade'] = client.access_data
await trio.sleep(float(client.access_data['expires_at']) - time.time() - .1) config.write(client._conf)
await client.ensure_access()
@asynccontextmanager @asynccontextmanager
@ -162,8 +210,8 @@ async def get_client() -> Client:
"""Spawn a broker client. """Spawn a broker client.
""" """
conf = get_config() conf = get_config()
log.debug(f"Loaded config:\n{pformat(dict(conf['questrade']))}\n") log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}")
client = Client(dict(conf['questrade'])) client = Client(conf)
await client.ensure_access() await client.ensure_access()
try: try:
@ -178,12 +226,10 @@ async def get_client() -> Client:
await client.api.time() await client.api.time()
accounts = await client.api.accounts() accounts = await client.api.accounts()
log.info(f"Available accounts:\n{pformat(accounts)}\n") log.info(f"Available accounts:\n{colorize_json(accounts)}")
yield client yield client
finally: finally:
# save access creds for next run write_conf(client)
conf['questrade'] = client.access_data
config.write(conf)
async def serve_forever() -> None: async def serve_forever() -> None:
@ -194,3 +240,16 @@ async def serve_forever() -> None:
# await client._revoke_auth_token() # await client._revoke_auth_token()
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
nursery.start_soon(token_refresher, client) nursery.start_soon(token_refresher, client)
async def api(methname, **kwargs) -> dict:
async with get_client() as client:
meth = getattr(client.api, methname, None)
if meth is None:
log.error(f"No api method `{methname}` could be found?")
else:
arg = kwargs.get('arg')
if arg:
return await meth(arg)
else:
return await meth(**kwargs)