diff --git a/piker/brokers/data.py b/piker/brokers/data.py index af5c10b3..e449a770 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -146,22 +146,29 @@ async def fan_out_to_chans( async def request(): """Get quotes for current symbol subscription set. """ - return await get_quotes(list(symbols2chans.keys())) + symbols = list(symbols2chans.keys()) + if symbols: + # subscription can be changed at any time + return await get_quotes(symbols) + else: + return () async for quotes in stream_quotes( feed.mod, request, rate, diff_cached=diff_cached, ): chan_payloads = {} + payload = {} for quote in quotes: # is this too QT specific? symbol = quote['symbol'] + payload[symbol] = quote # set symbol quotes for each subscriber for chan, cid in symbols2chans.get(quote['key'], set()): chan_payloads.setdefault( chan, - {'yield': {}, 'cid': cid} - )['yield'][symbol] = quote + {'yield': payload, 'cid': cid} + ) # deliver to each subscriber (fan out) if chan_payloads: @@ -347,6 +354,12 @@ async def start_quote_stream( 'option', await feed.mod.option_quoter(feed.client, symbols) ) + payload = { + quote['symbol']: quote + for quote in await get_quotes(symbols) + } + # push initial smoke quote response for client initialization + await chan.send({'yield': payload, 'cid': cid}) try: # update map from each symbol to requesting client's chan modify_quote_stream(broker, feed_type, symbols, chan, cid)