From ec0be781f86763a45bf6636511af83b5632e4a97 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Dec 2020 10:30:31 -0500 Subject: [PATCH] Add prepend support to shm system --- piker/data/_sharedmem.py | 219 +++++++++++++++++++++++++++------------ 1 file changed, 150 insertions(+), 69 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 7f06bf78..fb442d7c 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -17,11 +17,10 @@ """ NumPy compatible shared memory buffers for real-time FSP. """ -from typing import List from dataclasses import dataclass, asdict from sys import byteorder -from typing import Tuple, Optional -from multiprocessing import shared_memory +from typing import List, Tuple, Optional +from multiprocessing.shared_memory import SharedMemory, _USE_POSIX from multiprocessing import resource_tracker as mantracker from _posixshmem import shm_unlink @@ -29,7 +28,7 @@ import tractor import numpy as np from ..log import get_logger -from ._source import base_ohlc_dtype +from ._source import base_ohlc_dtype, base_iohlc_dtype log = get_logger(__name__) @@ -58,17 +57,15 @@ mantracker.getfd = mantracker._resource_tracker.getfd class SharedInt: + """Wrapper around a single entry shared memory array which + holds an ``int`` value used as an index counter. + + """ def __init__( self, - token: str, - create: bool = False, + shm: SharedMemory, ) -> None: - # create a single entry array for storing an index counter - self._shm = shared_memory.SharedMemory( - name=token, - create=create, - size=4, # std int - ) + self._shm = shm @property def value(self) -> int: @@ -79,7 +76,7 @@ class SharedInt: self._shm.buf[:] = value.to_bytes(4, byteorder) def destroy(self) -> None: - if shared_memory._USE_POSIX: + if _USE_POSIX: # We manually unlink to bypass all the "resource tracker" # nonsense meant for non-SC systems. shm_unlink(self._shm.name) @@ -91,7 +88,8 @@ class _Token: which can be used to key a system wide post shm entry. """ shm_name: str # this servers as a "key" value - shm_counter_name: str + shm_first_index_name: str + shm_last_index_name: str dtype_descr: List[Tuple[str]] def __post_init__(self): @@ -130,27 +128,47 @@ def _make_token( """Create a serializable token that can be used to access a shared array. """ - dtype = base_ohlc_dtype if dtype is None else dtype + dtype = base_iohlc_dtype if dtype is None else dtype return _Token( key, - key + "_counter", + key + "_first", + key + "_last", np.dtype(dtype).descr ) class ShmArray: + """A shared memory ``numpy`` (compatible) array API. + + An underlying shared memory buffer is allocated based on + a user specified ``numpy.ndarray``. This fixed size array + can be read and written to by pushing data both onto the "front" + or "back" of a set index range. The indexes for the "first" and + "last" index are themselves stored in shared memory (accessed via + ``SharedInt`` interfaces) values such that multiple processes can + interact with the same array using a synchronized-index. + + """ def __init__( self, shmarr: np.ndarray, - counter: SharedInt, - shm: shared_memory.SharedMemory, - readonly: bool = True, + first: SharedInt, + last: SharedInt, + shm: SharedMemory, + # readonly: bool = True, ) -> None: self._array = shmarr - self._i = counter + + # indexes for first and last indices corresponding + # to fille data + self._first = first + self._last = last + self._len = len(shmarr) self._shm = shm - self._readonly = readonly + + # pushing data does not write the index (aka primary key) + self._write_fields = list(shmarr.dtype.fields.keys())[1:] # TODO: ringbuf api? @@ -158,24 +176,25 @@ class ShmArray: def _token(self) -> _Token: return _Token( self._shm.name, - self._i._shm.name, + self._first._shm.name, + self._last._shm.name, self._array.dtype.descr, ) @property def token(self) -> dict: - """Shared memory token that can be serialized - and used by another process to attach to this array. + """Shared memory token that can be serialized and used by + another process to attach to this array. """ return self._token.as_msg() @property def index(self) -> int: - return self._i.value % self._len + return self._last.value % self._len @property def array(self) -> np.ndarray: - return self._array[:self._i.value] + return self._array[self._first.value:self._last.value] def last( self, @@ -186,62 +205,90 @@ class ShmArray: def push( self, data: np.ndarray, + prepend: bool = False, ) -> int: """Ring buffer like "push" to append data - into the buffer and return updated index. + into the buffer and return updated "last" index. """ length = len(data) - # TODO: use .index for actual ring logic? - index = self._i.value + + if prepend: + index = self._first.value - length + else: + index = self._last.value + end = index + length + + fields = self._write_fields + try: - self._array[index:end] = data[:] - self._i.value = end + self._array[fields][index:end] = data[fields][:] + if prepend: + self._first.value = index + else: + self._last.value = end return end except ValueError as err: - # reraise with any field discrepancy - our_fields, their_fields = ( - set(self._array.dtype.fields), - set(data.dtype.fields), + # shoudl raise if diff detected + self.diff_err_fields(data) + + raise err + + def diff_err_fields( + self, + data: np.ndarray, + ) -> None: + # reraise with any field discrepancy + our_fields, their_fields = ( + set(self._array.dtype.fields), + set(data.dtype.fields), + ) + + only_in_ours = our_fields - their_fields + only_in_theirs = their_fields - our_fields + + if only_in_ours: + raise TypeError( + f"Input array is missing field(s): {only_in_ours}" + ) + elif only_in_theirs: + raise TypeError( + f"Input array has unknown field(s): {only_in_theirs}" ) - only_in_ours = our_fields - their_fields - only_in_theirs = their_fields - our_fields - - if only_in_ours: - raise TypeError( - f"Input array is missing field(s): {only_in_ours}" - ) - elif only_in_theirs: - raise TypeError( - f"Input array has unknown field(s): {only_in_theirs}" - ) - else: - raise err + def prepend( + self, + data: np.ndarray, + ) -> int: + end = self.push(data, prepend=True) + assert end def close(self) -> None: - self._i._shm.close() + self._first._shm.close() + self._last._shm.close() self._shm.close() def destroy(self) -> None: - if shared_memory._USE_POSIX: + if _USE_POSIX: # We manually unlink to bypass all the "resource tracker" # nonsense meant for non-SC systems. shm_unlink(self._shm.name) - self._i.destroy() + + self._first.destroy() + self._last.destroy() def flush(self) -> None: # TODO: flush to storage backend like markestore? ... -_lotsa_5s = int(5 * 60 * 60 * 10 / 5) - +# how much is probably dependent on lifestyle +_secs_in_day = int(60 * 60 * 12) +_default_size = 2 * _secs_in_day def open_shm_array( key: Optional[str] = None, - # approx number of 5s bars in a "day" x2 - size: int = _lotsa_5s, + size: int = _default_size, dtype: Optional[np.dtype] = None, readonly: bool = False, ) -> ShmArray: @@ -253,7 +300,9 @@ def open_shm_array( # create new shared mem segment for which we # have write permission a = np.zeros(size, dtype=dtype) - shm = shared_memory.SharedMemory( + a['index'] = np.arange(len(a)) + + shm = SharedMemory( name=key, create=True, size=a.nbytes @@ -267,17 +316,30 @@ def open_shm_array( dtype=dtype ) - counter = SharedInt( - token=token.shm_counter_name, - create=True, + # create single entry arrays for storing an first and last indices + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=True, + size=4, # std int + ) ) - counter.value = 0 + + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=True, + size=4, # std int + ) + ) + + last.value = first.value = int(_secs_in_day) shmarr = ShmArray( array, - counter, + first, + last, shm, - readonly=readonly, ) assert shmarr._token == token @@ -293,18 +355,23 @@ def open_shm_array( def attach_shm_array( token: Tuple[str, str, Tuple[str, str]], - size: int = _lotsa_5s, + size: int = _default_size, readonly: bool = True, ) -> ShmArray: - """Load and attach to an existing shared memory array previously + """Attach to an existing shared memory array previously created by another process using ``open_shared_array``. + + No new shared mem is allocated but wrapper types for read/write + access are constructed. """ token = _Token.from_msg(token) key = token.shm_name + if key in _known_tokens: assert _known_tokens[key] == token, "WTF" - shm = shared_memory.SharedMemory(name=key) + # attach to array buffer and view as per dtype + shm = SharedMemory(name=key) shmarr = np.ndarray( (size,), dtype=token.dtype_descr, @@ -312,15 +379,29 @@ def attach_shm_array( ) shmarr.setflags(write=int(not readonly)) - counter = SharedInt(token=token.shm_counter_name) + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=False, + size=4, # std int + ), + ) + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=False, + size=4, # std int + ), + ) + # make sure we can read - counter.value + first.value sha = ShmArray( shmarr, - counter, + first, + last, shm, - readonly=readonly, ) # read test sha.array