From 0663880a6db421f39dcd9b4b39b70976b8eed9f9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 13 Dec 2022 13:05:56 -0500 Subject: [PATCH] Fix formatter xy ndarray first prepend case First allocation vs. first "prepend" of source data to an xy `ndarray` format **must be mutex** in order to avoid a double prepend. Previously when both blocks were executed we'd end up with a `.xy_nd_start` that was decremented (at least) twice as much as it should be on the first `.format_to_1d()` call which is obviously incorrect (and causes problems for m4 downsampling as discussed below). Further, since the underlying `ShmArray` buffer indexing is managed (i.e. write-updated) completely independently from the incremental formatter updates and internal xy indexing, we can't use `ShmArray._first.value` and instead need to use the particular `.diff()` output's prepend length value to decrement the `.xy_nd_start` on updates after initial alloc. Problems this resolves with m4: - m4 uses a x-domain diff to calculate the number of "frames" to downsample to, this is normally based on the ratio of pixel columns on screen vs. the size of the input xy data. - previously using an int-index (not epoch time) the max diff between first and last index would be the size of the input buffer and thus would never cause a large mem allocation issue (though it may have been inefficient in terms of needed size). - with an epoch time index this max diff could explode if you had some near-now epoch time stamp **minus** an x-allocation value: generally some value in `[0.5, -0.5]` which would result in a massive frames and thus internal `np.ndarray()` allocation causing either a crash in `numba` code or actual system mem over allocation. Further, put in some more x value checks that trigger breakpoints if we detect values that caused this issue - we'll remove em after this has been tested enough. --- piker/data/_formatters.py | 88 ++++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 38 deletions(-) diff --git a/piker/data/_formatters.py b/piker/data/_formatters.py index 0c402aa4..9d8fc00d 100644 --- a/piker/data/_formatters.py +++ b/piker/data/_formatters.py @@ -278,9 +278,9 @@ class IncrementalFormatter(msgspec.Struct): post_slice, ) = self.diff(new_read) + # we first need to allocate xy data arrays + # from the source data. if self.y_nd is None: - # we first need to allocate xy data arrays - # from the source data. self.xy_nd_start = shm._first.value self.xy_nd_stop = shm._last.value self.x_nd, self.y_nd = self.allocate_xy_nd( @@ -289,45 +289,52 @@ class IncrementalFormatter(msgspec.Struct): ) profiler('allocated xy history') - if prepend_len: - self.incr_update_xy_nd( - shm, - array_key, + # once allocated we do incremental pre/append + # updates from the diff with the source buffer. + else: + if prepend_len: - # this is the pre-sliced, "normally expected" - # new data that an updater would normally be - # expected to process, however in some cases (like - # step curves) the updater routine may want to do - # the source history-data reading itself, so we pass - # both here. - shm._array[pre_slice], - pre_slice, - prepend_len, + self.incr_update_xy_nd( + shm, + array_key, - self.xy_nd_start, - self.xy_nd_stop, - is_append=False, - ) + # this is the pre-sliced, "normally expected" + # new data that an updater would normally be + # expected to process, however in some cases (like + # step curves) the updater routine may want to do + # the source history-data reading itself, so we pass + # both here. + shm._array[pre_slice], + pre_slice, + prepend_len, - # self.y_nd[y_nd_slc] = new_y_nd - self.xy_nd_start = shm._first.value - profiler('prepended xy history: {prepend_length}') + self.xy_nd_start, + self.xy_nd_stop, + is_append=False, + ) - if append_len: - self.incr_update_xy_nd( - shm, - array_key, + self.xy_nd_start -= prepend_len + profiler('prepended xy history: {prepend_length}') - shm._array[post_slice], - post_slice, - append_len, + xndall = self.x_nd[self.xy_slice] + if xndall.any() and (xndall == 0.5).any(): + breakpoint() - self.xy_nd_start, - self.xy_nd_stop, - is_append=True, - ) - self.xy_nd_stop = shm._last.value - profiler('appened xy history: {append_length}') + if append_len: + self.incr_update_xy_nd( + shm, + array_key, + + shm._array[post_slice], + post_slice, + append_len, + + self.xy_nd_start, + self.xy_nd_stop, + is_append=True, + ) + self.xy_nd_stop += append_len + profiler('appened xy history: {append_length}') view_changed: bool = False view_range: tuple[int, int] = (ivl, ivr) @@ -491,9 +498,14 @@ class IncrementalFormatter(msgspec.Struct): ''' # NOTE: we don't include the very last datum which is filled in # normally by another graphics object. + x_1d = array[self.index_field][:-1] + if x_1d.any() and (x_1d[-1] == 0.5).any(): + breakpoint() + + y_1d = array[array_key][:-1] return ( - array[self.index_field][:-1], - array[array_key][:-1], + x_1d, + y_1d, # 1d connection array or style-key to # ``pg.functions.arrayToQPath()`` @@ -797,7 +809,7 @@ class StepCurveFmtr(IncrementalFormatter): if not x_1d.size == y_1d.size: breakpoint() - if x_1d.any() and (x_1d[-1] == 0.5).any(): + if x_1d.any() and (x_1d == 0.5).any(): breakpoint() # debugging