# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
Super fast ``QPainterPath`` generation related operator routines.

"""
from math import (
    ceil,
    floor,
)

import numpy as np
from numpy.lib import recfunctions as rfn
from numba import (
    # types,
    njit,
    float64,
    int64,
    # optional,
)

# TODO: for ``numba`` typing..
# from ._source import numba_ohlc_dtype
from ._m4 import ds_m4
from .._profile import (
    Profiler,
    pg_profile_enabled,
    ms_slower_then,
)


def xy_downsample(
    x,
    y,
    uppx,

    x_spacer: float = 0.5,

) -> tuple[
    np.ndarray,
    np.ndarray,
    float,
    float,
] | None:
    '''
    Downsample 1D (flat ``numpy.ndarray``) arrays using M4 given an
    input ``uppx`` (units-per-pixel) and add space between discrete
    datums.

    '''
    # downsample whenever more than 1 pixel per datum can be shown.
    # always refresh data bounds until we get diffing
    # working properly, see above..
    m4_out = ds_m4(
        x,
        y,
        uppx,
    )
    if m4_out is not None:
        bins, x, y, ymn, ymx = m4_out
        # flatten output to 1d arrays suitable for path-graphics generation.
        x = np.broadcast_to(x[:, None], y.shape)
        x = (x + np.array(
            [-x_spacer, 0, 0, x_spacer]
        )).flatten()
        y = y.flatten()

        return x, y, ymn, ymx

    # XXX: we accept a None output for the case where the input range
    # to ``ds_m4()`` is bad (-ve) and we want to catch and debug
    # that (seemingly super rare) circumstance..
    return None
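
# NOTE: the sketch below is illustrative only and NOT part of the
# original module: it shows one plausible way to drive
# ``xy_downsample()`` on a synthetic series. The series shape and the
# ``uppx`` value are arbitrary assumptions; the exact bin counts
# emitted depend on ``ds_m4()`` internals.
def _example_xy_downsample() -> None:
    # a 10k-point sine as a stand-in for a real (flattened) series
    x = np.arange(10_000, dtype=np.float64)
    y = np.sin(x / 50.)

    # at 16 datums-per-pixel M4 should bin the input and emit 4
    # points (first/min/max/last style) per pixel-column.
    out = xy_downsample(x, y, uppx=16)

    # ``None`` is only expected for a bad (-ve) input range.
    if out is not None:
        xd, yd, ymn, ymx = out
        # x is broadcast/spaced to match the 4-col y output
        assert xd.size == yd.size
        assert ymn <= ymx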
@njit(
    # NOTE: need to construct this manually for readonly
    # arrays, see https://github.com/numba/numba/issues/4511
    # (
    #     types.Array(
    #         numba_ohlc_dtype,
    #         1,
    #         'C',
    #         readonly=True,
    #     ),
    #     int64,
    #     types.unicode_type,
    #     optional(float64),
    # ),
    nogil=True
)
def path_arrays_from_ohlc(
    data: np.ndarray,
    start: int64,
    bar_w: float64,
    bar_gap: float64 = 0.16,
    use_time_index: bool = True,

    # XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
    # index_field: str,

) -> tuple[
    np.ndarray,
    np.ndarray,
    np.ndarray,
]:
    '''
    Generate an array of lines objects from input ohlc data.

    '''
    size = int(data.shape[0] * 6)

    # XXX: see this for why the dtype might have to be defined outside
    # the routine:
    # https://github.com/numba/numba/issues/4098#issuecomment-493914533
    x = np.zeros(
        shape=size,
        dtype=float64,
    )
    y, c = x.copy(), x.copy()

    half_w: float = bar_w/2

    # TODO: report bug for assert @
    # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
    for i, q in enumerate(data[start:], start):
        open = q['open']
        high = q['high']
        low = q['low']
        close = q['close']

        if use_time_index:
            index = float64(q['time'])
        else:
            index = float64(q['index'])

        # XXX: ``numba`` issue: https://github.com/numba/numba/issues/8622
        # index = float64(q[index_field])
        # AND this (probably)
        # open, high, low, close, index = q[
        #     ['open', 'high', 'low', 'close', 'index']]

        istart = i * 6
        istop = istart + 6

        # x,y detail the 6 points which connect all vertexes of an ohlc bar
        mid: float = index + half_w
        x[istart:istop] = (
            index + bar_gap,
            mid,
            mid,
            mid,
            mid,
            index + bar_w - bar_gap,
        )
        y[istart:istop] = (
            open,
            open,
            low,
            high,
            close,
            close,
        )

        # specifies that the first edge is never connected to the
        # prior bar's last edge thus providing a small "gap"/"space"
        # between bars determined by ``bar_gap``.
        c[istart:istop] = (1, 1, 1, 1, 1, 0)

    return x, y, c
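
# NOTE: illustrative sketch only, NOT part of the original module: a
# tiny fabricated 2-bar struct-array fed through
# ``path_arrays_from_ohlc()``. The dtype layout below is an assumption
# which merely provides the fields the routine reads ('time', 'index'
# and the OHLC columns); it is not the project's real ohlc dtype.
_example_ohlc_dtype = np.dtype([
    ('index', np.int64),
    ('time', np.float64),
    ('open', np.float64),
    ('high', np.float64),
    ('low', np.float64),
    ('close', np.float64),
])


def _example_path_arrays_from_ohlc() -> None:
    # two fabricated 1m (60s wide) bars
    bars = np.array(
        [
            (0, 1_000_000_000., 10., 12., 9., 11.),
            (1, 1_000_000_060., 11., 13., 10., 12.),
        ],
        dtype=_example_ohlc_dtype,
    )
    x, y, c = path_arrays_from_ohlc(
        bars,
        0,    # start index
        60.,  # bar_w: 1m bars in the epoch-time x-domain
    )

    # 6 path points per bar; ``c`` carries a 0 at each bar's last
    # point so a renderer can break the line between bars.
    assert x.size == y.size == c.size == 6 * bars.size
    assert c[5] == 0 and c[11] == 0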
def hl2mxmn(
    ohlc: np.ndarray,
    index_field: str = 'index',

) -> tuple[np.ndarray, np.ndarray]:
    '''
    Convert an OHLC struct-array containing 'high'/'low' columns
    to a "joined" max/min 1-d array.

    '''
    index = ohlc[index_field]
    hls = ohlc[[
        'low',
        'high',
    ]]

    mxmn = np.empty(2*hls.size, dtype=np.float64)
    x = np.empty(2*hls.size, dtype=np.float64)
    trace_hl(hls, mxmn, x, index[0])
    x = x + index[0]

    return mxmn, x


@njit(
    # TODO: the type annots..
    # float64[:](float64[:],),
)
def trace_hl(
    hl: 'np.ndarray',
    out: np.ndarray,
    x: np.ndarray,
    start: int,

    # the "offset" values in the x-domain which
    # place the 2 output points around each ``int``
    # master index.
    margin: float = 0.43,

) -> np.ndarray:
    '''
    "Trace" the outline of the high-low values of an ohlc sequence
    as a line such that the maximum deviation (aka dispersion)
    between bars is preserved.

    This routine is expected to modify input arrays in-place.

    '''
    last_l = hl['low'][0]
    last_h = hl['high'][0]

    for i in range(hl.size):
        row = hl[i]
        l, h = row['low'], row['high']

        up_diff = h - last_l
        down_diff = last_h - l

        if up_diff > down_diff:
            out[2*i + 1] = h
            out[2*i] = last_l
        else:
            out[2*i + 1] = l
            out[2*i] = last_h

        last_l = l
        last_h = h

        x[2*i] = int(i) - margin
        x[2*i + 1] = int(i) + margin

    return out


def ohlc_flatten(
    ohlc: np.ndarray,
    use_mxmn: bool = True,
    index_field: str = 'index',

) -> tuple[np.ndarray, np.ndarray]:
    '''
    Convert an OHLCV struct-array into a flat ready-for-line-plotting
    1-d array that is 2x (high/low trace) or 4x (full OHLC) the input
    size, with x-domain values distributed evenly across each index
    step.

    '''
    index = ohlc[index_field]

    if use_mxmn:
        # traces a line optimally over highs to lows
        # using numba. NOTE: pretty sure this is faster
        # and looks about the same as the below output.
        flat, x = hl2mxmn(ohlc)

    else:
        flat = rfn.structured_to_unstructured(
            ohlc[['open', 'high', 'low', 'close']]
        ).flatten()

        x = np.linspace(
            start=index[0] - 0.5,
            stop=index[-1] + 0.5,
            num=len(flat),
        )

    return x, flat
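
# NOTE: illustrative sketch only, NOT part of the original module:
# both flattening modes of ``ohlc_flatten()`` run over a fabricated
# 2-bar array using the assumed ``_example_ohlc_dtype`` layout from
# the sketch further above.
def _example_ohlc_flatten() -> None:
    bars = np.array(
        [
            (0, 1_000_000_000., 10., 12., 9., 11.),
            (1, 1_000_000_060., 11., 13., 10., 12.),
        ],
        dtype=_example_ohlc_dtype,
    )

    # high/low trace mode: 2 output points per bar via ``hl2mxmn()``
    x, flat = ohlc_flatten(bars, use_mxmn=True)
    assert flat.size == 2 * bars.size

    # full OHLC mode: 4 output points per bar, evenly spaced in x
    x, flat = ohlc_flatten(bars, use_mxmn=False)
    assert flat.size == 4 * bars.size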
def slice_from_time(
    arr: np.ndarray,
    start_t: float,
    stop_t: float,
    step: float,  # sampler period step-diff

) -> slice:
    '''
    Calculate array indices mapped from a time range and return them
    in a slice.

    Given an input array with an epoch `'time'` series entry,
    calculate the indices which span the time range and return them
    in a slice.

    Presume each `'time'` step increment is uniform; when the time
    stamp series contains gaps (i.e. the uniform presumption does not
    hold) fall back to an ``np.searchsorted()`` binary search to look
    up the appropriate index.

    '''
    profiler = Profiler(
        msg='slice_from_time()',
        disabled=not pg_profile_enabled(),
        ms_threshold=ms_slower_then,
    )

    times = arr['time']
    t_first = floor(times[0])
    t_last = ceil(times[-1])

    # the greatest index we can return which slices to the
    # end of the input array.
    read_i_max = arr.shape[0]

    # compute (presumed) uniform-time-step index offsets
    i_start_t = floor(start_t)
    read_i_start = floor(((i_start_t - t_first) // step)) - 1

    i_stop_t = ceil(stop_t)

    # XXX: edge case -> always set stop index to last in array whenever
    # the input stop time is detected to be greater than the equiv time
    # stamp at that last entry.
    if i_stop_t >= t_last:
        read_i_stop = read_i_max
    else:
        read_i_stop = ceil((i_stop_t - t_first) // step) + 1

    # always clip outputs to array support:
    # - never allow a start < the 0 index
    # - never allow an end index > the read array len
    read_i_start = min(
        max(0, read_i_start),
        read_i_max - 1,
    )
    read_i_stop = max(
        0,
        min(read_i_stop, read_i_max),
    )

    # check for a larger-than-latest calculated index for the given
    # start time, in which case we do a binary search for the correct
    # index.
    # NOTE: this is usually the result of a time series with time gaps
    # where it is expected that each index step maps to a uniform step
    # in the time stamp series.
    t_iv_start = times[read_i_start]
    if (
        t_iv_start > i_start_t
    ):
        # do a binary search for the best index mapping to ``start_t``
        # given we measured an overshoot using the uniform-time-step
        # calculation from above.

        # TODO: once we start caching these per source-array,
        # we can just overwrite ``read_i_start`` directly.
        new_read_i_start = np.searchsorted(
            times,
            i_start_t,
            side='left',
        )

        # TODO: minimize binary search work as much as possible:
        # - cache these remap values which compensate for gaps in the
        #   uniform time step basis where we calc a later start
        #   index for the given input ``start_t``.
        # - can we shorten the input search sequence by heuristic?
        #   up_to_arith_start = index[:read_i_start]

        if (
            new_read_i_start <= read_i_start
        ):
            # t_diff = t_iv_start - start_t
            # print(
            #     f"WE'RE CUTTING OUT TIME - STEP:{step}\n"
            #     f'start_t:{start_t} -> 0index start_t:{t_iv_start}\n'
            #     f'diff: {t_diff}\n'
            #     f'REMAPPED START i: {read_i_start} -> {new_read_i_start}\n'
            # )
            read_i_start = new_read_i_start

    t_iv_stop = times[read_i_stop - 1]
    if (
        t_iv_stop > i_stop_t
    ):
        # t_diff = stop_t - t_iv_stop
        # print(
        #     f"WE'RE CUTTING OUT TIME - STEP:{step}\n"
        #     f'calced iv stop:{t_iv_stop} -> stop_t:{stop_t}\n'
        #     f'diff: {t_diff}\n'
        #     # f'SHOULD REMAP STOP: {read_i_start} -> {new_read_i_start}\n'
        # )
        new_read_i_stop = np.searchsorted(
            times[read_i_start:],
            # times,
            i_stop_t,
            side='right',
        )

        if (
            new_read_i_stop <= read_i_stop
        ):
            read_i_stop = read_i_start + new_read_i_stop + 1

    # sanity checks for range size
    # samples = (i_stop_t - i_start_t) // step
    # index_diff = read_i_stop - read_i_start + 1
    # if index_diff > (samples + 3):
    #     breakpoint()

    # read-relative indexes: gives a slice where `shm.array[read_slc]`
    # will be the data spanning the input time range `start_t` ->
    # `stop_t`
    read_slc = slice(
        int(read_i_start),
        int(read_i_stop),
    )

    profiler(
        'slicing complete'
        # f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
        # f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
    )

    # NOTE: if caller needs absolute buffer indices they can
    # slice the buffer abs index like so:
    # index = arr['index']
    # abs_indx = index[read_slc]
    # abs_slc = slice(
    #     int(abs_indx[0]),
    #     int(abs_indx[-1]),
    # )

    return read_slc
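
# NOTE: illustrative sketch only, NOT part of the original module: a
# 1s-step time series with a simulated 50s gap, showing how the
# uniform-step index guess overshoots across the gap and the
# ``np.searchsorted()`` fallback remaps the slice. The field layout
# (a lone 'time' column) is an assumption for illustration.
def _example_slice_from_time() -> None:
    dt = np.dtype([('time', np.float64)])
    times = np.concatenate([
        np.arange(0., 100.),    # 0s -> 99s
        np.arange(150., 250.),  # gap, then 150s -> 249s
    ])
    arr = np.zeros(times.size, dtype=dt)
    arr['time'] = times

    # request a range entirely after the gap: the arithmetic guess
    # lands ~50 indices too late in time, so the binary search path
    # pulls the start index back to the true 160s entry.
    read_slc = slice_from_time(arr, start_t=160., stop_t=200., step=1.)
    sliced = arr[read_slc]
    assert sliced['time'][0] == 160.0
    # the stop side pads by ~one step past the requested stop time
    assert sliced['time'][-1] >= 200.0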