From b910eceb3b617c8cb55b63cad0bd31e7d4c7e0d0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 May 2022 17:53:46 -0400 Subject: [PATCH] Add `ShmArray.ustruct()`: return an unstructured array copy We return a copy (since since a view doesn't seem to work..) of the (field filtered) shm array contents which is the same index-length as the source data. Further, fence off the resource tracker disable-hack into a helper routine. --- piker/data/_sharedmem.py | 95 ++++++++++++++++++++++++++++++++-------- 1 file changed, 76 insertions(+), 19 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 6bc69eb4..fbdb351e 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -22,7 +22,6 @@ from __future__ import annotations from sys import byteorder from typing import Optional from multiprocessing.shared_memory import SharedMemory, _USE_POSIX -from multiprocessing import resource_tracker as mantracker if _USE_POSIX: from _posixshmem import shm_unlink @@ -30,6 +29,7 @@ if _USE_POSIX: import tractor import numpy as np from pydantic import BaseModel +from numpy.lib import recfunctions as rfn from ..log import get_logger from ._source import base_iohlc_dtype @@ -46,26 +46,33 @@ _default_size = 10 * _secs_in_day _rt_buffer_start = int(9*_secs_in_day) -# Tell the "resource tracker" thing to fuck off. -class ManTracker(mantracker.ResourceTracker): - def register(self, name, rtype): - pass +def cuckoff_mantracker(): - def unregister(self, name, rtype): - pass + from multiprocessing import resource_tracker as mantracker - def ensure_running(self): - pass + # Tell the "resource tracker" thing to fuck off. + class ManTracker(mantracker.ResourceTracker): + def register(self, name, rtype): + pass + + def unregister(self, name, rtype): + pass + + def ensure_running(self): + pass -# "know your land and know your prey" -# https://www.dailymotion.com/video/x6ozzco -mantracker._resource_tracker = ManTracker() -mantracker.register = mantracker._resource_tracker.register -mantracker.ensure_running = mantracker._resource_tracker.ensure_running -ensure_running = mantracker._resource_tracker.ensure_running -mantracker.unregister = mantracker._resource_tracker.unregister -mantracker.getfd = mantracker._resource_tracker.getfd + # "know your land and know your prey" + # https://www.dailymotion.com/video/x6ozzco + mantracker._resource_tracker = ManTracker() + mantracker.register = mantracker._resource_tracker.register + mantracker.ensure_running = mantracker._resource_tracker.ensure_running + ensure_running = mantracker._resource_tracker.ensure_running + mantracker.unregister = mantracker._resource_tracker.unregister + mantracker.getfd = mantracker._resource_tracker.getfd + + +cuckoff_mantracker() class SharedInt: @@ -191,7 +198,11 @@ class ShmArray: self._post_init: bool = False # pushing data does not write the index (aka primary key) - self._write_fields = list(shmarr.dtype.fields.keys())[1:] + dtype = shmarr.dtype + if dtype.fields: + self._write_fields = list(shmarr.dtype.fields.keys())[1:] + else: + self._write_fields = None # TODO: ringbuf api? @@ -237,6 +248,48 @@ class ShmArray: return a + def ustruct( + self, + fields: Optional[list[str]] = None, + + # type that all field values will be cast to + # in the returned view. + common_dtype: np.dtype = np.float, + + ) -> np.ndarray: + + array = self._array + + if fields: + selection = array[fields] + fcount = len(fields) + else: + selection = array + fcount = len(array.dtype.fields) + + # XXX: manual ``.view()`` attempt that also doesn't work. + # uview = selection.view( + # dtype='