diff --git a/piker/data/feed.py b/piker/data/feed.py
index e51cfce5..0d0156b6 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -203,21 +203,26 @@ def diff_history(
 ) -> np.ndarray:
 
     if last_tsdb_dt:
-        s_diff = (last_tsdb_dt - start_dt).seconds
+        s_diff = (start_dt - last_tsdb_dt).seconds
+
+        to_push = array[:s_diff]
 
         # if we detect a partial frame's worth of data
         # that is new, slice out only that history and
         # write to shm.
-        if s_diff > 0:
-            assert last_tsdb_dt > start_dt
-            selected = array['time'] > last_tsdb_dt.timestamp()
-            to_push = array[selected]
+        if abs(s_diff) < len(array):
             log.info(
                 f'Pushing partial frame {to_push.size} to shm'
             )
-            return to_push
+            # assert last_tsdb_dt > start_dt
+            # selected = array['time'] > last_tsdb_dt.timestamp()
+            # to_push = array[selected]
+            # return to_push
 
-    return array
+        return to_push
+
+    else:
+        return array
 
 
 async def start_backfill(
@@ -226,19 +231,11 @@ async def start_backfill(
     shm: ShmArray,
     last_tsdb_dt: Optional[datetime] = None,
 
-    # do_legacy: bool = False,
 
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 
 ) -> int:
 
-    # if do_legacy:
-    #     return await mod.backfill_bars(
-    #         bfqsn,
-    #         shm,
-    #         task_status=task_status,
-    #     )
-
     async with mod.open_history_client(bfqsn) as hist:
 
         # get latest query's worth of history all the way
@@ -258,23 +255,23 @@ async def start_backfill(
         for delay_s in sampler.subscribers:
             await broadcast(delay_s)
 
+        bf_done = trio.Event()
         # let caller unblock and deliver latest history frame
-        task_status.started(shm)
+        task_status.started((shm, start_dt, end_dt, bf_done))
 
         if last_tsdb_dt is None:
             # maybe a better default (they don't seem to define epoch?!)
             last_tsdb_dt = pendulum.now().subtract(days=1)
-
         # pull new history frames until we hit latest
         # already in the tsdb or a max count.
-        mx_fills = 16
+        # mx_fills = 16
         count = 0
 
+        # while True:
         while (
-            start_dt > last_tsdb_dt
+            end_dt > last_tsdb_dt
             # and count < mx_fills
         ):
-            # while True:
             count += 1
             array, start_dt, end_dt = await hist(end_dt=start_dt)
             to_push = diff_history(
@@ -282,22 +279,31 @@ async def start_backfill(
                 start_dt,
                 end_dt,
-                # last_tsdb_dt=last_tsdb_dt,
+                last_tsdb_dt=last_tsdb_dt,
 
                 # XXX: hacky, just run indefinitely
-                last_tsdb_dt=None,
+                # last_tsdb_dt=None,
             )
-            print("fPULLING {count}")
+            print(f"PULLING {count}")
             log.info(f'Pushing {to_push.size} to shm!')
 
+            if to_push.size < 1:
+                break
+
             # bail on shm allocation overrun
             try:
                 shm.push(to_push, prepend=True)
             except ValueError:
+                await tractor.breakpoint()
                 break
 
             for delay_s in sampler.subscribers:
                 await broadcast(delay_s)
 
+        bf_done.set()
+        # update start index to include all tsdb history
+        # that was pushed in the caller parent task.
+        # shm._first.value = 0
+
 
 async def manage_history(
     mod: ModuleType,
@@ -358,7 +364,12 @@ async def manage_history(
             series, first_dt, last_dt = await storage.load(fqsn)
             broker, symbol, expiry = unpack_fqsn(fqsn)
 
-            await bus.nursery.start(
+            (
+                shm,
+                latest_start_dt,
+                latest_end_dt,
+                bf_done,
+            ) = await bus.nursery.start(
                 partial(
                     start_backfill,
                     mod,
@@ -370,19 +381,37 @@ async def manage_history(
             task_status.started(shm)
             some_data_ready.set()
 
+            await bf_done.wait()
+            # do diff against last start frame of history and only fill
+            # in from the tsdb an allotment that allows for most recent
+            # to be loaded into mem *before* tsdb data.
+            if last_dt:
+                dt_diff_s = (latest_start_dt - last_dt).seconds
+            else:
+                dt_diff_s = 0
+
+            # await trio.sleep_forever()
             # TODO: see if there's faster multi-field reads:
             # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
             # re-index with a `time` and index field
+            prepend_start = shm._first.value
+
+            # sanity check on most-recent-data loading
+            assert prepend_start > dt_diff_s
+
             history = list(series.values())
             if history:
                 fastest = history[0]
+                to_push = fastest[:prepend_start]
+
                 shm.push(
-                    fastest[-shm._first.value:],
+                    to_push,
 
                     # insert the history pre a "days worth" of samples
                     # to leave some real-time buffer space at the end.
                     prepend=True,
-                    # start=shm._len - _secs_in_day,
+                    # update_first=False,
+                    # start=prepend_start,
                     field_map={
                         'Epoch': 'time',
                         'Open': 'open',
@@ -392,6 +421,49 @@ async def manage_history(
                         'Volume': 'volume',
                     },
                 )
+
+                # load as much from storage into shm as spacec will
+                # allow according to user's shm size settings.
+                count = 0
+                end = fastest['Epoch'][0]
+
+                while shm._first.value > 0:
+                    count += 1
+                    series = await storage.read_ohlcv(
+                        fqsn,
+                        end=end,
+                    )
+                    history = list(series.values())
+                    fastest = history[0]
+                    end = fastest['Epoch'][0]
+                    prepend_start -= len(to_push)
+                    to_push = fastest[:prepend_start]
+
+                    shm.push(
+                        to_push,
+
+                        # insert the history pre a "days worth" of samples
+                        # to leave some real-time buffer space at the end.
+                        prepend=True,
+                        # update_first=False,
+                        # start=prepend_start,
+                        field_map={
+                            'Epoch': 'time',
+                            'Open': 'open',
+                            'High': 'high',
+                            'Low': 'low',
+                            'Close': 'close',
+                            'Volume': 'volume',
+                        },
+                    )
+                    for delay_s in sampler.subscribers:
+                        await broadcast(delay_s)
+
+                    if count > 6:
+                        break
+
+                log.info(f'Loaded {to_push.shape} datums from storage')
+
         # TODO: write new data to tsdb to be ready to for next read.
 
     if do_legacy_backfill:
@@ -407,7 +479,6 @@ async def manage_history(
                 mod,
                 bfqsn,
                 shm,
-                # do_legacy=True,
             )
         )
 
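
Note on the new startup handshake above: `start_backfill` now builds a `trio.Event` (`bf_done`) and hands it back through `task_status.started((shm, start_dt, end_dt, bf_done))`, so `manage_history` can unpack the tuple returned by `bus.nursery.start(...)` and `await bf_done.wait()` before it prepends tsdb history behind the freshly backfilled frames. A minimal, self-contained trio sketch of that pattern (not piker code; the names and the single-event payload are illustrative stand-ins):

import trio


async def backfill(task_status=trio.TASK_STATUS_IGNORED):
    done = trio.Event()
    # unblock the caller right away, handing it the completion event
    # (the patch passes a (shm, start_dt, end_dt, bf_done) tuple instead)
    task_status.started(done)

    for _ in range(3):
        await trio.sleep(0.1)  # stand-in for pulling/pushing history frames

    done.set()  # backfill finished; caller may now prepend older history


async def main():
    async with trio.open_nursery() as nursery:
        # nursery.start() returns whatever the child passed to .started()
        done = await nursery.start(backfill)
        await done.wait()
        print('backfill complete, safe to prepend tsdb data')


trio.run(main)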
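The new `while shm._first.value > 0:` loop in `manage_history` pages backwards through the tsdb: it repeatedly asks `storage.read_ohlcv(fqsn, end=end)` for the frame ending where the previous one began and prepends it until the shm buffer's front index is reached, with a hard stop after a handful of iterations. A simplified, runnable sketch of that paging idea, using a fake in-memory store and a plain list in place of piker's marketstore client and `ShmArray` (the `FakeStore` class, its `limit` parameter, and the buffer arithmetic below are illustrative assumptions, not the patch's exact index math):

from __future__ import annotations


class FakeStore:
    '''Pretend tsdb holding one long ascending series of epoch seconds.'''

    def __init__(self, n: int = 10_000) -> None:
        self.series = list(range(n))

    def read_ohlcv(self, end: int | None = None, limit: int = 1000) -> list[int]:
        # return up to ``limit`` datums strictly *before* ``end`` (or the latest)
        stop = len(self.series) if end is None else self.series.index(end)
        return self.series[max(0, stop - limit):stop]


def backfill_from_store(store: FakeStore, shm_size: int = 3000) -> list[int]:
    shm = store.read_ohlcv()            # most recent frame loaded first
    end = shm[0]                        # oldest datum held so far
    count = 0

    while len(shm) < shm_size:          # analogous to ``shm._first.value > 0``
        count += 1
        frame = store.read_ohlcv(end=end)
        if not frame:
            break

        room = shm_size - len(shm)
        to_push = frame[-room:]         # never overrun the buffer
        shm = to_push + shm             # like ``shm.push(to_push, prepend=True)``
        end = shm[0]

        if count > 6:                   # same hard stop as the patch
            break

    return shm


print(len(backfill_from_store(FakeStore())))  # -> 3000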