From cfb125beefd6f59585014c4abbc786f3183f92ff Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 13 May 2023 17:35:46 -0400 Subject: [PATCH] `.data.feed`: finally solve startup overruns issue We need to allow overruns during the async multi-broker context spawning init bc some backends might take longer then others to setup (eg. binance vs. kucoin) and result in some context (stream) being overrun by the time we get to the `.open_stream()` phase. Ideally, we can maybe adjust the concurrent setup to be more of a task-per-provider style to avoid this in the future - which would also in theory result in more-immediate per-provider setup in terms showing ready feeds asap. Also, does a bunch of renaming from fqsn -> fqme and drops the lower casing of input symbols instead expecting the caller to know what the data backend it's requesting is going to be able to handle in terms of symbology. --- piker/data/feed.py | 69 +++++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index ac6b188a..047bd40d 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -267,8 +267,6 @@ async def allocate_persistent_feed( # TODO: probably make a struct msg type for this as well # since eventually we do want to have more efficient IPC.. first_quote: dict[str, Any] - - symstr = symstr.lower() ( init_msgs, first_quote, @@ -465,9 +463,6 @@ async def open_feed_bus( for symbol in symbols: - # we always use lower case keys internally - symbol = symbol.lower() - # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery @@ -653,7 +648,12 @@ class Feed(Struct): brokers: Sequence[str] | None = None, ) -> trio.abc.ReceiveChannel: + ''' + Open steams to multiple data providers (``brokers``) and + multiplex their msgs onto a common mem chan for + only-requires-a-single-thread style consumption. + ''' if brokers is None: mods = self.mods brokers = list(self.mods) @@ -739,7 +739,7 @@ async def install_brokerd_search( @acm async def maybe_open_feed( - fqsns: list[str], + fqmes: list[str], loglevel: Optional[str] = None, **kwargs, @@ -754,12 +754,12 @@ async def maybe_open_feed( in a tractor broadcast receiver. ''' - fqsn = fqsns[0] + fqme = fqmes[0] async with maybe_open_context( acm_func=open_feed, kwargs={ - 'fqsns': fqsns, + 'fqmes': fqmes, 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), @@ -767,12 +767,12 @@ async def maybe_open_feed( 'allow_overruns': kwargs.get('allow_overruns', True), 'start_stream': kwargs.get('start_stream', True), }, - key=fqsn, + key=fqme, ) as (cache_hit, feed): if cache_hit: - log.info(f'Using cached feed for {fqsn}') + log.info(f'Using cached feed for {fqme}') # add a new broadcast subscription for the quote stream # if this feed is likely already in use @@ -793,7 +793,7 @@ async def maybe_open_feed( @acm async def open_feed( - fqsns: list[str], + fqmes: list[str], loglevel: str | None = None, allow_overruns: bool = True, @@ -808,9 +808,9 @@ async def open_feed( providers: dict[ModuleType, list[str]] = {} feed = Feed() - for fqsn in fqsns: - brokername, *_ = unpack_fqme(fqsn) - bfqsn = fqsn.replace('.' + brokername, '') + for fqme in fqmes: + brokername, *_ = unpack_fqme(fqme) + bfqme = fqme.replace('.' + brokername, '') try: mod = get_brokermod(brokername) @@ -818,13 +818,13 @@ async def open_feed( mod = get_ingestormod(brokername) # built a per-provider map to instrument names - providers.setdefault(mod, []).append(bfqsn) + providers.setdefault(mod, []).append(bfqme) feed.mods[mod.name] = mod # one actor per brokerd for now brokerd_ctxs = [] - for brokermod, bfqsns in providers.items(): + for brokermod, bfqmes in providers.items(): # if no `brokerd` for this backend exists yet we spawn # a daemon actor for it. @@ -843,7 +843,7 @@ async def open_feed( bus_ctxs: list[AsyncContextManager] = [] for ( portal, - (brokermod, bfqsns), + (brokermod, bfqmes), ) in zip(portals, providers.items()): feed.portals[brokermod] = portal @@ -868,10 +868,20 @@ async def open_feed( portal.open_context( open_feed_bus, brokername=brokermod.name, - symbols=bfqsns, + symbols=bfqmes, loglevel=loglevel, start_stream=start_stream, tick_throttle=tick_throttle, + + # XXX: super important to avoid + # the brokerd from some other + # backend overruning the task here + # bc some other brokerd took longer + # to startup before we hit the `.open_stream()` + # loop below XD .. really we should try to do each + # of these stream open sequences sequentially per + # backend? .. need some thot! + allow_overruns=True, ) ) @@ -880,16 +890,16 @@ async def open_feed( async with ( gather_contexts(bus_ctxs) as ctxs, ): - stream_ctxs = [] + stream_ctxs: list[tractor.MsgStream] = [] for ( (ctx, flumes_msg_dict), - (brokermod, bfqsns), + (brokermod, bfqmes), ) in zip(ctxs, providers.items()): - for fqsn, flume_msg in flumes_msg_dict.items(): + for fqme, flume_msg in flumes_msg_dict.items(): flume = Flume.from_msg(flume_msg) - assert flume.symbol.fqsn == fqsn - feed.flumes[fqsn] = flume + assert flume.symbol.fqme == fqme + feed.flumes[fqme] = flume # TODO: do we need this? flume.feed = feed @@ -915,21 +925,24 @@ async def open_feed( ) ) + stream: tractor.MsgStream + brokermod: ModuleType + fqmes: list[str] async with ( gather_contexts(stream_ctxs) as streams, ): for ( stream, - (brokermod, bfqsns), + (brokermod, bfqmes), ) in zip(streams, providers.items()): assert stream feed.streams[brokermod.name] = stream - # apply `brokerd`-common steam to each flume - # tracking a symbol from that provider. - for fqsn, flume in feed.flumes.items(): - if brokermod.name == flume.symbol.broker: + # apply `brokerd`-common stream to each flume + # tracking a live market feed from that provider. + for fqme, flume in feed.flumes.items(): + if brokermod.name == flume.mkt.broker: flume.stream = stream assert len(feed.mods) == len(feed.portals) == len(feed.streams)