Move core tasks to separate module
Begin abstracting out broker backends by moving core data query tasks into a module which requires and calls a broker backend API.kivy_mainline_and_py3.8
parent
56f38263be
commit
bd7eb16ab2
|
@ -0,0 +1,34 @@
|
||||||
|
"""
|
||||||
|
Handy utils.
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import asks
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from ..log import colorize_json
|
||||||
|
|
||||||
|
|
||||||
|
class BrokerError(Exception):
|
||||||
|
"Generic broker issue"
|
||||||
|
|
||||||
|
|
||||||
|
def resproc(
|
||||||
|
resp: asks.response_objects.Response,
|
||||||
|
log: logging.Logger,
|
||||||
|
return_json: bool = True
|
||||||
|
) -> asks.response_objects.Response:
|
||||||
|
"""Process response and return its json content.
|
||||||
|
|
||||||
|
Raise the appropriate error on non-200 OK responses.
|
||||||
|
"""
|
||||||
|
if not resp.status_code == 200:
|
||||||
|
raise BrokerError(resp.body)
|
||||||
|
try:
|
||||||
|
data = resp.json()
|
||||||
|
except json.decoder.JSONDecodeError:
|
||||||
|
log.exception(f"Failed to process {resp}:\n{resp.text}")
|
||||||
|
raise BrokerError(resp.text)
|
||||||
|
else:
|
||||||
|
log.trace(f"Received json contents:\n{colorize_json(data)}")
|
||||||
|
|
||||||
|
return data if return_json else resp
|
|
@ -0,0 +1,104 @@
|
||||||
|
"""
|
||||||
|
Core broker-daemon tasks and API.
|
||||||
|
"""
|
||||||
|
import time
|
||||||
|
import inspect
|
||||||
|
from types import ModuleType
|
||||||
|
|
||||||
|
import trio
|
||||||
|
|
||||||
|
from .questrade import QuestradeError
|
||||||
|
from ..log import get_logger
|
||||||
|
log = get_logger('broker.core')
|
||||||
|
|
||||||
|
|
||||||
|
async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict:
|
||||||
|
"""Make (proxy through) an api call by name and return its result.
|
||||||
|
"""
|
||||||
|
async with brokermod.get_client() as client:
|
||||||
|
meth = getattr(client.api, methname, None)
|
||||||
|
if meth is None:
|
||||||
|
log.error(f"No api method `{methname}` could be found?")
|
||||||
|
return
|
||||||
|
elif not kwargs:
|
||||||
|
# verify kwargs requirements are met
|
||||||
|
sig = inspect.signature(meth)
|
||||||
|
if sig.parameters:
|
||||||
|
log.error(
|
||||||
|
f"Argument(s) are required by the `{methname}` method: "
|
||||||
|
f"{tuple(sig.parameters.keys())}")
|
||||||
|
return
|
||||||
|
|
||||||
|
return await meth(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
async def quote(brokermod: ModuleType, tickers: [str]) -> dict:
|
||||||
|
"""Return quotes dict for ``tickers``.
|
||||||
|
"""
|
||||||
|
async with brokermod.get_client() as client:
|
||||||
|
return await client.quote(tickers)
|
||||||
|
|
||||||
|
|
||||||
|
async def poll_tickers(
|
||||||
|
client: 'Client',
|
||||||
|
tickers: [str],
|
||||||
|
q: trio.Queue,
|
||||||
|
rate: int = 3, # delay between quote requests
|
||||||
|
diff_cached: bool = True, # only deliver "new" quotes to the queue
|
||||||
|
) -> None:
|
||||||
|
"""Stream quotes for a sequence of tickers at the given ``rate``
|
||||||
|
per second.
|
||||||
|
"""
|
||||||
|
t2ids = await client.tickers2ids(tickers)
|
||||||
|
ids = ','.join(map(str, t2ids.values()))
|
||||||
|
sleeptime = round(1. / rate, 3)
|
||||||
|
_cache = {}
|
||||||
|
|
||||||
|
while True: # use an event here to trigger exit?
|
||||||
|
prequote_start = time.time()
|
||||||
|
try:
|
||||||
|
quotes_resp = await client.api.quotes(ids=ids)
|
||||||
|
except QuestradeError as qterr:
|
||||||
|
if "Access token is invalid" in str(qterr.args[0]):
|
||||||
|
# out-of-process piker may have renewed already
|
||||||
|
client._reload_config()
|
||||||
|
quotes_resp = await client.api.quotes(ids=ids)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
postquote_start = time.time()
|
||||||
|
quotes = quotes_resp['quotes']
|
||||||
|
payload = []
|
||||||
|
for quote in quotes:
|
||||||
|
|
||||||
|
if quote['delay'] > 0:
|
||||||
|
log.warning(f"Delayed quote:\n{quote}")
|
||||||
|
|
||||||
|
if diff_cached:
|
||||||
|
# if cache is enabled then only deliver "new" changes
|
||||||
|
symbol = quote['symbol']
|
||||||
|
last = _cache.setdefault(symbol, {})
|
||||||
|
new = set(quote.items()) - set(last.items())
|
||||||
|
if new:
|
||||||
|
log.info(
|
||||||
|
f"New quote {quote['symbol']}:\n{new}")
|
||||||
|
_cache[symbol] = quote
|
||||||
|
payload.append(quote)
|
||||||
|
else:
|
||||||
|
payload.append(quote)
|
||||||
|
|
||||||
|
if payload:
|
||||||
|
q.put_nowait(payload)
|
||||||
|
|
||||||
|
req_time = round(postquote_start - prequote_start, 3)
|
||||||
|
proc_time = round(time.time() - postquote_start, 3)
|
||||||
|
tot = req_time + proc_time
|
||||||
|
log.debug(f"Request + processing took {req_time + proc_time}")
|
||||||
|
delay = sleeptime - (req_time + proc_time)
|
||||||
|
if delay <= 0:
|
||||||
|
log.warn(
|
||||||
|
f"Took {req_time} (request) + {proc_time} (processing) = {tot}"
|
||||||
|
f" secs (> {sleeptime}) for processing quotes?")
|
||||||
|
else:
|
||||||
|
log.debug(f"Sleeping for {delay}")
|
||||||
|
await trio.sleep(delay)
|
|
@ -1,8 +1,6 @@
|
||||||
"""
|
"""
|
||||||
Questrade API backend.
|
Questrade API backend.
|
||||||
"""
|
"""
|
||||||
import inspect
|
|
||||||
import json
|
|
||||||
import time
|
import time
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
|
@ -10,6 +8,7 @@ import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
|
||||||
from . import config
|
from . import config
|
||||||
|
from ._util import resproc, BrokerError
|
||||||
from ..log import get_logger, colorize_json
|
from ..log import get_logger, colorize_json
|
||||||
|
|
||||||
# TODO: move to urllib3/requests once supported
|
# TODO: move to urllib3/requests once supported
|
||||||
|
@ -26,28 +25,6 @@ class QuestradeError(Exception):
|
||||||
"Non-200 OK response code"
|
"Non-200 OK response code"
|
||||||
|
|
||||||
|
|
||||||
def resproc(
|
|
||||||
resp: asks.response_objects.Response,
|
|
||||||
return_json: bool = True
|
|
||||||
) -> asks.response_objects.Response:
|
|
||||||
"""Process response and return its json content.
|
|
||||||
|
|
||||||
Raise the appropriate error on non-200 OK responses.
|
|
||||||
"""
|
|
||||||
if not resp.status_code == 200:
|
|
||||||
raise QuestradeError(resp.body)
|
|
||||||
|
|
||||||
try:
|
|
||||||
data = resp.json()
|
|
||||||
except json.decoder.JSONDecodeError:
|
|
||||||
log.exception(f"Failed to process {resp}:\n{resp.text}")
|
|
||||||
raise QuestradeError(resp.text)
|
|
||||||
else:
|
|
||||||
log.trace(f"Received json contents:\n{colorize_json(data)}")
|
|
||||||
|
|
||||||
return data if return_json else resp
|
|
||||||
|
|
||||||
|
|
||||||
class Client:
|
class Client:
|
||||||
"""API client suitable for use as a long running broker daemon or
|
"""API client suitable for use as a long running broker daemon or
|
||||||
single api requests.
|
single api requests.
|
||||||
|
@ -80,7 +57,7 @@ class Client:
|
||||||
params={'grant_type': 'refresh_token',
|
params={'grant_type': 'refresh_token',
|
||||||
'refresh_token': self.access_data['refresh_token']}
|
'refresh_token': self.access_data['refresh_token']}
|
||||||
)
|
)
|
||||||
data = resproc(resp)
|
data = resproc(resp, log)
|
||||||
self.access_data.update(data)
|
self.access_data.update(data)
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
@ -121,11 +98,12 @@ class Client:
|
||||||
expires_stamp = datetime.datetime.fromtimestamp(
|
expires_stamp = datetime.datetime.fromtimestamp(
|
||||||
expires).strftime('%Y-%m-%d %H:%M:%S')
|
expires).strftime('%Y-%m-%d %H:%M:%S')
|
||||||
if not access_token or (expires < time.time()) or force_refresh:
|
if not access_token or (expires < time.time()) or force_refresh:
|
||||||
log.info(f"Refreshing access token {access_token} which expired at"
|
log.debug(
|
||||||
f" {expires_stamp}")
|
f"Refreshing access token {access_token} which expired at"
|
||||||
|
f" {expires_stamp}")
|
||||||
try:
|
try:
|
||||||
data = await self._new_auth_token()
|
data = await self._new_auth_token()
|
||||||
except QuestradeError as qterr:
|
except BrokerError as qterr:
|
||||||
if "We're making some changes" in str(qterr.args[0]):
|
if "We're making some changes" in str(qterr.args[0]):
|
||||||
# API service is down
|
# API service is down
|
||||||
raise QuestradeError("API is down for maintenance")
|
raise QuestradeError("API is down for maintenance")
|
||||||
|
@ -135,13 +113,13 @@ class Client:
|
||||||
self._reload_config()
|
self._reload_config()
|
||||||
try:
|
try:
|
||||||
data = await self._new_auth_token()
|
data = await self._new_auth_token()
|
||||||
except QuestradeError as qterr:
|
except BrokerError as qterr:
|
||||||
if qterr.args[0].decode() == 'Bad Request':
|
if qterr.args[0].decode() == 'Bad Request':
|
||||||
# actually expired; get new from user
|
# actually expired; get new from user
|
||||||
self._reload_config(force_from_user=True)
|
self._reload_config(force_from_user=True)
|
||||||
data = await self._new_auth_token()
|
data = await self._new_auth_token()
|
||||||
else:
|
else:
|
||||||
raise qterr
|
raise QuestradeError(qterr)
|
||||||
else:
|
else:
|
||||||
raise qterr
|
raise qterr
|
||||||
|
|
||||||
|
@ -151,8 +129,8 @@ class Client:
|
||||||
# write to config on disk
|
# write to config on disk
|
||||||
write_conf(self)
|
write_conf(self)
|
||||||
else:
|
else:
|
||||||
log.info(f"\nCurrent access token {access_token} expires at"
|
log.debug(f"\nCurrent access token {access_token} expires at"
|
||||||
f" {expires_stamp}\n")
|
f" {expires_stamp}\n")
|
||||||
|
|
||||||
self._prep_sess()
|
self._prep_sess()
|
||||||
return self.access_data
|
return self.access_data
|
||||||
|
@ -168,12 +146,13 @@ class Client:
|
||||||
|
|
||||||
return symbols2ids
|
return symbols2ids
|
||||||
|
|
||||||
async def quote(self, tickers):
|
async def quote(self, tickers: [str]):
|
||||||
"""Return quotes for each ticker in ``tickers``.
|
"""Return quotes for each ticker in ``tickers``.
|
||||||
"""
|
"""
|
||||||
t2ids = await self.tickers2ids(tickers)
|
t2ids = await self.tickers2ids(tickers)
|
||||||
ids = ','.join(map(str, t2ids.values()))
|
ids = ','.join(map(str, t2ids.values()))
|
||||||
return (await self.api.quotes(ids=ids))['quotes']
|
results = (await self.api.quotes(ids=ids))['quotes']
|
||||||
|
return {sym: quote for sym, quote in zip(tickers, results)}
|
||||||
|
|
||||||
async def symbols(self, tickers):
|
async def symbols(self, tickers):
|
||||||
"""Return quotes for each ticker in ``tickers``.
|
"""Return quotes for each ticker in ``tickers``.
|
||||||
|
@ -196,7 +175,7 @@ class _API:
|
||||||
|
|
||||||
async def _request(self, path: str, params=None) -> dict:
|
async def _request(self, path: str, params=None) -> dict:
|
||||||
resp = await self._sess.get(path=f'/{path}', params=params)
|
resp = await self._sess.get(path=f'/{path}', params=params)
|
||||||
return resproc(resp)
|
return resproc(resp, log)
|
||||||
|
|
||||||
async def accounts(self) -> dict:
|
async def accounts(self) -> dict:
|
||||||
return await self._request('accounts')
|
return await self._request('accounts')
|
||||||
|
@ -268,6 +247,8 @@ def write_conf(client):
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def get_client() -> Client:
|
async def get_client() -> Client:
|
||||||
"""Spawn a broker client.
|
"""Spawn a broker client.
|
||||||
|
|
||||||
|
A client must adhere to the method calls in ``piker.broker.core``.
|
||||||
"""
|
"""
|
||||||
conf = get_config()
|
conf = get_config()
|
||||||
log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}")
|
log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}")
|
||||||
|
@ -290,110 +271,3 @@ async def get_client() -> Client:
|
||||||
yield client
|
yield client
|
||||||
finally:
|
finally:
|
||||||
write_conf(client)
|
write_conf(client)
|
||||||
|
|
||||||
|
|
||||||
async def serve_forever(tasks) -> None:
|
|
||||||
"""Start up a client and serve until terminated.
|
|
||||||
"""
|
|
||||||
async with get_client() as client:
|
|
||||||
# pretty sure this doesn't work
|
|
||||||
# await client._revoke_auth_token()
|
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
|
||||||
# launch token manager
|
|
||||||
nursery.start_soon(token_refresher, client)
|
|
||||||
|
|
||||||
# launch children
|
|
||||||
for task in tasks:
|
|
||||||
nursery.start_soon(task, client)
|
|
||||||
|
|
||||||
|
|
||||||
async def poll_tickers(
|
|
||||||
client: Client, tickers: [str],
|
|
||||||
q: trio.Queue,
|
|
||||||
rate: int = 3, # delay between quote requests
|
|
||||||
diff_cached: bool = True, # only deliver "new" quotes to the queue
|
|
||||||
) -> None:
|
|
||||||
"""Stream quotes for a sequence of tickers at the given ``rate``
|
|
||||||
per second.
|
|
||||||
"""
|
|
||||||
t2ids = await client.tickers2ids(tickers)
|
|
||||||
ids = ','.join(map(str, t2ids.values()))
|
|
||||||
sleeptime = round(1. / rate, 3)
|
|
||||||
_cache = {}
|
|
||||||
|
|
||||||
while True: # use an event here to trigger exit?
|
|
||||||
prequote_start = time.time()
|
|
||||||
try:
|
|
||||||
quotes_resp = await client.api.quotes(ids=ids)
|
|
||||||
except QuestradeError as qterr:
|
|
||||||
if "Access token is invalid" in str(qterr.args[0]):
|
|
||||||
# out-of-process piker may have renewed already
|
|
||||||
client._reload_config()
|
|
||||||
quotes_resp = await client.api.quotes(ids=ids)
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
postquote_start = time.time()
|
|
||||||
quotes = quotes_resp['quotes']
|
|
||||||
payload = []
|
|
||||||
for quote in quotes:
|
|
||||||
|
|
||||||
if quote['delay'] > 0:
|
|
||||||
log.warning(f"Delayed quote:\n{quote}")
|
|
||||||
|
|
||||||
if diff_cached:
|
|
||||||
# if cache is enabled then only deliver "new" changes
|
|
||||||
symbol = quote['symbol']
|
|
||||||
last = _cache.setdefault(symbol, {})
|
|
||||||
new = set(quote.items()) - set(last.items())
|
|
||||||
if new:
|
|
||||||
log.info(
|
|
||||||
f"New quote {quote['symbol']}:\n{new}")
|
|
||||||
_cache[symbol] = quote
|
|
||||||
payload.append(quote)
|
|
||||||
else:
|
|
||||||
payload.append(quote)
|
|
||||||
|
|
||||||
if payload:
|
|
||||||
q.put_nowait(payload)
|
|
||||||
|
|
||||||
req_time = round(postquote_start - prequote_start, 3)
|
|
||||||
proc_time = round(time.time() - postquote_start, 3)
|
|
||||||
tot = req_time + proc_time
|
|
||||||
log.debug(f"Request + processing took {req_time + proc_time}")
|
|
||||||
delay = sleeptime - (req_time + proc_time)
|
|
||||||
if delay <= 0:
|
|
||||||
log.warn(
|
|
||||||
f"Took {req_time} (request) + {proc_time} (processing) = {tot}"
|
|
||||||
f" secs (> {sleeptime}) for processing quotes?")
|
|
||||||
else:
|
|
||||||
log.debug(f"Sleeping for {delay}")
|
|
||||||
await trio.sleep(delay)
|
|
||||||
|
|
||||||
|
|
||||||
async def api(methname: str, **kwargs) -> dict:
|
|
||||||
"""Make (proxy through) an api call by name and return its result.
|
|
||||||
"""
|
|
||||||
async with get_client() as client:
|
|
||||||
meth = getattr(client.api, methname, None)
|
|
||||||
if meth is None:
|
|
||||||
log.error(f"No api method `{methname}` could be found?")
|
|
||||||
return
|
|
||||||
elif not kwargs:
|
|
||||||
# verify kwargs requirements are met
|
|
||||||
sig = inspect.signature(meth)
|
|
||||||
if sig.parameters:
|
|
||||||
log.error(
|
|
||||||
f"Argument(s) are required by the `{methname}` method: "
|
|
||||||
f"{tuple(sig.parameters.keys())}")
|
|
||||||
return
|
|
||||||
|
|
||||||
return await meth(**kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
async def quote(tickers: [str]) -> dict:
|
|
||||||
"""Return quotes dict for ``tickers``.
|
|
||||||
"""
|
|
||||||
async with get_client() as client:
|
|
||||||
return await client.quote(tickers)
|
|
||||||
|
|
Loading…
Reference in New Issue