diff --git a/piker/data/flows.py b/piker/data/flows.py index f665e5a8..7a07fe38 100644 --- a/piker/data/flows.py +++ b/piker/data/flows.py @@ -48,6 +48,10 @@ from ._sharedmem import ( from ._sampling import ( open_sample_stream, ) +from .._profile import ( + Profiler, + pg_profile_enabled, +) if TYPE_CHECKING: from pyqtgraph import PlotItem @@ -251,6 +255,13 @@ class Flume(Struct): for the caller to use to slice the input array if needed. ''' + profiler = Profiler( + msg='Flume.slice_from_time()', + disabled=not pg_profile_enabled(), + ms_threshold=4, + # ms_threshold=ms_slower_then, + ) + times = arr['time'] index = arr['index'] @@ -272,12 +283,12 @@ class Flume(Struct): # use advanced indexing to map the # time range to the index range. - mask: np.ndarray = ( + mask: np.ndarray = np.where( (times >= start_t) - | # fml, i guess it's not an `&` ?? + & (times < stop_t) ) - + profiler('advanced indexing slice') # TODO: if we can ensure each time field has a uniform # step we can instead do some arithmetic to determine # the equivalent index like we used to? @@ -289,6 +300,8 @@ class Flume(Struct): i_by_t = index[mask] try: i_0 = i_by_t[0] + i_last = i_by_t[-1] + i_first_read = index[0] except IndexError: if ( start_t < times[0] @@ -306,17 +319,20 @@ class Flume(Struct): None, ) - abs_slc = slice( - i_0, - i_by_t[-1], - ) + abs_slc = slice(i_0, i_last) + # slice data by offset from the first index # available in the passed datum set. read_slc = slice( - i_0 - index[0], - i_by_t[-1] - i_0, + i_0 - i_first_read, + i_last - i_first_read + 1, ) + profiler( + 'slicing complete' + f'{start_t} -> {abs_slc.start} | {read_slc.start}\n' + f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n' + ) # also return the readable data from the timerange return ( abs_slc,