Fix shm index update race
There was a lingering issue where the fsp daemon would sync its shm array with the source data and we'd set the start/end indices to the same value. Under some races a reader would then read an empty `.array` which it wasn't expecting. This fixes that as well as tidies up the `ShmArray.push()` logic and adds a temporary check in `.array` for zero length if the array hasn't been written yet. We can now start removing read array length checks in consumer code and hopefully no more races will show up.vlm_plotz_backup
parent
cd4f0e3276
commit
614bb1717b
|
@ -168,6 +168,7 @@ class ShmArray:
|
||||||
|
|
||||||
self._len = len(shmarr)
|
self._len = len(shmarr)
|
||||||
self._shm = shm
|
self._shm = shm
|
||||||
|
self._post_init: bool = False
|
||||||
|
|
||||||
# pushing data does not write the index (aka primary key)
|
# pushing data does not write the index (aka primary key)
|
||||||
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
|
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
|
||||||
|
@ -196,19 +197,37 @@ class ShmArray:
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def array(self) -> np.ndarray:
|
def array(self) -> np.ndarray:
|
||||||
return self._array[self._first.value:self._last.value]
|
'''Return an up-to-date ``np.ndarray`` view of the
|
||||||
|
so-far-written data to the underlying shm buffer.
|
||||||
|
|
||||||
|
'''
|
||||||
|
a = self._array[self._first.value:self._last.value]
|
||||||
|
|
||||||
|
# first, last = self._first.value, self._last.value
|
||||||
|
# a = self._array[first:last]
|
||||||
|
|
||||||
|
# TODO: eventually comment this once we've not seen it in the
|
||||||
|
# wild in a long time..
|
||||||
|
# XXX: race where first/last indexes cause a reader
|
||||||
|
# to load an empty array..
|
||||||
|
if len(a) == 0 and self._post_init:
|
||||||
|
raise RuntimeError('Empty array race condition hit!?')
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
|
return a
|
||||||
|
|
||||||
def last(
|
def last(
|
||||||
self,
|
self,
|
||||||
length: int = 1,
|
length: int = 1,
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
return self.array[-length:]
|
return self.array[-length]
|
||||||
|
|
||||||
def push(
|
def push(
|
||||||
self,
|
self,
|
||||||
data: np.ndarray,
|
data: np.ndarray,
|
||||||
|
|
||||||
prepend: bool = False,
|
prepend: bool = False,
|
||||||
|
start: Optional[int] = None,
|
||||||
|
|
||||||
) -> int:
|
) -> int:
|
||||||
'''Ring buffer like "push" to append data
|
'''Ring buffer like "push" to append data
|
||||||
|
@ -217,17 +236,18 @@ class ShmArray:
|
||||||
NB: no actual ring logic yet to give a "loop around" on overflow
|
NB: no actual ring logic yet to give a "loop around" on overflow
|
||||||
condition, lel.
|
condition, lel.
|
||||||
'''
|
'''
|
||||||
|
self._post_init = True
|
||||||
length = len(data)
|
length = len(data)
|
||||||
|
index = start or self._last.value
|
||||||
|
|
||||||
if prepend:
|
if prepend:
|
||||||
index = self._first.value - length
|
index = self._first.value - length
|
||||||
|
|
||||||
if index < 0:
|
if index < 0:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f'Array size of {self._len} was overrun during prepend.\n'
|
f'Array size of {self._len} was overrun during prepend.\n'
|
||||||
'You have passed {abs(index)} too many datums.'
|
'You have passed {abs(index)} too many datums.'
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
index = self._last.value
|
|
||||||
|
|
||||||
end = index + length
|
end = index + length
|
||||||
|
|
||||||
|
@ -235,11 +255,22 @@ class ShmArray:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._array[fields][index:end] = data[fields][:]
|
self._array[fields][index:end] = data[fields][:]
|
||||||
|
|
||||||
|
# NOTE: there was a race here between updating
|
||||||
|
# the first and last indices and when the next reader
|
||||||
|
# tries to access ``.array`` (which due to the index
|
||||||
|
# overlap will be empty). Pretty sure we've fixed it now
|
||||||
|
# but leaving this here as a reminder.
|
||||||
if prepend:
|
if prepend:
|
||||||
|
assert index < self._first.value
|
||||||
|
|
||||||
|
if index < self._first.value:
|
||||||
self._first.value = index
|
self._first.value = index
|
||||||
else:
|
else:
|
||||||
self._last.value = end
|
self._last.value = end
|
||||||
|
|
||||||
return end
|
return end
|
||||||
|
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
# shoudl raise if diff detected
|
# shoudl raise if diff detected
|
||||||
self.diff_err_fields(data)
|
self.diff_err_fields(data)
|
||||||
|
@ -301,16 +332,19 @@ _default_size = 3 * _secs_in_day
|
||||||
|
|
||||||
|
|
||||||
def open_shm_array(
|
def open_shm_array(
|
||||||
|
|
||||||
key: Optional[str] = None,
|
key: Optional[str] = None,
|
||||||
size: int = _default_size,
|
size: int = _default_size,
|
||||||
dtype: Optional[np.dtype] = None,
|
dtype: Optional[np.dtype] = None,
|
||||||
readonly: bool = False,
|
readonly: bool = False,
|
||||||
|
|
||||||
) -> ShmArray:
|
) -> ShmArray:
|
||||||
"""Open a memory shared ``numpy`` using the standard library.
|
'''Open a memory shared ``numpy`` using the standard library.
|
||||||
|
|
||||||
This call unlinks (aka permanently destroys) the buffer on teardown
|
This call unlinks (aka permanently destroys) the buffer on teardown
|
||||||
and thus should be used from the parent-most accessor (process).
|
and thus should be used from the parent-most accessor (process).
|
||||||
"""
|
|
||||||
|
'''
|
||||||
# create new shared mem segment for which we
|
# create new shared mem segment for which we
|
||||||
# have write permission
|
# have write permission
|
||||||
a = np.zeros(size, dtype=dtype)
|
a = np.zeros(size, dtype=dtype)
|
||||||
|
|
|
@ -936,11 +936,12 @@ class ChartPlotWidget(pg.PlotWidget):
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> pg.GraphicsObject:
|
) -> pg.GraphicsObject:
|
||||||
"""Update the named internal graphics from ``array``.
|
'''Update the named internal graphics from ``array``.
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
assert len(array)
|
||||||
data_key = array_key or graphics_name
|
data_key = array_key or graphics_name
|
||||||
|
|
||||||
if graphics_name not in self._overlays:
|
if graphics_name not in self._overlays:
|
||||||
self._arrays['ohlc'] = array
|
self._arrays['ohlc'] = array
|
||||||
else:
|
else:
|
||||||
|
@ -948,21 +949,19 @@ class ChartPlotWidget(pg.PlotWidget):
|
||||||
|
|
||||||
curve = self._graphics[graphics_name]
|
curve = self._graphics[graphics_name]
|
||||||
|
|
||||||
if len(array):
|
# NOTE: back when we weren't implementing the curve graphics
|
||||||
# TODO: we should instead implement a diff based
|
# ourselves you'd have updates using this method:
|
||||||
# "only update with new items" on the pg.PlotCurveItem
|
# curve.setData(y=array[graphics_name], x=array['index'], **kwargs)
|
||||||
# one place to dig around this might be the `QBackingStore`
|
|
||||||
# https://doc.qt.io/qt-5/qbackingstore.html
|
|
||||||
|
|
||||||
# NOTE: back when we weren't implementing the curve graphics
|
# NOTE: graphics **must** implement a diff based update
|
||||||
# ourselves you'd have updates using this method:
|
# operation where an internal ``FastUpdateCurve._xrange`` is
|
||||||
# curve.setData(y=array[graphics_name], x=array['index'], **kwargs)
|
# used to determine if the underlying path needs to be
|
||||||
|
# pre/ap-pended.
|
||||||
curve.update_from_array(
|
curve.update_from_array(
|
||||||
x=array['index'],
|
x=array['index'],
|
||||||
y=array[data_key],
|
y=array[data_key],
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
return curve
|
return curve
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue