Display loop mega-cleanup

The most important changes include:
- iterating the new `Flow` type and updating graphics
- adding detailed profiling
- increasing the min uppx before graphics updates are throttled
- including the L1 spread in y-range calcs so that you never have the
  bid/ask go "out of view"..
- pass around `Flow`s instead of shms
- drop all the old prototyped downsampling code
big_data_lines
Tyler Goodlet 2022-04-07 14:11:01 -04:00
parent 7c615a403b
commit ee831baeb3
1 changed files with 59 additions and 108 deletions

View File

@ -30,7 +30,6 @@ import numpy as np
import tractor import tractor
import trio import trio
import pyqtgraph as pg import pyqtgraph as pg
from PyQt5.QtCore import QLineF
from .. import brokers from .. import brokers
from ..data.feed import open_feed from ..data.feed import open_feed
@ -73,13 +72,20 @@ _tick_groups = {
} }
# TODO: delegate this to each `Flow.maxmin()` which includes
# caching and further we should implement the following stream based
# approach, likely with ``numba``:
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
def chart_maxmin( def chart_maxmin(
chart: ChartPlotWidget, chart: ChartPlotWidget,
ohlcv_shm: ShmArray, ohlcv_shm: ShmArray,
vlm_chart: Optional[ChartPlotWidget] = None, vlm_chart: Optional[ChartPlotWidget] = None,
) -> tuple[ ) -> tuple[
tuple[int, int, int, int], tuple[int, int, int, int],
float, float,
float, float,
float, float,
@ -88,11 +94,6 @@ def chart_maxmin(
Compute max and min datums "in view" for range limits. Compute max and min datums "in view" for range limits.
''' '''
# TODO: implement this
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
# array = chart._arrays[chart.name]
array = ohlcv_shm.array array = ohlcv_shm.array
ifirst = array[0]['index'] ifirst = array[0]['index']
@ -105,18 +106,23 @@ def chart_maxmin(
chart.default_view() chart.default_view()
return (last_bars_range, 0, 0, 0) return (last_bars_range, 0, 0, 0)
mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) mx, mn = (
np.nanmax(in_view['high']),
# TODO: when we start using line charts, probably want to make np.nanmin(in_view['low'],)
# this an overloaded call on our `DataView )
# sym = chart.name
# mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
mx_vlm_in_view = 0 mx_vlm_in_view = 0
if vlm_chart: if vlm_chart:
mx_vlm_in_view = np.max(in_view['volume']) mx_vlm_in_view = np.max(
in_view['volume']
)
return last_bars_range, mx, max(mn, 0), mx_vlm_in_view return (
last_bars_range,
mx,
max(mn, 0), # presuming price can't be negative?
mx_vlm_in_view,
)
@dataclass @dataclass
@ -272,8 +278,9 @@ async def graphics_update_loop(
chart.default_view() chart.default_view()
# main loop # main real-time quotes update loop
async for quotes in stream: async for quotes in stream:
ds.quotes = quotes ds.quotes = quotes
quote_period = time.time() - last_quote quote_period = time.time() - last_quote
quote_rate = round( quote_rate = round(
@ -311,36 +318,40 @@ def graphics_update_cycle(
trigger_all: bool = False, # flag used by prepend history updates trigger_all: bool = False, # flag used by prepend history updates
) -> None: ) -> None:
# TODO: eventually optimize this whole graphics stack with ``numba`` # TODO: eventually optimize this whole graphics stack with ``numba``
# hopefully XD # hopefully XD
chart = ds.chart
profiler = pg.debug.Profiler( profiler = pg.debug.Profiler(
msg=f'Graphics loop cycle for: `{chart.name}`',
disabled=True, # not pg_profile_enabled(), disabled=True, # not pg_profile_enabled(),
gt=1/12 * 1e3, gt=1/12 * 1e3,
# gt=ms_slower_then,
) )
# unpack multi-referenced components # unpack multi-referenced components
chart = ds.chart
vlm_chart = ds.vlm_chart vlm_chart = ds.vlm_chart
l1 = ds.l1 l1 = ds.l1
ohlcv = ds.ohlcv ohlcv = ds.ohlcv
array = ohlcv.array array = ohlcv.array
vars = ds.vars vars = ds.vars
tick_margin = vars['tick_margin'] tick_margin = vars['tick_margin']
update_uppx = 5 update_uppx = 6
for sym, quote in ds.quotes.items(): for sym, quote in ds.quotes.items():
# compute the first available graphic's x-units-per-pixel
xpx = vlm_chart.view.x_uppx()
# NOTE: vlm may be written by the ``brokerd`` backend # NOTE: vlm may be written by the ``brokerd`` backend
# event though a tick sample is not emitted. # event though a tick sample is not emitted.
# TODO: show dark trades differently # TODO: show dark trades differently
# https://github.com/pikers/piker/issues/116 # https://github.com/pikers/piker/issues/116
# NOTE: this used to be implemented in a dedicated # NOTE: this used to be implemented in a dedicated
# "increment tas": ``check_for_new_bars()`` but it doesn't # "increment task": ``check_for_new_bars()`` but it doesn't
# make sense to do a whole task switch when we can just do # make sense to do a whole task switch when we can just do
# this simple index-diff and all the fsp sub-curve graphics # this simple index-diff and all the fsp sub-curve graphics
# are diffed on each draw cycle anyway; so updates to the # are diffed on each draw cycle anyway; so updates to the
@ -364,60 +375,6 @@ def graphics_update_cycle(
profiler('maxmin call') profiler('maxmin call')
liv = r > i_step # the last datum is in view liv = r > i_step # the last datum is in view
# compute the first available graphic's x-units-per-pixel
xpx = vlm_chart.view.xs_in_px()
# print(f'vlm xpx {xpx}')
in_view = chart.in_view(ohlcv.array)
if lbar != rbar:
# view box width in pxs
w = chart.view.boundingRect().width()
# TODO: a better way to get this?
# i would guess the esiest way is to just
# get the ``.boundingRect()`` of the curve
# in view but maybe there's something smarter?
# Currently we're just mapping the rbar, lbar to
# pixels via:
cw = chart.view.mapViewToDevice(QLineF(lbar, 0, rbar, 0)).length()
# is this faster?
# cw = chart.mapFromView(QLineF(lbar, 0 , rbar, 0)).length()
profiler(
f'view width pxs: {w}\n'
f'curve width pxs: {cw}\n'
f'sliced in view: {in_view.size}'
)
# compress bars to m4 line(s) if uppx is high enough
# if in_view.size > cw:
# from ._compression import ds_m4, hl2mxmn
# mxmn, x = hl2mxmn(in_view)
# profiler('hl tracer')
# nb, x, y = ds_m4(
# x=x,
# y=mxmn,
# # TODO: this needs to actually be the width
# # in pixels of the visible curve since we don't
# # want to downsample any 'zeros' around the curve,
# # just the values that make up the curve graphic,
# # i think?
# px_width=cw,
# )
# profiler(
# 'm4 downsampled\n'
# f' ds bins: {nb}\n'
# f' x.shape: {x.shape}\n'
# f' y.shape: {y.shape}\n'
# f' x: {x}\n'
# f' y: {y}\n'
# )
# assert y.size == mxmn.size
# don't real-time "shift" the curve to the # don't real-time "shift" the curve to the
# left unless we get one of the following: # left unless we get one of the following:
if ( if (
@ -435,7 +392,9 @@ def graphics_update_cycle(
if vlm_chart: if vlm_chart:
# always update y-label # always update y-label
ds.vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) ds.vlm_sticky.update_from_data(
*array[-1][['index', 'volume']]
)
if ( if (
(xpx < update_uppx or i_diff > 0) (xpx < update_uppx or i_diff > 0)
@ -464,17 +423,17 @@ def graphics_update_cycle(
if ( if (
mx_vlm_in_view != vars['last_mx_vlm'] mx_vlm_in_view != vars['last_mx_vlm']
): ):
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
yrange = (0, mx_vlm_in_view * 1.375) yrange = (0, mx_vlm_in_view * 1.375)
vlm_chart.view._set_yrange( vlm_chart.view._set_yrange(
yrange=yrange, yrange=yrange,
) )
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
vars['last_mx_vlm'] = mx_vlm_in_view vars['last_mx_vlm'] = mx_vlm_in_view
for curve_name, flow in vlm_chart._flows.items(): for curve_name, flow in vlm_chart._flows.items():
update_fsp_chart( update_fsp_chart(
vlm_chart, vlm_chart,
flow.shm, flow,
curve_name, curve_name,
array_key=curve_name, array_key=curve_name,
) )
@ -529,7 +488,6 @@ def graphics_update_cycle(
# current) tick first order as an optimization where we only # current) tick first order as an optimization where we only
# update from the last tick from each type class. # update from the last tick from each type class.
# last_clear_updated: bool = False # last_clear_updated: bool = False
# for typ, tick in reversed(lasts.items()):
# update ohlc sampled price bars # update ohlc sampled price bars
if ( if (
@ -541,7 +499,7 @@ def graphics_update_cycle(
array, array,
) )
# iterate in FIFO order per frame # iterate in FIFO order per tick-frame
for typ, tick in lasts.items(): for typ, tick in lasts.items():
price = tick.get('price') price = tick.get('price')
@ -612,42 +570,34 @@ def graphics_update_cycle(
if ( if (
(mx > vars['last_mx']) or (mn < vars['last_mn']) (mx > vars['last_mx']) or (mn < vars['last_mn'])
and not chart._static_yrange == 'axis' and not chart._static_yrange == 'axis'
and r > i_step # the last datum is in view
): ):
# print(f'new y range: {(mn, mx)}') main_vb = chart.view
chart.view._set_yrange( if (
yrange=(mn, mx), main_vb._ic is None
# TODO: we should probably scale or not main_vb._ic.is_set()
# the view margin based on the size ):
# of the true range? This way you can main_vb._set_yrange(
# slap in orders outside the current # TODO: we should probably scale
# L1 (only) book range. # the view margin based on the size
# range_margin=0.1, # of the true range? This way you can
) # slap in orders outside the current
# L1 (only) book range.
# range_margin=0.1,
yrange=(mn, mx),
)
vars['last_mx'], vars['last_mn'] = mx, mn vars['last_mx'], vars['last_mn'] = mx, mn
# run synchronous update on all derived fsp subplots # run synchronous update on all linked flows
for name, subchart in ds.linked.subplots.items(): for curve_name, flow in chart._flows.items():
if name == 'volume': # TODO: should the "main" (aka source) flow be special?
if curve_name == chart.data_key:
continue continue
update_fsp_chart(
subchart,
subchart._shm,
# XXX: do we really needs seperate names here?
name,
array_key=name,
)
subchart.cv._set_yrange()
# TODO: all overlays on all subplots..
# run synchronous update on all derived overlays
for curve_name, flow in chart._flows.items():
update_fsp_chart( update_fsp_chart(
chart, chart,
flow.shm, flow,
curve_name, curve_name,
array_key=curve_name, array_key=curve_name,
) )
@ -743,6 +693,7 @@ async def display_symbol_data(
# TODO: a data view api that makes this less shit # TODO: a data view api that makes this less shit
chart._shm = ohlcv chart._shm = ohlcv
chart._flows[chart.data_key].shm = ohlcv
# NOTE: we must immediately tell Qt to show the OHLC chart # NOTE: we must immediately tell Qt to show the OHLC chart
# to avoid a race where the subplots get added/shown to # to avoid a race where the subplots get added/shown to
@ -799,7 +750,7 @@ async def display_symbol_data(
# that it isn't double rendered in the display loop # that it isn't double rendered in the display loop
# above since we do a maxmin calc on the volume data to # above since we do a maxmin calc on the volume data to
# determine if auto-range adjustements should be made. # determine if auto-range adjustements should be made.
linked.subplots.pop('volume', None) # linked.subplots.pop('volume', None)
# TODO: make this not so shit XD # TODO: make this not so shit XD
# close group status # close group status