Add graphics incr-updated "formatter" subsys

After trying to hack epoch indexed time series and failing miserably,
decided to properly factor out all formatting routines into a common
subsystem API: ``IncrementalFormatter`` which provides the interface for
incrementally updating and tracking pre-path-graphics formatted data.

Previously this functionality was mangled into our `Renderer` (which
also does the work of `QPath` generation and update) but splitting it
out also preps for being able to do graphics-buffer downsampling and
caching on a remote host B)

The ``IncrementalFormatter`` (parent type) has the default behaviour of
tracking a single field-array on some source `ShmArray`, updating
a flattened `numpy.ndarray` in-mem allocation, and providing a default
1d conversion for pre-downsampling and path generation.

Changed out of `Renderer`,
- `.allocate_xy()`, `update_xy()` and `format_xy()` all are moved to
  more explicitly named formatter methods.
- all `.x/y_data` nd array management and update
- "last view range" tracking
- `.last_read`, `.diff()`
- now calls `IncrementalFormatter.format_to_1d()` inside `.render()`

The new API gets,
- `.diff()`, `.last_read`
- all view range diff tracking through `.track_inview_range()`.
- better nd format array names: `.x/y_nd`, `xy_nd_start/stop`.
- `.format_to_1d()` which renders pre-path formatted arrays ready for
  both m4 sampling and path gen.
- better explicit overloadable formatting method names:
  * `.allocate_xy()` -> `.allocate_xy_nd()`
  * `.update_xy()` -> `.incr_update_xy_nd()`
  * `.format_xy()` -> `.format_xy_nd_to_1d()`

Finally this implements per-graphics-type formatters which define
each set up related formatting routines:
- `OHLCBarsFmtr`: std multi-line style bars
- `OHLCBarsAsCurveFmtr`: draws an interpolated line for ohlc sampled data
- `StepCurveFmtr`: handles vlm style curves
pre_viz_calls
Tyler Goodlet 2022-11-22 17:28:58 -05:00
parent 4c799386c6
commit d6ae75d743
2 changed files with 809 additions and 596 deletions

View File

@ -25,8 +25,6 @@ incremental update.
from __future__ import annotations from __future__ import annotations
from typing import ( from typing import (
Optional, Optional,
Callable,
Union,
) )
import msgspec import msgspec
@ -43,21 +41,10 @@ from .._profile import (
# ms_slower_then, # ms_slower_then,
) )
from ._pathops import ( from ._pathops import (
by_index_and_key, IncrementalFormatter,
OHLCBarsFmtr, # Plain OHLC renderer
# Plain OHLC renderer OHLCBarsAsCurveFmtr, # OHLC converted to line
gen_ohlc_qpath, StepCurveFmtr, # "step" curve (like for vlm)
# OHLC -> line renderer
ohlc_to_line,
update_ohlc_to_line,
ohlc_flat_to_xy,
# step curve renderer
to_step_format,
update_step_xy,
step_to_xy,
xy_downsample, xy_downsample,
) )
from ._ohlc import ( from ._ohlc import (
@ -76,16 +63,6 @@ from .._profile import Profiler
log = get_logger(__name__) log = get_logger(__name__)
# class FlowsTable(msgspec.Struct):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``Flow``s with high level processing routines for
# multi-graphics computations and display.
# '''
# flows: dict[str, np.ndarray] = {}
def render_baritems( def render_baritems(
flow: Flow, flow: Flow,
graphics: BarItems, graphics: BarItems,
@ -117,21 +94,24 @@ def render_baritems(
r = self._src_r r = self._src_r
if not r: if not r:
show_bars = True show_bars = True
# OHLC bars path renderer # OHLC bars path renderer
r = self._src_r = Renderer( r = self._src_r = Renderer(
flow=self, flow=self,
format_xy=gen_ohlc_qpath, fmtr=OHLCBarsFmtr(
last_read=read, shm=flow.shm,
flow=flow,
_last_read=read,
),
) )
ds_curve_r = Renderer( ds_curve_r = Renderer(
flow=self, flow=self,
last_read=read, fmtr=OHLCBarsAsCurveFmtr(
shm=flow.shm,
# incr update routines flow=flow,
allocate_xy=ohlc_to_line, _last_read=read,
update_xy=update_ohlc_to_line, ),
format_xy=ohlc_flat_to_xy,
) )
curve = FlattenedOHLC( curve = FlattenedOHLC(
@ -228,7 +208,7 @@ class Flow(msgspec.Struct): # , frozen=True):
''' '''
name: str name: str
plot: pg.PlotItem plot: pg.PlotItem
graphics: Union[Curve, BarItems] graphics: Curve | BarItems
_shm: ShmArray _shm: ShmArray
yrange: tuple[float, float] = None yrange: tuple[float, float] = None
@ -237,7 +217,6 @@ class Flow(msgspec.Struct): # , frozen=True):
# normally this is just a plain line. # normally this is just a plain line.
ds_graphics: Optional[Curve] = None ds_graphics: Optional[Curve] = None
is_ohlc: bool = False is_ohlc: bool = False
render: bool = True # toggle for display loop render: bool = True # toggle for display loop
@ -445,9 +424,14 @@ class Flow(msgspec.Struct): # , frozen=True):
slice_to_head: int = -1 slice_to_head: int = -1
should_redraw: bool = False should_redraw: bool = False
should_line: bool = False
rkwargs = {} rkwargs = {}
should_line = False # TODO: probably specialize ``Renderer`` types instead of
# these logic checks?
# - put these blocks into a `.load_renderer()` meth?
# - consider a OHLCRenderer, StepCurveRenderer, Renderer?
r = self._src_r
if isinstance(graphics, BarItems): if isinstance(graphics, BarItems):
# XXX: special case where we change out graphics # XXX: special case where we change out graphics
# to a line after a certain uppx threshold. # to a line after a certain uppx threshold.
@ -467,14 +451,34 @@ class Flow(msgspec.Struct): # , frozen=True):
should_redraw = changed_to_line or not should_line should_redraw = changed_to_line or not should_line
self._in_ds = should_line self._in_ds = should_line
elif not r:
if isinstance(graphics, StepCurve):
r = self._src_r = Renderer(
flow=self,
fmtr=StepCurveFmtr(
shm=self.shm,
flow=self,
_last_read=read,
),
)
# TODO: append logic inside ``.render()`` isn't
# correct yet for step curves.. remove this to see it.
should_redraw = True
slice_to_head = -2
else: else:
r = self._src_r r = self._src_r
if not r: if not r:
# just using for ``.diff()`` atm.. # just using for ``.diff()`` atm..
r = self._src_r = Renderer( r = self._src_r = Renderer(
flow=self, flow=self,
# TODO: rename this to something with ohlc fmtr=IncrementalFormatter(
last_read=read, shm=self.shm,
flow=self,
_last_read=read,
),
) )
# ``Curve`` derivative case(s): # ``Curve`` derivative case(s):
@ -486,19 +490,6 @@ class Flow(msgspec.Struct): # , frozen=True):
should_ds: bool = r._in_ds should_ds: bool = r._in_ds
showing_src_data: bool = not r._in_ds showing_src_data: bool = not r._in_ds
# step_mode = getattr(graphics, '_step_mode', False)
step_mode = isinstance(graphics, StepCurve)
if step_mode:
r.allocate_xy = to_step_format
r.update_xy = update_step_xy
r.format_xy = step_to_xy
# TODO: append logic inside ``.render()`` isn't
# correct yet for step curves.. remove this to see it.
should_redraw = True
slice_to_head = -2
# downsampling incremental state checking # downsampling incremental state checking
# check for and set std m4 downsample conditions # check for and set std m4 downsample conditions
uppx = graphics.x_uppx() uppx = graphics.x_uppx()
@ -680,34 +671,7 @@ class Flow(msgspec.Struct): # , frozen=True):
class Renderer(msgspec.Struct): class Renderer(msgspec.Struct):
flow: Flow flow: Flow
# last array view read fmtr: IncrementalFormatter
last_read: Optional[tuple] = None
# default just returns index, and named array from data
format_xy: Callable[
[np.ndarray, str],
tuple[np.ndarray]
] = by_index_and_key
# optional pre-graphics xy formatted data which
# is incrementally updated in sync with the source data.
allocate_xy: Optional[Callable[
[int, slice],
tuple[np.ndarray, np.nd.array]
]] = None
update_xy: Optional[Callable[
[int, slice], None]
] = None
x_data: Optional[np.ndarray] = None
y_data: Optional[np.ndarray] = None
# indexes which slice into the above arrays (which are allocated
# based on source data shm input size) and allow retrieving
# incrementally updated data.
_xy_first: int = 0
_xy_last: int = 0
# output graphics rendering, the main object # output graphics rendering, the main object
# processed in ``QGraphicsObject.paint()`` # processed in ``QGraphicsObject.paint()``
@ -729,58 +693,11 @@ class Renderer(msgspec.Struct):
_last_uppx: float = 0 _last_uppx: float = 0
_in_ds: bool = False _in_ds: bool = False
# incremental update state(s)
_last_vr: Optional[tuple[float, float]] = None
_last_ivr: Optional[tuple[float, float]] = None
def diff(
self,
new_read: tuple[np.ndarray],
) -> tuple[
np.ndarray,
np.ndarray,
]:
(
last_xfirst,
last_xlast,
last_array,
last_ivl,
last_ivr,
last_in_view,
) = self.last_read
# TODO: can the renderer just call ``Flow.read()`` directly?
# unpack latest source data read
(
xfirst,
xlast,
array,
ivl,
ivr,
in_view,
) = new_read
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
prepend_length = int(last_xfirst - xfirst)
append_length = int(xlast - last_xlast)
# blah blah blah
# do diffing for prepend, append and last entry
return (
slice(xfirst, last_xfirst),
prepend_length,
append_length,
slice(last_xlast, xlast),
)
def draw_path( def draw_path(
self, self,
x: np.ndarray, x: np.ndarray,
y: np.ndarray, y: np.ndarray,
connect: Union[str, np.ndarray] = 'all', connect: str | np.ndarray = 'all',
path: Optional[QPainterPath] = None, path: Optional[QPainterPath] = None,
redraw: bool = False, redraw: bool = False,
@ -858,166 +775,50 @@ class Renderer(msgspec.Struct):
''' '''
# TODO: can the renderer just call ``Flow.read()`` directly? # TODO: can the renderer just call ``Flow.read()`` directly?
# unpack latest source data read # unpack latest source data read
fmtr = self.fmtr
( (
xfirst, _,
xlast, _,
array, array,
ivl, ivl,
ivr, ivr,
in_view, in_view,
) = new_read ) = new_read
(
pre_slice,
prepend_length,
append_length,
post_slice,
) = self.diff(new_read)
if self.update_xy:
shm = self.flow.shm
if self.y_data is None:
# we first need to allocate xy data arrays
# from the source data.
assert self.allocate_xy
self.x_data, self.y_data = self.allocate_xy(
shm,
array_key,
)
self._xy_first = shm._first.value
self._xy_last = shm._last.value
profiler('allocated xy history')
if prepend_length:
y_prepend = shm._array[pre_slice]
if read_from_key:
y_prepend = y_prepend[array_key]
xy_data, xy_slice = self.update_xy(
shm,
array_key,
# this is the pre-sliced, "normally expected"
# new data that an updater would normally be
# expected to process, however in some cases (like
# step curves) the updater routine may want to do
# the source history-data reading itself, so we pass
# both here.
y_prepend,
pre_slice,
prepend_length,
self._xy_first,
self._xy_last,
is_append=False,
)
self.y_data[xy_slice] = xy_data
self._xy_first = shm._first.value
profiler('prepended xy history: {prepend_length}')
if append_length:
y_append = shm._array[post_slice]
if read_from_key:
y_append = y_append[array_key]
xy_data, xy_slice = self.update_xy(
shm,
array_key,
y_append,
post_slice,
append_length,
self._xy_first,
self._xy_last,
is_append=True,
)
# self.y_data[post_slice] = xy_data
# self.y_data[xy_slice or post_slice] = xy_data
self.y_data[xy_slice] = xy_data
self._xy_last = shm._last.value
profiler('appened xy history: {append_length}')
if use_vr:
array = in_view
# else:
# ivl, ivr = xfirst, xlast
hist = array[:slice_to_head]
# xy-path data transform: convert source data to a format # xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine. # able to be passed to a `QPainterPath` rendering routine.
if not len(hist): fmt_out = fmtr.format_to_1d(
new_read,
array_key,
profiler,
slice_to_head=slice_to_head,
read_src_from_key=read_from_key,
slice_to_inview=use_vr,
)
# no history in view case
if not fmt_out:
# XXX: this might be why the profiler only has exits? # XXX: this might be why the profiler only has exits?
return return
x_out, y_out, connect = self.format_xy( (
self, x_1d,
# TODO: hist here should be the pre-sliced y_1d,
# x/y_data in the case where allocate_xy is connect,
# defined? prepend_length,
hist, append_length,
array_key, view_changed,
(ivl, ivr),
)
profiler('sliced input arrays') ) = fmt_out
if (
use_vr
):
# if a view range is passed, plan to draw the
# source ouput that's "in view" of the chart.
view_range = (ivl, ivr)
# print(f'{self._name} vr: {view_range}')
profiler(f'view range slice {view_range}')
vl, vr = view_range
zoom_or_append = False
last_vr = self._last_vr
last_ivr = self._last_ivr or vl, vr
# incremental in-view data update.
if last_vr:
# relative slice indices
lvl, lvr = last_vr
# abs slice indices
al, ar = last_ivr
# left_change = abs(x_iv[0] - al) >= 1
# right_change = abs(x_iv[-1] - ar) >= 1
if (
# likely a zoom view change
(vr - lvr) > 2 or vl < lvl
# append / prepend update
# we had an append update where the view range
# didn't change but the data-viewed (shifted)
# underneath, so we need to redraw.
# or left_change and right_change and last_vr == view_range
# not (left_change and right_change) and ivr
# (
# or abs(x_iv[ivr] - livr) > 1
):
zoom_or_append = True
self._last_vr = view_range
if len(x_out):
self._last_ivr = x_out[0], x_out[slice_to_head]
# redraw conditions # redraw conditions
if ( if (
prepend_length > 0 prepend_length > 0
or new_sample_rate or new_sample_rate
or append_length > 0 or append_length > 0
or zoom_or_append or view_changed
): ):
should_redraw = True should_redraw = True
@ -1039,9 +840,9 @@ class Renderer(msgspec.Struct):
elif should_ds and uppx > 1: elif should_ds and uppx > 1:
x_out, y_out, ymn, ymx = xy_downsample( x_1d, y_1d, ymn, ymx = xy_downsample(
x_out, x_1d,
y_out, y_1d,
uppx, uppx,
) )
self.flow.yrange = ymn, ymx self.flow.yrange = ymn, ymx
@ -1052,8 +853,8 @@ class Renderer(msgspec.Struct):
self._in_ds = True self._in_ds = True
path = self.draw_path( path = self.draw_path(
x=x_out, x=x_1d,
y=y_out, y=y_1d,
connect=connect, connect=connect,
path=path, path=path,
redraw=True, redraw=True,
@ -1088,8 +889,8 @@ class Renderer(msgspec.Struct):
and not should_redraw and not should_redraw
): ):
print(f'{array_key} append len: {append_length}') print(f'{array_key} append len: {append_length}')
new_x = x_out[-append_length - 2:] # slice_to_head] new_x = x_1d[-append_length - 2:] # slice_to_head]
new_y = y_out[-append_length - 2:] # slice_to_head] new_y = y_1d[-append_length - 2:] # slice_to_head]
profiler('sliced append path') profiler('sliced append path')
profiler( profiler(
@ -1137,10 +938,4 @@ class Renderer(msgspec.Struct):
self.path = path self.path = path
self.fast_path = fast_path self.fast_path = fast_path
# TODO: eventually maybe we can implement some kind of
# transform on the ``QPainterPath`` that will more or less
# detect the diff in "elements" terms?
# update diff state since we've now rendered paths.
self.last_read = new_read
return self.path, array, reset return self.path, array, reset

View File

@ -20,7 +20,6 @@ Super fast ``QPainterPath`` generation related operator routines.
from __future__ import annotations from __future__ import annotations
from typing import ( from typing import (
Optional, Optional,
Callable,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -29,7 +28,7 @@ import numpy as np
from numpy.lib import recfunctions as rfn from numpy.lib import recfunctions as rfn
from numba import njit, float64, int64 # , optional from numba import njit, float64, int64 # , optional
# import pyqtgraph as pg # import pyqtgraph as pg
from PyQt5 import QtGui # from PyQt5 import QtGui
# from PyQt5.QtCore import QLineF, QPointF # from PyQt5.QtCore import QLineF, QPointF
from ..data._sharedmem import ( from ..data._sharedmem import (
@ -41,7 +40,11 @@ from ._compression import (
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from ._flows import Renderer from ._flows import (
Renderer,
Flow,
)
from .._profile import Profiler
def by_index_and_key( def by_index_and_key(
@ -59,77 +62,455 @@ def by_index_and_key(
class IncrementalFormatter(msgspec.Struct): class IncrementalFormatter(msgspec.Struct):
'''
Incrementally updating, pre-path-graphics tracking, formatter.
Allows tracking source data state in an updateable pre-graphics
``np.ndarray`` format (in local process memory) as well as
incrementally rendering from that format **to** 1d x/y for path
generation using ``pg.functions.arrayToQPath()``.
'''
shm: ShmArray shm: ShmArray
flow: Flow
# optional pre-graphics xy formatted data which # last read from shm (usually due to an update call)
# is incrementally updated in sync with the source data. _last_read: tuple[
allocate_xy_nd: Optional[Callable[ int,
[int, slice], int,
tuple[np.ndarray, np.nd.array] np.ndarray
]] = None
incr_update_xy_nd: Optional[Callable[ ]
[int, slice], None]
] = None
# default just returns index, and named array from data @property
format_xy_nd_to_1d: Callable[ def last_read(self) -> tuple | None:
[np.ndarray, str], return self._last_read
tuple[np.ndarray]
] = by_index_and_key
x_nd: Optional[np.ndarray] = None def __repr__(self) -> str:
y_nd: Optional[np.ndarray] = None msg = (
f'{type(self)}: ->\n\n'
f'fqsn={self.flow.name}\n'
f'shm_name={self.shm.token["shm_name"]}\n\n'
x_1d: Optional[np.ndarray] = None f'last_vr={self._last_vr}\n'
y_1d: Optional[np.ndarray] = None f'last_ivdr={self._last_ivdr}\n\n'
# indexes which slice into the above arrays (which are allocated f'xy_nd_start={self.xy_nd_start}\n'
# based on source data shm input size) and allow retrieving f'xy_nd_stop={self.xy_nd_stop}\n\n'
# incrementally updated data. )
# _xy_first: int = 0
# _xy_last: int = 0
xy_nd_start: int = 0
xy_nd_end: int = 0
x_nd_len = 0
y_nd_len = 0
if self.x_nd is not None:
x_nd_len = len(self.x_nd)
y_nd_len = len(self.y_nd)
def xy_downsample( msg += (
x, f'x_nd_len={x_nd_len}\n'
y, f'y_nd_len={y_nd_len}\n'
uppx, )
x_spacer: float = 0.5, return msg
def diff(
self,
new_read: tuple[np.ndarray],
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
np.ndarray, np.ndarray,
float,
float,
]: ]:
(
last_xfirst,
last_xlast,
last_array,
last_ivl,
last_ivr,
last_in_view,
) = self.last_read
# downsample whenever more then 1 pixels per datum can be shown. # TODO: can the renderer just call ``Flow.read()`` directly?
# always refresh data bounds until we get diffing # unpack latest source data read
# working properly, see above.. (
bins, x, y, ymn, ymx = ds_m4( xfirst,
x, xlast,
y, array,
uppx, ivl,
ivr,
in_view,
) = new_read
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
prepend_length = int(last_xfirst - xfirst)
append_length = int(xlast - last_xlast)
# blah blah blah
# do diffing for prepend, append and last entry
return (
slice(xfirst, last_xfirst),
prepend_length,
append_length,
slice(last_xlast, xlast),
) )
# flatten output to 1d arrays suitable for path-graphics generation. # Incrementally updated xy ndarray formatted data, a pre-1d
x = np.broadcast_to(x[:, None], y.shape) # format which is updated and cached independently of the final
x = (x + np.array( # pre-graphics-path 1d format.
[-x_spacer, 0, 0, x_spacer] x_nd: Optional[np.ndarray] = None
)).flatten() y_nd: Optional[np.ndarray] = None
y = y.flatten()
return x, y, ymn, ymx # indexes which slice into the above arrays (which are allocated
# based on source data shm input size) and allow retrieving
# incrementally updated data.
xy_nd_start: int = 0
xy_nd_stop: int = 0
# TODO: eventually incrementally update 1d-pre-graphics path data?
# x_1d: Optional[np.ndarray] = None
# y_1d: Optional[np.ndarray] = None
# incremental view-change state(s) tracking
_last_vr: tuple[float, float] | None = None
_last_ivdr: tuple[float, float] | None = None
def _track_inview_range(
self,
view_range: tuple[int, int],
) -> bool:
# if a view range is passed, plan to draw the
# source ouput that's "in view" of the chart.
vl, vr = view_range
zoom_or_append = False
last_vr = self._last_vr
# incremental in-view data update.
if last_vr:
lvl, lvr = last_vr # relative slice indices
# TODO: detecting more specifically the interaction changes
# last_ivr = self._last_ivdr or (vl, vr)
# al, ar = last_ivr # abs slice indices
# left_change = abs(x_iv[0] - al) >= 1
# right_change = abs(x_iv[-1] - ar) >= 1
# likely a zoom/pan view change or data append update
if (
(vr - lvr) > 2
or vl < lvl
# append / prepend update
# we had an append update where the view range
# didn't change but the data-viewed (shifted)
# underneath, so we need to redraw.
# or left_change and right_change and last_vr == view_range
# not (left_change and right_change) and ivr
# (
# or abs(x_iv[ivr] - livr) > 1
):
zoom_or_append = True
self._last_vr = view_range
return zoom_or_append
def format_to_1d(
self,
new_read: tuple,
array_key: str,
profiler: Profiler,
slice_to_head: int = -1,
read_src_from_key: bool = True,
slice_to_inview: bool = True,
) -> tuple[
np.ndarray,
np.ndarray,
]:
shm = self.shm
(
_,
_,
array,
ivl,
ivr,
in_view,
) = new_read
(
pre_slice,
prepend_len,
append_len,
post_slice,
) = self.diff(new_read)
if self.y_nd is None:
# we first need to allocate xy data arrays
# from the source data.
self.x_nd, self.y_nd = self.allocate_xy_nd(
shm,
array_key,
)
self.xy_nd_start = shm._first.value
self.xy_nd_stop = shm._last.value
profiler('allocated xy history')
if prepend_len:
y_prepend = shm._array[pre_slice]
if read_src_from_key:
y_prepend = y_prepend[array_key]
(
new_y_nd,
y_nd_slc,
) = self.incr_update_xy_nd(
shm,
array_key,
# this is the pre-sliced, "normally expected"
# new data that an updater would normally be
# expected to process, however in some cases (like
# step curves) the updater routine may want to do
# the source history-data reading itself, so we pass
# both here.
y_prepend,
pre_slice,
prepend_len,
self.xy_nd_start,
self.xy_nd_stop,
is_append=False,
)
# y_nd_view = self.y_nd[y_nd_slc]
self.y_nd[y_nd_slc] = new_y_nd
# if read_src_from_key:
# y_nd_view[:][array_key] = new_y_nd
# else:
# y_nd_view[:] = new_y_nd
self.xy_nd_start = shm._first.value
profiler('prepended xy history: {prepend_length}')
if append_len:
y_append = shm._array[post_slice]
if read_src_from_key:
y_append = y_append[array_key]
(
new_y_nd,
y_nd_slc,
) = self.incr_update_xy_nd(
shm,
array_key,
y_append,
post_slice,
append_len,
self.xy_nd_start,
self.xy_nd_stop,
is_append=True,
)
# self.y_nd[post_slice] = new_y_nd
# self.y_nd[xy_slice or post_slice] = xy_data
self.y_nd[y_nd_slc] = new_y_nd
# if read_src_from_key:
# y_nd_view[:][array_key] = new_y_nd
# else:
# y_nd_view[:] = new_y_nd
self.xy_nd_stop = shm._last.value
profiler('appened xy history: {append_length}')
view_changed: bool = False
view_range: tuple[int, int] = (ivl, ivr)
if slice_to_inview:
view_changed = self._track_inview_range(view_range)
array = in_view
profiler(f'{self.flow.name} view range slice {view_range}')
hist = array[:slice_to_head]
# xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine.
if not len(hist):
# XXX: this might be why the profiler only has exits?
return
# TODO: hist here should be the pre-sliced
# x/y_data in the case where allocate_xy is
# defined?
x_1d, y_1d, connect = self.format_xy_nd_to_1d(
hist,
array_key,
view_range,
)
# update the last "in view data range"
if len(x_1d):
self._last_ivdr = x_1d[0], x_1d[slice_to_head]
# TODO: eventually maybe we can implement some kind of
# transform on the ``QPainterPath`` that will more or less
# detect the diff in "elements" terms?
# update diff state since we've now rendered paths.
self._last_read = new_read
profiler('.format_to_1d()')
return (
x_1d,
y_1d,
connect,
prepend_len,
append_len,
view_changed,
)
###############################
# Sub-type override interface #
###############################
# optional pre-graphics xy formatted data which
# is incrementally updated in sync with the source data.
# XXX: was ``.allocate_xy()``
def allocate_xy_nd(
self,
src_shm: ShmArray,
data_field: str,
index_field: str = 'index',
) -> tuple[
np.ndarray, # x
np.nd.array # y
]:
'''
Convert the structured-array ``src_shm`` format to
a equivalently shaped (and field-less) ``np.ndarray``.
Eg. a 4 field x N struct-array => (N, 4)
'''
y_nd = src_shm._array[data_field].copy()
x_nd = src_shm._array[index_field].copy()
return x_nd, y_nd
# XXX: was ``.update_xy()``
def incr_update_xy_nd(
self,
src_shm: ShmArray,
data_field: str,
new_from_src: np.ndarray, # portion of source that was updated
read_slc: slice,
ln: int, # len of updated
nd_start: int,
nd_stop: int,
is_append: bool,
index_field: str = 'index',
) -> tuple[
np.ndarray,
slice,
]:
# write pushed data to flattened copy
new_y_nd = new_from_src
# XXX
# TODO: this should be returned and written by caller!
# XXX
# generate same-valued-per-row x support based on y shape
if index_field != 'index':
self.x_nd[read_slc, :] = new_from_src[index_field]
return new_y_nd, read_slc
# XXX: was ``.format_xy()``
def format_xy_nd_to_1d(
self,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray, # 1d x
np.ndarray, # 1d y
np.ndarray | str, # connection array/style
]:
'''
Default xy-nd array to 1d pre-graphics-path render routine.
Return single field column data verbatim
'''
return (
array['index'],
array[array_key],
# 1d connection array or style-key to
# ``pg.functions.arrayToQPath()``
'all',
)
class OHLCBarsFmtr(IncrementalFormatter):
fields: list[str] = ['open', 'high', 'low', 'close']
def allocate_xy_nd(
self,
ohlc_shm: ShmArray,
data_field: str,
) -> tuple[
np.ndarray, # x
np.nd.array # y
]:
'''
Convert an input struct-array holding OHLC samples into a pair of
flattened x, y arrays with the same size (datums wise) as the source
data.
'''
y_nd = ohlc_shm.ustruct(self.fields)
# generate an flat-interpolated x-domain
x_nd = (
np.broadcast_to(
ohlc_shm._array['index'][:, None],
(
ohlc_shm._array.size,
# 4, # only ohlc
y_nd.shape[1],
),
) + np.array([-0.5, 0, 0, 0.5])
)
assert y_nd.any()
# write pushed data to flattened copy
return (
x_nd,
y_nd,
)
@staticmethod
@njit( @njit(
# TODO: for now need to construct this manually for readonly arrays, see # TODO: for now need to construct this manually for readonly
# https://github.com/numba/numba/issues/4511 # arrays, see https://github.com/numba/numba/issues/4511
# ntypes.tuple((float64[:], float64[:], float64[:]))( # ntypes.tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous # numba_ohlc_dtype[::1], # contiguous
# int64, # int64,
@ -142,7 +523,11 @@ def path_arrays_from_ohlc(
start: int64, start: int64,
bar_gap: float64 = 0.43, bar_gap: float64 = 0.43,
) -> np.ndarray: ) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
]:
''' '''
Generate an array of lines objects from input ohlc data. Generate an array of lines objects from input ohlc data.
@ -198,110 +583,93 @@ def path_arrays_from_ohlc(
return x, y, c return x, y, c
# TODO: can we drop this frame and just use the above?
def format_xy_nd_to_1d(
self,
def gen_ohlc_qpath( array: np.ndarray,
r: Renderer, array_key: str,
data: np.ndarray,
array_key: str, # we ignore this
vr: tuple[int, int], vr: tuple[int, int],
start: int = 0, # XXX: do we need this? start: int = 0, # XXX: do we need this?
# 0.5 is no overlap between arms, 1.0 is full overlap # 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43, w: float = 0.43,
) -> QtGui.QPainterPath: ) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
]:
''' '''
More or less direct proxy to ``path_arrays_from_ohlc()`` More or less direct proxy to the ``numba``-fied
but with closed in kwargs for line spacing. ``path_arrays_from_ohlc()`` (above) but with closed in kwargs
for line spacing.
''' '''
x, y, c = path_arrays_from_ohlc( x, y, c = self.path_arrays_from_ohlc(
data, array,
start, start,
bar_gap=w, bar_gap=w,
) )
return x, y, c return x, y, c
def incr_update_xy_nd(
self,
def ohlc_to_line( src_shm: ShmArray,
ohlc_shm: ShmArray,
data_field: str, data_field: str,
fields: list[str] = ['open', 'high', 'low', 'close']
new_from_src: np.ndarray, # portion of source that was updated
read_slc: slice,
ln: int, # len of updated
nd_start: int,
nd_stop: int,
is_append: bool,
index_field: str = 'index',
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
np.ndarray, slice,
]: ]:
''' # write newly pushed data to flattened copy
Convert an input struct-array holding OHLC samples into a pair of # a struct-arr is always passed in.
flattened x, y arrays with the same size (datums wise) as the source new_y_nd = rfn.structured_to_unstructured(
data. new_from_src[self.fields]
'''
y_out = ohlc_shm.ustruct(fields)
first = ohlc_shm._first.value
last = ohlc_shm._last.value
# write pushed data to flattened copy
y_out[first:last] = rfn.structured_to_unstructured(
ohlc_shm.array[fields]
) )
# generate an flat-interpolated x-domain # XXX
x_out = ( # TODO: this should be returned and written by caller!
np.broadcast_to( # XXX
ohlc_shm._array['index'][:, None], # generate same-valued-per-row x support based on y shape
( if index_field != 'index':
ohlc_shm._array.size, self.x_nd[read_slc, :] = new_from_src[index_field]
# 4, # only ohlc
y_out.shape[1],
),
) + np.array([-0.5, 0, 0, 0.5])
)
assert y_out.any()
return ( return new_y_nd, read_slc
x_out,
y_out,
)
def update_ohlc_to_line( class OHLCBarsAsCurveFmtr(OHLCBarsFmtr):
src_shm: ShmArray,
array_key: str,
src_update: np.ndarray,
slc: slice,
ln: int,
first: int,
last: int,
is_append: bool,
) -> np.ndarray: def format_xy_nd_to_1d(
self,
fields = ['open', 'high', 'low', 'close']
return (
rfn.structured_to_unstructured(src_update[fields]),
slc,
)
def ohlc_flat_to_xy(
r: Renderer,
array: np.ndarray, array: np.ndarray,
array_key: str, array_key: str,
vr: tuple[int, int], vr: tuple[int, int],
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
np.nd.array, np.ndarray,
str, str,
]: ]:
# TODO: in the case of an existing ``.update_xy()`` # TODO: in the case of an existing ``.update_xy()``
# should we be passing in array as an xy arrays tuple? # should we be passing in array as an xy arrays tuple?
# 2 more datum-indexes to capture zero at end # 2 more datum-indexes to capture zero at end
x_flat = r.x_data[r._xy_first:r._xy_last] x_flat = self.x_nd[self.xy_nd_start:self.xy_nd_stop]
y_flat = r.y_data[r._xy_first:r._xy_last] y_flat = self.y_nd[self.xy_nd_start:self.xy_nd_stop]
# slice to view # slice to view
ivl, ivr = vr ivl, ivr = vr
@ -315,12 +683,20 @@ def ohlc_flat_to_xy(
return x_iv, y_iv, 'all' return x_iv, y_iv, 'all'
def to_step_format( class StepCurveFmtr(IncrementalFormatter):
def allocate_xy_nd(
self,
shm: ShmArray, shm: ShmArray,
data_field: str, data_field: str,
index_field: str = 'index', index_field: str = 'index',
) -> tuple[int, np.ndarray, np.ndarray]: ) -> tuple[
np.ndarray, # x
np.nd.array # y
]:
''' '''
Convert an input 1d shm array to a "step array" format Convert an input 1d shm array to a "step array" format
for use by path graphics generation. for use by path graphics generation.
@ -341,19 +717,25 @@ def to_step_format(
y_out[0, 0] = 0 y_out[0, 0] = 0
return x_out, y_out return x_out, y_out
def incr_update_xy_nd(
self,
def update_step_xy(
src_shm: ShmArray, src_shm: ShmArray,
array_key: str, array_key: str,
y_update: np.ndarray,
src_update: np.ndarray, # portion of source that was updated
slc: slice, slc: slice,
ln: int, ln: int, # len of updated
first: int, first: int,
last: int, last: int,
is_append: bool, is_append: bool,
) -> np.ndarray: ) -> tuple[
np.ndarray,
slice,
]:
# for a step curve we slice from one datum prior # for a step curve we slice from one datum prior
# to the current "update slice" to get the previous # to the current "update slice" to get the previous
# "level". # "level".
@ -364,7 +746,7 @@ def update_step_xy(
slc = slice(start, end) slc = slice(start, end)
else: else:
new_y = y_update new_y = src_update
return ( return (
np.broadcast_to( np.broadcast_to(
@ -373,22 +755,21 @@ def update_step_xy(
slc, slc,
) )
def format_xy_nd_to_1d(
self,
def step_to_xy(
r: Renderer,
array: np.ndarray, array: np.ndarray,
array_key: str, array_key: str,
vr: tuple[int, int], vr: tuple[int, int],
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
np.nd.array, np.ndarray,
str, str,
]: ]:
# 2 more datum-indexes to capture zero at end # 2 more datum-indexes to capture zero at end
x_step = r.x_data[r._xy_first:r._xy_last+2] x_step = self.x_nd[self.xy_nd_start:self.xy_nd_stop+2]
y_step = r.y_data[r._xy_first:r._xy_last+2] y_step = self.y_nd[self.xy_nd_start:self.xy_nd_stop+2]
lasts = array[['index', array_key]] lasts = array[['index', array_key]]
last = lasts[array_key][-1] last = lasts[array_key][-1]
@ -411,3 +792,40 @@ def step_to_xy(
# ) # )
return x_iv, y_iv, 'all' return x_iv, y_iv, 'all'
def xy_downsample(
x,
y,
uppx,
x_spacer: float = 0.5,
) -> tuple[
np.ndarray,
np.ndarray,
float,
float,
]:
'''
Downsample 1D (flat ``numpy.ndarray``) arrays using M4 given an input
``uppx`` (units-per-pixel) and add space between discreet datums.
'''
# downsample whenever more then 1 pixels per datum can be shown.
# always refresh data bounds until we get diffing
# working properly, see above..
bins, x, y, ymn, ymx = ds_m4(
x,
y,
uppx,
)
# flatten output to 1d arrays suitable for path-graphics generation.
x = np.broadcast_to(x[:, None], y.shape)
x = (x + np.array(
[-x_spacer, 0, 0, x_spacer]
)).flatten()
y = y.flatten()
return x, y, ymn, ymx