First draft of real-time rsi using numba

bar_select
Tyler Goodlet 2020-09-08 09:59:29 -04:00
parent 9d8a867767
commit 7d24e8eeb0
2 changed files with 149 additions and 51 deletions

View File

@ -1,9 +1,11 @@
""" """
Financial signal processing for the peeps. Financial signal processing for the peeps.
""" """
from typing import AsyncIterator, List from typing import AsyncIterator, List, Callable
import tractor
import numpy as np import numpy as np
from numba import jit, float64, int64, void, optional
from .log import get_logger from .log import get_logger
from . import data from . import data
@ -11,58 +13,49 @@ from . import data
log = get_logger(__name__) log = get_logger(__name__)
def rec2array( async def latency(
rec: np.ndarray, source: 'TickStream[Dict[str, float]]',
fields: List[str] = None ohlcv: np.ndarray
) -> np.ndarray: ) -> AsyncIterator[np.ndarray]:
"""Convert record array to std array. """Compute High-Low midpoint value.
Taken from:
https://github.com/scikit-hep/root_numpy/blob/master/root_numpy/_utils.py#L20
""" """
simplify = False # TODO: do we want to offer yielding this async
# before the rt data connection comes up?
if fields is None: # deliver zeros for all prior history
fields = rec.dtype.names yield np.zeros(len(ohlcv))
elif isinstance(fields, str):
fields = [fields]
simplify = True
# Creates a copy and casts all data to the same type async for quote in source:
arr = np.dstack([rec[field] for field in fields]) ts = quote.get('broker_ts')
if ts:
# Check for array-type fields. If none, then remove outer dimension. # This is codified in the per-broker normalization layer
# Only need to check first field since np.dstack will anyway raise an # TODO: Add more measure points and diffs for full system
# exception if the shapes don't match # stack tracing.
# np.dstack will also fail if fields is an empty list value = quote['brokerd_ts'] - quote['broker_ts']
if not rec.dtype[fields[0]].shape: yield value
arr = arr[0]
if simplify:
# remove last dimension (will be of size 1)
arr = arr.reshape(arr.shape[:-1])
return arr
async def pull_and_process( async def stream_and_process(
bars: np.ndarray, bars: np.ndarray,
brokername: str, brokername: str,
# symbols: List[str], # symbols: List[str],
symbol: str, symbol: str,
fsp_func_name: str, fsp_func_name: str,
func: Callable = latency,
) -> AsyncIterator[dict]: ) -> AsyncIterator[dict]:
# remember, msgpack-numpy's ``from_buffer` returns read-only array
# bars = np.array(bars[list(ohlc_dtype.names)])
# async def _yield_bars(): # async def _yield_bars():
# yield bars # yield bars
# hist_out: np.ndarray = None # hist_out: np.ndarray = None
func = latency
# Conduct a single iteration of fsp with historical bars input # Conduct a single iteration of fsp with historical bars input
# async for hist_out in func(_yield_bars(), bars): # async for hist_out in func(_yield_bars(), bars):
# yield {symbol: hist_out} # yield {symbol: hist_out}
func = _rsi
# open a data feed stream with requested broker # open a data feed stream with requested broker
async with data.open_feed( async with data.open_feed(
@ -90,26 +83,131 @@ async def pull_and_process(
# - how to handle non-plottable values # - how to handle non-plottable values
# - composition of fsps / implicit chaining # - composition of fsps / implicit chaining
async def latency( @jit(
source: 'TickStream[Dict[str, float]]', float64[:](
ohlcv: np.ndarray float64[:],
) -> AsyncIterator[np.ndarray]: optional(float64),
"""Compute High-Low midpoint value. optional(float64)
""" ),
# TODO: do we want to offer yielding this async nopython=True,
# before the rt data connection comes up? nogil=True
)
def ema(
y: 'np.ndarray[float64]',
alpha: optional(float64) = None,
ylast: optional(float64) = None,
) -> 'np.ndarray[float64]':
r"""Exponential weighted moving average owka 'Exponential smoothing'.
# deliver zeros for all prior history - https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
yield np.zeros(len(ohlcv)) - https://en.wikipedia.org/wiki/Exponential_smoothing
Fun facts:
A geometric progression is the discrete version of an
exponential function, that is where the name for this
smoothing method originated according to statistics lore. In
signal processing parlance, an EMA is a first order IIR filter.
.. math::
.tex
{S_{t}={\begin{cases}Y_{1},&t=1
\\\alpha Y_{t}+(1-\alpha )\cdot S_{t-1},&t>1\end{cases}}}
.nerd
(2) s = {
s[0] = y[0]; t = 0
s[t] = a*y[t] + (1-a)*s[t-1], t > 0.
}
"""
n = y.shape[0]
if alpha is None:
# https://en.wikipedia.org/wiki/Moving_average#Relationship_between_SMA_and_EMA
# use the "center of mass" convention making an ema compare
# directly to the com of a SMA or WMA:
alpha = 2 / float(n + 1)
s = np.empty(n, dtype=float64)
if n == 1:
s[0] = y[0] * alpha + ylast * (1 - alpha)
else:
if ylast is None:
s[0] = y[0]
else:
s[0] = ylast
for i in range(1, n):
s[i] = y[i] * alpha + s[i-1] * (1 - alpha)
return s
# @jit(
# float64[:](
# float64[:],
# int64,
# ),
# # nopython=True,
# nogil=True
# )
def rsi(
signal: 'np.ndarray[float64]',
period: int = 14,
up_ema_last: float64 = None,
down_ema_last: float64 = None,
) -> 'np.ndarray[float64]':
alpha = 1/period
print(signal)
df = np.diff(signal)
up, down = np.where(df > 0, df, 0), np.where(df < 0, -df, 0)
up_ema = ema(up, alpha, up_ema_last)
down_ema = ema(down, alpha, down_ema_last)
rs = up_ema / down_ema
print(f'up_ema: {up_ema}\ndown_ema: {down_ema}')
print(f'rs: {rs}')
# map rs through sigmoid (with range [0, 100])
rsi = 100 - 100 / (1 + rs)
# rsi = 100 * (up_ema / (up_ema + down_ema))
# also return the last ema state for next iteration
return rsi, up_ema[-1], down_ema[-1]
# @piker.fsp(
# aggregates=['30s', '1m', '5m', '1H', '4H', '1D'],
# )
async def _rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa
ohlcv: np.ndarray,
period: int = 14,
) -> AsyncIterator[np.ndarray]:
"""Multi-timeframe streaming RSI.
https://en.wikipedia.org/wiki/Relative_strength_index
"""
sig = ohlcv['close']
rsi_h, up_ema_last, down_ema_last = rsi(sig, period, None, None)
# deliver history
yield rsi_h
async for quote in source: async for quote in source:
ts = quote.get('broker_ts') # tick based updates
if ts: for tick in quote.get('ticks', ()):
# This is codified in the per-broker normalization layer if tick.get('type') == 'trade':
# TODO: Add more measure points and diffs for full system last = np.array([sig[-1], tick['price']])
# stack tracing. # await tractor.breakpoint()
value = quote['brokerd_ts'] - quote['broker_ts'] rsi_out, up_ema_last, down_ema_last = rsi(
yield value last,
period=period,
up_ema_last=up_ema_last,
down_ema_last=down_ema_last,
)
print(f'last: {last}\n rsi: {rsi_out}')
yield rsi_out[-1]
async def wma( async def wma(

View File

@ -736,7 +736,7 @@ async def chart_from_fsp(
f'fsp.{func_name}', # name as title of sub-chart f'fsp.{func_name}', # name as title of sub-chart
# subactor entrypoint # subactor entrypoint
fsp.pull_and_process, fsp.stream_and_process,
bars=bars, bars=bars,
brokername=brokermod.name, brokername=brokermod.name,
symbol=sym, symbol=sym,