Fix shm index update race

There was a lingering issue where the fsp daemon would sync its shm
array with the source data and we'd set the start/end indices to the
same value. Under some races a reader would then read an empty `.array`
which it wasn't expecting. This fixes that as well as tidies up the
`ShmArray.push()` logic and adds a temporary check in `.array` for zero
length if the array hasn't been written yet.

We can now start removing read array length checks in consumer code
and hopefully no more races will show up.
fsp_drunken_alignment
Tyler Goodlet 2021-09-30 07:33:43 -04:00
parent 2cd594ed35
commit 2b9fb952a9
1 changed files with 40 additions and 6 deletions

View File

@ -168,6 +168,7 @@ class ShmArray:
self._len = len(shmarr) self._len = len(shmarr)
self._shm = shm self._shm = shm
self._post_init: bool = False
# pushing data does not write the index (aka primary key) # pushing data does not write the index (aka primary key)
self._write_fields = list(shmarr.dtype.fields.keys())[1:] self._write_fields = list(shmarr.dtype.fields.keys())[1:]
@ -196,19 +197,37 @@ class ShmArray:
@property @property
def array(self) -> np.ndarray: def array(self) -> np.ndarray:
return self._array[self._first.value:self._last.value] '''Return an up-to-date ``np.ndarray`` view of the
so-far-written data to the underlying shm buffer.
'''
a = self._array[self._first.value:self._last.value]
# first, last = self._first.value, self._last.value
# a = self._array[first:last]
# TODO: eventually comment this once we've not seen it in the
# wild in a long time..
# XXX: race where first/last indexes cause a reader
# to load an empty array..
if len(a) == 0 and self._post_init:
raise RuntimeError('Empty array race condition hit!?')
# breakpoint()
return a
def last( def last(
self, self,
length: int = 1, length: int = 1,
) -> np.ndarray: ) -> np.ndarray:
return self.array[-length:] return self.array[-length]
def push( def push(
self, self,
data: np.ndarray, data: np.ndarray,
prepend: bool = False, prepend: bool = False,
start: Optional[int] = None,
) -> int: ) -> int:
'''Ring buffer like "push" to append data '''Ring buffer like "push" to append data
@ -217,17 +236,18 @@ class ShmArray:
NB: no actual ring logic yet to give a "loop around" on overflow NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel. condition, lel.
''' '''
self._post_init = True
length = len(data) length = len(data)
index = start or self._last.value
if prepend: if prepend:
index = self._first.value - length index = self._first.value - length
if index < 0: if index < 0:
raise ValueError( raise ValueError(
f'Array size of {self._len} was overrun during prepend.\n' f'Array size of {self._len} was overrun during prepend.\n'
'You have passed {abs(index)} too many datums.' 'You have passed {abs(index)} too many datums.'
) )
else:
index = self._last.value
end = index + length end = index + length
@ -235,11 +255,22 @@ class ShmArray:
try: try:
self._array[fields][index:end] = data[fields][:] self._array[fields][index:end] = data[fields][:]
# NOTE: there was a race here between updating
# the first and last indices and when the next reader
# tries to access ``.array`` (which due to the index
# overlap will be empty). Pretty sure we've fixed it now
# but leaving this here as a reminder.
if prepend: if prepend:
assert index < self._first.value
if index < self._first.value:
self._first.value = index self._first.value = index
else: else:
self._last.value = end self._last.value = end
return end return end
except ValueError as err: except ValueError as err:
# shoudl raise if diff detected # shoudl raise if diff detected
self.diff_err_fields(data) self.diff_err_fields(data)
@ -301,16 +332,19 @@ _default_size = 3 * _secs_in_day
def open_shm_array( def open_shm_array(
key: Optional[str] = None, key: Optional[str] = None,
size: int = _default_size, size: int = _default_size,
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
readonly: bool = False, readonly: bool = False,
) -> ShmArray: ) -> ShmArray:
"""Open a memory shared ``numpy`` using the standard library. '''Open a memory shared ``numpy`` using the standard library.
This call unlinks (aka permanently destroys) the buffer on teardown This call unlinks (aka permanently destroys) the buffer on teardown
and thus should be used from the parent-most accessor (process). and thus should be used from the parent-most accessor (process).
"""
'''
# create new shared mem segment for which we # create new shared mem segment for which we
# have write permission # have write permission
a = np.zeros(size, dtype=dtype) a = np.zeros(size, dtype=dtype)