# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""
Storage middle-ware CLIs.

"""
from __future__ import annotations
from pathlib import Path
import time
from types import ModuleType
from typing import (
    TYPE_CHECKING,
)

import polars as pl
import numpy as np
import tractor
# import pendulum
from rich.console import Console
import trio
# from rich.markdown import Markdown
import typer

from piker.service import open_piker_runtime
from piker.cli import cli
from piker.data import (
    ShmArray,
)
from piker import tsp
from . import log
from . import (
    __tsdbs__,
    open_storage_client,
    StorageClient,
)

if TYPE_CHECKING:
    from piker.ui._remote_ctl import AnnotCtl


store = typer.Typer()
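
# NOTE: a sketch of typical invocations once this group is mounted
# on the root `piker` cli (see the bottom of this module); the fqme
# and backend values below are purely illustrative assumptions:
#
#   piker store ls
#   piker store delete btcusdt.binance --backend nativedb
#   piker store anal btcusdt.binance --period 60
#   piker store ldshm btcusdt.binance --pdb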


@store.command()
def ls(
    backends: list[str] = typer.Argument(
        default=None,
        help='Storage backends to query, default is all.'
    ),
):
    '''
    List all timeseries keys (tables) stored by each requested
    (or by default, every) storage backend.

    '''
    from rich.table import Table

    if not backends:
        backends: list[str] = __tsdbs__

    console = Console()

    async def query_all():
        nonlocal backends

        async with (
            open_piker_runtime(
                'tsdb_storage',
            ),
        ):
            for backend in backends:
                table = Table()
                try:
                    async with open_storage_client(backend=backend) as (
                        mod,
                        client,
                    ):
                        table.add_column(f'{mod.name}@{client.address}')
                        keys: list[str] = await client.list_keys()
                        for key in keys:
                            table.add_row(key)

                    console.print(table)
                except Exception:
                    log.error(f'Unable to connect to storage engine: `{backend}`')

    trio.run(query_all)


# TODO: like ls but takes in a pattern and matches
# @store.command()
# def search(
#     patt: str,
#     backends: list[str] = typer.Argument(
#         default=None,
#         help='Storage backends to query, default is all.'
#     ),
# ):
#     ...


@store.command()
def delete(
    symbols: list[str],

    backend: str = typer.Option(
        default=None,
        help='Storage backend to update'
    ),
    # TODO: expose this as flagged multi-option?
    timeframes: list[int] = [1, 60],
):
    '''
    Delete a storage backend's time series for (table) keys provided as
    ``symbols``.

    '''
    from . import open_storage_client

    async def main(symbols: list[str]):
        async with (
            open_piker_runtime(
                'tsdb_storage',
            ),
            open_storage_client(backend) as (_, client),
            trio.open_nursery() as n,
        ):
            # spawn per-(fqme, timeframe) deletes as tasks for max
            # concurrency!
            for fqme in symbols:
                for tf in timeframes:
                    n.start_soon(
                        client.delete_ts,
                        fqme,
                        tf,
                    )
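
            # NOTE: this fires len(symbols) * len(timeframes) tasks,
            # i.e. with the defaults above 2 deletes per fqme (the 1s
            # and 60s series), all run concurrently and implicitly
            # awaited at nursery (`async with` block) exit.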

    trio.run(main, symbols)


@store.command()
def anal(
    fqme: str,
    period: int = 60,
    pdb: bool = False,

) -> np.ndarray:
    '''
    Anal-ysis is when you take the data and do stuff to it.

    NOTE: This ONLY loads the offline timeseries data (by default
    from a parquet file) NOT the in-shm version you might be seeing
    in a chart.

    '''
    async def main():
        async with (
            open_piker_runtime(
                # are you a bear or boi?
                'tsdb_polars_anal',
                debug_mode=pdb,
            ),
            open_storage_client() as (
                mod,
                client,
            ),
        ):
            syms: list[str] = await client.list_keys()
            log.info(f'{len(syms)} FOUND for {mod.name}')

            history: ShmArray  # np buffer format
            (
                history,
                first_dt,
                last_dt,
            ) = await client.load(
                fqme,
                period,
            )
            assert first_dt < last_dt

            null_segs: tuple = tsp.get_null_segs(
                frame=history,
                period=period,
            )
            # TODO: do tsp queries to the backend to fill in missing
            # history and then prolly write it to tsdb!

            shm_df: pl.DataFrame = await client.as_df(
                fqme,
                period,
            )

            df: pl.DataFrame  # with dts
            deduped: pl.DataFrame  # deduplicated dts
            (
                df,
                deduped,
                diff,
            ) = tsp.dedupe(
                shm_df,
                period=period,
            )

            write_edits: bool = True
            if (
                write_edits
                and (
                    diff
                    or null_segs
                )
            ):
                await tractor.pause()
                await client.write_ohlcv(
                    fqme,
                    ohlcv=deduped,
                    timeframe=period,
                )

            else:
                # TODO: something better with tab completion..
                # is there something more minimal but nearly as
                # functional as ipython?
                await tractor.pause()
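
                # while paused in the REPL you can poke at the loaded
                # frames with polars directly; a sketch (the column
                # names are assumptions based on the OHLCV schema
                # used elsewhere in `piker`):
                #
                #   >>> deduped.select(['time', 'close']).tail(5)
                #   >>> df.filter(pl.col('time').is_duplicated())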
                assert not null_segs

    trio.run(main)


@store.command()
def ldshm(
    fqme: str,
    write_parquet: bool = True,
    reload_parquet_to_shm: bool = True,
    pdb: bool = False,  # --pdb passed?

) -> None:
    '''
    Linux ONLY: load any shm buffer (from /dev/shm/) whose file name
    matches the given fqme into an OHLCV numpy array and polars
    DataFrame, optionally writing to offline storage via a
    `.parquet` file.

    '''
    async def main():
        from piker.ui._remote_ctl import (
            open_annot_ctl,
        )
        actl: AnnotCtl
        mod: ModuleType
        client: StorageClient
        async with (
            open_piker_runtime(
                'polars_boi',
                enable_modules=['piker.data._sharedmem'],
                debug_mode=pdb,
            ),
            open_storage_client() as (
                mod,
                client,
            ),
            open_annot_ctl() as actl,
        ):
            shm_df: pl.DataFrame | None = None
            tf2aids: dict[float, dict] = {}

            for (
                shmfile,
                shm,
                # parquet_path,
                shm_df,
            ) in tsp.iter_dfs_from_shms(fqme):

                times: np.ndarray = shm.array['time']
                d1: float = float(times[-1] - times[-2])
                d2: float = 0
                med: float = d1  # fallback if <= 2 samples exist

                # XXX, take a median sample rate if sufficient data
                if times.size > 2:
                    d2: float = float(times[-2] - times[-3])
                    med: float = float(np.median(np.diff(times)))

                if (
                    d1 < 1.
                    and d2 < 1.
                    and med < 1.
                ):
                    raise ValueError(
                        f'Something is wrong with time period for {shm}:\n{times}'
                    )
                period_s: float = float(max(d1, d2, med))
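
                # e.g. a healthy 60s OHLC buffer has every diff in
                # `times` equal to 60 so d1 == d2 == med == 60.0 and
                # thus period_s == 60.0; the sub-second sanity check
                # above only trips when the last couple sample steps
                # AND the median all come out < 1s.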

                null_segs: tuple = tsp.get_null_segs(
                    frame=shm.array,
                    period=period_s,
                )

                # TODO: call null-seg fixer somehow?
                if null_segs:

                    if tractor._state.is_debug_mode():
                        await tractor.pause()
                    # async with (
                    #     trio.open_nursery() as tn,
                    #     mod.open_history_client(
                    #         mkt,
                    #     ) as (get_hist, config),
                    # ):
                    #     nulls_detected: trio.Event = await tn.start(partial(
                    #         tsp.maybe_fill_null_segments,
                    #
                    #         shm=shm,
                    #         timeframe=timeframe,
                    #         get_hist=get_hist,
                    #         sampler_stream=sampler_stream,
                    #         mkt=mkt,
                    #     ))

                # over-write back to shm?
                wdts: pl.DataFrame  # with dts
                deduped: pl.DataFrame  # deduplicated dts
                (
                    wdts,
                    deduped,
                    diff,
                    valid_races,
                    dq_issues,
                ) = tsp.dedupe_ohlcv_smart(
                    shm_df,
                )

                # report the duplicate analysis
                if diff > 0:
                    log.info(
                        f'Removed {diff} duplicate timestamp(s)\n'
                    )
                if valid_races is not None:
                    identical: int = (
                        valid_races
                        .filter(pl.col('identical_bars'))
                        .height
                    )
                    monotonic: int = valid_races.height - identical
                    log.info(
                        f'Valid race conditions: {valid_races.height}\n'
                        f' - Identical bars: {identical}\n'
                        f' - Volume monotonic: {monotonic}\n'
                    )

                if dq_issues is not None:
                    log.warning(
                        f'DATA QUALITY ISSUES from provider: '
                        f'{dq_issues.height} timestamp(s)\n'
                        f'{dq_issues}\n'
                    )

                # detect gaps in the expected (uniform OHLC) sample period
                step_gaps: pl.DataFrame = tsp.detect_time_gaps(
                    deduped,
                    expect_period=period_s,
                )

                # TODO: by default we always want to mark these up
                # with rects showing up/down gaps Bo
                venue_gaps: pl.DataFrame = tsp.detect_time_gaps(
                    deduped,
                    expect_period=period_s,

                    # TODO: actually pull the exact duration
                    # expected for each venue operational period?
                    gap_dt_unit='day',
                    gap_thresh=1,
                )

                # TODO: find the disjoint set of step gaps from
                # venue (closure) set!
                # -[ ] do a set diff by checking for the unique
                #      gap set only in the step_gaps?
                if (
                    not venue_gaps.is_empty()
                    or (
                        not step_gaps.is_empty()
                        # XXX, i presume i put this bc i was guarding
                        # for ib venue gaps?
                        # and
                        # period_s < 60
                    )
                ):
                    # write repaired ts to parquet-file?
                    if write_parquet:
                        start: float = time.time()
                        path: Path = await client.write_ohlcv(
                            fqme,
                            ohlcv=deduped,
                            timeframe=period_s,
                        )
                        write_delay: float = round(
                            time.time() - start,
                            ndigits=6,
                        )

                        # read back from fs
                        start: float = time.time()
                        read_df: pl.DataFrame = pl.read_parquet(path)
                        read_delay: float = round(
                            time.time() - start,
                            ndigits=6,
                        )
                        log.info(
                            f'parquet write took {write_delay} secs\n'
                            f'file path: {path}\n'
                            f'parquet read took {read_delay} secs\n'
                            f'polars df: {read_df}'
                        )
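
                        # NOTE: for very large files a lazy scan can
                        # be cheaper than the eager read above; a
                        # sketch assuming a recent `polars` (not
                        # used here):
                        #   lf: pl.LazyFrame = pl.scan_parquet(path)
                        #   lf.select(pl.len()).collect()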

                        if reload_parquet_to_shm:
                            new = tsp.pl2np(
                                deduped,
                                dtype=shm.array.dtype,
                            )
                            # since normally readonly
                            shm._array.setflags(
                                write=int(1),
                            )
                            shm.push(
                                new,
                                prepend=True,
                                start=new['index'][-1],
                                update_first=False,  # don't update ._first
                            )

                            # NOTE: gap markup requires the reloaded
                            # np array (`new`) so it only runs when
                            # `reload_parquet_to_shm` is set.
                            do_markup_gaps: bool = True
                            if do_markup_gaps:
                                new_df: pl.DataFrame = tsp.np2pl(new)
                                aids: dict = await tsp._annotate.markup_gaps(
                                    fqme,
                                    period_s,
                                    actl,
                                    new_df,
                                    step_gaps,
                                )
                                # last chance manual overwrites in REPL
                                # await tractor.pause()
                                if not aids:
                                    log.warning(
                                        f'No gaps were found !?\n'
                                        f'fqme: {fqme!r}\n'
                                        f'timeframe: {period_s!r}\n'
                                        f"WELL THAT'S GOOD NOOZ!\n"
                                    )
                                tf2aids[period_s] = aids

                else:
                    # no significant gaps to handle, but we may have
                    # had duplicates removed (valid race conditions
                    # are ok)
                    if diff > 0 and dq_issues is not None:
                        log.warning(
                            'Found duplicates with data quality issues '
                            'but no significant time gaps!\n'
                        )

                await tractor.pause()
                log.info('Exiting TSP shm anal-izer!')

        if shm_df is None:
            log.error(
                f'No matching shm buffers for {fqme} ?'
            )

    trio.run(main)


typer_click_object = typer.main.get_command(store)
cli.add_command(typer_click_object, 'store')
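
# NOTE: the two lines above bridge typer -> click:
# `typer.main.get_command()` compiles the typer app into a click
# command group which can then be mounted on the existing
# click-based root `piker` cli.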