Add an ohlcv high/low tracer with optional downsampling

big_data_lines
Tyler Goodlet 2022-03-10 17:39:40 -05:00
parent dbe55ad4d2
commit ab8ea41b93
1 changed files with 129 additions and 9 deletions

View File

@ -13,16 +13,121 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Graphics related downsampling routines for compressing to pixel
limits on the display device.
'''
# from typing import Optional
import numpy as np
# from numpy.lib.recfunctions import structured_to_unstructured
from numba import (
jit, float64, optional, int64,
jit,
float64, optional, int64,
)
from ..log import get_logger
log = get_logger(__name__)
def hl2mxmn(
ohlc: np.ndarray,
downsample_by: int = 0,
) -> np.ndarray:
'''
Convert a OHLC struct-array containing 'high'/'low' columns
to a "joined" max/min 1-d array.
'''
index = ohlc['index']
hls = ohlc[[
'low',
'high',
]]
# XXX: don't really need this any more since we implemented
# the "tracer" routine, `numba`-style..
# create a "max and min" sequence from ohlc datums
# hl2d = structured_to_unstructured(hls)
# hl1d = hl2d.flatten()
mxmn = np.empty(2*hls.size, dtype=np.float64)
x = np.empty(2*hls.size, dtype=np.float64)
trace_hl(hls, mxmn, x, index[0])
x = x + index[0] - 1
if not downsample_by > 2:
return mxmn, x
dsx, dsy = downsample(
y=mxmn,
x=x,
bins=downsample_by,
)
log.info(f'downsampling by {downsample_by}')
return dsy, dsx
@jit(
# TODO: the type annots..
# float64[:](float64[:],),
nopython=True,
)
def trace_hl(
hl: 'np.ndarray',
out: np.ndarray,
x: np.ndarray,
start: int,
# the "offset" values in the x-domain which
# place the 2 output points around each ``int``
# master index.
margin: float = 0.43,
) -> None:
'''
"Trace" the outline of the high-low values of an ohlc sequence
as a line such that the maximum deviation (aka disperaion) between
bars if preserved.
This routine is expected to modify input arrays in-place.
'''
last_l = hl['low'][0]
last_h = hl['high'][0]
for i in range(hl.size):
row = hl[i]
l, h = row['low'], row['high']
up_diff = h - last_l
down_diff = last_h - l
if up_diff > down_diff:
out[2*i + 1] = h
out[2*i] = last_l
else:
out[2*i + 1] = l
out[2*i] = last_h
last_l = l
last_h = h
x[2*i] = int(i) - margin
x[2*i + 1] = int(i) + margin
return out
def downsample(
x: np.ndarray,
y: np.ndarray,
bins: int,
bins: int = 2,
method: str = 'peak',
) -> tuple[np.ndarray, np.ndarray]:
@ -36,20 +141,31 @@ def downsample(
# py3.10 syntax
match method:
case 'peak':
# breakpoint()
if bins < 2:
log.warning('No downsampling taking place?')
ds = bins
n = len(x) // ds
x1 = np.empty((n, 2))
# start of x-values; try to select a somewhat centered point
stx = ds//2
stx = ds // 2
x1[:] = x[stx:stx+n*ds:ds, np.newaxis]
x = x1.reshape(n*2)
y1 = np.empty((n, 2))
y2 = y[:n*ds].reshape((n, ds))
y1[:, 0] = y2.max(axis=1)
y1[:, 1] = y2.min(axis=1)
y = y1.reshape(n*2)
case '4px':
return x, y
# TODO: this algo from infinite, see
# https://github.com/pikers/piker/issues/109
case 'infinite_4px':
# Ex. from infinite on downsampling viewable graphics.
# "one thing i remembered about the binning - if you are
@ -62,7 +178,7 @@ def downsample(
def build_subchart(
self,
subchart,
width, # width of screen?
width, # width of screen in pxs?
chart_type,
lower, # x start?
upper, # x end?
@ -86,8 +202,9 @@ def downsample(
# the width of the screen?
(upper-lower)/float(width),
)
print(f'downsampled to {nb} bins')
return x, y
return x, y
@jit(nopython=True)
@ -101,13 +218,16 @@ def subset_by_x(
step: float,
) -> int:
count = len(xs)
# nbins = len(bins)
count = len(xs)
bincount = 0
x_left = start
x_left = x_start
# Find the first bin
while xs[0] >= x_left + step:
first = xs[0]
while first >= x_left + step:
x_left += step
bins[bincount] = x_left
data[bincount] = ys[0]