Merge pull request #447 from pikers/pregraphics_formatters

Pregraphics formatters: `IncrementalFormatter`
goodboy 2023-01-31 13:55:04 -05:00 committed by GitHub
commit dcdfd2577a
4 changed files with 1017 additions and 718 deletions
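This PR moves the incremental "pre-graphics" x/y formatting state out of `Renderer` (and its loose `allocate_xy`/`update_xy`/`format_xy` callbacks) into a new `IncrementalFormatter` type, plus OHLC and step-curve subtypes, in `._pathops`. As a rough sketch of the interface introduced in the diffs below (names are taken from the changes; the condensed signatures and elided bodies here are approximations, not the literal code):

class IncrementalFormatter(msgspec.Struct):
    # incrementally tracked source-data state
    shm: ShmArray
    flow: Flow
    _last_read: tuple

    # main entry point used by ``Renderer.render()``: convert the latest
    # shm read into flat 1d x/y (+ ``connect``) arrays ready for
    # ``pg.functions.arrayToQPath()``.
    def format_to_1d(self, new_read, array_key, profiler, **kwargs): ...

    # sub-type override interface (previously the ``.allocate_xy()``,
    # ``.update_xy()`` and ``.format_xy()`` hooks on ``Renderer``)
    def allocate_xy_nd(self, shm, data_field): ...
    def incr_update_xy_nd(self, shm, data_field, new_from_src, read_slc,
                          ln, nd_start, nd_stop, is_append): ...
    def format_xy_nd_to_1d(self, array, array_key, vr): ...

class OHLCBarsFmtr(IncrementalFormatter): ...       # plain OHLC bars
class OHLCBarsAsCurveFmtr(OHLCBarsFmtr): ...        # OHLC flattened to a line
class StepCurveFmtr(IncrementalFormatter): ...      # "step" curves (eg. vlm)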


@@ -28,10 +28,7 @@ from PyQt5.QtWidgets import QGraphicsItem
from PyQt5.QtCore import (
    Qt,
    QLineF,
-    QSizeF,
    QRectF,
-    # QRect,
-    QPointF,
)
from PyQt5.QtGui import (
    QPainter,
@@ -89,9 +86,9 @@ class Curve(pg.GraphicsObject):
    '''
    # sub-type customization methods
-    sub_br: Optional[Callable] = None
-    sub_paint: Optional[Callable] = None
    declare_paintables: Optional[Callable] = None
+    sub_paint: Optional[Callable] = None
+    # sub_br: Optional[Callable] = None

    def __init__(
        self,
@@ -140,9 +137,7 @@ class Curve(pg.GraphicsObject):
        # self.last_step_pen = pg.mkPen(hcolor(color), width=2)
        self.last_step_pen = pg.mkPen(pen, width=2)

-        # self._last_line: Optional[QLineF] = None
        self._last_line = QLineF()
-        self._last_w: float = 1

        # flat-top style histogram-like discrete curve
        # self._step_mode: bool = step_mode
@@ -231,8 +226,8 @@ class Curve(pg.GraphicsObject):
            self.path.clear()

            if self.fast_path:
-                # self.fast_path.clear()
-                self.fast_path = None
+                self.fast_path.clear()
+                # self.fast_path = None

    @cm
    def reset_cache(self) -> None:
@@ -252,77 +247,65 @@ class Curve(pg.GraphicsObject):
            self.boundingRect = self._path_br
            return self._path_br()

+    # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
    def _path_br(self):
        '''
        Post init ``.boundingRect()```.

        '''
-        # hb = self.path.boundingRect()
-        hb = self.path.controlPointRect()
-        hb_size = hb.size()
-
-        fp = self.fast_path
-        if fp:
-            fhb = fp.controlPointRect()
-            hb_size = fhb.size() + hb_size
-
-        # print(f'hb_size: {hb_size}')
-
-        # if self._last_step_rect:
-        #     hb_size += self._last_step_rect.size()
-
-        # if self._line:
-        #     br = self._last_step_rect.bottomRight()
-
-        # tl = QPointF(
-        #     # self._vr[0],
-        #     # hb.topLeft().y(),
-        #     # 0,
-        #     # hb_size.height() + 1
-        # )
-
-        # br = self._last_step_rect.bottomRight()
-
-        w = hb_size.width()
-        h = hb_size.height()
-
-        sbr = self.sub_br
-        if sbr:
-            w, h = self.sub_br(w, h)
-        else:
-            # assume plain line graphic and use
-            # default unit step in each direction.
-
-            # only on a plane line do we include
-            # and extra index step's worth of width
-            # since in the step case the end of the curve
-            # actually terminates earlier so we don't need
-            # this for the last step.
-            w += self._last_w
-            # ll = self._last_line
-            h += 1  # ll.y2() - ll.y1()
-
-        # br = QPointF(
-        #     self._vr[-1],
-        #     # tl.x() + w,
-        #     tl.y() + h,
-        # )
-
-        br = QRectF(
-
-            # top left
-            # hb.topLeft()
-            # tl,
-            QPointF(hb.topLeft()),
-
-            # br,
-            # total size
-            # QSizeF(hb_size)
-            # hb_size,
-            QSizeF(w, h)
-        )
-        # print(f'bounding rect: {br}')
-        return br
+        # profiler = Profiler(
+        #     msg=f'Curve.boundingRect(): `{self._name}`',
+        #     disabled=not pg_profile_enabled(),
+        #     ms_threshold=ms_slower_then,
+        # )
+        pr = self.path.controlPointRect()
+        hb_tl, hb_br = (
+            pr.topLeft(),
+            pr.bottomRight(),
+        )
+        mn_y = hb_tl.y()
+        mx_y = hb_br.y()
+        most_left = hb_tl.x()
+        most_right = hb_br.x()
+        # profiler('calc path vertices')
+
+        # TODO: if/when we get fast path appends working in the
+        # `Renderer`, then we might need to actually use this..
+        # fp = self.fast_path
+        # if fp:
+        #     fhb = fp.controlPointRect()
+        #     # hb_size = fhb.size() + hb_size
+        #     br = pr.united(fhb)
+
+        # XXX: *was* a way to allow sub-types to extend the
+        # boundingrect calc, but in the one use case for a step curve
+        # doesn't seem like we need it as long as the last line segment
+        # is drawn as it is?
+
+        # sbr = self.sub_br
+        # if sbr:
+        #     # w, h = self.sub_br(w, h)
+        #     sub_br = sbr()
+        #     br = br.united(sub_br)
+
+        # assume plain line graphic and use
+        # default unit step in each direction.
+        ll = self._last_line
+        y1, y2 = ll.y1(), ll.y2()
+        x1, x2 = ll.x1(), ll.x2()
+
+        ymn = min(y1, y2, mn_y)
+        ymx = max(y1, y2, mx_y)
+        most_left = min(x1, x2, most_left)
+        most_right = max(x1, x2, most_right)
+        # profiler('calc last line vertices')
+
+        return QRectF(
+            most_left,
+            ymn,
+            most_right - most_left + 1,
+            ymx,
+        )

    def paint(
        self,
@@ -340,7 +323,7 @@ class Curve(pg.GraphicsObject):
        sub_paint = self.sub_paint
        if sub_paint:
-            sub_paint(p, profiler)
+            sub_paint(p)

        p.setPen(self.last_step_pen)
        p.drawLine(self._last_line)
@@ -450,36 +433,34 @@ class StepCurve(Curve):
        y = src_data[array_key]

        x_last = x[-1]
+        x_2last = x[-2]
        y_last = y[-1]
+        step_size = x_last - x_2last
+        half_step = step_size / 2

        # lol, commenting this makes step curves
        # all "black" for me :eyeroll:..
        self._last_line = QLineF(
-            x_last - w, 0,
-            x_last + w, 0,
+            x_2last, 0,
+            x_last, 0,
        )
        self._last_step_rect = QRectF(
-            x_last - w, 0,
-            x_last + w, y_last,
+            x_last - half_step, 0,
+            step_size, y_last,
        )
        return x, y

    def sub_paint(
        self,
        p: QPainter,
-        profiler: Profiler,
    ) -> None:
        # p.drawLines(*tuple(filter(bool, self._last_step_lines)))
        # p.drawRect(self._last_step_rect)
        p.fillRect(self._last_step_rect, self._brush)
-        profiler('.fillRect()')

-    def sub_br(
-        self,
-        path_w: float,
-        path_h: float,
-    ) -> (float, float):
-        # passthrough
-        return path_w, path_h
+    # def sub_br(
+    #     self,
+    #     parent_br: QRectF | None = None,
+    # ) -> QRectF:
+    #     return self._last_step_rect


@@ -25,13 +25,10 @@ incremental update.
from __future__ import annotations
from typing import (
    Optional,
-    Callable,
-    Union,
)

import msgspec
import numpy as np
-from numpy.lib import recfunctions as rfn
import pyqtgraph as pg
from PyQt5.QtGui import QPainterPath
from PyQt5.QtCore import QLineF
@@ -44,9 +41,10 @@ from .._profile import (
    # ms_slower_then,
)
from ._pathops import (
-    gen_ohlc_qpath,
-    ohlc_to_line,
-    to_step_format,
+    IncrementalFormatter,
+    OHLCBarsFmtr,  # Plain OHLC renderer
+    OHLCBarsAsCurveFmtr,  # OHLC converted to line
+    StepCurveFmtr,  # "step" curve (like for vlm)
    xy_downsample,
)
from ._ohlc import (
@@ -65,65 +63,6 @@ from .._profile import Profiler
log = get_logger(__name__)
# class FlowsTable(msgspec.Struct):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``Flow``s with high level processing routines for
# multi-graphics computations and display.
# '''
# flows: dict[str, np.ndarray] = {}
def update_ohlc_to_line(
src_shm: ShmArray,
array_key: str,
src_update: np.ndarray,
slc: slice,
ln: int,
first: int,
last: int,
is_append: bool,
) -> np.ndarray:
fields = ['open', 'high', 'low', 'close']
return (
rfn.structured_to_unstructured(src_update[fields]),
slc,
)
def ohlc_flat_to_xy(
r: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.nd.array,
str,
]:
# TODO: in the case of an existing ``.update_xy()``
# should we be passing in array as an xy arrays tuple?
# 2 more datum-indexes to capture zero at end
x_flat = r.x_data[r._xy_first:r._xy_last]
y_flat = r.y_data[r._xy_first:r._xy_last]
# slice to view
ivl, ivr = vr
x_iv_flat = x_flat[ivl:ivr]
y_iv_flat = y_flat[ivl:ivr]
# reshape to 1d for graphics rendering
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
return x_iv, y_iv, 'all'
def render_baritems(
    flow: Flow,
    graphics: BarItems,
@@ -155,21 +94,24 @@ def render_baritems(
    r = self._src_r
    if not r:
        show_bars = True

        # OHLC bars path renderer
        r = self._src_r = Renderer(
            flow=self,
-            format_xy=gen_ohlc_qpath,
-            last_read=read,
+            fmtr=OHLCBarsFmtr(
+                shm=flow.shm,
+                flow=flow,
+                _last_read=read,
+            ),
        )

        ds_curve_r = Renderer(
            flow=self,
-            last_read=read,
-
-            # incr update routines
-            allocate_xy=ohlc_to_line,
-            update_xy=update_ohlc_to_line,
-            format_xy=ohlc_flat_to_xy,
+            fmtr=OHLCBarsAsCurveFmtr(
+                shm=flow.shm,
+                flow=flow,
+                _last_read=read,
+            ),
        )

        curve = FlattenedOHLC(
@@ -253,77 +195,6 @@ def render_baritems(
    )
def update_step_xy(
src_shm: ShmArray,
array_key: str,
y_update: np.ndarray,
slc: slice,
ln: int,
first: int,
last: int,
is_append: bool,
) -> np.ndarray:
# for a step curve we slice from one datum prior
# to the current "update slice" to get the previous
# "level".
if is_append:
start = max(last - 1, 0)
end = src_shm._last.value
new_y = src_shm._array[start:end][array_key]
slc = slice(start, end)
else:
new_y = y_update
return (
np.broadcast_to(
new_y[:, None], (new_y.size, 2),
),
slc,
)
def step_to_xy(
r: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.nd.array,
str,
]:
# 2 more datum-indexes to capture zero at end
x_step = r.x_data[r._xy_first:r._xy_last+2]
y_step = r.y_data[r._xy_first:r._xy_last+2]
lasts = array[['index', array_key]]
last = lasts[array_key][-1]
y_step[-1] = last
# slice out in-view data
ivl, ivr = vr
ys_iv = y_step[ivl:ivr+1]
xs_iv = x_step[ivl:ivr+1]
# flatten to 1d
y_iv = ys_iv.reshape(ys_iv.size)
x_iv = xs_iv.reshape(xs_iv.size)
# print(
# f'ys_iv : {ys_iv[-s:]}\n'
# f'y_iv: {y_iv[-s:]}\n'
# f'xs_iv: {xs_iv[-s:]}\n'
# f'x_iv: {x_iv[-s:]}\n'
# )
return x_iv, y_iv, 'all'
class Flow(msgspec.Struct):  # , frozen=True):
    '''
    (Financial Signal-)Flow compound type which wraps a real-time
@@ -337,7 +208,7 @@ class Flow(msgspec.Struct):  # , frozen=True):
    '''
    name: str
    plot: pg.PlotItem
-    graphics: Union[Curve, BarItems]
+    graphics: Curve | BarItems
    _shm: ShmArray

    yrange: tuple[float, float] = None
@@ -346,7 +217,6 @@ class Flow(msgspec.Struct):  # , frozen=True):
    # normally this is just a plain line.
    ds_graphics: Optional[Curve] = None

    is_ohlc: bool = False
    render: bool = True  # toggle for display loop
@@ -554,9 +424,14 @@ class Flow(msgspec.Struct):  # , frozen=True):
        slice_to_head: int = -1
        should_redraw: bool = False
+        should_line: bool = False

        rkwargs = {}
-        should_line = False
+        # TODO: probably specialize ``Renderer`` types instead of
+        # these logic checks?
+        # - put these blocks into a `.load_renderer()` meth?
+        # - consider a OHLCRenderer, StepCurveRenderer, Renderer?
+        r = self._src_r
        if isinstance(graphics, BarItems):
            # XXX: special case where we change out graphics
            # to a line after a certain uppx threshold.
@@ -576,16 +451,36 @@ class Flow(msgspec.Struct):  # , frozen=True):
            should_redraw = changed_to_line or not should_line
            self._in_ds = should_line

-        else:
-            r = self._src_r
-            if not r:
-                # just using for ``.diff()`` atm..
-                r = self._src_r = Renderer(
-                    flow=self,
-                    # TODO: rename this to something with ohlc
-                    last_read=read,
-                )
+        elif not r:
+            if isinstance(graphics, StepCurve):
+                r = self._src_r = Renderer(
+                    flow=self,
+                    fmtr=StepCurveFmtr(
+                        shm=self.shm,
+                        flow=self,
+                        _last_read=read,
+                    ),
+                )
+
+                # TODO: append logic inside ``.render()`` isn't
+                # correct yet for step curves.. remove this to see it.
+                should_redraw = True
+                slice_to_head = -2
+
+            else:
+                r = self._src_r
+                if not r:
+                    # just using for ``.diff()`` atm..
+                    r = self._src_r = Renderer(
+                        flow=self,
+                        fmtr=IncrementalFormatter(
+                            shm=self.shm,
+                            flow=self,
+                            _last_read=read,
+                        ),
+                    )

        # ``Curve`` derivative case(s):
        array_key = array_key or self.name
        # print(array_key)
@@ -595,19 +490,6 @@ class Flow(msgspec.Struct):  # , frozen=True):
        should_ds: bool = r._in_ds
        showing_src_data: bool = not r._in_ds

-        # step_mode = getattr(graphics, '_step_mode', False)
-        step_mode = isinstance(graphics, StepCurve)
-        if step_mode:
-
-            r.allocate_xy = to_step_format
-            r.update_xy = update_step_xy
-            r.format_xy = step_to_xy
-
-            # TODO: append logic inside ``.render()`` isn't
-            # correct yet for step curves.. remove this to see it.
-            should_redraw = True
-            slice_to_head = -2
-
        # downsampling incremental state checking
        # check for and set std m4 downsample conditions
        uppx = graphics.x_uppx()
@@ -683,26 +565,27 @@ class Flow(msgspec.Struct):  # , frozen=True):
        # XXX: SUPER UGGGHHH... without this we get stale cache
        # graphics that don't update until you downsampler again..
-        if reset:
-            with graphics.reset_cache():
-                # assign output paths to graphicis obj
-                graphics.path = r.path
-                graphics.fast_path = r.fast_path
-
-                # XXX: we don't need this right?
-                # graphics.draw_last_datum(
-                #     path,
-                #     src_array,
-                #     data,
-                #     reset,
-                #     array_key,
-                # )
-                # graphics.update()
-                # profiler('.update()')
-        else:
-            # assign output paths to graphicis obj
-            graphics.path = r.path
-            graphics.fast_path = r.fast_path
+        # reset = False
+        # if reset:
+        #     with graphics.reset_cache():
+        #         # assign output paths to graphicis obj
+        #         graphics.path = r.path
+        #         graphics.fast_path = r.fast_path
+
+        #         # XXX: we don't need this right?
+        #         # graphics.draw_last_datum(
+        #         #     path,
+        #         #     src_array,
+        #         #     data,
+        #         #     reset,
+        #         #     array_key,
+        #         # )
+        #         # graphics.update()
+        #         # profiler('.update()')
+        # else:
+        # assign output paths to graphicis obj
+        graphics.path = r.path
+        graphics.fast_path = r.fast_path

        graphics.draw_last_datum(
            path,
@@ -786,51 +669,10 @@ class Flow(msgspec.Struct):  # , frozen=True):
        g.update()
def by_index_and_key(
renderer: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
]:
return array['index'], array[array_key], 'all'
class Renderer(msgspec.Struct):

    flow: Flow
+    fmtr: IncrementalFormatter

-    # last array view read
-    last_read: Optional[tuple] = None
-
-    # default just returns index, and named array from data
-    format_xy: Callable[
-        [np.ndarray, str],
-        tuple[np.ndarray]
-    ] = by_index_and_key
-
-    # optional pre-graphics xy formatted data which
-    # is incrementally updated in sync with the source data.
-    allocate_xy: Optional[Callable[
-        [int, slice],
-        tuple[np.ndarray, np.nd.array]
-    ]] = None
-
-    update_xy: Optional[Callable[
-        [int, slice], None]
-    ] = None
-
-    x_data: Optional[np.ndarray] = None
-    y_data: Optional[np.ndarray] = None
-
-    # indexes which slice into the above arrays (which are allocated
-    # based on source data shm input size) and allow retrieving
-    # incrementally updated data.
-    _xy_first: int = 0
-    _xy_last: int = 0

    # output graphics rendering, the main object
    # processed in ``QGraphicsObject.paint()``
@@ -852,58 +694,11 @@ class Renderer(msgspec.Struct):
    _last_uppx: float = 0
    _in_ds: bool = False
# incremental update state(s)
_last_vr: Optional[tuple[float, float]] = None
_last_ivr: Optional[tuple[float, float]] = None
def diff(
self,
new_read: tuple[np.ndarray],
) -> tuple[
np.ndarray,
np.ndarray,
]:
(
last_xfirst,
last_xlast,
last_array,
last_ivl,
last_ivr,
last_in_view,
) = self.last_read
# TODO: can the renderer just call ``Flow.read()`` directly?
# unpack latest source data read
(
xfirst,
xlast,
array,
ivl,
ivr,
in_view,
) = new_read
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
prepend_length = int(last_xfirst - xfirst)
append_length = int(xlast - last_xlast)
# blah blah blah
# do diffing for prepend, append and last entry
return (
slice(xfirst, last_xfirst),
prepend_length,
append_length,
slice(last_xlast, xlast),
)
    def draw_path(
        self,
        x: np.ndarray,
        y: np.ndarray,
-        connect: Union[str, np.ndarray] = 'all',
+        connect: str | np.ndarray = 'all',
        path: Optional[QPainterPath] = None,
        redraw: bool = False,
@@ -981,166 +776,54 @@ class Renderer(msgspec.Struct):
        '''
        # TODO: can the renderer just call ``Flow.read()`` directly?
        # unpack latest source data read
+        fmtr = self.fmtr
        (
-            xfirst,
-            xlast,
+            _,
+            _,
            array,
            ivl,
            ivr,
            in_view,
        ) = new_read
(
pre_slice,
prepend_length,
append_length,
post_slice,
) = self.diff(new_read)
if self.update_xy:
shm = self.flow.shm
if self.y_data is None:
# we first need to allocate xy data arrays
# from the source data.
assert self.allocate_xy
self.x_data, self.y_data = self.allocate_xy(
shm,
array_key,
)
self._xy_first = shm._first.value
self._xy_last = shm._last.value
profiler('allocated xy history')
if prepend_length:
y_prepend = shm._array[pre_slice]
if read_from_key:
y_prepend = y_prepend[array_key]
xy_data, xy_slice = self.update_xy(
shm,
array_key,
# this is the pre-sliced, "normally expected"
# new data that an updater would normally be
# expected to process, however in some cases (like
# step curves) the updater routine may want to do
# the source history-data reading itself, so we pass
# both here.
y_prepend,
pre_slice,
prepend_length,
self._xy_first,
self._xy_last,
is_append=False,
)
self.y_data[xy_slice] = xy_data
self._xy_first = shm._first.value
profiler('prepended xy history: {prepend_length}')
if append_length:
y_append = shm._array[post_slice]
if read_from_key:
y_append = y_append[array_key]
xy_data, xy_slice = self.update_xy(
shm,
array_key,
y_append,
post_slice,
append_length,
self._xy_first,
self._xy_last,
is_append=True,
)
# self.y_data[post_slice] = xy_data
# self.y_data[xy_slice or post_slice] = xy_data
self.y_data[xy_slice] = xy_data
self._xy_last = shm._last.value
profiler('appened xy history: {append_length}')
if use_vr:
array = in_view
# else:
# ivl, ivr = xfirst, xlast
hist = array[:slice_to_head]
        # xy-path data transform: convert source data to a format
        # able to be passed to a `QPainterPath` rendering routine.
-        if not len(hist):
+        fmt_out = fmtr.format_to_1d(
+            new_read,
+            array_key,
+            profiler,
+            slice_to_head=slice_to_head,
+            read_src_from_key=read_from_key,
+            slice_to_inview=use_vr,
+        )
+
+        # no history in view case
+        if not fmt_out:
            # XXX: this might be why the profiler only has exits?
            return

-        x_out, y_out, connect = self.format_xy(
-            self,
-            # TODO: hist here should be the pre-sliced
-            # x/y_data in the case where allocate_xy is
-            # defined?
-            hist,
-            array_key,
-            (ivl, ivr),
-        )
-        profiler('sliced input arrays')
-
-        if (
-            use_vr
-        ):
-
-            # if a view range is passed, plan to draw the
-            # source ouput that's "in view" of the chart.
-            view_range = (ivl, ivr)
-            # print(f'{self._name} vr: {view_range}')
-            profiler(f'view range slice {view_range}')
-
-            vl, vr = view_range
-
-            zoom_or_append = False
-            last_vr = self._last_vr
-            last_ivr = self._last_ivr or vl, vr
-
-            # incremental in-view data update.
-            if last_vr:
-                # relative slice indices
-                lvl, lvr = last_vr
-                # abs slice indices
-                al, ar = last_ivr
-
-                # left_change = abs(x_iv[0] - al) >= 1
-                # right_change = abs(x_iv[-1] - ar) >= 1
-
-                if (
-                    # likely a zoom view change
-                    (vr - lvr) > 2 or vl < lvl
-
-                    # append / prepend update
-                    # we had an append update where the view range
-                    # didn't change but the data-viewed (shifted)
-                    # underneath, so we need to redraw.
-                    # or left_change and right_change and last_vr == view_range
-                    # not (left_change and right_change) and ivr
-                    # (
-                    # or abs(x_iv[ivr] - livr) > 1
-                ):
-                    zoom_or_append = True
-
-            self._last_vr = view_range
-            if len(x_out):
-                self._last_ivr = x_out[0], x_out[slice_to_head]
+        (
+            x_1d,
+            y_1d,
+            connect,
+            prepend_length,
+            append_length,
+            view_changed,
+            # append_tres,
+        ) = fmt_out

        # redraw conditions
        if (
            prepend_length > 0
            or new_sample_rate
+            or view_changed
+            # NOTE: comment this to try and make "append paths"
+            # work below..
            or append_length > 0
-            or zoom_or_append
        ):
            should_redraw = True
@@ -1162,9 +845,9 @@ class Renderer(msgspec.Struct):
        elif should_ds and uppx > 1:
-            x_out, y_out, ymn, ymx = xy_downsample(
-                x_out,
-                y_out,
+            x_1d, y_1d, ymn, ymx = xy_downsample(
+                x_1d,
+                y_1d,
                uppx,
            )
            self.flow.yrange = ymn, ymx
@@ -1175,8 +858,8 @@ class Renderer(msgspec.Struct):
            self._in_ds = True

            path = self.draw_path(
-                x=x_out,
-                y=y_out,
+                x=x_1d,
+                y=y_1d,
                connect=connect,
                path=path,
                redraw=True,
@@ -1191,7 +874,6 @@ class Renderer(msgspec.Struct):
        # TODO: get this piecewise prepend working - right now it's
        # giving heck on vwap...
        # elif prepend_length:
-        #     breakpoint()

        #     prepend_path = pg.functions.arrayToQPath(
        #         x[0:prepend_length],
@@ -1208,18 +890,22 @@ class Renderer(msgspec.Struct):
        elif (
            append_length > 0
            and do_append
-            and not should_redraw
        ):
-            # print(f'{array_key} append len: {append_length}')
-            new_x = x_out[-append_length - 2:]  # slice_to_head]
-            new_y = y_out[-append_length - 2:]  # slice_to_head]
+            print(f'{array_key} append len: {append_length}')
+            # new_x = x_1d[-append_length - 2:]  # slice_to_head]
+            # new_y = y_1d[-append_length - 2:]  # slice_to_head]
            profiler('sliced append path')
+            # (
+            #     x_1d,
+            #     y_1d,
+            #     connect,
+            # ) = append_tres

            profiler(
                f'diffed array input, append_length={append_length}'
            )

-            # if should_ds:
+            # if should_ds and uppx > 1:
            #     new_x, new_y = xy_downsample(
            #         new_x,
            #         new_y,
@@ -1228,14 +914,15 @@ class Renderer(msgspec.Struct):
            #     profiler(f'fast path downsample redraw={should_ds}')
            append_path = self.draw_path(
-                x=new_x,
-                y=new_y,
+                x=x_1d,
+                y=y_1d,
                connect=connect,
                path=fast_path,
            )
            profiler('generated append qpath')

            if use_fpath:
+                # print(f'{self.flow.name}: FAST PATH')
                # an attempt at trying to make append-updates faster..
                if fast_path is None:
                    fast_path = append_path
@@ -1245,7 +932,12 @@ class Renderer(msgspec.Struct):
                    size = fast_path.capacity()
                    profiler(f'connected fast path w size: {size}')

-                    # print(f"append_path br: {append_path.boundingRect()}")
+                    print(
+                        f"append_path br: {append_path.boundingRect()}\n"
+                        f"path size: {size}\n"
+                        f"append_path len: {append_path.length()}\n"
+                        f"fast_path len: {fast_path.length()}\n"
+                    )
                    # graphics.path.moveTo(new_x[0], new_y[0])
                    # path.connectPath(append_path)
@@ -1259,10 +951,4 @@ class Renderer(msgspec.Struct):
        self.path = path
        self.fast_path = fast_path

-        # TODO: eventually maybe we can implement some kind of
-        # transform on the ``QPainterPath`` that will more or less
-        # detect the diff in "elements" terms?
-        # update diff state since we've now rendered paths.
-        self.last_read = new_read
-
        return self.path, array, reset


@@ -25,8 +25,15 @@ from typing import (
import numpy as np
import pyqtgraph as pg
-from PyQt5 import QtCore, QtGui, QtWidgets
-from PyQt5.QtCore import QLineF, QPointF
+from PyQt5 import (
+    QtGui,
+    QtWidgets,
+)
+from PyQt5.QtCore import (
+    QLineF,
+    QRectF,
+)
from PyQt5.QtGui import QPainterPath

from .._profile import pg_profile_enabled, ms_slower_then
@@ -114,8 +121,13 @@ class BarItems(pg.GraphicsObject):
        # we expect the downsample curve report this.
        return 0

+    # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
    def boundingRect(self):
-        # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
+        # profiler = Profiler(
+        #     msg=f'BarItems.boundingRect(): `{self._name}`',
+        #     disabled=not pg_profile_enabled(),
+        #     ms_threshold=ms_slower_then,
+        # )

        # TODO: Can we do rect caching to make this faster
        # like `pg.PlotCurveItem` does? In theory it's just
@@ -135,32 +147,37 @@ class BarItems(pg.GraphicsObject):
            hb.topLeft(),
            hb.bottomRight(),
        )
+        mn_y = hb_tl.y()
+        mx_y = hb_br.y()
+        most_left = hb_tl.x()
+        most_right = hb_br.x()
+        # profiler('calc path vertices')

        # need to include last bar height or BR will be off
-        mx_y = hb_br.y()
-        mn_y = hb_tl.y()
-
-        last_lines = self._last_bar_lines
+        # OHLC line segments: [hl, o, c]
+        last_lines: tuple[QLineF] | None = self._last_bar_lines
        if last_lines:
-            body_line = self._last_bar_lines[0]
-            if body_line:
-                mx_y = max(mx_y, max(body_line.y1(), body_line.y2()))
-                mn_y = min(mn_y, min(body_line.y1(), body_line.y2()))
-
-        return QtCore.QRectF(
-
-            # top left
-            QPointF(
-                hb_tl.x(),
-                mn_y,
-            ),
-
-            # bottom right
-            QPointF(
-                hb_br.x() + 1,
-                mx_y,
-            )
+            (
+                hl,
+                o,
+                c,
+            ) = last_lines
+            most_right = c.x2() + 1
+            ymx = ymn = c.y2()
+
+            if hl:
+                y1, y2 = hl.y1(), hl.y2()
+                ymn = min(y1, y2)
+                ymx = max(y1, y2)
+                mx_y = max(ymx, mx_y)
+                mn_y = min(ymn, mn_y)
+                # profiler('calc last bar vertices')
+
+        return QRectF(
+            most_left,
+            mn_y,
+            most_right - most_left + 1,
+            mx_y - mn_y,
        )

    def paint(
@@ -213,11 +230,15 @@ class BarItems(pg.GraphicsObject):
        # relevant fields
        ohlc = src_data[fields]
-        last_row = ohlc[-1:]
+        # last_row = ohlc[-1:]

        # individual values
        last_row = i, o, h, l, last = ohlc[-1]

+        # times = src_data['time']
+        # if times[-1] - times[-2]:
+        #     breakpoint()
+
        # generate new lines objects for updatable "current bar"
        self._last_bar_lines = bar_from_ohlc_row(last_row)
@@ -248,4 +269,5 @@ class BarItems(pg.GraphicsObject):
        # date / from some previous sample. It's weird though
        # because i've seen it do this to bars i - 3 back?

+        # return ohlc['time'], ohlc['close']
        return ohlc['index'], ohlc['close']


@@ -19,15 +19,16 @@ Super fast ``QPainterPath`` generation related operator routines.
"""
from __future__ import annotations
from typing import (
-    # Optional,
+    Optional,
    TYPE_CHECKING,
)

+import msgspec
import numpy as np
from numpy.lib import recfunctions as rfn
from numba import njit, float64, int64  # , optional

# import pyqtgraph as pg
-from PyQt5 import QtGui
+# from PyQt5 import QtGui
# from PyQt5.QtCore import QLineF, QPointF

from ..data._sharedmem import (
@@ -39,7 +40,778 @@ from ._compression import (
)

if TYPE_CHECKING:
-    from ._flows import Renderer
+    from ._flows import (
+        Renderer,
+        Flow,
+    )

+from .._profile import Profiler
def by_index_and_key(
renderer: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
]:
return array['index'], array[array_key], 'all'
class IncrementalFormatter(msgspec.Struct):
'''
Incrementally updating, pre-path-graphics tracking, formatter.
Allows tracking source data state in an updateable pre-graphics
``np.ndarray`` format (in local process memory) as well as
incrementally rendering from that format **to** 1d x/y for path
generation using ``pg.functions.arrayToQPath()``.
'''
shm: ShmArray
flow: Flow
# last read from shm (usually due to an update call)
_last_read: tuple[
int,
int,
np.ndarray
]
@property
def last_read(self) -> tuple | None:
return self._last_read
def __repr__(self) -> str:
msg = (
f'{type(self)}: ->\n\n'
f'fqsn={self.flow.name}\n'
f'shm_name={self.shm.token["shm_name"]}\n\n'
f'last_vr={self._last_vr}\n'
f'last_ivdr={self._last_ivdr}\n\n'
f'xy_nd_start={self.xy_nd_start}\n'
f'xy_nd_stop={self.xy_nd_stop}\n\n'
)
x_nd_len = 0
y_nd_len = 0
if self.x_nd is not None:
x_nd_len = len(self.x_nd)
y_nd_len = len(self.y_nd)
msg += (
f'x_nd_len={x_nd_len}\n'
f'y_nd_len={y_nd_len}\n'
)
return msg
def diff(
self,
new_read: tuple[np.ndarray],
) -> tuple[
np.ndarray,
np.ndarray,
]:
(
last_xfirst,
last_xlast,
last_array,
last_ivl,
last_ivr,
last_in_view,
) = self.last_read
# TODO: can the renderer just call ``Flow.read()`` directly?
# unpack latest source data read
(
xfirst,
xlast,
array,
ivl,
ivr,
in_view,
) = new_read
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
prepend_length = int(last_xfirst - xfirst)
append_length = int(xlast - last_xlast)
# blah blah blah
# do diffing for prepend, append and last entry
return (
slice(xfirst, last_xfirst),
prepend_length,
append_length,
slice(last_xlast, xlast),
)
# Incrementally updated xy ndarray formatted data, a pre-1d
# format which is updated and cached independently of the final
# pre-graphics-path 1d format.
x_nd: Optional[np.ndarray] = None
y_nd: Optional[np.ndarray] = None
# indexes which slice into the above arrays (which are allocated
# based on source data shm input size) and allow retrieving
# incrementally updated data.
xy_nd_start: int = 0
xy_nd_stop: int = 0
# TODO: eventually incrementally update 1d-pre-graphics path data?
# x_1d: Optional[np.ndarray] = None
# y_1d: Optional[np.ndarray] = None
# incremental view-change state(s) tracking
_last_vr: tuple[float, float] | None = None
_last_ivdr: tuple[float, float] | None = None
def _track_inview_range(
self,
view_range: tuple[int, int],
) -> bool:
# if a view range is passed, plan to draw the
# source ouput that's "in view" of the chart.
vl, vr = view_range
zoom_or_append = False
last_vr = self._last_vr
# incremental in-view data update.
if last_vr:
lvl, lvr = last_vr # relative slice indices
# TODO: detecting more specifically the interaction changes
# last_ivr = self._last_ivdr or (vl, vr)
# al, ar = last_ivr # abs slice indices
# left_change = abs(x_iv[0] - al) >= 1
# right_change = abs(x_iv[-1] - ar) >= 1
# likely a zoom/pan view change or data append update
if (
(vr - lvr) > 2
or vl < lvl
# append / prepend update
# we had an append update where the view range
# didn't change but the data-viewed (shifted)
# underneath, so we need to redraw.
# or left_change and right_change and last_vr == view_range
# not (left_change and right_change) and ivr
# (
# or abs(x_iv[ivr] - livr) > 1
):
zoom_or_append = True
self._last_vr = view_range
return zoom_or_append
def format_to_1d(
self,
new_read: tuple,
array_key: str,
profiler: Profiler,
slice_to_head: int = -1,
read_src_from_key: bool = True,
slice_to_inview: bool = True,
) -> tuple[
np.ndarray,
np.ndarray,
]:
shm = self.shm
(
_,
_,
array,
ivl,
ivr,
in_view,
) = new_read
(
pre_slice,
prepend_len,
append_len,
post_slice,
) = self.diff(new_read)
if self.y_nd is None:
# we first need to allocate xy data arrays
# from the source data.
self.x_nd, self.y_nd = self.allocate_xy_nd(
shm,
array_key,
)
self.xy_nd_start = shm._first.value
self.xy_nd_stop = shm._last.value
profiler('allocated xy history')
if prepend_len:
y_prepend = shm._array[pre_slice]
if read_src_from_key:
y_prepend = y_prepend[array_key]
(
new_y_nd,
y_nd_slc,
) = self.incr_update_xy_nd(
shm,
array_key,
# this is the pre-sliced, "normally expected"
# new data that an updater would normally be
# expected to process, however in some cases (like
# step curves) the updater routine may want to do
# the source history-data reading itself, so we pass
# both here.
y_prepend,
pre_slice,
prepend_len,
self.xy_nd_start,
self.xy_nd_stop,
is_append=False,
)
# y_nd_view = self.y_nd[y_nd_slc]
self.y_nd[y_nd_slc] = new_y_nd
# if read_src_from_key:
# y_nd_view[:][array_key] = new_y_nd
# else:
# y_nd_view[:] = new_y_nd
self.xy_nd_start = shm._first.value
profiler('prepended xy history: {prepend_length}')
if append_len:
y_append = shm._array[post_slice]
if read_src_from_key:
y_append = y_append[array_key]
(
new_y_nd,
y_nd_slc,
) = self.incr_update_xy_nd(
shm,
array_key,
y_append,
post_slice,
append_len,
self.xy_nd_start,
self.xy_nd_stop,
is_append=True,
)
# self.y_nd[post_slice] = new_y_nd
# self.y_nd[xy_slice or post_slice] = xy_data
self.y_nd[y_nd_slc] = new_y_nd
# if read_src_from_key:
# y_nd_view[:][array_key] = new_y_nd
# else:
# y_nd_view[:] = new_y_nd
self.xy_nd_stop = shm._last.value
profiler('appened xy history: {append_length}')
view_changed: bool = False
view_range: tuple[int, int] = (ivl, ivr)
if slice_to_inview:
view_changed = self._track_inview_range(view_range)
array = in_view
profiler(f'{self.flow.name} view range slice {view_range}')
hist = array[:slice_to_head]
# xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine.
if not len(hist):
# XXX: this might be why the profiler only has exits?
return
# TODO: hist here should be the pre-sliced
# x/y_data in the case where allocate_xy is
# defined?
x_1d, y_1d, connect = self.format_xy_nd_to_1d(
hist,
array_key,
view_range,
)
# app_tres = None
# if append_len:
# appended = array[-append_len-1:slice_to_head]
# app_tres = self.format_xy_nd_to_1d(
# appended,
# array_key,
# (
# view_range[1] - append_len + slice_to_head,
# view_range[1]
# ),
# )
# # assert (len(appended) - 1) == append_len
# # assert len(appended) == append_len
# print(
# f'{self.flow.name} APPEND LEN: {append_len}\n'
# f'{self.flow.name} APPENDED: {appended}\n'
# f'{self.flow.name} app_tres: {app_tres}\n'
# )
# update the last "in view data range"
if len(x_1d):
self._last_ivdr = x_1d[0], x_1d[slice_to_head]
# TODO: eventually maybe we can implement some kind of
# transform on the ``QPainterPath`` that will more or less
# detect the diff in "elements" terms?
# update diff state since we've now rendered paths.
self._last_read = new_read
profiler('.format_to_1d()')
return (
x_1d,
y_1d,
connect,
prepend_len,
append_len,
view_changed,
# app_tres,
)
###############################
# Sub-type override interface #
###############################
# optional pre-graphics xy formatted data which
# is incrementally updated in sync with the source data.
# XXX: was ``.allocate_xy()``
def allocate_xy_nd(
self,
src_shm: ShmArray,
data_field: str,
index_field: str = 'index',
) -> tuple[
np.ndarray, # x
np.nd.array # y
]:
'''
Convert the structured-array ``src_shm`` format to
a equivalently shaped (and field-less) ``np.ndarray``.
Eg. a 4 field x N struct-array => (N, 4)
'''
y_nd = src_shm._array[data_field].copy()
x_nd = src_shm._array[index_field].copy()
return x_nd, y_nd
# XXX: was ``.update_xy()``
def incr_update_xy_nd(
self,
src_shm: ShmArray,
data_field: str,
new_from_src: np.ndarray, # portion of source that was updated
read_slc: slice,
ln: int, # len of updated
nd_start: int,
nd_stop: int,
is_append: bool,
index_field: str = 'index',
) -> tuple[
np.ndarray,
slice,
]:
# write pushed data to flattened copy
new_y_nd = new_from_src
# XXX
# TODO: this should be returned and written by caller!
# XXX
# generate same-valued-per-row x support based on y shape
if index_field != 'index':
self.x_nd[read_slc, :] = new_from_src[index_field]
return new_y_nd, read_slc
# XXX: was ``.format_xy()``
def format_xy_nd_to_1d(
self,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray, # 1d x
np.ndarray, # 1d y
np.ndarray | str, # connection array/style
]:
'''
Default xy-nd array to 1d pre-graphics-path render routine.
Return single field column data verbatim
'''
return (
array['index'],
array[array_key],
# 1d connection array or style-key to
# ``pg.functions.arrayToQPath()``
'all',
)
class OHLCBarsFmtr(IncrementalFormatter):
fields: list[str] = ['open', 'high', 'low', 'close']
def allocate_xy_nd(
self,
ohlc_shm: ShmArray,
data_field: str,
) -> tuple[
np.ndarray, # x
np.nd.array # y
]:
'''
Convert an input struct-array holding OHLC samples into a pair of
flattened x, y arrays with the same size (datums wise) as the source
data.
'''
y_nd = ohlc_shm.ustruct(self.fields)
# generate an flat-interpolated x-domain
x_nd = (
np.broadcast_to(
ohlc_shm._array['index'][:, None],
(
ohlc_shm._array.size,
# 4, # only ohlc
y_nd.shape[1],
),
) + np.array([-0.5, 0, 0, 0.5])
)
assert y_nd.any()
# write pushed data to flattened copy
return (
x_nd,
y_nd,
)
@staticmethod
@njit(
# TODO: for now need to construct this manually for readonly
# arrays, see https://github.com/numba/numba/issues/4511
# ntypes.tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous
# int64,
# optional(float64),
# ),
nogil=True
)
def path_arrays_from_ohlc(
data: np.ndarray,
start: int64,
bar_gap: float64 = 0.43,
) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
]:
'''
Generate an array of lines objects from input ohlc data.
'''
size = int(data.shape[0] * 6)
x = np.zeros(
# data,
shape=size,
dtype=float64,
)
y, c = x.copy(), x.copy()
# TODO: report bug for assert @
# /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
for i, q in enumerate(data[start:], start):
# TODO: ask numba why this doesn't work..
# open, high, low, close, index = q[
# ['open', 'high', 'low', 'close', 'index']]
open = q['open']
high = q['high']
low = q['low']
close = q['close']
index = float64(q['index'])
istart = i * 6
istop = istart + 6
# x,y detail the 6 points which connect all vertexes of a ohlc bar
x[istart:istop] = (
index - bar_gap,
index,
index,
index,
index,
index + bar_gap,
)
y[istart:istop] = (
open,
open,
low,
high,
close,
close,
)
# specifies that the first edge is never connected to the
# prior bars last edge thus providing a small "gap"/"space"
# between bars determined by ``bar_gap``.
c[istart:istop] = (1, 1, 1, 1, 1, 0)
return x, y, c
# TODO: can we drop this frame and just use the above?
def format_xy_nd_to_1d(
self,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
start: int = 0, # XXX: do we need this?
# 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43,
) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
]:
'''
More or less direct proxy to the ``numba``-fied
``path_arrays_from_ohlc()`` (above) but with closed in kwargs
for line spacing.
'''
x, y, c = self.path_arrays_from_ohlc(
array,
start,
bar_gap=w,
)
return x, y, c
def incr_update_xy_nd(
self,
src_shm: ShmArray,
data_field: str,
new_from_src: np.ndarray, # portion of source that was updated
read_slc: slice,
ln: int, # len of updated
nd_start: int,
nd_stop: int,
is_append: bool,
index_field: str = 'index',
) -> tuple[
np.ndarray,
slice,
]:
# write newly pushed data to flattened copy
# a struct-arr is always passed in.
new_y_nd = rfn.structured_to_unstructured(
new_from_src[self.fields]
)
# XXX
# TODO: this should be returned and written by caller!
# XXX
# generate same-valued-per-row x support based on y shape
if index_field != 'index':
self.x_nd[read_slc, :] = new_from_src[index_field]
return new_y_nd, read_slc
class OHLCBarsAsCurveFmtr(OHLCBarsFmtr):
def format_xy_nd_to_1d(
self,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.ndarray,
str,
]:
# TODO: in the case of an existing ``.update_xy()``
# should we be passing in array as an xy arrays tuple?
# 2 more datum-indexes to capture zero at end
x_flat = self.x_nd[self.xy_nd_start:self.xy_nd_stop]
y_flat = self.y_nd[self.xy_nd_start:self.xy_nd_stop]
# slice to view
ivl, ivr = vr
x_iv_flat = x_flat[ivl:ivr]
y_iv_flat = y_flat[ivl:ivr]
# reshape to 1d for graphics rendering
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
return x_iv, y_iv, 'all'
class StepCurveFmtr(IncrementalFormatter):
def allocate_xy_nd(
self,
shm: ShmArray,
data_field: str,
index_field: str = 'index',
) -> tuple[
np.ndarray, # x
np.nd.array # y
]:
'''
Convert an input 1d shm array to a "step array" format
for use by path graphics generation.
'''
i = shm._array['index'].copy()
out = shm._array[data_field].copy()
x_out = np.broadcast_to(
i[:, None],
(i.size, 2),
) + np.array([-0.5, 0.5])
y_out = np.empty((len(out), 2), dtype=out.dtype)
y_out[:] = out[:, np.newaxis]
# start y at origin level
y_out[0, 0] = 0
return x_out, y_out
def incr_update_xy_nd(
self,
src_shm: ShmArray,
array_key: str,
src_update: np.ndarray, # portion of source that was updated
slc: slice,
ln: int, # len of updated
first: int,
last: int,
is_append: bool,
) -> tuple[
np.ndarray,
slice,
]:
# for a step curve we slice from one datum prior
# to the current "update slice" to get the previous
# "level".
if is_append:
start = max(last - 1, 0)
end = src_shm._last.value
new_y = src_shm._array[start:end][array_key]
slc = slice(start, end)
else:
new_y = src_update
return (
np.broadcast_to(
new_y[:, None], (new_y.size, 2),
),
slc,
)
def format_xy_nd_to_1d(
self,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.ndarray,
str,
]:
lasts = array[['index', array_key]]
last = lasts[array_key][-1]
# 2 more datum-indexes to capture zero at end
x_step = self.x_nd[self.xy_nd_start:self.xy_nd_stop+2]
y_step = self.y_nd[self.xy_nd_start:self.xy_nd_stop+2]
y_step[-1] = last
# slice out in-view data
ivl, ivr = vr
ys_iv = y_step[ivl:ivr+1]
xs_iv = x_step[ivl:ivr+1]
# flatten to 1d
y_iv = ys_iv.reshape(ys_iv.size)
x_iv = xs_iv.reshape(xs_iv.size)
# print(
# f'ys_iv : {ys_iv[-s:]}\n'
# f'y_iv: {y_iv[-s:]}\n'
# f'xs_iv: {xs_iv[-s:]}\n'
# f'x_iv: {x_iv[-s:]}\n'
# )
return x_iv, y_iv, 'all'
def xy_downsample(

@@ -55,7 +827,11 @@ def xy_downsample(
    float,
    float,
]:
+    '''
+    Downsample 1D (flat ``numpy.ndarray``) arrays using M4 given an input
+    ``uppx`` (units-per-pixel) and add space between discreet datums.
+
+    '''
    # downsample whenever more then 1 pixels per datum can be shown.
    # always refresh data bounds until we get diffing
    # working properly, see above..
@@ -73,169 +849,3 @@ def xy_downsample(
    y = y.flatten()

    return x, y, ymn, ymx
@njit(
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# ntypes.tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous
# int64,
# optional(float64),
# ),
nogil=True
)
def path_arrays_from_ohlc(
data: np.ndarray,
start: int64,
bar_gap: float64 = 0.43,
) -> np.ndarray:
'''
Generate an array of lines objects from input ohlc data.
'''
size = int(data.shape[0] * 6)
x = np.zeros(
# data,
shape=size,
dtype=float64,
)
y, c = x.copy(), x.copy()
# TODO: report bug for assert @
# /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
for i, q in enumerate(data[start:], start):
# TODO: ask numba why this doesn't work..
# open, high, low, close, index = q[
# ['open', 'high', 'low', 'close', 'index']]
open = q['open']
high = q['high']
low = q['low']
close = q['close']
index = float64(q['index'])
istart = i * 6
istop = istart + 6
# x,y detail the 6 points which connect all vertexes of a ohlc bar
x[istart:istop] = (
index - bar_gap,
index,
index,
index,
index,
index + bar_gap,
)
y[istart:istop] = (
open,
open,
low,
high,
close,
close,
)
# specifies that the first edge is never connected to the
# prior bars last edge thus providing a small "gap"/"space"
# between bars determined by ``bar_gap``.
c[istart:istop] = (1, 1, 1, 1, 1, 0)
return x, y, c
def gen_ohlc_qpath(
r: Renderer,
data: np.ndarray,
array_key: str, # we ignore this
vr: tuple[int, int],
start: int = 0, # XXX: do we need this?
# 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43,
) -> QtGui.QPainterPath:
'''
More or less direct proxy to ``path_arrays_from_ohlc()``
but with closed in kwargs for line spacing.
'''
x, y, c = path_arrays_from_ohlc(
data,
start,
bar_gap=w,
)
return x, y, c
def ohlc_to_line(
ohlc_shm: ShmArray,
data_field: str,
fields: list[str] = ['open', 'high', 'low', 'close']
) -> tuple[
np.ndarray,
np.ndarray,
]:
'''
Convert an input struct-array holding OHLC samples into a pair of
flattened x, y arrays with the same size (datums wise) as the source
data.
'''
y_out = ohlc_shm.ustruct(fields)
first = ohlc_shm._first.value
last = ohlc_shm._last.value
# write pushed data to flattened copy
y_out[first:last] = rfn.structured_to_unstructured(
ohlc_shm.array[fields]
)
# generate an flat-interpolated x-domain
x_out = (
np.broadcast_to(
ohlc_shm._array['index'][:, None],
(
ohlc_shm._array.size,
# 4, # only ohlc
y_out.shape[1],
),
) + np.array([-0.5, 0, 0, 0.5])
)
assert y_out.any()
return (
x_out,
y_out,
)
def to_step_format(
shm: ShmArray,
data_field: str,
index_field: str = 'index',
) -> tuple[int, np.ndarray, np.ndarray]:
'''
Convert an input 1d shm array to a "step array" format
for use by path graphics generation.
'''
i = shm._array['index'].copy()
out = shm._array[data_field].copy()
x_out = np.broadcast_to(
i[:, None],
(i.size, 2),
) + np.array([-0.5, 0.5])
y_out = np.empty((len(out), 2), dtype=out.dtype)
y_out[:] = out[:, np.newaxis]
# start y at origin level
y_out[0, 0] = 0
return x_out, y_out
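
For reference, the consumer-side wiring now looks roughly like the following (condensed from the `Flow`/`Renderer` changes above; surrounding state, uppx/downsample handling and error paths are omitted, so treat this as a sketch rather than the exact code):

# inside ``Flow.update_graphics()`` a renderer is bound to a formatter:
r = self._src_r = Renderer(
    flow=self,
    fmtr=StepCurveFmtr(        # or OHLCBarsFmtr / OHLCBarsAsCurveFmtr /
        shm=self.shm,          # plain IncrementalFormatter
        flow=self,
        _last_read=read,
    ),
)

# and inside ``Renderer.render()`` each draw cycle the formatter
# produces the 1d path inputs:
fmt_out = self.fmtr.format_to_1d(
    new_read,
    array_key,
    profiler,
    slice_to_head=slice_to_head,
    read_src_from_key=read_from_key,
    slice_to_inview=use_vr,
)
if not fmt_out:
    return  # no history in view

(
    x_1d,
    y_1d,
    connect,
    prepend_length,
    append_length,
    view_changed,
) = fmt_out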