From e91a50a1bad4f715c942887d021e7313c52ff6b4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 3 Feb 2019 23:40:51 -0500 Subject: [PATCH] Make `get_cached_feed()` an asynccontextmanager Adjust feed locking around internal manager `yields` to make this work. Also, change quote publisher to deliver a list of quotes for each retrieved batch. This was actually broken for option streaming since each quote was being overwritten due to a common `key` value for all expiries. Asjust the `packetizer` function accordingly to work for both options and stocks. --- piker/brokers/data.py | 120 +++++++++++++++++++++--------------------- 1 file changed, 61 insertions(+), 59 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 09847615..3c4cb944 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -15,6 +15,7 @@ from operator import itemgetter import trio import tractor +from async_generator import asynccontextmanager from ..log import get_logger, get_console_log from . import get_brokermod @@ -127,11 +128,16 @@ async def stream_quotes( log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote - new_quotes[quote['key']] = quote + # XXX: we append to a list for the options case where the + # subscription topic (key) is the same for all + # expiries even though this is uncessary for the + # stock case (different topic [i.e. symbol] for each + # quote). + new_quotes.setdefault(quote['key'], []).append(quote) else: log.info(f"Delivering quotes:\n{quotes}") for quote in quotes: - new_quotes[quote['symbol']] = quote + new_quotes.setdefault(quote['key'], []).append(quote) yield new_quotes @@ -153,8 +159,8 @@ async def stream_quotes( async def symbol_data(broker: str, tickers: List[str]): """Retrieve baseline symbol info from broker. """ - feed = await get_cached_feed(broker) - return await feed.client.symbol_data(tickers) + async with get_cached_feed(broker) as feed: + return await feed.client.symbol_data(tickers) async def smoke_quote(get_quotes, tickers, broker): @@ -193,7 +199,7 @@ async def smoke_quote(get_quotes, tickers, broker): # report any unknown/invalid symbols (QT specific) if quote.get('low52w', False) is None: - log.warn( + log.error( f"{symbol} seems to be defunct") payload[symbol] = quote @@ -204,6 +210,7 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### +@asynccontextmanager async def get_cached_feed( brokername: str, ) -> BrokerFeed: @@ -213,24 +220,29 @@ async def get_cached_feed( ss = tractor.current_actor().statespace feeds = ss.setdefault('feeds', {'_lock': trio.Lock()}) lock = feeds['_lock'] - async with lock: + try: try: - feed = feeds[brokername] - log.info(f"Subscribing with existing `{brokername}` daemon") - return feed + async with lock: + feed = feeds[brokername] + log.info(f"Subscribing with existing `{brokername}` daemon") + yield feed except KeyError: - log.info(f"Creating new client for broker {brokername}") - brokermod = get_brokermod(brokername) - exit_stack = contextlib.AsyncExitStack() - client = await exit_stack.enter_async_context( - brokermod.get_client()) - feed = BrokerFeed( - mod=brokermod, - client=client, - exit_stack=exit_stack, - ) - feeds[brokername] = feed - return feed + async with lock: + log.info(f"Creating new client for broker {brokername}") + brokermod = get_brokermod(brokername) + exit_stack = contextlib.AsyncExitStack() + client = await exit_stack.enter_async_context( + brokermod.get_client()) + feed = BrokerFeed( + mod=brokermod, + client=client, + exit_stack=exit_stack, + ) + feeds[brokername] = feed + yield feed + finally: + # destroy the API client + await feed.exit_stack.aclose() async def start_quote_stream( @@ -256,40 +268,39 @@ async def start_quote_stream( log.info( f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}") # another actor task may have already created it - feed = await get_cached_feed(broker) - symbols2ctxs = feed.subscriptions[feed_type] - packetizer = None + async with get_cached_feed(broker) as feed: + # function to format packets delivered to subscribers + packetizer = None - if feed_type == 'stock': - get_quotes = feed.quoters.setdefault( - 'stock', - await feed.mod.stock_quoter(feed.client, symbols) - ) - # do a smoke quote (note this mutates the input list and filters - # out bad symbols for now) - payload = await smoke_quote(get_quotes, symbols, broker) + if feed_type == 'stock': + get_quotes = feed.quoters.setdefault( + 'stock', + await feed.mod.stock_quoter(feed.client, symbols) + ) + # do a smoke quote (note this mutates the input list and filters + # out bad symbols for now) + payload = await smoke_quote(get_quotes, symbols, broker) - elif feed_type == 'option': - # FIXME: yeah we need maybe a more general way to specify - # the arg signature for the option feed beasides a symbol - # + expiry date. - get_quotes = feed.quoters.setdefault( - 'option', - await feed.mod.option_quoter(feed.client, symbols) - ) - # packetize - payload = { - quote['symbol']: quote - for quote in await get_quotes(symbols) - } + elif feed_type == 'option': + # FIXME: yeah we need maybe a more general way to specify + # the arg signature for the option feed beasides a symbol + # + expiry date. + get_quotes = feed.quoters.setdefault( + 'option', + await feed.mod.option_quoter(feed.client, symbols) + ) + # packetize + payload = { + quote['symbol']: quote + for quote in await get_quotes(symbols) + } - def packetizer(topic, quote): - return {quote['symbol']: quote} + def packetizer(topic, quotes): + return {quote['symbol']: quote for quote in quotes} - # push initial smoke quote response for client initialization - await ctx.send_yield(payload) + # push initial smoke quote response for client initialization + await ctx.send_yield(payload) - try: await stream_quotes( # pub required kwargs @@ -306,14 +317,6 @@ async def start_quote_stream( ) log.info( f"Terminating stream quoter task for {feed.mod.name}") - finally: - # if there are truly no more subscriptions with this broker - # drop from broker subs dict - if not any(symbols2ctxs.values()): - log.info(f"No more subscriptions for broker {broker}") - - # destroy the API client - await feed.exit_stack.aclose() class DataFeed: @@ -377,7 +380,6 @@ class DataFeed: # get first quotes response log.debug(f"Waiting on first quote for {symbols}...") quotes = {} - # with trio.move_on_after(5): quotes = await quote_gen.__anext__() self.quote_gen = quote_gen