Merge pull request #257 from pikers/plotitem_overlays

`PlotItem` overlays
py3.10_support
goodboy 2022-01-25 08:24:23 -05:00 committed by GitHub
commit 05b8e3a199
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1181 additions and 298 deletions

View File

@ -18,6 +18,7 @@
Chart axes graphics and behavior. Chart axes graphics and behavior.
""" """
import functools
from typing import List, Tuple, Optional from typing import List, Tuple, Optional
from math import floor from math import floor
@ -33,17 +34,18 @@ _axis_pen = pg.mkPen(hcolor('bracket'))
class Axis(pg.AxisItem): class Axis(pg.AxisItem):
"""A better axis that sizes tick contents considering font size. '''
A better axis that sizes tick contents considering font size.
""" '''
def __init__( def __init__(
self, self,
linkedsplits, linkedsplits,
typical_max_str: str = '100 000.000', typical_max_str: str = '100 000.000',
min_tick: int = 2, min_tick: int = 2,
**kwargs **kwargs
) -> None:
) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
# XXX: pretty sure this makes things slower # XXX: pretty sure this makes things slower
@ -95,7 +97,12 @@ class PriceAxis(Axis):
# XXX: drop for now since it just eats up h space # XXX: drop for now since it just eats up h space
def tickStrings(self, vals, scale, spacing): def tickStrings(
self,
vals,
scale,
spacing,
):
# TODO: figure out how to enforce min tick spacing by passing # TODO: figure out how to enforce min tick spacing by passing
# it into the parent type # it into the parent type
@ -131,9 +138,8 @@ class DynamicDateAxis(Axis):
indexes: List[int], indexes: List[int],
) -> List[str]: ) -> List[str]:
# try:
chart = self.linkedsplits.chart chart = self.linkedsplits.chart
bars = chart._arrays['ohlc'] bars = chart._arrays[chart.name]
shm = self.linkedsplits.chart._shm shm = self.linkedsplits.chart._shm
first = shm._first.value first = shm._first.value
@ -156,7 +162,14 @@ class DynamicDateAxis(Axis):
delay = times[-1] - times[-2] delay = times[-1] - times[-2]
return dts.strftime(self.tick_tpl[delay]) return dts.strftime(self.tick_tpl[delay])
def tickStrings(self, values: List[float], scale, spacing): def tickStrings(
self,
values: tuple[float],
scale,
spacing,
):
# info = self.tickStrings.cache_info()
# print(info)
return self._indexes_to_timestrs(values) return self._indexes_to_timestrs(values)

View File

@ -19,6 +19,8 @@ High level chart-widget apis.
''' '''
from __future__ import annotations from __future__ import annotations
from functools import partial
from dataclasses import dataclass
from typing import Optional from typing import Optional
from PyQt5 import QtCore, QtWidgets from PyQt5 import QtCore, QtWidgets
@ -29,7 +31,6 @@ from PyQt5.QtWidgets import (
QHBoxLayout, QHBoxLayout,
QVBoxLayout, QVBoxLayout,
QSplitter, QSplitter,
# QSizePolicy,
) )
import numpy as np import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
@ -61,6 +62,7 @@ from ..data._sharedmem import ShmArray
from ..log import get_logger from ..log import get_logger
from ._interaction import ChartView from ._interaction import ChartView
from ._forms import FieldsForm from ._forms import FieldsForm
from ._overlay import PlotItemOverlay
log = get_logger(__name__) log = get_logger(__name__)
@ -322,17 +324,8 @@ class LinkedSplits(QWidget):
self.subplots: dict[tuple[str, ...], ChartPlotWidget] = {} self.subplots: dict[tuple[str, ...], ChartPlotWidget] = {}
self.godwidget = godwidget self.godwidget = godwidget
# placeholder for last appended ``PlotItem``'s bottom axis.
self.xaxis = DynamicDateAxis( self.xaxis_chart = None
orientation='bottom',
linkedsplits=self
)
# if _xaxis_at == 'bottom':
# self.xaxis.setStyle(showValues=False)
# self.xaxis.hide()
# else:
# self.xaxis_ind.setStyle(showValues=False)
# self.xaxis.hide()
self.splitter = QSplitter(QtCore.Qt.Vertical) self.splitter = QSplitter(QtCore.Qt.Vertical)
self.splitter.setMidLineWidth(0) self.splitter.setMidLineWidth(0)
@ -410,7 +403,6 @@ class LinkedSplits(QWidget):
name=symbol.key, name=symbol.key,
array=array, array=array,
# xaxis=self.xaxis,
style=style, style=style,
_is_main=True, _is_main=True,
@ -420,7 +412,10 @@ class LinkedSplits(QWidget):
self.chart.addItem(self.cursor) self.chart.addItem(self.cursor)
# axis placement # axis placement
if _xaxis_at == 'bottom': if (
_xaxis_at == 'bottom' and
'bottom' in self.chart.plotItem.axes
):
self.chart.hideAxis('bottom') self.chart.hideAxis('bottom')
# style? # style?
@ -438,7 +433,6 @@ class LinkedSplits(QWidget):
array: np.ndarray, array: np.ndarray,
array_key: Optional[str] = None, array_key: Optional[str] = None,
# xaxis: Optional[DynamicDateAxis] = None,
style: str = 'line', style: str = 'line',
_is_main: bool = False, _is_main: bool = False,
@ -446,31 +440,28 @@ class LinkedSplits(QWidget):
**cpw_kwargs, **cpw_kwargs,
) -> 'ChartPlotWidget': ) -> ChartPlotWidget:
'''Add (sub)plots to chart widget by key. '''
Add (sub)plots to chart widget by key.
''' '''
if self.chart is None and not _is_main: if self.chart is None and not _is_main:
raise RuntimeError( raise RuntimeError(
"A main plot must be created first with `.plot_ohlc_main()`") "A main plot must be created first with `.plot_ohlc_main()`")
# source of our custom interactions
cv = ChartView(name)
cv.linkedsplits = self
# use "indicator axis" by default # use "indicator axis" by default
# TODO: we gotta possibly assign this back # TODO: we gotta possibly assign this back
# to the last subplot on removal of some last subplot # to the last subplot on removal of some last subplot
xaxis = DynamicDateAxis( xaxis = DynamicDateAxis(
orientation='bottom', orientation='bottom',
linkedsplits=self linkedsplits=self
) )
axes = {
if self.xaxis: 'right': PriceAxis(linkedsplits=self, orientation='right'),
self.xaxis.hide() 'left': PriceAxis(linkedsplits=self, orientation='left'),
self.xaxis = xaxis 'bottom': xaxis,
}
qframe = ChartnPane( qframe = ChartnPane(
sidepane=sidepane, sidepane=sidepane,
@ -486,15 +477,21 @@ class LinkedSplits(QWidget):
array=array, array=array,
parent=qframe, parent=qframe,
linkedsplits=self, linkedsplits=self,
axisItems={ axisItems=axes,
'bottom': xaxis,
'right': PriceAxis(linkedsplits=self, orientation='right'),
'left': PriceAxis(linkedsplits=self, orientation='left'),
},
viewBox=cv,
**cpw_kwargs, **cpw_kwargs,
) )
if self.xaxis_chart:
# presuming we only want it at the true bottom of all charts.
# XXX: uses new api from our ``pyqtgraph`` fork.
# https://github.com/pikers/pyqtgraph/tree/plotitemoverlay_onto_pg_master
_ = self.xaxis_chart.removeAxis('bottom', unlink=False)
assert 'bottom' not in self.xaxis_chart.plotItem.axes
self.xaxis_chart = cpw
if self.xaxis_chart is None:
self.xaxis_chart = cpw
qframe.chart = cpw qframe.chart = cpw
qframe.hbox.addWidget(cpw) qframe.hbox.addWidget(cpw)
@ -510,17 +507,13 @@ class LinkedSplits(QWidget):
) )
cpw.sidepane = sidepane cpw.sidepane = sidepane
# give viewbox as reference to chart
# allowing for kb controls and interactions on **this** widget
# (see our custom view mode in `._interactions.py`)
cv.chart = cpw
cpw.plotItem.vb.linkedsplits = self cpw.plotItem.vb.linkedsplits = self
cpw.setFrameStyle( cpw.setFrameStyle(
QtWidgets.QFrame.StyledPanel QtWidgets.QFrame.StyledPanel
# | QtWidgets.QFrame.Plain # | QtWidgets.QFrame.Plain
) )
# don't show the little "autoscale" A label.
cpw.hideButtons() cpw.hideButtons()
# XXX: gives us outline on backside of y-axis # XXX: gives us outline on backside of y-axis
@ -531,15 +524,27 @@ class LinkedSplits(QWidget):
# comes from ;) # comes from ;)
cpw.setXLink(self.chart) cpw.setXLink(self.chart)
# add to cross-hair's known plots add_label = False
self.cursor.add_plot(cpw) anchor_at = ('top', 'left')
# draw curve graphics # draw curve graphics
if style == 'bar': if style == 'bar':
cpw.draw_ohlc(name, array, array_key=array_key)
graphics, data_key = cpw.draw_ohlc(
name,
array,
array_key=array_key
)
self.cursor.contents_labels.add_label(
cpw,
'ohlc',
anchor_at=('top', 'left'),
update_func=ContentsLabel.update_from_ohlc,
)
elif style == 'line': elif style == 'line':
cpw.draw_curve( add_label = True
graphics, data_key = cpw.draw_curve(
name, name,
array, array,
array_key=array_key, array_key=array_key,
@ -547,7 +552,8 @@ class LinkedSplits(QWidget):
) )
elif style == 'step': elif style == 'step':
cpw.draw_curve( add_label = True
graphics, data_key = cpw.draw_curve(
name, name,
array, array,
array_key=array_key, array_key=array_key,
@ -569,6 +575,22 @@ class LinkedSplits(QWidget):
else: else:
assert style == 'bar', 'main chart must be OHLC' assert style == 'bar', 'main chart must be OHLC'
# add to cross-hair's known plots
# NOTE: add **AFTER** creating the underlying ``PlotItem``s
# since we require that global (linked charts wide) axes have
# been created!
self.cursor.add_plot(cpw)
if self.cursor and style != 'bar':
self.cursor.add_curve_cursor(cpw, graphics)
if add_label:
self.cursor.contents_labels.add_label(
cpw,
data_key,
anchor_at=anchor_at,
)
self.resize_sidepanes() self.resize_sidepanes()
return cpw return cpw
@ -587,6 +609,18 @@ class LinkedSplits(QWidget):
cpw.sidepane.setMinimumWidth(sp_w) cpw.sidepane.setMinimumWidth(sp_w)
cpw.sidepane.setMaximumWidth(sp_w) cpw.sidepane.setMaximumWidth(sp_w)
# import pydantic
# class ArrayScene(pydantic.BaseModel):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``ShmArray``s with high level processing routines mostly for
# graphics summary and display.
# '''
# arrays: dict[str, np.ndarray] = {}
# graphics: dict[str, pg.GraphicsObject] = {}
class ChartPlotWidget(pg.PlotWidget): class ChartPlotWidget(pg.PlotWidget):
''' '''
@ -611,6 +645,10 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: can take a ``background`` color setting - maybe there's # TODO: can take a ``background`` color setting - maybe there's
# a better one? # a better one?
def mk_vb(self, name: str) -> ChartView:
cv = ChartView(name)
cv.linkedsplits = self.linked
return cv
def __init__( def __init__(
self, self,
@ -639,17 +677,31 @@ class ChartPlotWidget(pg.PlotWidget):
self.view_color = view_color self.view_color = view_color
self.pen_color = pen_color self.pen_color = pen_color
# NOTE: must be set bfore calling ``.mk_vb()``
self.linked = linkedsplits
# source of our custom interactions
self.cv = cv = self.mk_vb(name)
super().__init__( super().__init__(
background=hcolor(view_color), background=hcolor(view_color),
viewBox=cv,
# parent=None, # parent=None,
# plotItem=None, # plotItem=None,
# antialias=True, # antialias=True,
**kwargs **kwargs
) )
# give viewbox as reference to chart
# allowing for kb controls and interactions on **this** widget
# (see our custom view mode in `._interactions.py`)
cv.chart = self
# ensure internal pi matches
assert self.cv is self.plotItem.vb
self.useOpenGL(use_open_gl) self.useOpenGL(use_open_gl)
self.name = name self.name = name
self.data_key = data_key self.data_key = data_key or name
self.linked = linkedsplits
# scene-local placeholder for book graphics # scene-local placeholder for book graphics
# sizing to avoid overlap with data contents # sizing to avoid overlap with data contents
@ -658,9 +710,10 @@ class ChartPlotWidget(pg.PlotWidget):
# self.setViewportMargins(0, 0, 0, 0) # self.setViewportMargins(0, 0, 0, 0)
# self._ohlc = array # readonly view of ohlc data # self._ohlc = array # readonly view of ohlc data
# TODO: move to Aggr above XD
# readonly view of data arrays # readonly view of data arrays
self._arrays = { self._arrays = {
'ohlc': array, self.data_key: array,
} }
self._graphics = {} # registry of underlying graphics self._graphics = {} # registry of underlying graphics
# registry of overlay curve names # registry of overlay curve names
@ -671,7 +724,6 @@ class ChartPlotWidget(pg.PlotWidget):
self._labels = {} # registry of underlying graphics self._labels = {} # registry of underlying graphics
self._ysticks = {} # registry of underlying graphics self._ysticks = {} # registry of underlying graphics
self._vb = self.plotItem.vb
self._static_yrange = static_yrange # for "known y-range style" self._static_yrange = static_yrange # for "known y-range style"
self._view_mode: str = 'follow' self._view_mode: str = 'follow'
@ -684,16 +736,9 @@ class ChartPlotWidget(pg.PlotWidget):
self.showGrid(x=False, y=True, alpha=0.3) self.showGrid(x=False, y=True, alpha=0.3)
self.default_view() self.default_view()
self.cv.enable_auto_yrange()
# Assign callback for rescaling y-axis automatically self.overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem)
# based on data contents and ``ViewBox`` state.
# self.sigXRangeChanged.connect(self._set_yrange)
# for mouse wheel which doesn't seem to emit XRangeChanged
self._vb.sigRangeChangedManually.connect(self._set_yrange)
# for when the splitter(s) are resized
self._vb.sigResized.connect(self._set_yrange)
def resume_all_feeds(self): def resume_all_feeds(self):
for feed in self._feeds.values(): for feed in self._feeds.values():
@ -705,16 +750,16 @@ class ChartPlotWidget(pg.PlotWidget):
@property @property
def view(self) -> ChartView: def view(self) -> ChartView:
return self._vb return self.plotItem.vb
def focus(self) -> None: def focus(self) -> None:
self._vb.setFocus() self.view.setFocus()
def last_bar_in_view(self) -> int: def last_bar_in_view(self) -> int:
self._arrays['ohlc'][-1]['index'] self._arrays[self.name][-1]['index']
def is_valid_index(self, index: int) -> bool: def is_valid_index(self, index: int) -> bool:
return index >= 0 and index < self._arrays['ohlc'][-1]['index'] return index >= 0 and index < self._arrays[self.name][-1]['index']
def _set_xlimits( def _set_xlimits(
self, self,
@ -738,7 +783,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Return a range tuple for the bars present in view. """Return a range tuple for the bars present in view.
""" """
l, r = self.view_range() l, r = self.view_range()
array = self._arrays['ohlc'] array = self._arrays[self.name]
lbar = max(l, array[0]['index']) lbar = max(l, array[0]['index'])
rbar = min(r, array[-1]['index']) rbar = min(r, array[-1]['index'])
return l, lbar, rbar, r return l, lbar, rbar, r
@ -750,7 +795,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Set the view box to the "default" startup view of the scene. """Set the view box to the "default" startup view of the scene.
""" """
xlast = self._arrays['ohlc'][index]['index'] xlast = self._arrays[self.name][index]['index']
begin = xlast - _bars_to_left_in_follow_mode begin = xlast - _bars_to_left_in_follow_mode
end = xlast + _bars_from_right_in_follow_mode end = xlast + _bars_from_right_in_follow_mode
@ -758,12 +803,13 @@ class ChartPlotWidget(pg.PlotWidget):
if self._static_yrange == 'axis': if self._static_yrange == 'axis':
self._static_yrange = None self._static_yrange = None
self.plotItem.vb.setXRange( view = self.view
view.setXRange(
min=begin, min=begin,
max=end, max=end,
padding=0, padding=0,
) )
self._set_yrange() view._set_yrange()
def increment_view( def increment_view(
self, self,
@ -774,7 +820,7 @@ class ChartPlotWidget(pg.PlotWidget):
""" """
l, r = self.view_range() l, r = self.view_range()
self._vb.setXRange( self.view.setXRange(
min=l + 1, min=l + 1,
max=r + 1, max=r + 1,
@ -791,11 +837,11 @@ class ChartPlotWidget(pg.PlotWidget):
array_key: Optional[str] = None, array_key: Optional[str] = None,
) -> pg.GraphicsObject: ) -> (pg.GraphicsObject, str):
""" '''
Draw OHLC datums to chart. Draw OHLC datums to chart.
""" '''
graphics = BarItems( graphics = BarItems(
self.plotItem, self.plotItem,
pen_color=self.pen_color pen_color=self.pen_color
@ -810,17 +856,9 @@ class ChartPlotWidget(pg.PlotWidget):
data_key = array_key or name data_key = array_key or name
self._graphics[data_key] = graphics self._graphics[data_key] = graphics
self.linked.cursor.contents_labels.add_label(
self,
'ohlc',
anchor_at=('top', 'left'),
update_func=ContentsLabel.update_from_ohlc,
)
self._add_sticky(name, bg_color='davies') self._add_sticky(name, bg_color='davies')
return graphics return graphics, data_key
def draw_curve( def draw_curve(
self, self,
@ -830,16 +868,18 @@ class ChartPlotWidget(pg.PlotWidget):
array_key: Optional[str] = None, array_key: Optional[str] = None,
overlay: bool = False, overlay: bool = False,
separate_axes: bool = False,
color: Optional[str] = None, color: Optional[str] = None,
add_label: bool = True, add_label: bool = True,
**pdi_kwargs, **pdi_kwargs,
) -> pg.PlotDataItem: ) -> (pg.PlotDataItem, str):
"""Draw a "curve" (line plot graphics) for the provided data in '''
Draw a "curve" (line plot graphics) for the provided data in
the input array ``data``. the input array ``data``.
""" '''
color = color or self.pen_color or 'default_light' color = color or self.pen_color or 'default_light'
pdi_kwargs.update({ pdi_kwargs.update({
'color': color 'color': color
@ -847,10 +887,6 @@ class ChartPlotWidget(pg.PlotWidget):
data_key = array_key or name data_key = array_key or name
# pg internals for reference.
# curve = pg.PlotDataItem(
# curve = pg.PlotCurveItem(
# yah, we wrote our own B) # yah, we wrote our own B)
curve = FastAppendCurve( curve = FastAppendCurve(
y=data[data_key], y=data[data_key],
@ -881,34 +917,88 @@ class ChartPlotWidget(pg.PlotWidget):
# and is disastrous for performance. # and is disastrous for performance.
# curve.setCacheMode(QtWidgets.QGraphicsItem.ItemCoordinateCache) # curve.setCacheMode(QtWidgets.QGraphicsItem.ItemCoordinateCache)
self.addItem(curve)
# register curve graphics and backing array for name # register curve graphics and backing array for name
self._graphics[name] = curve self._graphics[name] = curve
self._arrays[data_key or name] = data self._arrays[data_key] = data
if overlay: if overlay:
anchor_at = ('bottom', 'left') # anchor_at = ('bottom', 'left')
self._overlays[name] = None self._overlays[name] = None
if separate_axes:
# Custom viewbox impl
cv = self.mk_vb(name)
def maxmin():
return self.maxmin(name=data_key)
# ensure view maxmin is computed from correct array
# cv._maxmin = partial(self.maxmin, name=data_key)
cv._maxmin = maxmin
cv.chart = self
# xaxis = DynamicDateAxis(
# orientation='bottom',
# linkedsplits=self.linked,
# )
yaxis = PriceAxis(
orientation='right',
linkedsplits=self.linked,
)
plotitem = pg.PlotItem(
parent=self.plotItem,
name=name,
enableMenu=False,
viewBox=cv,
axisItems={
# 'bottom': xaxis,
'right': yaxis,
},
default_axes=[],
)
# plotitem.setAxisItems(
# add_to_layout=False,
# axisItems={
# 'bottom': xaxis,
# 'right': yaxis,
# },
# )
# plotite.hideAxis('right')
# plotite.hideAxis('bottom')
plotitem.addItem(curve)
cv.enable_auto_yrange()
# config
# plotitem.setAutoVisible(y=True)
# plotitem.enableAutoRange(axis='y')
plotitem.hideButtons()
self.overlay.add_plotitem(
plotitem,
# only link x-axes,
link_axes=(0,),
)
else:
# this intnernally calls `PlotItem.addItem()` on the
# graphics object
self.addItem(curve)
else: else:
anchor_at = ('top', 'left') # this intnernally calls `PlotItem.addItem()` on the
# graphics object
self.addItem(curve)
# anchor_at = ('top', 'left')
# TODO: something instead of stickies for overlays # TODO: something instead of stickies for overlays
# (we need something that avoids clutter on x-axis). # (we need something that avoids clutter on x-axis).
self._add_sticky(name, bg_color=color) self._add_sticky(name, bg_color=color)
if self.linked.cursor: return curve, data_key
self.linked.cursor.add_curve_cursor(self, curve)
if add_label:
self.linked.cursor.contents_labels.add_label(
self,
data_key or name,
anchor_at=anchor_at
)
return curve
# TODO: make this a ctx mngr # TODO: make this a ctx mngr
def _add_sticky( def _add_sticky(
@ -949,7 +1039,7 @@ class ChartPlotWidget(pg.PlotWidget):
'''Update the named internal graphics from ``array``. '''Update the named internal graphics from ``array``.
''' '''
self._arrays['ohlc'] = array self._arrays[self.name] = array
graphics = self._graphics[graphics_name] graphics = self._graphics[graphics_name]
graphics.update_from_array(array, **kwargs) graphics.update_from_array(array, **kwargs)
return graphics return graphics
@ -970,7 +1060,7 @@ class ChartPlotWidget(pg.PlotWidget):
data_key = array_key or graphics_name data_key = array_key or graphics_name
if graphics_name not in self._overlays: if graphics_name not in self._overlays:
self._arrays['ohlc'] = array self._arrays[self.name] = array
else: else:
self._arrays[data_key] = array self._arrays[data_key] = array
@ -992,102 +1082,6 @@ class ChartPlotWidget(pg.PlotWidget):
return curve return curve
def _set_yrange(
self,
*,
yrange: Optional[tuple[float, float]] = None,
range_margin: float = 0.06,
bars_range: Optional[tuple[int, int, int, int]] = None,
# flag to prevent triggering sibling charts from the same linked
# set from recursion errors.
autoscale_linked_plots: bool = True,
) -> None:
'''Set the viewable y-range based on embedded data.
This adds auto-scaling like zoom on the scroll wheel such
that data always fits nicely inside the current view of the
data set.
'''
set_range = True
if self._static_yrange == 'axis':
set_range = False
elif self._static_yrange is not None:
ylow, yhigh = self._static_yrange
elif yrange is not None:
ylow, yhigh = yrange
else:
# Determine max, min y values in viewable x-range from data.
# Make sure min bars/datums on screen is adhered.
l, lbar, rbar, r = bars_range or self.bars_range()
if autoscale_linked_plots:
# avoid recursion by sibling plots
linked = self.linked
plots = list(linked.subplots.copy().values())
main = linked.chart
if main:
plots.append(main)
for chart in plots:
if chart and not chart._static_yrange:
chart._set_yrange(
bars_range=(l, lbar, rbar, r),
autoscale_linked_plots=False,
)
# TODO: logic to check if end of bars in view
# extra = view_len - _min_points_to_show
# begin = self._arrays['ohlc'][0]['index'] - extra
# # end = len(self._arrays['ohlc']) - 1 + extra
# end = self._arrays['ohlc'][-1]['index'] - 1 + extra
# bars_len = rbar - lbar
# log.debug(
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
# f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}"
# )
a = self._arrays['ohlc']
ifirst = a[0]['index']
bars = a[lbar - ifirst:rbar - ifirst + 1]
if not len(bars):
# likely no data loaded yet or extreme scrolling?
log.error(f"WTF bars_range = {lbar}:{rbar}")
return
if self.data_key != self.linked.symbol.key:
bars = bars[self.data_key]
ylow = np.nanmin(bars)
yhigh = np.nanmax(bars)
# print(f'{(ylow, yhigh)}')
else:
# just the std ohlc bars
ylow = np.nanmin(bars['low'])
yhigh = np.nanmax(bars['high'])
if set_range:
# view margins: stay within a % of the "true range"
diff = yhigh - ylow
ylow = ylow - (diff * range_margin)
yhigh = yhigh + (diff * range_margin)
self.setLimits(
yMin=ylow,
yMax=yhigh,
)
self.setYRange(ylow, yhigh)
# def _label_h(self, yhigh: float, ylow: float) -> float: # def _label_h(self, yhigh: float, ylow: float) -> float:
# # compute contents label "height" in view terms # # compute contents label "height" in view terms
# # to avoid having data "contents" overlap with them # # to avoid having data "contents" overlap with them
@ -1140,3 +1134,59 @@ class ChartPlotWidget(pg.PlotWidget):
return ohlc['index'][indexes][-1] return ohlc['index'][indexes][-1]
else: else:
return ohlc['index'][-1] return ohlc['index'][-1]
def maxmin(
self,
name: Optional[str] = None,
bars_range: Optional[tuple[int, int, int, int]] = None,
) -> tuple[float, float]:
'''
Return the max and min y-data values "in view".
If ``bars_range`` is provided use that range.
'''
l, lbar, rbar, r = bars_range or self.bars_range()
# TODO: logic to check if end of bars in view
# extra = view_len - _min_points_to_show
# begin = self._arrays['ohlc'][0]['index'] - extra
# # end = len(self._arrays['ohlc']) - 1 + extra
# end = self._arrays['ohlc'][-1]['index'] - 1 + extra
# bars_len = rbar - lbar
# log.debug(
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
# f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}"
# )
a = self._arrays[name or self.name]
ifirst = a[0]['index']
bars = a[lbar - ifirst:rbar - ifirst + 1]
if not len(bars):
# likely no data loaded yet or extreme scrolling?
log.error(f"WTF bars_range = {lbar}:{rbar}")
return
if (
self.data_key == self.linked.symbol.key
):
# ohlc sampled bars hi/lo lookup
ylow = np.nanmin(bars['low'])
yhigh = np.nanmax(bars['high'])
else:
try:
view = bars[name or self.data_key]
except:
breakpoint()
# if self.data_key != 'volume':
# else:
# view = bars
ylow = np.nanmin(view)
yhigh = np.nanmax(view)
# print(f'{(ylow, yhigh)}')
return ylow, yhigh

View File

@ -276,7 +276,7 @@ class ContentsLabels:
) -> ContentsLabel: ) -> ContentsLabel:
label = ContentsLabel( label = ContentsLabel(
view=chart._vb, view=chart.view,
anchor_at=anchor_at, anchor_at=anchor_at,
) )
self._labels.append( self._labels.append(
@ -418,13 +418,16 @@ class Cursor(pg.GraphicsObject):
# keep x-axis right below main chart # keep x-axis right below main chart
plot_index = -1 if _xaxis_at == 'bottom' else 0 plot_index = -1 if _xaxis_at == 'bottom' else 0
self.xaxis_label = XAxisLabel( # ONLY create an x-axis label for the cursor
parent=self.plots[plot_index].getAxis('bottom'), # if this plot owns the 'bottom' axis.
opacity=_ch_label_opac, if 'bottom' in plot.plotItem.axes:
bg_color=self.label_color, self.xaxis_label = XAxisLabel(
) parent=self.plots[plot_index].getAxis('bottom'),
# place label off-screen during startup opacity=_ch_label_opac,
self.xaxis_label.setPos(self.plots[0].mapFromView(QPointF(0, 0))) bg_color=self.label_color,
)
# place label off-screen during startup
self.xaxis_label.setPos(self.plots[0].mapFromView(QPointF(0, 0)))
def add_curve_cursor( def add_curve_cursor(
self, self,
@ -435,7 +438,7 @@ class Cursor(pg.GraphicsObject):
# the current sample under the mouse # the current sample under the mouse
cursor = LineDot( cursor = LineDot(
curve, curve,
index=plot._arrays['ohlc'][-1]['index'], index=plot._arrays[plot.name][-1]['index'],
plot=plot plot=plot
) )
plot.addItem(cursor) plot.addItem(cursor)
@ -525,17 +528,18 @@ class Cursor(pg.GraphicsObject):
for cursor in opts.get('cursors', ()): for cursor in opts.get('cursors', ()):
cursor.setIndex(ix) cursor.setIndex(ix)
# update the label on the bottom of the crosshair # update the label on the bottom of the crosshair
self.xaxis_label.update_label( if 'bottom' in plot.plotItem.axes:
self.xaxis_label.update_label(
# XXX: requires: # XXX: requires:
# https://github.com/pyqtgraph/pyqtgraph/pull/1418 # https://github.com/pyqtgraph/pyqtgraph/pull/1418
# otherwise gobbles tons of CPU.. # otherwise gobbles tons of CPU..
# map back to abs (label-local) coordinates # map back to abs (label-local) coordinates
abs_pos=plot.mapFromView(QPointF(ix + line_offset, iy)), abs_pos=plot.mapFromView(QPointF(ix + line_offset, iy)),
value=ix, value=ix,
) )
self._datum_xy = ix, iy self._datum_xy = ix, iy

View File

@ -117,7 +117,7 @@ def update_fsp_chart(
array, array,
array_key=array_key or graphics_name, array_key=array_key or graphics_name,
) )
chart._set_yrange() chart.cv._set_yrange()
# XXX: re: ``array_key``: fsp func names must be unique meaning we # XXX: re: ``array_key``: fsp func names must be unique meaning we
# can't have duplicates of the underlying data even if multiple # can't have duplicates of the underlying data even if multiple
@ -155,7 +155,7 @@ def chart_maxmin(
# https://arxiv.org/abs/cs/0610046 # https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin # https://github.com/lemire/pythonmaxmin
array = chart._arrays['ohlc'] array = chart._arrays[chart.name]
ifirst = array[0]['index'] ifirst = array[0]['index']
last_bars_range = chart.bars_range() last_bars_range = chart.bars_range()
@ -212,6 +212,7 @@ async def update_chart_from_quotes(
if vlm_chart: if vlm_chart:
vlm_sticky = vlm_chart._ysticks['volume'] vlm_sticky = vlm_chart._ysticks['volume']
vlm_view = vlm_chart.view
maxmin = partial(chart_maxmin, chart, vlm_chart) maxmin = partial(chart_maxmin, chart, vlm_chart)
@ -248,6 +249,7 @@ async def update_chart_from_quotes(
tick_margin = 3 * tick_size tick_margin = 3 * tick_size
chart.show() chart.show()
view = chart.view
last_quote = time.time() last_quote = time.time()
async for quotes in stream: async for quotes in stream:
@ -295,8 +297,10 @@ async def update_chart_from_quotes(
mx_vlm_in_view != last_mx_vlm or mx_vlm_in_view != last_mx_vlm or
mx_vlm_in_view > last_mx_vlm mx_vlm_in_view > last_mx_vlm
): ):
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375)) vlm_view._set_yrange(
yrange=(0, mx_vlm_in_view * 1.375)
)
last_mx_vlm = mx_vlm_in_view last_mx_vlm = mx_vlm_in_view
ticks_frame = quote.get('ticks', ()) ticks_frame = quote.get('ticks', ())
@ -412,9 +416,12 @@ async def update_chart_from_quotes(
l1.bid_label.update_fields({'level': price, 'size': size}) l1.bid_label.update_fields({'level': price, 'size': size})
# check for y-range re-size # check for y-range re-size
if (mx > last_mx) or (mn < last_mn): if (
# print(f'new y range: {(mn, mx)}') (mx > last_mx) or (mn < last_mn)
chart._set_yrange( and not chart._static_yrange == 'axis'
):
print(f'new y range: {(mn, mx)}')
view._set_yrange(
yrange=(mn, mx), yrange=(mn, mx),
# TODO: we should probably scale # TODO: we should probably scale
# the view margin based on the size # the view margin based on the size
@ -436,6 +443,7 @@ async def update_chart_from_quotes(
name, name,
array_key=name, array_key=name,
) )
subchart.cv._set_yrange()
# TODO: all overlays on all subplots.. # TODO: all overlays on all subplots..
@ -447,6 +455,7 @@ async def update_chart_from_quotes(
curve_name, curve_name,
array_key=curve_name, array_key=curve_name,
) )
# chart._set_yrange()
def maybe_mk_fsp_shm( def maybe_mk_fsp_shm(
@ -790,7 +799,7 @@ async def update_chart_from_fsp(
level_line(chart, 70, orient_v='bottom') level_line(chart, 70, orient_v='bottom')
level_line(chart, 80, orient_v='top') level_line(chart, 80, orient_v='top')
chart._set_yrange() chart.cv._set_yrange()
done() # status updates done() # status updates
profiler(f'fsp:{func_name} starting update loop') profiler(f'fsp:{func_name} starting update loop')
@ -981,7 +990,7 @@ async def maybe_open_vlm_display(
) )
# size view to data once at outset # size view to data once at outset
chart._set_yrange() chart.cv._set_yrange()
yield chart yield chart
@ -1070,7 +1079,7 @@ async def display_symbol_data(
) )
# size view to data once at outset # size view to data once at outset
chart._set_yrange() chart.cv._set_yrange()
# TODO: a data view api that makes this less shit # TODO: a data view api that makes this less shit
chart._shm = ohlcv chart._shm = ohlcv

View File

@ -342,7 +342,8 @@ class SelectRect(QtGui.QGraphicsRectItem):
ixmn, ixmx = round(xmn), round(xmx) ixmn, ixmx = round(xmn), round(xmx)
nbars = ixmx - ixmn + 1 nbars = ixmx - ixmn + 1
data = self._chart._arrays['ohlc'][ixmn:ixmx] chart = self._chart
data = chart._arrays[chart.name][ixmn:ixmx]
if len(data): if len(data):
std = data['close'].std() std = data['close'].std()

View File

@ -18,6 +18,7 @@
Chart view box primitives Chart view box primitives
""" """
from __future__ import annotations
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import time import time
from typing import Optional, Callable from typing import Optional, Callable
@ -155,6 +156,7 @@ async def handle_viewmode_kb_inputs(
# View modes # View modes
if key == Qt.Key_R: if key == Qt.Key_R:
# TODO: set this for all subplots
# edge triggered default view activation # edge triggered default view activation
view.chart.default_view() view.chart.default_view()
@ -332,12 +334,23 @@ class ChartView(ViewBox):
''' '''
mode_name: str = 'view' mode_name: str = 'view'
# "relay events" for making overlaid views work.
# NOTE: these MUST be defined here (and can't be monkey patched
# on later) due to signal construction requiring refs to be
# in place during the run of meta-class machinery.
mouseDragEventRelay = QtCore.Signal(object, object, object)
wheelEventRelay = QtCore.Signal(object, object, object)
event_relay_source: 'Optional[ViewBox]' = None
relays: dict[str, Signal] = {}
def __init__( def __init__(
self, self,
name: str, name: str,
parent: pg.PlotItem = None, parent: pg.PlotItem = None,
static_yrange: Optional[tuple[float, float]] = None,
**kwargs, **kwargs,
): ):
@ -350,8 +363,15 @@ class ChartView(ViewBox):
**kwargs **kwargs
) )
# for "known y-range style"
self._static_yrange = static_yrange
self._maxmin = None
# disable vertical scrolling # disable vertical scrolling
self.setMouseEnabled(x=True, y=False) self.setMouseEnabled(
x=True,
y=True,
)
self.linkedsplits = None self.linkedsplits = None
self._chart: 'ChartPlotWidget' = None # noqa self._chart: 'ChartPlotWidget' = None # noqa
@ -398,8 +418,15 @@ class ChartView(ViewBox):
def chart(self, chart: 'ChartPlotWidget') -> None: # type: ignore # noqa def chart(self, chart: 'ChartPlotWidget') -> None: # type: ignore # noqa
self._chart = chart self._chart = chart
self.select_box.chart = chart self.select_box.chart = chart
if self._maxmin is None:
self._maxmin = chart.maxmin
def wheelEvent(self, ev, axis=None): def wheelEvent(
self,
ev,
axis=None,
relayed_from: ChartView = None,
):
'''Override "center-point" location for scrolling. '''Override "center-point" location for scrolling.
This is an override of the ``ViewBox`` method simply changing This is an override of the ``ViewBox`` method simply changing
@ -424,7 +451,7 @@ class ChartView(ViewBox):
log.debug("Max zoom bruh...") log.debug("Max zoom bruh...")
return return
if ev.delta() < 0 and vl >= len(chart._arrays['ohlc']) + 666: if ev.delta() < 0 and vl >= len(chart._arrays[chart.name]) + 666:
log.debug("Min zoom bruh...") log.debug("Min zoom bruh...")
return return
@ -432,67 +459,89 @@ class ChartView(ViewBox):
s = 1.015 ** (ev.delta() * -1 / 20) # self.state['wheelScaleFactor']) s = 1.015 ** (ev.delta() * -1 / 20) # self.state['wheelScaleFactor'])
s = [(None if m is False else s) for m in mask] s = [(None if m is False else s) for m in mask]
# center = pg.Point( if (
# fn.invertQTransform(self.childGroup.transform()).map(ev.pos()) # zoom happened on axis
# ) axis == 1
# XXX: scroll "around" the right most element in the view # if already in axis zoom mode then keep it
# which stays "pinned" in place. or self.chart._static_yrange == 'axis'
):
self.chart._static_yrange = 'axis'
self.setLimits(yMin=None, yMax=None)
# furthest_right_coord = self.boundingRect().topRight() # print(scale_y)
# pos = ev.pos()
# lastPos = ev.lastPos()
# dif = pos - lastPos
# dif = dif * -1
center = Point(fn.invertQTransform(self.childGroup.transform()).map(ev.pos()))
# scale_y = 1.3 ** (center.y() * -1 / 20)
self.scaleBy(s, center)
# yaxis = pg.Point( else:
# fn.invertQTransform(
# self.childGroup.transform()
# ).map(furthest_right_coord)
# )
# This seems like the most "intuitive option, a hybrid of # center = pg.Point(
# tws and tv styles # fn.invertQTransform(self.childGroup.transform()).map(ev.pos())
last_bar = pg.Point(int(rbar)) + 1 # )
ryaxis = chart.getAxis('right') # XXX: scroll "around" the right most element in the view
r_axis_x = ryaxis.pos().x() # which stays "pinned" in place.
end_of_l1 = pg.Point( # furthest_right_coord = self.boundingRect().topRight()
round(
chart._vb.mapToView( # yaxis = pg.Point(
pg.Point(r_axis_x - chart._max_l1_line_len) # fn.invertQTransform(
# QPointF(chart._max_l1_line_len, 0) # self.childGroup.transform()
).x() # ).map(furthest_right_coord)
# )
# This seems like the most "intuitive option, a hybrid of
# tws and tv styles
last_bar = pg.Point(int(rbar)) + 1
ryaxis = chart.getAxis('right')
r_axis_x = ryaxis.pos().x()
end_of_l1 = pg.Point(
round(
chart.cv.mapToView(
pg.Point(r_axis_x - chart._max_l1_line_len)
# QPointF(chart._max_l1_line_len, 0)
).x()
)
) # .x()
# self.state['viewRange'][0][1] = end_of_l1
# focal = pg.Point((last_bar.x() + end_of_l1)/2)
focal = min(
last_bar,
end_of_l1,
key=lambda p: p.x()
) )
) # .x() # focal = pg.Point(last_bar.x() + end_of_l1)
# self.state['viewRange'][0][1] = end_of_l1 self._resetTarget()
self.scaleBy(s, focal)
# focal = pg.Point((last_bar.x() + end_of_l1)/2) self.sigRangeChangedManually.emit(mask)
ev.accept()
focal = min(
last_bar,
end_of_l1,
key=lambda p: p.x()
)
# focal = pg.Point(last_bar.x() + end_of_l1)
self._resetTarget()
self.scaleBy(s, focal)
ev.accept()
self.sigRangeChangedManually.emit(mask)
def mouseDragEvent( def mouseDragEvent(
self, self,
ev, ev,
axis: Optional[int] = None, axis: Optional[int] = None,
relayed_from: ChartView = None,
) -> None: ) -> None:
# if axis is specified, event will only affect that axis.
ev.accept() # we accept all buttons
button = ev.button()
pos = ev.pos() pos = ev.pos()
lastPos = ev.lastPos() lastPos = ev.lastPos()
dif = pos - lastPos dif = pos - lastPos
dif = dif * -1 dif = dif * -1
# NOTE: if axis is specified, event will only affect that axis.
button = ev.button()
# Ignore axes if mouse is disabled # Ignore axes if mouse is disabled
mouseEnabled = np.array(self.state['mouseEnabled'], dtype=np.float) mouseEnabled = np.array(self.state['mouseEnabled'], dtype=np.float)
mask = mouseEnabled.copy() mask = mouseEnabled.copy()
@ -500,21 +549,28 @@ class ChartView(ViewBox):
mask[1-axis] = 0.0 mask[1-axis] = 0.0
# Scale or translate based on mouse button # Scale or translate based on mouse button
if button & (QtCore.Qt.LeftButton | QtCore.Qt.MidButton): if button & (
QtCore.Qt.LeftButton | QtCore.Qt.MidButton
):
# zoom y-axis ONLY when click-n-drag on it # zoom y-axis ONLY when click-n-drag on it
if axis == 1: # if axis == 1:
# set a static y range special value on chart widget to # # set a static y range special value on chart widget to
# prevent sizing to data in view. # # prevent sizing to data in view.
self.chart._static_yrange = 'axis' # self.chart._static_yrange = 'axis'
scale_y = 1.3 ** (dif.y() * -1 / 20) # scale_y = 1.3 ** (dif.y() * -1 / 20)
self.setLimits(yMin=None, yMax=None) # self.setLimits(yMin=None, yMax=None)
# print(scale_y) # # print(scale_y)
self.scaleBy((0, scale_y)) # self.scaleBy((0, scale_y))
if self.state['mouseMode'] == ViewBox.RectMode: # SELECTION MODE
if (
self.state['mouseMode'] == ViewBox.RectMode
and axis is None
):
# XXX: WHY
ev.accept()
down_pos = ev.buttonDownPos() down_pos = ev.buttonDownPos()
@ -523,23 +579,36 @@ class ChartView(ViewBox):
self.select_box.mouse_drag_released(down_pos, pos) self.select_box.mouse_drag_released(down_pos, pos)
# ax = QtCore.QRectF(down_pos, pos) ax = QtCore.QRectF(down_pos, pos)
# ax = self.childGroup.mapRectFromParent(ax) ax = self.childGroup.mapRectFromParent(ax)
# print(ax)
# this is the zoom transform cmd # this is the zoom transform cmd
# self.showAxRect(ax) self.showAxRect(ax)
# axis history tracking
self.axHistoryPointer += 1
self.axHistory = self.axHistory[
:self.axHistoryPointer] + [ax]
# self.axHistoryPointer += 1
# self.axHistory = self.axHistory[
# :self.axHistoryPointer] + [ax]
else: else:
print('drag finish?')
self.select_box.set_pos(down_pos, pos) self.select_box.set_pos(down_pos, pos)
# update shape of scale box # update shape of scale box
# self.updateScaleBox(ev.buttonDownPos(), ev.pos()) # self.updateScaleBox(ev.buttonDownPos(), ev.pos())
self.updateScaleBox(
down_pos,
ev.pos(),
)
# PANNING MODE
else: else:
# default bevavior: click to pan view # XXX: WHY
ev.accept()
if axis == 1:
self.chart._static_yrange = 'axis'
tr = self.childGroup.transform() tr = self.childGroup.transform()
tr = fn.invertQTransform(tr) tr = fn.invertQTransform(tr)
tr = tr.map(dif*mask) - tr.map(Point(0, 0)) tr = tr.map(dif*mask) - tr.map(Point(0, 0))
@ -554,10 +623,9 @@ class ChartView(ViewBox):
self.sigRangeChangedManually.emit(self.state['mouseEnabled']) self.sigRangeChangedManually.emit(self.state['mouseEnabled'])
# WEIRD "RIGHT-CLICK CENTER ZOOM" MODE
elif button & QtCore.Qt.RightButton: elif button & QtCore.Qt.RightButton:
# right click zoom to center behaviour
if self.state['aspectLocked'] is not False: if self.state['aspectLocked'] is not False:
mask[0] = 0 mask[0] = 0
@ -577,6 +645,9 @@ class ChartView(ViewBox):
self.scaleBy(x=x, y=y, center=center) self.scaleBy(x=x, y=y, center=center)
self.sigRangeChangedManually.emit(self.state['mouseEnabled']) self.sigRangeChangedManually.emit(self.state['mouseEnabled'])
# XXX: WHY
ev.accept()
# def mouseClickEvent(self, event: QtCore.QEvent) -> None: # def mouseClickEvent(self, event: QtCore.QEvent) -> None:
# '''This routine is rerouted to an async handler. # '''This routine is rerouted to an async handler.
# ''' # '''
@ -591,3 +662,107 @@ class ChartView(ViewBox):
'''This routine is rerouted to an async handler. '''This routine is rerouted to an async handler.
''' '''
pass pass
def _set_yrange(
self,
*,
yrange: Optional[tuple[float, float]] = None,
range_margin: float = 0.06,
bars_range: Optional[tuple[int, int, int, int]] = None,
# flag to prevent triggering sibling charts from the same linked
# set from recursion errors.
autoscale_linked_plots: bool = True,
autoscale_overlays: bool = False,
) -> None:
'''
Set the viewable y-range based on embedded data.
This adds auto-scaling like zoom on the scroll wheel such
that data always fits nicely inside the current view of the
data set.
'''
set_range = True
chart = self._chart
# view has been set in 'axis' mode
# meaning it can be panned and zoomed
# arbitrarily on the y-axis:
# - disable autoranging
# - remove any y range limits
if chart._static_yrange == 'axis':
set_range = False
self.setLimits(yMin=None, yMax=None)
# static y-range has been set likely by
# a specialized FSP configuration.
elif chart._static_yrange is not None:
ylow, yhigh = chart._static_yrange
# range passed in by caller, usually a
# maxmin detection algos inside the
# display loop for re-draw efficiency.
elif yrange is not None:
ylow, yhigh = yrange
# calculate max, min y values in viewable x-range from data.
# Make sure min bars/datums on screen is adhered.
else:
br = bars_range or chart.bars_range()
# TODO: maybe should be a method on the
# chart widget/item?
if autoscale_linked_plots:
# avoid recursion by sibling plots
linked = self.linkedsplits
plots = list(linked.subplots.copy().values())
main = linked.chart
if main:
plots.append(main)
for chart in plots:
if chart and not chart._static_yrange:
chart.cv._set_yrange(
bars_range=br,
autoscale_linked_plots=False,
)
if set_range:
ylow, yhigh = self._maxmin()
# view margins: stay within a % of the "true range"
diff = yhigh - ylow
ylow = ylow - (diff * range_margin)
yhigh = yhigh + (diff * range_margin)
# XXX: this often needs to be unset
# to get different view modes to operate
# correctly!
self.setLimits(
yMin=ylow,
yMax=yhigh,
)
self.setYRange(ylow, yhigh)
def enable_auto_yrange(
vb: ChartView,
) -> None:
'''
Assign callback for rescaling y-axis automatically
based on data contents and ``ViewBox`` state.
'''
vb.sigXRangeChanged.connect(vb._set_yrange)
# mouse wheel doesn't emit XRangeChanged
vb.sigRangeChangedManually.connect(vb._set_yrange)
vb.sigResized.connect(vb._set_yrange) # splitter(s) resizing
def disable_auto_yrange(
self,
) -> None:
self._chart._static_yrange = 'axis'

View File

@ -0,0 +1,631 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Charting overlay helpers.
'''
from typing import Callable, Optional
from pyqtgraph.Qt.QtCore import (
# QObject,
# Signal,
Qt,
# QEvent,
)
from pyqtgraph.graphicsItems.AxisItem import AxisItem
from pyqtgraph.graphicsItems.ViewBox import ViewBox
from pyqtgraph.graphicsItems.GraphicsWidget import GraphicsWidget
from pyqtgraph.graphicsItems.PlotItem.PlotItem import PlotItem
from pyqtgraph.Qt.QtCore import QObject, Signal, QEvent
from pyqtgraph.Qt.QtWidgets import QGraphicsGridLayout, QGraphicsLinearLayout
from ._interaction import ChartView
__all__ = ["PlotItemOverlay"]
# Define the layout "position" indices as to be passed
# to a ``QtWidgets.QGraphicsGridlayout.addItem()`` call:
# https://doc.qt.io/qt-5/qgraphicsgridlayout.html#addItem
# This was pulled from the internals of ``PlotItem.setAxisItem()``.
_axes_layout_indices: dict[str] = {
# row incremented axes
'top': (1, 1),
'bottom': (3, 1),
# view is @ (2, 1)
# column incremented axes
'left': (2, 0),
'right': (2, 2),
}
# NOTE: To clarify this indexing, ``PlotItem.__init__()`` makes a grid
# with dimensions 4x3 and puts the ``ViewBox`` at postiion (2, 1) (aka
# row=2, col=1) in the grid layout since row (0, 1) is reserved for
# a title label and row 1 is for any potential "top" axis. Column 1
# is the "middle" (since 3 columns) and is where the plot/vb is placed.
class ComposedGridLayout:
'''
List-like interface to managing a sequence of overlayed
``PlotItem``s in the form:
| | | | | top0 | | | | |
| | | | | top1 | | | | |
| | | | | ... | | | | |
| | | | | topN | | | | |
| lN | ... | l1 | l0 | ViewBox | r0 | r1 | ... | rN |
| | | | | bottom0 | | | | |
| | | | | bottom1 | | | | |
| | | | | ... | | | | |
| | | | | bottomN | | | | |
Where the index ``i`` in the sequence specifies the index
``<axis_name>i`` in the layout.
The ``item: PlotItem`` passed to the constructor's grid layout is
used verbatim as the "main plot" who's view box is give precedence
for input handling. The main plot's axes are removed from it's
layout and placed in the surrounding exterior layouts to allow for
re-ordering if desired.
'''
def __init__(
self,
item: PlotItem,
grid: QGraphicsGridLayout,
reverse: bool = False, # insert items to the "center"
) -> None:
self.items: list[PlotItem] = []
# self.grid = grid
self.reverse = reverse
# TODO: use a ``bidict`` here?
self._pi2axes: dict[
int,
dict[str, AxisItem],
] = {}
self._axes2pi: dict[
AxisItem,
dict[str, PlotItem],
] = {}
# TODO: better name?
# construct surrounding layouts for placing outer axes and
# their legends and title labels.
self.sides: dict[
str,
tuple[QGraphicsLinearLayout, list[AxisItem]]
] = {}
for name, pos in _axes_layout_indices.items():
layout = QGraphicsLinearLayout()
self.sides[name] = (layout, [])
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
if name in ('top', 'bottom'):
orient = Qt.Vertical
elif name in ('left', 'right'):
orient = Qt.Horizontal
layout.setOrientation(orient)
self.insert(0, item)
# insert surrounding linear layouts into the parent pi's layout
# such that additional axes can be appended arbitrarily without
# having to expand or resize the parent's grid layout.
for name, (linlayout, axes) in self.sides.items():
# TODO: do we need this?
# axis should have been removed during insert above
index = _axes_layout_indices[name]
axis = item.layout.itemAt(*index)
if axis and axis.isVisible():
assert linlayout.itemAt(0) is axis
# item.layout.removeItem(axis)
item.layout.addItem(linlayout, *index)
layout = item.layout.itemAt(*index)
assert layout is linlayout
def _register_item(
self,
index: int,
plotitem: PlotItem,
) -> None:
for name, axis_info in plotitem.axes.items():
axis = axis_info['item']
# register this plot's (maybe re-placed) axes for lookup.
self._pi2axes.setdefault(index, {})[name] = axis
self._axes2pi.setdefault(index, {})[name] = plotitem
# enter plot into list for index tracking
self.items.insert(index, plotitem)
def insert(
self,
index: int,
plotitem: PlotItem,
) -> (int, int):
'''
Place item at index by inserting all axes into the grid
at list-order appropriate position.
'''
if index < 0:
raise ValueError('`insert()` only supports an index >= 0')
# add plot's axes in sequence to the embedded linear layouts
# for each "side" thus avoiding graphics collisions.
for name, axis_info in plotitem.axes.copy().items():
linlayout, axes = self.sides[name]
axis = axis_info['item']
if axis in axes:
# TODO: re-order using ``.pop()`` ?
ValueError(f'{axis} is already in {name} layout!?')
# linking sanity
axis_view = axis.linkedView()
assert axis_view is plotitem.vb
if (
not axis.isVisible()
# XXX: we never skip moving the axes for the *first*
# plotitem inserted (even if not shown) since we need to
# move all the hidden axes into linear sub-layouts for
# that "central" plot in the overlay. Also if we don't
# do it there's weird geomoetry calc offsets that make
# view coords slightly off somehow .. smh
and not len(self.items) == 0
):
continue
# XXX: Remove old axis? No, turns out we don't need this?
# DON'T unlink it since we the original ``ViewBox``
# to still drive it B)
# popped = plotitem.removeAxis(name, unlink=False)
# assert axis is popped
# invert insert index for layouts which are
# not-left-to-right, top-to-bottom insert oriented
if name in ('top', 'left'):
index = min(len(axes) - index, 0)
assert index >= 0
linlayout.insertItem(index, axis)
axes.insert(index, axis)
self._register_item(index, plotitem)
return index
def append(
self,
item: PlotItem,
) -> (int, int):
'''
Append item's axes at indexes which put its axes "outside"
previously overlayed entries.
'''
# for left and bottom axes we have to first remove
# items and re-insert to maintain a list-order.
return self.insert(len(self.items), item)
def get_axis(
self,
plot: PlotItem,
name: str,
) -> AxisItem:
'''
Retrieve the named axis for overlayed ``plot``.
'''
index = self.items.index(plot)
return self._pi2axes[index][name]
def pop(
self,
item: PlotItem,
) -> PlotItem:
'''
Remove item and restack all axes in list-order.
'''
raise NotImplementedError
# Unimplemented features TODO:
# - 'A' (autobtn) should relay to all views
# - context menu single handler + relay?
# - layout unwind and re-pack for 'left' and 'top' axes
# - add labels to layout if detected in source ``PlotItem``
# UX nice-to-have TODO:
# - optional "focussed" view box support for view boxes
# that have custom input handlers (eg. you might want to
# scale the view to some "focussed" data view and have overlayed
# viewboxes only respond to relayed events.)
# - figure out how to deal with menu raise events for multi-viewboxes.
# (we might want to add a different menu which specs the name of the
# view box currently being handled?
# - allow selection of a particular view box by interacting with its
# axis?
# TODO: we might want to enabled some kind of manual flag to disable
# this method wrapping during type creation? As example a user could
# definitively decide **not** to enable broadcasting support by
# setting something like ``ViewBox.disable_relays = True``?
def mk_relay_method(
signame: str,
slot: Callable[
[ViewBox,
'QEvent',
Optional[AxisItem]],
None,
],
) -> Callable[
[
ViewBox,
# lol, there isn't really a generic type thanks
# to the rewrite of Qt's event system XD
'QEvent',
'Optional[AxisItem]',
'Optional[ViewBox]', # the ``relayed_from`` arg we provide
],
None,
]:
def maybe_broadcast(
vb: 'ViewBox',
ev: 'QEvent',
axis: 'Optional[int]' = None,
relayed_from: 'ViewBox' = None,
) -> None:
'''
(soon to be) Decorator which makes an event handler
"broadcastable" to overlayed ``GraphicsWidget``s.
Adds relay signals based on the decorated handler's name
and conducts a signal broadcast of the relay signal if there
are consumers registered.
'''
# When no relay source has been set just bypass all
# the broadcast machinery.
if vb.event_relay_source is None:
ev.accept()
return slot(
vb,
ev,
axis=axis,
)
if relayed_from:
assert axis is None
# this is a relayed event and should be ignored (so it does not
# halt/short circuit the graphicscene loop). Further the
# surrounding handler for this signal must be allowed to execute
# and get processed by **this consumer**.
print(f'{vb.name} rx relayed from {relayed_from.name}')
ev.ignore()
return slot(
vb,
ev,
axis=axis,
)
if axis is not None:
print(f'{vb.name} handling axis event:\n{str(ev)}')
ev.accept()
return slot(
vb,
ev,
axis=axis,
)
elif (
relayed_from is None
and vb.event_relay_source is vb # we are the broadcaster
and axis is None
):
# Broadcast case: this is a source event which will be
# relayed to attached consumers and accepted after all
# consumers complete their own handling followed by this
# routine's processing. Sequence is,
# - pre-relay to all consumers *first* - ``.emit()`` blocks
# until all downstream relay handlers have run.
# - run the source handler for **this** event and accept
# the event
# Access the "bound signal" that is created
# on the widget type as part of instantiation.
signal = getattr(vb, signame)
# print(f'{vb.name} emitting {signame}')
# TODO/NOTE: we could also just bypass a "relay" signal
# entirely and instead call the handlers manually in
# a loop? This probably is a lot simpler and also doesn't
# have any downside, and allows not touching target widget
# internals.
signal.emit(
ev,
axis,
# passing this demarks a broadcasted/relayed event
vb,
)
# accept event so no more relays are fired.
ev.accept()
# call underlying wrapped method with an extra
# ``relayed_from`` value to denote that this is a relayed
# event handling case.
return slot(
vb,
ev,
axis=axis,
)
return maybe_broadcast
# XXX: :( can't define signals **after** class compile time
# so this is not really useful.
# def mk_relay_signal(
# func,
# name: str = None,
# ) -> Signal:
# (
# args,
# varargs,
# varkw,
# defaults,
# kwonlyargs,
# kwonlydefaults,
# annotations
# ) = inspect.getfullargspec(func)
# # XXX: generate a relay signal with 1 extra
# # argument for a ``relayed_from`` kwarg. Since
# # ``'self'`` is already ignored by signals we just need
# # to count the arguments since we're adding only 1 (and
# # ``args`` will capture that).
# numargs = len(args + list(defaults))
# signal = Signal(*tuple(numargs * [object]))
# signame = name or func.__name__ + 'Relay'
# return signame, signal
def enable_relays(
widget: GraphicsWidget,
handler_names: list[str],
) -> list[Signal]:
'''
Method override helper which enables relay of a particular
``Signal`` from some chosen broadcaster widget to a set of
consumer widgets which should operate their event handlers normally
but instead of signals "relayed" from the broadcaster.
Mostly useful for overlaying widgets that handle user input
that you want to overlay graphically. The target ``widget`` type must
define ``QtCore.Signal``s each with a `'Relay'` suffix for each
name provided in ``handler_names: list[str]``.
'''
signals = []
for name in handler_names:
handler = getattr(widget, name)
signame = name + 'Relay'
# ensure the target widget defines a relay signal
relay = getattr(widget, signame)
widget.relays[signame] = name
signals.append(relay)
method = mk_relay_method(signame, handler)
setattr(widget, name, method)
return signals
enable_relays(
ChartView,
['wheelEvent', 'mouseDragEvent']
)
class PlotItemOverlay:
'''
A composite for managing overlaid ``PlotItem`` instances such that
you can make multiple graphics appear on the same graph with
separate (non-colliding) axes apply ``ViewBox`` signal broadcasting
such that all overlaid items respond to input simultaneously.
'''
def __init__(
self,
root_plotitem: PlotItem
) -> None:
self.root_plotitem: PlotItem = root_plotitem
vb = root_plotitem.vb
vb.event_relay_source = vb # TODO: maybe change name?
vb.setZValue(1000) # XXX: critical for scene layering/relaying
self.overlays: list[PlotItem] = []
from piker.ui._overlay import ComposedGridLayout
self.layout = ComposedGridLayout(
root_plotitem,
root_plotitem.layout,
)
self._relays: dict[str, Signal] = {}
def add_plotitem(
self,
plotitem: PlotItem,
index: Optional[int] = None,
# TODO: we could also put the ``ViewBox.XAxis``
# style enum here?
# (0,), # link x
# (1,), # link y
# (0, 1), # link both
link_axes: tuple[int] = (),
) -> None:
index = index or 0
root = self.root_plotitem
# layout: QGraphicsGridLayout = root.layout
self.overlays.insert(index, plotitem)
vb: ViewBox = plotitem.vb
# mark this consumer overlay as ready to expect relayed events
# from the root plotitem.
vb.event_relay_source = root.vb
# TODO: some sane way to allow menu event broadcast XD
# vb.setMenuEnabled(False)
# TODO: inside the `maybe_broadcast()` (soon to be) decorator
# we need have checks that consumers have been attached to
# these relay signals.
if link_axes != (0, 1):
# wire up relay signals
for relay_signal_name, handler_name in vb.relays.items():
# print(handler_name)
# XXX: Signal class attrs are bound after instantiation
# of the defining type, so we need to access that bound
# version here.
signal = getattr(root.vb, relay_signal_name)
handler = getattr(vb, handler_name)
signal.connect(handler)
# link dim-axes to root if requested by user.
# TODO: solve more-then-wanted scaled panning on click drag
# which seems to be due to broadcast. So we probably need to
# disable broadcast when axes are linked in a particular
# dimension?
for dim in link_axes:
# link x and y axes to new view box such that the top level
# viewbox propagates to the root (and whatever other
# plotitem overlays that have been added).
vb.linkView(dim, root.vb)
# make overlaid viewbox impossible to focus since the top
# level should handle all input and relay to overlays.
# NOTE: this was solved with the `setZValue()` above!
# TODO: we will probably want to add a "focus" api such that
# a new "top level" ``PlotItem`` can be selected dynamically
# (and presumably the axes dynamically sorted to match).
vb.setFlag(
vb.GraphicsItemFlag.ItemIsFocusable,
False
)
vb.setFocusPolicy(Qt.NoFocus)
# append-compose into the layout all axes from this plot
self.layout.insert(index, plotitem)
plotitem.setGeometry(root.vb.sceneBoundingRect())
def size_to_viewbox(vb: 'ViewBox'):
plotitem.setGeometry(vb.sceneBoundingRect())
root.vb.sigResized.connect(size_to_viewbox)
# ensure the overlayed view is redrawn on each cycle
root.scene().sigPrepareForPaint.connect(vb.prepareForPaint)
# focus state sanity
vb.clearFocus()
assert not vb.focusWidget()
root.vb.setFocus()
assert root.vb.focusWidget()
# XXX: do we need this? Why would you build then destroy?
def remove_plotitem(self, plotItem: PlotItem) -> None:
'''
Remove this ``PlotItem`` from the overlayed set making not shown
and unable to accept input.
'''
...
# TODO: i think this would be super hot B)
def focus_item(self, plotitem: PlotItem) -> PlotItem:
'''
Apply focus to a contained PlotItem thus making it the "top level"
item in the overlay able to accept peripheral's input from the user
and responsible for zoom and panning control via its ``ViewBox``.
'''
...
def get_axis(
self,
plot: PlotItem,
name: str,
) -> AxisItem:
'''
Retrieve the named axis for overlayed ``plot``.
'''
return self.layout.get_axis(plot, name)
# TODO: i guess we need this if you want to detach existing plots
# dynamically? XXX: untested as of now.
def _disconnect_all(
self,
plotitem: PlotItem,
) -> list[Signal]:
'''
Disconnects all signals related to this widget for the given chart.
'''
disconnected = []
for pi, sig in self._relays.items():
QObject.disconnect(sig)
disconnected.append(sig)
return disconnected