piker/piker/fsp/_momo.py

227 lines
6.3 KiB
Python

2020-11-06 17:23:14 +00:00
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
WIP initial draft of FSP subsystem This is a first attempt at a financial signal processing subsystem which utilizes async generators for streaming frames of numpy array data between actors. In this initial attempt the focus is on processing price data and relaying it to the chart app for real-time display. So far this seems to work (with decent latency) but much more work is likely needed around improving the data model for even better latency and less data duplication. Surprisingly (or not?) a lot of simplifications to the charting code came out of this in terms of conducting graphics updates in streaming tasks instead of hiding them inside the obfuscated mess that is the Qt-style-inheritance-OO-90s-trash. The goal from here on wards will be to enforce strict semantics around reading and writing of data such that state is kept outside "object trees" as much as possible and streaming function semantics guide our flow model. Unsurprisingly, this reduction in "instance state" is happening wherever we use `trio` ;) A little summary on the technical changes: - not going to explain the fsp system yet; it's too nascent and probably going to get some heavy editing. - drop any "update" methods from the `LinkedCharts` type since each sub-chart will have it's own update task and thus a separate update loop; further individual graphics (per chart) may eventually require this same design. - delete `ChartView`; moved into separate mod. - add "stream from fsp" task to start our foray into real-time actor processed numpy streaming.
2020-08-19 19:32:09 +00:00
"""
Momentum bby.
"""
from typing import AsyncIterator, Optional
import numpy as np
from numba import jit, float64, optional, int64
from ..data._normalize import iterticks
# TODO: things to figure the fuck out:
# - how to handle non-plottable values
# - composition of fsps / implicit chaining

@jit(
    float64[:](
        float64[:],
        optional(float64),
        optional(float64)
    ),
    nopython=True,
    nogil=True
)
def ema(
    y: 'np.ndarray[float64]',
    alpha: optional(float64) = None,
    ylast: optional(float64) = None,
) -> 'np.ndarray[float64]':
    r"""Exponential weighted moving average aka 'Exponential smoothing'.

    - https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
    - https://en.wikipedia.org/wiki/Exponential_smoothing

    Fun facts:
    A geometric progression is the discrete version of an
    exponential function, that is where the name for this
    smoothing method originated according to statistics lore. In
    signal processing parlance, an EMA is a first order IIR filter.

    .. math::

        .tex
        {S_{t}={\begin{cases}Y_{1},&t=1
         \\\alpha Y_{t}+(1-\alpha )\cdot S_{t-1},&t>1\end{cases}}}

        .nerd
        (2) s = {
                s[0] = y[0]; t = 0
                s[t] = a*y[t] + (1-a)*s[t-1], t > 0.
            }

    More discussion here:
    https://stackoverflow.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm
    """
    n = y.shape[0]

    if alpha is None:
        # https://en.wikipedia.org/wiki/Moving_average#Relationship_between_SMA_and_EMA
        # use the "center of mass" convention making an ema compare
        # directly to the com of a SMA or WMA:
        alpha = 2 / float(n + 1)

    s = np.empty(n, dtype=float64)

    if n == 1:
        # single-sample update: blends the new value with the previous
        # EMA output, so ``ylast`` must be provided in this case
        s[0] = y[0] * alpha + ylast * (1 - alpha)

    else:
        # seed the recursion with the first sample, or with the
        # previous EMA output when one is passed in
        if ylast is None:
            s[0] = y[0]
        else:
            s[0] = ylast

        for i in range(1, n):
            s[i] = y[i] * alpha + s[i-1] * (1 - alpha)

    return s
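
# NOTE (illustrative, not part of the module): a minimal pure-Python
# sketch of the recursion used by ``ema()``, without the numba
# compilation, so the arithmetic can be checked by hand.
# ``ema_sketch`` is a hypothetical name introduced for this example only.

```python
import numpy as np


def ema_sketch(y, alpha=None, ylast=None):
    """Plain-NumPy mirror of the EMA recursion (hypothetical helper)."""
    n = y.shape[0]
    if alpha is None:
        # same default as the module: the "center of mass" convention
        alpha = 2 / float(n + 1)

    s = np.empty(n, dtype=np.float64)
    if n == 1:
        # single-sample update: requires the previous EMA value
        s[0] = y[0] * alpha + ylast * (1 - alpha)
    else:
        s[0] = y[0] if ylast is None else ylast
        for i in range(1, n):
            s[i] = y[i] * alpha + s[i - 1] * (1 - alpha)
    return s


# batch pass over three samples with alpha = 0.5:
#   s[0] = 1.0
#   s[1] = 0.5 * 2 + 0.5 * 1.0 = 1.5
#   s[2] = 0.5 * 3 + 0.5 * 1.5 = 2.25
history = ema_sketch(np.array([1.0, 2.0, 3.0]), alpha=0.5)

# incremental update with one new sample, carrying the last output:
#   0.5 * 4 + 0.5 * 2.25 = 3.125
update = ema_sketch(np.array([4.0]), alpha=0.5, ylast=history[-1])
```

# The second call shows how the last output of one call can seed the
# next one, which is how the streaming code further down reuses state.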


# @jit(
#     float64[:](
#         float64[:],
#         int64,
#         float64,
#         float64,
#     ),
#     nopython=True,
#     nogil=True
# )
def rsi(
    signal: 'np.ndarray[float64]',
    period: int64 = 14,
    up_ema_last: float64 = None,
    down_ema_last: float64 = None,
) -> 'Tuple[np.ndarray[float64], float64, float64]':
    """Relative strength index.

    Returns the RSI series along with the last "up" and "down" EMA
    values so the caller can pass them back in on the next iteration.
    """
    # wilder's smoothing factor
    alpha = 1/period

    df = np.diff(signal)

    # average gains
    up = np.where(df > 0, df, 0)
    up_ema = ema(up, alpha, up_ema_last)

    # average losses (as positive magnitudes)
    down = np.where(df < 0, -df, 0)
    down_ema = ema(down, alpha, down_ema_last)

    # avoid dbz errors
    rs = np.divide(
        up_ema,
        down_ema,
        out=np.zeros_like(up_ema),
        where=down_ema != 0
    )

    # map rs through sigmoid (with range [0, 100])
    rsi = 100 - 100 / (1 + rs)
    # rsi = 100 * (up_ema / (up_ema + down_ema))

    # also return the last ema state for next iteration
    return rsi, up_ema[-1], down_ema[-1]
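
# NOTE (illustrative, not part of the module): a small worked example of
# the batch RSI arithmetic above, written against plain NumPy so it runs
# without numba. ``ema_sketch`` and ``rsi_sketch`` are hypothetical names,
# and the sketch only covers the multi-sample (history) case.

```python
import numpy as np


def ema_sketch(y, alpha, ylast=None):
    # same recursion as ``ema()`` for n > 1, minus the numba compilation
    s = np.empty(len(y), dtype=np.float64)
    s[0] = y[0] if ylast is None else ylast
    for i in range(1, len(y)):
        s[i] = y[i] * alpha + s[i - 1] * (1 - alpha)
    return s


def rsi_sketch(signal, period):
    alpha = 1 / period
    df = np.diff(signal)
    up_ema = ema_sketch(np.where(df > 0, df, 0.0), alpha)
    down_ema = ema_sketch(np.where(df < 0, -df, 0.0), alpha)
    # entries where down_ema == 0 keep the 0 from ``out``
    rs = np.divide(
        up_ema,
        down_ema,
        out=np.zeros_like(up_ema),
        where=down_ema != 0,
    )
    return 100 - 100 / (1 + rs)


closes = np.array([10.0, 11.0, 10.0, 12.0])
values = rsi_sketch(closes, period=2)

# by hand, with alpha = 0.5:
#   diffs    = [ 1, -1,  2]
#   up_ema   = [1.0, 0.5, 1.25]
#   down_ema = [0.0, 0.5, 0.25]
#   rs       = [0.0, 1.0, 5.0]   (first entry: guarded division)
#   rsi      = [0.0, 50.0, 83.33...]
```

# Note that with this division guard a stretch with no losses at all
# maps to ``rs == 0`` and therefore an RSI of 0, not 100.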


def wma(
    signal: np.ndarray,
    length: int,
    weights: Optional[np.ndarray] = None,
) -> np.ndarray:
    if weights is None:
        # default is a standard arithmetic mean
        seq = np.full((length,), 1)
        weights = seq / seq.sum()

    assert length == len(weights)

    return np.convolve(signal, weights, 'valid')
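
# NOTE (illustrative, not part of the module): with the default uniform
# weights, ``wma()`` reduces to a simple moving average, and the
# ``'valid'`` convolution mode returns ``len(signal) - length + 1``
# outputs. ``wma_sketch`` is a hypothetical copy used only to make the
# example self-contained.

```python
import numpy as np


def wma_sketch(signal, length, weights=None):
    if weights is None:
        # uniform weights -> plain arithmetic mean over each window
        seq = np.full((length,), 1)
        weights = seq / seq.sum()
    assert length == len(weights)
    return np.convolve(signal, weights, 'valid')


prices = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
sma = wma_sketch(prices, 3)

# windows: [1, 2, 3], [2, 3, 4], [3, 4, 5] -> means 2, 3, 4
```

# Only fully overlapping windows are emitted, so the first ``length - 1``
# input samples produce no output of their own.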


# @piker.fsp.signal(
#     timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
# )
async def _rsi(
    source: 'QuoteStream[Dict[str, Any]]',  # noqa
    ohlcv: "ShmArray[T<'close'>]",
    period: int = 14,
) -> AsyncIterator[np.ndarray]:
    """Multi-timeframe streaming RSI.

    https://en.wikipedia.org/wiki/Relative_strength_index
    """
    sig = ohlcv.array['close']

    # wilder says to seed the RSI EMAs with the SMA for the "period"
    seed = wma(ohlcv.last(period)['close'], period)[0]

    # TODO: the emas here should be seeded with a period SMA as per
    # wilder's original formula..
    rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed)
    up_ema_last = last_up_ema_close
    down_ema_last = last_down_ema_close

    # deliver history
    yield rsi_h

    index = ohlcv.index
    async for quote in source:
        # tick based updates
        for tick in iterticks(quote):
            # though incorrect below is interesting
            # sig = ohlcv.last(period)['close']

            # get only the last 2 "datums" which will be diffed to
            # calculate the real-time RSI output datum
            sig = ohlcv.last(2)['close']

            # the ema needs to be computed from the "last bar"
            # TODO: how to make this cleaner
            if ohlcv.index > index:
                last_up_ema_close = up_ema_last
                last_down_ema_close = down_ema_last
                index = ohlcv.index

            rsi_out, up_ema_last, down_ema_last = rsi(
                sig,
                period=period,
                up_ema_last=last_up_ema_close,
                down_ema_last=last_down_ema_close,
            )
            yield rsi_out[-1:]
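
# NOTE (illustrative, not part of the module): a stripped-down sketch of
# the streaming pattern used by ``_rsi()``: yield the full history first,
# then one-element frames per tick, only committing the "closed bar"
# state when the bar index advances. Assumptions: it uses stdlib
# ``asyncio`` instead of the project's actual runtime, a plain EMA
# instead of the RSI, and ``fake_quotes`` / ``streaming_ema`` are
# hypothetical stand-ins for the real quote stream and shared array.

```python
import asyncio

import numpy as np


async def fake_quotes(ticks):
    # hypothetical source: yields (bar_index, last_price) pairs
    for bar_index, price in ticks:
        yield bar_index, price


async def streaming_ema(source, history, alpha, start_index):
    # batch pass over the history, delivered as the first yield
    s = np.empty(len(history), dtype=np.float64)
    s[0] = history[0]
    for i in range(1, len(history)):
        s[i] = alpha * history[i] + (1 - alpha) * s[i - 1]
    yield s

    closed = live = s[-1]
    index = start_index
    async for bar_index, price in source:
        if bar_index > index:
            # a new bar started: the last live value becomes the
            # state that every tick of the new bar is computed from
            closed = live
            index = bar_index
        live = alpha * price + (1 - alpha) * closed
        yield np.array([live])


async def main():
    ticks = [(3, 10.0), (3, 12.0), (4, 8.0)]
    stream = streaming_ema(
        fake_quotes(ticks),
        history=np.array([8.0, 12.0]),
        alpha=0.5,
        start_index=2,
    )
    return [frame async for frame in stream]


frames = asyncio.run(main())
# frames[0] is the history [8.0, 10.0]; the updates are 10.0, 11.0, 9.5
```

# The two ticks on bar 3 are both computed from the same closed-bar
# state (10.0), so a later tick replaces the earlier one instead of
# compounding on top of it.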


async def _wma(
    source,  #: AsyncStream[np.ndarray],
    length: int,
    ohlcv: "ShmArray[T<'close'>]",  # price time-frame "aware"
) -> AsyncIterator[np.ndarray]:  # maybe something like FspStream?
    """Streaming weighted moving average.

    ``weights`` (see ``wma()``) is a sequence of already scaled values.
    As an example for the WMA often found in "technical analysis":
    ``weights = np.arange(1, N + 1) / (N * (N + 1) / 2)``.
    """
    # deliver historical output as "first yield"
    yield wma(ohlcv.array['close'], length)

    # begin real-time section
    async for quote in source:
        for tick in iterticks(quote, type='trade'):
            # recompute over the most recent ``length`` closes
            yield wma(ohlcv.last(length)['close'], length)
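
# NOTE (illustrative, not part of the module): a quick check of the
# linearly increasing weights mentioned in the docstring, and of how
# ``np.convolve`` applies them. Convolution flips the kernel, so
# ``weights[0]`` lands on the newest sample of each window; whether the
# weights should be passed ascending or descending therefore depends on
# which end of the window is meant to dominate. ``N`` and ``prices`` are
# example values chosen here.

```python
import numpy as np

N = 4

# 1, 2, ..., N scaled by their sum N * (N + 1) / 2 so they add up to 1
weights = np.arange(1, N + 1) / (N * (N + 1) / 2)
# -> [0.1, 0.2, 0.3, 0.4]

prices = np.array([1.0, 2.0, 3.0, 4.0])

# ascending kernel: the largest weight hits the OLDEST sample
#   4 * 0.1 + 3 * 0.2 + 2 * 0.3 + 1 * 0.4 = 2.0
oldest_heavy = np.convolve(prices, weights, 'valid')

# descending kernel: the largest weight hits the NEWEST sample
#   4 * 0.4 + 3 * 0.3 + 2 * 0.2 + 1 * 0.1 = 3.0
newest_heavy = np.convolve(prices, weights[::-1], 'valid')
```

# For the usual "recent prices matter most" WMA, the descending order is
# the one to hand to a convolution-based implementation.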