From f7cc43ee0b51bfa831283e8654c01e6192ebb487 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 18 Dec 2023 11:56:57 -0500 Subject: [PATCH] Add pauses to `store anal/ldshm` only on bad segs Particularly halting before maybe writing the repaired timeseries history in `store anal` to optionally allow user to avoid writing to storage. --- piker/storage/cli.py | 40 +++++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/piker/storage/cli.py b/piker/storage/cli.py index 28ff23ab..c15f4273 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -36,11 +36,8 @@ from piker.cli import cli from piker.config import get_conf_dir from piker.data import ( ShmArray, - tsp, -) -from piker.data.history import ( - iter_dfs_from_shms, ) +from piker import tsp from . import ( log, ) @@ -189,8 +186,8 @@ def anal( frame=history, period=period, ) - if null_segs: - await tractor.pause() + # TODO: do tsp queries to backcend to fill i missing + # history and then prolly write it to tsdb! shm_df: pl.DataFrame = await client.as_df( fqme, @@ -206,18 +203,27 @@ def anal( diff, ) = tsp.dedupe(shm_df) + write_edits: bool = True + if ( + write_edits + and ( + diff + or null_segs + ) + ): + await tractor.pause() - if diff: await client.write_ohlcv( fqme, ohlcv=deduped, timeframe=period, ) - # TODO: something better with tab completion.. - # is there something more minimal but nearly as - # functional as ipython? - await tractor.pause() + else: + # TODO: something better with tab completion.. + # is there something more minimal but nearly as + # functional as ipython? + await tractor.pause() trio.run(main) @@ -243,7 +249,7 @@ def ldshm( ), ): df: pl.DataFrame | None = None - for shmfile, shm, shm_df in iter_dfs_from_shms(fqme): + for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme): # compute ohlc properties for naming times: np.ndarray = shm.array['time'] @@ -270,10 +276,10 @@ def ldshm( # TODO: maybe only optionally enter this depending # on some CLI flags and/or gap detection? - if not gaps.is_empty(): - await tractor.pause() - - if null_segs: + if ( + not gaps.is_empty() + or null_segs + ): await tractor.pause() # write to parquet file?