Symbol subs must be cid specific
parent
11222e1176
commit
eb8c9e1a99
|
@ -159,17 +159,16 @@ async def fan_out_to_chans(
|
||||||
):
|
):
|
||||||
chan_payloads = {}
|
chan_payloads = {}
|
||||||
for quote in quotes:
|
for quote in quotes:
|
||||||
# set symbol quotes for each subscriber
|
packet = {quote['symbol']: quote}
|
||||||
# for chan, cid in symbols2chans.get(quote['key'], set()):
|
for chan, cid in symbols2chans.get(quote['key'], set()):
|
||||||
for chan, cid in symbols2chans[quote['key']]:
|
|
||||||
chan_payloads.setdefault(
|
chan_payloads.setdefault(
|
||||||
chan,
|
(chan, cid),
|
||||||
{'yield': {}, 'cid': cid}
|
{'yield': {}, 'cid': cid}
|
||||||
)['yield'].update({quote['symbol']: quote})
|
)['yield'].update(packet)
|
||||||
|
|
||||||
# deliver to each subscriber (fan out)
|
# deliver to each subscriber (fan out)
|
||||||
if chan_payloads:
|
if chan_payloads:
|
||||||
for chan, payload in chan_payloads.items():
|
for (chan, cid), payload in chan_payloads.items():
|
||||||
try:
|
try:
|
||||||
await chan.send(payload)
|
await chan.send(payload)
|
||||||
except (
|
except (
|
||||||
|
@ -269,7 +268,7 @@ def modify_quote_stream(broker, feed_type, symbols, chan, cid):
|
||||||
chanset = symbols2chans.get(ticker)
|
chanset = symbols2chans.get(ticker)
|
||||||
# XXX: cid will be different on unsub call
|
# XXX: cid will be different on unsub call
|
||||||
for item in chanset.copy():
|
for item in chanset.copy():
|
||||||
if chan in item:
|
if (chan, cid) == item:
|
||||||
chanset.discard(item)
|
chanset.discard(item)
|
||||||
|
|
||||||
if not chanset:
|
if not chanset:
|
||||||
|
|
Loading…
Reference in New Issue