3rdz the charm: log-linearize minor y-ranges to a major

In very close manner to the original (gut instinct) attempt, this
properly (y-axis-vertically) aligns and scales overlaid curves according
to what we are calling a "log-linearized y-range multi-plot" B)

The basic idea is that a simple returns measure (eg. `R = (p1 - p0)
/ p0`) applied to all curves gives a constant output `R` no matter the
price co-domain in use and thus gives a constant returns over all assets
in view styled scaling; a intuitive visual of returns correlation. The
reference point is for now the left-most point in view (or highest
common index available to all curves), though we can make this
a parameter based on user needs.

A slew of debug `print()`s are left in for now until we iron out the
remaining edge cases to do with re-scaling a major (dispersion) curve
based on a minor now requiring a larger log-linear y-range from that
previous major' range.
log_linearized_curve_overlays
Tyler Goodlet 2023-01-20 18:46:44 -05:00
parent 52ac1053aa
commit 052ce65682
1 changed files with 111 additions and 88 deletions

View File

@ -20,6 +20,7 @@ Chart view box primitives
"""
from __future__ import annotations
from contextlib import asynccontextmanager
import math
import time
from typing import (
Optional,
@ -988,6 +989,13 @@ class ChartView(ViewBox):
profiler(f'<{chart_name}>.interact_graphics_cycle({name})')
# if no overlays, set lone chart's yrange and short circuit
if len(mxmn_groups) < 2:
viz.plot.vb._set_yrange(
yrange=yrange,
)
return
# proportional group auto-scaling per overlay set.
# -> loop through overlays on each multi-chart widget
# and scale all y-ranges based on autoscale config.
@ -1008,6 +1016,7 @@ class ChartView(ViewBox):
float, # y max
float, # y median
slice, # in-view array slice
np.ndarray, # in-view array
],
] = {}
max_start: float = 0
@ -1020,7 +1029,6 @@ class ChartView(ViewBox):
(ymn, ymx),
) = out
x_start = ixrng[0]
max_start = max(x_start, max_start)
@ -1032,27 +1040,28 @@ class ChartView(ViewBox):
# row_stop = arr[read_slc.stop - 1]
if viz.is_ohlc:
y_median = np.median(in_view['close'])
y_med = np.median(in_view['close'])
y_start = row_start['open']
else:
y_median = np.median(in_view[viz.name])
y_med = np.median(in_view[viz.name])
y_start = row_start[viz.name]
# y_stop = row_stop[viz.name]
print(
f'{viz.name} -> (x_start: {x_start}, y_start: {y_start}\n'
)
start_datums[viz.plot.vb] = (
viz,
y_start,
ymn,
ymx,
y_median,
y_med,
read_slc,
in_view,
)
# compute directional (up/down) y-range % swing/dispersion
y_ref = y_median
up_rng = (ymx - y_ref) / y_ref
down_rng = (ymn - y_ref) / y_ref
disp = abs(ymx - ymn) / y_ref
# find curve with max dispersion
disp = abs(ymx - ymn) / y_med
# track the "major" curve as the curve with most
# dispersion.
@ -1062,17 +1071,26 @@ class ChartView(ViewBox):
major_mn = ymn
major_mx = ymx
# compute directional (up/down) y-range % swing/dispersion
y_ref = y_med
up_rng = (ymx - y_ref) / y_ref
down_rng = (ymn - y_ref) / y_ref
mx_up_rng = max(mx_up_rng, up_rng)
mn_down_rng = min(mn_down_rng, down_rng)
print(
'####################\n'
f'{viz.name}@{chart_name} group mxmn calc\n'
'--------------------\nn'
f'y_start: {y_start}\n'
f'ymn: {ymn}\n'
f'ymx: {ymx}\n'
f'mx_disp: {mx_disp}\n'
'####################\n'
f'up %: {up_rng * 100}\n'
f'down %: {down_rng * 100}\n'
'####################\n'
f'mx up %: {mx_up_rng * 100}\n'
f'mn down %: {mn_down_rng * 100}\n'
)
@ -1084,109 +1102,114 @@ class ChartView(ViewBox):
y_start,
y_min,
y_max,
y_median,
y_med,
read_slc,
minor_in_view,
)
) in start_datums.items():
# TODO: just use y_min / y_max directly for the major
# `Viz` instead of the below calc since it should be the
# same output..
symn = y_median * (1 + mn_down_rng)
symx = y_median * (1 + mx_up_rng)
if not (viz is major_viz):
# compute dispersion normed offsets at the start
# index of the smaller dispersion curve.
maj_viz_arr = major_viz.shm.array
# we use the ymn/mx verbatim from the major curve
# (i.e. the curve measured to have the highest
# dispersion in view).
if viz is major_viz:
ymn = y_min
ymx = y_max
else:
key = 'open' if viz.is_ohlc else viz.name
# handle case where major (dispersion) curve has
# a smaller domain then minor one(s).
istart = read_slc.start
if read_slc.start > maj_viz_arr.size:
istart = 0
# handle case where major and minor curve(s) have
# a disjoint x-domain (one curve is smaller in
# length then the other):
# - find the highest (time) index common to both
# curves.
# - slice out the first "intersecting" y-value from
# both curves for use in log-linear scaling such
# that the intersecting y-value is used as the
# reference point for scaling minor curve's
# y-range based on the major curves y-range.
abs_ifirst = minor_in_view[0]['index']
mshm = major_viz.shm
abs_i_start = max(
abs_ifirst,
mshm.array['index'][0],
)
# get intersection point y-values for both curves
y_maj_intersect = mshm._array[abs_i_start][key]
y_min_intersect = minor_in_view[abs_i_start - abs_ifirst]
maj_start_y = maj_viz_arr[istart][key]
# TODO: probably write this as a compile cpython or
# numba func.
maj_start_offset = maj_start_y / major_mn
maj_max_offset = major_mx / major_mn
# compute directional (up/down) y-range
# % swing/dispersion starting at the reference index
# determined by the above indexing arithmetic.
y_ref = y_maj_intersect
assert y_ref
r_up = (major_mx - y_ref) / y_ref
r_down = (major_mn - y_ref) / y_ref
ymn = y_start * (1 + r_down)
ymx = y_start * (1 + r_up)
# XXX: or this?
# maj_start_offset = (maj_start_y - major_mn) / major_mn
# maj_max_offset = (major_mx - maj_start_y) / major_mn
# XXX: handle out of view cases where minor curve
# now is outside the range of the major curve. in
# this case we then re-scale the major curve to
# include the range missing now enforced by the
# minor (now new major for this *side*). Note this
# is side (up/down) specific.
new_maj_mxmn: None | tuple[float, float] = None
if y_max > ymx:
y_ref = y_min_intersect[key]
r_up_minor = (y_max - y_ref) / y_ref
new_maj_ymx = y_maj_intersect * (1 + r_up_minor)
new_maj_mxmn = (major_mn, new_maj_ymx)
ymx = y_max
# XXX: or this?
# major_disp_offset = (
# (maj_viz_arr[istart][key] - major_mn)
# /
# major_mn
# )
# minor_disp_offset_mn = (
# (y_start - y_min)
# /
# y_min
# )
# minor_disp_offset_mx = (
# (ymx - y_start)
# /
# y_min
print(
f'{view.name} OUT OF RANGE:\n'
f'MAJOR is {major_viz.name}\n'
f'y_max:{y_max} > ymx:{ymx}\n'
)
# normed_disp_ratio = minor_disp_offset - major_disp_offset
if y_min < ymn:
y_ref = y_min_intersect[key]
r_down_minor = (y_min - y_ref) / y_ref
new_maj_ymn = y_maj_intersect * (1 + r_down_minor)
new_maj_mxmn = (
new_maj_ymn,
new_maj_ymx[1] if new_maj_mxmn else major_mx
)
ymn = y_min
print(
f'{view.name} OUT OF RANGE:\n'
f'MAJOR is {major_viz.name}\n'
f'y_min:{y_min} < ymn:{ymn}\n'
)
# adjust mxmn range to align curve start point in
# the minor overlay with the major one.
# symn = symn * (1 + normed_disp_ratio)
# symx = symx * (1 + normed_disp_ratio)
# symn = symn - (symn * normed_disp_ratio)
# symx = symx - (symn * normed_disp_ratio)
# symn = y_min * maj_start_offset
# symx = y_min * maj_max_offset
if new_maj_mxmn:
major_viz.plot.vb._set_yrange(
yrange=new_maj_mxmn,
)
print(
f'{view.name} APPLY group mxmn\n'
# f'disp offset ratio diff %: {normed_disp_ratio}\n'
# f'major disp offset %: {major_disp_offset}\n'
# f'minor disp offset %: {minor_disp_offset}\n'
f'y_start: {y_start}\n'
f'mn_down_rng: {mn_down_rng * 100}\n'
f'mx_up_rng: {mx_up_rng * 100}\n'
f'scaled ymn: {symn}\n'
f'scaled ymx: {symx}\n'
f'scaled ymn: {ymn}\n'
f'scaled ymx: {ymx}\n'
f'scaled mx_disp: {mx_disp}\n'
)
if (
math.isinf(ymx)
or math.isinf(ymn)
):
breakpoint()
view._set_yrange(
yrange=(symn, symx),
# range_margin=None,
yrange=(ymn, ymx),
)
# if 'mnq' in viz.name:
# print(
# f'AUTO-Y-RANGING: {viz.name}\n'
# f'i_read_range: {i_read_range}\n'
# f'ixrng: {ixrng}\n'
# f'yrange: {yrange}\n'
# )
# (
# view_xrange,
# view_yrange,
# ) = viz.plot.vb.viewRange()
# view_ymx = view_yrange[1]
# print(
# f'{viz.name}@{chart_name}\n'
# f' xRange -> {view_xrange}\n'
# f' yRange -> {view_yrange}\n'
# f' view y-max -> {view_ymx}\n'
# )
# if view_ymx != symx:
# breakpoint()
profiler.finish()