Support an array field map to `ShmArray.push()`, start index 3days in
							parent
							
								
									00d7bb089f
								
							
						
					
					
						commit
						c60d523428
					
				| 
						 | 
					@ -152,7 +152,8 @@ def _make_token(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class ShmArray:
 | 
					class ShmArray:
 | 
				
			||||||
    """A shared memory ``numpy`` (compatible) array API.
 | 
					    '''
 | 
				
			||||||
 | 
					    A shared memory ``numpy`` (compatible) array API.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    An underlying shared memory buffer is allocated based on
 | 
					    An underlying shared memory buffer is allocated based on
 | 
				
			||||||
    a user specified ``numpy.ndarray``. This fixed size array
 | 
					    a user specified ``numpy.ndarray``. This fixed size array
 | 
				
			||||||
| 
						 | 
					@ -162,7 +163,7 @@ class ShmArray:
 | 
				
			||||||
    ``SharedInt`` interfaces) values such that multiple processes can
 | 
					    ``SharedInt`` interfaces) values such that multiple processes can
 | 
				
			||||||
    interact with the same array using a synchronized-index.
 | 
					    interact with the same array using a synchronized-index.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    """
 | 
					    '''
 | 
				
			||||||
    def __init__(
 | 
					    def __init__(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        shmarr: np.ndarray,
 | 
					        shmarr: np.ndarray,
 | 
				
			||||||
| 
						 | 
					@ -209,7 +210,8 @@ class ShmArray:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @property
 | 
					    @property
 | 
				
			||||||
    def array(self) -> np.ndarray:
 | 
					    def array(self) -> np.ndarray:
 | 
				
			||||||
        '''Return an up-to-date ``np.ndarray`` view of the
 | 
					        '''
 | 
				
			||||||
 | 
					        Return an up-to-date ``np.ndarray`` view of the
 | 
				
			||||||
        so-far-written data to the underlying shm buffer.
 | 
					        so-far-written data to the underlying shm buffer.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
| 
						 | 
					@ -238,19 +240,21 @@ class ShmArray:
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        data: np.ndarray,
 | 
					        data: np.ndarray,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        field_map: Optional[dict[str, str]] = None,
 | 
				
			||||||
        prepend: bool = False,
 | 
					        prepend: bool = False,
 | 
				
			||||||
        start: Optional[int] = None,
 | 
					        start: Optional[int] = None,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ) -> int:
 | 
					    ) -> int:
 | 
				
			||||||
        '''Ring buffer like "push" to append data
 | 
					        '''
 | 
				
			||||||
 | 
					        Ring buffer like "push" to append data
 | 
				
			||||||
        into the buffer and return updated "last" index.
 | 
					        into the buffer and return updated "last" index.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        NB: no actual ring logic yet to give a "loop around" on overflow
 | 
					        NB: no actual ring logic yet to give a "loop around" on overflow
 | 
				
			||||||
        condition, lel.
 | 
					        condition, lel.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        self._post_init = True
 | 
					 | 
				
			||||||
        length = len(data)
 | 
					        length = len(data)
 | 
				
			||||||
        index = start or self._last.value
 | 
					        index = start if start is not None else self._last.value
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if prepend:
 | 
					        if prepend:
 | 
				
			||||||
            index = self._first.value - length
 | 
					            index = self._first.value - length
 | 
				
			||||||
| 
						 | 
					@ -263,10 +267,15 @@ class ShmArray:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        end = index + length
 | 
					        end = index + length
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        fields = self._write_fields
 | 
					        if field_map:
 | 
				
			||||||
 | 
					            src_names, dst_names = zip(*field_map.items())
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            dst_names = src_names = self._write_fields
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            self._array[fields][index:end] = data[fields][:]
 | 
					            self._array[
 | 
				
			||||||
 | 
					                list(dst_names)
 | 
				
			||||||
 | 
					            ][index:end] = data[list(src_names)][:]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # NOTE: there was a race here between updating
 | 
					            # NOTE: there was a race here between updating
 | 
				
			||||||
            # the first and last indices and when the next reader
 | 
					            # the first and last indices and when the next reader
 | 
				
			||||||
| 
						 | 
					@ -281,9 +290,18 @@ class ShmArray:
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                self._last.value = end
 | 
					                self._last.value = end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            self._post_init = True
 | 
				
			||||||
            return end
 | 
					            return end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        except ValueError as err:
 | 
					        except ValueError as err:
 | 
				
			||||||
 | 
					            if field_map:
 | 
				
			||||||
 | 
					                raise
 | 
				
			||||||
 | 
					            # dsize = data.size
 | 
				
			||||||
 | 
					            # if dsize > self._len:
 | 
				
			||||||
 | 
					            #     raise ValueError(
 | 
				
			||||||
 | 
					            #         f'Input data is size {dsize} > our shm buffer {self._len}'
 | 
				
			||||||
 | 
					            #     )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # should raise if diff detected
 | 
					            # should raise if diff detected
 | 
				
			||||||
            self.diff_err_fields(data)
 | 
					            self.diff_err_fields(data)
 | 
				
			||||||
            raise err
 | 
					            raise err
 | 
				
			||||||
| 
						 | 
					@ -339,7 +357,7 @@ class ShmArray:
 | 
				
			||||||
# how  much is probably dependent on lifestyle
 | 
					# how  much is probably dependent on lifestyle
 | 
				
			||||||
_secs_in_day = int(60 * 60 * 24)
 | 
					_secs_in_day = int(60 * 60 * 24)
 | 
				
			||||||
# we try for 3 times but only on a run-every-other-day kinda week.
 | 
					# we try for 3 times but only on a run-every-other-day kinda week.
 | 
				
			||||||
_default_size = 3 * _secs_in_day
 | 
					_default_size = 4 * _secs_in_day
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def open_shm_array(
 | 
					def open_shm_array(
 | 
				
			||||||
| 
						 | 
					@ -392,7 +410,24 @@ def open_shm_array(
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    last.value = first.value = int(_secs_in_day)
 | 
					    # start the "real-time" updated section after 3-days worth of 1s
 | 
				
			||||||
 | 
					    # sampled OHLC. this allows appending up to a days worth from
 | 
				
			||||||
 | 
					    # tick/quote feeds before having to flush to a (tsdb) storage
 | 
				
			||||||
 | 
					    # backend, and looks something like,
 | 
				
			||||||
 | 
					    # -------------------------
 | 
				
			||||||
 | 
					    # |              |        i
 | 
				
			||||||
 | 
					    # _________________________
 | 
				
			||||||
 | 
					    # <-------------> <------->
 | 
				
			||||||
 | 
					    #  history         real-time
 | 
				
			||||||
 | 
					    #
 | 
				
			||||||
 | 
					    # Once fully "prepended", the history section will leave the
 | 
				
			||||||
 | 
					    # ``ShmArray._start.value: int = 0`` and the yet-to-be written
 | 
				
			||||||
 | 
					    # real-time section will start at ``ShmArray.index: int``.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # this sets the index to 3/4 of the length of the buffer
 | 
				
			||||||
 | 
					    # leaving a "days worth of second samples" for the real-time
 | 
				
			||||||
 | 
					    # section.
 | 
				
			||||||
 | 
					    last.value = first.value = int(3*_secs_in_day)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    shmarr = ShmArray(
 | 
					    shmarr = ShmArray(
 | 
				
			||||||
        array,
 | 
					        array,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue