Add vlm-based "smart" OHLCV de-duping & bar validation
Using `claude`, add a `.tsp._dedupe_smart` module that attemps "smarter" duplicate bars by attempting to distinguish between erroneous bars partially written during concurrent backfill race conditions vs. **actual** data quality issues from historical providers. Problem: -------- Concurrent writes (live updates vs. backfilling) can result in create duplicate timestamped ohlcv vars with different values. Some potential scenarios include, - a market live feed is cancelled during live update resulting in the "last" datum being partially updated with all the ticks for the time step. - when the feed is rebooted during charting, the backfiller will not finalize this bar since rn it presumes it should only fill data for time steps not already in the tsdb storage. Our current naive `.unique()` approach obvi keeps the incomplete bar and a "smarter" approach is to compare the provider's final vlm amount vs. the maybe-cancelled tsdb's bar; a higher vlm value from the provider likely indicates the cancelled-during-live-write and **not** a datum discrepancy from said data provider. Analysis (with `claude`) of `zecusdt` data revealed: - 1000 duplicate timestamps - 999 identical bars (pure duplicates from 2022 backfill overlap) - 1 volume-monotonic conflict (live partial vs backfill complete) A soln from `claude` -> `tsp._dedupe_smart.dedupe_ohlcv_smart()` which: - sorts by vlm **before** deduplication and keep the most complete bar based on vlm monotonicity as well as the following OHLCV validation assumptions: * volume should always increase * high should be non-decreasing, * low should be non-increasing * open should be identical - Separates valid race conditions from provider data quality issues and reports and returns both dfs. Change summary by `claude`: - `.tsp._dedupe_smart`: new module with validation logic - `.tsp.__init__`: expose `dedupe_ohlcv_smart()` - `.storage.cli`: integrate smart dedupe, add logging for: * duplicate counts (identical vs monotonic races) * data quality violations (non-monotonic, invalid OHLC ranges) * warnings for provider data issues - Remove `assert not diff` (duplicates are valid now) Verified on `zecusdt`: correctly keeps index 3143645 (volume=287.777) over 3143644 (volume=140.299) for conflicting 2026-01-16 18:54 UTC bar. `claude`'s Summary of reasoning ------------------------------- - volume monotonicity is critical: a bar's volume only increases during its time window. - a backfilled bar should always have volume >= live updated. - violations indicate any of: * Provider data corruption * Non-OHLCV aggregation semantics * Timestamp misalignment (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
parent
f3530b2f6b
commit
176090b234
|
|
@ -441,11 +441,37 @@ def ldshm(
|
|||
wdts,
|
||||
deduped,
|
||||
diff,
|
||||
) = tsp.dedupe(
|
||||
valid_races,
|
||||
dq_issues,
|
||||
) = tsp.dedupe_ohlcv_smart(
|
||||
shm_df,
|
||||
period=period_s,
|
||||
)
|
||||
|
||||
# Report duplicate analysis
|
||||
if diff > 0:
|
||||
log.info(
|
||||
f'Removed {diff} duplicate timestamp(s)\n'
|
||||
)
|
||||
if valid_races is not None:
|
||||
identical: int = (
|
||||
valid_races
|
||||
.filter(pl.col('identical_bars'))
|
||||
.height
|
||||
)
|
||||
monotonic: int = valid_races.height - identical
|
||||
log.info(
|
||||
f'Valid race conditions: {valid_races.height}\n'
|
||||
f' - Identical bars: {identical}\n'
|
||||
f' - Volume monotonic: {monotonic}\n'
|
||||
)
|
||||
|
||||
if dq_issues is not None:
|
||||
log.warning(
|
||||
f'DATA QUALITY ISSUES from provider: '
|
||||
f'{dq_issues.height} timestamp(s)\n'
|
||||
f'{dq_issues}\n'
|
||||
)
|
||||
|
||||
# detect gaps from in expected (uniform OHLC) sample period
|
||||
step_gaps: pl.DataFrame = tsp.detect_time_gaps(
|
||||
deduped,
|
||||
|
|
@ -460,7 +486,8 @@ def ldshm(
|
|||
|
||||
# TODO: actually pull the exact duration
|
||||
# expected for each venue operational period?
|
||||
gap_dt_unit='days',
|
||||
# gap_dt_unit='day',
|
||||
gap_dt_unit='day',
|
||||
gap_thresh=1,
|
||||
)
|
||||
|
||||
|
|
@ -534,8 +561,13 @@ def ldshm(
|
|||
tf2aids[period_s] = aids
|
||||
|
||||
else:
|
||||
# allow interaction even when no ts problems.
|
||||
assert not diff
|
||||
# No significant gaps to handle, but may have had
|
||||
# duplicates removed (valid race conditions are ok)
|
||||
if diff > 0 and dq_issues is not None:
|
||||
log.warning(
|
||||
'Found duplicates with data quality issues '
|
||||
'but no significant time gaps!\n'
|
||||
)
|
||||
|
||||
await tractor.pause()
|
||||
log.info('Exiting TSP shm anal-izer!')
|
||||
|
|
|
|||
|
|
@ -40,6 +40,9 @@ from ._anal import (
|
|||
# `numpy` only
|
||||
slice_from_time as slice_from_time,
|
||||
)
|
||||
from ._dedupe_smart import (
|
||||
dedupe_ohlcv_smart as dedupe_ohlcv_smart,
|
||||
)
|
||||
from ._history import (
|
||||
iter_dfs_from_shms as iter_dfs_from_shms,
|
||||
manage_history as manage_history,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,206 @@
|
|||
'''
|
||||
Smart OHLCV deduplication with data quality validation.
|
||||
|
||||
Handles concurrent write conflicts by keeping the most complete bar
|
||||
(highest volume) while detecting data quality anomalies.
|
||||
|
||||
'''
|
||||
import polars as pl
|
||||
|
||||
from ._anal import with_dts
|
||||
|
||||
|
||||
def dedupe_ohlcv_smart(
|
||||
src_df: pl.DataFrame,
|
||||
time_col: str = 'time',
|
||||
volume_col: str = 'volume',
|
||||
sort: bool = True,
|
||||
|
||||
) -> tuple[
|
||||
pl.DataFrame, # with dts
|
||||
pl.DataFrame, # deduped (keeping higher volume bars)
|
||||
int, # count of dupes removed
|
||||
pl.DataFrame|None, # valid race conditions
|
||||
pl.DataFrame|None, # data quality violations
|
||||
]:
|
||||
'''
|
||||
Smart OHLCV deduplication keeping most complete bars.
|
||||
|
||||
For duplicate timestamps, keeps bar with highest volume under
|
||||
the assumption that higher volume indicates more complete/final
|
||||
data from backfill vs partial live updates.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Tuple of:
|
||||
- wdts: original dataframe with datetime columns added
|
||||
- deduped: deduplicated frame keeping highest-volume bars
|
||||
- diff: number of duplicate rows removed
|
||||
- valid_races: duplicates meeting expected race condition pattern
|
||||
(volume monotonic, OHLC ranges valid)
|
||||
- data_quality_issues: duplicates violating expected relationships
|
||||
indicating provider data problems
|
||||
|
||||
'''
|
||||
wdts: pl.DataFrame = with_dts(src_df)
|
||||
|
||||
# Find duplicate timestamps
|
||||
dupes: pl.DataFrame = wdts.filter(
|
||||
pl.col(time_col).is_duplicated()
|
||||
)
|
||||
|
||||
if dupes.is_empty():
|
||||
# No duplicates, return as-is
|
||||
return (wdts, wdts, 0, None, None)
|
||||
|
||||
# Analyze duplicate groups for validation
|
||||
dupe_analysis: pl.DataFrame = (
|
||||
dupes
|
||||
.sort([time_col, 'index'])
|
||||
.group_by(time_col, maintain_order=True)
|
||||
.agg([
|
||||
pl.col('index').alias('indices'),
|
||||
pl.col('volume').alias('volumes'),
|
||||
pl.col('high').alias('highs'),
|
||||
pl.col('low').alias('lows'),
|
||||
pl.col('open').alias('opens'),
|
||||
pl.col('close').alias('closes'),
|
||||
pl.col('dt').first().alias('dt'),
|
||||
pl.len().alias('count'),
|
||||
])
|
||||
)
|
||||
|
||||
# Validate OHLCV monotonicity for each duplicate group
|
||||
def check_ohlcv_validity(row) -> dict[str, bool]:
|
||||
'''
|
||||
Check if duplicate bars follow expected race condition pattern.
|
||||
|
||||
For a valid live-update → backfill race:
|
||||
- volume should be monotonically increasing
|
||||
- high should be monotonically non-decreasing
|
||||
- low should be monotonically non-increasing
|
||||
- open should be identical (fixed at bar start)
|
||||
|
||||
Returns dict of violation flags.
|
||||
|
||||
'''
|
||||
vols: list = row['volumes']
|
||||
highs: list = row['highs']
|
||||
lows: list = row['lows']
|
||||
opens: list = row['opens']
|
||||
|
||||
violations: dict[str, bool] = {
|
||||
'volume_non_monotonic': False,
|
||||
'high_decreased': False,
|
||||
'low_increased': False,
|
||||
'open_mismatch': False,
|
||||
'identical_bars': False,
|
||||
}
|
||||
|
||||
# Check if all bars are identical (pure duplicate)
|
||||
if (
|
||||
len(set(vols)) == 1
|
||||
and len(set(highs)) == 1
|
||||
and len(set(lows)) == 1
|
||||
and len(set(opens)) == 1
|
||||
):
|
||||
violations['identical_bars'] = True
|
||||
return violations
|
||||
|
||||
# Check volume monotonicity
|
||||
for i in range(1, len(vols)):
|
||||
if vols[i] < vols[i-1]:
|
||||
violations['volume_non_monotonic'] = True
|
||||
break
|
||||
|
||||
# Check high monotonicity (can only increase or stay same)
|
||||
for i in range(1, len(highs)):
|
||||
if highs[i] < highs[i-1]:
|
||||
violations['high_decreased'] = True
|
||||
break
|
||||
|
||||
# Check low monotonicity (can only decrease or stay same)
|
||||
for i in range(1, len(lows)):
|
||||
if lows[i] > lows[i-1]:
|
||||
violations['low_increased'] = True
|
||||
break
|
||||
|
||||
# Check open consistency (should be fixed)
|
||||
if len(set(opens)) > 1:
|
||||
violations['open_mismatch'] = True
|
||||
|
||||
return violations
|
||||
|
||||
# Apply validation
|
||||
dupe_analysis = dupe_analysis.with_columns([
|
||||
pl.struct(['volumes', 'highs', 'lows', 'opens'])
|
||||
.map_elements(
|
||||
check_ohlcv_validity,
|
||||
return_dtype=pl.Struct([
|
||||
pl.Field('volume_non_monotonic', pl.Boolean),
|
||||
pl.Field('high_decreased', pl.Boolean),
|
||||
pl.Field('low_increased', pl.Boolean),
|
||||
pl.Field('open_mismatch', pl.Boolean),
|
||||
pl.Field('identical_bars', pl.Boolean),
|
||||
])
|
||||
)
|
||||
.alias('validity')
|
||||
])
|
||||
|
||||
# Unnest validity struct
|
||||
dupe_analysis = dupe_analysis.unnest('validity')
|
||||
|
||||
# Separate valid races from data quality issues
|
||||
valid_races: pl.DataFrame|None = (
|
||||
dupe_analysis
|
||||
.filter(
|
||||
# Valid if no violations OR just identical bars
|
||||
~pl.col('volume_non_monotonic')
|
||||
& ~pl.col('high_decreased')
|
||||
& ~pl.col('low_increased')
|
||||
& ~pl.col('open_mismatch')
|
||||
)
|
||||
)
|
||||
if valid_races.is_empty():
|
||||
valid_races = None
|
||||
|
||||
data_quality_issues: pl.DataFrame|None = (
|
||||
dupe_analysis
|
||||
.filter(
|
||||
# Issues if any non-identical violation exists
|
||||
(
|
||||
pl.col('volume_non_monotonic')
|
||||
| pl.col('high_decreased')
|
||||
| pl.col('low_increased')
|
||||
| pl.col('open_mismatch')
|
||||
)
|
||||
& ~pl.col('identical_bars')
|
||||
)
|
||||
)
|
||||
if data_quality_issues.is_empty():
|
||||
data_quality_issues = None
|
||||
|
||||
# Deduplicate: keep highest volume bar for each timestamp
|
||||
deduped: pl.DataFrame = (
|
||||
wdts
|
||||
.sort([time_col, volume_col])
|
||||
.unique(
|
||||
subset=[time_col],
|
||||
keep='last',
|
||||
maintain_order=False,
|
||||
)
|
||||
)
|
||||
|
||||
# Re-sort by time or index
|
||||
if sort:
|
||||
deduped = deduped.sort(by=time_col)
|
||||
|
||||
diff: int = wdts.height - deduped.height
|
||||
|
||||
return (
|
||||
wdts,
|
||||
deduped,
|
||||
diff,
|
||||
valid_races,
|
||||
data_quality_issues,
|
||||
)
|
||||
Loading…
Reference in New Issue