diff --git a/piker/storage/cli.py b/piker/storage/cli.py index 1d998558..8c6d67ea 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -20,8 +20,12 @@ Storage middle-ware CLIs. """ from __future__ import annotations # from datetime import datetime +# from contextlib import ( +# AsyncExitStack, +# ) from pathlib import Path import time +from types import ModuleType import polars as pl import numpy as np @@ -34,7 +38,6 @@ import typer from piker.service import open_piker_runtime from piker.cli import cli -from piker.config import get_conf_dir from piker.data import ( ShmArray, ) @@ -45,6 +48,7 @@ from . import ( from . import ( __tsdbs__, open_storage_client, + StorageClient, ) @@ -232,7 +236,8 @@ def anal( @store.command() def ldshm( fqme: str, - write_parquet: bool = False, + write_parquet: bool = True, + reload_parquet_to_shm: bool = True, ) -> None: ''' @@ -242,15 +247,32 @@ def ldshm( ''' async def main(): + from piker.ui._remote_ctl import ( + open_annot_ctl, + AnnotCtl, + ) + actl: AnnotCtl + mod: ModuleType + client: StorageClient async with ( open_piker_runtime( 'polars_boi', enable_modules=['piker.data._sharedmem'], debug_mode=True, ), + open_storage_client() as ( + mod, + client, + ), + open_annot_ctl() as actl, ): - df: pl.DataFrame | None = None - for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme): + shm_df: pl.DataFrame | None = None + for ( + shmfile, + shm, + # parquet_path, + shm_df, + ) in tsp.iter_dfs_from_shms(fqme): # compute ohlc properties for naming times: np.ndarray = shm.array['time'] @@ -275,122 +297,136 @@ def ldshm( period=period_s, ) - # TODO: maybe only optionally enter this depending - # on some CLI flags and/or gap detection? - if ( + needs_correction: bool = ( not gaps.is_empty() or null_segs - ): - from piker.ui._remote_ctl import ( - open_annot_ctl, - AnnotCtl, - ) - annot_ctl: AnnotCtl - async with open_annot_ctl() as annot_ctl: - for i in range(gaps.height): + ) + # TODO: maybe only optionally enter this depending + # on some CLI flags and/or gap detection? + if needs_correction: + for i in range(gaps.height): + row: pl.DataFrame = gaps[i] - row: pl.DataFrame = gaps[i] + # TODO: can we eventually remove this + # once we figure out why the epoch cols + # don't match? + iend: int = row['index'][0] + # dt: datetime = row['dt'][0] + # dt_prev: datetime = row['dt_prev'][0] - # TODO: can we eventually remove this - # once we figure out why the epoch cols - # don't match? - iend: int = row['index'][0] - # dt: datetime = row['dt'][0] - # dt_prev: datetime = row['dt_prev'][0] + # the gap's right-most bar's OPEN value + # at that time (sample) step. + # dt_end_t: float = dt.timestamp() - # the gap's right-most bar's OPEN value - # at that time (sample) step. - # dt_end_t: float = dt.timestamp() + # TODO: FIX HOW/WHY these aren't matching + # and are instead off by 4hours (EST + # vs. UTC?!?!) + # end_t: float = row['time'] + # assert ( + # dt.timestamp() + # == + # end_t + # ) - # TODO: FIX HOW/WHY these aren't matching - # and are instead off by 4hours (EST - # vs. UTC?!?!) - # end_t: float = row['time'] - # assert ( - # dt.timestamp() - # == - # end_t - # ) + # the gap's left-most bar's CLOSE value + # at that time (sample) step. + prev_r: pl.DataFrame = df.filter( + pl.col('index') == iend - 1 + ) + istart: int = prev_r['index'][0] + # dt_start_t: float = dt_prev.timestamp() - # the gap's left-most bar's CLOSE value - # at that time (sample) step. + # start_t: float = prev_r['time'] + # assert ( + # dt_start_t + # == + # start_t + # ) - prev_r: pl.DataFrame = df.filter( - pl.col('index') == gaps[0]['index'] - 1 + # TODO: implement px-col width measure + # and ensure at least as many px-cols + # shown per rect as configured by user. + gap_w: float = abs((iend - istart)) + if gap_w < 6: + margin: float = 6 + iend += margin + istart -= margin + + ro: tuple[float, float] = ( + # dt_end_t, + iend, + row['open'][0], + ) + lc: tuple[float, float] = ( + # dt_start_t, + istart, + prev_r['close'][0], + ) + + # async with actl.open_rect( + # ) as aid: + aid: int = await actl.add_rect( + fqme=fqme, + timeframe=period_s, + start_pos=lc, + end_pos=ro, + ) + assert aid + + # write to parquet file? + if ( + write_parquet + ): + # write to fs + start = time.time() + path: Path = await client.write_ohlcv( + fqme, + ohlcv=deduped, + timeframe=period_s, + ) + write_delay: float = round( + time.time() - start, + ndigits=6, + ) + + # read back from fs + start = time.time() + read_df: pl.DataFrame = pl.read_parquet(path) + read_delay: float = round( + time.time() - start, + ndigits=6, + ) + log.info( + f'parquet write took {write_delay} secs\n' + f'file path: {path}' + f'parquet read took {read_delay} secs\n' + f'polars df: {read_df}' + ) + + if reload_parquet_to_shm: + new = tsp.pl2np( + deduped, + dtype=shm.array.dtype, ) - istart: int = prev_r['index'][0] - # dt_start_t: float = dt_prev.timestamp() - - # start_t: float = prev_r['time'] - # assert ( - # dt_start_t - # == - # start_t - # ) - - # TODO: implement px-col width measure - # and ensure at least as many px-cols - # shown per rect as configured by user. - gap_w: float = abs((iend - istart)) - # await tractor.pause() - if gap_w < 6: - margin: float = 6 - iend += margin - istart -= margin - - ro: tuple[float, float] = ( - # dt_end_t, - iend, - row['open'][0], + # since normally readonly + shm._array.setflags( + write=int(1), ) - lc: tuple[float, float] = ( - # dt_start_t, - istart, - prev_r['close'][0], + shm.push( + new, + prepend=True, + start=new['index'][-1], + update_first=False, # don't update ._first ) - aid: int = await annot_ctl.add_rect( - fqme=fqme, - timeframe=period_s, - start_pos=lc, - end_pos=ro, - ) - assert aid - await tractor.pause() + await tractor.pause() + assert diff - # write to parquet file? - if write_parquet: - timeframe: str = f'{period_s}s' + else: + # allow interaction even when no ts problems. + await tractor.pause() + assert not diff - datadir: Path = get_conf_dir() / 'nativedb' - if not datadir.is_dir(): - datadir.mkdir() - - path: Path = datadir / f'{fqme}.{timeframe}.parquet' - - # write to fs - start = time.time() - df.write_parquet(path) - delay: float = round( - time.time() - start, - ndigits=6, - ) - log.info( - f'parquet write took {delay} secs\n' - f'file path: {path}' - ) - - # read back from fs - start = time.time() - read_df: pl.DataFrame = pl.read_parquet(path) - delay: float = round( - time.time() - start, - ndigits=6, - ) - print( - f'parquet read took {delay} secs\n' - f'polars df: {read_df}' - ) if df is None: log.error(f'No matching shm buffers for {fqme} ?') diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py index 7d64cb6e..bc7f10e3 100644 --- a/piker/storage/nativedb.py +++ b/piker/storage/nativedb.py @@ -95,16 +95,19 @@ def detect_period(shm: ShmArray) -> float: def mk_ohlcv_shm_keyed_filepath( fqme: str, - period: float, # ow known as the "timeframe" + period: float | int, # ow known as the "timeframe" datadir: Path, -) -> str: +) -> Path: if period < 1.: raise ValueError('Sample period should be >= 1.!?') - period_s: str = f'{period}s' - path: Path = datadir / f'{fqme}.ohlcv{period_s}.parquet' + path: Path = ( + datadir + / + f'{fqme}.ohlcv{int(period)}s.parquet' + ) return path @@ -227,6 +230,7 @@ class NativeStorageClient: self, fqme: str, period: float, + ) -> Path: return mk_ohlcv_shm_keyed_filepath( fqme=fqme, @@ -239,6 +243,7 @@ class NativeStorageClient: fqme: str, df: pl.DataFrame, timeframe: float, + ) -> None: # cache df for later usage since we (currently) need to # convert to np.ndarrays to push to our `ShmArray` rt