From a4084d6a0bf488fe8f42ed826136b38d2a560545 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 15 Dec 2023 12:48:50 -0500
Subject: [PATCH] Bleh, fix another off-by-one issue in `np.argwhere()`

Apparently it returns the index of the prior zero-row (prolly since we
do the backward difference), so make sure to offset with
`fi_zgaps += 1`..

Also fix the remaining edge case handling for when there are only 2
zero-segs, which was borked after a refactor of the special case
blocks (like a single zero row) prior to the `absi_zsegs` building
loop, AND make sure to always return abs indices OUTSIDE the zero seg,
i.e. the indices of the non-zero rows just before and just after it,
so that the history backfiller can use non-zero timestamps to generate
range datetimes for backend frame queries.

Add much more detailed doc-comments with a small ascii diagram to
explain how all these somewhat subtle vec ops work. Also toss in some
sanity checks on the output indices to ensure they don't point to
zero (time) valued rows when used to read the frame.
---
 piker/data/tsp.py | 237 ++++++++++++++++++++++++++++++----------------
 1 file changed, 154 insertions(+), 83 deletions(-)

diff --git a/piker/data/tsp.py b/piker/data/tsp.py
index 8027908b..a33fb474 100644
--- a/piker/data/tsp.py
+++ b/piker/data/tsp.py
@@ -31,7 +31,8 @@ from math import (
 import time
 from typing import (
     Literal,
-    AsyncGenerator,
+    # AsyncGenerator,
+    Generator,
 )
 
 import numpy as np
@@ -252,11 +253,6 @@ def get_null_segs(
     zeroed for the `col` as `zero_t`
 
     '''
-    # TODO: remove this?
-    # NOTE: ensure we read buffer state only once so that ShmArray rt
-    # circular-buffer updates don't cause a indexing/size mismatch.
-    # frame: np.ndarray = shm.array
-
     times: Seq = frame['time']
     zero_pred: Seq = (times == 0)
 
@@ -268,24 +264,45 @@ def get_null_segs(
     if not tis_zeros:
         return None
 
+    # TODO: use ndarray for this?!
     absi_zsegs: list[list[int, int]] = []
 
     if isinstance(frame, np.ndarray):
-        # ifirst: int = frame[0]['index']
+        # view of ONLY the zero segments as one continuous chunk
         zero_t: np.ndarray = frame[zero_pred]
-
+        # abs indices of said zeroed rows
         absi_zeros = zero_t['index']
 
-        # relative frame-indexes of zeros
-        # fizeros = np.ndarray = zero_t['index'] - ifirst
+        # diff of abs index steps between each zeroed row
         absi_zdiff: np.ndarray = np.diff(absi_zeros)
+
+        # scan for all frame-indices where the
+        # zeroed-row-abs-index-step-diff is greater than the
+        # expected increment of 1.
+        # data  1st zero seg      data  zeros
+        # ----  ------------      ----  -----  ------  ----
+        # ||||..000000000000..||||..00000..||||||..0000
+        # ----  ------------      ----  -----  ------  ----
+        #       ^zero_t[0]                          ^zero_t[-1]
+        #       ^fi_zgaps[0]            ^fi_zgaps[1]
+        #       ^absi_zsegs[0][0]  ^---^ => absi_zsegs[1]: tuple
+        #        absi_zsegs[0][1]^
+        #
+        # NOTE: the first entry in `fi_zgaps` is where the first
+        # (absolute) index step diff is > 1 and it is
+        # a frame-relative index into `zero_t`.
         fi_zgaps = np.argwhere(
             absi_zdiff > 1
-            # OR null / inf?
-            # OR is 0? for first zero-row entry?
-        )
+            # NOTE: the +1 here is to ensure we index to the "start"
+            # of each segment (if we didn't, the below loop would
+            # need to be re-written to expect `fi_end_rows`!)
+        ) + 1
 
+        # the rows from the contiguous zeroed segments which have
+        # abs-index steps >1 compared to the previous zero row
+        # (indicating an end of zeroed segment).
         fi_zseg_start_rows = zero_t[fi_zgaps]
 
-    else:  # pl.DataFrame case
+    # TODO: equiv for pl.DataFrame case!
+    else:
         izeros: pl.Series = zero_pred.arg_true()
         zero_t: pl.DataFrame = frame[izeros]
@@ -293,75 +310,126 @@ def get_null_segs(
         absi_zdiff: pl.Series = absi_zeros.diff()
         fi_zgaps = (absi_zdiff > 1).arg_true()
 
-    # select out slice index pairs for each null-segment
-    # portion detected throughout entire input frame.
-    # import pdbp; pdbp.set_trace()
+    # XXX: our goal (in this func) is to select out slice index
+    # pairs (zseg0_start, zseg_end) in abs index units for each
+    # null-segment portion detected throughout entire input frame.
 
-    # only one null-segment in entire frame?
-    if not fi_zgaps.size:
-
-        # check for number null rows
-        # TODO: use ndarray for this!
+    # only up to one null-segment in entire frame?
+    num_gaps: int = fi_zgaps.size + 1
+    if num_gaps < 1:
         if absi_zeros.size > 1:
             absi_zsegs = [[
-                absi_zeros[0],  # - 1,  # - ifirst,
-                # TODO: need the + 1 or no?
-                absi_zeros[-1] + 1,  # - ifirst,
-            ]]
-        else:
-            absi_zsegs = [[
-                # absi_zeros[0] + 1,
                 # see `get_hist()` in backend, should ALWAYS be
                 # able to handle a `start_dt=None`!
+                # None,
+                absi_zeros[0] - 1,
+
+                # NOTE: need the + 1 to guarantee we index "up to"
+                # the next non-null row-datum.
+                absi_zeros[-1] + 1,
+            ]]
+        else:
+            # XXX EDGE CASE: only one null-datum found, so
+            # mark the start abs index as None to trigger
+            # a full frame-len query to the respective backend?
+            absi_zsegs = [[
+                # see `get_hist()` in backend, should ALWAYS be
+                # able to handle a `start_dt=None`!
+                # None,
                 None,
                 absi_zeros[0] + 1,
             ]]
+
+    # XXX NOTE XXX: if >= 2 zeroed segments are found, there should
+    # ALWAYS be more than one zero-segment-abs-index-step-diff row
+    # in `absi_zdiff`, so loop through all such
+    # abs-index-step-diffs >1 (i.e. the entries of `absi_zdiff`)
+    # and add them as the "end index" entries for each segment.
+    # Then, if NOT iterating the first such segment end, look back
+    # for the prior segment's zero-segment start index by relative
+    # indexing the `zero_t` frame by -1 and grabbing the abs index
+    # of what should be the prior zero-segment abs start index.
     else:
+        # NOTE: since `absi_zdiff` will never have a row
+        # corresponding to the first zero-segment's row, we add it
+        # manually here.
         absi_zsegs.append([
-            absi_zeros[0] - 1,  # - ifirst,
+            absi_zeros[0] - 1,
             None,
         ])
 
         # TODO: can we do it with vec ops?
         for i, (
-            fi,
-            zseg_start_row,
+            fi,  # frame index of zero-seg start
+            zseg_start_row,  # full row for ^
         ) in enumerate(zip(
             fi_zgaps,
             fi_zseg_start_rows,
        )):
            assert (zseg_start_row == zero_t[fi]).all()
-            absi: int = zseg_start_row['index'][0]
+            iabs: int = zseg_start_row['index'][0]
+            absi_zsegs.append([
+                iabs - 1,
+                None,  # backfilled on next iter
+            ])
+
             # row = zero_t[fi]
             # absi_pre_zseg = row['index'][0] - 1
-            # absi_pre_zseg = absi - 1
 
-            if i > 0:
+            # final iter case, backfill FINAL end iabs!
+            if (i + 1) == fi_zgaps.size:
+                absi_zsegs[-1][1] = absi_zeros[-1] + 1
+
+            # NOTE: only after the first segment (due to `.diff()`
+            # usage above) can we do a lookback to the prior
+            # segment's end row and determine its abs index to
+            # retroactively insert to the prior
+            # `absi_zsegs[i-1][1]` entry Bo
+            last_end: int = absi_zsegs[i][1]
+            if last_end is None:
                 prev_zseg_row = zero_t[fi - 1]
                 absi_post_zseg = prev_zseg_row['index'][0] + 1
-                absi_zsegs[i - 1][1] = absi_post_zseg
+                # XXX: MUST BACKFILL previous end iabs!
+                absi_zsegs[i][1] = absi_post_zseg
 
-            if (i + 1) < fi_zgaps.size:
-                absi_zsegs.append([
-                    absi,
-                    None,
-                ])
             else:
+                if 0 < num_gaps < 2:
+                    absi_zsegs[-1][1] = absi_zeros[-1] + 1
+
+    iabs_first: int = frame['index'][0]
     for start, end in absi_zsegs:
+        ts_start: float = times[start - iabs_first]
+        ts_end: float = times[end - iabs_first]
+        if (
+            ts_start == 0
+            or
+            ts_end == 0
+        ):
+            import pdbp
+            pdbp.set_trace()
+
+        assert end
         assert start < end
+
+    log.warning(
+        f'Frame has {len(absi_zsegs)} NULL GAPS!?\n'
+        f'period: {period}\n'
+        f'total null samples: {len(zero_t)}\n'
+    )
+
     return (
-        absi_zsegs,  # start indices of null
-        absi_zeros,
-        zero_t,
+        absi_zsegs,  # [start, end] abs slice indices of seg
+        absi_zeros,  # all abs indices within all null-segs
+        zero_t,  # sliced-view of all null-segment rows-datums
     )
 
 
-async def iter_null_segs(
-    frame: Frame,
+def iter_null_segs(
     timeframe: float,
+    frame: Frame | None = None,
+    null_segs: tuple | None = None,
+
-) -> AsyncGenerator[
+) -> Generator[
     tuple[
         int, int,
         int, int,
@@ -377,48 +445,51 @@
     ],
     None,
 ]:
-    if null_segs := get_null_segs(
-        frame,
-        period=timeframe,
-    ):
-        absi_pairs_zsegs: list[list[float, float]]
-        izeros: Seq
-        zero_t: Frame
-        (
-            absi_pairs_zsegs,
-            izeros,
-            zero_t,
-        ) = null_segs
+    if null_segs is None:
+        null_segs: tuple = get_null_segs(
+            frame,
+            period=timeframe,
+        )
 
-        absi_first: int = frame[0]['index']
-        for (
-            absi_start,
-            absi_end,
-        ) in absi_pairs_zsegs:
+    absi_pairs_zsegs: list[list[float, float]]
+    izeros: Seq
+    zero_t: Frame
+    (
+        absi_pairs_zsegs,
+        izeros,
+        zero_t,
+    ) = null_segs
 
-            fi_end: int = absi_end - absi_first
-            end_row: Seq = frame[fi_end]
-            end_t: float = end_row['time']
-            end_dt: DateTime = from_timestamp(end_t)
+    absi_first: int = frame[0]['index']
+    for (
+        absi_start,
+        absi_end,
+    ) in absi_pairs_zsegs:
 
-            if absi_start is not None:
-                fi_start: int = absi_start - absi_first
-                start_row: Seq = frame[fi_start]
-                start_t: float = start_row['time']
-                start_dt: DateTime = from_timestamp(start_t)
+        fi_end: int = absi_end - absi_first
+        end_row: Seq = frame[fi_end]
+        end_t: float = end_row['time']
+        end_dt: DateTime = from_timestamp(end_t)
 
-            else:
-                fi_start = None
-                start_row = None
-                start_t = None
-                start_dt = None
+        fi_start = None
+        start_row = None
+        start_t = None
+        start_dt = None
+        if (
+            absi_start is not None
+            and start_t != 0
+        ):
+            fi_start: int = absi_start - absi_first
+            start_row: Seq = frame[fi_start]
+            start_t: float = start_row['time']
+            start_dt: DateTime = from_timestamp(start_t)
 
-            yield (
-                absi_start, absi_end,  # abs indices
-                fi_start, fi_end,  # relative "frame" indices
-                start_t, end_t,
-                start_dt, end_dt,
-            )
+        yield (
+            absi_start, absi_end,  # abs indices
+            fi_start, fi_end,  # relative "frame" indices
+            start_t, end_t,
+            start_dt, end_dt,
+        )
 
 
 def with_dts(
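
[Editor's note] The snippet below is not part of the patch; it is a minimal,
self-contained sketch (with invented toy data) of the `np.diff()` +
`np.argwhere(...) + 1` indexing trick the commit message and the new
doc-comments describe, i.e. why the `+ 1` is needed so each detected gap
points at the first row of the *next* null segment instead of the last row
of the prior one:

    import numpy as np

    # toy "frame": an absolute `index` column plus a `time` column where
    # two runs of rows were never backfilled (time == 0).
    frame = np.array(
        list(zip(
            range(100, 110),  # abs indices 100..109
            [1., 2., 3., 0., 0., 0., 7., 0., 0., 10.],
        )),
        dtype=[('index', int), ('time', float)],
    )

    times = frame['time']
    zero_t = frame[times == 0]        # only the null rows
    absi_zeros = zero_t['index']      # -> [103 104 105 107 108]

    # step between consecutive null rows; a step > 1 means a new
    # null segment starts at the *next* null row.
    absi_zdiff = np.diff(absi_zeros)  # -> [1 1 2 1]

    # without the `+ 1`, argwhere() yields 2, i.e. the last row of the
    # *prior* segment (abs index 105); with it we land on the first row
    # of the following segment (abs index 107), which is the fix here.
    fi_zgaps = np.argwhere(absi_zdiff > 1) + 1
    assert absi_zeros[fi_zgaps.flatten()].tolist() == [107]

    # expanding each null run by one row on either side then gives the
    # non-zero rows a backfiller can timestamp: [102, 106] and [106, 109].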