diff --git a/piker/storage/cli.py b/piker/storage/cli.py index e4daffa9..abde1b3c 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -150,6 +150,7 @@ def anal( open_piker_runtime( 'tsdb_polars_anal', # enable_modules=['piker.service._ahab'] + debug_mode=True, ), open_storage_client() as (mod, client), ): @@ -168,10 +169,10 @@ def anal( src_df = await client.as_df(fqme, period) from piker.data import _timeseries as tsmod - df = tsmod.with_dts(src_df) + df: pl.DataFrame = tsmod.with_dts(src_df) gaps: pl.DataFrame = tsmod.detect_time_gaps(df) - if gaps: + if not gaps.is_empty(): print(f'Gaps found:\n{gaps}') # TODO: something better with tab completion.. @@ -216,7 +217,13 @@ def iter_dfs_from_shms(fqme: str) -> Generator[ # lookup array buffer size based on file suffix # being either .rt or .hist - size: int = sizes[shmfile.name.rsplit('.')[-1]] + key: str = shmfile.name.rsplit('.')[-1] + + # skip FSP buffers for now.. + if key not in sizes: + continue + + size: int = sizes[key] # attach to any shm buffer, load array into polars df, # write to local parquet file. @@ -271,24 +278,31 @@ def ldshm( open_piker_runtime( 'polars_boi', enable_modules=['piker.data._sharedmem'], + debug_mode=True, ), ): - df: pl.DataFrame | None = None - for shmfile, shm, df in iter_dfs_from_shms(fqme): + for shmfile, shm, src_df in iter_dfs_from_shms(fqme): # compute ohlc properties for naming times: np.ndarray = shm.array['time'] secs: float = times[-1] - times[-2] if secs < 1.: - breakpoint() raise ValueError( f'Something is wrong with time period for {shm}:\n{times}' ) + from piker.data import _timeseries as tsmod + df: pl.DataFrame = tsmod.with_dts(src_df) + gaps: pl.DataFrame = tsmod.detect_time_gaps(df) + # TODO: maybe only optionally enter this depending # on some CLI flags and/or gap detection? - await tractor.pause() + if ( + not gaps.is_empty() + or secs > 2 + ): + await tractor.pause() # write to parquet file? if write_parquet: