Fix formatter xy ndarray first prepend case

The first allocation of an xy `ndarray` format and the first "prepend" of
source data to it **must be mutually exclusive** in order to avoid a double
prepend.

Previously, when both blocks were executed we'd end up with a
`.xy_nd_start` that was decremented (at least) twice as much as it
should have been on the first `.format_to_1d()` call, which is obviously
incorrect (and causes problems for m4 downsampling as discussed below).
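
For illustration only (made-up index values, not the real formatter state),
the off-by-`prepend_len` effect looks roughly like:

```python
# Toy numbers only: the first-alloc path already reads the full source
# history, so also running the prepend path on that same call walks
# `.xy_nd_start` back a second time for data the alloc already covered.
shm_first = 950        # shm buffer's first-written index at alloc time
prepend_len = 50       # history length reported by `.diff()`

# buggy first call: both blocks run
xy_nd_start = shm_first          # alloc block: start -> 950
xy_nd_start -= prepend_len       # prepend block: start -> 900 (off by 50)

# fixed first call: alloc and prepend are mutually exclusive,
# so the start index stays where the allocation put it
xy_nd_start_fixed = shm_first    # 950
```
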
Further, since the underlying `ShmArray` buffer indexing is managed
(i.e. write-updated) completely independently of the incremental
formatter updates and its internal xy indexing, we can't use
`ShmArray._first.value` and instead need to use the particular `.diff()`
output's prepend-length value to decrement `.xy_nd_start` on updates
after the initial alloc.
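
A minimal, self-contained sketch of that bookkeeping; the `XyIndexTracker`
class and its values are hypothetical stand-ins, not the real
`IncrementalFormatter` API:

```python
# After the initial alloc, the xy start/stop indices are adjusted only by
# the pre/append lengths reported by `.diff()`, never by re-reading the
# shm buffer's own `_first`/`_last` indices (which move independently of
# the formatter's xy indexing).
class XyIndexTracker:
    def __init__(self, first: int, last: int) -> None:
        # set once, on the initial xy array allocation
        self.xy_nd_start: int = first
        self.xy_nd_stop: int = last

    def on_update(self, prepend_len: int, append_len: int) -> None:
        if prepend_len:
            self.xy_nd_start -= prepend_len
        if append_len:
            self.xy_nd_stop += append_len


tracker = XyIndexTracker(first=1000, last=1100)
tracker.on_update(prepend_len=50, append_len=3)
assert (tracker.xy_nd_start, tracker.xy_nd_stop) == (950, 1103)
```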

Problems this resolves with m4:
- m4 uses an x-domain diff to calculate the number of "frames" to
  downsample to; this is normally based on the ratio of pixel columns on
  screen vs. the size of the input xy data.
- previously, using an int index (not epoch time), the max diff between
  the first and last index would be the size of the input buffer and thus
  would never cause a large mem allocation issue (though it may have
  been inefficient in terms of needed size).
- with an epoch time index this max diff could explode if you had some
  near-now epoch time stamp **minus** an x-allocation value (generally
  some value in `[-0.5, 0.5]`), which would result in a massive frame
  count and thus a huge internal `np.ndarray()` allocation, causing
  either a crash in `numba` code or actual system memory
  over-allocation; see the arithmetic sketch after this list.
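
A rough arithmetic sketch of the blow-up; the `n_frames()` helper and the
`uppx` value are illustrative assumptions, not the actual m4 code:

```python
import math

def n_frames(x_start: float, x_end: float, uppx: float) -> int:
    # the number of downsample "frames" scales with the x-domain span
    # divided by the x-units-per-pixel-column (uppx)
    return max(1, math.ceil((x_end - x_start) / uppx))

# int-indexed x: the span is bounded by the buffer length -> sane count
print(n_frames(x_start=0, x_end=10_000, uppx=10))    # 1000

# epoch-time x with a stray ~0.5 allocation-fill value at the start:
# the span becomes ~1.67e9 "seconds" -> an absurd frame count and thus
# a massive downstream ndarray allocation
print(n_frames(x_start=0.5, x_end=1_670_000_000.0, uppx=10))    # 167000000
```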

Further, put in some more x-value checks that trigger breakpoints if we
detect values that caused this issue; we'll remove them after this has
been tested enough.
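
For reference, the guard pattern mirrors what's in the diff below; the
standalone `check_x_values()` wrapper here is just for illustration:

```python
import numpy as np

def check_x_values(x_1d: np.ndarray) -> None:
    # if any x value still equals the 0.5 allocation-fill sentinel,
    # drop into the debugger so the bad frame can be inspected
    if x_1d.any() and (x_1d == 0.5).any():
        breakpoint()  # temporary; removed once tested enough

check_x_values(np.array([1_670_000_000.0, 1_670_000_060.0]))  # passes silently
```
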
Branch: epoch_indexing_and_dataviz_layer
Author: Tyler Goodlet, 2022-12-13 13:05:56 -05:00
Commit: 0663880a6d (parent: 3bed142d15)
1 changed file with 50 additions and 38 deletions

@@ -278,9 +278,9 @@ class IncrementalFormatter(msgspec.Struct):
             post_slice,
         ) = self.diff(new_read)
 
-        # we first need to allocate xy data arrays
-        # from the source data.
         if self.y_nd is None:
+            # we first need to allocate xy data arrays
+            # from the source data.
             self.xy_nd_start = shm._first.value
             self.xy_nd_stop = shm._last.value
             self.x_nd, self.y_nd = self.allocate_xy_nd(
@@ -289,45 +289,52 @@ class IncrementalFormatter(msgspec.Struct):
             )
             profiler('allocated xy history')
 
-        if prepend_len:
-            self.incr_update_xy_nd(
-                shm,
-                array_key,
+        # once allocated we do incremental pre/append
+        # updates from the diff with the source buffer.
+        else:
+            if prepend_len:
+                self.incr_update_xy_nd(
+                    shm,
+                    array_key,
 
-                # this is the pre-sliced, "normally expected"
-                # new data that an updater would normally be
-                # expected to process, however in some cases (like
-                # step curves) the updater routine may want to do
-                # the source history-data reading itself, so we pass
-                # both here.
-                shm._array[pre_slice],
-                pre_slice,
-                prepend_len,
+                    # this is the pre-sliced, "normally expected"
+                    # new data that an updater would normally be
+                    # expected to process, however in some cases (like
+                    # step curves) the updater routine may want to do
+                    # the source history-data reading itself, so we pass
+                    # both here.
+                    shm._array[pre_slice],
+                    pre_slice,
+                    prepend_len,
 
-                self.xy_nd_start,
-                self.xy_nd_stop,
-                is_append=False,
-            )
-
-            # self.y_nd[y_nd_slc] = new_y_nd
-            self.xy_nd_start = shm._first.value
-            profiler('prepended xy history: {prepend_length}')
+                    self.xy_nd_start,
+                    self.xy_nd_stop,
+                    is_append=False,
+                )
+                self.xy_nd_start -= prepend_len
+                profiler('prepended xy history: {prepend_length}')
 
-        if append_len:
-            self.incr_update_xy_nd(
-                shm,
-                array_key,
+                xndall = self.x_nd[self.xy_slice]
+                if xndall.any() and (xndall == 0.5).any():
+                    breakpoint()
+
+            if append_len:
+                self.incr_update_xy_nd(
+                    shm,
+                    array_key,
 
-                shm._array[post_slice],
-                post_slice,
-                append_len,
+                    shm._array[post_slice],
+                    post_slice,
+                    append_len,
 
-                self.xy_nd_start,
-                self.xy_nd_stop,
-                is_append=True,
-            )
-            self.xy_nd_stop = shm._last.value
-            profiler('appened xy history: {append_length}')
+                    self.xy_nd_start,
+                    self.xy_nd_stop,
+                    is_append=True,
+                )
+                self.xy_nd_stop += append_len
+                profiler('appened xy history: {append_length}')
 
         view_changed: bool = False
         view_range: tuple[int, int] = (ivl, ivr)
@@ -491,9 +498,14 @@ class IncrementalFormatter(msgspec.Struct):
         '''
         # NOTE: we don't include the very last datum which is filled in
         # normally by another graphics object.
+        x_1d = array[self.index_field][:-1]
+        if x_1d.any() and (x_1d[-1] == 0.5).any():
+            breakpoint()
+        y_1d = array[array_key][:-1]
+
         return (
-            array[self.index_field][:-1],
-            array[array_key][:-1],
+            x_1d,
+            y_1d,
 
             # 1d connection array or style-key to
             # ``pg.functions.arrayToQPath()``
@@ -797,7 +809,7 @@ class StepCurveFmtr(IncrementalFormatter):
         if not x_1d.size == y_1d.size:
             breakpoint()
 
-        if x_1d.any() and (x_1d[-1] == 0.5).any():
+        if x_1d.any() and (x_1d == 0.5).any():
             breakpoint()
 
         # debugging