diff --git a/piker/data/history.py b/piker/data/history.py
index 4b7737b0..2a4c7a81 100644
--- a/piker/data/history.py
+++ b/piker/data/history.py
@@ -36,6 +36,7 @@ import trio
 from trio_typing import TaskStatus
 import tractor
 from pendulum import (
+    DateTime,
     Duration,
     from_timestamp,
 )
@@ -172,16 +173,27 @@ async def maybe_fill_null_segments(
     sampler_stream: tractor.MsgStream,
     mkt: MktPair,
 
+    task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
+
 ) -> list[Frame]:
 
+    null_segs_detected = trio.Event()
+    task_status.started(null_segs_detected)
+
     frame: Frame = shm.array
-    async for (
+
+    null_segs: tuple | None = get_null_segs(
+        frame,
+        period=timeframe,
+    )
+    for (
         absi_start, absi_end,
         fi_start, fi_end,
         start_t, end_t,
         start_dt, end_dt,
     ) in iter_null_segs(
-        frame,
+        null_segs=null_segs,
+        frame=frame,
         timeframe=timeframe,
     ):
@@ -191,6 +203,7 @@ async def maybe_fill_null_segments(
             start_dt
             and end_dt < start_dt
         ):
+            await tractor.pause()
             break
 
         (
@@ -233,29 +246,37 @@ async def maybe_fill_null_segments(
             },
         })
 
+    null_segs_detected.set()
+    # RECHECK for more null-gaps
+    frame: Frame = shm.array
+    null_segs: tuple | None = get_null_segs(
+        frame,
+        period=timeframe,
+    )
+    if (
+        null_segs
+        and
+        len(null_segs[-1])
+    ):
         await tractor.pause()
-        # TODO: interatively step through any remaining time gaps?
-        # if (
-        #     next_end_dt not in frame[
-        # ):
-        #     pass
-
-    # RECHECK for more null-gaps
-    frame: Frame = shm.array
-    null_segs: tuple | None = get_null_segs(
-        frame,
-        period=timeframe,
-    )
-    if (
-        null_segs
-        and
-        len(null_segs[-1])
-    ):
-        await tractor.pause()
+    # TODO: iteratively step through any remaining
+    # time-gaps/null-segments and spawn piecewise backfiller
+    # tasks in a nursery?
+    # -[ ] not sure that's going to work so well on the ib
+    #    backend but worth a shot?
+    # -[ ] mk new history connections to make it properly
+    #    parallel possible no matter the backend?
+    # -[ ] fill algo: do queries in alternating "latest, then
+    #    earliest, then latest.. etc?"
+    # if (
+    #     next_end_dt not in frame[
+    # ):
+    #     pass
 
 
 async def start_backfill(
+    tn: trio.Nursery,
     get_hist,
     mod: ModuleType,
     mkt: MktPair,
@@ -510,23 +531,6 @@ async def start_backfill(
                 f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
             )
 
-        # NOTE: conduct tsdb timestamp gap detection and backfill any
-        # seemingly missing (null-time) segments..
-
-        # TODO: ideally these can never exist!
-        # -[ ] somehow it seems sometimes we're writing zero-ed
-        #    segments to tsdbs during teardown?
-        # -[ ] can we ensure that the backcfiller tasks do this
-        #    work PREVENTAVELY instead?
-        # -[ ] fill in non-zero epoch time values ALWAYS!
-        await maybe_fill_null_segments(
-            shm=shm,
-            timeframe=timeframe,
-            get_hist=get_hist,
-            sampler_stream=sampler_stream,
-            mkt=mkt,
-        )
-
     # XXX: extremely important, there can be no checkpoints
     # in the block above to avoid entering new ``frames``
     # values while we're pipelining the current ones to
@@ -537,6 +541,12 @@ async def start_backfill(
     bf_done.set()
 
 
+# NOTE: originally this was used to cope with a tsdb (marketstore)
+# which could not deliver very large frames of history over gRPC
+# (thanks goolag) due to corruption issues. NOW, using apache
+# parquet (by default in the local filesys) we don't have this
+# requirement since the files can be loaded very quickly in
+# entirety to memory via
 async def back_load_from_tsdb(
     storemod: ModuleType,
     storage: StorageClient,
@@ -674,10 +684,94 @@ async def back_load_from_tsdb(
     # await sampler_stream.send('broadcast_all')
 
 
+async def push_latest_frame(
+    dt_eps: list[DateTime, DateTime],
+    shm: ShmArray,
+    get_hist: Callable[
+        [int, datetime, datetime],
+        tuple[np.ndarray, str]
+    ],
+    timeframe: float,
+    config: dict,
+
+    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
+):
+    # get latest query's worth of history all the way
+    # back to what is recorded in the tsdb
+    try:
+        (
+            array,
+            mr_start_dt,
+            mr_end_dt,
+        ) = await get_hist(
+            timeframe,
+            end_dt=None,
+        )
+        # so caller can access these ep values
+        dt_eps.extend([
+            mr_start_dt,
+            mr_end_dt,
+        ])
+
+    # XXX: timeframe not supported by the backend (per the
+    # exception type above), terminate immediately since
+    # there's no backfilling possible.
+    except DataUnavailable:
+        task_status.started()
+
+        if timeframe > 1:
+            await tractor.pause()
+
+        return
+
+    # NOTE: on the first (aka most recent) history frame we
+    # PREPEND from the current shm ._last index and thus
+    # a gap between the earliest datum loaded here and the
+    # latest loaded from the tsdb may exist!
+    log.info(f'Pushing {array.size} to shm!')
+    shm.push(
+        array,
+        prepend=True,  # append on first frame
+    )
+
+
+async def load_tsdb_hist(
+    storage: StorageClient,
+    mkt: MktPair,
+    timeframe: float,
+) -> tuple[
+    np.ndarray,
+    DateTime,
+    DateTime,
+] | None:
+    # loads a (large) frame of data from the tsdb depending
+    # on the db's query size limit; our "nativedb" (using
+    # parquet) generally can load the entire history into mem
+    # but if not then below the remaining history can be lazy
+    # loaded?
+    fqme: str = mkt.fqme
+    tsdb_entry: tuple[
+        np.ndarray,
+        DateTime,
+        DateTime,
+    ]
+    try:
+        tsdb_entry: tuple | None = await storage.load(
+            fqme,
+            timeframe=timeframe,
+        )
+        return tsdb_entry
+
+    except TimeseriesNotFound:
+        log.warning(
+            f'No timeseries yet for {timeframe}@{fqme}'
+        )
+        return None
+
+
 async def tsdb_backfill(
     mod: ModuleType,
     storemod: ModuleType,
-    tn: trio.Nursery,
     storage: StorageClient,
     mkt: MktPair,
@@ -692,175 +786,193 @@ async def tsdb_backfill(
 
 ) -> None:
 
+    if timeframe not in (1, 60):
+        raise ValueError(
+            '`piker` only needs to support 1m and 1s sampling '
+            'but ur api is trying to deliver a longer '
+            f'timeframe of {timeframe} seconds..\n'
+            'So yuh.. dun do dat brudder.'
+        )
+
     get_hist: Callable[
         [int, datetime, datetime],
         tuple[np.ndarray, str]
     ]
     config: dict[str, int]
-    async with mod.open_history_client(
-        mkt,
-    ) as (get_hist, config):
+    async with (
+        mod.open_history_client(
+            mkt,
+        ) as (get_hist, config),
+
+        # NOTE: this sub-nursery splits into tasks for the given
+        # sampling rate to concurrently load offline tsdb
+        # timeseries as well as new data from the venue backend!
+    ):
         log.info(
             f'`{mod}` history client returned backfill config:\n'
             f'{config}\n'
         )
 
-        # get latest query's worth of history all the way
-        # back to what is recorded in the tsdb
-        try:
-            (
-                array,
-                mr_start_dt,
-                mr_end_dt,
-            ) = await get_hist(
+        dt_eps: list[DateTime, DateTime] = []
+        async with trio.open_nursery() as tn:
+            tn.start_soon(
+                push_latest_frame,
+                dt_eps,
+                shm,
+                get_hist,
                 timeframe,
-                end_dt=None,
+                config,
             )
 
-        # XXX: timeframe not supported for backend (since
-        # above exception type), terminate immediately since
-        # there's no backfilling possible.
-        except DataUnavailable:
+            # tell parent task to continue
+            # TODO: really we'd want this the other way around, with
+            # the tsdb load happening asap, since the latest
+            # frame query will normally be the main source of
+            # latency?
             task_status.started()
 
-            if timeframe > 1:
-                await tractor.pause()
+        tsdb_entry: tuple = await load_tsdb_hist(
+            storage,
+            mkt,
+            timeframe,
+        )
 
-            return
-
-        # NOTE: on the first history, most recent history
-        # frame we PREPEND from the current shm ._last index
-        # and thus a gap between the earliest datum loaded here
-        # and the latest loaded from the tsdb may exist!
-        log.info(f'Pushing {array.size} to shm!')
-        shm.push(
-            array,
-            prepend=True,  # append on first frame
-        )
+        # NOTE: iabs to start backfilling from, reverse chronological,
+        # ONLY AFTER the first history frame has been pushed to
+        # mem!
         backfill_gap_from_shm_index: int = shm._first.value + 1
 
-        # tell parent task to continue
-        task_status.started()
+        (
+            mr_start_dt,
+            mr_end_dt,
+        ) = dt_eps
 
-        # loads a (large) frame of data from the tsdb depending
-        # on the db's query size limit; our "nativedb" (using
-        # parquet) generally can load the entire history into mem
-        # but if not then below the remaining history can be lazy
-        # loaded?
-        fqme: str = mkt.fqme
-        last_tsdb_dt: datetime | None = None
-        try:
-            tsdb_entry: tuple | None = await storage.load(
-                fqme,
-                timeframe=timeframe,
-            )
-        except TimeseriesNotFound:
-            log.warning(
-                f'No timeseries yet for {timeframe}@{fqme}'
-            )
-        else:
-            (
-                tsdb_history,
-                first_tsdb_dt,
-                last_tsdb_dt,
-            ) = tsdb_entry
+        async with trio.open_nursery() as tn:
 
-            # calc the index from which the tsdb data should be
-            # prepended, presuming there is a gap between the
-            # latest frame (loaded/read above) and the latest
-            # sample loaded from the tsdb.
-            backfill_diff: Duration = mr_start_dt - last_tsdb_dt
-            offset_s: float = backfill_diff.in_seconds()
+            # Prepend any tsdb history to the shm buffer which should
+            # now be full of the most recent history pulled from the
+            # backend's last frame.
+            if tsdb_entry:
+                (
+                    tsdb_history,
+                    first_tsdb_dt,
+                    last_tsdb_dt,
+                ) = tsdb_entry
 
-            # XXX EDGE CASEs: the most recent frame overlaps with
-            # prior tsdb history!!
-            # - so the latest frame's start time is earlier then
-            #   the tsdb's latest sample.
-            # - alternatively this may also more generally occur
-            #   when the venue was closed (say over the weeknd)
-            #   causing a timeseries gap, AND the query frames size
-            #   (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
-            #   GREATER THAN the current venue-market's operating
-            #   session (time) we will receive datums from BEFORE THE
-            #   CLOSURE GAP and thus the `offset_s` value will be
-            #   NEGATIVE! In this case we need to ensure we don't try
-            #   to push datums that have already been recorded in the
-            #   tsdb. In this case we instead only retreive and push
-            #   the series portion missing from the db's data set.
-            # if offset_s < 0:
-            #     non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
-            #     non_overlap_offset_s: float = backfill_diff.in_seconds()
 
+                # if there is a gap to backfill from the first
+                # history frame until the last datum loaded from the tsdb,
+                # continue that now in the background
+                bf_done = await tn.start(
+                    partial(
+                        start_backfill,
+                        tn=tn,
+                        get_hist=get_hist,
+                        mod=mod,
+                        mkt=mkt,
+                        shm=shm,
+                        timeframe=timeframe,
 
-            offset_samples: int = round(offset_s / timeframe)
-
-            # TODO: see if there's faster multi-field reads:
-            # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
-            # re-index with a `time` and index field
-            if offset_s > 0:
-                # NOTE XXX: ONLY when there is an actual gap
-                # between the earliest sample in the latest history
-                # frame do we want to NOT stick the latest tsdb
-                # history adjacent to that latest frame!
-                prepend_start = shm._first.value - offset_samples + 1
-                to_push = tsdb_history[-prepend_start:]
-            else:
-                # when there is overlap we want to remove the
-                # overlapping samples from the tsdb portion (taking
-                # instead the latest frame's values since THEY
-                # SHOULD BE THE SAME) and prepend DIRECTLY adjacent
-                # to the latest frame!
-                # TODO: assert the overlap segment array contains
-                # the same values!?!
-                prepend_start = shm._first.value
-                to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
-
-            # tsdb history is so far in the past we can't fit it in
-            # shm buffer space so simply don't load it!
-            if prepend_start > 0:
-                shm.push(
-                    to_push,
-
-                    # insert the history pre a "days worth" of samples
-                    # to leave some real-time buffer space at the end.
-                    prepend=True,
-                    # update_first=False,
-                    start=prepend_start,
-                    field_map=storemod.ohlc_key_map,
+                        backfill_from_shm_index=backfill_gap_from_shm_index,
+                        backfill_from_dt=mr_start_dt,
+                        sampler_stream=sampler_stream,
+                        backfill_until_dt=last_tsdb_dt,
+                        storage=storage,
+                        write_tsdb=True,
+                    )
                 )
 
-            log.info(f'Loaded {to_push.shape} datums from storage')
+                # calc the index from which the tsdb data should be
+                # prepended, presuming there is a gap between the
+                # latest frame (loaded/read above) and the latest
+                # sample loaded from the tsdb.
+                backfill_diff: Duration = mr_start_dt - last_tsdb_dt
+                offset_s: float = backfill_diff.in_seconds()
 
+                # XXX EDGE CASEs: the most recent frame overlaps with
+                # prior tsdb history!!
+                # - so the latest frame's start time is earlier than
+                #   the tsdb's latest sample.
+                # - alternatively this may also more generally occur
+                #   when the venue was closed (say over the weekend)
+                #   causing a timeseries gap, AND the query frame size
+                #   (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
+                #   GREATER THAN the current venue-market's operating
+                #   session (time) we will receive datums from BEFORE THE
+                #   CLOSURE GAP and thus the `offset_s` value will be
+                #   NEGATIVE! In this case we need to ensure we don't try
+                #   to push datums that have already been recorded in the
+                #   tsdb. In this case we instead only retrieve and push
+                #   the series portion missing from the db's data set.
+                # if offset_s < 0:
+                #     non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
+                #     non_overlap_offset_s: float = backfill_diff.in_seconds()
+
+                offset_samples: int = round(offset_s / timeframe)
+
+                # TODO: see if there's faster multi-field reads:
+                # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+                # re-index with a `time` and index field
+                if offset_s > 0:
+                    # NOTE XXX: ONLY when there is an actual gap
+                    # between the earliest sample in the latest history
+                    # frame do we want to NOT stick the latest tsdb
+                    # history adjacent to that latest frame!
+                    prepend_start = shm._first.value - offset_samples + 1
+                    to_push = tsdb_history[-prepend_start:]
+                else:
+                    # when there is overlap we want to remove the
+                    # overlapping samples from the tsdb portion (taking
+                    # instead the latest frame's values since THEY
+                    # SHOULD BE THE SAME) and prepend DIRECTLY adjacent
+                    # to the latest frame!
+                    # TODO: assert the overlap segment array contains
+                    # the same values!?!
+                    prepend_start = shm._first.value
+                    to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
+
+                # tsdb history is so far in the past we can't fit it in
+                # shm buffer space so simply don't load it!
+                if prepend_start > 0:
+                    shm.push(
+                        to_push,
+
+                        # insert the history pre a "days worth" of samples
+                        # to leave some real-time buffer space at the end.
+                        prepend=True,
+                        # update_first=False,
+                        start=prepend_start,
+                        field_map=storemod.ohlc_key_map,
+                    )
+
+                log.info(f'Loaded {to_push.shape} datums from storage')
+
+            # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
+            # seemingly missing (null-time) segments..
+            # TODO: ideally these can never exist!
+            # -[ ] somehow it seems sometimes we're writing zero-ed
+            #    segments to tsdbs during teardown?
+            # -[ ] can we ensure that the backfiller tasks do this
+            #    work PREVENTIVELY instead?
+            # -[ ] fill in non-zero epoch time values ALWAYS!
+            # await maybe_fill_null_segments(
+            nulls_detected: trio.Event = await tn.start(partial(
+                maybe_fill_null_segments,
+
+                shm=shm,
+                timeframe=timeframe,
+                get_hist=get_hist,
+                sampler_stream=sampler_stream,
+                mkt=mkt,
+            ))
+
+
+            # TODO: who would want to?
+            await nulls_detected.wait()
+
+            await bf_done.wait()
 
         # TODO: maybe start history anal and load missing "history
         # gaps" via backend..
 
-        if timeframe not in (1, 60):
-            raise ValueError(
-                '`piker` only needs to support 1m and 1s sampling '
-                'but ur api is trying to deliver a longer '
-                f'timeframe of {timeframe} seconds..\n'
-                'So yuh.. dun do dat brudder.'
-            )
-
-        # if there is a gap to backfill from the first
-        # history frame until the last datum loaded from the tsdb
-        # continue that now in the background
-        bf_done = await tn.start(
-            partial(
-                start_backfill,
-                get_hist=get_hist,
-                mod=mod,
-                mkt=mkt,
-                shm=shm,
-                timeframe=timeframe,
-                backfill_from_shm_index=backfill_gap_from_shm_index,
-                backfill_from_dt=mr_start_dt,
-                sampler_stream=sampler_stream,
-                backfill_until_dt=last_tsdb_dt,
-                storage=storage,
-                write_tsdb=True,
-            )
-        )
-
         # if len(hist_shm.array) < 2:
         # TODO: there's an edge case here to solve where if the last
         # frame before market close (at least on ib) was pushed and
@@ -874,32 +986,29 @@
         # backload any further data from tsdb (concurrently per
         # timeframe) if not all data was able to be loaded (in memory)
         # from the ``StorageClient.load()`` call above.
-        try:
-            await trio.sleep_forever()
-        finally:
-            return
+        await trio.sleep_forever()
 
         # XXX NOTE: this is legacy from when we were using
         # marketstore and we needed to continue backloading
         # incrementally from the tsdb client.. (bc it couldn't
         # handle a single large query with gRPC for some
         # reason.. classic goolag pos)
-        tn.start_soon(
-            back_load_from_tsdb,
+        # tn.start_soon(
+        #     back_load_from_tsdb,
 
-            storemod,
-            storage,
-            fqme,
+        #     storemod,
+        #     storage,
+        #     fqme,
 
-            tsdb_history,
-            last_tsdb_dt,
-            mr_start_dt,
-            mr_end_dt,
-            bf_done,
+        #     tsdb_history,
+        #     last_tsdb_dt,
+        #     mr_start_dt,
+        #     mr_end_dt,
+        #     bf_done,
 
-            timeframe,
-            shm,
-        )
+        #     timeframe,
+        #     shm,
+        # )
 
 
 async def manage_history(
@@ -1007,6 +1116,11 @@ async def manage_history(
 
     async with (
         storage.open_storage_client() as (storemod, client),
+
+        # NOTE: this nursery spawns a task per "timeframe" (aka
+        # sampling period) data set since normally differently
+        # sampled timeseries can be loaded / processed independently
+        # ;)
         trio.open_nursery() as tn,
     ):
         log.info(
@@ -1056,7 +1170,6 @@ async def manage_history(
             tsdb_backfill,
             mod=mod,
             storemod=storemod,
-            tn=tn,
 
             # bus,
             storage=client,
             mkt=mkt,
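
Reviewer note: the `maybe_fill_null_segments()` rework above hinges on trio's `nursery.start()` handshake to hand a `trio.Event` back to the caller, which `tsdb_backfill()` then syncs on via `await nulls_detected.wait()`. A minimal, self-contained sketch of that pattern (hypothetical names, not piker code):

```python
import trio
from trio_typing import TaskStatus  # same helper imported in the diff


async def detect_and_fill(
    task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
) -> None:
    detected = trio.Event()
    # hand the sync-point back through `nursery.start()` so the
    # caller can wait on "detection done" without waiting for this
    # whole task to exit
    task_status.started(detected)

    await trio.sleep(0.1)  # stand-in for null-segment detection
    detected.set()

    await trio.sleep(0.5)  # stand-in for the (longer) backfill work


async def main() -> None:
    async with trio.open_nursery() as tn:
        detected: trio.Event = await tn.start(detect_and_fill)
        await detected.wait()  # mirrors `await nulls_detected.wait()`
        print('detection done; filling may still be running..')


trio.run(main)
```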
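`push_latest_frame()` similarly "returns" its endpoint datetimes by extending the caller-provided `dt_eps` list, since `nursery.start_soon()` discards return values. A rough sketch of the idiom (simplified types, hypothetical values); note the later `(mr_start_dt, mr_end_dt) = dt_eps` unpack presumes the query task actually filled the list, i.e. did not bail early via the `DataUnavailable` path:

```python
import trio


async def fetch_frame_eps(dt_eps: list[float]) -> None:
    await trio.sleep(0.1)  # stand-in for the backend history query
    # "return" the (start, end) epoch values by mutating the
    # caller's list in-place
    dt_eps.extend([1690000000.0, 1690003600.0])


async def main() -> None:
    dt_eps: list[float] = []
    async with trio.open_nursery() as tn:
        tn.start_soon(fetch_frame_eps, dt_eps)
        # ..other concurrent setup (eg. the tsdb load) goes here..

    # nursery joined: the endpoints are now populated, mirroring
    # the `(mr_start_dt, mr_end_dt) = dt_eps` unpack in the diff
    mr_start_dt, mr_end_dt = dt_eps
    print(mr_start_dt, mr_end_dt)


trio.run(main)
```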
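For the prepend-offset math moved inside the `if tsdb_entry:` block, a worked example with hypothetical numbers may help sanity-check the gap vs. overlap branches:

```python
# 1m sampling with a 2-minute gap between the last tsdb sample and
# the start of the freshly queried frame (hypothetical values):
timeframe: float = 60.0
offset_s: float = 120.0  # (mr_start_dt - last_tsdb_dt).in_seconds()
offset_samples: int = round(offset_s / timeframe)
assert offset_samples == 2  # two samples are "missing" in the gap

shm_first: int = 1000  # stand-in for `shm._first.value`

# gap branch (offset_s > 0): shift the prepend index to account
# for the gap samples instead of butting the tsdb history directly
# up against the frame
prepend_start: int = shm_first - offset_samples + 1
assert prepend_start == 999

# overlap branch (offset_s <= 0): `offset_samples` goes negative so
# the `tsdb_history[...:offset_samples - 1]` slice end trims the
# already-recorded tail off the tsdb portion, eg. 1m of overlap:
assert round(-60.0 / timeframe) - 1 == -2  # slice end used to trim
```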