Get bar oriented RSI working correctly
@ -1,10 +1,13 @@
Momentum bby.
from typing import AsyncIterator
from typing import AsyncIterator, Optional
import numpy as np
from numba import jit, float64, optional
from ringbuf import RingBuffer
from numba import jit, float64, optional, int64
from import iterticks
# TODO: things to figure the fuck out:
@ -47,6 +50,9 @@ def ema(
s[0] = y[0]; t = 0
s[t] = a*y[t] + (1-a)*s[t-1], t > 0.
More discussion here:
n = y.shape[0]
@ -67,6 +73,7 @@ def ema(
s[0] = ylast
for i in range(1, n):
s[i] = y[i] * alpha + s[i-1] * (1 - alpha)
@ -77,34 +84,40 @@ def ema(
# float64[:](
# float64[:],
# int64,
# float64,
# float64,
# ),
# # nopython=True,
# nopython=True,
# nogil=True
# )
def rsi(
signal: 'np.ndarray[float64]',
period: int = 14,
period: int64 = 14,
up_ema_last: float64 = None,
down_ema_last: float64 = None,
) -> 'np.ndarray[float64]':
alpha = 1/period
# print(signal)
df = np.diff(signal)
up, down = np.where(df > 0, df, 0), np.where(df < 0, -df, 0)
up = np.where(df > 0, df, 0)
up_ema = ema(up, alpha, up_ema_last)
down = np.where(df < 0, -df, 0)
down_ema = ema(down, alpha, down_ema_last)
# avoid dbz errors
rs = np.divide(
where=down_ema != 0
# print(f'up_ema: {up_ema}\ndown_ema: {down_ema}')
# print(f'rs: {rs}')
# map rs through sigmoid (with range [0, 100])
rsi = 100 - 100 / (1 + rs)
# rsi = 100 * (up_ema / (up_ema + down_ema))
# also return the last ema state for next iteration
return rsi, up_ema[-1], down_ema[-1]
@ -114,67 +127,96 @@ def rsi(
# )
async def _rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa
ohlcv: np.ndarray,
ohlcv: "ShmArray[T<'close'>]",
period: int = 14,
) -> AsyncIterator[np.ndarray]:
"""Multi-timeframe streaming RSI.
sig = ohlcv['close']
sig = ohlcv.array['close']
# TODO: the emas here should be seeded with a period SMA as per
# wilder's original formula..
rsi_h, up_ema_last, down_ema_last = rsi(sig, period, None, None)
rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, None, None)
# deliver history
yield rsi_h
_last = sig[-1]
index = ohlcv.index
async for quote in source:
# tick based updates
for tick in quote.get('ticks', ()):
if tick.get('type') == 'trade':
curr = tick['price']
last = np.array([_last, curr])
# await tractor.breakpoint()
for tick in iterticks(quote):
# though incorrect below is interesting
# sig = ohlcv.last(period)['close']
sig = ohlcv.last(2)['close']
# the ema needs to be computed from the "last bar"
if ohlcv.index > index:
last_up_ema_close = up_ema_last
last_down_ema_close = down_ema_last
index = ohlcv.index
rsi_out, up_ema_last, down_ema_last = rsi(
_last = curr
# print(f'last: {last}\n rsi: {rsi_out}')
yield rsi_out[-1]
print(f'rsi_out: {rsi_out}')
yield rsi_out[-1:]
def wma(
signal: np.ndarray,
) -> np.ndarray:
if weights is None:
# default is a standard arithmetic mean
seq = np.full((length,), 1)
weights = seq / seq.sum()
assert length == len(weights)
async def wma(
source, #: AsyncStream[np.ndarray],
length: int,
ohlcv: np.ndarray, # price time-frame "aware"
lookback: np.ndarray, # price time-frame "aware"
weights: np.ndarray,
) -> AsyncIterator[np.ndarray]: # i like FinSigStream
"""Weighted moving average.
weights: Optional[np.ndarray] = None,
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
"""Streaming weighted moving average.
``weights`` is a sequence of already scaled values. As an example
for the WMA often found in "techincal analysis":
``weights = np.arange(1, N) * N*(N-1)/2``.
length = len(weights)
_lookback = np.zeros(length - 1)
# deliver historical output as "first yield"
yield np.convolve(ohlcv.array['close'], weights, 'valid')
# begin real-time section
# async for frame_len, frame in source:
async for frame in source:
wma = np.convolve(
# np.concatenate((_lookback, frame)),
# todo: handle case where frame_len < length - 1
_lookback = frame[-(length-1):] # noqa
yield wma
# fill length samples as lookback history
# ringbuf = RingBuffer(format='f', capacity=2*length)
# overflow = ringbuf.push(ohlcv['close'][-length + 1:])
# assert overflow is None
# lookback = np.zeros((length,))
# lookback[:-1] = ohlcv['close'][-length + 1:]
# async for frame in atleast(length, source):
async for quote in source:
for tick in iterticks(quote, type='trade'):
# writes no matter what
overflow = ringbuf.push(np.array([tick['price']]))
assert overflow is None
# history = np.concatenate(ringbuf.pop(length - 1), frame)
sig = ohlcv.last(length)
history = ringbuf.pop(ringbuf.read_available)
yield np.convolve(history, weights, 'valid')
# push back `length-1` datums as lookback in preparation
# for next minimum 1 datum arrival which will require
# another "window's worth" of history.
ringbuf.push(history[-length + 1:])
Reference in New Issue