diff --git a/piker/data/feed.py b/piker/data/feed.py
index 672974e6..09dce25e 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -208,39 +208,50 @@ async def _setup_persistent_brokerd(
 
 
 def diff_history(
-    array,
-    start_dt,
-    end_dt,
-    last_tsdb_dt: Optional[datetime] = None
+    array: np.ndarray,
+    timeframe: int,
+    start_dt: datetime,
+    end_dt: datetime,
+    last_tsdb_dt: datetime | None = None
 
 ) -> np.ndarray:
 
+    # no diffing possible without a tsdb dt index..
+    if last_tsdb_dt is None:
+        return array
+
+    time_attr = {
+        1: 'seconds',
+        60: 'minutes',
+    }[timeframe]
+    i_diff = getattr((end_dt - last_tsdb_dt), time_attr)
+
+    # if we detect a partial frame's worth of data
+    # that is new, slice out only that history and
+    # write to shm.
+    if i_diff < 0:
+        # empty case since tsdb already has this history
+        return array[:0]
+
+    ln = len(array)
     to_push = array
 
-    if last_tsdb_dt:
-        s_diff = (start_dt - last_tsdb_dt).seconds
+    if i_diff < ln:
+        # slice out missing history from end of frame
+        to_push = array[ln - i_diff:ln]
 
-        # if we detect a partial frame's worth of data
-        # that is new, slice out only that history and
-        # write to shm.
-        if (
-            s_diff < 0
-        ):
-            if abs(s_diff) < len(array):
-                # the + 1 is because ``last_tsdb_dt`` is pulled from
-                # the last row entry for the ``'time'`` field retreived
-                # from the tsdb.
-                to_push = array[abs(s_diff) + 1:]
+    # XXX: OLD GAP HANDLING.. if there's a gap in a partial frame's
+    # worth, we don't need this any more with the timeframe-aware
+    # diffing above, right?
+    # else:
+    #     # pass back only the portion of the array that is
+    #     # greater then the last time stamp in the tsdb.
+    #     time = array['time']
+    #     to_push = array[time >= last_tsdb_dt.timestamp()]
 
-            else:
-                # pass back only the portion of the array that is
-                # greater then the last time stamp in the tsdb.
-                time = array['time']
-                to_push = array[time >= last_tsdb_dt.timestamp()]
-
-            log.debug(
-                f'Pushing partial frame {to_push.size} to shm'
-            )
+    log.debug(
+        f'Pushing partial frame {to_push.size} to shm'
+    )
 
     return to_push
 
 
@@ -286,6 +297,7 @@ async def start_backfill(
 
         to_push = diff_history(
             array,
+            timeframe,
             start_dt,
             end_dt,
             last_tsdb_dt=last_tsdb_dt,
@@ -334,12 +346,12 @@ async def start_backfill(
                 60: {'years': 6},
             }
 
-            kwargs = periods[step_size_s]
+            period_duration = periods[step_size_s]
 
             # NOTE: manually set the "latest" datetime which we intend to
             # backfill history "until" so as to adhere to the history
             # settings above when the tsdb is detected as being empty.
-            last_tsdb_dt = start_dt.subtract(**kwargs)
+            last_tsdb_dt = start_dt.subtract(**period_duration)
 
         # configure async query throttling
         # rate = config.get('rate', 1)
@@ -353,7 +365,7 @@ async def start_backfill(
         # inline sequential loop where we simply pass the
         # last retrieved start dt to the next request as
         # it's end dt.
-        while start_dt > last_tsdb_dt:
+        while end_dt > last_tsdb_dt:
             log.debug(
                 f'Requesting {step_size_s}s frame ending in {start_dt}'
             )
@@ -363,6 +375,8 @@ async def start_backfill(
                 array, start_dt, end_dt = await hist(
                     timeframe,
                     end_dt=start_dt,
                 )
+                # if timeframe == 1:
+                #     await tractor.breakpoint()
 
             # broker says there never was or is no more history to pull
             except DataUnavailable:
@@ -404,6 +418,7 @@ async def start_backfill(
 
             to_push = diff_history(
                 array,
+                timeframe,
                 start_dt,
                 end_dt,
                 last_tsdb_dt=last_tsdb_dt,
@@ -424,6 +439,9 @@ async def start_backfill(
                 log.info(
                     f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
                 )
+                # can't push the entire frame? so
+                # push only the amount that can fit..
+                await tractor.breakpoint()
                 break
 
             log.info(
@@ -510,6 +528,8 @@ async def tsdb_backfill(
 
     # start history anal and load missing new data via backend.
     for timeframe, shm in shms.items():
+        # loads a (large) frame of data from the tsdb depending
+        # on the db's query size limit.
        tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
            fqsn,
            timeframe=timeframe,
@@ -586,29 +606,45 @@ async def tsdb_backfill(
         if bf_done:
             await bf_done.wait()
 
-        # Load tsdb history into shm buffer (for display).
-
         # TODO: eventually it'd be nice to not require a shm array/buffer
         # to accomplish this.. maybe we can do some kind of tsdb direct to
         # graphics format eventually in a child-actor?
 
-        # do diff against last start frame of history and only fill
-        # in from the tsdb an allotment that allows for most recent
-        # to be loaded into mem *before* tsdb data.
-        if last_tsdb_dt and latest_start_dt:
-            dt_diff_s = (
-                latest_start_dt - last_tsdb_dt
-            ).seconds
-        else:
-            dt_diff_s = 0
-
         # TODO: see if there's faster multi-field reads:
         # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
         # re-index with a `time` and index field
         prepend_start = shm._first.value
+        shm_last_dt = pendulum.from_timestamp(shm.array[0]['time'])
 
-        # sanity check on most-recent-data loading
-        assert prepend_start > dt_diff_s
+        if last_tsdb_dt:
+            assert shm_last_dt >= last_tsdb_dt
+
+        # diff against the start index of the last frame of history and
+        # only fill in an amount of datums from the tsdb that allows the
+        # most recent to be loaded into mem *before* the tsdb data.
+        if (
+            last_tsdb_dt
+            and latest_start_dt
+        ):
+            backfilled_size_s = (
+                latest_start_dt - last_tsdb_dt
+            ).seconds
+        else:
+            backfilled_size_s = (
+                latest_start_dt - shm_last_dt
+            ).seconds
+
+        # Load TSDB history into shm buffer (for display) if there is
+        # remaining buffer space.
+
+        # if the shm buffer len is not large enough to contain
+        # all missing data between the most recent backend-queried frame
+        # and the most recent dt-index in the db, warn that we only
+        # want to load a portion of the next tsdb query to fill that
+        # space.
+        log.info(
+            f'{backfilled_size_s} seconds worth of {timeframe}s data loaded'
+        )
 
         if (
             len(tsdb_history)
@@ -1266,7 +1302,6 @@ async def open_feed_bus(
         # expected to append it's own name to the fqsn, so we filter
         # on keys which *do not* include that name (e.g .ib) .
         bus._subscribers.setdefault(bfqsn, [])
-        # await tractor.breakpoint()
 
         # sync feed subscribers with flume handles
         await ctx.started(
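
Review note: the core of this patch is the timeframe-aware rewrite of `diff_history` in the first hunk, which derives a datum count (`i_diff`) from the timeframe instead of the old seconds-only `s_diff` arithmetic, then slices out only the new tail of the frame. Below is a minimal, runnable sketch of that slicing logic, re-implemented with plain epoch-second ints in place of `pendulum` datetimes; the `mk_frame` helper, the `diff_history_sketch` name, and all sample values are illustrative only and not part of this patch:

```python
import numpy as np

def mk_frame(start: int, stop: int, step: int) -> np.ndarray:
    # toy history frame with just a 'time' field, one row per step
    times = np.arange(start, stop, step)
    return np.array([(t,) for t in times], dtype=[('time', 'i8')])

def diff_history_sketch(
    array: np.ndarray,
    timeframe: int,  # datum step size in seconds (1 or 60)
    end_t: int,  # epoch time of the frame's last datum
    last_tsdb_t: int | None = None,  # last epoch time known to the tsdb
) -> np.ndarray:
    # no diffing possible without a tsdb index
    if last_tsdb_t is None:
        return array

    # how many datums in this frame are newer than the tsdb's tail
    i_diff = (end_t - last_tsdb_t) // timeframe
    if i_diff < 0:
        # tsdb already holds (at least) this history
        return array[:0]

    ln = len(array)
    if i_diff < ln:
        # only the tail of the frame is new
        return array[ln - i_diff:ln]
    return array

# frame of 100 one-minute bars ending at t=6000; the tsdb's last
# stored datum is at t=4800, so only the 20 newest bars get pushed.
frame = mk_frame(60, 6060, 60)
assert len(diff_history_sketch(frame, 60, 6000, 4800)) == 20
```

The `i_diff < 0` guard covers frames the tsdb already holds, and the `array[:0]` empty slice keeps the return type a (possibly zero-length) structured array rather than `None`.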
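Likewise, for the empty-tsdb case the renamed `period_duration` mapping sets how far back the `while end_dt > last_tsdb_dt` loop targets per timeframe. A quick sketch of the `pendulum` arithmetic involved; the `periods` values are copied from the hunk above, and using `now` as `start_dt` is purely illustrative:

```python
import pendulum

# per-timeframe default backfill horizons, as in the diff above
periods = {
    1: {'days': 6},      # 1s bars: ~6 days of history
    60: {'years': 6},    # 1m bars: ~6 years of history
}

start_dt = pendulum.now('UTC')
for step_size_s, period_duration in periods.items():
    # the synthetic "last tsdb dt" the backfill loop runs toward
    # when the db holds no prior history for this timeframe
    last_tsdb_dt = start_dt.subtract(**period_duration)
    print(f'{step_size_s}s frames: backfill until {last_tsdb_dt}')
```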