diff --git a/piker/brokers/data.py b/piker/brokers/data.py index b076ef9c..f5cff4a8 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -159,17 +159,16 @@ async def fan_out_to_chans( ): chan_payloads = {} for quote in quotes: - # set symbol quotes for each subscriber - # for chan, cid in symbols2chans.get(quote['key'], set()): - for chan, cid in symbols2chans[quote['key']]: + packet = {quote['symbol']: quote} + for chan, cid in symbols2chans.get(quote['key'], set()): chan_payloads.setdefault( - chan, + (chan, cid), {'yield': {}, 'cid': cid} - )['yield'].update({quote['symbol']: quote}) + )['yield'].update(packet) # deliver to each subscriber (fan out) if chan_payloads: - for chan, payload in chan_payloads.items(): + for (chan, cid), payload in chan_payloads.items(): try: await chan.send(payload) except ( @@ -269,7 +268,7 @@ def modify_quote_stream(broker, feed_type, symbols, chan, cid): chanset = symbols2chans.get(ticker) # XXX: cid will be different on unsub call for item in chanset.copy(): - if chan in item: + if (chan, cid) == item: chanset.discard(item) if not chanset: