diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index f0ae671f..f4570fa6 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -136,6 +136,53 @@ def delete(
     trio.run(main, symbols)


+def dedupe(src_df: pl.DataFrame) -> tuple[
+    pl.DataFrame,  # with dts
+    pl.DataFrame,  # gaps
+    pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
+    bool,
+]:
+    '''
+    Check for time series gaps and, if any are found,
+    de-duplicate repeated datetime entries, check for
+    a frame height diff and return the newly
+    dt-deduplicated frame.
+
+    '''
+    from piker.data import _timeseries as tsp
+    df: pl.DataFrame = tsp.with_dts(src_df)
+    gaps: pl.DataFrame = tsp.detect_time_gaps(df)
+    deduped: pl.DataFrame = df
+    if not gaps.is_empty():
+        # remove duplicated datetime samples/sections
+        deduped = tsp.dedup_dt(df)
+        deduped_gaps = tsp.detect_time_gaps(deduped)
+
+        log.warning(
+            f'Gaps found:\n{gaps}\n'
+            f'deduped Gaps found:\n{deduped_gaps}'
+        )
+    # TODO: rewrite this in polars and/or convert to
+    # ndarray to detect and remove?
+    # null_gaps = tsp.detect_null_time_gap()
+
+    diff: int = (
+        df.height
+        -
+        deduped.height
+    )
+    was_deduped: bool = False
+    if diff:
+        was_deduped = True
+
+    return (
+        df,
+        gaps,
+        deduped,
+        was_deduped,
+    )
+
+
 @store.command()
 def anal(
     fqme: str,
@@ -144,7 +191,11 @@ def anal(

 ) -> np.ndarray:
     '''
-    Anal-ysis is when you take the data do stuff to it, i think.
+    Anal-ysis is when you take the data and do stuff to it.
+
+    NOTE: this ONLY loads the offline timeseries data (by default
+    from a parquet file), NOT the in-shm version you might be
+    seeing in a chart.

     '''
     async def main():
@@ -162,6 +213,7 @@ def anal(
             syms: list[str] = await client.list_keys()
             log.info(f'{len(syms)} FOUND for {mod.name}')

+            history: ShmArray  # np buffer format
             (
                 history,
                 first_dt,
@@ -172,13 +224,26 @@ def anal(
             )
             assert first_dt < last_dt

-            src_df = await client.as_df(fqme, period)
-            from piker.data import _timeseries as tsmod
-            df: pl.DataFrame = tsmod.with_dts(src_df)
-            gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
+            shm_df: pl.DataFrame = await client.as_df(
+                fqme,
+                period,
+            )

-            if not gaps.is_empty():
-                print(f'Gaps found:\n{gaps}')
+            df: pl.DataFrame  # with dts
+            deduped: pl.DataFrame  # deduplicated dts
+            (
+                df,
+                gaps,
+                deduped,
+                shortened,
+            ) = dedupe(shm_df)
+
+            if shortened:
+                await client.write_ohlcv(
+                    fqme,
+                    ohlcv=deduped,
+                    timeframe=period,
+                )

             # TODO: something better with tab completion..
             # is there something more minimal but nearly as
@@ -275,7 +340,7 @@ def ldshm(
     '''
     Linux ONLY: load any fqme file name matching shm buffer from
     /dev/shm/ into an OHLCV numpy array and polars DataFrame,
-    optionally write to .parquet file.
+    optionally write to offline storage via a `.parquet` file.

     '''
     async def main():
@@ -287,7 +352,7 @@ def ldshm(
         ),
     ):
         df: pl.DataFrame | None = None
-        for shmfile, shm, src_df in iter_dfs_from_shms(fqme):
+        for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):

             # compute ohlc properties for naming
             times: np.ndarray = shm.array['time']
@@ -297,9 +362,16 @@ def ldshm(
                     f'Something is wrong with time period for {shm}:\n{times}'
                 )

+
+            # over-write back to shm?
+            df: pl.DataFrame  # with dts
+            deduped: pl.DataFrame  # deduplicated dts
+            (
+                df,
+                gaps,
+                deduped,
+                was_dded,
+            ) = dedupe(shm_df)

             # TODO: maybe only optionally enter this depending
             # on some CLI flags and/or gap detection?
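For context on what the new `dedupe()` helper leans on: below is a minimal sketch of the sort of gap detection and dt-deduplication that `tsp.detect_time_gaps()` and `tsp.dedup_dt()` presumably perform. The real implementations live in `piker.data._timeseries` and are not shown in this diff; the `dt` column name, the `expect_period` parameter, and both function bodies are assumptions for illustration only.

```python
import polars as pl

def detect_time_gaps_sketch(
    df: pl.DataFrame,
    expect_period: float = 60.0,  # assumed bar period in seconds
) -> pl.DataFrame:
    # a "gap" is any row whose time-delta from the previous
    # sample exceeds the expected sampling period
    return (
        df
        .with_columns(pl.col('time').diff().alias('s_diff'))
        .filter(pl.col('s_diff') > expect_period)
    )

def dedup_dt_sketch(df: pl.DataFrame) -> pl.DataFrame:
    # drop rows carrying a repeated datetime stamp, keeping the
    # first occurrence and preserving row order
    return df.unique(
        subset=['dt'],
        keep='first',
        maintain_order=True,
    )
```

Callers then consume the 4-tuple contract exactly as `anal()` does above:

```python
df, gaps, deduped, was_deduped = dedupe(shm_df)
if was_deduped:
    # the deduped frame is shorter -> flush it back to storage
    await client.write_ohlcv(fqme, ohlcv=deduped, timeframe=period)
```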
diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py
index f4ecfbea..0b15d4d7 100644
--- a/piker/storage/nativedb.py
+++ b/piker/storage/nativedb.py
@@ -272,9 +272,21 @@ class NativeStorageClient:
         # limit: int = int(200e3),

     ) -> np.ndarray:
-        path: Path = self.mk_path(fqme, period=int(timeframe))
+        path: Path = self.mk_path(
+            fqme,
+            period=int(timeframe),
+        )
         df: pl.DataFrame = pl.read_parquet(path)
-        self._dfs.setdefault(timeframe, {})[fqme] = df
+
+        # cache df for later usage since we (currently) need to
+        # convert to np.ndarrays to push to our `ShmArray` rt
+        # buffers subsys but later we may operate entirely on
+        # pyarrow arrays/buffers so keeping the dfs around for
+        # a variety of purposes is handy.
+        self._dfs.setdefault(
+            timeframe,
+            {},
+        )[fqme] = df

         # TODO: filter by end and limit inputs
         # times: pl.Series = df['time']
@@ -329,7 +341,7 @@ class NativeStorageClient:
             time.time() - start,
             ndigits=6,
         )
-        print(
+        log.info(
            f'parquet write took {delay} secs\n'
            f'file path: {path}'
        )
@@ -339,7 +351,7 @@ class NativeStorageClient:
     async def write_ohlcv(
         self,
         fqme: str,
-        ohlcv: np.ndarray,
+        ohlcv: np.ndarray | pl.DataFrame,
         timeframe: int,

     ) -> Path:
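Since `write_ohlcv()` now advertises `np.ndarray | pl.DataFrame` for its `ohlcv` input, the write path presumably has to normalize either type to a polars frame before the `DataFrame.write_parquet()` call. A hedged sketch of that normalization follows; the `to_frame()` helper name is hypothetical and not part of this diff, only the widened annotation is.

```python
import numpy as np
import polars as pl

def to_frame(ohlcv: np.ndarray | pl.DataFrame) -> pl.DataFrame:
    # np structured (record) arrays map one-field-per-column onto
    # a polars frame; frames pass through untouched
    if isinstance(ohlcv, np.ndarray):
        return pl.DataFrame({
            name: ohlcv[name]
            for name in ohlcv.dtype.names
        })
    return ohlcv

# e.g. inside the parquet write path:
# df = to_frame(ohlcv)
# df.write_parquet(path)
```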