Add new `ui._flows` module

This begins the removal of data processing / analysis methods from the
chart widget and instead moving them to our new `Flow` API (in the new
module introduce here) and delegating the old chart methods to the
respective internal flow. Most importantly is no longer storing the
"last read" of an array from shm in an internal chart table (was
`._arrays`) and instead the `ShmArray` instance is passed as input and
stored in the `Flow` instance. This greatly simplifies lookup logic such
that the display loop now doesn't have to worry about reading shm, it
can be done by internal graphics logic as desired. Generally speaking,
all previous `._arrays`/`._graphics` lookups are now delegated to the
entries in the chart's `._flows` table.

The new `Flow` methods are generally better factored and provide more
detailed output regarding data-stream <-> graphics inter-relations for
the future purpose of allowing much more efficient update calls in the
display loop as well as supporting low latency interaction UX.

The concept here is that we're introducing an intermediary layer that
ties together graphics and real-time data flows such that widget code is
oriented around plot layout and the flow apis are oriented around
real-time low latency updates and providing an efficient high level
metric layer for the UX.

The summary api transition is something like:
- `update_graphics_from_array()` -> `.update_graphics_from_flow()`
- `.bars_range()` -> `Flow.datums_range()`
- `.bars_range()` -> `Flow.datums_range()`
incr_update_backup
Tyler Goodlet 2022-04-14 09:38:25 -04:00
parent 79eff13e76
commit 82dbdd6148
2 changed files with 360 additions and 220 deletions

View File

@ -34,9 +34,7 @@ from PyQt5.QtWidgets import (
QVBoxLayout,
QSplitter,
)
import msgspec
import numpy as np
# from pydantic import BaseModel
import pyqtgraph as pg
import trio
@ -49,6 +47,7 @@ from ._cursor import (
Cursor,
ContentsLabel,
)
from ..data._sharedmem import ShmArray
from ._l1 import L1Labels
from ._ohlc import BarItems
from ._curve import FastAppendCurve
@ -60,15 +59,12 @@ from ._style import (
)
from ..data.feed import Feed
from ..data._source import Symbol
from ..data._sharedmem import (
ShmArray,
# _Token,
)
from ..log import get_logger
from ._interaction import ChartView
from ._forms import FieldsForm
from .._profile import pg_profile_enabled, ms_slower_then
from ._overlay import PlotItemOverlay
from ._flows import Flow
if TYPE_CHECKING:
from ._display import DisplayState
@ -419,7 +415,7 @@ class LinkedSplits(QWidget):
self,
symbol: Symbol,
array: np.ndarray,
shm: ShmArray,
sidepane: FieldsForm,
style: str = 'bar',
@ -444,7 +440,7 @@ class LinkedSplits(QWidget):
self.chart = self.add_plot(
name=symbol.key,
array=array,
shm=shm,
style=style,
_is_main=True,
@ -472,7 +468,7 @@ class LinkedSplits(QWidget):
self,
name: str,
array: np.ndarray,
shm: ShmArray,
array_key: Optional[str] = None,
style: str = 'line',
@ -516,7 +512,6 @@ class LinkedSplits(QWidget):
name=name,
data_key=array_key or name,
array=array,
parent=qframe,
linkedsplits=self,
axisItems=axes,
@ -580,7 +575,7 @@ class LinkedSplits(QWidget):
graphics, data_key = cpw.draw_ohlc(
name,
array,
shm,
array_key=array_key
)
self.cursor.contents_labels.add_label(
@ -594,7 +589,7 @@ class LinkedSplits(QWidget):
add_label = True
graphics, data_key = cpw.draw_curve(
name,
array,
shm,
array_key=array_key,
color='default_light',
)
@ -603,7 +598,7 @@ class LinkedSplits(QWidget):
add_label = True
graphics, data_key = cpw.draw_curve(
name,
array,
shm,
array_key=array_key,
step_mode=True,
color='davies',
@ -691,7 +686,6 @@ class ChartPlotWidget(pg.PlotWidget):
# the "data view" we generate graphics from
name: str,
array: np.ndarray,
data_key: str,
linkedsplits: LinkedSplits,
@ -744,14 +738,6 @@ class ChartPlotWidget(pg.PlotWidget):
self._max_l1_line_len: float = 0
# self.setViewportMargins(0, 0, 0, 0)
# self._ohlc = array # readonly view of ohlc data
# TODO: move to Aggr above XD
# readonly view of data arrays
self._arrays = {
self.data_key: array,
}
self._graphics = {} # registry of underlying graphics
# registry of overlay curve names
self._flows: dict[str, Flow] = {}
@ -767,7 +753,6 @@ class ChartPlotWidget(pg.PlotWidget):
# show background grid
self.showGrid(x=False, y=True, alpha=0.3)
self.default_view()
self.cv.enable_auto_yrange()
self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem)
@ -816,14 +801,8 @@ class ChartPlotWidget(pg.PlotWidget):
Return a range tuple for the bars present in view.
'''
l, r = self.view_range()
array = self._arrays[self.name]
start, stop = self._xrange = (
array[0]['index'],
array[-1]['index'],
)
lbar = max(l, start)
rbar = min(r, stop)
main_flow = self._flows[self.name]
ifirst, l, lbar, rbar, r, ilast = main_flow.datums_range()
return l, lbar, rbar, r
def curve_width_pxs(
@ -877,40 +856,51 @@ class ChartPlotWidget(pg.PlotWidget):
def default_view(
self,
steps_on_screen: Optional[int] = None
bars_from_y: int = 5000,
) -> None:
'''
Set the view box to the "default" startup view of the scene.
'''
try:
index = self._arrays[self.name]['index']
except IndexError:
log.warning(f'array for {self.name} not loaded yet?')
flow = self._flows.get(self.name)
if not flow:
log.warning(f'`Flow` for {self.name} not loaded yet?')
return
index = flow.shm.array['index']
xfirst, xlast = index[0], index[-1]
l, lbar, rbar, r = self.bars_range()
marker_pos, l1_len = self.pre_l1_xs()
end = xlast + l1_len + 1
view = self.view
if (
rbar < 0
or l < xfirst
or l < 0
or (rbar - lbar) < 6
):
# set fixed bars count on screen that approx includes as
# TODO: set fixed bars count on screen that approx includes as
# many bars as possible before a downsample line is shown.
begin = xlast - round(6116 / 6)
begin = xlast - bars_from_y
view.setXRange(
min=begin,
max=xlast,
padding=0,
)
# re-get range
l, lbar, rbar, r = self.bars_range()
else:
begin = end - (r - l)
# we get the L1 spread label "length" in view coords
# terms now that we've scaled either by user control
# or to the default set of bars as per the immediate block
# above.
marker_pos, l1_len = self.pre_l1_xs()
end = xlast + l1_len + 1
begin = end - (r - l)
# for debugging
# print(
# f'bars range: {brange}\n'
# # f'bars range: {brange}\n'
# f'xlast: {xlast}\n'
# f'marker pos: {marker_pos}\n'
# f'l1 len: {l1_len}\n'
@ -922,14 +912,13 @@ class ChartPlotWidget(pg.PlotWidget):
if self._static_yrange == 'axis':
self._static_yrange = None
view = self.view
view.setXRange(
min=begin,
max=end,
padding=0,
)
view._set_yrange()
self.view.maybe_downsample_graphics()
view._set_yrange()
try:
self.linked.graphics_cycle()
except IndexError:
@ -960,7 +949,7 @@ class ChartPlotWidget(pg.PlotWidget):
def draw_ohlc(
self,
name: str,
data: np.ndarray,
shm: ShmArray,
array_key: Optional[str] = None,
@ -980,19 +969,21 @@ class ChartPlotWidget(pg.PlotWidget):
# the np array buffer to be drawn on next render cycle
self.plotItem.addItem(graphics)
# draw after to allow self.scene() to work...
graphics.draw_from_data(data)
data_key = array_key or name
self._graphics[data_key] = graphics
self._flows[data_key] = Flow(
name=name,
plot=self.plotItem,
_shm=shm,
is_ohlc=True,
graphics=graphics,
)
# TODO: i think we can eventually remove this if
# we write the ``Flow.update_graphics()`` method right?
# draw after to allow self.scene() to work...
graphics.draw_from_data(shm.array)
self._add_sticky(name, bg_color='davies')
return graphics, data_key
@ -1058,7 +1049,7 @@ class ChartPlotWidget(pg.PlotWidget):
self,
name: str,
data: np.ndarray,
shm: ShmArray,
array_key: Optional[str] = None,
overlay: bool = False,
@ -1071,7 +1062,7 @@ class ChartPlotWidget(pg.PlotWidget):
) -> (pg.PlotDataItem, str):
'''
Draw a "curve" (line plot graphics) for the provided data in
the input array ``data``.
the input shm array ``shm``.
'''
color = color or self.pen_color or 'default_light'
@ -1082,6 +1073,7 @@ class ChartPlotWidget(pg.PlotWidget):
data_key = array_key or name
# yah, we wrote our own B)
data = shm.array
curve = FastAppendCurve(
y=data[data_key],
x=data['index'],
@ -1105,16 +1097,14 @@ class ChartPlotWidget(pg.PlotWidget):
# and is disastrous for performance.
# curve.setCacheMode(QtWidgets.QGraphicsItem.ItemCoordinateCache)
# register curve graphics and backing array for name
self._graphics[name] = curve
self._arrays[data_key] = data
pi = pi or self.plotItem
self._flows[data_key] = Flow(
name=name,
plot=pi,
_shm=shm,
is_ohlc=False,
# register curve graphics with this flow
graphics=curve,
)
@ -1175,16 +1165,11 @@ class ChartPlotWidget(pg.PlotWidget):
)
return last
def update_graphics_from_array(
def update_graphics_from_flow(
self,
graphics_name: str,
array: Optional[np.ndarray] = None,
array_key: Optional[str] = None,
use_vr: bool = True,
render: bool = True,
**kwargs,
) -> pg.GraphicsObject:
@ -1192,63 +1177,11 @@ class ChartPlotWidget(pg.PlotWidget):
Update the named internal graphics from ``array``.
'''
if array is not None:
assert len(array)
data_key = array_key or graphics_name
if graphics_name not in self._flows:
data_key = self.name
if array is not None:
# write array to internal graphics table
self._arrays[data_key] = array
else:
array = self._arrays[data_key]
# array key and graphics "name" might be different..
graphics = self._graphics[graphics_name]
# compute "in-view" indices
l, lbar, rbar, r = self.bars_range()
indexes = array['index']
ifirst = indexes[0]
ilast = indexes[-1]
lbar_i = max(l, ifirst) - ifirst
rbar_i = min(r, ilast) - ifirst
# TODO: we could do it this way as well no?
# to_draw = array[lbar - ifirst:(rbar - ifirst) + 1]
in_view = array[lbar_i: rbar_i + 1]
if (
not in_view.size
or not render
):
return graphics
if isinstance(graphics, BarItems):
graphics.update_from_array(
array,
in_view,
view_range=(lbar_i, rbar_i) if use_vr else None,
**kwargs,
)
else:
graphics.update_from_array(
x=array['index'],
y=array[data_key],
x_iv=in_view['index'],
y_iv=in_view[data_key],
view_range=(lbar_i, rbar_i) if use_vr else None,
**kwargs
)
return graphics
flow = self._flows[array_key or graphics_name]
return flow.update_graphics(
array_key=array_key,
**kwargs,
)
# def _label_h(self, yhigh: float, ylow: float) -> float:
# # compute contents label "height" in view terms
@ -1295,7 +1228,7 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: this should go onto some sort of
# data-view thinger..right?
ohlc = self._shm.array
ohlc = self._flows[self.name].shm.array
# XXX: not sure why the time is so off here
# looks like we're gonna have to do some fixing..
@ -1341,9 +1274,6 @@ class ChartPlotWidget(pg.PlotWidget):
delayed=True,
)
l, lbar, rbar, r = bars_range or self.bars_range()
profiler(f'{self.name} got bars range')
# TODO: here we should instead look up the ``Flow.shm.array``
# and read directly from shm to avoid copying to memory first
# and then reading it again here.
@ -1356,6 +1286,9 @@ class ChartPlotWidget(pg.PlotWidget):
res = 0, 0
else:
first, l, lbar, rbar, r, last = bars_range or flow.datums_range()
profiler(f'{self.name} got bars range')
key = round(lbar), round(rbar)
res = flow.maxmin(*key)
profiler(f'yrange mxmn: {key} -> {res}')
@ -1366,99 +1299,3 @@ class ChartPlotWidget(pg.PlotWidget):
res = 0, 0
return res
# class FlowsTable(pydantic.BaseModel):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``Flow``s with high level processing routines for
# multi-graphics computations and display.
# '''
# flows: dict[str, np.ndarray] = {}
class Flow(msgspec.Struct): # , frozen=True):
'''
(FinancialSignal-)Flow compound type which wraps a real-time
graphics (curve) and its backing data stream together for high level
access and control.
The intention is for this type to eventually be capable of shm-passing
of incrementally updated graphics stream data between actors.
'''
name: str
plot: pg.PlotItem
is_ohlc: bool = False
graphics: pg.GraphicsObject
# TODO: hackery to be able to set a shm later
# but whilst also allowing this type to hashable,
# likely will require serializable token that is used to attach
# to the underlying shm ref after startup?
_shm: Optional[ShmArray] = None # currently, may be filled in "later"
# cache of y-range values per x-range input.
_mxmns: dict[tuple[int, int], tuple[float, float]] = {}
@property
def shm(self) -> ShmArray:
return self._shm
@shm.setter
def shm(self, shm: ShmArray) -> ShmArray:
self._shm = shm
def maxmin(
self,
lbar,
rbar,
) -> tuple[float, float]:
'''
Compute the cached max and min y-range values for a given
x-range determined by ``lbar`` and ``rbar``.
'''
rkey = (lbar, rbar)
cached_result = self._mxmns.get(rkey)
if cached_result:
return cached_result
shm = self.shm
if shm is None:
mxmn = None
else: # new block for profiling?..
arr = shm.array
# build relative indexes into shm array
# TODO: should we just add/use a method
# on the shm to do this?
ifirst = arr[0]['index']
slice_view = arr[
lbar - ifirst:
(rbar - ifirst) + 1
]
if not slice_view.size:
mxmn = None
else:
if self.is_ohlc:
ylow = np.min(slice_view['low'])
yhigh = np.max(slice_view['high'])
else:
view = slice_view[self.name]
ylow = np.min(view)
yhigh = np.max(view)
mxmn = ylow, yhigh
if mxmn is not None:
# cache new mxmn result
self._mxmns[rkey] = mxmn
return mxmn

303
piker/ui/_flows.py 100644
View File

@ -0,0 +1,303 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level streaming graphics primitives.
This is an intermediate layer which associates real-time low latency
graphics primitives with underlying FSP related data structures for fast
incremental update.
'''
from typing import (
Optional,
Callable,
)
import msgspec
import numpy as np
import pyqtgraph as pg
from PyQt5.QtGui import QPainterPath
from ..data._sharedmem import (
ShmArray,
# attach_shm_array
)
from ._ohlc import BarItems
# class FlowsTable(msgspec.Struct):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``Flow``s with high level processing routines for
# multi-graphics computations and display.
# '''
# flows: dict[str, np.ndarray] = {}
class Flow(msgspec.Struct): # , frozen=True):
'''
(FinancialSignal-)Flow compound type which wraps a real-time
graphics (curve) and its backing data stream together for high level
access and control.
The intention is for this type to eventually be capable of shm-passing
of incrementally updated graphics stream data between actors.
'''
name: str
plot: pg.PlotItem
is_ohlc: bool = False
render: bool = True # toggle for display loop
graphics: pg.GraphicsObject
# TODO: hackery to be able to set a shm later
# but whilst also allowing this type to hashable,
# likely will require serializable token that is used to attach
# to the underlying shm ref after startup?
_shm: Optional[ShmArray] = None # currently, may be filled in "later"
# last read from shm (usually due to an update call)
_last_read: Optional[np.ndarray] = None
# cache of y-range values per x-range input.
_mxmns: dict[tuple[int, int], tuple[float, float]] = {}
@property
def shm(self) -> ShmArray:
return self._shm
# TODO: remove this and only allow setting through
# private ``._shm`` attr?
@shm.setter
def shm(self, shm: ShmArray) -> ShmArray:
print(f'{self.name} DO NOT SET SHM THIS WAY!?')
self._shm = shm
def maxmin(
self,
lbar,
rbar,
) -> tuple[float, float]:
'''
Compute the cached max and min y-range values for a given
x-range determined by ``lbar`` and ``rbar``.
'''
rkey = (lbar, rbar)
cached_result = self._mxmns.get(rkey)
if cached_result:
return cached_result
shm = self.shm
if shm is None:
mxmn = None
else: # new block for profiling?..
arr = shm.array
# build relative indexes into shm array
# TODO: should we just add/use a method
# on the shm to do this?
ifirst = arr[0]['index']
slice_view = arr[
lbar - ifirst:
(rbar - ifirst) + 1
]
if not slice_view.size:
mxmn = None
else:
if self.is_ohlc:
ylow = np.min(slice_view['low'])
yhigh = np.max(slice_view['high'])
else:
view = slice_view[self.name]
ylow = np.min(view)
yhigh = np.max(view)
mxmn = ylow, yhigh
if mxmn is not None:
# cache new mxmn result
self._mxmns[rkey] = mxmn
return mxmn
def view_range(self) -> tuple[int, int]:
'''
Return the indexes in view for the associated
plot displaying this flow's data.
'''
vr = self.plot.viewRect()
return int(vr.left()), int(vr.right())
def datums_range(self) -> tuple[
int, int, int, int, int, int
]:
'''
Return a range tuple for the datums present in view.
'''
l, r = self.view_range()
# TODO: avoid this and have shm passed
# in earlier.
if self.shm is None:
# haven't initialized the flow yet
return (0, l, 0, 0, r, 0)
array = self.shm.array
index = array['index']
start = index[0]
end = index[-1]
lbar = max(l, start)
rbar = min(r, end)
return (
start, l, lbar, rbar, r, end,
)
def read(self) -> tuple[
int, int, np.ndarray,
int, int, np.ndarray,
]:
array = self.shm.array
indexes = array['index']
ifirst = indexes[0]
ilast = indexes[-1]
ifirst, l, lbar, rbar, r, ilast = self.datums_range()
# get read-relative indices adjusting
# for master shm index.
lbar_i = max(l, ifirst) - ifirst
rbar_i = min(r, ilast) - ifirst
# TODO: we could do it this way as well no?
# to_draw = array[lbar - ifirst:(rbar - ifirst) + 1]
in_view = array[lbar_i: rbar_i + 1]
return (
# abs indices + full data set
ifirst, ilast, array,
# relative indices + in view datums
lbar_i, rbar_i, in_view,
)
def update_graphics(
self,
use_vr: bool = True,
render: bool = True,
array_key: Optional[str] = None,
**kwargs,
) -> pg.GraphicsObject:
'''
Read latest datums from shm and render to (incrementally)
render to graphics.
'''
# shm read and slice to view
xfirst, xlast, array, ivl, ivr, in_view = self.read()
if (
not in_view.size
or not render
):
return self.graphics
array_key = array_key or self.name
graphics = self.graphics
if isinstance(graphics, BarItems):
graphics.update_from_array(
array,
in_view,
view_range=(ivl, ivr) if use_vr else None,
**kwargs,
)
else:
graphics.update_from_array(
x=array['index'],
y=array[array_key],
x_iv=in_view['index'],
y_iv=in_view[array_key],
view_range=(ivl, ivr) if use_vr else None,
**kwargs
)
return graphics
# @classmethod
# def from_token(
# cls,
# shm_token: tuple[
# str,
# str,
# tuple[str, str],
# ],
# ) -> PathRenderer:
# shm = attach_shm_array(token)
# return cls(shm)
class PathRenderer(msgspec.Struct):
# output graphics rendering
path: Optional[QPainterPath] = None
last_read_src_array: np.ndarray
# called on input data but before
prerender_fn: Callable[ShmArray, np.ndarray]
def diff(
self,
) -> dict[str, np.ndarray]:
...
def update(self) -> QPainterPath:
'''
Incrementally update the internal path graphics from
updates in shm data and deliver the new (sub)-path
generated.
'''
...
def render(
self,
) -> list[QPainterPath]:
'''
Render the current graphics path(s)
'''
...