Fix partial-frame-missing backfill logic
Previously there was a bug where the end of a frame (a partial) wasn't being sliced correctly, and we'd get odd gaps showing up between the history backfilled from `brokerd` and the tsdb end index. Repair this by doing timeframe-aware index diffing in `diff_history()`, which seems to resolve it. Also, use the frame-result's `end_dt: datetime` for the loop exit condition.
agg_feedz
							parent
							
								
									36868bb86e
								
							
						
					
					
						commit
						8476d8d056
					
				| 
						 | 
					@ -208,39 +208,50 @@ async def _setup_persistent_brokerd(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def diff_history(
 | 
					def diff_history(
 | 
				
			||||||
    array,
 | 
					    array: np.ndarray,
 | 
				
			||||||
    start_dt,
 | 
					    timeframe: int,
 | 
				
			||||||
    end_dt,
 | 
					    start_dt: datetime,
 | 
				
			||||||
    last_tsdb_dt: Optional[datetime] = None
 | 
					    end_dt: datetime,
 | 
				
			||||||
 | 
					    last_tsdb_dt: datetime | None = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> np.ndarray:
 | 
					) -> np.ndarray:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # no diffing with tsdb dt index possible..
 | 
				
			||||||
 | 
					    if last_tsdb_dt is None:
 | 
				
			||||||
 | 
					        return array
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    time_attr = {
 | 
				
			||||||
 | 
					        1: 'seconds',
 | 
				
			||||||
 | 
					        60: 'minutes',
 | 
				
			||||||
 | 
					    }[timeframe]
 | 
				
			||||||
 | 
					    i_diff = getattr((end_dt - last_tsdb_dt), time_attr)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # if we detect a partial frame's worth of data
 | 
				
			||||||
 | 
					    # that is new, slice out only that history and
 | 
				
			||||||
 | 
					    # write to shm.
 | 
				
			||||||
 | 
					    if i_diff < 0:
 | 
				
			||||||
 | 
					        # empty case since tsdb already has this history
 | 
				
			||||||
 | 
					        return array[:0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ln = len(array)
 | 
				
			||||||
    to_push = array
 | 
					    to_push = array
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if last_tsdb_dt:
 | 
					    if i_diff < ln:
 | 
				
			||||||
        s_diff = (start_dt - last_tsdb_dt).seconds
 | 
					        # slice out missing history from end of frame
 | 
				
			||||||
 | 
					        to_push = array[ln - i_diff:ln]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # if we detect a partial frame's worth of data
 | 
					    # XXX: OLD GAP HANDLING..if there's a gap in a partial frame
 | 
				
			||||||
        # that is new, slice out only that history and
 | 
					    # worth. we don't need this any more with the timeframe aware
 | 
				
			||||||
        # write to shm.
 | 
					    # diffing above right?
 | 
				
			||||||
        if (
 | 
					    # else:
 | 
				
			||||||
            s_diff < 0
 | 
					    #     # pass back only the portion of the array that is
 | 
				
			||||||
        ):
 | 
					    #     # greater then the last time stamp in the tsdb.
 | 
				
			||||||
            if abs(s_diff) < len(array):
 | 
					    #     time = array['time']
 | 
				
			||||||
                # the + 1 is because ``last_tsdb_dt`` is pulled from
 | 
					    #     to_push = array[time >= last_tsdb_dt.timestamp()]
 | 
				
			||||||
                # the last row entry for the ``'time'`` field retreived
 | 
					 | 
				
			||||||
                # from the tsdb.
 | 
					 | 
				
			||||||
                to_push = array[abs(s_diff) + 1:]
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            else:
 | 
					    log.debug(
 | 
				
			||||||
                # pass back only the portion of the array that is
 | 
					        f'Pushing partial frame {to_push.size} to shm'
 | 
				
			||||||
                # greater then the last time stamp in the tsdb.
 | 
					    )
 | 
				
			||||||
                time = array['time']
 | 
					 | 
				
			||||||
                to_push = array[time >= last_tsdb_dt.timestamp()]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            log.debug(
 | 
					 | 
				
			||||||
                f'Pushing partial frame {to_push.size} to shm'
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return to_push
 | 
					    return to_push
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -286,6 +297,7 @@ async def start_backfill(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        to_push = diff_history(
 | 
					        to_push = diff_history(
 | 
				
			||||||
            array,
 | 
					            array,
 | 
				
			||||||
 | 
					            timeframe,
 | 
				
			||||||
            start_dt,
 | 
					            start_dt,
 | 
				
			||||||
            end_dt,
 | 
					            end_dt,
 | 
				
			||||||
            last_tsdb_dt=last_tsdb_dt,
 | 
					            last_tsdb_dt=last_tsdb_dt,
 | 
				
			||||||
| 
						 | 
					@ -334,12 +346,12 @@ async def start_backfill(
 | 
				
			||||||
                    60: {'years': 6},
 | 
					                    60: {'years': 6},
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            kwargs = periods[step_size_s]
 | 
					            period_duration = periods[step_size_s]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # NOTE: manually set the "latest" datetime which we intend to
 | 
					            # NOTE: manually set the "latest" datetime which we intend to
 | 
				
			||||||
            # backfill history "until" so as to adhere to the history
 | 
					            # backfill history "until" so as to adhere to the history
 | 
				
			||||||
            # settings above when the tsdb is detected as being empty.
 | 
					            # settings above when the tsdb is detected as being empty.
 | 
				
			||||||
            last_tsdb_dt = start_dt.subtract(**kwargs)
 | 
					            last_tsdb_dt = start_dt.subtract(**period_duration)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # configure async query throttling
 | 
					        # configure async query throttling
 | 
				
			||||||
        # rate = config.get('rate', 1)
 | 
					        # rate = config.get('rate', 1)
 | 
				
			||||||
| 
						 | 
					@ -353,7 +365,7 @@ async def start_backfill(
 | 
				
			||||||
        # inline sequential loop where we simply pass the
 | 
					        # inline sequential loop where we simply pass the
 | 
				
			||||||
        # last retrieved start dt to the next request as
 | 
					        # last retrieved start dt to the next request as
 | 
				
			||||||
        # it's end dt.
 | 
					        # it's end dt.
 | 
				
			||||||
        while start_dt > last_tsdb_dt:
 | 
					        while end_dt > last_tsdb_dt:
 | 
				
			||||||
            log.debug(
 | 
					            log.debug(
 | 
				
			||||||
                f'Requesting {step_size_s}s frame ending in {start_dt}'
 | 
					                f'Requesting {step_size_s}s frame ending in {start_dt}'
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
| 
						 | 
					@ -363,6 +375,8 @@ async def start_backfill(
 | 
				
			||||||
                    timeframe,
 | 
					                    timeframe,
 | 
				
			||||||
                    end_dt=start_dt,
 | 
					                    end_dt=start_dt,
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					                # if timeframe == 1:
 | 
				
			||||||
 | 
					                #     await tractor.breakpoint()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # broker says there never was or is no more history to pull
 | 
					            # broker says there never was or is no more history to pull
 | 
				
			||||||
            except DataUnavailable:
 | 
					            except DataUnavailable:
 | 
				
			||||||
| 
						 | 
					@ -404,6 +418,7 @@ async def start_backfill(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            to_push = diff_history(
 | 
					            to_push = diff_history(
 | 
				
			||||||
                array,
 | 
					                array,
 | 
				
			||||||
 | 
					                timeframe,
 | 
				
			||||||
                start_dt,
 | 
					                start_dt,
 | 
				
			||||||
                end_dt,
 | 
					                end_dt,
 | 
				
			||||||
                last_tsdb_dt=last_tsdb_dt,
 | 
					                last_tsdb_dt=last_tsdb_dt,
 | 
				
			||||||
| 
						 | 
					@ -424,6 +439,9 @@ async def start_backfill(
 | 
				
			||||||
                log.info(
 | 
					                log.info(
 | 
				
			||||||
                    f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
 | 
					                    f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					                # can't push the entire frame? so
 | 
				
			||||||
 | 
					                # push only the amount that can fit..
 | 
				
			||||||
 | 
					                await tractor.breakpoint()
 | 
				
			||||||
                break
 | 
					                break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            log.info(
 | 
					            log.info(
 | 
				
			||||||
| 
						 | 
					@ -510,6 +528,8 @@ async def tsdb_backfill(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # start history anal and load missing new data via backend.
 | 
					    # start history anal and load missing new data via backend.
 | 
				
			||||||
    for timeframe, shm in shms.items():
 | 
					    for timeframe, shm in shms.items():
 | 
				
			||||||
 | 
					        # loads a (large) frame of data from the tsdb depending
 | 
				
			||||||
 | 
					        # on the db's query size limit.
 | 
				
			||||||
        tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
 | 
					        tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
 | 
				
			||||||
            fqsn,
 | 
					            fqsn,
 | 
				
			||||||
            timeframe=timeframe,
 | 
					            timeframe=timeframe,
 | 
				
			||||||
| 
						 | 
					@ -586,29 +606,45 @@ async def tsdb_backfill(
 | 
				
			||||||
        if bf_done:
 | 
					        if bf_done:
 | 
				
			||||||
            await bf_done.wait()
 | 
					            await bf_done.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Load tsdb history into shm buffer (for display).
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # TODO: eventually it'd be nice to not require a shm array/buffer
 | 
					        # TODO: eventually it'd be nice to not require a shm array/buffer
 | 
				
			||||||
        # to accomplish this.. maybe we can do some kind of tsdb direct to
 | 
					        # to accomplish this.. maybe we can do some kind of tsdb direct to
 | 
				
			||||||
        # graphics format eventually in a child-actor?
 | 
					        # graphics format eventually in a child-actor?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # do diff against last start frame of history and only fill
 | 
					 | 
				
			||||||
        # in from the tsdb an allotment that allows for most recent
 | 
					 | 
				
			||||||
        # to be loaded into mem *before* tsdb data.
 | 
					 | 
				
			||||||
        if last_tsdb_dt and latest_start_dt:
 | 
					 | 
				
			||||||
            dt_diff_s = (
 | 
					 | 
				
			||||||
                latest_start_dt - last_tsdb_dt
 | 
					 | 
				
			||||||
            ).seconds
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            dt_diff_s = 0
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # TODO: see if there's faster multi-field reads:
 | 
					        # TODO: see if there's faster multi-field reads:
 | 
				
			||||||
        # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
 | 
					        # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
 | 
				
			||||||
        # re-index  with a `time` and index field
 | 
					        # re-index  with a `time` and index field
 | 
				
			||||||
        prepend_start = shm._first.value
 | 
					        prepend_start = shm._first.value
 | 
				
			||||||
 | 
					        shm_last_dt = pendulum.from_timestamp(shm.array[0]['time'])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # sanity check on most-recent-data loading
 | 
					        if last_tsdb_dt:
 | 
				
			||||||
        assert prepend_start > dt_diff_s
 | 
					            assert shm_last_dt >= last_tsdb_dt
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # do diff against start index of last frame of history and only
 | 
				
			||||||
 | 
					        # fill in an amount of datums from tsdb allows for most recent
 | 
				
			||||||
 | 
					        # to be loaded into mem *before* tsdb data.
 | 
				
			||||||
 | 
					        if (
 | 
				
			||||||
 | 
					            last_tsdb_dt
 | 
				
			||||||
 | 
					            and latest_start_dt
 | 
				
			||||||
 | 
					        ):
 | 
				
			||||||
 | 
					            backfilled_size_s = (
 | 
				
			||||||
 | 
					                latest_start_dt - last_tsdb_dt
 | 
				
			||||||
 | 
					            ).seconds
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            backfilled_size_s = (
 | 
				
			||||||
 | 
					                latest_start_dt - shm_last_dt
 | 
				
			||||||
 | 
					            ).seconds
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Load TSDB history into shm buffer (for display) if there is
 | 
				
			||||||
 | 
					        # remaining buffer space.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # if the shm buffer len is not large enough to contain
 | 
				
			||||||
 | 
					        # all missing data between the most recent backend-queried frame
 | 
				
			||||||
 | 
					        # and the most recent dt-index in the db we warn that we only
 | 
				
			||||||
 | 
					        # want to load a portion of the next tsdb query to fill that
 | 
				
			||||||
 | 
					        # space.
 | 
				
			||||||
 | 
					        log.info(
 | 
				
			||||||
 | 
					            f'{backfilled_size_s} seconds worth of {timeframe}s data loaded'
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (
 | 
					        if (
 | 
				
			||||||
            len(tsdb_history)
 | 
					            len(tsdb_history)
 | 
				
			||||||
| 
						 | 
					@ -1266,7 +1302,6 @@ async def open_feed_bus(
 | 
				
			||||||
        # expected to append it's own name to the fqsn, so we filter
 | 
					        # expected to append it's own name to the fqsn, so we filter
 | 
				
			||||||
        # on keys which *do not* include that name (e.g .ib) .
 | 
					        # on keys which *do not* include that name (e.g .ib) .
 | 
				
			||||||
        bus._subscribers.setdefault(bfqsn, [])
 | 
					        bus._subscribers.setdefault(bfqsn, [])
 | 
				
			||||||
        # await tractor.breakpoint()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # sync feed subscribers with flume handles
 | 
					    # sync feed subscribers with flume handles
 | 
				
			||||||
    await ctx.started(
 | 
					    await ctx.started(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue