def diff_history(
    array,
    start_dt,
    end_dt,
    last_tsdb_dt: Optional[datetime] = None

) -> np.ndarray:
    '''
    Trim a freshly fetched history frame to the rows that are newer
    than what is already recorded in the tsdb.

    Parameters:
      - ``array``: structured array for the frame; it is indexed by
        a ``'time'`` field which is compared against
        ``last_tsdb_dt.timestamp()`` (so presumably epoch seconds
        - confirm against the backend).
      - ``start_dt``: datetime of the first sample in ``array``.
      - ``end_dt``: datetime of the last sample in ``array``; not
        used here, accepted so the ``(array, start_dt, end_dt)``
        tuple returned by the history client can be passed straight
        through.
      - ``last_tsdb_dt``: datetime of the newest sample already in
        the tsdb, or ``None`` when there is nothing to diff against.

    Returns ``array`` itself when no trimming applies, otherwise
    a new array holding only the rows with
    ``time > last_tsdb_dt.timestamp()``.

    '''
    # nothing stored yet (or caller opted out of diffing): the whole
    # frame is new.
    if last_tsdb_dt is None:
        return array

    # NOTE: must be ``.total_seconds()`` and NOT ``.seconds``: the
    # latter is only the always-non-negative seconds *component* of
    # a ``timedelta`` (days are dropped, sign is lost), which made
    # a tsdb that ends *before* this frame look like an overlap.
    s_diff = (last_tsdb_dt - start_dt).total_seconds()

    # tsdb ends at or before the start of this frame: no overlap so
    # every row still needs to be pushed.
    if s_diff <= 0:
        return array

    # if we detect a partial frame's worth of data
    # that is new, slice out only that history and
    # write to shm.
    selected = array['time'] > last_tsdb_dt.timestamp()
    to_push = array[selected]
    log.info(
        f'Pushing partial frame {to_push.size} to shm'
    )
    return to_push
shm.push(to_push) + + for delay_s in sampler.subscribers: + await broadcast(delay_s) + + # let caller unblock and deliver latest history frame + task_status.started(shm) + + # pull new history frames until we hit latest + # already in the tsdb + # while start_dt > last_tsdb_dt: + while True: + array, start_dt, end_dt = await hist(end_dt=start_dt) + to_push = diff_history( + array, + start_dt, + end_dt, + + # last_tsdb_dt=last_tsdb_dt, + # XXX: hacky, just run indefinitely + last_tsdb_dt=None, + ) + log.info(f'Pushing {to_push.size} to shm!') + + # bail on shm allocation overrun + try: + shm.push(to_push, prepend=True) + except ValueError: + break + + for delay_s in sampler.subscribers: + await broadcast(delay_s) async def manage_history( @@ -251,127 +327,59 @@ async def manage_history( # for now only do backfilling if no tsdb can be found do_legacy_backfill = not is_up and opened - open_history_client = getattr(mod, 'open_history_client', None) - bfqsn = fqsn.replace('.' + mod.name, '') + open_history_client = getattr(mod, 'open_history_client', None) if is_up and opened and open_history_client: log.info('Found existing `marketstored`') from . import marketstore - async with marketstore.open_storage_client( fqsn, ) as storage: - tsdb_arrays = await storage.read_ohlcv(fqsn) + # TODO: this should be used verbatim for the pure + # shm backfiller approach below. - if not tsdb_arrays: - do_legacy_backfill = True + # start history anal and load missing new data via backend. 
+ series, first_dt, last_dt = await storage.load(fqsn) - else: - log.info(f'Loaded tsdb history {tsdb_arrays}') - - fastest = list(tsdb_arrays.values())[0] - times = fastest['Epoch'] - first, last = times[0], times[-1] - first_tsdb_dt, last_tsdb_dt = map( - pendulum.from_timestamp, [first, last] + broker, symbol, expiry = unpack_fqsn(fqsn) + await bus.nursery.start( + partial( + start_backfill, + mod, + bfqsn, + shm, + last_tsdb_dt=last_dt, ) + ) + task_status.started(shm) + some_data_ready.set() - # TODO: this should be used verbatim for the pure - # shm backfiller approach below. + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + history = list(series.values()) + if history: + fastest = history[0] + shm.push( + fastest[-shm._first.value:], - def diff_history( - array, - start_dt, - end_dt, - last_tsdb_dt: Optional[datetime] = None - - ) -> np.ndarray: - - if last_tsdb_dt: - s_diff = (last_tsdb_dt - start_dt).seconds - - # if we detect a partial frame's worth of data - # that is new, slice out only that history and - # write to shm. - if s_diff > 0: - assert last_tsdb_dt > start_dt - selected = array['time'] > last_tsdb_dt.timestamp() - to_push = array[selected] - log.info( - f'Pushing partial frame {to_push.size} to shm' - ) - return to_push - - return array - - # start history anal and load missing new data via backend. 
- - broker, symbol, expiry = unpack_fqsn(fqsn) - - async with open_history_client(bfqsn) as hist: - - # get latest query's worth of history all the way - # back to what is recorded in the tsdb - array, start_dt, end_dt = await hist(end_dt=None) - to_push = diff_history( - array, - start_dt, - end_dt, - last_tsdb_dt=last_tsdb_dt, - ) - log.info(f'Pushing {to_push.size} to shm!') - shm.push(to_push) - - for delay_s in sampler.subscribers: - await broadcast(delay_s) - - # let caller unblock and deliver latest history frame - task_status.started(shm) - some_data_ready.set() - - # pull new history frames until we hit latest - # already in the tsdb - # while start_dt > last_tsdb_dt: - while True: - array, start_dt, end_dt = await hist(end_dt=start_dt) - to_push = diff_history( - array, - start_dt, - end_dt, - # last_tsdb_dt=last_tsdb_dt, - # just run indefinitely - last_tsdb_dt=None, - ) - log.info(f'Pushing {to_push.size} to shm!') - shm.push(to_push, prepend=True) - for delay_s in sampler.subscribers: - await broadcast(delay_s) - - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - shm.push( - fastest[-shm._first.value:], - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # start=shm._len - _secs_in_day, - field_map={ - 'Epoch': 'time', - 'Open': 'open', - 'High': 'high', - 'Low': 'low', - 'Close': 'close', - 'Volume': 'volume', - }, - ) - - # TODO: write new data to tsdb to be ready to for next - # read. + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # start=shm._len - _secs_in_day, + field_map={ + 'Epoch': 'time', + 'Open': 'open', + 'High': 'high', + 'Low': 'low', + 'Close': 'close', + 'Volume': 'volume', + }, + ) + # TODO: write new data to tsdb to be ready to for next read. 
if do_legacy_backfill: # do a legacy incremental backfill from the provider. @@ -385,6 +393,7 @@ async def manage_history( mod, bfqsn, shm, + do_legacy=True, ) # yield back after client connect with filled shm