From c084a1122ac4050d3a2bb7ca65f38977274b9bf3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Nov 2022 13:35:38 -0500 Subject: [PATCH] First attempt, field-index agnostic formatting Remove harcoded `'index'` field refs from all formatters in a first attempt at moving towards epoch-time alignment (though don't actually use it it yet). Adjustments to the formatter interface: - property for `.xy_nd` the x/y nd arrays. - property for and `.xy_slice` the nd format array(s) start->stop index slice. Internal routine tweaks: - drop `read_src_from_key` and always pass full source array on updates and adjust handlers to expect to have to index the data field of interest. - set `.last_read` right after update calls instead of after 1d conversion. - drop `slice_to_head` array read slicing. - add some debug points for testing 'time' indexing (though not used here yet). - add `.x_nd` array update logic for when the `.index_field` is not 'index' - i.e. when we begin to try and support epoch time. - simplify some new y_nd updates to not require use of `np.broadcast()` where possible. --- piker/data/_pathops.py | 208 ++++++++++++++++++++++++++--------------- 1 file changed, 132 insertions(+), 76 deletions(-) diff --git a/piker/data/_pathops.py b/piker/data/_pathops.py index 6de9160f..2e32d910 100644 --- a/piker/data/_pathops.py +++ b/piker/data/_pathops.py @@ -27,14 +27,12 @@ import msgspec import numpy as np from numpy.lib import recfunctions as rfn from numba import ( - types, + # types, njit, float64, int64, - optional, + # optional, ) -from numba.core.types.misc import StringLiteral -# from numba.extending import as_numba_type from ._sharedmem import ( ShmArray, @@ -77,6 +75,40 @@ class IncrementalFormatter(msgspec.Struct): def last_read(self) -> tuple | None: return self._last_read + # Incrementally updated xy ndarray formatted data, a pre-1d + # format which is updated and cached independently of the final + # pre-graphics-path 1d format. + x_nd: Optional[np.ndarray] = None + y_nd: Optional[np.ndarray] = None + + @property + def xy_nd(self) -> tuple[np.ndarray, np.ndarray]: + return ( + self.x_nd[self.xy_slice], + self.y_nd[self.xy_slice], + ) + + @property + def xy_slice(self) -> slice: + return slice( + self.xy_nd_start, + self.xy_nd_stop, + ) + + # indexes which slice into the above arrays (which are allocated + # based on source data shm input size) and allow retrieving + # incrementally updated data. + xy_nd_start: int = 0 + xy_nd_stop: int = 0 + + # TODO: eventually incrementally update 1d-pre-graphics path data? + # x_1d: Optional[np.ndarray] = None + # y_1d: Optional[np.ndarray] = None + + # incremental view-change state(s) tracking + _last_vr: tuple[float, float] | None = None + _last_ivdr: tuple[float, float] | None = None + def __repr__(self) -> str: msg = ( f'{type(self)}: ->\n\n' @@ -86,8 +118,8 @@ class IncrementalFormatter(msgspec.Struct): f'last_vr={self._last_vr}\n' f'last_ivdr={self._last_ivdr}\n\n' - f'xy_nd_start={self.xy_nd_start}\n' - f'xy_nd_stop={self.xy_nd_stop}\n\n' + f'xy_slice={self.xy_slice}\n' + # f'xy_nd_stop={self.xy_nd_stop}\n\n' ) x_nd_len = 0 @@ -137,6 +169,12 @@ class IncrementalFormatter(msgspec.Struct): prepend_length = int(last_xfirst - xfirst) append_length = int(xlast - last_xlast) + if ( + prepend_length < 0 + or append_length < 0 + ): + breakpoint() + # blah blah blah # do diffing for prepend, append and last entry return ( @@ -146,26 +184,6 @@ class IncrementalFormatter(msgspec.Struct): slice(last_xlast, xlast), ) - # Incrementally updated xy ndarray formatted data, a pre-1d - # format which is updated and cached independently of the final - # pre-graphics-path 1d format. - x_nd: Optional[np.ndarray] = None - y_nd: Optional[np.ndarray] = None - - # indexes which slice into the above arrays (which are allocated - # based on source data shm input size) and allow retrieving - # incrementally updated data. - xy_nd_start: int = 0 - xy_nd_stop: int = 0 - - # TODO: eventually incrementally update 1d-pre-graphics path data? - # x_1d: Optional[np.ndarray] = None - # y_1d: Optional[np.ndarray] = None - - # incremental view-change state(s) tracking - _last_vr: tuple[float, float] | None = None - _last_ivdr: tuple[float, float] | None = None - def _track_inview_range( self, view_range: tuple[int, int], @@ -244,18 +262,18 @@ class IncrementalFormatter(msgspec.Struct): if self.y_nd is None: # we first need to allocate xy data arrays # from the source data. + self.xy_nd_start = shm._first.value + self.xy_nd_stop = shm._last.value self.x_nd, self.y_nd = self.allocate_xy_nd( shm, array_key, ) - self.xy_nd_start = shm._first.value - self.xy_nd_stop = shm._last.value profiler('allocated xy history') if prepend_len: y_prepend = shm._array[pre_slice] - if read_src_from_key: - y_prepend = y_prepend[array_key] + # if read_src_from_key: + # y_prepend = y_prepend[array_key] ( new_y_nd, @@ -292,8 +310,8 @@ class IncrementalFormatter(msgspec.Struct): if append_len: y_append = shm._array[post_slice] - if read_src_from_key: - y_append = y_append[array_key] + # if read_src_from_key: + # y_append = y_append[array_key] ( new_y_nd, @@ -314,14 +332,16 @@ class IncrementalFormatter(msgspec.Struct): # self.y_nd[post_slice] = new_y_nd # self.y_nd[xy_slice or post_slice] = xy_data self.y_nd[y_nd_slc] = new_y_nd - # if read_src_from_key: - # y_nd_view[:][array_key] = new_y_nd - # else: - # y_nd_view[:] = new_y_nd self.xy_nd_stop = shm._last.value profiler('appened xy history: {append_length}') + # TODO: eventually maybe we can implement some kind of + # transform on the ``QPainterPath`` that will more or less + # detect the diff in "elements" terms? + # update diff state since we've now rendered paths. + self._last_read = new_read + view_changed: bool = False view_range: tuple[int, int] = (ivl, ivr) if slice_to_inview: @@ -329,11 +349,14 @@ class IncrementalFormatter(msgspec.Struct): array = in_view profiler(f'{self.viz.name} view range slice {view_range}') - hist = array[:slice_to_head] + # hist = array[:slice_to_head] + + # XXX: WOA WTF TRACTOR DEBUGGING BUGGG + # assert 0 # xy-path data transform: convert source data to a format # able to be passed to a `QPainterPath` rendering routine. - if not len(hist): + if not len(array): # XXX: this might be why the profiler only has exits? return @@ -341,7 +364,7 @@ class IncrementalFormatter(msgspec.Struct): # x/y_data in the case where allocate_xy is # defined? x_1d, y_1d, connect = self.format_xy_nd_to_1d( - hist, + array, array_key, view_range, ) @@ -369,13 +392,10 @@ class IncrementalFormatter(msgspec.Struct): if len(x_1d): self._last_ivdr = x_1d[0], x_1d[slice_to_head] - # TODO: eventually maybe we can implement some kind of - # transform on the ``QPainterPath`` that will more or less - # detect the diff in "elements" terms? - # update diff state since we've now rendered paths. - self._last_read = new_read - profiler('.format_to_1d()') + if (x_1d[-1] == 0.5).any(): + breakpoint() + return ( x_1d, y_1d, @@ -429,21 +449,22 @@ class IncrementalFormatter(msgspec.Struct): nd_stop: int, is_append: bool, - index_field: str = 'index', ) -> tuple[ np.ndarray, slice, ]: # write pushed data to flattened copy - new_y_nd = new_from_src + new_y_nd = new_from_src[data_field] # XXX # TODO: this should be returned and written by caller! # XXX - # generate same-valued-per-row x support based on y shape + # generate same-valued-per-row x support with Nx1 shape + index_field = self.index_field if index_field != 'index': - self.x_nd[read_slc, :] = new_from_src[index_field] + x_nd_new = self.x_nd[read_slc] + x_nd_new[:] = new_from_src[index_field] return new_y_nd, read_slc @@ -573,6 +594,7 @@ class OHLCBarsFmtr(IncrementalFormatter): low = q['low'] close = q['close'] # index = float64(q[index_field]) + # index = float64(q['time']) index = float64(q['index']) istart = i * 6 @@ -649,7 +671,6 @@ class OHLCBarsFmtr(IncrementalFormatter): nd_stop: int, is_append: bool, - index_field: str = 'index', ) -> tuple[ np.ndarray, @@ -665,8 +686,13 @@ class OHLCBarsFmtr(IncrementalFormatter): # TODO: this should be returned and written by caller! # XXX # generate same-valued-per-row x support based on y shape + index_field: str = self.index_field if index_field != 'index': - self.x_nd[read_slc, :] = new_from_src[index_field] + x_nd_new = self.x_nd[read_slc] + x_nd_new[:] = new_from_src[index_field][:, np.newaxis] + + if (self.x_nd[self.xy_slice] == 0.5).any(): + breakpoint() return new_y_nd, read_slc @@ -712,8 +738,6 @@ class StepCurveFmtr(IncrementalFormatter): shm: ShmArray, data_field: str, - index_field: str = 'index', - ) -> tuple[ np.ndarray, # x np.nd.array # y @@ -731,11 +755,17 @@ class StepCurveFmtr(IncrementalFormatter): (i.size, 2), ) + np.array([-0.5, 0.5]) - y_out = np.empty((len(out), 2), dtype=out.dtype) + # fill out Nx2 array to hold each step's left + right vertices. + y_out = np.empty( + # (len(out), 2), + x_out.shape, + dtype=out.dtype, + ) + # fill in (current) values from source shm buffer y_out[:] = out[:, np.newaxis] # start y at origin level - y_out[0, 0] = 0 + y_out[self.xy_nd_start] = 0 return x_out, y_out def incr_update_xy_nd( @@ -744,12 +774,12 @@ class StepCurveFmtr(IncrementalFormatter): src_shm: ShmArray, array_key: str, - src_update: np.ndarray, # portion of source that was updated - slc: slice, + new_from_src: np.ndarray, # portion of source that was updated + read_slc: slice, ln: int, # len of updated - first: int, - last: int, + nd_start: int, + nd_stop: int, is_append: bool, @@ -760,20 +790,29 @@ class StepCurveFmtr(IncrementalFormatter): # for a step curve we slice from one datum prior # to the current "update slice" to get the previous # "level". - if is_append: - start = max(last - 1, 0) - end = src_shm._last.value - new_y = src_shm._array[start:end][array_key] - slc = slice(start, end) + # if is_append: + # start = max(last - 1, 0) + # end = src_shm._last.value + # new_y = src_shm._array[start:end][array_key] + # append_slc = slice(start, end) - else: - new_y = src_update + new_y = new_from_src[array_key][:, np.newaxis] + + index_field = self.index_field + if index_field != 'index': + x_nd_new = self.x_nd[read_slc] + x_nd_new[:] = new_from_src[index_field][:, np.newaxis] + + if (self.x_nd[self.xy_slice][-1] == 0.5).any(): + breakpoint() return ( - np.broadcast_to( - new_y[:, None], (new_y.size, 2), - ), - slc, + new_y, + # np.broadcast_to( + # new_x[:, None], + # (new_y.size, 2), + # ), + read_slc, ) def format_xy_nd_to_1d( @@ -788,23 +827,40 @@ class StepCurveFmtr(IncrementalFormatter): np.ndarray, str, ]: - lasts = array[['index', array_key]] - last = lasts[array_key][-1] + last_t, last = array[-1][[self.index_field, array_key]] + + start = self.xy_nd_start # 2 more datum-indexes to capture zero at end - x_step = self.x_nd[self.xy_nd_start:self.xy_nd_stop+2] - y_step = self.y_nd[self.xy_nd_start:self.xy_nd_stop+2] + # XXX: can we drop this ``extra`` bit? + extra = 2 + stop = self.xy_nd_stop + extra + + x_step = self.x_nd[start:stop] + y_step = self.y_nd[start:stop] + + # if (x_step[-1] == 0.5).any(): + # breakpoint() + + # pack in duplicate final value to complete last step level + x_step[-1] = last_t y_step[-1] = last # slice out in-view data ivl, ivr = vr - ys_iv = y_step[ivl:ivr+1] - xs_iv = x_step[ivl:ivr+1] + # ys_iv = y_step[ivl:ivr+1] + # xs_iv = x_step[ivl:ivr+1] + ys_iv = y_step[ivl:ivr] + xs_iv = x_step[ivl:ivr] # flatten to 1d y_iv = ys_iv.reshape(ys_iv.size) x_iv = xs_iv.reshape(xs_iv.size) + if (x_iv[-1] == 0.5).any(): + breakpoint() + + # s = 100 # print( # f'ys_iv : {ys_iv[-s:]}\n' # f'y_iv: {y_iv[-s:]}\n'