Add "no-tsdb-found" history load length defaults
parent
09e988ec3e
commit
5d26609693
|
@ -228,7 +228,7 @@ def diff_history(
|
||||||
# the + 1 is because ``last_tsdb_dt`` is pulled from
|
# the + 1 is because ``last_tsdb_dt`` is pulled from
|
||||||
# the last row entry for the ``'time'`` field retreived
|
# the last row entry for the ``'time'`` field retreived
|
||||||
# from the tsdb.
|
# from the tsdb.
|
||||||
to_push = array[abs(s_diff)+1:]
|
to_push = array[abs(s_diff) + 1:]
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# pass back only the portion of the array that is
|
# pass back only the portion of the array that is
|
||||||
|
@ -251,6 +251,7 @@ async def start_backfill(
|
||||||
last_tsdb_dt: Optional[datetime] = None,
|
last_tsdb_dt: Optional[datetime] = None,
|
||||||
storage: Optional[Storage] = None,
|
storage: Optional[Storage] = None,
|
||||||
write_tsdb: bool = True,
|
write_tsdb: bool = True,
|
||||||
|
tsdb_is_up: bool = False,
|
||||||
|
|
||||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
@ -266,8 +267,8 @@ async def start_backfill(
|
||||||
|
|
||||||
# sample period step size in seconds
|
# sample period step size in seconds
|
||||||
step_size_s = (
|
step_size_s = (
|
||||||
pendulum.from_timestamp(times[-1]) -
|
pendulum.from_timestamp(times[-1])
|
||||||
pendulum.from_timestamp(times[-2])
|
- pendulum.from_timestamp(times[-2])
|
||||||
).seconds
|
).seconds
|
||||||
|
|
||||||
# "frame"'s worth of sample period steps in seconds
|
# "frame"'s worth of sample period steps in seconds
|
||||||
|
@ -292,25 +293,33 @@ async def start_backfill(
|
||||||
# let caller unblock and deliver latest history frame
|
# let caller unblock and deliver latest history frame
|
||||||
task_status.started((shm, start_dt, end_dt, bf_done))
|
task_status.started((shm, start_dt, end_dt, bf_done))
|
||||||
|
|
||||||
|
# based on the sample step size, maybe load a certain amount history
|
||||||
if last_tsdb_dt is None:
|
if last_tsdb_dt is None:
|
||||||
# maybe a better default (they don't seem to define epoch?!)
|
if step_size_s not in (1, 60):
|
||||||
|
|
||||||
# based on the sample step size load a certain amount
|
|
||||||
# history
|
|
||||||
if step_size_s == 1:
|
|
||||||
last_tsdb_dt = pendulum.now().subtract(days=2)
|
|
||||||
|
|
||||||
elif step_size_s == 60:
|
|
||||||
last_tsdb_dt = pendulum.now().subtract(years=2)
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
'`piker` only needs to support 1m and 1s sampling '
|
'`piker` only needs to support 1m and 1s sampling '
|
||||||
'but ur api is trying to deliver a longer '
|
'but ur api is trying to deliver a longer '
|
||||||
f'timeframe of {step_size_s} ' 'seconds.. so ye, dun '
|
f'timeframe of {step_size_s} ' 'seconds.. so ye, dun '
|
||||||
'do dat bruh.'
|
'do dat brudder.'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# when no tsdb "last datum" is provided, we just load
|
||||||
|
# some near-term history.
|
||||||
|
periods = {
|
||||||
|
1: {'days': 1},
|
||||||
|
60: {'days': 14},
|
||||||
|
}
|
||||||
|
|
||||||
|
if tsdb_is_up:
|
||||||
|
# do a decently sized backfill and load it into storage.
|
||||||
|
periods = {
|
||||||
|
1: {'days': 6},
|
||||||
|
60: {'years': 2},
|
||||||
|
}
|
||||||
|
|
||||||
|
kwargs = periods[step_size_s]
|
||||||
|
last_tsdb_dt = start_dt.subtract(**kwargs)
|
||||||
|
|
||||||
# configure async query throttling
|
# configure async query throttling
|
||||||
erlangs = config.get('erlangs', 1)
|
erlangs = config.get('erlangs', 1)
|
||||||
rate = config.get('rate', 1)
|
rate = config.get('rate', 1)
|
||||||
|
@ -568,8 +577,8 @@ async def start_backfill(
|
||||||
start_dt,
|
start_dt,
|
||||||
end_dt,
|
end_dt,
|
||||||
) = await get_ohlc_frame(
|
) = await get_ohlc_frame(
|
||||||
input_end_dt=last_shm_prepend_dt,
|
input_end_dt=last_shm_prepend_dt,
|
||||||
iter_dts_gen=idts,
|
iter_dts_gen=idts,
|
||||||
)
|
)
|
||||||
last_epoch = to_push['time'][-1]
|
last_epoch = to_push['time'][-1]
|
||||||
diff = start - last_epoch
|
diff = start - last_epoch
|
||||||
|
@ -1003,7 +1012,7 @@ async def open_feed_bus(
|
||||||
brokername: str,
|
brokername: str,
|
||||||
symbol: str, # normally expected to the broker-specific fqsn
|
symbol: str, # normally expected to the broker-specific fqsn
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
tick_throttle: Optional[float] = None,
|
tick_throttle: Optional[float] = None,
|
||||||
start_stream: bool = True,
|
start_stream: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -1264,7 +1273,7 @@ async def install_brokerd_search(
|
||||||
# a backend module?
|
# a backend module?
|
||||||
pause_period=getattr(
|
pause_period=getattr(
|
||||||
brokermod, '_search_conf', {}
|
brokermod, '_search_conf', {}
|
||||||
).get('pause_period', 0.0616),
|
).get('pause_period', 0.0616),
|
||||||
):
|
):
|
||||||
yield
|
yield
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue