From 9fd412f6313e1e2b2fb65317e209aeea7f0aeee2 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 6 Jun 2023 13:00:25 -0400
Subject: [PATCH] Add basic time-sampling gap detection via `polars`

For OHLCV time series we normally presume a uniform sampling period
(1s or 60s by default), so it's handy to have tools to ensure a series
is gapless or contains only expected gaps based on (legacy) market
hours. For this we leverage `polars`:
- add `.nativedb.with_dts()`, a datetime-from-epoch-time-column frame
  "column-expander" which inserts datetime-casted, epoch-diff and
  dt-diff columns.
- add `.nativedb.detect_time_gaps()` which filters to rows with a
  larger-than-expected sampling period.
- wrap the above (for now) in a `piker store anal` (analysis) cmd
  which currently always enters a breakpoint for tinkering.

Supporting storage client additions:
- add a `detect_period()` helper for extracting the expected OHLC time
  step.
- add new `NativedbStorageClient` methods and attrs to provide for the
  above:
  - `.mk_path()` to **only** deliver a parquet-file path for use in
    other methods.
  - `._dfs` to house cached `pl.DataFrame`s loaded from `.parquet`
    files.
  - `.as_df()` which loads cached frames or loads them from disk and
    then caches (for next use).
  - `_write_ohlcv()`, a private-sync version of the public equivalent
    method, since we don't currently have any actual async file IO
    underneath; add a flag for whether to return as a `numpy.ndarray`.
---
 piker/storage/cli.py      |  38 ++++-----
 piker/storage/nativedb.py | 161 +++++++++++++++++++++++++++++++++-----
 2 files changed, 162 insertions(+), 37 deletions(-)

diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index 5d7119e2..ae7393e4 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -20,10 +20,11 @@ Storage middle-ware CLIs.
 """
 from __future__ import annotations
 from pathlib import Path
-from typing import TYPE_CHECKING
+# from typing import TYPE_CHECKING
 
+import polars as pl
 import numpy as np
-import pendulum
+# import pendulum
 from rich.console import Console
 import trio
 # from rich.markdown import Markdown
@@ -34,9 +35,10 @@ from piker.cli import cli
 from . import (
     log,
 )
-
-if TYPE_CHECKING:
-    from . import Storage
+from . import (
+    __tsdbs__,
+    open_storage_client,
+)
 
 
 store = typer.Typer()
@@ -49,11 +51,6 @@ def ls(
         help='Storage backends to query, default is all.'
     ),
 ):
-    # from piker.service import open_piker_runtime
-    from . import (
-        __tsdbs__,
-        open_storage_client,
-    )
     from rich.table import Table
 
     if not backends:
@@ -129,21 +126,18 @@ def delete(
 
 
 @store.command()
-def read(
+def anal(
     fqme: str,
-
-    limit: int = int(800e3),
-    # client_type: str = 'async',
+    period: int = 60,
 
 ) -> np.ndarray:
-    # end: int | None = None
 
     # import tractor
-    from .nativedb import get_client
 
     async def main():
-        async with get_client() as client:
+        async with open_storage_client() as (mod, client):
             syms: list[str] = await client.list_keys()
+            print(f'{len(syms)} FOUND for {mod.name}')
 
             (
                 history,
@@ -151,10 +145,16 @@ def read(
                 last_dt,
             ) = await client.load(
                 fqme,
-                60,
+                period,
             )
             assert first_dt < last_dt
-            print(f'{fqme} SIZE -> {history.size}')
+
+            src_df = await client.as_df(fqme, period)
+            df = mod.with_dts(src_df)
+            gaps: pl.DataFrame = mod.detect_time_gaps(df)
+            if gaps.is_empty():
+                breakpoint()
+            breakpoint()
             # await tractor.breakpoint()
 
diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py
index 1a4c5e12..9e4e848d 100644
--- a/piker/storage/nativedb.py
+++ b/piker/storage/nativedb.py
@@ -65,7 +65,7 @@ from pendulum import (
 from piker import config
 from piker.data import def_iohlcv_fields
-# from piker.data import ShmArray
+from piker.data import ShmArray
 from piker.log import get_logger
 # from .._profile import Profiler
 
 
@@ -86,6 +86,7 @@ def np2pl(array: np.ndarray) -> pl.DataFrame:
 def pl2np(
     df: pl.DataFrame,
     dtype: np.dtype,
+
 ) -> np.ndarray:
 
     # Create numpy struct array of the correct size and dtype
@@ -103,18 +104,31 @@
     return array
 
 
+def detect_period(shm: ShmArray) -> float:
+    '''
+    Attempt to detect the series' time-step sampling period
+    in seconds.
+
+    '''
+    # TODO: detect sample rate helper?
+    # calc ohlc sample period for naming
+    ohlcv: np.ndarray = shm.array
+    times: np.ndarray = ohlcv['time']
+    period: float = times[-1] - times[-2]
+    if period == 0:
+        # maybe just last sample is borked?
+        period: float = times[-2] - times[-3]
+
+    return period
+
+
 def mk_ohlcv_shm_keyed_filepath(
     fqme: str,
     period: float,  # ow known as the "timeframe"
-    # shm: ShmArray,
     datadir: Path,
 
 ) -> str:
-    # calc ohlc sample period for naming
-    # ohlcv: np.ndarray = shm.array
-    # times: np.ndarray = ohlcv['time']
-    # period: float = times[-1] - times[-2]
     if period < 1.:
         raise ValueError('Sample period should be >= 1.!?')
 
@@ -146,7 +160,7 @@ class NativeStorageClient:
         self._index: dict[str, dict] = {}
 
         # series' cache from tsdb reads
-        self._dfs: dict[str, pl.DataFrame] = {}
+        self._dfs: dict[int, dict[str, pl.DataFrame]] = {}
 
     @property
     def address(self) -> str:
@@ -217,6 +231,17 @@ class NativeStorageClient:
             from_timestamp(times[-1]),
         )
 
+    def mk_path(
+        self,
+        fqme: str,
+        period: float,
+    ) -> Path:
+        return mk_ohlcv_shm_keyed_filepath(
+            fqme=fqme,
+            period=period,
+            datadir=self._datadir,
+        )
+
     async def read_ohlcv(
         self,
         fqme: str,
@@ -225,36 +250,51 @@
         # limit: int = int(200e3),
 
     ) -> np.ndarray:
-        path: Path = mk_ohlcv_shm_keyed_filepath(
-            fqme=fqme,
-            period=timeframe,
-            datadir=self._datadir,
-        )
+        path: Path = self.mk_path(fqme, period=int(timeframe))
         df: pl.DataFrame = pl.read_parquet(path)
+        self._dfs.setdefault(int(timeframe), {})[fqme] = df
 
         # TODO: filter by end and limit inputs
         # times: pl.Series = df['time']
-
-        return pl2np(
+        array: np.ndarray = pl2np(
             df,
             dtype=np.dtype(def_iohlcv_fields),
         )
+        return array
 
-    async def write_ohlcv(
+    async def as_df(
         self,
         fqme: str,
-        ohlcv: np.ndarray,
+        period: int = 60,
+
+    ) -> pl.DataFrame:
+        try:
+            return self._dfs[period][fqme]
+        except KeyError:
+            await self.read_ohlcv(fqme, period)
+            return self._dfs[period][fqme]
+
+    def _write_ohlcv(
+        self,
+        fqme: str,
+        ohlcv: np.ndarray | pl.DataFrame,
         timeframe: int,
-        # limit: int = int(800e3),
 
     ) -> Path:
+        '''
+        Sync version of the public interface method, since we don't
+        currently need or support an async impl.
+
+        '''
         path: Path = mk_ohlcv_shm_keyed_filepath(
             fqme=fqme,
             period=timeframe,
             datadir=self._datadir,
         )
-        df: pl.DataFrame = np2pl(ohlcv)
+        if isinstance(ohlcv, np.ndarray):
+            df: pl.DataFrame = np2pl(ohlcv)
+        else:
+            df = ohlcv
 
         # TODO: use a proper profiler
         start = time.time()
@@ -269,6 +309,25 @@
         )
         return path
 
+    async def write_ohlcv(
+        self,
+        fqme: str,
+        ohlcv: np.ndarray,
+        timeframe: int,
+
+    ) -> Path:
+        '''
+        Write input ohlcv time series for fqme and sampling period
+        to (local) disk.
+
+        '''
+        return self._write_ohlcv(
+            fqme,
+            ohlcv,
+            timeframe,
+        )
+
     async def delete_ts(
         self,
         key: str,
@@ -312,3 +371,69 @@
     client = NativeStorageClient(datadir)
     client.index_files()
     yield client
+
+
+def with_dts(
+    df: pl.DataFrame,
+    time_col: str = 'time',
+) -> pl.DataFrame:
+    '''
+    Insert datetime (casted) columns into a (presumably) OHLC-sampled
+    time series with an epoch-time column keyed by ``time_col``.
+
+    '''
+    return df.with_columns([
+        pl.col(time_col).shift(1).suffix('_prev'),
+        pl.col(time_col).diff().alias('s_diff'),
+        pl.from_epoch(pl.col(time_col)).alias('dt'),
+    ]).with_columns([
+        pl.from_epoch(pl.col(f'{time_col}_prev')).alias('dt_prev'),
+    ]).with_columns(
+        (pl.col('dt') - pl.col('dt_prev')).alias('dt_diff'),
+    )
+
+
+def detect_time_gaps(
+    df: pl.DataFrame,
+    expect_period: float = 60,
+    time_col: str = 'time',
+
+) -> pl.DataFrame:
+    '''
+    Filter to OHLC datums which contain sample step gaps.
+
+    Eg. legacy markets which have venue close gaps and/or
+    actual missing data segments.
+
+    '''
+    return with_dts(df).filter(pl.col('s_diff') > expect_period)
+
+
+def detect_price_gaps(
+    df: pl.DataFrame,
+    gt_multiplier: float = 2.,
+    price_fields: list[str] = ['high', 'low'],
+
+) -> pl.DataFrame:
+    '''
+    Detect gaps in clearing price over an OHLC series.
+
+    Two types of gaps generally exist: up gaps and down gaps.
+
+    - UP gap: when any next sample's lo price is strictly greater
+      than the current sample's hi price.
+
+    - DOWN gap: when any next sample's hi price is strictly
+      less than the current sample's lo price.
+
+    '''
+    # NOTE: sketch impl per the docstring; `gt_multiplier` unused.
+    hi, lo = price_fields
+    shifted: pl.DataFrame = df.with_columns([
+        pl.col(hi).shift(1).suffix('_prev'),
+        pl.col(lo).shift(1).suffix('_prev'),
+    ])
+    return shifted.filter(
+        (pl.col(lo) > pl.col(f'{hi}_prev'))  # UP gap
+        | (pl.col(hi) < pl.col(f'{lo}_prev'))  # DOWN gap
+    )
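
--

A minimal usage sketch of the new gap-detection helpers (not part of
the patch itself): the `fqme` string below is hypothetical and assumes
its OHLCV series was previously persisted to a parquet file via
`client.write_ohlcv()`. Note `detect_time_gaps()` applies `with_dts()`
internally, so the raw epoch-time frame can be passed directly:

    import polars as pl
    import trio

    from piker.storage.nativedb import (
        detect_price_gaps,
        detect_time_gaps,
        get_client,
    )


    async def find_gaps() -> tuple[pl.DataFrame, pl.DataFrame]:
        async with get_client() as client:
            # load (and cache) the 1m frame for a (hypothetical) fqme
            df: pl.DataFrame = await client.as_df(
                'btcusdt.binance',
                period=60,
            )
            # rows whose epoch-diff exceeds the sampling period
            time_gaps: pl.DataFrame = detect_time_gaps(
                df,
                expect_period=60,
            )
            # rows which gapped up/down vs. the prior sample's hi/lo
            price_gaps: pl.DataFrame = detect_price_gaps(df)
            return time_gaps, price_gaps


    time_gaps, price_gaps = trio.run(find_gaps)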