From 8e11d79712fc745c1ce37ad247bc755730aebb2a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 27 Apr 2022 17:13:15 -0400 Subject: [PATCH] Drop legacy back-filling logic Use the new `open_history_client()` endpoint/API and expect backends to provide a history "getter" routine that can be called to load historical data into shm even when **not** using a tsdb. Add logic for filling in data from the tsdb once the backend has provided data up to the last recorded in the db. Add logic for avoiding overruns of the shm buffer with more-then-necessary queries of tsdb data. --- piker/data/feed.py | 125 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 98 insertions(+), 27 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index e51cfce5..0d0156b6 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -203,21 +203,26 @@ def diff_history( ) -> np.ndarray: if last_tsdb_dt: - s_diff = (last_tsdb_dt - start_dt).seconds + s_diff = (start_dt - last_tsdb_dt).seconds + + to_push = array[:s_diff] # if we detect a partial frame's worth of data # that is new, slice out only that history and # write to shm. - if s_diff > 0: - assert last_tsdb_dt > start_dt - selected = array['time'] > last_tsdb_dt.timestamp() - to_push = array[selected] + if abs(s_diff) < len(array): log.info( f'Pushing partial frame {to_push.size} to shm' ) - return to_push + # assert last_tsdb_dt > start_dt + # selected = array['time'] > last_tsdb_dt.timestamp() + # to_push = array[selected] + # return to_push - return array + return to_push + + else: + return array async def start_backfill( @@ -226,19 +231,11 @@ async def start_backfill( shm: ShmArray, last_tsdb_dt: Optional[datetime] = None, - # do_legacy: bool = False, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> int: - # if do_legacy: - # return await mod.backfill_bars( - # bfqsn, - # shm, - # task_status=task_status, - # ) - async with mod.open_history_client(bfqsn) as hist: # get latest query's worth of history all the way @@ -258,23 +255,23 @@ async def start_backfill( for delay_s in sampler.subscribers: await broadcast(delay_s) + bf_done = trio.Event() # let caller unblock and deliver latest history frame - task_status.started(shm) + task_status.started((shm, start_dt, end_dt, bf_done)) if last_tsdb_dt is None: # maybe a better default (they don't seem to define epoch?!) last_tsdb_dt = pendulum.now().subtract(days=1) - # pull new history frames until we hit latest # already in the tsdb or a max count. - mx_fills = 16 + # mx_fills = 16 count = 0 + # while True: while ( - start_dt > last_tsdb_dt + end_dt > last_tsdb_dt # and count < mx_fills ): - # while True: count += 1 array, start_dt, end_dt = await hist(end_dt=start_dt) to_push = diff_history( @@ -282,22 +279,31 @@ async def start_backfill( start_dt, end_dt, - # last_tsdb_dt=last_tsdb_dt, + last_tsdb_dt=last_tsdb_dt, # XXX: hacky, just run indefinitely - last_tsdb_dt=None, + # last_tsdb_dt=None, ) - print("fPULLING {count}") + print(f"PULLING {count}") log.info(f'Pushing {to_push.size} to shm!') + if to_push.size < 1: + break + # bail on shm allocation overrun try: shm.push(to_push, prepend=True) except ValueError: + await tractor.breakpoint() break for delay_s in sampler.subscribers: await broadcast(delay_s) + bf_done.set() + # update start index to include all tsdb history + # that was pushed in the caller parent task. + # shm._first.value = 0 + async def manage_history( mod: ModuleType, @@ -358,7 +364,12 @@ async def manage_history( series, first_dt, last_dt = await storage.load(fqsn) broker, symbol, expiry = unpack_fqsn(fqsn) - await bus.nursery.start( + ( + shm, + latest_start_dt, + latest_end_dt, + bf_done, + ) = await bus.nursery.start( partial( start_backfill, mod, @@ -370,19 +381,37 @@ async def manage_history( task_status.started(shm) some_data_ready.set() + await bf_done.wait() + # do diff against last start frame of history and only fill + # in from the tsdb an allotment that allows for most recent + # to be loaded into mem *before* tsdb data. + if last_dt: + dt_diff_s = (latest_start_dt - last_dt).seconds + else: + dt_diff_s = 0 + + # await trio.sleep_forever() # TODO: see if there's faster multi-field reads: # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # re-index with a `time` and index field + prepend_start = shm._first.value + + # sanity check on most-recent-data loading + assert prepend_start > dt_diff_s + history = list(series.values()) if history: fastest = history[0] + to_push = fastest[:prepend_start] + shm.push( - fastest[-shm._first.value:], + to_push, # insert the history pre a "days worth" of samples # to leave some real-time buffer space at the end. prepend=True, - # start=shm._len - _secs_in_day, + # update_first=False, + # start=prepend_start, field_map={ 'Epoch': 'time', 'Open': 'open', @@ -392,6 +421,49 @@ async def manage_history( 'Volume': 'volume', }, ) + + # load as much from storage into shm as spacec will + # allow according to user's shm size settings. + count = 0 + end = fastest['Epoch'][0] + + while shm._first.value > 0: + count += 1 + series = await storage.read_ohlcv( + fqsn, + end=end, + ) + history = list(series.values()) + fastest = history[0] + end = fastest['Epoch'][0] + prepend_start -= len(to_push) + to_push = fastest[:prepend_start] + + shm.push( + to_push, + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + # start=prepend_start, + field_map={ + 'Epoch': 'time', + 'Open': 'open', + 'High': 'high', + 'Low': 'low', + 'Close': 'close', + 'Volume': 'volume', + }, + ) + for delay_s in sampler.subscribers: + await broadcast(delay_s) + + if count > 6: + break + + log.info(f'Loaded {to_push.shape} datums from storage') + # TODO: write new data to tsdb to be ready to for next read. if do_legacy_backfill: @@ -407,7 +479,6 @@ async def manage_history( mod, bfqsn, shm, - # do_legacy=True, ) )