Use uniform step arithmetic in `slice_from_time()`
If we presume that time indexing uses a uniform step, we can calculate
the exact index (using `//`) for the input time, presuming the data set
has zero gaps. This gives a massive speedup over `numpy` fancy indexing
and (naive) `numba` iteration. Further, in the case where time gaps are
detected, we can use `numpy.searchsorted()` to binary search for the
nearest expected index at lower latency.

Deatz:
- comment-disable the call to the naive `numba` scan impl.
- add an optional `step: int` input (calced if not provided).
- add todos for caching binary search results in the gap detection
  cases.
- drop returning the "absolute buffer indexing" slice since the caller
  can always just use the read-relative slice to acquire it.
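As a rough sketch of the idea (standalone and simplified, not the code
from the diff below; the `index_for_time()` helper and its argument
names are illustrative only), the uniform-step guess plus the gap
fallback looks something like:

```python
import numpy as np


def index_for_time(
    times: np.ndarray,  # (mostly) uniformly stepped epoch stamps
    t: float,
    step: float | None = None,

) -> int:
    '''
    Map an epoch time to its read-relative array index presuming
    a uniform time step, falling back to binary search on gaps.

    '''
    if step is None:
        # infer the (presumed) uniform step from the last two stamps
        step = round(times[-1] - times[-2]) or 1

    # O(1) arithmetic guess presuming zero gaps in the series
    i = int((round(t) - round(times[0])) // step)

    # clip to the array's index support
    i = max(0, min(i, times.shape[0] - 1))

    # gap detection: if the stamp at the guessed index overshoots the
    # requested time, the uniform-step presumption broke down somewhere
    # earlier, so do an O(log n) binary search for the nearest index.
    if times[i] > t:
        i = int(np.searchsorted(times, t, side='left'))

    return i
```

A read slice for a time range then just composes two such lookups,
e.g. `slice(index_for_time(times, start_t), index_for_time(times, stop_t))`.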
branch epoch_indexing_and_dataviz_layer
parent bb84715bf0
commit f2c0987a04
@@ -29,6 +29,7 @@ from numba import (

# TODO: for ``numba`` typing..
# from ._source import numba_ohlc_dtype
from ._sharedmem import ShmArray
from ._m4 import ds_m4
from .._profile import (
    Profiler,
@@ -126,6 +127,7 @@ def path_arrays_from_ohlc(
        high = q['high']
        low = q['low']
        close = q['close']
        # index = float64(q['index'])
        index = float64(q['time'])

        # XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
@@ -276,11 +278,7 @@ def _slice_from_time(
    start_t: float,
    stop_t: float,

) -> tuple[
    tuple[int, int],
    tuple[int, int],
    np.ndarray | None,
]:
) -> tuple[int, int]:
    '''
    Slice an input struct array to a time range and return the absolute
    and "readable" slices for that array as well as the indexing mask
@@ -305,14 +303,6 @@ def _slice_from_time(
            ),
        )

    # TODO: if we can ensure each time field has a uniform
    # step we can instead do some arithmetic to determine
    # the equivalent index like we used to?
    # return array[
    #     lbar - ifirst:
    #     (rbar - ifirst) + 1
    # ]

    read_i_0: int = 0
    read_i_last: int = 0

@@ -328,62 +318,159 @@
            read_i_last = time
            break

    abs_i_0 = int(index[0]) + read_i_0
    abs_i_last = int(index[0]) + read_i_last

    if read_i_last == 0:
        read_i_last = times.shape[0]

    abs_slc = (
        int(abs_i_0),
        int(abs_i_last),
    )

    read_slc = (
        int(read_i_0),
        int(read_i_last),
    )

    # also return the readable data from the timerange
    return (
        abs_slc,
        read_slc,
    )
    return read_i_0, read_i_last


def slice_from_time(
    arr: np.ndarray,
    start_t: float,
    stop_t: float,
    step: int | None = None,

) -> tuple[
    slice,
    slice,
    np.ndarray | None,
]:
    '''
    Calculate array indices mapped from a time range and return them in
    a slice.

    Given an input array with an epoch `'time'` series entry, calculate
    the indices which span the time range and return in a slice. Presume
    each `'time'` step increment is uniform and when the time stamp
    series contains gaps (the uniform presumption is untrue) use
    ``np.searchsorted()`` binary search to look up the appropriate
    index.

    '''
    profiler = Profiler(
        msg='slice_from_time()',
        disabled=not pg_profile_enabled(),
        ms_threshold=ms_slower_then,
    )

    (
        abs_slc_tuple,
        read_slc_tuple,
    ) = _slice_from_time(
        arr,
        start_t,
        stop_t,
    times = arr['time']
    t_first = round(times[0])
    t_last = round(times[-1])

    index = arr['index']
    i_first = index[0]
    read_i_max = arr.shape[0]

    if (
        start_t < t_first
        and stop_t > t_last
    ):
        read_i_start = 0
        read_i_stop = read_i_max
        read_slc = slice(
            0,
            read_i_max,
        )
        return read_slc

    if step is None:
        step = round(times[-1] - times[-2])
        if step == 0:
            # XXX: HOW TF is this happening?
            step = 1

    # compute (presumed) uniform-time-step index offsets
    i_start_t = round(start_t)
    read_i_start = (i_start_t - t_first) // step

    i_stop_t = round(stop_t)
    read_i_stop = (i_stop_t - t_first) // step

    # always clip outputs to array support
    # for read start:
    # - never allow a start < the 0 index
    # - never allow an end index > the read array len
    read_i_start = min(
        max(0, read_i_start),
        read_i_max,
    )
    read_i_stop = max(
        0,
        min(read_i_stop, read_i_max),
    )

    # check for larger-then-latest calculated index for given start
    # time, in which case we do a binary search for the correct index.
    # NOTE: this is usually the result of a time series with time gaps
    # where it is expected that each index step maps to a uniform step
    # in the time stamp series.
    i_iv_start = index[read_i_start - 1]
    t_iv_start = times[read_i_start - 1]
    if (
        i_iv_start >= i_first
        and t_iv_start > i_start_t
    ):
        # do a binary search for the best index mapping to ``start_t``
        # given we measured an overshoot using the uniform-time-step
        # calculation from above.

        # TODO: once we start caching these per source-array,
        # we can just overwrite ``read_i_start`` directly.
        new_read_i_start = np.searchsorted(
            times,
            i_start_t,
            side='left',
        )

        # TODO: minimize binary search work as much as possible:
        # - cache these remap values which compensate for gaps in the
        #   uniform time step basis where we calc a later start
        #   index for the given input ``start_t``.
        # - can we shorten the input search sequence by heuristic?
        #   up_to_arith_start = index[:read_i_start]

        if (
            new_read_i_start < read_i_start
        ):
            # t_diff = t_iv_start - start_t
            # print(
            #     f"WE'RE CUTTING OUT TIME - STEP:{step}\n"
            #     f'start_t:{start_t} -> 0index start_t:{t_iv_start}\n'
            #     f'diff: {t_diff}\n'
            #     f'REMAPPED START i: {read_i_start} -> {new_read_i_start}\n'
            # )
            read_i_start = new_read_i_start

    # old much slower non-bin-search ``numba`` approach..
    # (
    #     read_i_start,
    #     read_i_stop,
    # ) = _slice_from_time(
    #     arr,
    #     start_t,
    #     stop_t,
    # )
    # abs_i_start = int(index[0]) + read_i_0
    # abs_i_stop = int(index[0]) + read_i_last
    # if read_i_stop == 0:
    #     read_i_stop = times.shape[0]

    # read-relative indexes: gives a slice where `shm.array[read_slc]`
    # will be the data spanning the input time range `start_t` ->
    # `stop_t`
    read_slc = slice(
        int(read_i_start),
        int(read_i_stop),
    )

    abs_slc = slice(*abs_slc_tuple)
    read_slc = slice(*read_slc_tuple)
    profiler(
        'slicing complete'
        # f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
        # f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
    )
    return (
        abs_slc,
        read_slc,
    )

    # NOTE: if caller needs absolute buffer indices they can
    # slice the buffer abs index like so:
    # abs_indx = index[read_slc]
    # abs_slc = slice(
    #     int(abs_indx[0]),
    #     int(abs_indx[-1]),
    # )

    return read_slc
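For reference, a caller-side sketch of recovering the absolute buffer
slice from the returned read-relative slice, per the NOTE above (the
`shm` handle and the epoch bound names here are illustrative, not from
the diff):

```python
# assuming `shm` is a `ShmArray`-like handle whose `.array` is the
# struct array passed to `slice_from_time()`; `view_left_epoch` and
# `view_right_epoch` are placeholder time-range bounds.
read_slc = slice_from_time(
    shm.array,
    start_t=view_left_epoch,
    stop_t=view_right_epoch,
)
read_arr = shm.array[read_slc]  # data spanning the input time range

# derive the absolute buffer index slice only if/when it's needed
abs_indx = shm.array['index'][read_slc]
abs_slc = slice(
    int(abs_indx[0]),
    int(abs_indx[-1]),
)
```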