Store lines graphics in struct array to simplify indexing

bar_select
Tyler Goodlet 2020-07-09 08:37:30 -04:00
parent 4ceffdd83f
commit f9084b8650
1 changed files with 79 additions and 74 deletions

View File

@ -1,9 +1,9 @@
"""
Chart graphics for displaying a slew of different data types.
"""
# from typing import Dict, Any
from typing import List
from enum import Enum
from contextlib import contextmanager
from itertools import chain
import numpy as np
import pyqtgraph as pg
@ -169,17 +169,34 @@ class CrossHairItem(pg.GraphicsObject):
return self.parent.boundingRect()
def _mk_lines_array(data: List, size: int) -> np.ndarray:
"""Create an ndarray to hold lines graphics objects.
"""
# TODO: might want to just make this a 2d array to be faster at
# flattening using .ravel()?
return np.empty_like(
data,
shape=(int(size),),
dtype=[
('index', int),
('body', object),
('rarm', object),
('larm', object)
],
)
def bars_from_ohlc(
data: np.ndarray,
start: int = 0,
w: float = 0.4,
w: float = 0.43,
) -> np.ndarray:
lines = np.empty_like(data, shape=(data.shape[0]*3,), dtype=object)
"""Generate an array of lines objects from input ohlc data.
"""
lines = _mk_lines_array(data, data.shape[0])
for i, q in enumerate(data[start:], start=start):
low = q['low']
high = q['high']
index = q['index']
low, high, index = q['low'], q['high'], q['index']
# high - low line
if low != high:
@ -195,7 +212,8 @@ def bars_from_ohlc(
c = QLineF(index + w, q['close'], index, q['close'])
# indexing here is as per the below comments
lines[3*i:3*i+3] = (hl, o, c)
# lines[3*i:3*i+3] = (hl, o, c)
lines[i] = (index, hl, o, c)
# if not _tina_mode: # piker mode
# else _tina_mode:
@ -240,27 +258,10 @@ class BarItems(pg.GraphicsObject):
# XXX: not sure this actually needs to be an array other
# then for the old tina mode calcs for up/down bars below?
# lines container
self.lines: np.ndarray = np.empty_like(
[], shape=(int(50e3),), dtype=object)
self._index = 0
self.lines = _mk_lines_array([], 50e3)
def _set_index(self, val):
# breakpoint()
self._index = val
def _get_index(self):
return self._index
index = property(_get_index, _set_index)
# TODO: can we be faster just dropping this?
@contextmanager
def painter(self):
# pre-computing a QPicture object allows paint() to run much
# more quickly, rather than re-drawing the shapes every time.
p = QtGui.QPainter(self.picture)
yield p
p.end()
# track the current length of drawable lines within the larger array
self.index: int = 0
@timeit
def draw_from_data(
@ -270,17 +271,28 @@ class BarItems(pg.GraphicsObject):
):
"""Draw OHLC datum graphics from a ``np.recarray``.
"""
lines = bars_from_ohlc(data, start=start, w=self.w)
lines = bars_from_ohlc(data, start=start)
# save graphics for later reference and keep track
# of current internal "last index"
index = len(lines)
self.lines[:index] = lines
self.index = index
self.draw_lines()
with self.painter() as p:
p.setPen(self.bull_pen)
p.drawLines(*self.lines[:index])
def draw_lines(self):
"""Draw the current line set using the painter.
"""
to_draw = self.lines[
['body', 'larm', 'rarm']][:self.index]
# pre-computing a QPicture object allows paint() to run much
# more quickly, rather than re-drawing the shapes every time.
p = QtGui.QPainter(self.picture)
p.setPen(self.bull_pen)
# TODO: might be better to use 2d array
p.drawLines(*chain.from_iterable(to_draw))
p.end()
def update_from_array(
self,
@ -297,58 +309,51 @@ class BarItems(pg.GraphicsObject):
"""
index = self.index
length = len(array)
idata = int(index/3) - 1
extra = length - idata
extra = length - index
if extra > 0:
# generate new graphics to match provided array
new = array[-extra:]
lines = bars_from_ohlc(new, w=self.w)
new = array[index:index + extra]
lines = bars_from_ohlc(new)
bars_added = len(lines)
self.lines[index:index + bars_added] = lines
self.index += bars_added
added = len(new) * 3
assert len(lines) == added
# else: # current bar update
# do we really need to verify the entire past data set?
# index, time, open, high, low, close, volume
i, time, _, _, _, close, _ = array[-1]
last = close
i, body, larm, rarm = self.lines[index-1]
if not rarm:
breakpoint()
self.lines[index:index + len(lines)] = lines
# XXX: is there a faster way to modify this?
# update right arm
rarm.setLine(rarm.x1(), last, rarm.x2(), last)
self.index += len(lines)
# update body
high = body.y2()
low = body.y1()
if last < low:
low = last
else: # current bar update
# do we really need to verify the entire past data set?
# index, time, open, high, low, close, volume
i, time, _, _, _, close, _ = array[-1]
last = close
body, larm, rarm = self.lines[index-3:index]
if not rarm:
breakpoint()
if last > high:
high = last
# XXX: is there a faster way to modify this?
# update right arm
rarm.setLine(rarm.x1(), last, rarm.x2(), last)
# update body
high = body.y2()
low = body.y1()
if last < low:
low = last
if last > high:
high = last
if getattr(body, '_flat', None) and low != high:
# if the bar was flat it likely does not have
# the index set correctly due to a rendering bug
# see above
body.setLine(i, low, i, high)
body._flat = False
else:
body.setLine(body.x1(), low, body.x2(), high)
if getattr(body, '_flat', None) and low != high:
# if the bar was flat it likely does not have
# the index set correctly due to a rendering bug
# see above
body.setLine(i, low, i, high)
body._flat = False
else:
body.setLine(body.x1(), low, body.x2(), high)
# draw the pic
with self.painter() as p:
p.setPen(self.bull_pen)
p.drawLines(*self.lines[:index])
self.draw_lines()
# trigger re-render
self.update()
# trigger re-render
self.update()
# be compat with ``pg.PlotCurveItem``
setData = update_from_array