# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Storage middle-ware CLIs.
"""
from __future__ import annotations
from pathlib import Path
import time
from types import ModuleType
from typing import (
TYPE_CHECKING,
)
import polars as pl
import numpy as np
import tractor
# import pendulum
from rich.console import Console
import trio
# from rich.markdown import Markdown
import typer
from piker.service import open_piker_runtime
from piker.cli import cli
from tractor.ipc._shm import ShmArray
from piker import tsp
from . import log
from . import (
__tsdbs__,
open_storage_client,
StorageClient,
)
if TYPE_CHECKING:
from piker.ui._remote_ctl import AnnotCtl
store = typer.Typer()
@store.command()
def ls(
backends: list[str] = typer.Argument(
default=None,
help='Storage backends to query, default is all.'
),
):
from rich.table import Table
if not backends:
backends: list[str] = __tsdbs__
console = Console()
async def query_all():
nonlocal backends
async with (
open_piker_runtime(
'tsdb_storage',
),
2022-02-18 17:17:41 +00:00
):
for i, backend in enumerate(backends):
table = Table()
try:
async with open_storage_client(backend=backend) as (
mod,
client,
):
table.add_column(f'{mod.name}@{client.address}')
keys: list[str] = await client.list_keys()
for key in keys:
table.add_row(key)
console.print(table)
except Exception:
log.error(f'Unable to connect to storage engine: `{backend}`')
trio.run(query_all)
# TODO: like ls but takes in a pattern and matches
# @store.command()
# def search(
# patt: str,
# backends: list[str] = typer.Argument(
# default=None,
# help='Storage backends to query, default is all.'
# ),
# ):
# ...
@store.command()
def delete(
symbols: list[str],
backend: str = typer.Option(
default=None,
help='Storage backend to update'
),
# TODO: expose this as flagged multi-option?
timeframes: list[int] = [1, 60],
):
'''
Delete a storage backend's time series for (table) keys provided as
``symbols``.
'''
from . import open_storage_client
async def main(symbols: list[str]):
async with (
open_piker_runtime(
'tsdb_storage',
),
open_storage_client(backend) as (_, client),
trio.open_nursery() as n,
):
# spawn queries as tasks for max conc!
for fqme in symbols:
for tf in timeframes:
n.start_soon(
client.delete_ts,
fqme,
tf,
)
trio.run(main, symbols)
@store.command()
def anal(
fqme: str,
period: int = 60,
pdb: bool = False,
) -> np.ndarray:
'''
Anal-ysis is when you take the data do stuff to it.
NOTE: This ONLY loads the offline timeseries data (by default
from a parquet file) NOT the in-shm version you might be seeing
in a chart.
'''
async def main():
async with (
open_piker_runtime(
# are you a bear or boi?
'tsdb_polars_anal',
debug_mode=pdb,
),
open_storage_client() as (
mod,
client,
),
):
syms: list[str] = await client.list_keys()
log.info(f'{len(syms)} FOUND for {mod.name}')
history: ShmArray # np buffer format
(
history,
first_dt,
last_dt,
) = await client.load(
fqme,
period,
)
assert first_dt < last_dt
null_segs: tuple = tsp.get_null_segs(
frame=history,
period=period,
)
# TODO: do tsp queries to backcend to fill i missing
# history and then prolly write it to tsdb!
shm_df: pl.DataFrame = await client.as_df(
fqme,
period,
)
df: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
df,
deduped,
diff,
) = tsp.dedupe(
shm_df,
period=period,
)
write_edits: bool = True
if (
write_edits
and (
diff
or null_segs
)
):
await tractor.pause()
await client.write_ohlcv(
fqme,
ohlcv=deduped,
timeframe=period,
)
else:
# TODO: something better with tab completion..
# is there something more minimal but nearly as
# functional as ipython?
await tractor.pause()
assert not null_segs
trio.run(main)
@store.command()
def ldshm(
fqme: str,
write_parquet: bool = True,
reload_parquet_to_shm: bool = True,
pdb: bool = False, # --pdb passed?
) -> None:
'''
Linux ONLY: load any fqme file name matching shm buffer from
/dev/shm/ into an OHLCV numpy array and polars DataFrame,
optionally write to offline storage via `.parquet` file.
'''
async def main():
from piker.ui._remote_ctl import (
open_annot_ctl,
)
actl: AnnotCtl
mod: ModuleType
client: StorageClient
async with (
open_piker_runtime(
'polars_boi',
enable_modules=['piker.data._sharedmem'],
debug_mode=pdb,
),
open_storage_client() as (
mod,
client,
),
open_annot_ctl() as actl,
):
shm_df: pl.DataFrame | None = None
tf2aids: dict[float, dict] = {}
for (
shmfile,
shm,
# parquet_path,
shm_df,
) in tsp.iter_dfs_from_shms(fqme):
times: np.ndarray = shm.array['time']
d1: float = float(times[-1] - times[-2])
d2: float = 0
# XXX, take a median sample rate if sufficient data
if times.size > 2:
d2: float = float(times[-2] - times[-3])
med: float = np.median(np.diff(times))
if (
d1 < 1.
and d2 < 1.
and med < 1.
):
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
period_s: float = float(max(d1, d2, med))
log.info(
f'Processing shm buffer:\n'
f' file: {shmfile.name}\n'
f' period: {period_s}s\n'
)
null_segs: tuple = tsp.get_null_segs(
frame=shm.array,
period=period_s,
)
# TODO: call null-seg fixer somehow?
if null_segs:
if tractor.runtime._state.is_debug_mode():
await tractor.pause()
# async with (
# trio.open_nursery() as tn,
# mod.open_history_client(
# mkt,
# ) as (get_hist, config),
# ):
# nulls_detected: trio.Event = await tn.start(partial(
# tsp.maybe_fill_null_segments,
# shm=shm,
# timeframe=timeframe,
# get_hist=get_hist,
# sampler_stream=sampler_stream,
# mkt=mkt,
# ))
# over-write back to shm?
wdts: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
wdts,
deduped,
diff,
Add vlm-based "smart" OHLCV de-duping & bar validation Using `claude`, add a `.tsp._dedupe_smart` module that attemps "smarter" duplicate bars by attempting to distinguish between erroneous bars partially written during concurrent backfill race conditions vs. **actual** data quality issues from historical providers. Problem: -------- Concurrent writes (live updates vs. backfilling) can result in create duplicate timestamped ohlcv vars with different values. Some potential scenarios include, - a market live feed is cancelled during live update resulting in the "last" datum being partially updated with all the ticks for the time step. - when the feed is rebooted during charting, the backfiller will not finalize this bar since rn it presumes it should only fill data for time steps not already in the tsdb storage. Our current naive `.unique()` approach obvi keeps the incomplete bar and a "smarter" approach is to compare the provider's final vlm amount vs. the maybe-cancelled tsdb's bar; a higher vlm value from the provider likely indicates the cancelled-during-live-write and **not** a datum discrepancy from said data provider. Analysis (with `claude`) of `zecusdt` data revealed: - 1000 duplicate timestamps - 999 identical bars (pure duplicates from 2022 backfill overlap) - 1 volume-monotonic conflict (live partial vs backfill complete) A soln from `claude` -> `tsp._dedupe_smart.dedupe_ohlcv_smart()` which: - sorts by vlm **before** deduplication and keep the most complete bar based on vlm monotonicity as well as the following OHLCV validation assumptions: * volume should always increase * high should be non-decreasing, * low should be non-increasing * open should be identical - Separates valid race conditions from provider data quality issues and reports and returns both dfs. 
Change summary by `claude`: - `.tsp._dedupe_smart`: new module with validation logic - `.tsp.__init__`: expose `dedupe_ohlcv_smart()` - `.storage.cli`: integrate smart dedupe, add logging for: * duplicate counts (identical vs monotonic races) * data quality violations (non-monotonic, invalid OHLC ranges) * warnings for provider data issues - Remove `assert not diff` (duplicates are valid now) Verified on `zecusdt`: correctly keeps index 3143645 (volume=287.777) over 3143644 (volume=140.299) for conflicting 2026-01-16 18:54 UTC bar. `claude`'s Summary of reasoning ------------------------------- - volume monotonicity is critical: a bar's volume only increases during its time window. - a backfilled bar should always have volume >= live updated. - violations indicate any of: * Provider data corruption * Non-OHLCV aggregation semantics * Timestamp misalignment (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-19 02:00:17 +00:00
valid_races,
dq_issues,
) = tsp.dedupe_ohlcv_smart(
shm_df,
)
Add vlm-based "smart" OHLCV de-duping & bar validation Using `claude`, add a `.tsp._dedupe_smart` module that attemps "smarter" duplicate bars by attempting to distinguish between erroneous bars partially written during concurrent backfill race conditions vs. **actual** data quality issues from historical providers. Problem: -------- Concurrent writes (live updates vs. backfilling) can result in create duplicate timestamped ohlcv vars with different values. Some potential scenarios include, - a market live feed is cancelled during live update resulting in the "last" datum being partially updated with all the ticks for the time step. - when the feed is rebooted during charting, the backfiller will not finalize this bar since rn it presumes it should only fill data for time steps not already in the tsdb storage. Our current naive `.unique()` approach obvi keeps the incomplete bar and a "smarter" approach is to compare the provider's final vlm amount vs. the maybe-cancelled tsdb's bar; a higher vlm value from the provider likely indicates the cancelled-during-live-write and **not** a datum discrepancy from said data provider. Analysis (with `claude`) of `zecusdt` data revealed: - 1000 duplicate timestamps - 999 identical bars (pure duplicates from 2022 backfill overlap) - 1 volume-monotonic conflict (live partial vs backfill complete) A soln from `claude` -> `tsp._dedupe_smart.dedupe_ohlcv_smart()` which: - sorts by vlm **before** deduplication and keep the most complete bar based on vlm monotonicity as well as the following OHLCV validation assumptions: * volume should always increase * high should be non-decreasing, * low should be non-increasing * open should be identical - Separates valid race conditions from provider data quality issues and reports and returns both dfs. 
Change summary by `claude`: - `.tsp._dedupe_smart`: new module with validation logic - `.tsp.__init__`: expose `dedupe_ohlcv_smart()` - `.storage.cli`: integrate smart dedupe, add logging for: * duplicate counts (identical vs monotonic races) * data quality violations (non-monotonic, invalid OHLC ranges) * warnings for provider data issues - Remove `assert not diff` (duplicates are valid now) Verified on `zecusdt`: correctly keeps index 3143645 (volume=287.777) over 3143644 (volume=140.299) for conflicting 2026-01-16 18:54 UTC bar. `claude`'s Summary of reasoning ------------------------------- - volume monotonicity is critical: a bar's volume only increases during its time window. - a backfilled bar should always have volume >= live updated. - violations indicate any of: * Provider data corruption * Non-OHLCV aggregation semantics * Timestamp misalignment (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-19 02:00:17 +00:00
# Report duplicate analysis
if diff > 0:
log.info(
f'Removed {diff} duplicate timestamp(s)\n'
)
if valid_races is not None:
identical: int = (
valid_races
.filter(pl.col('identical_bars'))
.height
)
monotonic: int = valid_races.height - identical
log.info(
f'Valid race conditions: {valid_races.height}\n'
f' - Identical bars: {identical}\n'
f' - Volume monotonic: {monotonic}\n'
)
if dq_issues is not None:
log.warning(
f'DATA QUALITY ISSUES from provider: '
f'{dq_issues.height} timestamp(s)\n'
f'{dq_issues}\n'
)
# detect gaps from in expected (uniform OHLC) sample period
step_gaps: pl.DataFrame = tsp.detect_time_gaps(
deduped,
expect_period=period_s,
)
# TODO: by default we always want to mark these up
# with rects showing up/down gaps Bo
venue_gaps: pl.DataFrame = tsp.detect_time_gaps(
deduped,
expect_period=period_s,
# TODO: actually pull the exact duration
# expected for each venue operational period?
Add vlm-based "smart" OHLCV de-duping & bar validation Using `claude`, add a `.tsp._dedupe_smart` module that attemps "smarter" duplicate bars by attempting to distinguish between erroneous bars partially written during concurrent backfill race conditions vs. **actual** data quality issues from historical providers. Problem: -------- Concurrent writes (live updates vs. backfilling) can result in create duplicate timestamped ohlcv vars with different values. Some potential scenarios include, - a market live feed is cancelled during live update resulting in the "last" datum being partially updated with all the ticks for the time step. - when the feed is rebooted during charting, the backfiller will not finalize this bar since rn it presumes it should only fill data for time steps not already in the tsdb storage. Our current naive `.unique()` approach obvi keeps the incomplete bar and a "smarter" approach is to compare the provider's final vlm amount vs. the maybe-cancelled tsdb's bar; a higher vlm value from the provider likely indicates the cancelled-during-live-write and **not** a datum discrepancy from said data provider. Analysis (with `claude`) of `zecusdt` data revealed: - 1000 duplicate timestamps - 999 identical bars (pure duplicates from 2022 backfill overlap) - 1 volume-monotonic conflict (live partial vs backfill complete) A soln from `claude` -> `tsp._dedupe_smart.dedupe_ohlcv_smart()` which: - sorts by vlm **before** deduplication and keep the most complete bar based on vlm monotonicity as well as the following OHLCV validation assumptions: * volume should always increase * high should be non-decreasing, * low should be non-increasing * open should be identical - Separates valid race conditions from provider data quality issues and reports and returns both dfs. 
Change summary by `claude`: - `.tsp._dedupe_smart`: new module with validation logic - `.tsp.__init__`: expose `dedupe_ohlcv_smart()` - `.storage.cli`: integrate smart dedupe, add logging for: * duplicate counts (identical vs monotonic races) * data quality violations (non-monotonic, invalid OHLC ranges) * warnings for provider data issues - Remove `assert not diff` (duplicates are valid now) Verified on `zecusdt`: correctly keeps index 3143645 (volume=287.777) over 3143644 (volume=140.299) for conflicting 2026-01-16 18:54 UTC bar. `claude`'s Summary of reasoning ------------------------------- - volume monotonicity is critical: a bar's volume only increases during its time window. - a backfilled bar should always have volume >= live updated. - violations indicate any of: * Provider data corruption * Non-OHLCV aggregation semantics * Timestamp misalignment (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-19 02:00:17 +00:00
# gap_dt_unit='day',
gap_dt_unit='day',
gap_thresh=1,
)
# TODO: find the disjoint set of step gaps from
# venue (closure) set!
# -[ ] do a set diff by checking for the unique
# gap set only in the step_gaps?
if (
not venue_gaps.is_empty()
or (
not step_gaps.is_empty()
# XXX, i presume i put this bc i was guarding
# for ib venue gaps?
# and
# period_s < 60
)
):
# write repaired ts to parquet-file?
if write_parquet:
start: float = time.time()
path: Path = await client.write_ohlcv(
fqme,
ohlcv=deduped,
timeframe=period_s,
)
write_delay: float = round(
time.time() - start,
ndigits=6,
)
# read back from fs
start: float = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
read_delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {write_delay} secs\n'
f'file path: {path}'
f'parquet read took {read_delay} secs\n'
f'polars df: {read_df}'
)
if reload_parquet_to_shm:
new = tsp.pl2np(
deduped,
dtype=shm.array.dtype,
)
# since normally readonly
shm._array.setflags(
write=int(1),
)
shm.push(
new,
prepend=True,
start=new['index'][-1],
update_first=False, # don't update ._first
)
do_markup_gaps: bool = True
if do_markup_gaps:
new_df: pl.DataFrame = tsp.np2pl(new)
aids: dict = await tsp._annotate.markup_gaps(
fqme,
period_s,
actl,
new_df,
step_gaps,
)
# last chance manual overwrites in REPL
# await tractor.pause()
if not aids:
log.warning(
f'No gaps were found !?\n'
f'fqme: {fqme!r}\n'
f'timeframe: {period_s!r}\n'
f"WELL THAT'S GOOD NOOZ!\n"
)
tf2aids[period_s] = aids
else:
Add vlm-based "smart" OHLCV de-duping & bar validation Using `claude`, add a `.tsp._dedupe_smart` module that attemps "smarter" duplicate bars by attempting to distinguish between erroneous bars partially written during concurrent backfill race conditions vs. **actual** data quality issues from historical providers. Problem: -------- Concurrent writes (live updates vs. backfilling) can result in create duplicate timestamped ohlcv vars with different values. Some potential scenarios include, - a market live feed is cancelled during live update resulting in the "last" datum being partially updated with all the ticks for the time step. - when the feed is rebooted during charting, the backfiller will not finalize this bar since rn it presumes it should only fill data for time steps not already in the tsdb storage. Our current naive `.unique()` approach obvi keeps the incomplete bar and a "smarter" approach is to compare the provider's final vlm amount vs. the maybe-cancelled tsdb's bar; a higher vlm value from the provider likely indicates the cancelled-during-live-write and **not** a datum discrepancy from said data provider. Analysis (with `claude`) of `zecusdt` data revealed: - 1000 duplicate timestamps - 999 identical bars (pure duplicates from 2022 backfill overlap) - 1 volume-monotonic conflict (live partial vs backfill complete) A soln from `claude` -> `tsp._dedupe_smart.dedupe_ohlcv_smart()` which: - sorts by vlm **before** deduplication and keep the most complete bar based on vlm monotonicity as well as the following OHLCV validation assumptions: * volume should always increase * high should be non-decreasing, * low should be non-increasing * open should be identical - Separates valid race conditions from provider data quality issues and reports and returns both dfs. 
Change summary by `claude`: - `.tsp._dedupe_smart`: new module with validation logic - `.tsp.__init__`: expose `dedupe_ohlcv_smart()` - `.storage.cli`: integrate smart dedupe, add logging for: * duplicate counts (identical vs monotonic races) * data quality violations (non-monotonic, invalid OHLC ranges) * warnings for provider data issues - Remove `assert not diff` (duplicates are valid now) Verified on `zecusdt`: correctly keeps index 3143645 (volume=287.777) over 3143644 (volume=140.299) for conflicting 2026-01-16 18:54 UTC bar. `claude`'s Summary of reasoning ------------------------------- - volume monotonicity is critical: a bar's volume only increases during its time window. - a backfilled bar should always have volume >= live updated. - violations indicate any of: * Provider data corruption * Non-OHLCV aggregation semantics * Timestamp misalignment (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-19 02:00:17 +00:00
# No significant gaps to handle, but may have had
# duplicates removed (valid race conditions are ok)
if diff > 0 and dq_issues is not None:
log.warning(
'Found duplicates with data quality issues '
'but no significant time gaps!\n'
)
await tractor.pause()
log.info('Exiting TSP shm anal-izer!')
if shm_df is None:
log.error(
f'No matching shm buffers for {fqme} ?'
)
trio.run(main)
typer_click_object = typer.main.get_command(store)
cli.add_command(typer_click_object, 'store')