Pass in default history time of 1 min

Adjust all history query machinery to pass a `timeframe: int` in seconds
and set default of 60 (aka 1m) such that history views from here forward
will be 1m sampled OHLCV. Further when the tsdb is detected as up load
a full 10 years of data if possible on the 1m - backends will eventually
get a config section (`brokers.toml`) that allow user's to tune this.
clears_table_events
Tyler Goodlet 2022-09-15 14:07:06 -04:00
parent 9270391e01
commit 7f498766af
1 changed files with 29 additions and 7 deletions

View File

@ -29,7 +29,9 @@ from pprint import pformat
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
Any, Any,
AsyncIterator, Optional, AsyncIterator,
Callable,
Optional,
Generator, Generator,
Awaitable, Awaitable,
TYPE_CHECKING, TYPE_CHECKING,
@ -252,6 +254,7 @@ async def start_backfill(
mod: ModuleType, mod: ModuleType,
bfqsn: str, bfqsn: str,
shm: ShmArray, shm: ShmArray,
timeframe: float,
last_tsdb_dt: Optional[datetime] = None, last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None, storage: Optional[Storage] = None,
@ -262,11 +265,19 @@ async def start_backfill(
) -> int: ) -> int:
hist: Callable[
[int, datetime, datetime],
tuple[np.ndarray, str]
]
config: dict[str, int]
async with mod.open_history_client(bfqsn) as (hist, config): async with mod.open_history_client(bfqsn) as (hist, config):
# get latest query's worth of history all the way # get latest query's worth of history all the way
# back to what is recorded in the tsdb # back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt=None) array, start_dt, end_dt = await hist(
timeframe,
end_dt=None,
)
times = array['time'] times = array['time']
@ -304,8 +315,8 @@ async def start_backfill(
raise ValueError( raise ValueError(
'`piker` only needs to support 1m and 1s sampling ' '`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer ' 'but ur api is trying to deliver a longer '
f'timeframe of {step_size_s} ' 'seconds.. so ye, dun ' f'timeframe of {step_size_s} seconds..\n'
'do dat brudder.' 'So yuh.. dun do dat brudder.'
) )
# when no tsdb "last datum" is provided, we just load # when no tsdb "last datum" is provided, we just load
@ -319,7 +330,7 @@ async def start_backfill(
# do a decently sized backfill and load it into storage. # do a decently sized backfill and load it into storage.
periods = { periods = {
1: {'days': 6}, 1: {'days': 6},
60: {'years': 2}, 60: {'years': 10},
} }
kwargs = periods[step_size_s] kwargs = periods[step_size_s]
@ -390,7 +401,10 @@ async def start_backfill(
log.info( log.info(
f'Requesting {step_size_s}s frame ending in {input_end_dt}' f'Requesting {step_size_s}s frame ending in {input_end_dt}'
) )
array, start_dt, end_dt = await hist(end_dt=input_end_dt) array, start_dt, end_dt = await hist(
timeframe,
end_dt=input_end_dt,
)
assert array['time'][0] == start_dt.timestamp() assert array['time'][0] == start_dt.timestamp()
except NoData: except NoData:
@ -640,6 +654,7 @@ async def start_backfill(
await storage.write_ohlcv( await storage.write_ohlcv(
f'{bfqsn}.{mod.name}', # lul.. f'{bfqsn}.{mod.name}', # lul..
to_push, to_push,
timeframe,
) )
# TODO: can we only trigger this if the respective # TODO: can we only trigger this if the respective
@ -660,6 +675,7 @@ async def manage_history(
fqsn: str, fqsn: str,
some_data_ready: trio.Event, some_data_ready: trio.Event,
feed_is_live: trio.Event, feed_is_live: trio.Event,
timeframe: float = 60, # in seconds
task_status: TaskStatus = trio.TASK_STATUS_IGNORED, task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
@ -726,7 +742,10 @@ async def manage_history(
# shm backfiller approach below. # shm backfiller approach below.
# start history anal and load missing new data via backend. # start history anal and load missing new data via backend.
series, _, last_tsdb_dt = await storage.load(fqsn) series, _, last_tsdb_dt = await storage.load(
fqsn,
timeframe=timeframe,
)
broker, symbol, expiry = unpack_fqsn(fqsn) broker, symbol, expiry = unpack_fqsn(fqsn)
( (
@ -739,6 +758,7 @@ async def manage_history(
mod, mod,
bfqsn, bfqsn,
hist_shm, hist_shm,
timeframe=timeframe,
last_tsdb_dt=last_tsdb_dt, last_tsdb_dt=last_tsdb_dt,
tsdb_is_up=True, tsdb_is_up=True,
storage=storage, storage=storage,
@ -804,6 +824,7 @@ async def manage_history(
series = await storage.read_ohlcv( series = await storage.read_ohlcv(
fqsn, fqsn,
end=end, end=end,
timeframe=timeframe,
) )
history = list(series.values()) history = list(series.values())
fastest = history[0] fastest = history[0]
@ -856,6 +877,7 @@ async def manage_history(
mod, mod,
bfqsn, bfqsn,
hist_shm, hist_shm,
timeframe=timeframe,
) )
) )