From 14037cd1dcf2382e1024c38fd8d8bc480cf03161 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 20 Apr 2022 11:43:47 -0400 Subject: [PATCH] More WIP, implement `BarItems` rendering in `Flow.update_graphics()` --- piker/ui/_flows.py | 400 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 312 insertions(+), 88 deletions(-) diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py index 772aa026..8d5e7e77 100644 --- a/piker/ui/_flows.py +++ b/piker/ui/_flows.py @@ -23,6 +23,8 @@ incremental update. ''' from __future__ import annotations +from functools import partial +import time from typing import ( Optional, Callable, @@ -30,6 +32,7 @@ from typing import ( import msgspec import numpy as np +from numpy.lib import recfunctions as rfn import pyqtgraph as pg from PyQt5.QtGui import QPainterPath @@ -37,6 +40,7 @@ from ..data._sharedmem import ( ShmArray, # attach_shm_array ) +from .._profile import pg_profile_enabled, ms_slower_then from ._ohlc import ( BarItems, gen_qpath, @@ -46,7 +50,12 @@ from ._curve import ( ) from ._compression import ( ohlc_flatten, + ds_m4, ) +from ..log import get_logger + + +log = get_logger(__name__) # class FlowsTable(msgspec.Struct): # ''' @@ -72,11 +81,63 @@ from ._compression import ( # return cls(shm) +def rowarr_to_path( + rows_array: np.ndarray, + x_basis: np.ndarray, + flow: Flow, + +) -> QPainterPath: + + # TODO: we could in theory use ``numba`` to flatten + # if needed? + + # to 1d + y = rows_array.flatten() + + return pg.functions.arrayToQPath( + # these get passed at render call time + x=x_basis[:y.size], + y=y, + connect='all', + finiteCheck=False, + path=flow.path, + ) + + +def ohlc_flat_view( + ohlc_shm: ShmArray, + + # XXX: we bind this in currently.. + x_basis: np.ndarray, + + # vr: Optional[slice] = None, + +) -> np.ndarray: + ''' + Return flattened-non-copy view into an OHLC shm array. + + ''' + ohlc = ohlc_shm._array[['open', 'high', 'low', 'close']] + # if vr: + # ohlc = ohlc[vr] + # x = x_basis[vr] + + unstructured = rfn.structured_to_unstructured( + ohlc, + copy=False, + ) + # breakpoint() + y = unstructured.flatten() + x = x_basis[:y.size] + return x, y + + class Flow(msgspec.Struct): # , frozen=True): ''' - (FinancialSignal-)Flow compound type which wraps a real-time - graphics (curve) and its backing data stream together for high level - access and control. + (Financial Signal-)Flow compound type which wraps a real-time + shm array stream with displayed graphics (curves, charts) + for high level access and control as well as efficient incremental + update. The intention is for this type to eventually be capable of shm-passing of incrementally updated graphics stream data between actors. @@ -89,6 +150,8 @@ class Flow(msgspec.Struct): # , frozen=True): is_ohlc: bool = False render: bool = True # toggle for display loop + flat: Optional[ShmArray] = None + x_basis: Optional[np.ndarray] = None _last_uppx: float = 0 _in_ds: bool = False @@ -96,6 +159,7 @@ class Flow(msgspec.Struct): # , frozen=True): _graphics_tranform_fn: Optional[Callable[ShmArray, np.ndarray]] = None # map from uppx -> (downsampled data, incremental graphics) + _src_r: Optional[Renderer] = None _render_table: dict[ Optional[int], tuple[Renderer, pg.GraphicsItem], @@ -215,7 +279,9 @@ class Flow(msgspec.Struct): # , frozen=True): int, int, np.ndarray, int, int, np.ndarray, ]: + # read call array = self.shm.array + indexes = array['index'] ifirst = indexes[0] ilast = indexes[-1] @@ -245,6 +311,8 @@ class Flow(msgspec.Struct): # , frozen=True): render: bool = True, array_key: Optional[str] = None, + profiler=None, + **kwargs, ) -> pg.GraphicsObject: @@ -253,8 +321,19 @@ class Flow(msgspec.Struct): # , frozen=True): render to graphics. ''' + + profiler = profiler or pg.debug.Profiler( + msg=f'Flow.update_graphics() for {self.name}', + disabled=not pg_profile_enabled(), + gt=ms_slower_then, + delayed=True, + ) # shm read and slice to view - read = xfirst, xlast, array, ivl, ivr, in_view = self.read() + read = ( + xfirst, xlast, array, + ivl, ivr, in_view, + ) = self.read() + profiler('read src shm data') if ( not in_view.size @@ -265,100 +344,182 @@ class Flow(msgspec.Struct): # , frozen=True): graphics = self.graphics if isinstance(graphics, BarItems): - # ugh, not luvin dis, should we have just a designated - # instance var? - r = self._render_table.get('src') + # if no source data renderer exists create one. + r = self._src_r if not r: - r = Renderer( + # OHLC bars path renderer + r = self._src_r = Renderer( flow=self, - draw=gen_qpath, # TODO: rename this to something with ohlc + # TODO: rename this to something with ohlc + draw_path=gen_qpath, last_read=read, ) - self._render_table['src'] = (r, graphics) + + # create a flattened view onto the OHLC array + # which can be read as a line-style format + # shm = self.shm + # self.flat = shm.unstruct_view(['open', 'high', 'low', 'close']) + # import pdbpp + # pdbpp.set_trace() + # x = self.x_basis = ( + # np.broadcast_to( + # shm._array['index'][:, None], + # # self.flat._array.shape, + # self.flat.shape, + # ) + np.array([-0.5, 0, 0, 0.5]) + # ) ds_curve_r = Renderer( flow=self, - draw=gen_qpath, # TODO: rename this to something with ohlc + + # just swap in the flat view + data_t=lambda array: self.flat.array, + # data_t=partial( + # ohlc_flat_view, + # self.shm, + # ), last_read=read, - prerender_fn=ohlc_flatten, + draw_path=partial( + rowarr_to_path, + x_basis=None, + ), + ) + curve = FastAppendCurve( + # y=y, + # x=x, + name='OHLC', + color=graphics._color, + ) + curve.hide() + self.plot.addItem(curve) # baseline "line" downsampled OHLC curve that should # kick on only when we reach a certain uppx threshold. self._render_table[0] = ( ds_curve_r, - FastAppendCurve( - y=y, - x=x, - name='OHLC', - color=self._color, - ), + curve, ) + dsc_r, curve = self._render_table[0] + # do checks for whether or not we require downsampling: # - if we're **not** downsampling then we simply want to # render the bars graphics curve and update.. # - if insteam we are in a downsamplig state then we to + x_gt = 8 + uppx = curve.x_uppx() + in_line = should_line = curve.isVisible() + if ( + should_line + and uppx < x_gt + ): + should_line = False + + elif ( + not should_line + and uppx >= x_gt + ): + should_line = True + + profiler(f'ds logic complete line={should_line}') + + # do graphics updates + if should_line: + # start = time.time() + # y = self.shm.unstruct_view( + # ['open', 'high', 'low', 'close'], + # ) + # print(f'unstruct diff: {time.time() - start}') + # profiler('read unstr view bars to line') + # # start = self.flat._first.value + + # x = self.x_basis[:y.size].flatten() + # y = y.flatten() + # profiler('flattening bars to line') + # path, last = dsc_r.render(read) + # x, flat = ohlc_flat_view( + # ohlc_shm=self.shm, + # x_basis=x_basis, + # ) + # y = y.flatten() + # y_iv = y[ivl:ivr].flatten() + # x_iv = x[ivl:ivr].flatten() + # assert y.size == x.size + + x, y = self.flat = ohlc_flatten(array) + x_iv, y_iv = ohlc_flatten(in_view) + profiler('flattened OHLC data') + + curve.update_from_array( + x, + y, + x_iv=x_iv, + y_iv=y_iv, + view_range=None, # hack + profiler=profiler, + ) + profiler('updated ds curve') + + else: + # render incremental or in-view update + # and apply ouput (path) to graphics. + path, last = r.render( + read, + only_in_view=True, + ) + + graphics.path = path + graphics.draw_last(last) + + # NOTE: on appends we used to have to flip the coords + # cache thought it doesn't seem to be required any more? + # graphics.setCacheMode(QtWidgets.QGraphicsItem.NoCache) + # graphics.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) + + graphics.prepareGeometryChange() + graphics.update() + + if ( + not in_line + and should_line + ): + # change to line graphic + + log.info( + f'downsampling to line graphic {self.name}' + ) + graphics.hide() + # graphics.update() + curve.show() + curve.update() + + elif in_line and not should_line: + log.info(f'showing bars graphic {self.name}') + curve.hide() + graphics.show() + graphics.update() + # update our pre-downsample-ready data and then pass that # new data the downsampler algo for incremental update. - else: - pass - # do incremental update - graphics.update_from_array( - array, - in_view, - view_range=(ivl, ivr) if use_vr else None, + # graphics.update_from_array( + # array, + # in_view, + # view_range=(ivl, ivr) if use_vr else None, - **kwargs, - ) + # **kwargs, + # ) - # generate and apply path to graphics obj - graphics.path, last = r.render(only_in_view=True) - graphics.draw_last(last) + # generate and apply path to graphics obj + # graphics.path, last = r.render( + # read, + # only_in_view=True, + # ) + # graphics.draw_last(last) else: - # should_ds = False - # should_redraw = False - - # # downsampling incremental state checking - # uppx = bars.x_uppx() - # px_width = bars.px_width() - # uppx_diff = (uppx - self._last_uppx) - - # if self.renderer is None: - # self.renderer = Renderer( - # flow=self, - - # if not self._in_ds: - # # in not currently marked as downsampling graphics - # # then only draw the full bars graphic for datums "in - # # view". - - # # check for downsampling conditions - # if ( - # # std m4 downsample conditions - # px_width - # and uppx_diff >= 4 - # or uppx_diff <= -3 - # or self._step_mode and abs(uppx_diff) >= 4 - - # ): - # log.info( - # f'{self._name} sampler change: {self._last_uppx} -> {uppx}' - # ) - # self._last_uppx = uppx - # should_ds = True - - # elif ( - # uppx <= 2 - # and self._in_ds - # ): - # # we should de-downsample back to our original - # # source data so we clear our path data in prep - # # to generate a new one from original source data. - # should_redraw = True - # should_ds = False + # ``FastAppendCurve`` case: array_key = array_key or self.name @@ -376,23 +537,64 @@ class Flow(msgspec.Struct): # , frozen=True): return graphics +def xy_downsample( + x, + y, + px_width, + uppx, + + x_spacer: float = 0.5, + +) -> tuple[np.ndarray, np.ndarray]: + + # downsample whenever more then 1 pixels per datum can be shown. + # always refresh data bounds until we get diffing + # working properly, see above.. + bins, x, y = ds_m4( + x, + y, + px_width=px_width, + uppx=uppx, + log_scale=bool(uppx) + ) + + # flatten output to 1d arrays suitable for path-graphics generation. + x = np.broadcast_to(x[:, None], y.shape) + x = (x + np.array( + [-x_spacer, 0, 0, x_spacer] + )).flatten() + y = y.flatten() + + return x, y + + class Renderer(msgspec.Struct): flow: Flow # called to render path graphics - draw: Callable[np.ndarray, QPainterPath] + draw_path: Callable[np.ndarray, QPainterPath] - # called on input data but before - prerender_fn: Optional[Callable[ShmArray, np.ndarray]] = None + # called on input data but before any graphics format + # conversions or processing. + data_t: Optional[Callable[ShmArray, np.ndarray]] = None + data_t_shm: Optional[ShmArray] = None + # called on the final data (transform) output to convert + # to "graphical data form" a format that can be passed to + # the ``.draw()`` implementation. + graphics_t: Optional[Callable[ShmArray, np.ndarray]] = None + graphics_t_shm: Optional[ShmArray] = None + + # path graphics update implementation methods prepend_fn: Optional[Callable[QPainterPath, QPainterPath]] = None append_fn: Optional[Callable[QPainterPath, QPainterPath]] = None # last array view read last_read: Optional[np.ndarray] = None - # output graphics rendering + # output graphics rendering, the main object + # processed in ``QGraphicsObject.paint()`` path: Optional[QPainterPath] = None # def diff( @@ -411,8 +613,10 @@ class Renderer(msgspec.Struct): def render( self, + new_read, + # only render datums "in view" of the ``ChartView`` - only_in_view: bool = True, + only_in_view: bool = False, ) -> list[QPainterPath]: ''' @@ -428,28 +632,42 @@ class Renderer(msgspec.Struct): ''' # do full source data render to path - xfirst, xlast, array, ivl, ivr, in_view = self.last_read + last_read = ( + xfirst, xlast, array, + ivl, ivr, in_view, + ) = self.last_read if only_in_view: - # get latest data from flow shm - self.last_read = ( - xfirst, xlast, array, ivl, ivr, in_view - ) = self.flow.read() - array = in_view + # # get latest data from flow shm + # self.last_read = ( + # xfirst, xlast, array, ivl, ivr, in_view + # ) = new_read - if self.path is None or in_view: + if self.path is None or only_in_view: # redraw the entire source data if we have either of: # - no prior path graphic rendered or, # - we always intend to re-render the data only in view - if self.prerender_fn: - array = self.prerender_fn(array) + # data transform: convert source data to a format + # expected to be incrementally updates and later rendered + # to a more graphics native format. + if self.data_t: + array = self.data_t(array) - hist, last = array[:-1], array[-1] + # maybe allocate shm for data transform output + # if self.data_t_shm is None: + # fshm = self.flow.shm - # call path render func on history - self.path = self.draw(hist) + # shm, opened = maybe_open_shm_array( + # f'{self.flow.name}_data_t', + # # TODO: create entry for each time frame + # dtype=array.dtype, + # readonly=False, + # ) + # assert opened + # shm.push(array) + # self.data_t_shm = shm elif self.path: print(f'inremental update not supported yet {self.flow.name}') @@ -459,4 +677,10 @@ class Renderer(msgspec.Struct): # do path generation for each segment # and then push into graphics object. + hist, last = array[:-1], array[-1] + + # call path render func on history + self.path = self.draw_path(hist) + + self.last_read = new_read return self.path, last