From 6a44c83e849de679e1f2b1db23f46fb76989cb1c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Apr 2022 09:38:25 -0400 Subject: [PATCH] Add new `ui._flows` module This begins the removal of data processing / analysis methods from the chart widget and instead moving them to our new `Flow` API (in the new module introduce here) and delegating the old chart methods to the respective internal flow. Most importantly is no longer storing the "last read" of an array from shm in an internal chart table (was `._arrays`) and instead the `ShmArray` instance is passed as input and stored in the `Flow` instance. This greatly simplifies lookup logic such that the display loop now doesn't have to worry about reading shm, it can be done by internal graphics logic as desired. Generally speaking, all previous `._arrays`/`._graphics` lookups are now delegated to the entries in the chart's `._flows` table. The new `Flow` methods are generally better factored and provide more detailed output regarding data-stream <-> graphics inter-relations for the future purpose of allowing much more efficient update calls in the display loop as well as supporting low latency interaction UX. The concept here is that we're introducing an intermediary layer that ties together graphics and real-time data flows such that widget code is oriented around plot layout and the flow apis are oriented around real-time low latency updates and providing an efficient high level metric layer for the UX. The summary api transition is something like: - `update_graphics_from_array()` -> `.update_graphics_from_flow()` - `.bars_range()` -> `Flow.datums_range()` - `.bars_range()` -> `Flow.datums_range()` --- piker/ui/_chart.py | 277 +++++++++-------------------------------- piker/ui/_flows.py | 303 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 360 insertions(+), 220 deletions(-) create mode 100644 piker/ui/_flows.py diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index a3a97164..8aa10091 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -34,9 +34,7 @@ from PyQt5.QtWidgets import ( QVBoxLayout, QSplitter, ) -import msgspec import numpy as np -# from pydantic import BaseModel import pyqtgraph as pg import trio @@ -49,6 +47,7 @@ from ._cursor import ( Cursor, ContentsLabel, ) +from ..data._sharedmem import ShmArray from ._l1 import L1Labels from ._ohlc import BarItems from ._curve import FastAppendCurve @@ -60,15 +59,12 @@ from ._style import ( ) from ..data.feed import Feed from ..data._source import Symbol -from ..data._sharedmem import ( - ShmArray, - # _Token, -) from ..log import get_logger from ._interaction import ChartView from ._forms import FieldsForm from .._profile import pg_profile_enabled, ms_slower_then from ._overlay import PlotItemOverlay +from ._flows import Flow if TYPE_CHECKING: from ._display import DisplayState @@ -419,7 +415,7 @@ class LinkedSplits(QWidget): self, symbol: Symbol, - array: np.ndarray, + shm: ShmArray, sidepane: FieldsForm, style: str = 'bar', @@ -444,7 +440,7 @@ class LinkedSplits(QWidget): self.chart = self.add_plot( name=symbol.key, - array=array, + shm=shm, style=style, _is_main=True, @@ -472,7 +468,7 @@ class LinkedSplits(QWidget): self, name: str, - array: np.ndarray, + shm: ShmArray, array_key: Optional[str] = None, style: str = 'line', @@ -516,7 +512,6 @@ class LinkedSplits(QWidget): name=name, data_key=array_key or name, - array=array, parent=qframe, linkedsplits=self, axisItems=axes, @@ -580,7 +575,7 @@ class LinkedSplits(QWidget): graphics, data_key = cpw.draw_ohlc( name, - array, + shm, array_key=array_key ) self.cursor.contents_labels.add_label( @@ -594,7 +589,7 @@ class LinkedSplits(QWidget): add_label = True graphics, data_key = cpw.draw_curve( name, - array, + shm, array_key=array_key, color='default_light', ) @@ -603,7 +598,7 @@ class LinkedSplits(QWidget): add_label = True graphics, data_key = cpw.draw_curve( name, - array, + shm, array_key=array_key, step_mode=True, color='davies', @@ -691,7 +686,6 @@ class ChartPlotWidget(pg.PlotWidget): # the "data view" we generate graphics from name: str, - array: np.ndarray, data_key: str, linkedsplits: LinkedSplits, @@ -744,14 +738,6 @@ class ChartPlotWidget(pg.PlotWidget): self._max_l1_line_len: float = 0 # self.setViewportMargins(0, 0, 0, 0) - # self._ohlc = array # readonly view of ohlc data - - # TODO: move to Aggr above XD - # readonly view of data arrays - self._arrays = { - self.data_key: array, - } - self._graphics = {} # registry of underlying graphics # registry of overlay curve names self._flows: dict[str, Flow] = {} @@ -767,7 +753,6 @@ class ChartPlotWidget(pg.PlotWidget): # show background grid self.showGrid(x=False, y=True, alpha=0.3) - self.default_view() self.cv.enable_auto_yrange() self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem) @@ -816,14 +801,8 @@ class ChartPlotWidget(pg.PlotWidget): Return a range tuple for the bars present in view. ''' - l, r = self.view_range() - array = self._arrays[self.name] - start, stop = self._xrange = ( - array[0]['index'], - array[-1]['index'], - ) - lbar = max(l, start) - rbar = min(r, stop) + main_flow = self._flows[self.name] + ifirst, l, lbar, rbar, r, ilast = main_flow.datums_range() return l, lbar, rbar, r def curve_width_pxs( @@ -877,40 +856,51 @@ class ChartPlotWidget(pg.PlotWidget): def default_view( self, - steps_on_screen: Optional[int] = None + bars_from_y: int = 5000, ) -> None: ''' Set the view box to the "default" startup view of the scene. ''' - try: - index = self._arrays[self.name]['index'] - except IndexError: - log.warning(f'array for {self.name} not loaded yet?') + flow = self._flows.get(self.name) + if not flow: + log.warning(f'`Flow` for {self.name} not loaded yet?') return + index = flow.shm.array['index'] xfirst, xlast = index[0], index[-1] l, lbar, rbar, r = self.bars_range() - - marker_pos, l1_len = self.pre_l1_xs() - end = xlast + l1_len + 1 + view = self.view if ( rbar < 0 or l < xfirst + or l < 0 or (rbar - lbar) < 6 ): - # set fixed bars count on screen that approx includes as + # TODO: set fixed bars count on screen that approx includes as # many bars as possible before a downsample line is shown. - begin = xlast - round(6116 / 6) + begin = xlast - bars_from_y + view.setXRange( + min=begin, + max=xlast, + padding=0, + ) + # re-get range + l, lbar, rbar, r = self.bars_range() - else: - begin = end - (r - l) + # we get the L1 spread label "length" in view coords + # terms now that we've scaled either by user control + # or to the default set of bars as per the immediate block + # above. + marker_pos, l1_len = self.pre_l1_xs() + end = xlast + l1_len + 1 + begin = end - (r - l) # for debugging # print( - # f'bars range: {brange}\n' + # # f'bars range: {brange}\n' # f'xlast: {xlast}\n' # f'marker pos: {marker_pos}\n' # f'l1 len: {l1_len}\n' @@ -922,14 +912,13 @@ class ChartPlotWidget(pg.PlotWidget): if self._static_yrange == 'axis': self._static_yrange = None - view = self.view view.setXRange( min=begin, max=end, padding=0, ) - view._set_yrange() self.view.maybe_downsample_graphics() + view._set_yrange() try: self.linked.graphics_cycle() except IndexError: @@ -960,7 +949,7 @@ class ChartPlotWidget(pg.PlotWidget): def draw_ohlc( self, name: str, - data: np.ndarray, + shm: ShmArray, array_key: Optional[str] = None, @@ -980,19 +969,21 @@ class ChartPlotWidget(pg.PlotWidget): # the np array buffer to be drawn on next render cycle self.plotItem.addItem(graphics) - # draw after to allow self.scene() to work... - graphics.draw_from_data(data) - data_key = array_key or name - self._graphics[data_key] = graphics self._flows[data_key] = Flow( name=name, plot=self.plotItem, + _shm=shm, is_ohlc=True, graphics=graphics, ) + # TODO: i think we can eventually remove this if + # we write the ``Flow.update_graphics()`` method right? + # draw after to allow self.scene() to work... + graphics.draw_from_data(shm.array) + self._add_sticky(name, bg_color='davies') return graphics, data_key @@ -1058,7 +1049,7 @@ class ChartPlotWidget(pg.PlotWidget): self, name: str, - data: np.ndarray, + shm: ShmArray, array_key: Optional[str] = None, overlay: bool = False, @@ -1071,7 +1062,7 @@ class ChartPlotWidget(pg.PlotWidget): ) -> (pg.PlotDataItem, str): ''' Draw a "curve" (line plot graphics) for the provided data in - the input array ``data``. + the input shm array ``shm``. ''' color = color or self.pen_color or 'default_light' @@ -1082,6 +1073,7 @@ class ChartPlotWidget(pg.PlotWidget): data_key = array_key or name # yah, we wrote our own B) + data = shm.array curve = FastAppendCurve( y=data[data_key], x=data['index'], @@ -1105,16 +1097,14 @@ class ChartPlotWidget(pg.PlotWidget): # and is disastrous for performance. # curve.setCacheMode(QtWidgets.QGraphicsItem.ItemCoordinateCache) - # register curve graphics and backing array for name - self._graphics[name] = curve - self._arrays[data_key] = data - pi = pi or self.plotItem self._flows[data_key] = Flow( name=name, plot=pi, + _shm=shm, is_ohlc=False, + # register curve graphics with this flow graphics=curve, ) @@ -1175,16 +1165,11 @@ class ChartPlotWidget(pg.PlotWidget): ) return last - def update_graphics_from_array( + def update_graphics_from_flow( self, graphics_name: str, - - array: Optional[np.ndarray] = None, array_key: Optional[str] = None, - use_vr: bool = True, - render: bool = True, - **kwargs, ) -> pg.GraphicsObject: @@ -1192,63 +1177,11 @@ class ChartPlotWidget(pg.PlotWidget): Update the named internal graphics from ``array``. ''' - if array is not None: - assert len(array) - - data_key = array_key or graphics_name - if graphics_name not in self._flows: - data_key = self.name - - if array is not None: - # write array to internal graphics table - self._arrays[data_key] = array - else: - array = self._arrays[data_key] - - # array key and graphics "name" might be different.. - graphics = self._graphics[graphics_name] - - # compute "in-view" indices - l, lbar, rbar, r = self.bars_range() - indexes = array['index'] - ifirst = indexes[0] - ilast = indexes[-1] - - lbar_i = max(l, ifirst) - ifirst - rbar_i = min(r, ilast) - ifirst - - # TODO: we could do it this way as well no? - # to_draw = array[lbar - ifirst:(rbar - ifirst) + 1] - in_view = array[lbar_i: rbar_i + 1] - - if ( - not in_view.size - or not render - ): - return graphics - - if isinstance(graphics, BarItems): - graphics.update_from_array( - array, - in_view, - view_range=(lbar_i, rbar_i) if use_vr else None, - - **kwargs, - ) - - else: - graphics.update_from_array( - x=array['index'], - y=array[data_key], - - x_iv=in_view['index'], - y_iv=in_view[data_key], - view_range=(lbar_i, rbar_i) if use_vr else None, - - **kwargs - ) - - return graphics + flow = self._flows[array_key or graphics_name] + return flow.update_graphics( + array_key=array_key, + **kwargs, + ) # def _label_h(self, yhigh: float, ylow: float) -> float: # # compute contents label "height" in view terms @@ -1295,7 +1228,7 @@ class ChartPlotWidget(pg.PlotWidget): # TODO: this should go onto some sort of # data-view thinger..right? - ohlc = self._shm.array + ohlc = self._flows[self.name].shm.array # XXX: not sure why the time is so off here # looks like we're gonna have to do some fixing.. @@ -1341,9 +1274,6 @@ class ChartPlotWidget(pg.PlotWidget): delayed=True, ) - l, lbar, rbar, r = bars_range or self.bars_range() - profiler(f'{self.name} got bars range') - # TODO: here we should instead look up the ``Flow.shm.array`` # and read directly from shm to avoid copying to memory first # and then reading it again here. @@ -1356,6 +1286,9 @@ class ChartPlotWidget(pg.PlotWidget): res = 0, 0 else: + first, l, lbar, rbar, r, last = bars_range or flow.datums_range() + profiler(f'{self.name} got bars range') + key = round(lbar), round(rbar) res = flow.maxmin(*key) profiler(f'yrange mxmn: {key} -> {res}') @@ -1366,99 +1299,3 @@ class ChartPlotWidget(pg.PlotWidget): res = 0, 0 return res - - -# class FlowsTable(pydantic.BaseModel): -# ''' -# Data-AGGRegate: high level API onto multiple (categorized) -# ``Flow``s with high level processing routines for -# multi-graphics computations and display. - -# ''' -# flows: dict[str, np.ndarray] = {} - - -class Flow(msgspec.Struct): # , frozen=True): - ''' - (FinancialSignal-)Flow compound type which wraps a real-time - graphics (curve) and its backing data stream together for high level - access and control. - - The intention is for this type to eventually be capable of shm-passing - of incrementally updated graphics stream data between actors. - - ''' - name: str - plot: pg.PlotItem - is_ohlc: bool = False - graphics: pg.GraphicsObject - - # TODO: hackery to be able to set a shm later - # but whilst also allowing this type to hashable, - # likely will require serializable token that is used to attach - # to the underlying shm ref after startup? - _shm: Optional[ShmArray] = None # currently, may be filled in "later" - - # cache of y-range values per x-range input. - _mxmns: dict[tuple[int, int], tuple[float, float]] = {} - - @property - def shm(self) -> ShmArray: - return self._shm - - @shm.setter - def shm(self, shm: ShmArray) -> ShmArray: - self._shm = shm - - def maxmin( - self, - lbar, - rbar, - - ) -> tuple[float, float]: - ''' - Compute the cached max and min y-range values for a given - x-range determined by ``lbar`` and ``rbar``. - - ''' - rkey = (lbar, rbar) - cached_result = self._mxmns.get(rkey) - if cached_result: - return cached_result - - shm = self.shm - if shm is None: - mxmn = None - - else: # new block for profiling?.. - arr = shm.array - - # build relative indexes into shm array - # TODO: should we just add/use a method - # on the shm to do this? - ifirst = arr[0]['index'] - slice_view = arr[ - lbar - ifirst: - (rbar - ifirst) + 1 - ] - - if not slice_view.size: - mxmn = None - - else: - if self.is_ohlc: - ylow = np.min(slice_view['low']) - yhigh = np.max(slice_view['high']) - - else: - view = slice_view[self.name] - ylow = np.min(view) - yhigh = np.max(view) - - mxmn = ylow, yhigh - - if mxmn is not None: - # cache new mxmn result - self._mxmns[rkey] = mxmn - - return mxmn diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py new file mode 100644 index 00000000..a9eb6a4f --- /dev/null +++ b/piker/ui/_flows.py @@ -0,0 +1,303 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +High level streaming graphics primitives. + +This is an intermediate layer which associates real-time low latency +graphics primitives with underlying FSP related data structures for fast +incremental update. + +''' +from typing import ( + Optional, + Callable, +) + +import msgspec +import numpy as np +import pyqtgraph as pg +from PyQt5.QtGui import QPainterPath + +from ..data._sharedmem import ( + ShmArray, + # attach_shm_array +) +from ._ohlc import BarItems + + +# class FlowsTable(msgspec.Struct): +# ''' +# Data-AGGRegate: high level API onto multiple (categorized) +# ``Flow``s with high level processing routines for +# multi-graphics computations and display. + +# ''' +# flows: dict[str, np.ndarray] = {} + + +class Flow(msgspec.Struct): # , frozen=True): + ''' + (FinancialSignal-)Flow compound type which wraps a real-time + graphics (curve) and its backing data stream together for high level + access and control. + + The intention is for this type to eventually be capable of shm-passing + of incrementally updated graphics stream data between actors. + + ''' + name: str + plot: pg.PlotItem + is_ohlc: bool = False + render: bool = True # toggle for display loop + + graphics: pg.GraphicsObject + + # TODO: hackery to be able to set a shm later + # but whilst also allowing this type to hashable, + # likely will require serializable token that is used to attach + # to the underlying shm ref after startup? + _shm: Optional[ShmArray] = None # currently, may be filled in "later" + + # last read from shm (usually due to an update call) + _last_read: Optional[np.ndarray] = None + + # cache of y-range values per x-range input. + _mxmns: dict[tuple[int, int], tuple[float, float]] = {} + + @property + def shm(self) -> ShmArray: + return self._shm + + # TODO: remove this and only allow setting through + # private ``._shm`` attr? + @shm.setter + def shm(self, shm: ShmArray) -> ShmArray: + print(f'{self.name} DO NOT SET SHM THIS WAY!?') + self._shm = shm + + def maxmin( + self, + lbar, + rbar, + + ) -> tuple[float, float]: + ''' + Compute the cached max and min y-range values for a given + x-range determined by ``lbar`` and ``rbar``. + + ''' + rkey = (lbar, rbar) + cached_result = self._mxmns.get(rkey) + if cached_result: + return cached_result + + shm = self.shm + if shm is None: + mxmn = None + + else: # new block for profiling?.. + arr = shm.array + + # build relative indexes into shm array + # TODO: should we just add/use a method + # on the shm to do this? + ifirst = arr[0]['index'] + slice_view = arr[ + lbar - ifirst: + (rbar - ifirst) + 1 + ] + + if not slice_view.size: + mxmn = None + + else: + if self.is_ohlc: + ylow = np.min(slice_view['low']) + yhigh = np.max(slice_view['high']) + + else: + view = slice_view[self.name] + ylow = np.min(view) + yhigh = np.max(view) + + mxmn = ylow, yhigh + + if mxmn is not None: + # cache new mxmn result + self._mxmns[rkey] = mxmn + + return mxmn + + def view_range(self) -> tuple[int, int]: + ''' + Return the indexes in view for the associated + plot displaying this flow's data. + + ''' + vr = self.plot.viewRect() + return int(vr.left()), int(vr.right()) + + def datums_range(self) -> tuple[ + int, int, int, int, int, int + ]: + ''' + Return a range tuple for the datums present in view. + + ''' + l, r = self.view_range() + + # TODO: avoid this and have shm passed + # in earlier. + if self.shm is None: + # haven't initialized the flow yet + return (0, l, 0, 0, r, 0) + + array = self.shm.array + index = array['index'] + start = index[0] + end = index[-1] + lbar = max(l, start) + rbar = min(r, end) + return ( + start, l, lbar, rbar, r, end, + ) + + def read(self) -> tuple[ + int, int, np.ndarray, + int, int, np.ndarray, + ]: + array = self.shm.array + indexes = array['index'] + ifirst = indexes[0] + ilast = indexes[-1] + + ifirst, l, lbar, rbar, r, ilast = self.datums_range() + + # get read-relative indices adjusting + # for master shm index. + lbar_i = max(l, ifirst) - ifirst + rbar_i = min(r, ilast) - ifirst + + # TODO: we could do it this way as well no? + # to_draw = array[lbar - ifirst:(rbar - ifirst) + 1] + in_view = array[lbar_i: rbar_i + 1] + + return ( + # abs indices + full data set + ifirst, ilast, array, + + # relative indices + in view datums + lbar_i, rbar_i, in_view, + ) + + def update_graphics( + self, + use_vr: bool = True, + render: bool = True, + array_key: Optional[str] = None, + + **kwargs, + + ) -> pg.GraphicsObject: + ''' + Read latest datums from shm and render to (incrementally) + render to graphics. + + ''' + # shm read and slice to view + xfirst, xlast, array, ivl, ivr, in_view = self.read() + + if ( + not in_view.size + or not render + ): + return self.graphics + + array_key = array_key or self.name + + graphics = self.graphics + if isinstance(graphics, BarItems): + graphics.update_from_array( + array, + in_view, + view_range=(ivl, ivr) if use_vr else None, + + **kwargs, + ) + + else: + graphics.update_from_array( + x=array['index'], + y=array[array_key], + + x_iv=in_view['index'], + y_iv=in_view[array_key], + view_range=(ivl, ivr) if use_vr else None, + + **kwargs + ) + + return graphics + + # @classmethod + # def from_token( + # cls, + # shm_token: tuple[ + # str, + # str, + # tuple[str, str], + # ], + + # ) -> PathRenderer: + + # shm = attach_shm_array(token) + # return cls(shm) + + +class PathRenderer(msgspec.Struct): + + # output graphics rendering + path: Optional[QPainterPath] = None + + last_read_src_array: np.ndarray + # called on input data but before + prerender_fn: Callable[ShmArray, np.ndarray] + + def diff( + self, + ) -> dict[str, np.ndarray]: + ... + + def update(self) -> QPainterPath: + ''' + Incrementally update the internal path graphics from + updates in shm data and deliver the new (sub)-path + generated. + + ''' + ... + + + def render( + self, + + ) -> list[QPainterPath]: + ''' + Render the current graphics path(s) + + ''' + ...