Lul, actually detect gaps for 1s OHLC

Turns out we were always filtering to time gaps longer than a day smh..
Instead tweak `detect_time_gaps()` to only return venue-gaps when
a `gap_dt_unit: str` is passed and pass `'days'` (like it was by default
before) from `dedupe()` though we should really pass in an actual venue
gap duration in the future.
distribute_dis
Tyler Goodlet 2023-12-27 16:55:00 -05:00
parent ad565936ec
commit 0d18cb65c3
2 changed files with 55 additions and 43 deletions

View File

@ -440,8 +440,11 @@ async def start_backfill(
# broker says there never was or is no more history to pull # broker says there never was or is no more history to pull
except DataUnavailable: except DataUnavailable:
log.warning( log.warning(
f'NO-MORE-DATA: backend {mod.name} halted history:\n' f'NO-MORE-DATA in range?\n'
f'{timeframe}@{mkt.fqme}' f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
) )
# ugh, what's a better way? # ugh, what's a better way?

View File

@ -510,10 +510,10 @@ def iter_null_segs(
) )
# TODO: move to ._pl_anal
def with_dts( def with_dts(
df: pl.DataFrame, df: pl.DataFrame,
time_col: str = 'time', time_col: str = 'time',
) -> pl.DataFrame: ) -> pl.DataFrame:
''' '''
Insert datetime (casted) columns to a (presumably) OHLC sampled Insert datetime (casted) columns to a (presumably) OHLC sampled
@ -529,9 +529,7 @@ def with_dts(
column=pl.col(f'{time_col}_prev'), column=pl.col(f'{time_col}_prev'),
).alias('dt_prev'), ).alias('dt_prev'),
pl.col('dt').diff().alias('dt_diff'), pl.col('dt').diff().alias('dt_diff'),
]) #.with_columns( ])
# pl.col('dt').diff().dt.days().alias('days_dt_diff'),
# )
t_unit: Literal = Literal[ t_unit: Literal = Literal[
@ -546,25 +544,23 @@ t_unit: Literal = Literal[
def detect_time_gaps( def detect_time_gaps(
df: pl.DataFrame, w_dts: pl.DataFrame,
time_col: str = 'time', time_col: str = 'time',
# epoch sampling step diff # epoch sampling step diff
expect_period: float = 60, expect_period: float = 60,
# datetime diff unit and gap value
# crypto mkts
# gap_dt_unit: t_unit = 'minutes',
# gap_thresh: int = 1,
# NOTE: legacy stock mkts have venue operating hours # NOTE: legacy stock mkts have venue operating hours
# and thus gaps normally no more then 1-2 days at # and thus gaps normally no more then 1-2 days at
# a time. # a time.
gap_thresh: float = 1.,
# TODO: allow passing in a frame of operating hours?
# -[ ] durations/ranges for faster legit gap checks?
# XXX -> must be valid ``polars.Expr.dt.<name>`` # XXX -> must be valid ``polars.Expr.dt.<name>``
# TODO: allow passing in a frame of operating hours # like 'days' which a sane default for venue closures
# durations/ranges for faster legit gap checks. # though will detect weekend gaps which are normal :o
gap_dt_unit: t_unit = 'days', gap_dt_unit: t_unit | None = None,
gap_thresh: int = 1,
) -> pl.DataFrame: ) -> pl.DataFrame:
''' '''
@ -574,19 +570,24 @@ def detect_time_gaps(
actual missing data segments. actual missing data segments.
''' '''
return ( # first select by any sample-period (in seconds unit) step size
with_dts(df) # greater then expected.
# First by a seconds unit step size step_gaps: pl.DataFrame = w_dts.filter(
.filter( pl.col('s_diff').abs() > expect_period
pl.col('s_diff').abs() > expect_period )
)
.filter( if gap_dt_unit is None:
# Second by an arbitrary dt-unit step size return step_gaps
getattr(
pl.col('dt_diff').dt, # NOTE: this flag is to indicate that on this (sampling) time
gap_dt_unit, # scale we expect to only be filtering against larger venue
)().abs() > gap_thresh # closures-scale time gaps.
) return step_gaps.filter(
# Second by an arbitrary dt-unit step size
getattr(
pl.col('dt_diff').dt,
gap_dt_unit,
)().abs() > gap_thresh
) )
@ -624,6 +625,8 @@ def dedupe(
src_df: pl.DataFrame, src_df: pl.DataFrame,
sort: bool = True, sort: bool = True,
period: float = 60,
) -> tuple[ ) -> tuple[
pl.DataFrame, # with dts pl.DataFrame, # with dts
pl.DataFrame, # gaps pl.DataFrame, # gaps
@ -637,33 +640,39 @@ def dedupe(
dt-deduplicated frame. dt-deduplicated frame.
''' '''
df: pl.DataFrame = with_dts(src_df) wdts: pl.DataFrame = with_dts(src_df)
src_gaps: pl.DataFrame = detect_time_gaps(
# TODO: enable passing existing `with_dts` df for speedup? wdts,
gaps: pl.DataFrame = detect_time_gaps(df) expect_period=period,
gap_dt_unit=None if period < 60 else 'days',
)
# if no gaps detected just return carbon copies # if no gaps detected just return carbon copies
# and no len diff. # and no len diff.
if gaps.is_empty(): if src_gaps.is_empty():
return ( return (
df, wdts,
gaps, src_gaps,
df, wdts,
0, 0,
) )
# remove duplicated datetime samples/sections # remove duplicated datetime samples/sections
deduped: pl.DataFrame = df.unique( deduped: pl.DataFrame = wdts.unique(
subset=['dt'], subset=['dt'],
maintain_order=True, maintain_order=True,
) )
if sort: if sort:
deduped = deduped.sort(by='time') deduped = deduped.sort(by='time')
deduped_gaps: pl.DataFrame = detect_time_gaps(deduped) deduped_gaps: pl.DataFrame = detect_time_gaps(
deduped,
expect_period=period,
gap_dt_unit=None if period < 60 else 'days',
)
diff: int = ( diff: int = (
df.height wdts.height
- -
deduped.height deduped.height
) )
@ -673,8 +682,8 @@ def dedupe(
f'deduped Gaps found:\n{deduped_gaps}' f'deduped Gaps found:\n{deduped_gaps}'
) )
return ( return (
df, wdts,
gaps, deduped_gaps,
deduped, deduped,
diff, diff,
) )
@ -708,7 +717,7 @@ def sort_diff(
# to go from numpy struct-arrays to polars dataframes and back: # to go from numpy struct-arrays to polars dataframes and back:
# https://stackoverflow.com/a/72054819 # https://stackoverflow.com/a/72054819
def np2pl(array: np.ndarray) -> pl.DataFrame: def np2pl(array: np.ndarray) -> pl.DataFrame:
start = time.time() start: float = time.time()
# XXX: thanks to this SO answer for this conversion tip: # XXX: thanks to this SO answer for this conversion tip:
# https://stackoverflow.com/a/72054819 # https://stackoverflow.com/a/72054819