diff --git a/piker/data/feed.py b/piker/data/feed.py index 2f5313e1..e51b5f6d 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -87,6 +87,7 @@ from ..brokers._util import ( if TYPE_CHECKING: from .marketstore import Storage + from pyqtgraph import PlotItem log = get_logger(__name__) @@ -1037,6 +1038,113 @@ class Flume(Struct): **msg, ) + def get_index( + self, + time_s: float, + + ) -> int: + ''' + Return array shm-buffer index for for epoch time. + + ''' + array = self.rt_shm.array + times = array['time'] + mask = (times >= time_s) + + if any(mask): + return array['index'][mask][0] + + # just the latest index + array['index'][-1] + + def slice_from_time( + self, + array: np.ndarray, + start_t: float, + stop_t: float, + timeframe_s: int = 1, + return_data: bool = False, + + ) -> np.ndarray: + ''' + Slice an input struct array providing only datums + "in view" of this chart. + + ''' + arr = { + 1: self.rt_shm.array, + 60: self.hist_shm.arry, + }[timeframe_s] + + times = arr['time'] + index = array['index'] + + # use advanced indexing to map the + # time range to the index range. + mask = ( + (times >= start_t) + & + (times < stop_t) + ) + + # TODO: if we can ensure each time field has a uniform + # step we can instead do some arithmetic to determine + # the equivalent index like we used to? + # return array[ + # lbar - ifirst: + # (rbar - ifirst) + 1 + # ] + + i_by_t = index[mask] + i_0 = i_by_t[0] + + abs_slc = slice( + i_0, + i_by_t[-1], + ) + # slice data by offset from the first index + # available in the passed datum set. + read_slc = slice( + 0, + i_by_t[-1] - i_0, + ) + if not return_data: + return ( + abs_slc, + read_slc, + ) + + # also return the readable data from the timerange + return ( + abs_slc, + read_slc, + arr[mask], + ) + + def view_data( + self, + plot: PlotItem, + timeframe_s: int = 1, + + ) -> np.ndarray: + + # get far-side x-indices plot view + vr = plot.viewRect() + l = vr.left() + r = vr.right() + + ( + abs_slc, + buf_slc, + iv_arr, + ) = self.slice_from_time( + start_t=l, + stop_t=r, + timeframe_s=timeframe_s, + return_data=True, + ) + return iv_arr + async def allocate_persistent_feed( bus: _FeedsBus,