WIP initial draft of FSP subsystem

This is a first attempt at a financial signal processing subsystem which
utilizes async generators for streaming frames of numpy array data
between actors. In this initial attempt the focus is on processing price
data and relaying it to the chart app for real-time display. So far this
seems to work (with decent latency) but much more work is likely needed
around improving the data model for even better latency and less data
duplication.

Surprisingly (or not?) a lot of simplifications to the charting code
came out of this in terms of conducting graphics updates in streaming
tasks instead of hiding them inside the obfuscated mess that is the
Qt-style-inheritance-OO-90s-trash. The goal from here on wards will be
to enforce strict semantics around reading and writing of data such that
state is kept outside "object trees" as much as possible and streaming
function semantics guide our flow model. Unsurprisingly, this reduction
in "instance state" is happening wherever we use `trio` ;)

A little summary on the technical changes:
- not going to explain the fsp system yet; it's too nascent and
  probably going to get some heavy editing.
- drop any "update" methods from the `LinkedCharts` type since each
  sub-chart will have it's own update task and thus a separate update
  loop; further individual graphics (per chart) may eventually require
  this same design.
- delete `ChartView`; moved into separate mod.
- add "stream from fsp" task to start our foray into real-time actor
  processed numpy streaming.
its_happening
Tyler Goodlet 2020-08-19 15:32:09 -04:00
parent f4dddecf17
commit f5ad56a257
3 changed files with 366 additions and 242 deletions

161
piker/fsp.py 100644
View File

@ -0,0 +1,161 @@
"""
Financial signal processing for the peeps.
"""
from typing import AsyncIterator, List
import numpy as np
from .log import get_logger
from . import data
log = get_logger(__name__)
def rec2array(
rec: np.ndarray,
fields: List[str] = None
) -> np.ndarray:
"""Convert record array to std array.
Taken from:
https://github.com/scikit-hep/root_numpy/blob/master/root_numpy/_utils.py#L20
"""
simplify = False
if fields is None:
fields = rec.dtype.names
elif isinstance(fields, str):
fields = [fields]
simplify = True
# Creates a copy and casts all data to the same type
arr = np.dstack([rec[field] for field in fields])
# Check for array-type fields. If none, then remove outer dimension.
# Only need to check first field since np.dstack will anyway raise an
# exception if the shapes don't match
# np.dstack will also fail if fields is an empty list
if not rec.dtype[fields[0]].shape:
arr = arr[0]
if simplify:
# remove last dimension (will be of size 1)
arr = arr.reshape(arr.shape[:-1])
return arr
async def pull_and_process(
bars: np.ndarray,
brokername: str,
# symbols: List[str],
symbol: str,
fsp_func_name: str,
) -> AsyncIterator[dict]:
# async def _yield_bars():
# yield bars
# hist_out: np.ndarray = None
func = latency
# Conduct a single iteration of fsp with historical bars input
# async for hist_out in func(_yield_bars(), bars):
# yield {symbol: hist_out}
# open a data feed stream with requested broker
async with data.open_feed(
brokername,
[symbol],
) as (fquote, stream):
# TODO: load appropriate fsp with input args
async def filter_by_sym(sym, stream):
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
async for processed in func(
filter_by_sym(symbol, stream),
bars,
):
print(f"{fsp_func_name}: {processed}")
yield processed
# TODO: things to figure the fuck out:
# - how to handle non-plottable values
# - composition of fsps / implicit chaining
async def latency(
source: 'TickStream[Dict[str, float]]',
ohlcv: np.ndarray
) -> AsyncIterator[np.ndarray]:
"""Compute High-Low midpoint value.
"""
# TODO: do we want to offer yielding this async
# before the rt data connection comes up?
# deliver zeros for all prior history
yield np.zeros(len(ohlcv))
_last = None
async for quote in source:
fill_time = quote.get('rtTime_s')
if fill_time and fill_time != _last:
value = quote['brokerd_ts'] - fill_time
print(f"latency: {value}")
yield value
_last = fill_time
# ticks = quote.get('ticks', ())
# for tick in ticks:
# if tick.get('type') == 'trade':
async def last(
source: 'TickStream[Dict[str, float]]',
ohlcv: np.ndarray
) -> AsyncIterator[np.ndarray]:
"""Compute High-Low midpoint value.
"""
# first_frame = (await source.__anext__())
# deliver historical processed data first
yield ohlcv['close']
async for quote in source:
yield quote['close']
async def wma(
source, #: AsyncStream[np.ndarray],
ohlcv: np.ndarray, # price time-frame "aware"
lookback: np.ndarray, # price time-frame "aware"
weights: np.ndarray,
) -> AsyncIterator[np.ndarray]: # i like FinSigStream
"""Weighted moving average.
``weights`` is a sequence of already scaled values. As an example
for the WMA often found in "techincal analysis":
``weights = np.arange(1, N) * N*(N-1)/2``.
"""
length = len(weights)
_lookback = np.zeros(length - 1)
ohlcv.from_tf('5m')
# async for frame_len, frame in source:
async for frame in source:
wma = np.convolve(
ohlcv[-length:]['close'],
# np.concatenate((_lookback, frame)),
weights,
'valid'
)
# todo: handle case where frame_len < length - 1
_lookback = frame[-(length-1):]
yield wma

View File

@ -1,11 +1,10 @@
"""
High level Qt chart widgets.
"""
from typing import List, Optional, Tuple, Dict, Any
from typing import Optional, Tuple, Dict, Any
import time
from PyQt5 import QtCore, QtGui
from pyqtgraph import functions as fn
import numpy as np
import pyqtgraph as pg
import tractor
@ -15,14 +14,15 @@ from ._axes import (
DynamicDateAxis,
PriceAxis,
)
from ._graphics import CrossHair, ChartType
from ._style import _xaxis_at
from ._graphics import CrossHair, BarItems
from ._style import _xaxis_at, _min_points_to_show
from ._source import Symbol
from .. import brokers
from .. import data
from ..log import get_logger
from ._exec import run_qtractor
from ._source import ohlc_dtype
from ._interaction import ChartView
from .. import fsp
@ -50,7 +50,7 @@ class ChartSpace(QtGui.QWidget):
self.v_layout.addLayout(self.toolbar_layout)
self.v_layout.addLayout(self.h_layout)
self._plot_cache = {}
self._chart_cache = {}
def init_timeframes_ui(self):
self.tf_layout = QtGui.QHBoxLayout()
@ -81,16 +81,19 @@ class ChartSpace(QtGui.QWidget):
"""
# XXX: let's see if this causes mem problems
self.window.setWindowTitle(f'piker chart {symbol}')
self.chart = self._plot_cache.setdefault(symbol, LinkedSplitCharts())
linkedcharts = self._chart_cache.setdefault(
symbol,
LinkedSplitCharts()
)
s = Symbol(key=symbol)
# remove any existing plots
if not self.h_layout.isEmpty():
self.h_layout.removeWidget(self.chart)
self.h_layout.removeWidget(linkedcharts)
self.chart.plot(s, data)
self.h_layout.addWidget(self.chart)
return self.chart
main_chart = linkedcharts.plot_main(s, data)
self.h_layout.addWidget(linkedcharts)
return linkedcharts, main_chart
# TODO: add signalling painter system
# def add_signals(self):
@ -118,7 +121,7 @@ class LinkedSplitCharts(QtGui.QWidget):
self._array: np.ndarray = None # main data source
self._ch: CrossHair = None # crosshair graphics
self.chart: ChartPlotWidget = None # main (ohlc) chart
self.subplots: List[ChartPlotWidget] = []
self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {}
self.xaxis = DynamicDateAxis(
orientation='bottom',
@ -148,29 +151,28 @@ class LinkedSplitCharts(QtGui.QWidget):
"""Set the proportion of space allocated for linked subcharts.
"""
major = 1 - prop
min_h_ind = int(self.height() * prop / len(self.subplots))
min_h_ind = int((self.height() * prop) / len(self.subplots))
sizes = [int(self.height() * major)]
sizes.extend([min_h_ind] * len(self.subplots))
self.splitter.setSizes(sizes) # , int(self.height()*0.2)
def plot(
def plot_main(
self,
symbol: Symbol,
array: np.ndarray,
ohlc: bool = True,
) -> None:
) -> 'ChartPlotWidget':
"""Start up and show main (price) chart and all linked subcharts.
"""
self.digits = symbol.digits()
# XXX: this will eventually be a view onto shared mem
# or some higher level type / API
# TODO: this should eventually be a view onto shared mem or some
# higher level type / API
self._array = array
# add crosshairs
self._ch = CrossHair(
parent=self,
# subplots=[plot for plot, d in self.subplots],
digits=self.digits
)
self.chart = self.add_plot(
@ -179,26 +181,13 @@ class LinkedSplitCharts(QtGui.QWidget):
xaxis=self.xaxis,
ohlc=True,
)
# add crosshair graphic
self.chart.addItem(self._ch)
# style?
self.chart.setFrameStyle(QtGui.QFrame.StyledPanel | QtGui.QFrame.Plain)
# TODO: this is where we would load an indicator chain
# XXX: note, if this isn't index aligned with
# the source data the chart will go haywire.
inds = [('open', lambda a: a['close'])]
for name, func in inds:
# compute historical subchart values from input array
data = func(array)
# create sub-plot
ind_chart = self.add_plot(name=name, array=data)
self.subplots.append((ind_chart, func))
# scale split regions
self.set_split_sizes()
return self.chart
def add_plot(
self,
@ -211,25 +200,28 @@ class LinkedSplitCharts(QtGui.QWidget):
If ``name`` == ``"main"`` the chart will be the the primary view.
"""
if self.chart is None and name != 'main':
raise RuntimeError(
"A main plot must be created first with `.plot_main()`")
# source of our custom interactions
cv = ChartView()
cv.linked_charts = self
# use "indicator axis" by default
xaxis = self.xaxis_ind if xaxis is None else xaxis
cpw = ChartPlotWidget(
linked_charts=self,
array=array,
parent=self.splitter,
axisItems={'bottom': xaxis, 'right': PriceAxis()},
# axisItems={'top': self.xaxis_ind, 'right': PriceAxis()},
viewBox=cv,
)
# this name will be used to register the primary
# graphics curve managed by the subchart
cpw.name = name
cv.linked_charts = self
cpw.plotItem.vb.linked_charts = self
cpw.setFrameStyle(
QtGui.QFrame.StyledPanel | QtGui.QFrame.Plain
)
cpw.setFrameStyle(QtGui.QFrame.StyledPanel | QtGui.QFrame.Plain)
cpw.getPlotItem().setContentsMargins(*CHART_MARGINS)
# self.splitter.addWidget(cpw)
@ -240,68 +232,20 @@ class LinkedSplitCharts(QtGui.QWidget):
if ohlc:
cpw.draw_ohlc(array)
else:
cpw.draw_curve(array, name)
cpw.draw_curve(array)
# add to cross-hair's known plots
self._ch.add_plot(cpw)
if name != "main":
# track by name
self.subplots[name] = cpw
# scale split regions
self.set_split_sizes()
return cpw
def update_from_quote(
self,
quote: dict
) -> List[pg.GraphicsObject]:
"""Update all linked chart graphics with a new quote
datum.
Return the modified graphics objects in a list.
"""
# TODO: eventually we'll want to update bid/ask labels and other
# data as subscribed by underlying UI consumers.
last = quote.get('last') or quote['close']
index, time, open, high, low, close, volume = self._array[-1]
# update ohlc (I guess we're enforcing this for now?)
# overwrite from quote
self._array[-1] = (
index,
time,
open,
max(high, last),
min(low, last),
last,
volume,
)
self.update_from_array(self._array)
def update_from_array(
self,
array: np.ndarray,
**kwargs,
) -> None:
"""Update all linked chart graphics with a new input array.
Return the modified graphics objects in a list.
"""
# update the ohlc sequence graphics chart
# we send a reference to the whole updated array
self.chart.update_from_array(array, **kwargs)
# TODO: the "data" here should really be a function
# and it should be managed and computed outside of this UI
graphics = []
for chart, func in self.subplots:
# process array in entirely every update
# TODO: change this for streaming
data = func(array)
graphic = chart.update_from_array(data, name=chart.name, **kwargs)
graphics.append(graphic)
return graphics
_min_points_to_show = 3
class ChartPlotWidget(pg.PlotWidget):
"""``GraphicsView`` subtype containing a single ``PlotItem``.
@ -323,7 +267,8 @@ class ChartPlotWidget(pg.PlotWidget):
def __init__(
self,
linked_charts,
# the data view we generate graphics from
array: np.ndarray,
**kwargs,
# parent=None,
# background='default',
@ -332,7 +277,8 @@ class ChartPlotWidget(pg.PlotWidget):
"""Configure chart display settings.
"""
super().__init__(**kwargs)
self.parent = linked_charts
self._array = array # readonly view of data
self._graphics = {} # registry of underlying graphics
# XXX: label setting doesn't seem to work?
# likely custom graphics need special handling
@ -341,9 +287,6 @@ class ChartPlotWidget(pg.PlotWidget):
# label.setText("Yo yoyo")
# label.setText("<span style='font-size: 12pt'>x=")
# to be filled in when graphics are rendered by name
self._graphics = {}
# show only right side axes
self.hideAxis('left')
self.showAxis('right')
@ -383,28 +326,30 @@ class ChartPlotWidget(pg.PlotWidget):
"""
l, r = self.view_range()
lbar = max(l, 0)
rbar = min(r, len(self.parent._array))
rbar = min(r, len(self._array))
return l, lbar, rbar, r
def draw_ohlc(
self,
data: np.ndarray,
# XXX: pretty sure this is dumb and we don't need an Enum
style: ChartType = ChartType.BAR,
) -> None:
style: pg.GraphicsObject = BarItems,
) -> pg.GraphicsObject:
"""Draw OHLC datums to chart.
"""
# remember it's an enum type..
graphics = style.value()
graphics = style()
# adds all bar/candle graphics objects for each data point in
# the np array buffer to be drawn on next render cycle
graphics.draw_from_data(data)
self._graphics['main'] = graphics
self.addItem(graphics)
self._graphics['ohlc_main'] = graphics
# set xrange limits
xlast = data[-1]['index']
# show last 50 points on startup
self.plotItem.vb.setXRange(xlast - 50, xlast + 50)
@ -413,15 +358,20 @@ class ChartPlotWidget(pg.PlotWidget):
def draw_curve(
self,
data: np.ndarray,
name: Optional[str] = None,
) -> None:
name: Optional[str] = 'line_main',
) -> pg.PlotDataItem:
# draw the indicator as a plain curve
curve = pg.PlotDataItem(data, antialias=True)
curve = pg.PlotDataItem(
data,
antialias=True,
# TODO: see how this handles with custom ohlcv bars graphics
clipToView=True,
)
self.addItem(curve)
# register overlay curve with name
if not self._graphics and name is None:
name = 'main'
name = 'line_main'
self._graphics[name] = curve
@ -439,16 +389,14 @@ class ChartPlotWidget(pg.PlotWidget):
def update_from_array(
self,
name: str,
array: np.ndarray,
name: str = 'main',
**kwargs,
) -> pg.GraphicsObject:
graphics = self._graphics[name]
graphics.update_from_array(array, **kwargs)
# update view
self._set_yrange()
return graphics
def _set_yrange(
@ -470,7 +418,7 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: logic to check if end of bars in view
extra = view_len - _min_points_to_show
begin = 0 - extra
end = len(self.parent._array) - 1 + extra
end = len(self._array) - 1 + extra
log.trace(
f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
@ -480,7 +428,7 @@ class ChartPlotWidget(pg.PlotWidget):
self._set_xlimits(begin, end)
# TODO: this should be some kind of numpy view api
bars = self.parent._array[lbar:rbar]
bars = self._array[lbar:rbar]
if not len(bars):
# likely no data loaded yet
log.error(f"WTF bars_range = {lbar}:{rbar}")
@ -523,72 +471,6 @@ class ChartPlotWidget(pg.PlotWidget):
self.scene().leaveEvent(ev)
class ChartView(pg.ViewBox):
"""Price chart view box with interaction behaviors you'd expect from
any interactive platform:
- zoom on mouse scroll that auto fits y-axis
- no vertical scrolling
- zoom to a "fixed point" on the y-axis
"""
def __init__(
self,
parent=None,
**kwargs,
):
super().__init__(parent=parent, **kwargs)
# disable vertical scrolling
self.setMouseEnabled(x=True, y=False)
self.linked_charts = None
def wheelEvent(self, ev, axis=None):
"""Override "center-point" location for scrolling.
This is an override of the ``ViewBox`` method simply changing
the center of the zoom to be the y-axis.
TODO: PR a method into ``pyqtgraph`` to make this configurable
"""
if axis in (0, 1):
mask = [False, False]
mask[axis] = self.state['mouseEnabled'][axis]
else:
mask = self.state['mouseEnabled'][:]
# don't zoom more then the min points setting
l, lbar, rbar, r = self.linked_charts.chart.bars_range()
vl = r - l
if ev.delta() > 0 and vl <= _min_points_to_show:
log.trace("Max zoom bruh...")
return
if ev.delta() < 0 and vl >= len(self.linked_charts._array):
log.trace("Min zoom bruh...")
return
# actual scaling factor
s = 1.015 ** (ev.delta() * -1 / 20) # self.state['wheelScaleFactor'])
s = [(None if m is False else s) for m in mask]
# center = pg.Point(
# fn.invertQTransform(self.childGroup.transform()).map(ev.pos())
# )
# XXX: scroll "around" the right most element in the view
furthest_right_coord = self.boundingRect().topRight()
center = pg.Point(
fn.invertQTransform(
self.childGroup.transform()
).map(furthest_right_coord)
)
self._resetTarget()
self.scaleBy(s, center)
ev.accept()
self.sigRangeChangedManually.emit(mask)
async def add_new_bars(delay_s, linked_charts):
"""Task which inserts new bars into the ohlc every ``delay_s`` seconds.
"""
@ -600,7 +482,8 @@ async def add_new_bars(delay_s, linked_charts):
# adjust delay to compensate for trio processing time
ad = delay_s - 0.002
ohlc = linked_charts._array
price_chart = linked_charts.chart
ohlc = price_chart._array
async def sleep():
"""Sleep until next time frames worth has passed from last bar.
@ -623,27 +506,54 @@ async def add_new_bars(delay_s, linked_charts):
# - update last open price correctly instead
# of copying it from last bar's close
# - 5 sec bar lookback-autocorrection like tws does?
(index, t, close) = ohlc[-1][['index', 'time', 'close']]
new = np.append(
ohlc,
np.array(
[(index + 1, t + delay_s, close, close,
close, close, 0)],
dtype=ohlc.dtype
),
)
ohlc = linked_charts._array = new
def incr_ohlc_array(array: np.ndarray):
(index, t, close) = array[-1][['index', 'time', 'close']]
new_array = np.append(
array,
np.array(
[(index + 1, t + delay_s, close, close,
close, close, 0)],
dtype=array.dtype
),
)
return new_array
# add new increment/bar
ohlc = price_chart._array = incr_ohlc_array(ohlc)
# TODO: generalize this increment logic
for name, chart in linked_charts.subplots.items():
data = chart._array
chart._array = np.append(
data,
np.array(data[-1], dtype=data.dtype)
)
# read value at "open" of bar
last_quote = ohlc[-1]
# we **don't** update the bar right now
# since the next quote that arrives should
# We **don't** update the bar right now
# since the next quote that arrives should in the
# tick streaming task
await sleep()
# if the last bar has not changed print a flat line and
# move to the next
# XXX: If the last bar has not changed print a flat line and
# move to the next. This is a "animation" choice that we may not
# keep.
if last_quote == ohlc[-1]:
log.debug("Printing flat line for {sym}")
linked_charts.update_from_array(ohlc)
price_chart.update_from_array('ohlc_main', ohlc)
# resize view
price_chart._set_yrange()
for name, chart in linked_charts.subplots.items():
chart.update_from_array('line_main', chart._array)
# resize view
chart._set_yrange()
async def _async_main(
@ -670,7 +580,9 @@ async def _async_main(
# remember, msgpack-numpy's ``from_buffer` returns read-only array
bars = np.array(bars[list(ohlc_dtype.names)])
linked_charts = chart_app.load_symbol(sym, bars)
# load in symbol's ohlc data
linked_charts, chart = chart_app.load_symbol(sym, bars)
# determine ohlc delay between bars
times = bars['time']
@ -678,69 +590,120 @@ async def _async_main(
# find expected time step between datums
delay = times[-1] - times[times != times[-1]][-1]
async def stream_to_chart(func):
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
f'fsp_{func.__name__}',
func,
brokername=brokermod.name,
sym=sym,
# loglevel='info',
)
stream = await portal.result()
# retreive named layout and style instructions
layout = await stream.__anext__()
async for quote in stream:
ticks = quote.get('ticks')
if ticks:
for tick in ticks:
print(tick)
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
chart_from_fsp,
linked_charts,
fsp.latency,
sym,
bars,
brokermod,
loglevel,
)
# graphics update loop
async with data.open_feed(
brokername,
[sym],
loglevel=loglevel,
) as (fquote, stream):
# start downstream processor
n.start_soon(stream_to_chart, fsp.broker_latency)
# wait for a first quote before we start any update tasks
quote = await stream.__anext__()
print(f'RECEIVED FIRST QUOTE {quote}')
# start graphics tasks after receiving first live quote
n.start_soon(add_new_bars, delay, linked_charts)
async for quote in stream:
ticks = quote.get('ticks')
if ticks:
async for quotes in stream:
for sym, quote in quotes.items():
ticks = quote.get('ticks', ())
for tick in ticks:
if tick.get('type') == 'trade':
linked_charts.update_from_quote(
{'last': tick['price']}
# TODO: eventually we'll want to update
# bid/ask labels and other data as
# subscribed by underlying UI consumers.
# last = quote.get('last') or quote['close']
last = tick['price']
# update ohlc (I guess we're enforcing this
# for now?) overwrite from quote
high, low = chart._array[-1][['high', 'low']]
chart._array[['high', 'low', 'close']][-1] = (
max(high, last),
min(low, last),
last,
)
chart.update_from_array(
'ohlc_main',
chart._array,
)
async def chart_from_fsp(
linked_charts,
fsp_func,
sym,
bars,
brokermod,
loglevel,
) -> None:
"""Start financial signal processing in subactor.
Pass target entrypoint and historical data.
"""
func_name = fsp_func.__name__
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
f'fsp.{func_name}', # name as title of sub-chart
# subactor entrypoint
fsp.pull_and_process,
bars=bars,
brokername=brokermod.name,
symbol=sym,
fsp_func_name=func_name,
# tractor config
loglevel=loglevel,
)
stream = await portal.result()
# receive processed historical data-array as first message
history = (await stream.__anext__())
# TODO: enforce type checking here
newbars = np.array(history)
chart = linked_charts.add_plot(
name=func_name,
array=newbars,
)
# update sub-plot graphics
async for value in stream:
chart._array[-1] = value
chart.update_from_array('line_main', chart._array)
chart._set_yrange()
def _main(
sym: str,
brokername: str,
**qtractor_kwargs,
tractor_kwargs,
) -> None:
"""Sync entry point to start a chart app.
"""
# Qt entry point
run_qtractor(
# func
_async_main,
# args,
(sym, brokername),
# kwargs passed through
qtractor_kwargs,
# main widget
ChartSpace,
# **qtractor_kwargs
func=_async_main,
args=(sym, brokername),
main_widget=ChartSpace,
tractor_kwargs=tractor_kwargs,
)

View File

@ -1,11 +1,10 @@
"""
Qt styling.
Qt UI styling.
"""
import pyqtgraph as pg
from PyQt5 import QtGui
# chart-wide font
_font = QtGui.QFont("Hack", 4)
_i3_rgba = QtGui.QColor.fromRgbF(*[0.14]*3 + [1])
@ -15,6 +14,10 @@ _i3_rgba = QtGui.QColor.fromRgbF(*[0.14]*3 + [1])
_xaxis_at = 'bottom'
# charting config
_min_points_to_show = 3
_tina_mode = False
@ -22,8 +25,5 @@ def enable_tina_mode() -> None:
"""Enable "tina mode" to make everything look "conventional"
like your pet hedgehog always wanted.
"""
_tina_mode = True
# white background (for tinas like our pal xb)
pg.setConfigOption('background', 'w')