diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py new file mode 100644 index 00000000..8d7a3e7b --- /dev/null +++ b/piker/brokers/_util.py @@ -0,0 +1,34 @@ +""" +Handy utils. +""" +import json +import asks +import logging + +from ..log import colorize_json + + +class BrokerError(Exception): + "Generic broker issue" + + +def resproc( + resp: asks.response_objects.Response, + log: logging.Logger, + return_json: bool = True +) -> asks.response_objects.Response: + """Process response and return its json content. + + Raise the appropriate error on non-200 OK responses. + """ + if not resp.status_code == 200: + raise BrokerError(resp.body) + try: + data = resp.json() + except json.decoder.JSONDecodeError: + log.exception(f"Failed to process {resp}:\n{resp.text}") + raise BrokerError(resp.text) + else: + log.trace(f"Received json contents:\n{colorize_json(data)}") + + return data if return_json else resp diff --git a/piker/brokers/core.py b/piker/brokers/core.py new file mode 100644 index 00000000..2f78dec9 --- /dev/null +++ b/piker/brokers/core.py @@ -0,0 +1,104 @@ +""" +Core broker-daemon tasks and API. +""" +import time +import inspect +from types import ModuleType + +import trio + +from .questrade import QuestradeError +from ..log import get_logger +log = get_logger('broker.core') + + +async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict: + """Make (proxy through) an api call by name and return its result. + """ + async with brokermod.get_client() as client: + meth = getattr(client.api, methname, None) + if meth is None: + log.error(f"No api method `{methname}` could be found?") + return + elif not kwargs: + # verify kwargs requirements are met + sig = inspect.signature(meth) + if sig.parameters: + log.error( + f"Argument(s) are required by the `{methname}` method: " + f"{tuple(sig.parameters.keys())}") + return + + return await meth(**kwargs) + + +async def quote(brokermod: ModuleType, tickers: [str]) -> dict: + """Return quotes dict for ``tickers``. + """ + async with brokermod.get_client() as client: + return await client.quote(tickers) + + +async def poll_tickers( + client: 'Client', + tickers: [str], + q: trio.Queue, + rate: int = 3, # delay between quote requests + diff_cached: bool = True, # only deliver "new" quotes to the queue +) -> None: + """Stream quotes for a sequence of tickers at the given ``rate`` + per second. + """ + t2ids = await client.tickers2ids(tickers) + ids = ','.join(map(str, t2ids.values())) + sleeptime = round(1. / rate, 3) + _cache = {} + + while True: # use an event here to trigger exit? + prequote_start = time.time() + try: + quotes_resp = await client.api.quotes(ids=ids) + except QuestradeError as qterr: + if "Access token is invalid" in str(qterr.args[0]): + # out-of-process piker may have renewed already + client._reload_config() + quotes_resp = await client.api.quotes(ids=ids) + else: + raise + + postquote_start = time.time() + quotes = quotes_resp['quotes'] + payload = [] + for quote in quotes: + + if quote['delay'] > 0: + log.warning(f"Delayed quote:\n{quote}") + + if diff_cached: + # if cache is enabled then only deliver "new" changes + symbol = quote['symbol'] + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + payload.append(quote) + else: + payload.append(quote) + + if payload: + q.put_nowait(payload) + + req_time = round(postquote_start - prequote_start, 3) + proc_time = round(time.time() - postquote_start, 3) + tot = req_time + proc_time + log.debug(f"Request + processing took {req_time + proc_time}") + delay = sleeptime - (req_time + proc_time) + if delay <= 0: + log.warn( + f"Took {req_time} (request) + {proc_time} (processing) = {tot}" + f" secs (> {sleeptime}) for processing quotes?") + else: + log.debug(f"Sleeping for {delay}") + await trio.sleep(delay) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index cdd3335b..a66e1c50 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -1,8 +1,6 @@ """ Questrade API backend. """ -import inspect -import json import time import datetime @@ -10,6 +8,7 @@ import trio from async_generator import asynccontextmanager from . import config +from ._util import resproc, BrokerError from ..log import get_logger, colorize_json # TODO: move to urllib3/requests once supported @@ -26,28 +25,6 @@ class QuestradeError(Exception): "Non-200 OK response code" -def resproc( - resp: asks.response_objects.Response, - return_json: bool = True -) -> asks.response_objects.Response: - """Process response and return its json content. - - Raise the appropriate error on non-200 OK responses. - """ - if not resp.status_code == 200: - raise QuestradeError(resp.body) - - try: - data = resp.json() - except json.decoder.JSONDecodeError: - log.exception(f"Failed to process {resp}:\n{resp.text}") - raise QuestradeError(resp.text) - else: - log.trace(f"Received json contents:\n{colorize_json(data)}") - - return data if return_json else resp - - class Client: """API client suitable for use as a long running broker daemon or single api requests. @@ -80,7 +57,7 @@ class Client: params={'grant_type': 'refresh_token', 'refresh_token': self.access_data['refresh_token']} ) - data = resproc(resp) + data = resproc(resp, log) self.access_data.update(data) return data @@ -121,11 +98,12 @@ class Client: expires_stamp = datetime.datetime.fromtimestamp( expires).strftime('%Y-%m-%d %H:%M:%S') if not access_token or (expires < time.time()) or force_refresh: - log.info(f"Refreshing access token {access_token} which expired at" - f" {expires_stamp}") + log.debug( + f"Refreshing access token {access_token} which expired at" + f" {expires_stamp}") try: data = await self._new_auth_token() - except QuestradeError as qterr: + except BrokerError as qterr: if "We're making some changes" in str(qterr.args[0]): # API service is down raise QuestradeError("API is down for maintenance") @@ -135,13 +113,13 @@ class Client: self._reload_config() try: data = await self._new_auth_token() - except QuestradeError as qterr: + except BrokerError as qterr: if qterr.args[0].decode() == 'Bad Request': # actually expired; get new from user self._reload_config(force_from_user=True) data = await self._new_auth_token() else: - raise qterr + raise QuestradeError(qterr) else: raise qterr @@ -151,8 +129,8 @@ class Client: # write to config on disk write_conf(self) else: - log.info(f"\nCurrent access token {access_token} expires at" - f" {expires_stamp}\n") + log.debug(f"\nCurrent access token {access_token} expires at" + f" {expires_stamp}\n") self._prep_sess() return self.access_data @@ -168,12 +146,13 @@ class Client: return symbols2ids - async def quote(self, tickers): + async def quote(self, tickers: [str]): """Return quotes for each ticker in ``tickers``. """ t2ids = await self.tickers2ids(tickers) ids = ','.join(map(str, t2ids.values())) - return (await self.api.quotes(ids=ids))['quotes'] + results = (await self.api.quotes(ids=ids))['quotes'] + return {sym: quote for sym, quote in zip(tickers, results)} async def symbols(self, tickers): """Return quotes for each ticker in ``tickers``. @@ -196,7 +175,7 @@ class _API: async def _request(self, path: str, params=None) -> dict: resp = await self._sess.get(path=f'/{path}', params=params) - return resproc(resp) + return resproc(resp, log) async def accounts(self) -> dict: return await self._request('accounts') @@ -268,6 +247,8 @@ def write_conf(client): @asynccontextmanager async def get_client() -> Client: """Spawn a broker client. + + A client must adhere to the method calls in ``piker.broker.core``. """ conf = get_config() log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}") @@ -290,110 +271,3 @@ async def get_client() -> Client: yield client finally: write_conf(client) - - -async def serve_forever(tasks) -> None: - """Start up a client and serve until terminated. - """ - async with get_client() as client: - # pretty sure this doesn't work - # await client._revoke_auth_token() - - async with trio.open_nursery() as nursery: - # launch token manager - nursery.start_soon(token_refresher, client) - - # launch children - for task in tasks: - nursery.start_soon(task, client) - - -async def poll_tickers( - client: Client, tickers: [str], - q: trio.Queue, - rate: int = 3, # delay between quote requests - diff_cached: bool = True, # only deliver "new" quotes to the queue -) -> None: - """Stream quotes for a sequence of tickers at the given ``rate`` - per second. - """ - t2ids = await client.tickers2ids(tickers) - ids = ','.join(map(str, t2ids.values())) - sleeptime = round(1. / rate, 3) - _cache = {} - - while True: # use an event here to trigger exit? - prequote_start = time.time() - try: - quotes_resp = await client.api.quotes(ids=ids) - except QuestradeError as qterr: - if "Access token is invalid" in str(qterr.args[0]): - # out-of-process piker may have renewed already - client._reload_config() - quotes_resp = await client.api.quotes(ids=ids) - else: - raise - - postquote_start = time.time() - quotes = quotes_resp['quotes'] - payload = [] - for quote in quotes: - - if quote['delay'] > 0: - log.warning(f"Delayed quote:\n{quote}") - - if diff_cached: - # if cache is enabled then only deliver "new" changes - symbol = quote['symbol'] - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote - payload.append(quote) - else: - payload.append(quote) - - if payload: - q.put_nowait(payload) - - req_time = round(postquote_start - prequote_start, 3) - proc_time = round(time.time() - postquote_start, 3) - tot = req_time + proc_time - log.debug(f"Request + processing took {req_time + proc_time}") - delay = sleeptime - (req_time + proc_time) - if delay <= 0: - log.warn( - f"Took {req_time} (request) + {proc_time} (processing) = {tot}" - f" secs (> {sleeptime}) for processing quotes?") - else: - log.debug(f"Sleeping for {delay}") - await trio.sleep(delay) - - -async def api(methname: str, **kwargs) -> dict: - """Make (proxy through) an api call by name and return its result. - """ - async with get_client() as client: - meth = getattr(client.api, methname, None) - if meth is None: - log.error(f"No api method `{methname}` could be found?") - return - elif not kwargs: - # verify kwargs requirements are met - sig = inspect.signature(meth) - if sig.parameters: - log.error( - f"Argument(s) are required by the `{methname}` method: " - f"{tuple(sig.parameters.keys())}") - return - - return await meth(**kwargs) - - -async def quote(tickers: [str]) -> dict: - """Return quotes dict for ``tickers``. - """ - async with get_client() as client: - return await client.quote(tickers)