piker/piker/fsp/_momo.py

253 lines
6.5 KiB
Python
Raw Normal View History

2020-11-06 17:23:14 +00:00
# piker: trading gear for hackers
2022-01-21 22:03:14 +00:00
# Copyright (C) Tyler Goodlet (in stewardship of pikers)
2020-11-06 17:23:14 +00:00
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
WIP initial draft of FSP subsystem This is a first attempt at a financial signal processing subsystem which utilizes async generators for streaming frames of numpy array data between actors. In this initial attempt the focus is on processing price data and relaying it to the chart app for real-time display. So far this seems to work (with decent latency) but much more work is likely needed around improving the data model for even better latency and less data duplication. Surprisingly (or not?) a lot of simplifications to the charting code came out of this in terms of conducting graphics updates in streaming tasks instead of hiding them inside the obfuscated mess that is the Qt-style-inheritance-OO-90s-trash. The goal from here on wards will be to enforce strict semantics around reading and writing of data such that state is kept outside "object trees" as much as possible and streaming function semantics guide our flow model. Unsurprisingly, this reduction in "instance state" is happening wherever we use `trio` ;) A little summary on the technical changes: - not going to explain the fsp system yet; it's too nascent and probably going to get some heavy editing. - drop any "update" methods from the `LinkedCharts` type since each sub-chart will have it's own update task and thus a separate update loop; further individual graphics (per chart) may eventually require this same design. - delete `ChartView`; moved into separate mod. - add "stream from fsp" task to start our foray into real-time actor processed numpy streaming.
2020-08-19 19:32:09 +00:00
"""
Momentum bby.
WIP initial draft of FSP subsystem This is a first attempt at a financial signal processing subsystem which utilizes async generators for streaming frames of numpy array data between actors. In this initial attempt the focus is on processing price data and relaying it to the chart app for real-time display. So far this seems to work (with decent latency) but much more work is likely needed around improving the data model for even better latency and less data duplication. Surprisingly (or not?) a lot of simplifications to the charting code came out of this in terms of conducting graphics updates in streaming tasks instead of hiding them inside the obfuscated mess that is the Qt-style-inheritance-OO-90s-trash. The goal from here on wards will be to enforce strict semantics around reading and writing of data such that state is kept outside "object trees" as much as possible and streaming function semantics guide our flow model. Unsurprisingly, this reduction in "instance state" is happening wherever we use `trio` ;) A little summary on the technical changes: - not going to explain the fsp system yet; it's too nascent and probably going to get some heavy editing. - drop any "update" methods from the `LinkedCharts` type since each sub-chart will have it's own update task and thus a separate update loop; further individual graphics (per chart) may eventually require this same design. - delete `ChartView`; moved into separate mod. - add "stream from fsp" task to start our foray into real-time actor processed numpy streaming.
2020-08-19 19:32:09 +00:00
"""
2020-09-23 17:15:27 +00:00
from typing import AsyncIterator, Optional
WIP initial draft of FSP subsystem This is a first attempt at a financial signal processing subsystem which utilizes async generators for streaming frames of numpy array data between actors. In this initial attempt the focus is on processing price data and relaying it to the chart app for real-time display. So far this seems to work (with decent latency) but much more work is likely needed around improving the data model for even better latency and less data duplication. Surprisingly (or not?) a lot of simplifications to the charting code came out of this in terms of conducting graphics updates in streaming tasks instead of hiding them inside the obfuscated mess that is the Qt-style-inheritance-OO-90s-trash. The goal from here on wards will be to enforce strict semantics around reading and writing of data such that state is kept outside "object trees" as much as possible and streaming function semantics guide our flow model. Unsurprisingly, this reduction in "instance state" is happening wherever we use `trio` ;) A little summary on the technical changes: - not going to explain the fsp system yet; it's too nascent and probably going to get some heavy editing. - drop any "update" methods from the `LinkedCharts` type since each sub-chart will have it's own update task and thus a separate update loop; further individual graphics (per chart) may eventually require this same design. - delete `ChartView`; moved into separate mod. - add "stream from fsp" task to start our foray into real-time actor processed numpy streaming.
2020-08-19 19:32:09 +00:00
import numpy as np
2020-09-23 17:15:27 +00:00
from numba import jit, float64, optional, int64
2022-01-28 12:43:49 +00:00
from ._api import fsp
2020-09-23 17:15:27 +00:00
from ..data._normalize import iterticks
from ..data._sharedmem import ShmArray
WIP initial draft of FSP subsystem This is a first attempt at a financial signal processing subsystem which utilizes async generators for streaming frames of numpy array data between actors. In this initial attempt the focus is on processing price data and relaying it to the chart app for real-time display. So far this seems to work (with decent latency) but much more work is likely needed around improving the data model for even better latency and less data duplication. Surprisingly (or not?) a lot of simplifications to the charting code came out of this in terms of conducting graphics updates in streaming tasks instead of hiding them inside the obfuscated mess that is the Qt-style-inheritance-OO-90s-trash. The goal from here on wards will be to enforce strict semantics around reading and writing of data such that state is kept outside "object trees" as much as possible and streaming function semantics guide our flow model. Unsurprisingly, this reduction in "instance state" is happening wherever we use `trio` ;) A little summary on the technical changes: - not going to explain the fsp system yet; it's too nascent and probably going to get some heavy editing. - drop any "update" methods from the `LinkedCharts` type since each sub-chart will have it's own update task and thus a separate update loop; further individual graphics (per chart) may eventually require this same design. - delete `ChartView`; moved into separate mod. - add "stream from fsp" task to start our foray into real-time actor processed numpy streaming.
2020-08-19 19:32:09 +00:00
@jit(
float64[:](
float64[:],
optional(float64),
optional(float64)
),
nopython=True,
nogil=True
)
def ema(
y: 'np.ndarray[float64]',
alpha: optional(float64) = None,
ylast: optional(float64) = None,
) -> 'np.ndarray[float64]':
r'''
Exponential weighted moving average owka 'Exponential smoothing'.
- https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
- https://en.wikipedia.org/wiki/Exponential_smoothing
Fun facts:
A geometric progression is the discrete version of an
exponential function, that is where the name for this
smoothing method originated according to statistics lore. In
signal processing parlance, an EMA is a first order IIR filter.
.. math::
.tex
{S_{t}={\begin{cases}Y_{1},&t=1
\\\alpha Y_{t}+(1-\alpha )\cdot S_{t-1},&t>1\end{cases}}}
.nerd
(2) s = {
s[0] = y[0]; t = 0
s[t] = a*y[t] + (1-a)*s[t-1], t > 0.
}
2020-09-23 17:15:27 +00:00
More discussion here:
https://stackoverflow.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm
'''
n = y.shape[0]
if alpha is None:
# https://en.wikipedia.org/wiki/Moving_average#Relationship_between_SMA_and_EMA
# use the "center of mass" convention making an ema compare
# directly to the com of a SMA or WMA:
alpha = 2 / float(n + 1)
s = np.empty(n, dtype=float64)
if n == 1:
s[0] = y[0] * alpha + ylast * (1 - alpha)
else:
if ylast is None:
s[0] = y[0]
else:
s[0] = ylast
for i in range(1, n):
s[i] = y[i] * alpha + s[i-1] * (1 - alpha)
return s
# @jit(
# float64[:](
# float64[:],
# int64,
2020-09-23 17:15:27 +00:00
# float64,
# float64,
# ),
2020-09-23 17:15:27 +00:00
# nopython=True,
# nogil=True
# )
2022-01-28 12:43:49 +00:00
def _rsi(
# TODO: use https://github.com/ramonhagenaars/nptyping
signal: 'np.ndarray[float64]',
2020-09-23 17:15:27 +00:00
period: int64 = 14,
up_ema_last: float64 = None,
down_ema_last: float64 = None,
) -> 'np.ndarray[float64]':
'''
relative strengggth.
'''
alpha = 1/period
df = np.diff(signal, prepend=0)
2020-09-23 17:15:27 +00:00
up = np.where(df > 0, df, 0)
up_ema = ema(up, alpha, up_ema_last)
2020-09-23 17:15:27 +00:00
down = np.where(df < 0, -df, 0)
down_ema = ema(down, alpha, down_ema_last)
2020-09-23 17:15:27 +00:00
# avoid dbz errors, this leaves the first
# index == 0 right?
2020-09-11 23:32:07 +00:00
rs = np.divide(
up_ema,
down_ema,
out=np.zeros_like(signal),
2020-09-23 17:15:27 +00:00
where=down_ema != 0
2020-09-11 23:32:07 +00:00
)
2020-09-23 17:15:27 +00:00
# map rs through sigmoid (with range [0, 100])
rsi = 100 - 100 / (1 + rs)
# rsi = 100 * (up_ema / (up_ema + down_ema))
2020-09-23 17:15:27 +00:00
# also return the last ema state for next iteration
return rsi, up_ema[-1], down_ema[-1]
2022-01-28 12:43:49 +00:00
def _wma(
2020-09-24 17:04:47 +00:00
signal: np.ndarray,
length: int,
weights: Optional[np.ndarray] = None,
2020-09-24 17:04:47 +00:00
) -> np.ndarray:
'''
Compute a windowed moving average of ``signal`` with window
``length`` and optional ``weights`` (must be same size as
``signal``).
'''
2020-09-24 17:04:47 +00:00
if weights is None:
# default is a standard arithmetic mean
seq = np.full((length,), 1)
weights = seq / seq.sum()
assert length == len(weights)
return np.convolve(signal, weights, 'valid')
2022-02-04 17:10:44 +00:00
@fsp
async def wma(
source, #: AsyncStream[np.ndarray],
length: int,
ohlcv: np.ndarray, # price time-frame "aware"
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
'''
Streaming weighted moving average.
``weights`` is a sequence of already scaled values. As an example
for the WMA often found in "techincal analysis":
``weights = np.arange(1, N) * N*(N-1)/2``.
'''
# deliver historical output as "first yield"
yield _wma(ohlcv.array['close'], length)
# begin real-time section
async for quote in source:
for tick in iterticks(quote, type='trade'):
yield _wma(ohlcv.last(length))
2022-01-28 12:43:49 +00:00
@fsp
async def rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa
ohlcv: ShmArray,
period: int = 14,
WIP initial draft of FSP subsystem This is a first attempt at a financial signal processing subsystem which utilizes async generators for streaming frames of numpy array data between actors. In this initial attempt the focus is on processing price data and relaying it to the chart app for real-time display. So far this seems to work (with decent latency) but much more work is likely needed around improving the data model for even better latency and less data duplication. Surprisingly (or not?) a lot of simplifications to the charting code came out of this in terms of conducting graphics updates in streaming tasks instead of hiding them inside the obfuscated mess that is the Qt-style-inheritance-OO-90s-trash. The goal from here on wards will be to enforce strict semantics around reading and writing of data such that state is kept outside "object trees" as much as possible and streaming function semantics guide our flow model. Unsurprisingly, this reduction in "instance state" is happening wherever we use `trio` ;) A little summary on the technical changes: - not going to explain the fsp system yet; it's too nascent and probably going to get some heavy editing. - drop any "update" methods from the `LinkedCharts` type since each sub-chart will have it's own update task and thus a separate update loop; further individual graphics (per chart) may eventually require this same design. - delete `ChartView`; moved into separate mod. - add "stream from fsp" task to start our foray into real-time actor processed numpy streaming.
2020-08-19 19:32:09 +00:00
) -> AsyncIterator[np.ndarray]:
'''
Multi-timeframe streaming RSI.
https://en.wikipedia.org/wiki/Relative_strength_index
'''
2020-09-23 17:15:27 +00:00
sig = ohlcv.array['close']
2020-09-11 23:32:07 +00:00
2020-09-24 17:04:47 +00:00
# wilder says to seed the RSI EMAs with the SMA for the "period"
2022-01-28 12:43:49 +00:00
seed = _wma(ohlcv.last(period)['close'], period)[0]
2020-09-24 17:04:47 +00:00
2020-09-11 23:32:07 +00:00
# TODO: the emas here should be seeded with a period SMA as per
# wilder's original formula..
2022-01-28 12:43:49 +00:00
rsi_h, last_up_ema_close, last_down_ema_close = _rsi(
sig, period, seed, seed)
up_ema_last = last_up_ema_close
down_ema_last = last_down_ema_close
WIP initial draft of FSP subsystem This is a first attempt at a financial signal processing subsystem which utilizes async generators for streaming frames of numpy array data between actors. In this initial attempt the focus is on processing price data and relaying it to the chart app for real-time display. So far this seems to work (with decent latency) but much more work is likely needed around improving the data model for even better latency and less data duplication. Surprisingly (or not?) a lot of simplifications to the charting code came out of this in terms of conducting graphics updates in streaming tasks instead of hiding them inside the obfuscated mess that is the Qt-style-inheritance-OO-90s-trash. The goal from here on wards will be to enforce strict semantics around reading and writing of data such that state is kept outside "object trees" as much as possible and streaming function semantics guide our flow model. Unsurprisingly, this reduction in "instance state" is happening wherever we use `trio` ;) A little summary on the technical changes: - not going to explain the fsp system yet; it's too nascent and probably going to get some heavy editing. - drop any "update" methods from the `LinkedCharts` type since each sub-chart will have it's own update task and thus a separate update loop; further individual graphics (per chart) may eventually require this same design. - delete `ChartView`; moved into separate mod. - add "stream from fsp" task to start our foray into real-time actor processed numpy streaming.
2020-08-19 19:32:09 +00:00
# deliver history
yield rsi_h
WIP initial draft of FSP subsystem This is a first attempt at a financial signal processing subsystem which utilizes async generators for streaming frames of numpy array data between actors. In this initial attempt the focus is on processing price data and relaying it to the chart app for real-time display. So far this seems to work (with decent latency) but much more work is likely needed around improving the data model for even better latency and less data duplication. Surprisingly (or not?) a lot of simplifications to the charting code came out of this in terms of conducting graphics updates in streaming tasks instead of hiding them inside the obfuscated mess that is the Qt-style-inheritance-OO-90s-trash. The goal from here on wards will be to enforce strict semantics around reading and writing of data such that state is kept outside "object trees" as much as possible and streaming function semantics guide our flow model. Unsurprisingly, this reduction in "instance state" is happening wherever we use `trio` ;) A little summary on the technical changes: - not going to explain the fsp system yet; it's too nascent and probably going to get some heavy editing. - drop any "update" methods from the `LinkedCharts` type since each sub-chart will have it's own update task and thus a separate update loop; further individual graphics (per chart) may eventually require this same design. - delete `ChartView`; moved into separate mod. - add "stream from fsp" task to start our foray into real-time actor processed numpy streaming.
2020-08-19 19:32:09 +00:00
2020-09-23 17:15:27 +00:00
index = ohlcv.index
WIP initial draft of FSP subsystem This is a first attempt at a financial signal processing subsystem which utilizes async generators for streaming frames of numpy array data between actors. In this initial attempt the focus is on processing price data and relaying it to the chart app for real-time display. So far this seems to work (with decent latency) but much more work is likely needed around improving the data model for even better latency and less data duplication. Surprisingly (or not?) a lot of simplifications to the charting code came out of this in terms of conducting graphics updates in streaming tasks instead of hiding them inside the obfuscated mess that is the Qt-style-inheritance-OO-90s-trash. The goal from here on wards will be to enforce strict semantics around reading and writing of data such that state is kept outside "object trees" as much as possible and streaming function semantics guide our flow model. Unsurprisingly, this reduction in "instance state" is happening wherever we use `trio` ;) A little summary on the technical changes: - not going to explain the fsp system yet; it's too nascent and probably going to get some heavy editing. - drop any "update" methods from the `LinkedCharts` type since each sub-chart will have it's own update task and thus a separate update loop; further individual graphics (per chart) may eventually require this same design. - delete `ChartView`; moved into separate mod. - add "stream from fsp" task to start our foray into real-time actor processed numpy streaming.
2020-08-19 19:32:09 +00:00
async for quote in source:
# tick based updates
2020-09-23 17:15:27 +00:00
for tick in iterticks(quote):
# though incorrect below is interesting
# sig = ohlcv.last(period)['close']
2020-09-24 17:04:47 +00:00
# get only the last 2 "datums" which will be diffed to
# calculate the real-time RSI output datum
2020-09-23 17:15:27 +00:00
sig = ohlcv.last(2)['close']
# the ema needs to be computed from the "last bar"
2020-09-24 17:04:47 +00:00
# TODO: how to make this cleaner
2020-09-23 17:15:27 +00:00
if ohlcv.index > index:
last_up_ema_close = up_ema_last
last_down_ema_close = down_ema_last
index = ohlcv.index
2022-01-28 12:43:49 +00:00
rsi_out, up_ema_last, down_ema_last = _rsi(
2020-09-23 17:15:27 +00:00
sig,
period=period,
up_ema_last=last_up_ema_close,
down_ema_last=last_down_ema_close,
)
yield rsi_out[-1:]