Merge pull request #289 from pikers/big_data_lines

"Big data" lines
no_orderid_in_error
goodboy 2022-04-30 11:37:50 -04:00 committed by GitHub
commit 84399e8131
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1979 additions and 580 deletions

View File

@ -35,7 +35,7 @@ log = get_logger(__name__)
_root_dname = 'pikerd'
_registry_addr = ('127.0.0.1', 6116)
_registry_addr = ('127.0.0.1', 1616)
_tractor_kwargs: dict[str, Any] = {
# use a different registry addr then tractor's default
'arbiter_addr': _registry_addr
@ -91,13 +91,17 @@ class Services(BaseModel):
log.info(
f'`pikerd` service {name} started with value {first}'
)
try:
# wait on any context's return value
ctx_res = await ctx.result()
except tractor.ContextCancelled:
return await self.cancel_service(name)
else:
# wait on any error from the sub-actor
# NOTE: this will block indefinitely until cancelled
# either by error from the target context function or by
# being cancelled here by the surrounding cancel scope
# NOTE: this will block indefinitely until
# cancelled either by error from the target
# context function or by being cancelled here by
# the surrounding cancel scope
return (await portal.result(), ctx_res)
cs, first = await self.service_n.start(open_context_in_task)
@ -110,14 +114,17 @@ class Services(BaseModel):
# TODO: per service cancellation by scope, we aren't using this
# anywhere right?
# async def cancel_service(
# self,
# name: str,
# ) -> Any:
# log.info(f'Cancelling `pikerd` service {name}')
# cs, portal = self.service_tasks[name]
# cs.cancel()
# return await portal.cancel_actor()
async def cancel_service(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, portal = self.service_tasks[name]
# XXX: not entirely sure why this is required,
# and should probably be better fine tuned in
# ``tractor``?
cs.cancel()
return await portal.cancel_actor()
_services: Optional[Services] = None
@ -372,6 +379,7 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
async def spawn_brokerd(

View File

@ -21,7 +21,10 @@ Profiling wrappers for internal libs.
import time
from functools import wraps
_pg_profile: bool = True
# NOTE: you can pass a flag to enable this:
# ``piker chart <args> --profile``.
_pg_profile: bool = False
ms_slower_then: float = 0
def pg_profile_enabled() -> bool:

View File

@ -178,7 +178,9 @@ class Allocator(BaseModel):
l_sub_pp = (self.currency_limit - live_cost_basis) / price
else:
raise ValueError(f"Not valid size unit '{size}'")
raise ValueError(
f"Not valid size unit '{size_unit}'"
)
# an entry (adding-to or starting a pp)
if (
@ -282,6 +284,14 @@ class Allocator(BaseModel):
return round(prop * self.slots)
_derivs = (
'future',
'continuous_future',
'option',
'futures_option',
)
def mk_allocator(
symbol: Symbol,
@ -290,7 +300,7 @@ def mk_allocator(
# default allocation settings
defaults: dict[str, float] = {
'account': None, # select paper by default
'size_unit': 'currency', #_size_units['currency'],
'size_unit': 'currency',
'units_limit': 400,
'currency_limit': 5e3,
'slots': 4,
@ -318,11 +328,9 @@ def mk_allocator(
asset_type = symbol.type_key
# specific configs by asset class / type
if asset_type in ('future', 'option', 'futures_option'):
if asset_type in _derivs:
# since it's harder to know how currency "applies" in this case
# given leverage properties
alloc.size_unit = '# units'
@ -345,7 +353,7 @@ def mk_allocator(
if startup_size > alloc.units_limit:
alloc.units_limit = startup_size
if asset_type in ('future', 'option', 'futures_option'):
if asset_type in _derivs:
alloc.slots = alloc.units_limit
return alloc

View File

@ -261,7 +261,15 @@ async def clear_dark_triggers(
f'pred for {oid} was already removed!?'
)
try:
await ems_client_order_stream.send(msg)
except (
trio.ClosedResourceError,
):
log.warning(
f'client {ems_client_order_stream} stream is broke'
)
break
else: # condition scan loop complete
log.debug(f'execs are {execs}')
@ -573,8 +581,16 @@ async def translate_and_relay_brokerd_events(
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients:
for client_stream in router.clients.copy():
try:
await client_stream.send(pos_msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
continue

View File

@ -261,7 +261,10 @@ async def cascade(
destination shm array buffer.
'''
profiler = pg.debug.Profiler(delayed=False, disabled=False)
profiler = pg.debug.Profiler(
delayed=False,
disabled=False
)
if loglevel:
get_console_log(loglevel)

View File

@ -25,39 +25,10 @@ from PyQt5.QtCore import QPointF
from PyQt5.QtWidgets import QGraphicsPathItem
if TYPE_CHECKING:
from ._axes import PriceAxis
from ._chart import ChartPlotWidget
from ._label import Label
def marker_right_points(
chart: ChartPlotWidget, # noqa
marker_size: int = 20,
) -> (float, float, float):
'''
Return x-dimension, y-axis-aware, level-line marker oriented scene
values.
X values correspond to set the end of a level line, end of
a paried level line marker, and the right most side of the "right"
axis respectively.
'''
# TODO: compute some sensible maximum value here
# and use a humanized scheme to limit to that length.
l1_len = chart._max_l1_line_len
ryaxis = chart.getAxis('right')
r_axis_x = ryaxis.pos().x()
up_to_l1_sc = r_axis_x - l1_len - 10
marker_right = up_to_l1_sc - (1.375 * 2 * marker_size)
line_end = marker_right - (6/16 * marker_size)
return line_end, marker_right, r_axis_x
def vbr_left(
label: Label,

View File

@ -26,8 +26,6 @@ from PyQt5.QtWidgets import QGraphicsPathItem
from pyqtgraph import Point, functions as fn, Color
import numpy as np
from ._anchors import marker_right_points
def mk_marker_path(
@ -116,7 +114,7 @@ class LevelMarker(QGraphicsPathItem):
self.get_level = get_level
self._on_paint = on_paint
self.scene_x = lambda: marker_right_points(chart)[1]
self.scene_x = lambda: chart.marker_right_points()[1]
self.level: float = 0
self.keep_in_view = keep_in_view
@ -169,7 +167,7 @@ class LevelMarker(QGraphicsPathItem):
vr = view.state['viewRange']
ymn, ymx = vr[1]
# _, marker_right, _ = marker_right_points(line._chart)
# _, marker_right, _ = line._chart.marker_right_points()
x = self.scene_x()
if self.style == '>|': # short style, points "down-to" line

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -22,7 +22,11 @@ from __future__ import annotations
from typing import Optional, TYPE_CHECKING
from PyQt5 import QtCore, QtWidgets
from PyQt5.QtCore import Qt
from PyQt5.QtCore import (
Qt,
QLineF,
# QPointF,
)
from PyQt5.QtWidgets import (
QFrame,
QWidget,
@ -30,10 +34,11 @@ from PyQt5.QtWidgets import (
QVBoxLayout,
QSplitter,
)
import msgspec
import numpy as np
# from pydantic import BaseModel
import pyqtgraph as pg
import trio
from pydantic import BaseModel
from ._axes import (
DynamicDateAxis,
@ -52,15 +57,17 @@ from ._style import (
CHART_MARGINS,
_xaxis_at,
_min_points_to_show,
_bars_from_right_in_follow_mode,
_bars_to_left_in_follow_mode,
)
from ..data.feed import Feed
from ..data._source import Symbol
from ..data._sharedmem import ShmArray
from ..data._sharedmem import (
ShmArray,
# _Token,
)
from ..log import get_logger
from ._interaction import ChartView
from ._forms import FieldsForm
from .._profile import pg_profile_enabled, ms_slower_then
from ._overlay import PlotItemOverlay
if TYPE_CHECKING:
@ -238,6 +245,12 @@ class GodWidget(QWidget):
# resume feeds *after* rendering chart view asap
chart.resume_all_feeds()
# TODO: we need a check to see if the chart
# last had the xlast in view, if so then shift so it's
# still in view, if the user was viewing history then
# do nothing yah?
chart.default_view()
self.linkedsplits = linkedsplits
symbol = linkedsplits.symbol
if symbol is not None:
@ -376,12 +389,15 @@ class LinkedSplits(QWidget):
'''
ln = len(self.subplots)
if not prop:
# proportion allocated to consumer subcharts
if ln < 2:
prop = 1/3
elif ln >= 2:
prop = 3/8
if not prop:
prop = 3/8*5/8
# if ln < 2:
# prop = 3/8*5/8
# elif ln >= 2:
# prop = 3/8
major = 1 - prop
min_h_ind = int((self.height() * prop) / ln)
@ -642,31 +658,6 @@ class LinkedSplits(QWidget):
cpw.sidepane.setMaximumWidth(sp_w)
# class FlowsTable(pydantic.BaseModel):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``Flow``s with high level processing routines for
# multi-graphics computations and display.
# '''
# flows: dict[str, np.ndarray] = {}
class Flow(BaseModel):
'''
(FinancialSignal-)Flow compound type which wraps a real-time
graphics (curve) and its backing data stream together for high level
access and control.
'''
class Config:
arbitrary_types_allowed = True
name: str
plot: pg.PlotItem
shm: Optional[ShmArray] = None # may be filled in "later"
class ChartPlotWidget(pg.PlotWidget):
'''
``GraphicsView`` subtype containing a single ``PlotItem``.
@ -821,17 +812,72 @@ class ChartPlotWidget(pg.PlotWidget):
return int(vr.left()), int(vr.right())
def bars_range(self) -> tuple[int, int, int, int]:
"""Return a range tuple for the bars present in view.
"""
'''
Return a range tuple for the bars present in view.
'''
l, r = self.view_range()
array = self._arrays[self.name]
lbar = max(l, array[0]['index'])
rbar = min(r, array[-1]['index'])
start, stop = self._xrange = (
array[0]['index'],
array[-1]['index'],
)
lbar = max(l, start)
rbar = min(r, stop)
return l, lbar, rbar, r
def curve_width_pxs(
self,
) -> float:
_, lbar, rbar, _ = self.bars_range()
return self.view.mapViewToDevice(
QLineF(lbar, 0, rbar, 0)
).length()
def pre_l1_xs(self) -> tuple[float, float]:
'''
Return the view x-coord for the value just before
the L1 labels on the y-axis as well as the length
of that L1 label from the y-axis.
'''
line_end, marker_right, yaxis_x = self.marker_right_points()
view = self.view
line = view.mapToView(
QLineF(line_end, 0, yaxis_x, 0)
)
return line.x1(), line.length()
def marker_right_points(
self,
marker_size: int = 20,
) -> (float, float, float):
'''
Return x-dimension, y-axis-aware, level-line marker oriented scene
values.
X values correspond to set the end of a level line, end of
a paried level line marker, and the right most side of the "right"
axis respectively.
'''
# TODO: compute some sensible maximum value here
# and use a humanized scheme to limit to that length.
l1_len = self._max_l1_line_len
ryaxis = self.getAxis('right')
r_axis_x = ryaxis.pos().x()
up_to_l1_sc = r_axis_x - l1_len - 10
marker_right = up_to_l1_sc - (1.375 * 2 * marker_size)
line_end = marker_right - (6/16 * marker_size)
return line_end, marker_right, r_axis_x
def default_view(
self,
index: int = -1,
steps_on_screen: Optional[int] = None
) -> None:
'''
@ -839,13 +885,38 @@ class ChartPlotWidget(pg.PlotWidget):
'''
try:
xlast = self._arrays[self.name][index]['index']
index = self._arrays[self.name]['index']
except IndexError:
log.warning(f'array for {self.name} not loaded yet?')
return
begin = xlast - _bars_to_left_in_follow_mode
end = xlast + _bars_from_right_in_follow_mode
xfirst, xlast = index[0], index[-1]
l, lbar, rbar, r = self.bars_range()
marker_pos, l1_len = self.pre_l1_xs()
end = xlast + l1_len + 1
if (
rbar < 0
or l < xfirst
or (rbar - lbar) < 6
):
# set fixed bars count on screen that approx includes as
# many bars as possible before a downsample line is shown.
begin = xlast - round(6116 / 6)
else:
begin = end - (r - l)
# for debugging
# print(
# f'bars range: {brange}\n'
# f'xlast: {xlast}\n'
# f'marker pos: {marker_pos}\n'
# f'l1 len: {l1_len}\n'
# f'begin: {begin}\n'
# f'end: {end}\n'
# )
# remove any custom user yrange setttings
if self._static_yrange == 'axis':
@ -858,10 +929,16 @@ class ChartPlotWidget(pg.PlotWidget):
padding=0,
)
view._set_yrange()
self.view.maybe_downsample_graphics()
try:
self.linked.graphics_cycle()
except IndexError:
pass
def increment_view(
self,
steps: int = 1,
vb: Optional[ChartView] = None,
) -> None:
"""
@ -870,7 +947,8 @@ class ChartPlotWidget(pg.PlotWidget):
"""
l, r = self.view_range()
self.view.setXRange(
view = vb or self.view
view.setXRange(
min=l + steps,
max=r + steps,
@ -892,8 +970,10 @@ class ChartPlotWidget(pg.PlotWidget):
'''
graphics = BarItems(
self.linked,
self.plotItem,
pen_color=self.pen_color
pen_color=self.pen_color,
name=name,
)
# adds all bar/candle graphics objects for each data point in
@ -905,6 +985,14 @@ class ChartPlotWidget(pg.PlotWidget):
data_key = array_key or name
self._graphics[data_key] = graphics
self._flows[data_key] = Flow(
name=name,
plot=self.plotItem,
is_ohlc=True,
graphics=graphics,
)
self._add_sticky(name, bg_color='davies')
return graphics, data_key
@ -945,6 +1033,7 @@ class ChartPlotWidget(pg.PlotWidget):
)
pi.hideButtons()
# cv.enable_auto_yrange(self.view)
cv.enable_auto_yrange()
# compose this new plot's graphics with the current chart's
@ -975,6 +1064,7 @@ class ChartPlotWidget(pg.PlotWidget):
overlay: bool = False,
color: Optional[str] = None,
add_label: bool = True,
pi: Optional[pg.PlotItem] = None,
**pdi_kwargs,
@ -1002,12 +1092,6 @@ class ChartPlotWidget(pg.PlotWidget):
# on data reads and makes graphics rendering no faster
# clipToView=True,
# TODO: see how this handles with custom ohlcv bars graphics
# and/or if we can implement something similar for OHLC graphics
# autoDownsample=True,
# downsample=60,
# downsampleMethod='subsample',
**pdi_kwargs,
)
@ -1025,7 +1109,14 @@ class ChartPlotWidget(pg.PlotWidget):
self._graphics[name] = curve
self._arrays[data_key] = data
pi = self.plotItem
pi = pi or self.plotItem
self._flows[data_key] = Flow(
name=name,
plot=pi,
is_ohlc=False,
graphics=curve,
)
# TODO: this probably needs its own method?
if overlay:
@ -1035,10 +1126,6 @@ class ChartPlotWidget(pg.PlotWidget):
f'{overlay} must be from `.plotitem_overlay()`'
)
pi = overlay
# anchor_at = ('bottom', 'left')
self._flows[name] = Flow(name=name, plot=pi)
else:
# anchor_at = ('top', 'left')
@ -1046,7 +1133,17 @@ class ChartPlotWidget(pg.PlotWidget):
# (we need something that avoids clutter on x-axis).
self._add_sticky(name, bg_color=color)
# NOTE: this is more or less the RENDER call that tells Qt to
# start showing the generated graphics-curves. This is kind of
# of edge-triggered call where once added any
# ``QGraphicsItem.update()`` calls are automatically displayed.
# Our internal graphics objects have their own "update from
# data" style method API that allows for real-time updates on
# the next render cycle; just note a lot of the real-time
# updates are implicit and require a bit of digging to
# understand.
pi.addItem(curve)
return curve, data_key
# TODO: make this a ctx mngr
@ -1078,11 +1175,16 @@ class ChartPlotWidget(pg.PlotWidget):
)
return last
def update_ohlc_from_array(
def update_graphics_from_array(
self,
graphics_name: str,
array: np.ndarray,
array: Optional[np.ndarray] = None,
array_key: Optional[str] = None,
use_vr: bool = True,
render: bool = True,
**kwargs,
) -> pg.GraphicsObject:
@ -1090,49 +1192,63 @@ class ChartPlotWidget(pg.PlotWidget):
Update the named internal graphics from ``array``.
'''
self._arrays[self.name] = array
if array is not None:
assert len(array)
data_key = array_key or graphics_name
if graphics_name not in self._flows:
data_key = self.name
if array is not None:
# write array to internal graphics table
self._arrays[data_key] = array
else:
array = self._arrays[data_key]
# array key and graphics "name" might be different..
graphics = self._graphics[graphics_name]
graphics.update_from_array(array, **kwargs)
# compute "in-view" indices
l, lbar, rbar, r = self.bars_range()
indexes = array['index']
ifirst = indexes[0]
ilast = indexes[-1]
lbar_i = max(l, ifirst) - ifirst
rbar_i = min(r, ilast) - ifirst
# TODO: we could do it this way as well no?
# to_draw = array[lbar - ifirst:(rbar - ifirst) + 1]
in_view = array[lbar_i: rbar_i + 1]
if (
not in_view.size
or not render
):
return graphics
def update_curve_from_array(
self,
if isinstance(graphics, BarItems):
graphics.update_from_array(
array,
in_view,
view_range=(lbar_i, rbar_i) if use_vr else None,
graphics_name: str,
array: np.ndarray,
array_key: Optional[str] = None,
**kwargs,
)
) -> pg.GraphicsObject:
'''
Update the named internal graphics from ``array``.
'''
assert len(array)
data_key = array_key or graphics_name
if graphics_name not in self._flows:
self._arrays[self.name] = array
else:
self._arrays[data_key] = array
curve = self._graphics[graphics_name]
# NOTE: back when we weren't implementing the curve graphics
# ourselves you'd have updates using this method:
# curve.setData(y=array[graphics_name], x=array['index'], **kwargs)
# NOTE: graphics **must** implement a diff based update
# operation where an internal ``FastUpdateCurve._xrange`` is
# used to determine if the underlying path needs to be
# pre/ap-pended.
curve.update_from_array(
graphics.update_from_array(
x=array['index'],
y=array[data_key],
x_iv=in_view['index'],
y_iv=in_view[data_key],
view_range=(lbar_i, rbar_i) if use_vr else None,
**kwargs
)
return curve
return graphics
# def _label_h(self, yhigh: float, ylow: float) -> float:
# # compute contents label "height" in view terms
@ -1163,6 +1279,9 @@ class ChartPlotWidget(pg.PlotWidget):
# print(f"bounds (ylow, yhigh): {(ylow, yhigh)}")
# TODO: pretty sure we can just call the cursor
# directly not? i don't wee why we need special "signal proxies"
# for this lul..
def enterEvent(self, ev): # noqa
# pg.PlotWidget.enterEvent(self, ev)
self.sig_mouse_enter.emit(self)
@ -1187,6 +1306,22 @@ class ChartPlotWidget(pg.PlotWidget):
else:
return ohlc['index'][-1]
def in_view(
self,
array: np.ndarray,
) -> np.ndarray:
'''
Slice an input struct array providing only datums
"in view" of this chart.
'''
l, lbar, rbar, r = self.bars_range()
ifirst = array[0]['index']
# slice data by offset from the first index
# available in the passed datum set.
return array[lbar - ifirst:(rbar - ifirst) + 1]
def maxmin(
self,
name: Optional[str] = None,
@ -1199,46 +1334,131 @@ class ChartPlotWidget(pg.PlotWidget):
If ``bars_range`` is provided use that range.
'''
l, lbar, rbar, r = bars_range or self.bars_range()
# TODO: logic to check if end of bars in view
# extra = view_len - _min_points_to_show
# begin = self._arrays['ohlc'][0]['index'] - extra
# # end = len(self._arrays['ohlc']) - 1 + extra
# end = self._arrays['ohlc'][-1]['index'] - 1 + extra
profiler = pg.debug.Profiler(
msg=f'`{str(self)}.maxmin()` loop cycle for: `{self.name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
delayed=True,
)
# bars_len = rbar - lbar
# log.debug(
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
# f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}"
# )
l, lbar, rbar, r = bars_range or self.bars_range()
profiler(f'{self.name} got bars range')
# TODO: here we should instead look up the ``Flow.shm.array``
# and read directly from shm to avoid copying to memory first
# and then reading it again here.
a = self._arrays.get(name or self.name)
if a is None:
return None
ifirst = a[0]['index']
bars = a[lbar - ifirst:(rbar - ifirst) + 1]
if not len(bars):
# likely no data loaded yet or extreme scrolling?
log.error(f"WTF bars_range = {lbar}:{rbar}")
return
flow_key = name or self.name
flow = self._flows.get(flow_key)
if (
self.data_key == self.linked.symbol.key
flow is None
):
# ohlc sampled bars hi/lo lookup
ylow = np.nanmin(bars['low'])
yhigh = np.nanmax(bars['high'])
log.error(f"flow {flow_key} doesn't exist in chart {self.name} !?")
res = 0, 0
else:
view = bars[name or self.data_key]
ylow = np.nanmin(view)
yhigh = np.nanmax(view)
key = round(lbar), round(rbar)
res = flow.maxmin(*key)
profiler(f'yrange mxmn: {key} -> {res}')
if res == (None, None):
log.error(
f"{flow_key} no mxmn for bars_range => {key} !?"
)
res = 0, 0
# print(f'{(ylow, yhigh)}')
return ylow, yhigh
return res
# class FlowsTable(pydantic.BaseModel):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``Flow``s with high level processing routines for
# multi-graphics computations and display.
# '''
# flows: dict[str, np.ndarray] = {}
class Flow(msgspec.Struct): # , frozen=True):
'''
(FinancialSignal-)Flow compound type which wraps a real-time
graphics (curve) and its backing data stream together for high level
access and control.
The intention is for this type to eventually be capable of shm-passing
of incrementally updated graphics stream data between actors.
'''
name: str
plot: pg.PlotItem
is_ohlc: bool = False
graphics: pg.GraphicsObject
# TODO: hackery to be able to set a shm later
# but whilst also allowing this type to hashable,
# likely will require serializable token that is used to attach
# to the underlying shm ref after startup?
_shm: Optional[ShmArray] = None # currently, may be filled in "later"
# cache of y-range values per x-range input.
_mxmns: dict[tuple[int, int], tuple[float, float]] = {}
@property
def shm(self) -> ShmArray:
return self._shm
@shm.setter
def shm(self, shm: ShmArray) -> ShmArray:
self._shm = shm
def maxmin(
self,
lbar,
rbar,
) -> tuple[float, float]:
'''
Compute the cached max and min y-range values for a given
x-range determined by ``lbar`` and ``rbar``.
'''
rkey = (lbar, rbar)
cached_result = self._mxmns.get(rkey)
if cached_result:
return cached_result
shm = self.shm
if shm is None:
mxmn = None
else: # new block for profiling?..
arr = shm.array
# build relative indexes into shm array
# TODO: should we just add/use a method
# on the shm to do this?
ifirst = arr[0]['index']
slice_view = arr[
lbar - ifirst:
(rbar - ifirst) + 1
]
if not slice_view.size:
mxmn = None
else:
if self.is_ohlc:
ylow = np.min(slice_view['low'])
yhigh = np.max(slice_view['high'])
else:
view = slice_view[self.name]
ylow = np.min(view)
yhigh = np.max(view)
mxmn = ylow, yhigh
if mxmn is not None:
# cache new mxmn result
self._mxmns[rkey] = mxmn
return mxmn

View File

@ -0,0 +1,351 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Graphics related downsampling routines for compressing to pixel
limits on the display device.
'''
import math
from typing import Optional
import numpy as np
from numpy.lib import recfunctions as rfn
from numba import (
jit,
# float64, optional, int64,
)
from ..log import get_logger
log = get_logger(__name__)
def hl2mxmn(ohlc: np.ndarray) -> np.ndarray:
'''
Convert a OHLC struct-array containing 'high'/'low' columns
to a "joined" max/min 1-d array.
'''
index = ohlc['index']
hls = ohlc[[
'low',
'high',
]]
mxmn = np.empty(2*hls.size, dtype=np.float64)
x = np.empty(2*hls.size, dtype=np.float64)
trace_hl(hls, mxmn, x, index[0])
x = x + index[0]
return mxmn, x
@jit(
# TODO: the type annots..
# float64[:](float64[:],),
nopython=True,
)
def trace_hl(
hl: 'np.ndarray',
out: np.ndarray,
x: np.ndarray,
start: int,
# the "offset" values in the x-domain which
# place the 2 output points around each ``int``
# master index.
margin: float = 0.43,
) -> None:
'''
"Trace" the outline of the high-low values of an ohlc sequence
as a line such that the maximum deviation (aka disperaion) between
bars if preserved.
This routine is expected to modify input arrays in-place.
'''
last_l = hl['low'][0]
last_h = hl['high'][0]
for i in range(hl.size):
row = hl[i]
l, h = row['low'], row['high']
up_diff = h - last_l
down_diff = last_h - l
if up_diff > down_diff:
out[2*i + 1] = h
out[2*i] = last_l
else:
out[2*i + 1] = l
out[2*i] = last_h
last_l = l
last_h = h
x[2*i] = int(i) - margin
x[2*i + 1] = int(i) + margin
return out
def ohlc_flatten(
ohlc: np.ndarray,
use_mxmn: bool = True,
) -> tuple[np.ndarray, np.ndarray]:
'''
Convert an OHLCV struct-array into a flat ready-for-line-plotting
1-d array that is 4 times the size with x-domain values distributed
evenly (by 0.5 steps) over each index.
'''
index = ohlc['index']
if use_mxmn:
# traces a line optimally over highs to lows
# using numba. NOTE: pretty sure this is faster
# and looks about the same as the below output.
flat, x = hl2mxmn(ohlc)
else:
flat = rfn.structured_to_unstructured(
ohlc[['open', 'high', 'low', 'close']]
).flatten()
x = np.linspace(
start=index[0] - 0.5,
stop=index[-1] + 0.5,
num=len(flat),
)
return x, flat
def ohlc_to_m4_line(
ohlc: np.ndarray,
px_width: int,
downsample: bool = False,
uppx: Optional[float] = None,
pretrace: bool = False,
) -> tuple[np.ndarray, np.ndarray]:
'''
Convert an OHLC struct-array to a m4 downsampled 1-d array.
'''
xpts, flat = ohlc_flatten(
ohlc,
use_mxmn=pretrace,
)
if downsample:
bins, x, y = ds_m4(
xpts,
flat,
px_width=px_width,
uppx=uppx,
log_scale=bool(uppx)
)
x = np.broadcast_to(x[:, None], y.shape)
x = (x + np.array([-0.43, 0, 0, 0.43])).flatten()
y = y.flatten()
return x, y
else:
return xpts, flat
def ds_m4(
x: np.ndarray,
y: np.ndarray,
# this is the width of the data in view
# in display-device-local pixel units.
px_width: int,
uppx: Optional[float] = None,
log_scale: bool = True,
) -> tuple[int, np.ndarray, np.ndarray]:
'''
Downsample using the M4 algorithm.
This is more or less an OHLC style sampling of a line-style series.
'''
# NOTE: this method is a so called "visualization driven data
# aggregation" approach. It gives error-free line chart
# downsampling, see
# further scientific paper resources:
# - http://www.vldb.org/pvldb/vol7/p797-jugel.pdf
# - http://www.vldb.org/2014/program/papers/demo/p997-jugel.pdf
# Details on implementation of this algo are based in,
# https://github.com/pikers/piker/issues/109
# XXX: from infinite on downsampling viewable graphics:
# "one thing i remembered about the binning - if you are
# picking a range within your timeseries the start and end bin
# should be one more bin size outside the visual range, then
# you get better visual fidelity at the edges of the graph"
# "i didn't show it in the sample code, but it's accounted for
# in the start and end indices and number of bins"
# optionally log-scale down the "supposed pxs on screen"
# as the units-per-px (uppx) get's large.
if log_scale:
assert uppx, 'You must provide a `uppx` value to use log scaling!'
# scaler = 2**7 / (1 + math.log(uppx, 2))
scaler = round(
max(
# NOTE: found that a 16x px width brought greater
# detail, likely due to dpi scaling?
# px_width=px_width * 16,
2**7 / (1 + math.log(uppx, 2)),
1
)
)
px_width *= scaler
assert px_width > 1 # width of screen in pxs?
# NOTE: if we didn't pre-slice the data to downsample
# you could in theory pass these as the slicing params,
# do we care though since we can always just pre-slice the
# input?
x_start = x[0] # x value start/lowest in domain
x_end = x[-1] # x end value/highest in domain
# XXX: always round up on the input pixels
px_width = math.ceil(px_width)
x_range = x_end - x_start
# ratio of indexed x-value to width of raster in pixels.
# this is more or less, uppx: units-per-pixel.
w = x_range / float(px_width)
# ensure we make more then enough
# frames (windows) for the output pixel
frames = px_width
# if we have more and then exact integer's
# (uniform quotient output) worth of datum-domain-points
# per windows-frame, add one more window to ensure
# we have room for all output down-samples.
pts_per_pixel, r = divmod(len(x), frames)
if r:
frames += 1
# call into ``numba``
nb, i_win, y_out = _m4(
x,
y,
frames,
# TODO: see func below..
# i_win,
# y_out,
# first index in x data to start at
x_start,
# window size for each "frame" of data to downsample (normally
# scaled by the ratio of pixels on screen to data in x-range).
w,
)
# filter out any overshoot in the input allocation arrays by
# removing zero-ed tail entries which should start at a certain
# index.
i_win = i_win[i_win != 0]
y_out = y_out[:i_win.size]
return nb, i_win, y_out
@jit(
nopython=True,
nogil=True,
)
def _m4(
xs: np.ndarray,
ys: np.ndarray,
frames: int,
# TODO: using this approach by having the ``.zeros()`` alloc lines
# below, in put python was causing segs faults and alloc crashes..
# we might need to see how it behaves with shm arrays and consider
# allocating them once at startup?
# pre-alloc array of x indices mapping to the start
# of each window used for downsampling in y.
# i_win: np.ndarray,
# pre-alloc array of output downsampled y values
# y_out: np.ndarray,
x_start: int,
step: float,
) -> int:
# nbins = len(i_win)
# count = len(xs)
# these are pre-allocated and mutated by ``numba``
# code in-place.
y_out = np.zeros((frames, 4), ys.dtype)
i_win = np.zeros(frames, xs.dtype)
bincount = 0
x_left = x_start
# Find the first window's starting value which *includes* the
# first value in the x-domain array, i.e. the first
# "left-side-of-window" **plus** the downsampling step,
# creates a window which includes the first x **value**.
while xs[0] >= x_left + step:
x_left += step
# set all bins in the left-most entry to the starting left-most x value
# (aka a row broadcast).
i_win[bincount] = x_left
# set all y-values to the first value passed in.
y_out[bincount] = ys[0]
for i in range(len(xs)):
x = xs[i]
y = ys[i]
if x < x_left + step: # the current window "step" is [bin, bin+1)
y_out[bincount, 1] = min(y, y_out[bincount, 1])
y_out[bincount, 2] = max(y, y_out[bincount, 2])
y_out[bincount, 3] = y
else:
# Find the next bin
while x >= x_left + step:
x_left += step
bincount += 1
i_win[bincount] = x_left
y_out[bincount] = y
return bincount, i_win, y_out

View File

@ -95,22 +95,24 @@ class LineDot(pg.CurvePoint):
def event(
self,
ev: QtCore.QEvent,
) -> None:
) -> bool:
if not isinstance(
ev, QtCore.QDynamicPropertyChangeEvent
) or self.curve() is None:
return False
# TODO: get rid of this ``.getData()`` and
# make a more pythonic api to retreive backing
# numpy arrays...
(x, y) = self.curve().getData()
index = self.property('index')
# first = self._plot._arrays['ohlc'][0]['index']
# first = x[0]
# i = index - first
if index:
i = index - x[0]
i = round(index - x[0])
if i > 0 and i < len(y):
newPos = (index, y[i])
QtWidgets.QGraphicsItem.setPos(self, *newPos)
@ -405,6 +407,7 @@ class Cursor(pg.GraphicsObject):
slot=self.mouseMoved,
delay=_debounce_delay,
)
px_enter = pg.SignalProxy(
plot.sig_mouse_enter,
rateLimit=_mouse_rate_limit,

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -23,6 +23,7 @@ from typing import Optional
import numpy as np
import pyqtgraph as pg
from PyQt5 import QtGui, QtWidgets
from PyQt5.QtWidgets import QGraphicsItem
from PyQt5.QtCore import (
Qt,
QLineF,
@ -31,8 +32,16 @@ from PyQt5.QtCore import (
QPointF,
)
from .._profile import pg_profile_enabled
from .._profile import pg_profile_enabled, ms_slower_then
from ._style import hcolor
from ._compression import (
# ohlc_to_m4_line,
ds_m4,
)
from ..log import get_logger
log = get_logger(__name__)
def step_path_arrays_from_1d(
@ -94,8 +103,7 @@ _line_styles: dict[str, int] = {
}
# TODO: got a feeling that dropping this inheritance gets us even more speedups
class FastAppendCurve(pg.PlotCurveItem):
class FastAppendCurve(pg.GraphicsObject):
'''
A faster, append friendly version of ``pyqtgraph.PlotCurveItem``
built for real-time data updates.
@ -110,22 +118,42 @@ class FastAppendCurve(pg.PlotCurveItem):
'''
def __init__(
self,
x: np.ndarray,
y: np.ndarray,
*args,
step_mode: bool = False,
color: str = 'default_lightest',
fill_color: Optional[str] = None,
style: str = 'solid',
name: Optional[str] = None,
use_fpath: bool = True,
**kwargs
) -> None:
# brutaaalll, see comments within..
self._y = self.yData = y
self._x = self.xData = x
self._name = name
self.path: Optional[QtGui.QPainterPath] = None
self.use_fpath = use_fpath
self.fast_path: Optional[QtGui.QPainterPath] = None
# TODO: we can probably just dispense with the parent since
# we're basically only using the pen setting now...
super().__init__(*args, **kwargs)
self._name = name
self._xrange: tuple[int, int] = self.dataBounds(ax=0)
# self._xrange: tuple[int, int] = self.dataBounds(ax=0)
self._xrange: Optional[tuple[int, int]] = None
# self._last_draw = time.time()
self._in_ds: bool = False
self._last_uppx: float = 0
# all history of curve is drawn in single px thickness
pen = pg.mkPen(hcolor(color))
@ -134,20 +162,20 @@ class FastAppendCurve(pg.PlotCurveItem):
if 'dash' in style:
pen.setDashPattern([8, 3])
self.setPen(pen)
self._pen = pen
# last segment is drawn in 2px thickness for emphasis
# self.last_step_pen = pg.mkPen(hcolor(color), width=2)
self.last_step_pen = pg.mkPen(pen, width=2)
self._last_line: QLineF = None
self._last_step_rect: QRectF = None
self._last_line: Optional[QLineF] = None
self._last_step_rect: Optional[QRectF] = None
# flat-top style histogram-like discrete curve
self._step_mode: bool = step_mode
# self._fill = True
self.setBrush(hcolor(fill_color or color))
self._brush = pg.functions.mkBrush(hcolor(fill_color or color))
# TODO: one question still remaining is if this makes trasform
# interactions slower (such as zooming) and if so maybe if/when
@ -158,13 +186,87 @@ class FastAppendCurve(pg.PlotCurveItem):
# only thing drawn is the "last" line segment which can
# have a weird artifact where it won't be fully drawn to its
# endpoint (something we saw on trade rate curves)
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
self.setCacheMode(
QGraphicsItem.DeviceCoordinateCache
)
self.update()
# TODO: probably stick this in a new parent
# type which will contain our own version of
# what ``PlotCurveItem`` had in terms of base
# functionality? A `FlowGraphic` maybe?
def x_uppx(self) -> int:
px_vecs = self.pixelVectors()[0]
if px_vecs:
xs_in_px = px_vecs.x()
return round(xs_in_px)
else:
return 0
def px_width(self) -> float:
vb = self.getViewBox()
if not vb:
return 0
vr = self.viewRect()
l, r = int(vr.left()), int(vr.right())
if not self._xrange:
return 0
start, stop = self._xrange
lbar = max(l, start)
rbar = min(r, stop)
return vb.mapViewToDevice(
QLineF(lbar, 0, rbar, 0)
).length()
def downsample(
self,
x,
y,
px_width,
uppx,
) -> tuple[np.ndarray, np.ndarray]:
# downsample whenever more then 1 pixels per datum can be shown.
# always refresh data bounds until we get diffing
# working properly, see above..
bins, x, y = ds_m4(
x,
y,
px_width=px_width,
uppx=uppx,
log_scale=bool(uppx)
)
x = np.broadcast_to(x[:, None], y.shape)
# x = (x + np.array([-0.43, 0, 0, 0.43])).flatten()
x = (x + np.array([-0.5, 0, 0, 0.5])).flatten()
y = y.flatten()
# presumably?
self._in_ds = True
return x, y
def update_from_array(
self,
# full array input history
x: np.ndarray,
y: np.ndarray,
# pre-sliced array data that's "in view"
x_iv: np.ndarray,
y_iv: np.ndarray,
view_range: Optional[tuple[int, int]] = None,
profiler: Optional[pg.debug.Profiler] = None,
) -> QtGui.QPainterPath:
'''
Update curve from input 2-d data.
@ -173,42 +275,172 @@ class FastAppendCurve(pg.PlotCurveItem):
a length diff.
'''
profiler = pg.debug.Profiler(disabled=not pg_profile_enabled())
flip_cache = False
profiler = profiler or pg.debug.Profiler(
msg=f'FastAppendCurve.update_from_array(): `{self._name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
)
# flip_cache = False
if self._xrange:
istart, istop = self._xrange
else:
self._xrange = istart, istop = x[0], x[-1]
# print(f"xrange: {self._xrange}")
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
prepend_length = istart - x[0]
append_length = x[-1] - istop
# XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work..
self.xData = x
self.yData = y
self._x, self._y = x, y
if view_range:
profiler(f'view range slice {view_range}')
# downsampling incremental state checking
uppx = self.x_uppx()
px_width = self.px_width()
uppx_diff = (uppx - self._last_uppx)
should_ds = False
should_redraw = False
# if a view range is passed, plan to draw the
# source ouput that's "in view" of the chart.
if view_range and not self._in_ds:
# print(f'{self._name} vr: {view_range}')
# by default we only pull data up to the last (current) index
x_out, y_out = x_iv[:-1], y_iv[:-1]
# step mode: draw flat top discrete "step"
# over the index space for each datum.
if self._step_mode:
x_out, y_out = step_path_arrays_from_1d(x[:-1], y[:-1])
# TODO: numba this bish
x_out, y_out = step_path_arrays_from_1d(
x_out,
y_out
)
profiler('generated step arrays')
should_redraw = True
profiler('sliced in-view array history')
# x_last = x_iv[-1]
# y_last = y_iv[-1]
self._last_vr = view_range
# self.disable_cache()
# flip_cache = True
else:
self._xrange = x[0], x[-1]
x_last = x[-1]
y_last = y[-1]
# check for downsampling conditions
if (
# std m4 downsample conditions
px_width
and uppx_diff >= 4
or uppx_diff <= -3
or self._step_mode and abs(uppx_diff) >= 4
):
log.info(
f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
)
self._last_uppx = uppx
should_ds = True
elif (
uppx <= 2
and self._in_ds
):
# we should de-downsample back to our original
# source data so we clear our path data in prep
# to generate a new one from original source data.
should_redraw = True
should_ds = False
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
prepend_length = int(istart - x[0])
append_length = int(x[-1] - istop)
# no_path_yet = self.path is None
if (
self.path is None
or should_redraw
or should_ds
or prepend_length > 0
):
if (
not view_range
or self._in_ds
):
# by default we only pull data up to the last (current) index
x_out, y_out = x[:-1], y[:-1]
if self.path is None or prepend_length > 0:
# step mode: draw flat top discrete "step"
# over the index space for each datum.
if self._step_mode:
x_out, y_out = step_path_arrays_from_1d(
x_out,
y_out,
)
# TODO: numba this bish
profiler('generated step arrays')
if should_redraw:
profiler('path reversion to non-ds')
if self.path:
self.path.clear()
if self.fast_path:
self.fast_path.clear()
if should_redraw and not should_ds:
if self._in_ds:
log.info(f'DEDOWN -> {self._name}')
self._in_ds = False
elif should_ds and px_width:
x_out, y_out = self.downsample(
x_out,
y_out,
px_width,
uppx,
)
profiler(f'FULL PATH downsample redraw={should_ds}')
self._in_ds = True
self.path = pg.functions.arrayToQPath(
x_out,
y_out,
connect='all',
finiteCheck=False,
path=self.path,
)
profiler('generate fresh path')
profiler('generated fresh path')
# profiler(f'DRAW PATH IN VIEW -> {self._name}')
# if self._step_mode:
# self.path.closeSubpath()
# reserve mem allocs see:
# - https://doc.qt.io/qt-5/qpainterpath.html#reserve
# - https://doc.qt.io/qt-5/qpainterpath.html#capacity
# - https://doc.qt.io/qt-5/qpainterpath.html#clear
# XXX: right now this is based on had hoc checks on a
# hidpi 3840x2160 4k monitor but we should optimize for
# the target display(s) on the sys.
# if no_path_yet:
# self.path.reserve(int(500e3))
# TODO: get this piecewise prepend working - right now it's
# giving heck on vwap...
# if prepend_length:
# elif prepend_length:
# breakpoint()
# prepend_path = pg.functions.arrayToQPath(
@ -223,11 +455,17 @@ class FastAppendCurve(pg.PlotCurveItem):
# # self.path.moveTo(new_x[0], new_y[0])
# self.path.connectPath(old_path)
elif append_length > 0:
elif (
append_length > 0
and not view_range
):
new_x = x[-append_length - 2:-1]
new_y = y[-append_length - 2:-1]
if self._step_mode:
new_x, new_y = step_path_arrays_from_1d(
x[-append_length - 2:-1],
y[-append_length - 2:-1],
new_x,
new_y,
)
# [1:] since we don't need the vertical line normally at
# the beginning of the step curve taking the first (x,
@ -236,26 +474,49 @@ class FastAppendCurve(pg.PlotCurveItem):
new_x = new_x[1:]
new_y = new_y[1:]
else:
# print(f"append_length: {append_length}")
new_x = x[-append_length - 2:-1]
new_y = y[-append_length - 2:-1]
# print((new_x, new_y))
profiler('diffed append arrays')
if should_ds:
new_x, new_y = self.downsample(
new_x,
new_y,
**should_ds,
)
profiler(f'fast path downsample redraw={should_ds}')
append_path = pg.functions.arrayToQPath(
new_x,
new_y,
connect='all',
# finiteCheck=False,
finiteCheck=False,
path=self.fast_path,
)
path = self.path
if self.use_fpath:
# an attempt at trying to make append-updates faster..
if self.fast_path is None:
self.fast_path = append_path
self.fast_path.reserve(int(6e3))
else:
self.fast_path.connectPath(append_path)
size = self.fast_path.capacity()
profiler(f'connected fast path w size: {size}')
# print(f"append_path br: {append_path.boundingRect()}")
# self.path.moveTo(new_x[0], new_y[0])
# path.connectPath(append_path)
# XXX: lol this causes a hang..
# self.path = self.path.simplified()
else:
size = self.path.capacity()
profiler(f'connected history path w size: {size}')
self.path.connectPath(append_path)
# other merging ideas:
# https://stackoverflow.com/questions/8936225/how-to-merge-qpainterpaths
if self._step_mode:
# path.addPath(append_path)
self.path.connectPath(append_path)
# path.closeSubpath()
# TODO: try out new work from `pyqtgraph` main which
# should repair horrid perf:
@ -265,33 +526,8 @@ class FastAppendCurve(pg.PlotCurveItem):
# # XXX: super slow set "union" op
# self.path = self.path.united(append_path).simplified()
# # path.addPath(append_path)
# # path.closeSubpath()
else:
# print(f"append_path br: {append_path.boundingRect()}")
# self.path.moveTo(new_x[0], new_y[0])
# self.path.connectPath(append_path)
path.connectPath(append_path)
self.disable_cache()
flip_cache = True
if (
self._step_mode
):
self.disable_cache()
flip_cache = True
# print(f"update br: {self.path.boundingRect()}")
# XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work..
self.xData = x
self.yData = y
x0, x_last = self._xrange = x[0], x[-1]
y_last = y[-1]
# self.disable_cache()
# flip_cache = True
# draw the "current" step graphic segment so it lines up with
# the "middle" of the current (OHLC) sample.
@ -304,21 +540,61 @@ class FastAppendCurve(pg.PlotCurveItem):
x_last - 0.5, 0,
x_last + 0.5, y_last
)
# print(
# f"path br: {self.path.boundingRect()}",
# f"fast path br: {self.fast_path.boundingRect()}",
# f"last rect br: {self._last_step_rect}",
# )
else:
# print((x[-1], y_last))
self._last_line = QLineF(
x[-2], y[-2],
x[-1], y_last
)
profiler('draw last segment')
# trigger redraw of path
# do update before reverting to cache mode
self.prepareGeometryChange()
# self.prepareGeometryChange()
self.update()
profiler('.update()')
if flip_cache:
# XXX: seems to be needed to avoid artifacts (see above).
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
# if flip_cache:
# # XXX: seems to be needed to avoid artifacts (see above).
# self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
# XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work..
def getData(self):
return self._x, self._y
# TODO: drop the above after ``Cursor`` re-work
def get_arrays(self) -> tuple[np.ndarray, np.ndarray]:
return self._x, self._y
def clear(self):
'''
Clear internal graphics making object ready for full re-draw.
'''
# NOTE: original code from ``pg.PlotCurveItem``
self.xData = None
self.yData = None
# XXX: previously, if not trying to leverage `.reserve()` allocs
# then you might as well create a new one..
# self.path = None
# path reservation aware non-mem de-alloc cleaning
if self.path:
self.path.clear()
if self.fast_path:
# self.fast_path.clear()
self.fast_path = None
# self.disable_cache()
# self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
def disable_cache(self) -> None:
'''
@ -339,16 +615,21 @@ class FastAppendCurve(pg.PlotCurveItem):
else:
# dynamically override this method after initial
# path is created to avoid requiring the above None check
self.boundingRect = self._br
return self._br()
self.boundingRect = self._path_br
return self._path_br()
def _br(self):
def _path_br(self):
'''
Post init ``.boundingRect()```.
'''
hb = self.path.controlPointRect()
hb_size = hb.size()
fp = self.fast_path
if fp:
fhb = fp.controlPointRect()
hb_size = fhb.size() + hb_size
# print(f'hb_size: {hb_size}')
w = hb_size.width() + 1
@ -373,32 +654,43 @@ class FastAppendCurve(pg.PlotCurveItem):
) -> None:
profiler = pg.debug.Profiler(disabled=not pg_profile_enabled())
# p.setRenderHint(p.Antialiasing, True)
profiler = pg.debug.Profiler(
msg=f'FastAppendCurve.paint(): `{self._name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
)
if (
self._step_mode
and self._last_step_rect
):
brush = self.opts['brush']
brush = self._brush
# p.drawLines(*tuple(filter(bool, self._last_step_lines)))
# p.drawRect(self._last_step_rect)
p.fillRect(self._last_step_rect, brush)
profiler('.fillRect()')
# p.drawPath(self.path)
# profiler('.drawPath()')
if self._last_line:
p.setPen(self.last_step_pen)
p.drawLine(self._last_line)
profiler('.drawLine()')
p.setPen(self._pen)
# else:
p.setPen(self.opts['pen'])
p.drawPath(self.path)
profiler('.drawPath()')
path = self.path
# TODO: try out new work from `pyqtgraph` main which
# should repair horrid perf:
if path:
p.drawPath(path)
profiler('.drawPath(path)')
fp = self.fast_path
if fp:
p.drawPath(fp)
profiler('.drawPath(fast_path)')
# TODO: try out new work from `pyqtgraph` main which should
# repair horrid perf (pretty sure i did and it was still
# horrible?):
# https://github.com/pyqtgraph/pyqtgraph/pull/2032
# if self._fill:
# brush = self.opts['brush']

View File

@ -29,6 +29,7 @@ from typing import Optional, Any, Callable
import numpy as np
import tractor
import trio
import pyqtgraph as pg
from .. import brokers
from ..data.feed import open_feed
@ -51,12 +52,16 @@ from ._forms import (
mk_order_pane_layout,
)
from .order_mode import open_order_mode
# from .._profile import (
# pg_profile_enabled,
# ms_slower_then,
# )
from ..log import get_logger
log = get_logger(__name__)
# TODO: load this from a config.toml!
_quote_throttle_rate: int = 6 + 16 # Hz
_quote_throttle_rate: int = 12 # Hz
# a working tick-type-classes template
@ -67,12 +72,20 @@ _tick_groups = {
}
# TODO: delegate this to each `Flow.maxmin()` which includes
# caching and further we should implement the following stream based
# approach, likely with ``numba``:
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
def chart_maxmin(
chart: ChartPlotWidget,
ohlcv_shm: ShmArray,
vlm_chart: Optional[ChartPlotWidget] = None,
) -> tuple[
tuple[int, int, int, int],
float,
float,
float,
@ -81,11 +94,7 @@ def chart_maxmin(
Compute max and min datums "in view" for range limits.
'''
# TODO: implement this
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
array = chart._arrays[chart.name]
array = ohlcv_shm.array
ifirst = array[0]['index']
last_bars_range = chart.bars_range()
@ -97,18 +106,23 @@ def chart_maxmin(
chart.default_view()
return (last_bars_range, 0, 0, 0)
mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low'])
# TODO: when we start using line charts, probably want to make
# this an overloaded call on our `DataView
# sym = chart.name
# mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
mx, mn = (
np.nanmax(in_view['high']),
np.nanmin(in_view['low'],)
)
mx_vlm_in_view = 0
if vlm_chart:
mx_vlm_in_view = np.max(in_view['volume'])
mx_vlm_in_view = np.max(
in_view['volume']
)
return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
return (
last_bars_range,
mx,
max(mn, 0), # presuming price can't be negative?
mx_vlm_in_view,
)
@dataclass
@ -177,8 +191,12 @@ async def graphics_update_loop(
if vlm_chart:
vlm_sticky = vlm_chart._ysticks['volume']
maxmin = partial(chart_maxmin, chart, vlm_chart)
chart.default_view()
maxmin = partial(
chart_maxmin,
chart,
ohlcv,
vlm_chart,
)
last_bars_range: tuple[float, float]
(
last_bars_range,
@ -258,8 +276,11 @@ async def graphics_update_loop(
}
})
# main loop
chart.default_view()
# main real-time quotes update loop
async for quotes in stream:
ds.quotes = quotes
quote_period = time.time() - last_quote
quote_rate = round(
@ -281,6 +302,12 @@ async def graphics_update_loop(
chart.pause_all_feeds()
continue
ic = chart.view._ic
if ic:
chart.pause_all_feeds()
await ic.wait()
chart.resume_all_feeds()
# sync call to update all graphics/UX components.
graphics_update_cycle(ds)
@ -291,29 +318,40 @@ def graphics_update_cycle(
trigger_all: bool = False, # flag used by prepend history updates
) -> None:
# TODO: eventually optimize this whole graphics stack with ``numba``
# hopefully XD
# unpack multi-referenced components
chart = ds.chart
profiler = pg.debug.Profiler(
msg=f'Graphics loop cycle for: `{chart.name}`',
disabled=True, # not pg_profile_enabled(),
gt=1/12 * 1e3,
# gt=ms_slower_then,
)
# unpack multi-referenced components
vlm_chart = ds.vlm_chart
l1 = ds.l1
ohlcv = ds.ohlcv
array = ohlcv.array
vars = ds.vars
tick_margin = vars['tick_margin']
update_uppx = 6
for sym, quote in ds.quotes.items():
# compute the first available graphic's x-units-per-pixel
xpx = vlm_chart.view.x_uppx()
# NOTE: vlm may be written by the ``brokerd`` backend
# event though a tick sample is not emitted.
# TODO: show dark trades differently
# https://github.com/pikers/piker/issues/116
# NOTE: this used to be implemented in a dedicated
# "increment tas": ``check_for_new_bars()`` but it doesn't
# "increment task": ``check_for_new_bars()`` but it doesn't
# make sense to do a whole task switch when we can just do
# this simple index-diff and all the fsp sub-curve graphics
# are diffed on each draw cycle anyway; so updates to the
@ -322,10 +360,6 @@ def graphics_update_cycle(
# increment the view position by the sample offset.
i_step = ohlcv.index
i_diff = i_step - vars['i_last']
if i_diff > 0:
chart.increment_view(
steps=i_diff,
)
vars['i_last'] = i_step
(
@ -338,13 +372,16 @@ def graphics_update_cycle(
l, lbar, rbar, r = brange
mx = mx_in_view + tick_margin
mn = mn_in_view - tick_margin
profiler('maxmin call')
liv = r > i_step # the last datum is in view
# don't real-time "shift" the curve to the
# left under the following conditions:
# left unless we get one of the following:
if (
(
i_diff > 0 # no new sample step
and xpx < 4 # chart is zoomed out very far
and r >= i_step # the last datum isn't in view
and liv
)
or trigger_all
@ -355,28 +392,57 @@ def graphics_update_cycle(
chart.increment_view(steps=i_diff)
if vlm_chart:
vlm_chart.update_curve_from_array('volume', array)
ds.vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
# always update y-label
ds.vlm_sticky.update_from_data(
*array[-1][['index', 'volume']]
)
if (
mx_vlm_in_view > vars['last_mx_vlm']
(xpx < update_uppx or i_diff > 0)
or trigger_all
and liv
):
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
vlm_chart.view._set_yrange(
yrange=(0, mx_vlm_in_view * 1.375)
# TODO: make it so this doesn't have to be called
# once the $vlm is up?
vlm_chart.update_graphics_from_array(
'volume',
array,
# UGGGh, see ``maxmin()`` impl in `._fsp` for
# the overlayed plotitems... we need a better
# bay to invoke a maxmin per overlay..
render=False,
# XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^
# without this, since we disable the
# 'volume' (units) chart after the $vlm starts
# up we need to be sure to enable this
# auto-ranging otherwise there will be no handler
# connected to update accompanying overlay
# graphics..
)
if (
mx_vlm_in_view != vars['last_mx_vlm']
):
yrange = (0, mx_vlm_in_view * 1.375)
vlm_chart.view._set_yrange(
yrange=yrange,
)
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
vars['last_mx_vlm'] = mx_vlm_in_view
for curve_name, flow in vlm_chart._flows.items():
update_fsp_chart(
vlm_chart,
flow.shm,
flow,
curve_name,
array_key=curve_name,
)
# is this even doing anything?
flow.plot.vb._set_yrange(
# (pretty sure it's the real-time
# resizing from last quote?)
fvb = flow.plot.vb
fvb._set_yrange(
autoscale_linked_plots=False,
name=curve_name,
)
@ -423,15 +489,18 @@ def graphics_update_cycle(
# current) tick first order as an optimization where we only
# update from the last tick from each type class.
# last_clear_updated: bool = False
# for typ, tick in reversed(lasts.items()):
# update ohlc sampled price bars
chart.update_ohlc_from_array(
if (
xpx < update_uppx
or i_diff > 0
):
chart.update_graphics_from_array(
chart.name,
array,
)
# iterate in FIFO order per frame
# iterate in FIFO order per tick-frame
for typ, tick in lasts.items():
price = tick.get('price')
@ -465,7 +534,7 @@ def graphics_update_cycle(
if wap_in_history:
# update vwap overlay line
chart.update_curve_from_array(
chart.update_graphics_from_array(
'bar_wap',
array,
)
@ -481,7 +550,10 @@ def graphics_update_cycle(
l1.bid_label.fields['level']: l1.bid_label,
}.get(price)
if label is not None:
if (
label is not None
# and liv
):
label.update_fields(
{'level': price, 'size': size}
)
@ -490,51 +562,54 @@ def graphics_update_cycle(
# the relevant L1 queue?
# label.size -= size
# elif ticktype in ('ask', 'asize'):
elif typ in _tick_groups['asks']:
elif (
typ in _tick_groups['asks']
# TODO: instead we could check if the price is in the
# y-view-range?
# and liv
):
l1.ask_label.update_fields({'level': price, 'size': size})
# elif ticktype in ('bid', 'bsize'):
elif typ in _tick_groups['bids']:
elif (
typ in _tick_groups['bids']
# TODO: instead we could check if the price is in the
# y-view-range?
# and liv
):
l1.bid_label.update_fields({'level': price, 'size': size})
# check for y-range re-size
if (
(mx > vars['last_mx']) or (mn < vars['last_mn'])
and not chart._static_yrange == 'axis'
and liv
):
# print(f'new y range: {(mn, mx)}')
chart.view._set_yrange(
yrange=(mn, mx),
main_vb = chart.view
if (
main_vb._ic is None
or not main_vb._ic.is_set()
):
main_vb._set_yrange(
# TODO: we should probably scale
# the view margin based on the size
# of the true range? This way you can
# slap in orders outside the current
# L1 (only) book range.
# range_margin=0.1,
yrange=(mn, mx),
)
vars['last_mx'], vars['last_mn'] = mx, mn
# run synchronous update on all derived fsp subplots
for name, subchart in ds.linked.subplots.items():
update_fsp_chart(
subchart,
subchart._shm,
# XXX: do we really needs seperate names here?
name,
array_key=name,
)
subchart.cv._set_yrange()
# TODO: all overlays on all subplots..
# run synchronous update on all derived overlays
# run synchronous update on all linked flows
for curve_name, flow in chart._flows.items():
# TODO: should the "main" (aka source) flow be special?
if curve_name == chart.data_key:
continue
update_fsp_chart(
chart,
flow.shm,
flow,
curve_name,
array_key=curve_name,
)
@ -593,8 +668,8 @@ async def display_symbol_data(
f'step:1s '
)
linkedsplits = godwidget.linkedsplits
linkedsplits._symbol = symbol
linked = godwidget.linkedsplits
linked._symbol = symbol
# generate order mode side-pane UI
# A ``FieldsForm`` form to configure order entry
@ -604,7 +679,7 @@ async def display_symbol_data(
godwidget.pp_pane = pp_pane
# create main OHLC chart
chart = linkedsplits.plot_ohlc_main(
chart = linked.plot_ohlc_main(
symbol,
bars,
sidepane=pp_pane,
@ -630,12 +705,13 @@ async def display_symbol_data(
# TODO: a data view api that makes this less shit
chart._shm = ohlcv
chart._flows[chart.data_key].shm = ohlcv
# NOTE: we must immediately tell Qt to show the OHLC chart
# to avoid a race where the subplots get added/shown to
# the linked set *before* the main price chart!
linkedsplits.show()
linkedsplits.focus()
linked.show()
linked.focus()
await trio.sleep(0)
vlm_chart: Optional[ChartPlotWidget] = None
@ -645,7 +721,7 @@ async def display_symbol_data(
if has_vlm(ohlcv):
vlm_chart = await ln.start(
open_vlm_displays,
linkedsplits,
linked,
ohlcv,
)
@ -653,7 +729,7 @@ async def display_symbol_data(
# from an input config.
ln.start_soon(
start_fsp_displays,
linkedsplits,
linked,
ohlcv,
loading_sym_key,
loglevel,
@ -662,7 +738,7 @@ async def display_symbol_data(
# start graphics update loop after receiving first live quote
ln.start_soon(
graphics_update_loop,
linkedsplits,
linked,
feed.stream,
ohlcv,
wap_in_history,
@ -680,17 +756,19 @@ async def display_symbol_data(
# let Qt run to render all widgets and make sure the
# sidepanes line up vertically.
await trio.sleep(0)
linkedsplits.resize_sidepanes()
linked.resize_sidepanes()
# NOTE: we pop the volume chart from the subplots set so
# that it isn't double rendered in the display loop
# above since we do a maxmin calc on the volume data to
# determine if auto-range adjustements should be made.
linkedsplits.subplots.pop('volume', None)
# linked.subplots.pop('volume', None)
# TODO: make this not so shit XD
# close group status
sbar._status_groups[loading_sym_key][1]()
# let the app run.. bby
chart.default_view()
# linked.graphics_cycle()
await trio.sleep_forever()

View File

@ -72,12 +72,16 @@ def has_vlm(ohlcv: ShmArray) -> bool:
def update_fsp_chart(
chart: ChartPlotWidget,
shm: ShmArray,
flow,
graphics_name: str,
array_key: Optional[str],
) -> None:
shm = flow.shm
if not shm:
return
array = shm.array
last_row = try_read(array)
@ -89,7 +93,7 @@ def update_fsp_chart(
# update graphics
# NOTE: this does a length check internally which allows it
# staying above the last row check below..
chart.update_curve_from_array(
chart.update_graphics_from_array(
graphics_name,
array,
array_key=array_key or graphics_name,
@ -246,7 +250,6 @@ async def run_fsp_ui(
overlay=True,
color='default_light',
array_key=name,
separate_axes=conf.get('separate_axes', False),
**conf.get('chart_kwargs', {})
)
# specially store ref to shm for lookup in display loop
@ -272,6 +275,7 @@ async def run_fsp_ui(
# data looked up from the chart's internal array set.
# TODO: we must get a data view api going STAT!!
chart._shm = shm
chart._flows[chart.data_key].shm = shm
# should **not** be the same sub-chart widget
assert chart.name != linkedsplits.chart.name
@ -283,7 +287,7 @@ async def run_fsp_ui(
# first UI update, usually from shm pushed history
update_fsp_chart(
chart,
shm,
chart._flows[array_key],
name,
array_key=array_key,
)
@ -426,6 +430,7 @@ class FspAdmin:
) as (ctx, last_index),
ctx.open_stream() as stream,
):
# register output data
self._registry[
(fqsn, ns_path)
@ -634,6 +639,7 @@ async def open_vlm_displays(
# the curve item internals are pretty convoluted.
style='step',
)
chart._flows['volume'].shm = ohlcv
# force 0 to always be in view
def maxmin(
@ -679,7 +685,7 @@ async def open_vlm_displays(
last_val_sticky.update_from_data(-1, value)
vlm_curve = chart.update_curve_from_array(
vlm_curve = chart.update_graphics_from_array(
'volume',
shm.array,
)
@ -756,19 +762,14 @@ async def open_vlm_displays(
'dark_trade_rate',
]
# add custom auto range handler
dvlm_pi.vb._maxmin = partial(
group_mxmn = partial(
maxmin,
# keep both regular and dark vlm in view
names=fields + dvlm_rate_fields,
)
# TODO: is there a way to "sync" the dual axes such that only
# one curve is needed?
# hide the original vlm curve since the $vlm one is now
# displayed and the curves are effectively the same minus
# liquidity events (well at least on low OHLC periods - 1s).
vlm_curve.hide()
# add custom auto range handler
dvlm_pi.vb._maxmin = group_mxmn
# use slightly less light (then bracket) gray
# for volume from "main exchange" and a more "bluey"
@ -802,13 +803,16 @@ async def open_vlm_displays(
color=color,
step_mode=step_mode,
style=style,
pi=pi,
)
# TODO: we need a better API to do this..
# specially store ref to shm for lookup in display loop
# since only a placeholder of `None` is entered in
# ``.draw_curve()``.
chart._flows[name].shm = shm
flow = chart._flows[name]
assert flow.plot is pi
flow.shm = shm
chart_curves(
fields,
@ -836,6 +840,17 @@ async def open_vlm_displays(
fr_shm,
)
# TODO: is there a way to "sync" the dual axes such that only
# one curve is needed?
# hide the original vlm curve since the $vlm one is now
# displayed and the curves are effectively the same minus
# liquidity events (well at least on low OHLC periods - 1s).
vlm_curve.hide()
chart.removeItem(vlm_curve)
chart._flows.pop('volume')
# avoid range sorting on volume once disabled
chart.view.disable_auto_yrange()
# Trade rate overlay
# XXX: requires an additional overlay for
# a trades-per-period (time) y-range.
@ -875,7 +890,10 @@ async def open_vlm_displays(
style='dash',
)
for pi in (dvlm_pi, tr_pi):
for pi in (
dvlm_pi,
tr_pi,
):
for name, axis_info in pi.axes.items():
# lol this sux XD
axis = axis_info['item']

View File

@ -20,6 +20,7 @@ Chart view box primitives
"""
from __future__ import annotations
from contextlib import asynccontextmanager
# import itertools
import time
from typing import Optional, Callable
@ -33,9 +34,11 @@ import numpy as np
import trio
from ..log import get_logger
from .._profile import pg_profile_enabled, ms_slower_then
from ._style import _min_points_to_show
from ._editors import SelectRect
from . import _event
from ._ohlc import BarItems
log = get_logger(__name__)
@ -318,6 +321,7 @@ async def handle_viewmode_mouse(
):
# when in order mode, submit execution
# msg.event.accept()
# breakpoint()
view.order_mode.submit_order()
@ -356,13 +360,13 @@ class ChartView(ViewBox):
):
super().__init__(
parent=parent,
name=name,
# TODO: look into the default view padding
# support that might replace somem of our
# ``ChartPlotWidget._set_yrange()`
# defaultPadding=0.,
**kwargs
)
# for "known y-range style"
self._static_yrange = static_yrange
self._maxmin = None
@ -384,6 +388,34 @@ class ChartView(ViewBox):
self.order_mode: bool = False
self.setFocusPolicy(QtCore.Qt.StrongFocus)
self._ic = None
def start_ic(
self,
) -> None:
'''
Signal the beginning of a click-drag interaction
to any interested task waiters.
'''
if self._ic is None:
self.chart.pause_all_feeds()
self._ic = trio.Event()
def signal_ic(
self,
*args,
) -> None:
'''
Signal the end of a click-drag interaction
to any waiters.
'''
if self._ic:
self._ic.set()
self._ic = None
self.chart.resume_all_feeds()
@asynccontextmanager
async def open_async_input_handler(
@ -435,7 +467,8 @@ class ChartView(ViewBox):
axis=None,
relayed_from: ChartView = None,
):
'''Override "center-point" location for scrolling.
'''
Override "center-point" location for scrolling.
This is an override of the ``ViewBox`` method simply changing
the center of the zoom to be the y-axis.
@ -536,6 +569,11 @@ class ChartView(ViewBox):
self._resetTarget()
self.scaleBy(s, focal)
self.sigRangeChangedManually.emit(mask)
# self._ic.set()
# self._ic = None
# self.chart.resume_all_feeds()
ev.accept()
def mouseDragEvent(
@ -618,6 +656,11 @@ class ChartView(ViewBox):
# XXX: WHY
ev.accept()
self.start_ic()
# if self._ic is None:
# self.chart.pause_all_feeds()
# self._ic = trio.Event()
if axis == 1:
self.chart._static_yrange = 'axis'
@ -635,6 +678,12 @@ class ChartView(ViewBox):
self.sigRangeChangedManually.emit(self.state['mouseEnabled'])
if ev.isFinish():
self.signal_ic()
# self._ic.set()
# self._ic = None
# self.chart.resume_all_feeds()
# WEIRD "RIGHT-CLICK CENTER ZOOM" MODE
elif button & QtCore.Qt.RightButton:
@ -698,6 +747,11 @@ class ChartView(ViewBox):
data set.
'''
profiler = pg.debug.Profiler(
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
delayed=True,
)
set_range = True
chart = self._chart
@ -725,32 +779,42 @@ class ChartView(ViewBox):
# Make sure min bars/datums on screen is adhered.
else:
br = bars_range or chart.bars_range()
profiler(f'got bars range: {br}')
# TODO: maybe should be a method on the
# chart widget/item?
if autoscale_linked_plots:
# avoid recursion by sibling plots
linked = self.linkedsplits
plots = list(linked.subplots.copy().values())
main = linked.chart
if main:
plots.append(main)
# if False:
# if autoscale_linked_plots:
# # avoid recursion by sibling plots
# linked = self.linkedsplits
# plots = list(linked.subplots.copy().values())
# main = linked.chart
# if main:
# plots.append(main)
for chart in plots:
if chart and not chart._static_yrange:
chart.cv._set_yrange(
bars_range=br,
autoscale_linked_plots=False,
)
# for chart in plots:
# if chart and not chart._static_yrange:
# chart.cv._set_yrange(
# bars_range=br,
# autoscale_linked_plots=False,
# )
# profiler('autoscaled linked plots')
if set_range:
if not yrange:
# XXX: only compute the mxmn range
# if none is provided as input!
yrange = self._maxmin()
if yrange is None:
log.warning(f'No yrange provided for {self.name}!?')
return
ylow, yhigh = yrange
profiler(f'maxmin(): {yrange}')
# view margins: stay within a % of the "true range"
diff = yhigh - ylow
ylow = ylow - (diff * range_margin)
@ -764,9 +828,11 @@ class ChartView(ViewBox):
yMax=yhigh,
)
self.setYRange(ylow, yhigh)
profiler(f'set limits: {(ylow, yhigh)}')
def enable_auto_yrange(
vb: ChartView,
self,
src_vb: Optional[ChartView] = None,
) -> None:
'''
@ -774,13 +840,107 @@ class ChartView(ViewBox):
based on data contents and ``ViewBox`` state.
'''
vb.sigXRangeChanged.connect(vb._set_yrange)
if src_vb is None:
src_vb = self
# such that when a linked chart changes its range
# this local view is also automatically changed and
# resized to data.
src_vb.sigXRangeChanged.connect(self._set_yrange)
# splitter(s) resizing
src_vb.sigResized.connect(self._set_yrange)
# mouse wheel doesn't emit XRangeChanged
vb.sigRangeChangedManually.connect(vb._set_yrange)
vb.sigResized.connect(vb._set_yrange) # splitter(s) resizing
src_vb.sigRangeChangedManually.connect(self._set_yrange)
# TODO: a smarter way to avoid calling this needlessly?
# 2 things i can think of:
# - register downsample-able graphics specially and only
# iterate those.
# - only register this when certain downsampleable graphics are
# "added to scene".
src_vb.sigRangeChangedManually.connect(
self.maybe_downsample_graphics
)
def disable_auto_yrange(
self,
) -> None:
self._chart._static_yrange = 'axis'
# self._chart._static_yrange = 'axis'
self.sigXRangeChanged.disconnect(
self._set_yrange,
)
self.sigResized.disconnect(
self._set_yrange,
)
self.sigRangeChangedManually.disconnect(
self.maybe_downsample_graphics
)
self.sigRangeChangedManually.disconnect(
self._set_yrange,
)
def x_uppx(self) -> float:
'''
Return the "number of x units" within a single
pixel currently being displayed for relevant
graphics items which are our children.
'''
graphics = list(self._chart._graphics.values())
if not graphics:
return 0
for graphic in graphics:
xvec = graphic.pixelVectors()[0]
if xvec:
return xvec.x()
else:
return 0
def maybe_downsample_graphics(self):
uppx = self.x_uppx()
if (
# we probably want to drop this once we are "drawing in
# view" for downsampled flows..
uppx and uppx > 16
and self._ic is not None
):
# don't bother updating since we're zoomed out bigly and
# in a pan-interaction, in which case we shouldn't be
# doing view-range based rendering (at least not yet).
# print(f'{uppx} exiting early!')
return
profiler = pg.debug.Profiler(
disabled=not pg_profile_enabled(),
gt=3,
delayed=True,
)
# TODO: a faster single-loop-iterator way of doing this XD
chart = self._chart
linked = self.linkedsplits
plots = linked.subplots | {chart.name: chart}
for chart_name, chart in plots.items():
for name, flow in chart._flows.items():
graphics = flow.graphics
use_vr = False
if isinstance(graphics, BarItems):
use_vr = True
# pass in no array which will read and render from the last
# passed array (normally provided by the display loop.)
chart.update_graphics_from_array(
name,
use_vr=use_vr,
profiler=profiler,
)
profiler(f'range change updated {chart_name}:{name}')
profiler.finish()

View File

@ -20,7 +20,7 @@ Lines for orders, alerts, L2.
"""
from functools import partial
from math import floor
from typing import Tuple, Optional, List, Callable
from typing import Optional, Callable
import pyqtgraph as pg
from pyqtgraph import Point, functions as fn
@ -29,10 +29,8 @@ from PyQt5.QtCore import QPointF
from ._annotate import qgo_draw_markers, LevelMarker
from ._anchors import (
marker_right_points,
vbr_left,
right_axis,
# pp_tight_and_right, # wanna keep it straight in the long run
gpath_pin,
)
from ..calc import humanize
@ -104,8 +102,8 @@ class LevelLine(pg.InfiniteLine):
# list of labels anchored at one of the 2 line endpoints
# inside the viewbox
self._labels: List[Label] = []
self._markers: List[(int, Label)] = []
self._labels: list[Label] = []
self._markers: list[(int, Label)] = []
# whenever this line is moved trigger label updates
self.sigPositionChanged.connect(self.on_pos_change)
@ -124,7 +122,7 @@ class LevelLine(pg.InfiniteLine):
self._y_incr_mult = 1 / chart.linked.symbol.tick_size
self._right_end_sc: float = 0
def txt_offsets(self) -> Tuple[int, int]:
def txt_offsets(self) -> tuple[int, int]:
return 0, 0
@property
@ -315,17 +313,6 @@ class LevelLine(pg.InfiniteLine):
# TODO: enter labels edit mode
print(f'double click {ev}')
def right_point(
self,
) -> float:
chart = self._chart
l1_len = chart._max_l1_line_len
ryaxis = chart.getAxis('right')
up_to_l1_sc = ryaxis.pos().x() - l1_len
return up_to_l1_sc
def paint(
self,
@ -345,7 +332,7 @@ class LevelLine(pg.InfiniteLine):
vb_left, vb_right = self._endPoints
vb = self.getViewBox()
line_end, marker_right, r_axis_x = marker_right_points(self._chart)
line_end, marker_right, r_axis_x = self._chart.marker_right_points()
if self.show_markers and self.markers:
@ -411,7 +398,7 @@ class LevelLine(pg.InfiniteLine):
def scene_endpoint(self) -> QPointF:
if not self._right_end_sc:
line_end, _, _ = marker_right_points(self._chart)
line_end, _, _ = self._chart.marker_right_points()
self._right_end_sc = line_end - 10
return QPointF(self._right_end_sc, self.scene_y())
@ -422,23 +409,23 @@ class LevelLine(pg.InfiniteLine):
) -> QtWidgets.QGraphicsPathItem:
self._marker = path
self._marker.setPen(self.currentPen)
self._marker.setBrush(fn.mkBrush(self.currentPen.color()))
# add path to scene
self.getViewBox().scene().addItem(path)
self._marker = path
rsc = self.right_point()
self._marker.setPen(self.currentPen)
self._marker.setBrush(fn.mkBrush(self.currentPen.color()))
# place to just-left of L1 labels
rsc = self._chart.pre_l1_xs()[0]
path.setPos(QPointF(rsc, self.scene_y()))
return path
def hoverEvent(self, ev):
"""Mouse hover callback.
'''
Mouse hover callback.
"""
'''
cur = self._chart.linked.cursor
# hovered
@ -614,7 +601,8 @@ def order_line(
**line_kwargs,
) -> LevelLine:
'''Convenience routine to add a line graphic representing an order
'''
Convenience routine to add a line graphic representing an order
execution submitted to the EMS via the chart's "order mode".
'''
@ -689,7 +677,6 @@ def order_line(
return f'{account}: '
label.fields = {
'size': size,
'size_digits': 0,

View File

@ -17,7 +17,11 @@
Super fast OHLC sampling graphics types.
"""
from typing import List, Optional, Tuple
from __future__ import annotations
from typing import (
Optional,
TYPE_CHECKING,
)
import numpy as np
import pyqtgraph as pg
@ -27,30 +31,29 @@ from PyQt5.QtCore import QLineF, QPointF
# from numba import types as ntypes
# from ..data._source import numba_ohlc_dtype
from .._profile import pg_profile_enabled
from .._profile import pg_profile_enabled, ms_slower_then
from ._style import hcolor
from ..log import get_logger
from ._curve import FastAppendCurve
from ._compression import ohlc_flatten
if TYPE_CHECKING:
from ._chart import LinkedSplits
def _mk_lines_array(
data: List,
size: int,
elements_step: int = 6,
) -> np.ndarray:
"""Create an ndarray to hold lines graphics info.
"""
return np.zeros_like(
data,
shape=(int(size), elements_step),
dtype=object,
)
log = get_logger(__name__)
def lines_from_ohlc(
def bar_from_ohlc_row(
row: np.ndarray,
w: float
) -> Tuple[QLineF]:
) -> tuple[QLineF]:
'''
Generate the minimal ``QLineF`` lines to construct a single
OHLC "bar" for use in the "last datum" of a series.
'''
open, high, low, close, index = row[
['open', 'high', 'low', 'close', 'index']]
@ -84,7 +87,7 @@ def lines_from_ohlc(
@njit(
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# ntypes.Tuple((float64[:], float64[:], float64[:]))(
# ntypes.tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous
# int64,
# optional(float64),
@ -95,10 +98,12 @@ def path_arrays_from_ohlc(
data: np.ndarray,
start: int64,
bar_gap: float64 = 0.43,
) -> np.ndarray:
"""Generate an array of lines objects from input ohlc data.
"""
) -> np.ndarray:
'''
Generate an array of lines objects from input ohlc data.
'''
size = int(data.shape[0] * 6)
x = np.zeros(
@ -152,26 +157,50 @@ def path_arrays_from_ohlc(
def gen_qpath(
data,
start, # XXX: do we need this?
w,
data: np.ndarray,
start: int, # XXX: do we need this?
w: float,
path: Optional[QtGui.QPainterPath] = None,
) -> QtGui.QPainterPath:
profiler = pg.debug.Profiler(disabled=not pg_profile_enabled())
path_was_none = path is None
x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w)
profiler = pg.debug.Profiler(
msg='gen_qpath ohlc',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
)
x, y, c = path_arrays_from_ohlc(
data,
start,
bar_gap=w,
)
profiler("generate stream with numba")
# TODO: numba the internals of this!
path = pg.functions.arrayToQPath(x, y, connect=c)
path = pg.functions.arrayToQPath(
x,
y,
connect=c,
path=path,
)
# avoid mem allocs if possible
if path_was_none:
path.reserve(path.capacity())
profiler("generate path with arrayToQPath")
return path
class BarItems(pg.GraphicsObject):
"""Price range bars graphics rendered from a OHLC sequence.
"""
'''
"Price range" bars graphics rendered from a OHLC sampled sequence.
'''
sigPlotChanged = QtCore.pyqtSignal(object)
# 0.5 is no overlap between arms, 1.0 is full overlap
@ -179,17 +208,26 @@ class BarItems(pg.GraphicsObject):
def __init__(
self,
# scene: 'QGraphicsScene', # noqa
linked: LinkedSplits,
plotitem: 'pg.PlotItem', # noqa
pen_color: str = 'bracket',
last_bar_color: str = 'bracket',
name: Optional[str] = None,
) -> None:
super().__init__()
self.linked = linked
# XXX: for the mega-lulz increasing width here increases draw
# latency... so probably don't do it until we figure that out.
self._color = pen_color
self.bars_pen = pg.mkPen(hcolor(pen_color), width=1)
self.last_bar_pen = pg.mkPen(hcolor(last_bar_color), width=2)
self._name = name
self._ds_line_xy: Optional[
tuple[np.ndarray, np.ndarray]
] = None
# NOTE: this prevents redraws on mouse interaction which is
# a huge boon for avg interaction latency.
@ -200,50 +238,79 @@ class BarItems(pg.GraphicsObject):
# that mode?
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
# not sure if this is actually impoving anything but figured it
# was worth a shot:
# self.path.reserve(int(100e3 * 6))
self.path = QtGui.QPainterPath()
self._pi = plotitem
self.path = QtGui.QPainterPath()
self.fast_path = QtGui.QPainterPath()
self._xrange: Tuple[int, int]
self._yrange: Tuple[float, float]
self._xrange: tuple[int, int]
self._yrange: tuple[float, float]
self._vrange = None
# TODO: don't render the full backing array each time
# self._path_data = None
self._last_bar_lines: Optional[Tuple[QLineF, ...]] = None
self._last_bar_lines: Optional[tuple[QLineF, ...]] = None
# track the current length of drawable lines within the larger array
self.start_index: int = 0
self.stop_index: int = 0
# downsampler-line state
self._in_ds: bool = False
self._ds_line: Optional[FastAppendCurve] = None
self._dsi: tuple[int, int] = 0, 0
self._xs_in_px: float = 0
def draw_from_data(
self,
data: np.ndarray,
ohlc: np.ndarray,
start: int = 0,
) -> QtGui.QPainterPath:
"""Draw OHLC datum graphics from a ``np.ndarray``.
'''
Draw OHLC datum graphics from a ``np.ndarray``.
This routine is usually only called to draw the initial history.
"""
hist, last = data[:-1], data[-1]
'''
hist, last = ohlc[:-1], ohlc[-1]
self.path = gen_qpath(hist, start, self.w)
# save graphics for later reference and keep track
# of current internal "last index"
# self.start_index = len(data)
index = data['index']
# self.start_index = len(ohlc)
index = ohlc['index']
self._xrange = (index[0], index[-1])
self._yrange = (
np.nanmax(data['high']),
np.nanmin(data['low']),
np.nanmax(ohlc['high']),
np.nanmin(ohlc['low']),
)
# up to last to avoid double draw of last bar
self._last_bar_lines = lines_from_ohlc(last, self.w)
self._last_bar_lines = bar_from_ohlc_row(last, self.w)
x, y = self._ds_line_xy = ohlc_flatten(ohlc)
# TODO: figuring out the most optimial size for the ideal
# curve-path by,
# - calcing the display's max px width `.screen()`
# - drawing a curve and figuring out it's capacity:
# https://doc.qt.io/qt-5/qpainterpath.html#capacity
# - reserving that cap for each curve-mapped-to-shm with
# - leveraging clearing when needed to redraw the entire
# curve that does not release mem allocs:
# https://doc.qt.io/qt-5/qpainterpath.html#clear
curve = FastAppendCurve(
y=y,
x=x,
name='OHLC',
color=self._color,
)
curve.hide()
self._pi.addItem(curve)
self._ds_line = curve
self._ds_xrange = (index[0], index[-1])
# trigger render
# https://doc.qt.io/qt-5/qgraphicsitem.html#update
@ -251,12 +318,27 @@ class BarItems(pg.GraphicsObject):
return self.path
def x_uppx(self) -> int:
if self._ds_line:
return self._ds_line.x_uppx()
else:
return 0
def update_from_array(
self,
array: np.ndarray,
just_history=False,
# full array input history
ohlc: np.ndarray,
# pre-sliced array data that's "in view"
ohlc_iv: np.ndarray,
view_range: Optional[tuple[int, int]] = None,
profiler: Optional[pg.debug.Profiler] = None,
) -> None:
"""Update the last datum's bar graphic from input data array.
'''
Update the last datum's bar graphic from input data array.
This routine should be interface compatible with
``pg.PlotCurveItem.setData()``. Normally this method in
@ -266,63 +348,228 @@ class BarItems(pg.GraphicsObject):
does) so this "should" be simpler and faster.
This routine should be made (transitively) as fast as possible.
"""
'''
profiler = profiler or pg.debug.Profiler(
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
delayed=True,
)
# index = self.start_index
istart, istop = self._xrange
ds_istart, ds_istop = self._ds_xrange
index = array['index']
index = ohlc['index']
first_index, last_index = index[0], index[-1]
# length = len(array)
prepend_length = istart - first_index
append_length = last_index - istop
# length = len(ohlc)
# prepend_length = istart - first_index
# append_length = last_index - istop
# ds_prepend_length = ds_istart - first_index
# ds_append_length = last_index - ds_istop
flip_cache = False
# TODO: allow mapping only a range of lines thus
# only drawing as many bars as exactly specified.
x_gt = 16
if self._ds_line:
uppx = self._ds_line.x_uppx()
else:
uppx = 0
if prepend_length:
should_line = self._in_ds
if (
self._in_ds
and uppx < x_gt
):
should_line = False
# new history was added and we need to render a new path
new_bars = array[:prepend_length]
prepend_path = gen_qpath(new_bars, 0, self.w)
elif (
not self._in_ds
and uppx >= x_gt
):
should_line = True
# XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# y value not matching the first value from
# array[prepend_length + 1] ???
profiler('ds logic complete')
# update path
old_path = self.path
self.path = prepend_path
self.path.addPath(old_path)
if should_line:
# update the line graphic
# x, y = self._ds_line_xy = ohlc_flatten(ohlc_iv)
x, y = self._ds_line_xy = ohlc_flatten(ohlc)
x_iv, y_iv = self._ds_line_xy = ohlc_flatten(ohlc_iv)
profiler('flattening bars to line')
# trigger redraw despite caching
self.prepareGeometryChange()
# TODO: we should be diffing the amount of new data which
# needs to be downsampled. Ideally we actually are just
# doing all the ds-ing in sibling actors so that the data
# can just be read and rendered to graphics on events of our
# choice.
# diff = do_diff(ohlc, new_bit)
curve = self._ds_line
curve.update_from_array(
x=x,
y=y,
x_iv=x_iv,
y_iv=y_iv,
view_range=None, # hack
profiler=profiler,
)
profiler('updated ds line')
if append_length:
# generate new lines objects for updatable "current bar"
self._last_bar_lines = lines_from_ohlc(array[-1], self.w)
if not self._in_ds:
# hide bars and show line
self.hide()
# XXX: is this actually any faster?
# self._pi.removeItem(self)
# generate new graphics to match provided array
# path appending logic:
# we need to get the previous "current bar(s)" for the time step
# and convert it to a sub-path to append to the historical set
# new_bars = array[istop - 1:istop + append_length - 1]
new_bars = array[-append_length - 1:-1]
append_path = gen_qpath(new_bars, 0, self.w)
self.path.moveTo(float(istop - self.w), float(new_bars[0]['open']))
self.path.addPath(append_path)
# TODO: a `.ui()` log level?
log.info(
f'downsampling to line graphic {self._name}'
)
# trigger redraw despite caching
self.prepareGeometryChange()
self.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
flip_cache = True
# self._pi.addItem(curve)
curve.show()
curve.update()
self._in_ds = True
# stop here since we don't need to update bars path any more
# as we delegate to the downsample line with updates.
profiler.finish()
# print('terminating early')
return
else:
# we should be in bars mode
if self._in_ds:
# flip back to bars graphics and hide the downsample line.
log.info(f'showing bars graphic {self._name}')
curve = self._ds_line
curve.hide()
# self._pi.removeItem(curve)
# XXX: is this actually any faster?
# self._pi.addItem(self)
self.show()
self._in_ds = False
# generate in_view path
self.path = gen_qpath(
ohlc_iv,
0,
self.w,
# path=self.path,
)
# TODO: to make the downsampling faster
# - allow mapping only a range of lines thus only drawing as
# many bars as exactly specified.
# - move ohlc "flattening" to a shmarr
# - maybe move all this embedded logic to a higher
# level type?
# if prepend_length:
# # new history was added and we need to render a new path
# prepend_bars = ohlc[:prepend_length]
# if ds_prepend_length:
# ds_prepend_bars = ohlc[:ds_prepend_length]
# pre_x, pre_y = ohlc_flatten(ds_prepend_bars)
# fx = np.concatenate((pre_x, fx))
# fy = np.concatenate((pre_y, fy))
# profiler('ds line prepend diff complete')
# if append_length:
# # generate new graphics to match provided array
# # path appending logic:
# # we need to get the previous "current bar(s)" for the time step
# # and convert it to a sub-path to append to the historical set
# # new_bars = ohlc[istop - 1:istop + append_length - 1]
# append_bars = ohlc[-append_length - 1:-1]
# # print(f'ohlc bars to append size: {append_bars.size}\n')
# if ds_append_length:
# ds_append_bars = ohlc[-ds_append_length - 1:-1]
# post_x, post_y = ohlc_flatten(ds_append_bars)
# print(
# f'ds curve to append sizes: {(post_x.size, post_y.size)}'
# )
# fx = np.concatenate((fx, post_x))
# fy = np.concatenate((fy, post_y))
# profiler('ds line append diff complete')
profiler('array diffs complete')
# does this work?
last = ohlc[-1]
# fy[-1] = last['close']
# # incremental update and cache line datums
# self._ds_line_xy = fx, fy
# maybe downsample to line
# ds = self.maybe_downsample()
# if ds:
# # if we downsample to a line don't bother with
# # any more path generation / updates
# self._ds_xrange = first_index, last_index
# profiler('downsampled to line')
# return
# print(in_view.size)
# if self.path:
# self.path = path
# self.path.reserve(path.capacity())
# self.path.swap(path)
# path updates
# if prepend_length:
# # XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# # y value not matching the first value from
# # ohlc[prepend_length + 1] ???
# prepend_path = gen_qpath(prepend_bars, 0, self.w)
# old_path = self.path
# self.path = prepend_path
# self.path.addPath(old_path)
# profiler('path PREPEND')
# if append_length:
# append_path = gen_qpath(append_bars, 0, self.w)
# self.path.moveTo(
# float(istop - self.w),
# float(append_bars[0]['open'])
# )
# self.path.addPath(append_path)
# profiler('path APPEND')
# fp = self.fast_path
# if fp is None:
# self.fast_path = append_path
# else:
# fp.moveTo(
# float(istop - self.w), float(new_bars[0]['open'])
# )
# fp.addPath(append_path)
# self.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# flip_cache = True
self._xrange = first_index, last_index
# trigger redraw despite caching
self.prepareGeometryChange()
# generate new lines objects for updatable "current bar"
self._last_bar_lines = bar_from_ohlc_row(last, self.w)
# last bar update
i, o, h, l, last, v = array[-1][
i, o, h, l, last, v = last[
['index', 'open', 'high', 'low', 'close', 'volume']
]
# assert i == self.start_index - 1
@ -351,11 +598,16 @@ class BarItems(pg.GraphicsObject):
# now out of date / from some previous sample. It's weird
# though because i've seen it do this to bars i - 3 back?
profiler('last bar set')
self.update()
profiler('.update()')
if flip_cache:
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
profiler.finish()
def boundingRect(self):
# Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
@ -373,12 +625,27 @@ class BarItems(pg.GraphicsObject):
# apparently this a lot faster says the docs?
# https://doc.qt.io/qt-5/qpainterpath.html#controlPointRect
hb = self.path.controlPointRect()
hb_tl, hb_br = hb.topLeft(), hb.bottomRight()
hb_tl, hb_br = (
hb.topLeft(),
hb.bottomRight(),
)
# fp = self.fast_path
# if fp:
# fhb = fp.controlPointRect()
# print((hb_tl, hb_br))
# print(fhb)
# hb_tl, hb_br = (
# fhb.topLeft() + hb.topLeft(),
# fhb.bottomRight() + hb.bottomRight(),
# )
# need to include last bar height or BR will be off
mx_y = hb_br.y()
mn_y = hb_tl.y()
last_lines = self._last_bar_lines
if last_lines:
body_line = self._last_bar_lines[0]
if body_line:
mx_y = max(mx_y, max(body_line.y1(), body_line.y2()))
@ -405,9 +672,16 @@ class BarItems(pg.GraphicsObject):
p: QtGui.QPainter,
opt: QtWidgets.QStyleOptionGraphicsItem,
w: QtWidgets.QWidget
) -> None:
profiler = pg.debug.Profiler(disabled=not pg_profile_enabled())
if self._in_ds:
return
profiler = pg.debug.Profiler(
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
)
# p.setCompositionMode(0)
@ -423,4 +697,8 @@ class BarItems(pg.GraphicsObject):
p.setPen(self.bars_pen)
p.drawPath(self.path)
profiler('draw history path')
profiler(f'draw history path: {self.path.capacity()}')
# if self.fast_path:
# p.drawPath(self.fast_path)
# profiler('draw fast path')

View File

@ -14,9 +14,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
'''
Qt UI styling.
"""
'''
from typing import Optional, Dict
import math
@ -202,8 +203,6 @@ _xaxis_at = 'bottom'
# charting config
CHART_MARGINS = (0, 0, 2, 2)
_min_points_to_show = 6
_bars_to_left_in_follow_mode = int(61*6)
_bars_from_right_in_follow_mode = round(0.16 * _bars_to_left_in_follow_mode)
_tina_mode = False

View File

@ -122,7 +122,8 @@ def optschain(config, symbol, date, rate, test):
@cli.command()
@click.option(
'--profile',
is_flag=True,
'-p',
default=None,
help='Enable pyqtgraph profiling'
)
@click.option(
@ -133,9 +134,16 @@ def optschain(config, symbol, date, rate, test):
@click.argument('symbol', required=True)
@click.pass_obj
def chart(config, symbol, profile, pdb):
"""Start a real-time chartng UI
"""
'''
Start a real-time chartng UI
'''
# eg. ``--profile 3`` reports profiling for anything slower then 3 ms.
if profile is not None:
from .. import _profile
_profile._pg_profile = True
_profile.ms_slower_then = float(profile)
from ._app import _main
if '.' not in symbol:
@ -145,8 +153,6 @@ def chart(config, symbol, profile, pdb):
))
return
# toggle to enable profiling
_profile._pg_profile = profile
# global opts
brokernames = config['brokers']