From 5d2660969354af953f99cdb52c8c319e774eaa6b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 15 May 2022 13:36:08 -0400 Subject: [PATCH] Add "no-tsdb-found" history load length defaults --- piker/data/feed.py | 47 +++++++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 848fcc10..d5e5d3b3 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -228,7 +228,7 @@ def diff_history( # the + 1 is because ``last_tsdb_dt`` is pulled from # the last row entry for the ``'time'`` field retreived # from the tsdb. - to_push = array[abs(s_diff)+1:] + to_push = array[abs(s_diff) + 1:] else: # pass back only the portion of the array that is @@ -251,6 +251,7 @@ async def start_backfill( last_tsdb_dt: Optional[datetime] = None, storage: Optional[Storage] = None, write_tsdb: bool = True, + tsdb_is_up: bool = False, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -266,8 +267,8 @@ async def start_backfill( # sample period step size in seconds step_size_s = ( - pendulum.from_timestamp(times[-1]) - - pendulum.from_timestamp(times[-2]) + pendulum.from_timestamp(times[-1]) + - pendulum.from_timestamp(times[-2]) ).seconds # "frame"'s worth of sample period steps in seconds @@ -292,25 +293,33 @@ async def start_backfill( # let caller unblock and deliver latest history frame task_status.started((shm, start_dt, end_dt, bf_done)) + # based on the sample step size, maybe load a certain amount history if last_tsdb_dt is None: - # maybe a better default (they don't seem to define epoch?!) - - # based on the sample step size load a certain amount - # history - if step_size_s == 1: - last_tsdb_dt = pendulum.now().subtract(days=2) - - elif step_size_s == 60: - last_tsdb_dt = pendulum.now().subtract(years=2) - - else: + if step_size_s not in (1, 60): raise ValueError( '`piker` only needs to support 1m and 1s sampling ' 'but ur api is trying to deliver a longer ' f'timeframe of {step_size_s} ' 'seconds.. so ye, dun ' - 'do dat bruh.' + 'do dat brudder.' ) + # when no tsdb "last datum" is provided, we just load + # some near-term history. + periods = { + 1: {'days': 1}, + 60: {'days': 14}, + } + + if tsdb_is_up: + # do a decently sized backfill and load it into storage. + periods = { + 1: {'days': 6}, + 60: {'years': 2}, + } + + kwargs = periods[step_size_s] + last_tsdb_dt = start_dt.subtract(**kwargs) + # configure async query throttling erlangs = config.get('erlangs', 1) rate = config.get('rate', 1) @@ -568,8 +577,8 @@ async def start_backfill( start_dt, end_dt, ) = await get_ohlc_frame( - input_end_dt=last_shm_prepend_dt, - iter_dts_gen=idts, + input_end_dt=last_shm_prepend_dt, + iter_dts_gen=idts, ) last_epoch = to_push['time'][-1] diff = start - last_epoch @@ -1003,7 +1012,7 @@ async def open_feed_bus( brokername: str, symbol: str, # normally expected to the broker-specific fqsn loglevel: str, - tick_throttle: Optional[float] = None, + tick_throttle: Optional[float] = None, start_stream: bool = True, ) -> None: @@ -1264,7 +1273,7 @@ async def install_brokerd_search( # a backend module? pause_period=getattr( brokermod, '_search_conf', {} - ).get('pause_period', 0.0616), + ).get('pause_period', 0.0616), ): yield