diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py new file mode 100644 index 00000000..bf2ea127 --- /dev/null +++ b/piker/fsp/__init__.py @@ -0,0 +1,80 @@ +""" +Financial signal processing for the peeps. +""" +from typing import AsyncIterator, Callable + +import numpy as np + +from ..log import get_logger +from .. import data +from ._momo import _rsi + +log = get_logger(__name__) + + +_fsps = {'rsi': _rsi} + + +async def latency( + source: 'TickStream[Dict[str, float]]', # noqa + ohlcv: np.ndarray +) -> AsyncIterator[np.ndarray]: + """Compute High-Low midpoint value. + """ + # TODO: do we want to offer yielding this async + # before the rt data connection comes up? + + # deliver zeros for all prior history + yield np.zeros(len(ohlcv)) + + async for quote in source: + ts = quote.get('broker_ts') + if ts: + # This is codified in the per-broker normalization layer + # TODO: Add more measure points and diffs for full system + # stack tracing. + value = quote['brokerd_ts'] - quote['broker_ts'] + yield value + + +async def stream_and_process( + bars: np.ndarray, + brokername: str, + # symbols: List[str], + symbol: str, + fsp_func_name: str, +) -> AsyncIterator[dict]: + + # remember, msgpack-numpy's ``from_buffer` returns read-only array + # bars = np.array(bars[list(ohlc_dtype.names)]) + + # async def _yield_bars(): + # yield bars + + # hist_out: np.ndarray = None + + # Conduct a single iteration of fsp with historical bars input + # async for hist_out in func(_yield_bars(), bars): + # yield {symbol: hist_out} + func: Callable = _fsps[fsp_func_name] + + # open a data feed stream with requested broker + async with data.open_feed( + brokername, + [symbol], + ) as (fquote, stream): + + # TODO: load appropriate fsp with input args + + async def filter_by_sym(sym, stream): + async for quotes in stream: + for symbol, quotes in quotes.items(): + if symbol == sym: + yield quotes + + async for processed in func( + filter_by_sym(symbol, stream), + bars, + ): + log.info(f"{fsp_func_name}: {processed}") + yield processed diff --git a/piker/fsp.py b/piker/fsp/_momo.py similarity index 64% rename from piker/fsp.py rename to piker/fsp/_momo.py index 25014155..6f1a4c0a 100644 --- a/piker/fsp.py +++ b/piker/fsp/_momo.py @@ -1,82 +1,10 @@ """ -Financial signal processing for the peeps. +Momentum bby. """ -from typing import AsyncIterator, List, Callable +from typing import AsyncIterator -import tractor import numpy as np -from numba import jit, float64, int64, void, optional - -from .log import get_logger -from . import data - -log = get_logger(__name__) - - -async def latency( - source: 'TickStream[Dict[str, float]]', - ohlcv: np.ndarray -) -> AsyncIterator[np.ndarray]: - """Compute High-Low midpoint value. - """ - # TODO: do we want to offer yielding this async - # before the rt data connection comes up? - - # deliver zeros for all prior history - yield np.zeros(len(ohlcv)) - - async for quote in source: - ts = quote.get('broker_ts') - if ts: - # This is codified in the per-broker normalization layer - # TODO: Add more measure points and diffs for full system - # stack tracing. - value = quote['brokerd_ts'] - quote['broker_ts'] - yield value - - -async def stream_and_process( - bars: np.ndarray, - brokername: str, - # symbols: List[str], - symbol: str, - fsp_func_name: str, - func: Callable = latency, -) -> AsyncIterator[dict]: - - # remember, msgpack-numpy's ``from_buffer` returns read-only array - # bars = np.array(bars[list(ohlc_dtype.names)]) - - # async def _yield_bars(): - # yield bars - - # hist_out: np.ndarray = None - - # Conduct a single iteration of fsp with historical bars input - # async for hist_out in func(_yield_bars(), bars): - # yield {symbol: hist_out} - func = _rsi - - # open a data feed stream with requested broker - async with data.open_feed( - brokername, - [symbol], - ) as (fquote, stream): - - # TODO: load appropriate fsp with input args - - async def filter_by_sym(sym, stream): - async for quotes in stream: - for symbol, quotes in quotes.items(): - if symbol == sym: - yield quotes - - async for processed in func( - filter_by_sym(symbol, stream), - bars, - ): - log.info(f"{fsp_func_name}: {processed}") - yield processed +from numba import jit, float64, optional # TODO: things to figure the fuck out: @@ -160,15 +88,15 @@ def rsi( down_ema_last: float64 = None, ) -> 'np.ndarray[float64]': alpha = 1/period - print(signal) + # print(signal) df = np.diff(signal) up, down = np.where(df > 0, df, 0), np.where(df < 0, -df, 0) up_ema = ema(up, alpha, up_ema_last) down_ema = ema(down, alpha, down_ema_last) rs = up_ema / down_ema - print(f'up_ema: {up_ema}\ndown_ema: {down_ema}') - print(f'rs: {rs}') + # print(f'up_ema: {up_ema}\ndown_ema: {down_ema}') + # print(f'rs: {rs}') # map rs through sigmoid (with range [0, 100]) rsi = 100 - 100 / (1 + rs) # rsi = 100 * (up_ema / (up_ema + down_ema)) @@ -194,11 +122,14 @@ async def _rsi( # deliver history yield rsi_h + _last = sig[-1] + async for quote in source: # tick based updates for tick in quote.get('ticks', ()): if tick.get('type') == 'trade': - last = np.array([sig[-1], tick['price']]) + curr = tick['price'] + last = np.array([_last, curr]) # await tractor.breakpoint() rsi_out, up_ema_last, down_ema_last = rsi( last, @@ -206,7 +137,8 @@ async def _rsi( up_ema_last=up_ema_last, down_ema_last=down_ema_last, ) - print(f'last: {last}\n rsi: {rsi_out}') + _last = curr + # print(f'last: {last}\n rsi: {rsi_out}') yield rsi_out[-1] @@ -236,5 +168,5 @@ async def wma( 'valid' ) # todo: handle case where frame_len < length - 1 - _lookback = frame[-(length-1):] + _lookback = frame[-(length-1):] # noqa yield wma