From 99bba1240d580b9baa4b93742bdfe4c1a75fc5b5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Nov 2022 15:05:05 -0500 Subject: [PATCH] Make graphics-update-loop multi-sym aware B) Initial support for real-time multi-symbol overlay charts using an aggregate feed delivered by `Feed.open_multi_stream()`. The setup steps for constructing the overlayed plot items is still very very rough and will likely provide incentive for better refactoring high level "charting APIs". For each fqsn passed into `display_symbol_data()` we now synchronously, - create a single call to `LinkedSplits.plot_ohlc_main() -> `ChartPlotWidget` where we cache the chart in scope and for all other "sibling" fqsns we, - make a call to `ChartPlotWidget.overlay_plotitem()` -> `PlotItem`, hide its axes, make another call with this plotitem input to `ChartPlotWidget.draw_curve()`, set a sym-specific view box auto-yrange maxmin callback, register the plotitem in a global `pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {}` Once all plots have been created we then asynchronously for each symbol, - maybe create a volume chart and register it in a similar task-global table: `vlms: dict[str, ChartPlotWidget] = {}` - start fsp displays for each symbol Then common entrypoints are entered once for all symbols: - a single `graphics_update_loop()` loop-task is started wherein real-time graphics update components for each symbol are created, * `L1Labels` * y-axis last clearing price stickies * `maxmin()` auto-ranger * `DisplayState` (stored in a table `dss: dict[str, DisplayState] = {}`) * an `increment_history_view()` task and a single call to `Feed.open_multi_stream()` is used to create a symbol-multiplexed quote stream which drives a single loop over all symbols wherein for each quote the appropriate components are looked up and passed to `graphics_update_cycle()`. - a single call to `open_order_mode()` is made with the first symbol provided as input, though eventually we want to support passing in the entire list. Further internal implementation details: - special tweaks to the `pg.LinearRegionItem` setup wherein the region is added with a zero opacity and *after* all plotitem overlays to avoid and issue where overlays weren't being shown within the region area in the history chart. - all symbol-specific graphics oriented update calls are adjusted to pass in the fqsn: * `update_fsp_chart()` * `ChartView._set_yrange()` * ChartPlotWidget.update_graphics_from_flow()` - avoid a double increment on sample step updates by not calling the increment on any vlm chart since it seems the vlm-ohlc chart linking already takes care of this now? - use global counters for the last epoch time step to avoid incrementing all views more then once per new time step given underlying shm array buffers may be on different array-index values from one another. --- piker/ui/_display.py | 1432 ++++++++++++++++++++++++------------------ 1 file changed, 832 insertions(+), 600 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index c7ed9299..2e43b475 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -36,6 +36,9 @@ from ..data.feed import ( Flume, ) from ..data.types import Struct +from ..data._sharedmem import ( + ShmArray, +) from ._axes import YAxisLabel from ._chart import ( ChartPlotWidget, @@ -50,14 +53,12 @@ from ._fsp import ( has_vlm, open_vlm_displays, ) -from ..data._sharedmem import ( - ShmArray, -) -from ..data._source import tf_in_1s from ._forms import ( FieldsForm, mk_order_pane_layout, ) +from . import _pg_overrides as pgo +# from ..data._source import tf_in_1s from .order_mode import ( open_order_mode, OrderMode, @@ -90,7 +91,8 @@ _tick_groups = { # https://github.com/lemire/pythonmaxmin def chart_maxmin( chart: ChartPlotWidget, - ohlcv_shm: ShmArray, + fqsn: str, + # ohlcv_shm: ShmArray, vlm_chart: Optional[ChartPlotWidget] = None, ) -> tuple[ @@ -106,7 +108,7 @@ def chart_maxmin( ''' last_bars_range = chart.bars_range() - out = chart.maxmin() + out = chart.maxmin(name=fqsn) if out is None: return (last_bars_range, 0, 0, 0) @@ -131,6 +133,10 @@ def chart_maxmin( ) +_i_last: int = 0 +_i_last_append: int = 0 + + class DisplayState(Struct): ''' Chart-local real-time graphics state container. @@ -140,6 +146,7 @@ class DisplayState(Struct): quotes: dict[str, Any] maxmin: Callable + flume: Flume ohlcv: ShmArray hist_ohlcv: ShmArray @@ -173,14 +180,18 @@ class DisplayState(Struct): update_state: bool = True, update_uppx: float = 16, + is_1m: bool = False, ) -> tuple: shm = shm or self.ohlcv chart = chart or self.chart - state = state or self.vars + # state = state or self.vars - if not update_state: + if ( + not update_state + and state + ): state = state.copy() # compute the first available graphic's x-units-per-pixel @@ -194,26 +205,56 @@ class DisplayState(Struct): # "curve" length is already automatic. # increment the view position by the sample offset. - i_step = shm.index - i_diff = i_step - state['i_last'] - state['i_last'] = i_step + # i_step = shm.index + i_step = shm.array[-1]['time'] + # i_diff = i_step - state['i_last'] + # state['i_last'] = i_step + global _i_last, _i_last_append + i_diff = i_step - _i_last + # update global state + if ( + # state is None + not is_1m + and i_diff > 0 + ): + _i_last = i_step - append_diff = i_step - state['i_last_append'] + # append_diff = i_step - state['i_last_append'] + append_diff = i_step - _i_last_append + + # real-time update necessary? + _, _, _, r = chart.bars_range() + liv = r >= shm.index # update the "last datum" (aka extending the flow graphic with # new data) only if the number of unit steps is >= the number of # such unit steps per pixel (aka uppx). Iow, if the zoom level # is such that a datum(s) update to graphics wouldn't span # to a new pixel, we don't update yet. - do_append = (append_diff >= uppx) - if do_append: - state['i_last_append'] = i_step + do_append = ( + append_diff >= uppx + and i_diff + ) + if ( + do_append + and not is_1m + ): + _i_last_append = i_step + # fqsn = self.flume.symbol.fqsn + # print( + # f'DOING APPEND => {fqsn}\n' + # f'i_step:{i_step}\n' + # f'i_diff:{i_diff}\n' + # f'last:{_i_last}\n' + # f'last_append:{_i_last_append}\n' + # f'append_diff:{append_diff}\n' + # f'r: {r}\n' + # f'liv: {liv}\n' + # f'uppx: {uppx}\n' + # ) do_rt_update = uppx < update_uppx - _, _, _, r = chart.bars_range() - liv = r >= i_step - # TODO: pack this into a struct return ( uppx, @@ -229,9 +270,10 @@ async def graphics_update_loop( nurse: trio.Nursery, godwidget: GodWidget, - flume: Flume, + feed: Feed, + pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {}, wap_in_history: bool = False, - vlm_chart: Optional[ChartPlotWidget] = None, + vlm_charts: dict[str, ChartPlotWidget] = {}, ) -> None: ''' @@ -255,182 +297,217 @@ async def graphics_update_loop( fast_chart = linked.chart hist_chart = godwidget.hist_linked.chart + assert hist_chart - ohlcv = flume.rt_shm - hist_ohlcv = flume.hist_shm + dss: dict[str, DisplayState] = {} + for fqsn, flume in feed.flumes.items(): + ohlcv = flume.rt_shm + hist_ohlcv = flume.hist_shm + symbol = flume.symbol + fqsn = symbol.fqsn - # update last price sticky - last_price_sticky = fast_chart._ysticks[fast_chart.name] - last_price_sticky.update_from_data( - *ohlcv.array[-1][['index', 'close']] - ) + # update last price sticky + fast_pi = fast_chart._flows[fqsn].plot + last_price_sticky = fast_pi.getAxis('right')._stickies[fqsn] + last_price_sticky.update_from_data( + *ohlcv.array[-1][['index', 'close']] + ) + last_price_sticky.show() - hist_last_price_sticky = hist_chart._ysticks[hist_chart.name] - hist_last_price_sticky.update_from_data( - *hist_ohlcv.array[-1][['index', 'close']] - ) + slow_pi = hist_chart._flows[fqsn].plot + hist_last_price_sticky = slow_pi.getAxis('right')._stickies[fqsn] + hist_last_price_sticky.update_from_data( + *hist_ohlcv.array[-1][['index', 'close']] + ) - maxmin = partial( - chart_maxmin, - fast_chart, - ohlcv, - vlm_chart, - ) - last_bars_range: tuple[float, float] - ( - last_bars_range, - last_mx, - last_mn, - last_mx_vlm, - ) = maxmin() + vlm_chart = vlm_charts[fqsn] + maxmin = partial( + chart_maxmin, + fast_chart, + fqsn, + vlm_chart, + ) + last_bars_range: tuple[float, float] + ( + last_bars_range, + last_mx, + last_mn, + last_mx_vlm, + ) = maxmin() - last, volume = ohlcv.array[-1][['close', 'volume']] + last, volume = ohlcv.array[-1][['close', 'volume']] - symbol = fast_chart.linked.symbol + symbol = flume.symbol - l1 = L1Labels( - fast_chart, - # determine precision/decimal lengths - digits=symbol.tick_size_digits, - size_digits=symbol.lot_size_digits, - ) - fast_chart._l1_labels = l1 + l1 = L1Labels( + fast_pi, + # determine precision/decimal lengths + digits=symbol.tick_size_digits, + size_digits=symbol.lot_size_digits, + ) + # TODO: this is just wrong now since we can have multiple L1-label + # sets, so instead we should have the l1 associated with the + # plotitem or y-axis likely? + # fast_chart._l1_labels = l1 - # TODO: - # - in theory we should be able to read buffer data faster - # then msgs arrive.. needs some tinkering and testing + # TODO: + # - in theory we should be able to read buffer data faster + # then msgs arrive.. needs some tinkering and testing - # - if trade volume jumps above / below prior L1 price - # levels this might be dark volume we need to - # present differently -> likely dark vlm + # - if trade volume jumps above / below prior L1 price + # levels this might be dark volume we need to + # present differently -> likely dark vlm - tick_size = fast_chart.linked.symbol.tick_size - tick_margin = 3 * tick_size + tick_size = symbol.tick_size + tick_margin = 3 * tick_size - fast_chart.show() - last_quote = time.time() - i_last = ohlcv.index + fast_chart.show() + last_quote = time.time() + # global _i_last + i_last = ohlcv.index - ds = linked.display_state = DisplayState(**{ - 'godwidget': godwidget, - 'quotes': {}, - 'maxmin': maxmin, - 'ohlcv': ohlcv, - 'hist_ohlcv': hist_ohlcv, - 'chart': fast_chart, - 'last_price_sticky': last_price_sticky, - 'hist_last_price_sticky': hist_last_price_sticky, - 'l1': l1, + dss[fqsn] = ds = linked.display_state = DisplayState(**{ + 'godwidget': godwidget, + 'quotes': {}, + 'maxmin': maxmin, + 'flume': flume, + 'ohlcv': ohlcv, + 'hist_ohlcv': hist_ohlcv, + 'chart': fast_chart, + 'last_price_sticky': last_price_sticky, + 'hist_last_price_sticky': hist_last_price_sticky, + 'l1': l1, - 'vars': { - 'tick_margin': tick_margin, - 'i_last': i_last, - 'i_last_append': i_last, - 'last_mx_vlm': last_mx_vlm, - 'last_mx': last_mx, - 'last_mn': last_mn, - } - }) + 'vars': { + 'tick_margin': tick_margin, + 'i_last': i_last, + 'i_last_append': i_last, + 'last_mx_vlm': last_mx_vlm, + 'last_mx': last_mx, + 'last_mn': last_mn, + } + }) - if vlm_chart: - vlm_sticky = vlm_chart._ysticks['volume'] - ds.vlm_chart = vlm_chart - ds.vlm_sticky = vlm_sticky + if vlm_chart: + vlm_pi = vlm_chart._flows['volume'].plot + vlm_sticky = vlm_pi.getAxis('right')._stickies['volume'] + ds.vlm_chart = vlm_chart + ds.vlm_sticky = vlm_sticky - fast_chart.default_view() + fast_chart.default_view() - # TODO: probably factor this into some kinda `DisplayState` - # API that can be reused at least in terms of pulling view - # params (eg ``.bars_range()``). - async def increment_history_view(): - i_last = hist_ohlcv.index - state = ds.vars.copy() | { - 'i_last_append': i_last, - 'i_last': i_last, - } - _, hist_step_size_s, _ = flume.get_ds_info() + # TODO: probably factor this into some kinda `DisplayState` + # API that can be reused at least in terms of pulling view + # params (eg ``.bars_range()``). + async def increment_history_view(): + i_last = hist_ohlcv.index + state = ds.vars.copy() | { + 'i_last_append': i_last, + 'i_last': i_last, + } + _, hist_step_size_s, _ = flume.get_ds_info() - async with flume.index_stream( - # int(hist_step_size_s) - # TODO: seems this is more reliable at keeping the slow - # chart incremented in view more correctly? - # - It might make sense to just inline this logic with the - # main display task? => it's a tradeoff of slower task - # wakeups/ctx switches verus logic checks (as normal) - # - we need increment logic that only does the view shift - # call when the uppx permits/needs it - int(1), - ) as istream: - async for msg in istream: + async with flume.index_stream( + # int(hist_step_size_s) + # TODO: seems this is more reliable at keeping the slow + # chart incremented in view more correctly? + # - It might make sense to just inline this logic with the + # main display task? => it's a tradeoff of slower task + # wakeups/ctx switches verus logic checks (as normal) + # - we need increment logic that only does the view shift + # call when the uppx permits/needs it + int(1), + ) as istream: + async for msg in istream: - # check if slow chart needs an x-domain shift and/or - # y-range resize. - ( - uppx, - liv, - do_append, - i_diff, - append_diff, - do_rt_update, - ) = ds.incr_info( - chart=hist_chart, - shm=ds.hist_ohlcv, - state=state, - # update_state=False, - ) - # print( - # f'liv: {liv}\n' - # f'do_append: {do_append}\n' - # f'append_diff: {append_diff}\n' - # ) + # check if slow chart needs an x-domain shift and/or + # y-range resize. + ( + uppx, + liv, + do_append, + i_diff, + append_diff, + do_rt_update, + ) = ds.incr_info( + chart=hist_chart, + shm=ds.hist_ohlcv, + state=state, + is_1m=True, + # update_state=False, + ) + # print( + # f'liv: {liv}\n' + # f'do_append: {do_append}\n' + # f'append_diff: {append_diff}\n' + # ) - if ( - do_append - and liv - ): - hist_chart.increment_view(steps=i_diff) - hist_chart.view._set_yrange(yrange=hist_chart.maxmin()) + if ( + do_append + and liv + ): + # hist_chart.increment_view(steps=i_diff) + flow = hist_chart._flows[fqsn] + flow.plot.vb._set_yrange( + # yrange=hist_chart.maxmin(name=fqsn) + ) + # hist_chart.view._set_yrange(yrange=hist_chart.maxmin()) - nurse.start_soon(increment_history_view) + nurse.start_soon(increment_history_view) # main real-time quotes update loop - stream: tractor.MsgStream = flume.stream - async for quotes in stream: + stream: tractor.MsgStream + async with feed.open_multi_stream() as stream: + assert stream + async for quotes in stream: + quote_period = time.time() - last_quote + quote_rate = round( + 1/quote_period, 1) if quote_period > 0 else float('inf') + if ( + quote_period <= 1/_quote_throttle_rate - ds.quotes = quotes - quote_period = time.time() - last_quote - quote_rate = round( - 1/quote_period, 1) if quote_period > 0 else float('inf') - if ( - quote_period <= 1/_quote_throttle_rate + # in the absolute worst case we shouldn't see more then + # twice the expected throttle rate right!? + # and quote_rate >= _quote_throttle_rate * 2 + and quote_rate >= display_rate + ): + log.warning(f'High quote rate {symbol.key}: {quote_rate}') - # in the absolute worst case we shouldn't see more then - # twice the expected throttle rate right!? - # and quote_rate >= _quote_throttle_rate * 2 - and quote_rate >= display_rate - ): - log.warning(f'High quote rate {symbol.key}: {quote_rate}') + last_quote = time.time() - last_quote = time.time() + for sym, quote in quotes.items(): + ds = dss[sym] + ds.quotes = quote - # chart isn't active/shown so skip render cycle and pause feed(s) - if fast_chart.linked.isHidden(): - # print('skipping update') - fast_chart.pause_all_feeds() - continue + rt_pi, hist_pi = pis[sym] - # ic = fast_chart.view._ic - # if ic: - # fast_chart.pause_all_feeds() - # await ic.wait() - # fast_chart.resume_all_feeds() + # chart isn't active/shown so skip render cycle and + # pause feed(s) + if ( + fast_chart.linked.isHidden() + or not rt_pi.isVisible() + ): + print(f'{fqsn} skipping update -> CHART HIDDEN') + continue + # if fast_chart.linked.isHidden(): + # fast_chart.pause_all_feeds() - # sync call to update all graphics/UX components. - graphics_update_cycle(ds) + # ic = fast_chart.view._ic + # if ic: + # fast_chart.pause_all_feeds() + # await ic.wait() + # fast_chart.resume_all_feeds() + + # sync call to update all graphics/UX components. + graphics_update_cycle( + ds, + quote, + ) def graphics_update_cycle( ds: DisplayState, + quote: dict, wap_in_history: bool = False, trigger_all: bool = False, # flag used by prepend history updates prepend_update_index: Optional[int] = None, @@ -442,6 +519,12 @@ def graphics_update_cycle( chart = ds.chart # TODO: just pass this as a direct ref to avoid so many attr accesses? hist_chart = ds.godwidget.hist_linked.chart + assert hist_chart + + flume = ds.flume + sym = flume.symbol + fqsn = sym.fqsn + main_flow = chart._flows[fqsn] profiler = Profiler( msg=f'Graphics loop cycle for: `{chart.name}`', @@ -458,357 +541,381 @@ def graphics_update_cycle( # rt "HFT" chart l1 = ds.l1 - ohlcv = ds.ohlcv + # ohlcv = ds.ohlcv + ohlcv = flume.rt_shm array = ohlcv.array vars = ds.vars tick_margin = vars['tick_margin'] - for sym, quote in ds.quotes.items(): + ( + uppx, + liv, + do_append, + i_diff, + append_diff, + do_rt_update, + ) = ds.incr_info() + + # don't real-time "shift" the curve to the + # left unless we get one of the following: + if ( ( - uppx, - liv, - do_append, - i_diff, - append_diff, - do_rt_update, - ) = ds.incr_info() + do_append + and liv + ) + or trigger_all + ): + # print(f'INCREMENTING {fqsn}') + chart.increment_view(steps=i_diff) + main_flow.plot.vb._set_yrange( + # yrange=(mn, mx), + ) - # TODO: we should only run mxmn when we know - # an update is due via ``do_append`` above. - ( - brange, - mx_in_view, - mn_in_view, - mx_vlm_in_view, - ) = ds.maxmin() - l, lbar, rbar, r = brange - mx = mx_in_view + tick_margin - mn = mn_in_view - tick_margin + # NOTE: since vlm and ohlc charts are axis linked now we don't + # need the double increment request? + # if vlm_chart: + # vlm_chart.increment_view(steps=i_diff) - profiler('`ds.maxmin()` call') + profiler('view incremented') - if ( - prepend_update_index is not None - and lbar > prepend_update_index - ): - # on a history update (usually from the FSP subsys) - # if the segment of history that is being prepended - # isn't in view there is no reason to do a graphics - # update. - log.debug('Skipping prepend graphics cycle: frame not in view') - return + ticks_frame = quote.get('ticks', ()) - # don't real-time "shift" the curve to the - # left unless we get one of the following: - if ( - (do_append and liv) - or trigger_all - ): - chart.increment_view(steps=i_diff) - chart.view._set_yrange(yrange=(mn, mx)) + frames_by_type: dict[str, dict] = {} + lasts = {} - if vlm_chart: - vlm_chart.increment_view(steps=i_diff) + # build tick-type "frames" of tick sequences since + # likely the tick arrival rate is higher then our + # (throttled) quote stream rate. - profiler('view incremented') + # iterate in FIFO order per tick-frame + # if sym != fqsn: + # continue - ticks_frame = quote.get('ticks', ()) + # TODO: we should only run mxmn when we know + # an update is due via ``do_append`` above. + ( + brange, + mx_in_view, + mn_in_view, + mx_vlm_in_view, + ) = ds.maxmin() + l, lbar, rbar, r = brange + mx = mx_in_view + tick_margin + mn = mn_in_view - tick_margin + profiler('`ds.maxmin()` call') - frames_by_type: dict[str, dict] = {} - lasts = {} + if ( + prepend_update_index is not None + and lbar > prepend_update_index + ): + # on a history update (usually from the FSP subsys) + # if the segment of history that is being prepended + # isn't in view there is no reason to do a graphics + # update. + log.debug('Skipping prepend graphics cycle: frame not in view') + return - # build tick-type "frames" of tick sequences since - # likely the tick arrival rate is higher then our - # (throttled) quote stream rate. - for tick in ticks_frame: - price = tick.get('price') - ticktype = tick.get('type') + for tick in ticks_frame: + price = tick.get('price') + ticktype = tick.get('type') - if ticktype == 'n/a' or price == -1: - # okkk.. - continue + # if ticktype == 'n/a' or price == -1: + # # okkk.. + # continue - # keys are entered in olded-event-inserted-first order - # since we iterate ``ticks_frame`` in standard order - # above. in other words the order of the keys is the order - # of tick events by type from the provider feed. - frames_by_type.setdefault(ticktype, []).append(tick) + # keys are entered in olded-event-inserted-first order + # since we iterate ``ticks_frame`` in standard order + # above. in other words the order of the keys is the order + # of tick events by type from the provider feed. + frames_by_type.setdefault(ticktype, []).append(tick) - # overwrites so the last tick per type is the entry - lasts[ticktype] = tick + # overwrites so the last tick per type is the entry + lasts[ticktype] = tick - # from pprint import pformat - # frame_counts = { - # typ: len(frame) for typ, frame in frames_by_type.items() - # } - # print(f'{pformat(frame_counts)}') - # print(f'framed: {pformat(frames_by_type)}') - # print(f'lasts: {pformat(lasts)}') + # from pprint import pformat + # frame_counts = { + # typ: len(frame) for typ, frame in frames_by_type.items() + # } + # print( + # f'{pformat(frame_counts)}\n' + # f'framed: {pformat(frames_by_type)}\n' + # f'lasts: {pformat(lasts)}\n' + # ) - # TODO: eventually we want to separate out the utrade (aka - # dark vlm prices) here and show them as an additional - # graphic. - clear_types = _tick_groups['clears'] + # TODO: eventually we want to separate out the utrade (aka + # dark vlm prices) here and show them as an additional + # graphic. + clear_types = _tick_groups['clears'] - # XXX: if we wanted to iterate in "latest" (i.e. most - # current) tick first order as an optimization where we only - # update from the last tick from each type class. - # last_clear_updated: bool = False + # XXX: if we wanted to iterate in "latest" (i.e. most + # current) tick first order as an optimization where we only + # update from the last tick from each type class. + # last_clear_updated: bool = False - # update ohlc sampled price bars - if ( - do_rt_update - or do_append - or trigger_all - ): - chart.update_graphics_from_flow( - chart.name, - do_append=do_append, - ) - hist_chart.update_graphics_from_flow( - chart.name, - do_append=do_append, - ) + # update ohlc sampled price bars + if ( + do_rt_update + or do_append + or trigger_all + ): + chart.update_graphics_from_flow( + fqsn, + # chart.name, + do_append=do_append, + ) + main_flow.draw_last(array_key=fqsn) - # NOTE: we always update the "last" datum - # since the current range should at least be updated - # to it's max/min on the last pixel. + hist_chart.update_graphics_from_flow( + fqsn, + # chart.name, + do_append=do_append, + ) - # iterate in FIFO order per tick-frame - for typ, tick in lasts.items(): + # NOTE: we always update the "last" datum + # since the current range should at least be updated + # to it's max/min on the last pixel. - price = tick.get('price') - size = tick.get('size') + for typ, tick in lasts.items(): - # compute max and min prices (including bid/ask) from - # tick frames to determine the y-range for chart - # auto-scaling. - # TODO: we need a streaming minmax algo here, see def above. - if liv: - mx = max(price + tick_margin, mx) - mn = min(price - tick_margin, mn) + price = tick.get('price') + size = tick.get('size') - if typ in clear_types: + # compute max and min prices (including bid/ask) from + # tick frames to determine the y-range for chart + # auto-scaling. + # TODO: we need a streaming minmax algo here, see def above. + if liv: + mx = max(price + tick_margin, mx) + mn = min(price - tick_margin, mn) - # XXX: if we only wanted to update graphics from the - # "current"/"latest received" clearing price tick - # once (see alt iteration order above). - # if last_clear_updated: - # continue + if typ in clear_types: - # last_clear_updated = True - # we only want to update grahpics from the *last* - # tick event that falls under the "clearing price" - # set. + # XXX: if we only wanted to update graphics from the + # "current"/"latest received" clearing price tick + # once (see alt iteration order above). + # if last_clear_updated: + # continue - # update price sticky(s) - end = array[-1] - ds.last_price_sticky.update_from_data( - *end[['index', 'close']] - ) - ds.hist_last_price_sticky.update_from_data( - *end[['index', 'close']] - ) + # last_clear_updated = True + # we only want to update grahpics from the *last* + # tick event that falls under the "clearing price" + # set. - if wap_in_history: - # update vwap overlay line - chart.update_graphics_from_flow( - 'bar_wap', - ) + # update price sticky(s) + end_ic = array[-1][['index', 'close']] + ds.last_price_sticky.update_from_data(*end_ic) + ds.hist_last_price_sticky.update_from_data(*end_ic) - # L1 book label-line updates - # XXX: is this correct for ib? - # if ticktype in ('trade', 'last'): - # if ticktype in ('last',): # 'size'): - if typ in ('last',): # 'size'): + if wap_in_history: + # update vwap overlay line + chart.update_graphics_from_flow('bar_wap') - label = { - l1.ask_label.fields['level']: l1.ask_label, - l1.bid_label.fields['level']: l1.bid_label, - }.get(price) + # L1 book label-line updates + # XXX: is this correct for ib? + # if ticktype in ('trade', 'last'): + # if ticktype in ('last',): # 'size'): + if typ in ('last',): # 'size'): - if ( - label is not None - and liv - ): - label.update_fields( - {'level': price, 'size': size} - ) + label = { + l1.ask_label.fields['level']: l1.ask_label, + l1.bid_label.fields['level']: l1.bid_label, + }.get(price) - # TODO: on trades should we be knocking down - # the relevant L1 queue? - # label.size -= size - - elif ( - typ in _tick_groups['asks'] - # TODO: instead we could check if the price is in the - # y-view-range? - and liv - ): - l1.ask_label.update_fields({'level': price, 'size': size}) - - elif ( - typ in _tick_groups['bids'] - # TODO: instead we could check if the price is in the - # y-view-range? - and liv - ): - l1.bid_label.update_fields({'level': price, 'size': size}) - - # check for y-range re-size - if (mx > vars['last_mx']) or (mn < vars['last_mn']): - - # fast chart resize case if ( - liv - and not chart._static_yrange == 'axis' + label is not None + and liv ): - main_vb = chart.view - if ( - main_vb._ic is None - or not main_vb._ic.is_set() - ): - # print(f'updating range due to mxmn') - main_vb._set_yrange( - # TODO: we should probably scale - # the view margin based on the size - # of the true range? This way you can - # slap in orders outside the current - # L1 (only) book range. - # range_margin=0.1, - yrange=(mn, mx), - ) - - # check if slow chart needs a resize - ( - _, - hist_liv, - _, - _, - _, - _, - ) = ds.incr_info( - chart=hist_chart, - shm=ds.hist_ohlcv, - update_state=False, - ) - if hist_liv: - hist_chart.view._set_yrange(yrange=hist_chart.maxmin()) - - # XXX: update this every draw cycle to make L1-always-in-view work. - vars['last_mx'], vars['last_mn'] = mx, mn - - # run synchronous update on all linked flows - # TODO: should the "main" (aka source) flow be special? - for curve_name, flow in chart._flows.items(): - # update any overlayed fsp flows - if curve_name != chart.data_key: - update_fsp_chart( - chart, - flow, - curve_name, - array_key=curve_name, + label.update_fields( + {'level': price, 'size': size} ) + # TODO: on trades should we be knocking down + # the relevant L1 queue? + # label.size -= size + + elif ( + typ in _tick_groups['asks'] + # TODO: instead we could check if the price is in the + # y-view-range? + and liv + ): + l1.ask_label.update_fields({'level': price, 'size': size}) + + elif ( + typ in _tick_groups['bids'] + # TODO: instead we could check if the price is in the + # y-view-range? + and liv + ): + l1.bid_label.update_fields({'level': price, 'size': size}) + + # check for y-range re-size + if (mx > vars['last_mx']) or (mn < vars['last_mn']): + + # fast chart resize case + if ( + liv + and not chart._static_yrange == 'axis' + ): + # main_vb = chart.view + main_vb = chart._flows[fqsn].plot.vb + if ( + main_vb._ic is None + or not main_vb._ic.is_set() + ): + # print(f'updating range due to mxmn') + main_vb._set_yrange( + # TODO: we should probably scale + # the view margin based on the size + # of the true range? This way you can + # slap in orders outside the current + # L1 (only) book range. + # range_margin=0.1, + # yrange=(mn, mx), + ) + + # check if slow chart needs a resize + ( + _, + hist_liv, + _, + _, + _, + _, + ) = ds.incr_info( + chart=hist_chart, + shm=ds.hist_ohlcv, + update_state=False, + is_1m=True, + ) + if hist_liv: + flow = hist_chart._flows[fqsn] + flow.plot.vb._set_yrange( + # yrange=hist_chart.maxmin(name=fqsn), + ) + + # XXX: update this every draw cycle to make L1-always-in-view work. + vars['last_mx'], vars['last_mn'] = mx, mn + + # run synchronous update on all linked flows + # TODO: should the "main" (aka source) flow be special? + for curve_name, flow in chart._flows.items(): + # update any overlayed fsp flows + if ( + # curve_name != chart.data_key + curve_name != fqsn + and not flow.is_ohlc + ): + update_fsp_chart( + chart, + flow, + curve_name, + array_key=curve_name, + ) + + # even if we're downsampled bigly + # draw the last datum in the final + # px column to give the user the mx/mn + # range of that set. + if ( + liv + # and not do_append + # and not do_rt_update + ): + flow.draw_last( + array_key=curve_name, + only_last_uppx=True, + ) + + # volume chart logic.. + # TODO: can we unify this with the above loop? + if vlm_chart: + # always update y-label + ds.vlm_sticky.update_from_data( + *array[-1][['index', 'volume']] + ) + + if ( + ( + do_rt_update + or do_append + and liv + ) + or trigger_all + ): + # TODO: make it so this doesn't have to be called + # once the $vlm is up? + vlm_chart.update_graphics_from_flow( + 'volume', + # UGGGh, see ``maxmin()`` impl in `._fsp` for + # the overlayed plotitems... we need a better + # bay to invoke a maxmin per overlay.. + render=False, + # XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^ + # without this, since we disable the + # 'volume' (units) chart after the $vlm starts + # up we need to be sure to enable this + # auto-ranging otherwise there will be no handler + # connected to update accompanying overlay + # graphics.. + ) + profiler('`vlm_chart.update_graphics_from_flow()`') + + if ( + mx_vlm_in_view != vars['last_mx_vlm'] + ): + yrange = (0, mx_vlm_in_view * 1.375) + vlm_chart.view._set_yrange( + yrange=yrange, + ) + profiler('`vlm_chart.view._set_yrange()`') + # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') + vars['last_mx_vlm'] = mx_vlm_in_view + + # update all downstream FSPs + for curve_name, flow in vlm_chart._flows.items(): + + if ( + curve_name not in {'volume', fqsn} + and flow.render + and ( + liv and do_rt_update + or do_append + ) + # and not flow.is_ohlc + # and curve_name != fqsn + ): + update_fsp_chart( + vlm_chart, + flow, + curve_name, + array_key=curve_name, + # do_append=uppx < update_uppx, + do_append=do_append, + ) + # is this even doing anything? + # (pretty sure it's the real-time + # resizing from last quote?) + fvb = flow.plot.vb + fvb._set_yrange( + name=curve_name, + ) + + elif ( + curve_name != 'volume' + and not do_append + and liv + and uppx >= 1 # even if we're downsampled bigly # draw the last datum in the final # px column to give the user the mx/mn # range of that set. - if ( - not do_append - # and not do_rt_update - and liv - ): - flow.draw_last( - array_key=curve_name, - only_last_uppx=True, - ) - - # volume chart logic.. - # TODO: can we unify this with the above loop? - if vlm_chart: - # always update y-label - ds.vlm_sticky.update_from_data( - *array[-1][['index', 'volume']] - ) - - if ( - ( - do_rt_update - or do_append - and liv - ) - or trigger_all - ): - # TODO: make it so this doesn't have to be called - # once the $vlm is up? - vlm_chart.update_graphics_from_flow( - 'volume', - # UGGGh, see ``maxmin()`` impl in `._fsp` for - # the overlayed plotitems... we need a better - # bay to invoke a maxmin per overlay.. - render=False, - # XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^ - # without this, since we disable the - # 'volume' (units) chart after the $vlm starts - # up we need to be sure to enable this - # auto-ranging otherwise there will be no handler - # connected to update accompanying overlay - # graphics.. - ) - profiler('`vlm_chart.update_graphics_from_flow()`') - - if ( - mx_vlm_in_view != vars['last_mx_vlm'] - ): - yrange = (0, mx_vlm_in_view * 1.375) - vlm_chart.view._set_yrange( - yrange=yrange, - ) - profiler('`vlm_chart.view._set_yrange()`') - # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') - vars['last_mx_vlm'] = mx_vlm_in_view - - for curve_name, flow in vlm_chart._flows.items(): - - if ( - curve_name != 'volume' and - flow.render and ( - liv and - do_rt_update or do_append - ) - ): - update_fsp_chart( - vlm_chart, - flow, - curve_name, - array_key=curve_name, - # do_append=uppx < update_uppx, - do_append=do_append, - ) - # is this even doing anything? - # (pretty sure it's the real-time - # resizing from last quote?) - fvb = flow.plot.vb - fvb._set_yrange( - name=curve_name, - ) - - elif ( - curve_name != 'volume' - and not do_append - and liv - and uppx >= 1 - # even if we're downsampled bigly - # draw the last datum in the final - # px column to give the user the mx/mn - # range of that set. - ): - # always update the last datum-element - # graphic for all flows - # print(f'drawing last {flow.name}') - flow.draw_last(array_key=curve_name) + ): + # always update the last datum-element + # graphic for all flows + # print(f'drawing last {flow.name}') + flow.draw_last(array_key=curve_name) async def link_views_with_region( @@ -833,11 +940,12 @@ async def link_views_with_region( pen=pg.mkPen(hcolor('gunmetal')), brush=pg.mkBrush(hcolor('default_darkest')), ) - region.setZValue(10) # put linear region "in front" in layer terms + region.setOpacity(0) hist_pi.addItem(region, ignoreBounds=True) + region.setOpacity(6/16) - flow = rt_chart._flows[hist_chart.name] + flow = rt_chart._flows[flume.symbol.fqsn] assert flow # XXX: no idea why this doesn't work but it's causing @@ -945,9 +1053,34 @@ async def link_views_with_region( # region.sigRegionChangeFinished.connect(update_pi_from_region) +# force 0 to always be in view +def multi_maxmin( + chart: ChartPlotWidget, + names: list[str], + +) -> tuple[float, float]: + ''' + Flows "group" maxmin loop; assumes all named flows + are in the same co-domain and thus can be sorted + as one set. + + Iterates all the named flows and calls the chart + api to find their range values and return. + + TODO: really we should probably have a more built-in API + for this? + + ''' + mx = 0 + for name in names: + ymn, ymx = chart.maxmin(name=name) + mx = max(mx, ymx) + + return 0, mx + + async def display_symbol_data( godwidget: GodWidget, - provider: str, fqsns: list[str], loglevel: str, order_mode_started: trio.Event, @@ -972,13 +1105,13 @@ async def display_symbol_data( # ) for fqsn in fqsns: - loading_sym_key = sbar.open_status( f'loading {fqsn} ->', group_key=True ) feed: Feed + # assert len(fqsns) == 2 async with open_feed( fqsns, loglevel=loglevel, @@ -989,76 +1122,171 @@ async def display_symbol_data( ) as feed: - # TODO: right now we only show one symbol on charts, but - # overlays are coming muy pronto guey.. - assert len(feed.flumes) == 1 - flume = list(feed.flumes.values())[0] + # use expanded contract symbols passed back from feed layer. + fqsns = list(feed.flumes.keys()) - ohlcv: ShmArray = flume.rt_shm - hist_ohlcv: ShmArray = flume.hist_shm - - symbol = flume.symbol - fqsn = symbol.fqsn - - step_size_s = 1 - tf_key = tf_in_1s[step_size_s] - - # load in symbol's ohlc data + # step_size_s = 1 + # tf_key = tf_in_1s[step_size_s] godwidget.window.setWindowTitle( - f'{fqsn} ' - f'tick:{symbol.tick_size} ' - f'step:{tf_key} ' + f'{fqsns} ' + # f'tick:{symbol.tick_size} ' + # f'step:{tf_key} ' ) - - rt_linked = godwidget.rt_linked - rt_linked._symbol = symbol - - # create top history view chart above the "main rt chart". - hist_linked = godwidget.hist_linked - hist_linked._symbol = symbol - hist_chart = hist_linked.plot_ohlc_main( - symbol, - hist_ohlcv, - # in the case of history chart we explicitly set `False` - # to avoid internal pane creation. - # sidepane=False, - sidepane=godwidget.search, - ) - # don't show when not focussed - hist_linked.cursor.always_show_xlabel = False - # generate order mode side-pane UI # A ``FieldsForm`` form to configure order entry # and add as next-to-y-axis singleton pane pp_pane: FieldsForm = mk_order_pane_layout(godwidget) godwidget.pp_pane = pp_pane - # create main OHLC chart - ohlc_chart = rt_linked.plot_ohlc_main( - symbol, - ohlcv, - # in the case of history chart we explicitly set `False` - # to avoid internal pane creation. - sidepane=pp_pane, - ) + # create top history view chart above the "main rt chart". + rt_linked = godwidget.rt_linked + hist_linked = godwidget.hist_linked - ohlc_chart._feeds[symbol.key] = feed - ohlc_chart.setFocus() + # NOTE: here we insert the slow-history chart set into + # the fast chart's splitter -> so it's a splitter of charts + # inside the first widget slot of a splitter of charts XD + rt_linked.splitter.insertWidget(0, hist_linked) - # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! - # plot historical vwap if available - wap_in_history = False - # if ( - # brokermod._show_wap_in_history - # and 'bar_wap' in bars.dtype.fields - # ): - # wap_in_history = True - # ohlc_chart.draw_curve( - # name='bar_wap', - # shm=ohlcv, - # color='default_light', - # add_label=False, - # ) + rt_chart: None | ChartPlotWidget = None + hist_chart: None | ChartPlotWidget = None + + bg_chart_color = 'grayest' + bg_last_bar_color = 'grayer' + + pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {} + + # load in ohlc data to a common linked but split chart set. + for fqsn, flume in feed.flumes.items(): + rt_linked._symbol = flume.symbol + hist_linked._symbol = flume.symbol + + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + symbol = flume.symbol + fqsn = symbol.fqsn + + if hist_chart is None: + hist_chart = hist_linked.plot_ohlc_main( + symbol, + hist_ohlcv, + # in the case of history chart we explicitly set `False` + # to avoid internal pane creation. + # sidepane=False, + sidepane=godwidget.search, + ) + pis.setdefault(fqsn, [None, None])[1] = hist_chart.plotItem + else: + hist_pi = hist_chart.overlay_plotitem( + name=fqsn, + axis_title=fqsn, + ) + hist_pi.hideAxis('left') + hist_pi.hideAxis('bottom') + + curve, _ = hist_chart.draw_curve( + name=fqsn, + shm=hist_ohlcv, + array_key=fqsn, + overlay=hist_pi, + pi=hist_pi, + is_ohlc=True, + + color=bg_chart_color, + last_bar_color=bg_last_bar_color, + ) + + hist_pi.vb.maxmin = partial( + hist_chart.maxmin, + name=fqsn, + ) + # TODO: we need a better API to do this.. + # specially store ref to shm for lookup in display loop + # since only a placeholder of `None` is entered in + # ``.draw_curve()``. + flow = hist_chart._flows[fqsn] + assert flow.plot is hist_pi + + # group_mxmn = partial( + # multi_maxmin, + # names=fqsns, + # chart=hist_chart, + # ) + # hist_pi.vb._maxmin = group_mxmn + pis.setdefault(fqsn, [None, None])[1] = hist_pi + + # don't show when not focussed + hist_linked.cursor.always_show_xlabel = False + + # create main OHLC chart + if rt_chart is None: + rt_chart = rt_linked.plot_ohlc_main( + symbol, + ohlcv, + # in the case of history chart we explicitly set `False` + # to avoid internal pane creation. + sidepane=pp_pane, + ) + pis.setdefault(fqsn, [None, None])[0] = rt_chart.plotItem + else: + rt_pi = rt_chart.overlay_plotitem( + name=fqsn, + axis_title=fqsn, + ) + + rt_pi.hideAxis('left') + rt_pi.hideAxis('bottom') + + curve, _ = rt_chart.draw_curve( + name=fqsn, + shm=ohlcv, + array_key=fqsn, + overlay=rt_pi, + pi=rt_pi, + is_ohlc=True, + + color=bg_chart_color, + last_bar_color=bg_last_bar_color, + ) + rt_pi.vb.maxmin = partial( + rt_chart.maxmin, + name=fqsn, + ) + # multi_maxmin, + # names=fqsn + + # TODO: we need a better API to do this.. + # specially store ref to shm for lookup in display loop + # since only a placeholder of `None` is entered in + # ``.draw_curve()``. + flow = rt_chart._flows[fqsn] + assert flow.plot is rt_pi + + # group_mxmn = partial( + # multi_maxmin, + # names=fqsns, + # chart=rt_chart, + # ) + # rt_pi.vb._maxmin = group_mxmn + pis.setdefault(fqsn, [None, None])[0] = rt_pi + + rt_chart._feeds[symbol.key] = feed + rt_chart.setFocus() + + # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! + # plot historical vwap if available + wap_in_history = False + # if ( + # brokermod._show_wap_in_history + # and 'bar_wap' in bars.dtype.fields + # ): + # wap_in_history = True + # rt_chart.draw_curve( + # name='bar_wap', + # shm=ohlcv, + # color='default_light', + # add_label=False, + # ) # NOTE: we must immediately tell Qt to show the OHLC chart # to avoid a race where the subplots get added/shown to @@ -1067,86 +1295,60 @@ async def display_symbol_data( rt_linked.focus() await trio.sleep(0) - # NOTE: here we insert the slow-history chart set into - # the fast chart's splitter -> so it's a splitter of charts - # inside the first widget slot of a splitter of charts XD - rt_linked.splitter.insertWidget(0, hist_linked) # XXX: if we wanted it at the bottom? # rt_linked.splitter.addWidget(hist_linked) rt_linked.focus() godwidget.resize_all() - vlm_chart: Optional[ChartPlotWidget] = None + vlms: dict[str, ChartPlotWidget] = {} async with trio.open_nursery() as ln: + for fqsn, flume in feed.flumes.items(): - # if available load volume related built-in display(s) - if ( - not symbol.broker_info[provider].get('no_vlm', False) - and has_vlm(ohlcv) - ): - vlm_chart = await ln.start( - open_vlm_displays, + vlm_chart: Optional[ChartPlotWidget] = None + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + symbol = flume.symbol + brokername = symbol.brokers[0] + # if available load volume related built-in display(s) + if ( + not symbol.broker_info[brokername].get('no_vlm', False) + and has_vlm(ohlcv) + ): + vlms[fqsn] = vlm_chart = await ln.start( + open_vlm_displays, + rt_linked, + flume, + ) + + # load (user's) FSP set (otherwise known as "indicators") + # from an input config. + ln.start_soon( + start_fsp_displays, rt_linked, - ohlcv, + flume, + loading_sym_key, + loglevel, ) - # load (user's) FSP set (otherwise known as "indicators") - # from an input config. - ln.start_soon( - start_fsp_displays, - rt_linked, - ohlcv, - loading_sym_key, - loglevel, - ) + # size view to data prior to order mode init + rt_chart.default_view() + rt_linked.graphics_cycle() + await trio.sleep(0) - # start graphics update loop after receiving first live quote - ln.start_soon( - graphics_update_loop, - ln, - godwidget, - flume, - wap_in_history, - vlm_chart, - ) + hist_chart.default_view( + bars_from_y=int(len(hist_ohlcv.array)), # size to data + y_offset=6116*2, # push it a little away from the y-axis + ) + hist_linked.graphics_cycle() + await trio.sleep(0) - await trio.sleep(0) + godwidget.resize_all() - # size view to data prior to order mode init - ohlc_chart.default_view() - rt_linked.graphics_cycle() - await trio.sleep(0) - - hist_chart.default_view( - bars_from_y=int(len(hist_ohlcv.array)), # size to data - y_offset=6116*2, # push it a little away from the y-axis - ) - hist_linked.graphics_cycle() - await trio.sleep(0) - - godwidget.resize_all() - - await link_views_with_region( - ohlc_chart, - hist_chart, - flume, - ) - - mode: OrderMode - async with ( - open_order_mode( - feed, - godwidget, - fqsn, - order_mode_started - ) as mode - ): if not vlm_chart: # trigger another view reset if no sub-chart - ohlc_chart.default_view() - - rt_linked.mode = mode + rt_chart.default_view() # let Qt run to render all widgets and make sure the # sidepanes line up vertically. @@ -1165,7 +1367,7 @@ async def display_symbol_data( # TODO: make this not so shit XD # close group status - sbar._status_groups[loading_sym_key][1]() + # sbar._status_groups[loading_sym_key][1]() hist_linked.graphics_cycle() await trio.sleep(0) @@ -1178,5 +1380,35 @@ async def display_symbol_data( ) godwidget.resize_all() + await link_views_with_region( + rt_chart, + hist_chart, + flume, + ) + + # start graphics update loop after receiving first live quote + ln.start_soon( + graphics_update_loop, + ln, + godwidget, + feed, + pis, + wap_in_history, + vlms, + ) + + await trio.sleep(0) + + mode: OrderMode + async with ( + open_order_mode( + feed, + godwidget, + fqsns[-1], + order_mode_started + ) as mode + ): + rt_linked.mode = mode + # let the app run.. bby await trio.sleep_forever()