From c7cf0cde9cafdf8e7b2819a6f4b05dc2c02b3f2b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 30 Nov 2018 01:34:32 -0500 Subject: [PATCH] Add options streaming Well that was a doozy; had to rejig pretty much all of it. The deats: - Track broker components in a new `DataFeed` namedtuple - port to new list based batch quotes (not dicts any more) - lock access to cached broker-client / data-feed instantiation - respawn tasks that fail due to the network --- piker/brokers/data.py | 259 ++++++++++++++++++++++++++---------------- 1 file changed, 162 insertions(+), 97 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index b1427cab..bbbe5678 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -7,7 +7,10 @@ from itertools import cycle import socket import json from types import ModuleType -from typing import Coroutine, Callable, Dict, List +import typing +from typing import Coroutine, Callable, Dict, List, Any +import contextlib +from operator import itemgetter import trio import tractor @@ -56,12 +59,14 @@ async def stream_quotes( _cache = {} # ticker to quote caching while True: # use an event here to trigger exit? + prequote_start = time.time() - # tickers = list(tickers2chans.keys()) with trio.move_on_after(3) as cancel_scope: quotes = await request_quotes() + postquote_start = time.time() + cancelled = cancel_scope.cancelled_caught if cancelled: log.warn("Quote query timed out after 3 seconds, retrying...") @@ -69,18 +74,20 @@ async def stream_quotes( # quotes = await wait_for_network(partial(get_quotes, tickers)) quotes = await wait_for_network(request_quotes) - postquote_start = time.time() - new_quotes = {} + new_quotes = [] if diff_cached: - # if cache is enabled then only deliver "new" changes - for symbol, quote in quotes.items(): - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote - new_quotes[symbol] = quote + # If cache is enabled then only deliver "new" changes. + # Useful for polling setups but obviously should be + # disabled if you're rx-ing event data. + for quote in quotes: + symbol = quote['symbol'] + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + new_quotes.append(quote) else: new_quotes = quotes @@ -101,31 +108,51 @@ async def stream_quotes( await trio.sleep(delay) +class DataFeed(typing.NamedTuple): + """A per broker "data feed" container. + + A structure to keep track of components used by + real-time data daemons. + """ + mod: ModuleType + client: object + quoter_keys: List[str] = ['stock', 'option'] + tasks: Dict[str, trio._core._run.Task] = dict.fromkeys( + quoter_keys, False) + quoters: Dict[str, typing.Coroutine] = {} + subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}} + + async def fan_out_to_chans( - brokermod: ModuleType, + feed: DataFeed, get_quotes: Coroutine, - tickers2chans: Dict[str, tractor.Channel], + symbols2chans: Dict[str, tractor.Channel], rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue cid: str = None, ) -> None: """Request and fan out quotes to each subscribed actor channel. """ - broker_limit = getattr(brokermod, '_rate_limit', float('inf')) + broker_limit = getattr(feed.mod, '_rate_limit', float('inf')) if broker_limit < rate: rate = broker_limit - log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") + log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") async def request(): """Get quotes for current symbol subscription set. """ - return await get_quotes(list(tickers2chans.keys())) + return await get_quotes(list(symbols2chans.keys())) - async for quotes in stream_quotes(brokermod, request, rate): + async for quotes in stream_quotes( + feed.mod, request, rate, + diff_cached=diff_cached, + ): chan_payloads = {} - for symbol, quote in quotes.items(): + for quote in quotes: + # is this too QT specific? + symbol = quote['symbol'] # set symbol quotes for each subscriber - for chan, cid in tickers2chans.get(symbol, set()): + for chan, cid in symbols2chans.get(quote['key'], set()): chan_payloads.setdefault( chan, {'yield': {}, 'cid': cid} @@ -142,41 +169,21 @@ async def fan_out_to_chans( ConnectionRefusedError, ): log.warn(f"{chan} went down?") - for chanset in tickers2chans.values(): + for chanset in symbols2chans.values(): chanset.discard((chan, cid)) - if not any(tickers2chans.values()): - log.warn(f"No subs left for broker {brokermod.name}, exiting task") + if not any(symbols2chans.values()): + log.warn(f"No subs left for broker {feed.mod.name}, exiting task") break - log.info(f"Terminating stream quoter task for {brokermod.name}") + log.info(f"Terminating stream quoter task for {feed.mod.name}") -async def get_cached_client(broker, tickers): - """Get or create the current actor's cached broker client. - """ - # check if a cached client is in the local actor's statespace - clients = tractor.current_actor().statespace.setdefault('clients', {}) - try: - return clients[broker] - except KeyError: - log.info(f"Creating new client for broker {broker}") - brokermod = get_brokermod(broker) - # TODO: move to AsyncExitStack in 3.7 - client_cntxmng = brokermod.get_client() - client = await client_cntxmng.__aenter__() - get_quotes = await brokermod.quoter(client, tickers) - clients[broker] = ( - brokermod, client, client_cntxmng, get_quotes) - - return brokermod, client, client_cntxmng, get_quotes - - -async def symbol_data(broker, tickers): +async def symbol_data(broker: str, tickers: List[str]): """Retrieve baseline symbol info from broker. """ - _, client, _, get_quotes = await get_cached_client(broker, tickers) - return await client.symbol_data(tickers) + feed = await get_cached_feed(broker) + return await feed.client.symbol_data(tickers) async def smoke_quote(get_quotes, tickers, broker): @@ -192,8 +199,9 @@ async def smoke_quote(get_quotes, tickers, broker): # since the new client needs to know what symbols are accepted log.warn(f"Retrieving smoke quote for symbols {tickers}") quotes = await get_quotes(tickers) + # report any tickers that aren't returned in the first quote - invalid_tickers = set(tickers) - set(quotes) + invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes)) for symbol in invalid_tickers: tickers.remove(symbol) log.warn( @@ -202,7 +210,8 @@ async def smoke_quote(get_quotes, tickers, broker): # pop any tickers that return "empty" quotes payload = {} - for symbol, quote in quotes.items(): + for quote in quotes: + symbol = quote['symbol'] if quote is None: log.warn( f"Symbol `{symbol}` not found by broker" @@ -210,6 +219,12 @@ async def smoke_quote(get_quotes, tickers, broker): # XXX: not this mutates the input list (for now) tickers.remove(symbol) continue + + # report any unknown/invalid symbols (QT specific) + if quote.get('low52w', False) is None: + log.warn( + f"{symbol} seems to be defunct") + payload[symbol] = quote return payload @@ -218,23 +233,25 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### -def modify_quote_stream(broker, tickers, chan=None, cid=None): +async def modify_quote_stream(broker, feed_type, tickers, chan=None, cid=None): """Absolute symbol subscription list for each quote stream. - Effectively a consumer subscription api. + Effectively a symbol subscription api. """ log.info(f"{chan} changed symbol subscription to {tickers}") - ss = tractor.current_actor().statespace - broker2tickersubs = ss['broker2tickersubs'] - tickers2chans = broker2tickersubs.get(broker) + feed = await get_cached_feed(broker) + symbols2chans = feed.subscriptions[feed_type] # update map from each symbol to requesting client's chan for ticker in tickers: - tickers2chans.setdefault(ticker, set()).add((chan, cid)) + symbols2chans.setdefault(ticker, set()).add((chan, cid)) + # remove any existing symbol subscriptions if symbol is not + # found in ``tickers`` + # TODO: this can likely be factored out into the pub-sub api for ticker in filter( - lambda ticker: ticker not in tickers, tickers2chans.copy() + lambda ticker: ticker not in tickers, symbols2chans.copy() ): - chanset = tickers2chans.get(ticker) + chanset = symbols2chans.get(ticker) # XXX: cid will be different on unsub call for item in chanset.copy(): if chan in item: @@ -242,12 +259,42 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None): if not chanset: # pop empty sets which will trigger bg quoter task termination - tickers2chans.pop(ticker) + symbols2chans.pop(ticker) + + +async def get_cached_feed( + brokername: str, +) -> DataFeed: + """Get/create a ``DataFeed`` from/in the current actor. + """ + # check if a cached client is in the local actor's statespace + ss = tractor.current_actor().statespace + feeds = ss['feeds'] + lock = feeds['_lock'] + feed_stack = ss['feed_stacks'][brokername] + async with lock: + try: + feed = feeds[brokername] + log.info(f"Subscribing with existing `{brokername}` daemon") + return feed + except KeyError: + log.info(f"Creating new client for broker {brokername}") + brokermod = get_brokermod(brokername) + client = await feed_stack.enter_async_context( + brokermod.get_client()) + feed = DataFeed( + mod=brokermod, + client=client, + ) + feeds[brokername] = feed + return feed async def start_quote_stream( broker: str, - tickers: List[str], + symbols: List[Any], + feed_type: str = 'stock', + diff_cached: bool = True, chan: tractor.Channel = None, cid: str = None, ) -> None: @@ -263,57 +310,75 @@ async def start_quote_stream( get_console_log(actor.loglevel) # pull global vars from local actor ss = actor.statespace - broker2tickersubs = ss['broker2tickersubs'] - clients = ss['clients'] - dtasks = ss['dtasks'] - tickers = list(tickers) + # broker2symbolsubs = ss.setdefault('broker2symbolsubs', {}) + ss.setdefault('feeds', {'_lock': trio.Lock()}) + feed_stacks = ss.setdefault('feed_stacks', {}) + symbols = list(symbols) log.info( - f"{chan.uid} subscribed to {broker} for tickers {tickers}") + f"{chan.uid} subscribed to {broker} for symbols {symbols}") + feed_stack = feed_stacks.setdefault(broker, contextlib.AsyncExitStack()) + # another actor task may have already created it + feed = await get_cached_feed(broker) + symbols2chans = feed.subscriptions[feed_type] - brokermod, client, _, get_quotes = await get_cached_client(broker, tickers) - if broker not in broker2tickersubs: - tickers2chans = broker2tickersubs.setdefault(broker, {}) - else: - log.info(f"Subscribing with existing `{broker}` daemon") - tickers2chans = broker2tickersubs[broker] - - # do a smoke quote (note this mutates the input list and filters - # out bad symbols for now) - payload = await smoke_quote(get_quotes, tickers, broker) - # push initial smoke quote response for client initialization - await chan.send({'yield': payload, 'cid': cid}) + if feed_type == 'stock': + get_quotes = feed.quoters.setdefault( + 'stock', + await feed.mod.stock_quoter(feed.client, symbols) + ) + # do a smoke quote (note this mutates the input list and filters + # out bad symbols for now) + payload = await smoke_quote(get_quotes, symbols, broker) + # push initial smoke quote response for client initialization + await chan.send({'yield': payload, 'cid': cid}) + elif feed_type == 'option': + # FIXME: yeah we need maybe a more general way to specify + # the arg signature for the option feed beasides a symbol + # + expiry date. + get_quotes = feed.quoters.setdefault( + 'option', + await feed.mod.option_quoter(feed.client, symbols) + ) # update map from each symbol to requesting client's chan - modify_quote_stream(broker, tickers, chan=chan, cid=cid) + await modify_quote_stream(broker, feed_type, symbols, chan, cid) try: - if broker not in dtasks: - # no quoter task yet so start a daemon task - log.info(f"Spawning quoter task for {brokermod.name}") - async with trio.open_nursery() as nursery: - nursery.start_soon(partial( - fan_out_to_chans, brokermod, get_quotes, tickers2chans, - cid=cid) - ) - dtasks.add(broker) - + if not feed.tasks.get(feed_type): + # no data feeder task yet; so start one + respawn = True + log.info(f"Spawning data feed task for {feed.mod.name}") + while respawn: + respawn = False + try: + async with trio.open_nursery() as nursery: + nursery.start_soon( + partial( + fan_out_to_chans, feed, get_quotes, + symbols2chans, + diff_cached=diff_cached, + cid=cid + ) + ) + feed.tasks[feed_type] = True + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True # unblocks when no more symbols subscriptions exist and the # quote streamer task terminates (usually because another call # was made to `modify_quoter` to unsubscribe from streaming # symbols) - log.info(f"Terminated quoter task for {brokermod.name}") - - # TODO: move to AsyncExitStack in 3.7 - for _, _, cntxmng, _ in clients.values(): - # FIXME: yes I know there's no error handling.. - await cntxmng.__aexit__(None, None, None) finally: + log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}") + feed.tasks.pop(feed_type) # if there are truly no more subscriptions with this broker # drop from broker subs dict - if not any(tickers2chans.values()): + if not any(symbols2chans.values()): log.info(f"No more subscriptions for {broker}") - broker2tickersubs.pop(broker, None) - dtasks.discard(broker) + # broker2symbolsubs.pop(broker, None) + + # destroy the API client + await feed_stack.aclose() async def stream_to_file(