Merge pull request #31 from pikers/tolerate_the_network

Tolerate the network
kivy_mainline_and_py3.8
goodboy 2018-04-02 16:48:49 -04:00 committed by GitHub
commit 397c27e05a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 20 deletions

View File

@ -3,6 +3,8 @@ Core broker-daemon tasks and API.
""" """
import time import time
import inspect import inspect
from functools import partial
import socket
from types import ModuleType from types import ModuleType
from typing import AsyncContextManager from typing import AsyncContextManager
@ -44,6 +46,21 @@ async def quote(brokermod: ModuleType, tickers: [str]) -> dict:
return results return results
async def wait_for_network(get_quotes, sleep=1):
"""Wait until the network comes back up.
"""
while True:
try:
with trio.move_on_after(1) as cancel_scope:
return await get_quotes()
if cancel_scope.cancelled_caught:
log.warn("Quote query timed out")
continue
except socket.gaierror:
log.warn(f"Network is down waiting for reestablishment...")
await trio.sleep(sleep)
async def poll_tickers( async def poll_tickers(
client: 'Client', client: 'Client',
quoter: AsyncContextManager, quoter: AsyncContextManager,
@ -64,30 +81,35 @@ async def poll_tickers(
async with quoter(client, tickers) as get_quotes: async with quoter(client, tickers) as get_quotes:
while True: # use an event here to trigger exit? while True: # use an event here to trigger exit?
prequote_start = time.time() prequote_start = time.time()
quotes = await get_quotes(tickers)
with trio.move_on_after(3) as cancel_scope:
quotes = await get_quotes(tickers)
cancelled = cancel_scope.cancelled_caught
if cancelled:
log.warn("Quote query timed out after 3 seconds, retrying...")
# handle network outages by idling until response is received
quotes = await wait_for_network(partial(get_quotes, tickers))
postquote_start = time.time() postquote_start = time.time()
payload = [] payload = {}
for symbol, quote in quotes.items(): for symbol, quote in quotes.items():
# FIXME: None is returned if a symbol can't be found. # FIXME: None is returned if a symbol can't be found.
# Consider filtering out such symbols before starting poll loop # Consider filtering out such symbols before starting poll loop
if quote is None: if quote is None:
continue continue
if quote.get('delay', 0) > 0:
log.warning(f"Delayed quote:\n{quote}")
if diff_cached: if diff_cached:
# if cache is enabled then only deliver "new" changes # if cache is enabled then only deliver "new" changes
symbol = quote['symbol']
last = _cache.setdefault(symbol, {}) last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items()) new = set(quote.items()) - set(last.items())
if new: if new:
log.info( log.info(
f"New quote {quote['symbol']}:\n{new}") f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote _cache[symbol] = quote
payload.append(quote) payload[symbol] = quote
else: else:
payload.append(quote) payload[symbol] = quote
if payload: if payload:
q.put_nowait(payload) q.put_nowait(payload)

View File

@ -304,7 +304,24 @@ async def quoter(client: Client, tickers: [str]):
else: else:
raise raise
return {quote['symbol']: quote for quote in quotes_resp['quotes']} # dict packing and post-processing
quotes = {}
for quote in quotes_resp['quotes']:
quotes[quote['symbol']] = quote
if quote.get('delay', 0) > 0:
log.warning(f"Delayed quote:\n{quote}")
return quotes
first_quotes_dict = await get_quote(tickers)
for symbol, quote in first_quotes_dict.items():
if quote['low52w'] is None:
log.warn(f"{symbol} seems to be defunct discarding from tickers")
t2ids.pop(symbol)
# re-save symbol ids cache
ids = ','.join(map(str, t2ids.values()))
yield get_quote yield get_quote

View File

@ -349,10 +349,8 @@ async def update_quotes(
grid.quote_cache = cache grid.quote_cache = cache
# initial coloring # initial coloring
for quote in first_quotes: for sym, quote in first_quotes.items():
sym = quote['symbol']
row = grid.symbols2rows[sym] row = grid.symbols2rows[sym]
# record, displayable = qtconvert(quote, symbol_data=symbol_data)
record, displayable = brokermod.format_quote( record, displayable = brokermod.format_quote(
quote, symbol_data=symbol_data) quote, symbol_data=symbol_data)
row.update(record, displayable) row.update(record, displayable)
@ -365,12 +363,11 @@ async def update_quotes(
while True: while True:
log.debug("Waiting on quotes") log.debug("Waiting on quotes")
quotes = await queue.get() # new quotes data only quotes = await queue.get() # new quotes data only
for quote in quotes: for symbol, quote in quotes.items():
# record, displayable = qtconvert(quote, symbol_data=symbol_data)
record, displayable = brokermod.format_quote( record, displayable = brokermod.format_quote(
quote, symbol_data=symbol_data) quote, symbol_data=symbol_data)
row = grid.symbols2rows[record['symbol']] row = grid.symbols2rows[symbol]
cache[record['symbol']] = (record, row) cache[symbol] = (record, row)
row.update(record, displayable) row.update(record, displayable)
color_row(row, record) color_row(row, record)
@ -401,11 +398,10 @@ async def _async_main(name, tickers, brokermod, rate):
) )
# get first quotes response # get first quotes response
pkts = await queue.get() quotes = await queue.get()
first_quotes = [ first_quotes = [
# qtconvert(quote, symbol_data=sd)[0] for quote in pkts]
brokermod.format_quote(quote, symbol_data=sd)[0] brokermod.format_quote(quote, symbol_data=sd)[0]
for quote in pkts] for quote in quotes.values()]
if first_quotes[0].get('last') is None: if first_quotes[0].get('last') is None:
log.error("Broker API is down temporarily") log.error("Broker API is down temporarily")
@ -463,4 +459,4 @@ async def _async_main(name, tickers, brokermod, rate):
} }
nursery.start_soon(run_kivy, widgets['root'], nursery) nursery.start_soon(run_kivy, widgets['root'], nursery)
nursery.start_soon( nursery.start_soon(
update_quotes, brokermod, widgets, queue, sd, pkts) update_quotes, brokermod, widgets, queue, sd, quotes)