Implement pre-graphics format incremental update

Adds a new pre-graphics data-format callback incremental update api to
our `Renderer`. `Renderer` instance can now overload these custom routines:

- `.update_xy()` a routine which accepts the latest [pre/a]pended data
  sliced out from shm and returns it in a format suitable to store in
  the optional `.[x/y]_data` arrays.
- `.allocate_xy()` which initially does the work of pre-allocating the
   `.[x/y]_data` arrays based on the source shm sizing such that new
   data can be filled in (to memory).
- `._xy_[first/last]: int` attrs to track index diffs between src shm
  and the xy format data updates.

Implement the step curve data format with 3 super simple routines:
- `.allocate_xy()` -> `._pathops.to_step_format()`
- `.update_xy()` -> `._flows.update_step_xy()`
- `.format_xy()` -> `._flows.step_to_xy()`

Further, adjust `._pathops.gen_ohlc_qpath()` to adhere to the new
call signature.
incremental_update_paths
Tyler Goodlet 2022-05-25 11:15:46 -04:00
parent 42572d3808
commit 04897fd402
2 changed files with 215 additions and 209 deletions

View File

@ -144,8 +144,9 @@ def render_baritems(
# if no source data renderer exists create one.
self = flow
r = self._src_r
show_bars: bool = False
r = self._src_r
if not r:
show_bars = True
# OHLC bars path renderer
@ -188,7 +189,7 @@ def render_baritems(
# do checks for whether or not we require downsampling:
# - if we're **not** downsampling then we simply want to
# render the bars graphics curve and update..
# - if insteam we are in a downsamplig state then we to
# - if instead we are in a downsamplig state then we to
x_gt = 6
uppx = curve.x_uppx()
in_line = should_line = curve.isVisible()
@ -212,6 +213,7 @@ def render_baritems(
if should_line:
fields = ['open', 'high', 'low', 'close']
if self.gy is None:
# create a flattened view onto the OHLC array
# which can be read as a line-style format
@ -373,119 +375,75 @@ def render_baritems(
)
def update_step_data(
flow: Flow,
shm: ShmArray,
ivl: int,
ivr: int,
def update_step_xy(
src_shm: ShmArray,
array_key: str,
iflat_first: int,
iflat: int,
profiler: pg.debug.Profiler,
y_update: np.ndarray,
slc: slice,
ln: int,
first: int,
last: int,
is_append: bool,
) -> tuple:
) -> np.ndarray:
self = flow
(
# iflat_first,
# iflat,
ishm_last,
ishm_first,
) = (
# self._iflat_first,
# self._iflat_last,
shm._last.value,
shm._first.value
)
il = max(iflat - 1, 0)
profiler('read step mode incr update indices')
# for a step curve we slice from one datum prior
# to the current "update slice" to get the previous
# "level".
if is_append:
start = max(last - 1, 0)
end = src_shm._last.value
new_y = src_shm._array[start:end][array_key]
slc = slice(start, end)
# check for shm prepend updates since last read.
if iflat_first != ishm_first:
else:
new_y = y_update
print(f'prepend {array_key}')
# i_prepend = self.shm._array['index'][
# ishm_first:iflat_first]
y_prepend = self.shm._array[array_key][
ishm_first:iflat_first
]
y2_prepend = np.broadcast_to(
y_prepend[:, None], (y_prepend.size, 2),
)
# write newly prepended data to flattened copy
self.gy[ishm_first:iflat_first] = y2_prepend
self._iflat_first = ishm_first
profiler('prepended step mode history')
append_diff = ishm_last - iflat
if append_diff:
# slice up to the last datum since last index/append update
# new_x = self.shm._array[il:ishm_last]['index']
new_y = self.shm._array[il:ishm_last][array_key]
new_y2 = np.broadcast_to(
return (
np.broadcast_to(
new_y[:, None], (new_y.size, 2),
),
slc,
)
self.gy[il:ishm_last] = new_y2
profiler('updated step curve data')
# print(
# f'append size: {append_diff}\n'
# f'new_x: {new_x}\n'
# f'new_y: {new_y}\n'
# f'new_y2: {new_y2}\n'
# f'new gy: {gy}\n'
# )
# update local last-index tracking
self._iflat_last = ishm_last
def step_to_xy(
r: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
# slice out up-to-last step contents
x_step = self.gx[ishm_first:ishm_last+2]
# shape to 1d
x = x_step.reshape(-1)
profiler('sliced step x')
) -> tuple[
np.ndarray,
np.nd.array,
str,
]:
y_step = self.gy[ishm_first:ishm_last+2]
lasts = self.shm.array[['index', array_key]]
# 2 more datum-indexes to capture zero at end
x_step = r.x_data[r._xy_first:r._xy_last+2]
y_step = r.y_data[r._xy_first:r._xy_last+2]
lasts = array[['index', array_key]]
last = lasts[array_key][-1]
y_step[-1] = last
# shape to 1d
y = y_step.reshape(-1)
# s = 6
# print(f'lasts: {x[-2*s:]}, {y[-2*s:]}')
profiler('sliced step y')
# do all the same for only in-view data
# slice out in-view data
ivl, ivr = vr
ys_iv = y_step[ivl:ivr+1]
xs_iv = x_step[ivl:ivr+1]
# flatten to 1d
y_iv = ys_iv.reshape(ys_iv.size)
x_iv = xs_iv.reshape(xs_iv.size)
# print(
# f'ys_iv : {ys_iv[-s:]}\n'
# f'y_iv: {y_iv[-s:]}\n'
# f'xs_iv: {xs_iv[-s:]}\n'
# f'x_iv: {x_iv[-s:]}\n'
# )
profiler('sliced in view step data')
# legacy full-recompute-everytime method
# x, y = ohlc_flatten(array)
# x_iv, y_iv = ohlc_flatten(in_view)
# profiler('flattened OHLC data')
return (
x,
y,
x_iv,
y_iv,
append_diff,
)
return x_iv, y_iv, 'all'
class Flow(msgspec.Struct): # , frozen=True):
@ -508,7 +466,7 @@ class Flow(msgspec.Struct): # , frozen=True):
render: bool = True # toggle for display loop
# pre-graphics formatted data
gy: Optional[ShmArray] = None
gy: Optional[np.ndarray] = None
gx: Optional[np.ndarray] = None
# pre-graphics update indices
@ -723,9 +681,9 @@ class Flow(msgspec.Struct): # , frozen=True):
out: Optional[tuple] = None
if isinstance(graphics, BarItems):
draw_last = False
# XXX: special case where we change out graphics
# to a line after a certain uppx threshold.
# render_baritems(
out = render_baritems(
self,
graphics,
@ -739,19 +697,8 @@ class Flow(msgspec.Struct): # , frozen=True):
# return graphics
r = self._src_r
if not r:
# just using for ``.diff()`` atm..
r = self._src_r = Renderer(
flow=self,
# TODO: rename this to something with ohlc
# draw_path=gen_ohlc_qpath,
last_read=read,
)
# ``FastAppendCurve`` case:
array_key = array_key or self.name
shm = self.shm
if out is not None:
# hack to handle ds curve from bars above
@ -763,7 +710,49 @@ class Flow(msgspec.Struct): # , frozen=True):
y_iv,
) = out
input_data = out[1:]
# breakpoint()
r = self._src_r
if not r:
# just using for ``.diff()`` atm..
r = self._src_r = Renderer(
flow=self,
# TODO: rename this to something with ohlc
# draw_path=gen_ohlc_qpath,
last_read=read,
)
if graphics._step_mode:
r.allocate_xy = to_step_format
r.update_xy = update_step_xy
r.format_xy = step_to_xy
slice_to_head = -2
# TODO: append logic inside ``.render()`` isn't
# corrent yet for step curves.. remove this to see it.
should_redraw = True
# TODO: remove this and instead place all step curve
# updating into pre-path data render callbacks.
# full input data
x = array['index']
y = array[array_key]
x_last = x[-1]
y_last = y[-1]
w = 0.5
graphics._last_line = QLineF(
x_last - w, 0,
x_last + w, 0,
)
graphics._last_step_rect = QRectF(
x_last - w, 0,
x_last + w, y_last,
)
# should_redraw = bool(append_diff)
draw_last = False
# ds update config
new_sample_rate: bool = False
@ -780,7 +769,7 @@ class Flow(msgspec.Struct): # , frozen=True):
uppx > 1
and abs(uppx_diff) >= 1
):
log.info(
log.debug(
f'{array_key} sampler change: {self._last_uppx} -> {uppx}'
)
self._last_uppx = uppx
@ -801,69 +790,6 @@ class Flow(msgspec.Struct): # , frozen=True):
should_ds = False
showing_src_data = True
if graphics._step_mode:
slice_to_head = -2
# TODO: remove this and instead place all step curve
# updating into pre-path data render callbacks.
# full input data
x = array['index']
y = array[array_key]
x_last = x[-1]
y_last = y[-1]
# inview data
x_iv = in_view['index']
y_iv = in_view[array_key]
if self.gy is None:
(
self._iflat_first,
self.gx,
self.gy,
) = to_step_format(
shm,
array_key,
)
profiler('generated step mode data')
out = (
x,
y,
x_iv,
y_iv,
append_diff,
) = update_step_data(
self,
shm,
ivl,
ivr,
array_key,
self._iflat_first,
self._iflat_last,
profiler,
)
input_data = out[:-1]
w = 0.5
graphics._last_line = QLineF(
x_last - 0.5, 0,
x_last + 0.5, 0,
)
graphics._last_step_rect = QRectF(
x_last - 0.5, 0,
x_last + 0.5, y_last,
)
should_redraw = bool(append_diff)
draw_last = False
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
# prepend_length, append_length = r.diff(read)
# MAIN RENDER LOGIC:
# - determine in view data and redraw on range change
# - determine downsampling ops if needed
@ -913,8 +839,10 @@ class Flow(msgspec.Struct): # , frozen=True):
def by_index_and_key(
renderer: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
@ -936,15 +864,31 @@ class Renderer(msgspec.Struct):
tuple[np.ndarray]
] = by_index_and_key
# optional pre-graphics xy formatted data which
# is incrementally updated in sync with the source data.
allocate_xy: Optional[Callable[
[int, slice],
tuple[np.ndarray, np.nd.array]
]] = None
update_xy: Optional[Callable[
[int, slice], None]
] = None
x_data: Optional[np.ndarray] = None
y_data: Optional[np.ndarray] = None
# indexes which slice into the above arrays (which are allocated
# based on source data shm input size) and allow retrieving
# incrementally updated data.
_xy_first: int = 0
_xy_last: int = 0
# output graphics rendering, the main object
# processed in ``QGraphicsObject.paint()``
path: Optional[QPainterPath] = None
fast_path: Optional[QPainterPath] = None
# called on input data but before any graphics format
# conversions or processing.
format_data: Optional[Callable[ShmArray, np.ndarray]] = None
# XXX: just ideas..
# called on the final data (transform) output to convert
# to "graphical data form" a format that can be passed to
@ -998,17 +942,13 @@ class Renderer(msgspec.Struct):
prepend_length = int(last_xfirst - xfirst)
append_length = int(xlast - last_xlast)
# TODO: eventually maybe we can implement some kind of
# transform on the ``QPainterPath`` that will more or less
# detect the diff in "elements" terms?
# update state
self.last_read = new_read
# blah blah blah
# do diffing for prepend, append and last entry
return (
slice(xfirst, last_xfirst),
prepend_length,
append_length,
slice(last_xlast, xlast),
)
def draw_path(
@ -1103,6 +1043,75 @@ class Renderer(msgspec.Struct):
in_view,
) = new_read
(
pre_slice,
prepend_length,
append_length,
post_slice,
) = self.diff(new_read)
if self.update_xy:
shm = self.flow.shm
if self.y_data is None:
# we first need to allocate xy data arrays
# from the source data.
assert self.allocate_xy
self.x_data, self.y_data = self.allocate_xy(
shm,
array_key,
)
self._xy_first = shm._first.value
self._xy_last = shm._last.value
profiler('allocated xy history')
if prepend_length:
y_prepend = shm._array[array_key][pre_slice]
xy_data, xy_slice = self.update_xy(
shm,
array_key,
# this is the pre-sliced, "normally expected"
# new data that an updater would normally be
# expected to process, however in some cases (like
# step curves) the updater routine may want to do
# the source history-data reading itself, so we pass
# both here.
y_prepend,
pre_slice,
prepend_length,
self._xy_first,
self._xy_last,
is_append=False,
)
self.y_data[xy_slice] = xy_data
self._xy_first = shm._first.value
profiler('prepended xy history: {prepend_length}')
if append_length:
y_append = shm._array[array_key][post_slice]
xy_data, xy_slice = self.update_xy(
shm,
array_key,
y_append,
post_slice,
append_length,
self._xy_first,
self._xy_last,
is_append=True,
)
# self.y_data[post_slice] = xy_data
# self.y_data[xy_slice or post_slice] = xy_data
self.y_data[xy_slice] = xy_data
self._xy_last = shm._last.value
profiler('appened xy history: {append_length}')
if use_vr:
array = in_view
@ -1120,45 +1129,31 @@ class Renderer(msgspec.Struct):
x_out = x_iv
y_out = y_iv
# last = y_out[slice_to_head]
else:
hist = array[:slice_to_head]
# last = array[slice_to_head]
# maybe allocate shm for data transform output
# if self.format_data is None:
# fshm = self.flow.shm
# shm, opened = maybe_open_shm_array(
# f'{self.flow.name}_data_t',
# # TODO: create entry for each time frame
# dtype=array.dtype,
# readonly=False,
# )
# assert opened
# shm.push(array)
# self.data_t_shm = shm
# xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine.
# expected to be incrementally updates and later rendered to
# a more graphics native format.
# if self.data_t:
# array = self.data_t(array)
hist = array[:slice_to_head]
(
x_out,
y_out,
connect,
) = self.format_xy(hist, array_key)
) = self.format_xy(
self,
# TODO: hist here should be the pre-sliced
# x/y_data in the case where allocate_xy is
# defined?
hist,
array_key,
(ivl, ivr),
)
profiler('sliced input arrays')
(
prepend_length,
append_length,
) = self.diff(new_read)
if (
use_vr
):
@ -1330,5 +1325,10 @@ class Renderer(msgspec.Struct):
self.path = path
self.fast_path = fast_path
# TODO: eventually maybe we can implement some kind of
# transform on the ``QPainterPath`` that will more or less
# detect the diff in "elements" terms?
# update diff state since we've now rendered paths.
self.last_read = new_read
return self.path, array

View File

@ -17,25 +17,30 @@
Super fast ``QPainterPath`` generation related operator routines.
"""
from __future__ import annotations
from typing import (
Optional,
# Optional,
TYPE_CHECKING,
)
import numpy as np
from numpy.lib import recfunctions as rfn
from numba import njit, float64, int64 # , optional
import pyqtgraph as pg
# import pyqtgraph as pg
from PyQt5 import QtGui
# from PyQt5.QtCore import QLineF, QPointF
from ..data._sharedmem import (
ShmArray,
)
from .._profile import pg_profile_enabled, ms_slower_then
# from .._profile import pg_profile_enabled, ms_slower_then
from ._compression import (
ds_m4,
)
if TYPE_CHECKING:
from ._flows import Renderer
def xy_downsample(
x,
@ -138,8 +143,10 @@ def path_arrays_from_ohlc(
def gen_ohlc_qpath(
r: Renderer,
data: np.ndarray,
array_key: str, # we ignore this
vr: tuple[int, int],
start: int = 0, # XXX: do we need this?
# 0.5 is no overlap between arms, 1.0 is full overlap
@ -216,7 +223,6 @@ def to_step_format(
for use by path graphics generation.
'''
first = shm._first.value
i = shm._array['index'].copy()
out = shm._array[data_field].copy()
@ -230,4 +236,4 @@ def to_step_format(
# start y at origin level
y_out[0, 0] = 0
return first, x_out, y_out
return x_out, y_out