Fix slice logic for less-then-frame tsdb overlap
When the tsdb has a last datum that is in the past less then a "frame's worth" of sample steps we need to slice out only the data from the latest frame that doesn't overlap; this fixes that slice logic.. Previously i dunno wth it was doing..incr_update_backup
							parent
							
								
									468cd3a381
								
							
						
					
					
						commit
						112cba43e5
					
				| 
						 | 
					@ -208,27 +208,24 @@ def diff_history(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> np.ndarray:
 | 
					) -> np.ndarray:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    to_push = array
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if last_tsdb_dt:
 | 
					    if last_tsdb_dt:
 | 
				
			||||||
        s_diff = (start_dt - last_tsdb_dt).seconds
 | 
					        s_diff = (start_dt - last_tsdb_dt).seconds
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        to_push = array[:s_diff]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # if we detect a partial frame's worth of data
 | 
					        # if we detect a partial frame's worth of data
 | 
				
			||||||
        # that is new, slice out only that history and
 | 
					        # that is new, slice out only that history and
 | 
				
			||||||
        # write to shm.
 | 
					        # write to shm.
 | 
				
			||||||
        if abs(s_diff) < len(array):
 | 
					        if (
 | 
				
			||||||
 | 
					            s_diff < 0
 | 
				
			||||||
 | 
					            and abs(s_diff) < len(array)
 | 
				
			||||||
 | 
					        ):
 | 
				
			||||||
            log.info(
 | 
					            log.info(
 | 
				
			||||||
                f'Pushing partial frame {to_push.size} to shm'
 | 
					                f'Pushing partial frame {to_push.size} to shm'
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            # assert last_tsdb_dt > start_dt
 | 
					            to_push = array[abs(s_diff):]
 | 
				
			||||||
            # selected = array['time'] > last_tsdb_dt.timestamp()
 | 
					 | 
				
			||||||
            # to_push = array[selected]
 | 
					 | 
				
			||||||
            # return to_push
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        return to_push
 | 
					    return to_push
 | 
				
			||||||
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        return array
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def start_backfill(
 | 
					async def start_backfill(
 | 
				
			||||||
| 
						 | 
					@ -248,6 +245,17 @@ async def start_backfill(
 | 
				
			||||||
        # back to what is recorded in the tsdb
 | 
					        # back to what is recorded in the tsdb
 | 
				
			||||||
        array, start_dt, end_dt = await hist(end_dt=None)
 | 
					        array, start_dt, end_dt = await hist(end_dt=None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        times = array['time']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # sample period step size in seconds
 | 
				
			||||||
 | 
					        step_size_s = (
 | 
				
			||||||
 | 
					            pendulum.from_timestamp(times[-1]) -
 | 
				
			||||||
 | 
					            pendulum.from_timestamp(times[-2])
 | 
				
			||||||
 | 
					        ).seconds
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # "frame"'s worth of sample period steps in seconds
 | 
				
			||||||
 | 
					        frame_size_s = len(array) * step_size_s
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        to_push = diff_history(
 | 
					        to_push = diff_history(
 | 
				
			||||||
            array,
 | 
					            array,
 | 
				
			||||||
            start_dt,
 | 
					            start_dt,
 | 
				
			||||||
| 
						 | 
					@ -267,13 +275,6 @@ async def start_backfill(
 | 
				
			||||||
        # let caller unblock and deliver latest history frame
 | 
					        # let caller unblock and deliver latest history frame
 | 
				
			||||||
        task_status.started((shm, start_dt, end_dt, bf_done))
 | 
					        task_status.started((shm, start_dt, end_dt, bf_done))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        times = array['time']
 | 
					 | 
				
			||||||
        step_size_s = (
 | 
					 | 
				
			||||||
            pendulum.from_timestamp(times[-1]) -
 | 
					 | 
				
			||||||
            pendulum.from_timestamp(times[-2])
 | 
					 | 
				
			||||||
        ).seconds
 | 
					 | 
				
			||||||
        frame_size_s = len(to_push) * step_size_s
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if last_tsdb_dt is None:
 | 
					        if last_tsdb_dt is None:
 | 
				
			||||||
            # maybe a better default (they don't seem to define epoch?!)
 | 
					            # maybe a better default (they don't seem to define epoch?!)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue