diff --git a/piker/storage/cli.py b/piker/storage/cli.py index 5c087898..e97f4023 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -242,6 +242,7 @@ def anal( trio.run(main) +# TODO, move to `.tsp._annotate` async def markup_gaps( fqme: str, timeframe: float, @@ -288,18 +289,38 @@ async def markup_gaps( ) # XXX: probably a gap in the (newly sorted or de-duplicated) # dt-df, so we might need to re-index first.. + dt: pl.Series = row['dt'] + dt_prev: pl.Series = row['dt_prev'] if prev_r.is_empty(): - await tractor.pause() + + # XXX, filter out any special ignore cases, + # - UNIX-epoch stamped datums + # - first row + if ( + dt_prev.dt.epoch()[0] == 0 + or + dt.dt.epoch()[0] == 0 + ): + log.warning('Skipping row with UNIX epoch timestamp ??') + continue + + if wdts[0]['index'][0] == iend: # first row + log.warning('Skipping first-row (has no previous obvi) !!') + continue + + # XXX, if the previous-row by shm-index is missing, + # meaning there is a missing sample (set), get the prior + # row by df index and attempt to use it? + i_wdts: pl.DataFrame = wdts.with_row_index(name='i') + i_row: int = i_wdts.filter(pl.col('index') == iend)['i'][0] + prev_row_by_i = wdts[i_row] + prev_r: pl.DataFrame = prev_row_by_i + + # debug any missing pre-row + if tractor._state.is_debug_mode(): + await tractor.pause() istart: int = prev_r['index'][0] - # dt_start_t: float = dt_prev.timestamp() - - # start_t: float = prev_r['time'] - # assert ( - # dt_start_t - # == - # start_t - # ) # TODO: implement px-col width measure # and ensure at least as many px-cols @@ -358,6 +379,7 @@ def ldshm( fqme: str, write_parquet: bool = True, reload_parquet_to_shm: bool = True, + pdb: bool = False, # --pdb passed? ) -> None: ''' @@ -377,7 +399,7 @@ def ldshm( open_piker_runtime( 'polars_boi', enable_modules=['piker.data._sharedmem'], - debug_mode=True, + debug_mode=pdb, ), open_storage_client() as ( mod, @@ -397,17 +419,19 @@ def ldshm( times: np.ndarray = shm.array['time'] d1: float = float(times[-1] - times[-2]) - d2: float = float(times[-2] - times[-3]) - med: float = np.median(np.diff(times)) - if ( - d1 < 1. - and d2 < 1. - and med < 1. - ): - raise ValueError( - f'Something is wrong with time period for {shm}:\n{times}' - ) - + d2: float = 0 + # XXX, take a median sample rate if sufficient data + if times.size > 2: + d2: float = float(times[-2] - times[-3]) + med: float = np.median(np.diff(times)) + if ( + d1 < 1. + and d2 < 1. + and med < 1. + ): + raise ValueError( + f'Something is wrong with time period for {shm}:\n{times}' + ) period_s: float = float(max(d1, d2, med)) null_segs: tuple = tsp.get_null_segs( @@ -417,7 +441,9 @@ def ldshm( # TODO: call null-seg fixer somehow? if null_segs: - await tractor.pause() + + if tractor._state.is_debug_mode(): + await tractor.pause() # async with ( # trio.open_nursery() as tn, # mod.open_history_client( @@ -498,8 +524,11 @@ def ldshm( if ( not venue_gaps.is_empty() or ( - period_s < 60 - and not step_gaps.is_empty() + not step_gaps.is_empty() + # XXX, i presume i put this bc i was guarding + # for ib venue gaps? + # and + # period_s < 60 ) ): # write repaired ts to parquet-file?