From bd2abcb91f1d7adf697a22ce5d4ccee7242802ad Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Nov 2022 09:05:06 -0500 Subject: [PATCH] Simplify formatter update methodology Don't expect values (array + slice) to be returned and applied by `.incr_update_xy_nd()` and instead presume this will implemented internally in each (sub)formatter. Attempt to simplify some incr-update routines, (particularly in the step curve formatter, though most of it was reverted to just a simpler form of the original implementation XD) including: - dropping the need for the `slice_to_head: int` control. - using the `xy_nd_start/stop` index counters over custom lookups. --- piker/data/_pathops.py | 151 +++++++++++++++++++---------------------- 1 file changed, 68 insertions(+), 83 deletions(-) diff --git a/piker/data/_pathops.py b/piker/data/_pathops.py index 2e32d910..b784205c 100644 --- a/piker/data/_pathops.py +++ b/piker/data/_pathops.py @@ -271,15 +271,7 @@ class IncrementalFormatter(msgspec.Struct): profiler('allocated xy history') if prepend_len: - y_prepend = shm._array[pre_slice] - # if read_src_from_key: - # y_prepend = y_prepend[array_key] - - ( - new_y_nd, - y_nd_slc, - - ) = self.incr_update_xy_nd( + self.incr_update_xy_nd( shm, array_key, @@ -289,7 +281,7 @@ class IncrementalFormatter(msgspec.Struct): # step curves) the updater routine may want to do # the source history-data reading itself, so we pass # both here. - y_prepend, + shm._array[pre_slice], pre_slice, prepend_len, @@ -298,30 +290,16 @@ class IncrementalFormatter(msgspec.Struct): is_append=False, ) - # y_nd_view = self.y_nd[y_nd_slc] - self.y_nd[y_nd_slc] = new_y_nd - # if read_src_from_key: - # y_nd_view[:][array_key] = new_y_nd - # else: - # y_nd_view[:] = new_y_nd - + # self.y_nd[y_nd_slc] = new_y_nd self.xy_nd_start = shm._first.value profiler('prepended xy history: {prepend_length}') if append_len: - y_append = shm._array[post_slice] - # if read_src_from_key: - # y_append = y_append[array_key] - - ( - new_y_nd, - y_nd_slc, - - ) = self.incr_update_xy_nd( + self.incr_update_xy_nd( shm, array_key, - y_append, + shm._array[post_slice], post_slice, append_len, @@ -329,10 +307,6 @@ class IncrementalFormatter(msgspec.Struct): self.xy_nd_stop, is_append=True, ) - # self.y_nd[post_slice] = new_y_nd - # self.y_nd[xy_slice or post_slice] = xy_data - self.y_nd[y_nd_slc] = new_y_nd - self.xy_nd_stop = shm._last.value profiler('appened xy history: {append_length}') @@ -391,10 +365,10 @@ class IncrementalFormatter(msgspec.Struct): # update the last "in view data range" if len(x_1d): self._last_ivdr = x_1d[0], x_1d[slice_to_head] + if (x_1d[-1] == 0.5).any(): + breakpoint() profiler('.format_to_1d()') - if (x_1d[-1] == 0.5).any(): - breakpoint() return ( x_1d, @@ -450,10 +424,7 @@ class IncrementalFormatter(msgspec.Struct): is_append: bool, - ) -> tuple[ - np.ndarray, - slice, - ]: + ) -> None: # write pushed data to flattened copy new_y_nd = new_from_src[data_field] @@ -466,7 +437,7 @@ class IncrementalFormatter(msgspec.Struct): x_nd_new = self.x_nd[read_slc] x_nd_new[:] = new_from_src[index_field] - return new_y_nd, read_slc + self.y_nd[read_slc] = new_y_nd # XXX: was ``.format_xy()`` def format_xy_nd_to_1d( @@ -488,8 +459,8 @@ class IncrementalFormatter(msgspec.Struct): ''' return ( - array[self.index_field], - array[array_key], + array[self.index_field][:-1], + array[array_key][:-1], # 1d connection array or style-key to # ``pg.functions.arrayToQPath()`` @@ -672,10 +643,7 @@ class OHLCBarsFmtr(IncrementalFormatter): is_append: bool, - ) -> tuple[ - np.ndarray, - slice, - ]: + ) -> None: # write newly pushed data to flattened copy # a struct-arr is always passed in. new_y_nd = rfn.structured_to_unstructured( @@ -694,7 +662,7 @@ class OHLCBarsFmtr(IncrementalFormatter): if (self.x_nd[self.xy_slice] == 0.5).any(): breakpoint() - return new_y_nd, read_slc + self.y_nd[read_slc] = new_y_nd class OHLCBarsAsCurveFmtr(OHLCBarsFmtr): @@ -764,8 +732,10 @@ class StepCurveFmtr(IncrementalFormatter): # fill in (current) values from source shm buffer y_out[:] = out[:, np.newaxis] + # TODO: pretty sure we can drop this? # start y at origin level - y_out[self.xy_nd_start] = 0 + # y_out[0, 0] = 0 + # y_out[self.xy_nd_start] = 0 return x_out, y_out def incr_update_xy_nd( @@ -790,13 +760,29 @@ class StepCurveFmtr(IncrementalFormatter): # for a step curve we slice from one datum prior # to the current "update slice" to get the previous # "level". - # if is_append: - # start = max(last - 1, 0) - # end = src_shm._last.value - # new_y = src_shm._array[start:end][array_key] - # append_slc = slice(start, end) + last_2 = slice( + read_slc.start, + read_slc.stop+1, + ) + y_nd_new = self.y_nd[last_2] + y_nd_new[:] = src_shm._array[last_2][array_key][:, None] - new_y = new_from_src[array_key][:, np.newaxis] + # NOTE: we can't use the append slice since we need to "look + # forward" one step to get the current level and copy it as + # well? (though i still don't really grok why..) + # y_nd_new[:] = new_from_src[array_key][:, None] + + # XXX: old approach now duplicated above (we can probably drop + # this since the key part was the ``nd_stop + 1`` + # if is_append: + # start = max(nd_stop - 1, 0) + # end = src_shm._last.value + # y_nd_new = src_shm._array[start:end][array_key]#[:, np.newaxis] + # slc = slice(start, end) + # self.y_nd[slc] = np.broadcast_to( + # y_nd_new[:, None], + # (y_nd_new.size, 2), + # ) index_field = self.index_field if index_field != 'index': @@ -806,15 +792,6 @@ class StepCurveFmtr(IncrementalFormatter): if (self.x_nd[self.xy_slice][-1] == 0.5).any(): breakpoint() - return ( - new_y, - # np.broadcast_to( - # new_x[:, None], - # (new_y.size, 2), - # ), - read_slc, - ) - def format_xy_nd_to_1d( self, @@ -830,11 +807,7 @@ class StepCurveFmtr(IncrementalFormatter): last_t, last = array[-1][[self.index_field, array_key]] start = self.xy_nd_start - - # 2 more datum-indexes to capture zero at end - # XXX: can we drop this ``extra`` bit? - extra = 2 - stop = self.xy_nd_stop + extra + stop = self.xy_nd_stop x_step = self.x_nd[start:stop] y_step = self.y_nd[start:stop] @@ -843,32 +816,44 @@ class StepCurveFmtr(IncrementalFormatter): # breakpoint() # pack in duplicate final value to complete last step level - x_step[-1] = last_t - y_step[-1] = last + # x_step[-1] = last_t + # y_step[-1] = last + # x_step[-1, 1] = last_t + y_step[-1, 1] = last + + # if y_step.any(): + # s = 3 + # print( + # f'x_step:\n{x_step[-s:]}\n' + # f'y_step:\n{y_step[-s:]}\n\n' + # ) # slice out in-view data ivl, ivr = vr - # ys_iv = y_step[ivl:ivr+1] - # xs_iv = x_step[ivl:ivr+1] - ys_iv = y_step[ivl:ivr] - xs_iv = x_step[ivl:ivr] + # TODO: WHY do we need the extra +1 index? + x_step_iv = x_step[ivl:ivr+1] + y_step_iv = y_step[ivl:ivr+1] # flatten to 1d - y_iv = ys_iv.reshape(ys_iv.size) - x_iv = xs_iv.reshape(xs_iv.size) + x_1d = x_step_iv.reshape(x_step_iv.size) + y_1d = y_step_iv.reshape(y_step_iv.size) - if (x_iv[-1] == 0.5).any(): + if not x_1d.size == y_1d.size: breakpoint() - # s = 100 - # print( - # f'ys_iv : {ys_iv[-s:]}\n' - # f'y_iv: {y_iv[-s:]}\n' - # f'xs_iv: {xs_iv[-s:]}\n' - # f'x_iv: {x_iv[-s:]}\n' - # ) + if x_1d.any() and (x_1d[-1] == 0.5).any(): + breakpoint() - return x_iv, y_iv, 'all' + # if y_1d.any(): + # s = 6 + # print( + # f'x_step_iv:\n{x_step_iv[-s:]}\n' + # f'y_step_iv:\n{y_step_iv[-s:]}\n\n' + # f'x_1d:\n{x_1d[-s:]}\n' + # f'y_1d:\n{y_1d[-s:]}\n' + # ) + + return x_1d, y_1d, 'all' def xy_downsample(