From a33f58a61aac5e49748af21b9f8949430adeef9c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 2 Dec 2022 20:13:17 -0500 Subject: [PATCH] Move `Flume.slice_from_time()` to `.data._pathops` mod func --- piker/data/_pathops.py | 108 ++++++++++++++++++++++++++++++++++++++++- piker/data/flows.py | 105 +-------------------------------------- piker/ui/_chart.py | 3 +- piker/ui/_render.py | 13 ++--- 4 files changed, 118 insertions(+), 111 deletions(-) diff --git a/piker/data/_pathops.py b/piker/data/_pathops.py index 23df989d..91169e73 100644 --- a/piker/data/_pathops.py +++ b/piker/data/_pathops.py @@ -30,6 +30,10 @@ from numba import ( # TODO: for ``numba`` typing.. # from ._source import numba_ohlc_dtype from ._m4 import ds_m4 +from .._profile import ( + Profiler, + pg_profile_enabled, +) def xy_downsample( @@ -121,7 +125,7 @@ def path_arrays_from_ohlc( high = q['high'] low = q['low'] close = q['close'] - index = float64(q['index']) + index = float64(q['time']) # XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622 # index = float64(q[index_field]) @@ -263,3 +267,105 @@ def ohlc_flatten( num=len(flat), ) return x, flat + + +def slice_from_time( + arr: np.ndarray, + start_t: float, + stop_t: float, + +) -> tuple[ + slice, + slice, + np.ndarray | None, +]: + ''' + Slice an input struct array to a time range and return the absolute + and "readable" slices for that array as well as the indexing mask + for the caller to use to slice the input array if needed. + + ''' + profiler = Profiler( + msg='slice_from_time()', + disabled=not pg_profile_enabled(), + ms_threshold=4, + # ms_threshold=ms_slower_then, + ) + + times = arr['time'] + index = arr['index'] + + if ( + start_t < 0 + or start_t >= stop_t + ): + return ( + slice( + index[0], + index[-1], + ), + slice( + 0, + len(arr), + ), + None, + ) + + # use advanced indexing to map the + # time range to the index range. + mask: np.ndarray = np.where( + (times >= start_t) + & + (times < stop_t) + ) + profiler('advanced indexing slice') + # TODO: if we can ensure each time field has a uniform + # step we can instead do some arithmetic to determine + # the equivalent index like we used to? + # return array[ + # lbar - ifirst: + # (rbar - ifirst) + 1 + # ] + + i_by_t = index[mask] + try: + i_0 = i_by_t[0] + i_last = i_by_t[-1] + i_first_read = index[0] + except IndexError: + if ( + start_t < times[0] + or stop_t >= times[-1] + ): + return ( + slice( + index[0], + index[-1], + ), + slice( + 0, + len(arr), + ), + None, + ) + + abs_slc = slice(i_0, i_last) + + # slice data by offset from the first index + # available in the passed datum set. + read_slc = slice( + i_0 - i_first_read, + i_last - i_first_read + 1, + ) + + profiler( + 'slicing complete' + f'{start_t} -> {abs_slc.start} | {read_slc.start}\n' + f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n' + ) + # also return the readable data from the timerange + return ( + abs_slc, + read_slc, + mask, + ) diff --git a/piker/data/flows.py b/piker/data/flows.py index 7a07fe38..cf3a028a 100644 --- a/piker/data/flows.py +++ b/piker/data/flows.py @@ -48,6 +48,7 @@ from ._sharedmem import ( from ._sampling import ( open_sample_stream, ) +from ._pathops import slice_from_time from .._profile import ( Profiler, pg_profile_enabled, @@ -238,108 +239,6 @@ class Flume(Struct): # just the latest index return array['index'][-1] - def slice_from_time( - self, - arr: np.ndarray, - start_t: float, - stop_t: float, - - ) -> tuple[ - slice, - slice, - np.ndarray | None, - ]: - ''' - Slice an input struct array to a time range and return the absolute - and "readable" slices for that array as well as the indexing mask - for the caller to use to slice the input array if needed. - - ''' - profiler = Profiler( - msg='Flume.slice_from_time()', - disabled=not pg_profile_enabled(), - ms_threshold=4, - # ms_threshold=ms_slower_then, - ) - - times = arr['time'] - index = arr['index'] - - if ( - start_t < 0 - or start_t >= stop_t - ): - return ( - slice( - index[0], - index[-1], - ), - slice( - 0, - len(arr), - ), - None, - ) - - # use advanced indexing to map the - # time range to the index range. - mask: np.ndarray = np.where( - (times >= start_t) - & - (times < stop_t) - ) - profiler('advanced indexing slice') - # TODO: if we can ensure each time field has a uniform - # step we can instead do some arithmetic to determine - # the equivalent index like we used to? - # return array[ - # lbar - ifirst: - # (rbar - ifirst) + 1 - # ] - - i_by_t = index[mask] - try: - i_0 = i_by_t[0] - i_last = i_by_t[-1] - i_first_read = index[0] - except IndexError: - if ( - start_t < times[0] - or stop_t >= times[-1] - ): - return ( - slice( - index[0], - index[-1], - ), - slice( - 0, - len(arr), - ), - None, - ) - - abs_slc = slice(i_0, i_last) - - # slice data by offset from the first index - # available in the passed datum set. - read_slc = slice( - i_0 - i_first_read, - i_last - i_first_read + 1, - ) - - profiler( - 'slicing complete' - f'{start_t} -> {abs_slc.start} | {read_slc.start}\n' - f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n' - ) - # also return the readable data from the timerange - return ( - abs_slc, - read_slc, - mask, - ) - # TODO: maybe move this our ``Viz`` type to avoid # the shm lookup discrepancy? def view_data( @@ -366,7 +265,7 @@ class Flume(Struct): abs_slc, read_slc, mask, - ) = self.slice_from_time( + ) = slice_from_time( arr, start_t=vr.left(), stop_t=vr.right(), diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 4dfcdc80..3e661e51 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -66,6 +66,7 @@ from ..data.feed import ( Feed, Flume, ) +from ..data._pathops import slice_from_time from ..data._source import Symbol from ..log import get_logger from ._interaction import ChartView @@ -1037,7 +1038,7 @@ class ChartPlotWidget(pg.PlotWidget): read_slc, mask, - ) = viz.flume.slice_from_time( + ) = slice_from_time( array, start_t=vtl, stop_t=vtr, diff --git a/piker/ui/_render.py b/piker/ui/_render.py index 5a45cfbe..ab07237e 100644 --- a/piker/ui/_render.py +++ b/piker/ui/_render.py @@ -43,7 +43,10 @@ from ..data._formatters import ( OHLCBarsAsCurveFmtr, # OHLC converted to line StepCurveFmtr, # "step" curve (like for vlm) ) -from ..data._pathops import xy_downsample +from ..data._pathops import ( + xy_downsample, + slice_from_time, +) from ._ohlc import ( BarItems, # bar_from_ohlc_row, @@ -101,7 +104,6 @@ def render_baritems( fmtr=OHLCBarsFmtr( shm=viz.shm, viz=viz, - # index_field=viz.index_field, ), ) @@ -110,7 +112,6 @@ def render_baritems( fmtr=OHLCBarsAsCurveFmtr( shm=viz.shm, viz=viz, - # index_field=viz.index_field, ), ) @@ -291,7 +292,7 @@ class Viz(msgspec.Struct): # , frozen=True): abs_slc, read_slc, mask, - ) = self.flume.slice_from_time( + ) = slice_from_time( arr, start_t=lbar, stop_t=rbar, @@ -444,7 +445,7 @@ class Viz(msgspec.Struct): # , frozen=True): abs_slc, read_slc, mask, - ) = self.flume.slice_from_time( + ) = slice_from_time( array, start_t=lbar, stop_t=rbar, @@ -463,7 +464,7 @@ class Viz(msgspec.Struct): # , frozen=True): # ) if profiler: profiler( - '`Flume.slice_from_time(' + '`slice_from_time(' f'start_t={lbar}' f'stop_t={rbar})' )