From 61e52213b25717a28ee897ffcac110cd800726ce Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Dec 2023 21:34:31 -0500 Subject: [PATCH] Oof, fix no-tsdb-entry since needs full backfill case! Got borked by the logic re-factoring to get more conc going around tsdb vs. latest frame loads with nested nurseries. So, repair all that such that we can still backfill symbols previously not loaded as well as drop all the `_FeedBus` instance passing to subtasks where it's definitely not needed. Toss in a pause point around sampler stream `'backfilling'` msgs as well since there's seems to be a weird ctx-cancelled propagation going on when a feed client disconnects during backfill and this might be where the src `tractor.ContextCancelled` is getting bubbled from? --- piker/tsp/__init__.py | 245 +++++++++++++++++++++++------------------- piker/tsp/_anal.py | 2 +- 2 files changed, 135 insertions(+), 112 deletions(-) diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index 9da46591..324f8229 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -269,16 +269,20 @@ async def maybe_fill_null_segments( # - remember that in the display side, only refersh this # if the respective history is actually "in view". # loop - await sampler_stream.send({ - 'broadcast_all': { + try: + await sampler_stream.send({ + 'broadcast_all': { - # XXX NOTE XXX: see the - # `.ui._display.increment_history_view()` if block - # that looks for this info to FORCE a hard viz - # redraw! - 'backfilling': (mkt.fqme, timeframe), - }, - }) + # XXX NOTE XXX: see the + # `.ui._display.increment_history_view()` if block + # that looks for this info to FORCE a hard viz + # redraw! + 'backfilling': (mkt.fqme, timeframe), + }, + }) + except tractor.ContextCancelled: + # log.exception + await tractor.pause() null_segs_detected.set() # RECHECK for more null-gaps @@ -354,7 +358,6 @@ async def maybe_fill_null_segments( async def start_backfill( - tn: trio.Nursery, get_hist, mod: ModuleType, mkt: MktPair, @@ -408,7 +411,6 @@ async def start_backfill( # settings above when the tsdb is detected as being empty. backfill_until_dt = backfill_from_dt.subtract(**period_duration) - # STAGE NOTE: "backward history gap filling": # - we push to the shm buffer until we have history back # until the latest entry loaded from the tsdb's table B) @@ -752,6 +754,8 @@ async def back_load_from_tsdb( async def push_latest_frame( + # box-type only that should get packed with the datetime + # objects received for the latest history frame dt_eps: list[DateTime, DateTime], shm: ShmArray, get_hist: Callable[ @@ -761,8 +765,11 @@ async def push_latest_frame( timeframe: float, config: dict, - task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, -): + task_status: TaskStatus[ + Exception | list[datetime, datetime] + ] = trio.TASK_STATUS_IGNORED, + +) -> list[datetime, datetime] | None: # get latest query's worth of history all the way # back to what is recorded in the tsdb try: @@ -779,17 +786,19 @@ async def push_latest_frame( mr_start_dt, mr_end_dt, ]) + task_status.started(dt_eps) # XXX: timeframe not supported for backend (since # above exception type), terminate immediately since # there's no backfilling possible. except DataUnavailable: - task_status.started() + task_status.started(None) if timeframe > 1: await tractor.pause() - return + # prolly tf not supported + return None # NOTE: on the first history, most recent history # frame we PREPEND from the current shm ._last index @@ -801,11 +810,16 @@ async def push_latest_frame( prepend=True, # append on first frame ) + return dt_eps + async def load_tsdb_hist( storage: StorageClient, mkt: MktPair, timeframe: float, + + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + ) -> tuple[ np.ndarray, DateTime, @@ -909,25 +923,36 @@ async def tsdb_backfill( # mem! backfill_gap_from_shm_index: int = shm._first.value + 1 - # Prepend any tsdb history to the shm buffer which should - # now be full of the most recent history pulled from the - # backend's last frame. - if ( - dt_eps - and tsdb_entry - ): - # unpack both the latest (gap) backfilled frame dts + # Prepend any tsdb history into the rt-shm-buffer which + # should NOW be getting filled with the most recent history + # pulled from the data-backend. + if dt_eps: + # well then, unpack the latest (gap) backfilled frame dts ( mr_start_dt, mr_end_dt, ) = dt_eps - # AND the tsdb history from (offline) storage) - ( - tsdb_history, - first_tsdb_dt, - last_tsdb_dt, - ) = tsdb_entry + # NOTE: when there's no offline data, there's 2 cases: + # - data backend doesn't support timeframe/sample + # period (in which case `dt_eps` should be `None` and + # we shouldn't be here!), or + # - no prior history has been stored (yet) and we need + # todo full backfill of the history now. + if tsdb_entry is None: + # indicate to backfill task to fill the whole + # shm buffer as much as it can! + last_tsdb_dt = None + + # there's existing tsdb history from (offline) storage + # so only backfill the gap between the + # most-recent-frame (mrf) and that latest sample. + else: + ( + tsdb_history, + first_tsdb_dt, + last_tsdb_dt, + ) = tsdb_entry # if there is a gap to backfill from the first # history frame until the last datum loaded from the tsdb @@ -937,7 +962,6 @@ async def tsdb_backfill( bf_done = await tn.start( partial( start_backfill, - tn=tn, get_hist=get_hist, mod=mod, mkt=mkt, @@ -954,95 +978,98 @@ async def tsdb_backfill( write_tsdb=True, ) ) + nulls_detected: trio.Event | None = None + if last_tsdb_dt is not None: + # calc the index from which the tsdb data should be + # prepended, presuming there is a gap between the + # latest frame (loaded/read above) and the latest + # sample loaded from the tsdb. + backfill_diff: Duration = mr_start_dt - last_tsdb_dt + offset_s: float = backfill_diff.in_seconds() - # calc the index from which the tsdb data should be - # prepended, presuming there is a gap between the - # latest frame (loaded/read above) and the latest - # sample loaded from the tsdb. - backfill_diff: Duration = mr_start_dt - last_tsdb_dt - offset_s: float = backfill_diff.in_seconds() + # XXX EDGE CASEs: the most recent frame overlaps with + # prior tsdb history!! + # - so the latest frame's start time is earlier then + # the tsdb's latest sample. + # - alternatively this may also more generally occur + # when the venue was closed (say over the weeknd) + # causing a timeseries gap, AND the query frames size + # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS + # GREATER THAN the current venue-market's operating + # session (time) we will receive datums from BEFORE THE + # CLOSURE GAP and thus the `offset_s` value will be + # NEGATIVE! In this case we need to ensure we don't try + # to push datums that have already been recorded in the + # tsdb. In this case we instead only retreive and push + # the series portion missing from the db's data set. + # if offset_s < 0: + # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt + # non_overlap_offset_s: float = backfill_diff.in_seconds() - # XXX EDGE CASEs: the most recent frame overlaps with - # prior tsdb history!! - # - so the latest frame's start time is earlier then - # the tsdb's latest sample. - # - alternatively this may also more generally occur - # when the venue was closed (say over the weeknd) - # causing a timeseries gap, AND the query frames size - # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS - # GREATER THAN the current venue-market's operating - # session (time) we will receive datums from BEFORE THE - # CLOSURE GAP and thus the `offset_s` value will be - # NEGATIVE! In this case we need to ensure we don't try - # to push datums that have already been recorded in the - # tsdb. In this case we instead only retreive and push - # the series portion missing from the db's data set. - # if offset_s < 0: - # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt - # non_overlap_offset_s: float = backfill_diff.in_seconds() + offset_samples: int = round(offset_s / timeframe) - offset_samples: int = round(offset_s / timeframe) + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + if offset_s > 0: + # NOTE XXX: ONLY when there is an actual gap + # between the earliest sample in the latest history + # frame do we want to NOT stick the latest tsdb + # history adjacent to that latest frame! + prepend_start = shm._first.value - offset_samples + 1 + to_push = tsdb_history[-prepend_start:] + else: + # when there is overlap we want to remove the + # overlapping samples from the tsdb portion (taking + # instead the latest frame's values since THEY + # SHOULD BE THE SAME) and prepend DIRECTLY adjacent + # to the latest frame! + # TODO: assert the overlap segment array contains + # the same values!?! + prepend_start = shm._first.value + to_push = tsdb_history[-(shm._first.value):offset_samples - 1] - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - if offset_s > 0: - # NOTE XXX: ONLY when there is an actual gap - # between the earliest sample in the latest history - # frame do we want to NOT stick the latest tsdb - # history adjacent to that latest frame! - prepend_start = shm._first.value - offset_samples + 1 - to_push = tsdb_history[-prepend_start:] - else: - # when there is overlap we want to remove the - # overlapping samples from the tsdb portion (taking - # instead the latest frame's values since THEY - # SHOULD BE THE SAME) and prepend DIRECTLY adjacent - # to the latest frame! - # TODO: assert the overlap segment array contains - # the same values!?! - prepend_start = shm._first.value - to_push = tsdb_history[-(shm._first.value):offset_samples - 1] + # tsdb history is so far in the past we can't fit it in + # shm buffer space so simply don't load it! + if prepend_start > 0: + shm.push( + to_push, - # tsdb history is so far in the past we can't fit it in - # shm buffer space so simply don't load it! - if prepend_start > 0: - shm.push( - to_push, + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + start=prepend_start, + field_map=storemod.ohlc_key_map, + ) - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # update_first=False, - start=prepend_start, - field_map=storemod.ohlc_key_map, - ) + log.info(f'Loaded {to_push.shape} datums from storage') - log.info(f'Loaded {to_push.shape} datums from storage') + # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any + # seemingly missing (null-time) segments.. + # TODO: ideally these can never exist! + # -[ ] somehow it seems sometimes we're writing zero-ed + # segments to tsdbs during teardown? + # -[ ] can we ensure that the backcfiller tasks do this + # work PREVENTAVELY instead? + # -[ ] fill in non-zero epoch time values ALWAYS! + # await maybe_fill_null_segments( + nulls_detected: trio.Event = await tn.start(partial( + maybe_fill_null_segments, - # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any - # seemingly missing (null-time) segments.. - # TODO: ideally these can never exist! - # -[ ] somehow it seems sometimes we're writing zero-ed - # segments to tsdbs during teardown? - # -[ ] can we ensure that the backcfiller tasks do this - # work PREVENTAVELY instead? - # -[ ] fill in non-zero epoch time values ALWAYS! - # await maybe_fill_null_segments( - nulls_detected: trio.Event = await tn.start(partial( - maybe_fill_null_segments, - - shm=shm, - timeframe=timeframe, - get_hist=get_hist, - sampler_stream=sampler_stream, - mkt=mkt, - )) + shm=shm, + timeframe=timeframe, + get_hist=get_hist, + sampler_stream=sampler_stream, + mkt=mkt, + )) # 2nd nursery END # TODO: who would want to? - await nulls_detected.wait() + if nulls_detected: + await nulls_detected.wait() + await bf_done.wait() # TODO: maybe start history anal and load missing "history # gaps" via backend.. @@ -1087,7 +1114,6 @@ async def tsdb_backfill( async def manage_history( mod: ModuleType, - bus: _FeedsBus, mkt: MktPair, some_data_ready: trio.Event, feed_is_live: trio.Event, @@ -1244,7 +1270,6 @@ async def manage_history( tsdb_backfill, mod=mod, storemod=storemod, - # bus, storage=client, mkt=mkt, shm=tf2mem[timeframe], @@ -1337,5 +1362,3 @@ def iter_dfs_from_shms( shm, df, ) - - diff --git a/piker/tsp/_anal.py b/piker/tsp/_anal.py index 35b64dcb..c8044bef 100644 --- a/piker/tsp/_anal.py +++ b/piker/tsp/_anal.py @@ -526,7 +526,7 @@ def with_dts( pl.from_epoch(pl.col(time_col)).alias('dt'), ]).with_columns([ pl.from_epoch( - pl.col(f'{time_col}_prev') + column=pl.col(f'{time_col}_prev'), ).alias('dt_prev'), pl.col('dt').diff().alias('dt_diff'), ]) #.with_columns(