Fix formatter xy ndarray first prepend case

First allocation vs. first "prepend" of source data to an xy `ndarray`
format **must be mutex** in order to avoid a double prepend.

Previously when both blocks were executed we'd end up with
a `.xy_nd_start` that was decremented (at least) twice as much as it
should be on the first `.format_to_1d()` call which is obviously
incorrect (and causes problems for m4 downsampling as discussed below).
Further, since the underlying `ShmArray` buffer indexing is managed
(i.e. write-updated) completely independently from the incremental
formatter updates and internal xy indexing, we can't use
`ShmArray._first.value` and instead need to use the particular `.diff()`
output's prepend length value to decrement the `.xy_nd_start` on updates
after initial alloc.

Problems this resolves with m4:
- m4 uses a x-domain diff to calculate the number of "frames" to
  downsample to, this is normally based on the ratio of pixel columns on
  screen vs. the size of the input xy data.
- previously using an int-index (not epoch time) the max diff between
  first and last index would be the size of the input buffer and thus
  would never cause a large mem allocation issue (though it may have
  been inefficient in terms of needed size).
- with an epoch time index this max diff could explode if you had some
  near-now epoch time stamp **minus** an x-allocation value: generally
  some value in `[0.5, -0.5]` which would result in a massive frames and
  thus internal `np.ndarray()` allocation causing either a crash in
  `numba` code or actual system mem over allocation.

Further, put in some more x value checks that trigger breakpoints if we
detect values that caused this issue - we'll remove em after this has
been tested enough.
multichartz
Tyler Goodlet 2022-12-13 13:05:56 -05:00
parent d85a8c09fa
commit a36b82f781
1 changed files with 50 additions and 38 deletions

View File

@ -277,9 +277,9 @@ class IncrementalFormatter(msgspec.Struct):
post_slice,
) = self.diff(new_read)
# we first need to allocate xy data arrays
# from the source data.
if self.y_nd is None:
# we first need to allocate xy data arrays
# from the source data.
self.xy_nd_start = shm._first.value
self.xy_nd_stop = shm._last.value
self.x_nd, self.y_nd = self.allocate_xy_nd(
@ -288,45 +288,52 @@ class IncrementalFormatter(msgspec.Struct):
)
profiler('allocated xy history')
if prepend_len:
self.incr_update_xy_nd(
shm,
array_key,
# once allocated we do incremental pre/append
# updates from the diff with the source buffer.
else:
if prepend_len:
# this is the pre-sliced, "normally expected"
# new data that an updater would normally be
# expected to process, however in some cases (like
# step curves) the updater routine may want to do
# the source history-data reading itself, so we pass
# both here.
shm._array[pre_slice],
pre_slice,
prepend_len,
self.incr_update_xy_nd(
shm,
array_key,
self.xy_nd_start,
self.xy_nd_stop,
is_append=False,
)
# this is the pre-sliced, "normally expected"
# new data that an updater would normally be
# expected to process, however in some cases (like
# step curves) the updater routine may want to do
# the source history-data reading itself, so we pass
# both here.
shm._array[pre_slice],
pre_slice,
prepend_len,
# self.y_nd[y_nd_slc] = new_y_nd
self.xy_nd_start = shm._first.value
profiler('prepended xy history: {prepend_length}')
self.xy_nd_start,
self.xy_nd_stop,
is_append=False,
)
if append_len:
self.incr_update_xy_nd(
shm,
array_key,
self.xy_nd_start -= prepend_len
profiler('prepended xy history: {prepend_length}')
shm._array[post_slice],
post_slice,
append_len,
xndall = self.x_nd[self.xy_slice]
if xndall.any() and (xndall == 0.5).any():
breakpoint()
self.xy_nd_start,
self.xy_nd_stop,
is_append=True,
)
self.xy_nd_stop = shm._last.value
profiler('appened xy history: {append_length}')
if append_len:
self.incr_update_xy_nd(
shm,
array_key,
shm._array[post_slice],
post_slice,
append_len,
self.xy_nd_start,
self.xy_nd_stop,
is_append=True,
)
self.xy_nd_stop += append_len
profiler('appened xy history: {append_length}')
view_changed: bool = False
view_range: tuple[int, int] = (ivl, ivr)
@ -490,9 +497,14 @@ class IncrementalFormatter(msgspec.Struct):
'''
# NOTE: we don't include the very last datum which is filled in
# normally by another graphics object.
x_1d = array[self.index_field][:-1]
if x_1d.any() and (x_1d[-1] == 0.5).any():
breakpoint()
y_1d = array[array_key][:-1]
return (
array[self.index_field][:-1],
array[array_key][:-1],
x_1d,
y_1d,
# 1d connection array or style-key to
# ``pg.functions.arrayToQPath()``
@ -794,7 +806,7 @@ class StepCurveFmtr(IncrementalFormatter):
if not x_1d.size == y_1d.size:
breakpoint()
if x_1d.any() and (x_1d[-1] == 0.5).any():
if x_1d.any() and (x_1d == 0.5).any():
breakpoint()
# debugging