Pass history shm "last index" in init msg, assign on feed

history_view
Tyler Goodlet 2022-09-01 11:26:29 -04:00
parent 3a434f312b
commit 49ccfdd673
1 changed files with 8 additions and 2 deletions

View File

@ -951,6 +951,7 @@ async def allocate_persistent_feed(
# this task.
msg = init_msg[symbol]
msg['hist_shm_token'] = hist_shm.token
msg['startup_hist_index'] = hist_shm.index - 1
msg['rt_shm_token'] = rt_shm.token
# true fqsn
@ -1040,6 +1041,7 @@ async def allocate_persistent_feed(
await sample_and_broadcast(
bus,
rt_shm,
hist_shm,
quote_stream,
brokername,
sum_tick_vlm
@ -1222,6 +1224,8 @@ class Feed:
stream: trio.abc.ReceiveChannel[dict[str, Any]]
status: dict[str, Any]
startup_hist_index: int = 0
throttle_rate: Optional[int] = None
_trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None
@ -1361,13 +1365,14 @@ async def open_feed(
) as stream,
):
init = init_msg[bfqsn]
# we can only read from shm
hist_shm = attach_shm_array(
token=init_msg[bfqsn]['hist_shm_token'],
token=init['hist_shm_token'],
readonly=True,
)
rt_shm = attach_shm_array(
token=init_msg[bfqsn]['rt_shm_token'],
token=init['rt_shm_token'],
readonly=True,
)
@ -1382,6 +1387,7 @@ async def open_feed(
stream=stream,
_portal=portal,
status={},
startup_hist_index=init['startup_hist_index'],
throttle_rate=tick_throttle,
)