diff --git a/piker/brokers/core.py b/piker/brokers/core.py
index b70f3a23..2dc1f809 100644
--- a/piker/brokers/core.py
+++ b/piker/brokers/core.py
@@ -7,7 +7,7 @@ import json
 from functools import partial
 import socket
 from types import ModuleType
-from typing import AsyncContextManager
+from typing import Coroutine
 
 import trio
 
@@ -84,7 +84,7 @@ class StreamQueue:
         self._agen = self._iter_packets()
 
     async def _iter_packets(self):
-        """Get a packet from the underlying stream.
+        """Yield packets from the underlying stream.
         """
         delim = self._delim
         buff = b''
@@ -128,6 +128,7 @@ class StreamQueue:
 
 async def poll_tickers(
     brokermod: ModuleType,
+    get_quotes: Coroutine,
     tickers2qs: {str: StreamQueue},
     rate: int = 5,  # delay between quote requests
     diff_cached: bool = True,  # only deliver "new" quotes to the queue
@@ -146,91 +147,68 @@ async def poll_tickers(
         rate = broker_limit
         log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec")
 
-    tickers = list(tickers2qs.keys())
+    while True:  # use an event here to trigger exit?
+        prequote_start = time.time()
 
-    async with brokermod.get_client() as client:
-        async with brokermod.quoter(client, tickers) as get_quotes:
-            # run a first quote smoke test filtering out any bad tickers
-            first_quotes_dict = await get_quotes(tickers)
-            valid_symbols = list(first_quotes_dict.keys())[:]
+        tickers = list(tickers2qs.keys())
+        with trio.move_on_after(3) as cancel_scope:
+            quotes = await get_quotes(tickers)
 
-            for ticker in set(tickers) - set(valid_symbols):
-                tickers2qs.pop(ticker)
+        cancelled = cancel_scope.cancelled_caught
+        if cancelled:
+            log.warn("Quote query timed out after 3 seconds, retrying...")
+            # handle network outages by idling until response is received
+            quotes = await wait_for_network(partial(get_quotes, tickers))
 
-            # push intial quotes
-            q_payloads = {}
-            for symbol, quote in first_quotes_dict.items():
-                if quote is None:
-                    tickers2qs.pop(symbol)
-                    continue
+        postquote_start = time.time()
+        q_payloads = {}
+        for symbol, quote in quotes.items():
+            # FIXME: None is returned if a symbol can't be found.
+            # Consider filtering out such symbols before starting poll loop
+            if quote is None:
+                continue
+
+            if diff_cached:
+                # if cache is enabled then only deliver "new" changes
+                last = _cache.setdefault(symbol, {})
+                new = set(quote.items()) - set(last.items())
+                if new:
+                    log.info(
+                        f"New quote {quote['symbol']}:\n{new}")
+                    _cache[symbol] = quote
+                    for queue in tickers2qs[symbol]:
+                        q_payloads.setdefault(queue, {})[symbol] = quote
+            else:
                 for queue in tickers2qs[symbol]:
                     q_payloads.setdefault(queue, {})[symbol] = quote
 
-            if q_payloads:
-                for queue, payload in q_payloads.items():
-                    await queue.put(payload)
+        # deliver to each subscriber
+        if q_payloads:
+            for queue, payload in q_payloads.items():
+                await queue.put(payload)
 
-            # assign valid symbol set
-            tickers = list(tickers2qs.keys())
-
-            while True:  # use an event here to trigger exit?
-                prequote_start = time.time()
-
-                with trio.move_on_after(3) as cancel_scope:
-                    quotes = await get_quotes(valid_symbols)
-
-                cancelled = cancel_scope.cancelled_caught
-                if cancelled:
-                    log.warn("Quote query timed out after 3 seconds, retrying...")
-                    # handle network outages by idling until response is received
-                    quotes = await wait_for_network(partial(get_quotes, tickers))
-
-                postquote_start = time.time()
-                q_payloads = {}
-                for symbol, quote in quotes.items():
-                    # FIXME: None is returned if a symbol can't be found.
-                    # Consider filtering out such symbols before starting poll loop
-                    if quote is None:
-                        continue
-
-                    if diff_cached:
-                        # if cache is enabled then only deliver "new" changes
-                        last = _cache.setdefault(symbol, {})
-                        new = set(quote.items()) - set(last.items())
-                        if new:
-                            log.info(
-                                f"New quote {quote['symbol']}:\n{new}")
-                            _cache[symbol] = quote
-                            for queue in tickers2qs[symbol]:
-                                q_payloads.setdefault(queue, {})[symbol] = quote
-                    else:
-                        for queue in tickers2qs[symbol]:
-                            q_payloads[queue] = {symbol: quote}
-
-                # deliver to each subscriber
-                if q_payloads:
-                    for queue, payload in q_payloads.items():
-                        await queue.put(payload)
-
-                req_time = round(postquote_start - prequote_start, 3)
-                proc_time = round(time.time() - postquote_start, 3)
-                tot = req_time + proc_time
-                log.debug(f"Request + processing took {tot}")
-                delay = sleeptime - tot
-                if delay <= 0:
-                    log.warn(
-                        f"Took {req_time} (request) + {proc_time} (processing) "
-                        f"= {tot} secs (> {sleeptime}) for processing quotes?")
-                else:
-                    log.debug(f"Sleeping for {delay}")
-                    await trio.sleep(delay)
+        req_time = round(postquote_start - prequote_start, 3)
+        proc_time = round(time.time() - postquote_start, 3)
+        tot = req_time + proc_time
+        log.debug(f"Request + processing took {tot}")
+        delay = sleeptime - tot
+        if delay <= 0:
+            log.warn(
+                f"Took {req_time} (request) + {proc_time} (processing) "
+                f"= {tot} secs (> {sleeptime}) for processing quotes?")
+        else:
+            log.debug(f"Sleeping for {delay}")
+            await trio.sleep(delay)
 
 
 async def start_quoter(stream):
     """Handle per-broker quote stream subscriptions.
+
+    Spawns new quoter tasks for each broker backend on-demand.
     """
     broker2tickersubs = {}
     tickers2qs = {}
+    clients = {}
     queue = StreamQueue(stream)  # wrap in a shabby queue-like api
     log.debug(f"Accepted new connection from {queue.peer}")
 
@@ -240,27 +218,64 @@ async def start_quoter(stream):
                 log.info(
                     f"{queue.peer} subscribed to {broker} for tickers {tickers}")
 
-                if broker not in broker2tickersubs:  # spawn quote streamer
-                    tickers2qs = broker2tickersubs.setdefault(broker, {})
+                if broker not in broker2tickersubs:
+                    tickers2qs = broker2tickersubs.setdefault(
+                        broker, {ticker: {queue} for ticker in tickers})
                     brokermod = get_brokermod(broker)
                     log.info(f"Spawning quote streamer for broker {broker}")
+
+                    # TODO: move to AsyncExitStack in 3.7
+                    client = await brokermod.get_client().__aenter__()
+                    get_quotes = await brokermod.quoter(client, tickers)
+                else:
+                    brokermod, client, get_quotes = clients[broker]
+                    tickers2qs = broker2tickersubs[broker]
+                # update map from each symbol to requesting client's queue
+                for ticker in tickers:
+                    tickers2qs.setdefault(ticker, set()).add(queue)
+                # remove stale ticker subscriptions
+                for ticker in set(tickers2qs) - set(tickers):
+                    tickers2qs[ticker].remove(queue)
+
+                # run a single quote query to filter out any bad tickers
+                quotes = await get_quotes(tickers)
+                # pop any tickers that aren't returned in the first quote
+                for ticker in set(tickers) - set(quotes):
+                    log.warn(
+                        f"Symbol `{ticker}` not found by broker `{brokermod.name}`")
+                    tickers2qs.pop(ticker)
+
+                # pop any tickers that return "empty" quotes
+                payload = {}
+                for symbol, quote in quotes.items():
+                    if quote is None:
+                        log.warn(
+                            f"Symbol `{symbol}` not found by broker"
+                            f" `{brokermod.name}`")
+                        tickers2qs.pop(symbol, None)
+                        continue
+                    payload[symbol] = quote
+
+                if broker not in clients:  # no quoter task yet
+                    clients[broker] = (brokermod, client, get_quotes)
+                    # push initial quotes response for client initialization
+                    await queue.put(payload)
+
                     # task should begin on the next checkpoint/iteration
-                    nursery.start_soon(poll_tickers, brokermod, tickers2qs)
-
-                # create map from each symbol to consuming client queues
-                for ticker in tickers:
-                    tickers2qs.setdefault(ticker, set()).add(queue)
-
-                # remove queue from any ticker subscriptions it no longer wants
-                for ticker in set(tickers2qs) - set(tickers):
-                    tickers2qs[ticker].remove(queue)
+                    log.info(f"Spawning quoter task for {brokermod.name}")
+                    nursery.start_soon(
+                        poll_tickers, brokermod, get_quotes, tickers2qs)
             else:
                 log.info(f"{queue.peer} was disconnected")
                 nursery.cancel_scope.cancel()
 
+    # TODO: move to AsyncExitStack in 3.7
+    for _, client, _ in clients.values():
+        await client.__aexit__(None, None, None)
+
 
 async def _daemon_main(brokermod):
-    """Entry point for the piker daemon.
+    """Entry point for the broker daemon.
     """
     async with trio.open_nursery() as nursery:
         listeners = await nursery.start(
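Note: the sketch below is an illustration, not part of the patch. It isolates the diff_cached delivery logic used by poll_tickers() above, assuming each quote is a flat dict of hashable values (which the set(quote.items()) comparison requires); the new_fields() helper and the sample symbol/prices are hypothetical.

    # cache of the last quote seen per symbol, keyed by ticker
    _cache = {}

    def new_fields(symbol, quote):
        """Return only the (key, value) pairs that changed since the last poll."""
        last = _cache.setdefault(symbol, {})
        new = set(quote.items()) - set(last.items())
        if new:
            # update the cache only when something actually changed,
            # mirroring the `if new:` branch in poll_tickers()
            _cache[symbol] = quote
        return new

    # example: only changed fields are reported on subsequent polls
    assert new_fields('AAPL', {'last': 180.1, 'bid': 180.0})
    assert not new_fields('AAPL', {'last': 180.1, 'bid': 180.0})
    assert new_fields('AAPL', {'last': 180.2, 'bid': 180.0}) == {('last', 180.2)}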