diff --git a/piker/data/_pathops.py b/piker/data/_pathops.py index 41dd6ad4..d83752c3 100644 --- a/piker/data/_pathops.py +++ b/piker/data/_pathops.py @@ -29,6 +29,7 @@ from numba import ( # TODO: for ``numba`` typing.. # from ._source import numba_ohlc_dtype +from ._sharedmem import ShmArray from ._m4 import ds_m4 from .._profile import ( Profiler, @@ -126,6 +127,7 @@ def path_arrays_from_ohlc( high = q['high'] low = q['low'] close = q['close'] + # index = float64(q['index']) index = float64(q['time']) # XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622 @@ -276,11 +278,7 @@ def _slice_from_time( start_t: float, stop_t: float, -) -> tuple[ - tuple[int, int], - tuple[int, int], - np.ndarray | None, -]: +) -> tuple[int, int]: ''' Slice an input struct array to a time range and return the absolute and "readable" slices for that array as well as the indexing mask @@ -305,14 +303,6 @@ def _slice_from_time( ), ) - # TODO: if we can ensure each time field has a uniform - # step we can instead do some arithmetic to determine - # the equivalent index like we used to? - # return array[ - # lbar - ifirst: - # (rbar - ifirst) + 1 - # ] - read_i_0: int = 0 read_i_last: int = 0 @@ -328,62 +318,159 @@ def _slice_from_time( read_i_last = time break - abs_i_0 = int(index[0]) + read_i_0 - abs_i_last = int(index[0]) + read_i_last - - if read_i_last == 0: - read_i_last = times.shape[0] - - abs_slc = ( - int(abs_i_0), - int(abs_i_last), - ) - - read_slc = ( - int(read_i_0), - int(read_i_last), - ) - - # also return the readable data from the timerange - return ( - abs_slc, - read_slc, - ) + return read_i_0, read_i_last def slice_from_time( arr: np.ndarray, start_t: float, stop_t: float, + step: int | None = None, ) -> tuple[ slice, slice, - np.ndarray | None, ]: + ''' + Calculate array indices mapped from a time range and return them in + a slice. + + Given an input array with an epoch `'time'` series entry, calculate + the indices which span the time range and return in a slice. Presume + each `'time'` step increment is uniform and when the time stamp + series contains gaps (the uniform presumption is untrue) use + ``np.searchsorted()`` binary search to look up the appropriate + index. + + ''' profiler = Profiler( msg='slice_from_time()', disabled=not pg_profile_enabled(), ms_threshold=ms_slower_then, ) - ( - abs_slc_tuple, - read_slc_tuple, - ) = _slice_from_time( - arr, - start_t, - stop_t, + times = arr['time'] + t_first = round(times[0]) + t_last = round(times[-1]) + + index = arr['index'] + i_first = index[0] + read_i_max = arr.shape[0] + + if ( + start_t < t_first + and stop_t > t_last + ): + read_i_start = 0 + read_i_stop = read_i_max + read_slc = slice( + 0, + read_i_max, + ) + return read_slc + + if step is None: + step = round(times[-1] - times[-2]) + if step == 0: + # XXX: HOW TF is this happening? + step = 1 + + # compute (presumed) uniform-time-step index offsets + i_start_t = round(start_t) + read_i_start = (i_start_t - t_first) // step + + i_stop_t = round(stop_t) + read_i_stop = (i_stop_t - t_first) // step + + # always clip outputs to array support + # for read start: + # - never allow a start < the 0 index + # - never allow an end index > the read array len + read_i_start = min( + max(0, read_i_start), + read_i_max, + ) + read_i_stop = max( + 0, + min(read_i_stop, read_i_max), + ) + + # check for larger-then-latest calculated index for given start + # time, in which case we do a binary search for the correct index. + # NOTE: this is usually the result of a time series with time gaps + # where it is expected that each index step maps to a uniform step + # in the time stamp series. + i_iv_start = index[read_i_start - 1] + t_iv_start = times[read_i_start - 1] + if ( + i_iv_start >= i_first + and t_iv_start > i_start_t + ): + # do a binary search for the best index mapping to ``start_t`` + # given we measured an overshoot using the uniform-time-step + # calculation from above. + + # TODO: once we start caching these per source-array, + # we can just overwrite ``read_i_start`` directly. + new_read_i_start = np.searchsorted( + times, + i_start_t, + side='left', + ) + + # TODO: minimize binary search work as much as possible: + # - cache these remap values which compensate for gaps in the + # uniform time step basis where we calc a later start + # index for the given input ``start_t``. + # - can we shorten the input search sequence by heuristic? + # up_to_arith_start = index[:read_i_start] + + if ( + new_read_i_start < read_i_start + ): + # t_diff = t_iv_start - start_t + # print( + # f"WE'RE CUTTING OUT TIME - STEP:{step}\n" + # f'start_t:{start_t} -> 0index start_t:{t_iv_start}\n' + # f'diff: {t_diff}\n' + # f'REMAPPED START i: {read_i_start} -> {new_read_i_start}\n' + # ) + read_i_start = new_read_i_start + + # old much slower non-bin-search ``numba`` approach.. + # ( + # read_i_start, + # read_i_stop, + # ) = _slice_from_time( + # arr, + # start_t, + # stop_t, + # ) + # abs_i_start = int(index[0]) + read_i_0 + # abs_i_stop = int(index[0]) + read_i_last + # if read_i_stop == 0: + # read_i_stop = times.shape[0] + + # read-relative indexes: gives a slice where `shm.array[read_slc]` + # will be the data spanning the input time range `start_t` -> + # `stop_t` + read_slc = slice( + int(read_i_start), + int(read_i_stop), ) - abs_slc = slice(*abs_slc_tuple) - read_slc = slice(*read_slc_tuple) profiler( 'slicing complete' # f'{start_t} -> {abs_slc.start} | {read_slc.start}\n' # f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n' ) - return ( - abs_slc, - read_slc, - ) + + # NOTE: if caller needs absolute buffer indices they can + # slice the buffer abs index like so: + # abs_indx = index[read_slc] + # abs_slc = slice( + # int(abs_indx[0]), + # int(abs_indx[-1]), + # ) + + return read_slc