Fix rsi history off-by-one due to `np.diff()`

fsp_drunken_alignment
Tyler Goodlet 2021-10-05 12:28:27 -04:00
parent dd9f6e8a7c
commit c9136e0494
1 changed files with 43 additions and 17 deletions

View File

@ -16,6 +16,7 @@
"""
Momentum bby.
"""
from typing import AsyncIterator, Optional
@ -23,12 +24,9 @@ import numpy as np
from numba import jit, float64, optional, int64
from ..data._normalize import iterticks
from ..data._sharedmem import ShmArray
# TODO: things to figure the fuck out:
# - how to handle non-plottable values
# - composition of fsps / implicit chaining
@jit(
float64[:](
float64[:],
@ -39,11 +37,14 @@ from ..data._normalize import iterticks
nogil=True
)
def ema(
y: 'np.ndarray[float64]',
alpha: optional(float64) = None,
ylast: optional(float64) = None,
) -> 'np.ndarray[float64]':
r"""Exponential weighted moving average owka 'Exponential smoothing'.
r'''
Exponential weighted moving average owka 'Exponential smoothing'.
- https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
- https://en.wikipedia.org/wiki/Exponential_smoothing
@ -68,7 +69,8 @@ def ema(
More discussion here:
https://stackoverflow.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm
"""
'''
n = y.shape[0]
if alpha is None:
@ -105,14 +107,21 @@ def ema(
# nogil=True
# )
def rsi(
# TODO: use https://github.com/ramonhagenaars/nptyping
signal: 'np.ndarray[float64]',
period: int64 = 14,
up_ema_last: float64 = None,
down_ema_last: float64 = None,
) -> 'np.ndarray[float64]':
'''
relative strengggth.
'''
alpha = 1/period
df = np.diff(signal)
df = np.diff(signal, prepend=0)
up = np.where(df > 0, df, 0)
up_ema = ema(up, alpha, up_ema_last)
@ -120,11 +129,12 @@ def rsi(
down = np.where(df < 0, -df, 0)
down_ema = ema(down, alpha, down_ema_last)
# avoid dbz errors
# avoid dbz errors, this leaves the first
# index == 0 right?
rs = np.divide(
up_ema,
down_ema,
out=np.zeros_like(up_ema),
out=np.zeros_like(signal),
where=down_ema != 0
)
@ -137,10 +147,18 @@ def rsi(
def wma(
signal: np.ndarray,
length: int,
weights: Optional[np.ndarray] = None,
) -> np.ndarray:
'''
Compute a windowed moving average of ``signal`` with window
``length`` and optional ``weights`` (must be same size as
``signal``).
'''
if weights is None:
# default is a standard arithmetic mean
seq = np.full((length,), 1)
@ -151,18 +169,22 @@ def wma(
return np.convolve(signal, weights, 'valid')
# @piker.fsp.signal(
# @piker.fsp.emit(
# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
# )
async def _rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa
ohlcv: "ShmArray[T<'close'>]",
ohlcv: ShmArray,
period: int = 14,
) -> AsyncIterator[np.ndarray]:
"""Multi-timeframe streaming RSI.
'''
Multi-timeframe streaming RSI.
https://en.wikipedia.org/wiki/Relative_strength_index
"""
'''
sig = ohlcv.array['close']
# wilder says to seed the RSI EMAs with the SMA for the "period"
@ -170,7 +192,8 @@ async def _rsi(
# TODO: the emas here should be seeded with a period SMA as per
# wilder's original formula..
rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed)
rsi_h, last_up_ema_close, last_down_ema_close = rsi(
sig, period, seed, seed)
up_ema_last = last_up_ema_close
down_ema_last = last_down_ema_close
@ -178,7 +201,6 @@ async def _rsi(
yield rsi_h
index = ohlcv.index
async for quote in source:
# tick based updates
for tick in iterticks(quote):
@ -206,16 +228,20 @@ async def _rsi(
async def _wma(
source, #: AsyncStream[np.ndarray],
length: int,
ohlcv: np.ndarray, # price time-frame "aware"
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
"""Streaming weighted moving average.
'''
Streaming weighted moving average.
``weights`` is a sequence of already scaled values. As an example
for the WMA often found in "techincal analysis":
``weights = np.arange(1, N) * N*(N-1)/2``.
"""
'''
# deliver historical output as "first yield"
yield wma(ohlcv.array['close'], length)