diff --git a/piker/storage/cli.py b/piker/storage/cli.py index bf3f9072..6890192d 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -286,6 +286,11 @@ async def markup_gaps( prev_r: pl.DataFrame = wdts.filter( pl.col('index') == iend - 1 ) + # XXX: probably a gap in the (newly sorted or de-duplicated) + # dt-df, so we might need to re-index first.. + if prev_r.is_empty(): + await tractor.pause() + istart: int = prev_r['index'][0] # dt_start_t: float = dt_prev.timestamp() @@ -403,41 +408,11 @@ def ldshm( period_s: float = float(max(d1, d2, med)) - # over-write back to shm? - wdts: pl.DataFrame # with dts - deduped: pl.DataFrame # deduplicated dts - ( - wdts, - deduped, - diff, - ) = tsp.dedupe( - shm_df, - period=period_s, - ) - null_segs: tuple = tsp.get_null_segs( frame=shm.array, period=period_s, ) - # detect gaps from in expected (uniform OHLC) sample period - step_gaps: pl.DataFrame = tsp.detect_time_gaps( - wdts, - expect_period=period_s, - ) - - # TODO: by default we always want to mark these up - # with rects showing up/down gaps Bo - venue_gaps: pl.DataFrame = tsp.detect_time_gaps( - wdts, - expect_period=period_s, - - # TODO: actually pull the exact duration - # expected for each venue operational period? - gap_dt_unit='days', - gap_thresh=1, - ) - # TODO: call null-seg fixer somehow? if null_segs: await tractor.pause() @@ -457,25 +432,47 @@ def ldshm( # mkt=mkt, # )) + # over-write back to shm? + wdts: pl.DataFrame # with dts + deduped: pl.DataFrame # deduplicated dts + ( + wdts, + deduped, + diff, + ) = tsp.dedupe( + shm_df, + period=period_s, + ) + + # detect gaps from in expected (uniform OHLC) sample period + step_gaps: pl.DataFrame = tsp.detect_time_gaps( + deduped, + expect_period=period_s, + ) + + # TODO: by default we always want to mark these up + # with rects showing up/down gaps Bo + venue_gaps: pl.DataFrame = tsp.detect_time_gaps( + deduped, + expect_period=period_s, + + # TODO: actually pull the exact duration + # expected for each venue operational period? + gap_dt_unit='days', + gap_thresh=1, + ) + # TODO: find the disjoint set of step gaps from # venue (closure) set! # -[ ] do a set diff by checking for the unique # gap set only in the step_gaps? if ( not venue_gaps.is_empty() - # and not step_gaps.is_empty() + or ( + period_s < 60 + and not step_gaps.is_empty() + ) ): - do_markup_gaps: bool = True - if do_markup_gaps: - aids: dict = await markup_gaps( - fqme, - period_s, - actl, - wdts, - step_gaps, - ) - assert aids - # write repaired ts to parquet-file? if write_parquet: start: float = time.time() @@ -512,8 +509,6 @@ def ldshm( shm._array.setflags( write=int(1), ) - # last chance manual overwrites in REPL - await tractor.pause() shm.push( new, prepend=True, @@ -521,11 +516,25 @@ def ldshm( update_first=False, # don't update ._first ) + do_markup_gaps: bool = True + if do_markup_gaps: + new_df: pl.DataFrame = tsp.np2pl(new) + aids: dict = await markup_gaps( + fqme, + period_s, + actl, + new_df, + step_gaps, + ) + + # last chance manual overwrites in REPL + await tractor.pause() + assert aids else: # allow interaction even when no ts problems. await tractor.pause() - assert not diff + # assert not diff if shm_df is None: