diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index a3a97164..8aa10091 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -34,9 +34,7 @@ from PyQt5.QtWidgets import ( QVBoxLayout, QSplitter, ) -import msgspec import numpy as np -# from pydantic import BaseModel import pyqtgraph as pg import trio @@ -49,6 +47,7 @@ from ._cursor import ( Cursor, ContentsLabel, ) +from ..data._sharedmem import ShmArray from ._l1 import L1Labels from ._ohlc import BarItems from ._curve import FastAppendCurve @@ -60,15 +59,12 @@ from ._style import ( ) from ..data.feed import Feed from ..data._source import Symbol -from ..data._sharedmem import ( - ShmArray, - # _Token, -) from ..log import get_logger from ._interaction import ChartView from ._forms import FieldsForm from .._profile import pg_profile_enabled, ms_slower_then from ._overlay import PlotItemOverlay +from ._flows import Flow if TYPE_CHECKING: from ._display import DisplayState @@ -419,7 +415,7 @@ class LinkedSplits(QWidget): self, symbol: Symbol, - array: np.ndarray, + shm: ShmArray, sidepane: FieldsForm, style: str = 'bar', @@ -444,7 +440,7 @@ class LinkedSplits(QWidget): self.chart = self.add_plot( name=symbol.key, - array=array, + shm=shm, style=style, _is_main=True, @@ -472,7 +468,7 @@ class LinkedSplits(QWidget): self, name: str, - array: np.ndarray, + shm: ShmArray, array_key: Optional[str] = None, style: str = 'line', @@ -516,7 +512,6 @@ class LinkedSplits(QWidget): name=name, data_key=array_key or name, - array=array, parent=qframe, linkedsplits=self, axisItems=axes, @@ -580,7 +575,7 @@ class LinkedSplits(QWidget): graphics, data_key = cpw.draw_ohlc( name, - array, + shm, array_key=array_key ) self.cursor.contents_labels.add_label( @@ -594,7 +589,7 @@ class LinkedSplits(QWidget): add_label = True graphics, data_key = cpw.draw_curve( name, - array, + shm, array_key=array_key, color='default_light', ) @@ -603,7 +598,7 @@ class LinkedSplits(QWidget): add_label = True graphics, data_key = cpw.draw_curve( name, - array, + shm, array_key=array_key, step_mode=True, color='davies', @@ -691,7 +686,6 @@ class ChartPlotWidget(pg.PlotWidget): # the "data view" we generate graphics from name: str, - array: np.ndarray, data_key: str, linkedsplits: LinkedSplits, @@ -744,14 +738,6 @@ class ChartPlotWidget(pg.PlotWidget): self._max_l1_line_len: float = 0 # self.setViewportMargins(0, 0, 0, 0) - # self._ohlc = array # readonly view of ohlc data - - # TODO: move to Aggr above XD - # readonly view of data arrays - self._arrays = { - self.data_key: array, - } - self._graphics = {} # registry of underlying graphics # registry of overlay curve names self._flows: dict[str, Flow] = {} @@ -767,7 +753,6 @@ class ChartPlotWidget(pg.PlotWidget): # show background grid self.showGrid(x=False, y=True, alpha=0.3) - self.default_view() self.cv.enable_auto_yrange() self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem) @@ -816,14 +801,8 @@ class ChartPlotWidget(pg.PlotWidget): Return a range tuple for the bars present in view. ''' - l, r = self.view_range() - array = self._arrays[self.name] - start, stop = self._xrange = ( - array[0]['index'], - array[-1]['index'], - ) - lbar = max(l, start) - rbar = min(r, stop) + main_flow = self._flows[self.name] + ifirst, l, lbar, rbar, r, ilast = main_flow.datums_range() return l, lbar, rbar, r def curve_width_pxs( @@ -877,40 +856,51 @@ class ChartPlotWidget(pg.PlotWidget): def default_view( self, - steps_on_screen: Optional[int] = None + bars_from_y: int = 5000, ) -> None: ''' Set the view box to the "default" startup view of the scene. ''' - try: - index = self._arrays[self.name]['index'] - except IndexError: - log.warning(f'array for {self.name} not loaded yet?') + flow = self._flows.get(self.name) + if not flow: + log.warning(f'`Flow` for {self.name} not loaded yet?') return + index = flow.shm.array['index'] xfirst, xlast = index[0], index[-1] l, lbar, rbar, r = self.bars_range() - - marker_pos, l1_len = self.pre_l1_xs() - end = xlast + l1_len + 1 + view = self.view if ( rbar < 0 or l < xfirst + or l < 0 or (rbar - lbar) < 6 ): - # set fixed bars count on screen that approx includes as + # TODO: set fixed bars count on screen that approx includes as # many bars as possible before a downsample line is shown. - begin = xlast - round(6116 / 6) + begin = xlast - bars_from_y + view.setXRange( + min=begin, + max=xlast, + padding=0, + ) + # re-get range + l, lbar, rbar, r = self.bars_range() - else: - begin = end - (r - l) + # we get the L1 spread label "length" in view coords + # terms now that we've scaled either by user control + # or to the default set of bars as per the immediate block + # above. + marker_pos, l1_len = self.pre_l1_xs() + end = xlast + l1_len + 1 + begin = end - (r - l) # for debugging # print( - # f'bars range: {brange}\n' + # # f'bars range: {brange}\n' # f'xlast: {xlast}\n' # f'marker pos: {marker_pos}\n' # f'l1 len: {l1_len}\n' @@ -922,14 +912,13 @@ class ChartPlotWidget(pg.PlotWidget): if self._static_yrange == 'axis': self._static_yrange = None - view = self.view view.setXRange( min=begin, max=end, padding=0, ) - view._set_yrange() self.view.maybe_downsample_graphics() + view._set_yrange() try: self.linked.graphics_cycle() except IndexError: @@ -960,7 +949,7 @@ class ChartPlotWidget(pg.PlotWidget): def draw_ohlc( self, name: str, - data: np.ndarray, + shm: ShmArray, array_key: Optional[str] = None, @@ -980,19 +969,21 @@ class ChartPlotWidget(pg.PlotWidget): # the np array buffer to be drawn on next render cycle self.plotItem.addItem(graphics) - # draw after to allow self.scene() to work... - graphics.draw_from_data(data) - data_key = array_key or name - self._graphics[data_key] = graphics self._flows[data_key] = Flow( name=name, plot=self.plotItem, + _shm=shm, is_ohlc=True, graphics=graphics, ) + # TODO: i think we can eventually remove this if + # we write the ``Flow.update_graphics()`` method right? + # draw after to allow self.scene() to work... + graphics.draw_from_data(shm.array) + self._add_sticky(name, bg_color='davies') return graphics, data_key @@ -1058,7 +1049,7 @@ class ChartPlotWidget(pg.PlotWidget): self, name: str, - data: np.ndarray, + shm: ShmArray, array_key: Optional[str] = None, overlay: bool = False, @@ -1071,7 +1062,7 @@ class ChartPlotWidget(pg.PlotWidget): ) -> (pg.PlotDataItem, str): ''' Draw a "curve" (line plot graphics) for the provided data in - the input array ``data``. + the input shm array ``shm``. ''' color = color or self.pen_color or 'default_light' @@ -1082,6 +1073,7 @@ class ChartPlotWidget(pg.PlotWidget): data_key = array_key or name # yah, we wrote our own B) + data = shm.array curve = FastAppendCurve( y=data[data_key], x=data['index'], @@ -1105,16 +1097,14 @@ class ChartPlotWidget(pg.PlotWidget): # and is disastrous for performance. # curve.setCacheMode(QtWidgets.QGraphicsItem.ItemCoordinateCache) - # register curve graphics and backing array for name - self._graphics[name] = curve - self._arrays[data_key] = data - pi = pi or self.plotItem self._flows[data_key] = Flow( name=name, plot=pi, + _shm=shm, is_ohlc=False, + # register curve graphics with this flow graphics=curve, ) @@ -1175,16 +1165,11 @@ class ChartPlotWidget(pg.PlotWidget): ) return last - def update_graphics_from_array( + def update_graphics_from_flow( self, graphics_name: str, - - array: Optional[np.ndarray] = None, array_key: Optional[str] = None, - use_vr: bool = True, - render: bool = True, - **kwargs, ) -> pg.GraphicsObject: @@ -1192,63 +1177,11 @@ class ChartPlotWidget(pg.PlotWidget): Update the named internal graphics from ``array``. ''' - if array is not None: - assert len(array) - - data_key = array_key or graphics_name - if graphics_name not in self._flows: - data_key = self.name - - if array is not None: - # write array to internal graphics table - self._arrays[data_key] = array - else: - array = self._arrays[data_key] - - # array key and graphics "name" might be different.. - graphics = self._graphics[graphics_name] - - # compute "in-view" indices - l, lbar, rbar, r = self.bars_range() - indexes = array['index'] - ifirst = indexes[0] - ilast = indexes[-1] - - lbar_i = max(l, ifirst) - ifirst - rbar_i = min(r, ilast) - ifirst - - # TODO: we could do it this way as well no? - # to_draw = array[lbar - ifirst:(rbar - ifirst) + 1] - in_view = array[lbar_i: rbar_i + 1] - - if ( - not in_view.size - or not render - ): - return graphics - - if isinstance(graphics, BarItems): - graphics.update_from_array( - array, - in_view, - view_range=(lbar_i, rbar_i) if use_vr else None, - - **kwargs, - ) - - else: - graphics.update_from_array( - x=array['index'], - y=array[data_key], - - x_iv=in_view['index'], - y_iv=in_view[data_key], - view_range=(lbar_i, rbar_i) if use_vr else None, - - **kwargs - ) - - return graphics + flow = self._flows[array_key or graphics_name] + return flow.update_graphics( + array_key=array_key, + **kwargs, + ) # def _label_h(self, yhigh: float, ylow: float) -> float: # # compute contents label "height" in view terms @@ -1295,7 +1228,7 @@ class ChartPlotWidget(pg.PlotWidget): # TODO: this should go onto some sort of # data-view thinger..right? - ohlc = self._shm.array + ohlc = self._flows[self.name].shm.array # XXX: not sure why the time is so off here # looks like we're gonna have to do some fixing.. @@ -1341,9 +1274,6 @@ class ChartPlotWidget(pg.PlotWidget): delayed=True, ) - l, lbar, rbar, r = bars_range or self.bars_range() - profiler(f'{self.name} got bars range') - # TODO: here we should instead look up the ``Flow.shm.array`` # and read directly from shm to avoid copying to memory first # and then reading it again here. @@ -1356,6 +1286,9 @@ class ChartPlotWidget(pg.PlotWidget): res = 0, 0 else: + first, l, lbar, rbar, r, last = bars_range or flow.datums_range() + profiler(f'{self.name} got bars range') + key = round(lbar), round(rbar) res = flow.maxmin(*key) profiler(f'yrange mxmn: {key} -> {res}') @@ -1366,99 +1299,3 @@ class ChartPlotWidget(pg.PlotWidget): res = 0, 0 return res - - -# class FlowsTable(pydantic.BaseModel): -# ''' -# Data-AGGRegate: high level API onto multiple (categorized) -# ``Flow``s with high level processing routines for -# multi-graphics computations and display. - -# ''' -# flows: dict[str, np.ndarray] = {} - - -class Flow(msgspec.Struct): # , frozen=True): - ''' - (FinancialSignal-)Flow compound type which wraps a real-time - graphics (curve) and its backing data stream together for high level - access and control. - - The intention is for this type to eventually be capable of shm-passing - of incrementally updated graphics stream data between actors. - - ''' - name: str - plot: pg.PlotItem - is_ohlc: bool = False - graphics: pg.GraphicsObject - - # TODO: hackery to be able to set a shm later - # but whilst also allowing this type to hashable, - # likely will require serializable token that is used to attach - # to the underlying shm ref after startup? - _shm: Optional[ShmArray] = None # currently, may be filled in "later" - - # cache of y-range values per x-range input. - _mxmns: dict[tuple[int, int], tuple[float, float]] = {} - - @property - def shm(self) -> ShmArray: - return self._shm - - @shm.setter - def shm(self, shm: ShmArray) -> ShmArray: - self._shm = shm - - def maxmin( - self, - lbar, - rbar, - - ) -> tuple[float, float]: - ''' - Compute the cached max and min y-range values for a given - x-range determined by ``lbar`` and ``rbar``. - - ''' - rkey = (lbar, rbar) - cached_result = self._mxmns.get(rkey) - if cached_result: - return cached_result - - shm = self.shm - if shm is None: - mxmn = None - - else: # new block for profiling?.. - arr = shm.array - - # build relative indexes into shm array - # TODO: should we just add/use a method - # on the shm to do this? - ifirst = arr[0]['index'] - slice_view = arr[ - lbar - ifirst: - (rbar - ifirst) + 1 - ] - - if not slice_view.size: - mxmn = None - - else: - if self.is_ohlc: - ylow = np.min(slice_view['low']) - yhigh = np.max(slice_view['high']) - - else: - view = slice_view[self.name] - ylow = np.min(view) - yhigh = np.max(view) - - mxmn = ylow, yhigh - - if mxmn is not None: - # cache new mxmn result - self._mxmns[rkey] = mxmn - - return mxmn diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py new file mode 100644 index 00000000..a9eb6a4f --- /dev/null +++ b/piker/ui/_flows.py @@ -0,0 +1,303 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +High level streaming graphics primitives. + +This is an intermediate layer which associates real-time low latency +graphics primitives with underlying FSP related data structures for fast +incremental update. + +''' +from typing import ( + Optional, + Callable, +) + +import msgspec +import numpy as np +import pyqtgraph as pg +from PyQt5.QtGui import QPainterPath + +from ..data._sharedmem import ( + ShmArray, + # attach_shm_array +) +from ._ohlc import BarItems + + +# class FlowsTable(msgspec.Struct): +# ''' +# Data-AGGRegate: high level API onto multiple (categorized) +# ``Flow``s with high level processing routines for +# multi-graphics computations and display. + +# ''' +# flows: dict[str, np.ndarray] = {} + + +class Flow(msgspec.Struct): # , frozen=True): + ''' + (FinancialSignal-)Flow compound type which wraps a real-time + graphics (curve) and its backing data stream together for high level + access and control. + + The intention is for this type to eventually be capable of shm-passing + of incrementally updated graphics stream data between actors. + + ''' + name: str + plot: pg.PlotItem + is_ohlc: bool = False + render: bool = True # toggle for display loop + + graphics: pg.GraphicsObject + + # TODO: hackery to be able to set a shm later + # but whilst also allowing this type to hashable, + # likely will require serializable token that is used to attach + # to the underlying shm ref after startup? + _shm: Optional[ShmArray] = None # currently, may be filled in "later" + + # last read from shm (usually due to an update call) + _last_read: Optional[np.ndarray] = None + + # cache of y-range values per x-range input. + _mxmns: dict[tuple[int, int], tuple[float, float]] = {} + + @property + def shm(self) -> ShmArray: + return self._shm + + # TODO: remove this and only allow setting through + # private ``._shm`` attr? + @shm.setter + def shm(self, shm: ShmArray) -> ShmArray: + print(f'{self.name} DO NOT SET SHM THIS WAY!?') + self._shm = shm + + def maxmin( + self, + lbar, + rbar, + + ) -> tuple[float, float]: + ''' + Compute the cached max and min y-range values for a given + x-range determined by ``lbar`` and ``rbar``. + + ''' + rkey = (lbar, rbar) + cached_result = self._mxmns.get(rkey) + if cached_result: + return cached_result + + shm = self.shm + if shm is None: + mxmn = None + + else: # new block for profiling?.. + arr = shm.array + + # build relative indexes into shm array + # TODO: should we just add/use a method + # on the shm to do this? + ifirst = arr[0]['index'] + slice_view = arr[ + lbar - ifirst: + (rbar - ifirst) + 1 + ] + + if not slice_view.size: + mxmn = None + + else: + if self.is_ohlc: + ylow = np.min(slice_view['low']) + yhigh = np.max(slice_view['high']) + + else: + view = slice_view[self.name] + ylow = np.min(view) + yhigh = np.max(view) + + mxmn = ylow, yhigh + + if mxmn is not None: + # cache new mxmn result + self._mxmns[rkey] = mxmn + + return mxmn + + def view_range(self) -> tuple[int, int]: + ''' + Return the indexes in view for the associated + plot displaying this flow's data. + + ''' + vr = self.plot.viewRect() + return int(vr.left()), int(vr.right()) + + def datums_range(self) -> tuple[ + int, int, int, int, int, int + ]: + ''' + Return a range tuple for the datums present in view. + + ''' + l, r = self.view_range() + + # TODO: avoid this and have shm passed + # in earlier. + if self.shm is None: + # haven't initialized the flow yet + return (0, l, 0, 0, r, 0) + + array = self.shm.array + index = array['index'] + start = index[0] + end = index[-1] + lbar = max(l, start) + rbar = min(r, end) + return ( + start, l, lbar, rbar, r, end, + ) + + def read(self) -> tuple[ + int, int, np.ndarray, + int, int, np.ndarray, + ]: + array = self.shm.array + indexes = array['index'] + ifirst = indexes[0] + ilast = indexes[-1] + + ifirst, l, lbar, rbar, r, ilast = self.datums_range() + + # get read-relative indices adjusting + # for master shm index. + lbar_i = max(l, ifirst) - ifirst + rbar_i = min(r, ilast) - ifirst + + # TODO: we could do it this way as well no? + # to_draw = array[lbar - ifirst:(rbar - ifirst) + 1] + in_view = array[lbar_i: rbar_i + 1] + + return ( + # abs indices + full data set + ifirst, ilast, array, + + # relative indices + in view datums + lbar_i, rbar_i, in_view, + ) + + def update_graphics( + self, + use_vr: bool = True, + render: bool = True, + array_key: Optional[str] = None, + + **kwargs, + + ) -> pg.GraphicsObject: + ''' + Read latest datums from shm and render to (incrementally) + render to graphics. + + ''' + # shm read and slice to view + xfirst, xlast, array, ivl, ivr, in_view = self.read() + + if ( + not in_view.size + or not render + ): + return self.graphics + + array_key = array_key or self.name + + graphics = self.graphics + if isinstance(graphics, BarItems): + graphics.update_from_array( + array, + in_view, + view_range=(ivl, ivr) if use_vr else None, + + **kwargs, + ) + + else: + graphics.update_from_array( + x=array['index'], + y=array[array_key], + + x_iv=in_view['index'], + y_iv=in_view[array_key], + view_range=(ivl, ivr) if use_vr else None, + + **kwargs + ) + + return graphics + + # @classmethod + # def from_token( + # cls, + # shm_token: tuple[ + # str, + # str, + # tuple[str, str], + # ], + + # ) -> PathRenderer: + + # shm = attach_shm_array(token) + # return cls(shm) + + +class PathRenderer(msgspec.Struct): + + # output graphics rendering + path: Optional[QPainterPath] = None + + last_read_src_array: np.ndarray + # called on input data but before + prerender_fn: Callable[ShmArray, np.ndarray] + + def diff( + self, + ) -> dict[str, np.ndarray]: + ... + + def update(self) -> QPainterPath: + ''' + Incrementally update the internal path graphics from + updates in shm data and deliver the new (sub)-path + generated. + + ''' + ... + + + def render( + self, + + ) -> list[QPainterPath]: + ''' + Render the current graphics path(s) + + ''' + ...