diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index 8cb39440..73cf737e 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -20,10 +20,13 @@ Storage middle-ware CLIs.
 """
 from __future__ import annotations
 from pathlib import Path
+import time
+from typing import Generator
 # from typing import TYPE_CHECKING
 
 import polars as pl
 import numpy as np
+import tractor
 # import pendulum
 from rich.console import Console
 import trio
@@ -32,6 +35,16 @@ import typer
 
 from piker.service import open_piker_runtime
 from piker.cli import cli
+from piker.config import get_conf_dir
+from piker.data import (
+    maybe_open_shm_array,
+    def_iohlcv_fields,
+    ShmArray,
+)
+from piker.data.history import (
+    _default_hist_size,
+    _default_rt_size,
+)
 from . import (
     log,
 )
@@ -132,8 +145,6 @@ def anal(
 
 ) -> np.ndarray:
 
-    import tractor
-
     async def main():
         async with (
            open_piker_runtime(
@@ -171,25 +182,90 @@ def anal(
     trio.run(main)
 
 
+def iter_dfs_from_shms(fqme: str) -> Generator[
+    tuple[Path, ShmArray, pl.DataFrame],
+    None,
+    None,
+]:
+    # shm buffer size table based on known sample rates
+    sizes: dict[str, int] = {
+        'hist': _default_hist_size,
+        'rt': _default_rt_size,
+    }
+
+    # load all detected shm buffer files which have the
+    # passed FQME pattern in the file name.
+    shmfiles: list[Path] = []
+    shmdir = Path('/dev/shm/')
+
+    for shmfile in shmdir.glob(f'*{fqme}*'):
+        filename: str = shmfile.name
+
+        # skip index files
+        if (
+            '_first' in filename
+            or '_last' in filename
+        ):
+            continue
+
+        assert shmfile.is_file()
+        log.debug(f'Found matching shm buffer file: {filename}')
+        shmfiles.append(shmfile)
+
+    for shmfile in shmfiles:
+
+        # lookup array buffer size based on file suffix
+        # being either .rt or .hist
+        size: int = sizes[shmfile.name.rsplit('.')[-1]]
+
+        # attach to any shm buffer, load array into polars df,
+        # write to local parquet file.
+        shm, opened = maybe_open_shm_array(
+            key=shmfile.name,
+            size=size,
+            dtype=def_iohlcv_fields,
+            readonly=True,
+        )
+        assert not opened
+        ohlcv = shm.array
+
+        start = time.time()
+
+        # XXX: thanks to this SO answer for this conversion tip:
+        # https://stackoverflow.com/a/72054819
+        df = pl.DataFrame({
+            field_name: ohlcv[field_name]
+            for field_name in ohlcv.dtype.fields
+        })
+        delay: float = round(
+            time.time() - start,
+            ndigits=6,
+        )
+        log.info(
+            f'numpy -> polars conversion took {delay} secs\n'
+            f'polars df: {df}'
+        )
+
+        yield (
+            shmfile,
+            shm,
+            df,
+        )
+
+
 @store.command()
-def clone(
+def ldshm(
     fqme: str,
+
+    write_parquet: bool = False,
+
 ) -> None:
-    import time
-    from piker.config import get_conf_dir
-    from piker.data import (
-        maybe_open_shm_array,
-        def_iohlcv_fields,
-    )
-    import polars as pl
-
-    # TODO: actually look up an existing shm buf (set) from
-    # an fqme and file name parsing..
-    # open existing shm buffer for kucoin backend
-    key: str = 'piker.brokerd[3595d316-3c15-46].xmrusdt.kucoin.hist'
-    shmpath: Path = Path('/dev/shm') / key
-    assert shmpath.is_file()
+    '''
+    Linux ONLY: load each shm buffer matching the given fqme
+    pattern from /dev/shm/ into an OHLCV numpy array and polars
+    DataFrame, optionally writing each to a .parquet file.
+
+    '''
     async def main():
         async with (
             open_piker_runtime(
@@ -197,73 +273,59 @@ def clone(
                 enable_modules=['piker.data._sharedmem'],
             ),
         ):
-            # attach to any shm buffer, load array into polars df,
-            # write to local parquet file.
-            shm, opened = maybe_open_shm_array(
-                key=key,
-                dtype=def_iohlcv_fields,
-            )
-            assert not opened
-            ohlcv = shm.array
 
-            start = time.time()
+            df: pl.DataFrame | None = None
+            for shmfile, shm, df in iter_dfs_from_shms(fqme):
 
-            # XXX: thanks to this SO answer for this conversion tip:
-            # https://stackoverflow.com/a/72054819
-            df = pl.DataFrame({
-                field_name: ohlcv[field_name]
-                for field_name in ohlcv.dtype.fields
-            })
-            delay: float = round(
-                time.time() - start,
-                ndigits=6,
-            )
-            print(
-                f'numpy -> polars conversion took {delay} secs\n'
-                f'polars df: {df}'
-            )
+                # compute ohlc properties for naming
+                times: np.ndarray = shm.array['time']
+                secs: float = times[-1] - times[-2]
+                if secs < 1.:
+                    breakpoint()
+                    raise ValueError(
+                        f'Something is wrong with time period for {shm}:\n{times}'
+                    )
 
-            # compute ohlc properties for naming
-            times: np.ndarray = ohlcv['time']
-            secs: float = times[-1] - times[-2]
-            if secs < 1.:
-                breakpoint()
-                raise ValueError(
-                    f'Something is wrong with time period for {shm}:\n{ohlcv}'
-                )
+                # TODO: maybe only optionally enter this depending
+                # on some CLI flags and/or gap detection?
+                await tractor.breakpoint()
 
-            timeframe: str = f'{secs}s'
+                # write to parquet file?
+                if write_parquet:
+                    # int-cast to avoid a '.' from float formatting
+                    timeframe: str = f'{int(secs)}s'
 
-            # write to parquet file
-            datadir: Path = get_conf_dir() / 'parqdb'
-            if not datadir.is_dir():
-                datadir.mkdir()
+                    datadir: Path = get_conf_dir() / 'nativedb'
+                    if not datadir.is_dir():
+                        datadir.mkdir()
 
-            path: Path = datadir / f'{fqme}.{timeframe}.parquet'
+                    path: Path = datadir / f'{fqme}.{timeframe}.parquet'
 
-            # write to fs
-            start = time.time()
-            df.write_parquet(path)
-            delay: float = round(
-                time.time() - start,
-                ndigits=6,
-            )
-            print(
-                f'parquet write took {delay} secs\n'
-                f'file path: {path}'
-            )
+                    # write to fs
+                    start = time.time()
+                    df.write_parquet(path)
+                    delay: float = round(
+                        time.time() - start,
+                        ndigits=6,
+                    )
+                    log.info(
+                        f'parquet write took {delay} secs\n'
+                        f'file path: {path}'
+                    )
 
-            # read back from fs
-            start = time.time()
-            read_df: pl.DataFrame = pl.read_parquet(path)
-            delay: float = round(
-                time.time() - start,
-                ndigits=6,
-            )
-            print(
-                f'parquet read took {delay} secs\n'
-                f'polars df: {read_df}'
-            )
+                    # read back from fs
+                    start = time.time()
+                    read_df: pl.DataFrame = pl.read_parquet(path)
+                    delay: float = round(
+                        time.time() - start,
+                        ndigits=6,
+                    )
+                    log.info(
+                        f'parquet read took {delay} secs\n'
+                        f'polars df: {read_df}'
+                    )
+
+            if df is None:
+                log.error(f'No matching shm buffers for {fqme}?')
 
     trio.run(main)
 
diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py
index 9561d4e9..ff914245 100644
--- a/piker/storage/nativedb.py
+++ b/piker/storage/nativedb.py
@@ -137,6 +137,14 @@ def mk_ohlcv_shm_keyed_filepath(
     return path
 
 
+def unpack_fqme_from_parquet_filepath(path: Path) -> str:
+
+    filename: str = str(path.name)
+    fqme, fmt_descr, suffix = filename.rsplit('.', maxsplit=2)  # fqme has '.'s
+    assert suffix == 'parquet'
+    return fqme
+
+
 ohlc_key_map = None
 
 
@@ -347,10 +355,27 @@ class NativeStorageClient:
             path.unlink()
             log.warning(f'Deleting parquet entry:\n{path}')
         else:
-            log.warning(f'No path exists:\n{path}')
+            log.error(f'No path exists:\n{path}')
 
         return path
 
+    # TODO: allow wiping and refetching a segment of the OHLCV timeseries
+    # data.
+    # def clear_range(
+    #     self,
+    #     key: str,
+    #     start_dt: datetime,
+    #     end_dt: datetime,
+    #     timeframe: int | None = None,
+    # ) -> pl.DataFrame:
+    #     '''
+    #     Clear and re-fetch a range of datums for the OHLCV time series.
+
+    #     Useful for series editing from a chart B)
+
+    #     '''
+    #     ...
+
 
 @acm
 async def get_client(
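
Usage note: given the @store.command() decorator, the renamed command should be reachable as something like `piker store ldshm <fqme> --write-parquet` (the sub-command path and the flag name typer derives from the write_parquet parameter are assumptions; the CLI wiring is not shown in this patch).

The iter_dfs_from_shms() generator can also be driven outside the CLI. A minimal sketch, assuming a Linux host with live piker shm buffers and that open_piker_runtime() accepts an arbitrary actor name as its first argument (elided at the call sites above); the actor name below is hypothetical and the fqme comes from the old hard-coded kucoin key:

    import trio

    from piker.service import open_piker_runtime
    from piker.storage.cli import iter_dfs_from_shms


    async def main() -> None:
        # shm attachment requires the piker/tractor runtime,
        # same as inside the ldshm command above.
        async with open_piker_runtime(
            'shm_df_dumper',
            enable_modules=['piker.data._sharedmem'],
        ):
            for shmfile, shm, df in iter_dfs_from_shms('xmrusdt.kucoin'):
                # each df carries the OHLCV columns per def_iohlcv_fields
                print(f'{shmfile.name}: {df.height} rows')
                print(df.select(['time', 'close']).tail(1))


    trio.run(main)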
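Relatedly, unpack_fqme_from_parquet_filepath() inverts the {fqme}.{timeframe}.parquet naming scheme (see mk_ohlcv_shm_keyed_filepath() in the hunk context above); the rsplit(maxsplit=2) parse is what keeps a dotted fqme such as 'xmrusdt.kucoin' intact. A quick round-trip sketch with a hypothetical path:

    from pathlib import Path

    from piker.storage.nativedb import unpack_fqme_from_parquet_filepath

    # expected file name shape: {fqme}.{timeframe}.parquet
    path = Path('/tmp') / 'xmrusdt.kucoin.60s.parquet'
    assert unpack_fqme_from_parquet_filepath(path) == 'xmrusdt.kucoin'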