Extend `Flume` methods

Add some (untested) data slicing util methods for mapping time ranges to
source data indices:
- `.get_index()` which maps a single input epoch time to an equiv array
  (int) index.
- add `slice_from_time()` which returns a view of the shm data from an
  input epoch range presuming the underlying struct array contains
  a `'time'` field with epoch stamps.
- `.view_data()` which slices out the "in view" data according to the
  current state of the passed in `pg.PlotItem`'s view box.
samplerd_service
Tyler Goodlet 2022-11-24 11:51:56 -05:00
parent 7da5c2b238
commit e5e70a6011
1 changed files with 108 additions and 0 deletions

View File

@ -87,6 +87,7 @@ from ..brokers._util import (
if TYPE_CHECKING:
from .marketstore import Storage
from pyqtgraph import PlotItem
log = get_logger(__name__)
@ -1037,6 +1038,113 @@ class Flume(Struct):
**msg,
)
def get_index(
self,
time_s: float,
) -> int:
'''
Return array shm-buffer index for for epoch time.
'''
array = self.rt_shm.array
times = array['time']
mask = (times >= time_s)
if any(mask):
return array['index'][mask][0]
# just the latest index
array['index'][-1]
def slice_from_time(
self,
array: np.ndarray,
start_t: float,
stop_t: float,
timeframe_s: int = 1,
return_data: bool = False,
) -> np.ndarray:
'''
Slice an input struct array providing only datums
"in view" of this chart.
'''
arr = {
1: self.rt_shm.array,
60: self.hist_shm.arry,
}[timeframe_s]
times = arr['time']
index = array['index']
# use advanced indexing to map the
# time range to the index range.
mask = (
(times >= start_t)
&
(times < stop_t)
)
# TODO: if we can ensure each time field has a uniform
# step we can instead do some arithmetic to determine
# the equivalent index like we used to?
# return array[
# lbar - ifirst:
# (rbar - ifirst) + 1
# ]
i_by_t = index[mask]
i_0 = i_by_t[0]
abs_slc = slice(
i_0,
i_by_t[-1],
)
# slice data by offset from the first index
# available in the passed datum set.
read_slc = slice(
0,
i_by_t[-1] - i_0,
)
if not return_data:
return (
abs_slc,
read_slc,
)
# also return the readable data from the timerange
return (
abs_slc,
read_slc,
arr[mask],
)
def view_data(
self,
plot: PlotItem,
timeframe_s: int = 1,
) -> np.ndarray:
# get far-side x-indices plot view
vr = plot.viewRect()
l = vr.left()
r = vr.right()
(
abs_slc,
buf_slc,
iv_arr,
) = self.slice_from_time(
start_t=l,
stop_t=r,
timeframe_s=timeframe_s,
return_data=True,
)
return iv_arr
async def allocate_persistent_feed(
bus: _FeedsBus,