Drop duplicate frame request
Must have gotten left in during refactor from the `trimeter` version? Drop down to 6 years for 1m sampling.clears_table_events
parent
96f5a8abb8
commit
69be65237f
|
@ -333,7 +333,7 @@ async def start_backfill(
|
||||||
# do a decently sized backfill and load it into storage.
|
# do a decently sized backfill and load it into storage.
|
||||||
periods = {
|
periods = {
|
||||||
1: {'days': 6},
|
1: {'days': 6},
|
||||||
60: {'years': 10},
|
60: {'years': 6},
|
||||||
}
|
}
|
||||||
|
|
||||||
kwargs = periods[step_size_s]
|
kwargs = periods[step_size_s]
|
||||||
|
@ -348,36 +348,45 @@ async def start_backfill(
|
||||||
# last retrieved start dt to the next request as
|
# last retrieved start dt to the next request as
|
||||||
# it's end dt.
|
# it's end dt.
|
||||||
starts: set[datetime] = set()
|
starts: set[datetime] = set()
|
||||||
|
|
||||||
while start_dt > last_tsdb_dt:
|
while start_dt > last_tsdb_dt:
|
||||||
|
|
||||||
print(f"QUERY end_dt={start_dt}")
|
|
||||||
try:
|
try:
|
||||||
log.info(
|
log.info(
|
||||||
f'Requesting {step_size_s}s frame ending in {start_dt}'
|
f'Requesting {step_size_s}s frame ending in {start_dt}'
|
||||||
)
|
)
|
||||||
array, start_dt, end_dt = await hist(
|
array, next_start_dt, end_dt = await hist(
|
||||||
timeframe,
|
timeframe,
|
||||||
end_dt=start_dt,
|
end_dt=start_dt,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if next_start_dt in starts:
|
||||||
|
start_dt = min(starts)
|
||||||
|
print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# only update new start point if new
|
||||||
|
start_dt = next_start_dt
|
||||||
|
starts.add(start_dt)
|
||||||
|
|
||||||
assert array['time'][0] == start_dt.timestamp()
|
assert array['time'][0] == start_dt.timestamp()
|
||||||
|
|
||||||
except NoData:
|
except NoData:
|
||||||
|
# XXX: unhandled history gap (shouldn't happen?)
|
||||||
log.warning(
|
log.warning(
|
||||||
f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
|
f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
|
||||||
)
|
)
|
||||||
return None # discard signal
|
await tractor.breakpoint()
|
||||||
|
|
||||||
except DataUnavailable as duerr:
|
except DataUnavailable: # as duerr:
|
||||||
# broker is being a bish and we can't pull
|
# broker is being a bish and we can't pull any more..
|
||||||
# any more..
|
log.warning(
|
||||||
log.warning('backend halted on data deliver !?!?')
|
f'NO-MORE-DATA: backend {mod.name} halted history!?'
|
||||||
|
)
|
||||||
|
|
||||||
# ugh, what's a better way?
|
# ugh, what's a better way?
|
||||||
# TODO: fwiw, we probably want a way to signal a throttle
|
# TODO: fwiw, we probably want a way to signal a throttle
|
||||||
# condition (eg. with ib) so that we can halt the
|
# condition (eg. with ib) so that we can halt the
|
||||||
# request loop until the condition is resolved?
|
# request loop until the condition is resolved?
|
||||||
return duerr
|
return
|
||||||
|
|
||||||
diff = end_dt - start_dt
|
diff = end_dt - start_dt
|
||||||
frame_time_diff_s = diff.seconds
|
frame_time_diff_s = diff.seconds
|
||||||
|
@ -394,22 +403,6 @@ async def start_backfill(
|
||||||
f'{diff} ~= {frame_time_diff_s} seconds'
|
f'{diff} ~= {frame_time_diff_s} seconds'
|
||||||
)
|
)
|
||||||
|
|
||||||
array, _start_dt, end_dt = await hist(
|
|
||||||
timeframe,
|
|
||||||
end_dt=start_dt,
|
|
||||||
)
|
|
||||||
|
|
||||||
if (
|
|
||||||
_start_dt in starts
|
|
||||||
):
|
|
||||||
print("SKIPPING DUPLICATE FRAME @ {_start_dt}")
|
|
||||||
start_dt = min(starts)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# only update new start point if new
|
|
||||||
start_dt = _start_dt
|
|
||||||
starts.add(start_dt)
|
|
||||||
|
|
||||||
to_push = diff_history(
|
to_push = diff_history(
|
||||||
array,
|
array,
|
||||||
start_dt,
|
start_dt,
|
||||||
|
|
Loading…
Reference in New Issue