diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index 9da46591..324f8229 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -269,16 +269,20 @@ async def maybe_fill_null_segments( # - remember that in the display side, only refersh this # if the respective history is actually "in view". # loop - await sampler_stream.send({ - 'broadcast_all': { + try: + await sampler_stream.send({ + 'broadcast_all': { - # XXX NOTE XXX: see the - # `.ui._display.increment_history_view()` if block - # that looks for this info to FORCE a hard viz - # redraw! - 'backfilling': (mkt.fqme, timeframe), - }, - }) + # XXX NOTE XXX: see the + # `.ui._display.increment_history_view()` if block + # that looks for this info to FORCE a hard viz + # redraw! + 'backfilling': (mkt.fqme, timeframe), + }, + }) + except tractor.ContextCancelled: + # log.exception + await tractor.pause() null_segs_detected.set() # RECHECK for more null-gaps @@ -354,7 +358,6 @@ async def maybe_fill_null_segments( async def start_backfill( - tn: trio.Nursery, get_hist, mod: ModuleType, mkt: MktPair, @@ -408,7 +411,6 @@ async def start_backfill( # settings above when the tsdb is detected as being empty. 
backfill_until_dt = backfill_from_dt.subtract(**period_duration) - # STAGE NOTE: "backward history gap filling": # - we push to the shm buffer until we have history back # until the latest entry loaded from the tsdb's table B) @@ -752,6 +754,8 @@ async def back_load_from_tsdb( async def push_latest_frame( + # box-type only that should get packed with the datetime + # objects received for the latest history frame dt_eps: list[DateTime, DateTime], shm: ShmArray, get_hist: Callable[ @@ -761,8 +765,11 @@ async def push_latest_frame( timeframe: float, config: dict, - task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, -): + task_status: TaskStatus[ + Exception | list[datetime, datetime] + ] = trio.TASK_STATUS_IGNORED, + +) -> list[datetime, datetime] | None: # get latest query's worth of history all the way # back to what is recorded in the tsdb try: @@ -779,17 +786,19 @@ async def push_latest_frame( mr_start_dt, mr_end_dt, ]) + task_status.started(dt_eps) # XXX: timeframe not supported for backend (since # above exception type), terminate immediately since # there's no backfilling possible. except DataUnavailable: - task_status.started() + task_status.started(None) if timeframe > 1: await tractor.pause() - return + # prolly tf not supported + return None # NOTE: on the first history, most recent history # frame we PREPEND from the current shm ._last index @@ -801,11 +810,16 @@ async def push_latest_frame( prepend=True, # append on first frame ) + return dt_eps + async def load_tsdb_hist( storage: StorageClient, mkt: MktPair, timeframe: float, + + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + ) -> tuple[ np.ndarray, DateTime, @@ -909,25 +923,36 @@ async def tsdb_backfill( # mem! backfill_gap_from_shm_index: int = shm._first.value + 1 - # Prepend any tsdb history to the shm buffer which should - # now be full of the most recent history pulled from the - # backend's last frame. 
- if ( - dt_eps - and tsdb_entry - ): - # unpack both the latest (gap) backfilled frame dts + # Prepend any tsdb history into the rt-shm-buffer which + # should NOW be getting filled with the most recent history + # pulled from the data-backend. + if dt_eps: + # well then, unpack the latest (gap) backfilled frame dts ( mr_start_dt, mr_end_dt, ) = dt_eps - # AND the tsdb history from (offline) storage) - ( - tsdb_history, - first_tsdb_dt, - last_tsdb_dt, - ) = tsdb_entry + # NOTE: when there's no offline data, there are 2 cases: + # - data backend doesn't support timeframe/sample + # period (in which case `dt_eps` should be `None` and + # we shouldn't be here!), or + # - no prior history has been stored (yet) and we need + # to do a full backfill of the history now. + if tsdb_entry is None: + # indicate to backfill task to fill the whole + # shm buffer as much as it can! + last_tsdb_dt = None + + # there's existing tsdb history from (offline) storage + # so only backfill the gap between the + # most-recent-frame (mrf) and that latest sample. + else: + ( + tsdb_history, + first_tsdb_dt, + last_tsdb_dt, + ) = tsdb_entry # if there is a gap to backfill from the first # history frame until the last datum loaded from the tsdb @@ -937,7 +962,6 @@ async def tsdb_backfill( bf_done = await tn.start( partial( start_backfill, - tn=tn, get_hist=get_hist, mod=mod, mkt=mkt, @@ -954,95 +978,98 @@ async def tsdb_backfill( write_tsdb=True, ) ) + nulls_detected: trio.Event | None = None + if last_tsdb_dt is not None: + # calc the index from which the tsdb data should be + # prepended, presuming there is a gap between the + # latest frame (loaded/read above) and the latest + # sample loaded from the tsdb. 
+ backfill_diff: Duration = mr_start_dt - last_tsdb_dt + offset_s: float = backfill_diff.in_seconds() - # calc the index from which the tsdb data should be - # prepended, presuming there is a gap between the - # latest frame (loaded/read above) and the latest - # sample loaded from the tsdb. - backfill_diff: Duration = mr_start_dt - last_tsdb_dt - offset_s: float = backfill_diff.in_seconds() + # XXX EDGE CASEs: the most recent frame overlaps with + # prior tsdb history!! + # - so the latest frame's start time is earlier than + # the tsdb's latest sample. + # - alternatively this may also more generally occur + # when the venue was closed (say over the weekend) + # causing a timeseries gap, AND the query frames size + # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS + # GREATER THAN the current venue-market's operating + # session (time) we will receive datums from BEFORE THE + # CLOSURE GAP and thus the `offset_s` value will be + # NEGATIVE! In this case we need to ensure we don't try + # to push datums that have already been recorded in the + # tsdb. In this case we instead only retrieve and push + # the series portion missing from the db's data set. + # if offset_s < 0: + # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt + # non_overlap_offset_s: float = backfill_diff.in_seconds() - # XXX EDGE CASEs: the most recent frame overlaps with - # prior tsdb history!! - # - so the latest frame's start time is earlier then - # the tsdb's latest sample. - # - alternatively this may also more generally occur - # when the venue was closed (say over the weeknd) - # causing a timeseries gap, AND the query frames size - # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS - # GREATER THAN the current venue-market's operating - # session (time) we will receive datums from BEFORE THE - # CLOSURE GAP and thus the `offset_s` value will be - # NEGATIVE! In this case we need to ensure we don't try - # to push datums that have already been recorded in the - # tsdb. 
In this case we instead only retreive and push - # the series portion missing from the db's data set. - # if offset_s < 0: - # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt - # non_overlap_offset_s: float = backfill_diff.in_seconds() + offset_samples: int = round(offset_s / timeframe) - offset_samples: int = round(offset_s / timeframe) + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + if offset_s > 0: + # NOTE XXX: ONLY when there is an actual gap + # between the earliest sample in the latest history + # frame do we want to NOT stick the latest tsdb + # history adjacent to that latest frame! + prepend_start = shm._first.value - offset_samples + 1 + to_push = tsdb_history[-prepend_start:] + else: + # when there is overlap we want to remove the + # overlapping samples from the tsdb portion (taking + # instead the latest frame's values since THEY + # SHOULD BE THE SAME) and prepend DIRECTLY adjacent + # to the latest frame! + # TODO: assert the overlap segment array contains + # the same values!?! + prepend_start = shm._first.value + to_push = tsdb_history[-(shm._first.value):offset_samples - 1] - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - if offset_s > 0: - # NOTE XXX: ONLY when there is an actual gap - # between the earliest sample in the latest history - # frame do we want to NOT stick the latest tsdb - # history adjacent to that latest frame! - prepend_start = shm._first.value - offset_samples + 1 - to_push = tsdb_history[-prepend_start:] - else: - # when there is overlap we want to remove the - # overlapping samples from the tsdb portion (taking - # instead the latest frame's values since THEY - # SHOULD BE THE SAME) and prepend DIRECTLY adjacent - # to the latest frame! 
- # TODO: assert the overlap segment array contains - # the same values!?! - prepend_start = shm._first.value - to_push = tsdb_history[-(shm._first.value):offset_samples - 1] + # tsdb history is so far in the past we can't fit it in + # shm buffer space so simply don't load it! + if prepend_start > 0: + shm.push( + to_push, - # tsdb history is so far in the past we can't fit it in - # shm buffer space so simply don't load it! - if prepend_start > 0: - shm.push( - to_push, + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + start=prepend_start, + field_map=storemod.ohlc_key_map, + ) - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # update_first=False, - start=prepend_start, - field_map=storemod.ohlc_key_map, - ) + log.info(f'Loaded {to_push.shape} datums from storage') - log.info(f'Loaded {to_push.shape} datums from storage') + # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any + # seemingly missing (null-time) segments.. + # TODO: ideally these can never exist! + # -[ ] somehow it seems sometimes we're writing zero-ed + # segments to tsdbs during teardown? + # -[ ] can we ensure that the backfiller tasks do this + # work PREVENTATIVELY instead? + # -[ ] fill in non-zero epoch time values ALWAYS! + # await maybe_fill_null_segments( + nulls_detected: trio.Event = await tn.start(partial( + maybe_fill_null_segments, - # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any - # seemingly missing (null-time) segments.. - # TODO: ideally these can never exist! - # -[ ] somehow it seems sometimes we're writing zero-ed - # segments to tsdbs during teardown? - # -[ ] can we ensure that the backcfiller tasks do this - # work PREVENTAVELY instead? - # -[ ] fill in non-zero epoch time values ALWAYS! 
- # await maybe_fill_null_segments( - nulls_detected: trio.Event = await tn.start(partial( - maybe_fill_null_segments, - - shm=shm, - timeframe=timeframe, - get_hist=get_hist, - sampler_stream=sampler_stream, - mkt=mkt, - )) + shm=shm, + timeframe=timeframe, + get_hist=get_hist, + sampler_stream=sampler_stream, + mkt=mkt, + )) # 2nd nursery END # TODO: who would want to? - await nulls_detected.wait() + if nulls_detected: + await nulls_detected.wait() + await bf_done.wait() # TODO: maybe start history anal and load missing "history # gaps" via backend.. @@ -1087,7 +1114,6 @@ async def tsdb_backfill( async def manage_history( mod: ModuleType, - bus: _FeedsBus, mkt: MktPair, some_data_ready: trio.Event, feed_is_live: trio.Event, @@ -1244,7 +1270,6 @@ async def manage_history( tsdb_backfill, mod=mod, storemod=storemod, - # bus, storage=client, mkt=mkt, shm=tf2mem[timeframe], @@ -1337,5 +1362,3 @@ def iter_dfs_from_shms( shm, df, ) - - diff --git a/piker/tsp/_anal.py b/piker/tsp/_anal.py index 35b64dcb..c8044bef 100644 --- a/piker/tsp/_anal.py +++ b/piker/tsp/_anal.py @@ -526,7 +526,7 @@ def with_dts( pl.from_epoch(pl.col(time_col)).alias('dt'), ]).with_columns([ pl.from_epoch( - pl.col(f'{time_col}_prev') + column=pl.col(f'{time_col}_prev'), ).alias('dt_prev'), pl.col('dt').diff().alias('dt_diff'), ]) #.with_columns(