Add graphics incr-updated "formatter" subsys

After trying to hack epoch indexed time series and failing miserably,
decided to properly factor out all formatting routines into a common
subsystem API: ``IncrementalFormatter`` which provides the interface for
incrementally updating and tracking pre-path-graphics formatted data.

Previously this functionality was mangled into our `Renderer` (which
also does the work of `QPath` generation and update) but splitting it
out also preps for being able to do graphics-buffer downsampling and
caching on a remote host B)

The ``IncrementalFormatter`` (parent type) has the default behaviour of
tracking a single field-array on some source `ShmArray`, updating
a flattened `numpy.ndarray` in-mem allocation, and providing a default
1d conversion for pre-downsampling and path generation.

Changed out of `Renderer`,
- `.allocate_xy()`, `update_xy()` and `format_xy()` all are moved to
  more explicitly named formatter methods.
- all `.x/y_data` nd array management and update
- "last view range" tracking
- `.last_read`, `.diff()`
- now calls `IncrementalFormatter.format_to_1d()` inside `.render()`

The new API gets,
- `.diff()`, `.last_read`
- all view range diff tracking through `.track_inview_range()`.
- better nd format array names: `.x/y_nd`, `xy_nd_start/stop`.
- `.format_to_1d()` which renders pre-path formatted arrays ready for
  both m4 sampling and path gen.
- better explicit overloadable formatting method names:
  * `.allocate_xy()` -> `.allocate_xy_nd()`
  * `.update_xy()` -> `.incr_update_xy_nd()`
  * `.format_xy()` -> `.format_xy_nd_to_1d()`

Finally this implements per-graphics-type formatters which define
each set up related formatting routines:
- `OHLCBarsFmtr`: std multi-line style bars
- `OHLCBarsAsCurveFmtr`: draws an interpolated line for ohlc sampled data
- `StepCurveFmtr`: handles vlm style curves
epoch_index
Tyler Goodlet 2022-11-22 17:28:58 -05:00
parent 0db5451e47
commit ab1f15506d
2 changed files with 809 additions and 596 deletions

View File

@ -25,8 +25,6 @@ incremental update.
from __future__ import annotations
from typing import (
Optional,
Callable,
Union,
)
import msgspec
@ -43,21 +41,10 @@ from .._profile import (
# ms_slower_then,
)
from ._pathops import (
by_index_and_key,
# Plain OHLC renderer
gen_ohlc_qpath,
# OHLC -> line renderer
ohlc_to_line,
update_ohlc_to_line,
ohlc_flat_to_xy,
# step curve renderer
to_step_format,
update_step_xy,
step_to_xy,
IncrementalFormatter,
OHLCBarsFmtr, # Plain OHLC renderer
OHLCBarsAsCurveFmtr, # OHLC converted to line
StepCurveFmtr, # "step" curve (like for vlm)
xy_downsample,
)
from ._ohlc import (
@ -76,16 +63,6 @@ from .._profile import Profiler
log = get_logger(__name__)
# class FlowsTable(msgspec.Struct):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``Flow``s with high level processing routines for
# multi-graphics computations and display.
# '''
# flows: dict[str, np.ndarray] = {}
def render_baritems(
flow: Flow,
graphics: BarItems,
@ -117,21 +94,24 @@ def render_baritems(
r = self._src_r
if not r:
show_bars = True
# OHLC bars path renderer
r = self._src_r = Renderer(
flow=self,
format_xy=gen_ohlc_qpath,
last_read=read,
fmtr=OHLCBarsFmtr(
shm=flow.shm,
flow=flow,
_last_read=read,
),
)
ds_curve_r = Renderer(
flow=self,
last_read=read,
# incr update routines
allocate_xy=ohlc_to_line,
update_xy=update_ohlc_to_line,
format_xy=ohlc_flat_to_xy,
fmtr=OHLCBarsAsCurveFmtr(
shm=flow.shm,
flow=flow,
_last_read=read,
),
)
curve = FlattenedOHLC(
@ -228,7 +208,7 @@ class Flow(msgspec.Struct): # , frozen=True):
'''
name: str
plot: pg.PlotItem
graphics: Union[Curve, BarItems]
graphics: Curve | BarItems
_shm: ShmArray
yrange: tuple[float, float] = None
@ -237,7 +217,6 @@ class Flow(msgspec.Struct): # , frozen=True):
# normally this is just a plain line.
ds_graphics: Optional[Curve] = None
is_ohlc: bool = False
render: bool = True # toggle for display loop
@ -445,9 +424,14 @@ class Flow(msgspec.Struct): # , frozen=True):
slice_to_head: int = -1
should_redraw: bool = False
should_line: bool = False
rkwargs = {}
should_line = False
# TODO: probably specialize ``Renderer`` types instead of
# these logic checks?
# - put these blocks into a `.load_renderer()` meth?
# - consider a OHLCRenderer, StepCurveRenderer, Renderer?
r = self._src_r
if isinstance(graphics, BarItems):
# XXX: special case where we change out graphics
# to a line after a certain uppx threshold.
@ -467,14 +451,34 @@ class Flow(msgspec.Struct): # , frozen=True):
should_redraw = changed_to_line or not should_line
self._in_ds = should_line
elif not r:
if isinstance(graphics, StepCurve):
r = self._src_r = Renderer(
flow=self,
fmtr=StepCurveFmtr(
shm=self.shm,
flow=self,
_last_read=read,
),
)
# TODO: append logic inside ``.render()`` isn't
# correct yet for step curves.. remove this to see it.
should_redraw = True
slice_to_head = -2
else:
r = self._src_r
if not r:
# just using for ``.diff()`` atm..
r = self._src_r = Renderer(
flow=self,
# TODO: rename this to something with ohlc
last_read=read,
fmtr=IncrementalFormatter(
shm=self.shm,
flow=self,
_last_read=read,
),
)
# ``Curve`` derivative case(s):
@ -486,19 +490,6 @@ class Flow(msgspec.Struct): # , frozen=True):
should_ds: bool = r._in_ds
showing_src_data: bool = not r._in_ds
# step_mode = getattr(graphics, '_step_mode', False)
step_mode = isinstance(graphics, StepCurve)
if step_mode:
r.allocate_xy = to_step_format
r.update_xy = update_step_xy
r.format_xy = step_to_xy
# TODO: append logic inside ``.render()`` isn't
# correct yet for step curves.. remove this to see it.
should_redraw = True
slice_to_head = -2
# downsampling incremental state checking
# check for and set std m4 downsample conditions
uppx = graphics.x_uppx()
@ -680,34 +671,7 @@ class Flow(msgspec.Struct): # , frozen=True):
class Renderer(msgspec.Struct):
flow: Flow
# last array view read
last_read: Optional[tuple] = None
# default just returns index, and named array from data
format_xy: Callable[
[np.ndarray, str],
tuple[np.ndarray]
] = by_index_and_key
# optional pre-graphics xy formatted data which
# is incrementally updated in sync with the source data.
allocate_xy: Optional[Callable[
[int, slice],
tuple[np.ndarray, np.nd.array]
]] = None
update_xy: Optional[Callable[
[int, slice], None]
] = None
x_data: Optional[np.ndarray] = None
y_data: Optional[np.ndarray] = None
# indexes which slice into the above arrays (which are allocated
# based on source data shm input size) and allow retrieving
# incrementally updated data.
_xy_first: int = 0
_xy_last: int = 0
fmtr: IncrementalFormatter
# output graphics rendering, the main object
# processed in ``QGraphicsObject.paint()``
@ -729,58 +693,11 @@ class Renderer(msgspec.Struct):
_last_uppx: float = 0
_in_ds: bool = False
# incremental update state(s)
_last_vr: Optional[tuple[float, float]] = None
_last_ivr: Optional[tuple[float, float]] = None
def diff(
self,
new_read: tuple[np.ndarray],
) -> tuple[
np.ndarray,
np.ndarray,
]:
(
last_xfirst,
last_xlast,
last_array,
last_ivl,
last_ivr,
last_in_view,
) = self.last_read
# TODO: can the renderer just call ``Flow.read()`` directly?
# unpack latest source data read
(
xfirst,
xlast,
array,
ivl,
ivr,
in_view,
) = new_read
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
prepend_length = int(last_xfirst - xfirst)
append_length = int(xlast - last_xlast)
# blah blah blah
# do diffing for prepend, append and last entry
return (
slice(xfirst, last_xfirst),
prepend_length,
append_length,
slice(last_xlast, xlast),
)
def draw_path(
self,
x: np.ndarray,
y: np.ndarray,
connect: Union[str, np.ndarray] = 'all',
connect: str | np.ndarray = 'all',
path: Optional[QPainterPath] = None,
redraw: bool = False,
@ -858,166 +775,50 @@ class Renderer(msgspec.Struct):
'''
# TODO: can the renderer just call ``Flow.read()`` directly?
# unpack latest source data read
fmtr = self.fmtr
(
xfirst,
xlast,
_,
_,
array,
ivl,
ivr,
in_view,
) = new_read
(
pre_slice,
prepend_length,
append_length,
post_slice,
) = self.diff(new_read)
if self.update_xy:
shm = self.flow.shm
if self.y_data is None:
# we first need to allocate xy data arrays
# from the source data.
assert self.allocate_xy
self.x_data, self.y_data = self.allocate_xy(
shm,
array_key,
)
self._xy_first = shm._first.value
self._xy_last = shm._last.value
profiler('allocated xy history')
if prepend_length:
y_prepend = shm._array[pre_slice]
if read_from_key:
y_prepend = y_prepend[array_key]
xy_data, xy_slice = self.update_xy(
shm,
array_key,
# this is the pre-sliced, "normally expected"
# new data that an updater would normally be
# expected to process, however in some cases (like
# step curves) the updater routine may want to do
# the source history-data reading itself, so we pass
# both here.
y_prepend,
pre_slice,
prepend_length,
self._xy_first,
self._xy_last,
is_append=False,
)
self.y_data[xy_slice] = xy_data
self._xy_first = shm._first.value
profiler('prepended xy history: {prepend_length}')
if append_length:
y_append = shm._array[post_slice]
if read_from_key:
y_append = y_append[array_key]
xy_data, xy_slice = self.update_xy(
shm,
array_key,
y_append,
post_slice,
append_length,
self._xy_first,
self._xy_last,
is_append=True,
)
# self.y_data[post_slice] = xy_data
# self.y_data[xy_slice or post_slice] = xy_data
self.y_data[xy_slice] = xy_data
self._xy_last = shm._last.value
profiler('appened xy history: {append_length}')
if use_vr:
array = in_view
# else:
# ivl, ivr = xfirst, xlast
hist = array[:slice_to_head]
# xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine.
if not len(hist):
fmt_out = fmtr.format_to_1d(
new_read,
array_key,
profiler,
slice_to_head=slice_to_head,
read_src_from_key=read_from_key,
slice_to_inview=use_vr,
)
# no history in view case
if not fmt_out:
# XXX: this might be why the profiler only has exits?
return
x_out, y_out, connect = self.format_xy(
self,
# TODO: hist here should be the pre-sliced
# x/y_data in the case where allocate_xy is
# defined?
hist,
array_key,
(ivl, ivr),
)
(
x_1d,
y_1d,
connect,
prepend_length,
append_length,
view_changed,
profiler('sliced input arrays')
if (
use_vr
):
# if a view range is passed, plan to draw the
# source ouput that's "in view" of the chart.
view_range = (ivl, ivr)
# print(f'{self._name} vr: {view_range}')
profiler(f'view range slice {view_range}')
vl, vr = view_range
zoom_or_append = False
last_vr = self._last_vr
last_ivr = self._last_ivr or vl, vr
# incremental in-view data update.
if last_vr:
# relative slice indices
lvl, lvr = last_vr
# abs slice indices
al, ar = last_ivr
# left_change = abs(x_iv[0] - al) >= 1
# right_change = abs(x_iv[-1] - ar) >= 1
if (
# likely a zoom view change
(vr - lvr) > 2 or vl < lvl
# append / prepend update
# we had an append update where the view range
# didn't change but the data-viewed (shifted)
# underneath, so we need to redraw.
# or left_change and right_change and last_vr == view_range
# not (left_change and right_change) and ivr
# (
# or abs(x_iv[ivr] - livr) > 1
):
zoom_or_append = True
self._last_vr = view_range
if len(x_out):
self._last_ivr = x_out[0], x_out[slice_to_head]
) = fmt_out
# redraw conditions
if (
prepend_length > 0
or new_sample_rate
or append_length > 0
or zoom_or_append
or view_changed
):
should_redraw = True
@ -1039,9 +840,9 @@ class Renderer(msgspec.Struct):
elif should_ds and uppx > 1:
x_out, y_out, ymn, ymx = xy_downsample(
x_out,
y_out,
x_1d, y_1d, ymn, ymx = xy_downsample(
x_1d,
y_1d,
uppx,
)
self.flow.yrange = ymn, ymx
@ -1052,8 +853,8 @@ class Renderer(msgspec.Struct):
self._in_ds = True
path = self.draw_path(
x=x_out,
y=y_out,
x=x_1d,
y=y_1d,
connect=connect,
path=path,
redraw=True,
@ -1088,8 +889,8 @@ class Renderer(msgspec.Struct):
and not should_redraw
):
print(f'{array_key} append len: {append_length}')
new_x = x_out[-append_length - 2:] # slice_to_head]
new_y = y_out[-append_length - 2:] # slice_to_head]
new_x = x_1d[-append_length - 2:] # slice_to_head]
new_y = y_1d[-append_length - 2:] # slice_to_head]
profiler('sliced append path')
profiler(
@ -1137,10 +938,4 @@ class Renderer(msgspec.Struct):
self.path = path
self.fast_path = fast_path
# TODO: eventually maybe we can implement some kind of
# transform on the ``QPainterPath`` that will more or less
# detect the diff in "elements" terms?
# update diff state since we've now rendered paths.
self.last_read = new_read
return self.path, array, reset

View File

@ -20,7 +20,6 @@ Super fast ``QPainterPath`` generation related operator routines.
from __future__ import annotations
from typing import (
Optional,
Callable,
TYPE_CHECKING,
)
@ -29,7 +28,7 @@ import numpy as np
from numpy.lib import recfunctions as rfn
from numba import njit, float64, int64 # , optional
# import pyqtgraph as pg
from PyQt5 import QtGui
# from PyQt5 import QtGui
# from PyQt5.QtCore import QLineF, QPointF
from ..data._sharedmem import (
@ -41,7 +40,11 @@ from ._compression import (
)
if TYPE_CHECKING:
from ._flows import Renderer
from ._flows import (
Renderer,
Flow,
)
from .._profile import Profiler
def by_index_and_key(
@ -59,90 +62,472 @@ def by_index_and_key(
class IncrementalFormatter(msgspec.Struct):
'''
Incrementally updating, pre-path-graphics tracking, formatter.
Allows tracking source data state in an updateable pre-graphics
``np.ndarray`` format (in local process memory) as well as
incrementally rendering from that format **to** 1d x/y for path
generation using ``pg.functions.arrayToQPath()``.
'''
shm: ShmArray
flow: Flow
# optional pre-graphics xy formatted data which
# is incrementally updated in sync with the source data.
allocate_xy_nd: Optional[Callable[
[int, slice],
tuple[np.ndarray, np.nd.array]
]] = None
# last read from shm (usually due to an update call)
_last_read: tuple[
int,
int,
np.ndarray
incr_update_xy_nd: Optional[Callable[
[int, slice], None]
] = None
]
# default just returns index, and named array from data
format_xy_nd_to_1d: Callable[
[np.ndarray, str],
tuple[np.ndarray]
] = by_index_and_key
@property
def last_read(self) -> tuple | None:
return self._last_read
def __repr__(self) -> str:
msg = (
f'{type(self)}: ->\n\n'
f'fqsn={self.flow.name}\n'
f'shm_name={self.shm.token["shm_name"]}\n\n'
f'last_vr={self._last_vr}\n'
f'last_ivdr={self._last_ivdr}\n\n'
f'xy_nd_start={self.xy_nd_start}\n'
f'xy_nd_stop={self.xy_nd_stop}\n\n'
)
x_nd_len = 0
y_nd_len = 0
if self.x_nd is not None:
x_nd_len = len(self.x_nd)
y_nd_len = len(self.y_nd)
msg += (
f'x_nd_len={x_nd_len}\n'
f'y_nd_len={y_nd_len}\n'
)
return msg
def diff(
self,
new_read: tuple[np.ndarray],
) -> tuple[
np.ndarray,
np.ndarray,
]:
(
last_xfirst,
last_xlast,
last_array,
last_ivl,
last_ivr,
last_in_view,
) = self.last_read
# TODO: can the renderer just call ``Flow.read()`` directly?
# unpack latest source data read
(
xfirst,
xlast,
array,
ivl,
ivr,
in_view,
) = new_read
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
prepend_length = int(last_xfirst - xfirst)
append_length = int(xlast - last_xlast)
# blah blah blah
# do diffing for prepend, append and last entry
return (
slice(xfirst, last_xfirst),
prepend_length,
append_length,
slice(last_xlast, xlast),
)
# Incrementally updated xy ndarray formatted data, a pre-1d
# format which is updated and cached independently of the final
# pre-graphics-path 1d format.
x_nd: Optional[np.ndarray] = None
y_nd: Optional[np.ndarray] = None
x_1d: Optional[np.ndarray] = None
y_1d: Optional[np.ndarray] = None
# indexes which slice into the above arrays (which are allocated
# based on source data shm input size) and allow retrieving
# incrementally updated data.
# _xy_first: int = 0
# _xy_last: int = 0
xy_nd_start: int = 0
xy_nd_end: int = 0
xy_nd_stop: int = 0
# TODO: eventually incrementally update 1d-pre-graphics path data?
# x_1d: Optional[np.ndarray] = None
# y_1d: Optional[np.ndarray] = None
def xy_downsample(
x,
y,
uppx,
# incremental view-change state(s) tracking
_last_vr: tuple[float, float] | None = None
_last_ivdr: tuple[float, float] | None = None
x_spacer: float = 0.5,
def _track_inview_range(
self,
view_range: tuple[int, int],
) -> tuple[
) -> bool:
# if a view range is passed, plan to draw the
# source ouput that's "in view" of the chart.
vl, vr = view_range
zoom_or_append = False
last_vr = self._last_vr
# incremental in-view data update.
if last_vr:
lvl, lvr = last_vr # relative slice indices
# TODO: detecting more specifically the interaction changes
# last_ivr = self._last_ivdr or (vl, vr)
# al, ar = last_ivr # abs slice indices
# left_change = abs(x_iv[0] - al) >= 1
# right_change = abs(x_iv[-1] - ar) >= 1
# likely a zoom/pan view change or data append update
if (
(vr - lvr) > 2
or vl < lvl
# append / prepend update
# we had an append update where the view range
# didn't change but the data-viewed (shifted)
# underneath, so we need to redraw.
# or left_change and right_change and last_vr == view_range
# not (left_change and right_change) and ivr
# (
# or abs(x_iv[ivr] - livr) > 1
):
zoom_or_append = True
self._last_vr = view_range
return zoom_or_append
def format_to_1d(
self,
new_read: tuple,
array_key: str,
profiler: Profiler,
slice_to_head: int = -1,
read_src_from_key: bool = True,
slice_to_inview: bool = True,
) -> tuple[
np.ndarray,
np.ndarray,
float,
float,
]:
]:
shm = self.shm
# downsample whenever more then 1 pixels per datum can be shown.
# always refresh data bounds until we get diffing
# working properly, see above..
bins, x, y, ymn, ymx = ds_m4(
x,
y,
uppx,
(
_,
_,
array,
ivl,
ivr,
in_view,
) = new_read
(
pre_slice,
prepend_len,
append_len,
post_slice,
) = self.diff(new_read)
if self.y_nd is None:
# we first need to allocate xy data arrays
# from the source data.
self.x_nd, self.y_nd = self.allocate_xy_nd(
shm,
array_key,
)
self.xy_nd_start = shm._first.value
self.xy_nd_stop = shm._last.value
profiler('allocated xy history')
if prepend_len:
y_prepend = shm._array[pre_slice]
if read_src_from_key:
y_prepend = y_prepend[array_key]
(
new_y_nd,
y_nd_slc,
) = self.incr_update_xy_nd(
shm,
array_key,
# this is the pre-sliced, "normally expected"
# new data that an updater would normally be
# expected to process, however in some cases (like
# step curves) the updater routine may want to do
# the source history-data reading itself, so we pass
# both here.
y_prepend,
pre_slice,
prepend_len,
self.xy_nd_start,
self.xy_nd_stop,
is_append=False,
)
# flatten output to 1d arrays suitable for path-graphics generation.
x = np.broadcast_to(x[:, None], y.shape)
x = (x + np.array(
[-x_spacer, 0, 0, x_spacer]
)).flatten()
y = y.flatten()
# y_nd_view = self.y_nd[y_nd_slc]
self.y_nd[y_nd_slc] = new_y_nd
# if read_src_from_key:
# y_nd_view[:][array_key] = new_y_nd
# else:
# y_nd_view[:] = new_y_nd
return x, y, ymn, ymx
self.xy_nd_start = shm._first.value
profiler('prepended xy history: {prepend_length}')
if append_len:
y_append = shm._array[post_slice]
if read_src_from_key:
y_append = y_append[array_key]
(
new_y_nd,
y_nd_slc,
) = self.incr_update_xy_nd(
shm,
array_key,
y_append,
post_slice,
append_len,
self.xy_nd_start,
self.xy_nd_stop,
is_append=True,
)
# self.y_nd[post_slice] = new_y_nd
# self.y_nd[xy_slice or post_slice] = xy_data
self.y_nd[y_nd_slc] = new_y_nd
# if read_src_from_key:
# y_nd_view[:][array_key] = new_y_nd
# else:
# y_nd_view[:] = new_y_nd
self.xy_nd_stop = shm._last.value
profiler('appened xy history: {append_length}')
view_changed: bool = False
view_range: tuple[int, int] = (ivl, ivr)
if slice_to_inview:
view_changed = self._track_inview_range(view_range)
array = in_view
profiler(f'{self.flow.name} view range slice {view_range}')
hist = array[:slice_to_head]
# xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine.
if not len(hist):
# XXX: this might be why the profiler only has exits?
return
# TODO: hist here should be the pre-sliced
# x/y_data in the case where allocate_xy is
# defined?
x_1d, y_1d, connect = self.format_xy_nd_to_1d(
hist,
array_key,
view_range,
)
# update the last "in view data range"
if len(x_1d):
self._last_ivdr = x_1d[0], x_1d[slice_to_head]
# TODO: eventually maybe we can implement some kind of
# transform on the ``QPainterPath`` that will more or less
# detect the diff in "elements" terms?
# update diff state since we've now rendered paths.
self._last_read = new_read
profiler('.format_to_1d()')
return (
x_1d,
y_1d,
connect,
prepend_len,
append_len,
view_changed,
)
###############################
# Sub-type override interface #
###############################
# optional pre-graphics xy formatted data which
# is incrementally updated in sync with the source data.
# XXX: was ``.allocate_xy()``
def allocate_xy_nd(
self,
src_shm: ShmArray,
data_field: str,
index_field: str = 'index',
) -> tuple[
np.ndarray, # x
np.nd.array # y
]:
'''
Convert the structured-array ``src_shm`` format to
a equivalently shaped (and field-less) ``np.ndarray``.
Eg. a 4 field x N struct-array => (N, 4)
'''
y_nd = src_shm._array[data_field].copy()
x_nd = src_shm._array[index_field].copy()
return x_nd, y_nd
# XXX: was ``.update_xy()``
def incr_update_xy_nd(
self,
src_shm: ShmArray,
data_field: str,
new_from_src: np.ndarray, # portion of source that was updated
read_slc: slice,
ln: int, # len of updated
nd_start: int,
nd_stop: int,
is_append: bool,
index_field: str = 'index',
) -> tuple[
np.ndarray,
slice,
]:
# write pushed data to flattened copy
new_y_nd = new_from_src
# XXX
# TODO: this should be returned and written by caller!
# XXX
# generate same-valued-per-row x support based on y shape
if index_field != 'index':
self.x_nd[read_slc, :] = new_from_src[index_field]
return new_y_nd, read_slc
# XXX: was ``.format_xy()``
def format_xy_nd_to_1d(
self,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray, # 1d x
np.ndarray, # 1d y
np.ndarray | str, # connection array/style
]:
'''
Default xy-nd array to 1d pre-graphics-path render routine.
Return single field column data verbatim
'''
return (
array['index'],
array[array_key],
# 1d connection array or style-key to
# ``pg.functions.arrayToQPath()``
'all',
)
@njit(
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
class OHLCBarsFmtr(IncrementalFormatter):
fields: list[str] = ['open', 'high', 'low', 'close']
def allocate_xy_nd(
self,
ohlc_shm: ShmArray,
data_field: str,
) -> tuple[
np.ndarray, # x
np.nd.array # y
]:
'''
Convert an input struct-array holding OHLC samples into a pair of
flattened x, y arrays with the same size (datums wise) as the source
data.
'''
y_nd = ohlc_shm.ustruct(self.fields)
# generate an flat-interpolated x-domain
x_nd = (
np.broadcast_to(
ohlc_shm._array['index'][:, None],
(
ohlc_shm._array.size,
# 4, # only ohlc
y_nd.shape[1],
),
) + np.array([-0.5, 0, 0, 0.5])
)
assert y_nd.any()
# write pushed data to flattened copy
return (
x_nd,
y_nd,
)
@staticmethod
@njit(
# TODO: for now need to construct this manually for readonly
# arrays, see https://github.com/numba/numba/issues/4511
# ntypes.tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous
# int64,
# optional(float64),
# ),
nogil=True
)
def path_arrays_from_ohlc(
)
def path_arrays_from_ohlc(
data: np.ndarray,
start: int64,
bar_gap: float64 = 0.43,
) -> np.ndarray:
) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
]:
'''
Generate an array of lines objects from input ohlc data.
@ -198,110 +583,93 @@ def path_arrays_from_ohlc(
return x, y, c
# TODO: can we drop this frame and just use the above?
def format_xy_nd_to_1d(
self,
def gen_ohlc_qpath(
r: Renderer,
data: np.ndarray,
array_key: str, # we ignore this
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
start: int = 0, # XXX: do we need this?
# 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43,
) -> QtGui.QPainterPath:
) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
]:
'''
More or less direct proxy to ``path_arrays_from_ohlc()``
but with closed in kwargs for line spacing.
More or less direct proxy to the ``numba``-fied
``path_arrays_from_ohlc()`` (above) but with closed in kwargs
for line spacing.
'''
x, y, c = path_arrays_from_ohlc(
data,
x, y, c = self.path_arrays_from_ohlc(
array,
start,
bar_gap=w,
)
return x, y, c
def incr_update_xy_nd(
self,
def ohlc_to_line(
ohlc_shm: ShmArray,
data_field: str,
fields: list[str] = ['open', 'high', 'low', 'close']
) -> tuple[
np.ndarray,
np.ndarray,
]:
'''
Convert an input struct-array holding OHLC samples into a pair of
flattened x, y arrays with the same size (datums wise) as the source
data.
'''
y_out = ohlc_shm.ustruct(fields)
first = ohlc_shm._first.value
last = ohlc_shm._last.value
# write pushed data to flattened copy
y_out[first:last] = rfn.structured_to_unstructured(
ohlc_shm.array[fields]
)
# generate an flat-interpolated x-domain
x_out = (
np.broadcast_to(
ohlc_shm._array['index'][:, None],
(
ohlc_shm._array.size,
# 4, # only ohlc
y_out.shape[1],
),
) + np.array([-0.5, 0, 0, 0.5])
)
assert y_out.any()
return (
x_out,
y_out,
)
def update_ohlc_to_line(
src_shm: ShmArray,
array_key: str,
src_update: np.ndarray,
slc: slice,
ln: int,
first: int,
last: int,
data_field: str,
new_from_src: np.ndarray, # portion of source that was updated
read_slc: slice,
ln: int, # len of updated
nd_start: int,
nd_stop: int,
is_append: bool,
index_field: str = 'index',
) -> np.ndarray:
fields = ['open', 'high', 'low', 'close']
return (
rfn.structured_to_unstructured(src_update[fields]),
slc,
) -> tuple[
np.ndarray,
slice,
]:
# write newly pushed data to flattened copy
# a struct-arr is always passed in.
new_y_nd = rfn.structured_to_unstructured(
new_from_src[self.fields]
)
# XXX
# TODO: this should be returned and written by caller!
# XXX
# generate same-valued-per-row x support based on y shape
if index_field != 'index':
self.x_nd[read_slc, :] = new_from_src[index_field]
return new_y_nd, read_slc
class OHLCBarsAsCurveFmtr(OHLCBarsFmtr):
def format_xy_nd_to_1d(
self,
def ohlc_flat_to_xy(
r: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
) -> tuple[
np.ndarray,
np.ndarray,
np.nd.array,
str,
]:
]:
# TODO: in the case of an existing ``.update_xy()``
# should we be passing in array as an xy arrays tuple?
# 2 more datum-indexes to capture zero at end
x_flat = r.x_data[r._xy_first:r._xy_last]
y_flat = r.y_data[r._xy_first:r._xy_last]
x_flat = self.x_nd[self.xy_nd_start:self.xy_nd_stop]
y_flat = self.y_nd[self.xy_nd_start:self.xy_nd_stop]
# slice to view
ivl, ivr = vr
@ -315,12 +683,20 @@ def ohlc_flat_to_xy(
return x_iv, y_iv, 'all'
def to_step_format(
class StepCurveFmtr(IncrementalFormatter):
def allocate_xy_nd(
self,
shm: ShmArray,
data_field: str,
index_field: str = 'index',
) -> tuple[int, np.ndarray, np.ndarray]:
) -> tuple[
np.ndarray, # x
np.nd.array # y
]:
'''
Convert an input 1d shm array to a "step array" format
for use by path graphics generation.
@ -341,19 +717,25 @@ def to_step_format(
y_out[0, 0] = 0
return x_out, y_out
def incr_update_xy_nd(
self,
def update_step_xy(
src_shm: ShmArray,
array_key: str,
y_update: np.ndarray,
src_update: np.ndarray, # portion of source that was updated
slc: slice,
ln: int,
ln: int, # len of updated
first: int,
last: int,
is_append: bool,
) -> np.ndarray:
) -> tuple[
np.ndarray,
slice,
]:
# for a step curve we slice from one datum prior
# to the current "update slice" to get the previous
# "level".
@ -364,7 +746,7 @@ def update_step_xy(
slc = slice(start, end)
else:
new_y = y_update
new_y = src_update
return (
np.broadcast_to(
@ -373,22 +755,21 @@ def update_step_xy(
slc,
)
def format_xy_nd_to_1d(
self,
def step_to_xy(
r: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
) -> tuple[
np.ndarray,
np.ndarray,
np.nd.array,
str,
]:
]:
# 2 more datum-indexes to capture zero at end
x_step = r.x_data[r._xy_first:r._xy_last+2]
y_step = r.y_data[r._xy_first:r._xy_last+2]
x_step = self.x_nd[self.xy_nd_start:self.xy_nd_stop+2]
y_step = self.y_nd[self.xy_nd_start:self.xy_nd_stop+2]
lasts = array[['index', array_key]]
last = lasts[array_key][-1]
@ -411,3 +792,40 @@ def step_to_xy(
# )
return x_iv, y_iv, 'all'
def xy_downsample(
x,
y,
uppx,
x_spacer: float = 0.5,
) -> tuple[
np.ndarray,
np.ndarray,
float,
float,
]:
'''
Downsample 1D (flat ``numpy.ndarray``) arrays using M4 given an input
``uppx`` (units-per-pixel) and add space between discreet datums.
'''
# downsample whenever more then 1 pixels per datum can be shown.
# always refresh data bounds until we get diffing
# working properly, see above..
bins, x, y, ymn, ymx = ds_m4(
x,
y,
uppx,
)
# flatten output to 1d arrays suitable for path-graphics generation.
x = np.broadcast_to(x[:, None], y.shape)
x = (x + np.array(
[-x_spacer, 0, 0, x_spacer]
)).flatten()
y = y.flatten()
return x, y, ymn, ymx