diff --git a/piker/data/feed.py b/piker/data/feed.py index 534aebc9..d91b890e 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1037,12 +1037,11 @@ async def allocate_persistent_feed( flume = Flume( symbol=symbol, - _hist_shm_token=hist_shm.token, - _rt_shm_token=rt_shm.token, first_quote=first_quote, + _rt_shm_token=rt_shm.token, + _hist_shm_token=hist_shm.token, izero_hist=izero_hist, izero_rt=izero_rt, - # throttle_rate=tick_throttle, ) # for ambiguous names we simply apply the retreived diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 6ae30a84..760445f8 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -38,7 +38,6 @@ from PyQt5.QtWidgets import ( QVBoxLayout, QSplitter, ) -import numpy as np import pyqtgraph as pg import trio @@ -63,7 +62,10 @@ from ._style import ( _xaxis_at, _min_points_to_show, ) -from ..data.feed import Feed +from ..data.feed import ( + Feed, + Flume, +) from ..data._source import Symbol from ..log import get_logger from ._interaction import ChartView @@ -537,6 +539,7 @@ class LinkedSplits(QWidget): symbol: Symbol, shm: ShmArray, + flume: Flume, sidepane: FieldsForm, style: str = 'ohlc_bar', @@ -561,6 +564,7 @@ class LinkedSplits(QWidget): self.chart = self.add_plot( name=symbol.fqsn, shm=shm, + flume=flume, style=style, _is_main=True, sidepane=sidepane, @@ -581,6 +585,7 @@ class LinkedSplits(QWidget): name: str, shm: ShmArray, + flume: Flume, array_key: Optional[str] = None, style: str = 'line', @@ -704,9 +709,11 @@ class LinkedSplits(QWidget): # draw curve graphics if style == 'ohlc_bar': - graphics, data_key = cpw.draw_ohlc( + # graphics, data_key = cpw.draw_ohlc( + flow = cpw.draw_ohlc( name, shm, + flume=flume, array_key=array_key ) self.cursor.contents_labels.add_label( @@ -718,18 +725,22 @@ class LinkedSplits(QWidget): elif style == 'line': add_label = True - graphics, data_key = cpw.draw_curve( + # graphics, data_key = cpw.draw_curve( + flow = cpw.draw_curve( name, shm, + flume, array_key=array_key, color='default_light', ) elif style == 'step': add_label = True - graphics, data_key = cpw.draw_curve( + # graphics, data_key = cpw.draw_curve( + flow = cpw.draw_curve( name, shm, + flume, array_key=array_key, step_mode=True, color='davies', @@ -739,6 +750,9 @@ class LinkedSplits(QWidget): else: raise ValueError(f"Chart style {style} is currently unsupported") + graphics = flow.graphics + data_key = flow.name + if _is_main: assert style == 'ohlc_bar', 'main chart must be OHLC' else: @@ -895,7 +909,7 @@ class ChartPlotWidget(pg.PlotWidget): # registry of overlay curve names self._flows: dict[str, Flow] = {} - self._feeds: dict[Symbol, Feed] = {} + self.feed: Feed | None = None self._labels = {} # registry of underlying graphics self._ysticks = {} # registry of underlying graphics @@ -916,20 +930,18 @@ class ChartPlotWidget(pg.PlotWidget): self._on_screen: bool = False def resume_all_feeds(self): - ... - # try: - # for feed in self._feeds.values(): - # for flume in feed.flumes.values(): - # self.linked.godwidget._root_n.start_soon(flume.resume) - # except RuntimeError: - # # TODO: cancel the qtractor runtime here? - # raise + feed = self.feed + if feed: + try: + self.linked.godwidget._root_n.start_soon(feed.resume) + except RuntimeError: + # TODO: cancel the qtractor runtime here? + raise def pause_all_feeds(self): - ... - # for feed in self._feeds.values(): - # for flume in feed.flumes.values(): - # self.linked.godwidget._root_n.start_soon(flume.pause) + feed = self.feed + if feed: + self.linked.godwidget._root_n.start_soon(feed.pause) @property def view(self) -> ChartView: @@ -938,12 +950,6 @@ class ChartPlotWidget(pg.PlotWidget): def focus(self) -> None: self.view.setFocus() - def last_bar_in_view(self) -> int: - self._arrays[self.name][-1]['index'] - - def is_valid_index(self, index: int) -> bool: - return index >= 0 and index < self._arrays[self.name][-1]['index'] - def _set_xlimits( self, xfirst: int, @@ -1036,9 +1042,14 @@ class ChartPlotWidget(pg.PlotWidget): log.warning(f'`Flow` for {self.name} not loaded yet?') return - index = flow.shm.array['index'] + arr = flow.shm.array + index = arr['index'] + # times = arr['time'] + + # these will be epoch time floats xfirst, xlast = index[0], index[-1] l, lbar, rbar, r = self.bars_range() + view = self.view if ( @@ -1195,6 +1206,7 @@ class ChartPlotWidget(pg.PlotWidget): name: str, shm: ShmArray, + flume: Flume, array_key: Optional[str] = None, overlay: bool = False, @@ -1207,10 +1219,7 @@ class ChartPlotWidget(pg.PlotWidget): **graphics_kwargs, - ) -> tuple[ - pg.GraphicsObject, - str, - ]: + ) -> Flow: ''' Draw a "curve" (line plot graphics) for the provided data in the input shm array ``shm``. @@ -1225,7 +1234,6 @@ class ChartPlotWidget(pg.PlotWidget): graphics = BarItems( linked=self.linked, plotitem=pi, - # pen_color=self.pen_color, color=color, name=name, **graphics_kwargs, @@ -1245,14 +1253,17 @@ class ChartPlotWidget(pg.PlotWidget): **graphics_kwargs, ) - self._flows[data_key] = Flow( - name=name, - plot=pi, - _shm=shm, + flow = self._flows[data_key] = Flow( + data_key, + pi, + shm, + flume, + is_ohlc=is_ohlc, # register curve graphics with this flow graphics=graphics, ) + assert isinstance(flow.shm, ShmArray) # TODO: this probably needs its own method? if overlay: @@ -1309,24 +1320,26 @@ class ChartPlotWidget(pg.PlotWidget): # understand. pi.addItem(graphics) - return graphics, data_key + return flow def draw_ohlc( self, name: str, shm: ShmArray, + flume: Flume, array_key: Optional[str] = None, **draw_curve_kwargs, - ) -> (pg.GraphicsObject, str): + ) -> Flow: ''' Draw OHLC datums to chart. ''' return self.draw_curve( - name=name, - shm=shm, + name, + shm, + flume, array_key=array_key, is_ohlc=True, **draw_curve_kwargs, @@ -1391,37 +1404,6 @@ class ChartPlotWidget(pg.PlotWidget): self.sig_mouse_leave.emit(self) self.scene().leaveEvent(ev) - def get_index(self, time: float) -> int: - - # TODO: this should go onto some sort of - # data-view thinger..right? - ohlc = self._flows[self.name].shm.array - - # XXX: not sure why the time is so off here - # looks like we're gonna have to do some fixing.. - indexes = ohlc['time'] >= time - - if any(indexes): - return ohlc['index'][indexes][-1] - else: - return ohlc['index'][-1] - - def in_view( - self, - array: np.ndarray, - - ) -> np.ndarray: - ''' - Slice an input struct array providing only datums - "in view" of this chart. - - ''' - l, lbar, rbar, r = self.bars_range() - ifirst = array[0]['index'] - # slice data by offset from the first index - # available in the passed datum set. - return array[lbar - ifirst:(rbar - ifirst) + 1] - def maxmin( self, name: Optional[str] = None, diff --git a/piker/ui/_display.py b/piker/ui/_display.py index babbfa7a..a9bb5406 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -22,6 +22,8 @@ graphics update methods via our custom ``pyqtgraph`` charting api. ''' from functools import partial +import itertools +from math import floor import time from typing import Optional, Any, Callable @@ -36,6 +38,9 @@ from ..data.feed import ( Flume, ) from ..data.types import Struct +from ..data._sharedmem import ( + ShmArray, +) from ._axes import YAxisLabel from ._chart import ( ChartPlotWidget, @@ -50,14 +55,13 @@ from ._fsp import ( has_vlm, open_vlm_displays, ) -from ..data._sharedmem import ( - ShmArray, -) -from ..data._source import tf_in_1s from ._forms import ( FieldsForm, mk_order_pane_layout, ) +from . import _pg_overrides as pgo +# from ..data._source import tf_in_1s +from ..data._sampling import _tick_groups from .order_mode import ( open_order_mode, OrderMode, @@ -71,17 +75,6 @@ from .._profile import Profiler log = get_logger(__name__) -# TODO: load this from a config.toml! -_quote_throttle_rate: int = 16 # Hz - - -# a working tick-type-classes template -_tick_groups = { - 'clears': {'trade', 'utrade', 'last'}, - 'bids': {'bid', 'bsize'}, - 'asks': {'ask', 'asize'}, -} - # TODO: delegate this to each `Flow.maxmin()` which includes # caching and further we should implement the following stream based @@ -90,8 +83,9 @@ _tick_groups = { # https://github.com/lemire/pythonmaxmin def chart_maxmin( chart: ChartPlotWidget, - ohlcv_shm: ShmArray, - vlm_chart: Optional[ChartPlotWidget] = None, + fqsn: str, + # ohlcv_shm: ShmArray, + vlm_chart: ChartPlotWidget | None = None, ) -> tuple[ @@ -106,7 +100,7 @@ def chart_maxmin( ''' last_bars_range = chart.bars_range() - out = chart.maxmin() + out = chart.maxmin(name=fqsn) if out is None: return (last_bars_range, 0, 0, 0) @@ -131,6 +125,10 @@ def chart_maxmin( ) +_i_last: int = 0 +_i_last_append: int = 0 + + class DisplayState(Struct): ''' Chart-local real-time graphics state container. @@ -140,6 +138,7 @@ class DisplayState(Struct): quotes: dict[str, Any] maxmin: Callable + flume: Flume ohlcv: ShmArray hist_ohlcv: ShmArray @@ -173,14 +172,18 @@ class DisplayState(Struct): update_state: bool = True, update_uppx: float = 16, + is_1m: bool = False, ) -> tuple: shm = shm or self.ohlcv chart = chart or self.chart - state = state or self.vars + # state = state or self.vars - if not update_state: + if ( + not update_state + and state + ): state = state.copy() # compute the first available graphic's x-units-per-pixel @@ -194,26 +197,56 @@ class DisplayState(Struct): # "curve" length is already automatic. # increment the view position by the sample offset. - i_step = shm.index - i_diff = i_step - state['i_last'] - state['i_last'] = i_step + # i_step = shm.index + i_step = shm.array[-1]['time'] + # i_diff = i_step - state['i_last'] + # state['i_last'] = i_step + global _i_last, _i_last_append + i_diff = i_step - _i_last + # update global state + if ( + # state is None + not is_1m + and i_diff > 0 + ): + _i_last = i_step - append_diff = i_step - state['i_last_append'] + # append_diff = i_step - state['i_last_append'] + append_diff = i_step - _i_last_append + + # real-time update necessary? + _, _, _, r = chart.bars_range() + liv = r >= shm.index # update the "last datum" (aka extending the flow graphic with # new data) only if the number of unit steps is >= the number of # such unit steps per pixel (aka uppx). Iow, if the zoom level # is such that a datum(s) update to graphics wouldn't span # to a new pixel, we don't update yet. - do_append = (append_diff >= uppx) - if do_append: - state['i_last_append'] = i_step + do_append = ( + append_diff >= uppx + and i_diff + ) + if ( + do_append + and not is_1m + ): + _i_last_append = i_step + # fqsn = self.flume.symbol.fqsn + # print( + # f'DOING APPEND => {fqsn}\n' + # f'i_step:{i_step}\n' + # f'i_diff:{i_diff}\n' + # f'last:{_i_last}\n' + # f'last_append:{_i_last_append}\n' + # f'append_diff:{append_diff}\n' + # f'r: {r}\n' + # f'liv: {liv}\n' + # f'uppx: {uppx}\n' + # ) do_rt_update = uppx < update_uppx - _, _, _, r = chart.bars_range() - liv = r >= i_step - # TODO: pack this into a struct return ( uppx, @@ -229,9 +262,10 @@ async def graphics_update_loop( nurse: trio.Nursery, godwidget: GodWidget, - flume: Flume, + feed: Feed, + pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {}, wap_in_history: bool = False, - vlm_chart: Optional[ChartPlotWidget] = None, + vlm_charts: dict[str, ChartPlotWidget] = {}, ) -> None: ''' @@ -255,185 +289,217 @@ async def graphics_update_loop( fast_chart = linked.chart hist_chart = godwidget.hist_linked.chart + assert hist_chart - ohlcv = flume.rt_shm - hist_ohlcv = flume.hist_shm + dss: dict[str, DisplayState] = {} + for fqsn, flume in feed.flumes.items(): + ohlcv = flume.rt_shm + hist_ohlcv = flume.hist_shm + symbol = flume.symbol + fqsn = symbol.fqsn - # update last price sticky - last_price_sticky = fast_chart.plotItem.getAxis( - 'right')._stickies.get(fast_chart.name) - last_price_sticky.update_from_data( - *ohlcv.array[-1][['index', 'close']] - ) + # update last price sticky + fast_pi = fast_chart._flows[fqsn].plot + last_price_sticky = fast_pi.getAxis('right')._stickies[fqsn] + last_price_sticky.update_from_data( + *ohlcv.array[-1][['index', 'close']] + ) + last_price_sticky.show() - hist_last_price_sticky = hist_chart.plotItem.getAxis( - 'right')._stickies.get(hist_chart.name) - hist_last_price_sticky.update_from_data( - *hist_ohlcv.array[-1][['index', 'close']] - ) + slow_pi = hist_chart._flows[fqsn].plot + hist_last_price_sticky = slow_pi.getAxis('right')._stickies[fqsn] + hist_last_price_sticky.update_from_data( + *hist_ohlcv.array[-1][['index', 'close']] + ) - maxmin = partial( - chart_maxmin, - fast_chart, - ohlcv, - vlm_chart, - ) - last_bars_range: tuple[float, float] - ( - last_bars_range, - last_mx, - last_mn, - last_mx_vlm, - ) = maxmin() + vlm_chart = vlm_charts[fqsn] + maxmin = partial( + chart_maxmin, + fast_chart, + fqsn, + vlm_chart, + ) + last_bars_range: tuple[float, float] + ( + last_bars_range, + last_mx, + last_mn, + last_mx_vlm, + ) = maxmin() - last, volume = ohlcv.array[-1][['close', 'volume']] + last, volume = ohlcv.array[-1][['close', 'volume']] - symbol = fast_chart.linked.symbol + symbol = flume.symbol - l1 = L1Labels( - fast_chart.plotItem, - # determine precision/decimal lengths - digits=symbol.tick_size_digits, - size_digits=symbol.lot_size_digits, - ) - fast_chart._l1_labels = l1 + l1 = L1Labels( + fast_pi, + # determine precision/decimal lengths + digits=symbol.tick_size_digits, + size_digits=symbol.lot_size_digits, + ) + # TODO: this is just wrong now since we can have multiple L1-label + # sets, so instead we should have the l1 associated with the + # plotitem or y-axis likely? + # fast_chart._l1_labels = l1 - # TODO: - # - in theory we should be able to read buffer data faster - # then msgs arrive.. needs some tinkering and testing + # TODO: + # - in theory we should be able to read buffer data faster + # then msgs arrive.. needs some tinkering and testing - # - if trade volume jumps above / below prior L1 price - # levels this might be dark volume we need to - # present differently -> likely dark vlm + # - if trade volume jumps above / below prior L1 price + # levels this might be dark volume we need to + # present differently -> likely dark vlm - tick_size = fast_chart.linked.symbol.tick_size - tick_margin = 3 * tick_size + tick_size = symbol.tick_size + tick_margin = 3 * tick_size - fast_chart.show() - last_quote = time.time() - i_last = ohlcv.index + fast_chart.show() + last_quote = time.time() + # global _i_last + i_last = ohlcv.index - ds = linked.display_state = DisplayState(**{ - 'godwidget': godwidget, - 'quotes': {}, - 'maxmin': maxmin, - 'ohlcv': ohlcv, - 'hist_ohlcv': hist_ohlcv, - 'chart': fast_chart, - 'last_price_sticky': last_price_sticky, - 'hist_last_price_sticky': hist_last_price_sticky, - 'l1': l1, + dss[fqsn] = ds = linked.display_state = DisplayState(**{ + 'godwidget': godwidget, + 'quotes': {}, + 'maxmin': maxmin, + 'flume': flume, + 'ohlcv': ohlcv, + 'hist_ohlcv': hist_ohlcv, + 'chart': fast_chart, + 'last_price_sticky': last_price_sticky, + 'hist_last_price_sticky': hist_last_price_sticky, + 'l1': l1, - 'vars': { - 'tick_margin': tick_margin, - 'i_last': i_last, - 'i_last_append': i_last, - 'last_mx_vlm': last_mx_vlm, - 'last_mx': last_mx, - 'last_mn': last_mn, - } - }) + 'vars': { + 'tick_margin': tick_margin, + 'i_last': i_last, + 'i_last_append': i_last, + 'last_mx_vlm': last_mx_vlm, + 'last_mx': last_mx, + 'last_mn': last_mn, + } + }) - if vlm_chart: - vlm_sticky = vlm_chart.plotItem.getAxis( - 'right')._stickies.get('volume') - ds.vlm_chart = vlm_chart - ds.vlm_sticky = vlm_sticky + if vlm_chart: + vlm_pi = vlm_chart._flows['volume'].plot + vlm_sticky = vlm_pi.getAxis('right')._stickies['volume'] + ds.vlm_chart = vlm_chart + ds.vlm_sticky = vlm_sticky - fast_chart.default_view() + fast_chart.default_view() - # TODO: probably factor this into some kinda `DisplayState` - # API that can be reused at least in terms of pulling view - # params (eg ``.bars_range()``). - async def increment_history_view(): - i_last = hist_ohlcv.index - state = ds.vars.copy() | { - 'i_last_append': i_last, - 'i_last': i_last, - } - _, hist_step_size_s, _ = flume.get_ds_info() + # TODO: probably factor this into some kinda `DisplayState` + # API that can be reused at least in terms of pulling view + # params (eg ``.bars_range()``). + async def increment_history_view(): + i_last = hist_ohlcv.index + state = ds.vars.copy() | { + 'i_last_append': i_last, + 'i_last': i_last, + } + _, hist_step_size_s, _ = flume.get_ds_info() - async with flume.index_stream( - # int(hist_step_size_s) - # TODO: seems this is more reliable at keeping the slow - # chart incremented in view more correctly? - # - It might make sense to just inline this logic with the - # main display task? => it's a tradeoff of slower task - # wakeups/ctx switches verus logic checks (as normal) - # - we need increment logic that only does the view shift - # call when the uppx permits/needs it - int(1), - ) as istream: - async for msg in istream: + async with flume.index_stream( + # int(hist_step_size_s) + # TODO: seems this is more reliable at keeping the slow + # chart incremented in view more correctly? + # - It might make sense to just inline this logic with the + # main display task? => it's a tradeoff of slower task + # wakeups/ctx switches verus logic checks (as normal) + # - we need increment logic that only does the view shift + # call when the uppx permits/needs it + int(1), + ) as istream: + async for msg in istream: - # check if slow chart needs an x-domain shift and/or - # y-range resize. - ( - uppx, - liv, - do_append, - i_diff, - append_diff, - do_rt_update, - ) = ds.incr_info( - chart=hist_chart, - shm=ds.hist_ohlcv, - state=state, - # update_state=False, - ) - # print( - # f'liv: {liv}\n' - # f'do_append: {do_append}\n' - # f'append_diff: {append_diff}\n' - # ) + # check if slow chart needs an x-domain shift and/or + # y-range resize. + ( + uppx, + liv, + do_append, + i_diff, + append_diff, + do_rt_update, + ) = ds.incr_info( + chart=hist_chart, + shm=ds.hist_ohlcv, + state=state, + is_1m=True, + # update_state=False, + ) + # print( + # f'liv: {liv}\n' + # f'do_append: {do_append}\n' + # f'append_diff: {append_diff}\n' + # ) - if ( - do_append - and liv - ): - hist_chart.increment_view(steps=i_diff) - hist_chart.view._set_yrange(yrange=hist_chart.maxmin()) + if ( + do_append + and liv + ): + # hist_chart.increment_view(steps=i_diff) + flow = hist_chart._flows[fqsn] + flow.plot.vb._set_yrange( + # yrange=hist_chart.maxmin(name=fqsn) + ) + # hist_chart.view._set_yrange(yrange=hist_chart.maxmin()) - nurse.start_soon(increment_history_view) + nurse.start_soon(increment_history_view) # main real-time quotes update loop - stream: tractor.MsgStream = flume.stream - async for quotes in stream: + stream: tractor.MsgStream + async with feed.open_multi_stream() as stream: + assert stream + async for quotes in stream: + quote_period = time.time() - last_quote + quote_rate = round( + 1/quote_period, 1) if quote_period > 0 else float('inf') + if ( + quote_period <= 1/_quote_throttle_rate - ds.quotes = quotes - quote_period = time.time() - last_quote - quote_rate = round( - 1/quote_period, 1) if quote_period > 0 else float('inf') - if ( - quote_period <= 1/_quote_throttle_rate + # in the absolute worst case we shouldn't see more then + # twice the expected throttle rate right!? + # and quote_rate >= _quote_throttle_rate * 2 + and quote_rate >= display_rate + ): + log.warning(f'High quote rate {symbol.key}: {quote_rate}') - # in the absolute worst case we shouldn't see more then - # twice the expected throttle rate right!? - # and quote_rate >= _quote_throttle_rate * 2 - and quote_rate >= display_rate - ): - log.warning(f'High quote rate {symbol.key}: {quote_rate}') + last_quote = time.time() - last_quote = time.time() + for sym, quote in quotes.items(): + ds = dss[sym] + ds.quotes = quote - # chart isn't active/shown so skip render cycle and pause feed(s) - if fast_chart.linked.isHidden(): - # print('skipping update') - fast_chart.pause_all_feeds() - continue + rt_pi, hist_pi = pis[sym] - # ic = fast_chart.view._ic - # if ic: - # fast_chart.pause_all_feeds() - # await ic.wait() - # fast_chart.resume_all_feeds() + # chart isn't active/shown so skip render cycle and + # pause feed(s) + if ( + fast_chart.linked.isHidden() + or not rt_pi.isVisible() + ): + print(f'{fqsn} skipping update for HIDDEN CHART') + fast_chart.pause_all_feeds() + continue - # sync call to update all graphics/UX components. - graphics_update_cycle(ds) + ic = fast_chart.view._ic + if ic: + fast_chart.pause_all_feeds() + print(f'{fqsn} PAUSING DURING INTERACTION') + await ic.wait() + fast_chart.resume_all_feeds() + + # sync call to update all graphics/UX components. + graphics_update_cycle( + ds, + quote, + ) def graphics_update_cycle( ds: DisplayState, + quote: dict, wap_in_history: bool = False, trigger_all: bool = False, # flag used by prepend history updates prepend_update_index: Optional[int] = None, @@ -445,6 +511,12 @@ def graphics_update_cycle( chart = ds.chart # TODO: just pass this as a direct ref to avoid so many attr accesses? hist_chart = ds.godwidget.hist_linked.chart + assert hist_chart + + flume = ds.flume + sym = flume.symbol + fqsn = sym.fqsn + main_flow = chart._flows[fqsn] profiler = Profiler( msg=f'Graphics loop cycle for: `{chart.name}`', @@ -461,357 +533,379 @@ def graphics_update_cycle( # rt "HFT" chart l1 = ds.l1 - ohlcv = ds.ohlcv + # ohlcv = ds.ohlcv + ohlcv = flume.rt_shm array = ohlcv.array vars = ds.vars tick_margin = vars['tick_margin'] - for sym, quote in ds.quotes.items(): + ( + uppx, + liv, + do_append, + i_diff, + append_diff, + do_rt_update, + ) = ds.incr_info() + + # don't real-time "shift" the curve to the + # left unless we get one of the following: + if ( ( - uppx, - liv, - do_append, - i_diff, - append_diff, - do_rt_update, - ) = ds.incr_info() + do_append + and liv + ) + or trigger_all + ): + # print(f'INCREMENTING {fqsn}') + chart.increment_view(steps=i_diff) + main_flow.plot.vb._set_yrange( + # yrange=(mn, mx), + ) - # TODO: we should only run mxmn when we know - # an update is due via ``do_append`` above. - ( - brange, - mx_in_view, - mn_in_view, - mx_vlm_in_view, - ) = ds.maxmin() - l, lbar, rbar, r = brange - mx = mx_in_view + tick_margin - mn = mn_in_view - tick_margin + # NOTE: since vlm and ohlc charts are axis linked now we don't + # need the double increment request? + # if vlm_chart: + # vlm_chart.increment_view(steps=i_diff) - profiler('`ds.maxmin()` call') + profiler('view incremented') - if ( - prepend_update_index is not None - and lbar > prepend_update_index - ): - # on a history update (usually from the FSP subsys) - # if the segment of history that is being prepended - # isn't in view there is no reason to do a graphics - # update. - log.debug('Skipping prepend graphics cycle: frame not in view') - return + # frames_by_type: dict[str, dict] = {} + # lasts = {} - # don't real-time "shift" the curve to the - # left unless we get one of the following: - if ( - (do_append and liv) - or trigger_all - ): - chart.increment_view(steps=i_diff) - chart.view._set_yrange(yrange=(mn, mx)) + # build tick-type "frames" of tick sequences since + # likely the tick arrival rate is higher then our + # (throttled) quote stream rate. - if vlm_chart: - vlm_chart.increment_view(steps=i_diff) + # iterate in FIFO order per tick-frame + # if sym != fqsn: + # continue - profiler('view incremented') + # TODO: we should only run mxmn when we know + # an update is due via ``do_append`` above. + ( + brange, + mx_in_view, + mn_in_view, + mx_vlm_in_view, + ) = ds.maxmin() + l, lbar, rbar, r = brange + mx = mx_in_view + tick_margin + mn = mn_in_view - tick_margin + profiler('`ds.maxmin()` call') - ticks_frame = quote.get('ticks', ()) + if ( + prepend_update_index is not None + and lbar > prepend_update_index + ): + # on a history update (usually from the FSP subsys) + # if the segment of history that is being prepended + # isn't in view there is no reason to do a graphics + # update. + log.debug('Skipping prepend graphics cycle: frame not in view') + return - frames_by_type: dict[str, dict] = {} - lasts = {} + # TODO: eventually we want to separate out the utrade (aka + # dark vlm prices) here and show them as an additional + # graphic. + clear_types = _tick_groups['clears'] - # build tick-type "frames" of tick sequences since - # likely the tick arrival rate is higher then our - # (throttled) quote stream rate. - for tick in ticks_frame: - price = tick.get('price') - ticktype = tick.get('type') + # XXX: if we wanted to iterate in "latest" (i.e. most + # current) tick first order as an optimization where we only + # update from the last tick from each type class. + # last_clear_updated: bool = False - if ticktype == 'n/a' or price == -1: - # okkk.. - continue + # update ohlc sampled price bars + if ( + do_rt_update + or do_append + or trigger_all + ): + chart.update_graphics_from_flow( + fqsn, + # chart.name, + # do_append=do_append, + ) + main_flow.draw_last(array_key=fqsn) - # keys are entered in olded-event-inserted-first order - # since we iterate ``ticks_frame`` in standard order - # above. in other words the order of the keys is the order - # of tick events by type from the provider feed. - frames_by_type.setdefault(ticktype, []).append(tick) + hist_chart.update_graphics_from_flow( + fqsn, + # chart.name, + # do_append=do_append, + ) - # overwrites so the last tick per type is the entry - lasts[ticktype] = tick + # NOTE: we always update the "last" datum + # since the current range should at least be updated + # to it's max/min on the last pixel. + typs: set[str] = set() - # from pprint import pformat - # frame_counts = { - # typ: len(frame) for typ, frame in frames_by_type.items() - # } - # print(f'{pformat(frame_counts)}') - # print(f'framed: {pformat(frames_by_type)}') - # print(f'lasts: {pformat(lasts)}') + # from pprint import pformat + # frame_counts = { + # typ: len(frame) for typ, frame in frames_by_type.items() + # } + # print( + # f'{pformat(frame_counts)}\n' + # f'framed: {pformat(frames_by_type)}\n' + # f'lasts: {pformat(lasts)}\n' + # ) + # for typ, tick in lasts.items(): + # ticks_frame = quote.get('ticks', ()) + ticks_by_type = quote.get('tbt', {}) - # TODO: eventually we want to separate out the utrade (aka - # dark vlm prices) here and show them as an additional - # graphic. - clear_types = _tick_groups['clears'] + # for tick in ticks_frame: + for typ, ticks in ticks_by_type.items(): - # XXX: if we wanted to iterate in "latest" (i.e. most - # current) tick first order as an optimization where we only - # update from the last tick from each type class. - # last_clear_updated: bool = False + # NOTE: ticks are `.append()`-ed to the `ticks_by_type: dict` by the + # `._sampling.uniform_rate_send()` loop + tick = ticks[-1] + # typ = tick.get('type') + price = tick.get('price') + size = tick.get('size') - # update ohlc sampled price bars - if ( - do_rt_update - or do_append - or trigger_all - ): - chart.update_graphics_from_flow( - chart.name, - do_append=do_append, - ) - hist_chart.update_graphics_from_flow( - chart.name, - do_append=do_append, - ) + if typ in typs: + continue - # NOTE: we always update the "last" datum - # since the current range should at least be updated - # to it's max/min on the last pixel. + typs.add(typ) - # iterate in FIFO order per tick-frame - for typ, tick in lasts.items(): + # compute max and min prices (including bid/ask) from + # tick frames to determine the y-range for chart + # auto-scaling. + # TODO: we need a streaming minmax algo here, see def above. + if liv: + mx = max(price + tick_margin, mx) + mn = min(price - tick_margin, mn) - price = tick.get('price') - size = tick.get('size') + if typ in clear_types: - # compute max and min prices (including bid/ask) from - # tick frames to determine the y-range for chart - # auto-scaling. - # TODO: we need a streaming minmax algo here, see def above. - if liv: - mx = max(price + tick_margin, mx) - mn = min(price - tick_margin, mn) + # XXX: if we only wanted to update graphics from the + # "current"/"latest received" clearing price tick + # once (see alt iteration order above). + # if last_clear_updated: + # continue - if typ in clear_types: + # last_clear_updated = True + # we only want to update grahpics from the *last* + # tick event that falls under the "clearing price" + # set. - # XXX: if we only wanted to update graphics from the - # "current"/"latest received" clearing price tick - # once (see alt iteration order above). - # if last_clear_updated: - # continue + # update price sticky(s) + end_ic = array[-1][['index', 'close']] + ds.last_price_sticky.update_from_data(*end_ic) + ds.hist_last_price_sticky.update_from_data(*end_ic) - # last_clear_updated = True - # we only want to update grahpics from the *last* - # tick event that falls under the "clearing price" - # set. + if wap_in_history: + # update vwap overlay line + chart.update_graphics_from_flow('bar_wap') - # update price sticky(s) - end = array[-1] - ds.last_price_sticky.update_from_data( - *end[['index', 'close']] - ) - ds.hist_last_price_sticky.update_from_data( - *end[['index', 'close']] - ) + # L1 book label-line updates + # XXX: is this correct for ib? + # if ticktype in ('trade', 'last'): + # if ticktype in ('last',): # 'size'): + if typ in ('last',): # 'size'): - if wap_in_history: - # update vwap overlay line - chart.update_graphics_from_flow( - 'bar_wap', - ) + label = { + l1.ask_label.fields['level']: l1.ask_label, + l1.bid_label.fields['level']: l1.bid_label, + }.get(price) - # L1 book label-line updates - # XXX: is this correct for ib? - # if ticktype in ('trade', 'last'): - # if ticktype in ('last',): # 'size'): - if typ in ('last',): # 'size'): - - label = { - l1.ask_label.fields['level']: l1.ask_label, - l1.bid_label.fields['level']: l1.bid_label, - }.get(price) - - if ( - label is not None - and liv - ): - label.update_fields( - {'level': price, 'size': size} - ) - - # TODO: on trades should we be knocking down - # the relevant L1 queue? - # label.size -= size - - elif ( - typ in _tick_groups['asks'] - # TODO: instead we could check if the price is in the - # y-view-range? - and liv - ): - l1.ask_label.update_fields({'level': price, 'size': size}) - - elif ( - typ in _tick_groups['bids'] - # TODO: instead we could check if the price is in the - # y-view-range? - and liv - ): - l1.bid_label.update_fields({'level': price, 'size': size}) - - # check for y-range re-size - if (mx > vars['last_mx']) or (mn < vars['last_mn']): - - # fast chart resize case if ( - liv - and not chart._static_yrange == 'axis' + label is not None + and liv ): - main_vb = chart.view - if ( - main_vb._ic is None - or not main_vb._ic.is_set() - ): - # print(f'updating range due to mxmn') - main_vb._set_yrange( - # TODO: we should probably scale - # the view margin based on the size - # of the true range? This way you can - # slap in orders outside the current - # L1 (only) book range. - # range_margin=0.1, - yrange=(mn, mx), - ) + label.update_fields( + {'level': price, 'size': size} + ) - # check if slow chart needs a resize - ( - _, - hist_liv, - _, - _, - _, - _, - ) = ds.incr_info( - chart=hist_chart, - shm=ds.hist_ohlcv, - update_state=False, + # TODO: on trades should we be knocking down + # the relevant L1 queue? + # label.size -= size + + elif ( + typ in _tick_groups['asks'] + # TODO: instead we could check if the price is in the + # y-view-range? + and liv + ): + l1.ask_label.update_fields({'level': price, 'size': size}) + + elif ( + typ in _tick_groups['bids'] + # TODO: instead we could check if the price is in the + # y-view-range? + and liv + ): + l1.bid_label.update_fields({'level': price, 'size': size}) + + # check for y-range re-size + if (mx > vars['last_mx']) or (mn < vars['last_mn']): + + # fast chart resize case + if ( + liv + and not chart._static_yrange == 'axis' + ): + # main_vb = chart.view + main_vb = chart._flows[fqsn].plot.vb + if ( + main_vb._ic is None + or not main_vb._ic.is_set() + ): + # print(f'updating range due to mxmn') + main_vb._set_yrange( + # TODO: we should probably scale + # the view margin based on the size + # of the true range? This way you can + # slap in orders outside the current + # L1 (only) book range. + # range_margin=0.1, + # yrange=(mn, mx), + ) + + # check if slow chart needs a resize + ( + _, + hist_liv, + _, + _, + _, + _, + ) = ds.incr_info( + chart=hist_chart, + shm=ds.hist_ohlcv, + update_state=False, + is_1m=True, + ) + if hist_liv: + flow = hist_chart._flows[fqsn] + flow.plot.vb._set_yrange( + # yrange=hist_chart.maxmin(name=fqsn), ) - if hist_liv: - hist_chart.view._set_yrange(yrange=hist_chart.maxmin()) - # XXX: update this every draw cycle to make L1-always-in-view work. - vars['last_mx'], vars['last_mn'] = mx, mn + # XXX: update this every draw cycle to make L1-always-in-view work. + vars['last_mx'], vars['last_mn'] = mx, mn - # run synchronous update on all linked flows - # TODO: should the "main" (aka source) flow be special? - for curve_name, flow in chart._flows.items(): - # update any overlayed fsp flows - if curve_name != chart.data_key: + # run synchronous update on all linked flows + # TODO: should the "main" (aka source) flow be special? + for curve_name, flow in chart._flows.items(): + # update any overlayed fsp flows + if ( + # curve_name != chart.data_key + curve_name != fqsn + and not flow.is_ohlc + ): + update_fsp_chart( + chart, + flow, + curve_name, + array_key=curve_name, + ) + + # even if we're downsampled bigly + # draw the last datum in the final + # px column to give the user the mx/mn + # range of that set. + if ( + liv + # and not do_append + # and not do_rt_update + ): + flow.draw_last( + array_key=curve_name, + only_last_uppx=True, + ) + + # volume chart logic.. + # TODO: can we unify this with the above loop? + if vlm_chart: + # print(f"DOING VLM {fqsn}") + vlm_flows = vlm_chart._flows + + # always update y-label + ds.vlm_sticky.update_from_data( + *array[-1][['index', 'volume']] + ) + + if ( + ( + do_rt_update + or do_append + and liv + ) + or trigger_all + ): + # TODO: make it so this doesn't have to be called + # once the $vlm is up? + vlm_chart.update_graphics_from_flow( + 'volume', + # UGGGh, see ``maxmin()`` impl in `._fsp` for + # the overlayed plotitems... we need a better + # bay to invoke a maxmin per overlay.. + render=False, + # XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^ + # without this, since we disable the + # 'volume' (units) chart after the $vlm starts + # up we need to be sure to enable this + # auto-ranging otherwise there will be no handler + # connected to update accompanying overlay + # graphics.. + ) + profiler('`vlm_chart.update_graphics_from_flow()`') + + if ( + mx_vlm_in_view != vars['last_mx_vlm'] + ): + yrange = (0, mx_vlm_in_view * 1.375) + vlm_chart.view._set_yrange( + yrange=yrange, + ) + profiler('`vlm_chart.view._set_yrange()`') + # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') + vars['last_mx_vlm'] = mx_vlm_in_view + + # update all downstream FSPs + for curve_name, flow in vlm_flows.items(): + + if ( + curve_name not in {'volume', fqsn} + and flow.render + and ( + liv and do_rt_update + or do_append + ) + # and not flow.is_ohlc + # and curve_name != fqsn + ): update_fsp_chart( - chart, + vlm_chart, flow, curve_name, array_key=curve_name, + # do_append=uppx < update_uppx, + # do_append=do_append, + ) + # is this even doing anything? + # (pretty sure it's the real-time + # resizing from last quote?) + fvb = flow.plot.vb + fvb._set_yrange( + name=curve_name, ) - # even if we're downsampled bigly - # draw the last datum in the final - # px column to give the user the mx/mn - # range of that set. - if ( - not do_append - # and not do_rt_update + elif ( + curve_name != 'volume' + and not do_append and liv + and uppx >= 1 + # even if we're downsampled bigly + # draw the last datum in the final + # px column to give the user the mx/mn + # range of that set. ): - flow.draw_last( - array_key=curve_name, - only_last_uppx=True, - ) - - # volume chart logic.. - # TODO: can we unify this with the above loop? - if vlm_chart: - # always update y-label - ds.vlm_sticky.update_from_data( - *array[-1][['index', 'volume']] - ) - - if ( - ( - do_rt_update - or do_append - and liv - ) - or trigger_all - ): - # TODO: make it so this doesn't have to be called - # once the $vlm is up? - vlm_chart.update_graphics_from_flow( - 'volume', - # UGGGh, see ``maxmin()`` impl in `._fsp` for - # the overlayed plotitems... we need a better - # bay to invoke a maxmin per overlay.. - render=False, - # XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^ - # without this, since we disable the - # 'volume' (units) chart after the $vlm starts - # up we need to be sure to enable this - # auto-ranging otherwise there will be no handler - # connected to update accompanying overlay - # graphics.. - ) - profiler('`vlm_chart.update_graphics_from_flow()`') - - if ( - mx_vlm_in_view != vars['last_mx_vlm'] - ): - yrange = (0, mx_vlm_in_view * 1.375) - vlm_chart.view._set_yrange( - yrange=yrange, - ) - profiler('`vlm_chart.view._set_yrange()`') - # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') - vars['last_mx_vlm'] = mx_vlm_in_view - - for curve_name, flow in vlm_chart._flows.items(): - - if ( - curve_name != 'volume' and - flow.render and ( - liv and - do_rt_update or do_append - ) - ): - update_fsp_chart( - vlm_chart, - flow, - curve_name, - array_key=curve_name, - # do_append=uppx < update_uppx, - do_append=do_append, - ) - # is this even doing anything? - # (pretty sure it's the real-time - # resizing from last quote?) - fvb = flow.plot.vb - fvb._set_yrange( - name=curve_name, - ) - - elif ( - curve_name != 'volume' - and not do_append - and liv - and uppx >= 1 - # even if we're downsampled bigly - # draw the last datum in the final - # px column to give the user the mx/mn - # range of that set. - ): - # always update the last datum-element - # graphic for all flows - # print(f'drawing last {flow.name}') - flow.draw_last(array_key=curve_name) + # always update the last datum-element + # graphic for all flows + # print(f'drawing last {flow.name}') + flow.draw_last(array_key=curve_name) async def link_views_with_region( @@ -836,11 +930,12 @@ async def link_views_with_region( pen=pg.mkPen(hcolor('gunmetal')), brush=pg.mkBrush(hcolor('default_darkest')), ) - region.setZValue(10) # put linear region "in front" in layer terms + region.setOpacity(0) hist_pi.addItem(region, ignoreBounds=True) + region.setOpacity(6/16) - flow = rt_chart._flows[hist_chart.name] + flow = rt_chart._flows[flume.symbol.fqsn] assert flow # XXX: no idea why this doesn't work but it's causing @@ -948,6 +1043,35 @@ async def link_views_with_region( # region.sigRegionChangeFinished.connect(update_pi_from_region) +# force 0 to always be in view +def multi_maxmin( + chart: ChartPlotWidget, + names: list[str], + +) -> tuple[float, float]: + ''' + Flows "group" maxmin loop; assumes all named flows + are in the same co-domain and thus can be sorted + as one set. + + Iterates all the named flows and calls the chart + api to find their range values and return. + + TODO: really we should probably have a more built-in API + for this? + + ''' + mx = 0 + for name in names: + ymn, ymx = chart.maxmin(name=name) + mx = max(mx, ymx) + + return 0, mx + + +_quote_throttle_rate: int = 60 - 6 + + async def display_symbol_data( godwidget: GodWidget, fqsns: list[str], @@ -974,12 +1098,19 @@ async def display_symbol_data( # ) for fqsn in fqsns: - loading_sym_key = sbar.open_status( f'loading {fqsn} ->', group_key=True ) + # TODO: ctl over update loop's maximum frequency. + # - load this from a config.toml! + # - allow dyanmic configuration from chart UI? + global _quote_throttle_rate + from ._window import main_window + display_rate = main_window().current_screen().refreshRate() + _quote_throttle_rate = floor(display_rate) - 6 + feed: Feed async with open_feed( fqsns, @@ -987,111 +1118,118 @@ async def display_symbol_data( # limit to at least display's FPS # avoiding needless Qt-in-guest-mode context switches - tick_throttle=_quote_throttle_rate, + tick_throttle=min( + round(_quote_throttle_rate/len(fqsns)), + 22, + ), ) as feed: - # TODO: right now we only show one symbol on charts, but - # overlays are coming muy pronto guey.. - assert len(feed.flumes) == 1 - flume = list(feed.flumes.values())[0] + # use expanded contract symbols passed back from feed layer. + fqsns = list(feed.flumes.keys()) - ohlcv: ShmArray = flume.rt_shm - hist_ohlcv: ShmArray = flume.hist_shm - - symbol = flume.symbol - fqsn = symbol.fqsn - brokername = symbol.brokers[0] - - step_size_s = 1 - tf_key = tf_in_1s[step_size_s] - - # load in symbol's ohlc data + # step_size_s = 1 + # tf_key = tf_in_1s[step_size_s] godwidget.window.setWindowTitle( - f'{fqsn} ' - f'tick:{symbol.tick_size} ' - f'step:{tf_key} ' + f'{fqsns} ' + # f'tick:{symbol.tick_size} ' + # f'step:{tf_key} ' ) - - rt_linked = godwidget.rt_linked - rt_linked._symbol = symbol - - # create top history view chart above the "main rt chart". - hist_linked = godwidget.hist_linked - hist_linked._symbol = symbol - hist_chart = hist_linked.plot_ohlc_main( - symbol, - hist_ohlcv, - # in the case of history chart we explicitly set `False` - # to avoid internal pane creation. - # sidepane=False, - sidepane=godwidget.search, - ) - # don't show when not focussed - hist_linked.cursor.always_show_xlabel = False - # generate order mode side-pane UI # A ``FieldsForm`` form to configure order entry # and add as next-to-y-axis singleton pane pp_pane: FieldsForm = mk_order_pane_layout(godwidget) godwidget.pp_pane = pp_pane - # create main OHLC chart - ohlc_chart = rt_linked.plot_ohlc_main( - symbol, - ohlcv, - # in the case of history chart we explicitly set `False` - # to avoid internal pane creation. - sidepane=pp_pane, - ) - - ohlc_chart._feeds[symbol.key] = feed - ohlc_chart.setFocus() - - # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! - # plot historical vwap if available - wap_in_history = False - # if ( - # brokermod._show_wap_in_history - # and 'bar_wap' in bars.dtype.fields - # ): - # wap_in_history = True - # ohlc_chart.draw_curve( - # name='bar_wap', - # shm=ohlcv, - # color='default_light', - # add_label=False, - # ) - - # NOTE: we must immediately tell Qt to show the OHLC chart - # to avoid a race where the subplots get added/shown to - # the linked set *before* the main price chart! - rt_linked.show() - rt_linked.focus() - await trio.sleep(0) + # create top history view chart above the "main rt chart". + rt_linked = godwidget.rt_linked + hist_linked = godwidget.hist_linked # NOTE: here we insert the slow-history chart set into # the fast chart's splitter -> so it's a splitter of charts # inside the first widget slot of a splitter of charts XD rt_linked.splitter.insertWidget(0, hist_linked) - # XXX: if we wanted it at the bottom? - # rt_linked.splitter.addWidget(hist_linked) - rt_linked.focus() - godwidget.resize_all() + rt_chart: None | ChartPlotWidget = None + hist_chart: None | ChartPlotWidget = None + vlm_chart: None | ChartPlotWidget = None + + # TODO: I think some palette's based on asset group types + # would be good, for eg: + # - underlying and opts contracts + # - index and underlyings + futures + # - gradient in "lightness" based on liquidity, or lifetime in derivs? + palette = itertools.cycle([ + # curve color, last bar curve color + ['i3', 'gray'], + ['grayer', 'bracket'], + ['grayest', 'i3'], + ['default_dark', 'default'], + ]) + + pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {} + + # load in ohlc data to a common linked but split chart set. + fitems: list[ + tuple[str, Flume] + ] = list(feed.flumes.items()) + + # for the "first"/selected symbol we create new chart widgets + # and sub-charts for FSPs + fqsn, flume = fitems[0] + + rt_linked._symbol = flume.symbol + hist_linked._symbol = flume.symbol + + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + symbol = flume.symbol + brokername = symbol.brokers[0] + fqsn = symbol.fqsn + + hist_chart = hist_linked.plot_ohlc_main( + symbol, + hist_ohlcv, + flume, + # in the case of history chart we explicitly set `False` + # to avoid internal pane creation. + # sidepane=False, + sidepane=godwidget.search, + ) + pis.setdefault(fqsn, [None, None])[1] = hist_chart.plotItem + + # don't show when not focussed + hist_linked.cursor.always_show_xlabel = False + + rt_chart = rt_linked.plot_ohlc_main( + symbol, + ohlcv, + flume, + # in the case of history chart we explicitly set `False` + # to avoid internal pane creation. + sidepane=pp_pane, + ) + pis.setdefault(fqsn, [None, None])[0] = rt_chart.plotItem + + # for pause/resume on mouse interaction + rt_chart.feed = feed - vlm_chart: Optional[ChartPlotWidget] = None async with trio.open_nursery() as ln: - # if available load volume related built-in display(s) + vlm_charts: dict[ + str, + None | ChartPlotWidget + ] = {}.fromkeys(feed.flumes) if ( not symbol.broker_info[brokername].get('no_vlm', False) and has_vlm(ohlcv) + and vlm_chart is None ): - vlm_chart = await ln.start( + vlm_charts[fqsn] = await ln.start( open_vlm_displays, rt_linked, - ohlcv, + flume, ) # load (user's) FSP set (otherwise known as "indicators") @@ -1099,57 +1237,139 @@ async def display_symbol_data( ln.start_soon( start_fsp_displays, rt_linked, - ohlcv, + flume, loading_sym_key, loglevel, ) - # start graphics update loop after receiving first live quote - ln.start_soon( - graphics_update_loop, - ln, - godwidget, - flume, - wap_in_history, - vlm_chart, - ) + # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! + # plot historical vwap if available + wap_in_history = False + # if ( + # brokermod._show_wap_in_history + # and 'bar_wap' in bars.dtype.fields + # ): + # wap_in_history = True + # rt_chart.draw_curve( + # name='bar_wap', + # shm=ohlcv, + # color='default_light', + # add_label=False, + # ) + for fqsn, flume in fitems[1:]: + # get a new color from the palette + bg_chart_color, bg_last_bar_color = next(palette) + + rt_linked._symbol = flume.symbol + hist_linked._symbol = flume.symbol + + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + symbol = flume.symbol + fqsn = symbol.fqsn + + hist_pi = hist_chart.overlay_plotitem( + name=fqsn, + axis_title=fqsn, + ) + hist_pi.hideAxis('left') + hist_pi.hideAxis('bottom') + + flow = hist_chart.draw_curve( + fqsn, + hist_ohlcv, + flume, + array_key=fqsn, + overlay=hist_pi, + pi=hist_pi, + is_ohlc=True, + + color=bg_chart_color, + last_bar_color=bg_last_bar_color, + ) + + hist_pi.vb.maxmin = partial( + hist_chart.maxmin, + name=fqsn, + ) + # TODO: we need a better API to do this.. + # specially store ref to shm for lookup in display loop + # since only a placeholder of `None` is entered in + # ``.draw_curve()``. + flow = hist_chart._flows[fqsn] + assert flow.plot is hist_pi + pis.setdefault(fqsn, [None, None])[1] = hist_pi + + rt_pi = rt_chart.overlay_plotitem( + name=fqsn, + axis_title=fqsn, + ) + + rt_pi.hideAxis('left') + rt_pi.hideAxis('bottom') + + flow = rt_chart.draw_curve( + fqsn, + ohlcv, + flume, + array_key=fqsn, + overlay=rt_pi, + pi=rt_pi, + is_ohlc=True, + + color=bg_chart_color, + last_bar_color=bg_last_bar_color, + ) + rt_pi.vb.maxmin = partial( + rt_chart.maxmin, + name=fqsn, + ) + + # TODO: we need a better API to do this.. + # specially store ref to shm for lookup in display loop + # since only a placeholder of `None` is entered in + # ``.draw_curve()``. + flow = rt_chart._flows[fqsn] + assert flow.plot is rt_pi + pis.setdefault(fqsn, [None, None])[0] = rt_pi + + rt_chart.setFocus() + + # NOTE: we must immediately tell Qt to show the OHLC chart + # to avoid a race where the subplots get added/shown to + # the linked set *before* the main price chart! + rt_linked.show() + rt_linked.focus() await trio.sleep(0) - # size view to data prior to order mode init - ohlc_chart.default_view() - rt_linked.graphics_cycle() - await trio.sleep(0) - - hist_chart.default_view( - bars_from_y=int(len(hist_ohlcv.array)), # size to data - y_offset=6116*2, # push it a little away from the y-axis - ) - hist_linked.graphics_cycle() - await trio.sleep(0) + # XXX: if we wanted it at the bottom? + # rt_linked.splitter.addWidget(hist_linked) + rt_linked.focus() godwidget.resize_all() - await link_views_with_region( - ohlc_chart, - hist_chart, - flume, - ) + # add all additional symbols as overlays + for fqsn, flume in feed.flumes.items(): - mode: OrderMode - async with ( - open_order_mode( - feed, - godwidget, - fqsn, - order_mode_started - ) as mode - ): - if not vlm_chart: - # trigger another view reset if no sub-chart - ohlc_chart.default_view() + # size view to data prior to order mode init + rt_chart.default_view() + rt_linked.graphics_cycle() + await trio.sleep(0) - rt_linked.mode = mode + hist_chart.default_view( + bars_from_y=int(len(hist_ohlcv.array)), # size to data + y_offset=6116*2, # push it a little away from the y-axis + ) + hist_linked.graphics_cycle() + await trio.sleep(0) + + godwidget.resize_all() + + # trigger another view reset if no sub-chart + hist_chart.default_view() + rt_chart.default_view() # let Qt run to render all widgets and make sure the # sidepanes line up vertically. @@ -1168,9 +1388,10 @@ async def display_symbol_data( # TODO: make this not so shit XD # close group status - sbar._status_groups[loading_sym_key][1]() + # sbar._status_groups[loading_sym_key][1]() hist_linked.graphics_cycle() + rt_chart.default_view() await trio.sleep(0) bars_in_mem = int(len(hist_ohlcv.array)) @@ -1181,5 +1402,37 @@ async def display_symbol_data( ) godwidget.resize_all() - # let the app run.. bby - await trio.sleep_forever() + await link_views_with_region( + rt_chart, + hist_chart, + flume, + ) + + # start graphics update loop after receiving first live quote + ln.start_soon( + graphics_update_loop, + ln, + godwidget, + feed, + pis, + wap_in_history, + vlm_charts, + ) + + rt_chart.default_view() + await trio.sleep(0) + + mode: OrderMode + async with ( + open_order_mode( + feed, + godwidget, + fqsns[-1], + order_mode_started + ) as mode + ): + rt_linked.mode = mode + + rt_chart.default_view() + hist_chart.default_view() + await trio.sleep_forever() # let the app run.. bby diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py index a2908905..2e04bb37 100644 --- a/piker/ui/_flows.py +++ b/piker/ui/_flows.py @@ -36,6 +36,7 @@ from PyQt5.QtCore import QLineF from ..data._sharedmem import ( ShmArray, ) +from ..data.feed import Flume from .._profile import ( pg_profile_enabled, # ms_slower_then, @@ -208,13 +209,16 @@ class Flow(msgspec.Struct): # , frozen=True): ''' name: str plot: pg.PlotItem - graphics: Curve | BarItems _shm: ShmArray + flume: Flume + graphics: Curve | BarItems + + # for tracking y-mn/mx for y-axis auto-ranging yrange: tuple[float, float] = None # in some cases a flow may want to change its - # graphical "type" or, "form" when downsampling, - # normally this is just a plain line. + # graphical "type" or, "form" when downsampling, to + # start this is only ever an interpolation line. ds_graphics: Optional[Curve] = None is_ohlc: bool = False @@ -249,9 +253,9 @@ class Flow(msgspec.Struct): # , frozen=True): # TODO: remove this and only allow setting through # private ``._shm`` attr? - @shm.setter - def shm(self, shm: ShmArray) -> ShmArray: - self._shm = shm + # @shm.setter + # def shm(self, shm: ShmArray) -> ShmArray: + # self._shm = shm def maxmin( self, @@ -318,9 +322,15 @@ class Flow(msgspec.Struct): # , frozen=True): ''' vr = self.plot.viewRect() - return int(vr.left()), int(vr.right()) + return ( + vr.left(), + vr.right(), + ) - def datums_range(self) -> tuple[ + def datums_range( + self, + index_field: str = 'index', + ) -> tuple[ int, int, int, int, int, int ]: ''' @@ -328,6 +338,8 @@ class Flow(msgspec.Struct): # , frozen=True): ''' l, r = self.view_range() + l = round(l) + r = round(r) # TODO: avoid this and have shm passed # in earlier. @@ -348,15 +360,23 @@ class Flow(msgspec.Struct): # , frozen=True): def read( self, array_field: Optional[str] = None, + index_field: str = 'index', ) -> tuple[ int, int, np.ndarray, int, int, np.ndarray, ]: - # read call + ''' + Read the underlying shm array buffer and + return the data plus indexes for the first + and last + which has been written to. + + ''' + # readable data array = self.shm.array - indexes = array['index'] + indexes = array[index_field] ifirst = indexes[0] ilast = indexes[-1] diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 8c2e64a1..29162635 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -42,6 +42,8 @@ from ..data._sharedmem import ( _Token, try_read, ) +from ..data.feed import Flume +from ..data._source import Symbol from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -111,7 +113,7 @@ def update_fsp_chart( # read from last calculated value and update any label last_val_sticky = chart.plotItem.getAxis( - 'right')._stickies.get(chart.name) + 'right')._stickies.get(graphics_name) if last_val_sticky: last = last_row[array_key] last_val_sticky.update_from_data(-1, last) @@ -212,7 +214,7 @@ async def open_fsp_actor_cluster( async def run_fsp_ui( linkedsplits: LinkedSplits, - shm: ShmArray, + flume: Flume, started: trio.Event, target: Fsp, conf: dict[str, dict], @@ -249,9 +251,11 @@ async def run_fsp_ui( else: chart = linkedsplits.subplots[overlay_with] + shm = flume.rt_shm chart.draw_curve( - name=name, - shm=shm, + name, + shm, + flume, overlay=True, color='default_light', array_key=name, @@ -261,8 +265,9 @@ async def run_fsp_ui( else: # create a new sub-chart widget for this fsp chart = linkedsplits.add_plot( - name=name, - shm=shm, + name, + shm, + flume, array_key=name, sidepane=sidepane, @@ -352,6 +357,9 @@ async def run_fsp_ui( # last = time.time() +# TODO: maybe this should be our ``Flow`` type since it maps +# one flume to the next? The machinery for task/actor mgmt should +# be part of the instantiation API? class FspAdmin: ''' Client API for orchestrating FSP actors and displaying @@ -363,7 +371,7 @@ class FspAdmin: tn: trio.Nursery, cluster: dict[str, tractor.Portal], linked: LinkedSplits, - src_shm: ShmArray, + flume: Flume, ) -> None: self.tn = tn @@ -375,7 +383,11 @@ class FspAdmin: tuple[tractor.MsgStream, ShmArray] ] = {} self._flow_registry: dict[_Token, str] = {} - self.src_shm = src_shm + + # TODO: make this a `.src_flume` and add + # a `dst_flume`? + # (=> but then wouldn't this be the most basic `Flow`?) + self.flume = flume def rr_next_portal(self) -> tractor.Portal: name, portal = next(self._rr_next_actor) @@ -388,7 +400,7 @@ class FspAdmin: complete: trio.Event, started: trio.Event, fqsn: str, - dst_shm: ShmArray, + dst_fsp_flume: Flume, conf: dict, target: Fsp, loglevel: str, @@ -409,9 +421,10 @@ class FspAdmin: # data feed key fqsn=fqsn, + # TODO: pass `Flume.to_msg()`s here? # mems - src_shm_token=self.src_shm.token, - dst_shm_token=dst_shm.token, + src_shm_token=self.flume.rt_shm.token, + dst_shm_token=dst_fsp_flume.rt_shm.token, # target ns_path=ns_path, @@ -428,12 +441,14 @@ class FspAdmin: ctx.open_stream() as stream, ): + dst_fsp_flume.stream: tractor.MsgStream = stream + # register output data self._registry[ (fqsn, ns_path) ] = ( stream, - dst_shm, + dst_fsp_flume.rt_shm, complete ) @@ -468,9 +483,9 @@ class FspAdmin: worker_name: Optional[str] = None, loglevel: str = 'info', - ) -> (ShmArray, trio.Event): + ) -> (Flume, trio.Event): - fqsn = self.linked.symbol.front_fqsn() + fqsn = self.flume.symbol.fqsn # allocate an output shm array key, dst_shm, opened = maybe_mk_fsp_shm( @@ -478,8 +493,28 @@ class FspAdmin: target=target, readonly=True, ) + + portal = self.cluster.get(worker_name) or self.rr_next_portal() + provider_tag = portal.channel.uid + + symbol = Symbol( + key=key, + broker_info={ + provider_tag: {'asset_type': 'fsp'}, + }, + ) + dst_fsp_flume = Flume( + symbol=symbol, + _rt_shm_token=dst_shm.token, + first_quote={}, + + # set to 0 presuming for now that we can't load + # FSP history (though we should eventually). + izero_hist=0, + izero_rt=0, + ) self._flow_registry[( - self.src_shm._token, + self.flume.rt_shm._token, target.name )] = dst_shm._token @@ -488,7 +523,6 @@ class FspAdmin: # f'Already started FSP `{fqsn}:{func_name}`' # ) - portal = self.cluster.get(worker_name) or self.rr_next_portal() complete = trio.Event() started = trio.Event() self.tn.start_soon( @@ -497,13 +531,13 @@ class FspAdmin: complete, started, fqsn, - dst_shm, + dst_fsp_flume, conf, target, loglevel, ) - return dst_shm, started + return dst_fsp_flume, started async def open_fsp_chart( self, @@ -515,7 +549,7 @@ class FspAdmin: ) -> (trio.Event, ChartPlotWidget): - shm, started = await self.start_engine_task( + flume, started = await self.start_engine_task( target, conf, loglevel, @@ -527,7 +561,7 @@ class FspAdmin: run_fsp_ui, self.linked, - shm, + flume, started, target, @@ -541,7 +575,7 @@ class FspAdmin: @acm async def open_fsp_admin( linked: LinkedSplits, - src_shm: ShmArray, + flume: Flume, **kwargs, ) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: @@ -562,7 +596,7 @@ async def open_fsp_admin( tn, cluster_map, linked, - src_shm, + flume, ) try: yield admin @@ -576,7 +610,7 @@ async def open_fsp_admin( async def open_vlm_displays( linked: LinkedSplits, - ohlcv: ShmArray, + flume: Flume, dvlm: bool = True, task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED, @@ -598,6 +632,8 @@ async def open_vlm_displays( sig = inspect.signature(flow_rates.func) params = sig.parameters + ohlcv: ShmArray = flume.rt_shm + async with ( open_fsp_sidepane( linked, { @@ -617,7 +653,7 @@ async def open_vlm_displays( } }, ) as sidepane, - open_fsp_admin(linked, ohlcv) as admin, + open_fsp_admin(linked, flume) as admin, ): # TODO: support updates # period_field = sidepane.fields['period'] @@ -633,6 +669,7 @@ async def open_vlm_displays( chart = linked.add_plot( name='volume', shm=shm, + flume=flume, array_key='volume', sidepane=sidepane, @@ -645,6 +682,8 @@ async def open_vlm_displays( # the curve item internals are pretty convoluted. style='step', ) + # back-link the volume chart to trigger y-autoranging + # in the ohlc (parent) chart. ohlc_chart.view.enable_auto_yrange( src_vb=chart.view, ) @@ -710,7 +749,7 @@ async def open_vlm_displays( tasks_ready = [] # spawn and overlay $ vlm on the same subchart - dvlm_shm, started = await admin.start_engine_task( + dvlm_flume, started = await admin.start_engine_task( dolla_vlm, { # fsp engine conf @@ -807,9 +846,13 @@ async def open_vlm_displays( else: color = 'bracket' - curve, _ = chart.draw_curve( - name=name, - shm=shm, + assert isinstance(shm, ShmArray) + assert isinstance(flume, Flume) + + flow = chart.draw_curve( + name, + shm, + flume, array_key=name, overlay=pi, color=color, @@ -822,20 +865,20 @@ async def open_vlm_displays( # specially store ref to shm for lookup in display loop # since only a placeholder of `None` is entered in # ``.draw_curve()``. - flow = chart._flows[name] + # flow = chart._flows[name] assert flow.plot is pi chart_curves( fields, dvlm_pi, - dvlm_shm, + dvlm_flume.rt_shm, step_mode=True, ) # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is # up since this one depends on it. - fr_shm, started = await admin.start_engine_task( + fr_flume, started = await admin.start_engine_task( flow_rates, { # fsp engine conf 'func_name': 'flow_rates', @@ -848,7 +891,7 @@ async def open_vlm_displays( # chart_curves( # dvlm_rate_fields, # dvlm_pi, - # fr_shm, + # fr_flume.rt_shm, # ) # TODO: is there a way to "sync" the dual axes such that only @@ -896,7 +939,7 @@ async def open_vlm_displays( chart_curves( trade_rate_fields, tr_pi, - fr_shm, + fr_flume.rt_shm, # step_mode=True, # dashed line to represent "individual trades" being @@ -930,7 +973,7 @@ async def open_vlm_displays( async def start_fsp_displays( linked: LinkedSplits, - ohlcv: ShmArray, + flume: Flume, group_status_key: str, loglevel: str, @@ -973,7 +1016,10 @@ async def start_fsp_displays( async with ( # NOTE: this admin internally opens an actor cluster - open_fsp_admin(linked, ohlcv) as admin, + open_fsp_admin( + linked, + flume, + ) as admin, ): statuses = [] for target, conf in fsp_conf.items(): diff --git a/piker/ui/_ohlc.py b/piker/ui/_ohlc.py index a8519d90..2ce23d30 100644 --- a/piker/ui/_ohlc.py +++ b/piker/ui/_ohlc.py @@ -99,7 +99,7 @@ class BarItems(pg.GraphicsObject): linked: LinkedSplits, plotitem: 'pg.PlotItem', # noqa color: str = 'bracket', - last_bar_color: str = 'bracket', + last_bar_color: str = 'original', name: Optional[str] = None, diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 98584161..9baca8ee 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -41,7 +41,11 @@ from ._anchors import ( pp_tight_and_right, # wanna keep it straight in the long run gpath_pin, ) -from ..calc import humanize, pnl, puterize +from ..calc import ( + humanize, + pnl, + puterize, +) from ..clearing._allocate import Allocator from ..pp import Position from ..data._normalize import iterticks @@ -80,9 +84,9 @@ async def update_pnl_from_feed( ''' global _pnl_tasks - pp = order_mode.current_pp - live = pp.live_pp - key = live.symbol.front_fqsn() + pp: PositionTracker = order_mode.current_pp + live: Position = pp.live_pp + key: str = live.symbol.front_fqsn() log.info(f'Starting pnl display for {pp.alloc.account}') @@ -101,11 +105,22 @@ async def update_pnl_from_feed( async with flume.stream.subscribe() as bstream: # last_tick = time.time() async for quotes in bstream: - # now = time.time() # period = now - last_tick for sym, quote in quotes.items(): + # print(f'{key} PnL: sym:{sym}') + + # TODO: uggggh we probably want a better state + # management then this sincce we want to enable + # updating whatever the current symbol is in + # real-time right? + if sym != key: + continue + + # watch out for wrong quote msg-data if you muck + # with backend feed subs code.. + # assert sym == quote['fqsn'] for tick in iterticks(quote, types): # print(f'{1/period} Hz') @@ -119,13 +134,17 @@ async def update_pnl_from_feed( else: # compute and display pnl status - order_mode.pane.pnl_label.format( - pnl=copysign(1, size) * pnl( - # live.ppu, - order_mode.current_pp.live_pp.ppu, - tick['price'], - ), - ) + pnl_val = ( + copysign(1, size) + * + pnl( + # live.ppu, + order_mode.current_pp.live_pp.ppu, + tick['price'], + ) + ) + # print(f'formatting PNL {sym} => {pnl_val}') + order_mode.pane.pnl_label.format(pnl=pnl_val) # last_tick = time.time() finally: diff --git a/piker/ui/_style.py b/piker/ui/_style.py index 22731093..52ac753a 100644 --- a/piker/ui/_style.py +++ b/piker/ui/_style.py @@ -240,12 +240,12 @@ def hcolor(name: str) -> str: 'gunmetal': '#91A3B0', 'battleship': '#848482', + # default ohlc-bars/curve gray + 'bracket': '#666666', # like the logo + # bluish 'charcoal': '#36454F', - # default bars - 'bracket': '#666666', # like the logo - # work well for filled polygons which want a 'bracket' feel # going light to dark 'davies': '#555555', diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 7e4ae066..1dd49872 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -88,7 +88,7 @@ class Dialog(Struct): # TODO: use ``pydantic.UUID4`` field uuid: str order: Order - symbol: Symbol + symbol: str lines: list[LevelLine] last_status_close: Callable = lambda: None msgs: dict[str, dict] = {} @@ -379,7 +379,7 @@ class OrderMode: dialog = Dialog( uuid=order.oid, order=order, - symbol=order.symbol, + symbol=order.symbol, # XXX: always a str? lines=lines, last_status_close=self.multistatus.open_status( f'submitting {order.exec_mode}-{order.action}', @@ -930,7 +930,6 @@ async def process_trade_msg( ) -> tuple[Dialog, Status]: - get_index = mode.chart.get_index fmsg = pformat(msg) log.debug(f'Received order msg:\n{fmsg}') name = msg['name'] @@ -965,6 +964,10 @@ async def process_trade_msg( oid = msg.oid dialog: Dialog = mode.dialogs.get(oid) + if dialog: + fqsn = dialog.symbol + flume = mode.feed.flumes[fqsn] + match msg: case Status( resp='dark_open' | 'open', @@ -1034,10 +1037,11 @@ async def process_trade_msg( # should only be one "fill" for an alert # add a triangle and remove the level line req = Order(**req) + index = flume.get_index(time.time()) mode.on_fill( oid, price=req.price, - arrow_index=get_index(time.time()), + arrow_index=index, ) mode.lines.remove_line(uuid=oid) msg.req = req @@ -1065,26 +1069,27 @@ async def process_trade_msg( action = order.action details = msg.brokerd_msg + # TODO: put the actual exchange timestamp? + # NOTE: currently the ``kraken`` openOrders sub + # doesn't deliver their engine timestamp as part of + # it's schema, so this value is **not** from them + # (see our backend code). We should probably either + # include all provider-engine timestamps in the + # summary 'closed' status msg and/or figure out + # a way to indicate what is a `brokerd` stamp versus + # a true backend one? This will require finagling + # with how each backend tracks/summarizes time + # stamps for the downstream API. + index = flume.get_index( + details['broker_time'] + ) + # TODO: some kinda progress system mode.on_fill( oid, price=details['price'], + arrow_index=index, pointing='up' if action == 'buy' else 'down', - - # TODO: put the actual exchange timestamp - arrow_index=get_index( - # TODO: note currently the ``kraken`` openOrders sub - # doesn't deliver their engine timestamp as part of - # it's schema, so this value is **not** from them - # (see our backend code). We should probably either - # include all provider-engine timestamps in the - # summary 'closed' status msg and/or figure out - # a way to indicate what is a `brokerd` stamp versus - # a true backend one? This will require finagling - # with how each backend tracks/summarizes time - # stamps for the downstream API. - details['broker_time'] - ), ) # TODO: append these fill events to the position's clear