Use shm in fsp cascading

This kicks off what will be the beginning of hopefully a very nice
(soft) real-time financial signal processing system. We're keeping the
hack to "time align" curves (for now) with the bars for now by slapping
in an extra datum at index 0.
bar_select
Tyler Goodlet 2020-09-22 20:23:48 -04:00
parent 4383579cd0
commit 2fcbefa6e1
1 changed files with 64 additions and 26 deletions

View File

@ -1,13 +1,16 @@
""" """
Financial signal processing for the peeps. Financial signal processing for the peeps.
""" """
from typing import AsyncIterator, Callable from typing import AsyncIterator, Callable, Tuple
import trio
import tractor
import numpy as np import numpy as np
from ..log import get_logger from ..log import get_logger
from .. import data from .. import data
from ._momo import _rsi from ._momo import _rsi
from ..data import attach_shm_array, Feed
log = get_logger(__name__) log = get_logger(__name__)
@ -19,7 +22,7 @@ async def latency(
source: 'TickStream[Dict[str, float]]', # noqa source: 'TickStream[Dict[str, float]]', # noqa
ohlcv: np.ndarray ohlcv: np.ndarray
) -> AsyncIterator[np.ndarray]: ) -> AsyncIterator[np.ndarray]:
"""Compute High-Low midpoint value. """Latency measurements, broker to piker.
""" """
# TODO: do we want to offer yielding this async # TODO: do we want to offer yielding this async
# before the rt data connection comes up? # before the rt data connection comes up?
@ -37,33 +40,40 @@ async def latency(
yield value yield value
async def stream_and_process( async def increment_signals(
bars: np.ndarray, feed: Feed,
dst_shm: 'SharedArray', # noqa
) -> None:
async for msg in await feed.index_stream():
array = dst_shm.array
last = array[-1:].copy()
# write new slot to the buffer
dst_shm.push(last)
@tractor.stream
async def cascade(
ctx: tractor.Context,
brokername: str, brokername: str,
# symbols: List[str], src_shm_token: dict,
dst_shm_token: Tuple[str, np.dtype],
symbol: str, symbol: str,
fsp_func_name: str, fsp_func_name: str,
) -> AsyncIterator[dict]: ) -> AsyncIterator[dict]:
"""Chain streaming signal processors and deliver output to
destination mem buf.
"""
src = attach_shm_array(token=src_shm_token)
dst = attach_shm_array(readonly=False, token=dst_shm_token)
# remember, msgpack-numpy's ``from_buffer` returns read-only array
# bars = np.array(bars[list(ohlc_dtype.names)])
# async def _yield_bars():
# yield bars
# hist_out: np.ndarray = None
# Conduct a single iteration of fsp with historical bars input
# async for hist_out in func(_yield_bars(), bars):
# yield {symbol: hist_out}
func: Callable = _fsps[fsp_func_name] func: Callable = _fsps[fsp_func_name]
# open a data feed stream with requested broker # open a data feed stream with requested broker
async with data.open_feed( async with data.open_feed(brokername, [symbol]) as feed:
brokername,
[symbol],
) as (fquote, stream):
assert src.token == feed.shm.token
# TODO: load appropriate fsp with input args # TODO: load appropriate fsp with input args
async def filter_by_sym(sym, stream): async def filter_by_sym(sym, stream):
@ -72,9 +82,37 @@ async def stream_and_process(
if symbol == sym: if symbol == sym:
yield quotes yield quotes
async for processed in func( out_stream = func(
filter_by_sym(symbol, stream), filter_by_sym(symbol, feed.stream),
bars, feed.shm,
): )
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history = await out_stream.__anext__()
# TODO: talk to ``pyqtgraph`` core about proper way to solve this:
# XXX: hack to get curves aligned with bars graphics: prepend
# a copy of the first datum..
dst.push(history[:1])
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
yield index
async with trio.open_nursery() as n:
n.start_soon(increment_signals, feed, dst)
async for processed in out_stream:
log.info(f"{fsp_func_name}: {processed}") log.info(f"{fsp_func_name}: {processed}")
yield processed index = src.index
dst.array[-1][fsp_func_name] = processed
await ctx.send_yield(index)