From 9ebb977731f34b8f5a53204d2ed2e3e03dd7fe62 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 21 Jan 2026 21:34:45 -0500 Subject: [PATCH] Tolerate various "bad data" cases in `markup_gaps()` Namely such that when the previous-df-row by our shm-abs-'index' doesn't exist we ignore certain cases which are likely due to borked-but-benign samples written to the tsdb or rt shm buffers prior. Particularly we now ignore, - any `dt`/`prev_dt` values which are UNIX-epoch timestamped (val of 0). - any row-is-first-row in the df; there is no previous. - any missing previous datum by 'index', in which case we lookup the `wdts` prior row and use that instead. * this would indicate a missing sample for the time-step but we can still detect a "gap" by looking at the prior row, by df-abs-index `i`, and use its timestamp to determine the period/size of missing samples (which need to likely still be retrieved). * in this case i'm leaving in a pause-point for introspecting these rarer cases when `--pdb` is passed via CLI. Relatedly in the `piker store` CLI ep, - add `--pdb` flag to `piker store`, pass it verbatim as `debug_mode`. - when `times` has only a single row, don't calc a `period_s` median. - only trace `null_segs` when in debug mode. - always markup/dedupe gaps for `period_s==60` --- piker/storage/cli.py | 77 ++++++++++++++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 24 deletions(-) diff --git a/piker/storage/cli.py b/piker/storage/cli.py index 5c087898..e97f4023 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -242,6 +242,7 @@ def anal( trio.run(main) +# TODO, move to `.tsp._annotate` async def markup_gaps( fqme: str, timeframe: float, @@ -288,18 +289,38 @@ async def markup_gaps( ) # XXX: probably a gap in the (newly sorted or de-duplicated) # dt-df, so we might need to re-index first.. + dt: pl.Series = row['dt'] + dt_prev: pl.Series = row['dt_prev'] if prev_r.is_empty(): - await tractor.pause() + + # XXX, filter out any special ignore cases, + # - UNIX-epoch stamped datums + # - first row + if ( + dt_prev.dt.epoch()[0] == 0 + or + dt.dt.epoch()[0] == 0 + ): + log.warning('Skipping row with UNIX epoch timestamp ??') + continue + + if wdts[0]['index'][0] == iend: # first row + log.warning('Skipping first-row (has no previous obvi) !!') + continue + + # XXX, if the previous-row by shm-index is missing, + # meaning there is a missing sample (set), get the prior + # row by df index and attempt to use it? + i_wdts: pl.DataFrame = wdts.with_row_index(name='i') + i_row: int = i_wdts.filter(pl.col('index') == iend)['i'][0] + prev_row_by_i = wdts[i_row] + prev_r: pl.DataFrame = prev_row_by_i + + # debug any missing pre-row + if tractor._state.is_debug_mode(): + await tractor.pause() istart: int = prev_r['index'][0] - # dt_start_t: float = dt_prev.timestamp() - - # start_t: float = prev_r['time'] - # assert ( - # dt_start_t - # == - # start_t - # ) # TODO: implement px-col width measure # and ensure at least as many px-cols @@ -358,6 +379,7 @@ def ldshm( fqme: str, write_parquet: bool = True, reload_parquet_to_shm: bool = True, + pdb: bool = False, # --pdb passed? ) -> None: ''' @@ -377,7 +399,7 @@ def ldshm( open_piker_runtime( 'polars_boi', enable_modules=['piker.data._sharedmem'], - debug_mode=True, + debug_mode=pdb, ), open_storage_client() as ( mod, @@ -397,17 +419,19 @@ def ldshm( times: np.ndarray = shm.array['time'] d1: float = float(times[-1] - times[-2]) - d2: float = float(times[-2] - times[-3]) - med: float = np.median(np.diff(times)) - if ( - d1 < 1. - and d2 < 1. - and med < 1. - ): - raise ValueError( - f'Something is wrong with time period for {shm}:\n{times}' - ) - + d2: float = 0 + # XXX, take a median sample rate if sufficient data + if times.size > 2: + d2: float = float(times[-2] - times[-3]) + med: float = np.median(np.diff(times)) + if ( + d1 < 1. + and d2 < 1. + and med < 1. + ): + raise ValueError( + f'Something is wrong with time period for {shm}:\n{times}' + ) period_s: float = float(max(d1, d2, med)) null_segs: tuple = tsp.get_null_segs( @@ -417,7 +441,9 @@ def ldshm( # TODO: call null-seg fixer somehow? if null_segs: - await tractor.pause() + + if tractor._state.is_debug_mode(): + await tractor.pause() # async with ( # trio.open_nursery() as tn, # mod.open_history_client( @@ -498,8 +524,11 @@ def ldshm( if ( not venue_gaps.is_empty() or ( - period_s < 60 - and not step_gaps.is_empty() + not step_gaps.is_empty() + # XXX, i presume i put this bc i was guarding + # for ib venue gaps? + # and + # period_s < 60 ) ): # write repaired ts to parquet-file?