diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index 1c8ff11b..5c087898 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -441,11 +441,37 @@ def ldshm(
                     wdts,
                     deduped,
                     diff,
-                ) = tsp.dedupe(
+                    valid_races,
+                    dq_issues,
+                ) = tsp.dedupe_ohlcv_smart(
                     shm_df,
-                    period=period_s,
                 )
 
+                # Report duplicate analysis
+                if diff > 0:
+                    log.info(
+                        f'Removed {diff} duplicate timestamp(s)\n'
+                    )
+                    if valid_races is not None:
+                        identical: int = (
+                            valid_races
+                            .filter(pl.col('identical_bars'))
+                            .height
+                        )
+                        monotonic: int = valid_races.height - identical
+                        log.info(
+                            f'Valid race conditions: {valid_races.height}\n'
+                            f'  - Identical bars: {identical}\n'
+                            f'  - Volume monotonic: {monotonic}\n'
+                        )
+
+                    if dq_issues is not None:
+                        log.warning(
+                            f'DATA QUALITY ISSUES from provider: '
+                            f'{dq_issues.height} timestamp(s)\n'
+                            f'{dq_issues}\n'
+                        )
+
                 # detect gaps from in expected (uniform OHLC) sample period
                 step_gaps: pl.DataFrame = tsp.detect_time_gaps(
                     deduped,
@@ -460,7 +486,8 @@ def ldshm(
 
                     # TODO: actually pull the exact duration
                     # expected for each venue operational period?
-                    gap_dt_unit='days',
+                    # gap_dt_unit='day',
+                    gap_dt_unit='day',
                     gap_thresh=1,
                 )
 
@@ -534,8 +561,13 @@ def ldshm(
                     tf2aids[period_s] = aids
 
                 else:
-                    # allow interaction even when no ts problems.
-                    assert not diff
+                    # No significant gaps to handle, but may have had
+                    # duplicates removed (valid race conditions are ok)
+                    if diff > 0 and dq_issues is not None:
+                        log.warning(
+                            'Found duplicates with data quality issues '
+                            'but no significant time gaps!\n'
+                        )
 
             await tractor.pause()
         log.info('Exiting TSP shm anal-izer!')
diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py
index 1df0a554..81274ed8 100644
--- a/piker/tsp/__init__.py
+++ b/piker/tsp/__init__.py
@@ -40,6 +40,9 @@ from ._anal import (
     # `numpy` only
     slice_from_time as slice_from_time,
 )
+from ._dedupe_smart import (
+    dedupe_ohlcv_smart as dedupe_ohlcv_smart,
+)
 from ._history import (
     iter_dfs_from_shms as iter_dfs_from_shms,
     manage_history as manage_history,
diff --git a/piker/tsp/_dedupe_smart.py b/piker/tsp/_dedupe_smart.py
new file mode 100644
index 00000000..8c0ac55a
--- /dev/null
+++ b/piker/tsp/_dedupe_smart.py
@@ -0,0 +1,206 @@
+'''
+Smart OHLCV deduplication with data quality validation.
+
+Handles concurrent write conflicts by keeping the most complete bar
+(highest volume) while detecting data quality anomalies.
+
+'''
+import polars as pl
+
+from ._anal import with_dts
+
+
+def dedupe_ohlcv_smart(
+    src_df: pl.DataFrame,
+    time_col: str = 'time',
+    volume_col: str = 'volume',
+    sort: bool = True,
+
+) -> tuple[
+    pl.DataFrame,  # with dts
+    pl.DataFrame,  # deduped (keeping higher volume bars)
+    int,  # count of dupes removed
+    pl.DataFrame|None,  # valid race conditions
+    pl.DataFrame|None,  # data quality violations
+]:
+    '''
+    Smart OHLCV deduplication keeping most complete bars.
+
+    For duplicate timestamps, keeps bar with highest volume under
+    the assumption that higher volume indicates more complete/final
+    data from backfill vs partial live updates.
+
+    Returns
+    -------
+    Tuple of:
+    - wdts: original dataframe with datetime columns added
+    - deduped: deduplicated frame keeping highest-volume bars
+    - diff: number of duplicate rows removed
+    - valid_races: duplicates meeting expected race condition pattern
+      (volume monotonic, OHLC ranges valid)
+    - data_quality_issues: duplicates violating expected relationships
+      indicating provider data problems
+
+    '''
+    wdts: pl.DataFrame = with_dts(src_df)
+
+    # Find duplicate timestamps
+    dupes: pl.DataFrame = wdts.filter(
+        pl.col(time_col).is_duplicated()
+    )
+
+    if dupes.is_empty():
+        # No duplicates, return as-is
+        return (wdts, wdts, 0, None, None)
+
+    # Analyze duplicate groups for validation
+    dupe_analysis: pl.DataFrame = (
+        dupes
+        .sort([time_col, 'index'])
+        .group_by(time_col, maintain_order=True)
+        .agg([
+            pl.col('index').alias('indices'),
+            pl.col('volume').alias('volumes'),
+            pl.col('high').alias('highs'),
+            pl.col('low').alias('lows'),
+            pl.col('open').alias('opens'),
+            pl.col('close').alias('closes'),
+            pl.col('dt').first().alias('dt'),
+            pl.len().alias('count'),
+        ])
+    )
+
+    # Validate OHLCV monotonicity for each duplicate group
+    def check_ohlcv_validity(row) -> dict[str, bool]:
+        '''
+        Check if duplicate bars follow expected race condition pattern.
+
+        For a valid live-update → backfill race:
+        - volume should be monotonically increasing
+        - high should be monotonically non-decreasing
+        - low should be monotonically non-increasing
+        - open should be identical (fixed at bar start)
+
+        Returns dict of violation flags.
+
+        '''
+        vols: list = row['volumes']
+        highs: list = row['highs']
+        lows: list = row['lows']
+        opens: list = row['opens']
+
+        violations: dict[str, bool] = {
+            'volume_non_monotonic': False,
+            'high_decreased': False,
+            'low_increased': False,
+            'open_mismatch': False,
+            'identical_bars': False,
+        }
+
+        # Check if all bars are identical (pure duplicate)
+        if (
+            len(set(vols)) == 1
+            and len(set(highs)) == 1
+            and len(set(lows)) == 1
+            and len(set(opens)) == 1
+        ):
+            violations['identical_bars'] = True
+            return violations
+
+        # Check volume monotonicity
+        for i in range(1, len(vols)):
+            if vols[i] < vols[i-1]:
+                violations['volume_non_monotonic'] = True
+                break
+
+        # Check high monotonicity (can only increase or stay same)
+        for i in range(1, len(highs)):
+            if highs[i] < highs[i-1]:
+                violations['high_decreased'] = True
+                break
+
+        # Check low monotonicity (can only decrease or stay same)
+        for i in range(1, len(lows)):
+            if lows[i] > lows[i-1]:
+                violations['low_increased'] = True
+                break
+
+        # Check open consistency (should be fixed)
+        if len(set(opens)) > 1:
+            violations['open_mismatch'] = True
+
+        return violations
+
+    # Apply validation
+    dupe_analysis = dupe_analysis.with_columns([
+        pl.struct(['volumes', 'highs', 'lows', 'opens'])
+        .map_elements(
+            check_ohlcv_validity,
+            return_dtype=pl.Struct([
+                pl.Field('volume_non_monotonic', pl.Boolean),
+                pl.Field('high_decreased', pl.Boolean),
+                pl.Field('low_increased', pl.Boolean),
+                pl.Field('open_mismatch', pl.Boolean),
+                pl.Field('identical_bars', pl.Boolean),
+            ])
+        )
+        .alias('validity')
+    ])
+
+    # Unnest validity struct
+    dupe_analysis = dupe_analysis.unnest('validity')
+
+    # Separate valid races from data quality issues
+    valid_races: pl.DataFrame|None = (
+        dupe_analysis
+        .filter(
+            # Valid if no violations OR just identical bars
+            ~pl.col('volume_non_monotonic')
+            & ~pl.col('high_decreased')
+            & ~pl.col('low_increased')
+            & ~pl.col('open_mismatch')
+        )
+    )
+    if valid_races.is_empty():
+        valid_races = None
+
+    data_quality_issues: pl.DataFrame|None = (
+        dupe_analysis
+        .filter(
+            # Issues if any non-identical violation exists
+            (
+                pl.col('volume_non_monotonic')
+                | pl.col('high_decreased')
+                | pl.col('low_increased')
+                | pl.col('open_mismatch')
+            )
+            & ~pl.col('identical_bars')
+        )
+    )
+    if data_quality_issues.is_empty():
+        data_quality_issues = None
+
+    # Deduplicate: keep highest volume bar for each timestamp
+    deduped: pl.DataFrame = (
+        wdts
+        .sort([time_col, volume_col])
+        .unique(
+            subset=[time_col],
+            keep='last',
+            maintain_order=False,
+        )
+    )
+
+    # Re-sort by time if requested
+    if sort:
+        deduped = deduped.sort(by=time_col)
+
+    diff: int = wdts.height - deduped.height
+
+    return (
+        wdts,
+        deduped,
+        diff,
+        valid_races,
+        data_quality_issues,
+    )
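
Usage note (not part of the patch): a minimal sketch of driving the new `dedupe_ohlcv_smart()` through the `piker.tsp` re-export added above. The frame values are hypothetical; the column set (an `index` column, epoch-seconds `time`, plus OHLCV) is assumed to match what the shm-sourced frames provide, since the dupe grouping reads `index` and `with_dts()` derives `dt` from `time`. The duplicated timestamp is built to follow the "valid race" pattern the validator checks (volume and high non-decreasing, low non-increasing, open fixed), so it should be reported under `valid_races` with no data-quality frame.

    import polars as pl
    from piker.tsp import dedupe_ohlcv_smart

    # two rows share time=1700000120; the later one carries more volume
    # and a wider-but-consistent OHLC range -> expected "valid race".
    df = pl.DataFrame({
        'index':  [0, 1, 2, 3],
        'time':   [1700000000, 1700000060, 1700000120, 1700000120],
        'open':   [100.0, 101.0, 102.0, 102.0],
        'high':   [101.0, 102.0, 102.5, 103.0],
        'low':    [99.5, 100.5, 101.5, 101.0],
        'close':  [101.0, 102.0, 102.2, 102.8],
        'volume': [10.0, 12.0, 3.0, 9.0],
    })

    wdts, deduped, diff, valid_races, dq_issues = dedupe_ohlcv_smart(df)

    print(diff)            # expected: 1 duplicate row dropped
    print(deduped.height)  # expected: 3 unique timestamps, highest-volume bar kept
    print(valid_races)     # expected: one grouped dupe entry; dq_issues should be None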