# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
Storage middle-ware CLIs.

"""
from __future__ import annotations
from pathlib import Path
import time
from typing import Generator
# from typing import TYPE_CHECKING

import polars as pl
import numpy as np
import tractor
# import pendulum
from rich.console import Console
import trio
# from rich.markdown import Markdown
import typer

from piker.service import open_piker_runtime
from piker.cli import cli
from piker.config import get_conf_dir
from piker.data import (
    maybe_open_shm_array,
    def_iohlcv_fields,
    ShmArray,
)
from piker.data.history import (
    _default_hist_size,
    _default_rt_size,
)
from . import (
    log,
    __tsdbs__,
    open_storage_client,
)


store = typer.Typer()


@store.command()
def ls(
    backends: list[str] = typer.Argument(
        default=None,
        help='Storage backends to query, default is all.'
    ),
):
    from rich.table import Table

    if not backends:
        backends: list[str] = __tsdbs__

    console = Console()

    async def query_all():
        nonlocal backends

        async with (
            open_piker_runtime(
                'tsdb_storage',
                enable_modules=['piker.service._ahab'],
            ),
        ):
            for i, backend in enumerate(backends):
                table = Table()
                try:
                    async with open_storage_client(backend=backend) as (
                        mod,
                        client,
                    ):
                        table.add_column(f'{mod.name}@{client.address}')
                        keys: list[str] = await client.list_keys()
                        for key in keys:
                            table.add_row(key)

                    console.print(table)
                except Exception:
                    log.error(f'Unable to connect to storage engine: `{backend}`')

    trio.run(query_all)
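
# CLI usage sketch (hedged: assumes the standard `piker` entrypoint
# which registers this app as the `store` sub-command at module
# bottom; `<backend>` is a placeholder, not a real backend name):
#
#   piker store ls              # query all detected backends
#   piker store ls <backend>    # query only the named backend(s)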


@store.command()
def delete(
    symbols: list[str],

    backend: str = typer.Option(
        default=None,
        help='Storage backend to update'
    ),
    # TODO: expose this as flagged multi-option?
    timeframes: list[int] = [1, 60],
):
    '''
    Delete a storage backend's time series for (table) keys provided as
    ``symbols``.

    '''
    from . import open_storage_client

    async def main(symbols: list[str]):
        async with (
            open_piker_runtime(
                'tsdb_storage',
                enable_modules=['piker.service._ahab']
            ),
            open_storage_client(backend) as (_, client),
            trio.open_nursery() as n,
        ):
            # spawn queries as tasks for max conc!
            for fqme in symbols:
                for tf in timeframes:
                    n.start_soon(
                        client.delete_ts,
                        fqme,
                        tf,
                    )

    trio.run(main, symbols)
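
# CLI usage sketch (hedged: option spelling follows typer's default
# kebab-casing; `<fqme>`/`<backend>` are placeholders):
#
#   piker store delete <fqme> [<fqme> ...] --backend <backend>
#
# which by default removes both the 1s and 60s series for each key.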


@store.command()
def anal(
    fqme: str,
    period: int = 60,

) -> None:

    async def main():
        async with (
            open_piker_runtime(
                'tsdb_polars_anal',
                # enable_modules=['piker.service._ahab']
            ),
            open_storage_client() as (mod, client),
        ):
            syms: list[str] = await client.list_keys()
            print(f'{len(syms)} FOUND for {mod.name}')

            (
                history,
                first_dt,
                last_dt,
            ) = await client.load(
                fqme,
                period,
            )
            assert first_dt < last_dt

            src_df = await client.as_df(fqme, period)
            from piker.data import _timeseries as tsmod
            df = tsmod.with_dts(src_df)
            gaps: pl.DataFrame = tsmod.detect_time_gaps(df)

            # NOTE: polars frames don't implement truthiness, so
            # check for (non-)emptiness explicitly.
            if not gaps.is_empty():
                print(f'Gaps found:\n{gaps}')

            # TODO: something better with tab completion..
            # is there something more minimal but nearly as
            # functional as ipython?
            await tractor.breakpoint()

    trio.run(main)
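
# CLI usage sketch (hedged: `--period` follows typer's default option
# naming for a defaulted parameter; `<fqme>` is a placeholder):
#
#   piker store anal <fqme> --period 60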


def iter_dfs_from_shms(fqme: str) -> Generator[
    tuple[Path, ShmArray, pl.DataFrame],
    None,
    None,
]:
    # shm buffer size table based on known sample rates
    sizes: dict[str, int] = {
        'hist': _default_hist_size,
        'rt': _default_rt_size,
    }

    # load all detected shm buffer files which have the
    # passed FQME pattern in the file name.
    shmfiles: list[Path] = []
    shmdir = Path('/dev/shm/')

    for shmfile in shmdir.glob(f'*{fqme}*'):
        filename: str = shmfile.name

        # skip index files
        if (
            '_first' in filename
            or '_last' in filename
        ):
            continue

        assert shmfile.is_file()
        log.debug(f'Found matching shm buffer file: {filename}')
        shmfiles.append(shmfile)

    for shmfile in shmfiles:

        # lookup array buffer size based on file suffix
        # being either .rt or .hist
        size: int = sizes[shmfile.name.rsplit('.')[-1]]

        # attach to any shm buffer, load array into polars df,
        # write to local parquet file.
        shm, opened = maybe_open_shm_array(
            key=shmfile.name,
            size=size,
            dtype=def_iohlcv_fields,
            readonly=True,
        )
        assert not opened
        ohlcv = shm.array

        start = time.time()

        # XXX: thanks to this SO answer for this conversion tip:
        # https://stackoverflow.com/a/72054819
        df = pl.DataFrame({
            field_name: ohlcv[field_name]
            for field_name in ohlcv.dtype.fields
        })
        delay: float = round(
            time.time() - start,
            ndigits=6,
        )
        log.info(
            f'numpy -> polars conversion took {delay} secs\n'
            f'polars df: {df}'
        )

        yield (
            shmfile,
            shm,
            df,
        )
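
# Usage sketch for the generator above (hedged: 'mnq.cme.ib' is a
# hypothetical fqme; any substring matching your shm file names works):
#
#   frames: dict[str, pl.DataFrame] = {
#       shmfile.name: df
#       for shmfile, _shm, df in iter_dfs_from_shms('mnq.cme.ib')
#   }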


@store.command()
def ldshm(
    fqme: str,

    write_parquet: bool = False,

) -> None:
    '''
    Linux ONLY: load any fqme file name matching shm buffer from
    /dev/shm/ into an OHLCV numpy array and polars DataFrame,
    optionally write to .parquet file.

    '''
    async def main():
        async with (
            open_piker_runtime(
                'polars_boi',
                enable_modules=['piker.data._sharedmem'],
            ),
        ):

            df: pl.DataFrame | None = None
            for shmfile, shm, df in iter_dfs_from_shms(fqme):

                # compute ohlc properties for naming
                times: np.ndarray = shm.array['time']
                secs: float = times[-1] - times[-2]
                if secs < 1.:
                    breakpoint()
                    raise ValueError(
                        f'Something is wrong with time period for {shm}:\n{times}'
                    )

                # TODO: maybe only optionally enter this depending
                # on some CLI flags and/or gap detection?
                await tractor.breakpoint()

                # write to parquet file?
                if write_parquet:
                    timeframe: str = f'{secs}s'

                    datadir: Path = get_conf_dir() / 'nativedb'
                    if not datadir.is_dir():
                        datadir.mkdir()

                    path: Path = datadir / f'{fqme}.{timeframe}.parquet'

                    # write to fs
                    start = time.time()
                    df.write_parquet(path)
                    delay: float = round(
                        time.time() - start,
                        ndigits=6,
                    )
                    log.info(
                        f'parquet write took {delay} secs\n'
                        f'file path: {path}'
                    )

                    # read back from fs
                    start = time.time()
                    read_df: pl.DataFrame = pl.read_parquet(path)
                    delay: float = round(
                        time.time() - start,
                        ndigits=6,
                    )
                    print(
                        f'parquet read took {delay} secs\n'
                        f'polars df: {read_df}'
                    )

            if df is None:
                log.error(f'No matching shm buffers for {fqme} ?')

    trio.run(main)
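
# CLI usage sketch (hedged: typer renders the defaulted bool param as
# a `--write-parquet` flag by default; `<fqme>` is a placeholder):
#
#   piker store ldshm <fqme> --write-parquet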


typer_click_object = typer.main.get_command(store)
cli.add_command(typer_click_object, 'store')