Tolerate various "bad data" cases in `markup_gaps()`
Namely such that when the previous-df-row by our shm-abs-'index' doesn't
exist we ignore certain cases which are likely due to borked-but-benign
samples written to the tsdb or rt shm buffers prior.
Particularly we now ignore,
- any `dt`/`prev_dt` values which are UNIX-epoch timestamped (val of 0).
- any row-is-first-row in the df; there is no previous.
- any missing previous datum by 'index', in which case we lookup the
`wdts` prior row and use that instead.
* this would indicate a missing sample for the time-step but we can
still detect a "gap" by looking at the prior row, by df-abs-index
`i`, and use its timestamp to determine the period/size of missing
samples (which need to likely still be retrieved).
* in this case i'm leaving in a pause-point for introspecting these
rarer cases when `--pdb` is passed via CLI.
Relatedly in the `piker store` CLI ep,
- add `--pdb` flag to `piker store`, pass it verbatim as `debug_mode`.
- when `times` has only a single row, don't calc a `period_s` median.
- only trace `null_segs` when in debug mode.
- always markup/dedupe gaps for `period_s==60`
parent
56b69f97b3
commit
9ebb977731
|
|
@ -242,6 +242,7 @@ def anal(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO, move to `.tsp._annotate`
|
||||||
async def markup_gaps(
|
async def markup_gaps(
|
||||||
fqme: str,
|
fqme: str,
|
||||||
timeframe: float,
|
timeframe: float,
|
||||||
|
|
@ -288,18 +289,38 @@ async def markup_gaps(
|
||||||
)
|
)
|
||||||
# XXX: probably a gap in the (newly sorted or de-duplicated)
|
# XXX: probably a gap in the (newly sorted or de-duplicated)
|
||||||
# dt-df, so we might need to re-index first..
|
# dt-df, so we might need to re-index first..
|
||||||
|
dt: pl.Series = row['dt']
|
||||||
|
dt_prev: pl.Series = row['dt_prev']
|
||||||
if prev_r.is_empty():
|
if prev_r.is_empty():
|
||||||
|
|
||||||
|
# XXX, filter out any special ignore cases,
|
||||||
|
# - UNIX-epoch stamped datums
|
||||||
|
# - first row
|
||||||
|
if (
|
||||||
|
dt_prev.dt.epoch()[0] == 0
|
||||||
|
or
|
||||||
|
dt.dt.epoch()[0] == 0
|
||||||
|
):
|
||||||
|
log.warning('Skipping row with UNIX epoch timestamp ??')
|
||||||
|
continue
|
||||||
|
|
||||||
|
if wdts[0]['index'][0] == iend: # first row
|
||||||
|
log.warning('Skipping first-row (has no previous obvi) !!')
|
||||||
|
continue
|
||||||
|
|
||||||
|
# XXX, if the previous-row by shm-index is missing,
|
||||||
|
# meaning there is a missing sample (set), get the prior
|
||||||
|
# row by df index and attempt to use it?
|
||||||
|
i_wdts: pl.DataFrame = wdts.with_row_index(name='i')
|
||||||
|
i_row: int = i_wdts.filter(pl.col('index') == iend)['i'][0]
|
||||||
|
prev_row_by_i = wdts[i_row]
|
||||||
|
prev_r: pl.DataFrame = prev_row_by_i
|
||||||
|
|
||||||
|
# debug any missing pre-row
|
||||||
|
if tractor._state.is_debug_mode():
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
|
|
||||||
istart: int = prev_r['index'][0]
|
istart: int = prev_r['index'][0]
|
||||||
# dt_start_t: float = dt_prev.timestamp()
|
|
||||||
|
|
||||||
# start_t: float = prev_r['time']
|
|
||||||
# assert (
|
|
||||||
# dt_start_t
|
|
||||||
# ==
|
|
||||||
# start_t
|
|
||||||
# )
|
|
||||||
|
|
||||||
# TODO: implement px-col width measure
|
# TODO: implement px-col width measure
|
||||||
# and ensure at least as many px-cols
|
# and ensure at least as many px-cols
|
||||||
|
|
@ -358,6 +379,7 @@ def ldshm(
|
||||||
fqme: str,
|
fqme: str,
|
||||||
write_parquet: bool = True,
|
write_parquet: bool = True,
|
||||||
reload_parquet_to_shm: bool = True,
|
reload_parquet_to_shm: bool = True,
|
||||||
|
pdb: bool = False, # --pdb passed?
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
|
@ -377,7 +399,7 @@ def ldshm(
|
||||||
open_piker_runtime(
|
open_piker_runtime(
|
||||||
'polars_boi',
|
'polars_boi',
|
||||||
enable_modules=['piker.data._sharedmem'],
|
enable_modules=['piker.data._sharedmem'],
|
||||||
debug_mode=True,
|
debug_mode=pdb,
|
||||||
),
|
),
|
||||||
open_storage_client() as (
|
open_storage_client() as (
|
||||||
mod,
|
mod,
|
||||||
|
|
@ -397,6 +419,9 @@ def ldshm(
|
||||||
|
|
||||||
times: np.ndarray = shm.array['time']
|
times: np.ndarray = shm.array['time']
|
||||||
d1: float = float(times[-1] - times[-2])
|
d1: float = float(times[-1] - times[-2])
|
||||||
|
d2: float = 0
|
||||||
|
# XXX, take a median sample rate if sufficient data
|
||||||
|
if times.size > 2:
|
||||||
d2: float = float(times[-2] - times[-3])
|
d2: float = float(times[-2] - times[-3])
|
||||||
med: float = np.median(np.diff(times))
|
med: float = np.median(np.diff(times))
|
||||||
if (
|
if (
|
||||||
|
|
@ -407,7 +432,6 @@ def ldshm(
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f'Something is wrong with time period for {shm}:\n{times}'
|
f'Something is wrong with time period for {shm}:\n{times}'
|
||||||
)
|
)
|
||||||
|
|
||||||
period_s: float = float(max(d1, d2, med))
|
period_s: float = float(max(d1, d2, med))
|
||||||
|
|
||||||
null_segs: tuple = tsp.get_null_segs(
|
null_segs: tuple = tsp.get_null_segs(
|
||||||
|
|
@ -417,6 +441,8 @@ def ldshm(
|
||||||
|
|
||||||
# TODO: call null-seg fixer somehow?
|
# TODO: call null-seg fixer somehow?
|
||||||
if null_segs:
|
if null_segs:
|
||||||
|
|
||||||
|
if tractor._state.is_debug_mode():
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
# async with (
|
# async with (
|
||||||
# trio.open_nursery() as tn,
|
# trio.open_nursery() as tn,
|
||||||
|
|
@ -498,8 +524,11 @@ def ldshm(
|
||||||
if (
|
if (
|
||||||
not venue_gaps.is_empty()
|
not venue_gaps.is_empty()
|
||||||
or (
|
or (
|
||||||
period_s < 60
|
not step_gaps.is_empty()
|
||||||
and not step_gaps.is_empty()
|
# XXX, i presume i put this bc i was guarding
|
||||||
|
# for ib venue gaps?
|
||||||
|
# and
|
||||||
|
# period_s < 60
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
# write repaired ts to parquet-file?
|
# write repaired ts to parquet-file?
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue