From 6f5bb9cbe018ddb5adc12bb4dd76518070a91e03 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 21 Apr 2022 14:52:30 -0400 Subject: [PATCH] Incrementally update flattend OHLC data After much effort (and exhaustion) but failure to get a view into our `numpy` OHLC struct-array, this instead allocates an in-thread-memory array which is updated with flattened data every flow update cycle. I need to report what I think is a bug to `numpy` core about the whole view thing not working but, more or less this gets the same behaviour and minimizes work to flatten the sampled data for line-graphics drawing thus improving refresh latency when drawing large downsampled curves. TL;DR: - add `ShmArray.ustruct()` to return a **copy of** (since a view doesn't work..) the (field filtered) shm array which is the same index-length as the source data. - update the OHLC ds curve with view aware data sliced out from the pre-allocated and incrementally updated data (we had to add a last index var `._iflat` to track appends - this should be moved into a renderer eventually?). --- piker/data/_sharedmem.py | 95 ++++++++++++++++++++++++++++-------- piker/ui/_flows.py | 101 +++++++++++++++++++++++++-------------- 2 files changed, 140 insertions(+), 56 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 6bc69eb4..fbdb351e 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -22,7 +22,6 @@ from __future__ import annotations from sys import byteorder from typing import Optional from multiprocessing.shared_memory import SharedMemory, _USE_POSIX -from multiprocessing import resource_tracker as mantracker if _USE_POSIX: from _posixshmem import shm_unlink @@ -30,6 +29,7 @@ if _USE_POSIX: import tractor import numpy as np from pydantic import BaseModel +from numpy.lib import recfunctions as rfn from ..log import get_logger from ._source import base_iohlc_dtype @@ -46,26 +46,33 @@ _default_size = 10 * _secs_in_day _rt_buffer_start = int(9*_secs_in_day) -# Tell the "resource tracker" thing to fuck off. -class ManTracker(mantracker.ResourceTracker): - def register(self, name, rtype): - pass +def cuckoff_mantracker(): - def unregister(self, name, rtype): - pass + from multiprocessing import resource_tracker as mantracker - def ensure_running(self): - pass + # Tell the "resource tracker" thing to fuck off. + class ManTracker(mantracker.ResourceTracker): + def register(self, name, rtype): + pass + + def unregister(self, name, rtype): + pass + + def ensure_running(self): + pass -# "know your land and know your prey" -# https://www.dailymotion.com/video/x6ozzco -mantracker._resource_tracker = ManTracker() -mantracker.register = mantracker._resource_tracker.register -mantracker.ensure_running = mantracker._resource_tracker.ensure_running -ensure_running = mantracker._resource_tracker.ensure_running -mantracker.unregister = mantracker._resource_tracker.unregister -mantracker.getfd = mantracker._resource_tracker.getfd + # "know your land and know your prey" + # https://www.dailymotion.com/video/x6ozzco + mantracker._resource_tracker = ManTracker() + mantracker.register = mantracker._resource_tracker.register + mantracker.ensure_running = mantracker._resource_tracker.ensure_running + ensure_running = mantracker._resource_tracker.ensure_running + mantracker.unregister = mantracker._resource_tracker.unregister + mantracker.getfd = mantracker._resource_tracker.getfd + + +cuckoff_mantracker() class SharedInt: @@ -191,7 +198,11 @@ class ShmArray: self._post_init: bool = False # pushing data does not write the index (aka primary key) - self._write_fields = list(shmarr.dtype.fields.keys())[1:] + dtype = shmarr.dtype + if dtype.fields: + self._write_fields = list(shmarr.dtype.fields.keys())[1:] + else: + self._write_fields = None # TODO: ringbuf api? @@ -237,6 +248,48 @@ class ShmArray: return a + def ustruct( + self, + fields: Optional[list[str]] = None, + + # type that all field values will be cast to + # in the returned view. + common_dtype: np.dtype = np.float, + + ) -> np.ndarray: + + array = self._array + + if fields: + selection = array[fields] + fcount = len(fields) + else: + selection = array + fcount = len(array.dtype.fields) + + # XXX: manual ``.view()`` attempt that also doesn't work. + # uview = selection.view( + # dtype='