Merge pull request #3 from pikers/extend_client

CLI client!
kivy_mainline_and_py3.8
goodboy 2018-02-08 02:41:44 -05:00 committed by GitHub
commit 784777d65a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 282 additions and 98 deletions

View File

@ -1,3 +1,21 @@
piker
------
Destroy all suits
Anti-fragile trading gear for hackers, scientists, quants and underpants warriors.
Install
*******
``piker`` is currently under heavy alpha development and as such should
be cloned from this repo and hacked on directly.
If you insist on trying to install it (which should work) please do it
from this GitHub repository::
pip install git+git://github.com/pikers/piker.git
Tech
****
``piker`` is an attempt at a pro-grade, next-gen open source toolset
for trading and financial analysis. As such, it tries to use as much
cutting edge tech as possible including Python 3.6+ and ``trio``.

View File

@ -1,27 +1,3 @@
"""
Broker client-daemons and general back end machinery.
Broker clients, daemons and general back end machinery.
"""
import sys
import trio
from .questrade import serve_forever
from ..log import get_console_log
def main() -> None:
log = get_console_log('INFO', name='questrade')
argv = sys.argv[1:]
refresh_token = None
if argv:
refresh_token = argv[0]
# main loop
try:
client = trio.run(serve_forever, refresh_token)
except Exception as err:
log.exception(err)
else:
log.info(
f"\nLast refresh_token: {client.access_data['refresh_token']}\n"
f"Last access_token: {client.access_data['access_token']}\n"
)

View File

@ -0,0 +1,66 @@
"""
Console interface to broker client/daemons.
"""
from functools import partial
from importlib import import_module
import click
import trio
from ..log import get_console_log, colorize_json
def run(main, loglevel='info'):
log = get_console_log(loglevel)
# main sandwich
try:
return trio.run(main)
except Exception as err:
log.exception(err)
finally:
log.debug("Exiting piker")
@click.group()
def cli():
pass
@cli.command()
@click.option('--broker', default='questrade', help='Broker backend to use')
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.argument('meth', nargs=1)
@click.argument('kwargs', nargs=-1)
def api(meth, kwargs, loglevel, broker):
"""client for testing broker API methods with pretty printing of output.
"""
log = get_console_log(loglevel)
brokermod = import_module('.' + broker, 'piker.brokers')
_kwargs = {}
for kwarg in kwargs:
if '=' not in kwarg:
log.error(f"kwarg `{kwarg}` must be of form <key>=<value>")
else:
key, _, value = kwarg.partition('=')
_kwargs[key] = value
data = run(partial(brokermod.api, meth, **_kwargs), loglevel=loglevel)
if data:
click.echo(colorize_json(data))
@cli.command()
@click.option('--broker', default='questrade', help='Broker backend to use')
@click.option('--loglevel', '-l', default='info', help='Logging level')
@click.argument('tickers', nargs=-1)
def stream(broker, loglevel, tickers):
# import broker module daemon entry point
bm = import_module('.' + broker, 'piker.brokers')
run(
partial(bm.serve_forever, [
partial(bm.poll_tickers, tickers=tickers)
]),
loglevel
)

View File

@ -16,7 +16,6 @@ def load() -> (configparser.ConfigParser, str):
Create a ``broker.ini`` file if one dne.
"""
config = configparser.ConfigParser()
# mode = 'r' if path.exists(_broker_conf_path) else 'a'
read = config.read(_broker_conf_path)
log.debug(f"Read config file {_broker_conf_path}")
return config, _broker_conf_path

View File

@ -1,12 +1,17 @@
"""
Questrade API backend.
"""
from . import config
from ..log import get_logger
from pprint import pformat
import inspect
import json
import time
import datetime
import trio
from async_generator import asynccontextmanager
from . import config
from ..log import get_logger, colorize_json
# TODO: move to urllib3/requests once supported
import asks
asks.init('trio')
@ -25,50 +30,37 @@ def resproc(
resp: asks.response_objects.Response,
return_json: bool = True
) -> asks.response_objects.Response:
"""Raise error on non-200 OK response.
"""
data = resp.json()
log.debug(f"Received json contents:\n{pformat(data)}\n")
"""Process response and return its json content.
Raise the appropriate error on non-200 OK responses.
"""
if not resp.status_code == 200:
raise QuestradeError(resp.body)
try:
data = resp.json()
except json.decoder.JSONDecodeError:
log.exception(f"Failed to process {resp}")
else:
log.debug(f"Received json contents:\n{colorize_json(data)}")
return data if return_json else resp
class API:
"""Questrade API at its finest.
"""
def __init__(self, session: asks.Session):
self._sess = session
async def _request(self, path: str) -> dict:
resp = await self._sess.get(path=f'/{path}')
return resproc(resp)
async def accounts(self):
return await self._request('accounts')
async def time(self):
return await self._request('time')
class Client:
"""API client suitable for use as a long running broker daemon.
"""API client suitable for use as a long running broker daemon or
for single api requests.
"""
def __init__(self, config: dict):
sess = self._sess = asks.Session()
self.api = API(sess)
self.access_data = config
def __init__(self, config: 'configparser.ConfigParser'):
self._sess = asks.Session()
self.api = API(self._sess)
self._conf = config
self.access_data = {}
self.user_data = {}
self._conf = None # possibly set in ``from_config`` factory
self._apply_config(config)
@classmethod
async def from_config(cls, config):
client = cls(dict(config['questrade']))
client._conf = config
await client.enable_access()
return client
def _apply_config(self, config):
self.access_data = dict(self._conf['questrade'])
async def _new_auth_token(self) -> dict:
"""Request a new api authorization ``refresh_token``.
@ -89,7 +81,7 @@ class Client:
return data
async def _prep_sess(self) -> None:
def _prep_sess(self) -> None:
"""Fill http session with auth headers and a base url.
"""
data = self.access_data
@ -110,30 +102,102 @@ class Client:
)
return resp
async def enable_access(self, force_refresh: bool = False) -> dict:
"""Acquire new ``refresh_token`` and/or ``access_token`` if necessary.
async def ensure_access(self, force_refresh: bool = False) -> dict:
"""Acquire new ``access_token`` and/or ``refresh_token`` if necessary.
Only needs to be called if the locally stored ``refresh_token`` has
Checks if the locally cached (file system) ``access_token`` has expired
(based on a ``expires_at`` time stamp stored in the brokers.ini config)
expired (normally has a lifetime of 3 days). If ``false is set then
refresh the access token instead of using the locally cached version.
and refreshs token if necessary using the ``refresh_token``. If the
``refresh_token`` has expired a new one needs to be provided by the
user.
"""
access_token = self.access_data.get('access_token')
expires = float(self.access_data.get('expires_at', 0))
# expired_by = time.time() - float(self.ttl or 0)
# if not access_token or (self.ttl is None) or (expires < time.time()):
expires_stamp = datetime.datetime.fromtimestamp(
expires).strftime('%Y-%m-%d %H:%M:%S')
if not access_token or (expires < time.time()) or force_refresh:
log.info(
f"Access token {access_token} expired @ {expires}, "
"refreshing...")
data = await self._new_auth_token()
log.info(f"Refreshing access token {access_token} which expired at"
f" {expires_stamp}")
try:
data = await self._new_auth_token()
except QuestradeError as qterr:
# likely config ``refresh_token`` is expired
if qterr.args[0].decode() == 'Bad Request':
_token_from_user(self._conf)
self._apply_config(self._conf)
data = await self._new_auth_token()
# store absolute token expiry time
self.access_data['expires_at'] = time.time() + float(
data['expires_in'])
# write to config on disk
write_conf(self)
else:
log.info(f"\nCurrent access token {access_token} expires at"
f" {expires_stamp}\n")
await self._prep_sess()
self._prep_sess()
return self.access_data
async def tickers2ids(self, tickers):
"""Helper routine that take a sequence of ticker symbols and returns
their corresponding QT symbol ids.
"""
data = await self.api.symbols(names=','.join(tickers))
symbols2ids = {}
for ticker, symbol in zip(tickers, data['symbols']):
symbols2ids[symbol['symbol']] = symbol['symbolId']
return symbols2ids
class API:
"""Questrade API at its finest.
"""
def __init__(self, session: asks.Session):
self._sess = session
async def _request(self, path: str, params=None) -> dict:
resp = await self._sess.get(path=f'/{path}', params=params)
return resproc(resp)
async def accounts(self) -> dict:
return await self._request('accounts')
async def time(self) -> dict:
return await self._request('time')
async def markets(self) -> dict:
return await self._request('markets')
async def search(self, prefix: str) -> dict:
return await self._request(
'symbols/search', params={'prefix': prefix})
async def symbols(self, ids: str = '', names: str = '') -> dict:
log.debug(f"Symbol lookup for {ids}")
return await self._request(
'symbols', params={'ids': ids, 'names': names})
async def quotes(self, ids: str) -> dict:
return await self._request('markets/quotes', params={'ids': ids})
async def token_refresher(client):
"""Coninually refresh the ``access_token`` near its expiry time.
"""
while True:
await trio.sleep(
float(client.access_data['expires_at']) - time.time() - .1)
await client.ensure_access(force_refresh=True)
def _token_from_user(conf: 'configparser.ConfigParser') -> None:
# get from user
refresh_token = input("Please provide your Questrade access token: ")
conf['questrade'] = {'refresh_token': refresh_token}
def get_config() -> "configparser.ConfigParser":
conf, path = config.load()
@ -141,46 +205,90 @@ def get_config() -> "configparser.ConfigParser":
not conf['questrade'].get('refresh_token')
):
log.warn(
f"No valid `questrade` refresh token could be found in {path}")
# get from user
refresh_token = input("Please provide your Questrade access token: ")
conf['questrade'] = {'refresh_token': refresh_token}
f"No valid refresh token could be found in {path}")
_token_from_user(conf)
return conf
@asynccontextmanager
async def get_client(refresh_token: str = None) -> Client:
"""Spawn a broker client.
def write_conf(client):
"""Save access creds to config file.
"""
client._conf['questrade'] = client.access_data
config.write(client._conf)
@asynccontextmanager
async def get_client() -> Client:
"""Spawn a broker client.
"""
conf = get_config()
log.debug(f"Loaded questrade config: {conf['questrade']}")
log.info("Waiting on api access...")
client = await Client.from_config(conf)
log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}")
client = Client(conf)
await client.ensure_access()
try:
try: # do a test ping to ensure the access token works
log.debug("Check time to ensure access token is valid")
log.debug("Check time to ensure access token is valid")
try:
await client.api.time()
except Exception as err:
# access token is likely no good
log.warn(f"Access token {client.access_data['access_token']} seems"
f" expired, forcing refresh")
await client.enable_access(force_refresh=True)
await client.ensure_access(force_refresh=True)
await client.api.time()
accounts = await client.api.accounts()
log.info(f"Available accounts:\n{colorize_json(accounts)}")
yield client
finally:
# save access creds for next run
conf['questrade'] = client.access_data
config.write(conf)
write_conf(client)
async def serve_forever(refresh_token: str = None) -> None:
async def serve_forever(tasks) -> None:
"""Start up a client and serve until terminated.
"""
async with get_client(refresh_token) as client:
async with get_client() as client:
# pretty sure this doesn't work
# await client._revoke_auth_token()
return client
async with trio.open_nursery() as nursery:
# launch token manager
nursery.start_soon(token_refresher, client)
# launch children
for task in tasks:
nursery.start_soon(task, client)
async def poll_tickers(client, tickers, rate=2):
"""Auto-poll snap quotes for a sequence of tickers at the given ``rate``
per second.
"""
t2ids = await client.tickers2ids(tickers)
sleeptime = 1. / rate
ids = ','.join(map(str, t2ids.values()))
while True: # use an event here to trigger exit?
quote_data = await client.api.quotes(ids=ids)
await trio.sleep(sleeptime)
async def api(methname, **kwargs) -> dict:
"""Make (proxy) through an api call by name and return its result.
"""
async with get_client() as client:
meth = getattr(client.api, methname, None)
if meth is None:
log.error(f"No api method `{methname}` could be found?")
return
elif not kwargs:
# verify kwargs requirements are met
sig = inspect.signature(meth)
if sig.parameters:
log.error(
f"Argument(s) are required by the `{methname}` method: "
f"{tuple(sig.parameters.keys())}")
return
return await meth(**kwargs)

View File

@ -4,7 +4,9 @@ Log like a forester!
"""
import sys
import logging
import json
import colorlog
from pygments import highlight, lexers, formatters
_proj_name = 'piker'
@ -12,7 +14,8 @@ _proj_name = 'piker'
# (NOTE: we use the '{' format style)
# Here, `thin_white` is just the laymen's gray.
LOG_FORMAT = (
"{bold_white}{thin_white}{asctime}{reset}"
# "{bold_white}{log_color}{asctime}{reset}"
"{log_color}{asctime}{reset}"
" {bold_white}{thin_white}({reset}"
"{thin_white}{threadName}{reset}{bold_white}{thin_white})"
" {reset}{log_color}[{reset}{bold_log_color}{levelname}{reset}{log_color}]"
@ -32,7 +35,7 @@ STD_PALETTE = {
'ERROR': 'red',
'WARNING': 'yellow',
'INFO': 'green',
'DEBUG': 'purple',
'DEBUG': 'white',
'TRACE': 'cyan',
'GARBAGE': 'blue',
}
@ -83,3 +86,14 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger:
log.addHandler(handler)
return log
def colorize_json(data, style='algol_nu'):
"""Colorize json output using ``pygments``.
"""
formatted_json = json.dumps(data, sort_keys=True, indent=4)
return highlight(
formatted_json, lexers.JsonLexer(),
# likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style)
)

View File

@ -28,10 +28,13 @@ setup(
],
entry_points={
'console_scripts': [
'pikerd = piker.brokers:main',
'piker = piker.brokers.cli:cli',
]
},
install_requires=['click', 'colorlog', 'trio', 'attrs'],
install_requires=[
'click', 'colorlog', 'trio', 'attrs', 'async_generator',
'pygments',
],
extras_require={
'questrade': ['asks'],
},