Add gap detection into the `store ldshm` cmd

account_tests
Tyler Goodlet 2023-07-26 15:45:55 -04:00
parent d94ab9d5b2
commit 385561276b
1 changed files with 21 additions and 7 deletions

View File

@ -150,6 +150,7 @@ def anal(
open_piker_runtime( open_piker_runtime(
'tsdb_polars_anal', 'tsdb_polars_anal',
# enable_modules=['piker.service._ahab'] # enable_modules=['piker.service._ahab']
debug_mode=True,
), ),
open_storage_client() as (mod, client), open_storage_client() as (mod, client),
): ):
@ -168,10 +169,10 @@ def anal(
src_df = await client.as_df(fqme, period) src_df = await client.as_df(fqme, period)
from piker.data import _timeseries as tsmod from piker.data import _timeseries as tsmod
df = tsmod.with_dts(src_df) df: pl.DataFrame = tsmod.with_dts(src_df)
gaps: pl.DataFrame = tsmod.detect_time_gaps(df) gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
if gaps: if not gaps.is_empty():
print(f'Gaps found:\n{gaps}') print(f'Gaps found:\n{gaps}')
# TODO: something better with tab completion.. # TODO: something better with tab completion..
@ -216,7 +217,13 @@ def iter_dfs_from_shms(fqme: str) -> Generator[
# lookup array buffer size based on file suffix # lookup array buffer size based on file suffix
# being either .rt or .hist # being either .rt or .hist
size: int = sizes[shmfile.name.rsplit('.')[-1]] key: str = shmfile.name.rsplit('.')[-1]
# skip FSP buffers for now..
if key not in sizes:
continue
size: int = sizes[key]
# attach to any shm buffer, load array into polars df, # attach to any shm buffer, load array into polars df,
# write to local parquet file. # write to local parquet file.
@ -271,24 +278,31 @@ def ldshm(
open_piker_runtime( open_piker_runtime(
'polars_boi', 'polars_boi',
enable_modules=['piker.data._sharedmem'], enable_modules=['piker.data._sharedmem'],
debug_mode=True,
), ),
): ):
df: pl.DataFrame | None = None df: pl.DataFrame | None = None
for shmfile, shm, df in iter_dfs_from_shms(fqme): for shmfile, shm, src_df in iter_dfs_from_shms(fqme):
# compute ohlc properties for naming # compute ohlc properties for naming
times: np.ndarray = shm.array['time'] times: np.ndarray = shm.array['time']
secs: float = times[-1] - times[-2] secs: float = times[-1] - times[-2]
if secs < 1.: if secs < 1.:
breakpoint()
raise ValueError( raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}' f'Something is wrong with time period for {shm}:\n{times}'
) )
from piker.data import _timeseries as tsmod
df: pl.DataFrame = tsmod.with_dts(src_df)
gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
# TODO: maybe only optionally enter this depending # TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection? # on some CLI flags and/or gap detection?
await tractor.pause() if (
not gaps.is_empty()
or secs > 2
):
await tractor.pause()
# write to parquet file? # write to parquet file?
if write_parquet: if write_parquet: