Move `Flume.slice_from_time()` to `.data._pathops` mod func
parent
152c9e2c98
commit
a87f062a26
|
@ -30,6 +30,10 @@ from numba import (
|
||||||
# TODO: for ``numba`` typing..
|
# TODO: for ``numba`` typing..
|
||||||
# from ._source import numba_ohlc_dtype
|
# from ._source import numba_ohlc_dtype
|
||||||
from ._m4 import ds_m4
|
from ._m4 import ds_m4
|
||||||
|
from .._profile import (
|
||||||
|
Profiler,
|
||||||
|
pg_profile_enabled,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def xy_downsample(
|
def xy_downsample(
|
||||||
|
@ -121,7 +125,7 @@ def path_arrays_from_ohlc(
|
||||||
high = q['high']
|
high = q['high']
|
||||||
low = q['low']
|
low = q['low']
|
||||||
close = q['close']
|
close = q['close']
|
||||||
index = float64(q['index'])
|
index = float64(q['time'])
|
||||||
|
|
||||||
# XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
|
# XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
|
||||||
# index = float64(q[index_field])
|
# index = float64(q[index_field])
|
||||||
|
@ -263,3 +267,105 @@ def ohlc_flatten(
|
||||||
num=len(flat),
|
num=len(flat),
|
||||||
)
|
)
|
||||||
return x, flat
|
return x, flat
|
||||||
|
|
||||||
|
|
||||||
|
def slice_from_time(
|
||||||
|
arr: np.ndarray,
|
||||||
|
start_t: float,
|
||||||
|
stop_t: float,
|
||||||
|
|
||||||
|
) -> tuple[
|
||||||
|
slice,
|
||||||
|
slice,
|
||||||
|
np.ndarray | None,
|
||||||
|
]:
|
||||||
|
'''
|
||||||
|
Slice an input struct array to a time range and return the absolute
|
||||||
|
and "readable" slices for that array as well as the indexing mask
|
||||||
|
for the caller to use to slice the input array if needed.
|
||||||
|
|
||||||
|
'''
|
||||||
|
profiler = Profiler(
|
||||||
|
msg='slice_from_time()',
|
||||||
|
disabled=not pg_profile_enabled(),
|
||||||
|
ms_threshold=4,
|
||||||
|
# ms_threshold=ms_slower_then,
|
||||||
|
)
|
||||||
|
|
||||||
|
times = arr['time']
|
||||||
|
index = arr['index']
|
||||||
|
|
||||||
|
if (
|
||||||
|
start_t < 0
|
||||||
|
or start_t >= stop_t
|
||||||
|
):
|
||||||
|
return (
|
||||||
|
slice(
|
||||||
|
index[0],
|
||||||
|
index[-1],
|
||||||
|
),
|
||||||
|
slice(
|
||||||
|
0,
|
||||||
|
len(arr),
|
||||||
|
),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
|
||||||
|
# use advanced indexing to map the
|
||||||
|
# time range to the index range.
|
||||||
|
mask: np.ndarray = np.where(
|
||||||
|
(times >= start_t)
|
||||||
|
&
|
||||||
|
(times < stop_t)
|
||||||
|
)
|
||||||
|
profiler('advanced indexing slice')
|
||||||
|
# TODO: if we can ensure each time field has a uniform
|
||||||
|
# step we can instead do some arithmetic to determine
|
||||||
|
# the equivalent index like we used to?
|
||||||
|
# return array[
|
||||||
|
# lbar - ifirst:
|
||||||
|
# (rbar - ifirst) + 1
|
||||||
|
# ]
|
||||||
|
|
||||||
|
i_by_t = index[mask]
|
||||||
|
try:
|
||||||
|
i_0 = i_by_t[0]
|
||||||
|
i_last = i_by_t[-1]
|
||||||
|
i_first_read = index[0]
|
||||||
|
except IndexError:
|
||||||
|
if (
|
||||||
|
start_t < times[0]
|
||||||
|
or stop_t >= times[-1]
|
||||||
|
):
|
||||||
|
return (
|
||||||
|
slice(
|
||||||
|
index[0],
|
||||||
|
index[-1],
|
||||||
|
),
|
||||||
|
slice(
|
||||||
|
0,
|
||||||
|
len(arr),
|
||||||
|
),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
|
||||||
|
abs_slc = slice(i_0, i_last)
|
||||||
|
|
||||||
|
# slice data by offset from the first index
|
||||||
|
# available in the passed datum set.
|
||||||
|
read_slc = slice(
|
||||||
|
i_0 - i_first_read,
|
||||||
|
i_last - i_first_read + 1,
|
||||||
|
)
|
||||||
|
|
||||||
|
profiler(
|
||||||
|
'slicing complete'
|
||||||
|
f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
|
||||||
|
f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
|
||||||
|
)
|
||||||
|
# also return the readable data from the timerange
|
||||||
|
return (
|
||||||
|
abs_slc,
|
||||||
|
read_slc,
|
||||||
|
mask,
|
||||||
|
)
|
||||||
|
|
|
@ -48,6 +48,7 @@ from ._sharedmem import (
|
||||||
from ._sampling import (
|
from ._sampling import (
|
||||||
open_sample_stream,
|
open_sample_stream,
|
||||||
)
|
)
|
||||||
|
from ._pathops import slice_from_time
|
||||||
from .._profile import (
|
from .._profile import (
|
||||||
Profiler,
|
Profiler,
|
||||||
pg_profile_enabled,
|
pg_profile_enabled,
|
||||||
|
@ -238,108 +239,6 @@ class Flume(Struct):
|
||||||
# just the latest index
|
# just the latest index
|
||||||
return array['index'][-1]
|
return array['index'][-1]
|
||||||
|
|
||||||
def slice_from_time(
|
|
||||||
self,
|
|
||||||
arr: np.ndarray,
|
|
||||||
start_t: float,
|
|
||||||
stop_t: float,
|
|
||||||
|
|
||||||
) -> tuple[
|
|
||||||
slice,
|
|
||||||
slice,
|
|
||||||
np.ndarray | None,
|
|
||||||
]:
|
|
||||||
'''
|
|
||||||
Slice an input struct array to a time range and return the absolute
|
|
||||||
and "readable" slices for that array as well as the indexing mask
|
|
||||||
for the caller to use to slice the input array if needed.
|
|
||||||
|
|
||||||
'''
|
|
||||||
profiler = Profiler(
|
|
||||||
msg='Flume.slice_from_time()',
|
|
||||||
disabled=not pg_profile_enabled(),
|
|
||||||
ms_threshold=4,
|
|
||||||
# ms_threshold=ms_slower_then,
|
|
||||||
)
|
|
||||||
|
|
||||||
times = arr['time']
|
|
||||||
index = arr['index']
|
|
||||||
|
|
||||||
if (
|
|
||||||
start_t < 0
|
|
||||||
or start_t >= stop_t
|
|
||||||
):
|
|
||||||
return (
|
|
||||||
slice(
|
|
||||||
index[0],
|
|
||||||
index[-1],
|
|
||||||
),
|
|
||||||
slice(
|
|
||||||
0,
|
|
||||||
len(arr),
|
|
||||||
),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
|
|
||||||
# use advanced indexing to map the
|
|
||||||
# time range to the index range.
|
|
||||||
mask: np.ndarray = np.where(
|
|
||||||
(times >= start_t)
|
|
||||||
&
|
|
||||||
(times < stop_t)
|
|
||||||
)
|
|
||||||
profiler('advanced indexing slice')
|
|
||||||
# TODO: if we can ensure each time field has a uniform
|
|
||||||
# step we can instead do some arithmetic to determine
|
|
||||||
# the equivalent index like we used to?
|
|
||||||
# return array[
|
|
||||||
# lbar - ifirst:
|
|
||||||
# (rbar - ifirst) + 1
|
|
||||||
# ]
|
|
||||||
|
|
||||||
i_by_t = index[mask]
|
|
||||||
try:
|
|
||||||
i_0 = i_by_t[0]
|
|
||||||
i_last = i_by_t[-1]
|
|
||||||
i_first_read = index[0]
|
|
||||||
except IndexError:
|
|
||||||
if (
|
|
||||||
start_t < times[0]
|
|
||||||
or stop_t >= times[-1]
|
|
||||||
):
|
|
||||||
return (
|
|
||||||
slice(
|
|
||||||
index[0],
|
|
||||||
index[-1],
|
|
||||||
),
|
|
||||||
slice(
|
|
||||||
0,
|
|
||||||
len(arr),
|
|
||||||
),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
|
|
||||||
abs_slc = slice(i_0, i_last)
|
|
||||||
|
|
||||||
# slice data by offset from the first index
|
|
||||||
# available in the passed datum set.
|
|
||||||
read_slc = slice(
|
|
||||||
i_0 - i_first_read,
|
|
||||||
i_last - i_first_read + 1,
|
|
||||||
)
|
|
||||||
|
|
||||||
profiler(
|
|
||||||
'slicing complete'
|
|
||||||
f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
|
|
||||||
f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
|
|
||||||
)
|
|
||||||
# also return the readable data from the timerange
|
|
||||||
return (
|
|
||||||
abs_slc,
|
|
||||||
read_slc,
|
|
||||||
mask,
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: maybe move this our ``Viz`` type to avoid
|
# TODO: maybe move this our ``Viz`` type to avoid
|
||||||
# the shm lookup discrepancy?
|
# the shm lookup discrepancy?
|
||||||
def view_data(
|
def view_data(
|
||||||
|
@ -366,7 +265,7 @@ class Flume(Struct):
|
||||||
abs_slc,
|
abs_slc,
|
||||||
read_slc,
|
read_slc,
|
||||||
mask,
|
mask,
|
||||||
) = self.slice_from_time(
|
) = slice_from_time(
|
||||||
arr,
|
arr,
|
||||||
start_t=vr.left(),
|
start_t=vr.left(),
|
||||||
stop_t=vr.right(),
|
stop_t=vr.right(),
|
||||||
|
|
|
@ -66,6 +66,7 @@ from ..data.feed import (
|
||||||
Feed,
|
Feed,
|
||||||
Flume,
|
Flume,
|
||||||
)
|
)
|
||||||
|
from ..data._pathops import slice_from_time
|
||||||
from ..data._source import Symbol
|
from ..data._source import Symbol
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
from ._interaction import ChartView
|
from ._interaction import ChartView
|
||||||
|
@ -1037,7 +1038,7 @@ class ChartPlotWidget(pg.PlotWidget):
|
||||||
read_slc,
|
read_slc,
|
||||||
mask,
|
mask,
|
||||||
|
|
||||||
) = viz.flume.slice_from_time(
|
) = slice_from_time(
|
||||||
array,
|
array,
|
||||||
start_t=vtl,
|
start_t=vtl,
|
||||||
stop_t=vtr,
|
stop_t=vtr,
|
||||||
|
|
|
@ -43,7 +43,10 @@ from ..data._formatters import (
|
||||||
OHLCBarsAsCurveFmtr, # OHLC converted to line
|
OHLCBarsAsCurveFmtr, # OHLC converted to line
|
||||||
StepCurveFmtr, # "step" curve (like for vlm)
|
StepCurveFmtr, # "step" curve (like for vlm)
|
||||||
)
|
)
|
||||||
from ..data._pathops import xy_downsample
|
from ..data._pathops import (
|
||||||
|
xy_downsample,
|
||||||
|
slice_from_time,
|
||||||
|
)
|
||||||
from ._ohlc import (
|
from ._ohlc import (
|
||||||
BarItems,
|
BarItems,
|
||||||
# bar_from_ohlc_row,
|
# bar_from_ohlc_row,
|
||||||
|
@ -101,7 +104,6 @@ def render_baritems(
|
||||||
fmtr=OHLCBarsFmtr(
|
fmtr=OHLCBarsFmtr(
|
||||||
shm=viz.shm,
|
shm=viz.shm,
|
||||||
viz=viz,
|
viz=viz,
|
||||||
# index_field=viz.index_field,
|
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -110,7 +112,6 @@ def render_baritems(
|
||||||
fmtr=OHLCBarsAsCurveFmtr(
|
fmtr=OHLCBarsAsCurveFmtr(
|
||||||
shm=viz.shm,
|
shm=viz.shm,
|
||||||
viz=viz,
|
viz=viz,
|
||||||
# index_field=viz.index_field,
|
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -291,7 +292,7 @@ class Viz(msgspec.Struct): # , frozen=True):
|
||||||
abs_slc,
|
abs_slc,
|
||||||
read_slc,
|
read_slc,
|
||||||
mask,
|
mask,
|
||||||
) = self.flume.slice_from_time(
|
) = slice_from_time(
|
||||||
arr,
|
arr,
|
||||||
start_t=lbar,
|
start_t=lbar,
|
||||||
stop_t=rbar,
|
stop_t=rbar,
|
||||||
|
@ -444,7 +445,7 @@ class Viz(msgspec.Struct): # , frozen=True):
|
||||||
abs_slc,
|
abs_slc,
|
||||||
read_slc,
|
read_slc,
|
||||||
mask,
|
mask,
|
||||||
) = self.flume.slice_from_time(
|
) = slice_from_time(
|
||||||
array,
|
array,
|
||||||
start_t=lbar,
|
start_t=lbar,
|
||||||
stop_t=rbar,
|
stop_t=rbar,
|
||||||
|
@ -463,7 +464,7 @@ class Viz(msgspec.Struct): # , frozen=True):
|
||||||
# )
|
# )
|
||||||
if profiler:
|
if profiler:
|
||||||
profiler(
|
profiler(
|
||||||
'`Flume.slice_from_time('
|
'`slice_from_time('
|
||||||
f'start_t={lbar}'
|
f'start_t={lbar}'
|
||||||
f'stop_t={rbar})'
|
f'stop_t={rbar})'
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue