diff --git a/piker/data/feed.py b/piker/data/feed.py index 66b540ee..2400f39d 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -29,7 +29,9 @@ from pprint import pformat from types import ModuleType from typing import ( Any, - AsyncIterator, Optional, + AsyncIterator, + Callable, + Optional, Generator, Awaitable, TYPE_CHECKING, @@ -252,6 +254,7 @@ async def start_backfill( mod: ModuleType, bfqsn: str, shm: ShmArray, + timeframe: float, last_tsdb_dt: Optional[datetime] = None, storage: Optional[Storage] = None, @@ -262,11 +265,19 @@ async def start_backfill( ) -> int: + hist: Callable[ + [int, datetime, datetime], + tuple[np.ndarray, str] + ] + config: dict[str, int] async with mod.open_history_client(bfqsn) as (hist, config): # get latest query's worth of history all the way # back to what is recorded in the tsdb - array, start_dt, end_dt = await hist(end_dt=None) + array, start_dt, end_dt = await hist( + timeframe, + end_dt=None, + ) times = array['time'] @@ -304,8 +315,8 @@ async def start_backfill( raise ValueError( '`piker` only needs to support 1m and 1s sampling ' 'but ur api is trying to deliver a longer ' - f'timeframe of {step_size_s} ' 'seconds.. so ye, dun ' - 'do dat brudder.' + f'timeframe of {step_size_s} seconds..\n' + 'So yuh.. dun do dat brudder.' ) # when no tsdb "last datum" is provided, we just load @@ -319,7 +330,7 @@ async def start_backfill( # do a decently sized backfill and load it into storage. periods = { 1: {'days': 6}, - 60: {'years': 2}, + 60: {'years': 10}, } kwargs = periods[step_size_s] @@ -390,7 +401,10 @@ async def start_backfill( log.info( f'Requesting {step_size_s}s frame ending in {input_end_dt}' ) - array, start_dt, end_dt = await hist(end_dt=input_end_dt) + array, start_dt, end_dt = await hist( + timeframe, + end_dt=input_end_dt, + ) assert array['time'][0] == start_dt.timestamp() except NoData: @@ -640,6 +654,7 @@ async def start_backfill( await storage.write_ohlcv( f'{bfqsn}.{mod.name}', # lul.. to_push, + timeframe, ) # TODO: can we only trigger this if the respective @@ -660,6 +675,7 @@ async def manage_history( fqsn: str, some_data_ready: trio.Event, feed_is_live: trio.Event, + timeframe: float = 60, # in seconds task_status: TaskStatus = trio.TASK_STATUS_IGNORED, @@ -726,7 +742,10 @@ async def manage_history( # shm backfiller approach below. # start history anal and load missing new data via backend. - series, _, last_tsdb_dt = await storage.load(fqsn) + series, _, last_tsdb_dt = await storage.load( + fqsn, + timeframe=timeframe, + ) broker, symbol, expiry = unpack_fqsn(fqsn) ( @@ -739,6 +758,7 @@ async def manage_history( mod, bfqsn, hist_shm, + timeframe=timeframe, last_tsdb_dt=last_tsdb_dt, tsdb_is_up=True, storage=storage, @@ -804,6 +824,7 @@ async def manage_history( series = await storage.read_ohlcv( fqsn, end=end, + timeframe=timeframe, ) history = list(series.values()) fastest = history[0] @@ -856,6 +877,7 @@ async def manage_history( mod, bfqsn, hist_shm, + timeframe=timeframe, ) )