Break wma calc into sync func

bar_select
Tyler Goodlet 2020-09-24 13:04:47 -04:00
parent 3f0e175011
commit 41e85ccaa9
2 changed files with 30 additions and 39 deletions

View File

@ -112,7 +112,7 @@ async def cascade(
n.start_soon(increment_signals, feed, dst) n.start_soon(increment_signals, feed, dst)
async for processed in out_stream: async for processed in out_stream:
log.info(f"{fsp_func_name}: {processed}") log.debug(f"{fsp_func_name}: {processed}")
index = src.index index = src.index
dst.array[-1][fsp_func_name] = processed dst.array[-1][fsp_func_name] = processed
await ctx.send_yield(index) await ctx.send_yield(index)

View File

@ -122,6 +122,22 @@ def rsi(
return rsi, up_ema[-1], down_ema[-1] return rsi, up_ema[-1], down_ema[-1]
def wma(
signal: np.ndarray,
length: int,
weights: Optional[np.ndarray] = None,
) -> np.ndarray:
if weights is None:
# default is a standard arithmetic mean
seq = np.full((length,), 1)
weights = seq / seq.sum()
assert length == len(weights)
# return np.convolve(ohlcv.array['close'], weights, 'valid')
return np.convolve(signal, weights, 'valid')
# @piker.fsp( # @piker.fsp(
# aggregates=['30s', '1m', '5m', '1H', '4H', '1D'], # aggregates=['30s', '1m', '5m', '1H', '4H', '1D'],
# ) # )
@ -136,9 +152,14 @@ async def _rsi(
""" """
sig = ohlcv.array['close'] sig = ohlcv.array['close']
# wilder says to seed the RSI EMAs with the SMA for the "period"
seed = wma(ohlcv.last(period)['close'], period)[0]
# TODO: the emas here should be seeded with a period SMA as per # TODO: the emas here should be seeded with a period SMA as per
# wilder's original formula.. # wilder's original formula..
rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, None, None) rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed)
up_ema_last = last_up_ema_close
down_ema_last = last_down_ema_close
# deliver history # deliver history
yield rsi_h yield rsi_h
@ -150,9 +171,13 @@ async def _rsi(
for tick in iterticks(quote): for tick in iterticks(quote):
# though incorrect below is interesting # though incorrect below is interesting
# sig = ohlcv.last(period)['close'] # sig = ohlcv.last(period)['close']
# get only the last 2 "datums" which will be diffed to
# calculate the real-time RSI output datum
sig = ohlcv.last(2)['close'] sig = ohlcv.last(2)['close']
# the ema needs to be computed from the "last bar" # the ema needs to be computed from the "last bar"
# TODO: how to make this cleaner
if ohlcv.index > index: if ohlcv.index > index:
last_up_ema_close = up_ema_last last_up_ema_close = up_ema_last
last_down_ema_close = down_ema_last last_down_ema_close = down_ema_last
@ -164,25 +189,13 @@ async def _rsi(
up_ema_last=last_up_ema_close, up_ema_last=last_up_ema_close,
down_ema_last=last_down_ema_close, down_ema_last=last_down_ema_close,
) )
print(f'rsi_out: {rsi_out}')
yield rsi_out[-1:] yield rsi_out[-1:]
def wma(
signal: np.ndarray,
) -> np.ndarray:
if weights is None:
# default is a standard arithmetic mean
seq = np.full((length,), 1)
weights = seq / seq.sum()
assert length == len(weights) async def _wma(
async def wma(
source, #: AsyncStream[np.ndarray], source, #: AsyncStream[np.ndarray],
length: int, length: int,
ohlcv: np.ndarray, # price time-frame "aware" ohlcv: np.ndarray, # price time-frame "aware"
weights: Optional[np.ndarray] = None,
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? ) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
"""Streaming weighted moving average. """Streaming weighted moving average.
@ -191,32 +204,10 @@ async def wma(
``weights = np.arange(1, N) * N*(N-1)/2``. ``weights = np.arange(1, N) * N*(N-1)/2``.
""" """
# deliver historical output as "first yield" # deliver historical output as "first yield"
yield np.convolve(ohlcv.array['close'], weights, 'valid') yield wma(ohlcv.array['close'], length)
# begin real-time section # begin real-time section
# fill length samples as lookback history
# ringbuf = RingBuffer(format='f', capacity=2*length)
# overflow = ringbuf.push(ohlcv['close'][-length + 1:])
# assert overflow is None
# lookback = np.zeros((length,))
# lookback[:-1] = ohlcv['close'][-length + 1:]
# async for frame in atleast(length, source):
async for quote in source: async for quote in source:
for tick in iterticks(quote, type='trade'): for tick in iterticks(quote, type='trade'):
# writes no matter what yield wma(ohlcv.last(length))
overflow = ringbuf.push(np.array([tick['price']]))
assert overflow is None
# history = np.concatenate(ringbuf.pop(length - 1), frame)
sig = ohlcv.last(length)
history = ringbuf.pop(ringbuf.read_available)
yield np.convolve(history, weights, 'valid')
# push back `length-1` datums as lookback in preparation
# for next minimum 1 datum arrival which will require
# another "window's worth" of history.
ringbuf.push(history[-length + 1:])