diff --git a/piker/fsp.py b/piker/fsp.py new file mode 100644 index 00000000..fff90a48 --- /dev/null +++ b/piker/fsp.py @@ -0,0 +1,161 @@ +""" +Financial signal processing for the peeps. +""" +from typing import AsyncIterator, List + +import numpy as np + +from .log import get_logger +from . import data + +log = get_logger(__name__) + + +def rec2array( + rec: np.ndarray, + fields: List[str] = None +) -> np.ndarray: + """Convert record array to std array. + + Taken from: + https://github.com/scikit-hep/root_numpy/blob/master/root_numpy/_utils.py#L20 + """ + simplify = False + + if fields is None: + fields = rec.dtype.names + elif isinstance(fields, str): + fields = [fields] + simplify = True + + # Creates a copy and casts all data to the same type + arr = np.dstack([rec[field] for field in fields]) + + # Check for array-type fields. If none, then remove outer dimension. + # Only need to check first field since np.dstack will anyway raise an + # exception if the shapes don't match + # np.dstack will also fail if fields is an empty list + if not rec.dtype[fields[0]].shape: + arr = arr[0] + + if simplify: + # remove last dimension (will be of size 1) + arr = arr.reshape(arr.shape[:-1]) + + return arr + + +async def pull_and_process( + bars: np.ndarray, + brokername: str, + # symbols: List[str], + symbol: str, + fsp_func_name: str, +) -> AsyncIterator[dict]: + + # async def _yield_bars(): + # yield bars + + # hist_out: np.ndarray = None + + func = latency + + # Conduct a single iteration of fsp with historical bars input + # async for hist_out in func(_yield_bars(), bars): + # yield {symbol: hist_out} + + # open a data feed stream with requested broker + async with data.open_feed( + brokername, + [symbol], + ) as (fquote, stream): + + # TODO: load appropriate fsp with input args + + async def filter_by_sym(sym, stream): + async for quotes in stream: + for symbol, quotes in quotes.items(): + if symbol == sym: + yield quotes + + async for processed in func( + filter_by_sym(symbol, stream), + bars, + ): + print(f"{fsp_func_name}: {processed}") + yield processed + + +# TODO: things to figure the fuck out: +# - how to handle non-plottable values +# - composition of fsps / implicit chaining + +async def latency( + source: 'TickStream[Dict[str, float]]', + ohlcv: np.ndarray +) -> AsyncIterator[np.ndarray]: + """Compute High-Low midpoint value. + """ + # TODO: do we want to offer yielding this async + # before the rt data connection comes up? + + # deliver zeros for all prior history + yield np.zeros(len(ohlcv)) + + _last = None + async for quote in source: + fill_time = quote.get('rtTime_s') + if fill_time and fill_time != _last: + value = quote['brokerd_ts'] - fill_time + print(f"latency: {value}") + yield value + + _last = fill_time + # ticks = quote.get('ticks', ()) + # for tick in ticks: + # if tick.get('type') == 'trade': + + +async def last( + source: 'TickStream[Dict[str, float]]', + ohlcv: np.ndarray +) -> AsyncIterator[np.ndarray]: + """Compute High-Low midpoint value. + """ + # first_frame = (await source.__anext__()) + + # deliver historical processed data first + yield ohlcv['close'] + + async for quote in source: + yield quote['close'] + + +async def wma( + source, #: AsyncStream[np.ndarray], + ohlcv: np.ndarray, # price time-frame "aware" + lookback: np.ndarray, # price time-frame "aware" + weights: np.ndarray, +) -> AsyncIterator[np.ndarray]: # i like FinSigStream + """Weighted moving average. + + ``weights`` is a sequence of already scaled values. As an example + for the WMA often found in "techincal analysis": + ``weights = np.arange(1, N) * N*(N-1)/2``. + """ + length = len(weights) + _lookback = np.zeros(length - 1) + + ohlcv.from_tf('5m') + + # async for frame_len, frame in source: + async for frame in source: + wma = np.convolve( + ohlcv[-length:]['close'], + # np.concatenate((_lookback, frame)), + weights, + 'valid' + ) + # todo: handle case where frame_len < length - 1 + _lookback = frame[-(length-1):] + yield wma diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index af7f52b6..cc5f1515 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -1,11 +1,10 @@ """ High level Qt chart widgets. """ -from typing import List, Optional, Tuple, Dict, Any +from typing import Optional, Tuple, Dict, Any import time from PyQt5 import QtCore, QtGui -from pyqtgraph import functions as fn import numpy as np import pyqtgraph as pg import tractor @@ -15,14 +14,15 @@ from ._axes import ( DynamicDateAxis, PriceAxis, ) -from ._graphics import CrossHair, ChartType -from ._style import _xaxis_at +from ._graphics import CrossHair, BarItems +from ._style import _xaxis_at, _min_points_to_show from ._source import Symbol from .. import brokers from .. import data from ..log import get_logger from ._exec import run_qtractor from ._source import ohlc_dtype +from ._interaction import ChartView from .. import fsp @@ -50,7 +50,7 @@ class ChartSpace(QtGui.QWidget): self.v_layout.addLayout(self.toolbar_layout) self.v_layout.addLayout(self.h_layout) - self._plot_cache = {} + self._chart_cache = {} def init_timeframes_ui(self): self.tf_layout = QtGui.QHBoxLayout() @@ -81,16 +81,19 @@ class ChartSpace(QtGui.QWidget): """ # XXX: let's see if this causes mem problems self.window.setWindowTitle(f'piker chart {symbol}') - self.chart = self._plot_cache.setdefault(symbol, LinkedSplitCharts()) + linkedcharts = self._chart_cache.setdefault( + symbol, + LinkedSplitCharts() + ) s = Symbol(key=symbol) # remove any existing plots if not self.h_layout.isEmpty(): - self.h_layout.removeWidget(self.chart) + self.h_layout.removeWidget(linkedcharts) - self.chart.plot(s, data) - self.h_layout.addWidget(self.chart) - return self.chart + main_chart = linkedcharts.plot_main(s, data) + self.h_layout.addWidget(linkedcharts) + return linkedcharts, main_chart # TODO: add signalling painter system # def add_signals(self): @@ -118,7 +121,7 @@ class LinkedSplitCharts(QtGui.QWidget): self._array: np.ndarray = None # main data source self._ch: CrossHair = None # crosshair graphics self.chart: ChartPlotWidget = None # main (ohlc) chart - self.subplots: List[ChartPlotWidget] = [] + self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {} self.xaxis = DynamicDateAxis( orientation='bottom', @@ -148,29 +151,28 @@ class LinkedSplitCharts(QtGui.QWidget): """Set the proportion of space allocated for linked subcharts. """ major = 1 - prop - min_h_ind = int(self.height() * prop / len(self.subplots)) + min_h_ind = int((self.height() * prop) / len(self.subplots)) sizes = [int(self.height() * major)] sizes.extend([min_h_ind] * len(self.subplots)) self.splitter.setSizes(sizes) # , int(self.height()*0.2) - def plot( + def plot_main( self, symbol: Symbol, array: np.ndarray, ohlc: bool = True, - ) -> None: + ) -> 'ChartPlotWidget': """Start up and show main (price) chart and all linked subcharts. """ self.digits = symbol.digits() - # XXX: this will eventually be a view onto shared mem - # or some higher level type / API + # TODO: this should eventually be a view onto shared mem or some + # higher level type / API self._array = array # add crosshairs self._ch = CrossHair( parent=self, - # subplots=[plot for plot, d in self.subplots], digits=self.digits ) self.chart = self.add_plot( @@ -179,26 +181,13 @@ class LinkedSplitCharts(QtGui.QWidget): xaxis=self.xaxis, ohlc=True, ) + # add crosshair graphic self.chart.addItem(self._ch) + + # style? self.chart.setFrameStyle(QtGui.QFrame.StyledPanel | QtGui.QFrame.Plain) - # TODO: this is where we would load an indicator chain - # XXX: note, if this isn't index aligned with - # the source data the chart will go haywire. - inds = [('open', lambda a: a['close'])] - - for name, func in inds: - - # compute historical subchart values from input array - data = func(array) - - # create sub-plot - ind_chart = self.add_plot(name=name, array=data) - - self.subplots.append((ind_chart, func)) - - # scale split regions - self.set_split_sizes() + return self.chart def add_plot( self, @@ -211,25 +200,28 @@ class LinkedSplitCharts(QtGui.QWidget): If ``name`` == ``"main"`` the chart will be the the primary view. """ + if self.chart is None and name != 'main': + raise RuntimeError( + "A main plot must be created first with `.plot_main()`") + + # source of our custom interactions cv = ChartView() + cv.linked_charts = self + # use "indicator axis" by default xaxis = self.xaxis_ind if xaxis is None else xaxis cpw = ChartPlotWidget( - linked_charts=self, + array=array, parent=self.splitter, axisItems={'bottom': xaxis, 'right': PriceAxis()}, - # axisItems={'top': self.xaxis_ind, 'right': PriceAxis()}, viewBox=cv, ) # this name will be used to register the primary # graphics curve managed by the subchart cpw.name = name - cv.linked_charts = self cpw.plotItem.vb.linked_charts = self - cpw.setFrameStyle( - QtGui.QFrame.StyledPanel | QtGui.QFrame.Plain - ) + cpw.setFrameStyle(QtGui.QFrame.StyledPanel | QtGui.QFrame.Plain) cpw.getPlotItem().setContentsMargins(*CHART_MARGINS) # self.splitter.addWidget(cpw) @@ -240,68 +232,20 @@ class LinkedSplitCharts(QtGui.QWidget): if ohlc: cpw.draw_ohlc(array) else: - cpw.draw_curve(array, name) + cpw.draw_curve(array) # add to cross-hair's known plots self._ch.add_plot(cpw) + if name != "main": + # track by name + self.subplots[name] = cpw + + # scale split regions + self.set_split_sizes() + return cpw - def update_from_quote( - self, - quote: dict - ) -> List[pg.GraphicsObject]: - """Update all linked chart graphics with a new quote - datum. - - Return the modified graphics objects in a list. - """ - # TODO: eventually we'll want to update bid/ask labels and other - # data as subscribed by underlying UI consumers. - last = quote.get('last') or quote['close'] - index, time, open, high, low, close, volume = self._array[-1] - - # update ohlc (I guess we're enforcing this for now?) - # overwrite from quote - self._array[-1] = ( - index, - time, - open, - max(high, last), - min(low, last), - last, - volume, - ) - self.update_from_array(self._array) - - def update_from_array( - self, - array: np.ndarray, - **kwargs, - ) -> None: - """Update all linked chart graphics with a new input array. - - Return the modified graphics objects in a list. - """ - # update the ohlc sequence graphics chart - # we send a reference to the whole updated array - self.chart.update_from_array(array, **kwargs) - - # TODO: the "data" here should really be a function - # and it should be managed and computed outside of this UI - graphics = [] - for chart, func in self.subplots: - # process array in entirely every update - # TODO: change this for streaming - data = func(array) - graphic = chart.update_from_array(data, name=chart.name, **kwargs) - graphics.append(graphic) - - return graphics - - -_min_points_to_show = 3 - class ChartPlotWidget(pg.PlotWidget): """``GraphicsView`` subtype containing a single ``PlotItem``. @@ -323,7 +267,8 @@ class ChartPlotWidget(pg.PlotWidget): def __init__( self, - linked_charts, + # the data view we generate graphics from + array: np.ndarray, **kwargs, # parent=None, # background='default', @@ -332,7 +277,8 @@ class ChartPlotWidget(pg.PlotWidget): """Configure chart display settings. """ super().__init__(**kwargs) - self.parent = linked_charts + self._array = array # readonly view of data + self._graphics = {} # registry of underlying graphics # XXX: label setting doesn't seem to work? # likely custom graphics need special handling @@ -341,9 +287,6 @@ class ChartPlotWidget(pg.PlotWidget): # label.setText("Yo yoyo") # label.setText("x=") - # to be filled in when graphics are rendered by name - self._graphics = {} - # show only right side axes self.hideAxis('left') self.showAxis('right') @@ -383,28 +326,30 @@ class ChartPlotWidget(pg.PlotWidget): """ l, r = self.view_range() lbar = max(l, 0) - rbar = min(r, len(self.parent._array)) + rbar = min(r, len(self._array)) return l, lbar, rbar, r def draw_ohlc( self, data: np.ndarray, # XXX: pretty sure this is dumb and we don't need an Enum - style: ChartType = ChartType.BAR, - ) -> None: + style: pg.GraphicsObject = BarItems, + ) -> pg.GraphicsObject: """Draw OHLC datums to chart. """ # remember it's an enum type.. - graphics = style.value() + graphics = style() # adds all bar/candle graphics objects for each data point in # the np array buffer to be drawn on next render cycle graphics.draw_from_data(data) - self._graphics['main'] = graphics self.addItem(graphics) + self._graphics['ohlc_main'] = graphics + # set xrange limits xlast = data[-1]['index'] + # show last 50 points on startup self.plotItem.vb.setXRange(xlast - 50, xlast + 50) @@ -413,15 +358,20 @@ class ChartPlotWidget(pg.PlotWidget): def draw_curve( self, data: np.ndarray, - name: Optional[str] = None, - ) -> None: + name: Optional[str] = 'line_main', + ) -> pg.PlotDataItem: # draw the indicator as a plain curve - curve = pg.PlotDataItem(data, antialias=True) + curve = pg.PlotDataItem( + data, + antialias=True, + # TODO: see how this handles with custom ohlcv bars graphics + clipToView=True, + ) self.addItem(curve) # register overlay curve with name if not self._graphics and name is None: - name = 'main' + name = 'line_main' self._graphics[name] = curve @@ -439,16 +389,14 @@ class ChartPlotWidget(pg.PlotWidget): def update_from_array( self, + name: str, array: np.ndarray, - name: str = 'main', **kwargs, ) -> pg.GraphicsObject: + graphics = self._graphics[name] graphics.update_from_array(array, **kwargs) - # update view - self._set_yrange() - return graphics def _set_yrange( @@ -470,7 +418,7 @@ class ChartPlotWidget(pg.PlotWidget): # TODO: logic to check if end of bars in view extra = view_len - _min_points_to_show begin = 0 - extra - end = len(self.parent._array) - 1 + extra + end = len(self._array) - 1 + extra log.trace( f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n" @@ -480,7 +428,7 @@ class ChartPlotWidget(pg.PlotWidget): self._set_xlimits(begin, end) # TODO: this should be some kind of numpy view api - bars = self.parent._array[lbar:rbar] + bars = self._array[lbar:rbar] if not len(bars): # likely no data loaded yet log.error(f"WTF bars_range = {lbar}:{rbar}") @@ -523,72 +471,6 @@ class ChartPlotWidget(pg.PlotWidget): self.scene().leaveEvent(ev) -class ChartView(pg.ViewBox): - """Price chart view box with interaction behaviors you'd expect from - any interactive platform: - - - zoom on mouse scroll that auto fits y-axis - - no vertical scrolling - - zoom to a "fixed point" on the y-axis - """ - def __init__( - self, - parent=None, - **kwargs, - ): - super().__init__(parent=parent, **kwargs) - # disable vertical scrolling - self.setMouseEnabled(x=True, y=False) - self.linked_charts = None - - def wheelEvent(self, ev, axis=None): - """Override "center-point" location for scrolling. - - This is an override of the ``ViewBox`` method simply changing - the center of the zoom to be the y-axis. - - TODO: PR a method into ``pyqtgraph`` to make this configurable - """ - - if axis in (0, 1): - mask = [False, False] - mask[axis] = self.state['mouseEnabled'][axis] - else: - mask = self.state['mouseEnabled'][:] - - # don't zoom more then the min points setting - l, lbar, rbar, r = self.linked_charts.chart.bars_range() - vl = r - l - - if ev.delta() > 0 and vl <= _min_points_to_show: - log.trace("Max zoom bruh...") - return - if ev.delta() < 0 and vl >= len(self.linked_charts._array): - log.trace("Min zoom bruh...") - return - - # actual scaling factor - s = 1.015 ** (ev.delta() * -1 / 20) # self.state['wheelScaleFactor']) - s = [(None if m is False else s) for m in mask] - - # center = pg.Point( - # fn.invertQTransform(self.childGroup.transform()).map(ev.pos()) - # ) - - # XXX: scroll "around" the right most element in the view - furthest_right_coord = self.boundingRect().topRight() - center = pg.Point( - fn.invertQTransform( - self.childGroup.transform() - ).map(furthest_right_coord) - ) - - self._resetTarget() - self.scaleBy(s, center) - ev.accept() - self.sigRangeChangedManually.emit(mask) - - async def add_new_bars(delay_s, linked_charts): """Task which inserts new bars into the ohlc every ``delay_s`` seconds. """ @@ -600,7 +482,8 @@ async def add_new_bars(delay_s, linked_charts): # adjust delay to compensate for trio processing time ad = delay_s - 0.002 - ohlc = linked_charts._array + price_chart = linked_charts.chart + ohlc = price_chart._array async def sleep(): """Sleep until next time frames worth has passed from last bar. @@ -623,27 +506,54 @@ async def add_new_bars(delay_s, linked_charts): # - update last open price correctly instead # of copying it from last bar's close # - 5 sec bar lookback-autocorrection like tws does? - (index, t, close) = ohlc[-1][['index', 'time', 'close']] - new = np.append( - ohlc, - np.array( - [(index + 1, t + delay_s, close, close, - close, close, 0)], - dtype=ohlc.dtype - ), - ) - ohlc = linked_charts._array = new + + def incr_ohlc_array(array: np.ndarray): + (index, t, close) = array[-1][['index', 'time', 'close']] + new_array = np.append( + array, + np.array( + [(index + 1, t + delay_s, close, close, + close, close, 0)], + dtype=array.dtype + ), + ) + return new_array + + # add new increment/bar + ohlc = price_chart._array = incr_ohlc_array(ohlc) + + # TODO: generalize this increment logic + for name, chart in linked_charts.subplots.items(): + data = chart._array + chart._array = np.append( + data, + np.array(data[-1], dtype=data.dtype) + ) + + # read value at "open" of bar last_quote = ohlc[-1] - # we **don't** update the bar right now - # since the next quote that arrives should + # We **don't** update the bar right now + # since the next quote that arrives should in the + # tick streaming task await sleep() - # if the last bar has not changed print a flat line and - # move to the next + # XXX: If the last bar has not changed print a flat line and + # move to the next. This is a "animation" choice that we may not + # keep. if last_quote == ohlc[-1]: log.debug("Printing flat line for {sym}") - linked_charts.update_from_array(ohlc) + price_chart.update_from_array('ohlc_main', ohlc) + + # resize view + price_chart._set_yrange() + + + for name, chart in linked_charts.subplots.items(): + chart.update_from_array('line_main', chart._array) + + # resize view + chart._set_yrange() async def _async_main( @@ -670,7 +580,9 @@ async def _async_main( # remember, msgpack-numpy's ``from_buffer` returns read-only array bars = np.array(bars[list(ohlc_dtype.names)]) - linked_charts = chart_app.load_symbol(sym, bars) + + # load in symbol's ohlc data + linked_charts, chart = chart_app.load_symbol(sym, bars) # determine ohlc delay between bars times = bars['time'] @@ -678,69 +590,120 @@ async def _async_main( # find expected time step between datums delay = times[-1] - times[times != times[-1]][-1] - async def stream_to_chart(func): - - async with tractor.open_nursery() as n: - portal = await n.run_in_actor( - f'fsp_{func.__name__}', - func, - brokername=brokermod.name, - sym=sym, - # loglevel='info', - ) - stream = await portal.result() - - # retreive named layout and style instructions - layout = await stream.__anext__() - - async for quote in stream: - ticks = quote.get('ticks') - if ticks: - for tick in ticks: - print(tick) - async with trio.open_nursery() as n: + # load initial fsp chain (otherwise known as "indicators") + n.start_soon( + chart_from_fsp, + linked_charts, + fsp.latency, + sym, + bars, + brokermod, + loglevel, + ) + + # graphics update loop + async with data.open_feed( brokername, [sym], loglevel=loglevel, ) as (fquote, stream): - # start downstream processor - n.start_soon(stream_to_chart, fsp.broker_latency) # wait for a first quote before we start any update tasks quote = await stream.__anext__() + print(f'RECEIVED FIRST QUOTE {quote}') # start graphics tasks after receiving first live quote n.start_soon(add_new_bars, delay, linked_charts) - async for quote in stream: - ticks = quote.get('ticks') - if ticks: + async for quotes in stream: + for sym, quote in quotes.items(): + ticks = quote.get('ticks', ()) for tick in ticks: if tick.get('type') == 'trade': - linked_charts.update_from_quote( - {'last': tick['price']} + + # TODO: eventually we'll want to update + # bid/ask labels and other data as + # subscribed by underlying UI consumers. + # last = quote.get('last') or quote['close'] + last = tick['price'] + + # update ohlc (I guess we're enforcing this + # for now?) overwrite from quote + high, low = chart._array[-1][['high', 'low']] + chart._array[['high', 'low', 'close']][-1] = ( + max(high, last), + min(low, last), + last, ) + chart.update_from_array( + 'ohlc_main', + chart._array, + ) + + +async def chart_from_fsp( + linked_charts, + fsp_func, + sym, + bars, + brokermod, + loglevel, +) -> None: + """Start financial signal processing in subactor. + + Pass target entrypoint and historical data. + """ + func_name = fsp_func.__name__ + + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + f'fsp.{func_name}', # name as title of sub-chart + + # subactor entrypoint + fsp.pull_and_process, + bars=bars, + brokername=brokermod.name, + symbol=sym, + fsp_func_name=func_name, + + # tractor config + loglevel=loglevel, + ) + + stream = await portal.result() + + # receive processed historical data-array as first message + history = (await stream.__anext__()) + + # TODO: enforce type checking here + newbars = np.array(history) + + chart = linked_charts.add_plot( + name=func_name, + array=newbars, + ) + + # update sub-plot graphics + async for value in stream: + chart._array[-1] = value + chart.update_from_array('line_main', chart._array) + chart._set_yrange() def _main( sym: str, brokername: str, - **qtractor_kwargs, + tractor_kwargs, ) -> None: """Sync entry point to start a chart app. """ # Qt entry point run_qtractor( - # func - _async_main, - # args, - (sym, brokername), - # kwargs passed through - qtractor_kwargs, - # main widget - ChartSpace, - # **qtractor_kwargs + func=_async_main, + args=(sym, brokername), + main_widget=ChartSpace, + tractor_kwargs=tractor_kwargs, ) diff --git a/piker/ui/_style.py b/piker/ui/_style.py index a5e3b64e..ea497a73 100644 --- a/piker/ui/_style.py +++ b/piker/ui/_style.py @@ -1,11 +1,10 @@ """ -Qt styling. +Qt UI styling. """ import pyqtgraph as pg from PyQt5 import QtGui - # chart-wide font _font = QtGui.QFont("Hack", 4) _i3_rgba = QtGui.QColor.fromRgbF(*[0.14]*3 + [1]) @@ -15,6 +14,10 @@ _i3_rgba = QtGui.QColor.fromRgbF(*[0.14]*3 + [1]) _xaxis_at = 'bottom' +# charting config +_min_points_to_show = 3 + + _tina_mode = False @@ -22,8 +25,5 @@ def enable_tina_mode() -> None: """Enable "tina mode" to make everything look "conventional" like your pet hedgehog always wanted. """ - - _tina_mode = True - # white background (for tinas like our pal xb) pg.setConfigOption('background', 'w')