diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 53c40423..8afa3214 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -168,6 +168,7 @@ class ShmArray: self._len = len(shmarr) self._shm = shm + self._post_init: bool = False # pushing data does not write the index (aka primary key) self._write_fields = list(shmarr.dtype.fields.keys())[1:] @@ -196,19 +197,37 @@ class ShmArray: @property def array(self) -> np.ndarray: - return self._array[self._first.value:self._last.value] + '''Return an up-to-date ``np.ndarray`` view of the + so-far-written data to the underlying shm buffer. + + ''' + a = self._array[self._first.value:self._last.value] + + # first, last = self._first.value, self._last.value + # a = self._array[first:last] + + # TODO: eventually comment this once we've not seen it in the + # wild in a long time.. + # XXX: race where first/last indexes cause a reader + # to load an empty array.. + if len(a) == 0 and self._post_init: + raise RuntimeError('Empty array race condition hit!?') + # breakpoint() + + return a def last( self, length: int = 1, ) -> np.ndarray: - return self.array[-length:] + return self.array[-length] def push( self, data: np.ndarray, prepend: bool = False, + start: Optional[int] = None, ) -> int: '''Ring buffer like "push" to append data @@ -217,17 +236,18 @@ class ShmArray: NB: no actual ring logic yet to give a "loop around" on overflow condition, lel. ''' + self._post_init = True length = len(data) + index = start or self._last.value if prepend: index = self._first.value - length + if index < 0: raise ValueError( f'Array size of {self._len} was overrun during prepend.\n' 'You have passed {abs(index)} too many datums.' ) - else: - index = self._last.value end = index + length @@ -235,11 +255,22 @@ class ShmArray: try: self._array[fields][index:end] = data[fields][:] + + # NOTE: there was a race here between updating + # the first and last indices and when the next reader + # tries to access ``.array`` (which due to the index + # overlap will be empty). Pretty sure we've fixed it now + # but leaving this here as a reminder. if prepend: + assert index < self._first.value + + if index < self._first.value: self._first.value = index else: self._last.value = end + return end + except ValueError as err: # shoudl raise if diff detected self.diff_err_fields(data) @@ -301,16 +332,19 @@ _default_size = 3 * _secs_in_day def open_shm_array( + key: Optional[str] = None, size: int = _default_size, dtype: Optional[np.dtype] = None, readonly: bool = False, + ) -> ShmArray: - """Open a memory shared ``numpy`` using the standard library. + '''Open a memory shared ``numpy`` using the standard library. This call unlinks (aka permanently destroys) the buffer on teardown and thus should be used from the parent-most accessor (process). - """ + + ''' # create new shared mem segment for which we # have write permission a = np.zeros(size, dtype=dtype)