From 712e36b9d57c7406ab4ef6c2649407faca7f944c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Sep 2020 09:25:11 -0400 Subject: [PATCH] First draft of a shared numpy array sub-system This adds a shared memory "incrementing array" sub-sys interface for single writer, multi-reader style data passing. The main motivation is to avoid multiple copies of the same `numpy` array across actors (plus now we can start being fancy like ray). There still seems to be some odd issues with the "resource tracker" complaining at teardown (likely partially to do with SIGINT stuff) so some further digging in the stdlib code is likely coming. Pertains to #107 and #98 --- piker/data/_sharedmem.py | 200 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 piker/data/_sharedmem.py diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py new file mode 100644 index 00000000..cb565b26 --- /dev/null +++ b/piker/data/_sharedmem.py @@ -0,0 +1,200 @@ +""" +NumPy based shared memory for real-time FSP. +""" +from sys import byteorder +from contextlib import contextmanager +from typing import Tuple, Optional +from multiprocessing import shared_memory + +import numpy as np +# from numpy.lib import recfunctions as rfn + + +base_ohlc_dtype = np.dtype( + [ + ('index', int), + ('time', float), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('volume', int), + ] +) + + +class SharedInt: + def __init__( + self, + token: str, + create: bool = False, + ) -> None: + # create a single entry array for storing an index counter + self._shm = shared_memory.SharedMemory( + name=token, + create=create, + size=4, # std int + ) + self._token = self._shm.name + + @property + def value(self) -> int: + return int.from_bytes(self._shm.buf, byteorder) + + @value.setter + def value(self, value) -> None: + self._shm.buf[:] = value.to_bytes(4, byteorder) + + +class SharedArray: + def __init__( + self, + shmarr: np.ndarray, + counter: SharedInt, + shm: shared_memory.SharedMemory, + readonly: bool = True, + ) -> None: + self._array = shmarr + self._i = counter + self._len = len(shmarr) + self._shm = shm + self._readonly = readonly + + # TODO: ringbuf api? + + @property + def token(self) -> Tuple[str, str]: + return (self._shm.name, self._i._token) + + @property + def name(self) -> str: + return self._shm.name + + @property + def index(self) -> int: + return self._i.value % self._len + + @property + def array(self) -> np.ndarray: + return self._array[:self._i.value] + + def last( + self, + length: int = 1, + ) -> np.ndarray: + return self.array[-length:] + + def push( + self, + data: np.ndarray, + ) -> int: + # push array data and return updated index + length = len(data) + self._array[self._i.value:length] = data + self._i.value += length + + def close(self) -> None: + self._i._shm.close() + self._shm.close() + + def destroy(self) -> None: + self._i._shm.unlink() + self._shm.unlink() + + # def flush(self) -> None: + # # flush to storage backend? + # ... + + +@contextmanager +def open_shared_array( + name: Optional[str] = None, + create: bool = True, + # approx number of 5s bars in a "day" + size: int = int(60*60*10/5), + dtype: np.dtype = base_ohlc_dtype, + readonly: bool = False, +) -> SharedArray: + """Open a memory shared ``numpy`` using the standard library. + + This call unlinks (aka permanently destroys) the buffer on teardown + and thus should be used from the parent-most accessor (process). + """ + # create new shared mem segment for which we + # have write permission + a = np.zeros(size, dtype=dtype) + shm = shared_memory.SharedMemory(name=name, create=True, size=a.nbytes) + shmarr = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) + shmarr[:] = a[:] + shmarr.setflags(write=int(not readonly)) + + counter = SharedInt( + token=shm.name + "_counter", + create=True, + ) + counter.value = 0 + + sha = SharedArray( + shmarr, + counter, + shm, + readonly=readonly, + ) + try: + yield sha + finally: + sha.close() + print(f"UNLINKING {sha.token}") + sha.destroy() + + +@contextmanager +def attach_shared_array( + token: Tuple[str, str], + size: int = int(60*60*10/5), + dtype: np.dtype = base_ohlc_dtype, + readonly: bool = True, +) -> SharedArray: + """Load and attach to an existing shared memory array. + """ + array_name, counter_name = token + + shm = shared_memory.SharedMemory(name=array_name) + shmarr = np.ndarray((size,), dtype=dtype, buffer=shm.buf) + shmarr.setflags(write=int(not readonly)) + + counter = SharedInt(token=counter_name) + # make sure we can read + counter.value + + sha = SharedArray( + shmarr, + counter, + shm, + readonly=readonly, + ) + sha.array + try: + yield sha + finally: + pass + sha.close() + + +@contextmanager +def maybe_open_shared_array( + name: str, + **kwargs, +) -> SharedArray: + try: + with open_shared_array( + name=name, + **kwargs, + ) as shm: + yield shm + except FileExistsError: + with attach_shared_array( + token=(name, name + '_counter'), + **kwargs, + ) as shm: + yield shm