398 lines
10 KiB
Python
398 lines
10 KiB
Python
# piker: trading gear for hackers
|
|
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
'''
|
|
Graphics related downsampling routines for compressing to pixel
|
|
limits on the display device.
|
|
|
|
'''
|
|
import math
|
|
from typing import Optional
|
|
|
|
import numpy as np
|
|
from numpy.lib import recfunctions as rfn
|
|
from numba import (
|
|
jit,
|
|
# float64, optional, int64,
|
|
)
|
|
|
|
from ..log import get_logger
|
|
|
|
|
|
log = get_logger(__name__)
|
|
|
|
|
|
def hl2mxmn(ohlc: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    '''
    Convert an OHLC struct-array containing 'high'/'low' columns
    to a "joined" max/min 1-d array.

    Returns a ``(mxmn, x)`` pair of float64 arrays, each twice the
    length of the input: ``mxmn`` is the traced high/low outline
    filled in by ``trace_hl()`` and ``x`` holds the matching x-domain
    coordinates shifted by the first value of the 'index' column.

    An empty input yields a pair of empty arrays.

    '''
    index = ohlc['index']
    hls = ohlc[[
        'low',
        'high',
    ]]

    size = 2 * hls.size
    mxmn = np.empty(size, dtype=np.float64)
    x = np.empty(size, dtype=np.float64)

    # nothing to trace: bail before ``index[0]`` is read since that
    # lookup raises on a zero-length array.
    if size == 0:
        return mxmn, x

    first_index = index[0]
    trace_hl(hls, mxmn, x, first_index)

    # ``trace_hl()`` writes x-coords relative to a zero origin, so
    # shift them onto the absolute index domain.
    x = x + first_index

    return mxmn, x
|
|
|
|
|
|
@jit(
    # TODO: the type annots..
    # float64[:](float64[:],),
    nopython=True,
)
def trace_hl(
    hl: np.ndarray,
    out: np.ndarray,
    x: np.ndarray,
    start: int,

    # the "offset" values in the x-domain which
    # place the 2 output points around each ``int``
    # master index.
    margin: float = 0.43,

) -> np.ndarray:
    '''
    "Trace" the outline of the high-low values of an ohlc sequence
    as a line such that the maximum deviation (aka dispersion) between
    bars is preserved.

    For every input row two points are written: the previous bar's
    low followed by this bar's high when that upward move is the
    larger one, otherwise the previous bar's high followed by this
    bar's low.

    This routine modifies ``out`` (y values) and ``x`` (x coords,
    ``i -/+ margin`` for row ``i``) in-place; both must have room for
    ``2 * hl.size`` entries. ``out`` is also returned.

    ``start`` is accepted but not used by this routine.

    '''
    # an empty input has no first row to seed the "last" values from.
    if hl.size == 0:
        return out

    last_low = hl['low'][0]
    last_high = hl['high'][0]

    for i in range(hl.size):
        row = hl[i]
        low, high = row['low'], row['high']

        up_diff = high - last_low
        down_diff = last_high - low

        if up_diff > down_diff:
            out[2*i + 1] = high
            out[2*i] = last_low
        else:
            out[2*i + 1] = low
            out[2*i] = last_high

        last_low = low
        last_high = high

        x[2*i] = i - margin
        x[2*i + 1] = i + margin

    return out
|
|
|
|
|
|
def downsample(
|
|
x: np.ndarray,
|
|
y: np.ndarray,
|
|
bins: int = 2,
|
|
|
|
method: str = 'peak',
|
|
|
|
**kwargs,
|
|
|
|
) -> tuple[np.ndarray, np.ndarray]:
|
|
'''
|
|
Downsample x/y data for lesser curve graphics gen.
|
|
|
|
The "peak" method is originally copied verbatim from
|
|
``pyqtgraph.PlotDataItem.getDisplayDataset()`` which gets
|
|
all credit, though we will likely drop this in favor of the M4
|
|
algo below.
|
|
|
|
'''
|
|
# py3.10 syntax
|
|
match method:
|
|
case 'peak':
|
|
if bins < 2:
|
|
log.warning('No downsampling taking place?')
|
|
|
|
ds = bins
|
|
n = len(x) // ds
|
|
x1 = np.empty((n, 2))
|
|
|
|
# start of x-values; try to select a somewhat centered point
|
|
stx = ds // 2
|
|
x1[:] = x[stx:stx+n*ds:ds, np.newaxis]
|
|
x = x1.reshape(n*2)
|
|
|
|
y1 = np.empty((n, 2))
|
|
y2 = y[:n*ds].reshape((n, ds))
|
|
|
|
y1[:, 0] = y2.max(axis=1)
|
|
y1[:, 1] = y2.min(axis=1)
|
|
y = y1.reshape(n*2)
|
|
|
|
return x, y
|
|
|
|
case 'm4':
|
|
return ds_m4(x, y, kwargs['px_width'])
|
|
|
|
|
|
def ohlc_flatten(
    ohlc: np.ndarray,
    use_mxmn: bool = True,

) -> tuple[np.ndarray, np.ndarray]:
    '''
    Flatten an OHLC(V) struct-array into a ready-for-line-plotting
    pair of 1-d ``(x, y)`` arrays.

    With ``use_mxmn`` set the y-values come from ``hl2mxmn()`` (a
    numba traced high/low outline along with its own x-coords).
    Otherwise the 'open', 'high', 'low', 'close' columns are laid out
    row after row (4 values per input row) and the x-coords are
    spread evenly from half a step before the first 'index' value to
    half a step after the last.

    '''
    index = ohlc['index']

    if not use_mxmn:
        prices = rfn.structured_to_unstructured(
            ohlc[['open', 'high', 'low', 'close']]
        )
        flat = prices.flatten()

        first, last = index[0], index[-1]
        x = np.linspace(first - 0.5, last + 0.5, len(flat))
        return x, flat

    # traces a line optimally over highs to lows
    # using numba. NOTE: pretty sure this is faster
    # and looks about the same as the non-traced output.
    flat, x = hl2mxmn(ohlc)
    return x, flat
|
|
|
|
|
|
def ohlc_to_m4_line(
    ohlc: np.ndarray,
    px_width: int,

    downsample: bool = False,
    uppx: Optional[float] = None,
    pretrace: bool = False,

) -> tuple[np.ndarray, np.ndarray]:
    '''
    Turn an OHLC struct-array into a 1-d ``(x, y)`` line, optionally
    M4 downsampled.

    The input is first flattened via ``ohlc_flatten()`` (``pretrace``
    selects its high/low tracing mode). Unless ``downsample`` is set
    that flattened pair is returned untouched; otherwise it is run
    through ``ds_m4()`` (log scaling is enabled only when a truthy
    ``uppx`` is passed) and each output window's 4 y-values are
    placed at offsets ``-0.43, 0, 0, +0.43`` around the window's
    x-value.

    '''
    xpts, flat = ohlc_flatten(ohlc, use_mxmn=pretrace)

    if not downsample:
        return xpts, flat

    _, win_x, win_y = ds_m4(
        xpts,
        flat,
        px_width=px_width,
        uppx=uppx,
        log_scale=bool(uppx)
    )

    # repeat each window's x across its 4 y-values, then nudge the
    # outer two points to either side of the window x.
    offsets = np.array([-0.43, 0, 0, 0.43])
    spread_x = np.broadcast_to(win_x[:, None], win_y.shape) + offsets

    return spread_x.flatten(), win_y.flatten()
|
|
|
|
|
|
def ds_m4(
    x: np.ndarray,
    y: np.ndarray,

    # this is the width of the data in view
    # in display-device-local pixel units.
    px_width: int,
    uppx: Optional[float] = None,
    log_scale: bool = True,

) -> tuple[int, np.ndarray, np.ndarray]:
    '''
    Downsample using the M4 algorithm.

    This is more or less an OHLC style sampling of a line-style series.

    The x-range of the input is cut into ``px_width`` equal-width
    windows and for each window that contains data a row of
    ``(first, min, max, last)`` y-values is produced.

    Returns ``(nb, i_win, y_out)`` where ``nb`` is the index of the
    last populated window, ``i_win`` holds the left-edge x-value of
    each populated window and ``y_out`` is the matching ``(n, 4)``
    array of y-values.

    Raises ``ValueError`` when ``log_scale`` is requested without
    a (non-zero) ``uppx``, when the (scaled) ``px_width`` is not
    greater than 1, or when ``x`` runs backwards (``x[-1] < x[0]``).

    '''
    # NOTE: this method is a so called "visualization driven data
    # aggregation" approach. It gives error-free line chart
    # downsampling, see
    # further scientific paper resources:
    # - http://www.vldb.org/pvldb/vol7/p797-jugel.pdf
    # - http://www.vldb.org/2014/program/papers/demo/p997-jugel.pdf

    # Details on implementation of this algo are based in,
    # https://github.com/pikers/piker/issues/109

    # XXX: from infinite on downsampling viewable graphics:
    # "one thing i remembered about the binning - if you are
    # picking a range within your timeseries the start and end bin
    # should be one more bin size outside the visual range, then
    # you get better visual fidelity at the edges of the graph"
    # "i didn't show it in the sample code, but it's accounted for
    # in the start and end indices and number of bins"

    # optionally log-scale down the "supposed pxs on screen"
    # as the units-per-px (uppx) gets large.
    if log_scale:
        if not uppx:
            raise ValueError(
                'You must provide a `uppx` value to use log scaling!'
            )

        # NOTE: found that a 16x px width brought greater
        # detail, likely due to dpi scaling?
        # px_width=px_width * 16,
        denom = 1 + math.log(uppx, 2)

        # for ``uppx <= 0.5`` the denominator is zero or negative:
        # fall back to the same floor of 1 that ``max()`` applies
        # instead of dividing by zero.
        if denom > 0:
            scaler = round(max(2**6 / denom, 1))
        else:
            scaler = 1

        px_width *= scaler

    if not px_width > 1:  # width of screen in pxs?
        raise ValueError(
            f'`px_width` must be greater than 1, got {px_width!r}'
        )

    # NOTE: if we didn't pre-slice the data to downsample
    # you could in theory pass these as the slicing params,
    # do we care though since we can always just pre-slice the
    # input?
    x_start = x[0]  # x value start/lowest in domain
    x_end = x[-1]  # x end value/highest in domain

    # XXX: always round up on the input pixels
    px_width = math.ceil(px_width)

    x_range = x_end - x_start

    if x_range < 0:
        # a negative window width would make the window search in
        # ``_m4()`` walk away from the data forever.
        raise ValueError(
            '`x` must be in ascending order (x[-1] < x[0])'
        )

    if x_range == 0:
        # degenerate domain (single point or all-equal x): the
        # window width would be zero which never terminates in
        # ``_m4()``; everything belongs to a single window so build
        # that one (first, min, max, last) row directly.
        i_win = np.array([x_start], dtype=x.dtype)
        y_out = np.array(
            [[y[0], y.min(), y.max(), y[-1]]],
            dtype=y.dtype,
        )
        return 0, i_win, y_out

    # ratio of indexed x-value to width of raster in pixels.
    # this is more or less, uppx: units-per-pixel.
    w = x_range / float(px_width)

    # ensure we make more than enough frames (windows) for the
    # output: windows are half-open ``[left, left + w)`` so the final
    # x value (``x_start + px_width * w``) lands in window index
    # ``px_width``, meaning up to ``px_width + 1`` rows get written.
    frames = px_width + 1

    # call into ``numba``
    nb, i_win, y_out = _m4(
        x,
        y,

        frames,

        # TODO: see func below..
        # i_win,
        # y_out,

        # first index in x data to start at
        x_start,
        # window size for each "frame" of data to downsample (normally
        # scaled by the ratio of pixels on screen to data in x-range).
        w,
    )

    # drop the unused tail of the pre-allocated arrays. ``nb`` is the
    # index of the last row written, so slice by count rather than
    # filtering on zero values: a window whose left edge is exactly
    # 0 is legitimate output.
    filled = nb + 1
    i_win = i_win[:filled]
    y_out = y_out[:filled]

    return nb, i_win, y_out
|
|
|
|
|
|
@jit(
    nopython=True,
    nogil=True,
)
def _m4(

    xs: np.ndarray,
    ys: np.ndarray,

    frames: int,

    # TODO: using this approach by having the ``.zeros()`` alloc lines
    # below, in pure python was causing seg faults and alloc crashes..
    # we might need to see how it behaves with shm arrays and consider
    # allocating them once at startup?

    # pre-alloc array of x indices mapping to the start
    # of each window used for downsampling in y.
    # i_win: np.ndarray,
    # pre-alloc array of output downsampled y values
    # y_out: np.ndarray,

    x_start: float,
    step: float,

) -> tuple[int, np.ndarray, np.ndarray]:
    '''
    Inner M4 loop: walk ``xs``/``ys`` once, grouping points into
    half-open x-windows ``[left, left + step)`` starting from
    ``x_start``.

    Returns ``(bincount, i_win, y_out)``: ``bincount`` is the index
    of the last window written, ``i_win[k]`` the left-edge x-value of
    written window ``k`` and ``y_out[k]`` its ``(first, min, max,
    last)`` y-values. Both arrays are allocated with ``frames`` rows
    and rows past ``bincount`` are left zeroed; windows containing no
    points do not consume a row.

    This function does not itself verify that ``bincount`` stays
    below ``frames``; the caller must size ``frames`` accordingly.

    Raises ``ValueError`` if ``step`` is not positive.

    '''
    # a zero or negative window width can never advance past the
    # data in the window-search loops below.
    if step <= 0:
        raise ValueError('`step` must be positive')

    # nbins = len(i_win)
    # count = len(xs)

    # output buffers, filled in row-by-row below.
    y_out = np.zeros((frames, 4), ys.dtype)
    i_win = np.zeros(frames, xs.dtype)

    bincount = 0
    x_left = x_start

    # Find the first window's starting value which *includes* the
    # first value in the x-domain array, i.e. the first
    # "left-side-of-window" **plus** the downsampling step,
    # creates a window which includes the first x **value**.
    while xs[0] >= x_left + step:
        x_left += step

    # record the left edge of the first window.
    i_win[bincount] = x_left
    # set all y-values in the first row to the first value passed in
    # (aka a row broadcast).
    y_out[bincount] = ys[0]

    for i in range(len(xs)):
        x = xs[i]
        y = ys[i]
        if x < x_left + step:  # the current window "step" is [bin, bin+1)
            y_out[bincount, 1] = min(y, y_out[bincount, 1])
            y_out[bincount, 2] = max(y, y_out[bincount, 2])
            y_out[bincount, 3] = y
        else:
            # Find the next bin
            while x >= x_left + step:
                x_left += step

            # open a new row seeded with this point for all 4 slots.
            bincount += 1
            i_win[bincount] = x_left
            y_out[bincount] = y

    return bincount, i_win, y_out
|