From bd2460846e9ab8c058aa7af21f8a11798ff01061 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Jan 2022 07:43:49 -0500 Subject: [PATCH] Decorate momo routines --- piker/fsp/_momo.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index 2ee55e00..29e94f98 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -23,6 +23,7 @@ from typing import AsyncIterator, Optional import numpy as np from numba import jit, float64, optional, int64 +from ._api import fsp from ..data._normalize import iterticks from ..data._sharedmem import ShmArray @@ -106,7 +107,7 @@ def ema( # nopython=True, # nogil=True # ) -def rsi( +def _rsi( # TODO: use https://github.com/ramonhagenaars/nptyping signal: 'np.ndarray[float64]', @@ -146,7 +147,7 @@ def rsi( return rsi, up_ema[-1], down_ema[-1] -def wma( +def _wma( signal: np.ndarray, length: int, @@ -169,10 +170,8 @@ def wma( return np.convolve(signal, weights, 'valid') -# @piker.fsp.emit( -# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'], -# ) -async def _rsi( +@fsp +async def rsi( source: 'QuoteStream[Dict[str, Any]]', # noqa ohlcv: ShmArray, @@ -188,11 +187,11 @@ async def _rsi( sig = ohlcv.array['close'] # wilder says to seed the RSI EMAs with the SMA for the "period" - seed = wma(ohlcv.last(period)['close'], period)[0] + seed = _wma(ohlcv.last(period)['close'], period)[0] # TODO: the emas here should be seeded with a period SMA as per # wilder's original formula.. - rsi_h, last_up_ema_close, last_down_ema_close = rsi( + rsi_h, last_up_ema_close, last_down_ema_close = _rsi( sig, period, seed, seed) up_ema_last = last_up_ema_close down_ema_last = last_down_ema_close @@ -218,7 +217,7 @@ async def _rsi( last_down_ema_close = down_ema_last index = ohlcv.index - rsi_out, up_ema_last, down_ema_last = rsi( + rsi_out, up_ema_last, down_ema_last = _rsi( sig, period=period, up_ema_last=last_up_ema_close, @@ -227,7 +226,8 @@ async def _rsi( yield rsi_out[-1:] -async def _wma( +@fsp +async def wma( source, #: AsyncStream[np.ndarray], length: int, @@ -243,10 +243,10 @@ async def _wma( ''' # deliver historical output as "first yield" - yield wma(ohlcv.array['close'], length) + yield _wma(ohlcv.array['close'], length) # begin real-time section async for quote in source: for tick in iterticks(quote, type='trade'): - yield wma(ohlcv.last(length)) + yield _wma(ohlcv.last(length))