Add an ohlcv high/low tracer with optional downsampling
							parent
							
								
									2f02f71610
								
							
						
					
					
						commit
						01ea2b3110
					
				| 
						 | 
					@ -13,16 +13,121 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# You should have received a copy of the GNU Affero General Public License
 | 
					# You should have received a copy of the GNU Affero General Public License
 | 
				
			||||||
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
					# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					'''
 | 
				
			||||||
 | 
					Graphics related downsampling routines for compressing to pixel
 | 
				
			||||||
 | 
					limits on the display device.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					'''
 | 
				
			||||||
 | 
					# from typing import Optional
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import numpy as np
 | 
					import numpy as np
 | 
				
			||||||
 | 
					# from numpy.lib.recfunctions import structured_to_unstructured
 | 
				
			||||||
from numba import (
 | 
					from numba import (
 | 
				
			||||||
    jit, float64, optional, int64,
 | 
					    jit,
 | 
				
			||||||
 | 
					    float64, optional, int64,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from ..log import get_logger
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					log = get_logger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def hl2mxmn(
 | 
				
			||||||
 | 
					    ohlc: np.ndarray,
 | 
				
			||||||
 | 
					    downsample_by: int = 0,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					) -> np.ndarray:
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    Convert a OHLC struct-array containing 'high'/'low' columns
 | 
				
			||||||
 | 
					    to a "joined" max/min 1-d array.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    index = ohlc['index']
 | 
				
			||||||
 | 
					    hls = ohlc[[
 | 
				
			||||||
 | 
					        'low',
 | 
				
			||||||
 | 
					        'high',
 | 
				
			||||||
 | 
					    ]]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # XXX: don't really need this any more since we implemented
 | 
				
			||||||
 | 
					    # the "tracer" routine, `numba`-style..
 | 
				
			||||||
 | 
					    # create a "max and min" sequence from ohlc datums
 | 
				
			||||||
 | 
					    # hl2d = structured_to_unstructured(hls)
 | 
				
			||||||
 | 
					    # hl1d = hl2d.flatten()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    mxmn = np.empty(2*hls.size, dtype=np.float64)
 | 
				
			||||||
 | 
					    x = np.empty(2*hls.size, dtype=np.float64)
 | 
				
			||||||
 | 
					    trace_hl(hls, mxmn, x, index[0])
 | 
				
			||||||
 | 
					    x = x + index[0] - 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if not downsample_by > 2:
 | 
				
			||||||
 | 
					        return mxmn, x
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    dsx, dsy = downsample(
 | 
				
			||||||
 | 
					        y=mxmn,
 | 
				
			||||||
 | 
					        x=x,
 | 
				
			||||||
 | 
					        bins=downsample_by,
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    log.info(f'downsampling by {downsample_by}')
 | 
				
			||||||
 | 
					    return dsy, dsx
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@jit(
 | 
				
			||||||
 | 
					    # TODO: the type annots..
 | 
				
			||||||
 | 
					    # float64[:](float64[:],),
 | 
				
			||||||
 | 
					    nopython=True,
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					def trace_hl(
 | 
				
			||||||
 | 
					    hl: 'np.ndarray',
 | 
				
			||||||
 | 
					    out: np.ndarray,
 | 
				
			||||||
 | 
					    x: np.ndarray,
 | 
				
			||||||
 | 
					    start: int,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # the "offset" values in the x-domain which
 | 
				
			||||||
 | 
					    # place the 2 output points around each ``int``
 | 
				
			||||||
 | 
					    # master index.
 | 
				
			||||||
 | 
					    margin: float = 0.43,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					) -> None:
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    "Trace" the outline of the high-low values of an ohlc sequence
 | 
				
			||||||
 | 
					    as a line such that the maximum deviation (aka disperaion) between
 | 
				
			||||||
 | 
					    bars if preserved.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    This routine is expected to modify input arrays in-place.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    last_l = hl['low'][0]
 | 
				
			||||||
 | 
					    last_h = hl['high'][0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for i in range(hl.size):
 | 
				
			||||||
 | 
					        row = hl[i]
 | 
				
			||||||
 | 
					        l, h = row['low'], row['high']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        up_diff = h - last_l
 | 
				
			||||||
 | 
					        down_diff = last_h - l
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if up_diff > down_diff:
 | 
				
			||||||
 | 
					            out[2*i + 1] = h
 | 
				
			||||||
 | 
					            out[2*i] = last_l
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            out[2*i + 1] = l
 | 
				
			||||||
 | 
					            out[2*i] = last_h
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        last_l = l
 | 
				
			||||||
 | 
					        last_h = h
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        x[2*i] = int(i) - margin
 | 
				
			||||||
 | 
					        x[2*i + 1] = int(i) + margin
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return out
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def downsample(
 | 
					def downsample(
 | 
				
			||||||
    x: np.ndarray,
 | 
					    x: np.ndarray,
 | 
				
			||||||
    y: np.ndarray,
 | 
					    y: np.ndarray,
 | 
				
			||||||
    bins: int,
 | 
					    bins: int = 2,
 | 
				
			||||||
    method: str = 'peak',
 | 
					    method: str = 'peak',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> tuple[np.ndarray, np.ndarray]:
 | 
					) -> tuple[np.ndarray, np.ndarray]:
 | 
				
			||||||
| 
						 | 
					@ -36,20 +141,31 @@ def downsample(
 | 
				
			||||||
    # py3.10 syntax
 | 
					    # py3.10 syntax
 | 
				
			||||||
    match method:
 | 
					    match method:
 | 
				
			||||||
        case 'peak':
 | 
					        case 'peak':
 | 
				
			||||||
 | 
					            # breakpoint()
 | 
				
			||||||
 | 
					            if bins < 2:
 | 
				
			||||||
 | 
					                log.warning('No downsampling taking place?')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            ds = bins
 | 
					            ds = bins
 | 
				
			||||||
            n = len(x) // ds
 | 
					            n = len(x) // ds
 | 
				
			||||||
            x1 = np.empty((n, 2))
 | 
					            x1 = np.empty((n, 2))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # start of x-values; try to select a somewhat centered point
 | 
					            # start of x-values; try to select a somewhat centered point
 | 
				
			||||||
            stx = ds//2
 | 
					            stx = ds // 2
 | 
				
			||||||
            x1[:] = x[stx:stx+n*ds:ds, np.newaxis]
 | 
					            x1[:] = x[stx:stx+n*ds:ds, np.newaxis]
 | 
				
			||||||
            x = x1.reshape(n*2)
 | 
					            x = x1.reshape(n*2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            y1 = np.empty((n, 2))
 | 
					            y1 = np.empty((n, 2))
 | 
				
			||||||
            y2 = y[:n*ds].reshape((n, ds))
 | 
					            y2 = y[:n*ds].reshape((n, ds))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            y1[:, 0] = y2.max(axis=1)
 | 
					            y1[:, 0] = y2.max(axis=1)
 | 
				
			||||||
            y1[:, 1] = y2.min(axis=1)
 | 
					            y1[:, 1] = y2.min(axis=1)
 | 
				
			||||||
            y = y1.reshape(n*2)
 | 
					            y = y1.reshape(n*2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        case '4px':
 | 
					            return x, y
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # TODO: this algo from infinite, see
 | 
				
			||||||
 | 
					        # https://github.com/pikers/piker/issues/109
 | 
				
			||||||
 | 
					        case 'infinite_4px':
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # Ex. from infinite on downsampling viewable graphics.
 | 
					            # Ex. from infinite on downsampling viewable graphics.
 | 
				
			||||||
            # "one thing i remembered about the binning - if you are
 | 
					            # "one thing i remembered about the binning - if you are
 | 
				
			||||||
| 
						 | 
					@ -62,7 +178,7 @@ def downsample(
 | 
				
			||||||
            def build_subchart(
 | 
					            def build_subchart(
 | 
				
			||||||
                self,
 | 
					                self,
 | 
				
			||||||
                subchart,
 | 
					                subchart,
 | 
				
			||||||
                width,  # width of screen?
 | 
					                width,  # width of screen in pxs?
 | 
				
			||||||
                chart_type,
 | 
					                chart_type,
 | 
				
			||||||
                lower,  # x start?
 | 
					                lower,  # x start?
 | 
				
			||||||
                upper,  # x end?
 | 
					                upper,  # x end?
 | 
				
			||||||
| 
						 | 
					@ -86,8 +202,9 @@ def downsample(
 | 
				
			||||||
                        # the width of the screen?
 | 
					                        # the width of the screen?
 | 
				
			||||||
                        (upper-lower)/float(width),
 | 
					                        (upper-lower)/float(width),
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
 | 
					                    print(f'downsampled to {nb} bins')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return x, y
 | 
					            return x, y
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@jit(nopython=True)
 | 
					@jit(nopython=True)
 | 
				
			||||||
| 
						 | 
					@ -101,13 +218,16 @@ def subset_by_x(
 | 
				
			||||||
    step: float,
 | 
					    step: float,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> int:
 | 
					) -> int:
 | 
				
			||||||
    count = len(xs)
 | 
					 | 
				
			||||||
    # nbins = len(bins)
 | 
					    # nbins = len(bins)
 | 
				
			||||||
 | 
					    count = len(xs)
 | 
				
			||||||
    bincount = 0
 | 
					    bincount = 0
 | 
				
			||||||
    x_left = start
 | 
					    x_left = x_start
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Find the first bin
 | 
					    # Find the first bin
 | 
				
			||||||
    while xs[0] >= x_left + step:
 | 
					    first = xs[0]
 | 
				
			||||||
 | 
					    while first >= x_left + step:
 | 
				
			||||||
        x_left += step
 | 
					        x_left += step
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    bins[bincount] = x_left
 | 
					    bins[bincount] = x_left
 | 
				
			||||||
    data[bincount] = ys[0]
 | 
					    data[bincount] = ys[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue