diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 0cb58867..0b73b071 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -112,7 +112,7 @@ async def cascade( n.start_soon(increment_signals, feed, dst) async for processed in out_stream: - log.info(f"{fsp_func_name}: {processed}") + log.debug(f"{fsp_func_name}: {processed}") index = src.index dst.array[-1][fsp_func_name] = processed await ctx.send_yield(index) diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index 26e72b58..6c701ef2 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -122,6 +122,22 @@ def rsi( return rsi, up_ema[-1], down_ema[-1] +def wma( + signal: np.ndarray, + length: int, + weights: Optional[np.ndarray] = None, +) -> np.ndarray: + if weights is None: + # default is a standard arithmetic mean + seq = np.full((length,), 1) + weights = seq / seq.sum() + + assert length == len(weights) + + # return np.convolve(ohlcv.array['close'], weights, 'valid') + return np.convolve(signal, weights, 'valid') + + # @piker.fsp( # aggregates=['30s', '1m', '5m', '1H', '4H', '1D'], # ) @@ -136,9 +152,14 @@ async def _rsi( """ sig = ohlcv.array['close'] + # wilder says to seed the RSI EMAs with the SMA for the "period" + seed = wma(ohlcv.last(period)['close'], period)[0] + # TODO: the emas here should be seeded with a period SMA as per # wilder's original formula.. - rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, None, None) + rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed) + up_ema_last = last_up_ema_close + down_ema_last = last_down_ema_close # deliver history yield rsi_h @@ -150,9 +171,13 @@ async def _rsi( for tick in iterticks(quote): # though incorrect below is interesting # sig = ohlcv.last(period)['close'] + + # get only the last 2 "datums" which will be diffed to + # calculate the real-time RSI output datum sig = ohlcv.last(2)['close'] # the ema needs to be computed from the "last bar" + # TODO: how to make this cleaner if ohlcv.index > index: last_up_ema_close = up_ema_last last_down_ema_close = down_ema_last @@ -164,25 +189,13 @@ async def _rsi( up_ema_last=last_up_ema_close, down_ema_last=last_down_ema_close, ) - print(f'rsi_out: {rsi_out}') yield rsi_out[-1:] -def wma( - signal: np.ndarray, -) -> np.ndarray: - if weights is None: - # default is a standard arithmetic mean - seq = np.full((length,), 1) - weights = seq / seq.sum() - assert length == len(weights) - - -async def wma( +async def _wma( source, #: AsyncStream[np.ndarray], length: int, ohlcv: np.ndarray, # price time-frame "aware" - weights: Optional[np.ndarray] = None, ) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? """Streaming weighted moving average. @@ -191,32 +204,10 @@ async def wma( ``weights = np.arange(1, N) * N*(N-1)/2``. """ # deliver historical output as "first yield" - yield np.convolve(ohlcv.array['close'], weights, 'valid') + yield wma(ohlcv.array['close'], length) # begin real-time section - # fill length samples as lookback history - # ringbuf = RingBuffer(format='f', capacity=2*length) - # overflow = ringbuf.push(ohlcv['close'][-length + 1:]) - # assert overflow is None - - # lookback = np.zeros((length,)) - # lookback[:-1] = ohlcv['close'][-length + 1:] - - # async for frame in atleast(length, source): async for quote in source: for tick in iterticks(quote, type='trade'): - # writes no matter what - overflow = ringbuf.push(np.array([tick['price']])) - assert overflow is None - - # history = np.concatenate(ringbuf.pop(length - 1), frame) - - sig = ohlcv.last(length) - history = ringbuf.pop(ringbuf.read_available) - yield np.convolve(history, weights, 'valid') - - # push back `length-1` datums as lookback in preparation - # for next minimum 1 datum arrival which will require - # another "window's worth" of history. - ringbuf.push(history[-length + 1:]) + yield wma(ohlcv.last(length))