From ab1f15506d75a8ee1fd972d9dfe5e1a3aa050fcb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Nov 2022 17:28:58 -0500 Subject: [PATCH] Add graphics incr-updated "formatter" subsys After trying to hack epoch indexed time series and failing miserably, decided to properly factor out all formatting routines into a common subsystem API: ``IncrementalFormatter`` which provides the interface for incrementally updating and tracking pre-path-graphics formatted data. Previously this functionality was mangled into our `Renderer` (which also does the work of `QPath` generation and update) but splitting it out also preps for being able to do graphics-buffer downsampling and caching on a remote host B) The ``IncrementalFormatter`` (parent type) has the default behaviour of tracking a single field-array on some source `ShmArray`, updating a flattened `numpy.ndarray` in-mem allocation, and providing a default 1d conversion for pre-downsampling and path generation. Changed out of `Renderer`, - `.allocate_xy()`, `update_xy()` and `format_xy()` all are moved to more explicitly named formatter methods. - all `.x/y_data` nd array management and update - "last view range" tracking - `.last_read`, `.diff()` - now calls `IncrementalFormatter.format_to_1d()` inside `.render()` The new API gets, - `.diff()`, `.last_read` - all view range diff tracking through `.track_inview_range()`. - better nd format array names: `.x/y_nd`, `xy_nd_start/stop`. - `.format_to_1d()` which renders pre-path formatted arrays ready for both m4 sampling and path gen. - better explicit overloadable formatting method names: * `.allocate_xy()` -> `.allocate_xy_nd()` * `.update_xy()` -> `.incr_update_xy_nd()` * `.format_xy()` -> `.format_xy_nd_to_1d()` Finally this implements per-graphics-type formatters which define each set up related formatting routines: - `OHLCBarsFmtr`: std multi-line style bars - `OHLCBarsAsCurveFmtr`: draws an interpolated line for ohlc sampled data - `StepCurveFmtr`: handles vlm style curves --- piker/ui/_flows.py | 369 ++++----------- piker/ui/_pathops.py | 1036 +++++++++++++++++++++++++++++------------- 2 files changed, 809 insertions(+), 596 deletions(-) diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py index d793b6d0..1450b41e 100644 --- a/piker/ui/_flows.py +++ b/piker/ui/_flows.py @@ -25,8 +25,6 @@ incremental update. from __future__ import annotations from typing import ( Optional, - Callable, - Union, ) import msgspec @@ -43,21 +41,10 @@ from .._profile import ( # ms_slower_then, ) from ._pathops import ( - by_index_and_key, - - # Plain OHLC renderer - gen_ohlc_qpath, - - # OHLC -> line renderer - ohlc_to_line, - update_ohlc_to_line, - ohlc_flat_to_xy, - - # step curve renderer - to_step_format, - update_step_xy, - step_to_xy, - + IncrementalFormatter, + OHLCBarsFmtr, # Plain OHLC renderer + OHLCBarsAsCurveFmtr, # OHLC converted to line + StepCurveFmtr, # "step" curve (like for vlm) xy_downsample, ) from ._ohlc import ( @@ -76,16 +63,6 @@ from .._profile import Profiler log = get_logger(__name__) -# class FlowsTable(msgspec.Struct): -# ''' -# Data-AGGRegate: high level API onto multiple (categorized) -# ``Flow``s with high level processing routines for -# multi-graphics computations and display. - -# ''' -# flows: dict[str, np.ndarray] = {} - - def render_baritems( flow: Flow, graphics: BarItems, @@ -117,21 +94,24 @@ def render_baritems( r = self._src_r if not r: show_bars = True + # OHLC bars path renderer r = self._src_r = Renderer( flow=self, - format_xy=gen_ohlc_qpath, - last_read=read, + fmtr=OHLCBarsFmtr( + shm=flow.shm, + flow=flow, + _last_read=read, + ), ) ds_curve_r = Renderer( flow=self, - last_read=read, - - # incr update routines - allocate_xy=ohlc_to_line, - update_xy=update_ohlc_to_line, - format_xy=ohlc_flat_to_xy, + fmtr=OHLCBarsAsCurveFmtr( + shm=flow.shm, + flow=flow, + _last_read=read, + ), ) curve = FlattenedOHLC( @@ -228,7 +208,7 @@ class Flow(msgspec.Struct): # , frozen=True): ''' name: str plot: pg.PlotItem - graphics: Union[Curve, BarItems] + graphics: Curve | BarItems _shm: ShmArray yrange: tuple[float, float] = None @@ -237,7 +217,6 @@ class Flow(msgspec.Struct): # , frozen=True): # normally this is just a plain line. ds_graphics: Optional[Curve] = None - is_ohlc: bool = False render: bool = True # toggle for display loop @@ -445,9 +424,14 @@ class Flow(msgspec.Struct): # , frozen=True): slice_to_head: int = -1 should_redraw: bool = False + should_line: bool = False rkwargs = {} - should_line = False + # TODO: probably specialize ``Renderer`` types instead of + # these logic checks? + # - put these blocks into a `.load_renderer()` meth? + # - consider a OHLCRenderer, StepCurveRenderer, Renderer? + r = self._src_r if isinstance(graphics, BarItems): # XXX: special case where we change out graphics # to a line after a certain uppx threshold. @@ -467,16 +451,36 @@ class Flow(msgspec.Struct): # , frozen=True): should_redraw = changed_to_line or not should_line self._in_ds = should_line - else: - r = self._src_r - if not r: - # just using for ``.diff()`` atm.. + elif not r: + if isinstance(graphics, StepCurve): + r = self._src_r = Renderer( flow=self, - # TODO: rename this to something with ohlc - last_read=read, + fmtr=StepCurveFmtr( + shm=self.shm, + flow=self, + _last_read=read, + ), ) + # TODO: append logic inside ``.render()`` isn't + # correct yet for step curves.. remove this to see it. + should_redraw = True + slice_to_head = -2 + + else: + r = self._src_r + if not r: + # just using for ``.diff()`` atm.. + r = self._src_r = Renderer( + flow=self, + fmtr=IncrementalFormatter( + shm=self.shm, + flow=self, + _last_read=read, + ), + ) + # ``Curve`` derivative case(s): array_key = array_key or self.name # print(array_key) @@ -486,19 +490,6 @@ class Flow(msgspec.Struct): # , frozen=True): should_ds: bool = r._in_ds showing_src_data: bool = not r._in_ds - # step_mode = getattr(graphics, '_step_mode', False) - step_mode = isinstance(graphics, StepCurve) - if step_mode: - - r.allocate_xy = to_step_format - r.update_xy = update_step_xy - r.format_xy = step_to_xy - - # TODO: append logic inside ``.render()`` isn't - # correct yet for step curves.. remove this to see it. - should_redraw = True - slice_to_head = -2 - # downsampling incremental state checking # check for and set std m4 downsample conditions uppx = graphics.x_uppx() @@ -680,34 +671,7 @@ class Flow(msgspec.Struct): # , frozen=True): class Renderer(msgspec.Struct): flow: Flow - # last array view read - last_read: Optional[tuple] = None - - # default just returns index, and named array from data - format_xy: Callable[ - [np.ndarray, str], - tuple[np.ndarray] - ] = by_index_and_key - - # optional pre-graphics xy formatted data which - # is incrementally updated in sync with the source data. - allocate_xy: Optional[Callable[ - [int, slice], - tuple[np.ndarray, np.nd.array] - ]] = None - - update_xy: Optional[Callable[ - [int, slice], None] - ] = None - - x_data: Optional[np.ndarray] = None - y_data: Optional[np.ndarray] = None - - # indexes which slice into the above arrays (which are allocated - # based on source data shm input size) and allow retrieving - # incrementally updated data. - _xy_first: int = 0 - _xy_last: int = 0 + fmtr: IncrementalFormatter # output graphics rendering, the main object # processed in ``QGraphicsObject.paint()`` @@ -729,58 +693,11 @@ class Renderer(msgspec.Struct): _last_uppx: float = 0 _in_ds: bool = False - # incremental update state(s) - _last_vr: Optional[tuple[float, float]] = None - _last_ivr: Optional[tuple[float, float]] = None - - def diff( - self, - new_read: tuple[np.ndarray], - - ) -> tuple[ - np.ndarray, - np.ndarray, - ]: - ( - last_xfirst, - last_xlast, - last_array, - last_ivl, - last_ivr, - last_in_view, - ) = self.last_read - - # TODO: can the renderer just call ``Flow.read()`` directly? - # unpack latest source data read - ( - xfirst, - xlast, - array, - ivl, - ivr, - in_view, - ) = new_read - - # compute the length diffs between the first/last index entry in - # the input data and the last indexes we have on record from the - # last time we updated the curve index. - prepend_length = int(last_xfirst - xfirst) - append_length = int(xlast - last_xlast) - - # blah blah blah - # do diffing for prepend, append and last entry - return ( - slice(xfirst, last_xfirst), - prepend_length, - append_length, - slice(last_xlast, xlast), - ) - def draw_path( self, x: np.ndarray, y: np.ndarray, - connect: Union[str, np.ndarray] = 'all', + connect: str | np.ndarray = 'all', path: Optional[QPainterPath] = None, redraw: bool = False, @@ -858,166 +775,50 @@ class Renderer(msgspec.Struct): ''' # TODO: can the renderer just call ``Flow.read()`` directly? # unpack latest source data read + fmtr = self.fmtr + ( - xfirst, - xlast, + _, + _, array, ivl, ivr, in_view, ) = new_read - ( - pre_slice, - prepend_length, - append_length, - post_slice, - ) = self.diff(new_read) - - if self.update_xy: - - shm = self.flow.shm - - if self.y_data is None: - # we first need to allocate xy data arrays - # from the source data. - assert self.allocate_xy - self.x_data, self.y_data = self.allocate_xy( - shm, - array_key, - ) - self._xy_first = shm._first.value - self._xy_last = shm._last.value - profiler('allocated xy history') - - if prepend_length: - y_prepend = shm._array[pre_slice] - - if read_from_key: - y_prepend = y_prepend[array_key] - - xy_data, xy_slice = self.update_xy( - shm, - array_key, - - # this is the pre-sliced, "normally expected" - # new data that an updater would normally be - # expected to process, however in some cases (like - # step curves) the updater routine may want to do - # the source history-data reading itself, so we pass - # both here. - y_prepend, - - pre_slice, - prepend_length, - self._xy_first, - self._xy_last, - is_append=False, - ) - self.y_data[xy_slice] = xy_data - self._xy_first = shm._first.value - profiler('prepended xy history: {prepend_length}') - - if append_length: - y_append = shm._array[post_slice] - - if read_from_key: - y_append = y_append[array_key] - - xy_data, xy_slice = self.update_xy( - shm, - array_key, - - y_append, - post_slice, - append_length, - - self._xy_first, - self._xy_last, - is_append=True, - ) - # self.y_data[post_slice] = xy_data - # self.y_data[xy_slice or post_slice] = xy_data - self.y_data[xy_slice] = xy_data - self._xy_last = shm._last.value - profiler('appened xy history: {append_length}') - - if use_vr: - array = in_view - # else: - # ivl, ivr = xfirst, xlast - - hist = array[:slice_to_head] - # xy-path data transform: convert source data to a format # able to be passed to a `QPainterPath` rendering routine. - if not len(hist): + fmt_out = fmtr.format_to_1d( + new_read, + array_key, + profiler, + + slice_to_head=slice_to_head, + read_src_from_key=read_from_key, + slice_to_inview=use_vr, + ) + + # no history in view case + if not fmt_out: # XXX: this might be why the profiler only has exits? return - x_out, y_out, connect = self.format_xy( - self, - # TODO: hist here should be the pre-sliced - # x/y_data in the case where allocate_xy is - # defined? - hist, - array_key, - (ivl, ivr), - ) + ( + x_1d, + y_1d, + connect, + prepend_length, + append_length, + view_changed, - profiler('sliced input arrays') - - if ( - use_vr - ): - # if a view range is passed, plan to draw the - # source ouput that's "in view" of the chart. - view_range = (ivl, ivr) - # print(f'{self._name} vr: {view_range}') - - profiler(f'view range slice {view_range}') - - vl, vr = view_range - - zoom_or_append = False - last_vr = self._last_vr - last_ivr = self._last_ivr or vl, vr - - # incremental in-view data update. - if last_vr: - # relative slice indices - lvl, lvr = last_vr - # abs slice indices - al, ar = last_ivr - - # left_change = abs(x_iv[0] - al) >= 1 - # right_change = abs(x_iv[-1] - ar) >= 1 - - if ( - # likely a zoom view change - (vr - lvr) > 2 or vl < lvl - # append / prepend update - # we had an append update where the view range - # didn't change but the data-viewed (shifted) - # underneath, so we need to redraw. - # or left_change and right_change and last_vr == view_range - - # not (left_change and right_change) and ivr - # ( - # or abs(x_iv[ivr] - livr) > 1 - ): - zoom_or_append = True - - self._last_vr = view_range - if len(x_out): - self._last_ivr = x_out[0], x_out[slice_to_head] + ) = fmt_out # redraw conditions if ( prepend_length > 0 or new_sample_rate or append_length > 0 - or zoom_or_append + or view_changed ): should_redraw = True @@ -1039,9 +840,9 @@ class Renderer(msgspec.Struct): elif should_ds and uppx > 1: - x_out, y_out, ymn, ymx = xy_downsample( - x_out, - y_out, + x_1d, y_1d, ymn, ymx = xy_downsample( + x_1d, + y_1d, uppx, ) self.flow.yrange = ymn, ymx @@ -1052,8 +853,8 @@ class Renderer(msgspec.Struct): self._in_ds = True path = self.draw_path( - x=x_out, - y=y_out, + x=x_1d, + y=y_1d, connect=connect, path=path, redraw=True, @@ -1088,8 +889,8 @@ class Renderer(msgspec.Struct): and not should_redraw ): print(f'{array_key} append len: {append_length}') - new_x = x_out[-append_length - 2:] # slice_to_head] - new_y = y_out[-append_length - 2:] # slice_to_head] + new_x = x_1d[-append_length - 2:] # slice_to_head] + new_y = y_1d[-append_length - 2:] # slice_to_head] profiler('sliced append path') profiler( @@ -1137,10 +938,4 @@ class Renderer(msgspec.Struct): self.path = path self.fast_path = fast_path - # TODO: eventually maybe we can implement some kind of - # transform on the ``QPainterPath`` that will more or less - # detect the diff in "elements" terms? - # update diff state since we've now rendered paths. - self.last_read = new_read - return self.path, array, reset diff --git a/piker/ui/_pathops.py b/piker/ui/_pathops.py index 5e630570..5a73a4ad 100644 --- a/piker/ui/_pathops.py +++ b/piker/ui/_pathops.py @@ -20,7 +20,6 @@ Super fast ``QPainterPath`` generation related operator routines. from __future__ import annotations from typing import ( Optional, - Callable, TYPE_CHECKING, ) @@ -29,7 +28,7 @@ import numpy as np from numpy.lib import recfunctions as rfn from numba import njit, float64, int64 # , optional # import pyqtgraph as pg -from PyQt5 import QtGui +# from PyQt5 import QtGui # from PyQt5.QtCore import QLineF, QPointF from ..data._sharedmem import ( @@ -41,7 +40,11 @@ from ._compression import ( ) if TYPE_CHECKING: - from ._flows import Renderer + from ._flows import ( + Renderer, + Flow, + ) + from .._profile import Profiler def by_index_and_key( @@ -59,39 +62,736 @@ def by_index_and_key( class IncrementalFormatter(msgspec.Struct): + ''' + Incrementally updating, pre-path-graphics tracking, formatter. + Allows tracking source data state in an updateable pre-graphics + ``np.ndarray`` format (in local process memory) as well as + incrementally rendering from that format **to** 1d x/y for path + generation using ``pg.functions.arrayToQPath()``. + + ''' shm: ShmArray + flow: Flow - # optional pre-graphics xy formatted data which - # is incrementally updated in sync with the source data. - allocate_xy_nd: Optional[Callable[ - [int, slice], - tuple[np.ndarray, np.nd.array] - ]] = None + # last read from shm (usually due to an update call) + _last_read: tuple[ + int, + int, + np.ndarray - incr_update_xy_nd: Optional[Callable[ - [int, slice], None] - ] = None + ] - # default just returns index, and named array from data - format_xy_nd_to_1d: Callable[ - [np.ndarray, str], - tuple[np.ndarray] - ] = by_index_and_key + @property + def last_read(self) -> tuple | None: + return self._last_read + def __repr__(self) -> str: + msg = ( + f'{type(self)}: ->\n\n' + f'fqsn={self.flow.name}\n' + f'shm_name={self.shm.token["shm_name"]}\n\n' + + f'last_vr={self._last_vr}\n' + f'last_ivdr={self._last_ivdr}\n\n' + + f'xy_nd_start={self.xy_nd_start}\n' + f'xy_nd_stop={self.xy_nd_stop}\n\n' + ) + + x_nd_len = 0 + y_nd_len = 0 + if self.x_nd is not None: + x_nd_len = len(self.x_nd) + y_nd_len = len(self.y_nd) + + msg += ( + f'x_nd_len={x_nd_len}\n' + f'y_nd_len={y_nd_len}\n' + ) + + return msg + + def diff( + self, + new_read: tuple[np.ndarray], + + ) -> tuple[ + np.ndarray, + np.ndarray, + ]: + ( + last_xfirst, + last_xlast, + last_array, + last_ivl, + last_ivr, + last_in_view, + ) = self.last_read + + # TODO: can the renderer just call ``Flow.read()`` directly? + # unpack latest source data read + ( + xfirst, + xlast, + array, + ivl, + ivr, + in_view, + ) = new_read + + # compute the length diffs between the first/last index entry in + # the input data and the last indexes we have on record from the + # last time we updated the curve index. + prepend_length = int(last_xfirst - xfirst) + append_length = int(xlast - last_xlast) + + # blah blah blah + # do diffing for prepend, append and last entry + return ( + slice(xfirst, last_xfirst), + prepend_length, + append_length, + slice(last_xlast, xlast), + ) + + # Incrementally updated xy ndarray formatted data, a pre-1d + # format which is updated and cached independently of the final + # pre-graphics-path 1d format. x_nd: Optional[np.ndarray] = None y_nd: Optional[np.ndarray] = None - x_1d: Optional[np.ndarray] = None - y_1d: Optional[np.ndarray] = None - # indexes which slice into the above arrays (which are allocated # based on source data shm input size) and allow retrieving # incrementally updated data. - # _xy_first: int = 0 - # _xy_last: int = 0 xy_nd_start: int = 0 - xy_nd_end: int = 0 + xy_nd_stop: int = 0 + + # TODO: eventually incrementally update 1d-pre-graphics path data? + # x_1d: Optional[np.ndarray] = None + # y_1d: Optional[np.ndarray] = None + + # incremental view-change state(s) tracking + _last_vr: tuple[float, float] | None = None + _last_ivdr: tuple[float, float] | None = None + + def _track_inview_range( + self, + view_range: tuple[int, int], + + ) -> bool: + # if a view range is passed, plan to draw the + # source ouput that's "in view" of the chart. + vl, vr = view_range + zoom_or_append = False + last_vr = self._last_vr + + # incremental in-view data update. + if last_vr: + lvl, lvr = last_vr # relative slice indices + + # TODO: detecting more specifically the interaction changes + # last_ivr = self._last_ivdr or (vl, vr) + # al, ar = last_ivr # abs slice indices + # left_change = abs(x_iv[0] - al) >= 1 + # right_change = abs(x_iv[-1] - ar) >= 1 + + # likely a zoom/pan view change or data append update + if ( + (vr - lvr) > 2 + or vl < lvl + + # append / prepend update + # we had an append update where the view range + # didn't change but the data-viewed (shifted) + # underneath, so we need to redraw. + # or left_change and right_change and last_vr == view_range + + # not (left_change and right_change) and ivr + # ( + # or abs(x_iv[ivr] - livr) > 1 + ): + zoom_or_append = True + + self._last_vr = view_range + + return zoom_or_append + + def format_to_1d( + self, + new_read: tuple, + array_key: str, + profiler: Profiler, + + slice_to_head: int = -1, + read_src_from_key: bool = True, + slice_to_inview: bool = True, + + ) -> tuple[ + np.ndarray, + np.ndarray, + ]: + shm = self.shm + + ( + _, + _, + array, + ivl, + ivr, + in_view, + + ) = new_read + + ( + pre_slice, + prepend_len, + append_len, + post_slice, + ) = self.diff(new_read) + + if self.y_nd is None: + # we first need to allocate xy data arrays + # from the source data. + self.x_nd, self.y_nd = self.allocate_xy_nd( + shm, + array_key, + ) + self.xy_nd_start = shm._first.value + self.xy_nd_stop = shm._last.value + profiler('allocated xy history') + + if prepend_len: + y_prepend = shm._array[pre_slice] + if read_src_from_key: + y_prepend = y_prepend[array_key] + + ( + new_y_nd, + y_nd_slc, + + ) = self.incr_update_xy_nd( + shm, + array_key, + + # this is the pre-sliced, "normally expected" + # new data that an updater would normally be + # expected to process, however in some cases (like + # step curves) the updater routine may want to do + # the source history-data reading itself, so we pass + # both here. + y_prepend, + pre_slice, + prepend_len, + + self.xy_nd_start, + self.xy_nd_stop, + is_append=False, + ) + + # y_nd_view = self.y_nd[y_nd_slc] + self.y_nd[y_nd_slc] = new_y_nd + # if read_src_from_key: + # y_nd_view[:][array_key] = new_y_nd + # else: + # y_nd_view[:] = new_y_nd + + self.xy_nd_start = shm._first.value + profiler('prepended xy history: {prepend_length}') + + if append_len: + y_append = shm._array[post_slice] + if read_src_from_key: + y_append = y_append[array_key] + + ( + new_y_nd, + y_nd_slc, + + ) = self.incr_update_xy_nd( + shm, + array_key, + + y_append, + post_slice, + append_len, + + self.xy_nd_start, + self.xy_nd_stop, + is_append=True, + ) + # self.y_nd[post_slice] = new_y_nd + # self.y_nd[xy_slice or post_slice] = xy_data + self.y_nd[y_nd_slc] = new_y_nd + # if read_src_from_key: + # y_nd_view[:][array_key] = new_y_nd + # else: + # y_nd_view[:] = new_y_nd + + self.xy_nd_stop = shm._last.value + profiler('appened xy history: {append_length}') + + view_changed: bool = False + view_range: tuple[int, int] = (ivl, ivr) + if slice_to_inview: + view_changed = self._track_inview_range(view_range) + array = in_view + profiler(f'{self.flow.name} view range slice {view_range}') + + hist = array[:slice_to_head] + + # xy-path data transform: convert source data to a format + # able to be passed to a `QPainterPath` rendering routine. + if not len(hist): + # XXX: this might be why the profiler only has exits? + return + + # TODO: hist here should be the pre-sliced + # x/y_data in the case where allocate_xy is + # defined? + x_1d, y_1d, connect = self.format_xy_nd_to_1d( + hist, + array_key, + view_range, + ) + + # update the last "in view data range" + if len(x_1d): + self._last_ivdr = x_1d[0], x_1d[slice_to_head] + + # TODO: eventually maybe we can implement some kind of + # transform on the ``QPainterPath`` that will more or less + # detect the diff in "elements" terms? + # update diff state since we've now rendered paths. + self._last_read = new_read + + profiler('.format_to_1d()') + return ( + x_1d, + y_1d, + connect, + prepend_len, + append_len, + view_changed, + ) + + ############################### + # Sub-type override interface # + ############################### + + # optional pre-graphics xy formatted data which + # is incrementally updated in sync with the source data. + # XXX: was ``.allocate_xy()`` + def allocate_xy_nd( + self, + src_shm: ShmArray, + data_field: str, + index_field: str = 'index', + + ) -> tuple[ + np.ndarray, # x + np.nd.array # y + ]: + ''' + Convert the structured-array ``src_shm`` format to + a equivalently shaped (and field-less) ``np.ndarray``. + + Eg. a 4 field x N struct-array => (N, 4) + + ''' + y_nd = src_shm._array[data_field].copy() + x_nd = src_shm._array[index_field].copy() + return x_nd, y_nd + + # XXX: was ``.update_xy()`` + def incr_update_xy_nd( + self, + + src_shm: ShmArray, + data_field: str, + + new_from_src: np.ndarray, # portion of source that was updated + + read_slc: slice, + ln: int, # len of updated + + nd_start: int, + nd_stop: int, + + is_append: bool, + index_field: str = 'index', + + ) -> tuple[ + np.ndarray, + slice, + ]: + # write pushed data to flattened copy + new_y_nd = new_from_src + + # XXX + # TODO: this should be returned and written by caller! + # XXX + # generate same-valued-per-row x support based on y shape + if index_field != 'index': + self.x_nd[read_slc, :] = new_from_src[index_field] + + return new_y_nd, read_slc + + # XXX: was ``.format_xy()`` + def format_xy_nd_to_1d( + self, + + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + + ) -> tuple[ + np.ndarray, # 1d x + np.ndarray, # 1d y + np.ndarray | str, # connection array/style + ]: + ''' + Default xy-nd array to 1d pre-graphics-path render routine. + + Return single field column data verbatim + + ''' + return ( + array['index'], + array[array_key], + + # 1d connection array or style-key to + # ``pg.functions.arrayToQPath()`` + 'all', + ) + + +class OHLCBarsFmtr(IncrementalFormatter): + + fields: list[str] = ['open', 'high', 'low', 'close'] + + def allocate_xy_nd( + self, + + ohlc_shm: ShmArray, + data_field: str, + + ) -> tuple[ + np.ndarray, # x + np.nd.array # y + ]: + ''' + Convert an input struct-array holding OHLC samples into a pair of + flattened x, y arrays with the same size (datums wise) as the source + data. + + ''' + y_nd = ohlc_shm.ustruct(self.fields) + + # generate an flat-interpolated x-domain + x_nd = ( + np.broadcast_to( + ohlc_shm._array['index'][:, None], + ( + ohlc_shm._array.size, + # 4, # only ohlc + y_nd.shape[1], + ), + ) + np.array([-0.5, 0, 0, 0.5]) + ) + assert y_nd.any() + + # write pushed data to flattened copy + return ( + x_nd, + y_nd, + ) + + @staticmethod + @njit( + # TODO: for now need to construct this manually for readonly + # arrays, see https://github.com/numba/numba/issues/4511 + # ntypes.tuple((float64[:], float64[:], float64[:]))( + # numba_ohlc_dtype[::1], # contiguous + # int64, + # optional(float64), + # ), + nogil=True + ) + def path_arrays_from_ohlc( + data: np.ndarray, + start: int64, + bar_gap: float64 = 0.43, + + ) -> tuple[ + np.ndarray, + np.ndarray, + np.ndarray, + ]: + ''' + Generate an array of lines objects from input ohlc data. + + ''' + size = int(data.shape[0] * 6) + + x = np.zeros( + # data, + shape=size, + dtype=float64, + ) + y, c = x.copy(), x.copy() + + # TODO: report bug for assert @ + # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991 + for i, q in enumerate(data[start:], start): + + # TODO: ask numba why this doesn't work.. + # open, high, low, close, index = q[ + # ['open', 'high', 'low', 'close', 'index']] + + open = q['open'] + high = q['high'] + low = q['low'] + close = q['close'] + index = float64(q['index']) + + istart = i * 6 + istop = istart + 6 + + # x,y detail the 6 points which connect all vertexes of a ohlc bar + x[istart:istop] = ( + index - bar_gap, + index, + index, + index, + index, + index + bar_gap, + ) + y[istart:istop] = ( + open, + open, + low, + high, + close, + close, + ) + + # specifies that the first edge is never connected to the + # prior bars last edge thus providing a small "gap"/"space" + # between bars determined by ``bar_gap``. + c[istart:istop] = (1, 1, 1, 1, 1, 0) + + return x, y, c + + # TODO: can we drop this frame and just use the above? + def format_xy_nd_to_1d( + self, + + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + + start: int = 0, # XXX: do we need this? + # 0.5 is no overlap between arms, 1.0 is full overlap + w: float = 0.43, + + ) -> tuple[ + np.ndarray, + np.ndarray, + np.ndarray, + ]: + ''' + More or less direct proxy to the ``numba``-fied + ``path_arrays_from_ohlc()`` (above) but with closed in kwargs + for line spacing. + + ''' + x, y, c = self.path_arrays_from_ohlc( + array, + start, + bar_gap=w, + ) + return x, y, c + + def incr_update_xy_nd( + self, + + src_shm: ShmArray, + data_field: str, + + new_from_src: np.ndarray, # portion of source that was updated + + read_slc: slice, + ln: int, # len of updated + + nd_start: int, + nd_stop: int, + + is_append: bool, + index_field: str = 'index', + + ) -> tuple[ + np.ndarray, + slice, + ]: + # write newly pushed data to flattened copy + # a struct-arr is always passed in. + new_y_nd = rfn.structured_to_unstructured( + new_from_src[self.fields] + ) + + # XXX + # TODO: this should be returned and written by caller! + # XXX + # generate same-valued-per-row x support based on y shape + if index_field != 'index': + self.x_nd[read_slc, :] = new_from_src[index_field] + + return new_y_nd, read_slc + + +class OHLCBarsAsCurveFmtr(OHLCBarsFmtr): + + def format_xy_nd_to_1d( + self, + + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + + ) -> tuple[ + np.ndarray, + np.ndarray, + str, + ]: + # TODO: in the case of an existing ``.update_xy()`` + # should we be passing in array as an xy arrays tuple? + + # 2 more datum-indexes to capture zero at end + x_flat = self.x_nd[self.xy_nd_start:self.xy_nd_stop] + y_flat = self.y_nd[self.xy_nd_start:self.xy_nd_stop] + + # slice to view + ivl, ivr = vr + x_iv_flat = x_flat[ivl:ivr] + y_iv_flat = y_flat[ivl:ivr] + + # reshape to 1d for graphics rendering + y_iv = y_iv_flat.reshape(-1) + x_iv = x_iv_flat.reshape(-1) + + return x_iv, y_iv, 'all' + + +class StepCurveFmtr(IncrementalFormatter): + + def allocate_xy_nd( + self, + + shm: ShmArray, + data_field: str, + + index_field: str = 'index', + + ) -> tuple[ + np.ndarray, # x + np.nd.array # y + ]: + ''' + Convert an input 1d shm array to a "step array" format + for use by path graphics generation. + + ''' + i = shm._array['index'].copy() + out = shm._array[data_field].copy() + + x_out = np.broadcast_to( + i[:, None], + (i.size, 2), + ) + np.array([-0.5, 0.5]) + + y_out = np.empty((len(out), 2), dtype=out.dtype) + y_out[:] = out[:, np.newaxis] + + # start y at origin level + y_out[0, 0] = 0 + return x_out, y_out + + def incr_update_xy_nd( + self, + + src_shm: ShmArray, + array_key: str, + + src_update: np.ndarray, # portion of source that was updated + slc: slice, + ln: int, # len of updated + + first: int, + last: int, + + is_append: bool, + + ) -> tuple[ + np.ndarray, + slice, + ]: + # for a step curve we slice from one datum prior + # to the current "update slice" to get the previous + # "level". + if is_append: + start = max(last - 1, 0) + end = src_shm._last.value + new_y = src_shm._array[start:end][array_key] + slc = slice(start, end) + + else: + new_y = src_update + + return ( + np.broadcast_to( + new_y[:, None], (new_y.size, 2), + ), + slc, + ) + + def format_xy_nd_to_1d( + self, + + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + + ) -> tuple[ + np.ndarray, + np.ndarray, + str, + ]: + # 2 more datum-indexes to capture zero at end + x_step = self.x_nd[self.xy_nd_start:self.xy_nd_stop+2] + y_step = self.y_nd[self.xy_nd_start:self.xy_nd_stop+2] + + lasts = array[['index', array_key]] + last = lasts[array_key][-1] + y_step[-1] = last + + # slice out in-view data + ivl, ivr = vr + ys_iv = y_step[ivl:ivr+1] + xs_iv = x_step[ivl:ivr+1] + + # flatten to 1d + y_iv = ys_iv.reshape(ys_iv.size) + x_iv = xs_iv.reshape(xs_iv.size) + + # print( + # f'ys_iv : {ys_iv[-s:]}\n' + # f'y_iv: {y_iv[-s:]}\n' + # f'xs_iv: {xs_iv[-s:]}\n' + # f'x_iv: {x_iv[-s:]}\n' + # ) + + return x_iv, y_iv, 'all' def xy_downsample( @@ -107,7 +807,11 @@ def xy_downsample( float, float, ]: + ''' + Downsample 1D (flat ``numpy.ndarray``) arrays using M4 given an input + ``uppx`` (units-per-pixel) and add space between discreet datums. + ''' # downsample whenever more then 1 pixels per datum can be shown. # always refresh data bounds until we get diffing # working properly, see above.. @@ -125,289 +829,3 @@ def xy_downsample( y = y.flatten() return x, y, ymn, ymx - - -@njit( - # TODO: for now need to construct this manually for readonly arrays, see - # https://github.com/numba/numba/issues/4511 - # ntypes.tuple((float64[:], float64[:], float64[:]))( - # numba_ohlc_dtype[::1], # contiguous - # int64, - # optional(float64), - # ), - nogil=True -) -def path_arrays_from_ohlc( - data: np.ndarray, - start: int64, - bar_gap: float64 = 0.43, - -) -> np.ndarray: - ''' - Generate an array of lines objects from input ohlc data. - - ''' - size = int(data.shape[0] * 6) - - x = np.zeros( - # data, - shape=size, - dtype=float64, - ) - y, c = x.copy(), x.copy() - - # TODO: report bug for assert @ - # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991 - for i, q in enumerate(data[start:], start): - - # TODO: ask numba why this doesn't work.. - # open, high, low, close, index = q[ - # ['open', 'high', 'low', 'close', 'index']] - - open = q['open'] - high = q['high'] - low = q['low'] - close = q['close'] - index = float64(q['index']) - - istart = i * 6 - istop = istart + 6 - - # x,y detail the 6 points which connect all vertexes of a ohlc bar - x[istart:istop] = ( - index - bar_gap, - index, - index, - index, - index, - index + bar_gap, - ) - y[istart:istop] = ( - open, - open, - low, - high, - close, - close, - ) - - # specifies that the first edge is never connected to the - # prior bars last edge thus providing a small "gap"/"space" - # between bars determined by ``bar_gap``. - c[istart:istop] = (1, 1, 1, 1, 1, 0) - - return x, y, c - - -def gen_ohlc_qpath( - r: Renderer, - data: np.ndarray, - array_key: str, # we ignore this - vr: tuple[int, int], - - start: int = 0, # XXX: do we need this? - # 0.5 is no overlap between arms, 1.0 is full overlap - w: float = 0.43, - -) -> QtGui.QPainterPath: - ''' - More or less direct proxy to ``path_arrays_from_ohlc()`` - but with closed in kwargs for line spacing. - - ''' - x, y, c = path_arrays_from_ohlc( - data, - start, - bar_gap=w, - ) - return x, y, c - - -def ohlc_to_line( - ohlc_shm: ShmArray, - data_field: str, - fields: list[str] = ['open', 'high', 'low', 'close'] - -) -> tuple[ - np.ndarray, - np.ndarray, -]: - ''' - Convert an input struct-array holding OHLC samples into a pair of - flattened x, y arrays with the same size (datums wise) as the source - data. - - ''' - y_out = ohlc_shm.ustruct(fields) - first = ohlc_shm._first.value - last = ohlc_shm._last.value - - # write pushed data to flattened copy - y_out[first:last] = rfn.structured_to_unstructured( - ohlc_shm.array[fields] - ) - - # generate an flat-interpolated x-domain - x_out = ( - np.broadcast_to( - ohlc_shm._array['index'][:, None], - ( - ohlc_shm._array.size, - # 4, # only ohlc - y_out.shape[1], - ), - ) + np.array([-0.5, 0, 0, 0.5]) - ) - assert y_out.any() - - return ( - x_out, - y_out, - ) - - -def update_ohlc_to_line( - src_shm: ShmArray, - array_key: str, - src_update: np.ndarray, - slc: slice, - ln: int, - first: int, - last: int, - is_append: bool, - -) -> np.ndarray: - - fields = ['open', 'high', 'low', 'close'] - return ( - rfn.structured_to_unstructured(src_update[fields]), - slc, - ) - - -def ohlc_flat_to_xy( - r: Renderer, - array: np.ndarray, - array_key: str, - vr: tuple[int, int], - -) -> tuple[ - np.ndarray, - np.nd.array, - str, -]: - # TODO: in the case of an existing ``.update_xy()`` - # should we be passing in array as an xy arrays tuple? - - # 2 more datum-indexes to capture zero at end - x_flat = r.x_data[r._xy_first:r._xy_last] - y_flat = r.y_data[r._xy_first:r._xy_last] - - # slice to view - ivl, ivr = vr - x_iv_flat = x_flat[ivl:ivr] - y_iv_flat = y_flat[ivl:ivr] - - # reshape to 1d for graphics rendering - y_iv = y_iv_flat.reshape(-1) - x_iv = x_iv_flat.reshape(-1) - - return x_iv, y_iv, 'all' - - -def to_step_format( - shm: ShmArray, - data_field: str, - index_field: str = 'index', - -) -> tuple[int, np.ndarray, np.ndarray]: - ''' - Convert an input 1d shm array to a "step array" format - for use by path graphics generation. - - ''' - i = shm._array['index'].copy() - out = shm._array[data_field].copy() - - x_out = np.broadcast_to( - i[:, None], - (i.size, 2), - ) + np.array([-0.5, 0.5]) - - y_out = np.empty((len(out), 2), dtype=out.dtype) - y_out[:] = out[:, np.newaxis] - - # start y at origin level - y_out[0, 0] = 0 - return x_out, y_out - - -def update_step_xy( - src_shm: ShmArray, - array_key: str, - y_update: np.ndarray, - slc: slice, - ln: int, - first: int, - last: int, - is_append: bool, - -) -> np.ndarray: - - # for a step curve we slice from one datum prior - # to the current "update slice" to get the previous - # "level". - if is_append: - start = max(last - 1, 0) - end = src_shm._last.value - new_y = src_shm._array[start:end][array_key] - slc = slice(start, end) - - else: - new_y = y_update - - return ( - np.broadcast_to( - new_y[:, None], (new_y.size, 2), - ), - slc, - ) - - -def step_to_xy( - r: Renderer, - array: np.ndarray, - array_key: str, - vr: tuple[int, int], - -) -> tuple[ - np.ndarray, - np.nd.array, - str, -]: - - # 2 more datum-indexes to capture zero at end - x_step = r.x_data[r._xy_first:r._xy_last+2] - y_step = r.y_data[r._xy_first:r._xy_last+2] - - lasts = array[['index', array_key]] - last = lasts[array_key][-1] - y_step[-1] = last - - # slice out in-view data - ivl, ivr = vr - ys_iv = y_step[ivl:ivr+1] - xs_iv = x_step[ivl:ivr+1] - - # flatten to 1d - y_iv = ys_iv.reshape(ys_iv.size) - x_iv = xs_iv.reshape(xs_iv.size) - - # print( - # f'ys_iv : {ys_iv[-s:]}\n' - # f'y_iv: {y_iv[-s:]}\n' - # f'xs_iv: {xs_iv[-s:]}\n' - # f'x_iv: {x_iv[-s:]}\n' - # ) - - return x_iv, y_iv, 'all'