diff --git a/piker/data/feed.py b/piker/data/feed.py index ac6b188a..047bd40d 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -267,8 +267,6 @@ async def allocate_persistent_feed( # TODO: probably make a struct msg type for this as well # since eventually we do want to have more efficient IPC.. first_quote: dict[str, Any] - - symstr = symstr.lower() ( init_msgs, first_quote, @@ -465,9 +463,6 @@ async def open_feed_bus( for symbol in symbols: - # we always use lower case keys internally - symbol = symbol.lower() - # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery @@ -653,7 +648,12 @@ class Feed(Struct): brokers: Sequence[str] | None = None, ) -> trio.abc.ReceiveChannel: + ''' + Open steams to multiple data providers (``brokers``) and + multiplex their msgs onto a common mem chan for + only-requires-a-single-thread style consumption. + ''' if brokers is None: mods = self.mods brokers = list(self.mods) @@ -739,7 +739,7 @@ async def install_brokerd_search( @acm async def maybe_open_feed( - fqsns: list[str], + fqmes: list[str], loglevel: Optional[str] = None, **kwargs, @@ -754,12 +754,12 @@ async def maybe_open_feed( in a tractor broadcast receiver. ''' - fqsn = fqsns[0] + fqme = fqmes[0] async with maybe_open_context( acm_func=open_feed, kwargs={ - 'fqsns': fqsns, + 'fqmes': fqmes, 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), @@ -767,12 +767,12 @@ async def maybe_open_feed( 'allow_overruns': kwargs.get('allow_overruns', True), 'start_stream': kwargs.get('start_stream', True), }, - key=fqsn, + key=fqme, ) as (cache_hit, feed): if cache_hit: - log.info(f'Using cached feed for {fqsn}') + log.info(f'Using cached feed for {fqme}') # add a new broadcast subscription for the quote stream # if this feed is likely already in use @@ -793,7 +793,7 @@ async def maybe_open_feed( @acm async def open_feed( - fqsns: list[str], + fqmes: list[str], loglevel: str | None = None, allow_overruns: bool = True, @@ -808,9 +808,9 @@ async def open_feed( providers: dict[ModuleType, list[str]] = {} feed = Feed() - for fqsn in fqsns: - brokername, *_ = unpack_fqme(fqsn) - bfqsn = fqsn.replace('.' + brokername, '') + for fqme in fqmes: + brokername, *_ = unpack_fqme(fqme) + bfqme = fqme.replace('.' + brokername, '') try: mod = get_brokermod(brokername) @@ -818,13 +818,13 @@ async def open_feed( mod = get_ingestormod(brokername) # built a per-provider map to instrument names - providers.setdefault(mod, []).append(bfqsn) + providers.setdefault(mod, []).append(bfqme) feed.mods[mod.name] = mod # one actor per brokerd for now brokerd_ctxs = [] - for brokermod, bfqsns in providers.items(): + for brokermod, bfqmes in providers.items(): # if no `brokerd` for this backend exists yet we spawn # a daemon actor for it. @@ -843,7 +843,7 @@ async def open_feed( bus_ctxs: list[AsyncContextManager] = [] for ( portal, - (brokermod, bfqsns), + (brokermod, bfqmes), ) in zip(portals, providers.items()): feed.portals[brokermod] = portal @@ -868,10 +868,20 @@ async def open_feed( portal.open_context( open_feed_bus, brokername=brokermod.name, - symbols=bfqsns, + symbols=bfqmes, loglevel=loglevel, start_stream=start_stream, tick_throttle=tick_throttle, + + # XXX: super important to avoid + # the brokerd from some other + # backend overruning the task here + # bc some other brokerd took longer + # to startup before we hit the `.open_stream()` + # loop below XD .. really we should try to do each + # of these stream open sequences sequentially per + # backend? .. need some thot! + allow_overruns=True, ) ) @@ -880,16 +890,16 @@ async def open_feed( async with ( gather_contexts(bus_ctxs) as ctxs, ): - stream_ctxs = [] + stream_ctxs: list[tractor.MsgStream] = [] for ( (ctx, flumes_msg_dict), - (brokermod, bfqsns), + (brokermod, bfqmes), ) in zip(ctxs, providers.items()): - for fqsn, flume_msg in flumes_msg_dict.items(): + for fqme, flume_msg in flumes_msg_dict.items(): flume = Flume.from_msg(flume_msg) - assert flume.symbol.fqsn == fqsn - feed.flumes[fqsn] = flume + assert flume.symbol.fqme == fqme + feed.flumes[fqme] = flume # TODO: do we need this? flume.feed = feed @@ -915,21 +925,24 @@ async def open_feed( ) ) + stream: tractor.MsgStream + brokermod: ModuleType + fqmes: list[str] async with ( gather_contexts(stream_ctxs) as streams, ): for ( stream, - (brokermod, bfqsns), + (brokermod, bfqmes), ) in zip(streams, providers.items()): assert stream feed.streams[brokermod.name] = stream - # apply `brokerd`-common steam to each flume - # tracking a symbol from that provider. - for fqsn, flume in feed.flumes.items(): - if brokermod.name == flume.symbol.broker: + # apply `brokerd`-common stream to each flume + # tracking a live market feed from that provider. + for fqme, flume in feed.flumes.items(): + if brokermod.name == flume.mkt.broker: flume.stream = stream assert len(feed.mods) == len(feed.portals) == len(feed.streams)