Filter symbols and push initial quote in stream handler

Filter out bad symbols by processing an initial batch quote and pushing
it to the subscribing client before spawning a quoter task. This also
avoids exposing the quoter task to anything but the broker module and a
`get_quotes()` routine.
kivy_mainline_and_py3.8
Tyler Goodlet 2018-04-18 01:30:22 -04:00
parent 02a71c51ba
commit 030ecdcce8
1 changed file with 100 additions and 85 deletions
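For orientation, below is a minimal, self-contained sketch (not the actual piker API) of the flow this commit moves into the stream handler: run one batch quote up front, drop symbols the broker reports as `None`, push that first payload to the subscribing client, and only then spawn the long-lived polling task around the same `get_quotes` routine. Names such as `Subscriber`, `fake_get_quotes`, `handle_subscription` and `poll` are illustrative stand-ins, not code from this repository.

    import trio


    class Subscriber:
        """Toy stand-in for the per-client StreamQueue."""

        def __init__(self):
            self._send, self._recv = trio.open_memory_channel(100)

        async def put(self, payload):
            await self._send.send(payload)

        async def get(self):
            return await self._recv.receive()


    async def fake_get_quotes(tickers):
        # pretend the broker only knows about "AAPL"
        return {
            t: ({'symbol': t, 'last': 100.0} if t == 'AAPL' else None)
            for t in tickers
        }


    async def poll(get_quotes, tickers, queue, rate=2):
        # steady-state polling loop, started only after the initial filter/push
        while True:
            await queue.put(await get_quotes(tickers))
            await trio.sleep(1 / rate)


    async def handle_subscription(nursery, get_quotes, tickers, queue):
        # 1) smoke-test quote: weeds out symbols the broker can't resolve
        first = await get_quotes(tickers)
        valid = [sym for sym, quote in first.items() if quote is not None]
        # 2) push the initial payload so the client can initialize immediately
        await queue.put({sym: first[sym] for sym in valid})
        # 3) only now spawn the quoter task for the surviving symbols
        nursery.start_soon(poll, get_quotes, valid, queue)


    async def main():
        queue = Subscriber()
        async with trio.open_nursery() as nursery:
            await handle_subscription(
                nursery, fake_get_quotes, ['AAPL', 'BADSYM'], queue)
            print(await queue.get())  # initial quote payload (only 'AAPL')
            print(await queue.get())  # first polled update
            nursery.cancel_scope.cancel()


    if __name__ == '__main__':
        trio.run(main)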


@@ -7,7 +7,7 @@ import json
 from functools import partial
 import socket
 from types import ModuleType
-from typing import AsyncContextManager
+from typing import Coroutine
 
 import trio
@@ -84,7 +84,7 @@ class StreamQueue:
         self._agen = self._iter_packets()
 
     async def _iter_packets(self):
-        """Get a packet from the underlying stream.
+        """Yield packets from the underlying stream.
         """
         delim = self._delim
         buff = b''
@@ -128,6 +128,7 @@ class StreamQueue:
 
 async def poll_tickers(
     brokermod: ModuleType,
+    get_quotes: Coroutine,
     tickers2qs: {str: StreamQueue},
     rate: int = 5,  # delay between quote requests
     diff_cached: bool = True,  # only deliver "new" quotes to the queue
@@ -146,91 +147,68 @@ async def poll_tickers
         rate = broker_limit
         log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec")
 
-    tickers = list(tickers2qs.keys())
-    async with brokermod.get_client() as client:
-        async with brokermod.quoter(client, tickers) as get_quotes:
-            # run a first quote smoke test filtering out any bad tickers
-            first_quotes_dict = await get_quotes(tickers)
-            valid_symbols = list(first_quotes_dict.keys())[:]
-            for ticker in set(tickers) - set(valid_symbols):
-                tickers2qs.pop(ticker)
-
-            # push intial quotes
-            q_payloads = {}
-            for symbol, quote in first_quotes_dict.items():
-                if quote is None:
-                    tickers2qs.pop(symbol)
-                    continue
-                for queue in tickers2qs[symbol]:
-                    q_payloads.setdefault(queue, {})[symbol] = quote
-            if q_payloads:
-                for queue, payload in q_payloads.items():
-                    await queue.put(payload)
-
-            # assign valid symbol set
-            tickers = list(tickers2qs.keys())
-
-            while True:  # use an event here to trigger exit?
-                prequote_start = time.time()
-
-                with trio.move_on_after(3) as cancel_scope:
-                    quotes = await get_quotes(valid_symbols)
-
-                cancelled = cancel_scope.cancelled_caught
-                if cancelled:
-                    log.warn("Quote query timed out after 3 seconds, retrying...")
-                    # handle network outages by idling until response is received
-                    quotes = await wait_for_network(partial(get_quotes, tickers))
-
-                postquote_start = time.time()
-                q_payloads = {}
-                for symbol, quote in quotes.items():
-                    # FIXME: None is returned if a symbol can't be found.
-                    # Consider filtering out such symbols before starting poll loop
-                    if quote is None:
-                        continue
-
-                    if diff_cached:
-                        # if cache is enabled then only deliver "new" changes
-                        last = _cache.setdefault(symbol, {})
-                        new = set(quote.items()) - set(last.items())
-                        if new:
-                            log.info(
-                                f"New quote {quote['symbol']}:\n{new}")
-                            _cache[symbol] = quote
-                            for queue in tickers2qs[symbol]:
-                                q_payloads.setdefault(queue, {})[symbol] = quote
-                    else:
-                        for queue in tickers2qs[symbol]:
-                            q_payloads[queue] = {symbol: quote}
-
-                # deliver to each subscriber
-                if q_payloads:
-                    for queue, payload in q_payloads.items():
-                        await queue.put(payload)
-
-                req_time = round(postquote_start - prequote_start, 3)
-                proc_time = round(time.time() - postquote_start, 3)
-                tot = req_time + proc_time
-                log.debug(f"Request + processing took {tot}")
-                delay = sleeptime - tot
-                if delay <= 0:
-                    log.warn(
-                        f"Took {req_time} (request) + {proc_time} (processing) "
-                        f"= {tot} secs (> {sleeptime}) for processing quotes?")
-                else:
-                    log.debug(f"Sleeping for {delay}")
-                    await trio.sleep(delay)
+    while True:  # use an event here to trigger exit?
+        prequote_start = time.time()
+
+        tickers = list(tickers2qs.keys())
+        with trio.move_on_after(3) as cancel_scope:
+            quotes = await get_quotes(tickers)
+
+        cancelled = cancel_scope.cancelled_caught
+        if cancelled:
+            log.warn("Quote query timed out after 3 seconds, retrying...")
+            # handle network outages by idling until response is received
+            quotes = await wait_for_network(partial(get_quotes, tickers))
+
+        postquote_start = time.time()
+        q_payloads = {}
+        for symbol, quote in quotes.items():
+            # FIXME: None is returned if a symbol can't be found.
+            # Consider filtering out such symbols before starting poll loop
+            if quote is None:
+                continue
+
+            if diff_cached:
+                # if cache is enabled then only deliver "new" changes
+                last = _cache.setdefault(symbol, {})
+                new = set(quote.items()) - set(last.items())
+                if new:
+                    log.info(
+                        f"New quote {quote['symbol']}:\n{new}")
+                    _cache[symbol] = quote
+                    for queue in tickers2qs[symbol]:
+                        q_payloads.setdefault(queue, {})[symbol] = quote
+            else:
+                for queue in tickers2qs[symbol]:
+                    q_payloads.setdefault(queue, {})[symbol] = quote
+
+        # deliver to each subscriber
+        if q_payloads:
+            for queue, payload in q_payloads.items():
+                await queue.put(payload)
+
+        req_time = round(postquote_start - prequote_start, 3)
+        proc_time = round(time.time() - postquote_start, 3)
+        tot = req_time + proc_time
+        log.debug(f"Request + processing took {tot}")
+        delay = sleeptime - tot
+        if delay <= 0:
+            log.warn(
+                f"Took {req_time} (request) + {proc_time} (processing) "
+                f"= {tot} secs (> {sleeptime}) for processing quotes?")
+        else:
+            log.debug(f"Sleeping for {delay}")
+            await trio.sleep(delay)
 
 
 async def start_quoter(stream):
     """Handle per-broker quote stream subscriptions.
+    Spawns new quoter tasks for each broker backend on-demand.
     """
     broker2tickersubs = {}
     tickers2qs = {}
+    clients = {}
     queue = StreamQueue(stream)  # wrap in a shabby queue-like api
     log.debug(f"Accepted new connection from {queue.peer}")
@@ -240,27 +218,64 @@ async def start_quoter(stream)
                 log.info(
                     f"{queue.peer} subscribed to {broker} for tickers {tickers}")
 
-                if broker not in broker2tickersubs:  # spawn quote streamer
-                    tickers2qs = broker2tickersubs.setdefault(broker, {})
+                if broker not in broker2tickersubs:
+                    tickers2qs = broker2tickersubs.setdefault(
+                        broker, {}.fromkeys(tickers, {queue,}))
                     brokermod = get_brokermod(broker)
                     log.info(f"Spawning quote streamer for broker {broker}")
-                    # task should begin on the next checkpoint/iteration
-                    nursery.start_soon(poll_tickers, brokermod, tickers2qs)
 
-                # create map from each symbol to consuming client queues
-                for ticker in tickers:
-                    tickers2qs.setdefault(ticker, set()).add(queue)
+                    # TODO: move to AsyncExitStack in 3.7
+                    client = await brokermod.get_client().__aenter__()
+                    get_quotes = await brokermod.quoter(client, tickers)
+                else:
+                    brokermod, client, get_quotes = clients[broker]
+                    tickers2qs = broker2tickersubs[broker]
+                    # update map from each symbol to requesting client's queue
+                    for ticker in tickers:
+                        tickers2qs.setdefault(ticker, set()).add(queue)
+                    # remove stale ticker subscriptions
+                    for ticker in set(tickers2qs) - set(tickers):
+                        tickers2qs[ticker].remove(queue)
 
-                # remove queue from any ticker subscriptions it no longer wants
-                for ticker in set(tickers2qs) - set(tickers):
-                    tickers2qs[ticker].remove(queue)
+                # run a single quote filtering out any bad tickers
+                quotes = await get_quotes(tickers)
+                # pop any tickers that aren't returned in the first quote
+                for ticker in set(tickers) - set(quotes):
+                    log.warn(
+                        f"Symbol `{ticker}` not found by broker `{brokermod.name}`")
+                    tickers2qs.pop(ticker)
+
+                # pop any tickers that return "empty" quotes
+                payload = {}
+                for symbol, quote in quotes.items():
+                    if quote is None:
+                        log.warn(
+                            f"Symbol `{symbol}` not found by broker"
+                            f" `{brokermod.name}`")
+                        tickers2qs.pop(symbol, None)
+                        continue
+                    payload[symbol] = quote
+
+                if broker not in clients:  # no quoter task yet
+                    clients[broker] = (brokermod, client, get_quotes)
+
+                    # push initial quotes response for client initialization
+                    await queue.put(payload)
+
+                    # task should begin on the next checkpoint/iteration
+                    log.info(f"Spawning quoter task for {brokermod.name}")
+                    nursery.start_soon(
+                        poll_tickers, brokermod, get_quotes, tickers2qs)
             else:
                 log.info(f"{queue.peer} was disconnected")
                 nursery.cancel_scope.cancel()
 
+    # TODO: move to AsyncExitStack in 3.7
+    for _, client, _ in clients.values():
+        await client.__aexit__()
 
 
 async def _daemon_main(brokermod):
-    """Entry point for the piker daemon.
+    """Entry point for the broker daemon.
     """
     async with trio.open_nursery() as nursery:
         listeners = await nursery.start(