Start fsp subpackage, separate momo stuff

bar_select
Tyler Goodlet 2020-09-09 10:46:33 -04:00
parent ee6e4d2207
commit 9a59f2408d
2 changed files with 93 additions and 81 deletions

View File

@ -0,0 +1,80 @@
"""
Financial signal processing for the peeps.
"""
from typing import AsyncIterator, Callable
import numpy as np
from ..log import get_logger
from .. import data
from ._momo import _rsi
log = get_logger(__name__)
_fsps = {'rsi': _rsi}
async def latency(
source: 'TickStream[Dict[str, float]]', # noqa
ohlcv: np.ndarray
) -> AsyncIterator[np.ndarray]:
"""Compute High-Low midpoint value.
"""
# TODO: do we want to offer yielding this async
# before the rt data connection comes up?
# deliver zeros for all prior history
yield np.zeros(len(ohlcv))
async for quote in source:
ts = quote.get('broker_ts')
if ts:
# This is codified in the per-broker normalization layer
# TODO: Add more measure points and diffs for full system
# stack tracing.
value = quote['brokerd_ts'] - quote['broker_ts']
yield value
async def stream_and_process(
bars: np.ndarray,
brokername: str,
# symbols: List[str],
symbol: str,
fsp_func_name: str,
) -> AsyncIterator[dict]:
# remember, msgpack-numpy's ``from_buffer` returns read-only array
# bars = np.array(bars[list(ohlc_dtype.names)])
# async def _yield_bars():
# yield bars
# hist_out: np.ndarray = None
# Conduct a single iteration of fsp with historical bars input
# async for hist_out in func(_yield_bars(), bars):
# yield {symbol: hist_out}
func: Callable = _fsps[fsp_func_name]
# open a data feed stream with requested broker
async with data.open_feed(
brokername,
[symbol],
) as (fquote, stream):
# TODO: load appropriate fsp with input args
async def filter_by_sym(sym, stream):
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
async for processed in func(
filter_by_sym(symbol, stream),
bars,
):
log.info(f"{fsp_func_name}: {processed}")
yield processed

View File

@ -1,82 +1,10 @@
"""
Financial signal processing for the peeps.
Momentum bby.
"""
from typing import AsyncIterator, List, Callable
from typing import AsyncIterator
import tractor
import numpy as np
from numba import jit, float64, int64, void, optional
from .log import get_logger
from . import data
log = get_logger(__name__)
async def latency(
source: 'TickStream[Dict[str, float]]',
ohlcv: np.ndarray
) -> AsyncIterator[np.ndarray]:
"""Compute High-Low midpoint value.
"""
# TODO: do we want to offer yielding this async
# before the rt data connection comes up?
# deliver zeros for all prior history
yield np.zeros(len(ohlcv))
async for quote in source:
ts = quote.get('broker_ts')
if ts:
# This is codified in the per-broker normalization layer
# TODO: Add more measure points and diffs for full system
# stack tracing.
value = quote['brokerd_ts'] - quote['broker_ts']
yield value
async def stream_and_process(
bars: np.ndarray,
brokername: str,
# symbols: List[str],
symbol: str,
fsp_func_name: str,
func: Callable = latency,
) -> AsyncIterator[dict]:
# remember, msgpack-numpy's ``from_buffer` returns read-only array
# bars = np.array(bars[list(ohlc_dtype.names)])
# async def _yield_bars():
# yield bars
# hist_out: np.ndarray = None
# Conduct a single iteration of fsp with historical bars input
# async for hist_out in func(_yield_bars(), bars):
# yield {symbol: hist_out}
func = _rsi
# open a data feed stream with requested broker
async with data.open_feed(
brokername,
[symbol],
) as (fquote, stream):
# TODO: load appropriate fsp with input args
async def filter_by_sym(sym, stream):
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
async for processed in func(
filter_by_sym(symbol, stream),
bars,
):
log.info(f"{fsp_func_name}: {processed}")
yield processed
from numba import jit, float64, optional
# TODO: things to figure the fuck out:
@ -160,15 +88,15 @@ def rsi(
down_ema_last: float64 = None,
) -> 'np.ndarray[float64]':
alpha = 1/period
print(signal)
# print(signal)
df = np.diff(signal)
up, down = np.where(df > 0, df, 0), np.where(df < 0, -df, 0)
up_ema = ema(up, alpha, up_ema_last)
down_ema = ema(down, alpha, down_ema_last)
rs = up_ema / down_ema
print(f'up_ema: {up_ema}\ndown_ema: {down_ema}')
print(f'rs: {rs}')
# print(f'up_ema: {up_ema}\ndown_ema: {down_ema}')
# print(f'rs: {rs}')
# map rs through sigmoid (with range [0, 100])
rsi = 100 - 100 / (1 + rs)
# rsi = 100 * (up_ema / (up_ema + down_ema))
@ -194,11 +122,14 @@ async def _rsi(
# deliver history
yield rsi_h
_last = sig[-1]
async for quote in source:
# tick based updates
for tick in quote.get('ticks', ()):
if tick.get('type') == 'trade':
last = np.array([sig[-1], tick['price']])
curr = tick['price']
last = np.array([_last, curr])
# await tractor.breakpoint()
rsi_out, up_ema_last, down_ema_last = rsi(
last,
@ -206,7 +137,8 @@ async def _rsi(
up_ema_last=up_ema_last,
down_ema_last=down_ema_last,
)
print(f'last: {last}\n rsi: {rsi_out}')
_last = curr
# print(f'last: {last}\n rsi: {rsi_out}')
yield rsi_out[-1]
@ -236,5 +168,5 @@ async def wma(
'valid'
)
# todo: handle case where frame_len < length - 1
_lookback = frame[-(length-1):]
_lookback = frame[-(length-1):] # noqa
yield wma