From 17491ba8194d10420ff2b1f2245aa7bc1a171702 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 17 Sep 2020 09:03:11 -0400 Subject: [PATCH] Disconnect stdlib's resource_tracker, fix .push() Logic in `SharedArray.push()` was totally wrong. Remove all the `multiprocessing.resource_tracker` crap such that we aren't loading an extra process at every layer and we don't get tons of errors when cleaning on in an SC way. --- piker/data/_sharedmem.py | 55 ++++++++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index cb565b26..cfe162a7 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -1,13 +1,36 @@ """ -NumPy based shared memory for real-time FSP. +NumPy compatible shared memory buffers for real-time FSP. """ from sys import byteorder from contextlib import contextmanager from typing import Tuple, Optional from multiprocessing import shared_memory +from multiprocessing import resource_tracker as mantracker +from _posixshmem import shm_unlink import numpy as np -# from numpy.lib import recfunctions as rfn + + +# Tell the "resource tracker" thing to fuck off. +class ManTracker(mantracker.ResourceTracker): + def register(self, name, rtype): + pass + + def unregister(self, name, rtype): + pass + + def ensure_running(self): + pass + + +# "know your land and know your prey" +# https://www.dailymotion.com/video/x6ozzco +mantracker._resource_tracker = ManTracker() +mantracker.register = mantracker._resource_tracker.register +mantracker.ensure_running = mantracker._resource_tracker.ensure_running +ensure_running = mantracker._resource_tracker.ensure_running +mantracker.unregister = mantracker._resource_tracker.unregister +mantracker.getfd = mantracker._resource_tracker.getfd base_ohlc_dtype = np.dtype( @@ -88,22 +111,31 @@ class SharedArray: self, data: np.ndarray, ) -> int: - # push array data and return updated index + """Ring buffer like "push" to append data + into the buffer and return updated index. + """ length = len(data) - self._array[self._i.value:length] = data - self._i.value += length + # TODO: use .index for actual ring logic? + index = self._i.value + end = index + length + self._array[index:end] = data[:] + self._i.value = end + return end def close(self) -> None: self._i._shm.close() self._shm.close() def destroy(self) -> None: - self._i._shm.unlink() - self._shm.unlink() + if shared_memory._USE_POSIX: + # We manually unlink to bypass all the "resource tracker" + # nonsense meant for non-SC systems. + shm_unlink(self._i._shm.name) + shm_unlink(self._shm.name) - # def flush(self) -> None: - # # flush to storage backend? - # ... + def flush(self) -> None: + # TODO: flush to storage backend like markestore? + ... @contextmanager @@ -155,7 +187,8 @@ def attach_shared_array( dtype: np.dtype = base_ohlc_dtype, readonly: bool = True, ) -> SharedArray: - """Load and attach to an existing shared memory array. + """Load and attach to an existing shared memory array previously + created by another process using ``open_shared_array``. """ array_name, counter_name = token