Move `Flume.slice_from_time()` to `.data._pathops` mod func

epoch_indexing_and_dataviz_layer
Tyler Goodlet 2022-12-02 20:13:17 -05:00
parent a4392696a1
commit a33f58a61a
4 changed files with 118 additions and 111 deletions

View File

@ -30,6 +30,10 @@ from numba import (
# TODO: for ``numba`` typing.. # TODO: for ``numba`` typing..
# from ._source import numba_ohlc_dtype # from ._source import numba_ohlc_dtype
from ._m4 import ds_m4 from ._m4 import ds_m4
from .._profile import (
Profiler,
pg_profile_enabled,
)
def xy_downsample( def xy_downsample(
@ -121,7 +125,7 @@ def path_arrays_from_ohlc(
high = q['high'] high = q['high']
low = q['low'] low = q['low']
close = q['close'] close = q['close']
index = float64(q['index']) index = float64(q['time'])
# XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622 # XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
# index = float64(q[index_field]) # index = float64(q[index_field])
@ -263,3 +267,105 @@ def ohlc_flatten(
num=len(flat), num=len(flat),
) )
return x, flat return x, flat
def slice_from_time(
    arr: np.ndarray,
    start_t: float,
    stop_t: float,

) -> tuple[
    slice,
    slice,
    np.ndarray | None,
]:
    '''
    Slice an input struct array to a time range and return the absolute
    and "readable" slices for that array as well as the indexing mask
    for the caller to use to slice the input array if needed.

    ``arr`` must expose ``'time'`` and ``'index'`` fields. Rows are
    selected where ``start_t <= time < stop_t``.

    Returns a 3-tuple of:

    - the "absolute" slice built from the selected rows' first and
      last ``'index'`` field values,
    - the "read" slice: the same bounds offset by ``arr['index'][0]``
      with the stop made inclusive of the last selected row,
    - the mask: the value returned by single-argument ``np.where()``
      (a tuple of index arrays), or ``None`` when the fallback is used.

    Fallback: when the requested range is invalid (``start_t < 0`` or
    ``start_t >= stop_t``) or selects no rows at all, slices spanning
    the whole input are returned together with a ``None`` mask.

    An empty ``arr`` raises ``IndexError`` since the fallback slices
    are built from the first and last ``'index'`` entries.

    '''
    profiler = Profiler(
        msg='slice_from_time()',
        disabled=not pg_profile_enabled(),
        ms_threshold=4,
    )

    times = arr['time']
    index = arr['index']

    def full_range() -> tuple[slice, slice, None]:
        # shared fallback: cover the entire input and signal with
        # a ``None`` mask that no time-based selection was applied.
        return (
            slice(
                index[0],
                index[-1],
            ),
            slice(
                0,
                len(arr),
            ),
            None,
        )

    if (
        start_t < 0
        or start_t >= stop_t
    ):
        return full_range()

    # use advanced indexing to map the
    # time range to the index range.
    # NOTE: single-argument ``np.where()`` returns a tuple of index
    # arrays (one per dimension), not a bare array.
    mask = np.where(
        (times >= start_t)
        &
        (times < stop_t)
    )
    profiler('advanced indexing slice')

    # TODO: if we can ensure each time field has a uniform
    # step we can instead do some arithmetic to determine
    # the equivalent index like we used to?

    i_by_t = index[mask]
    if len(i_by_t) == 0:
        # No sample falls inside the requested window. Previously
        # only windows reaching past either end of the data were
        # handled here; a window inside the data's time bounds that
        # matched nothing (eg. a gap between samples) fell through
        # and crashed on unbound locals. Treat every empty selection
        # the same way.
        return full_range()

    i_0 = i_by_t[0]
    i_last = i_by_t[-1]
    i_first_read = index[0]

    abs_slc = slice(i_0, i_last)

    # slice data by offset from the first index
    # available in the passed datum set.
    read_slc = slice(
        i_0 - i_first_read,
        i_last - i_first_read + 1,
    )

    profiler(
        'slicing complete'
        f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
        f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
    )

    # also return the readable data from the timerange
    return (
        abs_slc,
        read_slc,
        mask,
    )

View File

@ -48,6 +48,7 @@ from ._sharedmem import (
from ._sampling import ( from ._sampling import (
open_sample_stream, open_sample_stream,
) )
from ._pathops import slice_from_time
from .._profile import ( from .._profile import (
Profiler, Profiler,
pg_profile_enabled, pg_profile_enabled,
@ -238,108 +239,6 @@ class Flume(Struct):
# just the latest index # just the latest index
return array['index'][-1] return array['index'][-1]
def slice_from_time(
    self,
    arr: np.ndarray,
    start_t: float,
    stop_t: float,

) -> tuple[
    slice,
    slice,
    np.ndarray | None,
]:
    '''
    Slice an input struct array to a time range and return the absolute
    and "readable" slices for that array as well as the indexing mask
    for the caller to use to slice the input array if needed.

    ``arr`` must expose ``'time'`` and ``'index'`` fields. Rows are
    selected where ``start_t <= time < stop_t``. No instance state is
    read.

    Returns a 3-tuple of:

    - the "absolute" slice built from the selected rows' first and
      last ``'index'`` field values,
    - the "read" slice: the same bounds offset by ``arr['index'][0]``
      with the stop made inclusive of the last selected row,
    - the mask: the value returned by single-argument ``np.where()``
      (a tuple of index arrays), or ``None`` when the fallback is used.

    Fallback: when the requested range is invalid (``start_t < 0`` or
    ``start_t >= stop_t``) or selects no rows at all, slices spanning
    the whole input are returned together with a ``None`` mask.

    An empty ``arr`` raises ``IndexError`` since the fallback slices
    are built from the first and last ``'index'`` entries.

    '''
    profiler = Profiler(
        msg='Flume.slice_from_time()',
        disabled=not pg_profile_enabled(),
        ms_threshold=4,
    )

    times = arr['time']
    index = arr['index']

    def full_range() -> tuple[slice, slice, None]:
        # shared fallback: cover the entire input and signal with
        # a ``None`` mask that no time-based selection was applied.
        return (
            slice(
                index[0],
                index[-1],
            ),
            slice(
                0,
                len(arr),
            ),
            None,
        )

    if (
        start_t < 0
        or start_t >= stop_t
    ):
        return full_range()

    # use advanced indexing to map the
    # time range to the index range.
    # NOTE: single-argument ``np.where()`` returns a tuple of index
    # arrays (one per dimension), not a bare array.
    mask = np.where(
        (times >= start_t)
        &
        (times < stop_t)
    )
    profiler('advanced indexing slice')

    # TODO: if we can ensure each time field has a uniform
    # step we can instead do some arithmetic to determine
    # the equivalent index like we used to?

    i_by_t = index[mask]
    if len(i_by_t) == 0:
        # No sample falls inside the requested window. Previously
        # only windows reaching past either end of the data were
        # handled here; a window inside the data's time bounds that
        # matched nothing (eg. a gap between samples) fell through
        # and crashed on unbound locals. Treat every empty selection
        # the same way.
        return full_range()

    i_0 = i_by_t[0]
    i_last = i_by_t[-1]
    i_first_read = index[0]

    abs_slc = slice(i_0, i_last)

    # slice data by offset from the first index
    # available in the passed datum set.
    read_slc = slice(
        i_0 - i_first_read,
        i_last - i_first_read + 1,
    )

    profiler(
        'slicing complete'
        f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
        f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
    )

    # also return the readable data from the timerange
    return (
        abs_slc,
        read_slc,
        mask,
    )
# TODO: maybe move this to our ``Viz`` type to avoid # TODO: maybe move this to our ``Viz`` type to avoid
# the shm lookup discrepancy? # the shm lookup discrepancy?
def view_data( def view_data(
@ -366,7 +265,7 @@ class Flume(Struct):
abs_slc, abs_slc,
read_slc, read_slc,
mask, mask,
) = self.slice_from_time( ) = slice_from_time(
arr, arr,
start_t=vr.left(), start_t=vr.left(),
stop_t=vr.right(), stop_t=vr.right(),

View File

@ -66,6 +66,7 @@ from ..data.feed import (
Feed, Feed,
Flume, Flume,
) )
from ..data._pathops import slice_from_time
from ..data._source import Symbol from ..data._source import Symbol
from ..log import get_logger from ..log import get_logger
from ._interaction import ChartView from ._interaction import ChartView
@ -1037,7 +1038,7 @@ class ChartPlotWidget(pg.PlotWidget):
read_slc, read_slc,
mask, mask,
) = viz.flume.slice_from_time( ) = slice_from_time(
array, array,
start_t=vtl, start_t=vtl,
stop_t=vtr, stop_t=vtr,

View File

@ -43,7 +43,10 @@ from ..data._formatters import (
OHLCBarsAsCurveFmtr, # OHLC converted to line OHLCBarsAsCurveFmtr, # OHLC converted to line
StepCurveFmtr, # "step" curve (like for vlm) StepCurveFmtr, # "step" curve (like for vlm)
) )
from ..data._pathops import xy_downsample from ..data._pathops import (
xy_downsample,
slice_from_time,
)
from ._ohlc import ( from ._ohlc import (
BarItems, BarItems,
# bar_from_ohlc_row, # bar_from_ohlc_row,
@ -101,7 +104,6 @@ def render_baritems(
fmtr=OHLCBarsFmtr( fmtr=OHLCBarsFmtr(
shm=viz.shm, shm=viz.shm,
viz=viz, viz=viz,
# index_field=viz.index_field,
), ),
) )
@ -110,7 +112,6 @@ def render_baritems(
fmtr=OHLCBarsAsCurveFmtr( fmtr=OHLCBarsAsCurveFmtr(
shm=viz.shm, shm=viz.shm,
viz=viz, viz=viz,
# index_field=viz.index_field,
), ),
) )
@ -291,7 +292,7 @@ class Viz(msgspec.Struct): # , frozen=True):
abs_slc, abs_slc,
read_slc, read_slc,
mask, mask,
) = self.flume.slice_from_time( ) = slice_from_time(
arr, arr,
start_t=lbar, start_t=lbar,
stop_t=rbar, stop_t=rbar,
@ -444,7 +445,7 @@ class Viz(msgspec.Struct): # , frozen=True):
abs_slc, abs_slc,
read_slc, read_slc,
mask, mask,
) = self.flume.slice_from_time( ) = slice_from_time(
array, array,
start_t=lbar, start_t=lbar,
stop_t=rbar, stop_t=rbar,
@ -463,7 +464,7 @@ class Viz(msgspec.Struct): # , frozen=True):
# ) # )
if profiler: if profiler:
profiler( profiler(
'`Flume.slice_from_time(' '`slice_from_time('
f'start_t={lbar}' f'start_t={lbar}'
f'stop_t={rbar})' f'stop_t={rbar})'
) )