Add "no-tsdb-found" history load length defaults

pre_flow
Tyler Goodlet 2022-05-15 13:36:08 -04:00
parent 7555cb4318
commit 54ae86544a
1 changed file with 28 additions and 19 deletions

@@ -227,7 +227,7 @@ def diff_history(
# the + 1 is because ``last_tsdb_dt`` is pulled from
# the last row entry for the ``'time'`` field retrieved
# from the tsdb.
to_push = array[abs(s_diff)+1:]
to_push = array[abs(s_diff) + 1:]
else:
# pass back only the portion of the array that is
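As a rough illustration of the ``+ 1`` slice above (a standalone numpy sketch with made-up values; ``s_diff`` here is just an assumed offset, not the real computation done by ``diff_history``):

import numpy as np

# hypothetical frame of samples pulled from the broker API
array = np.arange(10)

# pretend the comparison against ``last_tsdb_dt`` yielded this offset
s_diff = -3

# skip the rows presumed already stored plus the row whose 'time'
# matches ``last_tsdb_dt`` itself, pushing only the new tail
to_push = array[abs(s_diff) + 1:]
print(to_push)  # [4 5 6 7 8 9]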
@@ -250,6 +250,7 @@ async def start_backfill(
last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None,
write_tsdb: bool = True,
tsdb_is_up: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -265,8 +266,8 @@ async def start_backfill(
# sample period step size in seconds
step_size_s = (
pendulum.from_timestamp(times[-1]) -
pendulum.from_timestamp(times[-2])
pendulum.from_timestamp(times[-1])
- pendulum.from_timestamp(times[-2])
).seconds
# "frame"'s worth of sample period steps in seconds
@@ -291,25 +292,33 @@ async def start_backfill(
# let caller unblock and deliver latest history frame
task_status.started((shm, start_dt, end_dt, bf_done))
# based on the sample step size, maybe load a certain amount history
if last_tsdb_dt is None:
# maybe a better default (they don't seem to define epoch?!)
# based on the sample step size load a certain amount
# history
if step_size_s == 1:
last_tsdb_dt = pendulum.now().subtract(days=2)
elif step_size_s == 60:
last_tsdb_dt = pendulum.now().subtract(years=2)
else:
if step_size_s not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {step_size_s} ' 'seconds.. so ye, dun '
'do dat bruh.'
'do dat brudder.'
)
# when no tsdb "last datum" is provided, we just load
# some near-term history.
periods = {
1: {'days': 1},
60: {'days': 14},
}
if tsdb_is_up:
# do a decently sized backfill and load it into storage.
periods = {
1: {'days': 6},
60: {'years': 2},
}
kwargs = periods[step_size_s]
last_tsdb_dt = start_dt.subtract(**kwargs)
# configure async query throttling
erlangs = config.get('erlangs', 1)
rate = config.get('rate', 1)
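The new default selection can be paraphrased as a small standalone helper (the function name and wrapper are illustrative only, not part of the commit):

import pendulum

def default_last_tsdb_dt(
    start_dt: pendulum.DateTime,
    step_size_s: int,
    tsdb_is_up: bool,
) -> pendulum.DateTime:
    # near-term history when there's no tsdb "last datum" to anchor on
    periods = {
        1: {'days': 1},
        60: {'days': 14},
    }
    if tsdb_is_up:
        # a tsdb is available, so pull a deeper backfill for storage
        periods = {
            1: {'days': 6},
            60: {'years': 2},
        }
    return start_dt.subtract(**periods[step_size_s])

# e.g. 1m sampling with a live tsdb backfills two years before the frame start
start = pendulum.now()
assert default_last_tsdb_dt(start, 60, True) == start.subtract(years=2)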
@@ -567,8 +576,8 @@ async def start_backfill(
start_dt,
end_dt,
) = await get_ohlc_frame(
input_end_dt=last_shm_prepend_dt,
iter_dts_gen=idts,
input_end_dt=last_shm_prepend_dt,
iter_dts_gen=idts,
)
last_epoch = to_push['time'][-1]
diff = start - last_epoch
@@ -1002,7 +1011,7 @@ async def open_feed_bus(
brokername: str,
symbol: str, # normally expected to the broker-specific fqsn
loglevel: str,
tick_throttle: Optional[float] = None,
tick_throttle: Optional[float] = None,
start_stream: bool = True,
) -> None:
@@ -1263,7 +1272,7 @@ async def install_brokerd_search(
# a backend module?
pause_period=getattr(
brokermod, '_search_conf', {}
).get('pause_period', 0.0616),
).get('pause_period', 0.0616),
):
yield