diff --git a/piker/data/feed.py b/piker/data/feed.py index 9beec93b..51f1275b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1033,9 +1033,7 @@ async def allocate_persistent_feed( [symstr], init_msgs, ) - bs_mktid: str = init.bs_mktid mkt: MktPair = init.mkt_info - assert mkt.bs_mktid == bs_mktid fqme: str = mkt.fqme # HISTORY storage, run 2 tasks: @@ -1150,14 +1148,14 @@ async def open_feed_bus( ctx: tractor.Context, brokername: str, - symbols: list[str], # normally expected to the broker-specific fqsn + symbols: list[str], # normally expected to the broker-specific fqme loglevel: str = 'error', tick_throttle: Optional[float] = None, start_stream: bool = True, ) -> dict[ - str, # fqsn + str, # fqme tuple[dict, dict] # pair of dicts of the initmsg and first quotes ]: ''' @@ -1218,33 +1216,32 @@ async def open_feed_bus( # XXX: ``.first_quote`` may be outdated here if this is secondary # subscriber - flume = bus.feeds[symbol] - sym = flume.symbol - bs_mktid = sym.key - fqsn = sym.fqme # true fqsn - assert bs_mktid in fqsn and brokername in fqsn + flume: Flume = bus.feeds[symbol] + mkt: MktPair = flume.mkt + bs_fqme: str = mkt.bs_fqme + fqme: str = mkt.fqme + assert brokername in fqme - if sym.suffix: - bs_mktid = fqsn.removesuffix(f'.{brokername}') - log.warning(f'{brokername} expanded symbol {symbol} -> {bs_mktid}') + if mkt.suffix: + log.warning(f'{brokername} expanded symbol {symbol} -> {bs_fqme}') # pack for ``.started()`` sync msg - flumes[fqsn] = flume + flumes[fqme] = flume - # we use the broker-specific market id (bs_mktid) for the + # we use the broker-specific fqme (bs_fqme) for the # sampler subscription since the backend isn't (yet) expected to - # append it's own name to the fqsn, so we filter on keys which + # append it's own name to the fqme, so we filter on keys which # *do not* include that name (e.g .ib) . - bus._subscribers.setdefault(bs_mktid, set()) + bus._subscribers.setdefault(bs_fqme, set()) # sync feed subscribers with flume handles await ctx.started( - {fqsn: flume.to_msg() - for fqsn, flume in flumes.items()} + {fqme: flume.to_msg() + for fqme, flume in flumes.items()} ) if not start_stream: - log.warning(f'Not opening real-time stream for {fqsn}') + log.warning(f'Not opening real-time stream for {fqme}') await trio.sleep_forever() # real-time stream loop @@ -1258,11 +1255,11 @@ async def open_feed_bus( ): local_subs: dict[str, set[tuple]] = {} - for fqsn, flume in flumes.items(): + for fqme, flume in flumes.items(): # re-send to trigger display loop cycle (necessary especially # when the mkt is closed and no real-time messages are # expected). - await stream.send({fqsn: flume.first_quote}) + await stream.send({fqme: flume.first_quote}) # set a common msg stream for all requested symbols assert stream @@ -1304,9 +1301,9 @@ async def open_feed_bus( # maybe use the current task-id to key the sub list that's # added / removed? Or maybe we can add a general # pause-resume by sub-key api? - bs_mktid = fqsn.removesuffix(f'.{brokername}') - local_subs.setdefault(bs_mktid, set()).add(sub) - bus.add_subs(bs_mktid, {sub}) + bs_fqme = fqme.removesuffix(f'.{brokername}') + local_subs.setdefault(bs_fqme, set()).add(sub) + bus.add_subs(bs_fqme, {sub}) # sync caller with all subs registered state sub_registered.set() @@ -1319,16 +1316,16 @@ async def open_feed_bus( async for msg in stream: if msg == 'pause': - for bs_mktid, subs in local_subs.items(): + for bs_fqme, subs in local_subs.items(): log.info( - f'Pausing {bs_mktid} feed for {uid}') - bus.remove_subs(bs_mktid, subs) + f'Pausing {bs_fqme} feed for {uid}') + bus.remove_subs(bs_fqme, subs) elif msg == 'resume': - for bs_mktid, subs in local_subs.items(): + for bs_fqme, subs in local_subs.items(): log.info( - f'Resuming {bs_mktid} feed for {uid}') - bus.add_subs(bs_mktid, subs) + f'Resuming {bs_fqme} feed for {uid}') + bus.add_subs(bs_fqme, subs) else: raise ValueError(msg) @@ -1342,8 +1339,8 @@ async def open_feed_bus( cs.cancel() # drop all subs for this task from the bus - for bs_mktid, subs in local_subs.items(): - bus.remove_subs(bs_mktid, subs) + for bs_fqme, subs in local_subs.items(): + bus.remove_subs(bs_fqme, subs) class Feed(Struct):