Support an array field map to `ShmArray.push()`, start index 3days in

broker_bumpz
Tyler Goodlet 2022-03-07 07:25:45 -05:00
parent 66ea74c6d5
commit 24fa1b8ff7
1 changed files with 45 additions and 10 deletions

View File

@ -152,7 +152,8 @@ def _make_token(
class ShmArray: class ShmArray:
"""A shared memory ``numpy`` (compatible) array API. '''
A shared memory ``numpy`` (compatible) array API.
An underlying shared memory buffer is allocated based on An underlying shared memory buffer is allocated based on
a user specified ``numpy.ndarray``. This fixed size array a user specified ``numpy.ndarray``. This fixed size array
@ -162,7 +163,7 @@ class ShmArray:
``SharedInt`` interfaces) values such that multiple processes can ``SharedInt`` interfaces) values such that multiple processes can
interact with the same array using a synchronized-index. interact with the same array using a synchronized-index.
""" '''
def __init__( def __init__(
self, self,
shmarr: np.ndarray, shmarr: np.ndarray,
@ -209,7 +210,8 @@ class ShmArray:
@property @property
def array(self) -> np.ndarray: def array(self) -> np.ndarray:
'''Return an up-to-date ``np.ndarray`` view of the '''
Return an up-to-date ``np.ndarray`` view of the
so-far-written data to the underlying shm buffer. so-far-written data to the underlying shm buffer.
''' '''
@ -238,19 +240,21 @@ class ShmArray:
self, self,
data: np.ndarray, data: np.ndarray,
field_map: Optional[dict[str, str]] = None,
prepend: bool = False, prepend: bool = False,
start: Optional[int] = None, start: Optional[int] = None,
) -> int: ) -> int:
'''Ring buffer like "push" to append data '''
Ring buffer like "push" to append data
into the buffer and return updated "last" index. into the buffer and return updated "last" index.
NB: no actual ring logic yet to give a "loop around" on overflow NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel. condition, lel.
''' '''
self._post_init = True
length = len(data) length = len(data)
index = start or self._last.value index = start if start is not None else self._last.value
if prepend: if prepend:
index = self._first.value - length index = self._first.value - length
@ -263,10 +267,15 @@ class ShmArray:
end = index + length end = index + length
fields = self._write_fields if field_map:
src_names, dst_names = zip(*field_map.items())
else:
dst_names = src_names = self._write_fields
try: try:
self._array[fields][index:end] = data[fields][:] self._array[
list(dst_names)
][index:end] = data[list(src_names)][:]
# NOTE: there was a race here between updating # NOTE: there was a race here between updating
# the first and last indices and when the next reader # the first and last indices and when the next reader
@ -281,9 +290,18 @@ class ShmArray:
else: else:
self._last.value = end self._last.value = end
self._post_init = True
return end return end
except ValueError as err: except ValueError as err:
if field_map:
raise
# dsize = data.size
# if dsize > self._len:
# raise ValueError(
# f'Input data is size {dsize} > our shm buffer {self._len}'
# )
# should raise if diff detected # should raise if diff detected
self.diff_err_fields(data) self.diff_err_fields(data)
raise err raise err
@ -339,7 +357,7 @@ class ShmArray:
# how much is probably dependent on lifestyle # how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 24) _secs_in_day = int(60 * 60 * 24)
# we try for 3 times but only on a run-every-other-day kinda week. # we try for 3 times but only on a run-every-other-day kinda week.
_default_size = 3 * _secs_in_day _default_size = 4 * _secs_in_day
def open_shm_array( def open_shm_array(
@ -392,7 +410,24 @@ def open_shm_array(
) )
) )
last.value = first.value = int(_secs_in_day) # start the "real-time" updated section after 3-days worth of 1s
# sampled OHLC. this allows appending up to a days worth from
# tick/quote feeds before having to flush to a (tsdb) storage
# backend, and looks something like,
# -------------------------
# | | i
# _________________________
# <-------------> <------->
# history real-time
#
# Once fully "prepended", the history section will leave the
# ``ShmArray._start.value: int = 0`` and the yet-to-be written
# real-time section will start at ``ShmArray.index: int``.
# this sets the index to 3/4 of the length of the buffer
# leaving a "days worth of second samples" for the real-time
# section.
last.value = first.value = int(3*_secs_in_day)
shmarr = ShmArray( shmarr = ShmArray(
array, array,