From 97e2403fb1bfef832638bfd06e082bc0badc6b93 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Dec 2023 13:11:00 -0500 Subject: [PATCH] Rework backfiller and null-segment task conc For each timeframe open a sub-nursery to do the backfilling + tsdb load + null-segment scanning in an effort to both speed up load time (though we need to reverse the current order to really make it faster rn since moving to the much faster parquet file backend) and do concurrent time-gap/null-segment checking of tsdb history while mrf (most recent frame) history is backfilling. The details are more or less just `trio` related task-func composition tricks and a reordering of said funcs for optimal startup latency. Also commented the `back_load_from_tsdb()` task for now since it's unused. --- piker/data/history.py | 507 ++++++++++++++++++++++++++---------------- 1 file changed, 310 insertions(+), 197 deletions(-) diff --git a/piker/data/history.py b/piker/data/history.py index 4b7737b0..2a4c7a81 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -36,6 +36,7 @@ import trio from trio_typing import TaskStatus import tractor from pendulum import ( + DateTime, Duration, from_timestamp, ) @@ -172,16 +173,27 @@ async def maybe_fill_null_segments( sampler_stream: tractor.MsgStream, mkt: MktPair, + task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, + ) -> list[Frame]: + null_segs_detected = trio.Event() + task_status.started(null_segs_detected) + frame: Frame = shm.array - async for ( + + null_segs: tuple | None = get_null_segs( + frame, + period=timeframe, + ) + for ( absi_start, absi_end, fi_start, fi_end, start_t, end_t, start_dt, end_dt, ) in iter_null_segs( - frame, + null_segs=null_segs, + frame=frame, timeframe=timeframe, ): @@ -191,6 +203,7 @@ async def maybe_fill_null_segments( start_dt and end_dt < start_dt ): + await tractor.pause() break ( @@ -233,29 +246,37 @@ async def maybe_fill_null_segments( }, }) + null_segs_detected.set() + # RECHECK for more null-gaps + frame: Frame = shm.array + null_segs: tuple | None = get_null_segs( + frame, + period=timeframe, + ) + if ( + null_segs + and + len(null_segs[-1]) + ): await tractor.pause() - # TODO: interatively step through any remaining time gaps? - # if ( - # next_end_dt not in frame[ - # ): - # pass - - # RECHECK for more null-gaps - frame: Frame = shm.array - null_segs: tuple | None = get_null_segs( - frame, - period=timeframe, - ) - if ( - null_segs - and - len(null_segs[-1]) - ): - await tractor.pause() + # TODO: interatively step through any remaining + # time-gaps/null-segments and spawn piecewise backfiller + # tasks in a nursery? + # -[ ] not sure that's going to work so well on the ib + # backend but worth a shot? + # -[ ] mk new history connections to make it properly + # parallel possible no matter the backend? + # -[ ] fill algo: do queries in alternating "latest, then + # earliest, then latest.. etc?" + # if ( + # next_end_dt not in frame[ + # ): + # pass async def start_backfill( + tn: trio.Nursery, get_hist, mod: ModuleType, mkt: MktPair, @@ -510,23 +531,6 @@ async def start_backfill( f'Finished filling gap to tsdb start @ {backfill_until_dt}!' ) - # NOTE: conduct tsdb timestamp gap detection and backfill any - # seemingly missing (null-time) segments.. - - # TODO: ideally these can never exist! - # -[ ] somehow it seems sometimes we're writing zero-ed - # segments to tsdbs during teardown? - # -[ ] can we ensure that the backcfiller tasks do this - # work PREVENTAVELY instead? - # -[ ] fill in non-zero epoch time values ALWAYS! - await maybe_fill_null_segments( - shm=shm, - timeframe=timeframe, - get_hist=get_hist, - sampler_stream=sampler_stream, - mkt=mkt, - ) - # XXX: extremely important, there can be no checkpoints # in the block above to avoid entering new ``frames`` # values while we're pipelining the current ones to @@ -537,6 +541,12 @@ async def start_backfill( bf_done.set() +# NOTE: originally this was used to cope with a tsdb (marketstore) +# which could not delivery very large frames of history over gRPC +# (thanks goolag) due to corruption issues. NOW, using apache +# parquet (by default in the local filesys) we don't have this +# requirement since the files can be loaded very quickly in +# entirety to memory via async def back_load_from_tsdb( storemod: ModuleType, storage: StorageClient, @@ -674,10 +684,94 @@ async def back_load_from_tsdb( # await sampler_stream.send('broadcast_all') +async def push_latest_frame( + dt_eps: list[DateTime, DateTime], + shm: ShmArray, + get_hist: Callable[ + [int, datetime, datetime], + tuple[np.ndarray, str] + ], + timeframe: float, + config: dict, + + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, +): + # get latest query's worth of history all the way + # back to what is recorded in the tsdb + try: + ( + array, + mr_start_dt, + mr_end_dt, + ) = await get_hist( + timeframe, + end_dt=None, + ) + # so caller can access these ep values + dt_eps.extend([ + mr_start_dt, + mr_end_dt, + ]) + + # XXX: timeframe not supported for backend (since + # above exception type), terminate immediately since + # there's no backfilling possible. + except DataUnavailable: + task_status.started() + + if timeframe > 1: + await tractor.pause() + + return + + # NOTE: on the first history, most recent history + # frame we PREPEND from the current shm ._last index + # and thus a gap between the earliest datum loaded here + # and the latest loaded from the tsdb may exist! + log.info(f'Pushing {array.size} to shm!') + shm.push( + array, + prepend=True, # append on first frame + ) + + +async def load_tsdb_hist( + storage: StorageClient, + mkt: MktPair, + timeframe: float, +) -> tuple[ + np.ndarray, + DateTime, + DateTime, +] | None: + # loads a (large) frame of data from the tsdb depending + # on the db's query size limit; our "nativedb" (using + # parquet) generally can load the entire history into mem + # but if not then below the remaining history can be lazy + # loaded? + fqme: str = mkt.fqme + tsdb_entry: tuple[ + np.ndarray, + DateTime, + DateTime, + ] + try: + tsdb_entry: tuple | None = await storage.load( + fqme, + timeframe=timeframe, + ) + return tsdb_entry + + except TimeseriesNotFound: + log.warning( + f'No timeseries yet for {timeframe}@{fqme}' + ) + return None + + async def tsdb_backfill( mod: ModuleType, storemod: ModuleType, - tn: trio.Nursery, storage: StorageClient, mkt: MktPair, @@ -692,175 +786,193 @@ async def tsdb_backfill( ) -> None: + if timeframe not in (1, 60): + raise ValueError( + '`piker` only needs to support 1m and 1s sampling ' + 'but ur api is trying to deliver a longer ' + f'timeframe of {timeframe} seconds..\n' + 'So yuh.. dun do dat brudder.' + ) + get_hist: Callable[ [int, datetime, datetime], tuple[np.ndarray, str] ] config: dict[str, int] - async with mod.open_history_client( - mkt, - ) as (get_hist, config): + async with ( + mod.open_history_client( + mkt, + ) as (get_hist, config), + + # NOTE: this sub-nursery splits to tasks for the given + # sampling rate to concurrently load offline tsdb + # timeseries as well as new data from the venue backend! + ): log.info( f'`{mod}` history client returned backfill config:\n' f'{config}\n' ) - # get latest query's worth of history all the way - # back to what is recorded in the tsdb - try: - ( - array, - mr_start_dt, - mr_end_dt, - ) = await get_hist( + dt_eps: list[DateTime, DateTime] = [] + async with trio.open_nursery() as tn: + tn.start_soon( + push_latest_frame, + dt_eps, + shm, + get_hist, timeframe, - end_dt=None, + config, ) - # XXX: timeframe not supported for backend (since - # above exception type), terminate immediately since - # there's no backfilling possible. - except DataUnavailable: + # tell parent task to continue + # TODO: really we'd want this the other way with the + # tsdb load happening asap and the since the latest + # frame query will normally be the main source of + # latency? task_status.started() - if timeframe > 1: - await tractor.pause() + tsdb_entry: tuple = await load_tsdb_hist( + storage, + mkt, + timeframe, + ) - return - - # NOTE: on the first history, most recent history - # frame we PREPEND from the current shm ._last index - # and thus a gap between the earliest datum loaded here - # and the latest loaded from the tsdb may exist! - log.info(f'Pushing {array.size} to shm!') - shm.push( - array, - prepend=True, # append on first frame - ) + # NOTE: iabs to start backfilling from, reverse chronological, + # ONLY AFTER the first history frame has been pushed to + # mem! backfill_gap_from_shm_index: int = shm._first.value + 1 - # tell parent task to continue - task_status.started() + ( + mr_start_dt, + mr_end_dt, + ) = dt_eps - # loads a (large) frame of data from the tsdb depending - # on the db's query size limit; our "nativedb" (using - # parquet) generally can load the entire history into mem - # but if not then below the remaining history can be lazy - # loaded? - fqme: str = mkt.fqme - last_tsdb_dt: datetime | None = None - try: - tsdb_entry: tuple | None = await storage.load( - fqme, - timeframe=timeframe, - ) - except TimeseriesNotFound: - log.warning( - f'No timeseries yet for {timeframe}@{fqme}' - ) - else: - ( - tsdb_history, - first_tsdb_dt, - last_tsdb_dt, - ) = tsdb_entry + async with trio.open_nursery() as tn: - # calc the index from which the tsdb data should be - # prepended, presuming there is a gap between the - # latest frame (loaded/read above) and the latest - # sample loaded from the tsdb. - backfill_diff: Duration = mr_start_dt - last_tsdb_dt - offset_s: float = backfill_diff.in_seconds() + # Prepend any tsdb history to the shm buffer which should + # now be full of the most recent history pulled from the + # backend's last frame. + if tsdb_entry: + ( + tsdb_history, + first_tsdb_dt, + last_tsdb_dt, + ) = tsdb_entry - # XXX EDGE CASEs: the most recent frame overlaps with - # prior tsdb history!! - # - so the latest frame's start time is earlier then - # the tsdb's latest sample. - # - alternatively this may also more generally occur - # when the venue was closed (say over the weeknd) - # causing a timeseries gap, AND the query frames size - # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS - # GREATER THAN the current venue-market's operating - # session (time) we will receive datums from BEFORE THE - # CLOSURE GAP and thus the `offset_s` value will be - # NEGATIVE! In this case we need to ensure we don't try - # to push datums that have already been recorded in the - # tsdb. In this case we instead only retreive and push - # the series portion missing from the db's data set. - # if offset_s < 0: - # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt - # non_overlap_offset_s: float = backfill_diff.in_seconds() + # if there is a gap to backfill from the first + # history frame until the last datum loaded from the tsdb + # continue that now in the background + bf_done = await tn.start( + partial( + start_backfill, + tn=tn, + get_hist=get_hist, + mod=mod, + mkt=mkt, + shm=shm, + timeframe=timeframe, - offset_samples: int = round(offset_s / timeframe) - - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - if offset_s > 0: - # NOTE XXX: ONLY when there is an actual gap - # between the earliest sample in the latest history - # frame do we want to NOT stick the latest tsdb - # history adjacent to that latest frame! - prepend_start = shm._first.value - offset_samples + 1 - to_push = tsdb_history[-prepend_start:] - else: - # when there is overlap we want to remove the - # overlapping samples from the tsdb portion (taking - # instead the latest frame's values since THEY - # SHOULD BE THE SAME) and prepend DIRECTLY adjacent - # to the latest frame! - # TODO: assert the overlap segment array contains - # the same values!?! - prepend_start = shm._first.value - to_push = tsdb_history[-(shm._first.value):offset_samples - 1] - - # tsdb history is so far in the past we can't fit it in - # shm buffer space so simply don't load it! - if prepend_start > 0: - shm.push( - to_push, - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # update_first=False, - start=prepend_start, - field_map=storemod.ohlc_key_map, + backfill_from_shm_index=backfill_gap_from_shm_index, + backfill_from_dt=mr_start_dt, + sampler_stream=sampler_stream, + backfill_until_dt=last_tsdb_dt, + storage=storage, + write_tsdb=True, + ) ) - log.info(f'Loaded {to_push.shape} datums from storage') + # calc the index from which the tsdb data should be + # prepended, presuming there is a gap between the + # latest frame (loaded/read above) and the latest + # sample loaded from the tsdb. + backfill_diff: Duration = mr_start_dt - last_tsdb_dt + offset_s: float = backfill_diff.in_seconds() + # XXX EDGE CASEs: the most recent frame overlaps with + # prior tsdb history!! + # - so the latest frame's start time is earlier then + # the tsdb's latest sample. + # - alternatively this may also more generally occur + # when the venue was closed (say over the weeknd) + # causing a timeseries gap, AND the query frames size + # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS + # GREATER THAN the current venue-market's operating + # session (time) we will receive datums from BEFORE THE + # CLOSURE GAP and thus the `offset_s` value will be + # NEGATIVE! In this case we need to ensure we don't try + # to push datums that have already been recorded in the + # tsdb. In this case we instead only retreive and push + # the series portion missing from the db's data set. + # if offset_s < 0: + # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt + # non_overlap_offset_s: float = backfill_diff.in_seconds() + + offset_samples: int = round(offset_s / timeframe) + + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + if offset_s > 0: + # NOTE XXX: ONLY when there is an actual gap + # between the earliest sample in the latest history + # frame do we want to NOT stick the latest tsdb + # history adjacent to that latest frame! + prepend_start = shm._first.value - offset_samples + 1 + to_push = tsdb_history[-prepend_start:] + else: + # when there is overlap we want to remove the + # overlapping samples from the tsdb portion (taking + # instead the latest frame's values since THEY + # SHOULD BE THE SAME) and prepend DIRECTLY adjacent + # to the latest frame! + # TODO: assert the overlap segment array contains + # the same values!?! + prepend_start = shm._first.value + to_push = tsdb_history[-(shm._first.value):offset_samples - 1] + + # tsdb history is so far in the past we can't fit it in + # shm buffer space so simply don't load it! + if prepend_start > 0: + shm.push( + to_push, + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + start=prepend_start, + field_map=storemod.ohlc_key_map, + ) + + log.info(f'Loaded {to_push.shape} datums from storage') + + # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any + # seemingly missing (null-time) segments.. + # TODO: ideally these can never exist! + # -[ ] somehow it seems sometimes we're writing zero-ed + # segments to tsdbs during teardown? + # -[ ] can we ensure that the backcfiller tasks do this + # work PREVENTAVELY instead? + # -[ ] fill in non-zero epoch time values ALWAYS! + # await maybe_fill_null_segments( + nulls_detected: trio.Event = await tn.start(partial( + maybe_fill_null_segments, + + shm=shm, + timeframe=timeframe, + get_hist=get_hist, + sampler_stream=sampler_stream, + mkt=mkt, + )) + + + # TODO: who would want to? + await nulls_detected.wait() + + await bf_done.wait() # TODO: maybe start history anal and load missing "history # gaps" via backend.. - if timeframe not in (1, 60): - raise ValueError( - '`piker` only needs to support 1m and 1s sampling ' - 'but ur api is trying to deliver a longer ' - f'timeframe of {timeframe} seconds..\n' - 'So yuh.. dun do dat brudder.' - ) - - # if there is a gap to backfill from the first - # history frame until the last datum loaded from the tsdb - # continue that now in the background - bf_done = await tn.start( - partial( - start_backfill, - get_hist=get_hist, - mod=mod, - mkt=mkt, - shm=shm, - timeframe=timeframe, - backfill_from_shm_index=backfill_gap_from_shm_index, - backfill_from_dt=mr_start_dt, - sampler_stream=sampler_stream, - backfill_until_dt=last_tsdb_dt, - storage=storage, - write_tsdb=True, - ) - ) - # if len(hist_shm.array) < 2: # TODO: there's an edge case here to solve where if the last # frame before market close (at least on ib) was pushed and @@ -874,32 +986,29 @@ async def tsdb_backfill( # backload any further data from tsdb (concurrently per # timeframe) if not all data was able to be loaded (in memory) # from the ``StorageClient.load()`` call above. - try: - await trio.sleep_forever() - finally: - return + await trio.sleep_forever() # XXX NOTE: this is legacy from when we were using # marketstore and we needed to continue backloading # incrementally from the tsdb client.. (bc it couldn't # handle a single large query with gRPC for some # reason.. classic goolag pos) - tn.start_soon( - back_load_from_tsdb, + # tn.start_soon( + # back_load_from_tsdb, - storemod, - storage, - fqme, + # storemod, + # storage, + # fqme, - tsdb_history, - last_tsdb_dt, - mr_start_dt, - mr_end_dt, - bf_done, + # tsdb_history, + # last_tsdb_dt, + # mr_start_dt, + # mr_end_dt, + # bf_done, - timeframe, - shm, - ) + # timeframe, + # shm, + # ) async def manage_history( @@ -1007,6 +1116,11 @@ async def manage_history( async with ( storage.open_storage_client() as (storemod, client), + + # NOTE: this nursery spawns a task per "timeframe" (aka + # sampling period) data set since normally differently + # sampled timeseries can be loaded / process independently + # ;) trio.open_nursery() as tn, ): log.info( @@ -1056,7 +1170,6 @@ async def manage_history( tsdb_backfill, mod=mod, storemod=storemod, - tn=tn, # bus, storage=client, mkt=mkt,