From 066b8df619dc820ab8079e135bf429246abc2a5f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 25 May 2022 19:45:09 -0400 Subject: [PATCH] Implement OHLC downsampled curve via renderer, drop old bypass code --- piker/ui/_flows.py | 440 +++++++++++++++---------------------------- piker/ui/_pathops.py | 5 +- 2 files changed, 157 insertions(+), 288 deletions(-) diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py index 7714dd67..e74ee123 100644 --- a/piker/ui/_flows.py +++ b/piker/ui/_flows.py @@ -23,7 +23,7 @@ incremental update. ''' from __future__ import annotations -# from functools import partial +from functools import partial from typing import ( Optional, Callable, @@ -45,7 +45,6 @@ from PyQt5.QtCore import ( from ..data._sharedmem import ( ShmArray, - # open_shm_array, ) from .._profile import ( pg_profile_enabled, @@ -68,6 +67,7 @@ from ..log import get_logger log = get_logger(__name__) + # class FlowsTable(msgspec.Struct): # ''' # Data-AGGRegate: high level API onto multiple (categorized) @@ -77,44 +77,56 @@ log = get_logger(__name__) # ''' # flows: dict[str, np.ndarray] = {} -# @classmethod -# def from_token( -# cls, -# shm_token: tuple[ -# str, -# str, -# tuple[str, str], -# ], -# ) -> Renderer: +def update_ohlc_to_line( + src_shm: ShmArray, + array_key: str, + src_update: np.ndarray, + slc: slice, + ln: int, + first: int, + last: int, + is_append: bool, -# shm = attach_shm_array(token) -# return cls(shm) +) -> np.ndarray: - -def rowarr_to_path( - rows_array: np.ndarray, - x_basis: np.ndarray, - flow: Flow, - -) -> QPainterPath: - - # TODO: we could in theory use ``numba`` to flatten - # if needed? - - # to 1d - y = rows_array.flatten() - - return pg.functions.arrayToQPath( - # these get passed at render call time - x=x_basis[:y.size], - y=y, - connect='all', - finiteCheck=False, - path=flow.path, + fields = ['open', 'high', 'low', 'close'] + return ( + rfn.structured_to_unstructured(src_update[fields]), + slc, ) +def ohlc_flat_to_xy( + r: Renderer, + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + +) -> tuple[ + np.ndarray, + np.nd.array, + str, +]: + # TODO: in the case of an existing ``.update_xy()`` + # should we be passing in array as an xy arrays tuple? + + # 2 more datum-indexes to capture zero at end + x_flat = r.x_data[r._xy_first:r._xy_last] + y_flat = r.y_data[r._xy_first:r._xy_last] + + # slice to view + ivl, ivr = vr + x_iv_flat = x_flat[ivl:ivr] + y_iv_flat = y_flat[ivl:ivr] + + # reshape to 1d for graphics rendering + y_iv = y_iv_flat.reshape(-1) + x_iv = x_iv_flat.reshape(-1) + + return x_iv, y_iv, 'all' + + def render_baritems( flow: Flow, graphics: BarItems, @@ -137,10 +149,7 @@ def render_baritems( layer, if not a `Renderer` then something just above it? ''' - ( - xfirst, xlast, array, - ivl, ivr, in_view, - ) = read + bars = graphics # if no source data renderer exists create one. self = flow @@ -156,35 +165,28 @@ def render_baritems( last_read=read, ) - # ds_curve_r = Renderer( - # flow=self, + ds_curve_r = Renderer( + flow=self, + last_read=read, - # # just swap in the flat view - # # data_t=lambda array: self.gy.array, - # last_read=read, - # draw_path=partial( - # rowarr_to_path, - # x_basis=None, - # ), + # incr update routines + allocate_xy=ohlc_to_line, + update_xy=update_ohlc_to_line, + format_xy=ohlc_flat_to_xy, + ) - # ) curve = FastAppendCurve( - name='OHLC', - color=graphics._color, + name=f'{flow.name}_ds_ohlc', + color=bars._color, ) curve.hide() self.plot.addItem(curve) # baseline "line" downsampled OHLC curve that should # kick on only when we reach a certain uppx threshold. - self._render_table[0] = curve - # ( - # # ds_curve_r, - # curve, - # ) + self._render_table = (ds_curve_r, curve) - curve = self._render_table[0] - # dsc_r, curve = self._render_table[0] + ds_r, curve = self._render_table # do checks for whether or not we require downsampling: # - if we're **not** downsampling then we simply want to @@ -197,183 +199,74 @@ def render_baritems( should_line and uppx < x_gt ): - print('FLIPPING TO BARS') + # print('FLIPPING TO BARS') should_line = False elif ( not should_line and uppx >= x_gt ): - print('FLIPPING TO LINE') + # print('FLIPPING TO LINE') should_line = True profiler(f'ds logic complete line={should_line}') # do graphics updates if should_line: - - fields = ['open', 'high', 'low', 'close'] - - if self.gy is None: - # create a flattened view onto the OHLC array - # which can be read as a line-style format - shm = self.shm - ( - self._iflat_first, - self._iflat_last, - self.gx, - self.gy, - ) = ohlc_to_line( - shm, - fields=fields, - ) - - # print(f'unstruct diff: {time.time() - start}') - - gy = self.gy - - # update flatted ohlc copy - ( - iflat_first, - iflat, - ishm_last, - ishm_first, - ) = ( - self._iflat_first, - self._iflat_last, - self.shm._last.value, - self.shm._first.value - ) - - # check for shm prepend updates since last read. - if iflat_first != ishm_first: - - # write newly prepended data to flattened copy - gy[ - ishm_first:iflat_first - ] = rfn.structured_to_unstructured( - self.shm._array[fields][ishm_first:iflat_first] - ) - self._iflat_first = ishm_first - - to_update = rfn.structured_to_unstructured( - self.shm._array[iflat:ishm_last][fields] - ) - - gy[iflat:ishm_last][:] = to_update - profiler('updated ustruct OHLC data') - - # slice out up-to-last step contents - y_flat = gy[ishm_first:ishm_last] - x_flat = self.gx[ishm_first:ishm_last] - - # update local last-index tracking - self._iflat_last = ishm_last - - # reshape to 1d for graphics rendering - y = y_flat.reshape(-1) - x = x_flat.reshape(-1) - profiler('flattened ustruct OHLC data') - - # do all the same for only in-view data - y_iv_flat = y_flat[ivl:ivr] - x_iv_flat = x_flat[ivl:ivr] - y_iv = y_iv_flat.reshape(-1) - x_iv = x_iv_flat.reshape(-1) - profiler('flattened ustruct in-view OHLC data') - - # pass into curve graphics processing - # curve.update_from_array( - # x, - # y, - # x_iv=x_iv, - # y_iv=y_iv, - # view_range=(ivl, ivr), # hack - # profiler=profiler, - # # should_redraw=False, - - # # NOTE: already passed through by display loop? - # # do_append=uppx < 16, - # **kwargs, - # ) - curve.draw_last(x, y) - curve.show() + r = ds_r + graphics = curve profiler('updated ds curve') else: - # render incremental or in-view update - # and apply ouput (path) to graphics. - path, data = r.render( - read, - 'ohlc', - profiler=profiler, - # uppx=1, - use_vr=True, - # graphics=graphics, - # should_redraw=True, # always - ) - assert path + graphics = bars - graphics.path = path - graphics.draw_last(data[-1]) - if show_bars: - graphics.show() - - # NOTE: on appends we used to have to flip the coords - # cache thought it doesn't seem to be required any more? - # graphics.setCacheMode(QtWidgets.QGraphicsItem.NoCache) - # graphics.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) - - # graphics.prepareGeometryChange() - graphics.update() + if show_bars: + bars.show() + changed_to_line = False if ( not in_line and should_line ): # change to line graphic - log.info( f'downsampling to line graphic {self.name}' ) - graphics.hide() - # graphics.update() + bars.hide() curve.show() curve.update() + changed_to_line = True elif in_line and not should_line: + # change to bars graphic log.info(f'showing bars graphic {self.name}') curve.hide() - graphics.show() - graphics.update() + bars.show() + bars.update() - # update our pre-downsample-ready data and then pass that - # new data the downsampler algo for incremental update. - - # graphics.update_from_array( - # array, - # in_view, - # view_range=(ivl, ivr) if use_vr else None, - - # **kwargs, - # ) - - # generate and apply path to graphics obj - # graphics.path, last = r.render( - # read, - # only_in_view=True, - # ) - # graphics.draw_last(last) + draw_last = False + lasts = self.shm.array[-2:] + last = lasts[-1] if should_line: - return ( - curve, - x, - y, - x_iv, - y_iv, + def draw_last(): + x, y = lasts['index'], lasts['close'] + curve.draw_last(x, y) + else: + draw_last = partial( + bars.draw_last, + last, ) + return ( + graphics, + r, + {'read_from_key': False}, + draw_last, + should_line, + changed_to_line, + ) + def update_step_xy( src_shm: ShmArray, @@ -484,7 +377,7 @@ class Flow(msgspec.Struct): # , frozen=True): _render_table: dict[ Optional[int], tuple[Renderer, pg.GraphicsItem], - ] = {} + ] = (None, None) # TODO: hackery to be able to set a shm later # but whilst also allowing this type to hashable, @@ -672,62 +565,58 @@ class Flow(msgspec.Struct): # , frozen=True): not in_view.size or not render ): + # print('exiting early') return graphics draw_last: bool = True slice_to_head: int = -1 - input_data = None + # input_data = None + + should_redraw: bool = False + + rkwargs = {} + bars = False - out: Optional[tuple] = None if isinstance(graphics, BarItems): - draw_last = False - # XXX: special case where we change out graphics # to a line after a certain uppx threshold. - out = render_baritems( + ( + graphics, + r, + rkwargs, + draw_last, + should_line, + changed_to_line, + ) = render_baritems( self, graphics, read, profiler, **kwargs, ) + bars = True + should_redraw = changed_to_line or not should_line - if out is None: - return graphics - - # return graphics + else: + r = self._src_r + if not r: + # just using for ``.diff()`` atm.. + r = self._src_r = Renderer( + flow=self, + # TODO: rename this to something with ohlc + last_read=read, + ) # ``FastAppendCurve`` case: array_key = array_key or self.name - - if out is not None: - # hack to handle ds curve from bars above - ( - graphics, # curve - x, - y, - x_iv, - y_iv, - ) = out - input_data = out[1:] - - r = self._src_r - if not r: - # just using for ``.diff()`` atm.. - r = self._src_r = Renderer( - flow=self, - # TODO: rename this to something with ohlc - # draw_path=gen_ohlc_qpath, - last_read=read, - ) + # print(array_key) # ds update config new_sample_rate: bool = False - should_redraw: bool = False should_ds: bool = r._in_ds showing_src_data: bool = not r._in_ds - if graphics._step_mode: + if getattr(graphics, '_step_mode', False): r.allocate_xy = to_step_format r.update_xy = update_step_xy @@ -756,8 +645,6 @@ class Flow(msgspec.Struct): # , frozen=True): x_last - w, 0, x_last + w, y_last, ) - - # should_redraw = bool(append_diff) draw_last = False # downsampling incremental state checking @@ -795,15 +682,11 @@ class Flow(msgspec.Struct): # , frozen=True): # - determine downsampling ops if needed # - (incrementally) update ``QPainterPath`` - # path = graphics.path - # fast_path = graphics.fast_path - path, data = r.render( read, array_key, profiler, uppx=uppx, - input_data=input_data, # use_vr=True, # TODO: better way to detect and pass this? @@ -816,6 +699,8 @@ class Flow(msgspec.Struct): # , frozen=True): slice_to_head=slice_to_head, do_append=do_append, + + **rkwargs, ) # TODO: does this actuallly help us in any way (prolly should # look at the source / ask ogi). @@ -825,11 +710,16 @@ class Flow(msgspec.Struct): # , frozen=True): graphics.path = r.path graphics.fast_path = r.fast_path - if draw_last: + if draw_last and not bars: + # TODO: how to handle this draw last stuff.. x = data['index'] y = data[array_key] graphics.draw_last(x, y) - profiler('draw last segment') + + elif bars and draw_last: + draw_last() + + profiler('draw last segment') graphics.update() profiler('.update()') @@ -1004,10 +894,8 @@ class Renderer(msgspec.Struct): profiler: pg.debug.Profiler, uppx: float = 1, - input_data: Optional[tuple[np.ndarray]] = None, - # redraw and ds flags - should_redraw: bool = True, + should_redraw: bool = False, new_sample_rate: bool = False, should_ds: bool = False, showing_src_data: bool = True, @@ -1018,6 +906,7 @@ class Renderer(msgspec.Struct): # only render datums "in view" of the ``ChartView`` use_vr: bool = True, + read_from_key: bool = True, ) -> list[QPainterPath]: ''' @@ -1067,7 +956,10 @@ class Renderer(msgspec.Struct): profiler('allocated xy history') if prepend_length: - y_prepend = shm._array[array_key][pre_slice] + y_prepend = shm._array[pre_slice] + + if read_from_key: + y_prepend = y_prepend[array_key] xy_data, xy_slice = self.update_xy( shm, @@ -1092,7 +984,10 @@ class Renderer(msgspec.Struct): profiler('prepended xy history: {prepend_length}') if append_length: - y_append = shm._array[array_key][post_slice] + y_append = shm._array[post_slice] + + if read_from_key: + y_append = y_append[array_key] xy_data, xy_slice = self.update_xy( shm, @@ -1114,43 +1009,22 @@ class Renderer(msgspec.Struct): if use_vr: array = in_view - - if input_data: - # allow input data passing for now from alt curve updaters. - ( - x_out, - y_out, - x_iv, - y_iv, - ) = input_data - connect = 'all' - - if use_vr: - x_out = x_iv - y_out = y_iv - else: - # xy-path data transform: convert source data to a format - # able to be passed to a `QPainterPath` rendering routine. - # expected to be incrementally updates and later rendered to - # a more graphics native format. - # if self.data_t: - # array = self.data_t(array) + ivl, ivr = xfirst, xlast - hist = array[:slice_to_head] - ( - x_out, - y_out, - connect, - ) = self.format_xy( - self, - # TODO: hist here should be the pre-sliced - # x/y_data in the case where allocate_xy is - # defined? - hist, - array_key, - (ivl, ivr), - ) + hist = array[:slice_to_head] + + # xy-path data transform: convert source data to a format + # able to be passed to a `QPainterPath` rendering routine. + x_out, y_out, connect = self.format_xy( + self, + # TODO: hist here should be the pre-sliced + # x/y_data in the case where allocate_xy is + # defined? + hist, + array_key, + (ivl, ivr), + ) profiler('sliced input arrays') @@ -1168,7 +1042,7 @@ class Renderer(msgspec.Struct): zoom_or_append = False last_vr = self._last_vr - last_ivr = self._last_ivr + last_ivr = self._last_ivr or vl, vr # incremental in-view data update. if last_vr: @@ -1203,7 +1077,6 @@ class Renderer(msgspec.Struct): ) ): should_redraw = True - # print("REDRAWING BRUH") self._last_vr = view_range if len(x_out): @@ -1224,7 +1097,7 @@ class Renderer(msgspec.Struct): or new_sample_rate or prepend_length > 0 ): - + # print("REDRAWING BRUH") if new_sample_rate and showing_src_data: log.info(f'DEDOWN -> {array_key}') self._in_ds = False @@ -1252,7 +1125,6 @@ class Renderer(msgspec.Struct): f'(should_redraw: {should_redraw} ' f'should_ds: {should_ds} new_sample_rate: {new_sample_rate})' ) - # profiler(f'DRAW PATH IN VIEW -> {self.name}') # TODO: get this piecewise prepend working - right now it's # giving heck on vwap... @@ -1297,7 +1169,7 @@ class Renderer(msgspec.Struct): x=new_x, y=new_y, connect=connect, - # path=fast_path, + path=fast_path, ) profiler('generated append qpath') diff --git a/piker/ui/_pathops.py b/piker/ui/_pathops.py index 89f7c5dc..83b46f43 100644 --- a/piker/ui/_pathops.py +++ b/piker/ui/_pathops.py @@ -168,11 +168,10 @@ def gen_ohlc_qpath( def ohlc_to_line( ohlc_shm: ShmArray, + data_field: str, fields: list[str] = ['open', 'high', 'low', 'close'] ) -> tuple[ - int, # flattened first index - int, # flattened last index np.ndarray, np.ndarray, ]: @@ -205,8 +204,6 @@ def ohlc_to_line( assert y_out.any() return ( - first, - last, x_out, y_out, )