From 8233d12afb7530f219f85334295816a49034f416 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 8 Jun 2023 11:16:19 -0400 Subject: [PATCH] Detect and fill time gaps in tsdb history For now, just detect and fill in gaps (via fresh backend queries) *in the shm buffer* but eventually I'm pretty sure we can just write these directly to the parquet file as well. Use the new `.data._timeseries.detect_null_time_gap()` to find and fill in the `ShmArray` index range, re-check it and enter a prompt if it didn't totally fill. Also, - do a massive cleanup and removal of all unused/commented code. - drop the duplicate frames tracking, don't think we need it after removing multi-frame concurrent queries. - change backfill loop variable `end_dt` -> `last_start_dt` which is more semantically correct. - fix logic to backfill any missing sub-sequence portion for any frame query that overruns the shm buffer prependable space by detecting the available rows left to insert and only push those. - add a new `shm_push_in_between()` helper to match. 
--- piker/data/history.py | 581 +++++++++++++++--------------------------- 1 file changed, 208 insertions(+), 373 deletions(-) diff --git a/piker/data/history.py b/piker/data/history.py index 51e19c5a..eea6e83f 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -67,10 +67,6 @@ if TYPE_CHECKING: def diff_history( array: np.ndarray, - # timeframe: int, - # start_dt: datetime, - # end_dt: datetime, - append_until_dt: datetime | None = None, prepend_until_dt: datetime | None = None, @@ -90,123 +86,45 @@ def diff_history( else: return array[times >= prepend_until_dt.timestamp()] +async def shm_push_in_between( + shm: ShmArray, + to_push: np.ndarray, + prepend_index: int, -# async def open_history_mngr( -# mod: ModuleType, -# mkt: MktPair, -# # shm: ShmArray, -# # timeframes: list[float] = [60, 1], -# timeframes: float, + update_start_on_prepend: bool = False, -# ) -> Callable[ -# [int, datetime, datetime], -# tuple[np.ndarray, str] -# ]: -# ''' -# Open a "history manager" for the backend data provider, -# get the latest "frames worth" of ohlcv history, -# push the history to shm and deliver -# the start datum's datetime value so that further history loading -# can be done until synchronized with the tsdb loaded time series. +) -> int: + shm.push( + to_push, + prepend=True, -# ''' -# hist: Callable[ -# [int, datetime, datetime], -# tuple[np.ndarray, str] -# ] -# config: dict[str, int] + # XXX: only update the ._first index if no tsdb + # segment was previously prepended by the + # parent task. + update_first=update_start_on_prepend, -# async with mod.open_history_client( -# mkt, -# ) as (hist, config): -# log.info(f'{mod} history client returned backfill config: {config}') + # XXX: only prepend from a manually calculated shm + # index if there was already a tsdb history + # segment prepended (since then the + # ._first.value is going to be wayyy in the + # past!) 
+ start=( + prepend_index + if not update_start_on_prepend + else None + ), + ) + # XXX: extremely important, there can be no checkpoints + # in the block above to avoid entering new ``frames`` + # values while we're pipelining the current ones to + # memory... + array = shm.array + zeros = array[array['low'] == 0] + if ( + 0 < zeros.size < 1000 + ): + tractor.breakpoint() -# # get latest query's worth of history all the way -# # back to what is recorded in the tsdb -# array, mr_start_dt, mr_end_dt = await hist( -# timeframe, -# end_dt=None, -# ) -# times: np.ndarray = array['time'] - -# # sample period step size in seconds -# step_size_s = ( -# from_timestamp(times[-1]) -# - from_timestamp(times[-2]) -# ).seconds - -# if step_size_s not in (1, 60): -# log.error(f'Last 2 sample period is off!? -> {step_size_s}') -# step_size_s = ( -# from_timestamp(times[-2]) -# - from_timestamp(times[-3]) -# ).seconds - -# # NOTE: on the first history, most recent history -# # frame we PREPEND from the current shm ._last index -# # and thus a gap between the earliest datum loaded here -# # and the latest loaded from the tsdb may exist! -# log.info(f'Pushing {to_push.size} to shm!') -# shm.push( -# to_push, -# prepend=True, -# # start= -# ) - - -# # if the market is open (aka we have a live feed) but the -# # history sample step index seems off we report the surrounding -# # data and drop into a bp. this case shouldn't really ever -# # happen if we're doing history retrieval correctly. 
-# # if ( -# # step_size_s == 60 -# # and feed_is_live.is_set() -# # ): -# # inow = round(time.time()) -# # diff = inow - times[-1] -# # if abs(diff) > 60: -# # surr = array[-6:] -# # diff_in_mins = round(diff/60., ndigits=2) -# # log.warning( -# # f'STEP ERROR `{mkt.fqme}` for period {step_size_s}s:\n' -# # f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' -# # 'Surrounding 6 time stamps:\n' -# # f'{list(surr["time"])}\n' -# # 'Here is surrounding 6 samples:\n' -# # f'{surr}\nn' -# # ) - -# # uncomment this for a hacker who wants to investigate -# # this case manually.. -# # await tractor.breakpoint() - -# # frame's worth of sample-period-steps, in seconds -# # frame_size_s = len(array) * step_size_s - -# to_push = array -# # to_push = diff_history( -# # array, -# # # timeframe, -# # # mr_start_dt, -# # # mr_end_dt, - -# # # backfill scenario for "most recent" frame -# # prepend_until_dt=last_tsdb_dt, -# # ) - -# # NOTE: on the first history, most recent history -# # frame we PREPEND from the current shm ._last index -# # and thus a gap between the earliest datum loaded here -# # and the latest loaded from the tsdb may exist! -# log.info(f'Pushing {to_push.size} to shm!') -# shm.push( -# to_push, -# prepend=True, -# # start= -# ) -# # TODO: should we wrap this in a "history frame" type or -# # something? -# yield hist, mr_start_dt, mr_end_dt async def start_backfill( @@ -221,44 +139,27 @@ async def start_backfill( sampler_stream: tractor.MsgStream, - backfill_until_dt: datetime | None = None, storage: StorageClient | None = None, write_tsdb: bool = True, - # tsdb_is_up: bool = True, task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED, ) -> int: - # TODO: *** THIS IS A BUG *** - # we need to only broadcast to subscribers for this fqme.. - # otherwise all fsps get reset on every chart.. 
- # await sampler_stream.send('broadcast_all') - - # signal that backfilling to tsdb's end datum is complete - bf_done = trio.Event() - # let caller unblock and deliver latest history frame - task_status.started( #( - # mr_start_dt, - # mr_end_dt, - bf_done, - )# ) + # and use to signal that backfilling the shm gap until + # the tsdb end is complete! + bf_done = trio.Event() + task_status.started(bf_done) # based on the sample step size, maybe load a certain amount history update_start_on_prepend: bool = False if backfill_until_dt is None: - if timeframe not in (1, 60): - raise ValueError( - '`piker` only needs to support 1m and 1s sampling ' - 'but ur api is trying to deliver a longer ' - f'timeframe of {timeframe} seconds..\n' - 'So yuh.. dun do dat brudder.' - ) - + # TODO: drop this right and just expose the backfill + # limits inside a [storage] section in conf.toml? # when no tsdb "last datum" is provided, we just load # some near-term history. # periods = { @@ -266,7 +167,6 @@ async def start_backfill( # 60: {'days': 14}, # } - # if tsdb_is_up: # do a decently sized backfill and load it into storage. periods = { 1: {'days': 6}, @@ -281,38 +181,33 @@ async def start_backfill( # settings above when the tsdb is detected as being empty. backfill_until_dt = backfill_from_dt.subtract(**period_duration) + + # TODO: can we drop this? without conc i don't think this + # is necessary any more? # configure async query throttling # rate = config.get('rate', 1) # XXX: legacy from ``trimeter`` code but unsupported now. # erlangs = config.get('erlangs', 1) - # avoid duplicate history frames with a set of datetime frame # starts and associated counts of how many duplicates we see # per time stamp. starts: Counter[datetime] = Counter() - # conduct "backward history filling" since - # no tsdb history yet exists. - - # implemented via a simple inline sequential loop where we - # simply pass the last retrieved start dt to the next - # request as it's end dt. 
- # while end_dt < backfill_until_dt: - # while ( - # end_dt is None # init case - # or end_dt < mr_start_dt - # ): - - # conduct "forward filling" from the last time step - # loaded from the tsdb until the first step loaded - # just above - end_dt: datetime = backfill_from_dt - # start_dt: datetime = backfill_until_dt + # conduct "backward history gap filling" where we push to + # the shm buffer until we have history back until the + # latest entry loaded from the tsdb's table B) + last_start_dt: datetime = backfill_from_dt next_prepend_index: int = backfill_from_shm_index - while end_dt > backfill_until_dt: + while last_start_dt > backfill_until_dt: + + # if timeframe == 60: + # await tractor.breakpoint() + # else: + # return + log.debug( - f'Requesting {timeframe}s frame ending in {end_dt}' + f'Requesting {timeframe}s frame ending in {last_start_dt}' ) try: @@ -322,8 +217,7 @@ async def start_backfill( next_end_dt, ) = await get_hist( timeframe, - end_dt=end_dt, - # start_dt=start_dt, + end_dt=last_start_dt, ) # broker says there never was or is no more history to pull @@ -338,22 +232,23 @@ async def start_backfill( # request loop until the condition is resolved? return - if ( - next_start_dt in starts - and starts[next_start_dt] <= 6 - ): - start_dt = min(starts) - log.warning( - f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}" - ) - starts[start_dt] += 1 - continue + # TODO: drop this? see todo above.. + # if ( + # next_start_dt in starts + # and starts[next_start_dt] <= 6 + # ): + # start_dt = min(starts) + # log.warning( + # f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}" + # ) + # starts[start_dt] += 1 + # continue - elif starts[next_start_dt] > 6: - log.warning( - f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?' - ) - return + # elif starts[next_start_dt] > 6: + # log.warning( + # f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?' 
+ # ) + # return # only update new start point if not-yet-seen start_dt: datetime = next_start_dt @@ -361,7 +256,7 @@ async def start_backfill( assert array['time'][0] == start_dt.timestamp() - diff = end_dt - start_dt + diff = last_start_dt - start_dt frame_time_diff_s = diff.seconds # frame's worth of sample-period-steps, in seconds @@ -374,48 +269,31 @@ async def start_backfill( # history gap (eg. market closed period, outage, etc.) # so just report it to console for now. log.warning( - f'History frame ending @ {end_dt} appears to have a gap:\n' + f'History frame ending @ {last_start_dt} appears to have a gap:\n' f'{diff} ~= {frame_time_diff_s} seconds' ) to_push = diff_history( array, - # timeframe, - # start_dt, - # end_dt, prepend_until_dt=backfill_until_dt, ) ln = len(to_push) if ln: - log.info(f'{ln} bars for {start_dt} -> {end_dt}') + log.info(f'{ln} bars for {start_dt} -> {last_start_dt}') else: log.warning( - f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}' + f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {last_start_dt}' ) # bail gracefully on shm allocation overrun/full # condition try: - shm.push( + await shm_push_in_between( + shm, to_push, - prepend=True, - - # XXX: only update the ._first index if no tsdb - # segment was previously prepended by the - # parent task. - update_first=update_start_on_prepend, - - # XXX: only prepend from a manually calculated shm - # index if there was already a tsdb history - # segment prepended (since then the - # ._first.value is going to be wayyy in the - # past!) 
- start=( - next_prepend_index - if not update_start_on_prepend - else None - ), + prepend_index=next_prepend_index, + update_start_on_prepend=update_start_on_prepend, ) await sampler_stream.send({ 'broadcast_all': { @@ -425,34 +303,40 @@ async def start_backfill( # decrement next prepend point next_prepend_index = next_prepend_index - ln - end_dt = next_start_dt - - # XXX: extremely important, there can be no checkpoints - # in the block above to avoid entering new ``frames`` - # values while we're pipelining the current ones to - # memory... - array = shm.array - zeros = array[array['low'] == 0] - if ( - 0 < zeros.size < 10 - ): - await tractor.breakpoint() - + last_start_dt = next_start_dt except ValueError as ve: _ve = ve - log.info( - f'Shm buffer overrun on: {start_dt} -> {end_dt}?' + log.error( + f'Shm buffer prepend OVERRUN on: {start_dt} -> {last_start_dt}?' ) - await tractor.breakpoint() + if next_prepend_index < ln: + log.warning( + f'Shm buffer can only hold {next_prepend_index} more rows..\n' + f'Appending those from recent {ln}-sized frame, no more!' + ) + + to_push = to_push[-next_prepend_index + 1:] + await shm_push_in_between( + shm, + to_push, + prepend_index=next_prepend_index, + update_start_on_prepend=update_start_on_prepend, + ) + await sampler_stream.send({ + 'broadcast_all': { + 'backfilling': True + }, + }) + # can't push the entire frame? so # push only the amount that can fit.. 
break log.info( f'Shm pushed {ln} frame:\n' - f'{start_dt} -> {end_dt}' + f'{start_dt} -> {last_start_dt}' ) # FINALLY, maybe write immediately to the tsdb backend for @@ -460,11 +344,10 @@ async def start_backfill( if ( storage is not None and write_tsdb - and False ): log.info( f'Writing {ln} frame to storage:\n' - f'{start_dt} -> {end_dt}' + f'{start_dt} -> {last_start_dt}' ) if mkt.dst.atype not in {'crypto', 'crypto_currency'}: @@ -477,11 +360,59 @@ async def start_backfill( else: col_sym_key: str = mkt.get_fqme(delim_char='') + # TODO: implement parquet append!? await storage.write_ohlcv( col_sym_key, shm.array, timeframe, ) + else: + # finally filled gap + log.info( + f'Finished filling gap to tsdb start @ {backfill_until_dt}!' + ) + # conduct tsdb timestamp gap detection and backfill any + # seemingly missing portions! + + from ._timeseries import detect_null_time_gap + + indices: tuple | None = detect_null_time_gap(shm) + if indices: + ( + istart, + start, + end, + iend, + ) = indices + ( + array, + next_start_dt, + next_end_dt, + ) = await get_hist( + timeframe, + start_dt=from_timestamp(start), + end_dt=from_timestamp(end), + ) + await shm_push_in_between( + shm, + array, + prepend_index=iend, + update_start_on_prepend=False, + ) + await sampler_stream.send({ + 'broadcast_all': { + 'backfilling': True + }, + }) + indices: tuple | None = detect_null_time_gap(shm) + if indices: + ( + istart, + start, + end, + iend, + ) = indices + await tractor.breakpoint() # TODO: can we only trigger this if the respective # history in "in view"?!? 
@@ -496,43 +427,11 @@ async def start_backfill( bf_done.set() -def push_tsdb_history_to_shm( - storemod: ModuleType, - shm: ShmArray, - tsdb_history: np.ndarray, - time_field_key: str, - prepend: bool = False, - -) -> datetime: - - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - prepend_start = shm._first.value - - to_push = tsdb_history[-prepend_start:] - shm.push( - to_push, - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=prepend, - # update_first=False, - # start=prepend_start, - field_map=storemod.ohlc_key_map, - ) - - log.info(f'Loaded {to_push.shape} datums from storage') - tsdb_last_frame_start = tsdb_history[time_field_key][0] - return from_timestamp(tsdb_last_frame_start) - - async def back_load_from_tsdb( storemod: ModuleType, storage: StorageClient, fqme: str, - # dts_per_tf: dict[int, datetime], tsdb_history: np.ndarray, @@ -631,13 +530,26 @@ async def back_load_from_tsdb( else: tsdb_last_frame_start = next_start - tsdb_last_frame_start: datetime = push_tsdb_history_to_shm( - storemod, - shm, - tsdb_history, - time_key, + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + prepend_start = shm._first.value + + to_push = tsdb_history[-prepend_start:] + shm.push( + to_push, + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + # start=prepend_start, + field_map=storemod.ohlc_key_map, ) + log.info(f'Loaded {to_push.shape} datums from storage') + tsdb_last_frame_start = tsdb_history[time_key][0] + # manually trigger step update to update charts/fsps # which need an incremental update. 
# NOTE: the way this works is super duper @@ -651,22 +563,18 @@ async def back_load_from_tsdb( # graphics loop cycle. # await sampler_stream.send('broadcast_all') - # TODO: write new data to tsdb to be ready to for next read. - async def tsdb_backfill( mod: ModuleType, storemod: ModuleType, - # bus: _FeedsBus, tn: trio.Nursery, + storage: StorageClient, mkt: MktPair, - # shms: dict[int, ShmArray], shm: ShmArray, timeframe: float, sampler_stream: tractor.MsgStream, - # feed_is_live: trio.Event, task_status: TaskStatus[ tuple[ShmArray, ShmArray] @@ -674,22 +582,11 @@ async def tsdb_backfill( ) -> None: - # TODO: this should be used verbatim for the pure - # shm backfiller approach below. - # dts_per_tf: dict[int, datetime] = {} - fqme: str = mkt.fqme - - # time_key: str = 'time' - # if getattr(storemod, 'ohlc_key_map', False): - # keymap: bidict = storemod.ohlc_key_map - # time_key: str = keymap.inverse['time'] - get_hist: Callable[ [int, datetime, datetime], tuple[np.ndarray, str] ] config: dict[str, int] - async with mod.open_history_client( mkt, ) as (get_hist, config): @@ -733,24 +630,18 @@ async def tsdb_backfill( shm.push( array, prepend=True, # append on first frame - # start= ) backfill_gap_from_shm_index: int = shm._first.value + 1 # tell parent task to continue task_status.started() - # start history anal and load missing new data via backend. - # backfill_until_dt: datetime | None = None - # started_after_tsdb_load: bool = False - - # for timeframe, shm in shms.items(): - # loads a (large) frame of data from the tsdb depending # on the db's query size limit; our "nativedb" (using # parquet) generally can load the entire history into mem # but if not then below the remaining history can be lazy # loaded? 
+ fqme: str = mkt.fqme tsdb_entry: tuple | None = await storage.load( fqme, timeframe=timeframe, @@ -777,45 +668,36 @@ async def tsdb_backfill( # re-index with a `time` and index field prepend_start = shm._first.value - offset_samples + 1 - to_push = tsdb_history[-prepend_start:] - shm.push( - to_push, + # tsdb history is so far in the past we can't fit it in + # shm buffer space so simply don't load it! + if prepend_start > 0: + to_push = tsdb_history[-prepend_start:] + shm.push( + to_push, - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # update_first=False, - start=prepend_start, - field_map=storemod.ohlc_key_map, + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + start=prepend_start, + field_map=storemod.ohlc_key_map, + ) + + log.info(f'Loaded {to_push.shape} datums from storage') + + # TODO: maybe start history anal and load missing "history + # gaps" via backend.. + + if timeframe not in (1, 60): + raise ValueError( + '`piker` only needs to support 1m and 1s sampling ' + 'but ur api is trying to deliver a longer ' + f'timeframe of {timeframe} seconds..\n' + 'So yuh.. dun do dat brudder.' 
) - - log.info(f'Loaded {to_push.shape} datums from storage') - - # tsdb_last_frame_start: datetime = push_tsdb_history_to_shm( - # storemod, - # shm, - # tsdb_history, - # time_key, - # prepend=True, - # ) - # assert tsdb_last_frame_start == first_tsdb_dt - - # unblock the feed bus management task - # assert len(shms[1].array) - # if not started_after_tsdb_load: - # task_status.started() - # started_after_tsdb_load = True - - # begin backfiller task ASAP - # try: - # if there is a gap to backfill from the first # history frame until the last datum loaded from the tsdb # continue that now in the background - # try: - # ( - # latest_start_dt, - # latest_end_dt, bf_done = await tn.start( partial( start_backfill, @@ -827,46 +709,24 @@ async def tsdb_backfill( backfill_from_shm_index=backfill_gap_from_shm_index, backfill_from_dt=mr_start_dt, - backfill_until_dt=last_tsdb_dt, + sampler_stream=sampler_stream, - # feed_is_live, - + backfill_until_dt=last_tsdb_dt, storage=storage, - # tsdb_is_up=True, ) ) - # if tsdb_entry: - # dts_per_tf[timeframe] = ( - # tsdb_history, - # last_tsdb_dt, - # latest_start_dt, - # latest_end_dt, - # bf_done, - # ) - # elif not started_after_tsdb_load: - # task_status.started() - # started_after_tsdb_load = True - # XXX: timeframe not supported for backend (since - # above exception type), terminate immediately since - # there's no backfilling possible. - # except DataUnavailable: - # return - # continue - - # tsdb_history = series.get(timeframe) - - # if len(hist_shm.array) < 2: - # TODO: there's an edge case here to solve where if the last - # frame before market close (at least on ib) was pushed and - # there was only "1 new" row pushed from the first backfill - # query-iteration, then the sample step sizing calcs will - # break upstream from here since you can't diff on at least - # 2 steps... probably should also add logic to compute from - # the tsdb series and stash that somewhere as meta data on - # the shm buffer?.. no se. 
+ # if len(hist_shm.array) < 2: + # TODO: there's an edge case here to solve where if the last + # frame before market close (at least on ib) was pushed and + # there was only "1 new" row pushed from the first backfill + # query-iteration, then the sample step sizing calcs will + # break upstream from here since you can't diff on at least + # 2 steps... probably should also add logic to compute from + # the tsdb series and stash that somewhere as meta data on + # the shm buffer?.. no se. # backload any further data from tsdb (concurrently per # timeframe) if not all data was able to be loaded (in memory) @@ -875,7 +735,6 @@ async def tsdb_backfill( await trio.sleep_forever() finally: return - # write_ohlcv # IF we need to continue backloading incrementall from the # tsdb client.. @@ -895,29 +754,6 @@ async def tsdb_backfill( timeframe, shm, ) - # async with trio.open_nursery() as nurse: - # for timeframe, shm in shms.items(): - - # entry = dts_per_tf.get(timeframe) - # if not entry: - # continue - - # ( - # tsdb_history, - # last_tsdb_dt, - # latest_start_dt, - # latest_end_dt, - # bf_done, - # ) = entry - - # if not tsdb_history.size: - # continue - - - # try: - # await trio.sleep_forever() - # finally: - # write_ohlcv async def manage_history( @@ -1079,7 +915,6 @@ async def manage_history( timeframe, sample_stream, - # feed_is_live, ) # indicate to caller that feed can be delivered to