Merge pull request #451 from pikers/epoch_indexing_and_dataviz_layer

Epoch indexing and dataviz layer
kraken_deposits_fixes
goodboy 2023-02-12 14:27:43 -05:00 committed by GitHub
commit d690ad2bab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 2955 additions and 2396 deletions

File diff suppressed because it is too large Load Diff

View File

@ -15,17 +15,30 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Graphics related downsampling routines for compressing to pixel
limits on the display device.
Graphics downsampling using the infamous M4 algorithm.
This is one of ``piker``'s secret weapons allowing us to boss all other
charting platforms B)
(AND DON'T YOU DARE TAKE THIS CODE WITHOUT CREDIT OR WE'LL SUE UR F#&@* ASS).
NOTES: this method is a so called "visualization driven data
aggregation" approach. It gives error-free line chart
downsampling, see
further scientific paper resources:
- http://www.vldb.org/pvldb/vol7/p797-jugel.pdf
- http://www.vldb.org/2014/program/papers/demo/p997-jugel.pdf
Details on implementation of this algo are based in,
https://github.com/pikers/piker/issues/109
'''
import math
from typing import Optional
import numpy as np
from numpy.lib import recfunctions as rfn
from numba import (
jit,
njit,
# float64, optional, int64,
)
@ -35,109 +48,6 @@ from ..log import get_logger
log = get_logger(__name__)
def hl2mxmn(ohlc: np.ndarray) -> np.ndarray:
'''
Convert a OHLC struct-array containing 'high'/'low' columns
to a "joined" max/min 1-d array.
'''
index = ohlc['index']
hls = ohlc[[
'low',
'high',
]]
mxmn = np.empty(2*hls.size, dtype=np.float64)
x = np.empty(2*hls.size, dtype=np.float64)
trace_hl(hls, mxmn, x, index[0])
x = x + index[0]
return mxmn, x
@jit(
# TODO: the type annots..
# float64[:](float64[:],),
nopython=True,
)
def trace_hl(
hl: 'np.ndarray',
out: np.ndarray,
x: np.ndarray,
start: int,
# the "offset" values in the x-domain which
# place the 2 output points around each ``int``
# master index.
margin: float = 0.43,
) -> None:
'''
"Trace" the outline of the high-low values of an ohlc sequence
as a line such that the maximum deviation (aka disperaion) between
bars if preserved.
This routine is expected to modify input arrays in-place.
'''
last_l = hl['low'][0]
last_h = hl['high'][0]
for i in range(hl.size):
row = hl[i]
l, h = row['low'], row['high']
up_diff = h - last_l
down_diff = last_h - l
if up_diff > down_diff:
out[2*i + 1] = h
out[2*i] = last_l
else:
out[2*i + 1] = l
out[2*i] = last_h
last_l = l
last_h = h
x[2*i] = int(i) - margin
x[2*i + 1] = int(i) + margin
return out
def ohlc_flatten(
ohlc: np.ndarray,
use_mxmn: bool = True,
) -> tuple[np.ndarray, np.ndarray]:
'''
Convert an OHLCV struct-array into a flat ready-for-line-plotting
1-d array that is 4 times the size with x-domain values distributed
evenly (by 0.5 steps) over each index.
'''
index = ohlc['index']
if use_mxmn:
# traces a line optimally over highs to lows
# using numba. NOTE: pretty sure this is faster
# and looks about the same as the below output.
flat, x = hl2mxmn(ohlc)
else:
flat = rfn.structured_to_unstructured(
ohlc[['open', 'high', 'low', 'close']]
).flatten()
x = np.linspace(
start=index[0] - 0.5,
stop=index[-1] + 0.5,
num=len(flat),
)
return x, flat
def ds_m4(
x: np.ndarray,
y: np.ndarray,
@ -160,16 +70,6 @@ def ds_m4(
This is more or less an OHLC style sampling of a line-style series.
'''
# NOTE: this method is a so called "visualization driven data
# aggregation" approach. It gives error-free line chart
# downsampling, see
# further scientific paper resources:
# - http://www.vldb.org/pvldb/vol7/p797-jugel.pdf
# - http://www.vldb.org/2014/program/papers/demo/p997-jugel.pdf
# Details on implementation of this algo are based in,
# https://github.com/pikers/piker/issues/109
# XXX: from infinite on downsampling viewable graphics:
# "one thing i remembered about the binning - if you are
# picking a range within your timeseries the start and end bin
@ -256,8 +156,7 @@ def ds_m4(
return nb, x_out, y_out, ymn, ymx
@jit(
nopython=True,
@njit(
nogil=True,
)
def _m4(

View File

@ -0,0 +1,432 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Super fast ``QPainterPath`` generation related operator routines.
"""
import numpy as np
from numpy.lib import recfunctions as rfn
from numba import (
# types,
njit,
float64,
int64,
# optional,
)
# TODO: for ``numba`` typing..
# from ._source import numba_ohlc_dtype
from ._m4 import ds_m4
from .._profile import (
Profiler,
pg_profile_enabled,
ms_slower_then,
)
def xy_downsample(
x,
y,
uppx,
x_spacer: float = 0.5,
) -> tuple[
np.ndarray,
np.ndarray,
float,
float,
]:
'''
Downsample 1D (flat ``numpy.ndarray``) arrays using M4 given an input
``uppx`` (units-per-pixel) and add space between discreet datums.
'''
# downsample whenever more then 1 pixels per datum can be shown.
# always refresh data bounds until we get diffing
# working properly, see above..
bins, x, y, ymn, ymx = ds_m4(
x,
y,
uppx,
)
# flatten output to 1d arrays suitable for path-graphics generation.
x = np.broadcast_to(x[:, None], y.shape)
x = (x + np.array(
[-x_spacer, 0, 0, x_spacer]
)).flatten()
y = y.flatten()
return x, y, ymn, ymx
@njit(
# NOTE: need to construct this manually for readonly
# arrays, see https://github.com/numba/numba/issues/4511
# (
# types.Array(
# numba_ohlc_dtype,
# 1,
# 'C',
# readonly=True,
# ),
# int64,
# types.unicode_type,
# optional(float64),
# ),
nogil=True
)
def path_arrays_from_ohlc(
data: np.ndarray,
start: int64,
bar_w: float64,
bar_gap: float64 = 0.16,
use_time_index: bool = True,
# XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
# index_field: str,
) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
]:
'''
Generate an array of lines objects from input ohlc data.
'''
size = int(data.shape[0] * 6)
# XXX: see this for why the dtype might have to be defined outside
# the routine.
# https://github.com/numba/numba/issues/4098#issuecomment-493914533
x = np.zeros(
shape=size,
dtype=float64,
)
y, c = x.copy(), x.copy()
half_w: float = bar_w/2
# TODO: report bug for assert @
# /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
for i, q in enumerate(data[start:], start):
open = q['open']
high = q['high']
low = q['low']
close = q['close']
if use_time_index:
index = float64(q['time'])
else:
index = float64(q['index'])
# XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
# index = float64(q[index_field])
# AND this (probably)
# open, high, low, close, index = q[
# ['open', 'high', 'low', 'close', 'index']]
istart = i * 6
istop = istart + 6
# x,y detail the 6 points which connect all vertexes of a ohlc bar
mid: float = index + half_w
x[istart:istop] = (
index + bar_gap,
mid,
mid,
mid,
mid,
index + bar_w - bar_gap,
)
y[istart:istop] = (
open,
open,
low,
high,
close,
close,
)
# specifies that the first edge is never connected to the
# prior bars last edge thus providing a small "gap"/"space"
# between bars determined by ``bar_gap``.
c[istart:istop] = (1, 1, 1, 1, 1, 0)
return x, y, c
def hl2mxmn(
ohlc: np.ndarray,
index_field: str = 'index',
) -> np.ndarray:
'''
Convert a OHLC struct-array containing 'high'/'low' columns
to a "joined" max/min 1-d array.
'''
index = ohlc[index_field]
hls = ohlc[[
'low',
'high',
]]
mxmn = np.empty(2*hls.size, dtype=np.float64)
x = np.empty(2*hls.size, dtype=np.float64)
trace_hl(hls, mxmn, x, index[0])
x = x + index[0]
return mxmn, x
@njit(
# TODO: the type annots..
# float64[:](float64[:],),
)
def trace_hl(
hl: 'np.ndarray',
out: np.ndarray,
x: np.ndarray,
start: int,
# the "offset" values in the x-domain which
# place the 2 output points around each ``int``
# master index.
margin: float = 0.43,
) -> None:
'''
"Trace" the outline of the high-low values of an ohlc sequence
as a line such that the maximum deviation (aka disperaion) between
bars if preserved.
This routine is expected to modify input arrays in-place.
'''
last_l = hl['low'][0]
last_h = hl['high'][0]
for i in range(hl.size):
row = hl[i]
l, h = row['low'], row['high']
up_diff = h - last_l
down_diff = last_h - l
if up_diff > down_diff:
out[2*i + 1] = h
out[2*i] = last_l
else:
out[2*i + 1] = l
out[2*i] = last_h
last_l = l
last_h = h
x[2*i] = int(i) - margin
x[2*i + 1] = int(i) + margin
return out
def ohlc_flatten(
ohlc: np.ndarray,
use_mxmn: bool = True,
index_field: str = 'index',
) -> tuple[np.ndarray, np.ndarray]:
'''
Convert an OHLCV struct-array into a flat ready-for-line-plotting
1-d array that is 4 times the size with x-domain values distributed
evenly (by 0.5 steps) over each index.
'''
index = ohlc[index_field]
if use_mxmn:
# traces a line optimally over highs to lows
# using numba. NOTE: pretty sure this is faster
# and looks about the same as the below output.
flat, x = hl2mxmn(ohlc)
else:
flat = rfn.structured_to_unstructured(
ohlc[['open', 'high', 'low', 'close']]
).flatten()
x = np.linspace(
start=index[0] - 0.5,
stop=index[-1] + 0.5,
num=len(flat),
)
return x, flat
def slice_from_time(
arr: np.ndarray,
start_t: float,
stop_t: float,
step: int | None = None,
) -> tuple[
slice,
slice,
]:
'''
Calculate array indices mapped from a time range and return them in
a slice.
Given an input array with an epoch `'time'` series entry, calculate
the indices which span the time range and return in a slice. Presume
each `'time'` step increment is uniform and when the time stamp
series contains gaps (the uniform presumption is untrue) use
``np.searchsorted()`` binary search to look up the appropriate
index.
'''
profiler = Profiler(
msg='slice_from_time()',
disabled=not pg_profile_enabled(),
ms_threshold=ms_slower_then,
)
times = arr['time']
t_first = round(times[0])
read_i_max = arr.shape[0]
if step is None:
step = round(times[-1] - times[-2])
if step == 0:
# XXX: HOW TF is this happening?
step = 1
# compute (presumed) uniform-time-step index offsets
i_start_t = round(start_t)
read_i_start = round(((i_start_t - t_first) // step)) - 1
i_stop_t = round(stop_t)
read_i_stop = round((i_stop_t - t_first) // step) + 1
# always clip outputs to array support
# for read start:
# - never allow a start < the 0 index
# - never allow an end index > the read array len
read_i_start = min(
max(0, read_i_start),
read_i_max - 1,
)
read_i_stop = max(
0,
min(read_i_stop, read_i_max),
)
# check for larger-then-latest calculated index for given start
# time, in which case we do a binary search for the correct index.
# NOTE: this is usually the result of a time series with time gaps
# where it is expected that each index step maps to a uniform step
# in the time stamp series.
t_iv_start = times[read_i_start]
if (
t_iv_start > i_start_t
):
# do a binary search for the best index mapping to ``start_t``
# given we measured an overshoot using the uniform-time-step
# calculation from above.
# TODO: once we start caching these per source-array,
# we can just overwrite ``read_i_start`` directly.
new_read_i_start = np.searchsorted(
times,
i_start_t,
side='left',
)
# TODO: minimize binary search work as much as possible:
# - cache these remap values which compensate for gaps in the
# uniform time step basis where we calc a later start
# index for the given input ``start_t``.
# - can we shorten the input search sequence by heuristic?
# up_to_arith_start = index[:read_i_start]
if (
new_read_i_start < read_i_start
):
# t_diff = t_iv_start - start_t
# print(
# f"WE'RE CUTTING OUT TIME - STEP:{step}\n"
# f'start_t:{start_t} -> 0index start_t:{t_iv_start}\n'
# f'diff: {t_diff}\n'
# f'REMAPPED START i: {read_i_start} -> {new_read_i_start}\n'
# )
read_i_start = new_read_i_start - 1
t_iv_stop = times[read_i_stop - 1]
if (
t_iv_stop > i_stop_t
):
# t_diff = stop_t - t_iv_stop
# print(
# f"WE'RE CUTTING OUT TIME - STEP:{step}\n"
# f'calced iv stop:{t_iv_stop} -> stop_t:{stop_t}\n'
# f'diff: {t_diff}\n'
# # f'SHOULD REMAP STOP: {read_i_start} -> {new_read_i_start}\n'
# )
new_read_i_stop = np.searchsorted(
times[read_i_start:],
i_stop_t,
side='left',
)
if (
new_read_i_stop < read_i_stop
):
read_i_stop = read_i_start + new_read_i_stop
# sanity checks for range size
# samples = (i_stop_t - i_start_t) // step
# index_diff = read_i_stop - read_i_start + 1
# if index_diff > (samples + 3):
# breakpoint()
# read-relative indexes: gives a slice where `shm.array[read_slc]`
# will be the data spanning the input time range `start_t` ->
# `stop_t`
read_slc = slice(
int(read_i_start),
int(read_i_stop),
)
profiler(
'slicing complete'
# f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
# f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
)
# NOTE: if caller needs absolute buffer indices they can
# slice the buffer abs index like so:
# index = arr['index']
# abs_indx = index[read_slc]
# abs_slc = slice(
# int(abs_indx[0]),
# int(abs_indx[-1]),
# )
return read_slc

View File

@ -48,9 +48,13 @@ from ._sharedmem import (
from ._sampling import (
open_sample_stream,
)
# from .._profile import (
# Profiler,
# pg_profile_enabled,
# )
if TYPE_CHECKING:
from pyqtgraph import PlotItem
# from pyqtgraph import PlotItem
from .feed import Feed
@ -218,104 +222,18 @@ class Flume(Struct):
def get_index(
self,
time_s: float,
array: np.ndarray,
) -> int:
) -> int | float:
'''
Return array shm-buffer index for for epoch time.
'''
array = self.rt_shm.array
times = array['time']
mask = (times >= time_s)
if any(mask):
return array['index'][mask][0]
# just the latest index
array['index'][-1]
def slice_from_time(
self,
array: np.ndarray,
start_t: float,
stop_t: float,
timeframe_s: int = 1,
return_data: bool = False,
) -> np.ndarray:
'''
Slice an input struct array providing only datums
"in view" of this chart.
'''
arr = {
1: self.rt_shm.array,
60: self.hist_shm.arry,
}[timeframe_s]
times = arr['time']
index = array['index']
# use advanced indexing to map the
# time range to the index range.
mask = (
(times >= start_t)
&
(times < stop_t)
first = np.searchsorted(
times,
time_s,
side='left',
)
# TODO: if we can ensure each time field has a uniform
# step we can instead do some arithmetic to determine
# the equivalent index like we used to?
# return array[
# lbar - ifirst:
# (rbar - ifirst) + 1
# ]
i_by_t = index[mask]
i_0 = i_by_t[0]
abs_slc = slice(
i_0,
i_by_t[-1],
)
# slice data by offset from the first index
# available in the passed datum set.
read_slc = slice(
0,
i_by_t[-1] - i_0,
)
if not return_data:
return (
abs_slc,
read_slc,
)
# also return the readable data from the timerange
return (
abs_slc,
read_slc,
arr[mask],
)
def view_data(
self,
plot: PlotItem,
timeframe_s: int = 1,
) -> np.ndarray:
# get far-side x-indices plot view
vr = plot.viewRect()
(
abs_slc,
buf_slc,
iv_arr,
) = self.slice_from_time(
start_t=vr.left(),
stop_t=vr.right(),
timeframe_s=timeframe_s,
return_data=True,
)
return iv_arr
imx = times.shape[0] - 1
return min(first, imx)

View File

@ -188,6 +188,8 @@ async def fsp_compute(
history_by_field['time'] = src_time[-len(history_by_field):]
history_output['time'] = src.array['time']
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make

View File

@ -178,8 +178,7 @@ def _main(
tractor_kwargs,
) -> None:
'''
Sync entry point to start a chart: a ``tractor`` + Qt runtime
entry point
Sync entry point to start a chart: a ``tractor`` + Qt runtime.
'''
run_qtractor(

View File

@ -49,7 +49,7 @@ class Axis(pg.AxisItem):
def __init__(
self,
plotitem: pgo.PlotItem,
typical_max_str: str = '100 000.000',
typical_max_str: str = '100 000.000 ',
text_color: str = 'bracket',
lru_cache_tick_strings: bool = True,
**kwargs
@ -95,9 +95,10 @@ class Axis(pg.AxisItem):
self.setPen(_axis_pen)
# this is the text color
# self.setTextPen(pg.mkPen(hcolor(text_color)))
self.text_color = text_color
# generate a bounding rect based on sizing to a "typical"
# maximum length-ed string defined as init default.
self.typical_br = _font._qfm.boundingRect(typical_max_str)
# size the pertinent axis dimension to a "typical value"
@ -154,8 +155,8 @@ class Axis(pg.AxisItem):
pi: pgo.PlotItem,
name: None | str = None,
digits: None | int = 2,
# axis_name: str = 'right',
bg_color='bracket',
bg_color='default',
fg_color='black',
) -> YAxisLabel:
@ -165,22 +166,20 @@ class Axis(pg.AxisItem):
digits = digits or 2
# TODO: ``._ysticks`` should really be an attr on each
# ``PlotItem`` no instead of the (containing because of
# overlays) widget?
# ``PlotItem`` now instead of the containing widget (because of
# overlays) ?
# add y-axis "last" value label
sticky = self._stickies[name] = YAxisLabel(
pi=pi,
parent=self,
# TODO: pass this from symbol data
digits=digits,
opacity=1,
digits=digits, # TODO: pass this from symbol data
opacity=0.9, # slight see-through
bg_color=bg_color,
fg_color=fg_color,
)
pi.sigRangeChanged.connect(sticky.update_on_resize)
# pi.addItem(sticky)
# pi.addItem(last)
return sticky
@ -244,7 +243,6 @@ class PriceAxis(Axis):
self._min_tick = size
def size_to_values(self) -> None:
# self.typical_br = _font._qfm.boundingRect(typical_max_str)
self.setWidth(self.typical_br.width())
# XXX: drop for now since it just eats up h space
@ -302,27 +300,44 @@ class DynamicDateAxis(Axis):
# XX: ARGGGGG AG:LKSKDJF:LKJSDFD
chart = self.pi.chart_widget
flow = chart._flows[chart.name]
shm = flow.shm
bars = shm.array
first = shm._first.value
viz = chart._vizs[chart.name]
shm = viz.shm
array = shm.array
times = array['time']
i_0, i_l = times[0], times[-1]
bars_len = len(bars)
times = bars['time']
if (
(indexes[0] < i_0
and indexes[-1] < i_l)
or
(indexes[0] > i_0
and indexes[-1] > i_l)
):
return []
epochs = times[list(
map(
int,
filter(
lambda i: i > 0 and i < bars_len,
(i-first for i in indexes)
if viz.index_field == 'index':
arr_len = times.shape[0]
first = shm._first.value
epochs = times[
list(
map(
int,
filter(
lambda i: i > 0 and i < arr_len,
(i - first for i in indexes)
)
)
)
)
)]
]
else:
epochs = list(map(int, indexes))
# TODO: **don't** have this hard coded shift to EST
# delay = times[-1] - times[-2]
dts = np.array(epochs, dtype='datetime64[s]')
dts = np.array(
epochs,
dtype='datetime64[s]',
)
# see units listing:
# https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
@ -340,24 +355,39 @@ class DynamicDateAxis(Axis):
spacing: float,
) -> list[str]:
return self._indexes_to_timestrs(values)
# NOTE: handy for debugging the lru cache
# info = self.tickStrings.cache_info()
# print(info)
return self._indexes_to_timestrs(values)
class AxisLabel(pg.GraphicsObject):
_x_margin = 0
_y_margin = 0
# relative offsets *OF* the bounding rect relative
# to parent graphics object.
# eg. <parent>| => <_x_br_offset> => | <text> |
_x_br_offset: float = 0
_y_br_offset: float = 0
# relative offsets of text *within* bounding rect
# eg. | <_x_margin> => <text> |
_x_margin: float = 0
_y_margin: float = 0
# multiplier of the text content's height in order
# to force a larger (y-dimension) bounding rect.
_y_txt_h_scaling: float = 1
def __init__(
self,
parent: pg.GraphicsItem,
digits: int = 2,
bg_color: str = 'bracket',
bg_color: str = 'default',
fg_color: str = 'black',
opacity: int = 1, # XXX: seriously don't set this to 0
opacity: int = .8, # XXX: seriously don't set this to 0
font_size: str = 'default',
use_arrow: bool = True,
@ -368,6 +398,7 @@ class AxisLabel(pg.GraphicsObject):
self.setParentItem(parent)
self.setFlag(self.ItemIgnoresTransformations)
self.setZValue(100)
# XXX: pretty sure this is faster
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
@ -399,14 +430,14 @@ class AxisLabel(pg.GraphicsObject):
p: QtGui.QPainter,
opt: QtWidgets.QStyleOptionGraphicsItem,
w: QtWidgets.QWidget
) -> None:
"""Draw a filled rectangle based on the size of ``.label_str`` text.
'''
Draw a filled rectangle based on the size of ``.label_str`` text.
Subtypes can customize further by overloading ``.draw()``.
"""
# p.setCompositionMode(QtWidgets.QPainter.CompositionMode_SourceOver)
'''
if self.label_str:
# if not self.rect:
@ -417,7 +448,11 @@ class AxisLabel(pg.GraphicsObject):
p.setFont(self._dpifont.font)
p.setPen(self.fg_color)
p.drawText(self.rect, self.text_flags, self.label_str)
p.drawText(
self.rect,
self.text_flags,
self.label_str,
)
def draw(
self,
@ -425,6 +460,8 @@ class AxisLabel(pg.GraphicsObject):
rect: QtCore.QRectF
) -> None:
p.setOpacity(self.opacity)
if self._use_arrow:
if not self.path:
self._draw_arrow_path()
@ -432,15 +469,13 @@ class AxisLabel(pg.GraphicsObject):
p.drawPath(self.path)
p.fillPath(self.path, pg.mkBrush(self.bg_color))
# this adds a nice black outline around the label for some odd
# reason; ok by us
p.setOpacity(self.opacity)
# this cause the L1 labels to glitch out if used in the subtype
# and it will leave a small black strip with the arrow path if
# done before the above
p.fillRect(self.rect, self.bg_color)
p.fillRect(
self.rect,
self.bg_color,
)
def boundingRect(self): # noqa
'''
@ -484,15 +519,18 @@ class AxisLabel(pg.GraphicsObject):
txt_h, txt_w = txt_br.height(), txt_br.width()
# print(f'wsw: {self._dpifont.boundingRect(" ")}')
# allow subtypes to specify a static width and height
# allow subtypes to override width and height
h, w = self.size_hint()
# print(f'axis size: {self._parent.size()}')
# print(f'axis geo: {self._parent.geometry()}')
self.rect = QtCore.QRectF(
0, 0,
# relative bounds offsets
self._x_br_offset,
self._y_br_offset,
(w or txt_w) + self._x_margin / 2,
(h or txt_h) + self._y_margin / 2,
(h or txt_h) * self._y_txt_h_scaling + (self._y_margin / 2),
)
# print(self.rect)
# hb = self.path.controlPointRect()

View File

@ -60,7 +60,7 @@ from ._style import (
hcolor,
CHART_MARGINS,
_xaxis_at,
_min_points_to_show,
# _min_points_to_show,
)
from ..data.feed import (
Feed,
@ -72,7 +72,7 @@ from ._interaction import ChartView
from ._forms import FieldsForm
from .._profile import pg_profile_enabled, ms_slower_then
from ._overlay import PlotItemOverlay
from ._flows import Flow
from ._dataviz import Viz
from ._search import SearchWidget
from . import _pg_overrides as pgo
from .._profile import Profiler
@ -711,7 +711,7 @@ class LinkedSplits(QWidget):
if style == 'ohlc_bar':
# graphics, data_key = cpw.draw_ohlc(
flow = cpw.draw_ohlc(
viz = cpw.draw_ohlc(
name,
shm,
flume=flume,
@ -727,7 +727,7 @@ class LinkedSplits(QWidget):
elif style == 'line':
add_label = True
# graphics, data_key = cpw.draw_curve(
flow = cpw.draw_curve(
viz = cpw.draw_curve(
name,
shm,
flume,
@ -738,7 +738,7 @@ class LinkedSplits(QWidget):
elif style == 'step':
add_label = True
# graphics, data_key = cpw.draw_curve(
flow = cpw.draw_curve(
viz = cpw.draw_curve(
name,
shm,
flume,
@ -751,8 +751,8 @@ class LinkedSplits(QWidget):
else:
raise ValueError(f"Chart style {style} is currently unsupported")
graphics = flow.graphics
data_key = flow.name
graphics = viz.graphics
data_key = viz.name
if _is_main:
assert style == 'ohlc_bar', 'main chart must be OHLC'
@ -810,6 +810,8 @@ class LinkedSplits(QWidget):
self.chart.sidepane.setMinimumWidth(sp_w)
# TODO: we should really drop using this type and instead just
# write our own wrapper around `PlotItem`..
class ChartPlotWidget(pg.PlotWidget):
'''
``GraphicsView`` subtype containing a single ``PlotItem``.
@ -908,7 +910,7 @@ class ChartPlotWidget(pg.PlotWidget):
# self.setViewportMargins(0, 0, 0, 0)
# registry of overlay curve names
self._flows: dict[str, Flow] = {}
self._vizs: dict[str, Viz] = {}
self.feed: Feed | None = None
@ -921,8 +923,6 @@ class ChartPlotWidget(pg.PlotWidget):
# show background grid
self.showGrid(x=False, y=True, alpha=0.3)
self.cv.enable_auto_yrange()
self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem)
# indempotent startup flag for auto-yrange subsys
@ -951,41 +951,6 @@ class ChartPlotWidget(pg.PlotWidget):
def focus(self) -> None:
self.view.setFocus()
def _set_xlimits(
self,
xfirst: int,
xlast: int
) -> None:
"""Set view limits (what's shown in the main chart "pane")
based on max/min x/y coords.
"""
self.setLimits(
xMin=xfirst,
xMax=xlast,
minXRange=_min_points_to_show,
)
def view_range(self) -> tuple[int, int]:
vr = self.viewRect()
return int(vr.left()), int(vr.right())
def bars_range(self) -> tuple[int, int, int, int]:
'''
Return a range tuple for the bars present in view.
'''
main_flow = self._flows[self.name]
ifirst, l, lbar, rbar, r, ilast = main_flow.datums_range()
return l, lbar, rbar, r
def curve_width_pxs(
self,
) -> float:
_, lbar, rbar, _ = self.bars_range()
return self.view.mapViewToDevice(
QLineF(lbar, 0, rbar, 0)
).length()
def pre_l1_xs(self) -> tuple[float, float]:
'''
Return the view x-coord for the value just before
@ -994,11 +959,16 @@ class ChartPlotWidget(pg.PlotWidget):
'''
line_end, marker_right, yaxis_x = self.marker_right_points()
view = self.view
line = view.mapToView(
line = self.view.mapToView(
QLineF(line_end, 0, yaxis_x, 0)
)
return line.x1(), line.length()
linex, linelen = line.x1(), line.length()
# print(
# f'line: {line}\n'
# f'linex: {linex}\n'
# f'linelen: {linelen}\n'
# )
return linex, linelen
def marker_right_points(
self,
@ -1020,11 +990,16 @@ class ChartPlotWidget(pg.PlotWidget):
ryaxis = self.getAxis('right')
r_axis_x = ryaxis.pos().x()
up_to_l1_sc = r_axis_x - l1_len - 10
up_to_l1_sc = r_axis_x - l1_len
marker_right = up_to_l1_sc - (1.375 * 2 * marker_size)
line_end = marker_right - (6/16 * marker_size)
# print(
# f'r_axis_x: {r_axis_x}\n'
# f'up_to_l1_sc: {up_to_l1_sc}\n'
# f'marker_right: {marker_right}\n'
# f'line_end: {line_end}\n'
# )
return line_end, marker_right, r_axis_x
def default_view(
@ -1038,95 +1013,45 @@ class ChartPlotWidget(pg.PlotWidget):
Set the view box to the "default" startup view of the scene.
'''
flow = self._flows.get(self.name)
if not flow:
log.warning(f'`Flow` for {self.name} not loaded yet?')
viz = self.get_viz(self.name)
if not viz:
log.warning(f'`Viz` for {self.name} not loaded yet?')
return
arr = flow.shm.array
index = arr['index']
# times = arr['time']
# these will be epoch time floats
xfirst, xlast = index[0], index[-1]
l, lbar, rbar, r = self.bars_range()
view = self.view
if (
rbar < 0
or l < xfirst
or l < 0
or (rbar - lbar) < 6
):
# TODO: set fixed bars count on screen that approx includes as
# many bars as possible before a downsample line is shown.
begin = xlast - bars_from_y
view.setXRange(
min=begin,
max=xlast,
padding=0,
)
# re-get range
l, lbar, rbar, r = self.bars_range()
# we get the L1 spread label "length" in view coords
# terms now that we've scaled either by user control
# or to the default set of bars as per the immediate block
# above.
if not y_offset:
marker_pos, l1_len = self.pre_l1_xs()
end = xlast + l1_len + 1
else:
end = xlast + y_offset + 1
begin = end - (r - l)
# for debugging
# print(
# # f'bars range: {brange}\n'
# f'xlast: {xlast}\n'
# f'marker pos: {marker_pos}\n'
# f'l1 len: {l1_len}\n'
# f'begin: {begin}\n'
# f'end: {end}\n'
# )
# remove any custom user yrange setttings
if self._static_yrange == 'axis':
self._static_yrange = None
view.setXRange(
min=begin,
max=end,
padding=0,
viz.default_view(
bars_from_y,
y_offset,
do_ds,
)
if do_ds:
self.view.maybe_downsample_graphics()
view._set_yrange()
try:
self.linked.graphics_cycle()
except IndexError:
pass
def increment_view(
self,
steps: int = 1,
datums: int = 1,
vb: Optional[ChartView] = None,
) -> None:
"""
Increment the data view one step to the right thus "following"
the current time slot/step/bar.
'''
Increment the data view ``datums``` steps toward y-axis thus
"following" the current time slot/step/bar.
"""
l, r = self.view_range()
'''
view = vb or self.view
viz = self.main_viz
l, r = viz.view_range()
x_shift = viz.index_step() * datums
if datums >= 300:
print("FUCKING FIX THE GLOBAL STEP BULLSHIT")
# breakpoint()
return
view.setXRange(
min=l + steps,
max=r + steps,
min=l + x_shift,
max=r + x_shift,
# TODO: holy shit, wtf dude... why tf would this not be 0 by
# default... speechless.
@ -1220,7 +1145,7 @@ class ChartPlotWidget(pg.PlotWidget):
**graphics_kwargs,
) -> Flow:
) -> Viz:
'''
Draw a "curve" (line plot graphics) for the provided data in
the input shm array ``shm``.
@ -1254,17 +1179,17 @@ class ChartPlotWidget(pg.PlotWidget):
**graphics_kwargs,
)
flow = self._flows[data_key] = Flow(
viz = self._vizs[data_key] = Viz(
data_key,
pi,
shm,
flume,
is_ohlc=is_ohlc,
# register curve graphics with this flow
# register curve graphics with this viz
graphics=graphics,
)
assert isinstance(flow.shm, ShmArray)
assert isinstance(viz.shm, ShmArray)
# TODO: this probably needs its own method?
if overlay:
@ -1321,7 +1246,7 @@ class ChartPlotWidget(pg.PlotWidget):
# understand.
pi.addItem(graphics)
return flow
return viz
def draw_ohlc(
self,
@ -1332,7 +1257,7 @@ class ChartPlotWidget(pg.PlotWidget):
array_key: Optional[str] = None,
**draw_curve_kwargs,
) -> Flow:
) -> Viz:
'''
Draw OHLC datums to chart.
@ -1358,41 +1283,12 @@ class ChartPlotWidget(pg.PlotWidget):
Update the named internal graphics from ``array``.
'''
flow = self._flows[array_key or graphics_name]
return flow.update_graphics(
viz = self._vizs[array_key or graphics_name]
return viz.update_graphics(
array_key=array_key,
**kwargs,
)
# def _label_h(self, yhigh: float, ylow: float) -> float:
# # compute contents label "height" in view terms
# # to avoid having data "contents" overlap with them
# if self._labels:
# label = self._labels[self.name][0]
# rect = label.itemRect()
# tl, br = rect.topLeft(), rect.bottomRight()
# vb = self.plotItem.vb
# try:
# # on startup labels might not yet be rendered
# top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y())
# # XXX: magic hack, how do we compute exactly?
# label_h = (top - bottom) * 0.42
# except np.linalg.LinAlgError:
# label_h = 0
# else:
# label_h = 0
# # print(f'label height {self.name}: {label_h}')
# if label_h > yhigh - ylow:
# label_h = 0
# print(f"bounds (ylow, yhigh): {(ylow, yhigh)}")
# TODO: pretty sure we can just call the cursor
# directly not? i don't wee why we need special "signal proxies"
# for this lul..
@ -1426,36 +1322,34 @@ class ChartPlotWidget(pg.PlotWidget):
delayed=True,
)
# TODO: here we should instead look up the ``Flow.shm.array``
# TODO: here we should instead look up the ``Viz.shm.array``
# and read directly from shm to avoid copying to memory first
# and then reading it again here.
flow_key = name or self.name
flow = self._flows.get(flow_key)
if (
flow is None
):
log.error(f"flow {flow_key} doesn't exist in chart {self.name} !?")
viz_key = name or self.name
viz = self._vizs.get(viz_key)
if viz is None:
log.error(f"viz {viz_key} doesn't exist in chart {self.name} !?")
key = res = 0, 0
else:
(
first,
l,
_,
lbar,
rbar,
_,
r,
last,
) = bars_range or flow.datums_range()
profiler(f'{self.name} got bars range')
) = bars_range or viz.datums_range()
key = round(lbar), round(rbar)
res = flow.maxmin(*key)
profiler(f'{self.name} got bars range')
key = lbar, rbar
res = viz.maxmin(*key)
if (
res is None
):
log.warning(
f"{flow_key} no mxmn for bars_range => {key} !?"
f"{viz_key} no mxmn for bars_range => {key} !?"
)
res = 0, 0
if not self._on_screen:
@ -1463,5 +1357,19 @@ class ChartPlotWidget(pg.PlotWidget):
self._on_screen = True
profiler(f'yrange mxmn: {key} -> {res}')
# print(f'{flow_key} yrange mxmn: {key} -> {res}')
# print(f'{viz_key} yrange mxmn: {key} -> {res}')
return res
def get_viz(
self,
key: str,
) -> Viz:
'''
Try to get an underlying ``Viz`` by key.
'''
return self._vizs.get(key)
@property
def main_viz(self) -> Viz:
return self.get_viz(self.name)

View File

@ -274,8 +274,8 @@ class ContentsLabels:
) -> None:
for chart, name, label, update in self._labels:
flow = chart._flows[name]
array = flow.shm.array
viz = chart.get_viz(name)
array = viz.shm.array
if not (
index >= 0
@ -482,25 +482,32 @@ class Cursor(pg.GraphicsObject):
def add_curve_cursor(
self,
plot: ChartPlotWidget, # noqa
chart: ChartPlotWidget, # noqa
curve: 'PlotCurveItem', # noqa
) -> LineDot:
# if this plot contains curves add line dot "cursors" to denote
# if this chart contains curves add line dot "cursors" to denote
# the current sample under the mouse
main_flow = plot._flows[plot.name]
main_viz = chart.get_viz(chart.name)
# read out last index
i = main_flow.shm.array[-1]['index']
i = main_viz.shm.array[-1]['index']
cursor = LineDot(
curve,
index=i,
plot=plot
plot=chart
)
plot.addItem(cursor)
self.graphics[plot].setdefault('cursors', []).append(cursor)
chart.addItem(cursor)
self.graphics[chart].setdefault('cursors', []).append(cursor)
return cursor
def mouseAction(self, action, plot): # noqa
def mouseAction(
self,
action: str,
plot: ChartPlotWidget,
) -> None: # noqa
log.debug(f"{(action, plot.name)}")
if action == 'Enter':
self.active_plot = plot

View File

@ -36,10 +36,6 @@ from PyQt5.QtGui import (
)
from .._profile import pg_profile_enabled, ms_slower_then
from ._style import hcolor
# from ._compression import (
# # ohlc_to_m4_line,
# ds_m4,
# )
from ..log import get_logger
from .._profile import Profiler
@ -55,7 +51,39 @@ _line_styles: dict[str, int] = {
}
class Curve(pg.GraphicsObject):
class FlowGraphic(pg.GraphicsObject):
'''
Base class with minimal interface for `QPainterPath` implemented,
real-time updated "data flow" graphics.
See subtypes below.
'''
# sub-type customization methods
declare_paintables: Optional[Callable] = None
sub_paint: Optional[Callable] = None
# TODO: can we remove this?
# sub_br: Optional[Callable] = None
def x_uppx(self) -> int:
px_vecs = self.pixelVectors()[0]
if px_vecs:
return px_vecs.x()
else:
return 0
def x_last(self) -> float | None:
'''
Return the last most x value of the last line segment or if not
drawn yet, ``None``.
'''
return self._last_line.x1() if self._last_line else None
class Curve(FlowGraphic):
'''
A faster, simpler, append friendly version of
``pyqtgraph.PlotCurveItem`` built for highly customizable real-time
@ -72,7 +100,7 @@ class Curve(pg.GraphicsObject):
lower level graphics data can be rendered in different threads and
then read and drawn in this main thread without having to worry
about dealing with Qt's concurrency primitives. See
``piker.ui._flows.Renderer`` for details and logic related to lower
``piker.ui._render.Renderer`` for details and logic related to lower
level path generation and incremental update. The main differences in
the path generation code include:
@ -85,11 +113,6 @@ class Curve(pg.GraphicsObject):
'''
# sub-type customization methods
declare_paintables: Optional[Callable] = None
sub_paint: Optional[Callable] = None
# sub_br: Optional[Callable] = None
def __init__(
self,
*args,
@ -99,7 +122,6 @@ class Curve(pg.GraphicsObject):
fill_color: Optional[str] = None,
style: str = 'solid',
name: Optional[str] = None,
use_fpath: bool = True,
**kwargs
@ -114,11 +136,11 @@ class Curve(pg.GraphicsObject):
# self._last_cap: int = 0
self.path: Optional[QPainterPath] = None
# additional path used for appends which tries to avoid
# triggering an update/redraw of the presumably larger
# historical ``.path`` above.
self.use_fpath = use_fpath
self.fast_path: Optional[QPainterPath] = None
# additional path that can be optionally used for appends which
# tries to avoid triggering an update/redraw of the presumably
# larger historical ``.path`` above. the flag to enable
# this behaviour is found in `Renderer.render()`.
self.fast_path: QPainterPath | None = None
# TODO: we can probably just dispense with the parent since
# we're basically only using the pen setting now...
@ -137,7 +159,7 @@ class Curve(pg.GraphicsObject):
# self.last_step_pen = pg.mkPen(hcolor(color), width=2)
self.last_step_pen = pg.mkPen(pen, width=2)
self._last_line = QLineF()
self._last_line: QLineF = QLineF()
# flat-top style histogram-like discrete curve
# self._step_mode: bool = step_mode
@ -158,51 +180,19 @@ class Curve(pg.GraphicsObject):
# endpoint (something we saw on trade rate curves)
self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
# XXX: see explanation for different caching modes:
# https://stackoverflow.com/a/39410081
# seems to only be useful if we don't re-generate the entire
# QPainterPath every time
# curve.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
# XXX-NOTE-XXX: graphics caching.
# see explanation for different caching modes:
# https://stackoverflow.com/a/39410081 seems to only be useful
# if we don't re-generate the entire QPainterPath every time
# don't ever use this - it's a colossal nightmare of artefacts
# and is disastrous for performance.
# curve.setCacheMode(QtWidgets.QGraphicsItem.ItemCoordinateCache)
# self.setCacheMode(QtWidgets.QGraphicsItem.ItemCoordinateCache)
# allow sub-type customization
declare = self.declare_paintables
if declare:
declare()
# TODO: probably stick this in a new parent
# type which will contain our own version of
# what ``PlotCurveItem`` had in terms of base
# functionality? A `FlowGraphic` maybe?
def x_uppx(self) -> int:
px_vecs = self.pixelVectors()[0]
if px_vecs:
xs_in_px = px_vecs.x()
return round(xs_in_px)
else:
return 0
def px_width(self) -> float:
vb = self.getViewBox()
if not vb:
return 0
vr = self.viewRect()
l, r = int(vr.left()), int(vr.right())
start, stop = self._xrange
lbar = max(l, start)
rbar = min(r, stop)
return vb.mapViewToDevice(
QLineF(lbar, 0, rbar, 0)
).length()
# XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work..
def getData(self):
@ -357,22 +347,30 @@ class Curve(pg.GraphicsObject):
self,
path: QPainterPath,
src_data: np.ndarray,
render_data: np.ndarray,
reset: bool,
array_key: str,
index_field: str,
) -> None:
# default line draw last call
# with self.reset_cache():
x = render_data['index']
y = render_data[array_key]
x = src_data[index_field]
y = src_data[array_key]
x_last = x[-1]
x_2last = x[-2]
# draw the "current" step graphic segment so it
# lines up with the "middle" of the current
# (OHLC) sample.
self._last_line = QLineF(
x[-2], y[-2],
x[-1], y[-1],
# NOTE: currently we draw in x-domain
# from last datum to current such that
# the end of line touches the "beginning"
# of the current datum step span.
x_2last , y[-2],
x_last, y[-1],
)
return x, y
@ -388,13 +386,13 @@ class FlattenedOHLC(Curve):
self,
path: QPainterPath,
src_data: np.ndarray,
render_data: np.ndarray,
reset: bool,
array_key: str,
index_field: str,
) -> None:
lasts = src_data[-2:]
x = lasts['index']
x = lasts[index_field]
y = lasts['close']
# draw the "current" step graphic segment so it
@ -418,9 +416,9 @@ class StepCurve(Curve):
self,
path: QPainterPath,
src_data: np.ndarray,
render_data: np.ndarray,
reset: bool,
array_key: str,
index_field: str,
w: float = 0.5,
@ -429,14 +427,13 @@ class StepCurve(Curve):
# TODO: remove this and instead place all step curve
# updating into pre-path data render callbacks.
# full input data
x = src_data['index']
x = src_data[index_field]
y = src_data[array_key]
x_last = x[-1]
x_2last = x[-2]
y_last = y[-1]
step_size = x_last - x_2last
half_step = step_size / 2
# lol, commenting this makes step curves
# all "black" for me :eyeroll:..
@ -445,7 +442,7 @@ class StepCurve(Curve):
x_last, 0,
)
self._last_step_rect = QRectF(
x_last - half_step, 0,
x_last, 0,
step_size, y_last,
)
return x, y
@ -458,9 +455,3 @@ class StepCurve(Curve):
# p.drawLines(*tuple(filter(bool, self._last_step_lines)))
# p.drawRect(self._last_step_rect)
p.fillRect(self._last_step_rect, self._brush)
# def sub_br(
# self,
# parent_br: QRectF | None = None,
# ) -> QRectF:
# return self._last_step_rect

1083
piker/ui/_dataviz.py 100644

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -377,7 +377,7 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
nbars = ixmx - ixmn + 1
chart = self._chart
data = chart._flows[chart.name].shm.array[ixmn:ixmx]
data = chart.get_viz(chart.name).shm.array[ixmn:ixmx]
if len(data):
std = data['close'].std()

View File

@ -1,974 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level streaming graphics primitives.
This is an intermediate layer which associates real-time low latency
graphics primitives with underlying FSP related data structures for fast
incremental update.
'''
from __future__ import annotations
from typing import (
Optional,
)
import msgspec
import numpy as np
import pyqtgraph as pg
from PyQt5.QtGui import QPainterPath
from PyQt5.QtCore import QLineF
from ..data._sharedmem import (
ShmArray,
)
from ..data.feed import Flume
from .._profile import (
pg_profile_enabled,
# ms_slower_then,
)
from ._pathops import (
IncrementalFormatter,
OHLCBarsFmtr, # Plain OHLC renderer
OHLCBarsAsCurveFmtr, # OHLC converted to line
StepCurveFmtr, # "step" curve (like for vlm)
xy_downsample,
)
from ._ohlc import (
BarItems,
# bar_from_ohlc_row,
)
from ._curve import (
Curve,
StepCurve,
FlattenedOHLC,
)
from ..log import get_logger
from .._profile import Profiler
log = get_logger(__name__)
def render_baritems(
flow: Flow,
graphics: BarItems,
read: tuple[
int, int, np.ndarray,
int, int, np.ndarray,
],
profiler: Profiler,
**kwargs,
) -> None:
'''
Graphics management logic for a ``BarItems`` object.
Mostly just logic to determine when and how to downsample an OHLC
lines curve into a flattened line graphic and when to display one
graphic or the other.
TODO: this should likely be moved into some kind of better abstraction
layer, if not a `Renderer` then something just above it?
'''
bars = graphics
# if no source data renderer exists create one.
self = flow
show_bars: bool = False
r = self._src_r
if not r:
show_bars = True
# OHLC bars path renderer
r = self._src_r = Renderer(
flow=self,
fmtr=OHLCBarsFmtr(
shm=flow.shm,
flow=flow,
_last_read=read,
),
)
ds_curve_r = Renderer(
flow=self,
fmtr=OHLCBarsAsCurveFmtr(
shm=flow.shm,
flow=flow,
_last_read=read,
),
)
curve = FlattenedOHLC(
name=f'{flow.name}_ds_ohlc',
color=bars._color,
)
flow.ds_graphics = curve
curve.hide()
self.plot.addItem(curve)
# baseline "line" downsampled OHLC curve that should
# kick on only when we reach a certain uppx threshold.
self._render_table = (ds_curve_r, curve)
ds_r, curve = self._render_table
# do checks for whether or not we require downsampling:
# - if we're **not** downsampling then we simply want to
# render the bars graphics curve and update..
# - if instead we are in a downsamplig state then we to
x_gt = 6
uppx = curve.x_uppx()
in_line = should_line = curve.isVisible()
if (
in_line
and uppx < x_gt
):
# print('FLIPPING TO BARS')
should_line = False
flow._in_ds = False
elif (
not in_line
and uppx >= x_gt
):
# print('FLIPPING TO LINE')
should_line = True
flow._in_ds = True
profiler(f'ds logic complete line={should_line}')
# do graphics updates
if should_line:
r = ds_r
graphics = curve
profiler('updated ds curve')
else:
graphics = bars
if show_bars:
bars.show()
changed_to_line = False
if (
not in_line
and should_line
):
# change to line graphic
log.info(
f'downsampling to line graphic {self.name}'
)
bars.hide()
curve.show()
curve.update()
changed_to_line = True
elif in_line and not should_line:
# change to bars graphic
log.info(f'showing bars graphic {self.name}')
curve.hide()
bars.show()
bars.update()
return (
graphics,
r,
{'read_from_key': False},
should_line,
changed_to_line,
)
class Flow(msgspec.Struct): # , frozen=True):
'''
(Financial Signal-)Flow compound type which wraps a real-time
shm array stream with displayed graphics (curves, charts)
for high level access and control as well as efficient incremental
update.
The intention is for this type to eventually be capable of shm-passing
of incrementally updated graphics stream data between actors.
'''
name: str
plot: pg.PlotItem
_shm: ShmArray
flume: Flume
graphics: Curve | BarItems
# for tracking y-mn/mx for y-axis auto-ranging
yrange: tuple[float, float] = None
# in some cases a flow may want to change its
# graphical "type" or, "form" when downsampling, to
# start this is only ever an interpolation line.
ds_graphics: Optional[Curve] = None
is_ohlc: bool = False
render: bool = True # toggle for display loop
# downsampling state
_last_uppx: float = 0
_in_ds: bool = False
# map from uppx -> (downsampled data, incremental graphics)
_src_r: Optional[Renderer] = None
_render_table: dict[
Optional[int],
tuple[Renderer, pg.GraphicsItem],
] = (None, None)
# TODO: hackery to be able to set a shm later
# but whilst also allowing this type to hashable,
# likely will require serializable token that is used to attach
# to the underlying shm ref after startup?
# _shm: Optional[ShmArray] = None # currently, may be filled in "later"
# last read from shm (usually due to an update call)
_last_read: Optional[np.ndarray] = None
# cache of y-range values per x-range input.
_mxmns: dict[tuple[int, int], tuple[float, float]] = {}
@property
def shm(self) -> ShmArray:
return self._shm
# TODO: remove this and only allow setting through
# private ``._shm`` attr?
# @shm.setter
# def shm(self, shm: ShmArray) -> ShmArray:
# self._shm = shm
def maxmin(
self,
lbar: int,
rbar: int,
) -> Optional[tuple[float, float]]:
'''
Compute the cached max and min y-range values for a given
x-range determined by ``lbar`` and ``rbar`` or ``None``
if no range can be determined (yet).
'''
rkey = (lbar, rbar)
cached_result = self._mxmns.get(rkey)
if cached_result:
return cached_result
shm = self.shm
if shm is None:
return None
arr = shm.array
# build relative indexes into shm array
# TODO: should we just add/use a method
# on the shm to do this?
ifirst = arr[0]['index']
slice_view = arr[
lbar - ifirst:
(rbar - ifirst) + 1
]
if not slice_view.size:
return None
elif self.yrange:
mxmn = self.yrange
# print(f'{self.name} M4 maxmin: {mxmn}')
else:
if self.is_ohlc:
ylow = np.min(slice_view['low'])
yhigh = np.max(slice_view['high'])
else:
view = slice_view[self.name]
ylow = np.min(view)
yhigh = np.max(view)
mxmn = ylow, yhigh
# print(f'{self.name} MANUAL maxmin: {mxmin}')
# cache result for input range
assert mxmn
self._mxmns[rkey] = mxmn
return mxmn
def view_range(self) -> tuple[int, int]:
'''
Return the indexes in view for the associated
plot displaying this flow's data.
'''
vr = self.plot.viewRect()
return (
vr.left(),
vr.right(),
)
def datums_range(
self,
index_field: str = 'index',
) -> tuple[
int, int, int, int, int, int
]:
'''
Return a range tuple for the datums present in view.
'''
l, r = self.view_range()
l = round(l)
r = round(r)
# TODO: avoid this and have shm passed
# in earlier.
if self.shm is None:
# haven't initialized the flow yet
return (0, l, 0, 0, r, 0)
array = self.shm.array
index = array['index']
start = index[0]
end = index[-1]
lbar = max(l, start)
rbar = min(r, end)
return (
start, l, lbar, rbar, r, end,
)
def read(
self,
array_field: Optional[str] = None,
index_field: str = 'index',
) -> tuple[
int, int, np.ndarray,
int, int, np.ndarray,
]:
'''
Read the underlying shm array buffer and
return the data plus indexes for the first
and last
which has been written to.
'''
# readable data
array = self.shm.array
indexes = array[index_field]
ifirst = indexes[0]
ilast = indexes[-1]
ifirst, l, lbar, rbar, r, ilast = self.datums_range()
# get read-relative indices adjusting
# for master shm index.
lbar_i = max(l, ifirst) - ifirst
rbar_i = min(r, ilast) - ifirst
if array_field:
array = array[array_field]
# TODO: we could do it this way as well no?
# to_draw = array[lbar - ifirst:(rbar - ifirst) + 1]
in_view = array[lbar_i: rbar_i + 1]
return (
# abs indices + full data set
ifirst, ilast, array,
# relative indices + in view datums
lbar_i, rbar_i, in_view,
)
def update_graphics(
self,
use_vr: bool = True,
render: bool = True,
array_key: Optional[str] = None,
profiler: Optional[Profiler] = None,
do_append: bool = True,
**kwargs,
) -> pg.GraphicsObject:
'''
Read latest datums from shm and render to (incrementally)
render to graphics.
'''
profiler = Profiler(
msg=f'Flow.update_graphics() for {self.name}',
disabled=not pg_profile_enabled(),
ms_threshold=4,
# ms_threshold=ms_slower_then,
)
# shm read and slice to view
read = (
xfirst, xlast, src_array,
ivl, ivr, in_view,
) = self.read()
profiler('read src shm data')
graphics = self.graphics
if (
not in_view.size
or not render
):
# print('exiting early')
return graphics
slice_to_head: int = -1
should_redraw: bool = False
should_line: bool = False
rkwargs = {}
# TODO: probably specialize ``Renderer`` types instead of
# these logic checks?
# - put these blocks into a `.load_renderer()` meth?
# - consider a OHLCRenderer, StepCurveRenderer, Renderer?
r = self._src_r
if isinstance(graphics, BarItems):
# XXX: special case where we change out graphics
# to a line after a certain uppx threshold.
(
graphics,
r,
rkwargs,
should_line,
changed_to_line,
) = render_baritems(
self,
graphics,
read,
profiler,
**kwargs,
)
should_redraw = changed_to_line or not should_line
self._in_ds = should_line
elif not r:
if isinstance(graphics, StepCurve):
r = self._src_r = Renderer(
flow=self,
fmtr=StepCurveFmtr(
shm=self.shm,
flow=self,
_last_read=read,
),
)
# TODO: append logic inside ``.render()`` isn't
# correct yet for step curves.. remove this to see it.
should_redraw = True
slice_to_head = -2
else:
r = self._src_r
if not r:
# just using for ``.diff()`` atm..
r = self._src_r = Renderer(
flow=self,
fmtr=IncrementalFormatter(
shm=self.shm,
flow=self,
_last_read=read,
),
)
# ``Curve`` derivative case(s):
array_key = array_key or self.name
# print(array_key)
# ds update config
new_sample_rate: bool = False
should_ds: bool = r._in_ds
showing_src_data: bool = not r._in_ds
# downsampling incremental state checking
# check for and set std m4 downsample conditions
uppx = graphics.x_uppx()
uppx_diff = (uppx - self._last_uppx)
profiler(f'diffed uppx {uppx}')
if (
uppx > 1
and abs(uppx_diff) >= 1
):
log.debug(
f'{array_key} sampler change: {self._last_uppx} -> {uppx}'
)
self._last_uppx = uppx
new_sample_rate = True
showing_src_data = False
should_ds = True
should_redraw = True
elif (
uppx <= 2
and self._in_ds
):
# we should de-downsample back to our original
# source data so we clear our path data in prep
# to generate a new one from original source data.
new_sample_rate = True
should_ds = False
should_redraw = True
showing_src_data = True
# reset yrange to be computed from source data
self.yrange = None
# MAIN RENDER LOGIC:
# - determine in view data and redraw on range change
# - determine downsampling ops if needed
# - (incrementally) update ``QPainterPath``
out = r.render(
read,
array_key,
profiler,
uppx=uppx,
# use_vr=True,
# TODO: better way to detect and pass this?
# if we want to eventually cache renderers for a given uppx
# we should probably use this as a key + state?
should_redraw=should_redraw,
new_sample_rate=new_sample_rate,
should_ds=should_ds,
showing_src_data=showing_src_data,
slice_to_head=slice_to_head,
do_append=do_append,
**rkwargs,
)
if showing_src_data:
# print(f"{self.name} SHOWING SOURCE")
# reset yrange to be computed from source data
self.yrange = None
if not out:
log.warning(f'{self.name} failed to render!?')
return graphics
path, data, reset = out
# if self.yrange:
# print(f'flow {self.name} yrange from m4: {self.yrange}')
# XXX: SUPER UGGGHHH... without this we get stale cache
# graphics that don't update until you downsampler again..
# reset = False
# if reset:
# with graphics.reset_cache():
# # assign output paths to graphicis obj
# graphics.path = r.path
# graphics.fast_path = r.fast_path
# # XXX: we don't need this right?
# # graphics.draw_last_datum(
# # path,
# # src_array,
# # data,
# # reset,
# # array_key,
# # )
# # graphics.update()
# # profiler('.update()')
# else:
# assign output paths to graphicis obj
graphics.path = r.path
graphics.fast_path = r.fast_path
graphics.draw_last_datum(
path,
src_array,
data,
reset,
array_key,
)
graphics.update()
profiler('.update()')
# TODO: does this actuallly help us in any way (prolly should
# look at the source / ask ogi). I think it avoid artifacts on
# wheel-scroll downsampling curve updates?
# TODO: is this ever better?
# graphics.prepareGeometryChange()
# profiler('.prepareGeometryChange()')
# track downsampled state
self._in_ds = r._in_ds
return graphics
def draw_last(
self,
array_key: Optional[str] = None,
only_last_uppx: bool = False,
) -> None:
# shm read and slice to view
(
xfirst, xlast, src_array,
ivl, ivr, in_view,
) = self.read()
g = self.graphics
array_key = array_key or self.name
x, y = g.draw_last_datum(
g.path,
src_array,
src_array,
False, # never reset path
array_key,
)
# the renderer is downsampling we choose
# to always try and updadte a single (interpolating)
# line segment that spans and tries to display
# the las uppx's worth of datums.
# we only care about the last pixel's
# worth of data since that's all the screen
# can represent on the last column where
# the most recent datum is being drawn.
if self._in_ds or only_last_uppx:
dsg = self.ds_graphics or self.graphics
# XXX: pretty sure we don't need this?
# if isinstance(g, Curve):
# with dsg.reset_cache():
uppx = self._last_uppx
y = y[-uppx:]
ymn, ymx = y.min(), y.max()
# print(f'drawing uppx={uppx} mxmn line: {ymn}, {ymx}')
try:
iuppx = x[-uppx]
except IndexError:
# we're less then an x-px wide so just grab the start
# datum index.
iuppx = x[0]
dsg._last_line = QLineF(
iuppx, ymn,
x[-1], ymx,
)
# print(f'updating DS curve {self.name}')
dsg.update()
else:
# print(f'updating NOT DS curve {self.name}')
g.update()
class Renderer(msgspec.Struct):
flow: Flow
fmtr: IncrementalFormatter
# output graphics rendering, the main object
# processed in ``QGraphicsObject.paint()``
path: Optional[QPainterPath] = None
fast_path: Optional[QPainterPath] = None
# XXX: just ideas..
# called on the final data (transform) output to convert
# to "graphical data form" a format that can be passed to
# the ``.draw()`` implementation.
# graphics_t: Optional[Callable[ShmArray, np.ndarray]] = None
# graphics_t_shm: Optional[ShmArray] = None
# path graphics update implementation methods
# prepend_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# append_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# downsampling state
_last_uppx: float = 0
_in_ds: bool = False
def draw_path(
self,
x: np.ndarray,
y: np.ndarray,
connect: str | np.ndarray = 'all',
path: Optional[QPainterPath] = None,
redraw: bool = False,
) -> QPainterPath:
path_was_none = path is None
if redraw and path:
path.clear()
# TODO: avoid this?
if self.fast_path:
self.fast_path.clear()
# profiler('cleared paths due to `should_redraw=True`')
path = pg.functions.arrayToQPath(
x,
y,
connect=connect,
finiteCheck=False,
# reserve mem allocs see:
# - https://doc.qt.io/qt-5/qpainterpath.html#reserve
# - https://doc.qt.io/qt-5/qpainterpath.html#capacity
# - https://doc.qt.io/qt-5/qpainterpath.html#clear
# XXX: right now this is based on had hoc checks on a
# hidpi 3840x2160 4k monitor but we should optimize for
# the target display(s) on the sys.
# if no_path_yet:
# graphics.path.reserve(int(500e3))
# path=path, # path re-use / reserving
)
# avoid mem allocs if possible
if path_was_none:
path.reserve(path.capacity())
return path
def render(
self,
new_read,
array_key: str,
profiler: Profiler,
uppx: float = 1,
# redraw and ds flags
should_redraw: bool = False,
new_sample_rate: bool = False,
should_ds: bool = False,
showing_src_data: bool = True,
do_append: bool = True,
slice_to_head: int = -1,
use_fpath: bool = True,
# only render datums "in view" of the ``ChartView``
use_vr: bool = True,
read_from_key: bool = True,
) -> list[QPainterPath]:
'''
Render the current graphics path(s)
There are (at least) 3 stages from source data to graphics data:
- a data transform (which can be stored in additional shm)
- a graphics transform which converts discrete basis data to
a `float`-basis view-coords graphics basis. (eg. ``ohlc_flatten()``,
``step_path_arrays_from_1d()``, etc.)
- blah blah blah (from notes)
'''
# TODO: can the renderer just call ``Flow.read()`` directly?
# unpack latest source data read
fmtr = self.fmtr
(
_,
_,
array,
ivl,
ivr,
in_view,
) = new_read
# xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine.
fmt_out = fmtr.format_to_1d(
new_read,
array_key,
profiler,
slice_to_head=slice_to_head,
read_src_from_key=read_from_key,
slice_to_inview=use_vr,
)
# no history in view case
if not fmt_out:
# XXX: this might be why the profiler only has exits?
return
(
x_1d,
y_1d,
connect,
prepend_length,
append_length,
view_changed,
# append_tres,
) = fmt_out
# redraw conditions
if (
prepend_length > 0
or new_sample_rate
or view_changed
# NOTE: comment this to try and make "append paths"
# work below..
or append_length > 0
):
should_redraw = True
path = self.path
fast_path = self.fast_path
reset = False
# redraw the entire source data if we have either of:
# - no prior path graphic rendered or,
# - we always intend to re-render the data only in view
if (
path is None
or should_redraw
):
# print(f"{self.flow.name} -> REDRAWING BRUH")
if new_sample_rate and showing_src_data:
log.info(f'DEDOWN -> {array_key}')
self._in_ds = False
elif should_ds and uppx > 1:
x_1d, y_1d, ymn, ymx = xy_downsample(
x_1d,
y_1d,
uppx,
)
self.flow.yrange = ymn, ymx
# print(f'{self.flow.name} post ds: ymn, ymx: {ymn},{ymx}')
reset = True
profiler(f'FULL PATH downsample redraw={should_ds}')
self._in_ds = True
path = self.draw_path(
x=x_1d,
y=y_1d,
connect=connect,
path=path,
redraw=True,
)
profiler(
'generated fresh path. '
f'(should_redraw: {should_redraw} '
f'should_ds: {should_ds} new_sample_rate: {new_sample_rate})'
)
# TODO: get this piecewise prepend working - right now it's
# giving heck on vwap...
# elif prepend_length:
# prepend_path = pg.functions.arrayToQPath(
# x[0:prepend_length],
# y[0:prepend_length],
# connect='all'
# )
# # swap prepend path in "front"
# old_path = graphics.path
# graphics.path = prepend_path
# # graphics.path.moveTo(new_x[0], new_y[0])
# graphics.path.connectPath(old_path)
elif (
append_length > 0
and do_append
):
print(f'{array_key} append len: {append_length}')
# new_x = x_1d[-append_length - 2:] # slice_to_head]
# new_y = y_1d[-append_length - 2:] # slice_to_head]
profiler('sliced append path')
# (
# x_1d,
# y_1d,
# connect,
# ) = append_tres
profiler(
f'diffed array input, append_length={append_length}'
)
# if should_ds and uppx > 1:
# new_x, new_y = xy_downsample(
# new_x,
# new_y,
# uppx,
# )
# profiler(f'fast path downsample redraw={should_ds}')
append_path = self.draw_path(
x=x_1d,
y=y_1d,
connect=connect,
path=fast_path,
)
profiler('generated append qpath')
if use_fpath:
# print(f'{self.flow.name}: FAST PATH')
# an attempt at trying to make append-updates faster..
if fast_path is None:
fast_path = append_path
# fast_path.reserve(int(6e3))
else:
fast_path.connectPath(append_path)
size = fast_path.capacity()
profiler(f'connected fast path w size: {size}')
print(
f"append_path br: {append_path.boundingRect()}\n"
f"path size: {size}\n"
f"append_path len: {append_path.length()}\n"
f"fast_path len: {fast_path.length()}\n"
)
# graphics.path.moveTo(new_x[0], new_y[0])
# path.connectPath(append_path)
# XXX: lol this causes a hang..
# graphics.path = graphics.path.simplified()
else:
size = path.capacity()
profiler(f'connected history path w size: {size}')
path.connectPath(append_path)
self.path = path
self.fast_path = fast_path
return self.path, array, reset

View File

@ -79,14 +79,14 @@ def has_vlm(ohlcv: ShmArray) -> bool:
def update_fsp_chart(
chart: ChartPlotWidget,
flow,
viz,
graphics_name: str,
array_key: Optional[str],
**kwargs,
) -> None:
shm = flow.shm
shm = viz.shm
if not shm:
return
@ -289,7 +289,7 @@ async def run_fsp_ui(
# first UI update, usually from shm pushed history
update_fsp_chart(
chart,
chart._flows[array_key],
chart.get_viz(array_key),
name,
array_key=array_key,
)
@ -357,7 +357,7 @@ async def run_fsp_ui(
# last = time.time()
# TODO: maybe this should be our ``Flow`` type since it maps
# TODO: maybe this should be our ``Viz`` type since it maps
# one flume to the next? The machinery for task/actor mgmt should
# be part of the instantiation API?
class FspAdmin:
@ -386,7 +386,7 @@ class FspAdmin:
# TODO: make this a `.src_flume` and add
# a `dst_flume`?
# (=> but then wouldn't this be the most basic `Flow`?)
# (=> but then wouldn't this be the most basic `Viz`?)
self.flume = flume
def rr_next_portal(self) -> tractor.Portal:
@ -666,7 +666,7 @@ async def open_vlm_displays(
shm = ohlcv
ohlc_chart = linked.chart
chart = linked.add_plot(
vlm_chart = linked.add_plot(
name='volume',
shm=shm,
flume=flume,
@ -682,10 +682,12 @@ async def open_vlm_displays(
# the curve item internals are pretty convoluted.
style='step',
)
vlm_chart.view.enable_auto_yrange()
# back-link the volume chart to trigger y-autoranging
# in the ohlc (parent) chart.
ohlc_chart.view.enable_auto_yrange(
src_vb=chart.view,
src_vb=vlm_chart.view,
)
# force 0 to always be in view
@ -694,7 +696,7 @@ async def open_vlm_displays(
) -> tuple[float, float]:
'''
Flows "group" maxmin loop; assumes all named flows
Viz "group" maxmin loop; assumes all named flows
are in the same co-domain and thus can be sorted
as one set.
@ -707,7 +709,7 @@ async def open_vlm_displays(
'''
mx = 0
for name in names:
ymn, ymx = chart.maxmin(name=name)
ymn, ymx = vlm_chart.maxmin(name=name)
mx = max(mx, ymx)
return 0, mx
@ -715,34 +717,33 @@ async def open_vlm_displays(
# TODO: fix the x-axis label issue where if you put
# the axis on the left it's totally not lined up...
# show volume units value on LHS (for dinkus)
# chart.hideAxis('right')
# chart.showAxis('left')
# vlm_chart.hideAxis('right')
# vlm_chart.showAxis('left')
# send back new chart to caller
task_status.started(chart)
task_status.started(vlm_chart)
# should **not** be the same sub-chart widget
assert chart.name != linked.chart.name
assert vlm_chart.name != linked.chart.name
# sticky only on sub-charts atm
last_val_sticky = chart.plotItem.getAxis(
'right')._stickies.get(chart.name)
last_val_sticky = vlm_chart.plotItem.getAxis(
'right')._stickies.get(vlm_chart.name)
# read from last calculated value
value = shm.array['volume'][-1]
last_val_sticky.update_from_data(-1, value)
vlm_curve = chart.update_graphics_from_flow(
vlm_curve = vlm_chart.update_graphics_from_flow(
'volume',
# shm.array,
)
# size view to data once at outset
chart.view._set_yrange()
vlm_chart.view._set_yrange()
# add axis title
axis = chart.getAxis('right')
axis = vlm_chart.getAxis('right')
axis.set_title(' vlm')
if dvlm:
@ -782,7 +783,7 @@ async def open_vlm_displays(
# XXX: the main chart already contains a vlm "units" axis
# so here we add an overlay wth a y-range in
# $ liquidity-value units (normally a fiat like USD).
dvlm_pi = chart.overlay_plotitem(
dvlm_pi = vlm_chart.overlay_plotitem(
'dolla_vlm',
index=0, # place axis on inside (nearest to chart)
axis_title=' $vlm',
@ -833,6 +834,7 @@ async def open_vlm_displays(
names: list[str],
pi: pg.PlotItem,
shm: ShmArray,
flume: Flume,
step_mode: bool = False,
style: str = 'solid',
@ -849,7 +851,7 @@ async def open_vlm_displays(
assert isinstance(shm, ShmArray)
assert isinstance(flume, Flume)
flow = chart.draw_curve(
viz = vlm_chart.draw_curve(
name,
shm,
flume,
@ -860,18 +862,13 @@ async def open_vlm_displays(
style=style,
pi=pi,
)
# TODO: we need a better API to do this..
# specially store ref to shm for lookup in display loop
# since only a placeholder of `None` is entered in
# ``.draw_curve()``.
# flow = chart._flows[name]
assert flow.plot is pi
assert viz.plot is pi
chart_curves(
fields,
dvlm_pi,
dvlm_flume.rt_shm,
dvlm_flume,
step_mode=True,
)
@ -900,17 +897,17 @@ async def open_vlm_displays(
# displayed and the curves are effectively the same minus
# liquidity events (well at least on low OHLC periods - 1s).
vlm_curve.hide()
chart.removeItem(vlm_curve)
vflow = chart._flows['volume']
vflow.render = False
vlm_chart.removeItem(vlm_curve)
vlm_viz = vlm_chart._vizs['volume']
vlm_viz.render = False
# avoid range sorting on volume once disabled
chart.view.disable_auto_yrange()
vlm_chart.view.disable_auto_yrange()
# Trade rate overlay
# XXX: requires an additional overlay for
# a trades-per-period (time) y-range.
tr_pi = chart.overlay_plotitem(
tr_pi = vlm_chart.overlay_plotitem(
'trade_rates',
# TODO: dynamically update period (and thus this axis?)
@ -940,6 +937,7 @@ async def open_vlm_displays(
trade_rate_fields,
tr_pi,
fr_flume.rt_shm,
fr_flume,
# step_mode=True,
# dashed line to represent "individual trades" being

View File

@ -76,7 +76,6 @@ async def handle_viewmode_kb_inputs(
pressed: set[str] = set()
last = time.time()
trigger_mode: str
action: str
on_next_release: Optional[Callable] = None
@ -495,7 +494,7 @@ class ChartView(ViewBox):
chart = self.linked.chart
# don't zoom more then the min points setting
l, lbar, rbar, r = chart.bars_range()
out = l, lbar, rbar, r = chart.get_viz(chart.name).bars_range()
# vl = r - l
# if ev.delta() > 0 and vl <= _min_points_to_show:
@ -504,7 +503,7 @@ class ChartView(ViewBox):
# if (
# ev.delta() < 0
# and vl >= len(chart._flows[chart.name].shm.array) + 666
# and vl >= len(chart._vizs[chart.name].shm.array) + 666
# ):
# log.debug("Min zoom bruh...")
# return
@ -821,7 +820,7 @@ class ChartView(ViewBox):
# XXX: only compute the mxmn range
# if none is provided as input!
if not yrange:
# flow = chart._flows[name]
# flow = chart._vizs[name]
yrange = self._maxmin()
if yrange is None:
@ -912,7 +911,7 @@ class ChartView(ViewBox):
graphics items which are our children.
'''
graphics = [f.graphics for f in self._chart._flows.values()]
graphics = [f.graphics for f in self._chart._vizs.values()]
if not graphics:
return 0
@ -948,7 +947,7 @@ class ChartView(ViewBox):
plots |= linked.subplots
for chart_name, chart in plots.items():
for name, flow in chart._flows.items():
for name, flow in chart._vizs.items():
if (
not flow.render

View File

@ -36,6 +36,7 @@ from PyQt5.QtCore import (
from PyQt5.QtGui import QPainterPath
from ._curve import FlowGraphic
from .._profile import pg_profile_enabled, ms_slower_then
from ._style import hcolor
from ..log import get_logger
@ -51,7 +52,8 @@ log = get_logger(__name__)
def bar_from_ohlc_row(
row: np.ndarray,
# 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43
bar_w: float,
bar_gap: float = 0.16
) -> tuple[QLineF]:
'''
@ -59,8 +61,7 @@ def bar_from_ohlc_row(
OHLC "bar" for use in the "last datum" of a series.
'''
open, high, low, close, index = row[
['open', 'high', 'low', 'close', 'index']]
open, high, low, close, index = row
# TODO: maybe consider using `QGraphicsLineItem` ??
# gives us a ``.boundingRect()`` on the objects which may make
@ -68,9 +69,11 @@ def bar_from_ohlc_row(
# history path faster since it's done in C++:
# https://doc.qt.io/qt-5/qgraphicslineitem.html
mid: float = (bar_w / 2) + index
# high -> low vertical (body) line
if low != high:
hl = QLineF(index, low, index, high)
hl = QLineF(mid, low, mid, high)
else:
# XXX: if we don't do it renders a weird rectangle?
# see below for filtering this later...
@ -81,15 +84,18 @@ def bar_from_ohlc_row(
# the index's range according to the view mapping coordinates.
# open line
o = QLineF(index - w, open, index, open)
o = QLineF(index + bar_gap, open, mid, open)
# close line
c = QLineF(index, close, index + w, close)
c = QLineF(
mid, close,
index + bar_w - bar_gap, close,
)
return [hl, o, c]
class BarItems(pg.GraphicsObject):
class BarItems(FlowGraphic):
'''
"Price range" bars graphics rendered from a OHLC sampled sequence.
@ -113,13 +119,24 @@ class BarItems(pg.GraphicsObject):
self.last_bar_pen = pg.mkPen(hcolor(last_bar_color), width=2)
self._name = name
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
self.path = QPainterPath()
self._last_bar_lines: Optional[tuple[QLineF, ...]] = None
# XXX: causes this weird jitter bug when click-drag panning
# where the path curve will awkwardly flicker back and forth?
# self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
def x_uppx(self) -> int:
# we expect the downsample curve report this.
return 0
self.path = QPainterPath()
self._last_bar_lines: tuple[QLineF, ...] | None = None
def x_last(self) -> None | float:
'''
Return the last most x value of the close line segment
or if not drawn yet, ``None``.
'''
if self._last_bar_lines:
close_arm_line = self._last_bar_lines[-1]
return close_arm_line.x2() if close_arm_line else None
else:
return None
# Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
def boundingRect(self):
@ -214,33 +231,40 @@ class BarItems(pg.GraphicsObject):
self,
path: QPainterPath,
src_data: np.ndarray,
render_data: np.ndarray,
reset: bool,
array_key: str,
fields: list[str] = [
'index',
'open',
'high',
'low',
'close',
],
index_field: str,
) -> None:
# relevant fields
fields: list[str] = [
'open',
'high',
'low',
'close',
index_field,
]
ohlc = src_data[fields]
# last_row = ohlc[-1:]
# individual values
last_row = i, o, h, l, last = ohlc[-1]
last_row = o, h, l, last, i = ohlc[-1]
# times = src_data['time']
# if times[-1] - times[-2]:
# breakpoint()
index = src_data[index_field]
step_size = index[-1] - index[-2]
# generate new lines objects for updatable "current bar"
self._last_bar_lines = bar_from_ohlc_row(last_row)
bg: float = 0.16 * step_size
self._last_bar_lines = bar_from_ohlc_row(
last_row,
bar_w=step_size,
bar_gap=bg,
)
# assert i == graphics.start_index - 1
# assert i == last_index
@ -255,10 +279,16 @@ class BarItems(pg.GraphicsObject):
if l != h: # noqa
if body is None:
body = self._last_bar_lines[0] = QLineF(i, l, i, h)
body = self._last_bar_lines[0] = QLineF(
i + bg, l,
i + step_size - bg, h,
)
else:
# update body
body.setLine(i, l, i, h)
body.setLine(
body.x1(), l,
body.x2(), h,
)
# XXX: pretty sure this is causing an issue where the
# bar has a large upward move right before the next
@ -270,4 +300,4 @@ class BarItems(pg.GraphicsObject):
# because i've seen it do this to bars i - 3 back?
# return ohlc['time'], ohlc['close']
return ohlc['index'], ohlc['close']
return ohlc[index_field], ohlc['close']

View File

@ -54,6 +54,10 @@ def _do_overrides() -> None:
pg.functions.invertQTransform = invertQTransform
pg.PlotItem = PlotItem
# enable "QPainterPathPrivate for faster arrayToQPath" from
# https://github.com/pyqtgraph/pyqtgraph/pull/2324
pg.setConfigOption('enableExperimental', True)
# NOTE: the below customized type contains all our changes on a method
# by method basis as per the diff:

332
piker/ui/_render.py 100644
View File

@ -0,0 +1,332 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level streaming graphics primitives.
This is an intermediate layer which associates real-time low latency
graphics primitives with underlying stream/flow related data structures
for fast incremental update.
'''
from __future__ import annotations
from typing import (
Optional,
TYPE_CHECKING,
)
import msgspec
import numpy as np
import pyqtgraph as pg
from PyQt5.QtGui import QPainterPath
from ..data._formatters import (
IncrementalFormatter,
)
from ..data._pathops import (
xy_downsample,
)
from ..log import get_logger
from .._profile import (
Profiler,
)
if TYPE_CHECKING:
from ._dataviz import Viz
log = get_logger(__name__)
class Renderer(msgspec.Struct):
viz: Viz
fmtr: IncrementalFormatter
# output graphics rendering, the main object
# processed in ``QGraphicsObject.paint()``
path: Optional[QPainterPath] = None
fast_path: Optional[QPainterPath] = None
# XXX: just ideas..
# called on the final data (transform) output to convert
# to "graphical data form" a format that can be passed to
# the ``.draw()`` implementation.
# graphics_t: Optional[Callable[ShmArray, np.ndarray]] = None
# graphics_t_shm: Optional[ShmArray] = None
# path graphics update implementation methods
# prepend_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# append_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# downsampling state
_last_uppx: float = 0
_in_ds: bool = False
def draw_path(
self,
x: np.ndarray,
y: np.ndarray,
connect: str | np.ndarray = 'all',
path: Optional[QPainterPath] = None,
redraw: bool = False,
) -> QPainterPath:
path_was_none = path is None
if redraw and path:
path.clear()
# TODO: avoid this?
if self.fast_path:
self.fast_path.clear()
path = pg.functions.arrayToQPath(
x,
y,
connect=connect,
finiteCheck=False,
# reserve mem allocs see:
# - https://doc.qt.io/qt-5/qpainterpath.html#reserve
# - https://doc.qt.io/qt-5/qpainterpath.html#capacity
# - https://doc.qt.io/qt-5/qpainterpath.html#clear
# XXX: right now this is based on had hoc checks on a
# hidpi 3840x2160 4k monitor but we should optimize for
# the target display(s) on the sys.
# if no_path_yet:
# graphics.path.reserve(int(500e3))
# path=path, # path re-use / reserving
)
# avoid mem allocs if possible
if path_was_none:
path.reserve(path.capacity())
return path
def render(
self,
new_read,
array_key: str,
profiler: Profiler,
uppx: float = 1,
# redraw and ds flags
should_redraw: bool = False,
new_sample_rate: bool = False,
should_ds: bool = False,
showing_src_data: bool = True,
do_append: bool = True,
use_fpath: bool = True,
# only render datums "in view" of the ``ChartView``
use_vr: bool = True,
) -> tuple[QPainterPath, bool]:
'''
Render the current graphics path(s)
There are (at least) 3 stages from source data to graphics data:
- a data transform (which can be stored in additional shm)
- a graphics transform which converts discrete basis data to
a `float`-basis view-coords graphics basis. (eg. ``ohlc_flatten()``,
``step_path_arrays_from_1d()``, etc.)
- blah blah blah (from notes)
'''
# TODO: can the renderer just call ``Viz.read()`` directly?
# unpack latest source data read
fmtr = self.fmtr
(
_,
_,
array,
ivl,
ivr,
in_view,
) = new_read
# xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine.
fmt_out = fmtr.format_to_1d(
new_read,
array_key,
profiler,
slice_to_inview=use_vr,
)
# no history in view case
if not fmt_out:
# XXX: this might be why the profiler only has exits?
return
(
x_1d,
y_1d,
connect,
prepend_length,
append_length,
view_changed,
# append_tres,
) = fmt_out
# redraw conditions
if (
prepend_length > 0
or new_sample_rate
or view_changed
# NOTE: comment this to try and make "append paths"
# work below..
or append_length > 0
):
should_redraw = True
path: QPainterPath = self.path
fast_path: QPainterPath = self.fast_path
reset: bool = False
self.viz.yrange = None
# redraw the entire source data if we have either of:
# - no prior path graphic rendered or,
# - we always intend to re-render the data only in view
if (
path is None
or should_redraw
):
# print(f"{self.viz.name} -> REDRAWING BRUH")
if new_sample_rate and showing_src_data:
log.info(f'DEDOWN -> {array_key}')
self._in_ds = False
elif should_ds and uppx > 1:
x_1d, y_1d, ymn, ymx = xy_downsample(
x_1d,
y_1d,
uppx,
)
self.viz.yrange = ymn, ymx
# print(f'{self.viz.name} post ds: ymn, ymx: {ymn},{ymx}')
reset = True
profiler(f'FULL PATH downsample redraw={should_ds}')
self._in_ds = True
path = self.draw_path(
x=x_1d,
y=y_1d,
connect=connect,
path=path,
redraw=True,
)
profiler(
'generated fresh path. '
f'(should_redraw: {should_redraw} '
f'should_ds: {should_ds} new_sample_rate: {new_sample_rate})'
)
# TODO: get this piecewise prepend working - right now it's
# giving heck on vwap...
# elif prepend_length:
# prepend_path = pg.functions.arrayToQPath(
# x[0:prepend_length],
# y[0:prepend_length],
# connect='all'
# )
# # swap prepend path in "front"
# old_path = graphics.path
# graphics.path = prepend_path
# # graphics.path.moveTo(new_x[0], new_y[0])
# graphics.path.connectPath(old_path)
elif (
append_length > 0
and do_append
):
print(f'{array_key} append len: {append_length}')
# new_x = x_1d[-append_length - 2:] # slice_to_head]
# new_y = y_1d[-append_length - 2:] # slice_to_head]
profiler('sliced append path')
# (
# x_1d,
# y_1d,
# connect,
# ) = append_tres
profiler(
f'diffed array input, append_length={append_length}'
)
# if should_ds and uppx > 1:
# new_x, new_y = xy_downsample(
# new_x,
# new_y,
# uppx,
# )
# profiler(f'fast path downsample redraw={should_ds}')
append_path = self.draw_path(
x=x_1d,
y=y_1d,
connect=connect,
path=fast_path,
)
profiler('generated append qpath')
if use_fpath:
# print(f'{self.viz.name}: FAST PATH')
# an attempt at trying to make append-updates faster..
if fast_path is None:
fast_path = append_path
# fast_path.reserve(int(6e3))
else:
fast_path.connectPath(append_path)
size = fast_path.capacity()
profiler(f'connected fast path w size: {size}')
print(
f"append_path br: {append_path.boundingRect()}\n"
f"path size: {size}\n"
f"append_path len: {append_path.length()}\n"
f"fast_path len: {fast_path.length()}\n"
)
# graphics.path.moveTo(new_x[0], new_y[0])
# path.connectPath(append_path)
# XXX: lol this causes a hang..
# graphics.path = graphics.path.simplified()
else:
size = path.capacity()
profiler(f'connected history path w size: {size}')
path.connectPath(append_path)
self.path = path
self.fast_path = fast_path
return self.path, reset

View File

@ -494,7 +494,7 @@ class OrderMode:
uuid: str,
price: float,
arrow_index: float,
time_s: float,
pointing: Optional[str] = None,
@ -513,22 +513,32 @@ class OrderMode:
'''
dialog = self.dialogs[uuid]
lines = dialog.lines
chart = self.chart
# XXX: seems to fail on certain types of races?
# assert len(lines) == 2
if lines:
flume: Flume = self.feed.flumes[self.chart.linked.symbol.fqsn]
flume: Flume = self.feed.flumes[chart.linked.symbol.fqsn]
_, _, ratio = flume.get_ds_info()
for i, chart in [
(arrow_index, self.chart),
(flume.izero_hist
+
round((arrow_index - flume.izero_rt)/ratio),
self.hist_chart)
for chart, shm in [
(self.chart, flume.rt_shm),
(self.hist_chart, flume.hist_shm),
]:
viz = chart.get_viz(chart.name)
index_field = viz.index_field
arr = shm.array
# TODO: borked for int index based..
index = flume.get_index(time_s, arr)
# get absolute index for arrow placement
arrow_index = arr[index_field][index]
self.arrows.add(
chart.plotItem,
uuid,
i,
arrow_index,
price,
pointing=pointing,
color=lines[0].color
@ -966,7 +976,6 @@ async def process_trade_msg(
if dialog:
fqsn = dialog.symbol
flume = mode.feed.flumes[fqsn]
match msg:
case Status(
@ -1037,11 +1046,11 @@ async def process_trade_msg(
# should only be one "fill" for an alert
# add a triangle and remove the level line
req = Order(**req)
index = flume.get_index(time.time())
tm = time.time()
mode.on_fill(
oid,
price=req.price,
arrow_index=index,
time_s=tm,
)
mode.lines.remove_line(uuid=oid)
msg.req = req
@ -1070,6 +1079,8 @@ async def process_trade_msg(
details = msg.brokerd_msg
# TODO: put the actual exchange timestamp?
# TODO: some kinda progress system?
# NOTE: currently the ``kraken`` openOrders sub
# doesn't deliver their engine timestamp as part of
# it's schema, so this value is **not** from them
@ -1080,15 +1091,11 @@ async def process_trade_msg(
# a true backend one? This will require finagling
# with how each backend tracks/summarizes time
# stamps for the downstream API.
index = flume.get_index(
details['broker_time']
)
# TODO: some kinda progress system
tm = details['broker_time']
mode.on_fill(
oid,
price=details['price'],
arrow_index=index,
time_s=tm,
pointing='up' if action == 'buy' else 'down',
)

View File

@ -1,3 +0,0 @@
"""
Super hawt Qt UI components
"""

View File

@ -1,67 +0,0 @@
import sys
from PySide2.QtCharts import QtCharts
from PySide2.QtWidgets import QApplication, QMainWindow
from PySide2.QtCore import Qt, QPointF
from PySide2 import QtGui
import qdarkstyle
data = ((1, 7380, 7520, 7380, 7510, 7324),
(2, 7520, 7580, 7410, 7440, 7372),
(3, 7440, 7650, 7310, 7520, 7434),
(4, 7450, 7640, 7450, 7550, 7480),
(5, 7510, 7590, 7460, 7490, 7502),
(6, 7500, 7590, 7480, 7560, 7512),
(7, 7560, 7830, 7540, 7800, 7584))
app = QApplication([])
# set dark stylesheet
# import pdb; pdb.set_trace()
app.setStyleSheet(qdarkstyle.load_stylesheet_pyside())
series = QtCharts.QCandlestickSeries()
series.setDecreasingColor(Qt.darkRed)
series.setIncreasingColor(Qt.darkGreen)
ma5 = QtCharts.QLineSeries() # 5-days average data line
tm = [] # stores str type data
# in a loop, series and ma5 append corresponding data
for num, o, h, l, c, m in data:
candle = QtCharts.QCandlestickSet(o, h, l, c)
series.append(candle)
ma5.append(QPointF(num, m))
tm.append(str(num))
pen = candle.pen()
# import pdb; pdb.set_trace()
chart = QtCharts.QChart()
# import pdb; pdb.set_trace()
series.setBodyOutlineVisible(False)
series.setCapsVisible(False)
# brush = QtGui.QBrush()
# brush.setColor(Qt.green)
# series.setBrush(brush)
chart.addSeries(series) # candle
chart.addSeries(ma5) # ma5 line
chart.setAnimationOptions(QtCharts.QChart.SeriesAnimations)
chart.createDefaultAxes()
chart.legend().hide()
chart.axisX(series).setCategories(tm)
chart.axisX(ma5).setVisible(False)
view = QtCharts.QChartView(chart)
view.chart().setTheme(QtCharts.QChart.ChartTheme.ChartThemeDark)
view.setRubberBand(QtCharts.QChartView.HorizontalRubberBand)
# chartview.chart().setTheme(QtCharts.QChart.ChartTheme.ChartThemeBlueCerulean)
ui = QMainWindow()
# ui.setGeometry(50, 50, 500, 300)
ui.setCentralWidget(view)
ui.show()
sys.exit(app.exec_())