diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index 17652ee5..9da46591 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -891,44 +891,49 @@ async def tsdb_backfill( config, ) - # tell parent task to continue - # TODO: really we'd want this the other way with the - # tsdb load happening asap and the since the latest - # frame query will normally be the main source of - # latency? - task_status.started() - tsdb_entry: tuple = await load_tsdb_hist( storage, mkt, timeframe, ) + # tell parent task to continue + # TODO: really we'd want this the other way with the + # tsdb load happening asap and the since the latest + # frame query will normally be the main source of + # latency? + task_status.started() + # NOTE: iabs to start backfilling from, reverse chronological, # ONLY AFTER the first history frame has been pushed to # mem! backfill_gap_from_shm_index: int = shm._first.value + 1 - ( - mr_start_dt, - mr_end_dt, - ) = dt_eps + # Prepend any tsdb history to the shm buffer which should + # now be full of the most recent history pulled from the + # backend's last frame. + if ( + dt_eps + and tsdb_entry + ): + # unpack both the latest (gap) backfilled frame dts + ( + mr_start_dt, + mr_end_dt, + ) = dt_eps - async with trio.open_nursery() as tn: + # AND the tsdb history from (offline) storage) + ( + tsdb_history, + first_tsdb_dt, + last_tsdb_dt, + ) = tsdb_entry - # Prepend any tsdb history to the shm buffer which should - # now be full of the most recent history pulled from the - # backend's last frame. - if tsdb_entry: - ( - tsdb_history, - first_tsdb_dt, - last_tsdb_dt, - ) = tsdb_entry + # if there is a gap to backfill from the first + # history frame until the last datum loaded from the tsdb + # continue that now in the background + async with trio.open_nursery() as tn: - # if there is a gap to backfill from the first - # history frame until the last datum loaded from the tsdb - # continue that now in the background bf_done = await tn.start( partial( start_backfill, @@ -941,8 +946,10 @@ async def tsdb_backfill( backfill_from_shm_index=backfill_gap_from_shm_index, backfill_from_dt=mr_start_dt, + sampler_stream=sampler_stream, backfill_until_dt=last_tsdb_dt, + storage=storage, write_tsdb=True, ) @@ -1032,11 +1039,11 @@ async def tsdb_backfill( mkt=mkt, )) + # 2nd nursery END - # TODO: who would want to? - await nulls_detected.wait() - - await bf_done.wait() + # TODO: who would want to? + await nulls_detected.wait() + await bf_done.wait() # TODO: maybe start history anal and load missing "history # gaps" via backend..