diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py index 9e4e848d..4a6ecf0e 100644 --- a/piker/storage/nativedb.py +++ b/piker/storage/nativedb.py @@ -54,6 +54,9 @@ from contextlib import asynccontextmanager as acm from datetime import datetime from pathlib import Path import time +from typing import ( + Literal, +) # from bidict import bidict # import tractor @@ -388,15 +391,38 @@ def with_dts( pl.from_epoch(pl.col(time_col)).alias('dt'), ]).with_columns([ pl.from_epoch(pl.col(f'{time_col}_prev')).alias('dt_prev'), - ]).with_columns( - (pl.col('dt') - pl.col('dt_prev')).alias('dt_diff'), - ) + pl.col('dt').diff().alias('dt_diff'), + ]) #.with_columns( + # pl.col('dt').diff().dt.days().alias('days_dt_diff'), + # ) + + +t_unit: Literal[ + 'days', + 'hours', + 'minutes', + 'seconds', + 'miliseconds', + 'microseconds', + 'nanoseconds', +] def detect_time_gaps( df: pl.DataFrame, - expect_period: float = 60, + time_col: str = 'time', + # epoch sampling step diff + expect_period: float = 60, + + # datetime diff unit and gap value + # crypto mkts + # gap_dt_unit: t_unit = 'minutes', + # gap_thresh: int = 1, + + # legacy stock mkts + gap_dt_unit: t_unit = 'days', + gap_thresh: int = 2, ) -> pl.DataFrame: ''' @@ -406,7 +432,19 @@ def detect_time_gaps( actual missing data segments. ''' - return with_dts(df).filter(pl.col('s_diff') > expect_period) + dt_gap_col: str = f'{gap_dt_unit}_diff' + return with_dts( + df + ).filter( + pl.col('s_diff').abs() > expect_period + ).with_columns( + getattr( + pl.col('dt_diff').dt, + gap_dt_unit, # NOTE: must be valid ``Expr.dt.`` + )().alias(dt_gap_col) + ).filter( + pl.col(dt_gap_col).abs() > gap_thresh + ) def detect_price_gaps(