diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 5cf6abea..b1427cab 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -3,9 +3,11 @@ Live data feed machinery """ import time from functools import partial +from itertools import cycle import socket +import json from types import ModuleType -from typing import Coroutine, Callable, Dict +from typing import Coroutine, Callable, Dict, List import trio import tractor @@ -14,7 +16,7 @@ from ..log import get_logger, get_console_log from . import get_brokermod -log = get_logger('broker.core') +log = get_logger('broker.data') async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: @@ -40,11 +42,9 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: async def stream_quotes( brokermod: ModuleType, - get_quotes: Coroutine, - tickers2chans: Dict[str, tractor.Channel], + request_quotes: Coroutine, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue - cid: str = None, ) -> None: """Stream quotes for a sequence of tickers at the given ``rate`` per second. @@ -52,53 +52,84 @@ async def stream_quotes( A stock-broker client ``get_quotes()`` async context manager must be provided which returns an async quote retrieval function. """ - broker_limit = getattr(brokermod, '_rate_limit', float('inf')) - if broker_limit < rate: - rate = broker_limit - log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") - sleeptime = round(1. / rate, 3) _cache = {} # ticker to quote caching while True: # use an event here to trigger exit? prequote_start = time.time() - if not any(tickers2chans.values()): - log.warn(f"No subs left for broker {brokermod.name}, exiting task") - break - - tickers = list(tickers2chans.keys()) + # tickers = list(tickers2chans.keys()) with trio.move_on_after(3) as cancel_scope: - quotes = await get_quotes(tickers) + quotes = await request_quotes() cancelled = cancel_scope.cancelled_caught if cancelled: log.warn("Quote query timed out after 3 seconds, retrying...") # handle network outages by idling until response is received - quotes = await wait_for_network(partial(get_quotes, tickers)) + # quotes = await wait_for_network(partial(get_quotes, tickers)) + quotes = await wait_for_network(request_quotes) postquote_start = time.time() + new_quotes = {} + if diff_cached: + # if cache is enabled then only deliver "new" changes + for symbol, quote in quotes.items(): + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + new_quotes[symbol] = quote + else: + new_quotes = quotes + + yield new_quotes + + # latency monitoring + req_time = round(postquote_start - prequote_start, 3) + proc_time = round(time.time() - postquote_start, 3) + tot = req_time + proc_time + log.debug(f"Request + processing took {tot}") + delay = sleeptime - tot + if delay <= 0: + log.warn( + f"Took {req_time} (request) + {proc_time} (processing) " + f"= {tot} secs (> {sleeptime}) for processing quotes?") + else: + log.debug(f"Sleeping for {delay}") + await trio.sleep(delay) + + +async def fan_out_to_chans( + brokermod: ModuleType, + get_quotes: Coroutine, + tickers2chans: Dict[str, tractor.Channel], + rate: int = 5, # delay between quote requests + diff_cached: bool = True, # only deliver "new" quotes to the queue + cid: str = None, +) -> None: + """Request and fan out quotes to each subscribed actor channel. + """ + broker_limit = getattr(brokermod, '_rate_limit', float('inf')) + if broker_limit < rate: + rate = broker_limit + log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") + + async def request(): + """Get quotes for current symbol subscription set. + """ + return await get_quotes(list(tickers2chans.keys())) + + async for quotes in stream_quotes(brokermod, request, rate): chan_payloads = {} for symbol, quote in quotes.items(): - if diff_cached: - # if cache is enabled then only deliver "new" changes - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote - for chan, cid in tickers2chans.get(symbol, set()): - chan_payloads.setdefault( - chan, - {'yield': {}, 'cid': cid} - )['yield'][symbol] = quote - else: - for chan, cid in tickers2chans[symbol]: - chan_payloads.setdefault( - chan, - {'yield': {}, 'cid': cid} - )['yield'][symbol] = quote + # set symbol quotes for each subscriber + for chan, cid in tickers2chans.get(symbol, set()): + chan_payloads.setdefault( + chan, + {'yield': {}, 'cid': cid} + )['yield'][symbol] = quote # deliver to each subscriber (fan out) if chan_payloads: @@ -114,19 +145,9 @@ async def stream_quotes( for chanset in tickers2chans.values(): chanset.discard((chan, cid)) - # latency monitoring - req_time = round(postquote_start - prequote_start, 3) - proc_time = round(time.time() - postquote_start, 3) - tot = req_time + proc_time - log.debug(f"Request + processing took {tot}") - delay = sleeptime - tot - if delay <= 0: - log.warn( - f"Took {req_time} (request) + {proc_time} (processing) " - f"= {tot} secs (> {sleeptime}) for processing quotes?") - else: - log.debug(f"Sleeping for {delay}") - await trio.sleep(delay) + if not any(tickers2chans.values()): + log.warn(f"No subs left for broker {brokermod.name}, exiting task") + break log.info(f"Terminating stream quoter task for {brokermod.name}") @@ -226,7 +247,7 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None): async def start_quote_stream( broker: str, - tickers: [str], + tickers: List[str], chan: tractor.Channel = None, cid: str = None, ) -> None: @@ -256,8 +277,8 @@ async def start_quote_stream( log.info(f"Subscribing with existing `{broker}` daemon") tickers2chans = broker2tickersubs[broker] - # do a smoke quote (note this mutates the input list and filters out bad - # symbols for now) + # do a smoke quote (note this mutates the input list and filters + # out bad symbols for now) payload = await smoke_quote(get_quotes, tickers, broker) # push initial smoke quote response for client initialization await chan.send({'yield': payload, 'cid': cid}) @@ -271,7 +292,7 @@ async def start_quote_stream( log.info(f"Spawning quoter task for {brokermod.name}") async with trio.open_nursery() as nursery: nursery.start_soon(partial( - stream_quotes, brokermod, get_quotes, tickers2chans, + fan_out_to_chans, brokermod, get_quotes, tickers2chans, cid=cid) ) dtasks.add(broker) @@ -293,3 +314,40 @@ async def start_quote_stream( log.info(f"No more subscriptions for {broker}") broker2tickersubs.pop(broker, None) dtasks.discard(broker) + + +async def stream_to_file( + watchlist_name: str, + filename: str, + portal: tractor._portal.Portal, + tickers: List[str], + brokermod: ModuleType, + rate: int, +): + """Record client side received quotes to file ``filename``. + """ + # an async generator instance + agen = await portal.run( + "piker.brokers.data", 'start_quote_stream', + broker=brokermod.name, tickers=tickers) + + fname = filename or f'{watchlist_name}.jsonstream' + with open(fname, 'a') as f: + async for quotes in agen: + f.write(json.dumps(quotes)) + f.write('\n--\n') + + return fname + + +async def stream_from_file( + filename: str, +): + with open(filename, 'r') as quotes_file: + content = quotes_file.read() + + pkts = content.split('--')[:-1] # simulate 2 separate quote packets + payloads = [json.loads(pkt) for pkt in pkts] + for payload in cycle(payloads): + yield payload + await trio.sleep(0.3)