Use uniform step arithmetic in `slice_from_time()`
If we presume that time indexing uses a uniform step, we can calculate the exact index (using `//`) for the input time, presuming the data set has zero gaps. This gives a massive speedup over `numpy` fancy indexing and (naive) `numba` iteration. Further, in the case where time gaps are detected, we can use `numpy.searchsorted()` to binary search for the nearest expected index at lower latency. Details: - comment-disable the call to the naive `numba` scan impl. - add an optional `step: int` input (calculated if not provided). - add todos for caching binary search results in the gap detection cases. - drop returning the "absolute buffer indexing" slice since the caller can always just use the read-relative slice to acquire it. (branch: epoch_index_backup)
							parent
							
								
									94e0f48f39
								
							
						
					
					
						commit
						5c417fe815
					
				| 
						 | 
					@ -29,6 +29,7 @@ from numba import (
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# TODO: for ``numba`` typing..
 | 
					# TODO: for ``numba`` typing..
 | 
				
			||||||
# from ._source import numba_ohlc_dtype
 | 
					# from ._source import numba_ohlc_dtype
 | 
				
			||||||
 | 
					from ._sharedmem import ShmArray
 | 
				
			||||||
from ._m4 import ds_m4
 | 
					from ._m4 import ds_m4
 | 
				
			||||||
from .._profile import (
 | 
					from .._profile import (
 | 
				
			||||||
    Profiler,
 | 
					    Profiler,
 | 
				
			||||||
| 
						 | 
					@ -126,6 +127,7 @@ def path_arrays_from_ohlc(
 | 
				
			||||||
        high = q['high']
 | 
					        high = q['high']
 | 
				
			||||||
        low = q['low']
 | 
					        low = q['low']
 | 
				
			||||||
        close = q['close']
 | 
					        close = q['close']
 | 
				
			||||||
 | 
					        # index = float64(q['index'])
 | 
				
			||||||
        index = float64(q['time'])
 | 
					        index = float64(q['time'])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
 | 
					        # XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
 | 
				
			||||||
| 
						 | 
					@ -276,11 +278,7 @@ def _slice_from_time(
 | 
				
			||||||
    start_t: float,
 | 
					    start_t: float,
 | 
				
			||||||
    stop_t: float,
 | 
					    stop_t: float,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> tuple[
 | 
					) -> tuple[int, int]:
 | 
				
			||||||
    tuple[int, int],
 | 
					 | 
				
			||||||
    tuple[int, int],
 | 
					 | 
				
			||||||
    np.ndarray | None,
 | 
					 | 
				
			||||||
]:
 | 
					 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Slice an input struct array to a time range and return the absolute
 | 
					    Slice an input struct array to a time range and return the absolute
 | 
				
			||||||
    and "readable" slices for that array as well as the indexing mask
 | 
					    and "readable" slices for that array as well as the indexing mask
 | 
				
			||||||
| 
						 | 
					@ -305,14 +303,6 @@ def _slice_from_time(
 | 
				
			||||||
            ),
 | 
					            ),
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: if we can ensure each time field has a uniform
 | 
					 | 
				
			||||||
    # step we can instead do some arithmetic to determine
 | 
					 | 
				
			||||||
    # the equivalent index like we used to?
 | 
					 | 
				
			||||||
    # return array[
 | 
					 | 
				
			||||||
    #     lbar - ifirst:
 | 
					 | 
				
			||||||
    #     (rbar - ifirst) + 1
 | 
					 | 
				
			||||||
    # ]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    read_i_0: int = 0
 | 
					    read_i_0: int = 0
 | 
				
			||||||
    read_i_last: int = 0
 | 
					    read_i_last: int = 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -328,62 +318,159 @@ def _slice_from_time(
 | 
				
			||||||
            read_i_last = time
 | 
					            read_i_last = time
 | 
				
			||||||
            break
 | 
					            break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    abs_i_0 = int(index[0]) + read_i_0
 | 
					    return read_i_0, read_i_last
 | 
				
			||||||
    abs_i_last = int(index[0]) + read_i_last
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    if read_i_last == 0:
 | 
					 | 
				
			||||||
        read_i_last = times.shape[0]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    abs_slc = (
 | 
					 | 
				
			||||||
        int(abs_i_0),
 | 
					 | 
				
			||||||
        int(abs_i_last),
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    read_slc = (
 | 
					 | 
				
			||||||
        int(read_i_0),
 | 
					 | 
				
			||||||
        int(read_i_last),
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # also return the readable data from the timerange
 | 
					 | 
				
			||||||
    return (
 | 
					 | 
				
			||||||
        abs_slc,
 | 
					 | 
				
			||||||
        read_slc,
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def slice_from_time(
 | 
					def slice_from_time(
 | 
				
			||||||
    arr: np.ndarray,
 | 
					    arr: np.ndarray,
 | 
				
			||||||
    start_t: float,
 | 
					    start_t: float,
 | 
				
			||||||
    stop_t: float,
 | 
					    stop_t: float,
 | 
				
			||||||
 | 
					    step: int | None = None,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> tuple[
 | 
					) -> tuple[
 | 
				
			||||||
    slice,
 | 
					    slice,
 | 
				
			||||||
    slice,
 | 
					    slice,
 | 
				
			||||||
    np.ndarray | None,
 | 
					 | 
				
			||||||
]:
 | 
					]:
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    Calculate array indices mapped from a time range and return them in
 | 
				
			||||||
 | 
					    a slice.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Given an input array with an epoch `'time'` series entry, calculate
 | 
				
			||||||
 | 
					    the indices which span the time range and return in a slice. Presume
 | 
				
			||||||
 | 
					    each `'time'` step increment is uniform and when the time stamp
 | 
				
			||||||
 | 
					    series contains gaps (the uniform presumption is untrue) use
 | 
				
			||||||
 | 
					    ``np.searchsorted()`` binary search to look up the appropriate
 | 
				
			||||||
 | 
					    index.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
    profiler = Profiler(
 | 
					    profiler = Profiler(
 | 
				
			||||||
        msg='slice_from_time()',
 | 
					        msg='slice_from_time()',
 | 
				
			||||||
        disabled=not pg_profile_enabled(),
 | 
					        disabled=not pg_profile_enabled(),
 | 
				
			||||||
        ms_threshold=ms_slower_then,
 | 
					        ms_threshold=ms_slower_then,
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    (
 | 
					    times = arr['time']
 | 
				
			||||||
        abs_slc_tuple,
 | 
					    t_first = round(times[0])
 | 
				
			||||||
        read_slc_tuple,
 | 
					    t_last = round(times[-1])
 | 
				
			||||||
    ) = _slice_from_time(
 | 
					
 | 
				
			||||||
        arr,
 | 
					    index = arr['index']
 | 
				
			||||||
        start_t,
 | 
					    i_first = index[0]
 | 
				
			||||||
        stop_t,
 | 
					    read_i_max = arr.shape[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (
 | 
				
			||||||
 | 
					        start_t < t_first
 | 
				
			||||||
 | 
					        and stop_t > t_last
 | 
				
			||||||
 | 
					    ):
 | 
				
			||||||
 | 
					        read_i_start = 0
 | 
				
			||||||
 | 
					        read_i_stop = read_i_max
 | 
				
			||||||
 | 
					        read_slc = slice(
 | 
				
			||||||
 | 
					            0,
 | 
				
			||||||
 | 
					            read_i_max,
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        return read_slc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if step is None:
 | 
				
			||||||
 | 
					        step = round(times[-1] - times[-2])
 | 
				
			||||||
 | 
					        if step == 0:
 | 
				
			||||||
 | 
					            # XXX: HOW TF is this happening?
 | 
				
			||||||
 | 
					            step = 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # compute (presumed) uniform-time-step index offsets
 | 
				
			||||||
 | 
					    i_start_t = round(start_t)
 | 
				
			||||||
 | 
					    read_i_start = (i_start_t - t_first) // step
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    i_stop_t = round(stop_t)
 | 
				
			||||||
 | 
					    read_i_stop = (i_stop_t - t_first) // step
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # always clip outputs to array support
 | 
				
			||||||
 | 
					    # for read start:
 | 
				
			||||||
 | 
					    # - never allow a start < the 0 index
 | 
				
			||||||
 | 
					    # - never allow an end index > the read array len
 | 
				
			||||||
 | 
					    read_i_start = min(
 | 
				
			||||||
 | 
					        max(0, read_i_start),
 | 
				
			||||||
 | 
					        read_i_max,
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    read_i_stop = max(
 | 
				
			||||||
 | 
					        0,
 | 
				
			||||||
 | 
					        min(read_i_stop, read_i_max),
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # check for larger-then-latest calculated index for given start
 | 
				
			||||||
 | 
					    # time, in which case we do a binary search for the correct index.
 | 
				
			||||||
 | 
					    # NOTE: this is usually the result of a time series with time gaps
 | 
				
			||||||
 | 
					    # where it is expected that each index step maps to a uniform step
 | 
				
			||||||
 | 
					    # in the time stamp series.
 | 
				
			||||||
 | 
					    i_iv_start = index[read_i_start - 1]
 | 
				
			||||||
 | 
					    t_iv_start = times[read_i_start - 1]
 | 
				
			||||||
 | 
					    if (
 | 
				
			||||||
 | 
					        i_iv_start >= i_first
 | 
				
			||||||
 | 
					        and t_iv_start > i_start_t
 | 
				
			||||||
 | 
					    ):
 | 
				
			||||||
 | 
					        # do a binary search for the best index mapping to ``start_t``
 | 
				
			||||||
 | 
					        # given we measured an overshoot using the uniform-time-step
 | 
				
			||||||
 | 
					        # calculation from above.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # TODO: once we start caching these per source-array,
 | 
				
			||||||
 | 
					        # we can just overwrite ``read_i_start`` directly.
 | 
				
			||||||
 | 
					        new_read_i_start = np.searchsorted(
 | 
				
			||||||
 | 
					            times,
 | 
				
			||||||
 | 
					            i_start_t,
 | 
				
			||||||
 | 
					            side='left',
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # TODO: minimize binary search work as much as possible:
 | 
				
			||||||
 | 
					        # - cache these remap values which compensate for gaps in the
 | 
				
			||||||
 | 
					        #   uniform time step basis where we calc a later start
 | 
				
			||||||
 | 
					        #   index for the given input ``start_t``.
 | 
				
			||||||
 | 
					        # - can we shorten the input search sequence by heuristic?
 | 
				
			||||||
 | 
					        #   up_to_arith_start = index[:read_i_start]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (
 | 
				
			||||||
 | 
					            new_read_i_start < read_i_start
 | 
				
			||||||
 | 
					        ):
 | 
				
			||||||
 | 
					            # t_diff = t_iv_start - start_t
 | 
				
			||||||
 | 
					            # print(
 | 
				
			||||||
 | 
					            #     f"WE'RE CUTTING OUT TIME - STEP:{step}\n"
 | 
				
			||||||
 | 
					            #     f'start_t:{start_t} -> 0index start_t:{t_iv_start}\n'
 | 
				
			||||||
 | 
					            #     f'diff: {t_diff}\n'
 | 
				
			||||||
 | 
					            #     f'REMAPPED START i: {read_i_start} -> {new_read_i_start}\n'
 | 
				
			||||||
 | 
					            # )
 | 
				
			||||||
 | 
					            read_i_start = new_read_i_start
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # old much slower non-bin-search ``numba`` approach..
 | 
				
			||||||
 | 
					    # (
 | 
				
			||||||
 | 
					    #     read_i_start,
 | 
				
			||||||
 | 
					    #     read_i_stop,
 | 
				
			||||||
 | 
					    # ) = _slice_from_time(
 | 
				
			||||||
 | 
					    #     arr,
 | 
				
			||||||
 | 
					    #     start_t,
 | 
				
			||||||
 | 
					    #     stop_t,
 | 
				
			||||||
 | 
					    # )
 | 
				
			||||||
 | 
					    # abs_i_start = int(index[0]) + read_i_0
 | 
				
			||||||
 | 
					    # abs_i_stop = int(index[0]) + read_i_last
 | 
				
			||||||
 | 
					    # if read_i_stop == 0:
 | 
				
			||||||
 | 
					    #     read_i_stop = times.shape[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # read-relative indexes: gives a slice where `shm.array[read_slc]`
 | 
				
			||||||
 | 
					    # will be the data spanning the input time range `start_t` ->
 | 
				
			||||||
 | 
					    # `stop_t`
 | 
				
			||||||
 | 
					    read_slc = slice(
 | 
				
			||||||
 | 
					        int(read_i_start),
 | 
				
			||||||
 | 
					        int(read_i_stop),
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    abs_slc = slice(*abs_slc_tuple)
 | 
					 | 
				
			||||||
    read_slc = slice(*read_slc_tuple)
 | 
					 | 
				
			||||||
    profiler(
 | 
					    profiler(
 | 
				
			||||||
        'slicing complete'
 | 
					        'slicing complete'
 | 
				
			||||||
        # f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
 | 
					        # f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
 | 
				
			||||||
        # f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
 | 
					        # f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
    return (
 | 
					
 | 
				
			||||||
        abs_slc,
 | 
					    # NOTE: if caller needs absolute buffer indices they can
 | 
				
			||||||
        read_slc,
 | 
					    # slice the buffer abs index like so:
 | 
				
			||||||
    )
 | 
					    # abs_indx = index[read_slc]
 | 
				
			||||||
 | 
					    # abs_slc = slice(
 | 
				
			||||||
 | 
					    #     int(abs_indx[0]),
 | 
				
			||||||
 | 
					    #     int(abs_indx[-1]),
 | 
				
			||||||
 | 
					    # )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return read_slc
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue