From daa429f7ca0f9ac438a23d2f1c5cda10b0cb7e89 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 13 Nov 2020 10:39:30 -0500 Subject: [PATCH 01/28] Put fsp plotting into a couple tasks, startup speedups. Break the chart update code for fsps into a new task (add a nursery) in new `spawn_fsps` (was `chart_from_fsps`) that async requests actor spawning and initial historical data (all CPU bound work). For multiple fsp subcharts this allows processing initial output in parallel (multi-core). We might want to wrap this in a "feed" like api eventually. Basically the fsp startup sequence is now: - start all requested fsp actors in an async loop and wait for historical data to arrive - loop through them all again to start update tasks which do chart graphics rendering Add separate x-axis objects for each new subchart (required by pyqtgraph); still need to fix hiding unnecessary ones. Add a `ChartPlotWidget._arrays: dict` for holding overlay data distinct from ohlc. Drop the sizing yrange to label heights for now since it's pretty much all gone to hell since adding L1 labels. Fix y-stickies to look up correct overly arrays. --- piker/ui/_chart.py | 357 +++++++++++++++++++++++++++++---------------- 1 file changed, 231 insertions(+), 126 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 0caf0d17..edb9d6cd 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -184,11 +184,6 @@ class LinkedSplitCharts(QtGui.QWidget): orientation='bottom', linked_charts=self ) - self.xaxis_ind = DynamicDateAxis( - orientation='bottom', - linked_charts=self - ) - # if _xaxis_at == 'bottom': # self.xaxis.setStyle(showValues=False) # self.xaxis.hide() @@ -274,7 +269,12 @@ class LinkedSplitCharts(QtGui.QWidget): cv.linked_charts = self # use "indicator axis" by default - xaxis = self.xaxis_ind if xaxis is None else xaxis + if xaxis is None: + xaxis = DynamicDateAxis( + orientation='bottom', + linked_charts=self + ) + cpw = ChartPlotWidget( array=array, parent=self.splitter, @@ -286,6 +286,8 @@ class LinkedSplitCharts(QtGui.QWidget): cursor=self._ch, **cpw_kwargs, ) + cv.chart = cpw + # this name will be used to register the primary # graphics curve managed by the subchart cpw.name = name @@ -357,6 +359,7 @@ class ChartPlotWidget(pg.PlotWidget): ) # self.setViewportMargins(0, 0, 0, 0) self._array = array # readonly view of data + self._arrays = {} # readonly view of overlays self._graphics = {} # registry of underlying graphics self._overlays = {} # registry of overlay curves self._labels = {} # registry of underlying graphics @@ -389,11 +392,19 @@ class ChartPlotWidget(pg.PlotWidget): def last_bar_in_view(self) -> bool: self._array[-1]['index'] - def update_contents_labels(self, index: int) -> None: + def update_contents_labels( + self, + index: int, + # array_name: str, + ) -> None: if index >= 0 and index < len(self._array): - array = self._array - for name, (label, update) in self._labels.items(): + + if name is self.name : + array = self._array + else: + array = self._arrays[name] + update(index, array) def _set_xlimits( @@ -477,7 +488,7 @@ class ChartPlotWidget(pg.PlotWidget): label = ContentsLabel(chart=self, anchor_at=('top', 'left')) self._labels[name] = (label, partial(label.update_from_ohlc, name)) label.show() - self.update_contents_labels(len(data) - 1) + self.update_contents_labels(len(data) - 1) #, name) self._add_sticky(name) @@ -512,6 +523,7 @@ class ChartPlotWidget(pg.PlotWidget): if overlay: anchor_at = ('bottom', 'right') self._overlays[name] = curve + self._arrays[name] = data else: anchor_at = ('top', 'right') @@ -523,7 +535,7 @@ class ChartPlotWidget(pg.PlotWidget): label = ContentsLabel(chart=self, anchor_at=anchor_at) self._labels[name] = (label, partial(label.update_from_value, name)) label.show() - self.update_contents_labels(len(data) - 1) + self.update_contents_labels(len(data) - 1) #, name) if self._cursor: self._cursor.add_curve_cursor(self, curve) @@ -556,9 +568,7 @@ class ChartPlotWidget(pg.PlotWidget): """Update the named internal graphics from ``array``. """ - if name not in self._overlays: - self._array = array - + self._array = array graphics = self._graphics[name] graphics.update_from_array(array, **kwargs) return graphics @@ -574,6 +584,8 @@ class ChartPlotWidget(pg.PlotWidget): """ if name not in self._overlays: self._array = array + else: + self._arrays[name] = array curve = self._graphics[name] # TODO: we should instead implement a diff based @@ -644,40 +656,43 @@ class ChartPlotWidget(pg.PlotWidget): ylow = np.nanmin(bars['low']) yhigh = np.nanmax(bars['high']) except (IndexError, ValueError): - # must be non-ohlc array? + # likely non-ohlc array? + bars = bars[self.name] ylow = np.nanmin(bars) yhigh = np.nanmax(bars) # view margins: stay within a % of the "true range" diff = yhigh - ylow ylow = ylow - (diff * 0.04) - # yhigh = yhigh + (diff * 0.01) + yhigh = yhigh + (diff * 0.04) - # compute contents label "height" in view terms - # to avoid having data "contents" overlap with them - if self._labels: - label = self._labels[self.name][0] + # # compute contents label "height" in view terms + # # to avoid having data "contents" overlap with them + # if self._labels: + # label = self._labels[self.name][0] - rect = label.itemRect() - tl, br = rect.topLeft(), rect.bottomRight() - vb = self.plotItem.vb + # rect = label.itemRect() + # tl, br = rect.topLeft(), rect.bottomRight() + # vb = self.plotItem.vb - try: - # on startup labels might not yet be rendered - top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y()) + # try: + # # on startup labels might not yet be rendered + # top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y()) - # XXX: magic hack, how do we compute exactly? - label_h = (top - bottom) * 0.42 + # # XXX: magic hack, how do we compute exactly? + # label_h = (top - bottom) * 0.42 - except np.linalg.LinAlgError: - label_h = 0 - else: - label_h = 0 + # except np.linalg.LinAlgError: + # label_h = 0 + # else: + # label_h = 0 - # print(f'label height {self.name}: {label_h}') + # # print(f'label height {self.name}: {label_h}') - if label_h > yhigh - ylow: - label_h = 0 + # if label_h > yhigh - ylow: + # label_h = 0 + # print(f"bounds (ylow, yhigh): {(ylow, yhigh)}") + label_h = 0 self.setLimits( yMin=ylow, @@ -715,9 +730,6 @@ async def _async_main( # chart_app.init_search() - # from ._exec import get_screen - # screen = get_screen(chart_app.geometry().bottomRight()) - # XXX: bug zone if you try to ctl-c after this we get hangs again? # wtf... # await tractor.breakpoint() @@ -749,13 +761,28 @@ async def _async_main( chart._set_yrange() + # eventually we'll support some kind of n-compose syntax + fsp_conf = { + 'vwap': { + 'overlay': True, + 'anchor': 'session', + }, + 'rsi': { + 'period': 14, + 'chart_kwargs': { + 'static_yrange': (0, 100), + }, + }, + + } + async with trio.open_nursery() as n: # load initial fsp chain (otherwise known as "indicators") n.start_soon( - chart_from_fsp, + spawn_fsps, linked_charts, - 'rsi', # eventually will be n-compose syntax + fsp_conf, sym, ohlcv, brokermod, @@ -800,6 +827,7 @@ async def chart_from_quotes( vwap_in_history: bool = False, ) -> None: """The 'main' (price) chart real-time update loop. + """ # TODO: bunch of stuff: # - I'm starting to think all this logic should be @@ -836,6 +864,14 @@ async def chart_from_quotes( size_digits=min(float_digits(volume), 3) ) + # TODO: + # - in theory we should be able to read buffer data faster + # then msgs arrive.. needs some tinkering and testing + + # - if trade volume jumps above / below prior L1 price + # levels this might be dark volume we need to + # present differently? + async for quotes in stream: for sym, quote in quotes.items(): # print(f'CHART: {quote}') @@ -862,19 +898,9 @@ async def chart_from_quotes( array, ) - if vwap_in_history: - # update vwap overlay line - chart.update_curve_from_array('vwap', ohlcv.array) - - # TODO: - # - eventually we'll want to update bid/ask labels - # and other data as subscribed by underlying UI - # consumers. - # - in theory we should be able to read buffer data faster - # then msgs arrive.. needs some tinkering and testing - - # if trade volume jumps above / below prior L1 price - # levels adjust bid / ask lines to match + # if vwap_in_history: + # # update vwap overlay line + # chart.update_curve_from_array('vwap', ohlcv.array) # compute max and min trade values to display in view # TODO: we need a streaming minmax algorithm here, see @@ -910,7 +936,7 @@ async def chart_from_quotes( l1.bid_label.update_from_data(0, price) # update min price in view to keep bid on screen - mn_in_view = max(price, mn_in_view) + mn_in_view = min(price, mn_in_view) if mx_in_view > last_mx or mn_in_view < last_mn: chart._set_yrange(yrange=(mn_in_view, mx_in_view)) @@ -923,9 +949,10 @@ async def chart_from_quotes( last_bars_range = brange -async def chart_from_fsp( - linked_charts, - fsp_func_name, +async def spawn_fsps( + linked_charts: LinkedSplitCharts, + # fsp_func_name, + fsps: Dict[str, str], sym, src_shm, brokermod, @@ -934,53 +961,119 @@ async def chart_from_fsp( """Start financial signal processing in subactor. Pass target entrypoint and historical data. + """ - name = f'fsp.{fsp_func_name}' - - # TODO: load function here and introspect - # return stream type(s) - - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) - + # spawns sub-processes which execute cpu bound FSP code async with tractor.open_nursery() as n: - key = f'{sym}.' + name - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, - readonly=True, + # spawns local task that consume and chart data streams from + # sub-procs + async with trio.open_nursery() as ln: + + # Currently we spawn an actor per fsp chain but + # likely we'll want to pool them eventually to + # scale horizonatlly once cores are used up. + for fsp_func_name, conf in fsps.items(): + + display_name = f'fsp.{fsp_func_name}' + + # TODO: load function here and introspect + # return stream type(s) + + # TODO: should `index` be a required internal field? + fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) + + key = f'{sym}.' + display_name + + # this is all sync currently + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) + + # XXX: fsp may have been opened by a duplicate chart. Error for + # now until we figure out how to wrap fsps as "feeds". + assert opened, f"A chart for {key} likely already exists?" + + conf['shm'] = shm + + # spawn closure, can probably define elsewhere + async def spawn_fsp_daemon( + fsp_name, + conf, + ): + """Start an fsp subactor async. + + """ + portal = await n.run_in_actor( + + # name as title of sub-chart + display_name, + + # subactor entrypoint + fsp.cascade, + brokername=brokermod.name, + src_shm_token=src_shm.token, + dst_shm_token=conf['shm'].token, + symbol=sym, + fsp_func_name=fsp_name, + + # tractor config + loglevel=loglevel, + ) + + stream = await portal.result() + + # receive last index for processed historical + # data-array as first msg + _ = await stream.receive() + + conf['stream'] = stream + conf['portal'] = portal + + # new local task + ln.start_soon( + spawn_fsp_daemon, + fsp_func_name, + conf, + ) + + # blocks here until all daemons up + + # start and block on update loops + async with trio.open_nursery() as ln: + for fsp_func_name, conf in fsps.items(): + ln.start_soon( + update_signals, + linked_charts, + fsp_func_name, + conf, + ) + + +async def update_signals( + linked_charts: LinkedSplitCharts, + fsp_func_name: str, + conf: Dict[str, Any], + +) -> None: + """FSP stream chart update loop. + + """ + shm = conf['shm'] + + if conf.get('overlay'): + chart = linked_charts.chart + chart.draw_curve( + name='vwap', + data=shm.array, + overlay=True, ) + last_val_sticky = None - # XXX: fsp may have been opened by a duplicate chart. Error for - # now until we figure out how to wrap fsps as "feeds". - assert opened, f"A chart for {key} likely already exists?" - - # start fsp sub-actor - portal = await n.run_in_actor( - - # name as title of sub-chart - name, - - # subactor entrypoint - fsp.cascade, - brokername=brokermod.name, - src_shm_token=src_shm.token, - dst_shm_token=shm.token, - symbol=sym, - fsp_func_name=fsp_func_name, - - # tractor config - loglevel=loglevel, - ) - - stream = await portal.result() - - # receive last index for processed historical - # data-array as first msg - _ = await stream.receive() - + else: chart = linked_charts.add_plot( name=fsp_func_name, array=shm.array, @@ -989,11 +1082,15 @@ async def chart_from_fsp( ohlc=False, # settings passed down to ``ChartPlotWidget`` - static_yrange=(0, 100), + **conf.get('chart_kwargs', {}) + # static_yrange=(0, 100), ) # display contents labels asap - chart.update_contents_labels(len(shm.array) - 1) + chart.update_contents_labels( + len(shm.array) - 1, + # fsp_func_name + ) array = shm.array value = array[fsp_func_name][-1] @@ -1004,31 +1101,34 @@ async def chart_from_fsp( chart.update_curve_from_array(fsp_func_name, array) chart.default_view() - # TODO: figure out if we can roll our own `FillToThreshold` to - # get brush filled polygons for OS/OB conditions. - # ``pg.FillBetweenItems`` seems to be one technique using - # generic fills between curve types while ``PlotCurveItem`` has - # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which - # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) - # graphics.curve.setBrush(50, 50, 200, 100) - # graphics.curve.setFillLevel(50) + # TODO: figure out if we can roll our own `FillToThreshold` to + # get brush filled polygons for OS/OB conditions. + # ``pg.FillBetweenItems`` seems to be one technique using + # generic fills between curve types while ``PlotCurveItem`` has + # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which + # might be the best solution? + # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) + # graphics.curve.setBrush(50, 50, 200, 100) + # graphics.curve.setFillLevel(50) - # add moveable over-[sold/bought] lines - level_line(chart, 30) - level_line(chart, 70, orient_v='top') + # add moveable over-[sold/bought] lines + level_line(chart, 30) + level_line(chart, 70, orient_v='top') - chart._shm = shm - chart._set_yrange() + chart._shm = shm + chart._set_yrange() - # update chart graphics - async for value in stream: - # p = pg.debug.Profiler(disabled=False, delayed=False) - array = shm.array - value = array[-1][fsp_func_name] + stream = conf['stream'] + + # update chart graphics + async for value in stream: + # p = pg.debug.Profiler(disabled=False, delayed=False) + array = shm.array + value = array[-1][fsp_func_name] + if last_val_sticky: last_val_sticky.update_from_data(-1, value) - chart.update_curve_from_array(fsp_func_name, array) - # p('rendered rsi datum') + chart.update_curve_from_array(fsp_func_name, array) + # p('rendered rsi datum') async def check_for_new_bars(feed, ohlcv, linked_charts): @@ -1081,11 +1181,16 @@ async def check_for_new_bars(feed, ohlcv, linked_charts): for name, curve in price_chart._overlays.items(): - # TODO: standard api for signal lookups per plot - if name in price_chart._array.dtype.fields: + price_chart.update_curve_from_array( + name, + price_chart._arrays[name] + ) - # should have already been incremented above - price_chart.update_curve_from_array(name, price_chart._array) + # # TODO: standard api for signal lookups per plot + # if name in price_chart._array.dtype.fields: + + # # should have already been incremented above + # price_chart.update_curve_from_array(name, price_chart._array) for name, chart in linked_charts.subplots.items(): chart.update_curve_from_array(chart.name, chart._shm.array) From acf8aeb33ee26a30df7941717e438bc68ca93181 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 13 Nov 2020 12:31:45 -0500 Subject: [PATCH 02/28] Allocate space for 2d worth of 5s bars --- piker/data/_sharedmem.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 7f90d1ae..77dac544 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -214,10 +214,12 @@ class ShmArray: ... +_lotsa_5s = int(5*60*60*10/5) + def open_shm_array( key: Optional[str] = None, # approx number of 5s bars in a "day" x2 - size: int = int(2*60*60*10/5), + size: int = _lotsa_5s, dtype: Optional[np.dtype] = None, readonly: bool = False, ) -> ShmArray: @@ -269,7 +271,7 @@ def open_shm_array( def attach_shm_array( token: Tuple[str, str, Tuple[str, str]], - size: int = int(60*60*10/5), + size: int = _lotsa_5s, readonly: bool = True, ) -> ShmArray: """Load and attach to an existing shared memory array previously From bfcf5170dc0e853da4fc80bedbfa884139c7c717 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 13 Nov 2020 12:33:59 -0500 Subject: [PATCH 03/28] Add commented ex. code for line price charts --- piker/ui/_chart.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index edb9d6cd..60b55e5d 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -123,6 +123,7 @@ class ChartSpace(QtGui.QWidget): self, symbol: str, data: np.ndarray, + ohlc: bool = True, ) -> None: """Load a new contract into the charting app. """ @@ -147,7 +148,7 @@ class ChartSpace(QtGui.QWidget): if not self.v_layout.isEmpty(): self.v_layout.removeWidget(linkedcharts) - main_chart = linkedcharts.plot_main(s, data) + main_chart = linkedcharts.plot_main(s, data, ohlc=ohlc) self.v_layout.addWidget(linkedcharts) return linkedcharts, main_chart @@ -234,7 +235,7 @@ class LinkedSplitCharts(QtGui.QWidget): name=symbol.key, array=array, xaxis=self.xaxis, - ohlc=True, + ohlc=ohlc, _is_main=True, ) # add crosshair graphic @@ -746,6 +747,15 @@ async def _async_main( ohlcv = feed.shm bars = ohlcv.array + # TODO: when we start messing with line charts + # c = np.zeros(len(bars), dtype=[ + # (sym, bars.dtype.fields['close'][0]), + # ('index', 'i4'), + # ]) + # c[sym] = bars['close'] + # c['index'] = bars['index'] + # linked_charts, chart = chart_app.load_symbol(sym, c, ohlc=False) + # load in symbol's ohlc data linked_charts, chart = chart_app.load_symbol(sym, bars) @@ -849,6 +859,11 @@ async def chart_from_quotes( l, lbar, rbar, r = last_bars_range in_view = array[lbar:rbar] mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) + + # TODO: when we start using line charts + # sym = chart.name + # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym]) + return last_bars_range, mx, mn last_bars_range, last_mx, last_mn = maxmin() @@ -898,6 +913,11 @@ async def chart_from_quotes( array, ) + # chart.update_curve_from_array( + # chart.name, + # TODO: when we start using line charts + # np.array(array['close'], dtype=[(chart.name, 'f8')]) + # if vwap_in_history: # # update vwap overlay line # chart.update_curve_from_array('vwap', ohlcv.array) From 957228055317a6f06dddfe4712ee0d62805e7d43 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Nov 2020 10:18:15 -0500 Subject: [PATCH 04/28] Kill the tractor tree on window close. This makes it so you don't have to ctrl-c kill apps. Add in the experimental openGL support even though I'm pretty sure it's not being used much for curve plotting (but could be wrong). --- piker/ui/_exec.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index 732db3e2..3e3f57bf 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -26,6 +26,7 @@ from typing import Tuple, Callable, Dict, Any # Qt specific import PyQt5 # noqa +import pyqtgraph as pg from pyqtgraph import QtGui from PyQt5 import QtCore from PyQt5.QtCore import ( @@ -37,6 +38,12 @@ import tractor from outcome import Error +# pyqtgraph global config +# might as well enable this for now? +pg.useOpenGL = True +pg.enableExperimental = True + + # singleton app per actor _qt_app: QtGui.QApplication = None _qt_win: QtGui.QMainWindow = None @@ -67,6 +74,12 @@ class MainWindow(QtGui.QMainWindow): self.setMinimumSize(*self.size) self.setWindowTitle(self.title) + def closeEvent(self, event: 'QCloseEvent') -> None: + """Cancel the root actor asap. + + """ + tractor.current_actor().cancel_soon() + def run_qtractor( func: Callable, @@ -131,6 +144,9 @@ def run_qtractor( instance = main_widget() instance.window = window + # kill the app when root actor terminates + tractor._actor._lifetime_stack.callback(app.quit) + widgets = { 'window': window, 'main': instance, From c625dc90f1e97665c7c0fb71581bd9856e6cd064 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Nov 2020 11:41:33 -0500 Subject: [PATCH 05/28] Use new global var stack from tractor --- piker/data/_sharedmem.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 77dac544..a992a2b2 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -216,6 +216,7 @@ class ShmArray: _lotsa_5s = int(5*60*60*10/5) + def open_shm_array( key: Optional[str] = None, # approx number of 5s bars in a "day" x2 @@ -263,9 +264,9 @@ def open_shm_array( # "unlink" created shm on process teardown by # pushing teardown calls onto actor context stack - actor = tractor.current_actor() - actor._lifetime_stack.callback(shmarr.close) - actor._lifetime_stack.callback(shmarr.destroy) + tractor._actor._lifetime_stack.callback(shmarr.close) + tractor._actor._lifetime_stack.callback(shmarr.destroy) + return shmarr @@ -310,8 +311,8 @@ def attach_shm_array( _known_tokens[key] = token # "close" attached shm on process teardown - actor = tractor.current_actor() - actor._lifetime_stack.callback(sha.close) + tractor._actor._lifetime_stack.callback(sha.close) + return sha From f9e4e9507d97e5ea5ffa9a18722419caac755e09 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Nov 2020 17:39:14 -0500 Subject: [PATCH 06/28] Tweak axis text offset and margins --- piker/ui/_axes.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/piker/ui/_axes.py b/piker/ui/_axes.py index e0be7178..193d5b62 100644 --- a/piker/ui/_axes.py +++ b/piker/ui/_axes.py @@ -51,6 +51,8 @@ class Axis(pg.AxisItem): self.setStyle(**{ 'textFillLimits': [(0, 0.666)], 'tickFont': _font.font, + # offset of text *away from* axis line in px + 'tickTextOffset': 2, }) self.setTickFont(_font.font) @@ -88,11 +90,10 @@ class PriceAxis(Axis): # print(f'digits: {digits}') return [ - ('{value:,.{digits}f}') - .format( - digits=digits, - value=v, - ).replace(',', ' ') for v in vals + ('{value:,.{digits}f}').format( + digits=digits, + value=v, + ).replace(',', ' ') for v in vals ] @@ -104,10 +105,11 @@ class DynamicDateAxis(Axis): 60: '%H:%M', 30: '%H:%M:%S', 5: '%H:%M:%S', + 1: '%H:%M:%S', } def resize(self) -> None: - self.setHeight(self.typical_br.height() + 3) + self.setHeight(self.typical_br.height() + 1) def _indexes_to_timestrs( self, @@ -228,6 +230,7 @@ class AxisLabel(pg.GraphicsObject): class XAxisLabel(AxisLabel): + _w_margin = 4 text_flags = ( QtCore.Qt.TextDontClip @@ -255,14 +258,13 @@ class XAxisLabel(AxisLabel): w = self.boundingRect().width() self.setPos(QPointF( abs_pos.x() - w / 2 - offset, - 0, + 1, )) self.update() class YAxisLabel(AxisLabel): - _h_margin = 3 - # _w_margin = 1 + _h_margin = 2 text_flags = ( # QtCore.Qt.AlignLeft @@ -289,7 +291,7 @@ class YAxisLabel(AxisLabel): br = self.boundingRect() h = br.height() self.setPos(QPointF( - 0, + 1, abs_pos.y() - h / 2 - offset )) self.update() From 8aede3cbcbb264c4b249f2caba15a719ede97d54 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 18 Nov 2020 09:46:48 -0500 Subject: [PATCH 07/28] Add field diffing on failed push --- piker/data/_sharedmem.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index a992a2b2..55f3675e 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -194,9 +194,30 @@ class ShmArray: # TODO: use .index for actual ring logic? index = self._i.value end = index + length - self._array[index:end] = data[:] - self._i.value = end - return end + try: + self._array[index:end] = data[:] + self._i.value = end + return end + except ValueError as err: + # reraise with any field discrepancy + our_fields, their_fields = ( + set(self._array.dtype.fields), + set(data.dtype.fields), + ) + + only_in_ours = our_fields - their_fields + only_in_theirs = their_fields - our_fields + + if only_in_ours: + raise TypeError( + f"Input array is missing field(s): {only_in_ours}" + ) + elif only_in_theirs: + raise TypeError( + f"Input array has unknown field(s): {only_in_theirs}" + ) + else: + raise err def close(self) -> None: self._i._shm.close() @@ -214,7 +235,7 @@ class ShmArray: ... -_lotsa_5s = int(5*60*60*10/5) +_lotsa_5s = int(5 * 60 * 60 * 10 / 5) def open_shm_array( From 413c703e34f044f148774832509d4ddb896250f9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 23 Nov 2020 23:32:55 -0500 Subject: [PATCH 08/28] Draw bars using `QPainterPath` magic This gives a massive speedup when viewing large bar sets (think a day's worth of 5s bars) by using the `pg.functions.arrayToQPath()` "magic" binary array writing that is also used in `PlotCurveItem`. We're using this same (lower level) function directly to draw bars as part of one large path and it seems to be painting 15k (ish) bars with around 3ms `.paint()` latency. The only thing still a bit slow is the path array generation despite doing it with `numba`. Likely, either having multiple paths or, only regenerating the missing backing array elements should speed this up further to avoid slight delays when incrementing the bar step. This is of course a first draft and more cleanups are coming. --- piker/ui/_graphics.py | 369 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 306 insertions(+), 63 deletions(-) diff --git a/piker/ui/_graphics.py b/piker/ui/_graphics.py index 88193a72..11e5ae82 100644 --- a/piker/ui/_graphics.py +++ b/piker/ui/_graphics.py @@ -18,16 +18,16 @@ Chart graphics for displaying a slew of different data types. """ -# import time +import time from typing import List, Optional, Tuple import numpy as np import pyqtgraph as pg -# from numba import jit, float64, optional, int64 +from numba import jit, float64, optional, int64 from PyQt5 import QtCore, QtGui from PyQt5.QtCore import QLineF, QPointF -# from .._profile import timeit +from .._profile import timeit from ._style import ( _xaxis_at, hcolor, @@ -44,6 +44,8 @@ _debounce_delay = 1 / 2e3 _ch_label_opac = 1 +# TODO: we need to handle the case where index is outside +# the underlying datums range class LineDot(pg.CurvePoint): def __init__( @@ -149,8 +151,9 @@ class ContentsLabel(pg.LabelItem): index: int, array: np.ndarray, ) -> None: - data = array[index][name] - self.setText(f"{name}: {data:.2f}") + if index < len(array): + data = array[index][name] + self.setText(f"{name}: {data:.2f}") class CrossHair(pg.GraphicsObject): @@ -246,7 +249,7 @@ class CrossHair(pg.GraphicsObject): ) -> LineDot: # if this plot contains curves add line dot "cursors" to denote # the current sample under the mouse - cursor = LineDot(curve, index=len(plot._array)) + cursor = LineDot(curve, index=len(plot._ohlc)) plot.addItem(cursor) self.graphics[plot].setdefault('cursors', []).append(cursor) return cursor @@ -341,18 +344,45 @@ class CrossHair(pg.GraphicsObject): # nopython=True, # nogil=True # ) -def _mk_lines_array(data: List, size: int) -> np.ndarray: - """Create an ndarray to hold lines graphics objects. +def _mk_lines_array( + data: List, + size: int, + elements_step: int = 6, +) -> np.ndarray: + """Create an ndarray to hold lines graphics info. + """ return np.zeros_like( data, - shape=(int(size), 3), + shape=(int(size), elements_step), dtype=object, ) -# TODO: `numba` this? +def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]: + open, high, low, close, index = row[ + ['open', 'high', 'low', 'close', 'index']] + # high -> low vertical (body) line + if low != high: + hl = QLineF(index, low, index, high) + else: + # XXX: if we don't do it renders a weird rectangle? + # see below for filtering this later... + hl = None + + # NOTE: place the x-coord start as "middle" of the drawing range such + # that the open arm line-graphic is at the left-most-side of + # the index's range according to the view mapping. + + # open line + o = QLineF(index - w, open, index, open) + # close line + c = QLineF(index, close, index + w, close) + + return [hl, o, c] + +# TODO: `numba` this? # @jit( # # float64[:]( # # float64[:], @@ -370,7 +400,7 @@ def bars_from_ohlc( """Generate an array of lines objects from input ohlc data. """ - lines = _mk_lines_array(data, data.shape[0]) + lines = _mk_lines_array(data, data.shape[0], 3) for i, q in enumerate(data[start:], start=start): open, high, low, close, index = q[ @@ -424,6 +454,94 @@ def bars_from_ohlc( return lines +# @timeit +@jit( + # float64[:]( + # float64[:], + # optional(float64), + # optional(int64) + # ), + nopython=True, + nogil=True +) +def path_arrays_from_ohlc( + data: np.ndarray, + w: float64, + start: int64 = int64(0), +) -> np.ndarray: + """Generate an array of lines objects from input ohlc data. + + """ + size = int(data.shape[0] * 6) + + x = np.zeros( + # data, + shape=size, + dtype=float64, + ) + y = np.zeros( + # data, + shape=size, + dtype=float64, + ) + c = np.zeros( + # data, + shape=size, + dtype=float64, + ) + + # TODO: report bug for assert + # @ /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991 + # for i, q in enumerate(data[start:], start): + for i, q in enumerate(data[start:], start): + + # TODO: ask numba why this doesn't work.. + # open, high, low, close, index = q[ + # ['open', 'high', 'low', 'close', 'index']] + + open = q['open'] + high = q['high'] + low = q['low'] + close = q['close'] + index = float64(q['index']) + + istart = i * 6 + istop = istart + 6 + + # write points for x, y, and connections + x[istart:istop] = ( + index - w, + index, + index, + index, + index, + index + w, + ) + y[istart:istop] = ( + open, + open, + low, + high, + close, + close, + ) + c[istart:istop] = (0, 1, 1, 1, 1, 1) + + return x, y, c + + +@timeit +def gen_qpath( + data, + w, + start, +) -> QtGui.QPainterPath: + + x, y, c = path_arrays_from_ohlc(data, w, start=start) + return pg.functions.arrayToQPath(x, y, connect=c) + + + class BarItems(pg.GraphicsObject): """Price range bars graphics rendered from a OHLC sequence. """ @@ -431,6 +549,9 @@ class BarItems(pg.GraphicsObject): # 0.5 is no overlap between arms, 1.0 is full overlap w: float = 0.43 + + # XXX: for the mega-lulz increasing width here increases draw latency... + # so probably don't do it until we figure that out. bars_pen = pg.mkPen(hcolor('bracket')) # XXX: tina mode, see below @@ -443,7 +564,8 @@ class BarItems(pg.GraphicsObject): plotitem: 'pg.PlotItem', # noqa ) -> None: super().__init__() - self.last = QtGui.QPicture() + + self.last_bar = QtGui.QPicture() self.history = QtGui.QPicture() # TODO: implement updateable pixmap solution self._pi = plotitem @@ -456,7 +578,11 @@ class BarItems(pg.GraphicsObject): # XXX: not sure this actually needs to be an array other # then for the old tina mode calcs for up/down bars below? # lines container - self.lines = _mk_lines_array([], 50e3) + day_in_s = 60 * 60 * 12 + self.lines = _mk_lines_array([], 50e3, 6) + # TODO: don't render the full backing array each time + # self._path_data = None + self._last_bar_lines = None # track the current length of drawable lines within the larger array self.index: int = 0 @@ -471,67 +597,136 @@ class BarItems(pg.GraphicsObject): This routine is usually only called to draw the initial history. """ - lines = bars_from_ohlc(data, self.w, start=start) + # start_lines = time.time() + + # lines = bars_from_ohlc(data, self.w, start=start) + + # start_path = time.time() + # assert len(data) == 2000 + + self.path = gen_qpath(data, self.w, start=start) + + # end = time.time() + # print(f"paths took {end - start_path}\n lines took {start_path - start_lines}") # save graphics for later reference and keep track # of current internal "last index" - index = len(lines) - self.lines[:index] = lines - self.index = index + # index = len(lines) + # index = len(data) + # self.lines[:index] = lines + # lines = bars_from_ohlc(data[-1:], self.w, start=start) + + self.index = len(data) # up to last to avoid double draw of last bar - self.draw_lines(just_history=True, iend=self.index - 1) - self.draw_lines(iend=self.index) + # self.draw_lines(just_history=True, iend=self.index - 1, path=self.path) - # @timeit + # self.draw_lines(iend=self.index) + + self._last_bar_lines = lines_from_ohlc(data[-1], self.w) + + # create pics + self.draw_history() + self.draw_last_bar() + + # trigger render + # https://doc.qt.io/qt-5/qgraphicsitem.html#update + self.update() + + def draw_last_bar(self) -> None: + + # pic = self.last_bar + + # pre-computing a QPicture object allows paint() to run much + # more quickly, rather than re-drawing the shapes every time. + p = QtGui.QPainter(self.last_bar) + p.setPen(self.bars_pen) + + # print(self._last_bar_lines) + p.drawLines(*tuple(filter(bool, self._last_bar_lines))) + p.end() + + # trigger re-render + # https://doc.qt.io/qt-5/qgraphicsitem.html#update + # self.update() + + def draw_history(self) -> None: + p = QtGui.QPainter(self.history) + p.setPen(self.bars_pen) + p.drawPath(self.path) + p.end() + + # self.update() + + @timeit def draw_lines( self, - istart=0, iend=None, just_history=False, + istart=0, + path: QtGui.QPainterPath = None, + # TODO: could get even fancier and only update the single close line? lines=None, ) -> None: """Draw the current line set using the painter. + + Currently this draws lines to a cached ``QPicture`` which + is supposed to speed things up on ``.paint()`` calls (which + is a call to ``QPainter.drawPicture()`` but I'm not so sure. """ - if just_history: - # draw bars for the "history" picture - iend = iend or self.index - 1 - pic = self.history - else: - # draw the last bar - istart = self.index - 1 - iend = iend or self.index - pic = self.last + # if path is None: + # if just_history: + # raise RuntimeError + # # draw bars for the "history" picture + # iend = iend or self.index - 1 + # pic = self.history + # else: + # # draw the last bar + # istart = self.index - 1 + # iend = iend or self.index - # use 2d array of lines objects, see conlusion on speed: - # https://stackoverflow.com/a/60089929 - flat = np.ravel(self.lines[istart:iend]) + # pic = self.last_bar - # TODO: do this with numba for speed gain: - # https://stackoverflow.com/questions/58422690/filtering-a-numpy-array-what-is-the-best-approach - to_draw = flat[np.where(flat != None)] # noqa + # # use 2d array of lines objects, see conlusion on speed: + # # https://stackoverflow.com/a/60089929 + # flat = np.ravel(self.lines[istart:iend]) + + # # TODO: do this with numba for speed gain: + # # https://stackoverflow.com/questions/58422690/filtering-a-numpy-array-what-is-the-best-approach + # to_draw = flat[np.where(flat != None)] # noqa + + # else: + # pic = self.history + + pic = self.last_bar # pre-computing a QPicture object allows paint() to run much # more quickly, rather than re-drawing the shapes every time. p = QtGui.QPainter(pic) p.setPen(self.bars_pen) - # TODO: is there any way to not have to pass all the lines every - # iteration? It seems they won't draw unless it's done this way.. - p.drawLines(*to_draw) + p.drawLines(*self._last_bar_lines) p.end() - # XXX: if we ever try using `QPixmap` again... - # if self._pmi is None: - # self._pmi = self.scene().addPixmap(self.picture) - # else: - # self._pmi.setPixmap(self.picture) - # trigger re-render # https://doc.qt.io/qt-5/qgraphicsitem.html#update self.update() + # TODO: is there any way to not have to pass all the lines every + # iteration? It seems they won't draw unless it's done this way.. + # if path is None: + # # p.drawLines(*to_draw) + # p.drawLines(*self._last_bars_lines) + # else: + # p.drawPath(path) + + # p.end() + + # trigger re-render + # https://doc.qt.io/qt-5/qgraphicsitem.html#update + # self.update() + def update_from_array( self, array: np.ndarray, @@ -552,25 +747,51 @@ class BarItems(pg.GraphicsObject): # start_bar_to_update = index - 100 + # TODO: allow mapping only a range of lines thus + # only drawing as many bars as exactly specified. if extra > 0: # generate new graphics to match provided array + + # lines = bars_from_ohlc(new, self.w) + # lines = bars_from_ohlc(array[-1:], self.w) + self._last_bar_lines = lines_from_ohlc(array[-1], self.w) + + # TODO: only draw these new bars to the backing binary + # path array and then call arrayToQpath() on the whole + # -> will avoid multiple passes for path data we've already + # already generated new = array[index:index + extra] - lines = bars_from_ohlc(new, self.w) - bars_added = len(lines) - self.lines[index:index + bars_added] = lines - self.index += bars_added + + self.path = gen_qpath(array[:-1], self.w, start=0) + + # self.path.connectPath(path) + + # bars_added = len(new) + # bars_added = extra + # self.lines[index:index + bars_added] = lines + + self.index += extra # start_bar_to_update = index - bars_added - self.draw_lines(just_history=True) + # self.draw_lines(just_history=True, path=self.path) + # self.update() + + self.draw_history() + if just_history: + self.update() + return - # current bar update + # last bar update i, o, h, l, last, v = array[-1][ ['index', 'open', 'high', 'low', 'close', 'volume'] ] - assert i == self.index - 1 - body, larm, rarm = self.lines[i] + # assert i == self.index - 1 + + # body, larm, rarm = self.lines[i] + # body, larm, rarm = self._bars + body, larm, rarm = self._last_bar_lines # XXX: is there a faster way to modify this? rarm.setLine(rarm.x1(), last, rarm.x2(), last) @@ -579,18 +800,30 @@ class BarItems(pg.GraphicsObject): if l != h: # noqa if body is None: - body = self.lines[index - 1][0] = QLineF(i, l, i, h) + # body = self.lines[index - 1][0] = QLineF(i, l, i, h) + body = self._last_bar_lines[0] = QLineF(i, l, i, h) else: # update body body.setLine(i, l, i, h) - else: - # XXX: h == l -> remove any HL line to avoid render bug - if body is not None: - body = self.lines[index - 1][0] = None - self.draw_lines(just_history=False) + # XXX: pretty sure this is causing an issue where the bar has + # a large upward move right before the next sample and the body + # is getting set to None since the next bar is flat but the shm + # array index update wasn't read by the time this code runs. Iow + # we're doing this removal of the body for a bar index that is + # now out of date / from some previous sample. It's weird + # though because i've seen it do this to bars i - 3 back? - # @timeit + # else: + # # XXX: h == l -> remove any HL line to avoid render bug + # if body is not None: + # body = self.lines[index - 1][0] = None + + # self.draw_lines(just_history=False) + self.draw_last_bar() + self.update() + + @timeit def paint(self, p, opt, widget): # profiler = pg.debug.Profiler(disabled=False, delayed=False) @@ -606,8 +839,17 @@ class BarItems(pg.GraphicsObject): # as is necesarry for what's in "view". Not sure if this will # lead to any perf gains other then when zoomed in to less bars # in view. - p.drawPicture(0, 0, self.history) - p.drawPicture(0, 0, self.last) + # p.drawPicture(0, 0, self.history) + p.drawPicture(0, 0, self.last_bar) + + # p = QtGui.QPainter(pic) + p.setPen(self.bars_pen) + # p.drawLines(*self._last_bar_lines) + + # TODO: is there any way to not have to pass all the lines every + # iteration? It seems they won't draw unless it's done this way.. + p.drawPath(self.path) + # TODO: if we can ever make pixmaps work... # p.drawPixmap(0, 0, self.picture) @@ -616,6 +858,7 @@ class BarItems(pg.GraphicsObject): # profiler('bars redraw:') + # @timeit def boundingRect(self): # TODO: can we do rect caching to make this faster? @@ -626,7 +869,7 @@ class BarItems(pg.GraphicsObject): # bounding rect for us). # compute aggregate bounding rectangle - lb = self.last.boundingRect() + lb = self.last_bar.boundingRect() hb = self.history.boundingRect() return QtCore.QRectF( # top left From f083f537b1e6faa569445f6cc880c1f0ab3dccf2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 24 Nov 2020 12:01:06 -0500 Subject: [PATCH 09/28] Get `QPainterPath` "append" working Pertains further to #109. Instead of redrawing the entire `QPainterPath` every time there's a historical bars update just use `.addPath()` to slap in latest history. It seems to work and is fast. This also seems like it will be a great strategy for filling in earlier data, woot! --- piker/ui/_graphics.py | 263 +++++------------------------------------- 1 file changed, 32 insertions(+), 231 deletions(-) diff --git a/piker/ui/_graphics.py b/piker/ui/_graphics.py index 11e5ae82..b70c6641 100644 --- a/piker/ui/_graphics.py +++ b/piker/ui/_graphics.py @@ -18,12 +18,11 @@ Chart graphics for displaying a slew of different data types. """ -import time from typing import List, Optional, Tuple import numpy as np import pyqtgraph as pg -from numba import jit, float64, optional, int64 +from numba import jit, float64, int64 from PyQt5 import QtCore, QtGui from PyQt5.QtCore import QLineF, QPointF @@ -335,15 +334,6 @@ class CrossHair(pg.GraphicsObject): return self.plots[0].boundingRect() -# @jit( -# # float64[:]( -# # float64[:], -# # optional(float64), -# # optional(int16) -# # ), -# nopython=True, -# nogil=True -# ) def _mk_lines_array( data: List, size: int, @@ -382,77 +372,6 @@ def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]: return [hl, o, c] -# TODO: `numba` this? -# @jit( -# # float64[:]( -# # float64[:], -# # optional(float64), -# # optional(int16) -# # ), -# nopython=True, -# nogil=True -# ) -def bars_from_ohlc( - data: np.ndarray, - w: float, - start: int = 0, -) -> np.ndarray: - """Generate an array of lines objects from input ohlc data. - - """ - lines = _mk_lines_array(data, data.shape[0], 3) - - for i, q in enumerate(data[start:], start=start): - open, high, low, close, index = q[ - ['open', 'high', 'low', 'close', 'index']] - - # high -> low vertical (body) line - if low != high: - hl = QLineF(index, low, index, high) - else: - # XXX: if we don't do it renders a weird rectangle? - # see below for filtering this later... - hl = None - - # NOTE: place the x-coord start as "middle" of the drawing range such - # that the open arm line-graphic is at the left-most-side of - # the index's range according to the view mapping. - - # open line - o = QLineF(index - w, open, index, open) - # close line - c = QLineF(index, close, index + w, close) - - # indexing here is as per the below comments - lines[i] = (hl, o, c) - - # XXX: in theory we could get a further speedup by using a flat - # array and avoiding the call to `np.ravel()` below? - # lines[3*i:3*i+3] = (hl, o, c) - - # XXX: legacy code from candles custom graphics: - # if not _tina_mode: - # else _tina_mode: - # self.lines = lines = np.concatenate( - # [high_to_low, open_sticks, close_sticks]) - # use traditional up/down green/red coloring - # long_bars = np.resize(Quotes.close > Quotes.open, len(lines)) - # short_bars = np.resize( - # Quotes.close < Quotes.open, len(lines)) - - # ups = lines[long_bars] - # downs = lines[short_bars] - - # # draw "up" bars - # p.setPen(self.bull_brush) - # p.drawLines(*ups) - - # # draw "down" bars - # p.setPen(self.bear_brush) - # p.drawLines(*downs) - - return lines - # @timeit @jit( @@ -490,9 +409,8 @@ def path_arrays_from_ohlc( dtype=float64, ) - # TODO: report bug for assert - # @ /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991 - # for i, q in enumerate(data[start:], start): + # TODO: report bug for assert @ + # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991 for i, q in enumerate(data[start:], start): # TODO: ask numba why this doesn't work.. @@ -541,7 +459,6 @@ def gen_qpath( return pg.functions.arrayToQPath(x, y, connect=c) - class BarItems(pg.GraphicsObject): """Price range bars graphics rendered from a OHLC sequence. """ @@ -554,10 +471,6 @@ class BarItems(pg.GraphicsObject): # so probably don't do it until we figure that out. bars_pen = pg.mkPen(hcolor('bracket')) - # XXX: tina mode, see below - # bull_brush = pg.mkPen('#00cc00') - # bear_brush = pg.mkPen('#fa0000') - def __init__( self, # scene: 'QGraphicsScene', # noqa @@ -567,22 +480,18 @@ class BarItems(pg.GraphicsObject): self.last_bar = QtGui.QPicture() self.history = QtGui.QPicture() + # TODO: implement updateable pixmap solution self._pi = plotitem - # self._scene = plotitem.vb.scene() - # self.picture = QtGui.QPixmap(1000, 300) - # plotitem.addItem(self.picture) - # self._pmi = None - # self._pmi = self._scene.addPixmap(self.picture) # XXX: not sure this actually needs to be an array other # then for the old tina mode calcs for up/down bars below? # lines container - day_in_s = 60 * 60 * 12 self.lines = _mk_lines_array([], 50e3, 6) + # TODO: don't render the full backing array each time # self._path_data = None - self._last_bar_lines = None + self._last_bar_lines: Optional[Tuple[QLineF, ...]] = None # track the current length of drawable lines within the larger array self.index: int = 0 @@ -597,32 +506,13 @@ class BarItems(pg.GraphicsObject): This routine is usually only called to draw the initial history. """ - # start_lines = time.time() - - # lines = bars_from_ohlc(data, self.w, start=start) - - # start_path = time.time() - # assert len(data) == 2000 - self.path = gen_qpath(data, self.w, start=start) - # end = time.time() - # print(f"paths took {end - start_path}\n lines took {start_path - start_lines}") - # save graphics for later reference and keep track # of current internal "last index" - # index = len(lines) - # index = len(data) - # self.lines[:index] = lines - # lines = bars_from_ohlc(data[-1:], self.w, start=start) - self.index = len(data) # up to last to avoid double draw of last bar - # self.draw_lines(just_history=True, iend=self.index - 1, path=self.path) - - # self.draw_lines(iend=self.index) - self._last_bar_lines = lines_from_ohlc(data[-1], self.w) # create pics @@ -634,99 +524,23 @@ class BarItems(pg.GraphicsObject): self.update() def draw_last_bar(self) -> None: + """Currently this draws lines to a cached ``QPicture`` which + is supposed to speed things up on ``.paint()`` calls (which + is a call to ``QPainter.drawPicture()`` but I'm not so sure. - # pic = self.last_bar - - # pre-computing a QPicture object allows paint() to run much - # more quickly, rather than re-drawing the shapes every time. + """ p = QtGui.QPainter(self.last_bar) p.setPen(self.bars_pen) - - # print(self._last_bar_lines) p.drawLines(*tuple(filter(bool, self._last_bar_lines))) p.end() - # trigger re-render - # https://doc.qt.io/qt-5/qgraphicsitem.html#update - # self.update() - def draw_history(self) -> None: p = QtGui.QPainter(self.history) p.setPen(self.bars_pen) p.drawPath(self.path) p.end() - # self.update() - @timeit - def draw_lines( - self, - iend=None, - just_history=False, - istart=0, - path: QtGui.QPainterPath = None, - - # TODO: could get even fancier and only update the single close line? - lines=None, - ) -> None: - """Draw the current line set using the painter. - - Currently this draws lines to a cached ``QPicture`` which - is supposed to speed things up on ``.paint()`` calls (which - is a call to ``QPainter.drawPicture()`` but I'm not so sure. - """ - # if path is None: - # if just_history: - # raise RuntimeError - # # draw bars for the "history" picture - # iend = iend or self.index - 1 - # pic = self.history - # else: - # # draw the last bar - # istart = self.index - 1 - # iend = iend or self.index - - # pic = self.last_bar - - # # use 2d array of lines objects, see conlusion on speed: - # # https://stackoverflow.com/a/60089929 - # flat = np.ravel(self.lines[istart:iend]) - - # # TODO: do this with numba for speed gain: - # # https://stackoverflow.com/questions/58422690/filtering-a-numpy-array-what-is-the-best-approach - # to_draw = flat[np.where(flat != None)] # noqa - - # else: - # pic = self.history - - pic = self.last_bar - - # pre-computing a QPicture object allows paint() to run much - # more quickly, rather than re-drawing the shapes every time. - p = QtGui.QPainter(pic) - p.setPen(self.bars_pen) - - p.drawLines(*self._last_bar_lines) - p.end() - - # trigger re-render - # https://doc.qt.io/qt-5/qgraphicsitem.html#update - self.update() - - # TODO: is there any way to not have to pass all the lines every - # iteration? It seems they won't draw unless it's done this way.. - # if path is None: - # # p.drawLines(*to_draw) - # p.drawLines(*self._last_bars_lines) - # else: - # p.drawPath(path) - - # p.end() - - # trigger re-render - # https://doc.qt.io/qt-5/qgraphicsitem.html#update - # self.update() - def update_from_array( self, array: np.ndarray, @@ -740,57 +554,49 @@ class BarItems(pg.GraphicsObject): graphics object, and then update/rerender, but here we're assuming the prior graphics havent changed (OHLC history rarely does) so this "should" be simpler and faster. + + This routine should be made (transitively) as fast as possible. """ index = self.index length = len(array) extra = length - index - # start_bar_to_update = index - 100 - # TODO: allow mapping only a range of lines thus # only drawing as many bars as exactly specified. if extra > 0: - # generate new graphics to match provided array - # lines = bars_from_ohlc(new, self.w) - # lines = bars_from_ohlc(array[-1:], self.w) + # generate new lines objects for updatable "current bar" self._last_bar_lines = lines_from_ohlc(array[-1], self.w) + self.draw_last_bar() - # TODO: only draw these new bars to the backing binary - # path array and then call arrayToQpath() on the whole - # -> will avoid multiple passes for path data we've already - # already generated - new = array[index:index + extra] + # generate new graphics to match provided array + # path appending logic: + # we need to get the previous "current bar(s)" for the time step + # and convert it to a path to append to the historical set + new_history_istart = length - 2 + to_history = array[new_history_istart:new_history_istart + extra] + # generate a new sub-path for this now-ready-for-history bar set + new_history_qpath = gen_qpath(to_history, self.w, 0) - self.path = gen_qpath(array[:-1], self.w, start=0) - - # self.path.connectPath(path) - - # bars_added = len(new) - # bars_added = extra - # self.lines[index:index + bars_added] = lines + # move to position of placement for the next bar in history + # and append new sub-path + new_bars = array[index:index + extra] + self.path.moveTo(float(index - self.w), float(new_bars[0]['open'])) + self.path.addPath(new_history_qpath) self.index += extra - # start_bar_to_update = index - bars_added - # self.draw_lines(just_history=True, path=self.path) - # self.update() - self.draw_history() if just_history: self.update() - return # last bar update i, o, h, l, last, v = array[-1][ ['index', 'open', 'high', 'low', 'close', 'volume'] ] - # assert i == self.index - 1 - - # body, larm, rarm = self.lines[i] - # body, larm, rarm = self._bars + assert i == self.index - 1 body, larm, rarm = self._last_bar_lines # XXX: is there a faster way to modify this? @@ -800,7 +606,6 @@ class BarItems(pg.GraphicsObject): if l != h: # noqa if body is None: - # body = self.lines[index - 1][0] = QLineF(i, l, i, h) body = self._last_bar_lines[0] = QLineF(i, l, i, h) else: # update body @@ -819,7 +624,6 @@ class BarItems(pg.GraphicsObject): # if body is not None: # body = self.lines[index - 1][0] = None - # self.draw_lines(just_history=False) self.draw_last_bar() self.update() @@ -842,15 +646,9 @@ class BarItems(pg.GraphicsObject): # p.drawPicture(0, 0, self.history) p.drawPicture(0, 0, self.last_bar) - # p = QtGui.QPainter(pic) p.setPen(self.bars_pen) - # p.drawLines(*self._last_bar_lines) - - # TODO: is there any way to not have to pass all the lines every - # iteration? It seems they won't draw unless it's done this way.. p.drawPath(self.path) - # TODO: if we can ever make pixmaps work... # p.drawPixmap(0, 0, self.picture) # self._pmi.setPixmap(self.picture) @@ -860,7 +658,10 @@ class BarItems(pg.GraphicsObject): # @timeit def boundingRect(self): - # TODO: can we do rect caching to make this faster? + # TODO: can we do rect caching to make this faster + # like `pg.PlotCurveItem` does? In theory it's just + # computing max/min stuff again like we do in the udpate loop + # anyway. # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect # boundingRect _must_ indicate the entire area that will be From 949e9d6cd111aab4d878fd2c27a4b145794f9d7d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 24 Nov 2020 14:48:52 -0500 Subject: [PATCH 10/28] Drop commented pixmap cruft See #124 as to why we'll probably never need this. --- piker/ui/_graphics.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/piker/ui/_graphics.py b/piker/ui/_graphics.py index b70c6641..8ea108ce 100644 --- a/piker/ui/_graphics.py +++ b/piker/ui/_graphics.py @@ -481,7 +481,6 @@ class BarItems(pg.GraphicsObject): self.last_bar = QtGui.QPicture() self.history = QtGui.QPicture() - # TODO: implement updateable pixmap solution self._pi = plotitem # XXX: not sure this actually needs to be an array other @@ -649,13 +648,6 @@ class BarItems(pg.GraphicsObject): p.setPen(self.bars_pen) p.drawPath(self.path) - # TODO: if we can ever make pixmaps work... - # p.drawPixmap(0, 0, self.picture) - # self._pmi.setPixmap(self.picture) - # print(self.scene()) - - # profiler('bars redraw:') - # @timeit def boundingRect(self): # TODO: can we do rect caching to make this faster From 3e16840566cabf5172f4fc6f381b2bbb84131fe8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 25 Nov 2020 14:55:18 -0500 Subject: [PATCH 11/28] Attempt to add numba typing and use `QGraphicsPathItem` Failed at using either. Quirks in numba's typing require specifying readonly arrays by composing types manually. The graphics item path thing, while it does take less time to write on bar appends, seems to be slower in general in calculating the ``.boundingRect()`` value. Likely we'll just add manual max/min tracking on array updates like ``pg.PlotCurveItem`` to squeeze some final juices on this. --- piker/ui/_graphics.py | 65 ++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/piker/ui/_graphics.py b/piker/ui/_graphics.py index 8ea108ce..e1dbdaf2 100644 --- a/piker/ui/_graphics.py +++ b/piker/ui/_graphics.py @@ -22,11 +22,13 @@ from typing import List, Optional, Tuple import numpy as np import pyqtgraph as pg -from numba import jit, float64, int64 +from numba import jit, float64, int64 # , optional +# from numba import types as ntypes from PyQt5 import QtCore, QtGui from PyQt5.QtCore import QLineF, QPointF from .._profile import timeit +# from ..data._source import numba_ohlc_dtype from ._style import ( _xaxis_at, hcolor, @@ -375,18 +377,20 @@ def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]: # @timeit @jit( - # float64[:]( - # float64[:], + # TODO: for now need to construct this manually for readonly arrays, see + # https://github.com/numba/numba/issues/4511 + # ntypes.Tuple((float64[:], float64[:], float64[:]))( + # numba_ohlc_dtype[::1], # contiguous + # int64, # optional(float64), - # optional(int64) # ), nopython=True, nogil=True ) def path_arrays_from_ohlc( data: np.ndarray, - w: float64, - start: int64 = int64(0), + start: int64, + bar_gap: float64 = 0.43, ) -> np.ndarray: """Generate an array of lines objects from input ohlc data. @@ -398,16 +402,7 @@ def path_arrays_from_ohlc( shape=size, dtype=float64, ) - y = np.zeros( - # data, - shape=size, - dtype=float64, - ) - c = np.zeros( - # data, - shape=size, - dtype=float64, - ) + y, c = x.copy(), x.copy() # TODO: report bug for assert @ # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991 @@ -426,14 +421,14 @@ def path_arrays_from_ohlc( istart = i * 6 istop = istart + 6 - # write points for x, y, and connections + # x,y detail the 6 points which connect all vertexes of a ohlc bar x[istart:istop] = ( - index - w, + index - bar_gap, index, index, index, index, - index + w, + index + bar_gap, ) y[istart:istop] = ( open, @@ -443,6 +438,10 @@ def path_arrays_from_ohlc( close, close, ) + + # specifies that the first edge is never connected to the + # prior bars last edge thus providing a small "gap"/"space" + # between bars determined by ``bar_gap``. c[istart:istop] = (0, 1, 1, 1, 1, 1) return x, y, c @@ -451,11 +450,11 @@ def path_arrays_from_ohlc( @timeit def gen_qpath( data, - w, start, + w, ) -> QtGui.QPainterPath: - x, y, c = path_arrays_from_ohlc(data, w, start=start) + x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w) return pg.functions.arrayToQPath(x, y, connect=c) @@ -481,12 +480,15 @@ class BarItems(pg.GraphicsObject): self.last_bar = QtGui.QPicture() self.history = QtGui.QPicture() + self.path = QtGui.QPainterPath() + self._h_path = QtGui.QGraphicsPathItem(self.path) + self._pi = plotitem # XXX: not sure this actually needs to be an array other # then for the old tina mode calcs for up/down bars below? # lines container - self.lines = _mk_lines_array([], 50e3, 6) + # self.lines = _mk_lines_array([], 50e3, 6) # TODO: don't render the full backing array each time # self._path_data = None @@ -505,7 +507,7 @@ class BarItems(pg.GraphicsObject): This routine is usually only called to draw the initial history. """ - self.path = gen_qpath(data, self.w, start=start) + self.path = gen_qpath(data, start, self.w) # save graphics for later reference and keep track # of current internal "last index" @@ -533,7 +535,13 @@ class BarItems(pg.GraphicsObject): p.drawLines(*tuple(filter(bool, self._last_bar_lines))) p.end() + @timeit def draw_history(self) -> None: + # TODO: avoid having to use a ```QPicture` to calc the + # ``.boundingRect()``, use ``QGraphicsPathItem`` instead? + # https://doc.qt.io/qt-5/qgraphicspathitem.html + # self._h_path.setPath(self.path) + p = QtGui.QPainter(self.history) p.setPen(self.bars_pen) p.drawPath(self.path) @@ -571,11 +579,10 @@ class BarItems(pg.GraphicsObject): # generate new graphics to match provided array # path appending logic: # we need to get the previous "current bar(s)" for the time step - # and convert it to a path to append to the historical set + # and convert it to a sub-path to append to the historical set new_history_istart = length - 2 to_history = array[new_history_istart:new_history_istart + extra] - # generate a new sub-path for this now-ready-for-history bar set - new_history_qpath = gen_qpath(to_history, self.w, 0) + new_history_qpath = gen_qpath(to_history, 0, self.w) # move to position of placement for the next bar in history # and append new sub-path @@ -646,6 +653,9 @@ class BarItems(pg.GraphicsObject): p.drawPicture(0, 0, self.last_bar) p.setPen(self.bars_pen) + + # TODO: does it matter which we use? + # p.drawPath(self._h_path.path()) p.drawPath(self.path) # @timeit @@ -664,10 +674,13 @@ class BarItems(pg.GraphicsObject): # compute aggregate bounding rectangle lb = self.last_bar.boundingRect() hb = self.history.boundingRect() + # hb = self._h_path.boundingRect() + return QtCore.QRectF( # top left QtCore.QPointF(hb.topLeft()), # total size + # QtCore.QSizeF(QtCore.QSizeF(lb.size()) + hb.size()) QtCore.QSizeF(lb.size() + hb.size()) ) From 247b5fa2ec65f315dfcf994116644568105eb5ee Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 26 Nov 2020 10:11:59 -0500 Subject: [PATCH 12/28] Tidy up doc string --- piker/data/_sharedmem.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 55f3675e..7f06bf78 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -342,21 +342,20 @@ def maybe_open_shm_array( dtype: Optional[np.dtype] = None, **kwargs, ) -> Tuple[ShmArray, bool]: - """Attempt to attach to a shared memory block by a - "key" determined by the users overall "system" + """Attempt to attach to a shared memory block using a "key" lookup + to registered blocks in the users overall "system" registryt (presumes you don't have the block's explicit token). - This function is meant to solve the problem of - discovering whether a shared array token has been - allocated or discovered by the actor running in - **this** process. Systems where multiple actors - may seek to access a common block can use this - function to attempt to acquire a token as discovered - by the actors who have previously stored a - "key" -> ``_Token`` map in an actor local variable. + This function is meant to solve the problem of discovering whether + a shared array token has been allocated or discovered by the actor + running in **this** process. Systems where multiple actors may seek + to access a common block can use this function to attempt to acquire + a token as discovered by the actors who have previously stored + a "key" -> ``_Token`` map in an actor local (aka python global) + variable. - If you know the explicit ``_Token`` for your memory - instead use ``attach_shm_array``. + If you know the explicit ``_Token`` for your memory segment instead + use ``attach_shm_array``. """ try: # see if we already know this key From 1f8f2eb8b3f565bcba03a4e40730b40ada7b8b65 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Dec 2020 13:07:03 -0500 Subject: [PATCH 13/28] Font size tweaks for low dpi --- piker/ui/_style.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/piker/ui/_style.py b/piker/ui/_style.py index 23a3ac09..3764750a 100644 --- a/piker/ui/_style.py +++ b/piker/ui/_style.py @@ -18,6 +18,7 @@ Qt UI styling. """ from typing import Optional +import math import pyqtgraph as pg from PyQt5 import QtCore, QtGui @@ -27,10 +28,9 @@ from ..log import get_logger log = get_logger(__name__) -# chart-wide font -# font size 6px / 53 dpi (3x scaled down on 4k hidpi) -_default_font_inches_we_like = 6 / 53 # px / (px / inch) = inch -_down_2_font_inches_we_like = 4 / 53 +# chart-wide fonts specified in inches +_default_font_inches_we_like = 0.0666 +_down_2_font_inches_we_like = 6 / 96 class DpiAwareFont: @@ -66,8 +66,12 @@ class DpiAwareFont: listed in the script in ``snippets/qt_screen_info.py``. """ - dpi = screen.physicalDotsPerInch() - font_size = round(self._iwl * dpi) + # take the max since scaling can make things ugly in some cases + pdpi = screen.physicalDotsPerInch() + ldpi = screen.logicalDotsPerInch() + dpi = max(pdpi, ldpi) + + font_size = math.floor(self._iwl * dpi) log.info( f"\nscreen:{screen.name()} with DPI: {dpi}" f"\nbest font size is {font_size}\n" From 971068550822cc25bad31b0d71b1006c07a9c1e3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Dec 2020 08:53:09 -0500 Subject: [PATCH 14/28] Left align yaxis label --- piker/ui/_axes.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/piker/ui/_axes.py b/piker/ui/_axes.py index 193d5b62..c5d8f2ea 100644 --- a/piker/ui/_axes.py +++ b/piker/ui/_axes.py @@ -38,7 +38,7 @@ class Axis(pg.AxisItem): def __init__( self, linked_charts, - typical_max_str: str = '100 000.00', + typical_max_str: str = '100 000.000', min_tick: int = 2, **kwargs ) -> None: @@ -116,7 +116,7 @@ class DynamicDateAxis(Axis): indexes: List[int], ) -> List[str]: - bars = self.linked_charts.chart._array + bars = self.linked_charts.chart._ohlc bars_len = len(bars) times = bars['time'] @@ -267,8 +267,8 @@ class YAxisLabel(AxisLabel): _h_margin = 2 text_flags = ( - # QtCore.Qt.AlignLeft - QtCore.Qt.AlignHCenter + QtCore.Qt.AlignLeft + # QtCore.Qt.AlignHCenter | QtCore.Qt.AlignVCenter | QtCore.Qt.TextDontClip ) @@ -285,7 +285,7 @@ class YAxisLabel(AxisLabel): ) -> None: # this is read inside ``.paint()`` - self.label_str = '{value:,.{digits}f}'.format( + self.label_str = ' {value:,.{digits}f}'.format( digits=self.digits, value=value).replace(',', ' ') br = self.boundingRect() From 02b7d6cd19b630c99a7e43505e64136cbe72a80c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Dec 2020 10:30:31 -0500 Subject: [PATCH 15/28] Add prepend support to shm system --- piker/data/_sharedmem.py | 219 +++++++++++++++++++++++++++------------ 1 file changed, 150 insertions(+), 69 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 7f06bf78..fb442d7c 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -17,11 +17,10 @@ """ NumPy compatible shared memory buffers for real-time FSP. """ -from typing import List from dataclasses import dataclass, asdict from sys import byteorder -from typing import Tuple, Optional -from multiprocessing import shared_memory +from typing import List, Tuple, Optional +from multiprocessing.shared_memory import SharedMemory, _USE_POSIX from multiprocessing import resource_tracker as mantracker from _posixshmem import shm_unlink @@ -29,7 +28,7 @@ import tractor import numpy as np from ..log import get_logger -from ._source import base_ohlc_dtype +from ._source import base_ohlc_dtype, base_iohlc_dtype log = get_logger(__name__) @@ -58,17 +57,15 @@ mantracker.getfd = mantracker._resource_tracker.getfd class SharedInt: + """Wrapper around a single entry shared memory array which + holds an ``int`` value used as an index counter. + + """ def __init__( self, - token: str, - create: bool = False, + shm: SharedMemory, ) -> None: - # create a single entry array for storing an index counter - self._shm = shared_memory.SharedMemory( - name=token, - create=create, - size=4, # std int - ) + self._shm = shm @property def value(self) -> int: @@ -79,7 +76,7 @@ class SharedInt: self._shm.buf[:] = value.to_bytes(4, byteorder) def destroy(self) -> None: - if shared_memory._USE_POSIX: + if _USE_POSIX: # We manually unlink to bypass all the "resource tracker" # nonsense meant for non-SC systems. shm_unlink(self._shm.name) @@ -91,7 +88,8 @@ class _Token: which can be used to key a system wide post shm entry. """ shm_name: str # this servers as a "key" value - shm_counter_name: str + shm_first_index_name: str + shm_last_index_name: str dtype_descr: List[Tuple[str]] def __post_init__(self): @@ -130,27 +128,47 @@ def _make_token( """Create a serializable token that can be used to access a shared array. """ - dtype = base_ohlc_dtype if dtype is None else dtype + dtype = base_iohlc_dtype if dtype is None else dtype return _Token( key, - key + "_counter", + key + "_first", + key + "_last", np.dtype(dtype).descr ) class ShmArray: + """A shared memory ``numpy`` (compatible) array API. + + An underlying shared memory buffer is allocated based on + a user specified ``numpy.ndarray``. This fixed size array + can be read and written to by pushing data both onto the "front" + or "back" of a set index range. The indexes for the "first" and + "last" index are themselves stored in shared memory (accessed via + ``SharedInt`` interfaces) values such that multiple processes can + interact with the same array using a synchronized-index. + + """ def __init__( self, shmarr: np.ndarray, - counter: SharedInt, - shm: shared_memory.SharedMemory, - readonly: bool = True, + first: SharedInt, + last: SharedInt, + shm: SharedMemory, + # readonly: bool = True, ) -> None: self._array = shmarr - self._i = counter + + # indexes for first and last indices corresponding + # to fille data + self._first = first + self._last = last + self._len = len(shmarr) self._shm = shm - self._readonly = readonly + + # pushing data does not write the index (aka primary key) + self._write_fields = list(shmarr.dtype.fields.keys())[1:] # TODO: ringbuf api? @@ -158,24 +176,25 @@ class ShmArray: def _token(self) -> _Token: return _Token( self._shm.name, - self._i._shm.name, + self._first._shm.name, + self._last._shm.name, self._array.dtype.descr, ) @property def token(self) -> dict: - """Shared memory token that can be serialized - and used by another process to attach to this array. + """Shared memory token that can be serialized and used by + another process to attach to this array. """ return self._token.as_msg() @property def index(self) -> int: - return self._i.value % self._len + return self._last.value % self._len @property def array(self) -> np.ndarray: - return self._array[:self._i.value] + return self._array[self._first.value:self._last.value] def last( self, @@ -186,62 +205,90 @@ class ShmArray: def push( self, data: np.ndarray, + prepend: bool = False, ) -> int: """Ring buffer like "push" to append data - into the buffer and return updated index. + into the buffer and return updated "last" index. """ length = len(data) - # TODO: use .index for actual ring logic? - index = self._i.value + + if prepend: + index = self._first.value - length + else: + index = self._last.value + end = index + length + + fields = self._write_fields + try: - self._array[index:end] = data[:] - self._i.value = end + self._array[fields][index:end] = data[fields][:] + if prepend: + self._first.value = index + else: + self._last.value = end return end except ValueError as err: - # reraise with any field discrepancy - our_fields, their_fields = ( - set(self._array.dtype.fields), - set(data.dtype.fields), + # shoudl raise if diff detected + self.diff_err_fields(data) + + raise err + + def diff_err_fields( + self, + data: np.ndarray, + ) -> None: + # reraise with any field discrepancy + our_fields, their_fields = ( + set(self._array.dtype.fields), + set(data.dtype.fields), + ) + + only_in_ours = our_fields - their_fields + only_in_theirs = their_fields - our_fields + + if only_in_ours: + raise TypeError( + f"Input array is missing field(s): {only_in_ours}" + ) + elif only_in_theirs: + raise TypeError( + f"Input array has unknown field(s): {only_in_theirs}" ) - only_in_ours = our_fields - their_fields - only_in_theirs = their_fields - our_fields - - if only_in_ours: - raise TypeError( - f"Input array is missing field(s): {only_in_ours}" - ) - elif only_in_theirs: - raise TypeError( - f"Input array has unknown field(s): {only_in_theirs}" - ) - else: - raise err + def prepend( + self, + data: np.ndarray, + ) -> int: + end = self.push(data, prepend=True) + assert end def close(self) -> None: - self._i._shm.close() + self._first._shm.close() + self._last._shm.close() self._shm.close() def destroy(self) -> None: - if shared_memory._USE_POSIX: + if _USE_POSIX: # We manually unlink to bypass all the "resource tracker" # nonsense meant for non-SC systems. shm_unlink(self._shm.name) - self._i.destroy() + + self._first.destroy() + self._last.destroy() def flush(self) -> None: # TODO: flush to storage backend like markestore? ... -_lotsa_5s = int(5 * 60 * 60 * 10 / 5) - +# how much is probably dependent on lifestyle +_secs_in_day = int(60 * 60 * 12) +_default_size = 2 * _secs_in_day def open_shm_array( key: Optional[str] = None, - # approx number of 5s bars in a "day" x2 - size: int = _lotsa_5s, + size: int = _default_size, dtype: Optional[np.dtype] = None, readonly: bool = False, ) -> ShmArray: @@ -253,7 +300,9 @@ def open_shm_array( # create new shared mem segment for which we # have write permission a = np.zeros(size, dtype=dtype) - shm = shared_memory.SharedMemory( + a['index'] = np.arange(len(a)) + + shm = SharedMemory( name=key, create=True, size=a.nbytes @@ -267,17 +316,30 @@ def open_shm_array( dtype=dtype ) - counter = SharedInt( - token=token.shm_counter_name, - create=True, + # create single entry arrays for storing an first and last indices + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=True, + size=4, # std int + ) ) - counter.value = 0 + + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=True, + size=4, # std int + ) + ) + + last.value = first.value = int(_secs_in_day) shmarr = ShmArray( array, - counter, + first, + last, shm, - readonly=readonly, ) assert shmarr._token == token @@ -293,18 +355,23 @@ def open_shm_array( def attach_shm_array( token: Tuple[str, str, Tuple[str, str]], - size: int = _lotsa_5s, + size: int = _default_size, readonly: bool = True, ) -> ShmArray: - """Load and attach to an existing shared memory array previously + """Attach to an existing shared memory array previously created by another process using ``open_shared_array``. + + No new shared mem is allocated but wrapper types for read/write + access are constructed. """ token = _Token.from_msg(token) key = token.shm_name + if key in _known_tokens: assert _known_tokens[key] == token, "WTF" - shm = shared_memory.SharedMemory(name=key) + # attach to array buffer and view as per dtype + shm = SharedMemory(name=key) shmarr = np.ndarray( (size,), dtype=token.dtype_descr, @@ -312,15 +379,29 @@ def attach_shm_array( ) shmarr.setflags(write=int(not readonly)) - counter = SharedInt(token=token.shm_counter_name) + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=False, + size=4, # std int + ), + ) + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=False, + size=4, # std int + ), + ) + # make sure we can read - counter.value + first.value sha = ShmArray( shmarr, - counter, + first, + last, shm, - readonly=readonly, ) # read test sha.array From 2568a2d2e974e340b55021418b926d453c7fa41e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Dec 2020 14:30:40 -0500 Subject: [PATCH 16/28] First draft, make graphics work on shm primary index This is a bit hacky (what with array indexing semantics being relative to the primary index's "start" value but it works. We'll likely want to somehow wrap this index finagling into an API soon. --- piker/ui/_graphics.py | 238 ++++++++++++++++++++++++++++++++---------- 1 file changed, 182 insertions(+), 56 deletions(-) diff --git a/piker/ui/_graphics.py b/piker/ui/_graphics.py index e1dbdaf2..83cbd5b5 100644 --- a/piker/ui/_graphics.py +++ b/piker/ui/_graphics.py @@ -33,14 +33,15 @@ from ._style import ( _xaxis_at, hcolor, _font, + _down_2_font_inches_we_like, ) from ._axes import YAxisLabel, XAxisLabel, YSticky # XXX: these settings seem to result in really decent mouse scroll # latency (in terms of perceived lag in cross hair) so really be sure -# there's an improvement if you want to change it. -_mouse_rate_limit = 60 # calc current screen refresh rate? +# there's an improvement if you want to change it! +_mouse_rate_limit = 60 # TODO; should we calc current screen refresh rate? _debounce_delay = 1 / 2e3 _ch_label_opac = 1 @@ -53,6 +54,7 @@ class LineDot(pg.CurvePoint): self, curve: pg.PlotCurveItem, index: int, + plot: 'ChartPlotWidget', pos=None, size: int = 2, # in pxs color: str = 'default_light', @@ -64,6 +66,7 @@ class LineDot(pg.CurvePoint): pos=pos, rotate=False, ) + self._plot = plot # TODO: get pen from curve if not defined? cdefault = hcolor(color) @@ -83,6 +86,31 @@ class LineDot(pg.CurvePoint): # keep a static size self.setFlag(self.ItemIgnoresTransformations) + def event( + self, + ev: QtCore.QEvent, + ) -> None: + # print((ev, type(ev))) + if not isinstance(ev, QtCore.QDynamicPropertyChangeEvent) or self.curve() is None: + return False + + # if ev.propertyName() == 'index': + # print(ev) + # # self.setProperty + + (x, y) = self.curve().getData() + index = self.property('index') + # first = self._plot._ohlc[0]['index'] + # first = x[0] + # i = index - first + i = index - x[0] + if i > 0: + newPos = (index, y[i]) + QtGui.QGraphicsItem.setPos(self, *newPos) + return True + + return False + _corner_anchors = { 'top': 0, @@ -94,8 +122,10 @@ _corner_anchors = { _corner_margins = { ('top', 'left'): (-4, -5), ('top', 'right'): (4, -5), - ('bottom', 'left'): (-4, 5), - ('bottom', 'right'): (4, 5), + + # TODO: pretty sure y here needs to be 2x font height + ('bottom', 'left'): (-4, 14), + ('bottom', 'right'): (4, 14), } @@ -132,15 +162,19 @@ class ContentsLabel(pg.LabelItem): array: np.ndarray, ) -> None: # this being "html" is the dumbest shit :eyeroll: + first = array[0]['index'] + self.setText( "i:{index}
" "O:{}
" "H:{}
" "L:{}
" "C:{}
" - "V:{}".format( - # *self._array[index].item()[2:8], - *array[index].item()[2:8], + "V:{}
" + "wap:{}".format( + *array[index - first][ + ['open', 'high', 'low', 'close', 'volume', 'bar_wap'] + ], name=name, index=index, ) @@ -152,8 +186,9 @@ class ContentsLabel(pg.LabelItem): index: int, array: np.ndarray, ) -> None: - if index < len(array): - data = array[index][name] + first = array[0]['index'] + if index < array[-1]['index'] and index > first: + data = array[index - first][name] self.setText(f"{name}: {data:.2f}") @@ -250,7 +285,7 @@ class CrossHair(pg.GraphicsObject): ) -> LineDot: # if this plot contains curves add line dot "cursors" to denote # the current sample under the mouse - cursor = LineDot(curve, index=len(plot._ohlc)) + cursor = LineDot(curve, index=plot._ohlc[-1]['index'], plot=plot) plot.addItem(cursor) self.graphics[plot].setdefault('cursors', []).append(cursor) return cursor @@ -312,8 +347,9 @@ class CrossHair(pg.GraphicsObject): plot.update_contents_labels(ix) # update all subscribed curve dots + # first = plot._ohlc[0]['index'] for cursor in opts.get('cursors', ()): - cursor.setIndex(ix) + cursor.setIndex(ix) # - first) # update the label on the bottom of the crosshair self.xaxis_label.update_label( @@ -375,7 +411,7 @@ def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]: return [hl, o, c] -# @timeit +@timeit @jit( # TODO: for now need to construct this manually for readonly arrays, see # https://github.com/numba/numba/issues/4511 @@ -450,7 +486,7 @@ def path_arrays_from_ohlc( @timeit def gen_qpath( data, - start, + start, # XXX: do we need this? w, ) -> QtGui.QPainterPath: @@ -478,13 +514,16 @@ class BarItems(pg.GraphicsObject): super().__init__() self.last_bar = QtGui.QPicture() - self.history = QtGui.QPicture() + # self.history = QtGui.QPicture() self.path = QtGui.QPainterPath() - self._h_path = QtGui.QGraphicsPathItem(self.path) + # self._h_path = QtGui.QGraphicsPathItem(self.path) self._pi = plotitem + self._xrange: Tuple[int, int] + self._yrange: Tuple[float, float] + # XXX: not sure this actually needs to be an array other # then for the old tina mode calcs for up/down bars below? # lines container @@ -495,14 +534,15 @@ class BarItems(pg.GraphicsObject): self._last_bar_lines: Optional[Tuple[QLineF, ...]] = None # track the current length of drawable lines within the larger array - self.index: int = 0 + self.start_index: int = 0 + self.stop_index: int = 0 - # @timeit + @timeit def draw_from_data( self, data: np.ndarray, start: int = 0, - ): + ) -> QtGui.QPainterPath: """Draw OHLC datum graphics from a ``np.ndarray``. This routine is usually only called to draw the initial history. @@ -511,19 +551,37 @@ class BarItems(pg.GraphicsObject): # save graphics for later reference and keep track # of current internal "last index" - self.index = len(data) + # self.start_index = len(data) + index = data['index'] + self._xrange = (index[0], index[-1]) + self._yrange = ( + np.nanmax(data['high']), + np.nanmin(data['low']), + ) # up to last to avoid double draw of last bar self._last_bar_lines = lines_from_ohlc(data[-1], self.w) # create pics - self.draw_history() + # self.draw_history() self.draw_last_bar() # trigger render # https://doc.qt.io/qt-5/qgraphicsitem.html#update self.update() + return self.path + + # def update_ranges( + # self, + # xmn: int, + # xmx: int, + # ymn: float, + # ymx: float, + # ) -> None: + # ... + + def draw_last_bar(self) -> None: """Currently this draws lines to a cached ``QPicture`` which is supposed to speed things up on ``.paint()`` calls (which @@ -535,17 +593,17 @@ class BarItems(pg.GraphicsObject): p.drawLines(*tuple(filter(bool, self._last_bar_lines))) p.end() - @timeit - def draw_history(self) -> None: - # TODO: avoid having to use a ```QPicture` to calc the - # ``.boundingRect()``, use ``QGraphicsPathItem`` instead? - # https://doc.qt.io/qt-5/qgraphicspathitem.html - # self._h_path.setPath(self.path) + # @timeit + # def draw_history(self) -> None: + # # TODO: avoid having to use a ```QPicture` to calc the + # # ``.boundingRect()``, use ``QGraphicsPathItem`` instead? + # # https://doc.qt.io/qt-5/qgraphicspathitem.html + # # self._h_path.setPath(self.path) - p = QtGui.QPainter(self.history) - p.setPen(self.bars_pen) - p.drawPath(self.path) - p.end() + # p = QtGui.QPainter(self.history) + # p.setPen(self.bars_pen) + # p.drawPath(self.path) + # p.end() @timeit def update_from_array( @@ -564,14 +622,42 @@ class BarItems(pg.GraphicsObject): This routine should be made (transitively) as fast as possible. """ - index = self.index - length = len(array) - extra = length - index + # index = self.start_index + istart, istop = self._xrange + + index = array['index'] + first_index, last_index = index[0], index[-1] + + # length = len(array) + prepend_length = istart - first_index + append_length = last_index - istop # TODO: allow mapping only a range of lines thus # only drawing as many bars as exactly specified. - if extra > 0: + if prepend_length: + # breakpoint() + # new history was added and we need to render a new path + new_bars = array[:prepend_length] + prepend_path = gen_qpath(new_bars, 0, self.w) + + # XXX: SOMETHING IS FISHY HERE what with the old_path + # y value not matching the first value from + # array[prepend_length + 1] ??? + + # update path + old_path = self.path + self.path = prepend_path + # self.path.moveTo(float(index - self.w), float(new_bars[0]['open'])) + # self.path.moveTo( + # float(istart - self.w), + # # float(array[prepend_length + 1]['open']) + # float(array[prepend_length]['open']) + # ) + self.path.addPath(old_path) + # self.draw_history() + + if append_length: # generate new lines objects for updatable "current bar" self._last_bar_lines = lines_from_ohlc(array[-1], self.w) self.draw_last_bar() @@ -580,29 +666,61 @@ class BarItems(pg.GraphicsObject): # path appending logic: # we need to get the previous "current bar(s)" for the time step # and convert it to a sub-path to append to the historical set - new_history_istart = length - 2 - to_history = array[new_history_istart:new_history_istart + extra] - new_history_qpath = gen_qpath(to_history, 0, self.w) + # new_bars = array[istop - 1:istop + append_length - 1] + new_bars = array[-append_length - 1:-1] + append_path = gen_qpath(new_bars, 0, self.w) + self.path.moveTo(float(istop - self.w), float(new_bars[0]['open'])) + self.path.addPath(append_path) - # move to position of placement for the next bar in history - # and append new sub-path - new_bars = array[index:index + extra] - self.path.moveTo(float(index - self.w), float(new_bars[0]['open'])) - self.path.addPath(new_history_qpath) + # self.draw_history() - self.index += extra + self._xrange = first_index, last_index - self.draw_history() + # if extra > 0: + # index = array['index'] + # first, last = index[0], indext[-1] - if just_history: - self.update() - return + # # if first < self.start_index: + # # length = self.start_index - first + # # prepend_path = gen_qpath(array[:sef: + + # # generate new lines objects for updatable "current bar" + # self._last_bar_lines = lines_from_ohlc(array[-1], self.w) + # self.draw_last_bar() + + # # generate new graphics to match provided array + # # path appending logic: + # # we need to get the previous "current bar(s)" for the time step + # # and convert it to a sub-path to append to the historical set + # new_history_istart = length - 2 + + # to_history = array[new_history_istart:new_history_istart + extra] + + # new_history_qpath = gen_qpath(to_history, 0, self.w) + + # # move to position of placement for the next bar in history + # # and append new sub-path + # new_bars = array[index:index + extra] + + # # x, y coordinates for start of next open/left arm + # self.path.moveTo(float(index - self.w), float(new_bars[0]['open'])) + + # self.path.addPath(new_history_qpath) + + # self.start_index += extra + + # self.draw_history() + + if just_history: + self.update() + return # last bar update i, o, h, l, last, v = array[-1][ ['index', 'open', 'high', 'low', 'close', 'volume'] ] - assert i == self.index - 1 + # assert i == self.start_index - 1 + assert i == last_index body, larm, rarm = self._last_bar_lines # XXX: is there a faster way to modify this? @@ -660,12 +778,14 @@ class BarItems(pg.GraphicsObject): # @timeit def boundingRect(self): - # TODO: can we do rect caching to make this faster + # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect + + # TODO: Can we do rect caching to make this faster # like `pg.PlotCurveItem` does? In theory it's just # computing max/min stuff again like we do in the udpate loop - # anyway. + # anyway. Not really sure it's necessary since profiling already + # shows this method is faf. - # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect # boundingRect _must_ indicate the entire area that will be # drawn on or else we will get artifacts and possibly crashing. # (in this case, QPicture does all the work of computing the @@ -673,15 +793,15 @@ class BarItems(pg.GraphicsObject): # compute aggregate bounding rectangle lb = self.last_bar.boundingRect() - hb = self.history.boundingRect() + hb = self.path.boundingRect() # hb = self._h_path.boundingRect() return QtCore.QRectF( # top left QtCore.QPointF(hb.topLeft()), # total size - # QtCore.QSizeF(QtCore.QSizeF(lb.size()) + hb.size()) - QtCore.QSizeF(lb.size() + hb.size()) + QtCore.QSizeF(QtCore.QSizeF(lb.size()) + hb.size()) + # QtCore.QSizeF(lb.size() + hb.size()) ) @@ -834,7 +954,7 @@ class L1Labels: chart: 'ChartPlotWidget', # noqa digits: int = 2, size_digits: int = 0, - font_size_inches: float = 4 / 53., + font_size_inches: float = _down_2_font_inches_we_like, ) -> None: self.chart = chart @@ -888,7 +1008,9 @@ def level_line( digits: int = 1, # size 4 font on 4k screen scaled down, so small-ish. - font_size_inches: float = 4 / 53., + font_size_inches: float = _down_2_font_inches_we_like, + + show_label: bool = True, **linelabelkwargs ) -> LevelLine: @@ -908,6 +1030,7 @@ def level_line( **linelabelkwargs ) label.update_from_data(0, level) + # TODO: can we somehow figure out a max value from the parent axis? label._size_br_from_str(label.label_str) @@ -923,4 +1046,7 @@ def level_line( chart.plotItem.addItem(line) + if not show_label: + label.hide() + return line From 6d50ad75a7f59bd05a2ebbeb63533b2a3f064cdc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Dec 2020 15:44:20 -0500 Subject: [PATCH 17/28] Ensure right bar x index is an int --- piker/ui/_interaction.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index fbd5694b..464fd80e 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -245,7 +245,7 @@ class ChartView(ViewBox): log.debug("Max zoom bruh...") return - if ev.delta() < 0 and vl >= len(self.linked_charts._array) + 666: + if ev.delta() < 0 and vl >= len(self.linked_charts.chart._ohlc) + 666: log.debug("Min zoom bruh...") return @@ -268,9 +268,9 @@ class ChartView(ViewBox): # ).map(furthest_right_coord) # ) - # This seems like the most "intuitive option, a hybrdid of + # This seems like the most "intuitive option, a hybrid of # tws and tv styles - last_bar = pg.Point(rbar) + last_bar = pg.Point(int(rbar)) self._resetTarget() self.scaleBy(s, last_bar) From fda9fcbc55e59e5f357e9bad7cd62014a3ad880b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Dec 2020 15:46:46 -0500 Subject: [PATCH 18/28] Add historical backfilling to ib backend --- piker/brokers/ib.py | 186 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 160 insertions(+), 26 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index cf509bfb..9731e37f 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -21,9 +21,10 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is built on it) and thus actor aware API calls must be spawned with ``infected_aio==True``. """ -from contextlib import asynccontextmanager, contextmanager +from contextlib import asynccontextmanager from dataclasses import asdict from functools import partial +from datetime import datetime from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable import asyncio import logging @@ -32,6 +33,7 @@ import itertools import time from async_generator import aclosing +from ib_insync.wrapper import RequestError from ib_insync.contract import Contract, ContractDetails from ib_insync.ticker import Ticker import ib_insync as ibis @@ -45,7 +47,7 @@ from ..data import ( maybe_spawn_brokerd, iterticks, attach_shm_array, - get_shm_token, + # get_shm_token, subscribe_ohlc_for_increment, ) from ..data._source import from_df @@ -86,6 +88,8 @@ _time_frames = { 'Y': 'OneYear', } +_show_wap_in_history = False + # overrides to sidestep pretty questionable design decisions in # ``ib_insync``: @@ -128,6 +132,8 @@ _adhoc_cmdty_data_map = { 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), } +_enters = 0 + class Client: """IB wrapped for our broker backend API. @@ -142,32 +148,54 @@ class Client: self.ib = ib self.ib.RaiseRequestErrors = True + # NOTE: the ib.client here is "throttled" to 45 rps by default + async def bars( self, symbol: str, # EST in ISO 8601 format is required... below is EPOCH - start_date: str = "1970-01-01T00:00:00.000000-05:00", - time_frame: str = '1m', - count: int = int(2e3), # <- max allowed per query - is_paid_feed: bool = False, + start_dt: str = "1970-01-01T00:00:00.000000-05:00", + end_dt: str = "", + + sample_period_s: str = 1, # ohlc sample period + period_count: int = int(2e3), # <- max per 1s sample query + + is_paid_feed: bool = False, # placeholder ) -> List[Dict[str, Any]]: """Retreive OHLCV bars for a symbol over a range to the present. """ bars_kwargs = {'whatToShow': 'TRADES'} + global _enters + print(f'ENTER BARS {_enters}') + _enters += 1 + contract = await self.find_contract(symbol) bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) # _min = min(2000*100, count) bars = await self.ib.reqHistoricalDataAsync( contract, - endDateTime='', - # durationStr='60 S', - # durationStr='1 D', + endDateTime=end_dt, + + # time history length values format: + # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)`` + + # OHLC sampling values: + # 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins, + # 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins, + # 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M + # barSizeSetting='1 secs', + + # durationStr='{count} S'.format(count=15000 * 5), + # durationStr='{count} D'.format(count=1), + # barSizeSetting='5 secs', + + durationStr='{count} S'.format(count=period_count), + barSizeSetting='1 secs', + + # barSizeSetting='1 min', - # time length calcs - durationStr='{count} S'.format(count=5000 * 5), - barSizeSetting='5 secs', # always use extended hours useRTH=False, @@ -181,9 +209,13 @@ class Client: # TODO: raise underlying error here raise ValueError(f"No bars retreived for {symbol}?") + # TODO: rewrite this faster with ``numba`` # convert to pandas dataframe: df = ibis.util.df(bars) - return from_df(df) + return bars, from_df(df) + + def onError(self, reqId, errorCode, errorString, contract) -> None: + breakpoint() async def search_stocks( self, @@ -237,6 +269,8 @@ class Client: """Get an unqualifed contract for the current "continous" future. """ contcon = ibis.ContFuture(symbol, exchange=exchange) + + # it's the "front" contract returned here frontcon = (await self.ib.qualifyContractsAsync(contcon))[0] return ibis.Future(conId=frontcon.conId) @@ -279,10 +313,10 @@ class Client: if exch in ('PURE', 'TSE'): # non-yankee currency = 'CAD' - if exch in ('PURE',): + if exch in ('PURE', 'TSE'): # stupid ib... + primaryExchange = exch exch = 'SMART' - primaryExchange = 'PURE' con = ibis.Stock( symbol=sym, @@ -293,10 +327,27 @@ class Client: try: exch = 'SMART' if not exch else exch contract = (await self.ib.qualifyContractsAsync(con))[0] + + head = await self.get_head_time(contract) + print(head) except IndexError: raise ValueError(f"No contract could be found {con}") return contract + async def get_head_time( + self, + contract: Contract, + ) -> datetime: + """Return the first datetime stamp for ``contract``. + + """ + return await self.ib.reqHeadTimeStampAsync( + contract, + whatToShow='TRADES', + useRTH=False, + formatDate=2, # timezone aware UTC datetime + ) + async def stream_ticker( self, symbol: str, @@ -309,7 +360,13 @@ class Client: contract = await self.find_contract(symbol) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) + # define a simple queue push routine that streams quote packets + # to trio over the ``to_trio`` memory channel. + def push(t): + """Push quotes to trio task. + + """ # log.debug(t) try: to_trio.send_nowait(t) @@ -346,9 +403,17 @@ async def _aio_get_client( """ # first check cache for existing client + # breakpoint() try: - yield _client_cache[(host, port)] - except KeyError: + if port: + client = _client_cache[(host, port)] + else: + # grab first cached client + client = list(_client_cache.values())[0] + + yield client + + except (KeyError, IndexError): # TODO: in case the arbiter has no record # of existing brokerd we need to broadcast for one. @@ -359,9 +424,11 @@ async def _aio_get_client( ib = NonShittyIB() ports = _try_ports if port is None else [port] + _err = None for port in ports: try: + log.info(f"Connecting to the EYEBEE on port {port}!") await ib.connectAsync(host, port, clientId=client_id) break except ConnectionRefusedError as ce: @@ -373,6 +440,7 @@ async def _aio_get_client( try: client = Client(ib) _client_cache[(host, port)] = client + log.debug(f"Caching client for {(host, port)}") yield client except BaseException: ib.disconnect() @@ -385,7 +453,6 @@ async def _aio_run_client_method( from_trio=None, **kwargs, ) -> None: - log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!") async with _aio_get_client() as client: async_meth = getattr(client, meth) @@ -402,6 +469,9 @@ async def _trio_run_client_method( method: str, **kwargs, ) -> None: + """Asyncio entry point to run tasks against the ``ib_insync`` api. + + """ ca = tractor.current_actor() assert ca.is_infected_aio() @@ -530,18 +600,60 @@ def normalize( _local_buffer_writers = {} -@contextmanager -def activate_writer(key: str): +@asynccontextmanager +async def activate_writer(key: str) -> (bool, trio.Nursery): try: writer_already_exists = _local_buffer_writers.get(key, False) + if not writer_already_exists: _local_buffer_writers[key] = True - yield writer_already_exists + async with trio.open_nursery() as n: + yield writer_already_exists, n + else: + yield writer_already_exists, None finally: _local_buffer_writers.pop(key, None) +async def fill_bars( + first_bars, + shm, + count: int = 21, +) -> None: + """Fill historical bars into shared mem / storage afap. + + TODO: avoid pacing constraints: + https://github.com/pikers/piker/issues/128 + + """ + next_dt = first_bars[0].date + + i = 0 + while i < count: + + try: + bars, bars_array = await _trio_run_client_method( + method='bars', + symbol='.'.join( + (first_bars.contract.symbol, first_bars.contract.exchange) + ), + end_dt=next_dt, + + ) + shm.push(bars_array, prepend=True) + i += 1 + next_dt = bars[0].date + + except RequestError as err: + # TODO: retreive underlying ``ib_insync`` error~~ + if err.code == 162: + log.exception( + "Data query rate reached: Press `ctrl-alt-f` in TWS") + + await tractor.breakpoint() + + # TODO: figure out how to share quote feeds sanely despite # the wacky ``ib_insync`` api. # @tractor.msg.pub @@ -575,7 +687,9 @@ async def stream_quotes( # check if a writer already is alive in a streaming task, # otherwise start one and mark it as now existing - with activate_writer(shm_token['shm_name']) as writer_already_exists: + async with activate_writer( + shm_token['shm_name'] + ) as (writer_already_exists, ln): # maybe load historical ohlcv in to shared mem # check if shm has already been created by previous @@ -588,18 +702,29 @@ async def stream_quotes( # we are the buffer writer readonly=False, ) - bars = await _trio_run_client_method( + + # async def retrieve_and_push(): + start = time.time() + + bars, bars_array = await _trio_run_client_method( method='bars', symbol=sym, + ) - if bars is None: + log.info(f"bars_array request: {time.time() - start}") + + if bars_array is None: raise SymbolNotFound(sym) # write historical data to buffer - shm.push(bars) + shm.push(bars_array) shm_token = shm.token + # TODO: generalize this for other brokers + # start bar filler task in bg + ln.start_soon(fill_bars, bars, shm) + times = shm.array['time'] delay_s = times[-1] - times[times != times[-1]][-1] subscribe_ohlc_for_increment(shm, delay_s) @@ -656,6 +781,7 @@ async def stream_quotes( # real-time stream async for ticker in stream: + # print(ticker.vwap) quote = normalize( ticker, calc_price=calc_price @@ -674,6 +800,8 @@ async def stream_quotes( for tick in iterticks(quote, types=('trade', 'utrade',)): last = tick['price'] + # print(f"{quote['symbol']}: {tick}") + # update last entry # benchmarked in the 4-5 us range o, high, low, v = shm.array[-1][ @@ -687,7 +815,13 @@ async def stream_quotes( # is also the close/last trade price o = last - shm.array[['open', 'high', 'low', 'close', 'volume']][-1] = ( + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'volume', + ]][-1] = ( o, max(high, last), min(low, last), From 599b5276b4428059c3cc78348a539b95047074ae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Dec 2020 17:14:03 -0500 Subject: [PATCH 19/28] Port data apis to not touch primary index --- piker/data/__init__.py | 5 +++-- piker/data/_buffer.py | 11 +++++----- piker/data/_normalize.py | 2 +- piker/data/_source.py | 46 ++++++++++++++++++++++++++++------------ 4 files changed, 42 insertions(+), 22 deletions(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index cae1347c..fa26801c 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -42,7 +42,7 @@ from ._sharedmem import ( ShmArray, get_shm_token, ) -from ._source import base_ohlc_dtype +from ._source import base_iohlc_dtype from ._buffer import ( increment_ohlc_buffer, subscribe_ohlc_for_increment @@ -139,6 +139,7 @@ class Feed: name: str stream: AsyncIterator[Dict[str, Any]] shm: ShmArray + # ticks: ShmArray _broker_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None @@ -188,7 +189,7 @@ async def open_feed( key=sym_to_shm_key(name, symbols[0]), # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype), + dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), # we expect the sub-actor to write readonly=True, diff --git a/piker/data/_buffer.py b/piker/data/_buffer.py index 64460476..fed6b965 100644 --- a/piker/data/_buffer.py +++ b/piker/data/_buffer.py @@ -91,19 +91,20 @@ async def increment_ohlc_buffer( # append new entry to buffer thus "incrementing" the bar array = shm.array - last = array[-1:].copy() - (index, t, close) = last[0][['index', 'time', 'close']] + last = array[-1:][shm._write_fields].copy() + # (index, t, close) = last[0][['index', 'time', 'close']] + (t, close) = last[0][['time', 'close']] # this copies non-std fields (eg. vwap) from the last datum last[ - ['index', 'time', 'volume', 'open', 'high', 'low', 'close'] - ][0] = (index + 1, t + delay_s, 0, close, close, close, close) + ['time', 'volume', 'open', 'high', 'low', 'close'] + ][0] = (t + delay_s, 0, close, close, close, close) # write to the buffer shm.push(last) # broadcast the buffer index step - yield {'index': shm._i.value} + yield {'index': shm._last.value} def subscribe_ohlc_for_increment( diff --git a/piker/data/_normalize.py b/piker/data/_normalize.py index cbda6062..363f3c01 100644 --- a/piker/data/_normalize.py +++ b/piker/data/_normalize.py @@ -33,6 +33,6 @@ def iterticks( ticks = quote.get('ticks', ()) if ticks: for tick in ticks: - print(f"{quote['symbol']}: {tick}") + # print(f"{quote['symbol']}: {tick}") if tick.get('type') in types: yield tick diff --git a/piker/data/_source.py b/piker/data/_source.py index 3ad6d3e8..26180443 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -15,27 +15,36 @@ # along with this program. If not, see . """ -Numpy data source machinery. +numpy data source coversion helpers. """ import decimal from dataclasses import dataclass import numpy as np import pandas as pd +# from numba import from_dtype +ohlc_fields = [ + ('time', float), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('volume', int), + ('bar_wap', float), +] + +ohlc_with_index = ohlc_fields.copy() +ohlc_with_index.insert(0, ('index', int)) + # our minimum structured array layout for ohlc data -base_ohlc_dtype = np.dtype( - [ - ('index', int), - ('time', float), - ('open', float), - ('high', float), - ('low', float), - ('close', float), - ('volume', int), - ] -) +base_iohlc_dtype = np.dtype(ohlc_with_index) +base_ohlc_dtype = np.dtype(ohlc_fields) + +# TODO: for now need to construct this manually for readonly arrays, see +# https://github.com/numba/numba/issues/4511 +# numba_ohlc_dtype = from_dtype(base_ohlc_dtype) # map time frame "keys" to minutes values tf_in_1m = { @@ -110,18 +119,27 @@ def from_df( 'Low': 'low', 'Close': 'close', 'Volume': 'volume', + + # most feeds are providing this over sesssion anchored + 'vwap': 'bar_wap', + + # XXX: ib_insync calls this the "wap of the bar" + # but no clue what is actually is... + # https://github.com/pikers/piker/issues/119#issuecomment-729120988 + 'average': 'bar_wap', } df = df.rename(columns=columns) for name in df.columns: - if name not in base_ohlc_dtype.names[1:]: + # if name not in base_ohlc_dtype.names[1:]: + if name not in base_ohlc_dtype.names: del df[name] # TODO: it turns out column access on recarrays is actually slower: # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist # it might make sense to make these structured arrays? - array = df.to_records() + array = df.to_records(index=False) _nan_to_closest_num(array) return array From 9930430721365a8ff92388ce9201d503eee68f1f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 13 Dec 2020 13:11:23 -0500 Subject: [PATCH 20/28] Close app on last window exit Use a system triggered SIGINT on app close to tear down the streaming stack and terminate the `trio`/`tractor` runtimes deterministically. --- piker/ui/_exec.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index 3e3f57bf..3757e79c 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -20,6 +20,8 @@ Trio - Qt integration Run ``trio`` in guest mode on top of the Qt event loop. All global Qt runtime settings are mostly defined here. """ +import os +import signal from functools import partial import traceback from typing import Tuple, Callable, Dict, Any @@ -74,11 +76,16 @@ class MainWindow(QtGui.QMainWindow): self.setMinimumSize(*self.size) self.setWindowTitle(self.title) - def closeEvent(self, event: 'QCloseEvent') -> None: + def closeEvent( + self, + event: 'QCloseEvent' + ) -> None: """Cancel the root actor asap. """ - tractor.current_actor().cancel_soon() + # raising KBI seems to get intercepted by by Qt so just use the + # system. + os.kill(os.getpid(), signal.SIGINT) def run_qtractor( @@ -128,11 +135,15 @@ def run_qtractor( def done_callback(outcome): - print(f"Outcome: {outcome}") - if isinstance(outcome, Error): exc = outcome.error - traceback.print_exception(type(exc), exc, exc.__traceback__) + + if isinstance(outcome.error, KeyboardInterrupt): + # make it look like ``trio`` + print("Aborted!") + + else: + traceback.print_exception(type(exc), exc, exc.__traceback__) app.quit() @@ -144,9 +155,6 @@ def run_qtractor( instance = main_widget() instance.window = window - # kill the app when root actor terminates - tractor._actor._lifetime_stack.callback(app.quit) - widgets = { 'window': window, 'main': instance, @@ -170,6 +178,7 @@ def run_qtractor( main, run_sync_soon_threadsafe=run_sync_soon_threadsafe, done_callback=done_callback, + # restrict_keyboard_interrupt_to_checkpoints=True, ) window.main_widget = main_widget From c8537d59a85e0adad7034e2d5a5034f734e8727a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Dec 2020 12:21:39 -0500 Subject: [PATCH 21/28] Port charting to new shm primary indexing --- piker/ui/_chart.py | 306 ++++++++++++++++++++++++++++----------------- 1 file changed, 194 insertions(+), 112 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 60b55e5d..476f0632 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -17,7 +17,7 @@ """ High level Qt chart widgets. """ -from typing import Tuple, Dict, Any, Optional +from typing import Tuple, Dict, Any, Optional, Callable from functools import partial from PyQt5 import QtCore, QtGui @@ -105,6 +105,7 @@ class ChartSpace(QtGui.QWidget): self.tf_layout.setContentsMargins(0, 12, 0, 0) time_frames = ('1M', '5M', '15M', '30M', '1H', '1D', '1W', 'MN') btn_prefix = 'TF' + for tf in time_frames: btn_name = ''.join([btn_prefix, tf]) btn = QtGui.QPushButton(tf) @@ -112,6 +113,7 @@ class ChartSpace(QtGui.QWidget): btn.setEnabled(False) setattr(self, btn_name, btn) self.tf_layout.addWidget(btn) + self.toolbar_layout.addLayout(self.tf_layout) # XXX: strat loader/saver that we don't need yet. @@ -126,6 +128,8 @@ class ChartSpace(QtGui.QWidget): ohlc: bool = True, ) -> None: """Load a new contract into the charting app. + + Expects a ``numpy`` structured array containing all the ohlcv fields. """ # XXX: let's see if this causes mem problems self.window.setWindowTitle(f'piker chart {symbol}') @@ -148,7 +152,8 @@ class ChartSpace(QtGui.QWidget): if not self.v_layout.isEmpty(): self.v_layout.removeWidget(linkedcharts) - main_chart = linkedcharts.plot_main(s, data, ohlc=ohlc) + main_chart = linkedcharts.plot_ohlc_main(s, data) + self.v_layout.addWidget(linkedcharts) return linkedcharts, main_chart @@ -176,7 +181,6 @@ class LinkedSplitCharts(QtGui.QWidget): def __init__(self): super().__init__() self.signals_visible: bool = False - self._array: np.ndarray = None # main data source self._ch: CrossHair = None # crosshair graphics self.chart: ChartPlotWidget = None # main (ohlc) chart self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {} @@ -212,20 +216,18 @@ class LinkedSplitCharts(QtGui.QWidget): sizes.extend([min_h_ind] * len(self.subplots)) self.splitter.setSizes(sizes) # , int(self.height()*0.2) - def plot_main( + def plot_ohlc_main( self, symbol: Symbol, array: np.ndarray, - ohlc: bool = True, + style: str = 'bar', ) -> 'ChartPlotWidget': """Start up and show main (price) chart and all linked subcharts. + + The data input struct array must include OHLC fields. """ self.digits = symbol.digits() - # TODO: this should eventually be a view onto shared mem or some - # higher level type / API - self._array = array - # add crosshairs self._ch = CrossHair( linkedsplitcharts=self, @@ -235,11 +237,13 @@ class LinkedSplitCharts(QtGui.QWidget): name=symbol.key, array=array, xaxis=self.xaxis, - ohlc=ohlc, + style=style, _is_main=True, ) # add crosshair graphic self.chart.addItem(self._ch) + + # axis placement if _xaxis_at == 'bottom': self.chart.hideAxis('bottom') @@ -253,7 +257,7 @@ class LinkedSplitCharts(QtGui.QWidget): name: str, array: np.ndarray, xaxis: DynamicDateAxis = None, - ohlc: bool = False, + style: str = 'line', _is_main: bool = False, **cpw_kwargs, ) -> 'ChartPlotWidget': @@ -263,7 +267,7 @@ class LinkedSplitCharts(QtGui.QWidget): """ if self.chart is None and not _is_main: raise RuntimeError( - "A main plot must be created first with `.plot_main()`") + "A main plot must be created first with `.plot_ohlc_main()`") # source of our custom interactions cv = ChartView() @@ -277,6 +281,11 @@ class LinkedSplitCharts(QtGui.QWidget): ) cpw = ChartPlotWidget( + + # this name will be used to register the primary + # graphics curve managed by the subchart + name=name, + array=array, parent=self.splitter, axisItems={ @@ -287,11 +296,12 @@ class LinkedSplitCharts(QtGui.QWidget): cursor=self._ch, **cpw_kwargs, ) + + # give viewbox a reference to primary chart + # allowing for kb controls and interactions + # (see our custom view in `._interactions.py`) cv.chart = cpw - # this name will be used to register the primary - # graphics curve managed by the subchart - cpw.name = name cpw.plotItem.vb.linked_charts = self cpw.setFrameStyle(QtGui.QFrame.StyledPanel) # | QtGui.QFrame.Plain) cpw.hideButtons() @@ -305,11 +315,15 @@ class LinkedSplitCharts(QtGui.QWidget): self._ch.add_plot(cpw) # draw curve graphics - if ohlc: + if style == 'bar': cpw.draw_ohlc(name, array) - else: + + elif style == 'line': cpw.draw_curve(name, array) + else: + raise ValueError(f"Chart style {style} is currently unsupported") + if not _is_main: # track by name self.subplots[name] = cpw @@ -319,6 +333,8 @@ class LinkedSplitCharts(QtGui.QWidget): # XXX: we need this right? # self.splitter.addWidget(cpw) + else: + assert style == 'bar', 'main chart must be OHLC' return cpw @@ -344,6 +360,7 @@ class ChartPlotWidget(pg.PlotWidget): def __init__( self, # the data view we generate graphics from + name: str, array: np.ndarray, static_yrange: Optional[Tuple[float, float]] = None, cursor: Optional[CrossHair] = None, @@ -356,17 +373,26 @@ class ChartPlotWidget(pg.PlotWidget): # parent=None, # plotItem=None, # antialias=True, + useOpenGL=True, **kwargs ) + + self.name = name + # self.setViewportMargins(0, 0, 0, 0) - self._array = array # readonly view of data + self._ohlc = array # readonly view of ohlc data + self.default_view() + self._arrays = {} # readonly view of overlays self._graphics = {} # registry of underlying graphics - self._overlays = {} # registry of overlay curves + self._overlays = set() # registry of overlay curve names + self._labels = {} # registry of underlying graphics self._ysticks = {} # registry of underlying graphics + self._vb = self.plotItem.vb self._static_yrange = static_yrange # for "known y-range style" + self._view_mode: str = 'follow' self._cursor = cursor # placehold for mouse @@ -377,6 +403,7 @@ class ChartPlotWidget(pg.PlotWidget): # show background grid self.showGrid(x=True, y=True, alpha=0.5) + # TODO: stick in config # use cross-hair for cursor? # self.setCursor(QtCore.Qt.CrossCursor) @@ -391,22 +418,25 @@ class ChartPlotWidget(pg.PlotWidget): self._vb.sigResized.connect(self._set_yrange) def last_bar_in_view(self) -> bool: - self._array[-1]['index'] + self._ohlc[-1]['index'] def update_contents_labels( self, index: int, # array_name: str, ) -> None: - if index >= 0 and index < len(self._array): + if index >= 0 and index < self._ohlc[-1]['index']: for name, (label, update) in self._labels.items(): - if name is self.name : - array = self._array + if name is self.name: + array = self._ohlc else: array = self._arrays[name] - update(index, array) + try: + update(index, array) + except IndexError: + log.exception(f"Failed to update label: {name}") def _set_xlimits( self, @@ -430,8 +460,11 @@ class ChartPlotWidget(pg.PlotWidget): """Return a range tuple for the bars present in view. """ l, r = self.view_range() - lbar = max(l, 0) - rbar = min(r, len(self._array)) + a = self._ohlc + lbar = max(l, a[0]['index']) + rbar = min(r, a[-1]['index']) + # lbar = max(l, 0) + # rbar = min(r, len(self._ohlc)) return l, lbar, rbar, r def default_view( @@ -441,7 +474,8 @@ class ChartPlotWidget(pg.PlotWidget): """Set the view box to the "default" startup view of the scene. """ - xlast = self._array[index]['index'] + xlast = self._ohlc[index]['index'] + print(xlast) begin = xlast - _bars_to_left_in_follow_mode end = xlast + _bars_from_right_in_follow_mode @@ -462,7 +496,7 @@ class ChartPlotWidget(pg.PlotWidget): self._vb.setXRange( min=l + 1, max=r + 1, - # holy shit, wtf dude... why tf would this not be 0 by + # TODO: holy shit, wtf dude... why tf would this not be 0 by # default... speechless. padding=0, ) @@ -477,6 +511,7 @@ class ChartPlotWidget(pg.PlotWidget): """Draw OHLC datums to chart. """ graphics = style(self.plotItem) + # adds all bar/candle graphics objects for each data point in # the np array buffer to be drawn on next render cycle self.addItem(graphics) @@ -486,10 +521,12 @@ class ChartPlotWidget(pg.PlotWidget): self._graphics[name] = graphics - label = ContentsLabel(chart=self, anchor_at=('top', 'left')) - self._labels[name] = (label, partial(label.update_from_ohlc, name)) - label.show() - self.update_contents_labels(len(data) - 1) #, name) + self.add_contents_label( + name, + anchor_at=('top', 'left'), + update_func=ContentsLabel.update_from_ohlc, + ) + self.update_contents_labels(len(data) - 1) self._add_sticky(name) @@ -500,49 +537,74 @@ class ChartPlotWidget(pg.PlotWidget): name: str, data: np.ndarray, overlay: bool = False, + color: str = 'default_light', + add_label: bool = True, **pdi_kwargs, ) -> pg.PlotDataItem: - # draw the indicator as a plain curve + """Draw a "curve" (line plot graphics) for the provided data in + the input array ``data``. + + """ _pdi_defaults = { - 'pen': pg.mkPen(hcolor('default_light')), + 'pen': pg.mkPen(hcolor(color)), } pdi_kwargs.update(_pdi_defaults) curve = pg.PlotDataItem( - data[name], + y=data[name], + x=data['index'], # antialias=True, name=name, + # TODO: see how this handles with custom ohlcv bars graphics + # and/or if we can implement something similar for OHLC graphics clipToView=True, + **pdi_kwargs, ) self.addItem(curve) - # register overlay curve with name + # register curve graphics and backing array for name self._graphics[name] = curve + self._arrays[name] = data if overlay: - anchor_at = ('bottom', 'right') - self._overlays[name] = curve - self._arrays[name] = data + anchor_at = ('bottom', 'left') + self._overlays.add(name) else: - anchor_at = ('top', 'right') + anchor_at = ('top', 'left') # TODO: something instead of stickies for overlays # (we need something that avoids clutter on x-axis). self._add_sticky(name, bg_color='default_light') - label = ContentsLabel(chart=self, anchor_at=anchor_at) - self._labels[name] = (label, partial(label.update_from_value, name)) - label.show() - self.update_contents_labels(len(data) - 1) #, name) + if add_label: + self.add_contents_label(name, anchor_at=anchor_at) + self.update_contents_labels(len(data) - 1) if self._cursor: self._cursor.add_curve_cursor(self, curve) return curve + def add_contents_label( + self, + name: str, + anchor_at: Tuple[str, str] = ('top', 'left'), + update_func: Callable = ContentsLabel.update_from_value, + ) -> ContentsLabel: + + label = ContentsLabel(chart=self, anchor_at=anchor_at) + self._labels[name] = ( + # calls class method on instance + label, + partial(update_func, label, name) + ) + label.show() + + return label + def _add_sticky( self, name: str, @@ -569,7 +631,7 @@ class ChartPlotWidget(pg.PlotWidget): """Update the named internal graphics from ``array``. """ - self._array = array + self._ohlc = array graphics = self._graphics[name] graphics.update_from_array(array, **kwargs) return graphics @@ -584,14 +646,18 @@ class ChartPlotWidget(pg.PlotWidget): """ if name not in self._overlays: - self._array = array + self._ohlc = array else: self._arrays[name] = array curve = self._graphics[name] + # TODO: we should instead implement a diff based - # "only update with new items" on the pg.PlotDataItem - curve.setData(array[name], **kwargs) + # "only update with new items" on the pg.PlotCurveItem + # one place to dig around this might be the `QBackingStore` + # https://doc.qt.io/qt-5/qbackingstore.html + curve.setData(y=array[name], x=array['index'], **kwargs) + return curve def _set_yrange( @@ -625,8 +691,9 @@ class ChartPlotWidget(pg.PlotWidget): # TODO: logic to check if end of bars in view extra = view_len - _min_points_to_show - begin = 0 - extra - end = len(self._array) - 1 + extra + begin = self._ohlc[0]['index'] - extra + # end = len(self._ohlc) - 1 + extra + end = self._ohlc[-1]['index'] - 1 + extra # XXX: test code for only rendering lines for the bars in view. # This turns out to be very very poor perf when scaling out to @@ -642,10 +709,15 @@ class ChartPlotWidget(pg.PlotWidget): # f"view_len: {view_len}, bars_len: {bars_len}\n" # f"begin: {begin}, end: {end}, extra: {extra}" # ) - self._set_xlimits(begin, end) + # self._set_xlimits(begin, end) # TODO: this should be some kind of numpy view api - bars = self._array[lbar:rbar] + # bars = self._ohlc[lbar:rbar] + + a = self._ohlc + ifirst = a[0]['index'] + bars = a[lbar - ifirst:rbar - ifirst] + if not len(bars): # likely no data loaded yet or extreme scrolling? log.error(f"WTF bars_range = {lbar}:{rbar}") @@ -731,10 +803,6 @@ async def _async_main( # chart_app.init_search() - # XXX: bug zone if you try to ctl-c after this we get hangs again? - # wtf... - # await tractor.breakpoint() - # historical data fetch brokermod = brokers.get_brokermod(brokername) @@ -747,30 +815,28 @@ async def _async_main( ohlcv = feed.shm bars = ohlcv.array - # TODO: when we start messing with line charts - # c = np.zeros(len(bars), dtype=[ - # (sym, bars.dtype.fields['close'][0]), - # ('index', 'i4'), - # ]) - # c[sym] = bars['close'] - # c['index'] = bars['index'] - # linked_charts, chart = chart_app.load_symbol(sym, c, ohlc=False) - # load in symbol's ohlc data + # await tractor.breakpoint() linked_charts, chart = chart_app.load_symbol(sym, bars) # plot historical vwap if available - vwap_in_history = False - if 'vwap' in bars.dtype.fields: - vwap_in_history = True - chart.draw_curve( - name='vwap', - data=bars, - overlay=True, - ) + wap_in_history = False + + if brokermod._show_wap_in_history: + + if 'bar_wap' in bars.dtype.fields: + wap_in_history = True + chart.draw_curve( + name='bar_wap', + data=bars, + add_label=False, + ) chart._set_yrange() + # TODO: a data view api that makes this less shit + chart._shm = ohlcv + # eventually we'll support some kind of n-compose syntax fsp_conf = { 'vwap': { @@ -799,19 +865,13 @@ async def _async_main( loglevel, ) - # update last price sticky - last_price_sticky = chart._ysticks[chart.name] - last_price_sticky.update_from_data( - *ohlcv.array[-1][['index', 'close']] - ) - # start graphics update loop(s)after receiving first live quote n.start_soon( chart_from_quotes, chart, feed.stream, ohlcv, - vwap_in_history, + wap_in_history, ) # wait for a first quote before we start any update tasks @@ -834,7 +894,7 @@ async def chart_from_quotes( chart: ChartPlotWidget, stream, ohlcv: np.ndarray, - vwap_in_history: bool = False, + wap_in_history: bool = False, ) -> None: """The 'main' (price) chart real-time update loop. @@ -847,29 +907,40 @@ async def chart_from_quotes( # - update last open price correctly instead # of copying it from last bar's close # - 5 sec bar lookback-autocorrection like tws does? + + # update last price sticky last_price_sticky = chart._ysticks[chart.name] + last_price_sticky.update_from_data( + *ohlcv.array[-1][['index', 'close']] + ) def maxmin(): # TODO: implement this # https://arxiv.org/abs/cs/0610046 # https://github.com/lemire/pythonmaxmin - array = chart._array + array = chart._ohlc + ifirst = array[0]['index'] + last_bars_range = chart.bars_range() l, lbar, rbar, r = last_bars_range - in_view = array[lbar:rbar] + in_view = array[lbar - ifirst:rbar - ifirst] + + assert in_view.size + mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) - # TODO: when we start using line charts + # TODO: when we start using line charts, probably want to make + # this an overloaded call on our `DataView # sym = chart.name # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym]) return last_bars_range, mx, mn - last_bars_range, last_mx, last_mn = maxmin() - chart.default_view() + last_bars_range, last_mx, last_mn = maxmin() + last, volume = ohlcv.array[-1][['close', 'volume']] l1 = L1Labels( @@ -889,7 +960,6 @@ async def chart_from_quotes( async for quotes in stream: for sym, quote in quotes.items(): - # print(f'CHART: {quote}') for tick in quote.get('ticks', ()): @@ -898,7 +968,14 @@ async def chart_from_quotes( price = tick.get('price') size = tick.get('size') - if ticktype in ('trade', 'utrade'): + # compute max and min trade values to display in view + # TODO: we need a streaming minmax algorithm here, see + # def above. + brange, mx_in_view, mn_in_view = maxmin() + l, lbar, rbar, r = brange + + if ticktype in ('trade', 'utrade', 'last'): + array = ohlcv.array # update price sticky(s) @@ -907,25 +984,16 @@ async def chart_from_quotes( *last[['index', 'close']] ) + # plot bars # update price bar chart.update_ohlc_from_array( chart.name, array, ) - # chart.update_curve_from_array( - # chart.name, - # TODO: when we start using line charts - # np.array(array['close'], dtype=[(chart.name, 'f8')]) - - # if vwap_in_history: - # # update vwap overlay line - # chart.update_curve_from_array('vwap', ohlcv.array) - - # compute max and min trade values to display in view - # TODO: we need a streaming minmax algorithm here, see - # def above. - brange, mx_in_view, mn_in_view = maxmin() + if wap_in_history: + # update vwap overlay line + chart.update_curve_from_array('bar_wap', ohlcv.array) # XXX: prettty sure this is correct? # if ticktype in ('trade', 'last'): @@ -1021,12 +1089,14 @@ async def spawn_fsps( # spawn closure, can probably define elsewhere async def spawn_fsp_daemon( - fsp_name, - conf, + fsp_name: str, + display_name: str, + conf: dict, ): """Start an fsp subactor async. """ + print(f'FSP NAME: {fsp_name}') portal = await n.run_in_actor( # name as title of sub-chart @@ -1057,6 +1127,7 @@ async def spawn_fsps( ln.start_soon( spawn_fsp_daemon, fsp_func_name, + display_name, conf, ) @@ -1081,6 +1152,8 @@ async def update_signals( ) -> None: """FSP stream chart update loop. + This is called once for each entry in the fsp + config map. """ shm = conf['shm'] @@ -1094,6 +1167,7 @@ async def update_signals( last_val_sticky = None else: + chart = linked_charts.add_plot( name=fsp_func_name, array=shm.array, @@ -1112,6 +1186,7 @@ async def update_signals( # fsp_func_name ) + # read last value array = shm.array value = array[fsp_func_name][-1] @@ -1119,7 +1194,8 @@ async def update_signals( last_val_sticky.update_from_data(-1, value) chart.update_curve_from_array(fsp_func_name, array) - chart.default_view() + + chart._shm = shm # TODO: figure out if we can roll our own `FillToThreshold` to # get brush filled polygons for OS/OB conditions. @@ -1132,23 +1208,28 @@ async def update_signals( # graphics.curve.setFillLevel(50) # add moveable over-[sold/bought] lines - level_line(chart, 30) - level_line(chart, 70, orient_v='top') + # and labels only for the 70/30 lines + level_line(chart, 20, show_label=False) + level_line(chart, 30, orient_v='top') + level_line(chart, 70, orient_v='bottom') + level_line(chart, 80, orient_v='top', show_label=False) - chart._shm = shm chart._set_yrange() stream = conf['stream'] # update chart graphics async for value in stream: - # p = pg.debug.Profiler(disabled=False, delayed=False) + + # read last array = shm.array value = array[-1][fsp_func_name] + if last_val_sticky: last_val_sticky.update_from_data(-1, value) + + # update graphics chart.update_curve_from_array(fsp_func_name, array) - # p('rendered rsi datum') async def check_for_new_bars(feed, ohlcv, linked_charts): @@ -1199,7 +1280,7 @@ async def check_for_new_bars(feed, ohlcv, linked_charts): # resize view # price_chart._set_yrange() - for name, curve in price_chart._overlays.items(): + for name in price_chart._overlays: price_chart.update_curve_from_array( name, @@ -1207,15 +1288,16 @@ async def check_for_new_bars(feed, ohlcv, linked_charts): ) # # TODO: standard api for signal lookups per plot - # if name in price_chart._array.dtype.fields: + # if name in price_chart._ohlc.dtype.fields: # # should have already been incremented above - # price_chart.update_curve_from_array(name, price_chart._array) + # price_chart.update_curve_from_array(name, price_chart._ohlc) for name, chart in linked_charts.subplots.items(): chart.update_curve_from_array(chart.name, chart._shm.array) # chart._set_yrange() + # shift the view if in follow mode price_chart.increment_view() From 7ce27107df412276c3bb45f3a2bc1d491a3946d4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Dec 2020 12:22:05 -0500 Subject: [PATCH 22/28] Fix axes for shm primary indexing --- piker/ui/_axes.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/piker/ui/_axes.py b/piker/ui/_axes.py index c5d8f2ea..d2bac3cd 100644 --- a/piker/ui/_axes.py +++ b/piker/ui/_axes.py @@ -116,13 +116,25 @@ class DynamicDateAxis(Axis): indexes: List[int], ) -> List[str]: - bars = self.linked_charts.chart._ohlc + # try: + chart = self.linked_charts.chart + bars = chart._ohlc + shm = self.linked_charts.chart._shm + first = shm._first.value + bars_len = len(bars) times = bars['time'] epochs = times[list( - map(int, filter(lambda i: i < bars_len, indexes)) + map( + int, + filter( + lambda i: i > 0 and i < bars_len, + (i-first for i in indexes) + ) + ) )] + # TODO: **don't** have this hard coded shift to EST dts = pd.to_datetime(epochs, unit='s') # - 4*pd.offsets.Hour() From db6f77481b07182ba0390da72addb9cf66e2cdb3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Dec 2020 12:32:35 -0500 Subject: [PATCH 23/28] Stick with slightly smaller fonts --- piker/ui/_style.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/ui/_style.py b/piker/ui/_style.py index 3764750a..eeeb6c9c 100644 --- a/piker/ui/_style.py +++ b/piker/ui/_style.py @@ -29,8 +29,8 @@ from ..log import get_logger log = get_logger(__name__) # chart-wide fonts specified in inches -_default_font_inches_we_like = 0.0666 -_down_2_font_inches_we_like = 6 / 96 +_default_font_inches_we_like = 6 / 96 +_down_2_font_inches_we_like = 5 / 96 class DpiAwareFont: From 618c2f8e0a8a4ea7e307a3182b6766c08396e9dc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Dec 2020 12:36:38 -0500 Subject: [PATCH 24/28] More general salutation --- piker/ui/_exec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index 3757e79c..bbb3633a 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -139,8 +139,8 @@ def run_qtractor( exc = outcome.error if isinstance(outcome.error, KeyboardInterrupt): - # make it look like ``trio`` - print("Aborted!") + # make it kinda look like ``trio`` + print("Terminated!") else: traceback.print_exception(type(exc), exc, exc.__traceback__) From df686755dabdfd4cda9157b694fcb23a09ba9588 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Dec 2020 12:37:19 -0500 Subject: [PATCH 25/28] Drop legacy "historical" QPicture cruft --- piker/ui/_graphics.py | 87 ++++++++----------------------------------- 1 file changed, 15 insertions(+), 72 deletions(-) diff --git a/piker/ui/_graphics.py b/piker/ui/_graphics.py index 83cbd5b5..88be1602 100644 --- a/piker/ui/_graphics.py +++ b/piker/ui/_graphics.py @@ -17,7 +17,7 @@ """ Chart graphics for displaying a slew of different data types. """ - +import inspect from typing import List, Optional, Tuple import numpy as np @@ -104,7 +104,7 @@ class LineDot(pg.CurvePoint): # first = x[0] # i = index - first i = index - x[0] - if i > 0: + if i > 0 and i < len(y): newPos = (index, y[i]) QtGui.QGraphicsItem.setPos(self, *newPos) return True @@ -123,9 +123,8 @@ _corner_margins = { ('top', 'left'): (-4, -5), ('top', 'right'): (4, -5), - # TODO: pretty sure y here needs to be 2x font height - ('bottom', 'left'): (-4, 14), - ('bottom', 'right'): (4, 14), + ('bottom', 'left'): (-4, lambda font_size: font_size * 2), + ('bottom', 'right'): (4, lambda font_size: font_size * 2), } @@ -142,7 +141,10 @@ class ContentsLabel(pg.LabelItem): font_size: Optional[int] = None, ) -> None: font_size = font_size or _font.font.pixelSize() - super().__init__(justify=justify_text, size=f'{str(font_size)}px') + super().__init__( + justify=justify_text, + size=f'{str(font_size)}px' + ) # anchor to viewbox self.setParentItem(chart._vb) @@ -153,6 +155,10 @@ class ContentsLabel(pg.LabelItem): index = (_corner_anchors[h], _corner_anchors[v]) margins = _corner_margins[(v, h)] + ydim = margins[1] + if inspect.isfunction(margins[1]): + margins = margins[0], ydim(font_size) + self.anchor(itemPos=index, parentPos=index, offset=margins) def update_from_ohlc( @@ -514,7 +520,6 @@ class BarItems(pg.GraphicsObject): super().__init__() self.last_bar = QtGui.QPicture() - # self.history = QtGui.QPicture() self.path = QtGui.QPainterPath() # self._h_path = QtGui.QGraphicsPathItem(self.path) @@ -594,18 +599,6 @@ class BarItems(pg.GraphicsObject): p.end() # @timeit - # def draw_history(self) -> None: - # # TODO: avoid having to use a ```QPicture` to calc the - # # ``.boundingRect()``, use ``QGraphicsPathItem`` instead? - # # https://doc.qt.io/qt-5/qgraphicspathitem.html - # # self._h_path.setPath(self.path) - - # p = QtGui.QPainter(self.history) - # p.setPen(self.bars_pen) - # p.drawPath(self.path) - # p.end() - - @timeit def update_from_array( self, array: np.ndarray, @@ -636,26 +629,19 @@ class BarItems(pg.GraphicsObject): # only drawing as many bars as exactly specified. if prepend_length: - # breakpoint() + # new history was added and we need to render a new path new_bars = array[:prepend_length] prepend_path = gen_qpath(new_bars, 0, self.w) - # XXX: SOMETHING IS FISHY HERE what with the old_path + # XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path # y value not matching the first value from # array[prepend_length + 1] ??? # update path old_path = self.path self.path = prepend_path - # self.path.moveTo(float(index - self.w), float(new_bars[0]['open'])) - # self.path.moveTo( - # float(istart - self.w), - # # float(array[prepend_length + 1]['open']) - # float(array[prepend_length]['open']) - # ) self.path.addPath(old_path) - # self.draw_history() if append_length: # generate new lines objects for updatable "current bar" @@ -672,45 +658,8 @@ class BarItems(pg.GraphicsObject): self.path.moveTo(float(istop - self.w), float(new_bars[0]['open'])) self.path.addPath(append_path) - # self.draw_history() - self._xrange = first_index, last_index - # if extra > 0: - # index = array['index'] - # first, last = index[0], indext[-1] - - # # if first < self.start_index: - # # length = self.start_index - first - # # prepend_path = gen_qpath(array[:sef: - - # # generate new lines objects for updatable "current bar" - # self._last_bar_lines = lines_from_ohlc(array[-1], self.w) - # self.draw_last_bar() - - # # generate new graphics to match provided array - # # path appending logic: - # # we need to get the previous "current bar(s)" for the time step - # # and convert it to a sub-path to append to the historical set - # new_history_istart = length - 2 - - # to_history = array[new_history_istart:new_history_istart + extra] - - # new_history_qpath = gen_qpath(to_history, 0, self.w) - - # # move to position of placement for the next bar in history - # # and append new sub-path - # new_bars = array[index:index + extra] - - # # x, y coordinates for start of next open/left arm - # self.path.moveTo(float(index - self.w), float(new_bars[0]['open'])) - - # self.path.addPath(new_history_qpath) - - # self.start_index += extra - - # self.draw_history() - if just_history: self.update() return @@ -751,7 +700,7 @@ class BarItems(pg.GraphicsObject): self.draw_last_bar() self.update() - @timeit + # @timeit def paint(self, p, opt, widget): # profiler = pg.debug.Profiler(disabled=False, delayed=False) @@ -767,13 +716,8 @@ class BarItems(pg.GraphicsObject): # as is necesarry for what's in "view". Not sure if this will # lead to any perf gains other then when zoomed in to less bars # in view. - # p.drawPicture(0, 0, self.history) p.drawPicture(0, 0, self.last_bar) - p.setPen(self.bars_pen) - - # TODO: does it matter which we use? - # p.drawPath(self._h_path.path()) p.drawPath(self.path) # @timeit @@ -794,7 +738,6 @@ class BarItems(pg.GraphicsObject): # compute aggregate bounding rectangle lb = self.last_bar.boundingRect() hb = self.path.boundingRect() - # hb = self._h_path.boundingRect() return QtCore.QRectF( # top left From b72d7d3085a3023b593435b38a0daefd8fdc2a34 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Dec 2020 16:08:54 -0500 Subject: [PATCH 26/28] Drop profile calls on OHLC bars for now --- piker/ui/_graphics.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/piker/ui/_graphics.py b/piker/ui/_graphics.py index 88be1602..6e968060 100644 --- a/piker/ui/_graphics.py +++ b/piker/ui/_graphics.py @@ -417,7 +417,6 @@ def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]: return [hl, o, c] -@timeit @jit( # TODO: for now need to construct this manually for readonly arrays, see # https://github.com/numba/numba/issues/4511 @@ -489,7 +488,7 @@ def path_arrays_from_ohlc( return x, y, c -@timeit +# @timeit def gen_qpath( data, start, # XXX: do we need this? @@ -497,6 +496,8 @@ def gen_qpath( ) -> QtGui.QPainterPath: x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w) + + # TODO: numba the internals of this! return pg.functions.arrayToQPath(x, y, connect=c) @@ -542,7 +543,7 @@ class BarItems(pg.GraphicsObject): self.start_index: int = 0 self.stop_index: int = 0 - @timeit + # @timeit def draw_from_data( self, data: np.ndarray, @@ -717,6 +718,7 @@ class BarItems(pg.GraphicsObject): # lead to any perf gains other then when zoomed in to less bars # in view. p.drawPicture(0, 0, self.last_bar) + p.setPen(self.bars_pen) p.drawPath(self.path) From f27d639552074ebc44aaff651047c0ee4b6c4a1d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Dec 2020 17:14:47 -0500 Subject: [PATCH 27/28] Port kraken to declare "wap" field --- piker/brokers/kraken.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 4329a981..2289f743 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -57,13 +57,15 @@ _ohlc_dtype = [ ('close', float), ('volume', float), ('count', int), - ('vwap', float), + ('bar_wap', float), ] # UI components allow this to be declared such that additional # (historical) fields can be exposed. ohlc_dtype = np.dtype(_ohlc_dtype) +_show_wap_in_history = True + class Client: @@ -341,7 +343,7 @@ async def stream_quotes( while True: try: async with trio_websocket.open_websocket_url( - 'wss://ws.kraken.com', + 'wss://ws.kraken.com/', ) as ws: # XXX: setup subs @@ -433,7 +435,7 @@ async def stream_quotes( 'high', 'low', 'close', - 'vwap', + 'bar_wap', # in this case vwap of bar 'volume'] ][-1] = ( o, From 833a442ac060584d94db686f0b824fb50d7ac7fd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 19 Dec 2020 16:57:58 -0500 Subject: [PATCH 28/28] Disable vwap in conf, feature delay --- piker/ui/_chart.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 476f0632..a708a82e 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -839,10 +839,10 @@ async def _async_main( # eventually we'll support some kind of n-compose syntax fsp_conf = { - 'vwap': { - 'overlay': True, - 'anchor': 'session', - }, + # 'vwap': { + # 'overlay': True, + # 'anchor': 'session', + # }, 'rsi': { 'period': 14, 'chart_kwargs': {