Add `store ldshm` subcmd

Changed from the old `store clone` to instead simply load any shm buffer
matching a user provided `FQME: str` pattern; writing to parquet file is
only done if an explicit option flag is passed by user.

Implement new `iter_dfs_from_shms()` generator which allows interatively
loading both 1m and 1s buffers delivering the `Path`, `ShmArray` and
`polars.DataFrame` instances per matching file B)

Also add a todo for a `NativeStorageClient.clear_range()` method.
basic_buy_bot
Tyler Goodlet 2023-06-19 14:29:05 -04:00
parent 58c096bfad
commit d704d631ba
2 changed files with 164 additions and 77 deletions

View File

@ -20,10 +20,13 @@ Storage middle-ware CLIs.
"""
from __future__ import annotations
from pathlib import Path
import time
from typing import Generator
# from typing import TYPE_CHECKING
import polars as pl
import numpy as np
import tractor
# import pendulum
from rich.console import Console
import trio
@ -32,6 +35,16 @@ import typer
from piker.service import open_piker_runtime
from piker.cli import cli
from piker.config import get_conf_dir
from piker.data import (
maybe_open_shm_array,
def_iohlcv_fields,
ShmArray,
)
from piker.data.history import (
_default_hist_size,
_default_rt_size,
)
from . import (
log,
)
@ -132,8 +145,6 @@ def anal(
) -> np.ndarray:
import tractor
async def main():
async with (
open_piker_runtime(
@ -171,25 +182,90 @@ def anal(
trio.run(main)
def iter_dfs_from_shms(fqme: str) -> Generator[
tuple[Path, ShmArray, pl.DataFrame],
None,
None,
]:
# shm buffer size table based on known sample rates
sizes: dict[str, int] = {
'hist': _default_hist_size,
'rt': _default_rt_size,
}
# load all detected shm buffer files which have the
# passed FQME pattern in the file name.
shmfiles: list[Path] = []
shmdir = Path('/dev/shm/')
for shmfile in shmdir.glob(f'*{fqme}*'):
filename: str = shmfile.name
# skip index files
if (
'_first' in filename
or '_last' in filename
):
continue
assert shmfile.is_file()
log.debug(f'Found matching shm buffer file: {filename}')
shmfiles.append(shmfile)
for shmfile in shmfiles:
# lookup array buffer size based on file suffix
# being either .rt or .hist
size: int = sizes[shmfile.name.rsplit('.')[-1]]
# attach to any shm buffer, load array into polars df,
# write to local parquet file.
shm, opened = maybe_open_shm_array(
key=shmfile.name,
size=size,
dtype=def_iohlcv_fields,
readonly=True,
)
assert not opened
ohlcv = shm.array
start = time.time()
# XXX: thanks to this SO answer for this conversion tip:
# https://stackoverflow.com/a/72054819
df = pl.DataFrame({
field_name: ohlcv[field_name]
for field_name in ohlcv.dtype.fields
})
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'numpy -> polars conversion took {delay} secs\n'
f'polars df: {df}'
)
yield (
shmfile,
shm,
df,
)
@store.command()
def clone(
def ldshm(
fqme: str,
write_parquet: bool = False,
) -> None:
import time
from piker.config import get_conf_dir
from piker.data import (
maybe_open_shm_array,
def_iohlcv_fields,
)
import polars as pl
# TODO: actually look up an existing shm buf (set) from
# an fqme and file name parsing..
# open existing shm buffer for kucoin backend
key: str = 'piker.brokerd[3595d316-3c15-46].xmrusdt.kucoin.hist'
shmpath: Path = Path('/dev/shm') / key
assert shmpath.is_file()
'''
Linux ONLY: load any fqme file name matching shm buffer from
/dev/shm/ into an OHLCV numpy array and polars DataFrame,
optionally write to .parquet file.
'''
async def main():
async with (
open_piker_runtime(
@ -197,73 +273,59 @@ def clone(
enable_modules=['piker.data._sharedmem'],
),
):
# attach to any shm buffer, load array into polars df,
# write to local parquet file.
shm, opened = maybe_open_shm_array(
key=key,
dtype=def_iohlcv_fields,
)
assert not opened
ohlcv = shm.array
start = time.time()
df: pl.DataFrame | None = None
for shmfile, shm, df in iter_dfs_from_shms(fqme):
# XXX: thanks to this SO answer for this conversion tip:
# https://stackoverflow.com/a/72054819
df = pl.DataFrame({
field_name: ohlcv[field_name]
for field_name in ohlcv.dtype.fields
})
delay: float = round(
time.time() - start,
ndigits=6,
)
print(
f'numpy -> polars conversion took {delay} secs\n'
f'polars df: {df}'
)
# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
secs: float = times[-1] - times[-2]
if secs < 1.:
breakpoint()
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
# compute ohlc properties for naming
times: np.ndarray = ohlcv['time']
secs: float = times[-1] - times[-2]
if secs < 1.:
breakpoint()
raise ValueError(
f'Something is wrong with time period for {shm}:\n{ohlcv}'
)
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
await tractor.breakpoint()
timeframe: str = f'{secs}s'
# write to parquet file?
if write_parquet:
timeframe: str = f'{secs}s'
# write to parquet file
datadir: Path = get_conf_dir() / 'parqdb'
if not datadir.is_dir():
datadir.mkdir()
datadir: Path = get_conf_dir() / 'nativedb'
if not datadir.is_dir():
datadir.mkdir()
path: Path = datadir / f'{fqme}.{timeframe}.parquet'
path: Path = datadir / f'{fqme}.{timeframe}.parquet'
# write to fs
start = time.time()
df.write_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
print(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)
# write to fs
start = time.time()
df.write_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)
# read back from fs
start = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
print(
f'parquet read took {delay} secs\n'
f'polars df: {read_df}'
)
# read back from fs
start = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
print(
f'parquet read took {delay} secs\n'
f'polars df: {read_df}'
)
if df is None:
log.error(f'No matching shm buffers for {fqme} ?')
trio.run(main)

View File

@ -137,6 +137,14 @@ def mk_ohlcv_shm_keyed_filepath(
return path
def unpack_fqme_from_parquet_filepath(path: Path) -> str:
filename: str = str(path.name)
fqme, fmt_descr, suffix = filename.split('.')
assert suffix == 'parquet'
return fqme
ohlc_key_map = None
@ -347,10 +355,27 @@ class NativeStorageClient:
path.unlink()
log.warning(f'Deleting parquet entry:\n{path}')
else:
log.warning(f'No path exists:\n{path}')
log.error(f'No path exists:\n{path}')
return path
# TODO: allow wiping and refetching a segment of the OHLCV timeseries
# data.
# def clear_range(
# self,
# key: str,
# start_dt: datetime,
# end_dt: datetime,
# timeframe: int | None = None,
# ) -> pl.DataFrame:
# '''
# Clear and re-fetch a range of datums for the OHLCV time series.
# Useful for series editing from a chart B)
# '''
# ...
@acm
async def get_client(