From c08f192f77818cdd801f0799cd1ff462031fca32 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 29 Apr 2021 09:03:28 -0400 Subject: [PATCH] Move charting to new tractor stream api This required a fsp task spawning logic rework which ended up being cleaner just spawning tasks in a loop sequentially instead of trying a 2-phase spawn-then-initialize approach. This also includes changes from the symbol search branch hacked in. Mostly it includes isolating the main chart startup-sequence to a function that can be run in a new task every time a new symbol is requested by the selector/searcher. The actual search functionality obviously isn't in here yet but minor changes are included as part of pulling out the `tractor` stream api patch from the symbol search dev branch. --- piker/ui/_chart.py | 641 ++++++++++++++++++++++++--------------------- 1 file changed, 341 insertions(+), 300 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index f9aefc34..0650663c 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -19,6 +19,7 @@ High level Qt chart widgets. """ from typing import Tuple, Dict, Any, Optional, Callable +from types import ModuleType from functools import partial from PyQt5 import QtCore, QtGui @@ -26,6 +27,7 @@ import numpy as np import pyqtgraph as pg import tractor import trio +from trio_typing import TaskStatus from ._axes import ( DynamicDateAxis, @@ -53,6 +55,7 @@ from ._style import ( _bars_to_left_in_follow_mode, ) from ..data._source import Symbol +from ..data._sharedmem import ShmArray from .. import brokers from .. import data from ..data import maybe_open_shm_array @@ -128,7 +131,8 @@ class ChartSpace(QtGui.QWidget): # self.toolbar_layout.addWidget(self.strategy_box) def load_symbol( self, - symbol: Symbol, + brokername: str, + symbol_key: str, data: np.ndarray, ohlc: bool = True, ) -> None: @@ -136,12 +140,6 @@ class ChartSpace(QtGui.QWidget): Expects a ``numpy`` structured array containing all the ohlcv fields. """ - # XXX: let's see if this causes mem problems - self.window.setWindowTitle( - f'{symbol.key}@{symbol.brokers} ' - f'tick:{symbol.tick_size}' - ) - # TODO: symbol search # # of course this doesn't work :eyeroll: # h = _font.boundingRect('Ag').height() @@ -151,19 +149,18 @@ class ChartSpace(QtGui.QWidget): # self.symbol_label.setText(f'/`{symbol}`') linkedcharts = self._chart_cache.setdefault( - symbol.key, - LinkedSplitCharts(symbol) + symbol_key, + LinkedSplitCharts(self) ) + self.linkedcharts = linkedcharts # remove any existing plots if not self.v_layout.isEmpty(): self.v_layout.removeWidget(linkedcharts) - main_chart = linkedcharts.plot_ohlc_main(symbol, data) - self.v_layout.addWidget(linkedcharts) - return linkedcharts, main_chart + return linkedcharts # TODO: add signalling painter system # def add_signals(self): @@ -187,13 +184,14 @@ class LinkedSplitCharts(QtGui.QWidget): def __init__( self, - symbol: Symbol, + chart_space: ChartSpace, ) -> None: super().__init__() self.signals_visible: bool = False self._cursor: Cursor = None # crosshair graphics self.chart: ChartPlotWidget = None # main (ohlc) chart self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {} + self.chart_space = chart_space self.xaxis = DynamicDateAxis( orientation='bottom', @@ -215,7 +213,7 @@ class LinkedSplitCharts(QtGui.QWidget): self.layout.addWidget(self.splitter) # state tracker? - self._symbol: Symbol = symbol + self._symbol: Symbol = None @property def symbol(self) -> Symbol: @@ -939,135 +937,6 @@ async def test_bed( # rlabel.setPos(vb_right - 2*w, d_coords.y()) -async def _async_main( - # implicit required argument provided by ``qtractor_run()`` - widgets: Dict[str, Any], - - sym: str, - brokername: str, - loglevel: str, - -) -> None: - """Main Qt-trio routine invoked by the Qt loop with - the widgets ``dict``. - """ - chart_app = widgets['main'] - - # attempt to configure DPI aware font size - _font.configure_to_dpi(current_screen()) - - # chart_app.init_search() - - # historical data fetch - brokermod = brokers.get_brokermod(brokername) - - async with data.open_feed( - brokername, - [sym], - loglevel=loglevel, - ) as feed: - - ohlcv = feed.shm - bars = ohlcv.array - symbol = feed.symbols[sym] - - # load in symbol's ohlc data - linked_charts, chart = chart_app.load_symbol(symbol, bars) - - # plot historical vwap if available - wap_in_history = False - - if brokermod._show_wap_in_history: - - if 'bar_wap' in bars.dtype.fields: - wap_in_history = True - chart.draw_curve( - name='bar_wap', - data=bars, - add_label=False, - ) - - # size view to data once at outset - chart._set_yrange() - - # TODO: a data view api that makes this less shit - chart._shm = ohlcv - - # TODO: eventually we'll support some kind of n-compose syntax - fsp_conf = { - 'rsi': { - 'period': 14, - 'chart_kwargs': { - 'static_yrange': (0, 100), - }, - }, - - } - - # make sure that the instrument supports volume history - # (sometimes this is not the case for some commodities and - # derivatives) - volm = ohlcv.array['volume'] - if ( - np.all(np.isin(volm, -1)) or - np.all(np.isnan(volm)) - ): - log.warning( - f"{sym} does not seem to have volume info," - " dropping volume signals") - else: - fsp_conf.update({ - 'vwap': { - 'overlay': True, - 'anchor': 'session', - }, - }) - - async with trio.open_nursery() as n: - - # load initial fsp chain (otherwise known as "indicators") - n.start_soon( - spawn_fsps, - linked_charts, - fsp_conf, - sym, - ohlcv, - brokermod, - loglevel, - ) - - # start graphics update loop(s)after receiving first live quote - n.start_soon( - chart_from_quotes, - chart, - feed.stream, - ohlcv, - wap_in_history, - ) - - # wait for a first quote before we start any update tasks - quote = await feed.receive() - - log.info(f'Received first quote {quote}') - - n.start_soon( - check_for_new_bars, - feed, - # delay, - ohlcv, - linked_charts - ) - - # interactive testing - # n.start_soon( - # test_bed, - # ohlcv, - # chart, - # linked_charts, - # ) - await start_order_mode(chart, symbol, brokername) - - async def chart_from_quotes( chart: ChartPlotWidget, stream, @@ -1245,7 +1114,7 @@ async def spawn_fsps( """ # spawns sub-processes which execute cpu bound FSP code - async with tractor.open_nursery() as n: + async with tractor.open_nursery(loglevel=loglevel) as n: # spawns local task that consume and chart data streams from # sub-procs @@ -1280,66 +1149,36 @@ async def spawn_fsps( conf['shm'] = shm - # spawn closure, can probably define elsewhere - async def spawn_fsp_daemon( - fsp_name: str, - display_name: str, - conf: dict, - ): - """Start an fsp subactor async. + portal = await n.start_actor( + enable_modules=['piker.fsp'], + name=display_name, + ) - """ - # print(f'FSP NAME: {fsp_name}') - portal = await n.run_in_actor( - - # subactor entrypoint - fsp.cascade, - - # name as title of sub-chart - name=display_name, - brokername=brokermod.name, - src_shm_token=src_shm.token, - dst_shm_token=conf['shm'].token, - symbol=sym, - fsp_func_name=fsp_name, - - # tractor config - loglevel=loglevel, - ) - - stream = await portal.result() - - # receive last index for processed historical - # data-array as first msg - _ = await stream.receive() - - conf['stream'] = stream - conf['portal'] = portal - - # new local task + # init async ln.start_soon( - spawn_fsp_daemon, + run_fsp, + portal, + linked_charts, + brokermod, + sym, + src_shm, fsp_func_name, display_name, conf, ) - # blocks here until all daemons up - - # start and block on update loops - async with trio.open_nursery() as ln: - for fsp_func_name, conf in fsps.items(): - ln.start_soon( - update_signals, - linked_charts, - fsp_func_name, - conf, - ) + # blocks here until all fsp actors complete -async def update_signals( +async def run_fsp( + + portal: tractor._portal.Portal, linked_charts: LinkedSplitCharts, + brokermod: ModuleType, + sym: str, + src_shm: ShmArray, fsp_func_name: str, + display_name: str, conf: Dict[str, Any], ) -> None: @@ -1348,96 +1187,117 @@ async def update_signals( This is called once for each entry in the fsp config map. """ - shm = conf['shm'] + async with portal.open_stream_from( - if conf.get('overlay'): - chart = linked_charts.chart - chart.draw_curve( - name='vwap', - data=shm.array, - overlay=True, - ) - last_val_sticky = None + # subactor entrypoint + fsp.cascade, - else: + # name as title of sub-chart + brokername=brokermod.name, + src_shm_token=src_shm.token, + dst_shm_token=conf['shm'].token, + symbol=sym, + fsp_func_name=fsp_func_name, - chart = linked_charts.add_plot( - name=fsp_func_name, - array=shm.array, + ) as stream: - # curve by default - ohlc=False, + # receive last index for processed historical + # data-array as first msg + _ = await stream.receive() - # settings passed down to ``ChartPlotWidget`` - **conf.get('chart_kwargs', {}) - # static_yrange=(0, 100), - ) + conf['stream'] = stream + conf['portal'] = portal - # display contents labels asap - chart.update_contents_labels( - len(shm.array) - 1, - # fsp_func_name - ) + shm = conf['shm'] - # read last value - array = shm.array - value = array[fsp_func_name][-1] + if conf.get('overlay'): + chart = linked_charts.chart + chart.draw_curve( + name='vwap', + data=shm.array, + overlay=True, + ) + last_val_sticky = None - last_val_sticky = chart._ysticks[chart.name] - last_val_sticky.update_from_data(-1, value) + else: - chart.update_curve_from_array(fsp_func_name, array) + chart = linked_charts.add_plot( + name=fsp_func_name, + array=shm.array, - chart._shm = shm + # curve by default + ohlc=False, - # TODO: figure out if we can roll our own `FillToThreshold` to - # get brush filled polygons for OS/OB conditions. - # ``pg.FillBetweenItems`` seems to be one technique using - # generic fills between curve types while ``PlotCurveItem`` has - # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which - # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) - # graphics.curve.setBrush(50, 50, 200, 100) - # graphics.curve.setFillLevel(50) + # settings passed down to ``ChartPlotWidget`` + **conf.get('chart_kwargs', {}) + # static_yrange=(0, 100), + ) - if fsp_func_name == 'rsi': - # add moveable over-[sold/bought] lines - # and labels only for the 70/30 lines - level_line(chart, 20) - level_line(chart, 30, orient_v='top') - level_line(chart, 70, orient_v='bottom') - level_line(chart, 80, orient_v='top') + # display contents labels asap + chart.update_contents_labels( + len(shm.array) - 1, + # fsp_func_name + ) - chart._set_yrange() + # read last value + array = shm.array + value = array[fsp_func_name][-1] - stream = conf['stream'] - - # update chart graphics - async for value in stream: - - # TODO: provide a read sync mechanism to avoid this polling. - # the underlying issue is that a backfill and subsequent shm - # array first/last index update could result in an empty array - # read here since the stream is never torn down on the - # re-compute steps. - read_tries = 2 - while read_tries > 0: - - try: - # read last - array = shm.array - value = array[-1][fsp_func_name] - break - - except IndexError: - read_tries -= 1 - continue - - if last_val_sticky: + last_val_sticky = chart._ysticks[chart.name] last_val_sticky.update_from_data(-1, value) - # update graphics - chart.update_curve_from_array(fsp_func_name, array) + chart.update_curve_from_array(fsp_func_name, array) + + chart._shm = shm + + # TODO: figure out if we can roll our own `FillToThreshold` to + # get brush filled polygons for OS/OB conditions. + # ``pg.FillBetweenItems`` seems to be one technique using + # generic fills between curve types while ``PlotCurveItem`` has + # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which + # might be the best solution? + # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) + # graphics.curve.setBrush(50, 50, 200, 100) + # graphics.curve.setFillLevel(50) + + if fsp_func_name == 'rsi': + # add moveable over-[sold/bought] lines + # and labels only for the 70/30 lines + level_line(chart, 20) + level_line(chart, 30, orient_v='top') + level_line(chart, 70, orient_v='bottom') + level_line(chart, 80, orient_v='top') + + chart._set_yrange() + + stream = conf['stream'] + + # update chart graphics + async for value in stream: + + # TODO: provide a read sync mechanism to avoid this polling. + # the underlying issue is that a backfill and subsequent shm + # array first/last index update could result in an empty array + # read here since the stream is never torn down on the + # re-compute steps. + read_tries = 2 + while read_tries > 0: + + try: + # read last + array = shm.array + value = array[-1][fsp_func_name] + break + + except IndexError: + read_tries -= 1 + continue + + if last_val_sticky: + last_val_sticky.update_from_data(-1, value) + + # update graphics + chart.update_curve_from_array(fsp_func_name, array) async def check_for_new_bars(feed, ohlcv, linked_charts): @@ -1453,45 +1313,226 @@ async def check_for_new_bars(feed, ohlcv, linked_charts): price_chart = linked_charts.chart price_chart.default_view() - async for index in await feed.index_stream(): + async with feed.index_stream() as stream: + async for index in stream: - # update chart historical bars graphics by incrementing - # a time step and drawing the history and new bar + # update chart historical bars graphics by incrementing + # a time step and drawing the history and new bar - # When appending a new bar, in the time between the insert - # from the writing process and the Qt render call, here, - # the index of the shm buffer may be incremented and the - # (render) call here might read the new flat bar appended - # to the buffer (since -1 index read). In that case H==L and the - # body will be set as None (not drawn) on what this render call - # *thinks* is the curent bar (even though it's reading data from - # the newly inserted flat bar. - # - # HACK: We need to therefore write only the history (not the - # current bar) and then either write the current bar manually - # or place a cursor for visual cue of the current time step. + # When appending a new bar, in the time between the insert + # from the writing process and the Qt render call, here, + # the index of the shm buffer may be incremented and the + # (render) call here might read the new flat bar appended + # to the buffer (since -1 index read). In that case H==L and the + # body will be set as None (not drawn) on what this render call + # *thinks* is the curent bar (even though it's reading data from + # the newly inserted flat bar. + # + # HACK: We need to therefore write only the history (not the + # current bar) and then either write the current bar manually + # or place a cursor for visual cue of the current time step. - # XXX: this puts a flat bar on the current time step - # TODO: if we eventually have an x-axis time-step "cursor" - # we can get rid of this since it is extra overhead. - price_chart.update_ohlc_from_array( - price_chart.name, - ohlcv.array, - just_history=False, - ) - - for name in price_chart._overlays: - - price_chart.update_curve_from_array( - name, - price_chart._arrays[name] + # XXX: this puts a flat bar on the current time step + # TODO: if we eventually have an x-axis time-step "cursor" + # we can get rid of this since it is extra overhead. + price_chart.update_ohlc_from_array( + price_chart.name, + ohlcv.array, + just_history=False, ) - for name, chart in linked_charts.subplots.items(): - chart.update_curve_from_array(chart.name, chart._shm.array) + for name in price_chart._overlays: - # shift the view if in follow mode - price_chart.increment_view() + price_chart.update_curve_from_array( + name, + price_chart._arrays[name] + ) + + for name, chart in linked_charts.subplots.items(): + chart.update_curve_from_array(chart.name, chart._shm.array) + + # shift the view if in follow mode + price_chart.increment_view() + + +async def chart_symbol( + chart_app: ChartSpace, + brokername: str, + sym: str, + loglevel: str, + task_status: TaskStatus[Symbol] = trio.TASK_STATUS_IGNORED, +) -> None: + """Spawn a real-time chart widget for this symbol and app session. + + These widgets can remain up but hidden so that multiple symbols + can be viewed and switched between extremely fast. + + """ + # historical data fetch + brokermod = brokers.get_brokermod(brokername) + + async with data.open_feed( + brokername, + [sym], + loglevel=loglevel, + ) as feed: + + ohlcv: ShmArray = feed.shm + bars = ohlcv.array + symbol = feed.symbols[sym] + + task_status.started(symbol) + + # load in symbol's ohlc data + chart_app.window.setWindowTitle( + f'{symbol.key}@{symbol.brokers} ' + f'tick:{symbol.tick_size}' + ) + + # await tractor.breakpoint() + linked_charts = chart_app.linkedcharts + linked_charts._symbol = symbol + chart = linked_charts.plot_ohlc_main(symbol, bars) + + chart.setFocus() + + # plot historical vwap if available + wap_in_history = False + + if brokermod._show_wap_in_history: + + if 'bar_wap' in bars.dtype.fields: + wap_in_history = True + chart.draw_curve( + name='bar_wap', + data=bars, + add_label=False, + ) + + # size view to data once at outset + chart._set_yrange() + + # TODO: a data view api that makes this less shit + chart._shm = ohlcv + + # TODO: eventually we'll support some kind of n-compose syntax + fsp_conf = { + 'rsi': { + 'period': 14, + 'chart_kwargs': { + 'static_yrange': (0, 100), + }, + }, + + } + + # make sure that the instrument supports volume history + # (sometimes this is not the case for some commodities and + # derivatives) + volm = ohlcv.array['volume'] + if ( + np.all(np.isin(volm, -1)) or + np.all(np.isnan(volm)) + ): + log.warning( + f"{sym} does not seem to have volume info," + " dropping volume signals") + else: + fsp_conf.update({ + 'vwap': { + 'overlay': True, + 'anchor': 'session', + }, + }) + + async with trio.open_nursery() as n: + + # load initial fsp chain (otherwise known as "indicators") + n.start_soon( + spawn_fsps, + linked_charts, + fsp_conf, + sym, + ohlcv, + brokermod, + loglevel, + ) + + # start graphics update loop(s)after receiving first live quote + n.start_soon( + chart_from_quotes, + chart, + feed.stream, + ohlcv, + wap_in_history, + ) + + # wait for a first quote before we start any update tasks + quote = await feed.receive() + + log.info(f'Received first quote {quote}') + + n.start_soon( + check_for_new_bars, + feed, + # delay, + ohlcv, + linked_charts + ) + + # interactive testing + # n.start_soon( + # test_bed, + # ohlcv, + # chart, + # linked_charts, + # ) + + await start_order_mode(chart, symbol, brokername) + + +async def _async_main( + # implicit required argument provided by ``qtractor_run()`` + widgets: Dict[str, Any], + + symbol_key: str, + brokername: str, + loglevel: str, + +) -> None: + """ + Main Qt-trio routine invoked by the Qt loop with the widgets ``dict``. + + Provision the "main" widget with initial symbol data and root nursery. + + """ + chart_app = widgets['main'] + + # attempt to configure DPI aware font size + _font.configure_to_dpi(current_screen()) + + async with trio.open_nursery() as root_n: + + # set root nursery for spawning other charts/feeds + # that run cached in the bg + chart_app._root_n = root_n + + chart_app.load_symbol(brokername, symbol_key, loglevel) + + symbol = await root_n.start( + chart_symbol, + chart_app, + brokername, + symbol_key, + loglevel, + ) + + chart_app.window.setWindowTitle( + f'{symbol.key}@{symbol.brokers} ' + f'tick:{symbol.tick_size}' + ) + + await trio.sleep_forever() def _main(