From 0d18cb65c37ce125fc45b51fcb4cfc8dbb3ec4d9 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 27 Dec 2023 16:55:00 -0500
Subject: [PATCH] Lul, actually detect gaps for 1s OHLC

Turns out we were always filtering to time gaps longer than a day,
smh.. Instead tweak `detect_time_gaps()` to only return venue-gaps
when a `gap_dt_unit: str` is passed, and pass `'days'` (like it was by
default before) from `dedupe()`, though we should really pass in an
actual venue gap duration in the future.
---
 piker/tsp/__init__.py |  7 +++-
 piker/tsp/_anal.py    | 91 ++++++++++++++++++++++++-------------------
 2 files changed, 55 insertions(+), 43 deletions(-)

diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py
index 417b5f30..d79fbd0b 100644
--- a/piker/tsp/__init__.py
+++ b/piker/tsp/__init__.py
@@ -440,8 +440,11 @@ async def start_backfill(
         # broker says there never was or is no more history to pull
         except DataUnavailable:
             log.warning(
-                f'NO-MORE-DATA: backend {mod.name} halted history:\n'
-                f'{timeframe}@{mkt.fqme}'
+                f'NO-MORE-DATA in range?\n'
+                f'`{mod.name}` halted history:\n'
+                f'tf@fqme: {timeframe}@{mkt.fqme}\n'
+                'bf_until <- last_start_dt:\n'
+                f'{backfill_until_dt} <- {last_start_dt}\n'
             )
 
             # ugh, what's a better way?
diff --git a/piker/tsp/_anal.py b/piker/tsp/_anal.py
index d99607c7..7ffdef2a 100644
--- a/piker/tsp/_anal.py
+++ b/piker/tsp/_anal.py
@@ -510,10 +510,10 @@ def iter_null_segs(
     )
 
 
-# TODO: move to ._pl_anal
 def with_dts(
     df: pl.DataFrame,
     time_col: str = 'time',
+
 ) -> pl.DataFrame:
     '''
     Insert datetime (casted) columns to a (presumably) OHLC sampled
@@ -529,9 +529,7 @@ def with_dts(
             column=pl.col(f'{time_col}_prev'),
         ).alias('dt_prev'),
         pl.col('dt').diff().alias('dt_diff'),
-    ])  #.with_columns(
-        # pl.col('dt').diff().dt.days().alias('days_dt_diff'),
-    # )
+    ])
 
 
 t_unit: Literal = Literal[
@@ -546,25 +544,23 @@
 
 
 def detect_time_gaps(
-    df: pl.DataFrame,
+    w_dts: pl.DataFrame,
 
     time_col: str = 'time',
     # epoch sampling step diff
     expect_period: float = 60,
 
-    # datetime diff unit and gap value
-    # crypto mkts
-    # gap_dt_unit: t_unit = 'minutes',
-    # gap_thresh: int = 1,
-
     # NOTE: legacy stock mkts have venue operating hours
     # and thus gaps normally no more then 1-2 days at
     # a time.
+    gap_thresh: float = 1.,
+
+    # TODO: allow passing in a frame of operating hours?
+    # -[ ] durations/ranges for faster legit gap checks?
     # XXX -> must be valid ``polars.Expr.dt.``
-    # TODO: allow passing in a frame of operating hours
-    # durations/ranges for faster legit gap checks.
-    gap_dt_unit: t_unit = 'days',
-    gap_thresh: int = 1,
+    # like 'days', which is a sane default for venue closures,
+    # though it will detect weekend gaps which are normal :o
+    gap_dt_unit: t_unit | None = None,
 
 ) -> pl.DataFrame:
     '''
@@ -574,19 +570,24 @@ def detect_time_gaps(
     actual missing data segments.
 
     '''
-    return (
-        with_dts(df)
-        # First by a seconds unit step size
-        .filter(
-            pl.col('s_diff').abs() > expect_period
-        )
-        .filter(
-            # Second by an arbitrary dt-unit step size
-            getattr(
-                pl.col('dt_diff').dt,
-                gap_dt_unit,
-            )().abs() > gap_thresh
-        )
+    # first select by any sample-period (in seconds unit) step size
+    # greater than expected.
+    step_gaps: pl.DataFrame = w_dts.filter(
+        pl.col('s_diff').abs() > expect_period
+    )
+
+    if gap_dt_unit is None:
+        return step_gaps
+
+    # NOTE: a non-`None` unit indicates that on this (sampling) time
+    # scale we expect to only be filtering against larger,
+    # venue-closure-scale time gaps.
+    return step_gaps.filter(
+        # then by an arbitrary dt-unit step size
+        getattr(
+            pl.col('dt_diff').dt,
+            gap_dt_unit,
+        )().abs() > gap_thresh
     )
 
 
@@ -624,6 +625,8 @@
 def dedupe(
     src_df: pl.DataFrame,
     sort: bool = True,
+    period: float = 60,
+
 ) -> tuple[
     pl.DataFrame,  # with dts
     pl.DataFrame,  # gaps
@@ -637,33 +640,39 @@ def dedupe(
     dt-deduplicated frame.
 
     '''
-    df: pl.DataFrame = with_dts(src_df)
-
-    # TODO: enable passing existing `with_dts` df for speedup?
-    gaps: pl.DataFrame = detect_time_gaps(df)
+    wdts: pl.DataFrame = with_dts(src_df)
+    src_gaps: pl.DataFrame = detect_time_gaps(
+        wdts,
+        expect_period=period,
+        gap_dt_unit=None if period < 60 else 'days',
+    )
 
     # if no gaps detected just return carbon copies
     # and no len diff.
-    if gaps.is_empty():
+    if src_gaps.is_empty():
         return (
-            df,
-            gaps,
-            df,
+            wdts,
+            src_gaps,
+            wdts,
             0,
         )
 
     # remove duplicated datetime samples/sections
-    deduped: pl.DataFrame = df.unique(
+    deduped: pl.DataFrame = wdts.unique(
         subset=['dt'],
         maintain_order=True,
     )
     if sort:
         deduped = deduped.sort(by='time')
 
-    deduped_gaps: pl.DataFrame = detect_time_gaps(deduped)
+    deduped_gaps: pl.DataFrame = detect_time_gaps(
+        deduped,
+        expect_period=period,
+        gap_dt_unit=None if period < 60 else 'days',
+    )
 
     diff: int = (
-        df.height
+        wdts.height
         - deduped.height
     )
@@ -673,8 +682,8 @@ def dedupe(
         f'deduped Gaps found:\n{deduped_gaps}'
     )
     return (
-        df,
-        gaps,
+        wdts,
+        deduped_gaps,
         deduped,
         diff,
     )
@@ -708,7 +717,7 @@ def sort_diff(
 # to go from numpy struct-arrays to polars dataframes and back:
 # https://stackoverflow.com/a/72054819
 def np2pl(array: np.ndarray) -> pl.DataFrame:
-    start = time.time()
+    start: float = time.time()
 
     # XXX: thanks to this SO answer for this conversion tip:
     # https://stackoverflow.com/a/72054819
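
Quick sanity-check sketch of the new `detect_time_gaps()` behaviour
(not part of the patch; the sample frame, epoch values and asserts are
made up, and it assumes `with_dts()` attaches the `s_diff`/`dt_diff`
columns the filters use, as in the patched module):

import polars as pl

from piker.tsp._anal import (
    detect_time_gaps,
    with_dts,
)

# 1s-sampled epoch times with a single 1800s (30min) hole after the
# 3rd row; the values are arbitrary.
df = pl.DataFrame({'time': [1_000_000, 1_000_001, 1_000_002, 1_001_802]})
wdts: pl.DataFrame = with_dts(df)

# new default (`gap_dt_unit=None`): only the sample-period filter
# runs, so the 30min step-gap IS reported for 1s OHLC.
step_gaps: pl.DataFrame = detect_time_gaps(
    wdts,
    expect_period=1,  # 1s sampling
)
assert step_gaps.height == 1

# old default behaviour: a 'days' second-stage filter swallows any
# gap shorter than a day, which is why 1s gaps never showed up.
# XXX: the unit must be a valid `polars.Expr.dt.` method name
# ('days' on older polars, 'total_days' on newer releases).
venue_gaps: pl.DataFrame = detect_time_gaps(
    wdts,
    expect_period=1,
    gap_dt_unit='days',
    gap_thresh=1.,
)
assert venue_gaps.is_empty()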
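
And the matching `dedupe()` call pattern with the new `period` kwarg
(again a hypothetical sketch reusing `df` from above): sub-minute data
skips the venue-closure filter entirely via `gap_dt_unit=None`, while
`period >= 60` falls back to the old `'days'` unit which, per the
commit message, should eventually become a real venue gap duration.

from piker.tsp._anal import dedupe

# sub-minute sampling -> `gap_dt_unit=None` internally: only
# sample-period step gaps are reported alongside the dedupe.
wdts_1s, gaps_1s, deduped_1s, n_dupes_1s = dedupe(df, period=1)

# minute-or-slower sampling -> the 'days' venue-closure unit applies,
# matching the old default behaviour.
wdts_1m, gaps_1m, deduped_1m, n_dupes_1m = dedupe(df, period=60)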