Bleh, fix another off-by-one issue in `np.argwhere()`

Apparently it returns the index of the prior zero-row (prolly since we
do the backward difference) so ensure `fi_zgaps += 1`..

Also fix remaining edge case handling when there's only 2 zero-segs
which was borked after a refactor to the special case blocks (like
a single zero row) prior to the `absi_zsegs` building loop AND make sure
to always return abs indices OUTSIDE the zero seg, i.e. the indices of
the non-zero row just before and just after so that the history
backfiller can use non-zero timestamps to generate range datetimes for
backend frame queries.

Add much more detailed doc-comments with a small ascii diagram to
explain how all these somewhat subtle vec ops work. Also toss in some
sanity checks on the output indices to ensure they don't point to
zero (time) valued rows when used to read the frame.
distribute_dis
Tyler Goodlet 2023-12-15 12:48:50 -05:00
parent 83bdca46a2
commit a4084d6a0b
1 changed files with 154 additions and 83 deletions

View File

@ -31,7 +31,8 @@ from math import (
import time
from typing import (
Literal,
AsyncGenerator,
# AsyncGenerator,
Generator,
)
import numpy as np
@ -252,11 +253,6 @@ def get_null_segs(
zeroed for the `col` as `zero_t`
'''
# TODO: remove this?
# NOTE: ensure we read buffer state only once so that ShmArray rt
# circular-buffer updates don't cause a indexing/size mismatch.
# frame: np.ndarray = shm.array
times: Seq = frame['time']
zero_pred: Seq = (times == 0)
@ -268,24 +264,45 @@ def get_null_segs(
if not tis_zeros:
return None
# TODO: use ndarray for this?!
absi_zsegs: list[list[int, int]] = []
if isinstance(frame, np.ndarray):
# ifirst: int = frame[0]['index']
# view of ONLY the zero segments as one continuous chunk
zero_t: np.ndarray = frame[zero_pred]
# abs indices of said zeroed rows
absi_zeros = zero_t['index']
# relative frame-indexes of zeros
# fizeros = np.ndarray = zero_t['index'] - ifirst
# diff of abs index steps between each zeroed row
absi_zdiff: np.ndarray = np.diff(absi_zeros)
# scan for all frame-indices where the
# zeroed-row-abs-index-step-diff is greater then the
# expected increment of 1.
# data 1st zero seg data zeros
# ---- ------------ ---- ----- ------ ----
# ||||..000000000000..||||..00000..||||||..0000
# ---- ------------ ---- ----- ------ ----
# ^zero_t[0] ^zero_t[-1]
# ^fi_zgaps[0] ^fi_zgaps[1]
# ^absi_zsegs[0][0] ^---^ => absi_zsegs[1]: tuple
# absi_zsegs[0][1]^
#
# NOTE: the first entry in `fi_zgaps` is where
# the first (absolute) index step diff is > 1.
# and it is a frame-relative index into `zero_t`.
fi_zgaps = np.argwhere(
absi_zdiff > 1
# OR null / inf?
# OR is 0? for first zero-row entry?
)
# NOTE: +1 here is ensure we index to the "start" of each
# segment (if we didn't the below loop needs to be
# re-written to expect `fi_end_rows`!
) + 1
# the rows from the contiguous zeroed segments which have
# abs-index steps >1 compared to the previous zero row
# (indicating an end of zeroed segment).
fi_zseg_start_rows = zero_t[fi_zgaps]
else: # pl.DataFrame case
# TODO: equiv for pl.DataFrame case!
else:
izeros: pl.Series = zero_pred.arg_true()
zero_t: pl.DataFrame = frame[izeros]
@ -293,75 +310,126 @@ def get_null_segs(
absi_zdiff: pl.Series = absi_zeros.diff()
fi_zgaps = (absi_zdiff > 1).arg_true()
# select out slice index pairs for each null-segment
# portion detected throughout entire input frame.
# import pdbp; pdbp.set_trace()
# XXX: our goal (in this func) is to select out slice index
# pairs (zseg0_start, zseg_end) in abs index units for each
# null-segment portion detected throughout entire input frame.
# only one null-segment in entire frame?
if not fi_zgaps.size:
# check for number null rows
# TODO: use ndarray for this!
# only up to one null-segment in entire frame?
num_gaps: int = fi_zgaps.size + 1
if num_gaps < 1:
if absi_zeros.size > 1:
absi_zsegs = [[
absi_zeros[0], # - 1, # - ifirst,
# TODO: need the + 1 or no?
absi_zeros[-1] + 1, # - ifirst,
]]
else:
absi_zsegs = [[
# absi_zeros[0] + 1,
# see `get_hist()` in backend, should ALWAYS be
# able to handle a `start_dt=None`!
# None,
absi_zeros[0] - 1,
# NOTE: need the + 1 to guarantee we index "up to"
# the next non-null row-datum.
absi_zeros[-1] + 1,
]]
else:
# XXX EDGE CASE: only one null-datum found so
# mark the start abs index as None to trigger
# a full frame-len query to the respective backend?
absi_zsegs = [[
# see `get_hist()` in backend, should ALWAYS be
# able to handle a `start_dt=None`!
# None,
None,
absi_zeros[0] + 1,
]]
# XXX NOTE XXX: if >= 2 zeroed segments are found, there should
# ALWAYS be more then one zero-segment-abs-index-step-diff row
# in `absi_zdiff`, so loop through all such
# abs-index-step-diffs >1 (i.e. the entries of `absi_zdiff`)
# and add them as the "end index" entries for each segment.
# Then, iif NOT iterating the first such segment end, look back
# for the prior segments zero-segment start indext by relative
# indexing the `zero_t` frame by -1 and grabbing the abs index
# of what should be the prior zero-segment abs start index.
else:
# NOTE: since `absi_zdiff` will never have a row
# corresponding to the first zero-segment's row, we add it
# manually here.
absi_zsegs.append([
absi_zeros[0] - 1, # - ifirst,
absi_zeros[0] - 1,
None,
])
# TODO: can we do it with vec ops?
for i, (
fi,
zseg_start_row,
fi, # frame index of zero-seg start
zseg_start_row, # full row for ^
) in enumerate(zip(
fi_zgaps,
fi_zseg_start_rows,
)):
assert (zseg_start_row == zero_t[fi]).all()
absi: int = zseg_start_row['index'][0]
iabs: int = zseg_start_row['index'][0]
absi_zsegs.append([
iabs - 1,
None, # backfilled on next iter
])
# row = zero_t[fi]
# absi_pre_zseg = row['index'][0] - 1
# absi_pre_zseg = absi - 1
if i > 0:
# final iter case, backfill FINAL end iabs!
if (i + 1) == fi_zgaps.size:
absi_zsegs[-1][1] = absi_zeros[-1] + 1
# NOTE: only after the first segment (due to `.diff()`
# usage above) can we do a lookback to the prior
# segment's end row and determine it's abs index to
# retroactively insert to the prior
# `absi_zsegs[i-1][1]` entry Bo
last_end: int = absi_zsegs[i][1]
if last_end is None:
prev_zseg_row = zero_t[fi - 1]
absi_post_zseg = prev_zseg_row['index'][0] + 1
absi_zsegs[i - 1][1] = absi_post_zseg
# XXX: MUST BACKFILL previous end iabs!
absi_zsegs[i][1] = absi_post_zseg
if (i + 1) < fi_zgaps.size:
absi_zsegs.append([
absi,
None,
])
else:
if 0 < num_gaps < 2:
absi_zsegs[-1][1] = absi_zeros[-1] + 1
iabs_first: int = frame['index'][0]
for start, end in absi_zsegs:
ts_start: float = times[start - iabs_first]
ts_end: float = times[end - iabs_first]
if (
ts_start == 0
or
ts_end == 0
):
import pdbp
pdbp.set_trace()
assert end
assert start < end
log.warning(
f'Frame has {len(absi_zsegs)} NULL GAPS!?\n'
f'period: {period}\n'
f'total null samples: {len(zero_t)}\n'
)
return (
absi_zsegs, # start indices of null
absi_zeros,
zero_t,
absi_zsegs, # [start, end] abs slice indices of seg
absi_zeros, # all abs indices within all null-segs
zero_t, # sliced-view of all null-segment rows-datums
)
async def iter_null_segs(
frame: Frame,
def iter_null_segs(
timeframe: float,
) -> AsyncGenerator[
frame: Frame | None = None,
null_segs: tuple | None = None,
) -> Generator[
tuple[
int, int,
int, int,
@ -377,48 +445,51 @@ async def iter_null_segs(
],
None,
]:
if null_segs := get_null_segs(
frame,
period=timeframe,
):
absi_pairs_zsegs: list[list[float, float]]
izeros: Seq
zero_t: Frame
(
absi_pairs_zsegs,
izeros,
zero_t,
) = null_segs
if null_segs is None:
null_segs: tuple = get_null_segs(
frame,
period=timeframe,
)
absi_first: int = frame[0]['index']
for (
absi_start,
absi_end,
) in absi_pairs_zsegs:
absi_pairs_zsegs: list[list[float, float]]
izeros: Seq
zero_t: Frame
(
absi_pairs_zsegs,
izeros,
zero_t,
) = null_segs
fi_end: int = absi_end - absi_first
end_row: Seq = frame[fi_end]
end_t: float = end_row['time']
end_dt: DateTime = from_timestamp(end_t)
absi_first: int = frame[0]['index']
for (
absi_start,
absi_end,
) in absi_pairs_zsegs:
if absi_start is not None:
fi_start: int = absi_start - absi_first
start_row: Seq = frame[fi_start]
start_t: float = start_row['time']
start_dt: DateTime = from_timestamp(start_t)
fi_end: int = absi_end - absi_first
end_row: Seq = frame[fi_end]
end_t: float = end_row['time']
end_dt: DateTime = from_timestamp(end_t)
else:
fi_start = None
start_row = None
start_t = None
start_dt = None
fi_start = None
start_row = None
start_t = None
start_dt = None
if (
absi_start is not None
and start_t != 0
):
fi_start: int = absi_start - absi_first
start_row: Seq = frame[fi_start]
start_t: float = start_row['time']
start_dt: DateTime = from_timestamp(start_t)
yield (
absi_start, absi_end, # abs indices
fi_start, fi_end, # relative "frame" indices
start_t, end_t,
start_dt, end_dt,
)
yield (
absi_start, absi_end, # abs indices
fi_start, fi_end, # relative "frame" indices
start_t, end_t,
start_dt, end_dt,
)
def with_dts(