diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index 352db2cd..3afb696a 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -19,13 +19,18 @@ Storage middle-ware CLIs.
 
 """
 from __future__ import annotations
+from pathlib import Path
 from typing import TYPE_CHECKING
-import trio
+
+import numpy as np
+import pendulum
 from rich.console import Console
+import trio
 # from rich.markdown import Markdown
 import typer
 
-from ..cli import cli
+from piker.service import open_piker_runtime
+from piker.cli import cli
 from . import (
     log,
 )
@@ -44,7 +49,7 @@ def ls(
         help='Storage backends to query, default is all.'
     ),
 ):
-    from piker.service import open_piker_runtime
+    # from piker.service import open_piker_runtime
     from . import (
         __tsdbs__,
         open_storage_client,
@@ -132,7 +137,6 @@ def delete(
     ``symbols``.
 
     '''
-    from piker.service import open_piker_runtime
     from . import open_storage_client
 
     async def main(symbols: list[str]):
@@ -157,5 +161,253 @@ def delete(
     trio.run(main, symbols)
 
 
+def _next_query_end(
+    array: np.ndarray,
+    end: float | None,
+) -> float | None:
+    '''
+    Pick the ``end`` epoch for the next (older) history query.
+
+    Returns the first ``'Epoch'`` value of ``array`` as a float, or
+    ``None`` when paging should stop: ``array`` is empty, that epoch
+    is ``0``, or it is not strictly less than the previous ``end``.
+
+    '''
+    if not len(array):
+        return None
+
+    # NOTE(review): presumes rows are time-ascending such that index
+    # 0 holds the oldest datum in the frame - confirm against tsdb.
+    oldest: float = float(array['Epoch'][0])
+
+    # XXX: a frame that doesn't reach further back than the last one
+    # would have the caller re-issue the same query forever.
+    if (
+        oldest == 0
+        or (end is not None and oldest >= end)
+    ):
+        return None
+
+    return oldest
+
+
+@store.command()
+def read(
+    fqme: str,
+
+    limit: int = int(800e3),
+    client_type: str = 'async',
+
+) -> None:
+    '''
+    Page backwards through the ``1Min`` ``OHLCV`` history stored for
+    ``fqme`` in marketstore, printing each frame's start time.
+
+    Every query requests at most ``limit`` rows ending at the first
+    epoch of the previous frame; paging stops once a frame is empty
+    or no longer reaches further back in time.
+
+    ``client_type='sync'`` uses ``pymarketstore`` and any other value
+    uses ``anyio_marketstore`` against ``localhost:5995``.
+
+    '''
+    if client_type == 'sync':
+        import pymarketstore as pymkts
+
+        # NOTE: not named ``cli`` so the module-level command group
+        # imported from ``piker.cli`` isn't shadowed.
+        client = pymkts.Client()
+        end: float | None = None
+
+        while True:
+            param = pymkts.Params(
+                fqme,
+                '1Min',
+                'OHLCV',
+                limit=limit,
+                # limit_from_start=True,
+                end=end,
+            )
+            reply = client.query(param)
+            ds: pymkts.results.DataSet = reply.first()
+            array: np.ndarray = ds.array
+
+            print(f'loaded {len(array)}-len array:\n{array}')
+
+            end = _next_query_end(array, end)
+            if end is None:
+                break
+
+            dt = pendulum.from_timestamp(end)
+            print(
+                f'trying to load next {limit} datums frame starting @ {dt}'
+            )
+    else:
+        from anyio_marketstore import (  # noqa
+            open_marketstore_client,
+            MarketstoreClient,
+            Params,
+        )
+
+        async def main():
+
+            end: float | None = None
+
+            async with open_marketstore_client(
+                'localhost',
+                5995,
+            ) as client:
+
+                while True:
+                    params = Params(
+                        symbols=fqme,
+                        # timeframe=tfstr,
+                        timeframe='1Min',
+                        attrgroup='OHLCV',
+                        end=end,
+                        # limit_from_start=True,
+
+                        # TODO: figure the max limit here given the
+                        # msg size limit of ``purerpc``: 33554432
+                        limit=limit,
+                    )
+                    result = await client.query(params)
+                    data_set = result.by_symbols()[fqme]
+                    array = data_set.array
+
+                    end = _next_query_end(array, end)
+                    if end is None:
+                        break
+
+                    dt = pendulum.from_timestamp(end)
+                    print(
+                        f'trying to load next {limit} datums frame starting @ {dt}'
+                    )
+
+        trio.run(main)
+
+
+@store.command()
+def clone(
+    fqme: str,
+    shm_key: str = 'piker.brokerd[d07c9bb7-b720-41].tlosusdt.kucoin.hist',
+) -> None:
+    '''
+    Copy the OHLCV shm buffer named ``shm_key`` into a parquet file.
+
+    The buffer is loaded into a ``polars.DataFrame``, written to
+    ``<conf dir>/parqdb/<fqme>.<timeframe>.parquet`` and read back,
+    with the duration of each step printed.
+
+    ``shm_key`` defaults to the kucoin ``tlosusdt`` history buffer
+    this command was originally hard-coded against.
+
+    Raises ``FileNotFoundError`` if no such buffer exists under
+    ``/dev/shm`` and ``ValueError`` if the sample period can't be
+    derived from the last two datums.
+
+    '''
+    import time
+    from piker.config import get_conf_dir
+    from piker.data import (
+        maybe_open_shm_array,
+        def_iohlcv_fields,
+    )
+    import polars as pl
+
+    # fail early (and not via ``assert`` which ``-O`` strips) when
+    # there is no existing buffer to attach to.
+    shmpath: Path = Path('/dev/shm') / shm_key
+    if not shmpath.is_file():
+        raise FileNotFoundError(
+            f'No shm buffer found @ {shmpath}'
+        )
+
+    async def main():
+        async with (
+            open_piker_runtime(
+                'polars_boi',
+                enable_modules=['piker.data._sharedmem'],
+            ),
+        ):
+            # attach to any shm buffer, load array into polars df,
+            # write to local parquet file.
+            shm, opened = maybe_open_shm_array(
+                key=shm_key,
+                dtype=def_iohlcv_fields,
+            )
+            if opened:
+                raise RuntimeError(
+                    f'Expected to attach to existing shm buffer {shm_key!r} '
+                    'but `maybe_open_shm_array()` reported `opened=True`'
+                )
+            ohlcv = shm.array
+
+            start = time.perf_counter()
+
+            # XXX: thanks to this SO answer for this conversion tip:
+            # https://stackoverflow.com/a/72054819
+            df = pl.DataFrame({
+                field_name: ohlcv[field_name]
+                for field_name in ohlcv.dtype.fields
+            })
+            delay: float = round(
+                time.perf_counter() - start,
+                ndigits=6,
+            )
+            print(
+                f'numpy -> polars conversion took {delay} secs\n'
+                f'polars df: {df}'
+            )
+
+            # compute ohlc properties for naming
+            times: np.ndarray = ohlcv['time']
+            if len(times) < 2:
+                raise ValueError(
+                    'Need at least 2 datums to compute the time period '
+                    f'for {shm}:\n{ohlcv}'
+                )
+
+            secs: float = times[-1] - times[-2]
+            if secs < 1.:
+                raise ValueError(
+                    f'Something is wrong with time period for {shm}:\n{ohlcv}'
+                )
+
+            timeframe: str = f'{secs}s'
+
+            # write to parquet file
+            datadir: Path = get_conf_dir() / 'parqdb'
+            datadir.mkdir(parents=True, exist_ok=True)
+
+            path: Path = datadir / f'{fqme}.{timeframe}.parquet'
+
+            # write to fs
+            start = time.perf_counter()
+            df.write_parquet(path)
+            delay = round(
+                time.perf_counter() - start,
+                ndigits=6,
+            )
+            print(
+                f'parquet write took {delay} secs\n'
+                f'file path: {path}'
+            )
+
+            # read back from fs
+            start = time.perf_counter()
+            read_df: pl.DataFrame = pl.read_parquet(path)
+            delay = round(
+                time.perf_counter() - start,
+                ndigits=6,
+            )
+            print(
+                f'parquet read took {delay} secs\n'
+                f'polars df: {read_df}'
+            )
+
+    trio.run(main)
+
+
 typer_click_object = typer.main.get_command(store)
 cli.add_command(typer_click_object, 'store')