piker/piker/storage/cli.py

554 lines
16 KiB
Python

# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Storage middle-ware CLIs.
"""
from __future__ import annotations
# from datetime import datetime
# from contextlib import (
# AsyncExitStack,
# )
from pathlib import Path
from math import copysign
import time
from types import ModuleType
from typing import (
Any,
TYPE_CHECKING,
)
import polars as pl
import numpy as np
import tractor
# import pendulum
from rich.console import Console
import trio
# from rich.markdown import Markdown
import typer
from piker.service import open_piker_runtime
from piker.cli import cli
from piker.data import (
ShmArray,
)
from piker import tsp
from piker.data._formatters import BGM
from . import log
from . import (
__tsdbs__,
open_storage_client,
StorageClient,
)
if TYPE_CHECKING:
from piker.ui._remote_ctl import AnnotCtl
store = typer.Typer()
@store.command()
def ls(
backends: list[str] = typer.Argument(
default=None,
help='Storage backends to query, default is all.'
),
):
from rich.table import Table
if not backends:
backends: list[str] = __tsdbs__
console = Console()
async def query_all():
nonlocal backends
async with (
open_piker_runtime(
'tsdb_storage',
),
):
for i, backend in enumerate(backends):
table = Table()
try:
async with open_storage_client(backend=backend) as (
mod,
client,
):
table.add_column(f'{mod.name}@{client.address}')
keys: list[str] = await client.list_keys()
for key in keys:
table.add_row(key)
console.print(table)
except Exception:
log.error(f'Unable to connect to storage engine: `{backend}`')
trio.run(query_all)
# TODO: like ls but takes in a pattern and matches
# @store.command()
# def search(
# patt: str,
# backends: list[str] = typer.Argument(
# default=None,
# help='Storage backends to query, default is all.'
# ),
# ):
# ...
@store.command()
def delete(
symbols: list[str],
backend: str = typer.Option(
default=None,
help='Storage backend to update'
),
# TODO: expose this as flagged multi-option?
timeframes: list[int] = [1, 60],
):
'''
Delete a storage backend's time series for (table) keys provided as
``symbols``.
'''
from . import open_storage_client
async def main(symbols: list[str]):
async with (
open_piker_runtime(
'tsdb_storage',
),
open_storage_client(backend) as (_, client),
trio.open_nursery() as n,
):
# spawn queries as tasks for max conc!
for fqme in symbols:
for tf in timeframes:
n.start_soon(
client.delete_ts,
fqme,
tf,
)
trio.run(main, symbols)
@store.command()
def anal(
fqme: str,
period: int = 60,
pdb: bool = False,
) -> np.ndarray:
'''
Anal-ysis is when you take the data do stuff to it.
NOTE: This ONLY loads the offline timeseries data (by default
from a parquet file) NOT the in-shm version you might be seeing
in a chart.
'''
async def main():
async with (
open_piker_runtime(
# are you a bear or boi?
'tsdb_polars_anal',
debug_mode=pdb,
),
open_storage_client() as (
mod,
client,
),
):
syms: list[str] = await client.list_keys()
log.info(f'{len(syms)} FOUND for {mod.name}')
history: ShmArray # np buffer format
(
history,
first_dt,
last_dt,
) = await client.load(
fqme,
period,
)
assert first_dt < last_dt
null_segs: tuple = tsp.get_null_segs(
frame=history,
period=period,
)
# TODO: do tsp queries to backcend to fill i missing
# history and then prolly write it to tsdb!
shm_df: pl.DataFrame = await client.as_df(
fqme,
period,
)
df: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
df,
deduped,
diff,
) = tsp.dedupe(
shm_df,
period=period,
)
write_edits: bool = True
if (
write_edits
and (
diff
or null_segs
)
):
await tractor.pause()
await client.write_ohlcv(
fqme,
ohlcv=deduped,
timeframe=period,
)
else:
# TODO: something better with tab completion..
# is there something more minimal but nearly as
# functional as ipython?
await tractor.pause()
assert not null_segs
trio.run(main)
async def markup_gaps(
fqme: str,
timeframe: float,
actl: AnnotCtl,
wdts: pl.DataFrame,
gaps: pl.DataFrame,
) -> dict[int, dict]:
'''
Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
with rectangles.
'''
aids: dict[int] = {}
for i in range(gaps.height):
row: pl.DataFrame = gaps[i]
# the gap's RIGHT-most bar's OPEN value
# at that time (sample) step.
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# dt_end_t: float = dt.timestamp()
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's LEFT-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = wdts.filter(
pl.col('index') == iend - 1
)
# XXX: probably a gap in the (newly sorted or de-duplicated)
# dt-df, so we might need to re-index first..
if prev_r.is_empty():
await tractor.pause()
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
# gap_w: float = abs((iend - istart))
# if gap_w < 6:
# margin: float = 6
# iend += margin
# istart -= margin
rect_gap: float = BGM*3/8
opn: float = row['open'][0]
ro: tuple[float, float] = (
# dt_end_t,
iend + rect_gap + 1,
opn,
)
cls: float = prev_r['close'][0]
lc: tuple[float, float] = (
# dt_start_t,
istart - rect_gap, # + 1 ,
cls,
)
color: str = 'dad_blue'
diff: float = cls - opn
sgn: float = copysign(1, diff)
color: str = {
-1: 'buy_green',
1: 'sell_red',
}[sgn]
rect_kwargs: dict[str, Any] = dict(
fqme=fqme,
timeframe=timeframe,
start_pos=lc,
end_pos=ro,
color=color,
)
aid: int = await actl.add_rect(**rect_kwargs)
assert aid
aids[aid] = rect_kwargs
# tell chart to redraw all its
# graphics view layers Bo
await actl.redraw(
fqme=fqme,
timeframe=timeframe,
)
return aids
@store.command()
def ldshm(
fqme: str,
write_parquet: bool = True,
reload_parquet_to_shm: bool = True,
) -> None:
'''
Linux ONLY: load any fqme file name matching shm buffer from
/dev/shm/ into an OHLCV numpy array and polars DataFrame,
optionally write to offline storage via `.parquet` file.
'''
async def main():
from piker.ui._remote_ctl import (
open_annot_ctl,
)
actl: AnnotCtl
mod: ModuleType
client: StorageClient
async with (
open_piker_runtime(
'polars_boi',
enable_modules=['piker.data._sharedmem'],
debug_mode=True,
),
open_storage_client() as (
mod,
client,
),
open_annot_ctl() as actl,
):
shm_df: pl.DataFrame | None = None
tf2aids: dict[float, dict] = {}
for (
shmfile,
shm,
# parquet_path,
shm_df,
) in tsp.iter_dfs_from_shms(fqme):
times: np.ndarray = shm.array['time']
d1: float = float(times[-1] - times[-2])
d2: float = float(times[-2] - times[-3])
med: float = np.median(np.diff(times))
if (
d1 < 1.
and d2 < 1.
and med < 1.
):
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
period_s: float = float(max(d1, d2, med))
null_segs: tuple = tsp.get_null_segs(
frame=shm.array,
period=period_s,
)
# TODO: call null-seg fixer somehow?
if null_segs:
await tractor.pause()
# async with (
# trio.open_nursery() as tn,
# mod.open_history_client(
# mkt,
# ) as (get_hist, config),
# ):
# nulls_detected: trio.Event = await tn.start(partial(
# tsp.maybe_fill_null_segments,
# shm=shm,
# timeframe=timeframe,
# get_hist=get_hist,
# sampler_stream=sampler_stream,
# mkt=mkt,
# ))
# over-write back to shm?
wdts: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
wdts,
deduped,
diff,
) = tsp.dedupe(
shm_df,
period=period_s,
)
# detect gaps from in expected (uniform OHLC) sample period
step_gaps: pl.DataFrame = tsp.detect_time_gaps(
deduped,
expect_period=period_s,
)
# TODO: by default we always want to mark these up
# with rects showing up/down gaps Bo
venue_gaps: pl.DataFrame = tsp.detect_time_gaps(
deduped,
expect_period=period_s,
# TODO: actually pull the exact duration
# expected for each venue operational period?
gap_dt_unit='days',
gap_thresh=1,
)
# TODO: find the disjoint set of step gaps from
# venue (closure) set!
# -[ ] do a set diff by checking for the unique
# gap set only in the step_gaps?
if (
not venue_gaps.is_empty()
or (
period_s < 60
and not step_gaps.is_empty()
)
):
# write repaired ts to parquet-file?
if write_parquet:
start: float = time.time()
path: Path = await client.write_ohlcv(
fqme,
ohlcv=deduped,
timeframe=period_s,
)
write_delay: float = round(
time.time() - start,
ndigits=6,
)
# read back from fs
start: float = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
read_delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {write_delay} secs\n'
f'file path: {path}'
f'parquet read took {read_delay} secs\n'
f'polars df: {read_df}'
)
if reload_parquet_to_shm:
new = tsp.pl2np(
deduped,
dtype=shm.array.dtype,
)
# since normally readonly
shm._array.setflags(
write=int(1),
)
shm.push(
new,
prepend=True,
start=new['index'][-1],
update_first=False, # don't update ._first
)
do_markup_gaps: bool = True
if do_markup_gaps:
new_df: pl.DataFrame = tsp.np2pl(new)
aids: dict = await markup_gaps(
fqme,
period_s,
actl,
new_df,
step_gaps,
)
# last chance manual overwrites in REPL
# await tractor.pause()
assert aids
tf2aids[period_s] = aids
else:
# allow interaction even when no ts problems.
assert not diff
await tractor.pause()
log.info('Exiting TSP shm anal-izer!')
if shm_df is None:
log.error(
f'No matching shm buffers for {fqme} ?'
)
trio.run(main)
typer_click_object = typer.main.get_command(store)
cli.add_command(typer_click_object, 'store')