From dc1edeecdaf50832f828f12c0e79acc72e520aff Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 26 Oct 2022 09:52:51 -0400 Subject: [PATCH] Do tsdb backloading to shm concurrently Not only improves startup latency but also avoids a bug where the rt buffer was being tsdb-history prepended *before* the backfilling of recent data from the backend was complete resulting in our of order frames in shm. --- piker/data/feed.py | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index ea3f5e1d..ad377941 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -552,6 +552,7 @@ async def tsdb_backfill( last_tsdb_dt, None, None, + bf_done, ) continue @@ -561,6 +562,7 @@ async def tsdb_backfill( last_tsdb_dt, latest_start_dt, latest_end_dt, + bf_done, ) # if len(hist_shm.array) < 2: @@ -580,22 +582,27 @@ async def tsdb_backfill( shms[1], )) - # sync to backend history task's query/load completion - await bf_done.wait() - - # Load tsdb history into shm buffer (for display). - - # TODO: eventually it'd be nice to not require a shm array/buffer - # to accomplish this.. maybe we can do some kind of tsdb direct to - # graphics format eventually in a child-actor? - for timeframe, shm in shms.items(): + async def back_load_from_tsdb( + timeframe: int, + shm: ShmArray, + ): ( tsdb_history, last_tsdb_dt, latest_start_dt, latest_end_dt, + bf_done, ) = dts_per_tf[timeframe] + # sync to backend history task's query/load completion + await bf_done.wait() + + # Load tsdb history into shm buffer (for display). + + # TODO: eventually it'd be nice to not require a shm array/buffer + # to accomplish this.. maybe we can do some kind of tsdb direct to + # graphics format eventually in a child-actor? + # do diff against last start frame of history and only fill # in from the tsdb an allotment that allows for most recent # to be loaded into mem *before* tsdb data. @@ -686,7 +693,18 @@ async def tsdb_backfill( for delay_s in sampler.subscribers: await broadcast(delay_s) - # TODO: write new data to tsdb to be ready to for next read. + # TODO: write new data to tsdb to be ready to for next read. + + # backload from db (concurrently per timeframe) once backfilling of + # recent dat a loaded from the backend provider (see + # ``bf_done.wait()`` call). + async with trio.open_nursery() as nurse: + for timeframe, shm in shms.items(): + nurse.start_soon( + back_load_from_tsdb, + timeframe, + shm, + ) async def manage_history(