Factor out QT quoting specifics into the backend
parent
42e9296b36
commit
04fa3c7ca4
|
@ -4,6 +4,7 @@ Core broker-daemon tasks and API.
|
||||||
import time
|
import time
|
||||||
import inspect
|
import inspect
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
from typing import AsyncContextManager
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
@ -41,6 +42,7 @@ async def quote(brokermod: ModuleType, tickers: [str]) -> dict:
|
||||||
|
|
||||||
async def poll_tickers(
|
async def poll_tickers(
|
||||||
client: 'Client',
|
client: 'Client',
|
||||||
|
quoter: AsyncContextManager,
|
||||||
tickers: [str],
|
tickers: [str],
|
||||||
q: trio.Queue,
|
q: trio.Queue,
|
||||||
rate: int = 3, # delay between quote requests
|
rate: int = 3, # delay between quote requests
|
||||||
|
@ -49,56 +51,45 @@ async def poll_tickers(
|
||||||
"""Stream quotes for a sequence of tickers at the given ``rate``
|
"""Stream quotes for a sequence of tickers at the given ``rate``
|
||||||
per second.
|
per second.
|
||||||
"""
|
"""
|
||||||
t2ids = await client.tickers2ids(tickers)
|
|
||||||
ids = ','.join(map(str, t2ids.values()))
|
|
||||||
sleeptime = round(1. / rate, 3)
|
sleeptime = round(1. / rate, 3)
|
||||||
_cache = {}
|
_cache = {}
|
||||||
|
|
||||||
while True: # use an event here to trigger exit?
|
async with quoter(client, tickers) as get_quotes:
|
||||||
prequote_start = time.time()
|
while True: # use an event here to trigger exit?
|
||||||
try:
|
prequote_start = time.time()
|
||||||
quotes_resp = await client.api.quotes(ids=ids)
|
quotes = await get_quotes(tickers)
|
||||||
except QuestradeError as qterr:
|
postquote_start = time.time()
|
||||||
if "Access token is invalid" in str(qterr.args[0]):
|
payload = []
|
||||||
# out-of-process piker may have renewed already
|
for quote in quotes:
|
||||||
client._reload_config()
|
|
||||||
quotes_resp = await client.api.quotes(ids=ids)
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
postquote_start = time.time()
|
if quote['delay'] > 0:
|
||||||
quotes = quotes_resp['quotes']
|
log.warning(f"Delayed quote:\n{quote}")
|
||||||
payload = []
|
|
||||||
for quote in quotes:
|
|
||||||
|
|
||||||
if quote['delay'] > 0:
|
if diff_cached:
|
||||||
log.warning(f"Delayed quote:\n{quote}")
|
# if cache is enabled then only deliver "new" changes
|
||||||
|
symbol = quote['symbol']
|
||||||
if diff_cached:
|
last = _cache.setdefault(symbol, {})
|
||||||
# if cache is enabled then only deliver "new" changes
|
new = set(quote.items()) - set(last.items())
|
||||||
symbol = quote['symbol']
|
if new:
|
||||||
last = _cache.setdefault(symbol, {})
|
log.info(
|
||||||
new = set(quote.items()) - set(last.items())
|
f"New quote {quote['symbol']}:\n{new}")
|
||||||
if new:
|
_cache[symbol] = quote
|
||||||
log.info(
|
payload.append(quote)
|
||||||
f"New quote {quote['symbol']}:\n{new}")
|
else:
|
||||||
_cache[symbol] = quote
|
|
||||||
payload.append(quote)
|
payload.append(quote)
|
||||||
|
|
||||||
|
if payload:
|
||||||
|
q.put_nowait(payload)
|
||||||
|
|
||||||
|
req_time = round(postquote_start - prequote_start, 3)
|
||||||
|
proc_time = round(time.time() - postquote_start, 3)
|
||||||
|
tot = req_time + proc_time
|
||||||
|
log.debug(f"Request + processing took {req_time + proc_time}")
|
||||||
|
delay = sleeptime - (req_time + proc_time)
|
||||||
|
if delay <= 0:
|
||||||
|
log.warn(
|
||||||
|
f"Took {req_time} (request) + {proc_time} (processing) = {tot}"
|
||||||
|
f" secs (> {sleeptime}) for processing quotes?")
|
||||||
else:
|
else:
|
||||||
payload.append(quote)
|
log.debug(f"Sleeping for {delay}")
|
||||||
|
await trio.sleep(delay)
|
||||||
if payload:
|
|
||||||
q.put_nowait(payload)
|
|
||||||
|
|
||||||
req_time = round(postquote_start - prequote_start, 3)
|
|
||||||
proc_time = round(time.time() - postquote_start, 3)
|
|
||||||
tot = req_time + proc_time
|
|
||||||
log.debug(f"Request + processing took {req_time + proc_time}")
|
|
||||||
delay = sleeptime - (req_time + proc_time)
|
|
||||||
if delay <= 0:
|
|
||||||
log.warn(
|
|
||||||
f"Took {req_time} (request) + {proc_time} (processing) = {tot}"
|
|
||||||
f" secs (> {sleeptime}) for processing quotes?")
|
|
||||||
else:
|
|
||||||
log.debug(f"Sleeping for {delay}")
|
|
||||||
await trio.sleep(delay)
|
|
||||||
|
|
|
@ -271,3 +271,26 @@ async def get_client() -> Client:
|
||||||
yield client
|
yield client
|
||||||
finally:
|
finally:
|
||||||
write_conf(client)
|
write_conf(client)
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def quoter(client: Client, tickers: [str]):
|
||||||
|
t2ids = await client.tickers2ids(tickers)
|
||||||
|
ids = ','.join(map(str, t2ids.values()))
|
||||||
|
|
||||||
|
async def get_quote(tickers):
|
||||||
|
"""Query for quotes using cached symbol ids.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
quotes_resp = await client.api.quotes(ids=ids)
|
||||||
|
except QuestradeError as qterr:
|
||||||
|
if "Access token is invalid" in str(qterr.args[0]):
|
||||||
|
# out-of-process piker may have renewed already
|
||||||
|
client._reload_config()
|
||||||
|
quotes_resp = await client.api.quotes(ids=ids)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
return quotes_resp['quotes']
|
||||||
|
|
||||||
|
yield get_quote
|
||||||
|
|
Loading…
Reference in New Issue