Add pauses to `store anal/ldshm` only on bad segs

Particularly halting before maybe writing the repaired timeseries
history in `store anal` to optionally allow user to avoid writing to
storage.
distribute_dis
Tyler Goodlet 2023-12-18 11:56:57 -05:00
parent f5dc21d3f4
commit f7cc43ee0b
1 changed files with 23 additions and 17 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) # Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -36,11 +36,8 @@ from piker.cli import cli
from piker.config import get_conf_dir from piker.config import get_conf_dir
from piker.data import ( from piker.data import (
ShmArray, ShmArray,
tsp,
)
from piker.data.history import (
iter_dfs_from_shms,
) )
from piker import tsp
from . import ( from . import (
log, log,
) )
@ -189,8 +186,8 @@ def anal(
frame=history, frame=history,
period=period, period=period,
) )
if null_segs: # TODO: do tsp queries to backcend to fill i missing
await tractor.pause() # history and then prolly write it to tsdb!
shm_df: pl.DataFrame = await client.as_df( shm_df: pl.DataFrame = await client.as_df(
fqme, fqme,
@ -206,14 +203,23 @@ def anal(
diff, diff,
) = tsp.dedupe(shm_df) ) = tsp.dedupe(shm_df)
write_edits: bool = True
if (
write_edits
and (
diff
or null_segs
)
):
await tractor.pause()
if diff:
await client.write_ohlcv( await client.write_ohlcv(
fqme, fqme,
ohlcv=deduped, ohlcv=deduped,
timeframe=period, timeframe=period,
) )
else:
# TODO: something better with tab completion.. # TODO: something better with tab completion..
# is there something more minimal but nearly as # is there something more minimal but nearly as
# functional as ipython? # functional as ipython?
@ -243,7 +249,7 @@ def ldshm(
), ),
): ):
df: pl.DataFrame | None = None df: pl.DataFrame | None = None
for shmfile, shm, shm_df in iter_dfs_from_shms(fqme): for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme):
# compute ohlc properties for naming # compute ohlc properties for naming
times: np.ndarray = shm.array['time'] times: np.ndarray = shm.array['time']
@ -270,10 +276,10 @@ def ldshm(
# TODO: maybe only optionally enter this depending # TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection? # on some CLI flags and/or gap detection?
if not gaps.is_empty(): if (
await tractor.pause() not gaps.is_empty()
or null_segs
if null_segs: ):
await tractor.pause() await tractor.pause()
# write to parquet file? # write to parquet file?