Fix less-then-frame off by one slice, add db write toggle and disable
parent
d9e2666e80
commit
8c1905e35a
|
@ -223,10 +223,13 @@ def diff_history(
|
||||||
s_diff < 0
|
s_diff < 0
|
||||||
and abs(s_diff) < len(array)
|
and abs(s_diff) < len(array)
|
||||||
):
|
):
|
||||||
|
# the + 1 is because ``last_tsdb_dt`` is pulled from
|
||||||
|
# the last row entry for the ``'time'`` field retreived
|
||||||
|
# from the tsdb.
|
||||||
|
to_push = array[abs(s_diff)+1:]
|
||||||
log.info(
|
log.info(
|
||||||
f'Pushing partial frame {to_push.size} to shm'
|
f'Pushing partial frame {to_push.size} to shm'
|
||||||
)
|
)
|
||||||
to_push = array[abs(s_diff):]
|
|
||||||
|
|
||||||
return to_push
|
return to_push
|
||||||
|
|
||||||
|
@ -238,6 +241,7 @@ async def start_backfill(
|
||||||
|
|
||||||
last_tsdb_dt: Optional[datetime] = None,
|
last_tsdb_dt: Optional[datetime] = None,
|
||||||
storage: Optional[Storage] = None,
|
storage: Optional[Storage] = None,
|
||||||
|
write_tsdb: bool = False,
|
||||||
|
|
||||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
@ -285,7 +289,7 @@ async def start_backfill(
|
||||||
# based on the sample step size load a certain amount
|
# based on the sample step size load a certain amount
|
||||||
# history
|
# history
|
||||||
if step_size_s == 1:
|
if step_size_s == 1:
|
||||||
last_tsdb_dt = pendulum.now().subtract(days=6)
|
last_tsdb_dt = pendulum.now().subtract(days=2)
|
||||||
|
|
||||||
elif step_size_s == 60:
|
elif step_size_s == 60:
|
||||||
last_tsdb_dt = pendulum.now().subtract(years=2)
|
last_tsdb_dt = pendulum.now().subtract(years=2)
|
||||||
|
@ -368,7 +372,7 @@ async def start_backfill(
|
||||||
|
|
||||||
except NoData:
|
except NoData:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'NO DATA for {frame_size_s}s frame @ {end_dt} ?!?'
|
f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?'
|
||||||
)
|
)
|
||||||
return None # discard signal
|
return None # discard signal
|
||||||
|
|
||||||
|
@ -425,10 +429,9 @@ async def start_backfill(
|
||||||
# gen already terminated meaning we probably already
|
# gen already terminated meaning we probably already
|
||||||
# exhausted it via frame requests.
|
# exhausted it via frame requests.
|
||||||
log.info(
|
log.info(
|
||||||
f"Datetime index already exhausted, can't reset.."
|
"Datetime index already exhausted, can't reset.."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
to_push = diff_history(
|
to_push = diff_history(
|
||||||
array,
|
array,
|
||||||
start_dt,
|
start_dt,
|
||||||
|
@ -557,7 +560,10 @@ async def start_backfill(
|
||||||
# frame-result order.
|
# frame-result order.
|
||||||
earliest_end_dt = start_dt
|
earliest_end_dt = start_dt
|
||||||
|
|
||||||
if storage is not None:
|
if (
|
||||||
|
storage is not None
|
||||||
|
and write_tsdb
|
||||||
|
):
|
||||||
log.info(
|
log.info(
|
||||||
f'Writing {ln} frame to storage:\n'
|
f'Writing {ln} frame to storage:\n'
|
||||||
f'{start_dt} -> {end_dt}'
|
f'{start_dt} -> {end_dt}'
|
||||||
|
|
Loading…
Reference in New Issue