More WIP, implement `BarItems` rendering in `Flow.update_graphics()`

incremental_update_paths
Tyler Goodlet 2022-04-20 11:43:47 -04:00
parent f4dc0fbab8
commit 427a33654b
1 changed files with 312 additions and 88 deletions

View File

@ -23,6 +23,8 @@ incremental update.
''' '''
from __future__ import annotations from __future__ import annotations
from functools import partial
import time
from typing import ( from typing import (
Optional, Optional,
Callable, Callable,
@ -30,6 +32,7 @@ from typing import (
import msgspec import msgspec
import numpy as np import numpy as np
from numpy.lib import recfunctions as rfn
import pyqtgraph as pg import pyqtgraph as pg
from PyQt5.QtGui import QPainterPath from PyQt5.QtGui import QPainterPath
@ -37,6 +40,7 @@ from ..data._sharedmem import (
ShmArray, ShmArray,
# attach_shm_array # attach_shm_array
) )
from .._profile import pg_profile_enabled, ms_slower_then
from ._ohlc import ( from ._ohlc import (
BarItems, BarItems,
gen_qpath, gen_qpath,
@ -46,7 +50,12 @@ from ._curve import (
) )
from ._compression import ( from ._compression import (
ohlc_flatten, ohlc_flatten,
ds_m4,
) )
from ..log import get_logger
log = get_logger(__name__)
# class FlowsTable(msgspec.Struct): # class FlowsTable(msgspec.Struct):
# ''' # '''
@ -72,11 +81,63 @@ from ._compression import (
# return cls(shm) # return cls(shm)
def rowarr_to_path(
rows_array: np.ndarray,
x_basis: np.ndarray,
flow: Flow,
) -> QPainterPath:
# TODO: we could in theory use ``numba`` to flatten
# if needed?
# to 1d
y = rows_array.flatten()
return pg.functions.arrayToQPath(
# these get passed at render call time
x=x_basis[:y.size],
y=y,
connect='all',
finiteCheck=False,
path=flow.path,
)
def ohlc_flat_view(
ohlc_shm: ShmArray,
# XXX: we bind this in currently..
x_basis: np.ndarray,
# vr: Optional[slice] = None,
) -> np.ndarray:
'''
Return flattened-non-copy view into an OHLC shm array.
'''
ohlc = ohlc_shm._array[['open', 'high', 'low', 'close']]
# if vr:
# ohlc = ohlc[vr]
# x = x_basis[vr]
unstructured = rfn.structured_to_unstructured(
ohlc,
copy=False,
)
# breakpoint()
y = unstructured.flatten()
x = x_basis[:y.size]
return x, y
class Flow(msgspec.Struct): # , frozen=True): class Flow(msgspec.Struct): # , frozen=True):
''' '''
(Financial Signal-)Flow compound type which wraps a real-time (Financial Signal-)Flow compound type which wraps a real-time
graphics (curve) and its backing data stream together for high level shm array stream with displayed graphics (curves, charts)
access and control. for high level access and control as well as efficient incremental
update.
The intention is for this type to eventually be capable of shm-passing The intention is for this type to eventually be capable of shm-passing
of incrementally updated graphics stream data between actors. of incrementally updated graphics stream data between actors.
@ -89,6 +150,8 @@ class Flow(msgspec.Struct): # , frozen=True):
is_ohlc: bool = False is_ohlc: bool = False
render: bool = True # toggle for display loop render: bool = True # toggle for display loop
flat: Optional[ShmArray] = None
x_basis: Optional[np.ndarray] = None
_last_uppx: float = 0 _last_uppx: float = 0
_in_ds: bool = False _in_ds: bool = False
@ -96,6 +159,7 @@ class Flow(msgspec.Struct): # , frozen=True):
_graphics_tranform_fn: Optional[Callable[ShmArray, np.ndarray]] = None _graphics_tranform_fn: Optional[Callable[ShmArray, np.ndarray]] = None
# map from uppx -> (downsampled data, incremental graphics) # map from uppx -> (downsampled data, incremental graphics)
_src_r: Optional[Renderer] = None
_render_table: dict[ _render_table: dict[
Optional[int], Optional[int],
tuple[Renderer, pg.GraphicsItem], tuple[Renderer, pg.GraphicsItem],
@ -215,7 +279,9 @@ class Flow(msgspec.Struct): # , frozen=True):
int, int, np.ndarray, int, int, np.ndarray,
int, int, np.ndarray, int, int, np.ndarray,
]: ]:
# read call
array = self.shm.array array = self.shm.array
indexes = array['index'] indexes = array['index']
ifirst = indexes[0] ifirst = indexes[0]
ilast = indexes[-1] ilast = indexes[-1]
@ -245,6 +311,8 @@ class Flow(msgspec.Struct): # , frozen=True):
render: bool = True, render: bool = True,
array_key: Optional[str] = None, array_key: Optional[str] = None,
profiler=None,
**kwargs, **kwargs,
) -> pg.GraphicsObject: ) -> pg.GraphicsObject:
@ -253,8 +321,19 @@ class Flow(msgspec.Struct): # , frozen=True):
render to graphics. render to graphics.
''' '''
profiler = profiler or pg.debug.Profiler(
msg=f'Flow.update_graphics() for {self.name}',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
delayed=True,
)
# shm read and slice to view # shm read and slice to view
read = xfirst, xlast, array, ivl, ivr, in_view = self.read() read = (
xfirst, xlast, array,
ivl, ivr, in_view,
) = self.read()
profiler('read src shm data')
if ( if (
not in_view.size not in_view.size
@ -265,100 +344,182 @@ class Flow(msgspec.Struct): # , frozen=True):
graphics = self.graphics graphics = self.graphics
if isinstance(graphics, BarItems): if isinstance(graphics, BarItems):
# ugh, not luvin dis, should we have just a designated # if no source data renderer exists create one.
# instance var? r = self._src_r
r = self._render_table.get('src')
if not r: if not r:
r = Renderer( # OHLC bars path renderer
r = self._src_r = Renderer(
flow=self, flow=self,
draw=gen_qpath, # TODO: rename this to something with ohlc # TODO: rename this to something with ohlc
draw_path=gen_qpath,
last_read=read, last_read=read,
) )
self._render_table['src'] = (r, graphics)
# create a flattened view onto the OHLC array
# which can be read as a line-style format
# shm = self.shm
# self.flat = shm.unstruct_view(['open', 'high', 'low', 'close'])
# import pdbpp
# pdbpp.set_trace()
# x = self.x_basis = (
# np.broadcast_to(
# shm._array['index'][:, None],
# # self.flat._array.shape,
# self.flat.shape,
# ) + np.array([-0.5, 0, 0, 0.5])
# )
ds_curve_r = Renderer( ds_curve_r = Renderer(
flow=self, flow=self,
draw=gen_qpath, # TODO: rename this to something with ohlc
# just swap in the flat view
data_t=lambda array: self.flat.array,
# data_t=partial(
# ohlc_flat_view,
# self.shm,
# ),
last_read=read, last_read=read,
prerender_fn=ohlc_flatten, draw_path=partial(
rowarr_to_path,
x_basis=None,
),
) )
curve = FastAppendCurve(
# y=y,
# x=x,
name='OHLC',
color=graphics._color,
)
curve.hide()
self.plot.addItem(curve)
# baseline "line" downsampled OHLC curve that should # baseline "line" downsampled OHLC curve that should
# kick on only when we reach a certain uppx threshold. # kick on only when we reach a certain uppx threshold.
self._render_table[0] = ( self._render_table[0] = (
ds_curve_r, ds_curve_r,
FastAppendCurve( curve,
y=y,
x=x,
name='OHLC',
color=self._color,
),
) )
dsc_r, curve = self._render_table[0]
# do checks for whether or not we require downsampling: # do checks for whether or not we require downsampling:
# - if we're **not** downsampling then we simply want to # - if we're **not** downsampling then we simply want to
# render the bars graphics curve and update.. # render the bars graphics curve and update..
# - if insteam we are in a downsamplig state then we to # - if insteam we are in a downsamplig state then we to
# update our pre-downsample-ready data and then pass that x_gt = 8
# new data the downsampler algo for incremental update. uppx = curve.x_uppx()
in_line = should_line = curve.isVisible()
if (
should_line
and uppx < x_gt
):
should_line = False
elif (
not should_line
and uppx >= x_gt
):
should_line = True
profiler(f'ds logic complete line={should_line}')
# do graphics updates
if should_line:
# start = time.time()
# y = self.shm.unstruct_view(
# ['open', 'high', 'low', 'close'],
# )
# print(f'unstruct diff: {time.time() - start}')
# profiler('read unstr view bars to line')
# # start = self.flat._first.value
# x = self.x_basis[:y.size].flatten()
# y = y.flatten()
# profiler('flattening bars to line')
# path, last = dsc_r.render(read)
# x, flat = ohlc_flat_view(
# ohlc_shm=self.shm,
# x_basis=x_basis,
# )
# y = y.flatten()
# y_iv = y[ivl:ivr].flatten()
# x_iv = x[ivl:ivr].flatten()
# assert y.size == x.size
x, y = self.flat = ohlc_flatten(array)
x_iv, y_iv = ohlc_flatten(in_view)
profiler('flattened OHLC data')
curve.update_from_array(
x,
y,
x_iv=x_iv,
y_iv=y_iv,
view_range=None, # hack
profiler=profiler,
)
profiler('updated ds curve')
else: else:
pass # render incremental or in-view update
# do incremental update # and apply ouput (path) to graphics.
path, last = r.render(
graphics.update_from_array( read,
array, only_in_view=True,
in_view,
view_range=(ivl, ivr) if use_vr else None,
**kwargs,
) )
# generate and apply path to graphics obj graphics.path = path
graphics.path, last = r.render(only_in_view=True)
graphics.draw_last(last) graphics.draw_last(last)
else: # NOTE: on appends we used to have to flip the coords
# should_ds = False # cache thought it doesn't seem to be required any more?
# should_redraw = False # graphics.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# graphics.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
# # downsampling incremental state checking graphics.prepareGeometryChange()
# uppx = bars.x_uppx() graphics.update()
# px_width = bars.px_width()
# uppx_diff = (uppx - self._last_uppx)
# if self.renderer is None: if (
# self.renderer = Renderer( not in_line
# flow=self, and should_line
):
# change to line graphic
# if not self._in_ds: log.info(
# # in not currently marked as downsampling graphics f'downsampling to line graphic {self.name}'
# # then only draw the full bars graphic for datums "in )
# # view". graphics.hide()
# graphics.update()
curve.show()
curve.update()
# # check for downsampling conditions elif in_line and not should_line:
# if ( log.info(f'showing bars graphic {self.name}')
# # std m4 downsample conditions curve.hide()
# px_width graphics.show()
# and uppx_diff >= 4 graphics.update()
# or uppx_diff <= -3
# or self._step_mode and abs(uppx_diff) >= 4
# ): # update our pre-downsample-ready data and then pass that
# log.info( # new data the downsampler algo for incremental update.
# f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
# graphics.update_from_array(
# array,
# in_view,
# view_range=(ivl, ivr) if use_vr else None,
# **kwargs,
# ) # )
# self._last_uppx = uppx
# should_ds = True
# elif ( # generate and apply path to graphics obj
# uppx <= 2 # graphics.path, last = r.render(
# and self._in_ds # read,
# ): # only_in_view=True,
# # we should de-downsample back to our original # )
# # source data so we clear our path data in prep # graphics.draw_last(last)
# # to generate a new one from original source data.
# should_redraw = True else:
# should_ds = False # ``FastAppendCurve`` case:
array_key = array_key or self.name array_key = array_key or self.name
@ -376,23 +537,64 @@ class Flow(msgspec.Struct): # , frozen=True):
return graphics return graphics
def xy_downsample(
x,
y,
px_width,
uppx,
x_spacer: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
# downsample whenever more then 1 pixels per datum can be shown.
# always refresh data bounds until we get diffing
# working properly, see above..
bins, x, y = ds_m4(
x,
y,
px_width=px_width,
uppx=uppx,
log_scale=bool(uppx)
)
# flatten output to 1d arrays suitable for path-graphics generation.
x = np.broadcast_to(x[:, None], y.shape)
x = (x + np.array(
[-x_spacer, 0, 0, x_spacer]
)).flatten()
y = y.flatten()
return x, y
class Renderer(msgspec.Struct): class Renderer(msgspec.Struct):
flow: Flow flow: Flow
# called to render path graphics # called to render path graphics
draw: Callable[np.ndarray, QPainterPath] draw_path: Callable[np.ndarray, QPainterPath]
# called on input data but before # called on input data but before any graphics format
prerender_fn: Optional[Callable[ShmArray, np.ndarray]] = None # conversions or processing.
data_t: Optional[Callable[ShmArray, np.ndarray]] = None
data_t_shm: Optional[ShmArray] = None
# called on the final data (transform) output to convert
# to "graphical data form" a format that can be passed to
# the ``.draw()`` implementation.
graphics_t: Optional[Callable[ShmArray, np.ndarray]] = None
graphics_t_shm: Optional[ShmArray] = None
# path graphics update implementation methods
prepend_fn: Optional[Callable[QPainterPath, QPainterPath]] = None prepend_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
append_fn: Optional[Callable[QPainterPath, QPainterPath]] = None append_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# last array view read # last array view read
last_read: Optional[np.ndarray] = None last_read: Optional[np.ndarray] = None
# output graphics rendering # output graphics rendering, the main object
# processed in ``QGraphicsObject.paint()``
path: Optional[QPainterPath] = None path: Optional[QPainterPath] = None
# def diff( # def diff(
@ -411,8 +613,10 @@ class Renderer(msgspec.Struct):
def render( def render(
self, self,
new_read,
# only render datums "in view" of the ``ChartView`` # only render datums "in view" of the ``ChartView``
only_in_view: bool = True, only_in_view: bool = False,
) -> list[QPainterPath]: ) -> list[QPainterPath]:
''' '''
@ -428,28 +632,42 @@ class Renderer(msgspec.Struct):
''' '''
# do full source data render to path # do full source data render to path
xfirst, xlast, array, ivl, ivr, in_view = self.last_read last_read = (
xfirst, xlast, array,
ivl, ivr, in_view,
) = self.last_read
if only_in_view: if only_in_view:
# get latest data from flow shm
self.last_read = (
xfirst, xlast, array, ivl, ivr, in_view
) = self.flow.read()
array = in_view array = in_view
# # get latest data from flow shm
# self.last_read = (
# xfirst, xlast, array, ivl, ivr, in_view
# ) = new_read
if self.path is None or in_view: if self.path is None or only_in_view:
# redraw the entire source data if we have either of: # redraw the entire source data if we have either of:
# - no prior path graphic rendered or, # - no prior path graphic rendered or,
# - we always intend to re-render the data only in view # - we always intend to re-render the data only in view
if self.prerender_fn: # data transform: convert source data to a format
array = self.prerender_fn(array) # expected to be incrementally updates and later rendered
# to a more graphics native format.
if self.data_t:
array = self.data_t(array)
hist, last = array[:-1], array[-1] # maybe allocate shm for data transform output
# if self.data_t_shm is None:
# fshm = self.flow.shm
# call path render func on history # shm, opened = maybe_open_shm_array(
self.path = self.draw(hist) # f'{self.flow.name}_data_t',
# # TODO: create entry for each time frame
# dtype=array.dtype,
# readonly=False,
# )
# assert opened
# shm.push(array)
# self.data_t_shm = shm
elif self.path: elif self.path:
print(f'inremental update not supported yet {self.flow.name}') print(f'inremental update not supported yet {self.flow.name}')
@ -459,4 +677,10 @@ class Renderer(msgspec.Struct):
# do path generation for each segment # do path generation for each segment
# and then push into graphics object. # and then push into graphics object.
hist, last = array[:-1], array[-1]
# call path render func on history
self.path = self.draw_path(hist)
self.last_read = new_read
return self.path, last return self.path, last