From 37607d61caf7c09588a15c503d2760fab88fa9cb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Aug 2020 15:23:35 -0400 Subject: [PATCH] Port `DataFeed` api to broker specific normalizer routine --- piker/brokers/data.py | 139 ++++++++---------------------------------- 1 file changed, 24 insertions(+), 115 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index dbb76772..ea2076bb 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -81,10 +81,10 @@ class BrokerFeed: @tractor.msg.pub(tasks=['stock', 'option']) async def stream_poll_requests( - get_topics: typing.Callable, + get_topics: Callable, get_quotes: Coroutine, + normalizer: Callable, rate: int = 3, # delay between quote requests - diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: """Stream requests for quotes for a set of symbols at the given ``rate`` (per second). @@ -129,59 +129,15 @@ async def stream_poll_requests( quotes = await wait_for_network(request_quotes) new_quotes = {} - if diff_cached: - # If cache is enabled then only deliver "new" changes. - # Useful for polling setups but obviously should be - # disabled if you're rx-ing per-tick data. - for quote in quotes: - symbol = quote['symbol'] - last = _cache.setdefault(symbol, {}) - last_volume = last.get('volume', 0) - # find all keys that have match to a new value compared - # to the last quote received - new = set(quote.items()) - set(last.items()) - if new: - log.info(f"New quote {symbol}:\n{new}") - _cache[symbol] = quote - - # only ship diff updates and other required fields - payload = {k: quote[k] for k, v in new} - payload['symbol'] = symbol - - # if there was volume likely the last size of - # shares traded is useful info and it's possible - # that the set difference from above will disregard - # a "size" value since the same # of shares were traded - volume = payload.get('volume') - if volume: - volume_since_last_quote = volume - last_volume - assert volume_since_last_quote > 0 - payload['volume_delta'] = volume_since_last_quote - - # TODO: We can emit 2 ticks here: - # - one for the volume differential - # - one for the last known trade size - # The first in theory can be unwound and - # interpolated assuming the broker passes an - # accurate daily VWAP value. - # To make this work we need a universal ``size`` - # field that is normalized before hitting this logic. - # XXX: very questrade specific - payload['size'] = quote['lastTradeSize'] - - log.info(f"New paylod {symbol}:\n{payload}") - - # XXX: we append to a list for the options case where the - # subscription topic (key) is the same for all - # expiries even though this is uncessary for the - # stock case (different topic [i.e. symbol] for each - # quote). - new_quotes.setdefault(quote['key'], []).append(payload) - else: - # log.debug(f"Delivering quotes:\n{quotes}") - for quote in quotes: - new_quotes.setdefault(quote['key'], []).append(quote) + normalized = normalizer(quotes, _cache) + for symbol, quote in normalized.items(): + # XXX: we append to a list for the options case where the + # subscription topic (key) is the same for all + # expiries even though this is uncessary for the + # stock case (different topic [i.e. symbol] for each + # quote). + new_quotes.setdefault(quote['key'], []).append(quote) if new_quotes: yield new_quotes @@ -208,53 +164,6 @@ async def symbol_data(broker: str, tickers: List[str]): return await feed.client.symbol_info(tickers) -async def smoke_quote(get_quotes, tickers, broker): - """Do an initial "smoke" request for symbols in ``tickers`` filtering - out any symbols not supported by the broker queried in the call to - ``get_quotes()``. - """ - # TODO: trim out with #37 - ################################################# - # get a single quote filtering out any bad tickers - # NOTE: this code is always run for every new client - # subscription even when a broker quoter task is already running - # since the new client needs to know what symbols are accepted - log.warn(f"Retrieving smoke quote for symbols {tickers}") - quotes = await get_quotes(tickers) - - # report any tickers that aren't returned in the first quote - invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes)) - for symbol in invalid_tickers: - tickers.remove(symbol) - log.warn( - f"Symbol `{symbol}` not found by broker `{broker}`" - ) - - # pop any tickers that return "empty" quotes - payload = {} - for quote in quotes: - symbol = quote['symbol'] - if quote is None: - log.warn( - f"Symbol `{symbol}` not found by broker" - f" `{broker}`") - # XXX: not this mutates the input list (for now) - tickers.remove(symbol) - continue - - # report any unknown/invalid symbols (QT specific) - if quote.get('low52w', False) is None: - log.error( - f"{symbol} seems to be defunct") - - payload[symbol] = quote - - return payload - - # end of section to be trimmed out with #37 - ########################################### - - @asynccontextmanager async def get_cached_feed( brokername: str, @@ -297,7 +206,6 @@ async def start_quote_stream( broker: str, symbols: List[Any], feed_type: str = 'stock', - diff_cached: bool = True, rate: int = 3, ) -> None: """Handle per-broker quote stream subscriptions using a "lazy" pub-sub @@ -316,8 +224,6 @@ async def start_quote_stream( f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}") # another actor task may have already created it async with get_cached_feed(broker) as feed: - # function to format packets delivered to subscribers - packetizer = None if feed_type == 'stock': get_quotes = feed.quoters.setdefault( @@ -326,7 +232,7 @@ async def start_quote_stream( ) # do a smoke quote (note this mutates the input list and filters # out bad symbols for now) - payload = await smoke_quote(get_quotes, symbols, broker) + first_quotes = await feed.mod.smoke_quote(get_quotes, symbols) formatter = feed.mod.format_stock_quote elif feed_type == 'option': @@ -338,22 +244,27 @@ async def start_quote_stream( await feed.mod.option_quoter(feed.client, symbols) ) # packetize - payload = { + first_quotes = { quote['symbol']: quote for quote in await get_quotes(symbols) } formatter = feed.mod.format_option_quote sd = await feed.client.symbol_info(symbols) - # formatter = partial(formatter, symbol_data=sd) + feed.mod._symbol_info_cache.update(sd) - packetizer = partial( - feed.mod.packetizer, + normalize = partial( + feed.mod.normalize, formatter=formatter, - symbol_data=sd, ) - # push initial smoke quote response for client initialization + # pre-process first set of quotes + payload = {} + for sym, quote in first_quotes.items(): + fquote, _ = formatter(quote, sd) + assert fquote['displayable'] + payload[sym] = fquote + await ctx.send_yield(payload) await stream_poll_requests( @@ -362,11 +273,11 @@ async def start_quote_stream( task_name=feed_type, ctx=ctx, topics=symbols, - packetizer=packetizer, + packetizer=feed.mod.packetizer, # actual func args get_quotes=get_quotes, - diff_cached=diff_cached, + normalizer=normalize, rate=rate, ) log.info( @@ -401,7 +312,6 @@ class DataFeed: symbols: Sequence[str], feed_type: str, rate: int = 1, - diff_cached: bool = True, test: str = '', ) -> (AsyncGenerator, dict): if feed_type not in self._allowed: @@ -445,7 +355,6 @@ class DataFeed: broker=self.brokermod.name, symbols=symbols, feed_type=feed_type, - diff_cached=diff_cached, rate=rate, )