diff --git a/piker/data/feed.py b/piker/data/feed.py index 272add05..8c88f04b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -208,27 +208,24 @@ def diff_history( ) -> np.ndarray: + to_push = array + if last_tsdb_dt: s_diff = (start_dt - last_tsdb_dt).seconds - to_push = array[:s_diff] - # if we detect a partial frame's worth of data # that is new, slice out only that history and # write to shm. - if abs(s_diff) < len(array): + if ( + s_diff < 0 + and abs(s_diff) < len(array) + ): log.info( f'Pushing partial frame {to_push.size} to shm' ) - # assert last_tsdb_dt > start_dt - # selected = array['time'] > last_tsdb_dt.timestamp() - # to_push = array[selected] - # return to_push + to_push = array[abs(s_diff):] - return to_push - - else: - return array + return to_push async def start_backfill( @@ -248,6 +245,17 @@ async def start_backfill( # back to what is recorded in the tsdb array, start_dt, end_dt = await hist(end_dt=None) + times = array['time'] + + # sample period step size in seconds + step_size_s = ( + pendulum.from_timestamp(times[-1]) - + pendulum.from_timestamp(times[-2]) + ).seconds + + # "frame"'s worth of sample period steps in seconds + frame_size_s = len(array) * step_size_s + to_push = diff_history( array, start_dt, @@ -267,13 +275,6 @@ async def start_backfill( # let caller unblock and deliver latest history frame task_status.started((shm, start_dt, end_dt, bf_done)) - times = array['time'] - step_size_s = ( - pendulum.from_timestamp(times[-1]) - - pendulum.from_timestamp(times[-2]) - ).seconds - frame_size_s = len(to_push) * step_size_s - if last_tsdb_dt is None: # maybe a better default (they don't seem to define epoch?!)