Add basic time-sampling gap detection via `polars`

For OHLCV time series we normally presume a uniform sampling period
(1s or 60s by default) and it's handy to have tools to ensure a series
is gapless or contains expected gaps based on (legacy) market hours.

For this we leverage `polars`:
- add `.nativedb.with_dts()` a datetime-from-epoch-time-column frame
  "column-expander" which inserts datetime-casted, epoch-diff and
  dt-diff columns.
- add `.nativedb.detect_time_gaps()` which filters to any larger then
  expected sampling period rows.
- wrap the above (for now) in a `piker store anal` (analysis) cmd which
  atm always enters a breakpoint for tinkering.

Supporting storage client additions:
- add a `detect_period()` helper for extracting expected OHLC time step.
- add new `NativedbStorageClient` methods and attrs to provide for the above:
    - `.mk_path()` to **only** deliver a parquet-file path for use in
      other methods.
    - `._dfs` to house cached `pl.DataFrame`s loaded from `.parquet` files.
    - `.as_df()` which loads cached frames or loads them from disk and
      then caches (for next use).
    - `_write_ohlcv()` a private-sync version of the public equivalent
      meth since we don't currently have any actual async file IO
      underneath; add a flag for whether to return as a `numpy.ndarray`.
basic_buy_bot
Tyler Goodlet 2023-06-06 13:00:25 -04:00
parent d027ad5a4f
commit 9fd412f631
2 changed files with 162 additions and 37 deletions

View File

@ -20,10 +20,11 @@ Storage middle-ware CLIs.
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
# from typing import TYPE_CHECKING
import polars as pl
import numpy as np
import pendulum
# import pendulum
from rich.console import Console
import trio
# from rich.markdown import Markdown
@ -34,9 +35,10 @@ from piker.cli import cli
from . import (
log,
)
if TYPE_CHECKING:
from . import Storage
from . import (
__tsdbs__,
open_storage_client,
)
store = typer.Typer()
@ -49,11 +51,6 @@ def ls(
help='Storage backends to query, default is all.'
),
):
# from piker.service import open_piker_runtime
from . import (
__tsdbs__,
open_storage_client,
)
from rich.table import Table
if not backends:
@ -129,21 +126,18 @@ def delete(
@store.command()
def read(
def anal(
fqme: str,
limit: int = int(800e3),
# client_type: str = 'async',
period: int = 60,
) -> np.ndarray:
# end: int | None = None
# import tractor
from .nativedb import get_client
async def main():
async with get_client() as client:
async with open_storage_client() as (mod, client):
syms: list[str] = await client.list_keys()
print(f'{len(syms)} FOUND for {mod.name}')
(
history,
@ -151,10 +145,16 @@ def read(
last_dt,
) = await client.load(
fqme,
60,
period,
)
assert first_dt < last_dt
print(f'{fqme} SIZE -> {history.size}')
src_df = await client.as_df(fqme, period)
df = mod.with_dts(src_df)
gaps: pl.DataFrame = mod.detect_time_gaps(df)
if gaps.is_empty():
breakpoint()
breakpoint()
# await tractor.breakpoint()

View File

@ -65,7 +65,7 @@ from pendulum import (
from piker import config
from piker.data import def_iohlcv_fields
# from piker.data import ShmArray
from piker.data import ShmArray
from piker.log import get_logger
# from .._profile import Profiler
@ -86,6 +86,7 @@ def np2pl(array: np.ndarray) -> pl.DataFrame:
def pl2np(
df: pl.DataFrame,
dtype: np.dtype,
) -> np.ndarray:
# Create numpy struct array of the correct size and dtype
@ -103,18 +104,31 @@ def pl2np(
return array
def detect_period(shm: ShmArray) -> float:
'''
Attempt to detect the series time step sampling period
in seconds.
'''
# TODO: detect sample rate helper?
# calc ohlc sample period for naming
ohlcv: np.ndarray = shm.array
times: np.ndarray = ohlcv['time']
period: float = times[-1] - times[-2]
if period == 0:
# maybe just last sample is borked?
period: float = times[-2] - times[-3]
return period
def mk_ohlcv_shm_keyed_filepath(
fqme: str,
period: float, # ow known as the "timeframe"
# shm: ShmArray,
datadir: Path,
) -> str:
# calc ohlc sample period for naming
# ohlcv: np.ndarray = shm.array
# times: np.ndarray = ohlcv['time']
# period: float = times[-1] - times[-2]
if period < 1.:
raise ValueError('Sample period should be >= 1.!?')
@ -146,7 +160,7 @@ class NativeStorageClient:
self._index: dict[str, dict] = {}
# series' cache from tsdb reads
self._dfs: dict[str, pl.DataFrame] = {}
self._dfs: dict[str, dict[str, pl.DataFrame]] = {}
@property
def address(self) -> str:
@ -217,6 +231,17 @@ class NativeStorageClient:
from_timestamp(times[-1]),
)
def mk_path(
self,
fqme: str,
period: float,
) -> Path:
return mk_ohlcv_shm_keyed_filepath(
fqme=fqme,
period=period,
datadir=self._datadir,
)
async def read_ohlcv(
self,
fqme: str,
@ -225,36 +250,51 @@ class NativeStorageClient:
# limit: int = int(200e3),
) -> np.ndarray:
path: Path = mk_ohlcv_shm_keyed_filepath(
fqme=fqme,
period=timeframe,
datadir=self._datadir,
)
path: Path = self.mk_path(fqme, period=int(timeframe))
df: pl.DataFrame = pl.read_parquet(path)
self._dfs.setdefault(timeframe, {})[fqme] = df
# TODO: filter by end and limit inputs
# times: pl.Series = df['time']
return pl2np(
array: np.ndarray = pl2np(
df,
dtype=np.dtype(def_iohlcv_fields),
)
return array
async def write_ohlcv(
async def as_df(
self,
fqme: str,
ohlcv: np.ndarray,
period: int = 60,
) -> pl.DataFrame:
try:
return self._dfs[period][fqme]
except KeyError:
await self.read_ohlcv(fqme, period)
return self._dfs[period][fqme]
def _write_ohlcv(
self,
fqme: str,
ohlcv: np.ndarray | pl.DataFrame,
timeframe: int,
# limit: int = int(800e3),
) -> Path:
'''
Sync version of the public interface meth, since we don't
currently actually need or support an async impl.
'''
path: Path = mk_ohlcv_shm_keyed_filepath(
fqme=fqme,
period=timeframe,
datadir=self._datadir,
)
df: pl.DataFrame = np2pl(ohlcv)
if isinstance(ohlcv, np.ndarray):
df: pl.DataFrame = np2pl(ohlcv)
else:
df = ohlcv
# TODO: use a proper profiler
start = time.time()
@ -269,6 +309,25 @@ class NativeStorageClient:
)
return path
async def write_ohlcv(
self,
fqme: str,
ohlcv: np.ndarray,
timeframe: int,
) -> Path:
'''
Write input ohlcv time series for fqme and sampling period
to (local) disk.
'''
return self._write_ohlcv(
fqme,
ohlcv,
timeframe,
)
async def delete_ts(
self,
key: str,
@ -312,3 +371,69 @@ async def get_client(
client = NativeStorageClient(datadir)
client.index_files()
yield client
def with_dts(
df: pl.DataFrame,
time_col: str = 'time',
) -> pl.DataFrame:
'''
Insert datetime (casted) columns to a (presumably) OHLC sampled
time series with an epoch-time column keyed by ``time_col``.
'''
return df.with_columns([
pl.col(time_col).shift(1).suffix('_prev'),
pl.col(time_col).diff().alias('s_diff'),
pl.from_epoch(pl.col(time_col)).alias('dt'),
]).with_columns([
pl.from_epoch(pl.col(f'{time_col}_prev')).alias('dt_prev'),
]).with_columns(
(pl.col('dt') - pl.col('dt_prev')).alias('dt_diff'),
)
def detect_time_gaps(
df: pl.DataFrame,
expect_period: float = 60,
time_col: str = 'time',
) -> pl.DataFrame:
'''
Filter to OHLC datums which contain sample step gaps.
For eg. legacy markets which have venue close gaps and/or
actual missing data segments.
'''
return with_dts(df).filter(pl.col('s_diff') > expect_period)
def detect_price_gaps(
df: pl.DataFrame,
gt_multiplier: float = 2.,
price_fields: list[str] = ['high', 'low'],
) -> pl.DataFrame:
'''
Detect gaps in clearing price over an OHLC series.
2 types of gaps generally exist; up gaps and down gaps:
- UP gap: when any next sample's lo price is strictly greater
then the current sample's hi price.
- DOWN gap: when any next sample's hi price is strictly
less then the current samples lo price.
'''
# return df.filter(
# pl.col('high') - ) > expect_period,
# ).select([
# pl.dt.datetime(pl.col(time_col).shift(1)).suffix('_previous'),
# pl.all(),
# ]).select([
# pl.all(),
# (pl.col(time_col) - pl.col(f'{time_col}_previous')).alias('diff'),
# ])
...