diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 103fa6d5..2ffa8465 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -152,7 +152,8 @@ def _make_token( class ShmArray: - """A shared memory ``numpy`` (compatible) array API. + ''' + A shared memory ``numpy`` (compatible) array API. An underlying shared memory buffer is allocated based on a user specified ``numpy.ndarray``. This fixed size array @@ -162,7 +163,7 @@ class ShmArray: ``SharedInt`` interfaces) values such that multiple processes can interact with the same array using a synchronized-index. - """ + ''' def __init__( self, shmarr: np.ndarray, @@ -209,7 +210,8 @@ class ShmArray: @property def array(self) -> np.ndarray: - '''Return an up-to-date ``np.ndarray`` view of the + ''' + Return an up-to-date ``np.ndarray`` view of the so-far-written data to the underlying shm buffer. ''' @@ -238,19 +240,21 @@ class ShmArray: self, data: np.ndarray, + field_map: Optional[dict[str, str]] = None, prepend: bool = False, start: Optional[int] = None, ) -> int: - '''Ring buffer like "push" to append data + ''' + Ring buffer like "push" to append data into the buffer and return updated "last" index. NB: no actual ring logic yet to give a "loop around" on overflow condition, lel. + ''' - self._post_init = True length = len(data) - index = start or self._last.value + index = start if start is not None else self._last.value if prepend: index = self._first.value - length @@ -263,10 +267,15 @@ class ShmArray: end = index + length - fields = self._write_fields + if field_map: + src_names, dst_names = zip(*field_map.items()) + else: + dst_names = src_names = self._write_fields try: - self._array[fields][index:end] = data[fields][:] + self._array[ + list(dst_names) + ][index:end] = data[list(src_names)][:] # NOTE: there was a race here between updating # the first and last indices and when the next reader @@ -281,9 +290,18 @@ class ShmArray: else: self._last.value = end + self._post_init = True return end except ValueError as err: + if field_map: + raise + # dsize = data.size + # if dsize > self._len: + # raise ValueError( + # f'Input data is size {dsize} > our shm buffer {self._len}' + # ) + # should raise if diff detected self.diff_err_fields(data) raise err @@ -339,7 +357,7 @@ class ShmArray: # how much is probably dependent on lifestyle _secs_in_day = int(60 * 60 * 24) # we try for 3 times but only on a run-every-other-day kinda week. -_default_size = 3 * _secs_in_day +_default_size = 4 * _secs_in_day def open_shm_array( @@ -392,7 +410,24 @@ def open_shm_array( ) ) - last.value = first.value = int(_secs_in_day) + # start the "real-time" updated section after 3-days worth of 1s + # sampled OHLC. this allows appending up to a days worth from + # tick/quote feeds before having to flush to a (tsdb) storage + # backend, and looks something like, + # ------------------------- + # | | i + # _________________________ + # <-------------> <-------> + # history real-time + # + # Once fully "prepended", the history section will leave the + # ``ShmArray._start.value: int = 0`` and the yet-to-be written + # real-time section will start at ``ShmArray.index: int``. + + # this sets the index to 3/4 of the length of the buffer + # leaving a "days worth of second samples" for the real-time + # section. + last.value = first.value = int(3*_secs_in_day) shmarr = ShmArray( array,