diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index 557c0614..26e72b58 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -1,10 +1,13 @@ """ Momentum bby. """ -from typing import AsyncIterator +from typing import AsyncIterator, Optional import numpy as np -from numba import jit, float64, optional +from ringbuf import RingBuffer +from numba import jit, float64, optional, int64 + +from ..data._normalize import iterticks # TODO: things to figure the fuck out: @@ -47,6 +50,9 @@ def ema( s[0] = y[0]; t = 0 s[t] = a*y[t] + (1-a)*s[t-1], t > 0. } + + More discussion here: + https://stackoverflow.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm """ n = y.shape[0] @@ -67,6 +73,7 @@ def ema( else: s[0] = ylast + print(s) for i in range(1, n): s[i] = y[i] * alpha + s[i-1] * (1 - alpha) @@ -77,34 +84,40 @@ def ema( # float64[:]( # float64[:], # int64, +# float64, +# float64, # ), -# # nopython=True, +# nopython=True, # nogil=True # ) def rsi( signal: 'np.ndarray[float64]', - period: int = 14, + period: int64 = 14, up_ema_last: float64 = None, down_ema_last: float64 = None, ) -> 'np.ndarray[float64]': alpha = 1/period - # print(signal) df = np.diff(signal) - up, down = np.where(df > 0, df, 0), np.where(df < 0, -df, 0) + + up = np.where(df > 0, df, 0) up_ema = ema(up, alpha, up_ema_last) + + down = np.where(df < 0, -df, 0) down_ema = ema(down, alpha, down_ema_last) + + # avoid dbz errors rs = np.divide( up_ema, down_ema, out=np.zeros_like(up_ema), - where=down_ema!=0 + where=down_ema != 0 ) - # print(f'up_ema: {up_ema}\ndown_ema: {down_ema}') - # print(f'rs: {rs}') + # map rs through sigmoid (with range [0, 100]) rsi = 100 - 100 / (1 + rs) # rsi = 100 * (up_ema / (up_ema + down_ema)) + # also return the last ema state for next iteration return rsi, up_ema[-1], down_ema[-1] @@ -114,67 +127,96 @@ def rsi( # ) async def _rsi( source: 'QuoteStream[Dict[str, Any]]', # noqa - ohlcv: np.ndarray, + ohlcv: "ShmArray[T<'close'>]", period: int = 14, ) -> AsyncIterator[np.ndarray]: """Multi-timeframe streaming RSI. https://en.wikipedia.org/wiki/Relative_strength_index """ - sig = ohlcv['close'] + sig = ohlcv.array['close'] # TODO: the emas here should be seeded with a period SMA as per # wilder's original formula.. - rsi_h, up_ema_last, down_ema_last = rsi(sig, period, None, None) + rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, None, None) # deliver history yield rsi_h - _last = sig[-1] + index = ohlcv.index async for quote in source: # tick based updates - for tick in quote.get('ticks', ()): - if tick.get('type') == 'trade': - curr = tick['price'] - last = np.array([_last, curr]) - # await tractor.breakpoint() - rsi_out, up_ema_last, down_ema_last = rsi( - last, - period=period, - up_ema_last=up_ema_last, - down_ema_last=down_ema_last, - ) - _last = curr - # print(f'last: {last}\n rsi: {rsi_out}') - yield rsi_out[-1] + for tick in iterticks(quote): + # though incorrect below is interesting + # sig = ohlcv.last(period)['close'] + sig = ohlcv.last(2)['close'] + + # the ema needs to be computed from the "last bar" + if ohlcv.index > index: + last_up_ema_close = up_ema_last + last_down_ema_close = down_ema_last + index = ohlcv.index + + rsi_out, up_ema_last, down_ema_last = rsi( + sig, + period=period, + up_ema_last=last_up_ema_close, + down_ema_last=last_down_ema_close, + ) + print(f'rsi_out: {rsi_out}') + yield rsi_out[-1:] + +def wma( + signal: np.ndarray, +) -> np.ndarray: + if weights is None: + # default is a standard arithmetic mean + seq = np.full((length,), 1) + weights = seq / seq.sum() + + assert length == len(weights) async def wma( source, #: AsyncStream[np.ndarray], + length: int, ohlcv: np.ndarray, # price time-frame "aware" - lookback: np.ndarray, # price time-frame "aware" - weights: np.ndarray, -) -> AsyncIterator[np.ndarray]: # i like FinSigStream - """Weighted moving average. + weights: Optional[np.ndarray] = None, +) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? + """Streaming weighted moving average. ``weights`` is a sequence of already scaled values. As an example for the WMA often found in "techincal analysis": ``weights = np.arange(1, N) * N*(N-1)/2``. """ - length = len(weights) - _lookback = np.zeros(length - 1) + # deliver historical output as "first yield" + yield np.convolve(ohlcv.array['close'], weights, 'valid') - ohlcv.from_tf('5m') + # begin real-time section - # async for frame_len, frame in source: - async for frame in source: - wma = np.convolve( - ohlcv[-length:]['close'], - # np.concatenate((_lookback, frame)), - weights, - 'valid' - ) - # todo: handle case where frame_len < length - 1 - _lookback = frame[-(length-1):] # noqa - yield wma + # fill length samples as lookback history + # ringbuf = RingBuffer(format='f', capacity=2*length) + # overflow = ringbuf.push(ohlcv['close'][-length + 1:]) + # assert overflow is None + + # lookback = np.zeros((length,)) + # lookback[:-1] = ohlcv['close'][-length + 1:] + + # async for frame in atleast(length, source): + async for quote in source: + for tick in iterticks(quote, type='trade'): + # writes no matter what + overflow = ringbuf.push(np.array([tick['price']])) + assert overflow is None + + # history = np.concatenate(ringbuf.pop(length - 1), frame) + + sig = ohlcv.last(length) + history = ringbuf.pop(ringbuf.read_available) + yield np.convolve(history, weights, 'valid') + + # push back `length-1` datums as lookback in preparation + # for next minimum 1 datum arrival which will require + # another "window's worth" of history. + ringbuf.push(history[-length + 1:])