diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index f8811afa..78461d8a 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -16,6 +16,7 @@ """ Momentum bby. + """ from typing import AsyncIterator, Optional @@ -23,12 +24,9 @@ import numpy as np from numba import jit, float64, optional, int64 from ..data._normalize import iterticks +from ..data._sharedmem import ShmArray -# TODO: things to figure the fuck out: -# - how to handle non-plottable values -# - composition of fsps / implicit chaining - @jit( float64[:]( float64[:], @@ -39,11 +37,14 @@ from ..data._normalize import iterticks nogil=True ) def ema( + y: 'np.ndarray[float64]', alpha: optional(float64) = None, ylast: optional(float64) = None, + ) -> 'np.ndarray[float64]': - r"""Exponential weighted moving average owka 'Exponential smoothing'. + r''' + Exponential weighted moving average owka 'Exponential smoothing'. - https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average - https://en.wikipedia.org/wiki/Exponential_smoothing @@ -68,7 +69,8 @@ def ema( More discussion here: https://stackoverflow.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm - """ + + ''' n = y.shape[0] if alpha is None: @@ -105,14 +107,21 @@ def ema( # nogil=True # ) def rsi( + + # TODO: use https://github.com/ramonhagenaars/nptyping signal: 'np.ndarray[float64]', period: int64 = 14, up_ema_last: float64 = None, down_ema_last: float64 = None, + ) -> 'np.ndarray[float64]': + ''' + relative strengggth. + + ''' alpha = 1/period - df = np.diff(signal) + df = np.diff(signal, prepend=0) up = np.where(df > 0, df, 0) up_ema = ema(up, alpha, up_ema_last) @@ -120,11 +129,12 @@ def rsi( down = np.where(df < 0, -df, 0) down_ema = ema(down, alpha, down_ema_last) - # avoid dbz errors + # avoid dbz errors, this leaves the first + # index == 0 right? rs = np.divide( up_ema, down_ema, - out=np.zeros_like(up_ema), + out=np.zeros_like(signal), where=down_ema != 0 ) @@ -137,10 +147,18 @@ def rsi( def wma( + signal: np.ndarray, length: int, weights: Optional[np.ndarray] = None, + ) -> np.ndarray: + ''' + Compute a windowed moving average of ``signal`` with window + ``length`` and optional ``weights`` (must be same size as + ``signal``). + + ''' if weights is None: # default is a standard arithmetic mean seq = np.full((length,), 1) @@ -151,18 +169,22 @@ def wma( return np.convolve(signal, weights, 'valid') -# @piker.fsp.signal( +# @piker.fsp.emit( # timeframes=['1s', '5s', '15s', '1m', '5m', '1H'], # ) async def _rsi( + source: 'QuoteStream[Dict[str, Any]]', # noqa - ohlcv: "ShmArray[T<'close'>]", + ohlcv: ShmArray, period: int = 14, + ) -> AsyncIterator[np.ndarray]: - """Multi-timeframe streaming RSI. + ''' + Multi-timeframe streaming RSI. https://en.wikipedia.org/wiki/Relative_strength_index - """ + + ''' sig = ohlcv.array['close'] # wilder says to seed the RSI EMAs with the SMA for the "period" @@ -170,7 +192,8 @@ async def _rsi( # TODO: the emas here should be seeded with a period SMA as per # wilder's original formula.. - rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed) + rsi_h, last_up_ema_close, last_down_ema_close = rsi( + sig, period, seed, seed) up_ema_last = last_up_ema_close down_ema_last = last_down_ema_close @@ -178,7 +201,6 @@ async def _rsi( yield rsi_h index = ohlcv.index - async for quote in source: # tick based updates for tick in iterticks(quote): @@ -206,16 +228,20 @@ async def _rsi( async def _wma( + source, #: AsyncStream[np.ndarray], length: int, ohlcv: np.ndarray, # price time-frame "aware" + ) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? - """Streaming weighted moving average. + ''' + Streaming weighted moving average. ``weights`` is a sequence of already scaled values. As an example for the WMA often found in "techincal analysis": ``weights = np.arange(1, N) * N*(N-1)/2``. - """ + + ''' # deliver historical output as "first yield" yield wma(ohlcv.array['close'], length)