Always reload shm data before annotating gaps, so they line up..

distribute_dis
Tyler Goodlet 2024-01-09 15:55:16 -05:00
parent 6959429af8
commit 52b349fe79
1 changed files with 54 additions and 45 deletions

View File

@ -286,6 +286,11 @@ async def markup_gaps(
prev_r: pl.DataFrame = wdts.filter(
pl.col('index') == iend - 1
)
# XXX: probably a gap in the (newly sorted or de-duplicated)
# dt-df, so we might need to re-index first..
if prev_r.is_empty():
await tractor.pause()
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
@ -403,41 +408,11 @@ def ldshm(
period_s: float = float(max(d1, d2, med))
# over-write back to shm?
wdts: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
wdts,
deduped,
diff,
) = tsp.dedupe(
shm_df,
period=period_s,
)
null_segs: tuple = tsp.get_null_segs(
frame=shm.array,
period=period_s,
)
# detect gaps from in expected (uniform OHLC) sample period
step_gaps: pl.DataFrame = tsp.detect_time_gaps(
wdts,
expect_period=period_s,
)
# TODO: by default we always want to mark these up
# with rects showing up/down gaps Bo
venue_gaps: pl.DataFrame = tsp.detect_time_gaps(
wdts,
expect_period=period_s,
# TODO: actually pull the exact duration
# expected for each venue operational period?
gap_dt_unit='days',
gap_thresh=1,
)
# TODO: call null-seg fixer somehow?
if null_segs:
await tractor.pause()
@ -457,25 +432,47 @@ def ldshm(
# mkt=mkt,
# ))
# over-write back to shm?
wdts: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
wdts,
deduped,
diff,
) = tsp.dedupe(
shm_df,
period=period_s,
)
# detect gaps from in expected (uniform OHLC) sample period
step_gaps: pl.DataFrame = tsp.detect_time_gaps(
deduped,
expect_period=period_s,
)
# TODO: by default we always want to mark these up
# with rects showing up/down gaps Bo
venue_gaps: pl.DataFrame = tsp.detect_time_gaps(
deduped,
expect_period=period_s,
# TODO: actually pull the exact duration
# expected for each venue operational period?
gap_dt_unit='days',
gap_thresh=1,
)
# TODO: find the disjoint set of step gaps from
# venue (closure) set!
# -[ ] do a set diff by checking for the unique
# gap set only in the step_gaps?
if (
not venue_gaps.is_empty()
# and not step_gaps.is_empty()
):
do_markup_gaps: bool = True
if do_markup_gaps:
aids: dict = await markup_gaps(
fqme,
period_s,
actl,
wdts,
step_gaps,
or (
period_s < 60
and not step_gaps.is_empty()
)
assert aids
):
# write repaired ts to parquet-file?
if write_parquet:
start: float = time.time()
@ -512,8 +509,6 @@ def ldshm(
shm._array.setflags(
write=int(1),
)
# last chance manual overwrites in REPL
await tractor.pause()
shm.push(
new,
prepend=True,
@ -521,11 +516,25 @@ def ldshm(
update_first=False, # don't update ._first
)
do_markup_gaps: bool = True
if do_markup_gaps:
new_df: pl.DataFrame = tsp.np2pl(new)
aids: dict = await markup_gaps(
fqme,
period_s,
actl,
new_df,
step_gaps,
)
# last chance manual overwrites in REPL
await tractor.pause()
assert aids
else:
# allow interaction even when no ts problems.
await tractor.pause()
assert not diff
# assert not diff
if shm_df is None: