diff --git a/piker/data/tsp.py b/piker/data/tsp.py index 8027908b..a33fb474 100644 --- a/piker/data/tsp.py +++ b/piker/data/tsp.py @@ -31,7 +31,8 @@ from math import ( import time from typing import ( Literal, - AsyncGenerator, + # AsyncGenerator, + Generator, ) import numpy as np @@ -252,11 +253,6 @@ def get_null_segs( zeroed for the `col` as `zero_t` ''' - # TODO: remove this? - # NOTE: ensure we read buffer state only once so that ShmArray rt - # circular-buffer updates don't cause a indexing/size mismatch. - # frame: np.ndarray = shm.array - times: Seq = frame['time'] zero_pred: Seq = (times == 0) @@ -268,24 +264,45 @@ def get_null_segs( if not tis_zeros: return None + # TODO: use ndarray for this?! absi_zsegs: list[list[int, int]] = [] if isinstance(frame, np.ndarray): - # ifirst: int = frame[0]['index'] + # view of ONLY the zero segments as one continuous chunk zero_t: np.ndarray = frame[zero_pred] - + # abs indices of said zeroed rows absi_zeros = zero_t['index'] - # relative frame-indexes of zeros - # fizeros = np.ndarray = zero_t['index'] - ifirst + # diff of abs index steps between each zeroed row absi_zdiff: np.ndarray = np.diff(absi_zeros) + + # scan for all frame-indices where the + # zeroed-row-abs-index-step-diff is greater then the + # expected increment of 1. + # data 1st zero seg data zeros + # ---- ------------ ---- ----- ------ ---- + # ||||..000000000000..||||..00000..||||||..0000 + # ---- ------------ ---- ----- ------ ---- + # ^zero_t[0] ^zero_t[-1] + # ^fi_zgaps[0] ^fi_zgaps[1] + # ^absi_zsegs[0][0] ^---^ => absi_zsegs[1]: tuple + # absi_zsegs[0][1]^ + # + # NOTE: the first entry in `fi_zgaps` is where + # the first (absolute) index step diff is > 1. + # and it is a frame-relative index into `zero_t`. fi_zgaps = np.argwhere( absi_zdiff > 1 - # OR null / inf? - # OR is 0? for first zero-row entry? - ) + # NOTE: +1 here is ensure we index to the "start" of each + # segment (if we didn't the below loop needs to be + # re-written to expect `fi_end_rows`! + ) + 1 + # the rows from the contiguous zeroed segments which have + # abs-index steps >1 compared to the previous zero row + # (indicating an end of zeroed segment). fi_zseg_start_rows = zero_t[fi_zgaps] - else: # pl.DataFrame case + # TODO: equiv for pl.DataFrame case! + else: izeros: pl.Series = zero_pred.arg_true() zero_t: pl.DataFrame = frame[izeros] @@ -293,75 +310,126 @@ def get_null_segs( absi_zdiff: pl.Series = absi_zeros.diff() fi_zgaps = (absi_zdiff > 1).arg_true() - # select out slice index pairs for each null-segment - # portion detected throughout entire input frame. - # import pdbp; pdbp.set_trace() + # XXX: our goal (in this func) is to select out slice index + # pairs (zseg0_start, zseg_end) in abs index units for each + # null-segment portion detected throughout entire input frame. - # only one null-segment in entire frame? - if not fi_zgaps.size: - - # check for number null rows - # TODO: use ndarray for this! + # only up to one null-segment in entire frame? + num_gaps: int = fi_zgaps.size + 1 + if num_gaps < 1: if absi_zeros.size > 1: absi_zsegs = [[ - absi_zeros[0], # - 1, # - ifirst, - # TODO: need the + 1 or no? - absi_zeros[-1] + 1, # - ifirst, - ]] - else: - absi_zsegs = [[ - # absi_zeros[0] + 1, # see `get_hist()` in backend, should ALWAYS be # able to handle a `start_dt=None`! + # None, + absi_zeros[0] - 1, + # NOTE: need the + 1 to guarantee we index "up to" + # the next non-null row-datum. + absi_zeros[-1] + 1, + ]] + else: + # XXX EDGE CASE: only one null-datum found so + # mark the start abs index as None to trigger + # a full frame-len query to the respective backend? + absi_zsegs = [[ + # see `get_hist()` in backend, should ALWAYS be + # able to handle a `start_dt=None`! + # None, None, absi_zeros[0] + 1, ]] + + # XXX NOTE XXX: if >= 2 zeroed segments are found, there should + # ALWAYS be more then one zero-segment-abs-index-step-diff row + # in `absi_zdiff`, so loop through all such + # abs-index-step-diffs >1 (i.e. the entries of `absi_zdiff`) + # and add them as the "end index" entries for each segment. + # Then, iif NOT iterating the first such segment end, look back + # for the prior segments zero-segment start indext by relative + # indexing the `zero_t` frame by -1 and grabbing the abs index + # of what should be the prior zero-segment abs start index. else: + # NOTE: since `absi_zdiff` will never have a row + # corresponding to the first zero-segment's row, we add it + # manually here. absi_zsegs.append([ - absi_zeros[0] - 1, # - ifirst, + absi_zeros[0] - 1, None, ]) # TODO: can we do it with vec ops? for i, ( - fi, - zseg_start_row, + fi, # frame index of zero-seg start + zseg_start_row, # full row for ^ ) in enumerate(zip( fi_zgaps, fi_zseg_start_rows, )): assert (zseg_start_row == zero_t[fi]).all() - absi: int = zseg_start_row['index'][0] + iabs: int = zseg_start_row['index'][0] + absi_zsegs.append([ + iabs - 1, + None, # backfilled on next iter + ]) + # row = zero_t[fi] # absi_pre_zseg = row['index'][0] - 1 # absi_pre_zseg = absi - 1 - if i > 0: + # final iter case, backfill FINAL end iabs! + if (i + 1) == fi_zgaps.size: + absi_zsegs[-1][1] = absi_zeros[-1] + 1 + + # NOTE: only after the first segment (due to `.diff()` + # usage above) can we do a lookback to the prior + # segment's end row and determine it's abs index to + # retroactively insert to the prior + # `absi_zsegs[i-1][1]` entry Bo + last_end: int = absi_zsegs[i][1] + if last_end is None: prev_zseg_row = zero_t[fi - 1] absi_post_zseg = prev_zseg_row['index'][0] + 1 - absi_zsegs[i - 1][1] = absi_post_zseg + # XXX: MUST BACKFILL previous end iabs! + absi_zsegs[i][1] = absi_post_zseg - if (i + 1) < fi_zgaps.size: - absi_zsegs.append([ - absi, - None, - ]) else: + if 0 < num_gaps < 2: + absi_zsegs[-1][1] = absi_zeros[-1] + 1 + + iabs_first: int = frame['index'][0] for start, end in absi_zsegs: + ts_start: float = times[start - iabs_first] + ts_end: float = times[end - iabs_first] + if ( + ts_start == 0 + or + ts_end == 0 + ): + import pdbp + pdbp.set_trace() + assert end assert start < end + log.warning( + f'Frame has {len(absi_zsegs)} NULL GAPS!?\n' + f'period: {period}\n' + f'total null samples: {len(zero_t)}\n' + ) + return ( - absi_zsegs, # start indices of null - absi_zeros, - zero_t, + absi_zsegs, # [start, end] abs slice indices of seg + absi_zeros, # all abs indices within all null-segs + zero_t, # sliced-view of all null-segment rows-datums ) -async def iter_null_segs( - frame: Frame, +def iter_null_segs( timeframe: float, -) -> AsyncGenerator[ + frame: Frame | None = None, + null_segs: tuple | None = None, + +) -> Generator[ tuple[ int, int, int, int, @@ -377,48 +445,51 @@ async def iter_null_segs( ], None, ]: - if null_segs := get_null_segs( - frame, - period=timeframe, - ): - absi_pairs_zsegs: list[list[float, float]] - izeros: Seq - zero_t: Frame - ( - absi_pairs_zsegs, - izeros, - zero_t, - ) = null_segs + if null_segs is None: + null_segs: tuple = get_null_segs( + frame, + period=timeframe, + ) - absi_first: int = frame[0]['index'] - for ( - absi_start, - absi_end, - ) in absi_pairs_zsegs: + absi_pairs_zsegs: list[list[float, float]] + izeros: Seq + zero_t: Frame + ( + absi_pairs_zsegs, + izeros, + zero_t, + ) = null_segs - fi_end: int = absi_end - absi_first - end_row: Seq = frame[fi_end] - end_t: float = end_row['time'] - end_dt: DateTime = from_timestamp(end_t) + absi_first: int = frame[0]['index'] + for ( + absi_start, + absi_end, + ) in absi_pairs_zsegs: - if absi_start is not None: - fi_start: int = absi_start - absi_first - start_row: Seq = frame[fi_start] - start_t: float = start_row['time'] - start_dt: DateTime = from_timestamp(start_t) + fi_end: int = absi_end - absi_first + end_row: Seq = frame[fi_end] + end_t: float = end_row['time'] + end_dt: DateTime = from_timestamp(end_t) - else: - fi_start = None - start_row = None - start_t = None - start_dt = None + fi_start = None + start_row = None + start_t = None + start_dt = None + if ( + absi_start is not None + and start_t != 0 + ): + fi_start: int = absi_start - absi_first + start_row: Seq = frame[fi_start] + start_t: float = start_row['time'] + start_dt: DateTime = from_timestamp(start_t) - yield ( - absi_start, absi_end, # abs indices - fi_start, fi_end, # relative "frame" indices - start_t, end_t, - start_dt, end_dt, - ) + yield ( + absi_start, absi_end, # abs indices + fi_start, fi_end, # relative "frame" indices + start_t, end_t, + start_dt, end_dt, + ) def with_dts(