diff --git a/README.rst b/README.rst index 5b2b98f8..42f0f2cc 100644 --- a/README.rst +++ b/README.rst @@ -1,3 +1,21 @@ piker ------ -Destroy all suits +Anti-fragile trading gear for hackers, scientists, quants and underpants warriors. + + +Install +******* +``piker`` is currently under heavy alpha development and as such should +be cloned from this repo and hacked on directly. + +If you insist on trying to install it (which should work) please do it +from this GitHub repository:: + + pip install git+git://github.com/pikers/piker.git + + +Tech +**** +``piker`` is an attempt at a pro-grade, next-gen open source toolset +for trading and financial analysis. As such, it tries to use as much +cutting edge tech as possible including Python 3.6+ and ``trio``. diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index f2c6409c..54afc783 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -1,27 +1,3 @@ """ -Broker client-daemons and general back end machinery. +Broker clients, daemons and general back end machinery. """ -import sys -import trio -from .questrade import serve_forever -from ..log import get_console_log - - -def main() -> None: - log = get_console_log('INFO', name='questrade') - argv = sys.argv[1:] - - refresh_token = None - if argv: - refresh_token = argv[0] - - # main loop - try: - client = trio.run(serve_forever, refresh_token) - except Exception as err: - log.exception(err) - else: - log.info( - f"\nLast refresh_token: {client.access_data['refresh_token']}\n" - f"Last access_token: {client.access_data['access_token']}\n" - ) diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py new file mode 100644 index 00000000..b5501633 --- /dev/null +++ b/piker/brokers/cli.py @@ -0,0 +1,66 @@ +""" +Console interface to broker client/daemons. +""" +from functools import partial +from importlib import import_module + +import click +import trio + +from ..log import get_console_log, colorize_json + + +def run(main, loglevel='info'): + log = get_console_log(loglevel) + + # main sandwich + try: + return trio.run(main) + except Exception as err: + log.exception(err) + finally: + log.debug("Exiting piker") + + +@click.group() +def cli(): + pass + + +@cli.command() +@click.option('--broker', default='questrade', help='Broker backend to use') +@click.option('--loglevel', '-l', default='warning', help='Logging level') +@click.argument('meth', nargs=1) +@click.argument('kwargs', nargs=-1) +def api(meth, kwargs, loglevel, broker): + """client for testing broker API methods with pretty printing of output. + """ + log = get_console_log(loglevel) + brokermod = import_module('.' + broker, 'piker.brokers') + + _kwargs = {} + for kwarg in kwargs: + if '=' not in kwarg: + log.error(f"kwarg `{kwarg}` must be of form =") + else: + key, _, value = kwarg.partition('=') + _kwargs[key] = value + + data = run(partial(brokermod.api, meth, **_kwargs), loglevel=loglevel) + if data: + click.echo(colorize_json(data)) + + +@cli.command() +@click.option('--broker', default='questrade', help='Broker backend to use') +@click.option('--loglevel', '-l', default='info', help='Logging level') +@click.argument('tickers', nargs=-1) +def stream(broker, loglevel, tickers): + # import broker module daemon entry point + bm = import_module('.' + broker, 'piker.brokers') + run( + partial(bm.serve_forever, [ + partial(bm.poll_tickers, tickers=tickers) + ]), + loglevel + ) diff --git a/piker/brokers/config.py b/piker/brokers/config.py index 5d923adf..979e2605 100644 --- a/piker/brokers/config.py +++ b/piker/brokers/config.py @@ -16,7 +16,6 @@ def load() -> (configparser.ConfigParser, str): Create a ``broker.ini`` file if one dne. """ config = configparser.ConfigParser() - # mode = 'r' if path.exists(_broker_conf_path) else 'a' read = config.read(_broker_conf_path) log.debug(f"Read config file {_broker_conf_path}") return config, _broker_conf_path diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index bdb5e710..3c30efea 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -1,12 +1,17 @@ """ Questrade API backend. """ -from . import config -from ..log import get_logger -from pprint import pformat +import inspect +import json import time +import datetime + +import trio from async_generator import asynccontextmanager +from . import config +from ..log import get_logger, colorize_json + # TODO: move to urllib3/requests once supported import asks asks.init('trio') @@ -25,50 +30,37 @@ def resproc( resp: asks.response_objects.Response, return_json: bool = True ) -> asks.response_objects.Response: - """Raise error on non-200 OK response. - """ - data = resp.json() - log.debug(f"Received json contents:\n{pformat(data)}\n") + """Process response and return its json content. + Raise the appropriate error on non-200 OK responses. + """ if not resp.status_code == 200: raise QuestradeError(resp.body) + try: + data = resp.json() + except json.decoder.JSONDecodeError: + log.exception(f"Failed to process {resp}") + else: + log.debug(f"Received json contents:\n{colorize_json(data)}") + return data if return_json else resp -class API: - """Questrade API at its finest. - """ - def __init__(self, session: asks.Session): - self._sess = session - - async def _request(self, path: str) -> dict: - resp = await self._sess.get(path=f'/{path}') - return resproc(resp) - - async def accounts(self): - return await self._request('accounts') - - async def time(self): - return await self._request('time') - - class Client: - """API client suitable for use as a long running broker daemon. + """API client suitable for use as a long running broker daemon or + for single api requests. """ - def __init__(self, config: dict): - sess = self._sess = asks.Session() - self.api = API(sess) - self.access_data = config + def __init__(self, config: 'configparser.ConfigParser'): + self._sess = asks.Session() + self.api = API(self._sess) + self._conf = config + self.access_data = {} self.user_data = {} - self._conf = None # possibly set in ``from_config`` factory + self._apply_config(config) - @classmethod - async def from_config(cls, config): - client = cls(dict(config['questrade'])) - client._conf = config - await client.enable_access() - return client + def _apply_config(self, config): + self.access_data = dict(self._conf['questrade']) async def _new_auth_token(self) -> dict: """Request a new api authorization ``refresh_token``. @@ -89,7 +81,7 @@ class Client: return data - async def _prep_sess(self) -> None: + def _prep_sess(self) -> None: """Fill http session with auth headers and a base url. """ data = self.access_data @@ -110,30 +102,102 @@ class Client: ) return resp - async def enable_access(self, force_refresh: bool = False) -> dict: - """Acquire new ``refresh_token`` and/or ``access_token`` if necessary. + async def ensure_access(self, force_refresh: bool = False) -> dict: + """Acquire new ``access_token`` and/or ``refresh_token`` if necessary. - Only needs to be called if the locally stored ``refresh_token`` has + Checks if the locally cached (file system) ``access_token`` has expired + (based on a ``expires_at`` time stamp stored in the brokers.ini config) expired (normally has a lifetime of 3 days). If ``false is set then - refresh the access token instead of using the locally cached version. + and refreshs token if necessary using the ``refresh_token``. If the + ``refresh_token`` has expired a new one needs to be provided by the + user. """ access_token = self.access_data.get('access_token') expires = float(self.access_data.get('expires_at', 0)) - # expired_by = time.time() - float(self.ttl or 0) - # if not access_token or (self.ttl is None) or (expires < time.time()): + expires_stamp = datetime.datetime.fromtimestamp( + expires).strftime('%Y-%m-%d %H:%M:%S') if not access_token or (expires < time.time()) or force_refresh: - log.info( - f"Access token {access_token} expired @ {expires}, " - "refreshing...") - data = await self._new_auth_token() + log.info(f"Refreshing access token {access_token} which expired at" + f" {expires_stamp}") + try: + data = await self._new_auth_token() + except QuestradeError as qterr: + # likely config ``refresh_token`` is expired + if qterr.args[0].decode() == 'Bad Request': + _token_from_user(self._conf) + self._apply_config(self._conf) + data = await self._new_auth_token() # store absolute token expiry time self.access_data['expires_at'] = time.time() + float( data['expires_in']) + # write to config on disk + write_conf(self) + else: + log.info(f"\nCurrent access token {access_token} expires at" + f" {expires_stamp}\n") - await self._prep_sess() + self._prep_sess() return self.access_data + async def tickers2ids(self, tickers): + """Helper routine that take a sequence of ticker symbols and returns + their corresponding QT symbol ids. + """ + data = await self.api.symbols(names=','.join(tickers)) + symbols2ids = {} + for ticker, symbol in zip(tickers, data['symbols']): + symbols2ids[symbol['symbol']] = symbol['symbolId'] + + return symbols2ids + + +class API: + """Questrade API at its finest. + """ + def __init__(self, session: asks.Session): + self._sess = session + + async def _request(self, path: str, params=None) -> dict: + resp = await self._sess.get(path=f'/{path}', params=params) + return resproc(resp) + + async def accounts(self) -> dict: + return await self._request('accounts') + + async def time(self) -> dict: + return await self._request('time') + + async def markets(self) -> dict: + return await self._request('markets') + + async def search(self, prefix: str) -> dict: + return await self._request( + 'symbols/search', params={'prefix': prefix}) + + async def symbols(self, ids: str = '', names: str = '') -> dict: + log.debug(f"Symbol lookup for {ids}") + return await self._request( + 'symbols', params={'ids': ids, 'names': names}) + + async def quotes(self, ids: str) -> dict: + return await self._request('markets/quotes', params={'ids': ids}) + + +async def token_refresher(client): + """Coninually refresh the ``access_token`` near its expiry time. + """ + while True: + await trio.sleep( + float(client.access_data['expires_at']) - time.time() - .1) + await client.ensure_access(force_refresh=True) + + +def _token_from_user(conf: 'configparser.ConfigParser') -> None: + # get from user + refresh_token = input("Please provide your Questrade access token: ") + conf['questrade'] = {'refresh_token': refresh_token} + def get_config() -> "configparser.ConfigParser": conf, path = config.load() @@ -141,46 +205,90 @@ def get_config() -> "configparser.ConfigParser": not conf['questrade'].get('refresh_token') ): log.warn( - f"No valid `questrade` refresh token could be found in {path}") - # get from user - refresh_token = input("Please provide your Questrade access token: ") - conf['questrade'] = {'refresh_token': refresh_token} + f"No valid refresh token could be found in {path}") + _token_from_user(conf) return conf -@asynccontextmanager -async def get_client(refresh_token: str = None) -> Client: - """Spawn a broker client. +def write_conf(client): + """Save access creds to config file. + """ + client._conf['questrade'] = client.access_data + config.write(client._conf) + +@asynccontextmanager +async def get_client() -> Client: + """Spawn a broker client. """ conf = get_config() - log.debug(f"Loaded questrade config: {conf['questrade']}") - log.info("Waiting on api access...") - client = await Client.from_config(conf) + log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}") + client = Client(conf) + await client.ensure_access() try: - try: # do a test ping to ensure the access token works - log.debug("Check time to ensure access token is valid") + log.debug("Check time to ensure access token is valid") + try: await client.api.time() except Exception as err: # access token is likely no good log.warn(f"Access token {client.access_data['access_token']} seems" f" expired, forcing refresh") - await client.enable_access(force_refresh=True) + await client.ensure_access(force_refresh=True) await client.api.time() + accounts = await client.api.accounts() + log.info(f"Available accounts:\n{colorize_json(accounts)}") yield client finally: - # save access creds for next run - conf['questrade'] = client.access_data - config.write(conf) + write_conf(client) -async def serve_forever(refresh_token: str = None) -> None: +async def serve_forever(tasks) -> None: """Start up a client and serve until terminated. """ - async with get_client(refresh_token) as client: + async with get_client() as client: # pretty sure this doesn't work # await client._revoke_auth_token() - return client + + async with trio.open_nursery() as nursery: + # launch token manager + nursery.start_soon(token_refresher, client) + + # launch children + for task in tasks: + nursery.start_soon(task, client) + + +async def poll_tickers(client, tickers, rate=2): + """Auto-poll snap quotes for a sequence of tickers at the given ``rate`` + per second. + """ + t2ids = await client.tickers2ids(tickers) + sleeptime = 1. / rate + ids = ','.join(map(str, t2ids.values())) + + while True: # use an event here to trigger exit? + quote_data = await client.api.quotes(ids=ids) + await trio.sleep(sleeptime) + + +async def api(methname, **kwargs) -> dict: + """Make (proxy) through an api call by name and return its result. + """ + async with get_client() as client: + meth = getattr(client.api, methname, None) + if meth is None: + log.error(f"No api method `{methname}` could be found?") + return + elif not kwargs: + # verify kwargs requirements are met + sig = inspect.signature(meth) + if sig.parameters: + log.error( + f"Argument(s) are required by the `{methname}` method: " + f"{tuple(sig.parameters.keys())}") + return + + return await meth(**kwargs) diff --git a/piker/log.py b/piker/log.py index 5433d69a..a849dbce 100644 --- a/piker/log.py +++ b/piker/log.py @@ -4,7 +4,9 @@ Log like a forester! """ import sys import logging +import json import colorlog +from pygments import highlight, lexers, formatters _proj_name = 'piker' @@ -12,7 +14,8 @@ _proj_name = 'piker' # (NOTE: we use the '{' format style) # Here, `thin_white` is just the laymen's gray. LOG_FORMAT = ( - "{bold_white}{thin_white}{asctime}{reset}" + # "{bold_white}{log_color}{asctime}{reset}" + "{log_color}{asctime}{reset}" " {bold_white}{thin_white}({reset}" "{thin_white}{threadName}{reset}{bold_white}{thin_white})" " {reset}{log_color}[{reset}{bold_log_color}{levelname}{reset}{log_color}]" @@ -32,7 +35,7 @@ STD_PALETTE = { 'ERROR': 'red', 'WARNING': 'yellow', 'INFO': 'green', - 'DEBUG': 'purple', + 'DEBUG': 'white', 'TRACE': 'cyan', 'GARBAGE': 'blue', } @@ -83,3 +86,14 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: log.addHandler(handler) return log + + +def colorize_json(data, style='algol_nu'): + """Colorize json output using ``pygments``. + """ + formatted_json = json.dumps(data, sort_keys=True, indent=4) + return highlight( + formatted_json, lexers.JsonLexer(), + # likeable styles: algol_nu, tango, monokai + formatters.TerminalTrueColorFormatter(style=style) + ) diff --git a/setup.py b/setup.py index ca2daa3e..59974c6e 100755 --- a/setup.py +++ b/setup.py @@ -28,10 +28,13 @@ setup( ], entry_points={ 'console_scripts': [ - 'pikerd = piker.brokers:main', + 'piker = piker.brokers.cli:cli', ] }, - install_requires=['click', 'colorlog', 'trio', 'attrs'], + install_requires=[ + 'click', 'colorlog', 'trio', 'attrs', 'async_generator', + 'pygments', + ], extras_require={ 'questrade': ['asks'], },