Do tsdb backloading to shm concurrently

Not only improves startup latency but also avoids a bug where the rt
buffer was being tsdb-history prepended *before* the backfilling of
recent data from the backend was complete resulting in our of order
frames in shm.
ib_1m_hist
Tyler Goodlet 2022-10-26 09:52:51 -04:00
parent 4ca7817735
commit dc1edeecda
1 changed files with 28 additions and 10 deletions

View File

@ -552,6 +552,7 @@ async def tsdb_backfill(
last_tsdb_dt, last_tsdb_dt,
None, None,
None, None,
bf_done,
) )
continue continue
@ -561,6 +562,7 @@ async def tsdb_backfill(
last_tsdb_dt, last_tsdb_dt,
latest_start_dt, latest_start_dt,
latest_end_dt, latest_end_dt,
bf_done,
) )
# if len(hist_shm.array) < 2: # if len(hist_shm.array) < 2:
@ -580,22 +582,27 @@ async def tsdb_backfill(
shms[1], shms[1],
)) ))
# sync to backend history task's query/load completion async def back_load_from_tsdb(
await bf_done.wait() timeframe: int,
shm: ShmArray,
# Load tsdb history into shm buffer (for display). ):
# TODO: eventually it'd be nice to not require a shm array/buffer
# to accomplish this.. maybe we can do some kind of tsdb direct to
# graphics format eventually in a child-actor?
for timeframe, shm in shms.items():
( (
tsdb_history, tsdb_history,
last_tsdb_dt, last_tsdb_dt,
latest_start_dt, latest_start_dt,
latest_end_dt, latest_end_dt,
bf_done,
) = dts_per_tf[timeframe] ) = dts_per_tf[timeframe]
# sync to backend history task's query/load completion
await bf_done.wait()
# Load tsdb history into shm buffer (for display).
# TODO: eventually it'd be nice to not require a shm array/buffer
# to accomplish this.. maybe we can do some kind of tsdb direct to
# graphics format eventually in a child-actor?
# do diff against last start frame of history and only fill # do diff against last start frame of history and only fill
# in from the tsdb an allotment that allows for most recent # in from the tsdb an allotment that allows for most recent
# to be loaded into mem *before* tsdb data. # to be loaded into mem *before* tsdb data.
@ -686,7 +693,18 @@ async def tsdb_backfill(
for delay_s in sampler.subscribers: for delay_s in sampler.subscribers:
await broadcast(delay_s) await broadcast(delay_s)
# TODO: write new data to tsdb to be ready to for next read. # TODO: write new data to tsdb to be ready to for next read.
# backload from db (concurrently per timeframe) once backfilling of
# recent dat a loaded from the backend provider (see
# ``bf_done.wait()`` call).
async with trio.open_nursery() as nurse:
for timeframe, shm in shms.items():
nurse.start_soon(
back_load_from_tsdb,
timeframe,
shm,
)
async def manage_history( async def manage_history(