From 03c38a1163561d800b8e85b361d1b6daedd4e315 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 28 Aug 2021 10:45:23 -0400 Subject: [PATCH] It's a map of symbols to first quote dicts --- piker/data/feed.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 7b390ece..9bfe95a9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -192,7 +192,7 @@ async def allocate_persistent_feed( # establish broker backend quote stream # ``stream_quotes()`` is a required backend func - init_msg, first_quote = await bus.nursery.start( + init_msg, first_quotes = await bus.nursery.start( partial( mod.stream_quotes, send_chan=send, @@ -212,7 +212,7 @@ async def allocate_persistent_feed( # XXX: the ``symbol`` here is put into our native piker format (i.e. # lower case). - bus.feeds[symbol.lower()] = (cs, init_msg, first_quote) + bus.feeds[symbol.lower()] = (cs, init_msg, first_quotes) if opened: # start history backfill task ``backfill_bars()`` is @@ -227,7 +227,7 @@ async def allocate_persistent_feed( init_msg[symbol]['sample_rate'] = int(delay_s) # yield back control to starting nursery - task_status.started((init_msg, first_quote)) + task_status.started((init_msg, first_quotes)) await feed_is_live.wait() @@ -277,7 +277,7 @@ async def attach_feed_bus( # service nursery async with bus.task_lock: if entry is None: - init_msg, first_quote = await bus.nursery.start( + init_msg, first_quotes = await bus.nursery.start( partial( allocate_persistent_feed, @@ -294,13 +294,13 @@ async def attach_feed_bus( ) assert isinstance(bus.feeds[symbol], tuple) - # XXX: ``first_quote`` may be outdated here if this is secondary + # XXX: ``first_quotes`` may be outdated here if this is secondary # subscriber - cs, init_msg, first_quote = bus.feeds[symbol] + cs, init_msg, first_quotes = bus.feeds[symbol] # send this even to subscribers to existing feed? # deliver initial info message a first quote asap - await ctx.started((init_msg, first_quote)) + await ctx.started((init_msg, first_quotes)) async with ( ctx.open_stream() as stream, @@ -392,7 +392,7 @@ class Feed: name: str shm: ShmArray mod: ModuleType - first_quote: dict + first_quotes: dict # symbol names to first quote dicts stream: trio.abc.ReceiveChannel[dict[str, Any]] _brokerd_portal: tractor._portal.Portal @@ -509,7 +509,7 @@ async def open_feed( tick_throttle=tick_throttle, - ) as (ctx, (init_msg, first_quote)), + ) as (ctx, (init_msg, first_quotes)), ctx.open_stream() as stream, @@ -524,7 +524,7 @@ async def open_feed( name=brokername, shm=shm, mod=mod, - first_quote=first_quote, + first_quotes=first_quotes, stream=stream, _brokerd_portal=portal, )