Implement OHLC downsampled curve via renderer, drop old bypass code

incremental_update_paths
Tyler Goodlet 2022-05-25 19:45:09 -04:00
parent d4f31f2b3c
commit 066b8df619
2 changed files with 157 additions and 288 deletions

View File

@ -23,7 +23,7 @@ incremental update.
'''
from __future__ import annotations
# from functools import partial
from functools import partial
from typing import (
Optional,
Callable,
@ -45,7 +45,6 @@ from PyQt5.QtCore import (
from ..data._sharedmem import (
ShmArray,
# open_shm_array,
)
from .._profile import (
pg_profile_enabled,
@ -68,6 +67,7 @@ from ..log import get_logger
log = get_logger(__name__)
# class FlowsTable(msgspec.Struct):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
@ -77,44 +77,56 @@ log = get_logger(__name__)
# '''
# flows: dict[str, np.ndarray] = {}
# @classmethod
# def from_token(
# cls,
# shm_token: tuple[
# str,
# str,
# tuple[str, str],
# ],
# ) -> Renderer:
def update_ohlc_to_line(
src_shm: ShmArray,
array_key: str,
src_update: np.ndarray,
slc: slice,
ln: int,
first: int,
last: int,
is_append: bool,
# shm = attach_shm_array(token)
# return cls(shm)
) -> np.ndarray:
def rowarr_to_path(
rows_array: np.ndarray,
x_basis: np.ndarray,
flow: Flow,
) -> QPainterPath:
# TODO: we could in theory use ``numba`` to flatten
# if needed?
# to 1d
y = rows_array.flatten()
return pg.functions.arrayToQPath(
# these get passed at render call time
x=x_basis[:y.size],
y=y,
connect='all',
finiteCheck=False,
path=flow.path,
fields = ['open', 'high', 'low', 'close']
return (
rfn.structured_to_unstructured(src_update[fields]),
slc,
)
def ohlc_flat_to_xy(
r: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.nd.array,
str,
]:
# TODO: in the case of an existing ``.update_xy()``
# should we be passing in array as an xy arrays tuple?
# 2 more datum-indexes to capture zero at end
x_flat = r.x_data[r._xy_first:r._xy_last]
y_flat = r.y_data[r._xy_first:r._xy_last]
# slice to view
ivl, ivr = vr
x_iv_flat = x_flat[ivl:ivr]
y_iv_flat = y_flat[ivl:ivr]
# reshape to 1d for graphics rendering
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
return x_iv, y_iv, 'all'
def render_baritems(
flow: Flow,
graphics: BarItems,
@ -137,10 +149,7 @@ def render_baritems(
layer, if not a `Renderer` then something just above it?
'''
(
xfirst, xlast, array,
ivl, ivr, in_view,
) = read
bars = graphics
# if no source data renderer exists create one.
self = flow
@ -156,35 +165,28 @@ def render_baritems(
last_read=read,
)
# ds_curve_r = Renderer(
# flow=self,
ds_curve_r = Renderer(
flow=self,
last_read=read,
# # just swap in the flat view
# # data_t=lambda array: self.gy.array,
# last_read=read,
# draw_path=partial(
# rowarr_to_path,
# x_basis=None,
# ),
# incr update routines
allocate_xy=ohlc_to_line,
update_xy=update_ohlc_to_line,
format_xy=ohlc_flat_to_xy,
)
# )
curve = FastAppendCurve(
name='OHLC',
color=graphics._color,
name=f'{flow.name}_ds_ohlc',
color=bars._color,
)
curve.hide()
self.plot.addItem(curve)
# baseline "line" downsampled OHLC curve that should
# kick on only when we reach a certain uppx threshold.
self._render_table[0] = curve
# (
# # ds_curve_r,
# curve,
# )
self._render_table = (ds_curve_r, curve)
curve = self._render_table[0]
# dsc_r, curve = self._render_table[0]
ds_r, curve = self._render_table
# do checks for whether or not we require downsampling:
# - if we're **not** downsampling then we simply want to
@ -197,183 +199,74 @@ def render_baritems(
should_line
and uppx < x_gt
):
print('FLIPPING TO BARS')
# print('FLIPPING TO BARS')
should_line = False
elif (
not should_line
and uppx >= x_gt
):
print('FLIPPING TO LINE')
# print('FLIPPING TO LINE')
should_line = True
profiler(f'ds logic complete line={should_line}')
# do graphics updates
if should_line:
fields = ['open', 'high', 'low', 'close']
if self.gy is None:
# create a flattened view onto the OHLC array
# which can be read as a line-style format
shm = self.shm
(
self._iflat_first,
self._iflat_last,
self.gx,
self.gy,
) = ohlc_to_line(
shm,
fields=fields,
)
# print(f'unstruct diff: {time.time() - start}')
gy = self.gy
# update flatted ohlc copy
(
iflat_first,
iflat,
ishm_last,
ishm_first,
) = (
self._iflat_first,
self._iflat_last,
self.shm._last.value,
self.shm._first.value
)
# check for shm prepend updates since last read.
if iflat_first != ishm_first:
# write newly prepended data to flattened copy
gy[
ishm_first:iflat_first
] = rfn.structured_to_unstructured(
self.shm._array[fields][ishm_first:iflat_first]
)
self._iflat_first = ishm_first
to_update = rfn.structured_to_unstructured(
self.shm._array[iflat:ishm_last][fields]
)
gy[iflat:ishm_last][:] = to_update
profiler('updated ustruct OHLC data')
# slice out up-to-last step contents
y_flat = gy[ishm_first:ishm_last]
x_flat = self.gx[ishm_first:ishm_last]
# update local last-index tracking
self._iflat_last = ishm_last
# reshape to 1d for graphics rendering
y = y_flat.reshape(-1)
x = x_flat.reshape(-1)
profiler('flattened ustruct OHLC data')
# do all the same for only in-view data
y_iv_flat = y_flat[ivl:ivr]
x_iv_flat = x_flat[ivl:ivr]
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
profiler('flattened ustruct in-view OHLC data')
# pass into curve graphics processing
# curve.update_from_array(
# x,
# y,
# x_iv=x_iv,
# y_iv=y_iv,
# view_range=(ivl, ivr), # hack
# profiler=profiler,
# # should_redraw=False,
# # NOTE: already passed through by display loop?
# # do_append=uppx < 16,
# **kwargs,
# )
curve.draw_last(x, y)
curve.show()
r = ds_r
graphics = curve
profiler('updated ds curve')
else:
# render incremental or in-view update
# and apply ouput (path) to graphics.
path, data = r.render(
read,
'ohlc',
profiler=profiler,
# uppx=1,
use_vr=True,
# graphics=graphics,
# should_redraw=True, # always
)
assert path
graphics = bars
graphics.path = path
graphics.draw_last(data[-1])
if show_bars:
graphics.show()
# NOTE: on appends we used to have to flip the coords
# cache thought it doesn't seem to be required any more?
# graphics.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# graphics.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
# graphics.prepareGeometryChange()
graphics.update()
if show_bars:
bars.show()
changed_to_line = False
if (
not in_line
and should_line
):
# change to line graphic
log.info(
f'downsampling to line graphic {self.name}'
)
graphics.hide()
# graphics.update()
bars.hide()
curve.show()
curve.update()
changed_to_line = True
elif in_line and not should_line:
# change to bars graphic
log.info(f'showing bars graphic {self.name}')
curve.hide()
graphics.show()
graphics.update()
bars.show()
bars.update()
# update our pre-downsample-ready data and then pass that
# new data the downsampler algo for incremental update.
# graphics.update_from_array(
# array,
# in_view,
# view_range=(ivl, ivr) if use_vr else None,
# **kwargs,
# )
# generate and apply path to graphics obj
# graphics.path, last = r.render(
# read,
# only_in_view=True,
# )
# graphics.draw_last(last)
draw_last = False
lasts = self.shm.array[-2:]
last = lasts[-1]
if should_line:
return (
curve,
x,
y,
x_iv,
y_iv,
def draw_last():
x, y = lasts['index'], lasts['close']
curve.draw_last(x, y)
else:
draw_last = partial(
bars.draw_last,
last,
)
return (
graphics,
r,
{'read_from_key': False},
draw_last,
should_line,
changed_to_line,
)
def update_step_xy(
src_shm: ShmArray,
@ -484,7 +377,7 @@ class Flow(msgspec.Struct): # , frozen=True):
_render_table: dict[
Optional[int],
tuple[Renderer, pg.GraphicsItem],
] = {}
] = (None, None)
# TODO: hackery to be able to set a shm later
# but whilst also allowing this type to hashable,
@ -672,62 +565,58 @@ class Flow(msgspec.Struct): # , frozen=True):
not in_view.size
or not render
):
# print('exiting early')
return graphics
draw_last: bool = True
slice_to_head: int = -1
input_data = None
# input_data = None
should_redraw: bool = False
rkwargs = {}
bars = False
out: Optional[tuple] = None
if isinstance(graphics, BarItems):
draw_last = False
# XXX: special case where we change out graphics
# to a line after a certain uppx threshold.
out = render_baritems(
(
graphics,
r,
rkwargs,
draw_last,
should_line,
changed_to_line,
) = render_baritems(
self,
graphics,
read,
profiler,
**kwargs,
)
bars = True
should_redraw = changed_to_line or not should_line
if out is None:
return graphics
# return graphics
else:
r = self._src_r
if not r:
# just using for ``.diff()`` atm..
r = self._src_r = Renderer(
flow=self,
# TODO: rename this to something with ohlc
last_read=read,
)
# ``FastAppendCurve`` case:
array_key = array_key or self.name
if out is not None:
# hack to handle ds curve from bars above
(
graphics, # curve
x,
y,
x_iv,
y_iv,
) = out
input_data = out[1:]
r = self._src_r
if not r:
# just using for ``.diff()`` atm..
r = self._src_r = Renderer(
flow=self,
# TODO: rename this to something with ohlc
# draw_path=gen_ohlc_qpath,
last_read=read,
)
# print(array_key)
# ds update config
new_sample_rate: bool = False
should_redraw: bool = False
should_ds: bool = r._in_ds
showing_src_data: bool = not r._in_ds
if graphics._step_mode:
if getattr(graphics, '_step_mode', False):
r.allocate_xy = to_step_format
r.update_xy = update_step_xy
@ -756,8 +645,6 @@ class Flow(msgspec.Struct): # , frozen=True):
x_last - w, 0,
x_last + w, y_last,
)
# should_redraw = bool(append_diff)
draw_last = False
# downsampling incremental state checking
@ -795,15 +682,11 @@ class Flow(msgspec.Struct): # , frozen=True):
# - determine downsampling ops if needed
# - (incrementally) update ``QPainterPath``
# path = graphics.path
# fast_path = graphics.fast_path
path, data = r.render(
read,
array_key,
profiler,
uppx=uppx,
input_data=input_data,
# use_vr=True,
# TODO: better way to detect and pass this?
@ -816,6 +699,8 @@ class Flow(msgspec.Struct): # , frozen=True):
slice_to_head=slice_to_head,
do_append=do_append,
**rkwargs,
)
# TODO: does this actuallly help us in any way (prolly should
# look at the source / ask ogi).
@ -825,11 +710,16 @@ class Flow(msgspec.Struct): # , frozen=True):
graphics.path = r.path
graphics.fast_path = r.fast_path
if draw_last:
if draw_last and not bars:
# TODO: how to handle this draw last stuff..
x = data['index']
y = data[array_key]
graphics.draw_last(x, y)
profiler('draw last segment')
elif bars and draw_last:
draw_last()
profiler('draw last segment')
graphics.update()
profiler('.update()')
@ -1004,10 +894,8 @@ class Renderer(msgspec.Struct):
profiler: pg.debug.Profiler,
uppx: float = 1,
input_data: Optional[tuple[np.ndarray]] = None,
# redraw and ds flags
should_redraw: bool = True,
should_redraw: bool = False,
new_sample_rate: bool = False,
should_ds: bool = False,
showing_src_data: bool = True,
@ -1018,6 +906,7 @@ class Renderer(msgspec.Struct):
# only render datums "in view" of the ``ChartView``
use_vr: bool = True,
read_from_key: bool = True,
) -> list[QPainterPath]:
'''
@ -1067,7 +956,10 @@ class Renderer(msgspec.Struct):
profiler('allocated xy history')
if prepend_length:
y_prepend = shm._array[array_key][pre_slice]
y_prepend = shm._array[pre_slice]
if read_from_key:
y_prepend = y_prepend[array_key]
xy_data, xy_slice = self.update_xy(
shm,
@ -1092,7 +984,10 @@ class Renderer(msgspec.Struct):
profiler('prepended xy history: {prepend_length}')
if append_length:
y_append = shm._array[array_key][post_slice]
y_append = shm._array[post_slice]
if read_from_key:
y_append = y_append[array_key]
xy_data, xy_slice = self.update_xy(
shm,
@ -1114,43 +1009,22 @@ class Renderer(msgspec.Struct):
if use_vr:
array = in_view
if input_data:
# allow input data passing for now from alt curve updaters.
(
x_out,
y_out,
x_iv,
y_iv,
) = input_data
connect = 'all'
if use_vr:
x_out = x_iv
y_out = y_iv
else:
# xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine.
# expected to be incrementally updates and later rendered to
# a more graphics native format.
# if self.data_t:
# array = self.data_t(array)
ivl, ivr = xfirst, xlast
hist = array[:slice_to_head]
(
x_out,
y_out,
connect,
) = self.format_xy(
self,
# TODO: hist here should be the pre-sliced
# x/y_data in the case where allocate_xy is
# defined?
hist,
array_key,
(ivl, ivr),
)
hist = array[:slice_to_head]
# xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine.
x_out, y_out, connect = self.format_xy(
self,
# TODO: hist here should be the pre-sliced
# x/y_data in the case where allocate_xy is
# defined?
hist,
array_key,
(ivl, ivr),
)
profiler('sliced input arrays')
@ -1168,7 +1042,7 @@ class Renderer(msgspec.Struct):
zoom_or_append = False
last_vr = self._last_vr
last_ivr = self._last_ivr
last_ivr = self._last_ivr or vl, vr
# incremental in-view data update.
if last_vr:
@ -1203,7 +1077,6 @@ class Renderer(msgspec.Struct):
)
):
should_redraw = True
# print("REDRAWING BRUH")
self._last_vr = view_range
if len(x_out):
@ -1224,7 +1097,7 @@ class Renderer(msgspec.Struct):
or new_sample_rate
or prepend_length > 0
):
# print("REDRAWING BRUH")
if new_sample_rate and showing_src_data:
log.info(f'DEDOWN -> {array_key}')
self._in_ds = False
@ -1252,7 +1125,6 @@ class Renderer(msgspec.Struct):
f'(should_redraw: {should_redraw} '
f'should_ds: {should_ds} new_sample_rate: {new_sample_rate})'
)
# profiler(f'DRAW PATH IN VIEW -> {self.name}')
# TODO: get this piecewise prepend working - right now it's
# giving heck on vwap...
@ -1297,7 +1169,7 @@ class Renderer(msgspec.Struct):
x=new_x,
y=new_y,
connect=connect,
# path=fast_path,
path=fast_path,
)
profiler('generated append qpath')

View File

@ -168,11 +168,10 @@ def gen_ohlc_qpath(
def ohlc_to_line(
ohlc_shm: ShmArray,
data_field: str,
fields: list[str] = ['open', 'high', 'low', 'close']
) -> tuple[
int, # flattened first index
int, # flattened last index
np.ndarray,
np.ndarray,
]:
@ -205,8 +204,6 @@ def ohlc_to_line(
assert y_out.any()
return (
first,
last,
x_out,
y_out,
)