diff --git a/piker/data/feed.py b/piker/data/feed.py index ef027322..a2b64c44 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -951,6 +951,7 @@ async def allocate_persistent_feed( # this task. msg = init_msg[symbol] msg['hist_shm_token'] = hist_shm.token + msg['startup_hist_index'] = hist_shm.index - 1 msg['rt_shm_token'] = rt_shm.token # true fqsn @@ -1040,6 +1041,7 @@ async def allocate_persistent_feed( await sample_and_broadcast( bus, rt_shm, + hist_shm, quote_stream, brokername, sum_tick_vlm @@ -1222,6 +1224,8 @@ class Feed: stream: trio.abc.ReceiveChannel[dict[str, Any]] status: dict[str, Any] + startup_hist_index: int = 0 + throttle_rate: Optional[int] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None @@ -1361,13 +1365,14 @@ async def open_feed( ) as stream, ): + init = init_msg[bfqsn] # we can only read from shm hist_shm = attach_shm_array( - token=init_msg[bfqsn]['hist_shm_token'], + token=init['hist_shm_token'], readonly=True, ) rt_shm = attach_shm_array( - token=init_msg[bfqsn]['rt_shm_token'], + token=init['rt_shm_token'], readonly=True, ) @@ -1382,6 +1387,7 @@ async def open_feed( stream=stream, _portal=portal, status={}, + startup_hist_index=init['startup_hist_index'], throttle_rate=tick_throttle, )