Generalize time-gap detector to accept unit and threshold
parent
0dcfcea6ee
commit
2dbcecdac7
piker/storage
|
@ -54,6 +54,9 @@ from contextlib import asynccontextmanager as acm
|
|||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import time
|
||||
from typing import (
|
||||
Literal,
|
||||
)
|
||||
|
||||
# from bidict import bidict
|
||||
# import tractor
|
||||
|
@ -388,15 +391,38 @@ def with_dts(
|
|||
pl.from_epoch(pl.col(time_col)).alias('dt'),
|
||||
]).with_columns([
|
||||
pl.from_epoch(pl.col(f'{time_col}_prev')).alias('dt_prev'),
|
||||
]).with_columns(
|
||||
(pl.col('dt') - pl.col('dt_prev')).alias('dt_diff'),
|
||||
)
|
||||
pl.col('dt').diff().alias('dt_diff'),
|
||||
]) #.with_columns(
|
||||
# pl.col('dt').diff().dt.days().alias('days_dt_diff'),
|
||||
# )
|
||||
|
||||
|
||||
t_unit: Literal[
|
||||
'days',
|
||||
'hours',
|
||||
'minutes',
|
||||
'seconds',
|
||||
'miliseconds',
|
||||
'microseconds',
|
||||
'nanoseconds',
|
||||
]
|
||||
|
||||
|
||||
def detect_time_gaps(
|
||||
df: pl.DataFrame,
|
||||
expect_period: float = 60,
|
||||
|
||||
time_col: str = 'time',
|
||||
# epoch sampling step diff
|
||||
expect_period: float = 60,
|
||||
|
||||
# datetime diff unit and gap value
|
||||
# crypto mkts
|
||||
# gap_dt_unit: t_unit = 'minutes',
|
||||
# gap_thresh: int = 1,
|
||||
|
||||
# legacy stock mkts
|
||||
gap_dt_unit: t_unit = 'days',
|
||||
gap_thresh: int = 2,
|
||||
|
||||
) -> pl.DataFrame:
|
||||
'''
|
||||
|
@ -406,7 +432,19 @@ def detect_time_gaps(
|
|||
actual missing data segments.
|
||||
|
||||
'''
|
||||
return with_dts(df).filter(pl.col('s_diff') > expect_period)
|
||||
dt_gap_col: str = f'{gap_dt_unit}_diff'
|
||||
return with_dts(
|
||||
df
|
||||
).filter(
|
||||
pl.col('s_diff').abs() > expect_period
|
||||
).with_columns(
|
||||
getattr(
|
||||
pl.col('dt_diff').dt,
|
||||
gap_dt_unit, # NOTE: must be valid ``Expr.dt.<name>``
|
||||
)().alias(dt_gap_col)
|
||||
).filter(
|
||||
pl.col(dt_gap_col).abs() > gap_thresh
|
||||
)
|
||||
|
||||
|
||||
def detect_price_gaps(
|
||||
|
|
Loading…
Reference in New Issue