diff --git a/README.rst b/README.rst index 2f793c83..dedc93f1 100644 --- a/README.rst +++ b/README.rst @@ -21,9 +21,15 @@ For a development install:: pip install cython pip install -e ./ -r requirements.txt -To start the real-time pot-stock watchlist:: +To start the real-time index ETF watchlist:: - piker watch cannabis + piker watch indexes -l info + + +If you want to see super granular price changes, increase the +broker quote query ``rate`` with ``-r``:: + + piker watch indexes -l info -r 10 .. _trio: https://github.com/python-trio/trio diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py new file mode 100644 index 00000000..8d7a3e7b --- /dev/null +++ b/piker/brokers/_util.py @@ -0,0 +1,34 @@ +""" +Handy utils. +""" +import json +import asks +import logging + +from ..log import colorize_json + + +class BrokerError(Exception): + "Generic broker issue" + + +def resproc( + resp: asks.response_objects.Response, + log: logging.Logger, + return_json: bool = True +) -> asks.response_objects.Response: + """Process response and return its json content. + + Raise the appropriate error on non-200 OK responses. + """ + if not resp.status_code == 200: + raise BrokerError(resp.body) + try: + data = resp.json() + except json.decoder.JSONDecodeError: + log.exception(f"Failed to process {resp}:\n{resp.text}") + raise BrokerError(resp.text) + else: + log.trace(f"Received json contents:\n{colorize_json(data)}") + + return data if return_json else resp diff --git a/piker/brokers/core.py b/piker/brokers/core.py new file mode 100644 index 00000000..6d9c7dbd --- /dev/null +++ b/piker/brokers/core.py @@ -0,0 +1,107 @@ +""" +Core broker-daemon tasks and API. +""" +import time +import inspect +from types import ModuleType +from typing import AsyncContextManager + +import trio + +from .questrade import QuestradeError +from ..log import get_logger +log = get_logger('broker.core') + + +async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict: + """Make (proxy through) an api call by name and return its result. + """ + async with brokermod.get_client() as client: + meth = getattr(client.api, methname, None) + if meth is None: + log.error(f"No api method `{methname}` could be found?") + return + elif not kwargs: + # verify kwargs requirements are met + sig = inspect.signature(meth) + if sig.parameters: + log.error( + f"Argument(s) are required by the `{methname}` method: " + f"{tuple(sig.parameters.keys())}") + return + + return await meth(**kwargs) + + +async def quote(brokermod: ModuleType, tickers: [str]) -> dict: + """Return quotes dict for ``tickers``. + """ + async with brokermod.get_client() as client: + results = await client.quote(tickers) + for key, val in results.items(): + if val is None: + brokermod.log.warn(f"Could not find symbol {key}?") + + return results + + +async def poll_tickers( + client: 'Client', + quoter: AsyncContextManager, + tickers: [str], + q: trio.Queue, + rate: int = 5, # delay between quote requests + diff_cached: bool = True, # only deliver "new" quotes to the queue +) -> None: + """Stream quotes for a sequence of tickers at the given ``rate`` + per second. + + A broker-client ``quoter`` async context manager must be provided which + returns an async quote function. + """ + sleeptime = round(1. / rate, 3) + _cache = {} # ticker to quote caching + + async with quoter(client, tickers) as get_quotes: + while True: # use an event here to trigger exit? + prequote_start = time.time() + quotes = await get_quotes(tickers) + postquote_start = time.time() + payload = [] + for symbol, quote in quotes.items(): + # FIXME: None is returned if a symbol can't be found. + # Consider filtering out such symbols before starting poll loop + if quote is None: + continue + + if quote.get('delay', 0) > 0: + log.warning(f"Delayed quote:\n{quote}") + + if diff_cached: + # if cache is enabled then only deliver "new" changes + symbol = quote['symbol'] + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + payload.append(quote) + else: + payload.append(quote) + + if payload: + q.put_nowait(payload) + + req_time = round(postquote_start - prequote_start, 3) + proc_time = round(time.time() - postquote_start, 3) + tot = req_time + proc_time + log.debug(f"Request + processing took {tot}") + delay = sleeptime - tot + if delay <= 0: + log.warn( + f"Took {req_time} (request) + {proc_time} (processing) = {tot}" + f" secs (> {sleeptime}) for processing quotes?") + else: + log.debug(f"Sleeping for {delay}") + await trio.sleep(delay) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index cdd3335b..312b2fa9 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -1,15 +1,16 @@ """ Questrade API backend. """ -import inspect -import json import time import datetime +from functools import partial import trio from async_generator import asynccontextmanager +from ..calc import humanize, percent_change from . import config +from ._util import resproc, BrokerError from ..log import get_logger, colorize_json # TODO: move to urllib3/requests once supported @@ -20,34 +21,13 @@ log = get_logger('questrade') _refresh_token_ep = 'https://login.questrade.com/oauth2/' _version = 'v1' +_rate_limit = 3 # queries/sec class QuestradeError(Exception): "Non-200 OK response code" -def resproc( - resp: asks.response_objects.Response, - return_json: bool = True -) -> asks.response_objects.Response: - """Process response and return its json content. - - Raise the appropriate error on non-200 OK responses. - """ - if not resp.status_code == 200: - raise QuestradeError(resp.body) - - try: - data = resp.json() - except json.decoder.JSONDecodeError: - log.exception(f"Failed to process {resp}:\n{resp.text}") - raise QuestradeError(resp.text) - else: - log.trace(f"Received json contents:\n{colorize_json(data)}") - - return data if return_json else resp - - class Client: """API client suitable for use as a long running broker daemon or single api requests. @@ -80,7 +60,7 @@ class Client: params={'grant_type': 'refresh_token', 'refresh_token': self.access_data['refresh_token']} ) - data = resproc(resp) + data = resproc(resp, log) self.access_data.update(data) return data @@ -121,11 +101,12 @@ class Client: expires_stamp = datetime.datetime.fromtimestamp( expires).strftime('%Y-%m-%d %H:%M:%S') if not access_token or (expires < time.time()) or force_refresh: - log.info(f"Refreshing access token {access_token} which expired at" - f" {expires_stamp}") + log.debug( + f"Refreshing access token {access_token} which expired at" + f" {expires_stamp}") try: data = await self._new_auth_token() - except QuestradeError as qterr: + except BrokerError as qterr: if "We're making some changes" in str(qterr.args[0]): # API service is down raise QuestradeError("API is down for maintenance") @@ -135,13 +116,13 @@ class Client: self._reload_config() try: data = await self._new_auth_token() - except QuestradeError as qterr: + except BrokerError as qterr: if qterr.args[0].decode() == 'Bad Request': # actually expired; get new from user self._reload_config(force_from_user=True) data = await self._new_auth_token() else: - raise qterr + raise QuestradeError(qterr) else: raise qterr @@ -151,8 +132,8 @@ class Client: # write to config on disk write_conf(self) else: - log.info(f"\nCurrent access token {access_token} expires at" - f" {expires_stamp}\n") + log.debug(f"\nCurrent access token {access_token} expires at" + f" {expires_stamp}\n") self._prep_sess() return self.access_data @@ -168,12 +149,21 @@ class Client: return symbols2ids - async def quote(self, tickers): + async def quote(self, tickers: [str]): """Return quotes for each ticker in ``tickers``. """ t2ids = await self.tickers2ids(tickers) ids = ','.join(map(str, t2ids.values())) - return (await self.api.quotes(ids=ids))['quotes'] + results = (await self.api.quotes(ids=ids))['quotes'] + quotes = {quote['symbol']: quote for quote in results} + + # set None for all symbols not found + if len(t2ids) < len(tickers): + for ticker in tickers: + if ticker not in quotes: + quotes[ticker] = None + + return quotes async def symbols(self, tickers): """Return quotes for each ticker in ``tickers``. @@ -196,7 +186,7 @@ class _API: async def _request(self, path: str, params=None) -> dict: resp = await self._sess.get(path=f'/{path}', params=params) - return resproc(resp) + return resproc(resp, log) async def accounts(self) -> dict: return await self._request('accounts') @@ -268,6 +258,8 @@ def write_conf(client): @asynccontextmanager async def get_client() -> Client: """Spawn a broker client. + + A client must adhere to the method calls in ``piker.broker.core``. """ conf = get_config() log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}") @@ -292,38 +284,16 @@ async def get_client() -> Client: write_conf(client) -async def serve_forever(tasks) -> None: - """Start up a client and serve until terminated. - """ - async with get_client() as client: - # pretty sure this doesn't work - # await client._revoke_auth_token() - - async with trio.open_nursery() as nursery: - # launch token manager - nursery.start_soon(token_refresher, client) - - # launch children - for task in tasks: - nursery.start_soon(task, client) - - -async def poll_tickers( - client: Client, tickers: [str], - q: trio.Queue, - rate: int = 3, # delay between quote requests - diff_cached: bool = True, # only deliver "new" quotes to the queue -) -> None: - """Stream quotes for a sequence of tickers at the given ``rate`` - per second. +@asynccontextmanager +async def quoter(client: Client, tickers: [str]): + """Quoter context. """ t2ids = await client.tickers2ids(tickers) ids = ','.join(map(str, t2ids.values())) - sleeptime = round(1. / rate, 3) - _cache = {} - while True: # use an event here to trigger exit? - prequote_start = time.time() + async def get_quote(tickers): + """Query for quotes using cached symbol ids. + """ try: quotes_resp = await client.api.quotes(ids=ids) except QuestradeError as qterr: @@ -334,66 +304,88 @@ async def poll_tickers( else: raise - postquote_start = time.time() - quotes = quotes_resp['quotes'] - payload = [] - for quote in quotes: + return {quote['symbol']: quote for quote in quotes_resp['quotes']} - if quote['delay'] > 0: - log.warning(f"Delayed quote:\n{quote}") - - if diff_cached: - # if cache is enabled then only deliver "new" changes - symbol = quote['symbol'] - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote - payload.append(quote) - else: - payload.append(quote) - - if payload: - q.put_nowait(payload) - - req_time = round(postquote_start - prequote_start, 3) - proc_time = round(time.time() - postquote_start, 3) - tot = req_time + proc_time - log.debug(f"Request + processing took {req_time + proc_time}") - delay = sleeptime - (req_time + proc_time) - if delay <= 0: - log.warn( - f"Took {req_time} (request) + {proc_time} (processing) = {tot}" - f" secs (> {sleeptime}) for processing quotes?") - else: - log.debug(f"Sleeping for {delay}") - await trio.sleep(delay) + yield get_quote -async def api(methname: str, **kwargs) -> dict: - """Make (proxy through) an api call by name and return its result. +# Questrade key conversion / column order +_qt_keys = { + 'symbol': 'symbol', # done manually in qtconvert + '%': '%', + 'lastTradePrice': 'last', + 'askPrice': 'ask', + 'bidPrice': 'bid', + 'lastTradeSize': 'size', + 'bidSize': 'bsize', + 'askSize': 'asize', + 'VWAP': ('VWAP', partial(round, ndigits=3)), + 'mktcap': ('mktcap', humanize), + '$ vol': ('$ vol', humanize), + 'volume': ('vol', humanize), + 'close': 'close', + 'openPrice': 'open', + 'lowPrice': 'low', + 'highPrice': 'high', + # 'low52w': 'low52w', # put in info widget + # 'high52w': 'high52w', + # "lastTradePriceTrHrs": 7.99, + # "lastTradeTick": "Equal", + # "lastTradeTime": "2018-01-30T18:28:23.434000-05:00", + # "symbolId": 3575753, + # "tier": "", + # 'isHalted': 'halted', # as subscript 'h' + # 'delay': 'delay', # as subscript 'p' +} + +_bidasks = { + 'last': ['bid', 'ask'], + 'size': ['bsize', 'asize'], + 'VWAP': ['low', 'high'], +} + + +def format_quote( + quote: dict, + symbol_data: dict, + keymap: dict = _qt_keys, +) -> (dict, dict): + """Remap a list of quote dicts ``quotes`` using the mapping of old keys + -> new keys ``keymap`` returning 2 dicts: one with raw data and the other + for display. + + Returns 2 dicts: first is the original values mapped by new keys, + and the second is the same but with all values converted to a + "display-friendly" string format. """ - async with get_client() as client: - meth = getattr(client.api, methname, None) - if meth is None: - log.error(f"No api method `{methname}` could be found?") - return - elif not kwargs: - # verify kwargs requirements are met - sig = inspect.signature(meth) - if sig.parameters: - log.error( - f"Argument(s) are required by the `{methname}` method: " - f"{tuple(sig.parameters.keys())}") - return + last = quote['lastTradePrice'] + symbol = quote['symbol'] + previous = symbol_data[symbol]['prevDayClosePrice'] + change = percent_change(previous, last) + share_count = symbol_data[symbol].get('outstandingShares', None) + mktcap = share_count * last if share_count else 'NA' + computed = { + 'symbol': quote['symbol'], + '%': round(change, 3), + 'mktcap': mktcap, + '$ vol': round(quote['VWAP'] * quote['volume'], 3), + 'close': previous, + } + new = {} + displayable = {} - return await meth(**kwargs) + for key, new_key in keymap.items(): + display_value = value = computed.get(key) or quote.get(key) + # API servers can return `None` vals when markets are closed (weekend) + value = 0 if value is None else value -async def quote(tickers: [str]) -> dict: - """Return quotes dict for ``tickers``. - """ - async with get_client() as client: - return await client.quote(tickers) + # convert values to a displayble format using available formatting func + if isinstance(new_key, tuple): + new_key, func = new_key + display_value = func(value) + + new[new_key] = value + displayable[new_key] = display_value + + return new, displayable diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py new file mode 100644 index 00000000..405f1fa0 --- /dev/null +++ b/piker/brokers/robinhood.py @@ -0,0 +1,160 @@ +""" +Robinhood API backend. +""" +from functools import partial + +from async_generator import asynccontextmanager +import asks + +from ..log import get_logger +from ._util import resproc +from ..calc import percent_change + +log = get_logger('robinhood') + +_service_ep = 'https://api.robinhood.com' + + +class _API: + """Robinhood API endpoints exposed as methods and wrapped with an + http session. + """ + def __init__(self, session: asks.Session): + self._sess = session + + async def _request(self, path: str, params=None) -> dict: + resp = await self._sess.get(path=f'/{path}', params=params) + return resproc(resp, log) + + async def quotes(self, symbols: str) -> dict: + return await self._request('quotes/', params={'symbols': symbols}) + + async def fundamentals(self, symbols: str) -> dict: + return await self._request( + 'fundamentals/', params={'symbols': symbols}) + + +class Client: + """API client suitable for use as a long running broker daemon or + single api requests. + """ + def __init__(self): + self._sess = asks.Session() + self._sess.base_location = _service_ep + self.api = _API(self._sess) + + async def quote(self, symbols: [str]): + results = (await self.api.quotes(','.join(symbols)))['results'] + return {quote['symbol'] if quote else sym: quote + for sym, quote in zip(symbols, results)} + + async def symbols(self, tickers: [str]): + """Placeholder for the watchlist calling code... + """ + return {} + + +@asynccontextmanager +async def get_client() -> Client: + """Spawn a RH broker client. + """ + yield Client() + + +@asynccontextmanager +async def quoter(client: Client, tickers: [str]): + """Quoter context. + """ + yield client.quote + + +# Robinhood key conversion / column order +_rh_keys = { + 'symbol': 'symbol', # done manually in qtconvert + '%': '%', + 'last_trade_price': ('last', partial(round, ndigits=3)), + 'last_extended_hours_trade_price': 'last pre-mkt', + 'ask_price': ('ask', partial(round, ndigits=3)), + 'bid_price': ('bid', partial(round, ndigits=3)), + # 'lastTradeSize': 'size', # not available? + 'bid_size': 'bsize', + 'ask_size': 'asize', + # 'VWAP': ('VWAP', partial(round, ndigits=3)), + # 'mktcap': ('mktcap', humanize), + # '$ vol': ('$ vol', humanize), + # 'volume': ('vol', humanize), + 'previous_close': 'close', + 'adjusted_previous_close': 'adj close', + # 'trading_halted': 'halted', + + # example fields + # "adjusted_previous_close": "8.1900", + # "ask_price": "8.2800", + # "ask_size": 1200, + # "bid_price": "8.2500", + # "bid_size": 1800, + # "has_traded": true, + # "last_extended_hours_trade_price": null, + # "last_trade_price": "8.2350", + # "last_trade_price_source": "nls", + # "previous_close": "8.1900", + # "previous_close_date": "2018-03-20", + # "symbol": "CRON", + # "trading_halted": false, + # "updated_at": "2018-03-21T13:46:05Z" +} + +_bidasks = { + 'last': ['bid', 'ask'], + # 'size': ['bsize', 'asize'], + # 'VWAP': ['low', 'high'], + # 'last pre-mkt': ['close', 'adj close'], +} + + +def format_quote( + quote: dict, symbol_data: dict, + keymap: dict = _rh_keys, +) -> (dict, dict): + """remap a list of quote dicts ``quotes`` using the mapping of old keys + -> new keys ``keymap`` returning 2 dicts: one with raw data and the other + for display. + + returns 2 dicts: first is the original values mapped by new keys, + and the second is the same but with all values converted to a + "display-friendly" string format. + """ + last = quote['last_trade_price'] + # symbol = quote['symbol'] + previous = quote['previous_close'] + change = percent_change(float(previous), float(last)) + # share_count = symbol_data[symbol].get('outstandingshares', none) + # mktcap = share_count * last if share_count else 'na' + computed = { + 'symbol': quote['symbol'], + '%': round(change, 3), + 'ask_price': float(quote['ask_price']), + 'bid_price': float(quote['bid_price']), + 'last_trade_price': float(quote['last_trade_price']), + # 'mktcap': mktcap, + # '$ vol': round(quote['vwap'] * quote['volume'], 3), + 'close': previous, + } + new = {} + displayable = {} + + for key, new_key in keymap.items(): + display_value = value = computed.get(key) or quote.get(key) + + # api servers can return `None` vals when markets are closed (weekend) + value = 0 if value is None else value + + # convert values to a displayble format using available formatting func + if isinstance(new_key, tuple): + new_key, func = new_key + display_value = func(value) + + new[new_key] = value + displayable[new_key] = display_value + + return new, displayable diff --git a/piker/cli.py b/piker/cli.py index a1641a34..43540213 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -8,7 +8,11 @@ import click import trio import pandas as pd -from .log import get_console_log, colorize_json +from .log import get_console_log, colorize_json, get_logger +from .brokers import core + +log = get_logger('cli') +DEFAULT_BROKER = 'robinhood' def run(main, loglevel='info'): @@ -29,7 +33,8 @@ def cli(): @cli.command() -@click.option('--broker', default='questrade', help='Broker backend to use') +@click.option('--broker', '-b', default=DEFAULT_BROKER, + help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--keys', '-k', multiple=True, help='Return results only for these keys') @@ -49,7 +54,8 @@ def api(meth, kwargs, loglevel, broker, keys): key, _, value = kwarg.partition('=') _kwargs[key] = value - data = run(partial(brokermod.api, meth, **_kwargs), loglevel=loglevel) + data = run( + partial(core.api, brokermod, meth, **_kwargs), loglevel=loglevel) if keys: # filter to requested keys @@ -66,7 +72,8 @@ def api(meth, kwargs, loglevel, broker, keys): @cli.command() -@click.option('--broker', default='questrade', help='Broker backend to use') +@click.option('--broker', '-b', default=DEFAULT_BROKER, + help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--df-output', '-df', flag_value=True, help='Ouput in `pandas.DataFrame` format') @@ -75,13 +82,17 @@ def quote(loglevel, broker, tickers, df_output): """client for testing broker API methods with pretty printing of output. """ brokermod = import_module('.' + broker, 'piker.brokers') - quotes = run(partial(brokermod.quote, tickers), loglevel=loglevel) - cols = quotes[0].copy() + quotes = run(partial(core.quote, brokermod, tickers), loglevel=loglevel) + if not quotes: + log.error(f"No quotes could be found for {tickers}?") + return + + cols = next(filter(bool, quotes.values())).copy() cols.pop('symbol') if df_output: df = pd.DataFrame( - quotes, - index=[item['symbol'] for item in quotes], + (quote or {} for quote in quotes.values()), + index=quotes.keys(), columns=cols, ) click.echo(df) @@ -90,29 +101,16 @@ def quote(loglevel, broker, tickers, df_output): @cli.command() -@click.option('--broker', default='questrade', help='Broker backend to use') -@click.option('--loglevel', '-l', default='info', help='Logging level') -@click.argument('tickers', nargs=-1) -def stream(broker, loglevel, tickers, keys): - # import broker module daemon entry point - bm = import_module('.' + broker, 'piker.brokers') - run( - partial(bm.serve_forever, [ - partial(bm.poll_tickers, tickers=tickers) - ]), - loglevel - ) - - -@cli.command() -@click.option('--broker', default='questrade', help='Broker backend to use') +@click.option('--broker', '-b', default=DEFAULT_BROKER, + help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') +@click.option('--rate', '-r', default=5, help='Logging level') @click.argument('name', nargs=1, required=True) -def watch(loglevel, broker, name): +def watch(loglevel, broker, rate, name): """Spawn a watchlist. """ from .ui.watchlist import _async_main - get_console_log(loglevel) # activate console logging + log = get_console_log(loglevel) # activate console logging brokermod = import_module('.' + broker, 'piker.brokers') watchlists = { @@ -126,12 +124,15 @@ def watch(loglevel, broker, name): 'SEED.TO', 'HMJR.TO', 'CMED.TO', 'PAS.VN', 'CRON', ], - 'dad': [ - 'GM', 'TSLA', 'DOL.TO', 'CIM', 'SPY', - 'SHOP.TO', - ], + 'dad': ['GM', 'TSLA', 'DOL.TO', 'CIM', 'SPY', 'SHOP.TO'], + 'pharma': ['ATE.VN'], + 'indexes': ['SPY', 'DAX', 'QQQ', 'DIA'], } # broker_conf_path = os.path.join( # click.get_app_dir('piker'), 'watchlists.json') # from piker.testing import _quote_streamer as brokermod - trio.run(_async_main, name, watchlists[name], brokermod) + broker_limit = getattr(brokermod, '_rate_limit', float('inf')) + if broker_limit < rate: + rate = broker_limit + log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") + trio.run(_async_main, name, watchlists[name], brokermod, rate) diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index 18593e31..4aa3b268 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -6,6 +6,7 @@ Launch with ``piker watch ``. (Currently there's a bunch of questrade specific stuff in here) """ from itertools import chain +from types import ModuleType from functools import partial import trio @@ -18,9 +19,9 @@ from kivy import utils from kivy.app import async_runTouchApp from kivy.core.window import Window -from ..calc import humanize, percent_change from ..log import get_logger from .pager import PagerView +from ..brokers.core import poll_tickers log = get_logger('watchlist') @@ -96,81 +97,6 @@ _kv = (f''' ''') -# Questrade key conversion / column order -_qt_keys = { - 'symbol': 'symbol', # done manually in qtconvert - '%': '%', - 'lastTradePrice': 'last', - 'askPrice': 'ask', - 'bidPrice': 'bid', - 'lastTradeSize': 'size', - 'bidSize': 'bsize', - 'askSize': 'asize', - 'VWAP': ('VWAP', partial(round, ndigits=3)), - 'mktcap': ('mktcap', humanize), - '$ vol': ('$ vol', humanize), - 'volume': ('vol', humanize), - 'close': 'close', - 'openPrice': 'open', - 'lowPrice': 'low', - 'highPrice': 'high', - 'low52w': 'low52w', - 'high52w': 'high52w', - # "lastTradePriceTrHrs": 7.99, - # "lastTradeTick": "Equal", - # "lastTradeTime": "2018-01-30T18:28:23.434000-05:00", - # "symbolId": 3575753, - # "tier": "", - # 'isHalted': 'halted', - # 'delay': 'delay', # as subscript 'p' -} - - -def qtconvert( - quote: dict, symbol_data: dict, - keymap: dict = _qt_keys, -) -> (dict, dict): - """Remap a list of quote dicts ``quotes`` using the mapping of old keys - -> new keys ``keymap`` returning 2 dicts: one with raw data and the other - for display. - - Returns 2 dicts: first is the original values mapped by new keys, - and the second is the same but with all values converted to a - "display-friendly" string format. - """ - last = quote['lastTradePrice'] - symbol = quote['symbol'] - previous = symbol_data[symbol]['prevDayClosePrice'] - change = percent_change(previous, last) - share_count = symbol_data[symbol].get('outstandingShares', None) - mktcap = share_count * last if share_count else 'NA' - computed = { - 'symbol': quote['symbol'], - '%': round(change, 3), - 'mktcap': mktcap, - '$ vol': round(quote['VWAP'] * quote['volume'], 3), - 'close': previous, - } - new = {} - displayable = {} - - for key, new_key in keymap.items(): - display_value = value = quote.get(key) or computed.get(key) - - # API servers can return `None` vals when markets are closed (weekend) - value = 0 if value is None else value - - # convert values to a displayble format using available formatting func - if isinstance(new_key, tuple): - new_key, func = new_key - display_value = func(value) - - new[new_key] = value - displayable[new_key] = display_value - - return new, displayable - - class HeaderCell(Button): """Column header cell label. """ @@ -266,7 +192,8 @@ class Row(GridLayout): turn adjust the text color of the values based on content changes. """ def __init__( - self, record, headers=(), table=None, is_header_row=False, + self, record, headers=(), bidasks=None, table=None, + is_header_row=False, **kwargs ): super(Row, self).__init__(cols=len(record), **kwargs) @@ -276,13 +203,9 @@ class Row(GridLayout): self.is_header = is_header_row # create `BidAskCells` first - bidasks = { - 'last': ['bid', 'ask'], - 'size': ['bsize', 'asize'], - 'VWAP': ['low', 'high'], - } - ba_cells = {} layouts = {} + bidasks = bidasks or {} + ba_cells = {} for key, children in bidasks.items(): layout = BidAskLayout( [record[key]] + [record[child] for child in children], @@ -356,10 +279,10 @@ class TickerTable(GridLayout): # for tracking last clicked column header cell self.last_clicked_col_cell = None - def append_row(self, record): + def append_row(self, record, bidasks=None): """Append a `Row` of `Cell` objects to this table. """ - row = Row(record, headers=('symbol',), table=self) + row = Row(record, headers=('symbol',), bidasks=bidasks, table=self) # store ref to each row self.symbols2rows[row._last_record['symbol']] = row self.add_widget(row) @@ -395,6 +318,7 @@ class TickerTable(GridLayout): async def update_quotes( + brokermod: ModuleType, widgets: dict, queue: trio.Queue, symbol_data: dict, @@ -428,7 +352,9 @@ async def update_quotes( for quote in first_quotes: sym = quote['symbol'] row = grid.symbols2rows[sym] - record, displayable = qtconvert(quote, symbol_data=symbol_data) + # record, displayable = qtconvert(quote, symbol_data=symbol_data) + record, displayable = brokermod.format_quote( + quote, symbol_data=symbol_data) row.update(record, displayable) color_row(row, record) cache[sym] = (record, row) @@ -440,7 +366,9 @@ async def update_quotes( log.debug("Waiting on quotes") quotes = await queue.get() # new quotes data only for quote in quotes: - record, displayable = qtconvert(quote, symbol_data=symbol_data) + # record, displayable = qtconvert(quote, symbol_data=symbol_data) + record, displayable = brokermod.format_quote( + quote, symbol_data=symbol_data) row = grid.symbols2rows[record['symbol']] cache[record['symbol']] = (record, row) row.update(record, displayable) @@ -456,7 +384,7 @@ async def run_kivy(root, nursery): nursery.cancel_scope.cancel() # cancel all other tasks that may be running -async def _async_main(name, tickers, brokermod): +async def _async_main(name, tickers, brokermod, rate): '''Launch kivy app + all other related tasks. This is started with cli command `piker watch`. @@ -467,29 +395,38 @@ async def _async_main(name, tickers, brokermod): # get long term data including last days close price sd = await client.symbols(tickers) - nursery.start_soon(brokermod.poll_tickers, client, tickers, queue) + nursery.start_soon( + partial(poll_tickers, client, brokermod.quoter, tickers, queue, + rate=rate) + ) # get first quotes response pkts = await queue.get() + first_quotes = [ + # qtconvert(quote, symbol_data=sd)[0] for quote in pkts] + brokermod.format_quote(quote, symbol_data=sd)[0] + for quote in pkts] - if pkts[0]['lastTradePrice'] is None: - log.error("Questrade API is down temporarily") + if first_quotes[0].get('last') is None: + log.error("Broker API is down temporarily") nursery.cancel_scope.cancel() return - first_quotes = [ - qtconvert(quote, symbol_data=sd)[0] for quote in pkts] - # build out UI Window.set_title(f"watchlist: {name}\t(press ? for help)") Builder.load_string(_kv) box = BoxLayout(orientation='vertical', padding=5, spacing=5) + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._bidasks + # add header row headers = first_quotes[0].keys() header = Row( {key: key for key in headers}, headers=headers, + bidasks=bidasks, is_header_row=True, size_hint=(1, None), ) @@ -501,7 +438,7 @@ async def _async_main(name, tickers, brokermod): size_hint=(1, None), ) for ticker_record in first_quotes: - grid.append_row(ticker_record) + grid.append_row(ticker_record, bidasks=bidasks) # associate the col headers row with the ticker table even though # they're technically wrapped separately in containing BoxLayout header.table = grid @@ -525,4 +462,5 @@ async def _async_main(name, tickers, brokermod): 'pager': pager, } nursery.start_soon(run_kivy, widgets['root'], nursery) - nursery.start_soon(update_quotes, widgets, queue, sd, pkts) + nursery.start_soon( + update_quotes, brokermod, widgets, queue, sd, pkts)