From b30b4bb555cd45e257085be117c289dcbb467816 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 6 Apr 2022 17:05:57 -0400 Subject: [PATCH] Add `Flow` type with a real chitty mxmn cacheing method This new type wraps a shm data flow and will eventually include things like incremental path-graphics updates and serialization + bg downsampling techniques. The main immediate motivation was to get a cached y-range max/min calc going since profiling revealed the `numpy` equivalents were actually quite slow as the data set grows large. Likely we can use all this to drive a streaming mx/mn routine that's always launched as part of each on-host flow. This is our official foray into use of `msgspec.Struct` B) and I have to say, pretty impressed; we'll likely completely ditch `pydantic` from here on out. --- piker/ui/_chart.py | 160 ++++++++++++++++++++++++++++++++------------- 1 file changed, 116 insertions(+), 44 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index c09ac89b..80b88b0c 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -34,10 +34,11 @@ from PyQt5.QtWidgets import ( QVBoxLayout, QSplitter, ) +import msgspec import numpy as np +# from pydantic import BaseModel import pyqtgraph as pg import trio -from pydantic import BaseModel from ._axes import ( DynamicDateAxis, @@ -59,10 +60,14 @@ from ._style import ( ) from ..data.feed import Feed from ..data._source import Symbol -from ..data._sharedmem import ShmArray +from ..data._sharedmem import ( + ShmArray, + # _Token, +) from ..log import get_logger from ._interaction import ChartView from ._forms import FieldsForm +from .._profile import pg_profile_enabled, ms_slower_then from ._overlay import PlotItemOverlay if TYPE_CHECKING: @@ -663,19 +668,90 @@ class LinkedSplits(QWidget): # flows: dict[str, np.ndarray] = {} -class Flow(BaseModel): +class Flow(msgspec.Struct): # , frozen=True): ''' (FinancialSignal-)Flow compound type which wraps a real-time graphics (curve) and its backing data stream together for high level access and control. - ''' - class Config: - arbitrary_types_allowed = True + The intention is for this type to eventually be capable of shm-passing + of incrementally updated graphics stream data between actors. + ''' name: str plot: pg.PlotItem - shm: Optional[ShmArray] = None # may be filled in "later" + is_ohlc: bool = False + + # TODO: hackery to be able to set a shm later + # but whilst also allowing this type to hashable, + # likely will require serializable token that is used to attach + # to the underlying shm ref after startup? + _shm: Optional[ShmArray] = None # currently, may be filled in "later" + + # cache of y-range values per x-range input. + _mxmns: dict[tuple[int, int], tuple[float, float]] = {} + + @property + def shm(self) -> ShmArray: + return self._shm + + @shm.setter + def shm(self, shm: ShmArray) -> ShmArray: + self._shm = shm + + def maxmin( + self, + lbar, + rbar, + + ) -> tuple[float, float]: + ''' + Compute the cached max and min y-range values for a given + x-range determined by ``lbar`` and ``rbar``. + + ''' + rkey = (lbar, rbar) + result = self._mxmns.get(rkey) + if result: + return result + + shm = self.shm + if shm is None: + # print(f'no shm {self.name}?') + return 0, 0 + + arr = shm.array + + # build relative indexes into shm array + # TODO: should we just add/use a method + # on the shm to do this? + ifirst = arr[0]['index'] + slice_view = arr[ + lbar - ifirst:(rbar - ifirst) + 1] + + if not slice_view.size: + # print(f'no data in view {self.name}?') + return 0, 0 + + if self.is_ohlc: + ylow = np.min(slice_view['low']) + yhigh = np.max(slice_view['high']) + + else: + view = slice_view[self.name] + ylow = np.min(view) + yhigh = np.max(view) + # else: + # ylow, yhigh = 0, 0 + + result = ylow, yhigh + + if result != (0, 0): + self._mxmns[rkey] = result + + if self.name == 'drk_vlm': + print(f'{self.name} mxmn @ {rkey} -> {result}') + return result class ChartPlotWidget(pg.PlotWidget): @@ -1005,6 +1081,13 @@ class ChartPlotWidget(pg.PlotWidget): data_key = array_key or name self._graphics[data_key] = graphics + + self._flows[data_key] = Flow( + name=name, + plot=self.plotItem, + is_ohlc=True, + ) + self._add_sticky(name, bg_color='davies') return graphics, data_key @@ -1122,6 +1205,12 @@ class ChartPlotWidget(pg.PlotWidget): pi = self.plotItem + self._flows[data_key] = Flow( + name=name, + plot=pi, + is_ohlc=False, + ) + # TODO: this probably needs its own method? if overlay: if isinstance(overlay, pg.PlotItem): @@ -1130,10 +1219,6 @@ class ChartPlotWidget(pg.PlotWidget): f'{overlay} must be from `.plotitem_overlay()`' ) pi = overlay - - # anchor_at = ('bottom', 'left') - self._flows[name] = Flow(name=name, plot=pi) - else: # anchor_at = ('top', 'left') @@ -1342,46 +1427,33 @@ class ChartPlotWidget(pg.PlotWidget): If ``bars_range`` is provided use that range. ''' - l, lbar, rbar, r = bars_range or self.bars_range() - # TODO: logic to check if end of bars in view - # extra = view_len - _min_points_to_show - # begin = self._arrays['ohlc'][0]['index'] - extra - # # end = len(self._arrays['ohlc']) - 1 + extra - # end = self._arrays['ohlc'][-1]['index'] - 1 + extra + profiler = pg.debug.Profiler( + disabled=not pg_profile_enabled(), + gt=ms_slower_then, + delayed=True, + ) - # bars_len = rbar - lbar - # log.debug( - # f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n" - # f"view_len: {view_len}, bars_len: {bars_len}\n" - # f"begin: {begin}, end: {end}, extra: {extra}" - # ) + l, lbar, rbar, r = bars_range or self.bars_range() + profiler(f'{self.name} got bars range') # TODO: here we should instead look up the ``Flow.shm.array`` # and read directly from shm to avoid copying to memory first # and then reading it again here. - a = self._arrays.get(name or self.name) - if a is None: - return None - - ifirst = a[0]['index'] - bars = a[lbar - ifirst:(rbar - ifirst) + 1] - - if not len(bars): - # likely no data loaded yet or extreme scrolling? - log.error(f"WTF bars_range = {lbar}:{rbar}") - return - + flow_key = name or self.name + flow = self._flows.get(flow_key) if ( - self.data_key == self.linked.symbol.key + flow is None ): - # ohlc sampled bars hi/lo lookup - ylow = np.nanmin(bars['low']) - yhigh = np.nanmax(bars['high']) + print(f"flow {flow_key} doesn't exist in chart {self.name}") + return 0, 0 else: - view = bars[name or self.data_key] - ylow = np.nanmin(view) - yhigh = np.nanmax(view) + key = round(lbar), round(rbar) + res = flow.maxmin(*key) + profiler(f'{key} max-min {res}') + if res == (0, 0): + log.error( + f"{flow_key} -> (0, 0) for bars_range = {key}" + ) - # print(f'{(ylow, yhigh)}') - return ylow, yhigh + return res