From bd7eb16ab25876a11b7ce4ab2b244b72e923a2b6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 20 Mar 2018 13:13:07 -0400 Subject: [PATCH 01/15] Move core tasks to separate module Begin abstracting out broker backends by moving core data query tasks into a module which requires and calls a broker backend API. --- piker/brokers/_util.py | 34 ++++++++ piker/brokers/core.py | 104 ++++++++++++++++++++++++ piker/brokers/questrade.py | 158 ++++--------------------------------- 3 files changed, 154 insertions(+), 142 deletions(-) create mode 100644 piker/brokers/_util.py create mode 100644 piker/brokers/core.py diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py new file mode 100644 index 00000000..8d7a3e7b --- /dev/null +++ b/piker/brokers/_util.py @@ -0,0 +1,34 @@ +""" +Handy utils. +""" +import json +import asks +import logging + +from ..log import colorize_json + + +class BrokerError(Exception): + "Generic broker issue" + + +def resproc( + resp: asks.response_objects.Response, + log: logging.Logger, + return_json: bool = True +) -> asks.response_objects.Response: + """Process response and return its json content. + + Raise the appropriate error on non-200 OK responses. + """ + if not resp.status_code == 200: + raise BrokerError(resp.body) + try: + data = resp.json() + except json.decoder.JSONDecodeError: + log.exception(f"Failed to process {resp}:\n{resp.text}") + raise BrokerError(resp.text) + else: + log.trace(f"Received json contents:\n{colorize_json(data)}") + + return data if return_json else resp diff --git a/piker/brokers/core.py b/piker/brokers/core.py new file mode 100644 index 00000000..2f78dec9 --- /dev/null +++ b/piker/brokers/core.py @@ -0,0 +1,104 @@ +""" +Core broker-daemon tasks and API. +""" +import time +import inspect +from types import ModuleType + +import trio + +from .questrade import QuestradeError +from ..log import get_logger +log = get_logger('broker.core') + + +async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict: + """Make (proxy through) an api call by name and return its result. + """ + async with brokermod.get_client() as client: + meth = getattr(client.api, methname, None) + if meth is None: + log.error(f"No api method `{methname}` could be found?") + return + elif not kwargs: + # verify kwargs requirements are met + sig = inspect.signature(meth) + if sig.parameters: + log.error( + f"Argument(s) are required by the `{methname}` method: " + f"{tuple(sig.parameters.keys())}") + return + + return await meth(**kwargs) + + +async def quote(brokermod: ModuleType, tickers: [str]) -> dict: + """Return quotes dict for ``tickers``. + """ + async with brokermod.get_client() as client: + return await client.quote(tickers) + + +async def poll_tickers( + client: 'Client', + tickers: [str], + q: trio.Queue, + rate: int = 3, # delay between quote requests + diff_cached: bool = True, # only deliver "new" quotes to the queue +) -> None: + """Stream quotes for a sequence of tickers at the given ``rate`` + per second. + """ + t2ids = await client.tickers2ids(tickers) + ids = ','.join(map(str, t2ids.values())) + sleeptime = round(1. / rate, 3) + _cache = {} + + while True: # use an event here to trigger exit? + prequote_start = time.time() + try: + quotes_resp = await client.api.quotes(ids=ids) + except QuestradeError as qterr: + if "Access token is invalid" in str(qterr.args[0]): + # out-of-process piker may have renewed already + client._reload_config() + quotes_resp = await client.api.quotes(ids=ids) + else: + raise + + postquote_start = time.time() + quotes = quotes_resp['quotes'] + payload = [] + for quote in quotes: + + if quote['delay'] > 0: + log.warning(f"Delayed quote:\n{quote}") + + if diff_cached: + # if cache is enabled then only deliver "new" changes + symbol = quote['symbol'] + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + payload.append(quote) + else: + payload.append(quote) + + if payload: + q.put_nowait(payload) + + req_time = round(postquote_start - prequote_start, 3) + proc_time = round(time.time() - postquote_start, 3) + tot = req_time + proc_time + log.debug(f"Request + processing took {req_time + proc_time}") + delay = sleeptime - (req_time + proc_time) + if delay <= 0: + log.warn( + f"Took {req_time} (request) + {proc_time} (processing) = {tot}" + f" secs (> {sleeptime}) for processing quotes?") + else: + log.debug(f"Sleeping for {delay}") + await trio.sleep(delay) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index cdd3335b..a66e1c50 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -1,8 +1,6 @@ """ Questrade API backend. """ -import inspect -import json import time import datetime @@ -10,6 +8,7 @@ import trio from async_generator import asynccontextmanager from . import config +from ._util import resproc, BrokerError from ..log import get_logger, colorize_json # TODO: move to urllib3/requests once supported @@ -26,28 +25,6 @@ class QuestradeError(Exception): "Non-200 OK response code" -def resproc( - resp: asks.response_objects.Response, - return_json: bool = True -) -> asks.response_objects.Response: - """Process response and return its json content. - - Raise the appropriate error on non-200 OK responses. - """ - if not resp.status_code == 200: - raise QuestradeError(resp.body) - - try: - data = resp.json() - except json.decoder.JSONDecodeError: - log.exception(f"Failed to process {resp}:\n{resp.text}") - raise QuestradeError(resp.text) - else: - log.trace(f"Received json contents:\n{colorize_json(data)}") - - return data if return_json else resp - - class Client: """API client suitable for use as a long running broker daemon or single api requests. @@ -80,7 +57,7 @@ class Client: params={'grant_type': 'refresh_token', 'refresh_token': self.access_data['refresh_token']} ) - data = resproc(resp) + data = resproc(resp, log) self.access_data.update(data) return data @@ -121,11 +98,12 @@ class Client: expires_stamp = datetime.datetime.fromtimestamp( expires).strftime('%Y-%m-%d %H:%M:%S') if not access_token or (expires < time.time()) or force_refresh: - log.info(f"Refreshing access token {access_token} which expired at" - f" {expires_stamp}") + log.debug( + f"Refreshing access token {access_token} which expired at" + f" {expires_stamp}") try: data = await self._new_auth_token() - except QuestradeError as qterr: + except BrokerError as qterr: if "We're making some changes" in str(qterr.args[0]): # API service is down raise QuestradeError("API is down for maintenance") @@ -135,13 +113,13 @@ class Client: self._reload_config() try: data = await self._new_auth_token() - except QuestradeError as qterr: + except BrokerError as qterr: if qterr.args[0].decode() == 'Bad Request': # actually expired; get new from user self._reload_config(force_from_user=True) data = await self._new_auth_token() else: - raise qterr + raise QuestradeError(qterr) else: raise qterr @@ -151,8 +129,8 @@ class Client: # write to config on disk write_conf(self) else: - log.info(f"\nCurrent access token {access_token} expires at" - f" {expires_stamp}\n") + log.debug(f"\nCurrent access token {access_token} expires at" + f" {expires_stamp}\n") self._prep_sess() return self.access_data @@ -168,12 +146,13 @@ class Client: return symbols2ids - async def quote(self, tickers): + async def quote(self, tickers: [str]): """Return quotes for each ticker in ``tickers``. """ t2ids = await self.tickers2ids(tickers) ids = ','.join(map(str, t2ids.values())) - return (await self.api.quotes(ids=ids))['quotes'] + results = (await self.api.quotes(ids=ids))['quotes'] + return {sym: quote for sym, quote in zip(tickers, results)} async def symbols(self, tickers): """Return quotes for each ticker in ``tickers``. @@ -196,7 +175,7 @@ class _API: async def _request(self, path: str, params=None) -> dict: resp = await self._sess.get(path=f'/{path}', params=params) - return resproc(resp) + return resproc(resp, log) async def accounts(self) -> dict: return await self._request('accounts') @@ -268,6 +247,8 @@ def write_conf(client): @asynccontextmanager async def get_client() -> Client: """Spawn a broker client. + + A client must adhere to the method calls in ``piker.broker.core``. """ conf = get_config() log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}") @@ -290,110 +271,3 @@ async def get_client() -> Client: yield client finally: write_conf(client) - - -async def serve_forever(tasks) -> None: - """Start up a client and serve until terminated. - """ - async with get_client() as client: - # pretty sure this doesn't work - # await client._revoke_auth_token() - - async with trio.open_nursery() as nursery: - # launch token manager - nursery.start_soon(token_refresher, client) - - # launch children - for task in tasks: - nursery.start_soon(task, client) - - -async def poll_tickers( - client: Client, tickers: [str], - q: trio.Queue, - rate: int = 3, # delay between quote requests - diff_cached: bool = True, # only deliver "new" quotes to the queue -) -> None: - """Stream quotes for a sequence of tickers at the given ``rate`` - per second. - """ - t2ids = await client.tickers2ids(tickers) - ids = ','.join(map(str, t2ids.values())) - sleeptime = round(1. / rate, 3) - _cache = {} - - while True: # use an event here to trigger exit? - prequote_start = time.time() - try: - quotes_resp = await client.api.quotes(ids=ids) - except QuestradeError as qterr: - if "Access token is invalid" in str(qterr.args[0]): - # out-of-process piker may have renewed already - client._reload_config() - quotes_resp = await client.api.quotes(ids=ids) - else: - raise - - postquote_start = time.time() - quotes = quotes_resp['quotes'] - payload = [] - for quote in quotes: - - if quote['delay'] > 0: - log.warning(f"Delayed quote:\n{quote}") - - if diff_cached: - # if cache is enabled then only deliver "new" changes - symbol = quote['symbol'] - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote - payload.append(quote) - else: - payload.append(quote) - - if payload: - q.put_nowait(payload) - - req_time = round(postquote_start - prequote_start, 3) - proc_time = round(time.time() - postquote_start, 3) - tot = req_time + proc_time - log.debug(f"Request + processing took {req_time + proc_time}") - delay = sleeptime - (req_time + proc_time) - if delay <= 0: - log.warn( - f"Took {req_time} (request) + {proc_time} (processing) = {tot}" - f" secs (> {sleeptime}) for processing quotes?") - else: - log.debug(f"Sleeping for {delay}") - await trio.sleep(delay) - - -async def api(methname: str, **kwargs) -> dict: - """Make (proxy through) an api call by name and return its result. - """ - async with get_client() as client: - meth = getattr(client.api, methname, None) - if meth is None: - log.error(f"No api method `{methname}` could be found?") - return - elif not kwargs: - # verify kwargs requirements are met - sig = inspect.signature(meth) - if sig.parameters: - log.error( - f"Argument(s) are required by the `{methname}` method: " - f"{tuple(sig.parameters.keys())}") - return - - return await meth(**kwargs) - - -async def quote(tickers: [str]) -> dict: - """Return quotes dict for ``tickers``. - """ - async with get_client() as client: - return await client.quote(tickers) From e75f0718a5ecc50c6c03e85e7ae8515a70c94120 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 20 Mar 2018 13:26:12 -0400 Subject: [PATCH 02/15] Add a basic quotes-only robinhood backend We need a yank to test the order system and other end points that require auth. Resolves #2 --- piker/brokers/robinhood.py | 52 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 piker/brokers/robinhood.py diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py new file mode 100644 index 00000000..2c626f15 --- /dev/null +++ b/piker/brokers/robinhood.py @@ -0,0 +1,52 @@ +""" +Robinhood API backend. +""" +import asks +from async_generator import asynccontextmanager + +from ..log import get_logger +from ._util import resproc + +log = get_logger('robinhood') + +_service_ep = 'https://api.robinhood.com' + + +class _API: + """Robinhood API endpoints exposed as methods and wrapped with an + http session. + """ + def __init__(self, session: asks.Session): + self._sess = session + + async def _request(self, path: str, params=None) -> dict: + resp = await self._sess.get(path=f'/{path}', params=params) + return resproc(resp, log) + + async def quotes(self, symbols: str) -> dict: + return await self._request('quotes/', params={'symbols': symbols}) + + async def fundamentals(self, symbols: str) -> dict: + return await self._request('fundamentals/', params={'symbols': symbols}) + + +class Client: + """API client suitable for use as a long running broker daemon or + single api requests. + """ + def __init__(self): + self._sess = asks.Session() + self._sess.base_location = _service_ep + self.api = _API(self._sess) + + async def quote(self, symbols: [str]): + resp = await self.api.quotes(','.join(symbols)) + results = resp['results'] + return {sym: quote for sym, quote in zip(symbols, results)} + + +@asynccontextmanager +async def get_client() -> Client: + """Spawn a RH broker client. + """ + yield Client() From 42e9296b36f6b1df6e3efdd6337be1c39d38d33c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 20 Mar 2018 13:28:24 -0400 Subject: [PATCH 03/15] Adjust cli to new backend api --- piker/cli.py | 38 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/piker/cli.py b/piker/cli.py index a1641a34..6be7d220 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -8,7 +8,10 @@ import click import trio import pandas as pd -from .log import get_console_log, colorize_json +from .log import get_console_log, colorize_json, get_logger +from .brokers import core + +log = get_logger('cli') def run(main, loglevel='info'): @@ -29,7 +32,7 @@ def cli(): @cli.command() -@click.option('--broker', default='questrade', help='Broker backend to use') +@click.option('--broker', '-b', default='questrade', help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--keys', '-k', multiple=True, help='Return results only for these keys') @@ -49,7 +52,7 @@ def api(meth, kwargs, loglevel, broker, keys): key, _, value = kwarg.partition('=') _kwargs[key] = value - data = run(partial(brokermod.api, meth, **_kwargs), loglevel=loglevel) + data = run(partial(core.api, brokermod, meth, **_kwargs), loglevel=loglevel) if keys: # filter to requested keys @@ -66,7 +69,7 @@ def api(meth, kwargs, loglevel, broker, keys): @cli.command() -@click.option('--broker', default='questrade', help='Broker backend to use') +@click.option('--broker', '-b', default='questrade', help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--df-output', '-df', flag_value=True, help='Ouput in `pandas.DataFrame` format') @@ -75,8 +78,11 @@ def quote(loglevel, broker, tickers, df_output): """client for testing broker API methods with pretty printing of output. """ brokermod = import_module('.' + broker, 'piker.brokers') - quotes = run(partial(brokermod.quote, tickers), loglevel=loglevel) - cols = quotes[0].copy() + quotes = run(partial(core.quote, brokermod, tickers), loglevel=loglevel) + if not quotes: + log.error(f"No quotes could be found for {tickers}?") + return + cols = quotes[tickers[0]].copy() cols.pop('symbol') if df_output: df = pd.DataFrame( @@ -90,22 +96,7 @@ def quote(loglevel, broker, tickers, df_output): @cli.command() -@click.option('--broker', default='questrade', help='Broker backend to use') -@click.option('--loglevel', '-l', default='info', help='Logging level') -@click.argument('tickers', nargs=-1) -def stream(broker, loglevel, tickers, keys): - # import broker module daemon entry point - bm = import_module('.' + broker, 'piker.brokers') - run( - partial(bm.serve_forever, [ - partial(bm.poll_tickers, tickers=tickers) - ]), - loglevel - ) - - -@cli.command() -@click.option('--broker', default='questrade', help='Broker backend to use') +@click.option('--broker', '-b', default='questrade', help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.argument('name', nargs=1, required=True) def watch(loglevel, broker, name): @@ -130,6 +121,9 @@ def watch(loglevel, broker, name): 'GM', 'TSLA', 'DOL.TO', 'CIM', 'SPY', 'SHOP.TO', ], + 'pharma': [ + 'ATE.VN' + ], } # broker_conf_path = os.path.join( # click.get_app_dir('piker'), 'watchlists.json') From 04fa3c7ca45f46fe11c0056a695aa4d5cd510e15 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 20 Mar 2018 15:39:49 -0400 Subject: [PATCH 04/15] Factor out QT quoting specifics into the backend --- piker/brokers/core.py | 83 +++++++++++++++++--------------------- piker/brokers/questrade.py | 23 +++++++++++ 2 files changed, 60 insertions(+), 46 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 2f78dec9..a3e00691 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -4,6 +4,7 @@ Core broker-daemon tasks and API. import time import inspect from types import ModuleType +from typing import AsyncContextManager import trio @@ -41,6 +42,7 @@ async def quote(brokermod: ModuleType, tickers: [str]) -> dict: async def poll_tickers( client: 'Client', + quoter: AsyncContextManager, tickers: [str], q: trio.Queue, rate: int = 3, # delay between quote requests @@ -49,56 +51,45 @@ async def poll_tickers( """Stream quotes for a sequence of tickers at the given ``rate`` per second. """ - t2ids = await client.tickers2ids(tickers) - ids = ','.join(map(str, t2ids.values())) sleeptime = round(1. / rate, 3) _cache = {} - while True: # use an event here to trigger exit? - prequote_start = time.time() - try: - quotes_resp = await client.api.quotes(ids=ids) - except QuestradeError as qterr: - if "Access token is invalid" in str(qterr.args[0]): - # out-of-process piker may have renewed already - client._reload_config() - quotes_resp = await client.api.quotes(ids=ids) - else: - raise + async with quoter(client, tickers) as get_quotes: + while True: # use an event here to trigger exit? + prequote_start = time.time() + quotes = await get_quotes(tickers) + postquote_start = time.time() + payload = [] + for quote in quotes: - postquote_start = time.time() - quotes = quotes_resp['quotes'] - payload = [] - for quote in quotes: + if quote['delay'] > 0: + log.warning(f"Delayed quote:\n{quote}") - if quote['delay'] > 0: - log.warning(f"Delayed quote:\n{quote}") - - if diff_cached: - # if cache is enabled then only deliver "new" changes - symbol = quote['symbol'] - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote + if diff_cached: + # if cache is enabled then only deliver "new" changes + symbol = quote['symbol'] + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + payload.append(quote) + else: payload.append(quote) + + if payload: + q.put_nowait(payload) + + req_time = round(postquote_start - prequote_start, 3) + proc_time = round(time.time() - postquote_start, 3) + tot = req_time + proc_time + log.debug(f"Request + processing took {req_time + proc_time}") + delay = sleeptime - (req_time + proc_time) + if delay <= 0: + log.warn( + f"Took {req_time} (request) + {proc_time} (processing) = {tot}" + f" secs (> {sleeptime}) for processing quotes?") else: - payload.append(quote) - - if payload: - q.put_nowait(payload) - - req_time = round(postquote_start - prequote_start, 3) - proc_time = round(time.time() - postquote_start, 3) - tot = req_time + proc_time - log.debug(f"Request + processing took {req_time + proc_time}") - delay = sleeptime - (req_time + proc_time) - if delay <= 0: - log.warn( - f"Took {req_time} (request) + {proc_time} (processing) = {tot}" - f" secs (> {sleeptime}) for processing quotes?") - else: - log.debug(f"Sleeping for {delay}") - await trio.sleep(delay) + log.debug(f"Sleeping for {delay}") + await trio.sleep(delay) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index a66e1c50..a4cc3005 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -271,3 +271,26 @@ async def get_client() -> Client: yield client finally: write_conf(client) + + +@asynccontextmanager +async def quoter(client: Client, tickers: [str]): + t2ids = await client.tickers2ids(tickers) + ids = ','.join(map(str, t2ids.values())) + + async def get_quote(tickers): + """Query for quotes using cached symbol ids. + """ + try: + quotes_resp = await client.api.quotes(ids=ids) + except QuestradeError as qterr: + if "Access token is invalid" in str(qterr.args[0]): + # out-of-process piker may have renewed already + client._reload_config() + quotes_resp = await client.api.quotes(ids=ids) + else: + raise + + return quotes_resp['quotes'] + + yield get_quote From 48fe280e0c97493e3d3cd53091954b61a32ccb3d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 20 Mar 2018 21:01:55 -0400 Subject: [PATCH 05/15] Return None on failed symbol lookups --- piker/brokers/core.py | 7 ++++++- piker/brokers/questrade.py | 12 +++++++++++- piker/brokers/robinhood.py | 11 ++++++----- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index a3e00691..e87b0ded 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -37,7 +37,12 @@ async def quote(brokermod: ModuleType, tickers: [str]) -> dict: """Return quotes dict for ``tickers``. """ async with brokermod.get_client() as client: - return await client.quote(tickers) + results = await client.quote(tickers) + for key, val in results.items(): + if val is None: + brokermod.log.warn(f"Could not find symbol {key}?") + + return results async def poll_tickers( diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index a4cc3005..021895f1 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -152,7 +152,15 @@ class Client: t2ids = await self.tickers2ids(tickers) ids = ','.join(map(str, t2ids.values())) results = (await self.api.quotes(ids=ids))['quotes'] - return {sym: quote for sym, quote in zip(tickers, results)} + quotes = {quote['symbol']: quote for quote in results} + + # set None for all symbols not found + if len(t2ids) < len(tickers): + for ticker in tickers: + if ticker not in quotes: + quotes[ticker] = None + + return quotes async def symbols(self, tickers): """Return quotes for each ticker in ``tickers``. @@ -275,6 +283,8 @@ async def get_client() -> Client: @asynccontextmanager async def quoter(client: Client, tickers: [str]): + """Quoter context. + """ t2ids = await client.tickers2ids(tickers) ids = ','.join(map(str, t2ids.values())) diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py index 2c626f15..5a6b7de8 100644 --- a/piker/brokers/robinhood.py +++ b/piker/brokers/robinhood.py @@ -1,8 +1,8 @@ """ Robinhood API backend. """ -import asks from async_generator import asynccontextmanager +import asks from ..log import get_logger from ._util import resproc @@ -27,7 +27,8 @@ class _API: return await self._request('quotes/', params={'symbols': symbols}) async def fundamentals(self, symbols: str) -> dict: - return await self._request('fundamentals/', params={'symbols': symbols}) + return await self._request( + 'fundamentals/', params={'symbols': symbols}) class Client: @@ -40,9 +41,9 @@ class Client: self.api = _API(self._sess) async def quote(self, symbols: [str]): - resp = await self.api.quotes(','.join(symbols)) - results = resp['results'] - return {sym: quote for sym, quote in zip(symbols, results)} + results = (await self.api.quotes(','.join(symbols)))['results'] + return {quote['symbol'] if quote else sym: quote + for sym, quote in zip(symbols, results)} @asynccontextmanager From 6c0f1fbdfcaa7335c175dc4c53224c09bedaa657 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 20 Mar 2018 21:02:59 -0400 Subject: [PATCH 06/15] Fill failed symbol lookup df row with NaNs --- piker/cli.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/piker/cli.py b/piker/cli.py index 6be7d220..7467622f 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -32,7 +32,8 @@ def cli(): @cli.command() -@click.option('--broker', '-b', default='questrade', help='Broker backend to use') +@click.option('--broker', '-b', default='questrade', + help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--keys', '-k', multiple=True, help='Return results only for these keys') @@ -52,7 +53,8 @@ def api(meth, kwargs, loglevel, broker, keys): key, _, value = kwarg.partition('=') _kwargs[key] = value - data = run(partial(core.api, brokermod, meth, **_kwargs), loglevel=loglevel) + data = run( + partial(core.api, brokermod, meth, **_kwargs), loglevel=loglevel) if keys: # filter to requested keys @@ -69,7 +71,8 @@ def api(meth, kwargs, loglevel, broker, keys): @cli.command() -@click.option('--broker', '-b', default='questrade', help='Broker backend to use') +@click.option('--broker', '-b', default='questrade', + help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--df-output', '-df', flag_value=True, help='Ouput in `pandas.DataFrame` format') @@ -82,12 +85,13 @@ def quote(loglevel, broker, tickers, df_output): if not quotes: log.error(f"No quotes could be found for {tickers}?") return - cols = quotes[tickers[0]].copy() + + cols = next(filter(bool, quotes.values())).copy() cols.pop('symbol') if df_output: df = pd.DataFrame( - quotes, - index=[item['symbol'] for item in quotes], + (quote or {} for quote in quotes.values()), + index=quotes.keys(), columns=cols, ) click.echo(df) @@ -96,7 +100,8 @@ def quote(loglevel, broker, tickers, df_output): @cli.command() -@click.option('--broker', '-b', default='questrade', help='Broker backend to use') +@click.option('--broker', '-b', default='questrade', + help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.argument('name', nargs=1, required=True) def watch(loglevel, broker, name): From f0149118e132bc1d9adca7466e11387481750b81 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 20 Mar 2018 21:20:55 -0400 Subject: [PATCH 07/15] Specify bid-ask "stacked" cells by argument --- piker/ui/watchlist.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index 18593e31..42d45286 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -21,6 +21,7 @@ from kivy.core.window import Window from ..calc import humanize, percent_change from ..log import get_logger from .pager import PagerView +from ..brokers.core import poll_tickers log = get_logger('watchlist') @@ -266,7 +267,7 @@ class Row(GridLayout): turn adjust the text color of the values based on content changes. """ def __init__( - self, record, headers=(), table=None, is_header_row=False, + self, record, headers=(), bidasks=None, table=None, is_header_row=False, **kwargs ): super(Row, self).__init__(cols=len(record), **kwargs) @@ -276,13 +277,9 @@ class Row(GridLayout): self.is_header = is_header_row # create `BidAskCells` first - bidasks = { - 'last': ['bid', 'ask'], - 'size': ['bsize', 'asize'], - 'VWAP': ['low', 'high'], - } - ba_cells = {} layouts = {} + bidasks = bidasks or {} + ba_cells = {} for key, children in bidasks.items(): layout = BidAskLayout( [record[key]] + [record[child] for child in children], @@ -356,10 +353,10 @@ class TickerTable(GridLayout): # for tracking last clicked column header cell self.last_clicked_col_cell = None - def append_row(self, record): + def append_row(self, record, bidasks=None): """Append a `Row` of `Cell` objects to this table. """ - row = Row(record, headers=('symbol',), table=self) + row = Row(record, headers=('symbol',), bidasks=bidasks, table=self) # store ref to each row self.symbols2rows[row._last_record['symbol']] = row self.add_widget(row) @@ -467,7 +464,8 @@ async def _async_main(name, tickers, brokermod): # get long term data including last days close price sd = await client.symbols(tickers) - nursery.start_soon(brokermod.poll_tickers, client, tickers, queue) + nursery.start_soon( + poll_tickers, client, brokermod.quoter, tickers, queue) # get first quotes response pkts = await queue.get() @@ -485,11 +483,19 @@ async def _async_main(name, tickers, brokermod): Builder.load_string(_kv) box = BoxLayout(orientation='vertical', padding=5, spacing=5) + # define bid-ask "stacked" cells + bidasks = { + 'last': ['bid', 'ask'], + 'size': ['bsize', 'asize'], + 'VWAP': ['low', 'high'], + } + # add header row headers = first_quotes[0].keys() header = Row( {key: key for key in headers}, headers=headers, + bidasks=bidasks, is_header_row=True, size_hint=(1, None), ) @@ -501,7 +507,7 @@ async def _async_main(name, tickers, brokermod): size_hint=(1, None), ) for ticker_record in first_quotes: - grid.append_row(ticker_record) + grid.append_row(ticker_record, bidasks=bidasks) # associate the col headers row with the ticker table even though # they're technically wrapped separately in containing BoxLayout header.table = grid From 6b47130c7715a35b7c652bf0cc1416124c470e4b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Mar 2018 10:30:43 -0400 Subject: [PATCH 08/15] Move quote formatting to broker backends --- piker/brokers/core.py | 7 ++- piker/brokers/questrade.py | 86 ++++++++++++++++++++++++++++- piker/brokers/robinhood.py | 107 +++++++++++++++++++++++++++++++++++++ 3 files changed, 197 insertions(+), 3 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index e87b0ded..e660a9b5 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -65,9 +65,12 @@ async def poll_tickers( quotes = await get_quotes(tickers) postquote_start = time.time() payload = [] - for quote in quotes: + for symbol, quote in quotes.items(): + # TODO: uhh wtf? + if not quote: + continue - if quote['delay'] > 0: + if quote.get('delay', 0) > 0: log.warning(f"Delayed quote:\n{quote}") if diff_cached: diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 021895f1..034ecd24 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -3,10 +3,12 @@ Questrade API backend. """ import time import datetime +from functools import partial import trio from async_generator import asynccontextmanager +from ..calc import humanize, percent_change from . import config from ._util import resproc, BrokerError from ..log import get_logger, colorize_json @@ -301,6 +303,88 @@ async def quoter(client: Client, tickers: [str]): else: raise - return quotes_resp['quotes'] + return {quote['symbol']: quote for quote in quotes_resp['quotes']} yield get_quote + + +# Questrade key conversion / column order +_qt_keys = { + 'symbol': 'symbol', # done manually in qtconvert + '%': '%', + 'lastTradePrice': 'last', + 'askPrice': 'ask', + 'bidPrice': 'bid', + 'lastTradeSize': 'size', + 'bidSize': 'bsize', + 'askSize': 'asize', + 'VWAP': ('VWAP', partial(round, ndigits=3)), + 'mktcap': ('mktcap', humanize), + '$ vol': ('$ vol', humanize), + 'volume': ('vol', humanize), + 'close': 'close', + 'openPrice': 'open', + 'lowPrice': 'low', + 'highPrice': 'high', + # 'low52w': 'low52w', # put in info widget + # 'high52w': 'high52w', + # "lastTradePriceTrHrs": 7.99, + # "lastTradeTick": "Equal", + # "lastTradeTime": "2018-01-30T18:28:23.434000-05:00", + # "symbolId": 3575753, + # "tier": "", + # 'isHalted': 'halted', # as subscript 'h' + # 'delay': 'delay', # as subscript 'p' +} + +_bidasks = { + 'last': ['bid', 'ask'], + 'size': ['bsize', 'asize'], + 'VWAP': ['low', 'high'], +} + + +def format_quote( + quote: dict, + symbol_data: dict, + keymap: dict = _qt_keys, +) -> (dict, dict): + """Remap a list of quote dicts ``quotes`` using the mapping of old keys + -> new keys ``keymap`` returning 2 dicts: one with raw data and the other + for display. + + Returns 2 dicts: first is the original values mapped by new keys, + and the second is the same but with all values converted to a + "display-friendly" string format. + """ + last = quote['lastTradePrice'] + symbol = quote['symbol'] + previous = symbol_data[symbol]['prevDayClosePrice'] + change = percent_change(previous, last) + share_count = symbol_data[symbol].get('outstandingShares', None) + mktcap = share_count * last if share_count else 'NA' + computed = { + 'symbol': quote['symbol'], + '%': round(change, 3), + 'mktcap': mktcap, + '$ vol': round(quote['VWAP'] * quote['volume'], 3), + 'close': previous, + } + new = {} + displayable = {} + + for key, new_key in keymap.items(): + display_value = value = computed.get(key) or quote.get(key) + + # API servers can return `None` vals when markets are closed (weekend) + value = 0 if value is None else value + + # convert values to a displayble format using available formatting func + if isinstance(new_key, tuple): + new_key, func = new_key + display_value = func(value) + + new[new_key] = value + displayable[new_key] = display_value + + return new, displayable diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py index 5a6b7de8..405f1fa0 100644 --- a/piker/brokers/robinhood.py +++ b/piker/brokers/robinhood.py @@ -1,11 +1,14 @@ """ Robinhood API backend. """ +from functools import partial + from async_generator import asynccontextmanager import asks from ..log import get_logger from ._util import resproc +from ..calc import percent_change log = get_logger('robinhood') @@ -45,9 +48,113 @@ class Client: return {quote['symbol'] if quote else sym: quote for sym, quote in zip(symbols, results)} + async def symbols(self, tickers: [str]): + """Placeholder for the watchlist calling code... + """ + return {} + @asynccontextmanager async def get_client() -> Client: """Spawn a RH broker client. """ yield Client() + + +@asynccontextmanager +async def quoter(client: Client, tickers: [str]): + """Quoter context. + """ + yield client.quote + + +# Robinhood key conversion / column order +_rh_keys = { + 'symbol': 'symbol', # done manually in qtconvert + '%': '%', + 'last_trade_price': ('last', partial(round, ndigits=3)), + 'last_extended_hours_trade_price': 'last pre-mkt', + 'ask_price': ('ask', partial(round, ndigits=3)), + 'bid_price': ('bid', partial(round, ndigits=3)), + # 'lastTradeSize': 'size', # not available? + 'bid_size': 'bsize', + 'ask_size': 'asize', + # 'VWAP': ('VWAP', partial(round, ndigits=3)), + # 'mktcap': ('mktcap', humanize), + # '$ vol': ('$ vol', humanize), + # 'volume': ('vol', humanize), + 'previous_close': 'close', + 'adjusted_previous_close': 'adj close', + # 'trading_halted': 'halted', + + # example fields + # "adjusted_previous_close": "8.1900", + # "ask_price": "8.2800", + # "ask_size": 1200, + # "bid_price": "8.2500", + # "bid_size": 1800, + # "has_traded": true, + # "last_extended_hours_trade_price": null, + # "last_trade_price": "8.2350", + # "last_trade_price_source": "nls", + # "previous_close": "8.1900", + # "previous_close_date": "2018-03-20", + # "symbol": "CRON", + # "trading_halted": false, + # "updated_at": "2018-03-21T13:46:05Z" +} + +_bidasks = { + 'last': ['bid', 'ask'], + # 'size': ['bsize', 'asize'], + # 'VWAP': ['low', 'high'], + # 'last pre-mkt': ['close', 'adj close'], +} + + +def format_quote( + quote: dict, symbol_data: dict, + keymap: dict = _rh_keys, +) -> (dict, dict): + """remap a list of quote dicts ``quotes`` using the mapping of old keys + -> new keys ``keymap`` returning 2 dicts: one with raw data and the other + for display. + + returns 2 dicts: first is the original values mapped by new keys, + and the second is the same but with all values converted to a + "display-friendly" string format. + """ + last = quote['last_trade_price'] + # symbol = quote['symbol'] + previous = quote['previous_close'] + change = percent_change(float(previous), float(last)) + # share_count = symbol_data[symbol].get('outstandingshares', none) + # mktcap = share_count * last if share_count else 'na' + computed = { + 'symbol': quote['symbol'], + '%': round(change, 3), + 'ask_price': float(quote['ask_price']), + 'bid_price': float(quote['bid_price']), + 'last_trade_price': float(quote['last_trade_price']), + # 'mktcap': mktcap, + # '$ vol': round(quote['vwap'] * quote['volume'], 3), + 'close': previous, + } + new = {} + displayable = {} + + for key, new_key in keymap.items(): + display_value = value = computed.get(key) or quote.get(key) + + # api servers can return `None` vals when markets are closed (weekend) + value = 0 if value is None else value + + # convert values to a displayble format using available formatting func + if isinstance(new_key, tuple): + new_key, func = new_key + display_value = func(value) + + new[new_key] = value + displayable[new_key] = display_value + + return new, displayable From 456e86990fba4648f767e7a079076be41d212cdd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Mar 2018 10:44:05 -0400 Subject: [PATCH 09/15] Make watchlist app broker agnostic --- piker/ui/watchlist.py | 111 ++++++++---------------------------------- 1 file changed, 20 insertions(+), 91 deletions(-) diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index 42d45286..a447fd87 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -6,7 +6,7 @@ Launch with ``piker watch ``. (Currently there's a bunch of questrade specific stuff in here) """ from itertools import chain -from functools import partial +from types import ModuleType import trio from kivy.uix.boxlayout import BoxLayout @@ -18,7 +18,6 @@ from kivy import utils from kivy.app import async_runTouchApp from kivy.core.window import Window -from ..calc import humanize, percent_change from ..log import get_logger from .pager import PagerView from ..brokers.core import poll_tickers @@ -97,81 +96,6 @@ _kv = (f''' ''') -# Questrade key conversion / column order -_qt_keys = { - 'symbol': 'symbol', # done manually in qtconvert - '%': '%', - 'lastTradePrice': 'last', - 'askPrice': 'ask', - 'bidPrice': 'bid', - 'lastTradeSize': 'size', - 'bidSize': 'bsize', - 'askSize': 'asize', - 'VWAP': ('VWAP', partial(round, ndigits=3)), - 'mktcap': ('mktcap', humanize), - '$ vol': ('$ vol', humanize), - 'volume': ('vol', humanize), - 'close': 'close', - 'openPrice': 'open', - 'lowPrice': 'low', - 'highPrice': 'high', - 'low52w': 'low52w', - 'high52w': 'high52w', - # "lastTradePriceTrHrs": 7.99, - # "lastTradeTick": "Equal", - # "lastTradeTime": "2018-01-30T18:28:23.434000-05:00", - # "symbolId": 3575753, - # "tier": "", - # 'isHalted': 'halted', - # 'delay': 'delay', # as subscript 'p' -} - - -def qtconvert( - quote: dict, symbol_data: dict, - keymap: dict = _qt_keys, -) -> (dict, dict): - """Remap a list of quote dicts ``quotes`` using the mapping of old keys - -> new keys ``keymap`` returning 2 dicts: one with raw data and the other - for display. - - Returns 2 dicts: first is the original values mapped by new keys, - and the second is the same but with all values converted to a - "display-friendly" string format. - """ - last = quote['lastTradePrice'] - symbol = quote['symbol'] - previous = symbol_data[symbol]['prevDayClosePrice'] - change = percent_change(previous, last) - share_count = symbol_data[symbol].get('outstandingShares', None) - mktcap = share_count * last if share_count else 'NA' - computed = { - 'symbol': quote['symbol'], - '%': round(change, 3), - 'mktcap': mktcap, - '$ vol': round(quote['VWAP'] * quote['volume'], 3), - 'close': previous, - } - new = {} - displayable = {} - - for key, new_key in keymap.items(): - display_value = value = quote.get(key) or computed.get(key) - - # API servers can return `None` vals when markets are closed (weekend) - value = 0 if value is None else value - - # convert values to a displayble format using available formatting func - if isinstance(new_key, tuple): - new_key, func = new_key - display_value = func(value) - - new[new_key] = value - displayable[new_key] = display_value - - return new, displayable - - class HeaderCell(Button): """Column header cell label. """ @@ -267,7 +191,8 @@ class Row(GridLayout): turn adjust the text color of the values based on content changes. """ def __init__( - self, record, headers=(), bidasks=None, table=None, is_header_row=False, + self, record, headers=(), bidasks=None, table=None, + is_header_row=False, **kwargs ): super(Row, self).__init__(cols=len(record), **kwargs) @@ -392,6 +317,7 @@ class TickerTable(GridLayout): async def update_quotes( + brokermod: ModuleType, widgets: dict, queue: trio.Queue, symbol_data: dict, @@ -425,7 +351,9 @@ async def update_quotes( for quote in first_quotes: sym = quote['symbol'] row = grid.symbols2rows[sym] - record, displayable = qtconvert(quote, symbol_data=symbol_data) + # record, displayable = qtconvert(quote, symbol_data=symbol_data) + record, displayable = brokermod.format_quote( + quote, symbol_data=symbol_data) row.update(record, displayable) color_row(row, record) cache[sym] = (record, row) @@ -437,7 +365,9 @@ async def update_quotes( log.debug("Waiting on quotes") quotes = await queue.get() # new quotes data only for quote in quotes: - record, displayable = qtconvert(quote, symbol_data=symbol_data) + # record, displayable = qtconvert(quote, symbol_data=symbol_data) + record, displayable = brokermod.format_quote( + quote, symbol_data=symbol_data) row = grid.symbols2rows[record['symbol']] cache[record['symbol']] = (record, row) row.update(record, displayable) @@ -469,26 +399,24 @@ async def _async_main(name, tickers, brokermod): # get first quotes response pkts = await queue.get() + first_quotes = [ + # qtconvert(quote, symbol_data=sd)[0] for quote in pkts] + brokermod.format_quote(quote, symbol_data=sd)[0] + for quote in pkts] - if pkts[0]['lastTradePrice'] is None: - log.error("Questrade API is down temporarily") + if first_quotes[0].get('last') is None: + log.error("Broker API is down temporarily") nursery.cancel_scope.cancel() return - first_quotes = [ - qtconvert(quote, symbol_data=sd)[0] for quote in pkts] - # build out UI Window.set_title(f"watchlist: {name}\t(press ? for help)") Builder.load_string(_kv) box = BoxLayout(orientation='vertical', padding=5, spacing=5) # define bid-ask "stacked" cells - bidasks = { - 'last': ['bid', 'ask'], - 'size': ['bsize', 'asize'], - 'VWAP': ['low', 'high'], - } + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._bidasks # add header row headers = first_quotes[0].keys() @@ -531,4 +459,5 @@ async def _async_main(name, tickers, brokermod): 'pager': pager, } nursery.start_soon(run_kivy, widgets['root'], nursery) - nursery.start_soon(update_quotes, widgets, queue, sd, pkts) + nursery.start_soon( + update_quotes, brokermod, widgets, queue, sd, pkts) From 1ea784558cc48914fa69b8ac10b6091da303af53 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Mar 2018 17:26:30 -0400 Subject: [PATCH 10/15] Pass through rate arg to `poll_tickers` --- piker/ui/watchlist.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index a447fd87..4aa3b268 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -7,6 +7,7 @@ Launch with ``piker watch ``. """ from itertools import chain from types import ModuleType +from functools import partial import trio from kivy.uix.boxlayout import BoxLayout @@ -383,7 +384,7 @@ async def run_kivy(root, nursery): nursery.cancel_scope.cancel() # cancel all other tasks that may be running -async def _async_main(name, tickers, brokermod): +async def _async_main(name, tickers, brokermod, rate): '''Launch kivy app + all other related tasks. This is started with cli command `piker watch`. @@ -395,7 +396,9 @@ async def _async_main(name, tickers, brokermod): sd = await client.symbols(tickers) nursery.start_soon( - poll_tickers, client, brokermod.quoter, tickers, queue) + partial(poll_tickers, client, brokermod.quoter, tickers, queue, + rate=rate) + ) # get first quotes response pkts = await queue.get() From 933fe980c1abc4b4089c2898bac6cb0f57775953 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Mar 2018 17:27:04 -0400 Subject: [PATCH 11/15] Set default quote rate to 5/sec --- piker/brokers/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index e660a9b5..8e52fe1b 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -50,7 +50,7 @@ async def poll_tickers( quoter: AsyncContextManager, tickers: [str], q: trio.Queue, - rate: int = 3, # delay between quote requests + rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: """Stream quotes for a sequence of tickers at the given ``rate`` From 200526da8adcc584b4af60d40272c58fc7ca89e3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Mar 2018 17:28:26 -0400 Subject: [PATCH 12/15] Query QT at most 3 quotes/sec to avoid rate limits --- piker/brokers/questrade.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 034ecd24..312b2fa9 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -21,6 +21,7 @@ log = get_logger('questrade') _refresh_token_ep = 'https://login.questrade.com/oauth2/' _version = 'v1' +_rate_limit = 3 # queries/sec class QuestradeError(Exception): From 178e091f413496ba01f72e5afe99b48a4727e3fc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Mar 2018 17:28:40 -0400 Subject: [PATCH 13/15] Make robinhood the default broker backend - Add a rate limit cli option - Allow broker backends to define a max quote query limit - Add an index ETF list to demonstrate robinhood's real-time prices --- piker/cli.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/piker/cli.py b/piker/cli.py index 7467622f..43540213 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -12,6 +12,7 @@ from .log import get_console_log, colorize_json, get_logger from .brokers import core log = get_logger('cli') +DEFAULT_BROKER = 'robinhood' def run(main, loglevel='info'): @@ -32,7 +33,7 @@ def cli(): @cli.command() -@click.option('--broker', '-b', default='questrade', +@click.option('--broker', '-b', default=DEFAULT_BROKER, help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--keys', '-k', multiple=True, @@ -71,7 +72,7 @@ def api(meth, kwargs, loglevel, broker, keys): @cli.command() -@click.option('--broker', '-b', default='questrade', +@click.option('--broker', '-b', default=DEFAULT_BROKER, help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--df-output', '-df', flag_value=True, @@ -100,15 +101,16 @@ def quote(loglevel, broker, tickers, df_output): @cli.command() -@click.option('--broker', '-b', default='questrade', +@click.option('--broker', '-b', default=DEFAULT_BROKER, help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') +@click.option('--rate', '-r', default=5, help='Logging level') @click.argument('name', nargs=1, required=True) -def watch(loglevel, broker, name): +def watch(loglevel, broker, rate, name): """Spawn a watchlist. """ from .ui.watchlist import _async_main - get_console_log(loglevel) # activate console logging + log = get_console_log(loglevel) # activate console logging brokermod = import_module('.' + broker, 'piker.brokers') watchlists = { @@ -122,15 +124,15 @@ def watch(loglevel, broker, name): 'SEED.TO', 'HMJR.TO', 'CMED.TO', 'PAS.VN', 'CRON', ], - 'dad': [ - 'GM', 'TSLA', 'DOL.TO', 'CIM', 'SPY', - 'SHOP.TO', - ], - 'pharma': [ - 'ATE.VN' - ], + 'dad': ['GM', 'TSLA', 'DOL.TO', 'CIM', 'SPY', 'SHOP.TO'], + 'pharma': ['ATE.VN'], + 'indexes': ['SPY', 'DAX', 'QQQ', 'DIA'], } # broker_conf_path = os.path.join( # click.get_app_dir('piker'), 'watchlists.json') # from piker.testing import _quote_streamer as brokermod - trio.run(_async_main, name, watchlists[name], brokermod) + broker_limit = getattr(brokermod, '_rate_limit', float('inf')) + if broker_limit < rate: + rate = broker_limit + log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") + trio.run(_async_main, name, watchlists[name], brokermod, rate) From 08aa996e27fb65135207afe40d81538ca6fc3795 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Mar 2018 17:32:32 -0400 Subject: [PATCH 14/15] Update readme for robinhood usage --- README.rst | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 2f793c83..dedc93f1 100644 --- a/README.rst +++ b/README.rst @@ -21,9 +21,15 @@ For a development install:: pip install cython pip install -e ./ -r requirements.txt -To start the real-time pot-stock watchlist:: +To start the real-time index ETF watchlist:: - piker watch cannabis + piker watch indexes -l info + + +If you want to see super granular price changes, increase the +broker quote query ``rate`` with ``-r``:: + + piker watch indexes -l info -r 10 .. _trio: https://github.com/python-trio/trio From 29ddfe017c0d48b2a3816012e1d7ac3905897b55 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 23 Mar 2018 16:15:56 -0400 Subject: [PATCH 15/15] Use total time calc --- piker/brokers/core.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 8e52fe1b..6d9c7dbd 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -55,9 +55,12 @@ async def poll_tickers( ) -> None: """Stream quotes for a sequence of tickers at the given ``rate`` per second. + + A broker-client ``quoter`` async context manager must be provided which + returns an async quote function. """ sleeptime = round(1. / rate, 3) - _cache = {} + _cache = {} # ticker to quote caching async with quoter(client, tickers) as get_quotes: while True: # use an event here to trigger exit? @@ -66,8 +69,9 @@ async def poll_tickers( postquote_start = time.time() payload = [] for symbol, quote in quotes.items(): - # TODO: uhh wtf? - if not quote: + # FIXME: None is returned if a symbol can't be found. + # Consider filtering out such symbols before starting poll loop + if quote is None: continue if quote.get('delay', 0) > 0: @@ -92,8 +96,8 @@ async def poll_tickers( req_time = round(postquote_start - prequote_start, 3) proc_time = round(time.time() - postquote_start, 3) tot = req_time + proc_time - log.debug(f"Request + processing took {req_time + proc_time}") - delay = sleeptime - (req_time + proc_time) + log.debug(f"Request + processing took {tot}") + delay = sleeptime - tot if delay <= 0: log.warn( f"Took {req_time} (request) + {proc_time} (processing) = {tot}"