More client enhancements

- Extend the qt api to include candles (not working yet), balances, positions.
- Add a `quote()` method to the `Client` for batch ticker quotes and expose
  it through a CLI subcommand.
- Make `poll_tickers` push new quotes to a `trio.Queue`
kivy_mainline_and_py3.8
Tyler Goodlet 2018-02-08 02:18:33 -05:00
parent a2d38f49cf
commit 151e7bf4fa
2 changed files with 67 additions and 12 deletions

View File

@ -0,0 +1,3 @@
"""
piker: trading toolz for hackerz.
"""

View File

@ -49,11 +49,13 @@ def resproc(
class Client:
"""API client suitable for use as a long running broker daemon or
for single api requests.
single api requests.
Provides a high-level api which wraps the underlying endpoint calls.
"""
def __init__(self, config: 'configparser.ConfigParser'):
self._sess = asks.Session()
self.api = API(self._sess)
self.api = _API(self._sess)
self._conf = config
self.access_data = {}
self.user_data = {}
@ -151,9 +153,17 @@ class Client:
return symbols2ids
async def quote(self, tickers):
"""Return quotes for each ticker in ``tickers``.
"""
t2ids = await self.tickers2ids(tickers)
ids = ','.join(map(str, t2ids.values()))
return await self.api.quotes(ids=ids)
class API:
"""Questrade API at its finest.
class _API:
"""Questrade API endpoints exposed as methods and wrapped with an
http session.
"""
def __init__(self, session: asks.Session):
self._sess = session
@ -183,6 +193,15 @@ class API:
async def quotes(self, ids: str) -> dict:
return await self._request('markets/quotes', params={'ids': ids})
async def candles(self, id: str, start: str, end, interval) -> dict:
return await self._request(f'markets/candles/{id}', params={})
async def balances(self, id: str) -> dict:
return await self._request(f'accounts/{id}/balances')
async def postions(self, id: str) -> dict:
return await self._request(f'accounts/{id}/positions')
async def token_refresher(client):
"""Coninually refresh the ``access_token`` near its expiry time.
@ -194,7 +213,8 @@ async def token_refresher(client):
def _token_from_user(conf: 'configparser.ConfigParser') -> None:
# get from user
"""Get API token from the user on the console.
"""
refresh_token = input("Please provide your Questrade access token: ")
conf['questrade'] = {'refresh_token': refresh_token}
@ -261,21 +281,46 @@ async def serve_forever(tasks) -> None:
nursery.start_soon(task, client)
async def poll_tickers(client, tickers, rate=2):
"""Auto-poll snap quotes for a sequence of tickers at the given ``rate``
async def poll_tickers(
client: Client, tickers: [str], q: trio.Queue, rate: int = 2,
) -> None:
"""Stream quotes for a sequence of tickers at the given ``rate``
per second.
"""
t2ids = await client.tickers2ids(tickers)
sleeptime = 1. / rate
ids = ','.join(map(str, t2ids.values()))
sleeptime = 1. / rate
_cache = {}
while True: # use an event here to trigger exit?
quote_data = await client.api.quotes(ids=ids)
await trio.sleep(sleeptime)
quotes_resp = await client.api.quotes(ids=ids)
start = time.time()
quotes = quotes_resp['quotes']
# log.trace(quotes)
# only push quotes with "new" data
payload = []
for quote in quotes:
symbol = quote['symbol']
last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items())
if new:
log.debug(f"New quote {symbol} data:\n{new}")
_cache[symbol] = quote
payload.append(quote)
if payload:
q.put_nowait(payload)
proc_time = time.time() - start
delay = sleeptime - proc_time
if delay <= 0:
log.warn(f"Took {proc_time} seconds for processing quotes?")
await trio.sleep(delay)
async def api(methname, **kwargs) -> dict:
"""Make (proxy) through an api call by name and return its result.
async def api(methname: str, **kwargs) -> dict:
"""Make (proxy through) an api call by name and return its result.
"""
async with get_client() as client:
meth = getattr(client.api, methname, None)
@ -292,3 +337,10 @@ async def api(methname, **kwargs) -> dict:
return
return await meth(**kwargs)
async def quote(tickers: [str]) -> dict:
"""Return quotes dict for ``tickers``.
"""
async with get_client() as client:
return await client.quote(tickers)