Simplify formatter update methodology
Don't expect values (array + slice) to be returned and applied by `.incr_update_xy_nd()` and instead presume this will implemented internally in each (sub)formatter. Attempt to simplify some incr-update routines, (particularly in the step curve formatter, though most of it was reverted to just a simpler form of the original implementation XD) including: - dropping the need for the `slice_to_head: int` control. - using the `xy_nd_start/stop` index counters over custom lookups.pre_viz_calls
							parent
							
								
									a9da11451f
								
							
						
					
					
						commit
						301bfa2463
					
				| 
						 | 
				
			
			@ -271,15 +271,7 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
            profiler('allocated xy history')
 | 
			
		||||
 | 
			
		||||
        if prepend_len:
 | 
			
		||||
            y_prepend = shm._array[pre_slice]
 | 
			
		||||
            # if read_src_from_key:
 | 
			
		||||
            #     y_prepend = y_prepend[array_key]
 | 
			
		||||
 | 
			
		||||
            (
 | 
			
		||||
                new_y_nd,
 | 
			
		||||
                y_nd_slc,
 | 
			
		||||
 | 
			
		||||
            ) = self.incr_update_xy_nd(
 | 
			
		||||
            self.incr_update_xy_nd(
 | 
			
		||||
                shm,
 | 
			
		||||
                array_key,
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -289,7 +281,7 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
                # step curves) the updater routine may want to do
 | 
			
		||||
                # the source history-data reading itself, so we pass
 | 
			
		||||
                # both here.
 | 
			
		||||
                y_prepend,
 | 
			
		||||
                shm._array[pre_slice],
 | 
			
		||||
                pre_slice,
 | 
			
		||||
                prepend_len,
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -298,30 +290,16 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
                is_append=False,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # y_nd_view = self.y_nd[y_nd_slc]
 | 
			
		||||
            self.y_nd[y_nd_slc] = new_y_nd
 | 
			
		||||
            # if read_src_from_key:
 | 
			
		||||
            #     y_nd_view[:][array_key] = new_y_nd
 | 
			
		||||
            # else:
 | 
			
		||||
            #     y_nd_view[:] = new_y_nd
 | 
			
		||||
 | 
			
		||||
            # self.y_nd[y_nd_slc] = new_y_nd
 | 
			
		||||
            self.xy_nd_start = shm._first.value
 | 
			
		||||
            profiler('prepended xy history: {prepend_length}')
 | 
			
		||||
 | 
			
		||||
        if append_len:
 | 
			
		||||
            y_append = shm._array[post_slice]
 | 
			
		||||
            # if read_src_from_key:
 | 
			
		||||
            #     y_append = y_append[array_key]
 | 
			
		||||
 | 
			
		||||
            (
 | 
			
		||||
                new_y_nd,
 | 
			
		||||
                y_nd_slc,
 | 
			
		||||
 | 
			
		||||
            ) = self.incr_update_xy_nd(
 | 
			
		||||
            self.incr_update_xy_nd(
 | 
			
		||||
                shm,
 | 
			
		||||
                array_key,
 | 
			
		||||
 | 
			
		||||
                y_append,
 | 
			
		||||
                shm._array[post_slice],
 | 
			
		||||
                post_slice,
 | 
			
		||||
                append_len,
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -329,10 +307,6 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
                self.xy_nd_stop,
 | 
			
		||||
                is_append=True,
 | 
			
		||||
            )
 | 
			
		||||
            # self.y_nd[post_slice] = new_y_nd
 | 
			
		||||
            # self.y_nd[xy_slice or post_slice] = xy_data
 | 
			
		||||
            self.y_nd[y_nd_slc] = new_y_nd
 | 
			
		||||
 | 
			
		||||
            self.xy_nd_stop = shm._last.value
 | 
			
		||||
            profiler('appened xy history: {append_length}')
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -391,11 +365,11 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
        # update the last "in view data range"
 | 
			
		||||
        if len(x_1d):
 | 
			
		||||
            self._last_ivdr = x_1d[0], x_1d[slice_to_head]
 | 
			
		||||
 | 
			
		||||
        profiler('.format_to_1d()')
 | 
			
		||||
            if (x_1d[-1] == 0.5).any():
 | 
			
		||||
                breakpoint()
 | 
			
		||||
 | 
			
		||||
        profiler('.format_to_1d()')
 | 
			
		||||
 | 
			
		||||
        return (
 | 
			
		||||
            x_1d,
 | 
			
		||||
            y_1d,
 | 
			
		||||
| 
						 | 
				
			
			@ -450,10 +424,7 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
 | 
			
		||||
        is_append: bool,
 | 
			
		||||
 | 
			
		||||
    ) -> tuple[
 | 
			
		||||
        np.ndarray,
 | 
			
		||||
        slice,
 | 
			
		||||
    ]:
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        # write pushed data to flattened copy
 | 
			
		||||
        new_y_nd = new_from_src[data_field]
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -466,7 +437,7 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
            x_nd_new = self.x_nd[read_slc]
 | 
			
		||||
            x_nd_new[:] = new_from_src[index_field]
 | 
			
		||||
 | 
			
		||||
        return new_y_nd, read_slc
 | 
			
		||||
        self.y_nd[read_slc] = new_y_nd
 | 
			
		||||
 | 
			
		||||
    # XXX: was ``.format_xy()``
 | 
			
		||||
    def format_xy_nd_to_1d(
 | 
			
		||||
| 
						 | 
				
			
			@ -488,8 +459,8 @@ class IncrementalFormatter(msgspec.Struct):
 | 
			
		|||
 | 
			
		||||
        '''
 | 
			
		||||
        return (
 | 
			
		||||
            array[self.index_field],
 | 
			
		||||
            array[array_key],
 | 
			
		||||
            array[self.index_field][:-1],
 | 
			
		||||
            array[array_key][:-1],
 | 
			
		||||
 | 
			
		||||
            # 1d connection array or style-key to
 | 
			
		||||
            # ``pg.functions.arrayToQPath()``
 | 
			
		||||
| 
						 | 
				
			
			@ -672,10 +643,7 @@ class OHLCBarsFmtr(IncrementalFormatter):
 | 
			
		|||
 | 
			
		||||
        is_append: bool,
 | 
			
		||||
 | 
			
		||||
    ) -> tuple[
 | 
			
		||||
        np.ndarray,
 | 
			
		||||
        slice,
 | 
			
		||||
    ]:
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        # write newly pushed data to flattened copy
 | 
			
		||||
        # a struct-arr is always passed in.
 | 
			
		||||
        new_y_nd = rfn.structured_to_unstructured(
 | 
			
		||||
| 
						 | 
				
			
			@ -694,7 +662,7 @@ class OHLCBarsFmtr(IncrementalFormatter):
 | 
			
		|||
            if (self.x_nd[self.xy_slice] == 0.5).any():
 | 
			
		||||
                breakpoint()
 | 
			
		||||
 | 
			
		||||
        return new_y_nd, read_slc
 | 
			
		||||
        self.y_nd[read_slc] = new_y_nd
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class OHLCBarsAsCurveFmtr(OHLCBarsFmtr):
 | 
			
		||||
| 
						 | 
				
			
			@ -764,8 +732,10 @@ class StepCurveFmtr(IncrementalFormatter):
 | 
			
		|||
        # fill in (current) values from source shm buffer
 | 
			
		||||
        y_out[:] = out[:, np.newaxis]
 | 
			
		||||
 | 
			
		||||
        # TODO: pretty sure we can drop this?
 | 
			
		||||
        # start y at origin level
 | 
			
		||||
        y_out[self.xy_nd_start] = 0
 | 
			
		||||
        # y_out[0, 0] = 0
 | 
			
		||||
        # y_out[self.xy_nd_start] = 0
 | 
			
		||||
        return x_out, y_out
 | 
			
		||||
 | 
			
		||||
    def incr_update_xy_nd(
 | 
			
		||||
| 
						 | 
				
			
			@ -790,13 +760,29 @@ class StepCurveFmtr(IncrementalFormatter):
 | 
			
		|||
        # for a step curve we slice from one datum prior
 | 
			
		||||
        # to the current "update slice" to get the previous
 | 
			
		||||
        # "level".
 | 
			
		||||
        # if is_append:
 | 
			
		||||
        #     start = max(last - 1, 0)
 | 
			
		||||
        #     end = src_shm._last.value
 | 
			
		||||
        #     new_y = src_shm._array[start:end][array_key]
 | 
			
		||||
        #     append_slc = slice(start, end)
 | 
			
		||||
        last_2 = slice(
 | 
			
		||||
            read_slc.start,
 | 
			
		||||
            read_slc.stop+1,
 | 
			
		||||
        )
 | 
			
		||||
        y_nd_new = self.y_nd[last_2]
 | 
			
		||||
        y_nd_new[:] = src_shm._array[last_2][array_key][:, None]
 | 
			
		||||
 | 
			
		||||
        new_y = new_from_src[array_key][:, np.newaxis]
 | 
			
		||||
        # NOTE: we can't use the append slice since we need to "look
 | 
			
		||||
        # forward" one step to get the current level and copy it as
 | 
			
		||||
        # well? (though i still don't really grok why..)
 | 
			
		||||
        # y_nd_new[:] = new_from_src[array_key][:, None]
 | 
			
		||||
 | 
			
		||||
        # XXX: old approach now duplicated above (we can probably drop
 | 
			
		||||
        # this since the key part was the ``nd_stop + 1``
 | 
			
		||||
        # if is_append:
 | 
			
		||||
        #     start = max(nd_stop - 1, 0)
 | 
			
		||||
        #     end = src_shm._last.value
 | 
			
		||||
        #     y_nd_new = src_shm._array[start:end][array_key]#[:, np.newaxis]
 | 
			
		||||
        #     slc = slice(start, end)
 | 
			
		||||
        #     self.y_nd[slc] = np.broadcast_to(
 | 
			
		||||
        #         y_nd_new[:, None],
 | 
			
		||||
        #         (y_nd_new.size, 2),
 | 
			
		||||
        #     )
 | 
			
		||||
 | 
			
		||||
        index_field = self.index_field
 | 
			
		||||
        if index_field != 'index':
 | 
			
		||||
| 
						 | 
				
			
			@ -806,15 +792,6 @@ class StepCurveFmtr(IncrementalFormatter):
 | 
			
		|||
        if (self.x_nd[self.xy_slice][-1] == 0.5).any():
 | 
			
		||||
            breakpoint()
 | 
			
		||||
 | 
			
		||||
        return (
 | 
			
		||||
            new_y,
 | 
			
		||||
            # np.broadcast_to(
 | 
			
		||||
            #     new_x[:, None],
 | 
			
		||||
            #     (new_y.size, 2),
 | 
			
		||||
            # ),
 | 
			
		||||
            read_slc,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def format_xy_nd_to_1d(
 | 
			
		||||
        self,
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -830,11 +807,7 @@ class StepCurveFmtr(IncrementalFormatter):
 | 
			
		|||
        last_t, last = array[-1][[self.index_field, array_key]]
 | 
			
		||||
 | 
			
		||||
        start = self.xy_nd_start
 | 
			
		||||
 | 
			
		||||
        # 2 more datum-indexes to capture zero at end
 | 
			
		||||
        # XXX: can we drop this ``extra`` bit?
 | 
			
		||||
        extra = 2
 | 
			
		||||
        stop = self.xy_nd_stop + extra
 | 
			
		||||
        stop = self.xy_nd_stop
 | 
			
		||||
 | 
			
		||||
        x_step = self.x_nd[start:stop]
 | 
			
		||||
        y_step = self.y_nd[start:stop]
 | 
			
		||||
| 
						 | 
				
			
			@ -843,32 +816,44 @@ class StepCurveFmtr(IncrementalFormatter):
 | 
			
		|||
        #     breakpoint()
 | 
			
		||||
 | 
			
		||||
        # pack in duplicate final value to complete last step level
 | 
			
		||||
        x_step[-1] = last_t
 | 
			
		||||
        y_step[-1] = last
 | 
			
		||||
        # x_step[-1] = last_t
 | 
			
		||||
        # y_step[-1] = last
 | 
			
		||||
        # x_step[-1, 1] = last_t
 | 
			
		||||
        y_step[-1, 1] = last
 | 
			
		||||
 | 
			
		||||
        # if y_step.any():
 | 
			
		||||
        #     s = 3
 | 
			
		||||
        #     print(
 | 
			
		||||
        #         f'x_step:\n{x_step[-s:]}\n'
 | 
			
		||||
        #         f'y_step:\n{y_step[-s:]}\n\n'
 | 
			
		||||
        #     )
 | 
			
		||||
 | 
			
		||||
        # slice out in-view data
 | 
			
		||||
        ivl, ivr = vr
 | 
			
		||||
        # ys_iv = y_step[ivl:ivr+1]
 | 
			
		||||
        # xs_iv = x_step[ivl:ivr+1]
 | 
			
		||||
        ys_iv = y_step[ivl:ivr]
 | 
			
		||||
        xs_iv = x_step[ivl:ivr]
 | 
			
		||||
        # TODO: WHY do we need the extra +1 index?
 | 
			
		||||
        x_step_iv = x_step[ivl:ivr+1]
 | 
			
		||||
        y_step_iv = y_step[ivl:ivr+1]
 | 
			
		||||
 | 
			
		||||
        # flatten to 1d
 | 
			
		||||
        y_iv = ys_iv.reshape(ys_iv.size)
 | 
			
		||||
        x_iv = xs_iv.reshape(xs_iv.size)
 | 
			
		||||
        x_1d = x_step_iv.reshape(x_step_iv.size)
 | 
			
		||||
        y_1d = y_step_iv.reshape(y_step_iv.size)
 | 
			
		||||
 | 
			
		||||
        if (x_iv[-1] == 0.5).any():
 | 
			
		||||
        if not x_1d.size == y_1d.size:
 | 
			
		||||
            breakpoint()
 | 
			
		||||
 | 
			
		||||
        # s = 100
 | 
			
		||||
        if x_1d.any() and (x_1d[-1] == 0.5).any():
 | 
			
		||||
            breakpoint()
 | 
			
		||||
 | 
			
		||||
        # if y_1d.any():
 | 
			
		||||
        #     s = 6
 | 
			
		||||
        #     print(
 | 
			
		||||
        #     f'ys_iv : {ys_iv[-s:]}\n'
 | 
			
		||||
        #     f'y_iv: {y_iv[-s:]}\n'
 | 
			
		||||
        #     f'xs_iv: {xs_iv[-s:]}\n'
 | 
			
		||||
        #     f'x_iv: {x_iv[-s:]}\n'
 | 
			
		||||
        #         f'x_step_iv:\n{x_step_iv[-s:]}\n'
 | 
			
		||||
        #         f'y_step_iv:\n{y_step_iv[-s:]}\n\n'
 | 
			
		||||
        #         f'x_1d:\n{x_1d[-s:]}\n'
 | 
			
		||||
        #         f'y_1d:\n{y_1d[-s:]}\n'
 | 
			
		||||
        #     )
 | 
			
		||||
 | 
			
		||||
        return x_iv, y_iv, 'all'
 | 
			
		||||
        return x_1d, y_1d, 'all'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def xy_downsample(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue