diff --git a/piker/data/feed.py b/piker/data/feed.py
index c7041135..c2fbbd5e 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -333,7 +333,7 @@ async def start_backfill(
         # do a decently sized backfill and load it into storage.
         periods = {
             1: {'days': 6},
-            60: {'years': 10},
+            60: {'years': 6},
         }
         kwargs = periods[step_size_s]
 
@@ -348,36 +348,45 @@ async def start_backfill(
         # last retrieved start dt to the next request as
         # it's end dt.
         starts: set[datetime] = set()
-        while start_dt > last_tsdb_dt:
-
-            print(f"QUERY end_dt={start_dt}")
 
            try:
                log.info(
                    f'Requesting {step_size_s}s frame ending in {start_dt}'
                )
-                array, start_dt, end_dt = await hist(
+                array, next_start_dt, end_dt = await hist(
                    timeframe,
                    end_dt=start_dt,
                )
+
+                if next_start_dt in starts:
+                    start_dt = min(starts)
+                    print(f"SKIPPING DUPLICATE FRAME @ {next_start_dt}")
+                    continue
+
+                # only update new start point if new
+                start_dt = next_start_dt
+                starts.add(start_dt)
+
                assert array['time'][0] == start_dt.timestamp()
 
            except NoData:
+                # XXX: unhandled history gap (shouldn't happen?)
                log.warning(
                    f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
                )
-                return None  # discard signal
+                await tractor.breakpoint()
 
-            except DataUnavailable as duerr:
-                # broker is being a bish and we can't pull
-                # any more..
-                log.warning('backend halted on data deliver !?!?')
+            except DataUnavailable:  # as duerr:
+                # broker is being a bish and we can't pull any more..
+                log.warning(
+                    f'NO-MORE-DATA: backend {mod.name} halted history!?'
+                )
 
                # ugh, what's a better way?
                # TODO: fwiw, we probably want a way to signal a throttle
                # condition (eg. with ib) so that we can halt the
                # request loop until the condition is resolved?
-                return duerr
+                return
 
            diff = end_dt - start_dt
            frame_time_diff_s = diff.seconds
@@ -394,22 +403,6 @@ async def start_backfill(
                f'{diff} ~= {frame_time_diff_s} seconds'
            )
 
-            array, _start_dt, end_dt = await hist(
-                timeframe,
-                end_dt=start_dt,
-            )
-
-            if (
-                _start_dt in starts
-            ):
-                print("SKIPPING DUPLICATE FRAME @ {_start_dt}")
-                start_dt = min(starts)
-                continue
-
-            # only update new start point if new
-            start_dt = _start_dt
-            starts.add(start_dt)
-
            to_push = diff_history(
                array,
                start_dt,
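
For context, the patch above moves the duplicate-frame guard from after the history request into the request block itself: every frame start datetime is recorded in a set, and when the backend returns a start we have already seen, the cursor jumps to the earliest known start instead of re-requesting the same range. Below is a minimal standalone sketch of that guard, assuming a hypothetical fetch_frame stub and a fixed one-hour frame size in place of the backend hist() call; it is an illustration, not piker's actual API.

from datetime import datetime, timedelta, timezone

FRAME = timedelta(hours=1)  # hypothetical fixed frame size per request


def fetch_frame(end_dt: datetime) -> tuple[datetime, datetime]:
    # stand-in for the backend ``hist()`` call: return the
    # (start, end) datetimes of the frame ending at ``end_dt``.
    return end_dt - FRAME, end_dt


def backfill(
    start_dt: datetime,
    last_tsdb_dt: datetime,
) -> list[tuple[datetime, datetime]]:
    '''
    Walk backwards in time requesting frames until reaching the
    last datetime already in storage, skipping any frame whose
    start was already seen (the duplicate-frame guard).

    '''
    starts: set[datetime] = set()
    frames: list[tuple[datetime, datetime]] = []

    while start_dt > last_tsdb_dt:
        next_start_dt, end_dt = fetch_frame(end_dt=start_dt)

        if next_start_dt in starts:
            # duplicate frame: fall back to the earliest start seen
            # so far instead of spinning on the same range forever.
            start_dt = min(starts)
            continue

        # only advance the cursor when the start point is new
        start_dt = next_start_dt
        starts.add(start_dt)
        frames.append((next_start_dt, end_dt))

    return frames


if __name__ == '__main__':
    now = datetime.now(timezone.utc)
    for frame in backfill(now, now - timedelta(hours=6)):
        print(frame)

The min(starts) fallback mirrors the patched loop: on a duplicate the cursor resets to the earliest start seen so far, so the backfill keeps marching backwards in time rather than stalling on one frame.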