Move `Flume.slice_from_time()` to `.data._pathops` mod func
							parent
							
								
									49ea4e1ef6
								
							
						
					
					
						commit
						cd58bfb8cf
					
				| 
						 | 
					@ -30,6 +30,10 @@ from numba import (
 | 
				
			||||||
# TODO: for ``numba`` typing..
 | 
					# TODO: for ``numba`` typing..
 | 
				
			||||||
# from ._source import numba_ohlc_dtype
 | 
					# from ._source import numba_ohlc_dtype
 | 
				
			||||||
from ._m4 import ds_m4
 | 
					from ._m4 import ds_m4
 | 
				
			||||||
 | 
					from .._profile import (
 | 
				
			||||||
 | 
					    Profiler,
 | 
				
			||||||
 | 
					    pg_profile_enabled,
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def xy_downsample(
 | 
					def xy_downsample(
 | 
				
			||||||
| 
						 | 
					@ -121,7 +125,7 @@ def path_arrays_from_ohlc(
 | 
				
			||||||
        high = q['high']
 | 
					        high = q['high']
 | 
				
			||||||
        low = q['low']
 | 
					        low = q['low']
 | 
				
			||||||
        close = q['close']
 | 
					        close = q['close']
 | 
				
			||||||
        index = float64(q['index'])
 | 
					        index = float64(q['time'])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
 | 
					        # XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
 | 
				
			||||||
        # index = float64(q[index_field])
 | 
					        # index = float64(q[index_field])
 | 
				
			||||||
| 
						 | 
					@ -263,3 +267,105 @@ def ohlc_flatten(
 | 
				
			||||||
            num=len(flat),
 | 
					            num=len(flat),
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
    return x, flat
 | 
					    return x, flat
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def slice_from_time(
 | 
				
			||||||
 | 
					    arr: np.ndarray,
 | 
				
			||||||
 | 
					    start_t: float,
 | 
				
			||||||
 | 
					    stop_t: float,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					) -> tuple[
 | 
				
			||||||
 | 
					    slice,
 | 
				
			||||||
 | 
					    slice,
 | 
				
			||||||
 | 
					    np.ndarray | None,
 | 
				
			||||||
 | 
					]:
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    Slice an input struct array to a time range and return the absolute
 | 
				
			||||||
 | 
					    and "readable" slices for that array as well as the indexing mask
 | 
				
			||||||
 | 
					    for the caller to use to slice the input array if needed.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    profiler = Profiler(
 | 
				
			||||||
 | 
					        msg='slice_from_time()',
 | 
				
			||||||
 | 
					        disabled=not pg_profile_enabled(),
 | 
				
			||||||
 | 
					        ms_threshold=4,
 | 
				
			||||||
 | 
					        # ms_threshold=ms_slower_then,
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    times = arr['time']
 | 
				
			||||||
 | 
					    index = arr['index']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (
 | 
				
			||||||
 | 
					        start_t < 0
 | 
				
			||||||
 | 
					        or start_t >= stop_t
 | 
				
			||||||
 | 
					    ):
 | 
				
			||||||
 | 
					        return (
 | 
				
			||||||
 | 
					            slice(
 | 
				
			||||||
 | 
					                index[0],
 | 
				
			||||||
 | 
					                index[-1],
 | 
				
			||||||
 | 
					            ),
 | 
				
			||||||
 | 
					            slice(
 | 
				
			||||||
 | 
					                0,
 | 
				
			||||||
 | 
					                len(arr),
 | 
				
			||||||
 | 
					            ),
 | 
				
			||||||
 | 
					            None,
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # use advanced indexing to map the
 | 
				
			||||||
 | 
					    # time range to the index range.
 | 
				
			||||||
 | 
					    mask: np.ndarray = np.where(
 | 
				
			||||||
 | 
					        (times >= start_t)
 | 
				
			||||||
 | 
					        &
 | 
				
			||||||
 | 
					        (times < stop_t)
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    profiler('advanced indexing slice')
 | 
				
			||||||
 | 
					    # TODO: if we can ensure each time field has a uniform
 | 
				
			||||||
 | 
					    # step we can instead do some arithmetic to determine
 | 
				
			||||||
 | 
					    # the equivalent index like we used to?
 | 
				
			||||||
 | 
					    # return array[
 | 
				
			||||||
 | 
					    #     lbar - ifirst:
 | 
				
			||||||
 | 
					    #     (rbar - ifirst) + 1
 | 
				
			||||||
 | 
					    # ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    i_by_t = index[mask]
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        i_0 = i_by_t[0]
 | 
				
			||||||
 | 
					        i_last = i_by_t[-1]
 | 
				
			||||||
 | 
					        i_first_read = index[0]
 | 
				
			||||||
 | 
					    except IndexError:
 | 
				
			||||||
 | 
					        if (
 | 
				
			||||||
 | 
					            start_t < times[0]
 | 
				
			||||||
 | 
					            or stop_t >= times[-1]
 | 
				
			||||||
 | 
					        ):
 | 
				
			||||||
 | 
					            return (
 | 
				
			||||||
 | 
					                slice(
 | 
				
			||||||
 | 
					                    index[0],
 | 
				
			||||||
 | 
					                    index[-1],
 | 
				
			||||||
 | 
					                ),
 | 
				
			||||||
 | 
					                slice(
 | 
				
			||||||
 | 
					                    0,
 | 
				
			||||||
 | 
					                    len(arr),
 | 
				
			||||||
 | 
					                ),
 | 
				
			||||||
 | 
					                None,
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    abs_slc = slice(i_0, i_last)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # slice data by offset from the first index
 | 
				
			||||||
 | 
					    # available in the passed datum set.
 | 
				
			||||||
 | 
					    read_slc = slice(
 | 
				
			||||||
 | 
					        i_0 - i_first_read,
 | 
				
			||||||
 | 
					        i_last - i_first_read + 1,
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    profiler(
 | 
				
			||||||
 | 
					        'slicing complete'
 | 
				
			||||||
 | 
					        f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
 | 
				
			||||||
 | 
					        f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    # also return the readable data from the timerange
 | 
				
			||||||
 | 
					    return (
 | 
				
			||||||
 | 
					        abs_slc,
 | 
				
			||||||
 | 
					        read_slc,
 | 
				
			||||||
 | 
					        mask,
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -48,6 +48,7 @@ from ._sharedmem import (
 | 
				
			||||||
from ._sampling import (
 | 
					from ._sampling import (
 | 
				
			||||||
    iter_ohlc_periods,
 | 
					    iter_ohlc_periods,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					from ._pathops import slice_from_time
 | 
				
			||||||
from .._profile import (
 | 
					from .._profile import (
 | 
				
			||||||
    Profiler,
 | 
					    Profiler,
 | 
				
			||||||
    pg_profile_enabled,
 | 
					    pg_profile_enabled,
 | 
				
			||||||
| 
						 | 
					@ -251,108 +252,6 @@ class Flume(Struct):
 | 
				
			||||||
        # just the latest index
 | 
					        # just the latest index
 | 
				
			||||||
        return array['index'][-1]
 | 
					        return array['index'][-1]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def slice_from_time(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
        arr: np.ndarray,
 | 
					 | 
				
			||||||
        start_t: float,
 | 
					 | 
				
			||||||
        stop_t: float,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    ) -> tuple[
 | 
					 | 
				
			||||||
        slice,
 | 
					 | 
				
			||||||
        slice,
 | 
					 | 
				
			||||||
        np.ndarray | None,
 | 
					 | 
				
			||||||
    ]:
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        Slice an input struct array to a time range and return the absolute
 | 
					 | 
				
			||||||
        and "readable" slices for that array as well as the indexing mask
 | 
					 | 
				
			||||||
        for the caller to use to slice the input array if needed.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        profiler = Profiler(
 | 
					 | 
				
			||||||
            msg='Flume.slice_from_time()',
 | 
					 | 
				
			||||||
            disabled=not pg_profile_enabled(),
 | 
					 | 
				
			||||||
            ms_threshold=4,
 | 
					 | 
				
			||||||
            # ms_threshold=ms_slower_then,
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        times = arr['time']
 | 
					 | 
				
			||||||
        index = arr['index']
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if (
 | 
					 | 
				
			||||||
            start_t < 0
 | 
					 | 
				
			||||||
            or start_t >= stop_t
 | 
					 | 
				
			||||||
        ):
 | 
					 | 
				
			||||||
            return (
 | 
					 | 
				
			||||||
                slice(
 | 
					 | 
				
			||||||
                    index[0],
 | 
					 | 
				
			||||||
                    index[-1],
 | 
					 | 
				
			||||||
                ),
 | 
					 | 
				
			||||||
                slice(
 | 
					 | 
				
			||||||
                    0,
 | 
					 | 
				
			||||||
                    len(arr),
 | 
					 | 
				
			||||||
                ),
 | 
					 | 
				
			||||||
                None,
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # use advanced indexing to map the
 | 
					 | 
				
			||||||
        # time range to the index range.
 | 
					 | 
				
			||||||
        mask: np.ndarray = np.where(
 | 
					 | 
				
			||||||
            (times >= start_t)
 | 
					 | 
				
			||||||
            &
 | 
					 | 
				
			||||||
            (times < stop_t)
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        profiler('advanced indexing slice')
 | 
					 | 
				
			||||||
        # TODO: if we can ensure each time field has a uniform
 | 
					 | 
				
			||||||
        # step we can instead do some arithmetic to determine
 | 
					 | 
				
			||||||
        # the equivalent index like we used to?
 | 
					 | 
				
			||||||
        # return array[
 | 
					 | 
				
			||||||
        #     lbar - ifirst:
 | 
					 | 
				
			||||||
        #     (rbar - ifirst) + 1
 | 
					 | 
				
			||||||
        # ]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        i_by_t = index[mask]
 | 
					 | 
				
			||||||
        try:
 | 
					 | 
				
			||||||
            i_0 = i_by_t[0]
 | 
					 | 
				
			||||||
            i_last = i_by_t[-1]
 | 
					 | 
				
			||||||
            i_first_read = index[0]
 | 
					 | 
				
			||||||
        except IndexError:
 | 
					 | 
				
			||||||
            if (
 | 
					 | 
				
			||||||
                start_t < times[0]
 | 
					 | 
				
			||||||
                or stop_t >= times[-1]
 | 
					 | 
				
			||||||
            ):
 | 
					 | 
				
			||||||
                return (
 | 
					 | 
				
			||||||
                    slice(
 | 
					 | 
				
			||||||
                        index[0],
 | 
					 | 
				
			||||||
                        index[-1],
 | 
					 | 
				
			||||||
                    ),
 | 
					 | 
				
			||||||
                    slice(
 | 
					 | 
				
			||||||
                        0,
 | 
					 | 
				
			||||||
                        len(arr),
 | 
					 | 
				
			||||||
                    ),
 | 
					 | 
				
			||||||
                    None,
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        abs_slc = slice(i_0, i_last)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # slice data by offset from the first index
 | 
					 | 
				
			||||||
        # available in the passed datum set.
 | 
					 | 
				
			||||||
        read_slc = slice(
 | 
					 | 
				
			||||||
            i_0 - i_first_read,
 | 
					 | 
				
			||||||
            i_last - i_first_read + 1,
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        profiler(
 | 
					 | 
				
			||||||
            'slicing complete'
 | 
					 | 
				
			||||||
            f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
 | 
					 | 
				
			||||||
            f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        # also return the readable data from the timerange
 | 
					 | 
				
			||||||
        return (
 | 
					 | 
				
			||||||
            abs_slc,
 | 
					 | 
				
			||||||
            read_slc,
 | 
					 | 
				
			||||||
            mask,
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # TODO: maybe move this our ``Viz`` type to avoid
 | 
					    # TODO: maybe move this our ``Viz`` type to avoid
 | 
				
			||||||
    # the shm lookup discrepancy?
 | 
					    # the shm lookup discrepancy?
 | 
				
			||||||
    def view_data(
 | 
					    def view_data(
 | 
				
			||||||
| 
						 | 
					@ -379,7 +278,7 @@ class Flume(Struct):
 | 
				
			||||||
            abs_slc,
 | 
					            abs_slc,
 | 
				
			||||||
            read_slc,
 | 
					            read_slc,
 | 
				
			||||||
            mask,
 | 
					            mask,
 | 
				
			||||||
        ) = self.slice_from_time(
 | 
					        ) = slice_from_time(
 | 
				
			||||||
            arr,
 | 
					            arr,
 | 
				
			||||||
            start_t=vr.left(),
 | 
					            start_t=vr.left(),
 | 
				
			||||||
            stop_t=vr.right(),
 | 
					            stop_t=vr.right(),
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -66,6 +66,7 @@ from ..data.feed import (
 | 
				
			||||||
    Feed,
 | 
					    Feed,
 | 
				
			||||||
    Flume,
 | 
					    Flume,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					from ..data._pathops import slice_from_time
 | 
				
			||||||
from ..data._source import Symbol
 | 
					from ..data._source import Symbol
 | 
				
			||||||
from ..log import get_logger
 | 
					from ..log import get_logger
 | 
				
			||||||
from ._interaction import ChartView
 | 
					from ._interaction import ChartView
 | 
				
			||||||
| 
						 | 
					@ -1037,7 +1038,7 @@ class ChartPlotWidget(pg.PlotWidget):
 | 
				
			||||||
                read_slc,
 | 
					                read_slc,
 | 
				
			||||||
                mask,
 | 
					                mask,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            ) = viz.flume.slice_from_time(
 | 
					            ) = slice_from_time(
 | 
				
			||||||
                array,
 | 
					                array,
 | 
				
			||||||
                start_t=vtl,
 | 
					                start_t=vtl,
 | 
				
			||||||
                stop_t=vtr,
 | 
					                stop_t=vtr,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -43,7 +43,10 @@ from ..data._formatters import (
 | 
				
			||||||
    OHLCBarsAsCurveFmtr,  # OHLC converted to line
 | 
					    OHLCBarsAsCurveFmtr,  # OHLC converted to line
 | 
				
			||||||
    StepCurveFmtr,  # "step" curve (like for vlm)
 | 
					    StepCurveFmtr,  # "step" curve (like for vlm)
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
from ..data._pathops import xy_downsample
 | 
					from ..data._pathops import (
 | 
				
			||||||
 | 
					    xy_downsample,
 | 
				
			||||||
 | 
					    slice_from_time,
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
from ._ohlc import (
 | 
					from ._ohlc import (
 | 
				
			||||||
    BarItems,
 | 
					    BarItems,
 | 
				
			||||||
    # bar_from_ohlc_row,
 | 
					    # bar_from_ohlc_row,
 | 
				
			||||||
| 
						 | 
					@ -101,7 +104,6 @@ def render_baritems(
 | 
				
			||||||
            fmtr=OHLCBarsFmtr(
 | 
					            fmtr=OHLCBarsFmtr(
 | 
				
			||||||
                shm=viz.shm,
 | 
					                shm=viz.shm,
 | 
				
			||||||
                viz=viz,
 | 
					                viz=viz,
 | 
				
			||||||
                # index_field=viz.index_field,
 | 
					 | 
				
			||||||
            ),
 | 
					            ),
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -110,7 +112,6 @@ def render_baritems(
 | 
				
			||||||
            fmtr=OHLCBarsAsCurveFmtr(
 | 
					            fmtr=OHLCBarsAsCurveFmtr(
 | 
				
			||||||
                shm=viz.shm,
 | 
					                shm=viz.shm,
 | 
				
			||||||
                viz=viz,
 | 
					                viz=viz,
 | 
				
			||||||
                # index_field=viz.index_field,
 | 
					 | 
				
			||||||
            ),
 | 
					            ),
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -291,7 +292,7 @@ class Viz(msgspec.Struct):  # , frozen=True):
 | 
				
			||||||
                abs_slc,
 | 
					                abs_slc,
 | 
				
			||||||
                read_slc,
 | 
					                read_slc,
 | 
				
			||||||
                mask,
 | 
					                mask,
 | 
				
			||||||
            ) = self.flume.slice_from_time(
 | 
					            ) = slice_from_time(
 | 
				
			||||||
                arr,
 | 
					                arr,
 | 
				
			||||||
                start_t=lbar,
 | 
					                start_t=lbar,
 | 
				
			||||||
                stop_t=rbar,
 | 
					                stop_t=rbar,
 | 
				
			||||||
| 
						 | 
					@ -444,7 +445,7 @@ class Viz(msgspec.Struct):  # , frozen=True):
 | 
				
			||||||
                abs_slc,
 | 
					                abs_slc,
 | 
				
			||||||
                read_slc,
 | 
					                read_slc,
 | 
				
			||||||
                mask,
 | 
					                mask,
 | 
				
			||||||
            ) = self.flume.slice_from_time(
 | 
					            ) = slice_from_time(
 | 
				
			||||||
                array,
 | 
					                array,
 | 
				
			||||||
                start_t=lbar,
 | 
					                start_t=lbar,
 | 
				
			||||||
                stop_t=rbar,
 | 
					                stop_t=rbar,
 | 
				
			||||||
| 
						 | 
					@ -463,7 +464,7 @@ class Viz(msgspec.Struct):  # , frozen=True):
 | 
				
			||||||
            #     )
 | 
					            #     )
 | 
				
			||||||
            if profiler:
 | 
					            if profiler:
 | 
				
			||||||
                profiler(
 | 
					                profiler(
 | 
				
			||||||
                    '`Flume.slice_from_time('
 | 
					                    '`slice_from_time('
 | 
				
			||||||
                    f'start_t={lbar}'
 | 
					                    f'start_t={lbar}'
 | 
				
			||||||
                    f'stop_t={rbar})'
 | 
					                    f'stop_t={rbar})'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue