First attempt, field-index agnostic formatting
Remove harcoded `'index'` field refs from all formatters in a first attempt at moving towards epoch-time alignment (though don't actually use it it yet). Adjustments to the formatter interface: - property for `.xy_nd` the x/y nd arrays. - property for and `.xy_slice` the nd format array(s) start->stop index slice. Internal routine tweaks: - drop `read_src_from_key` and always pass full source array on updates and adjust handlers to expect to have to index the data field of interest. - set `.last_read` right after update calls instead of after 1d conversion. - drop `slice_to_head` array read slicing. - add some debug points for testing 'time' indexing (though not used here yet). - add `.x_nd` array update logic for when the `.index_field` is not 'index' - i.e. when we begin to try and support epoch time. - simplify some new y_nd updates to not require use of `np.broadcast()` where possible.pre_viz_calls
							parent
							
								
									05a7a06416
								
							
						
					
					
						commit
						99b1230442
					
				| 
						 | 
				
			
			@ -27,14 +27,12 @@ import msgspec
 | 
			
		|||
import numpy as np
 | 
			
		||||
from numpy.lib import recfunctions as rfn
 | 
			
		||||
from numba import (
 | 
			
		||||
    types,
 | 
			
		||||
    # types,
 | 
			
		||||
    njit,
 | 
			
		||||
    float64,
 | 
			
		||||
    int64,
 | 
			
		||||
    optional,
 | 
			
		||||
    # optional,
 | 
			
		||||
)
 | 
			
		||||
from numba.core.types.misc import StringLiteral
 | 
			
		||||
# from numba.extending import as_numba_type
 | 
			
		||||
 | 
			
		||||
from ._sharedmem import (
 | 
			
		||||
    ShmArray,
 | 
			
		||||
| 
						 | 
				
			
			@ -77,6 +75,40 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
    def last_read(self) -> tuple | None:
 | 
			
		||||
        return self._last_read
 | 
			
		||||
 | 
			
		||||
    # Incrementally updated xy ndarray formatted data, a pre-1d
 | 
			
		||||
    # format which is updated and cached independently of the final
 | 
			
		||||
    # pre-graphics-path 1d format.
 | 
			
		||||
    x_nd: Optional[np.ndarray] = None
 | 
			
		||||
    y_nd: Optional[np.ndarray] = None
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def xy_nd(self) -> tuple[np.ndarray, np.ndarray]:
 | 
			
		||||
        return (
 | 
			
		||||
            self.x_nd[self.xy_slice],
 | 
			
		||||
            self.y_nd[self.xy_slice],
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def xy_slice(self) -> slice:
 | 
			
		||||
        return slice(
 | 
			
		||||
            self.xy_nd_start,
 | 
			
		||||
            self.xy_nd_stop,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    # indexes which slice into the above arrays (which are allocated
 | 
			
		||||
    # based on source data shm input size) and allow retrieving
 | 
			
		||||
    # incrementally updated data.
 | 
			
		||||
    xy_nd_start: int = 0
 | 
			
		||||
    xy_nd_stop: int = 0
 | 
			
		||||
 | 
			
		||||
    # TODO: eventually incrementally update 1d-pre-graphics path data?
 | 
			
		||||
    # x_1d: Optional[np.ndarray] = None
 | 
			
		||||
    # y_1d: Optional[np.ndarray] = None
 | 
			
		||||
 | 
			
		||||
    # incremental view-change state(s) tracking
 | 
			
		||||
    _last_vr: tuple[float, float] | None = None
 | 
			
		||||
    _last_ivdr: tuple[float, float] | None = None
 | 
			
		||||
 | 
			
		||||
    def __repr__(self) -> str:
 | 
			
		||||
        msg = (
 | 
			
		||||
            f'{type(self)}: ->\n\n'
 | 
			
		||||
| 
						 | 
				
			
			@ -86,8 +118,8 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
            f'last_vr={self._last_vr}\n'
 | 
			
		||||
            f'last_ivdr={self._last_ivdr}\n\n'
 | 
			
		||||
 | 
			
		||||
            f'xy_nd_start={self.xy_nd_start}\n'
 | 
			
		||||
            f'xy_nd_stop={self.xy_nd_stop}\n\n'
 | 
			
		||||
            f'xy_slice={self.xy_slice}\n'
 | 
			
		||||
            # f'xy_nd_stop={self.xy_nd_stop}\n\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        x_nd_len = 0
 | 
			
		||||
| 
						 | 
				
			
			@ -137,6 +169,12 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
        prepend_length = int(last_xfirst - xfirst)
 | 
			
		||||
        append_length = int(xlast - last_xlast)
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
            prepend_length < 0
 | 
			
		||||
            or append_length < 0
 | 
			
		||||
        ):
 | 
			
		||||
            breakpoint()
 | 
			
		||||
 | 
			
		||||
        # blah blah blah
 | 
			
		||||
        # do diffing for prepend, append and last entry
 | 
			
		||||
        return (
 | 
			
		||||
| 
						 | 
				
			
			@ -146,26 +184,6 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
            slice(last_xlast, xlast),
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    # Incrementally updated xy ndarray formatted data, a pre-1d
 | 
			
		||||
    # format which is updated and cached independently of the final
 | 
			
		||||
    # pre-graphics-path 1d format.
 | 
			
		||||
    x_nd: Optional[np.ndarray] = None
 | 
			
		||||
    y_nd: Optional[np.ndarray] = None
 | 
			
		||||
 | 
			
		||||
    # indexes which slice into the above arrays (which are allocated
 | 
			
		||||
    # based on source data shm input size) and allow retrieving
 | 
			
		||||
    # incrementally updated data.
 | 
			
		||||
    xy_nd_start: int = 0
 | 
			
		||||
    xy_nd_stop: int = 0
 | 
			
		||||
 | 
			
		||||
    # TODO: eventually incrementally update 1d-pre-graphics path data?
 | 
			
		||||
    # x_1d: Optional[np.ndarray] = None
 | 
			
		||||
    # y_1d: Optional[np.ndarray] = None
 | 
			
		||||
 | 
			
		||||
    # incremental view-change state(s) tracking
 | 
			
		||||
    _last_vr: tuple[float, float] | None = None
 | 
			
		||||
    _last_ivdr: tuple[float, float] | None = None
 | 
			
		||||
 | 
			
		||||
    def _track_inview_range(
 | 
			
		||||
        self,
 | 
			
		||||
        view_range: tuple[int, int],
 | 
			
		||||
| 
						 | 
				
			
			@ -244,18 +262,18 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
        if self.y_nd is None:
 | 
			
		||||
            # we first need to allocate xy data arrays
 | 
			
		||||
            # from the source data.
 | 
			
		||||
            self.xy_nd_start = shm._first.value
 | 
			
		||||
            self.xy_nd_stop = shm._last.value
 | 
			
		||||
            self.x_nd, self.y_nd = self.allocate_xy_nd(
 | 
			
		||||
                shm,
 | 
			
		||||
                array_key,
 | 
			
		||||
            )
 | 
			
		||||
            self.xy_nd_start = shm._first.value
 | 
			
		||||
            self.xy_nd_stop = shm._last.value
 | 
			
		||||
            profiler('allocated xy history')
 | 
			
		||||
 | 
			
		||||
        if prepend_len:
 | 
			
		||||
            y_prepend = shm._array[pre_slice]
 | 
			
		||||
            if read_src_from_key:
 | 
			
		||||
                y_prepend = y_prepend[array_key]
 | 
			
		||||
            # if read_src_from_key:
 | 
			
		||||
            #     y_prepend = y_prepend[array_key]
 | 
			
		||||
 | 
			
		||||
            (
 | 
			
		||||
                new_y_nd,
 | 
			
		||||
| 
						 | 
				
			
			@ -292,8 +310,8 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
 | 
			
		||||
        if append_len:
 | 
			
		||||
            y_append = shm._array[post_slice]
 | 
			
		||||
            if read_src_from_key:
 | 
			
		||||
                y_append = y_append[array_key]
 | 
			
		||||
            # if read_src_from_key:
 | 
			
		||||
            #     y_append = y_append[array_key]
 | 
			
		||||
 | 
			
		||||
            (
 | 
			
		||||
                new_y_nd,
 | 
			
		||||
| 
						 | 
				
			
			@ -314,14 +332,16 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
            # self.y_nd[post_slice] = new_y_nd
 | 
			
		||||
            # self.y_nd[xy_slice or post_slice] = xy_data
 | 
			
		||||
            self.y_nd[y_nd_slc] = new_y_nd
 | 
			
		||||
            # if read_src_from_key:
 | 
			
		||||
            #     y_nd_view[:][array_key] = new_y_nd
 | 
			
		||||
            # else:
 | 
			
		||||
            #     y_nd_view[:] = new_y_nd
 | 
			
		||||
 | 
			
		||||
            self.xy_nd_stop = shm._last.value
 | 
			
		||||
            profiler('appened xy history: {append_length}')
 | 
			
		||||
 | 
			
		||||
        # TODO: eventually maybe we can implement some kind of
 | 
			
		||||
        # transform on the ``QPainterPath`` that will more or less
 | 
			
		||||
        # detect the diff in "elements" terms?
 | 
			
		||||
        # update diff state since we've now rendered paths.
 | 
			
		||||
        self._last_read = new_read
 | 
			
		||||
 | 
			
		||||
        view_changed: bool = False
 | 
			
		||||
        view_range: tuple[int, int] = (ivl, ivr)
 | 
			
		||||
        if slice_to_inview:
 | 
			
		||||
| 
						 | 
				
			
			@ -329,11 +349,14 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
            array = in_view
 | 
			
		||||
            profiler(f'{self.viz.name} view range slice {view_range}')
 | 
			
		||||
 | 
			
		||||
        hist = array[:slice_to_head]
 | 
			
		||||
        # hist = array[:slice_to_head]
 | 
			
		||||
 | 
			
		||||
        # XXX: WOA WTF TRACTOR DEBUGGING BUGGG
 | 
			
		||||
        # assert 0
 | 
			
		||||
 | 
			
		||||
        # xy-path data transform: convert source data to a format
 | 
			
		||||
        # able to be passed to a `QPainterPath` rendering routine.
 | 
			
		||||
        if not len(hist):
 | 
			
		||||
        if not len(array):
 | 
			
		||||
            # XXX: this might be why the profiler only has exits?
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -341,7 +364,7 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
        # x/y_data in the case where allocate_xy is
 | 
			
		||||
        # defined?
 | 
			
		||||
        x_1d, y_1d, connect = self.format_xy_nd_to_1d(
 | 
			
		||||
            hist,
 | 
			
		||||
            array,
 | 
			
		||||
            array_key,
 | 
			
		||||
            view_range,
 | 
			
		||||
        )
 | 
			
		||||
| 
						 | 
				
			
			@ -369,13 +392,10 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
        if len(x_1d):
 | 
			
		||||
            self._last_ivdr = x_1d[0], x_1d[slice_to_head]
 | 
			
		||||
 | 
			
		||||
        # TODO: eventually maybe we can implement some kind of
 | 
			
		||||
        # transform on the ``QPainterPath`` that will more or less
 | 
			
		||||
        # detect the diff in "elements" terms?
 | 
			
		||||
        # update diff state since we've now rendered paths.
 | 
			
		||||
        self._last_read = new_read
 | 
			
		||||
 | 
			
		||||
        profiler('.format_to_1d()')
 | 
			
		||||
        if (x_1d[-1] == 0.5).any():
 | 
			
		||||
            breakpoint()
 | 
			
		||||
 | 
			
		||||
        return (
 | 
			
		||||
            x_1d,
 | 
			
		||||
            y_1d,
 | 
			
		||||
| 
						 | 
				
			
			@ -429,21 +449,22 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
        nd_stop: int,
 | 
			
		||||
 | 
			
		||||
        is_append: bool,
 | 
			
		||||
        index_field: str = 'index',
 | 
			
		||||
 | 
			
		||||
    ) -> tuple[
 | 
			
		||||
        np.ndarray,
 | 
			
		||||
        slice,
 | 
			
		||||
    ]:
 | 
			
		||||
        # write pushed data to flattened copy
 | 
			
		||||
        new_y_nd = new_from_src
 | 
			
		||||
        new_y_nd = new_from_src[data_field]
 | 
			
		||||
 | 
			
		||||
        # XXX
 | 
			
		||||
        # TODO: this should be returned and written by caller!
 | 
			
		||||
        # XXX
 | 
			
		||||
        # generate same-valued-per-row x support based on y shape
 | 
			
		||||
        # generate same-valued-per-row x support with Nx1 shape
 | 
			
		||||
        index_field = self.index_field
 | 
			
		||||
        if index_field != 'index':
 | 
			
		||||
            self.x_nd[read_slc, :] = new_from_src[index_field]
 | 
			
		||||
            x_nd_new = self.x_nd[read_slc]
 | 
			
		||||
            x_nd_new[:] = new_from_src[index_field]
 | 
			
		||||
 | 
			
		||||
        return new_y_nd, read_slc
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -573,6 +594,7 @@ class OHLCBarsFmtr(IncrementalFormatter):
 | 
			
		|||
            low = q['low']
 | 
			
		||||
            close = q['close']
 | 
			
		||||
            # index = float64(q[index_field])
 | 
			
		||||
            # index = float64(q['time'])
 | 
			
		||||
            index = float64(q['index'])
 | 
			
		||||
 | 
			
		||||
            istart = i * 6
 | 
			
		||||
| 
						 | 
				
			
			@ -649,7 +671,6 @@ class OHLCBarsFmtr(IncrementalFormatter):
 | 
			
		|||
        nd_stop: int,
 | 
			
		||||
 | 
			
		||||
        is_append: bool,
 | 
			
		||||
        index_field: str = 'index',
 | 
			
		||||
 | 
			
		||||
    ) -> tuple[
 | 
			
		||||
        np.ndarray,
 | 
			
		||||
| 
						 | 
				
			
			@ -665,8 +686,13 @@ class OHLCBarsFmtr(IncrementalFormatter):
 | 
			
		|||
        # TODO: this should be returned and written by caller!
 | 
			
		||||
        # XXX
 | 
			
		||||
        # generate same-valued-per-row x support based on y shape
 | 
			
		||||
        index_field: str = self.index_field
 | 
			
		||||
        if index_field != 'index':
 | 
			
		||||
            self.x_nd[read_slc, :] = new_from_src[index_field]
 | 
			
		||||
            x_nd_new = self.x_nd[read_slc]
 | 
			
		||||
            x_nd_new[:] = new_from_src[index_field][:, np.newaxis]
 | 
			
		||||
 | 
			
		||||
            if (self.x_nd[self.xy_slice] == 0.5).any():
 | 
			
		||||
                breakpoint()
 | 
			
		||||
 | 
			
		||||
        return new_y_nd, read_slc
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -712,8 +738,6 @@ class StepCurveFmtr(IncrementalFormatter):
 | 
			
		|||
        shm: ShmArray,
 | 
			
		||||
        data_field: str,
 | 
			
		||||
 | 
			
		||||
        index_field: str = 'index',
 | 
			
		||||
 | 
			
		||||
    ) -> tuple[
 | 
			
		||||
        np.ndarray,  # x
 | 
			
		||||
        np.nd.array  # y
 | 
			
		||||
| 
						 | 
				
			
			@ -731,11 +755,17 @@ class StepCurveFmtr(IncrementalFormatter):
 | 
			
		|||
            (i.size, 2),
 | 
			
		||||
        ) + np.array([-0.5, 0.5])
 | 
			
		||||
 | 
			
		||||
        y_out = np.empty((len(out), 2), dtype=out.dtype)
 | 
			
		||||
        # fill out Nx2 array to hold each step's left + right vertices.
 | 
			
		||||
        y_out = np.empty(
 | 
			
		||||
            # (len(out), 2),
 | 
			
		||||
            x_out.shape,
 | 
			
		||||
            dtype=out.dtype,
 | 
			
		||||
        )
 | 
			
		||||
        # fill in (current) values from source shm buffer
 | 
			
		||||
        y_out[:] = out[:, np.newaxis]
 | 
			
		||||
 | 
			
		||||
        # start y at origin level
 | 
			
		||||
        y_out[0, 0] = 0
 | 
			
		||||
        y_out[self.xy_nd_start] = 0
 | 
			
		||||
        return x_out, y_out
 | 
			
		||||
 | 
			
		||||
    def incr_update_xy_nd(
 | 
			
		||||
| 
						 | 
				
			
			@ -744,12 +774,12 @@ class StepCurveFmtr(IncrementalFormatter):
 | 
			
		|||
        src_shm: ShmArray,
 | 
			
		||||
        array_key: str,
 | 
			
		||||
 | 
			
		||||
        src_update: np.ndarray,  # portion of source that was updated
 | 
			
		||||
        slc: slice,
 | 
			
		||||
        new_from_src: np.ndarray,  # portion of source that was updated
 | 
			
		||||
        read_slc: slice,
 | 
			
		||||
        ln: int,  # len of updated
 | 
			
		||||
 | 
			
		||||
        first: int,
 | 
			
		||||
        last: int,
 | 
			
		||||
        nd_start: int,
 | 
			
		||||
        nd_stop: int,
 | 
			
		||||
 | 
			
		||||
        is_append: bool,
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -760,20 +790,29 @@ class StepCurveFmtr(IncrementalFormatter):
 | 
			
		|||
        # for a step curve we slice from one datum prior
 | 
			
		||||
        # to the current "update slice" to get the previous
 | 
			
		||||
        # "level".
 | 
			
		||||
        if is_append:
 | 
			
		||||
            start = max(last - 1, 0)
 | 
			
		||||
            end = src_shm._last.value
 | 
			
		||||
            new_y = src_shm._array[start:end][array_key]
 | 
			
		||||
            slc = slice(start, end)
 | 
			
		||||
        # if is_append:
 | 
			
		||||
        #     start = max(last - 1, 0)
 | 
			
		||||
        #     end = src_shm._last.value
 | 
			
		||||
        #     new_y = src_shm._array[start:end][array_key]
 | 
			
		||||
        #     append_slc = slice(start, end)
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            new_y = src_update
 | 
			
		||||
        new_y = new_from_src[array_key][:, np.newaxis]
 | 
			
		||||
 | 
			
		||||
        index_field = self.index_field
 | 
			
		||||
        if index_field != 'index':
 | 
			
		||||
            x_nd_new = self.x_nd[read_slc]
 | 
			
		||||
            x_nd_new[:] = new_from_src[index_field][:, np.newaxis]
 | 
			
		||||
 | 
			
		||||
        if (self.x_nd[self.xy_slice][-1] == 0.5).any():
 | 
			
		||||
            breakpoint()
 | 
			
		||||
 | 
			
		||||
        return (
 | 
			
		||||
            np.broadcast_to(
 | 
			
		||||
                new_y[:, None], (new_y.size, 2),
 | 
			
		||||
            ),
 | 
			
		||||
            slc,
 | 
			
		||||
            new_y,
 | 
			
		||||
            # np.broadcast_to(
 | 
			
		||||
            #     new_x[:, None],
 | 
			
		||||
            #     (new_y.size, 2),
 | 
			
		||||
            # ),
 | 
			
		||||
            read_slc,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def format_xy_nd_to_1d(
 | 
			
		||||
| 
						 | 
				
			
			@ -788,23 +827,40 @@ class StepCurveFmtr(IncrementalFormatter):
 | 
			
		|||
        np.ndarray,
 | 
			
		||||
        str,
 | 
			
		||||
    ]:
 | 
			
		||||
        lasts = array[['index', array_key]]
 | 
			
		||||
        last = lasts[array_key][-1]
 | 
			
		||||
        last_t, last = array[-1][[self.index_field, array_key]]
 | 
			
		||||
 | 
			
		||||
        start = self.xy_nd_start
 | 
			
		||||
 | 
			
		||||
        # 2 more datum-indexes to capture zero at end
 | 
			
		||||
        x_step = self.x_nd[self.xy_nd_start:self.xy_nd_stop+2]
 | 
			
		||||
        y_step = self.y_nd[self.xy_nd_start:self.xy_nd_stop+2]
 | 
			
		||||
        # XXX: can we drop this ``extra`` bit?
 | 
			
		||||
        extra = 2
 | 
			
		||||
        stop = self.xy_nd_stop + extra
 | 
			
		||||
 | 
			
		||||
        x_step = self.x_nd[start:stop]
 | 
			
		||||
        y_step = self.y_nd[start:stop]
 | 
			
		||||
 | 
			
		||||
        # if (x_step[-1] == 0.5).any():
 | 
			
		||||
        #     breakpoint()
 | 
			
		||||
 | 
			
		||||
        # pack in duplicate final value to complete last step level
 | 
			
		||||
        x_step[-1] = last_t
 | 
			
		||||
        y_step[-1] = last
 | 
			
		||||
 | 
			
		||||
        # slice out in-view data
 | 
			
		||||
        ivl, ivr = vr
 | 
			
		||||
        ys_iv = y_step[ivl:ivr+1]
 | 
			
		||||
        xs_iv = x_step[ivl:ivr+1]
 | 
			
		||||
        # ys_iv = y_step[ivl:ivr+1]
 | 
			
		||||
        # xs_iv = x_step[ivl:ivr+1]
 | 
			
		||||
        ys_iv = y_step[ivl:ivr]
 | 
			
		||||
        xs_iv = x_step[ivl:ivr]
 | 
			
		||||
 | 
			
		||||
        # flatten to 1d
 | 
			
		||||
        y_iv = ys_iv.reshape(ys_iv.size)
 | 
			
		||||
        x_iv = xs_iv.reshape(xs_iv.size)
 | 
			
		||||
 | 
			
		||||
        if (x_iv[-1] == 0.5).any():
 | 
			
		||||
            breakpoint()
 | 
			
		||||
 | 
			
		||||
        # s = 100
 | 
			
		||||
        # print(
 | 
			
		||||
        #     f'ys_iv : {ys_iv[-s:]}\n'
 | 
			
		||||
        #     f'y_iv: {y_iv[-s:]}\n'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue