From be3dc69290c8363155994656040cb58b5abc116c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Nov 2022 17:06:48 -0500 Subject: [PATCH 01/13] Only update pnl label on quotes with an fqsn match --- piker/ui/_position.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 98584161..6eb1d962 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -80,9 +80,9 @@ async def update_pnl_from_feed( ''' global _pnl_tasks - pp = order_mode.current_pp - live = pp.live_pp - key = live.symbol.front_fqsn() + pp: PositionTracker = order_mode.current_pp + live: Position = pp.live_pp + key: str = live.symbol.front_fqsn() log.info(f'Starting pnl display for {pp.alloc.account}') @@ -101,12 +101,18 @@ async def update_pnl_from_feed( async with flume.stream.subscribe() as bstream: # last_tick = time.time() async for quotes in bstream: - # now = time.time() # period = now - last_tick for sym, quote in quotes.items(): + # TODO: uggggh we probably want a better state + # management then this sincce we want to enable + # updating whatever the current symbol is in + # real-time right? + if sym != key: + continue + for tick in iterticks(quote, types): # print(f'{1/period} Hz') @@ -119,6 +125,7 @@ async def update_pnl_from_feed( else: # compute and display pnl status + # print(f'formatting PNL {sym}: {quote}') order_mode.pane.pnl_label.format( pnl=copysign(1, size) * pnl( # live.ppu, From 7aec238f5f27ce1d5559d633aead3596a24ea8a7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Nov 2022 15:05:05 -0500 Subject: [PATCH 02/13] Make graphics-update-loop multi-sym aware B) Initial support for real-time multi-symbol overlay charts using an aggregate feed delivered by `Feed.open_multi_stream()`. The setup steps for constructing the overlayed plot items is still very very rough and will likely provide incentive for better refactoring high level "charting APIs". For each fqsn passed into `display_symbol_data()` we now synchronously, - create a single call to `LinkedSplits.plot_ohlc_main() -> `ChartPlotWidget` where we cache the chart in scope and for all other "sibling" fqsns we, - make a call to `ChartPlotWidget.overlay_plotitem()` -> `PlotItem`, hide its axes, make another call with this plotitem input to `ChartPlotWidget.draw_curve()`, set a sym-specific view box auto-yrange maxmin callback, register the plotitem in a global `pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {}` Once all plots have been created we then asynchronously for each symbol, - maybe create a volume chart and register it in a similar task-global table: `vlms: dict[str, ChartPlotWidget] = {}` - start fsp displays for each symbol Then common entrypoints are entered once for all symbols: - a single `graphics_update_loop()` loop-task is started wherein real-time graphics update components for each symbol are created, * `L1Labels` * y-axis last clearing price stickies * `maxmin()` auto-ranger * `DisplayState` (stored in a table `dss: dict[str, DisplayState] = {}`) * an `increment_history_view()` task and a single call to `Feed.open_multi_stream()` is used to create a symbol-multiplexed quote stream which drives a single loop over all symbols wherein for each quote the appropriate components are looked up and passed to `graphics_update_cycle()`. - a single call to `open_order_mode()` is made with the first symbol provided as input, though eventually we want to support passing in the entire list. Further internal implementation details: - special tweaks to the `pg.LinearRegionItem` setup wherein the region is added with a zero opacity and *after* all plotitem overlays to avoid and issue where overlays weren't being shown within the region area in the history chart. - all symbol-specific graphics oriented update calls are adjusted to pass in the fqsn: * `update_fsp_chart()` * `ChartView._set_yrange()` * ChartPlotWidget.update_graphics_from_flow()` - avoid a double increment on sample step updates by not calling the increment on any vlm chart since it seems the vlm-ohlc chart linking already takes care of this now? - use global counters for the last epoch time step to avoid incrementing all views more then once per new time step given underlying shm array buffers may be on different array-index values from one another. --- piker/ui/_display.py | 1435 ++++++++++++++++++++++++------------------ 1 file changed, 832 insertions(+), 603 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index babbfa7a..2e43b475 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -36,6 +36,9 @@ from ..data.feed import ( Flume, ) from ..data.types import Struct +from ..data._sharedmem import ( + ShmArray, +) from ._axes import YAxisLabel from ._chart import ( ChartPlotWidget, @@ -50,14 +53,12 @@ from ._fsp import ( has_vlm, open_vlm_displays, ) -from ..data._sharedmem import ( - ShmArray, -) -from ..data._source import tf_in_1s from ._forms import ( FieldsForm, mk_order_pane_layout, ) +from . import _pg_overrides as pgo +# from ..data._source import tf_in_1s from .order_mode import ( open_order_mode, OrderMode, @@ -90,7 +91,8 @@ _tick_groups = { # https://github.com/lemire/pythonmaxmin def chart_maxmin( chart: ChartPlotWidget, - ohlcv_shm: ShmArray, + fqsn: str, + # ohlcv_shm: ShmArray, vlm_chart: Optional[ChartPlotWidget] = None, ) -> tuple[ @@ -106,7 +108,7 @@ def chart_maxmin( ''' last_bars_range = chart.bars_range() - out = chart.maxmin() + out = chart.maxmin(name=fqsn) if out is None: return (last_bars_range, 0, 0, 0) @@ -131,6 +133,10 @@ def chart_maxmin( ) +_i_last: int = 0 +_i_last_append: int = 0 + + class DisplayState(Struct): ''' Chart-local real-time graphics state container. @@ -140,6 +146,7 @@ class DisplayState(Struct): quotes: dict[str, Any] maxmin: Callable + flume: Flume ohlcv: ShmArray hist_ohlcv: ShmArray @@ -173,14 +180,18 @@ class DisplayState(Struct): update_state: bool = True, update_uppx: float = 16, + is_1m: bool = False, ) -> tuple: shm = shm or self.ohlcv chart = chart or self.chart - state = state or self.vars + # state = state or self.vars - if not update_state: + if ( + not update_state + and state + ): state = state.copy() # compute the first available graphic's x-units-per-pixel @@ -194,26 +205,56 @@ class DisplayState(Struct): # "curve" length is already automatic. # increment the view position by the sample offset. - i_step = shm.index - i_diff = i_step - state['i_last'] - state['i_last'] = i_step + # i_step = shm.index + i_step = shm.array[-1]['time'] + # i_diff = i_step - state['i_last'] + # state['i_last'] = i_step + global _i_last, _i_last_append + i_diff = i_step - _i_last + # update global state + if ( + # state is None + not is_1m + and i_diff > 0 + ): + _i_last = i_step - append_diff = i_step - state['i_last_append'] + # append_diff = i_step - state['i_last_append'] + append_diff = i_step - _i_last_append + + # real-time update necessary? + _, _, _, r = chart.bars_range() + liv = r >= shm.index # update the "last datum" (aka extending the flow graphic with # new data) only if the number of unit steps is >= the number of # such unit steps per pixel (aka uppx). Iow, if the zoom level # is such that a datum(s) update to graphics wouldn't span # to a new pixel, we don't update yet. - do_append = (append_diff >= uppx) - if do_append: - state['i_last_append'] = i_step + do_append = ( + append_diff >= uppx + and i_diff + ) + if ( + do_append + and not is_1m + ): + _i_last_append = i_step + # fqsn = self.flume.symbol.fqsn + # print( + # f'DOING APPEND => {fqsn}\n' + # f'i_step:{i_step}\n' + # f'i_diff:{i_diff}\n' + # f'last:{_i_last}\n' + # f'last_append:{_i_last_append}\n' + # f'append_diff:{append_diff}\n' + # f'r: {r}\n' + # f'liv: {liv}\n' + # f'uppx: {uppx}\n' + # ) do_rt_update = uppx < update_uppx - _, _, _, r = chart.bars_range() - liv = r >= i_step - # TODO: pack this into a struct return ( uppx, @@ -229,9 +270,10 @@ async def graphics_update_loop( nurse: trio.Nursery, godwidget: GodWidget, - flume: Flume, + feed: Feed, + pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {}, wap_in_history: bool = False, - vlm_chart: Optional[ChartPlotWidget] = None, + vlm_charts: dict[str, ChartPlotWidget] = {}, ) -> None: ''' @@ -255,185 +297,217 @@ async def graphics_update_loop( fast_chart = linked.chart hist_chart = godwidget.hist_linked.chart + assert hist_chart - ohlcv = flume.rt_shm - hist_ohlcv = flume.hist_shm + dss: dict[str, DisplayState] = {} + for fqsn, flume in feed.flumes.items(): + ohlcv = flume.rt_shm + hist_ohlcv = flume.hist_shm + symbol = flume.symbol + fqsn = symbol.fqsn - # update last price sticky - last_price_sticky = fast_chart.plotItem.getAxis( - 'right')._stickies.get(fast_chart.name) - last_price_sticky.update_from_data( - *ohlcv.array[-1][['index', 'close']] - ) + # update last price sticky + fast_pi = fast_chart._flows[fqsn].plot + last_price_sticky = fast_pi.getAxis('right')._stickies[fqsn] + last_price_sticky.update_from_data( + *ohlcv.array[-1][['index', 'close']] + ) + last_price_sticky.show() - hist_last_price_sticky = hist_chart.plotItem.getAxis( - 'right')._stickies.get(hist_chart.name) - hist_last_price_sticky.update_from_data( - *hist_ohlcv.array[-1][['index', 'close']] - ) + slow_pi = hist_chart._flows[fqsn].plot + hist_last_price_sticky = slow_pi.getAxis('right')._stickies[fqsn] + hist_last_price_sticky.update_from_data( + *hist_ohlcv.array[-1][['index', 'close']] + ) - maxmin = partial( - chart_maxmin, - fast_chart, - ohlcv, - vlm_chart, - ) - last_bars_range: tuple[float, float] - ( - last_bars_range, - last_mx, - last_mn, - last_mx_vlm, - ) = maxmin() + vlm_chart = vlm_charts[fqsn] + maxmin = partial( + chart_maxmin, + fast_chart, + fqsn, + vlm_chart, + ) + last_bars_range: tuple[float, float] + ( + last_bars_range, + last_mx, + last_mn, + last_mx_vlm, + ) = maxmin() - last, volume = ohlcv.array[-1][['close', 'volume']] + last, volume = ohlcv.array[-1][['close', 'volume']] - symbol = fast_chart.linked.symbol + symbol = flume.symbol - l1 = L1Labels( - fast_chart.plotItem, - # determine precision/decimal lengths - digits=symbol.tick_size_digits, - size_digits=symbol.lot_size_digits, - ) - fast_chart._l1_labels = l1 + l1 = L1Labels( + fast_pi, + # determine precision/decimal lengths + digits=symbol.tick_size_digits, + size_digits=symbol.lot_size_digits, + ) + # TODO: this is just wrong now since we can have multiple L1-label + # sets, so instead we should have the l1 associated with the + # plotitem or y-axis likely? + # fast_chart._l1_labels = l1 - # TODO: - # - in theory we should be able to read buffer data faster - # then msgs arrive.. needs some tinkering and testing + # TODO: + # - in theory we should be able to read buffer data faster + # then msgs arrive.. needs some tinkering and testing - # - if trade volume jumps above / below prior L1 price - # levels this might be dark volume we need to - # present differently -> likely dark vlm + # - if trade volume jumps above / below prior L1 price + # levels this might be dark volume we need to + # present differently -> likely dark vlm - tick_size = fast_chart.linked.symbol.tick_size - tick_margin = 3 * tick_size + tick_size = symbol.tick_size + tick_margin = 3 * tick_size - fast_chart.show() - last_quote = time.time() - i_last = ohlcv.index + fast_chart.show() + last_quote = time.time() + # global _i_last + i_last = ohlcv.index - ds = linked.display_state = DisplayState(**{ - 'godwidget': godwidget, - 'quotes': {}, - 'maxmin': maxmin, - 'ohlcv': ohlcv, - 'hist_ohlcv': hist_ohlcv, - 'chart': fast_chart, - 'last_price_sticky': last_price_sticky, - 'hist_last_price_sticky': hist_last_price_sticky, - 'l1': l1, + dss[fqsn] = ds = linked.display_state = DisplayState(**{ + 'godwidget': godwidget, + 'quotes': {}, + 'maxmin': maxmin, + 'flume': flume, + 'ohlcv': ohlcv, + 'hist_ohlcv': hist_ohlcv, + 'chart': fast_chart, + 'last_price_sticky': last_price_sticky, + 'hist_last_price_sticky': hist_last_price_sticky, + 'l1': l1, - 'vars': { - 'tick_margin': tick_margin, - 'i_last': i_last, - 'i_last_append': i_last, - 'last_mx_vlm': last_mx_vlm, - 'last_mx': last_mx, - 'last_mn': last_mn, - } - }) + 'vars': { + 'tick_margin': tick_margin, + 'i_last': i_last, + 'i_last_append': i_last, + 'last_mx_vlm': last_mx_vlm, + 'last_mx': last_mx, + 'last_mn': last_mn, + } + }) - if vlm_chart: - vlm_sticky = vlm_chart.plotItem.getAxis( - 'right')._stickies.get('volume') - ds.vlm_chart = vlm_chart - ds.vlm_sticky = vlm_sticky + if vlm_chart: + vlm_pi = vlm_chart._flows['volume'].plot + vlm_sticky = vlm_pi.getAxis('right')._stickies['volume'] + ds.vlm_chart = vlm_chart + ds.vlm_sticky = vlm_sticky - fast_chart.default_view() + fast_chart.default_view() - # TODO: probably factor this into some kinda `DisplayState` - # API that can be reused at least in terms of pulling view - # params (eg ``.bars_range()``). - async def increment_history_view(): - i_last = hist_ohlcv.index - state = ds.vars.copy() | { - 'i_last_append': i_last, - 'i_last': i_last, - } - _, hist_step_size_s, _ = flume.get_ds_info() + # TODO: probably factor this into some kinda `DisplayState` + # API that can be reused at least in terms of pulling view + # params (eg ``.bars_range()``). + async def increment_history_view(): + i_last = hist_ohlcv.index + state = ds.vars.copy() | { + 'i_last_append': i_last, + 'i_last': i_last, + } + _, hist_step_size_s, _ = flume.get_ds_info() - async with flume.index_stream( - # int(hist_step_size_s) - # TODO: seems this is more reliable at keeping the slow - # chart incremented in view more correctly? - # - It might make sense to just inline this logic with the - # main display task? => it's a tradeoff of slower task - # wakeups/ctx switches verus logic checks (as normal) - # - we need increment logic that only does the view shift - # call when the uppx permits/needs it - int(1), - ) as istream: - async for msg in istream: + async with flume.index_stream( + # int(hist_step_size_s) + # TODO: seems this is more reliable at keeping the slow + # chart incremented in view more correctly? + # - It might make sense to just inline this logic with the + # main display task? => it's a tradeoff of slower task + # wakeups/ctx switches verus logic checks (as normal) + # - we need increment logic that only does the view shift + # call when the uppx permits/needs it + int(1), + ) as istream: + async for msg in istream: - # check if slow chart needs an x-domain shift and/or - # y-range resize. - ( - uppx, - liv, - do_append, - i_diff, - append_diff, - do_rt_update, - ) = ds.incr_info( - chart=hist_chart, - shm=ds.hist_ohlcv, - state=state, - # update_state=False, - ) - # print( - # f'liv: {liv}\n' - # f'do_append: {do_append}\n' - # f'append_diff: {append_diff}\n' - # ) + # check if slow chart needs an x-domain shift and/or + # y-range resize. + ( + uppx, + liv, + do_append, + i_diff, + append_diff, + do_rt_update, + ) = ds.incr_info( + chart=hist_chart, + shm=ds.hist_ohlcv, + state=state, + is_1m=True, + # update_state=False, + ) + # print( + # f'liv: {liv}\n' + # f'do_append: {do_append}\n' + # f'append_diff: {append_diff}\n' + # ) - if ( - do_append - and liv - ): - hist_chart.increment_view(steps=i_diff) - hist_chart.view._set_yrange(yrange=hist_chart.maxmin()) + if ( + do_append + and liv + ): + # hist_chart.increment_view(steps=i_diff) + flow = hist_chart._flows[fqsn] + flow.plot.vb._set_yrange( + # yrange=hist_chart.maxmin(name=fqsn) + ) + # hist_chart.view._set_yrange(yrange=hist_chart.maxmin()) - nurse.start_soon(increment_history_view) + nurse.start_soon(increment_history_view) # main real-time quotes update loop - stream: tractor.MsgStream = flume.stream - async for quotes in stream: + stream: tractor.MsgStream + async with feed.open_multi_stream() as stream: + assert stream + async for quotes in stream: + quote_period = time.time() - last_quote + quote_rate = round( + 1/quote_period, 1) if quote_period > 0 else float('inf') + if ( + quote_period <= 1/_quote_throttle_rate - ds.quotes = quotes - quote_period = time.time() - last_quote - quote_rate = round( - 1/quote_period, 1) if quote_period > 0 else float('inf') - if ( - quote_period <= 1/_quote_throttle_rate + # in the absolute worst case we shouldn't see more then + # twice the expected throttle rate right!? + # and quote_rate >= _quote_throttle_rate * 2 + and quote_rate >= display_rate + ): + log.warning(f'High quote rate {symbol.key}: {quote_rate}') - # in the absolute worst case we shouldn't see more then - # twice the expected throttle rate right!? - # and quote_rate >= _quote_throttle_rate * 2 - and quote_rate >= display_rate - ): - log.warning(f'High quote rate {symbol.key}: {quote_rate}') + last_quote = time.time() - last_quote = time.time() + for sym, quote in quotes.items(): + ds = dss[sym] + ds.quotes = quote - # chart isn't active/shown so skip render cycle and pause feed(s) - if fast_chart.linked.isHidden(): - # print('skipping update') - fast_chart.pause_all_feeds() - continue + rt_pi, hist_pi = pis[sym] - # ic = fast_chart.view._ic - # if ic: - # fast_chart.pause_all_feeds() - # await ic.wait() - # fast_chart.resume_all_feeds() + # chart isn't active/shown so skip render cycle and + # pause feed(s) + if ( + fast_chart.linked.isHidden() + or not rt_pi.isVisible() + ): + print(f'{fqsn} skipping update -> CHART HIDDEN') + continue + # if fast_chart.linked.isHidden(): + # fast_chart.pause_all_feeds() - # sync call to update all graphics/UX components. - graphics_update_cycle(ds) + # ic = fast_chart.view._ic + # if ic: + # fast_chart.pause_all_feeds() + # await ic.wait() + # fast_chart.resume_all_feeds() + + # sync call to update all graphics/UX components. + graphics_update_cycle( + ds, + quote, + ) def graphics_update_cycle( ds: DisplayState, + quote: dict, wap_in_history: bool = False, trigger_all: bool = False, # flag used by prepend history updates prepend_update_index: Optional[int] = None, @@ -445,6 +519,12 @@ def graphics_update_cycle( chart = ds.chart # TODO: just pass this as a direct ref to avoid so many attr accesses? hist_chart = ds.godwidget.hist_linked.chart + assert hist_chart + + flume = ds.flume + sym = flume.symbol + fqsn = sym.fqsn + main_flow = chart._flows[fqsn] profiler = Profiler( msg=f'Graphics loop cycle for: `{chart.name}`', @@ -461,357 +541,381 @@ def graphics_update_cycle( # rt "HFT" chart l1 = ds.l1 - ohlcv = ds.ohlcv + # ohlcv = ds.ohlcv + ohlcv = flume.rt_shm array = ohlcv.array vars = ds.vars tick_margin = vars['tick_margin'] - for sym, quote in ds.quotes.items(): + ( + uppx, + liv, + do_append, + i_diff, + append_diff, + do_rt_update, + ) = ds.incr_info() + + # don't real-time "shift" the curve to the + # left unless we get one of the following: + if ( ( - uppx, - liv, - do_append, - i_diff, - append_diff, - do_rt_update, - ) = ds.incr_info() + do_append + and liv + ) + or trigger_all + ): + # print(f'INCREMENTING {fqsn}') + chart.increment_view(steps=i_diff) + main_flow.plot.vb._set_yrange( + # yrange=(mn, mx), + ) - # TODO: we should only run mxmn when we know - # an update is due via ``do_append`` above. - ( - brange, - mx_in_view, - mn_in_view, - mx_vlm_in_view, - ) = ds.maxmin() - l, lbar, rbar, r = brange - mx = mx_in_view + tick_margin - mn = mn_in_view - tick_margin + # NOTE: since vlm and ohlc charts are axis linked now we don't + # need the double increment request? + # if vlm_chart: + # vlm_chart.increment_view(steps=i_diff) - profiler('`ds.maxmin()` call') + profiler('view incremented') - if ( - prepend_update_index is not None - and lbar > prepend_update_index - ): - # on a history update (usually from the FSP subsys) - # if the segment of history that is being prepended - # isn't in view there is no reason to do a graphics - # update. - log.debug('Skipping prepend graphics cycle: frame not in view') - return + ticks_frame = quote.get('ticks', ()) - # don't real-time "shift" the curve to the - # left unless we get one of the following: - if ( - (do_append and liv) - or trigger_all - ): - chart.increment_view(steps=i_diff) - chart.view._set_yrange(yrange=(mn, mx)) + frames_by_type: dict[str, dict] = {} + lasts = {} - if vlm_chart: - vlm_chart.increment_view(steps=i_diff) + # build tick-type "frames" of tick sequences since + # likely the tick arrival rate is higher then our + # (throttled) quote stream rate. - profiler('view incremented') + # iterate in FIFO order per tick-frame + # if sym != fqsn: + # continue - ticks_frame = quote.get('ticks', ()) + # TODO: we should only run mxmn when we know + # an update is due via ``do_append`` above. + ( + brange, + mx_in_view, + mn_in_view, + mx_vlm_in_view, + ) = ds.maxmin() + l, lbar, rbar, r = brange + mx = mx_in_view + tick_margin + mn = mn_in_view - tick_margin + profiler('`ds.maxmin()` call') - frames_by_type: dict[str, dict] = {} - lasts = {} + if ( + prepend_update_index is not None + and lbar > prepend_update_index + ): + # on a history update (usually from the FSP subsys) + # if the segment of history that is being prepended + # isn't in view there is no reason to do a graphics + # update. + log.debug('Skipping prepend graphics cycle: frame not in view') + return - # build tick-type "frames" of tick sequences since - # likely the tick arrival rate is higher then our - # (throttled) quote stream rate. - for tick in ticks_frame: - price = tick.get('price') - ticktype = tick.get('type') + for tick in ticks_frame: + price = tick.get('price') + ticktype = tick.get('type') - if ticktype == 'n/a' or price == -1: - # okkk.. - continue + # if ticktype == 'n/a' or price == -1: + # # okkk.. + # continue - # keys are entered in olded-event-inserted-first order - # since we iterate ``ticks_frame`` in standard order - # above. in other words the order of the keys is the order - # of tick events by type from the provider feed. - frames_by_type.setdefault(ticktype, []).append(tick) + # keys are entered in olded-event-inserted-first order + # since we iterate ``ticks_frame`` in standard order + # above. in other words the order of the keys is the order + # of tick events by type from the provider feed. + frames_by_type.setdefault(ticktype, []).append(tick) - # overwrites so the last tick per type is the entry - lasts[ticktype] = tick + # overwrites so the last tick per type is the entry + lasts[ticktype] = tick - # from pprint import pformat - # frame_counts = { - # typ: len(frame) for typ, frame in frames_by_type.items() - # } - # print(f'{pformat(frame_counts)}') - # print(f'framed: {pformat(frames_by_type)}') - # print(f'lasts: {pformat(lasts)}') + # from pprint import pformat + # frame_counts = { + # typ: len(frame) for typ, frame in frames_by_type.items() + # } + # print( + # f'{pformat(frame_counts)}\n' + # f'framed: {pformat(frames_by_type)}\n' + # f'lasts: {pformat(lasts)}\n' + # ) - # TODO: eventually we want to separate out the utrade (aka - # dark vlm prices) here and show them as an additional - # graphic. - clear_types = _tick_groups['clears'] + # TODO: eventually we want to separate out the utrade (aka + # dark vlm prices) here and show them as an additional + # graphic. + clear_types = _tick_groups['clears'] - # XXX: if we wanted to iterate in "latest" (i.e. most - # current) tick first order as an optimization where we only - # update from the last tick from each type class. - # last_clear_updated: bool = False + # XXX: if we wanted to iterate in "latest" (i.e. most + # current) tick first order as an optimization where we only + # update from the last tick from each type class. + # last_clear_updated: bool = False - # update ohlc sampled price bars - if ( - do_rt_update - or do_append - or trigger_all - ): - chart.update_graphics_from_flow( - chart.name, - do_append=do_append, - ) - hist_chart.update_graphics_from_flow( - chart.name, - do_append=do_append, - ) + # update ohlc sampled price bars + if ( + do_rt_update + or do_append + or trigger_all + ): + chart.update_graphics_from_flow( + fqsn, + # chart.name, + do_append=do_append, + ) + main_flow.draw_last(array_key=fqsn) - # NOTE: we always update the "last" datum - # since the current range should at least be updated - # to it's max/min on the last pixel. + hist_chart.update_graphics_from_flow( + fqsn, + # chart.name, + do_append=do_append, + ) - # iterate in FIFO order per tick-frame - for typ, tick in lasts.items(): + # NOTE: we always update the "last" datum + # since the current range should at least be updated + # to it's max/min on the last pixel. - price = tick.get('price') - size = tick.get('size') + for typ, tick in lasts.items(): - # compute max and min prices (including bid/ask) from - # tick frames to determine the y-range for chart - # auto-scaling. - # TODO: we need a streaming minmax algo here, see def above. - if liv: - mx = max(price + tick_margin, mx) - mn = min(price - tick_margin, mn) + price = tick.get('price') + size = tick.get('size') - if typ in clear_types: + # compute max and min prices (including bid/ask) from + # tick frames to determine the y-range for chart + # auto-scaling. + # TODO: we need a streaming minmax algo here, see def above. + if liv: + mx = max(price + tick_margin, mx) + mn = min(price - tick_margin, mn) - # XXX: if we only wanted to update graphics from the - # "current"/"latest received" clearing price tick - # once (see alt iteration order above). - # if last_clear_updated: - # continue + if typ in clear_types: - # last_clear_updated = True - # we only want to update grahpics from the *last* - # tick event that falls under the "clearing price" - # set. + # XXX: if we only wanted to update graphics from the + # "current"/"latest received" clearing price tick + # once (see alt iteration order above). + # if last_clear_updated: + # continue - # update price sticky(s) - end = array[-1] - ds.last_price_sticky.update_from_data( - *end[['index', 'close']] - ) - ds.hist_last_price_sticky.update_from_data( - *end[['index', 'close']] - ) + # last_clear_updated = True + # we only want to update grahpics from the *last* + # tick event that falls under the "clearing price" + # set. - if wap_in_history: - # update vwap overlay line - chart.update_graphics_from_flow( - 'bar_wap', - ) + # update price sticky(s) + end_ic = array[-1][['index', 'close']] + ds.last_price_sticky.update_from_data(*end_ic) + ds.hist_last_price_sticky.update_from_data(*end_ic) - # L1 book label-line updates - # XXX: is this correct for ib? - # if ticktype in ('trade', 'last'): - # if ticktype in ('last',): # 'size'): - if typ in ('last',): # 'size'): + if wap_in_history: + # update vwap overlay line + chart.update_graphics_from_flow('bar_wap') - label = { - l1.ask_label.fields['level']: l1.ask_label, - l1.bid_label.fields['level']: l1.bid_label, - }.get(price) + # L1 book label-line updates + # XXX: is this correct for ib? + # if ticktype in ('trade', 'last'): + # if ticktype in ('last',): # 'size'): + if typ in ('last',): # 'size'): - if ( - label is not None - and liv - ): - label.update_fields( - {'level': price, 'size': size} - ) + label = { + l1.ask_label.fields['level']: l1.ask_label, + l1.bid_label.fields['level']: l1.bid_label, + }.get(price) - # TODO: on trades should we be knocking down - # the relevant L1 queue? - # label.size -= size - - elif ( - typ in _tick_groups['asks'] - # TODO: instead we could check if the price is in the - # y-view-range? - and liv - ): - l1.ask_label.update_fields({'level': price, 'size': size}) - - elif ( - typ in _tick_groups['bids'] - # TODO: instead we could check if the price is in the - # y-view-range? - and liv - ): - l1.bid_label.update_fields({'level': price, 'size': size}) - - # check for y-range re-size - if (mx > vars['last_mx']) or (mn < vars['last_mn']): - - # fast chart resize case if ( - liv - and not chart._static_yrange == 'axis' + label is not None + and liv ): - main_vb = chart.view - if ( - main_vb._ic is None - or not main_vb._ic.is_set() - ): - # print(f'updating range due to mxmn') - main_vb._set_yrange( - # TODO: we should probably scale - # the view margin based on the size - # of the true range? This way you can - # slap in orders outside the current - # L1 (only) book range. - # range_margin=0.1, - yrange=(mn, mx), - ) - - # check if slow chart needs a resize - ( - _, - hist_liv, - _, - _, - _, - _, - ) = ds.incr_info( - chart=hist_chart, - shm=ds.hist_ohlcv, - update_state=False, - ) - if hist_liv: - hist_chart.view._set_yrange(yrange=hist_chart.maxmin()) - - # XXX: update this every draw cycle to make L1-always-in-view work. - vars['last_mx'], vars['last_mn'] = mx, mn - - # run synchronous update on all linked flows - # TODO: should the "main" (aka source) flow be special? - for curve_name, flow in chart._flows.items(): - # update any overlayed fsp flows - if curve_name != chart.data_key: - update_fsp_chart( - chart, - flow, - curve_name, - array_key=curve_name, + label.update_fields( + {'level': price, 'size': size} ) + # TODO: on trades should we be knocking down + # the relevant L1 queue? + # label.size -= size + + elif ( + typ in _tick_groups['asks'] + # TODO: instead we could check if the price is in the + # y-view-range? + and liv + ): + l1.ask_label.update_fields({'level': price, 'size': size}) + + elif ( + typ in _tick_groups['bids'] + # TODO: instead we could check if the price is in the + # y-view-range? + and liv + ): + l1.bid_label.update_fields({'level': price, 'size': size}) + + # check for y-range re-size + if (mx > vars['last_mx']) or (mn < vars['last_mn']): + + # fast chart resize case + if ( + liv + and not chart._static_yrange == 'axis' + ): + # main_vb = chart.view + main_vb = chart._flows[fqsn].plot.vb + if ( + main_vb._ic is None + or not main_vb._ic.is_set() + ): + # print(f'updating range due to mxmn') + main_vb._set_yrange( + # TODO: we should probably scale + # the view margin based on the size + # of the true range? This way you can + # slap in orders outside the current + # L1 (only) book range. + # range_margin=0.1, + # yrange=(mn, mx), + ) + + # check if slow chart needs a resize + ( + _, + hist_liv, + _, + _, + _, + _, + ) = ds.incr_info( + chart=hist_chart, + shm=ds.hist_ohlcv, + update_state=False, + is_1m=True, + ) + if hist_liv: + flow = hist_chart._flows[fqsn] + flow.plot.vb._set_yrange( + # yrange=hist_chart.maxmin(name=fqsn), + ) + + # XXX: update this every draw cycle to make L1-always-in-view work. + vars['last_mx'], vars['last_mn'] = mx, mn + + # run synchronous update on all linked flows + # TODO: should the "main" (aka source) flow be special? + for curve_name, flow in chart._flows.items(): + # update any overlayed fsp flows + if ( + # curve_name != chart.data_key + curve_name != fqsn + and not flow.is_ohlc + ): + update_fsp_chart( + chart, + flow, + curve_name, + array_key=curve_name, + ) + + # even if we're downsampled bigly + # draw the last datum in the final + # px column to give the user the mx/mn + # range of that set. + if ( + liv + # and not do_append + # and not do_rt_update + ): + flow.draw_last( + array_key=curve_name, + only_last_uppx=True, + ) + + # volume chart logic.. + # TODO: can we unify this with the above loop? + if vlm_chart: + # always update y-label + ds.vlm_sticky.update_from_data( + *array[-1][['index', 'volume']] + ) + + if ( + ( + do_rt_update + or do_append + and liv + ) + or trigger_all + ): + # TODO: make it so this doesn't have to be called + # once the $vlm is up? + vlm_chart.update_graphics_from_flow( + 'volume', + # UGGGh, see ``maxmin()`` impl in `._fsp` for + # the overlayed plotitems... we need a better + # bay to invoke a maxmin per overlay.. + render=False, + # XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^ + # without this, since we disable the + # 'volume' (units) chart after the $vlm starts + # up we need to be sure to enable this + # auto-ranging otherwise there will be no handler + # connected to update accompanying overlay + # graphics.. + ) + profiler('`vlm_chart.update_graphics_from_flow()`') + + if ( + mx_vlm_in_view != vars['last_mx_vlm'] + ): + yrange = (0, mx_vlm_in_view * 1.375) + vlm_chart.view._set_yrange( + yrange=yrange, + ) + profiler('`vlm_chart.view._set_yrange()`') + # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') + vars['last_mx_vlm'] = mx_vlm_in_view + + # update all downstream FSPs + for curve_name, flow in vlm_chart._flows.items(): + + if ( + curve_name not in {'volume', fqsn} + and flow.render + and ( + liv and do_rt_update + or do_append + ) + # and not flow.is_ohlc + # and curve_name != fqsn + ): + update_fsp_chart( + vlm_chart, + flow, + curve_name, + array_key=curve_name, + # do_append=uppx < update_uppx, + do_append=do_append, + ) + # is this even doing anything? + # (pretty sure it's the real-time + # resizing from last quote?) + fvb = flow.plot.vb + fvb._set_yrange( + name=curve_name, + ) + + elif ( + curve_name != 'volume' + and not do_append + and liv + and uppx >= 1 # even if we're downsampled bigly # draw the last datum in the final # px column to give the user the mx/mn # range of that set. - if ( - not do_append - # and not do_rt_update - and liv - ): - flow.draw_last( - array_key=curve_name, - only_last_uppx=True, - ) - - # volume chart logic.. - # TODO: can we unify this with the above loop? - if vlm_chart: - # always update y-label - ds.vlm_sticky.update_from_data( - *array[-1][['index', 'volume']] - ) - - if ( - ( - do_rt_update - or do_append - and liv - ) - or trigger_all - ): - # TODO: make it so this doesn't have to be called - # once the $vlm is up? - vlm_chart.update_graphics_from_flow( - 'volume', - # UGGGh, see ``maxmin()`` impl in `._fsp` for - # the overlayed plotitems... we need a better - # bay to invoke a maxmin per overlay.. - render=False, - # XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^ - # without this, since we disable the - # 'volume' (units) chart after the $vlm starts - # up we need to be sure to enable this - # auto-ranging otherwise there will be no handler - # connected to update accompanying overlay - # graphics.. - ) - profiler('`vlm_chart.update_graphics_from_flow()`') - - if ( - mx_vlm_in_view != vars['last_mx_vlm'] - ): - yrange = (0, mx_vlm_in_view * 1.375) - vlm_chart.view._set_yrange( - yrange=yrange, - ) - profiler('`vlm_chart.view._set_yrange()`') - # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') - vars['last_mx_vlm'] = mx_vlm_in_view - - for curve_name, flow in vlm_chart._flows.items(): - - if ( - curve_name != 'volume' and - flow.render and ( - liv and - do_rt_update or do_append - ) - ): - update_fsp_chart( - vlm_chart, - flow, - curve_name, - array_key=curve_name, - # do_append=uppx < update_uppx, - do_append=do_append, - ) - # is this even doing anything? - # (pretty sure it's the real-time - # resizing from last quote?) - fvb = flow.plot.vb - fvb._set_yrange( - name=curve_name, - ) - - elif ( - curve_name != 'volume' - and not do_append - and liv - and uppx >= 1 - # even if we're downsampled bigly - # draw the last datum in the final - # px column to give the user the mx/mn - # range of that set. - ): - # always update the last datum-element - # graphic for all flows - # print(f'drawing last {flow.name}') - flow.draw_last(array_key=curve_name) + ): + # always update the last datum-element + # graphic for all flows + # print(f'drawing last {flow.name}') + flow.draw_last(array_key=curve_name) async def link_views_with_region( @@ -836,11 +940,12 @@ async def link_views_with_region( pen=pg.mkPen(hcolor('gunmetal')), brush=pg.mkBrush(hcolor('default_darkest')), ) - region.setZValue(10) # put linear region "in front" in layer terms + region.setOpacity(0) hist_pi.addItem(region, ignoreBounds=True) + region.setOpacity(6/16) - flow = rt_chart._flows[hist_chart.name] + flow = rt_chart._flows[flume.symbol.fqsn] assert flow # XXX: no idea why this doesn't work but it's causing @@ -948,6 +1053,32 @@ async def link_views_with_region( # region.sigRegionChangeFinished.connect(update_pi_from_region) +# force 0 to always be in view +def multi_maxmin( + chart: ChartPlotWidget, + names: list[str], + +) -> tuple[float, float]: + ''' + Flows "group" maxmin loop; assumes all named flows + are in the same co-domain and thus can be sorted + as one set. + + Iterates all the named flows and calls the chart + api to find their range values and return. + + TODO: really we should probably have a more built-in API + for this? + + ''' + mx = 0 + for name in names: + ymn, ymx = chart.maxmin(name=name) + mx = max(mx, ymx) + + return 0, mx + + async def display_symbol_data( godwidget: GodWidget, fqsns: list[str], @@ -974,13 +1105,13 @@ async def display_symbol_data( # ) for fqsn in fqsns: - loading_sym_key = sbar.open_status( f'loading {fqsn} ->', group_key=True ) feed: Feed + # assert len(fqsns) == 2 async with open_feed( fqsns, loglevel=loglevel, @@ -991,77 +1122,171 @@ async def display_symbol_data( ) as feed: - # TODO: right now we only show one symbol on charts, but - # overlays are coming muy pronto guey.. - assert len(feed.flumes) == 1 - flume = list(feed.flumes.values())[0] + # use expanded contract symbols passed back from feed layer. + fqsns = list(feed.flumes.keys()) - ohlcv: ShmArray = flume.rt_shm - hist_ohlcv: ShmArray = flume.hist_shm - - symbol = flume.symbol - fqsn = symbol.fqsn - brokername = symbol.brokers[0] - - step_size_s = 1 - tf_key = tf_in_1s[step_size_s] - - # load in symbol's ohlc data + # step_size_s = 1 + # tf_key = tf_in_1s[step_size_s] godwidget.window.setWindowTitle( - f'{fqsn} ' - f'tick:{symbol.tick_size} ' - f'step:{tf_key} ' + f'{fqsns} ' + # f'tick:{symbol.tick_size} ' + # f'step:{tf_key} ' ) - - rt_linked = godwidget.rt_linked - rt_linked._symbol = symbol - - # create top history view chart above the "main rt chart". - hist_linked = godwidget.hist_linked - hist_linked._symbol = symbol - hist_chart = hist_linked.plot_ohlc_main( - symbol, - hist_ohlcv, - # in the case of history chart we explicitly set `False` - # to avoid internal pane creation. - # sidepane=False, - sidepane=godwidget.search, - ) - # don't show when not focussed - hist_linked.cursor.always_show_xlabel = False - # generate order mode side-pane UI # A ``FieldsForm`` form to configure order entry # and add as next-to-y-axis singleton pane pp_pane: FieldsForm = mk_order_pane_layout(godwidget) godwidget.pp_pane = pp_pane - # create main OHLC chart - ohlc_chart = rt_linked.plot_ohlc_main( - symbol, - ohlcv, - # in the case of history chart we explicitly set `False` - # to avoid internal pane creation. - sidepane=pp_pane, - ) + # create top history view chart above the "main rt chart". + rt_linked = godwidget.rt_linked + hist_linked = godwidget.hist_linked - ohlc_chart._feeds[symbol.key] = feed - ohlc_chart.setFocus() + # NOTE: here we insert the slow-history chart set into + # the fast chart's splitter -> so it's a splitter of charts + # inside the first widget slot of a splitter of charts XD + rt_linked.splitter.insertWidget(0, hist_linked) - # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! - # plot historical vwap if available - wap_in_history = False - # if ( - # brokermod._show_wap_in_history - # and 'bar_wap' in bars.dtype.fields - # ): - # wap_in_history = True - # ohlc_chart.draw_curve( - # name='bar_wap', - # shm=ohlcv, - # color='default_light', - # add_label=False, - # ) + rt_chart: None | ChartPlotWidget = None + hist_chart: None | ChartPlotWidget = None + + bg_chart_color = 'grayest' + bg_last_bar_color = 'grayer' + + pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {} + + # load in ohlc data to a common linked but split chart set. + for fqsn, flume in feed.flumes.items(): + rt_linked._symbol = flume.symbol + hist_linked._symbol = flume.symbol + + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + symbol = flume.symbol + fqsn = symbol.fqsn + + if hist_chart is None: + hist_chart = hist_linked.plot_ohlc_main( + symbol, + hist_ohlcv, + # in the case of history chart we explicitly set `False` + # to avoid internal pane creation. + # sidepane=False, + sidepane=godwidget.search, + ) + pis.setdefault(fqsn, [None, None])[1] = hist_chart.plotItem + else: + hist_pi = hist_chart.overlay_plotitem( + name=fqsn, + axis_title=fqsn, + ) + hist_pi.hideAxis('left') + hist_pi.hideAxis('bottom') + + curve, _ = hist_chart.draw_curve( + name=fqsn, + shm=hist_ohlcv, + array_key=fqsn, + overlay=hist_pi, + pi=hist_pi, + is_ohlc=True, + + color=bg_chart_color, + last_bar_color=bg_last_bar_color, + ) + + hist_pi.vb.maxmin = partial( + hist_chart.maxmin, + name=fqsn, + ) + # TODO: we need a better API to do this.. + # specially store ref to shm for lookup in display loop + # since only a placeholder of `None` is entered in + # ``.draw_curve()``. + flow = hist_chart._flows[fqsn] + assert flow.plot is hist_pi + + # group_mxmn = partial( + # multi_maxmin, + # names=fqsns, + # chart=hist_chart, + # ) + # hist_pi.vb._maxmin = group_mxmn + pis.setdefault(fqsn, [None, None])[1] = hist_pi + + # don't show when not focussed + hist_linked.cursor.always_show_xlabel = False + + # create main OHLC chart + if rt_chart is None: + rt_chart = rt_linked.plot_ohlc_main( + symbol, + ohlcv, + # in the case of history chart we explicitly set `False` + # to avoid internal pane creation. + sidepane=pp_pane, + ) + pis.setdefault(fqsn, [None, None])[0] = rt_chart.plotItem + else: + rt_pi = rt_chart.overlay_plotitem( + name=fqsn, + axis_title=fqsn, + ) + + rt_pi.hideAxis('left') + rt_pi.hideAxis('bottom') + + curve, _ = rt_chart.draw_curve( + name=fqsn, + shm=ohlcv, + array_key=fqsn, + overlay=rt_pi, + pi=rt_pi, + is_ohlc=True, + + color=bg_chart_color, + last_bar_color=bg_last_bar_color, + ) + rt_pi.vb.maxmin = partial( + rt_chart.maxmin, + name=fqsn, + ) + # multi_maxmin, + # names=fqsn + + # TODO: we need a better API to do this.. + # specially store ref to shm for lookup in display loop + # since only a placeholder of `None` is entered in + # ``.draw_curve()``. + flow = rt_chart._flows[fqsn] + assert flow.plot is rt_pi + + # group_mxmn = partial( + # multi_maxmin, + # names=fqsns, + # chart=rt_chart, + # ) + # rt_pi.vb._maxmin = group_mxmn + pis.setdefault(fqsn, [None, None])[0] = rt_pi + + rt_chart._feeds[symbol.key] = feed + rt_chart.setFocus() + + # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! + # plot historical vwap if available + wap_in_history = False + # if ( + # brokermod._show_wap_in_history + # and 'bar_wap' in bars.dtype.fields + # ): + # wap_in_history = True + # rt_chart.draw_curve( + # name='bar_wap', + # shm=ohlcv, + # color='default_light', + # add_label=False, + # ) # NOTE: we must immediately tell Qt to show the OHLC chart # to avoid a race where the subplots get added/shown to @@ -1070,86 +1295,60 @@ async def display_symbol_data( rt_linked.focus() await trio.sleep(0) - # NOTE: here we insert the slow-history chart set into - # the fast chart's splitter -> so it's a splitter of charts - # inside the first widget slot of a splitter of charts XD - rt_linked.splitter.insertWidget(0, hist_linked) # XXX: if we wanted it at the bottom? # rt_linked.splitter.addWidget(hist_linked) rt_linked.focus() godwidget.resize_all() - vlm_chart: Optional[ChartPlotWidget] = None + vlms: dict[str, ChartPlotWidget] = {} async with trio.open_nursery() as ln: + for fqsn, flume in feed.flumes.items(): - # if available load volume related built-in display(s) - if ( - not symbol.broker_info[brokername].get('no_vlm', False) - and has_vlm(ohlcv) - ): - vlm_chart = await ln.start( - open_vlm_displays, + vlm_chart: Optional[ChartPlotWidget] = None + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + symbol = flume.symbol + brokername = symbol.brokers[0] + # if available load volume related built-in display(s) + if ( + not symbol.broker_info[brokername].get('no_vlm', False) + and has_vlm(ohlcv) + ): + vlms[fqsn] = vlm_chart = await ln.start( + open_vlm_displays, + rt_linked, + flume, + ) + + # load (user's) FSP set (otherwise known as "indicators") + # from an input config. + ln.start_soon( + start_fsp_displays, rt_linked, - ohlcv, + flume, + loading_sym_key, + loglevel, ) - # load (user's) FSP set (otherwise known as "indicators") - # from an input config. - ln.start_soon( - start_fsp_displays, - rt_linked, - ohlcv, - loading_sym_key, - loglevel, - ) + # size view to data prior to order mode init + rt_chart.default_view() + rt_linked.graphics_cycle() + await trio.sleep(0) - # start graphics update loop after receiving first live quote - ln.start_soon( - graphics_update_loop, - ln, - godwidget, - flume, - wap_in_history, - vlm_chart, - ) + hist_chart.default_view( + bars_from_y=int(len(hist_ohlcv.array)), # size to data + y_offset=6116*2, # push it a little away from the y-axis + ) + hist_linked.graphics_cycle() + await trio.sleep(0) - await trio.sleep(0) + godwidget.resize_all() - # size view to data prior to order mode init - ohlc_chart.default_view() - rt_linked.graphics_cycle() - await trio.sleep(0) - - hist_chart.default_view( - bars_from_y=int(len(hist_ohlcv.array)), # size to data - y_offset=6116*2, # push it a little away from the y-axis - ) - hist_linked.graphics_cycle() - await trio.sleep(0) - - godwidget.resize_all() - - await link_views_with_region( - ohlc_chart, - hist_chart, - flume, - ) - - mode: OrderMode - async with ( - open_order_mode( - feed, - godwidget, - fqsn, - order_mode_started - ) as mode - ): if not vlm_chart: # trigger another view reset if no sub-chart - ohlc_chart.default_view() - - rt_linked.mode = mode + rt_chart.default_view() # let Qt run to render all widgets and make sure the # sidepanes line up vertically. @@ -1168,7 +1367,7 @@ async def display_symbol_data( # TODO: make this not so shit XD # close group status - sbar._status_groups[loading_sym_key][1]() + # sbar._status_groups[loading_sym_key][1]() hist_linked.graphics_cycle() await trio.sleep(0) @@ -1181,5 +1380,35 @@ async def display_symbol_data( ) godwidget.resize_all() + await link_views_with_region( + rt_chart, + hist_chart, + flume, + ) + + # start graphics update loop after receiving first live quote + ln.start_soon( + graphics_update_loop, + ln, + godwidget, + feed, + pis, + wap_in_history, + vlms, + ) + + await trio.sleep(0) + + mode: OrderMode + async with ( + open_order_mode( + feed, + godwidget, + fqsns[-1], + order_mode_started + ) as mode + ): + rt_linked.mode = mode + # let the app run.. bby await trio.sleep_forever() From eac79c5cddc138551dd8caba60ed3d081b7fc734 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Nov 2022 15:35:57 -0500 Subject: [PATCH 03/13] Adjust FSP UI/mgmt apis to be `Flume` oriented --- piker/ui/_fsp.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 8c2e64a1..370eeb02 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -42,6 +42,7 @@ from ..data._sharedmem import ( _Token, try_read, ) +from ..data.feed import Flume from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -111,7 +112,7 @@ def update_fsp_chart( # read from last calculated value and update any label last_val_sticky = chart.plotItem.getAxis( - 'right')._stickies.get(chart.name) + 'right')._stickies.get(graphics_name) if last_val_sticky: last = last_row[array_key] last_val_sticky.update_from_data(-1, last) @@ -363,7 +364,7 @@ class FspAdmin: tn: trio.Nursery, cluster: dict[str, tractor.Portal], linked: LinkedSplits, - src_shm: ShmArray, + flume: Flume, ) -> None: self.tn = tn @@ -375,7 +376,7 @@ class FspAdmin: tuple[tractor.MsgStream, ShmArray] ] = {} self._flow_registry: dict[_Token, str] = {} - self.src_shm = src_shm + self.flume = flume def rr_next_portal(self) -> tractor.Portal: name, portal = next(self._rr_next_actor) @@ -410,7 +411,7 @@ class FspAdmin: fqsn=fqsn, # mems - src_shm_token=self.src_shm.token, + src_shm_token=self.flume.rt_shm.token, dst_shm_token=dst_shm.token, # target @@ -470,7 +471,7 @@ class FspAdmin: ) -> (ShmArray, trio.Event): - fqsn = self.linked.symbol.front_fqsn() + fqsn = self.flume.symbol.fqsn # allocate an output shm array key, dst_shm, opened = maybe_mk_fsp_shm( @@ -479,7 +480,7 @@ class FspAdmin: readonly=True, ) self._flow_registry[( - self.src_shm._token, + self.flume.rt_shm._token, target.name )] = dst_shm._token @@ -541,7 +542,7 @@ class FspAdmin: @acm async def open_fsp_admin( linked: LinkedSplits, - src_shm: ShmArray, + flume: Flume, **kwargs, ) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: @@ -562,7 +563,7 @@ async def open_fsp_admin( tn, cluster_map, linked, - src_shm, + flume, ) try: yield admin @@ -576,7 +577,7 @@ async def open_fsp_admin( async def open_vlm_displays( linked: LinkedSplits, - ohlcv: ShmArray, + flume: Flume, dvlm: bool = True, task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED, @@ -598,6 +599,8 @@ async def open_vlm_displays( sig = inspect.signature(flow_rates.func) params = sig.parameters + ohlcv: ShmArray = flume.rt_shm + async with ( open_fsp_sidepane( linked, { @@ -617,7 +620,7 @@ async def open_vlm_displays( } }, ) as sidepane, - open_fsp_admin(linked, ohlcv) as admin, + open_fsp_admin(linked, flume) as admin, ): # TODO: support updates # period_field = sidepane.fields['period'] @@ -645,6 +648,8 @@ async def open_vlm_displays( # the curve item internals are pretty convoluted. style='step', ) + # back-link the volume chart to trigger y-autoranging + # in the ohlc (parent) chart. ohlc_chart.view.enable_auto_yrange( src_vb=chart.view, ) @@ -930,7 +935,7 @@ async def open_vlm_displays( async def start_fsp_displays( linked: LinkedSplits, - ohlcv: ShmArray, + flume: Flume, group_status_key: str, loglevel: str, @@ -973,7 +978,10 @@ async def start_fsp_displays( async with ( # NOTE: this admin internally opens an actor cluster - open_fsp_admin(linked, ohlcv) as admin, + open_fsp_admin( + linked, + flume, + ) as admin, ): statuses = [] for target, conf in fsp_conf.items(): From 92c50aa6a72f6deec6adedaf84c1880b668dbd01 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Nov 2022 16:10:41 -0500 Subject: [PATCH 04/13] Drop tick frame builder loop for now --- piker/ui/_display.py | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 2e43b475..6e815365 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -580,9 +580,8 @@ def graphics_update_cycle( profiler('view incremented') ticks_frame = quote.get('ticks', ()) - - frames_by_type: dict[str, dict] = {} - lasts = {} + # frames_by_type: dict[str, dict] = {} + # lasts = {} # build tick-type "frames" of tick sequences since # likely the tick arrival rate is higher then our @@ -616,22 +615,22 @@ def graphics_update_cycle( log.debug('Skipping prepend graphics cycle: frame not in view') return - for tick in ticks_frame: - price = tick.get('price') - ticktype = tick.get('type') + # for tick in ticks_frame: + # price = tick.get('price') + # ticktype = tick.get('type') - # if ticktype == 'n/a' or price == -1: - # # okkk.. - # continue + # # if ticktype == 'n/a' or price == -1: + # # # okkk.. + # # continue - # keys are entered in olded-event-inserted-first order - # since we iterate ``ticks_frame`` in standard order - # above. in other words the order of the keys is the order - # of tick events by type from the provider feed. - frames_by_type.setdefault(ticktype, []).append(tick) + # # keys are entered in olded-event-inserted-first order + # # since we iterate ``ticks_frame`` in standard order + # # above. in other words the order of the keys is the order + # # of tick events by type from the provider feed. + # frames_by_type.setdefault(ticktype, []).append(tick) - # overwrites so the last tick per type is the entry - lasts[ticktype] = tick + # # overwrites so the last tick per type is the entry + # lasts[ticktype] = tick # from pprint import pformat # frame_counts = { @@ -675,12 +674,19 @@ def graphics_update_cycle( # NOTE: we always update the "last" datum # since the current range should at least be updated # to it's max/min on the last pixel. + typs: set[str] = set() - for typ, tick in lasts.items(): - + # for typ, tick in lasts.items(): + for tick in ticks_frame: + typ = tick.get('type') price = tick.get('price') size = tick.get('size') + if typ in typs: + continue + + typs.add(typ) + # compute max and min prices (including bid/ask) from # tick frames to determine the y-range for chart # auto-scaling. From 6986be1b21f5436c2c945ead0238a7d0c5556f37 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Nov 2022 13:40:15 -0500 Subject: [PATCH 05/13] Define a single `ChartPlotWidget.feed: Feed` for pause/resume --- piker/ui/_chart.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index e3967f81..a8d0e20e 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -896,7 +896,7 @@ class ChartPlotWidget(pg.PlotWidget): # registry of overlay curve names self._flows: dict[str, Flow] = {} - self._feeds: dict[Symbol, Feed] = {} + self.feed: Feed | None = None self._labels = {} # registry of underlying graphics self._ysticks = {} # registry of underlying graphics @@ -917,20 +917,18 @@ class ChartPlotWidget(pg.PlotWidget): self._on_screen: bool = False def resume_all_feeds(self): - ... - # try: - # for feed in self._feeds.values(): - # for flume in feed.flumes.values(): - # self.linked.godwidget._root_n.start_soon(flume.resume) - # except RuntimeError: - # # TODO: cancel the qtractor runtime here? - # raise + feed = self.feed + if feed: + try: + self.linked.godwidget._root_n.start_soon(feed.resume) + except RuntimeError: + # TODO: cancel the qtractor runtime here? + raise def pause_all_feeds(self): - ... - # for feed in self._feeds.values(): - # for flume in feed.flumes.values(): - # self.linked.godwidget._root_n.start_soon(flume.pause) + feed = self.feed + if feed: + self.linked.godwidget._root_n.start_soon(feed.pause) @property def view(self) -> ChartView: From 1d83b43efe84d256ae8e23da288df12d04887a2d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Nov 2022 17:33:34 -0500 Subject: [PATCH 06/13] Factor setup loop, 1 FSP chain, colors, throttling Factor out the chart widget creation since it's only executed once during rendering of the first feed/flow whilst keeping plotitem overlay creation inside the (flume oriented) init loop. Only create one vlm and FSP chart/chain for now until we figure out if we want FSPs overlayed by default or selected based on the "front" symbol in use. Add a default color-palette set using shades of gray when plotting overlays. Presume that the display loop's quote throttle rate should be uniformly distributed over all input symbol-feeds for now. Restore feed pausing on mouse interaction. --- piker/ui/_display.py | 346 +++++++++++++++++++++++-------------------- 1 file changed, 184 insertions(+), 162 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 6e815365..fb87d41c 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -22,6 +22,7 @@ graphics update methods via our custom ``pyqtgraph`` charting api. ''' from functools import partial +import itertools import time from typing import Optional, Any, Callable @@ -73,7 +74,7 @@ from .._profile import Profiler log = get_logger(__name__) # TODO: load this from a config.toml! -_quote_throttle_rate: int = 16 # Hz +_quote_throttle_rate: int = round(60 * 6/16) # Hz # a working tick-type-classes template @@ -93,7 +94,7 @@ def chart_maxmin( chart: ChartPlotWidget, fqsn: str, # ohlcv_shm: ShmArray, - vlm_chart: Optional[ChartPlotWidget] = None, + vlm_chart: ChartPlotWidget | None = None, ) -> tuple[ @@ -487,16 +488,16 @@ async def graphics_update_loop( fast_chart.linked.isHidden() or not rt_pi.isVisible() ): - print(f'{fqsn} skipping update -> CHART HIDDEN') + print(f'{fqsn} skipping update for HIDDEN CHART') + fast_chart.pause_all_feeds() continue - # if fast_chart.linked.isHidden(): - # fast_chart.pause_all_feeds() - # ic = fast_chart.view._ic - # if ic: - # fast_chart.pause_all_feeds() - # await ic.wait() - # fast_chart.resume_all_feeds() + ic = fast_chart.view._ic + if ic: + fast_chart.pause_all_feeds() + print(f'{fqsn} PAUSING DURING INTERACTION') + await ic.wait() + fast_chart.resume_all_feeds() # sync call to update all graphics/UX components. graphics_update_cycle( @@ -837,6 +838,9 @@ def graphics_update_cycle( # volume chart logic.. # TODO: can we unify this with the above loop? if vlm_chart: + # print(f"DOING VLM {fqsn}") + vlm_flows = vlm_chart._flows + # always update y-label ds.vlm_sticky.update_from_data( *array[-1][['index', 'volume']] @@ -879,49 +883,49 @@ def graphics_update_cycle( # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') vars['last_mx_vlm'] = mx_vlm_in_view - # update all downstream FSPs - for curve_name, flow in vlm_chart._flows.items(): + # update all downstream FSPs + for curve_name, flow in vlm_flows.items(): - if ( - curve_name not in {'volume', fqsn} - and flow.render - and ( - liv and do_rt_update - or do_append - ) - # and not flow.is_ohlc - # and curve_name != fqsn - ): - update_fsp_chart( - vlm_chart, - flow, - curve_name, - array_key=curve_name, - # do_append=uppx < update_uppx, - do_append=do_append, - ) - # is this even doing anything? - # (pretty sure it's the real-time - # resizing from last quote?) - fvb = flow.plot.vb - fvb._set_yrange( - name=curve_name, - ) + if ( + curve_name not in {'volume', fqsn} + and flow.render + and ( + liv and do_rt_update + or do_append + ) + # and not flow.is_ohlc + # and curve_name != fqsn + ): + update_fsp_chart( + vlm_chart, + flow, + curve_name, + array_key=curve_name, + # do_append=uppx < update_uppx, + do_append=do_append, + ) + # is this even doing anything? + # (pretty sure it's the real-time + # resizing from last quote?) + fvb = flow.plot.vb + fvb._set_yrange( + name=curve_name, + ) - elif ( - curve_name != 'volume' - and not do_append - and liv - and uppx >= 1 - # even if we're downsampled bigly - # draw the last datum in the final - # px column to give the user the mx/mn - # range of that set. - ): - # always update the last datum-element - # graphic for all flows - # print(f'drawing last {flow.name}') - flow.draw_last(array_key=curve_name) + elif ( + curve_name != 'volume' + and not do_append + and liv + and uppx >= 1 + # even if we're downsampled bigly + # draw the last datum in the final + # px column to give the user the mx/mn + # range of that set. + ): + # always update the last datum-element + # graphic for all flows + # print(f'drawing last {flow.name}') + flow.draw_last(array_key=curve_name) async def link_views_with_region( @@ -1124,7 +1128,8 @@ async def display_symbol_data( # limit to at least display's FPS # avoiding needless Qt-in-guest-mode context switches - tick_throttle=_quote_throttle_rate, + tick_throttle=round(_quote_throttle_rate/len(fqsns)), + # tick_throttle=round(_quote_throttle_rate), ) as feed: @@ -1155,34 +1160,122 @@ async def display_symbol_data( rt_chart: None | ChartPlotWidget = None hist_chart: None | ChartPlotWidget = None + vlm_chart: None | ChartPlotWidget = None - bg_chart_color = 'grayest' - bg_last_bar_color = 'grayer' + # TODO: I think some palette's based on asset group types + # would be good, for eg: + # - underlying and opts contracts + # - index and underlyings + futures + # - gradient in "lightness" based on liquidity, or lifetime in derivs? + palette = itertools.cycle([ + # curve color, last bar curve color + ['i3', 'gray'], + ['grayer', 'bracket'], + ['grayest', 'i3'], + ['default_dark', 'default'], + ]) pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {} # load in ohlc data to a common linked but split chart set. - for fqsn, flume in feed.flumes.items(): - rt_linked._symbol = flume.symbol - hist_linked._symbol = flume.symbol + fitems: list[ + tuple[str, Flume] + ] = list(feed.flumes.items()) - ohlcv: ShmArray = flume.rt_shm - hist_ohlcv: ShmArray = flume.hist_shm + # for the "first"/selected symbol we create new chart widgets + # and sub-charts for FSPs + fqsn, flume = fitems[0] - symbol = flume.symbol - fqsn = symbol.fqsn + rt_linked._symbol = flume.symbol + hist_linked._symbol = flume.symbol - if hist_chart is None: - hist_chart = hist_linked.plot_ohlc_main( - symbol, - hist_ohlcv, - # in the case of history chart we explicitly set `False` - # to avoid internal pane creation. - # sidepane=False, - sidepane=godwidget.search, + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + symbol = flume.symbol + brokername = symbol.brokers[0] + fqsn = symbol.fqsn + + hist_chart = hist_linked.plot_ohlc_main( + symbol, + hist_ohlcv, + # in the case of history chart we explicitly set `False` + # to avoid internal pane creation. + # sidepane=False, + sidepane=godwidget.search, + ) + pis.setdefault(fqsn, [None, None])[1] = hist_chart.plotItem + + # don't show when not focussed + hist_linked.cursor.always_show_xlabel = False + + rt_chart = rt_linked.plot_ohlc_main( + symbol, + ohlcv, + # in the case of history chart we explicitly set `False` + # to avoid internal pane creation. + sidepane=pp_pane, + ) + pis.setdefault(fqsn, [None, None])[0] = rt_chart.plotItem + + # for pause/resume on mouse interaction + rt_chart.feed = feed + + async with trio.open_nursery() as ln: + # if available load volume related built-in display(s) + vlm_charts: dict[ + str, + None | ChartPlotWidget + ] = {}.fromkeys(feed.flumes) + if ( + not symbol.broker_info[brokername].get('no_vlm', False) + and has_vlm(ohlcv) + and vlm_chart is None + ): + vlm_charts[fqsn] = await ln.start( + open_vlm_displays, + rt_linked, + flume, ) - pis.setdefault(fqsn, [None, None])[1] = hist_chart.plotItem - else: + + # load (user's) FSP set (otherwise known as "indicators") + # from an input config. + ln.start_soon( + start_fsp_displays, + rt_linked, + flume, + loading_sym_key, + loglevel, + ) + + # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! + # plot historical vwap if available + wap_in_history = False + # if ( + # brokermod._show_wap_in_history + # and 'bar_wap' in bars.dtype.fields + # ): + # wap_in_history = True + # rt_chart.draw_curve( + # name='bar_wap', + # shm=ohlcv, + # color='default_light', + # add_label=False, + # ) + + for fqsn, flume in fitems[1:]: + # get a new color from the palette + bg_chart_color, bg_last_bar_color = next(palette) + + rt_linked._symbol = flume.symbol + hist_linked._symbol = flume.symbol + + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + symbol = flume.symbol + fqsn = symbol.fqsn + hist_pi = hist_chart.overlay_plotitem( name=fqsn, axis_title=fqsn, @@ -1212,29 +1305,8 @@ async def display_symbol_data( # ``.draw_curve()``. flow = hist_chart._flows[fqsn] assert flow.plot is hist_pi - - # group_mxmn = partial( - # multi_maxmin, - # names=fqsns, - # chart=hist_chart, - # ) - # hist_pi.vb._maxmin = group_mxmn pis.setdefault(fqsn, [None, None])[1] = hist_pi - # don't show when not focussed - hist_linked.cursor.always_show_xlabel = False - - # create main OHLC chart - if rt_chart is None: - rt_chart = rt_linked.plot_ohlc_main( - symbol, - ohlcv, - # in the case of history chart we explicitly set `False` - # to avoid internal pane creation. - sidepane=pp_pane, - ) - pis.setdefault(fqsn, [None, None])[0] = rt_chart.plotItem - else: rt_pi = rt_chart.overlay_plotitem( name=fqsn, axis_title=fqsn, @@ -1258,8 +1330,6 @@ async def display_symbol_data( rt_chart.maxmin, name=fqsn, ) - # multi_maxmin, - # names=fqsn # TODO: we need a better API to do this.. # specially store ref to shm for lookup in display loop @@ -1267,77 +1337,26 @@ async def display_symbol_data( # ``.draw_curve()``. flow = rt_chart._flows[fqsn] assert flow.plot is rt_pi - - # group_mxmn = partial( - # multi_maxmin, - # names=fqsns, - # chart=rt_chart, - # ) - # rt_pi.vb._maxmin = group_mxmn pis.setdefault(fqsn, [None, None])[0] = rt_pi - rt_chart._feeds[symbol.key] = feed rt_chart.setFocus() - # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! - # plot historical vwap if available - wap_in_history = False - # if ( - # brokermod._show_wap_in_history - # and 'bar_wap' in bars.dtype.fields - # ): - # wap_in_history = True - # rt_chart.draw_curve( - # name='bar_wap', - # shm=ohlcv, - # color='default_light', - # add_label=False, - # ) + # NOTE: we must immediately tell Qt to show the OHLC chart + # to avoid a race where the subplots get added/shown to + # the linked set *before* the main price chart! + rt_linked.show() + rt_linked.focus() + await trio.sleep(0) - # NOTE: we must immediately tell Qt to show the OHLC chart - # to avoid a race where the subplots get added/shown to - # the linked set *before* the main price chart! - rt_linked.show() - rt_linked.focus() - await trio.sleep(0) + # XXX: if we wanted it at the bottom? + # rt_linked.splitter.addWidget(hist_linked) + rt_linked.focus() - # XXX: if we wanted it at the bottom? - # rt_linked.splitter.addWidget(hist_linked) - rt_linked.focus() + godwidget.resize_all() - godwidget.resize_all() - - vlms: dict[str, ChartPlotWidget] = {} - async with trio.open_nursery() as ln: + # add all additional symbols as overlays for fqsn, flume in feed.flumes.items(): - vlm_chart: Optional[ChartPlotWidget] = None - ohlcv: ShmArray = flume.rt_shm - hist_ohlcv: ShmArray = flume.hist_shm - - symbol = flume.symbol - brokername = symbol.brokers[0] - # if available load volume related built-in display(s) - if ( - not symbol.broker_info[brokername].get('no_vlm', False) - and has_vlm(ohlcv) - ): - vlms[fqsn] = vlm_chart = await ln.start( - open_vlm_displays, - rt_linked, - flume, - ) - - # load (user's) FSP set (otherwise known as "indicators") - # from an input config. - ln.start_soon( - start_fsp_displays, - rt_linked, - flume, - loading_sym_key, - loglevel, - ) - # size view to data prior to order mode init rt_chart.default_view() rt_linked.graphics_cycle() @@ -1352,9 +1371,9 @@ async def display_symbol_data( godwidget.resize_all() - if not vlm_chart: - # trigger another view reset if no sub-chart - rt_chart.default_view() + # trigger another view reset if no sub-chart + hist_chart.default_view() + rt_chart.default_view() # let Qt run to render all widgets and make sure the # sidepanes line up vertically. @@ -1376,6 +1395,7 @@ async def display_symbol_data( # sbar._status_groups[loading_sym_key][1]() hist_linked.graphics_cycle() + rt_chart.default_view() await trio.sleep(0) bars_in_mem = int(len(hist_ohlcv.array)) @@ -1400,9 +1420,10 @@ async def display_symbol_data( feed, pis, wap_in_history, - vlms, + vlm_charts, ) + rt_chart.default_view() await trio.sleep(0) mode: OrderMode @@ -1416,5 +1437,6 @@ async def display_symbol_data( ): rt_linked.mode = mode - # let the app run.. bby - await trio.sleep_forever() + rt_chart.default_view() + hist_chart.default_view() + await trio.sleep_forever() # let the app run.. bby From 1aa9ab03dadae8875b5ad41c1cd01d3cf3190ce1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Nov 2022 17:38:52 -0500 Subject: [PATCH 07/13] Brighter last OHLC graphics datum by default --- piker/ui/_chart.py | 1 - piker/ui/_ohlc.py | 2 +- piker/ui/_style.py | 6 +++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index a8d0e20e..8c1169b8 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -1224,7 +1224,6 @@ class ChartPlotWidget(pg.PlotWidget): graphics = BarItems( linked=self.linked, plotitem=pi, - # pen_color=self.pen_color, color=color, name=name, **graphics_kwargs, diff --git a/piker/ui/_ohlc.py b/piker/ui/_ohlc.py index a8519d90..2ce23d30 100644 --- a/piker/ui/_ohlc.py +++ b/piker/ui/_ohlc.py @@ -99,7 +99,7 @@ class BarItems(pg.GraphicsObject): linked: LinkedSplits, plotitem: 'pg.PlotItem', # noqa color: str = 'bracket', - last_bar_color: str = 'bracket', + last_bar_color: str = 'original', name: Optional[str] = None, diff --git a/piker/ui/_style.py b/piker/ui/_style.py index 22731093..52ac753a 100644 --- a/piker/ui/_style.py +++ b/piker/ui/_style.py @@ -240,12 +240,12 @@ def hcolor(name: str) -> str: 'gunmetal': '#91A3B0', 'battleship': '#848482', + # default ohlc-bars/curve gray + 'bracket': '#666666', # like the logo + # bluish 'charcoal': '#36454F', - # default bars - 'bracket': '#666666', # like the logo - # work well for filled polygons which want a 'bracket' feel # going light to dark 'davies': '#555555', From 03821fdf6f2252d56cb9f8b6cb495361dc363866 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 17 Nov 2022 17:59:06 -0500 Subject: [PATCH 08/13] Expect and update from by-type tick frames Move to expect and process new by-tick-event frames where the display loop can now just iterate the most recent tick events by type instead of the entire tick history sequence - thus we reduce iterations inside the update loop. Also, go back to use using the detected display's refresh rate (minus 6) as the default feed requested throttle rate since we can now handle much more bursty-ness in display updates thanks to the new framing format B) --- piker/ui/_display.py | 75 +++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 43 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index fb87d41c..6101ad2e 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -23,6 +23,7 @@ graphics update methods via our custom ``pyqtgraph`` charting api. ''' from functools import partial import itertools +from math import floor import time from typing import Optional, Any, Callable @@ -60,6 +61,7 @@ from ._forms import ( ) from . import _pg_overrides as pgo # from ..data._source import tf_in_1s +from ..data._sampling import _tick_groups from .order_mode import ( open_order_mode, OrderMode, @@ -73,17 +75,6 @@ from .._profile import Profiler log = get_logger(__name__) -# TODO: load this from a config.toml! -_quote_throttle_rate: int = round(60 * 6/16) # Hz - - -# a working tick-type-classes template -_tick_groups = { - 'clears': {'trade', 'utrade', 'last'}, - 'bids': {'bid', 'bsize'}, - 'asks': {'ask', 'asize'}, -} - # TODO: delegate this to each `Flow.maxmin()` which includes # caching and further we should implement the following stream based @@ -580,7 +571,6 @@ def graphics_update_cycle( profiler('view incremented') - ticks_frame = quote.get('ticks', ()) # frames_by_type: dict[str, dict] = {} # lasts = {} @@ -616,33 +606,6 @@ def graphics_update_cycle( log.debug('Skipping prepend graphics cycle: frame not in view') return - # for tick in ticks_frame: - # price = tick.get('price') - # ticktype = tick.get('type') - - # # if ticktype == 'n/a' or price == -1: - # # # okkk.. - # # continue - - # # keys are entered in olded-event-inserted-first order - # # since we iterate ``ticks_frame`` in standard order - # # above. in other words the order of the keys is the order - # # of tick events by type from the provider feed. - # frames_by_type.setdefault(ticktype, []).append(tick) - - # # overwrites so the last tick per type is the entry - # lasts[ticktype] = tick - - # from pprint import pformat - # frame_counts = { - # typ: len(frame) for typ, frame in frames_by_type.items() - # } - # print( - # f'{pformat(frame_counts)}\n' - # f'framed: {pformat(frames_by_type)}\n' - # f'lasts: {pformat(lasts)}\n' - # ) - # TODO: eventually we want to separate out the utrade (aka # dark vlm prices) here and show them as an additional # graphic. @@ -677,9 +640,26 @@ def graphics_update_cycle( # to it's max/min on the last pixel. typs: set[str] = set() + # from pprint import pformat + # frame_counts = { + # typ: len(frame) for typ, frame in frames_by_type.items() + # } + # print( + # f'{pformat(frame_counts)}\n' + # f'framed: {pformat(frames_by_type)}\n' + # f'lasts: {pformat(lasts)}\n' + # ) # for typ, tick in lasts.items(): - for tick in ticks_frame: - typ = tick.get('type') + # ticks_frame = quote.get('ticks', ()) + ticks_by_type = quote.get('tbt', {}) + + # for tick in ticks_frame: + for typ, ticks in ticks_by_type.items(): + + # NOTE: ticks are `.append()`-ed to the `ticks_by_type: dict` by the + # `._sampling.uniform_rate_send()` loop + tick = ticks[-1] + # typ = tick.get('type') price = tick.get('price') size = tick.get('size') @@ -1089,6 +1069,9 @@ def multi_maxmin( return 0, mx +_quote_throttle_rate: int = 60 - 6 + + async def display_symbol_data( godwidget: GodWidget, fqsns: list[str], @@ -1120,8 +1103,15 @@ async def display_symbol_data( group_key=True ) + # TODO: ctl over update loop's maximum frequency. + # - load this from a config.toml! + # - allow dyanmic configuration from chart UI? + global _quote_throttle_rate + from ._window import main_window + display_rate = main_window().current_screen().refreshRate() + _quote_throttle_rate = floor(display_rate) - 6 + feed: Feed - # assert len(fqsns) == 2 async with open_feed( fqsns, loglevel=loglevel, @@ -1129,7 +1119,6 @@ async def display_symbol_data( # limit to at least display's FPS # avoiding needless Qt-in-guest-mode context switches tick_throttle=round(_quote_throttle_rate/len(fqsns)), - # tick_throttle=round(_quote_throttle_rate), ) as feed: From 69ea296a9bbc135c87450743db16cd1e0b0a23cd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Nov 2022 13:21:15 -0500 Subject: [PATCH 09/13] Max out per symbol throttle @ 22Hz --- piker/ui/_display.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 6101ad2e..743460b9 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -625,14 +625,14 @@ def graphics_update_cycle( chart.update_graphics_from_flow( fqsn, # chart.name, - do_append=do_append, + # do_append=do_append, ) main_flow.draw_last(array_key=fqsn) hist_chart.update_graphics_from_flow( fqsn, # chart.name, - do_append=do_append, + # do_append=do_append, ) # NOTE: we always update the "last" datum @@ -882,7 +882,7 @@ def graphics_update_cycle( curve_name, array_key=curve_name, # do_append=uppx < update_uppx, - do_append=do_append, + # do_append=do_append, ) # is this even doing anything? # (pretty sure it's the real-time @@ -1118,7 +1118,10 @@ async def display_symbol_data( # limit to at least display's FPS # avoiding needless Qt-in-guest-mode context switches - tick_throttle=round(_quote_throttle_rate/len(fqsns)), + tick_throttle=min( + round(_quote_throttle_rate/len(fqsns)), + 22, + ), ) as feed: From 8d592886fae3cfb19ad35e8df99b8b192d54bdcf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Nov 2022 14:48:30 -0500 Subject: [PATCH 10/13] Pass `Flume`s throughout FSP-ui and charting APIs Since higher level charting and fsp management need access to the new `Flume` indexing apis this adjusts some func sigs to pass through (and/or create) flume instances: - `LinkedSplits.add_plot()` and dependents. - `ChartPlotWidget.draw_curve()` and deps, and it now returns a `Flow`. - `.ui._fsp.open_fsp_admin()` and `FspAdmin.open_fsp_ui()` related methods => now we wrap the destination fsp shm in a flume on the admin side and is returned from `.start_engine_method()`. Drop a bunch of (unused) chart widget methods including some already moved to flume methods: `.get_index()`, `.in_view()`, `.last_bar_in_view()`, `.is_valid_index()`. --- piker/data/feed.py | 5 +-- piker/ui/_chart.py | 95 +++++++++++++++++++------------------------- piker/ui/_display.py | 16 +++++--- piker/ui/_flows.py | 40 ++++++++++++++----- piker/ui/_fsp.py | 84 ++++++++++++++++++++++++++++----------- 5 files changed, 143 insertions(+), 97 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 534aebc9..d91b890e 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1037,12 +1037,11 @@ async def allocate_persistent_feed( flume = Flume( symbol=symbol, - _hist_shm_token=hist_shm.token, - _rt_shm_token=rt_shm.token, first_quote=first_quote, + _rt_shm_token=rt_shm.token, + _hist_shm_token=hist_shm.token, izero_hist=izero_hist, izero_rt=izero_rt, - # throttle_rate=tick_throttle, ) # for ambiguous names we simply apply the retreived diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 8c1169b8..bfe1c110 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -38,7 +38,6 @@ from PyQt5.QtWidgets import ( QVBoxLayout, QSplitter, ) -import numpy as np import pyqtgraph as pg import trio @@ -63,7 +62,10 @@ from ._style import ( _xaxis_at, _min_points_to_show, ) -from ..data.feed import Feed +from ..data.feed import ( + Feed, + Flume, +) from ..data._source import Symbol from ..log import get_logger from ._interaction import ChartView @@ -538,6 +540,7 @@ class LinkedSplits(QWidget): symbol: Symbol, shm: ShmArray, + flume: Flume, sidepane: FieldsForm, style: str = 'ohlc_bar', @@ -562,6 +565,7 @@ class LinkedSplits(QWidget): self.chart = self.add_plot( name=symbol.fqsn, shm=shm, + flume=flume, style=style, _is_main=True, sidepane=sidepane, @@ -582,6 +586,7 @@ class LinkedSplits(QWidget): name: str, shm: ShmArray, + flume: Flume, array_key: Optional[str] = None, style: str = 'line', @@ -705,9 +710,11 @@ class LinkedSplits(QWidget): # draw curve graphics if style == 'ohlc_bar': - graphics, data_key = cpw.draw_ohlc( + # graphics, data_key = cpw.draw_ohlc( + flow = cpw.draw_ohlc( name, shm, + flume=flume, array_key=array_key ) self.cursor.contents_labels.add_label( @@ -719,18 +726,22 @@ class LinkedSplits(QWidget): elif style == 'line': add_label = True - graphics, data_key = cpw.draw_curve( + # graphics, data_key = cpw.draw_curve( + flow = cpw.draw_curve( name, shm, + flume, array_key=array_key, color='default_light', ) elif style == 'step': add_label = True - graphics, data_key = cpw.draw_curve( + # graphics, data_key = cpw.draw_curve( + flow = cpw.draw_curve( name, shm, + flume, array_key=array_key, step_mode=True, color='davies', @@ -740,6 +751,9 @@ class LinkedSplits(QWidget): else: raise ValueError(f"Chart style {style} is currently unsupported") + graphics = flow.graphics + data_key = flow.name + if _is_main: assert style == 'ohlc_bar', 'main chart must be OHLC' else: @@ -937,12 +951,6 @@ class ChartPlotWidget(pg.PlotWidget): def focus(self) -> None: self.view.setFocus() - def last_bar_in_view(self) -> int: - self._arrays[self.name][-1]['index'] - - def is_valid_index(self, index: int) -> bool: - return index >= 0 and index < self._arrays[self.name][-1]['index'] - def _set_xlimits( self, xfirst: int, @@ -1035,9 +1043,14 @@ class ChartPlotWidget(pg.PlotWidget): log.warning(f'`Flow` for {self.name} not loaded yet?') return - index = flow.shm.array['index'] + arr = flow.shm.array + index = arr['index'] + # times = arr['time'] + + # these will be epoch time floats xfirst, xlast = index[0], index[-1] l, lbar, rbar, r = self.bars_range() + view = self.view if ( @@ -1194,6 +1207,7 @@ class ChartPlotWidget(pg.PlotWidget): name: str, shm: ShmArray, + flume: Flume, array_key: Optional[str] = None, overlay: bool = False, @@ -1206,10 +1220,7 @@ class ChartPlotWidget(pg.PlotWidget): **graphics_kwargs, - ) -> tuple[ - pg.GraphicsObject, - str, - ]: + ) -> Flow: ''' Draw a "curve" (line plot graphics) for the provided data in the input shm array ``shm``. @@ -1243,14 +1254,17 @@ class ChartPlotWidget(pg.PlotWidget): **graphics_kwargs, ) - self._flows[data_key] = Flow( - name=name, - plot=pi, - _shm=shm, + flow = self._flows[data_key] = Flow( + data_key, + pi, + shm, + flume, + is_ohlc=is_ohlc, # register curve graphics with this flow graphics=graphics, ) + assert isinstance(flow.shm, ShmArray) # TODO: this probably needs its own method? if overlay: @@ -1307,24 +1321,26 @@ class ChartPlotWidget(pg.PlotWidget): # understand. pi.addItem(graphics) - return graphics, data_key + return flow def draw_ohlc( self, name: str, shm: ShmArray, + flume: Flume, array_key: Optional[str] = None, **draw_curve_kwargs, - ) -> (pg.GraphicsObject, str): + ) -> Flow: ''' Draw OHLC datums to chart. ''' return self.draw_curve( - name=name, - shm=shm, + name, + shm, + flume, array_key=array_key, is_ohlc=True, **draw_curve_kwargs, @@ -1389,37 +1405,6 @@ class ChartPlotWidget(pg.PlotWidget): self.sig_mouse_leave.emit(self) self.scene().leaveEvent(ev) - def get_index(self, time: float) -> int: - - # TODO: this should go onto some sort of - # data-view thinger..right? - ohlc = self._flows[self.name].shm.array - - # XXX: not sure why the time is so off here - # looks like we're gonna have to do some fixing.. - indexes = ohlc['time'] >= time - - if any(indexes): - return ohlc['index'][indexes][-1] - else: - return ohlc['index'][-1] - - def in_view( - self, - array: np.ndarray, - - ) -> np.ndarray: - ''' - Slice an input struct array providing only datums - "in view" of this chart. - - ''' - l, lbar, rbar, r = self.bars_range() - ifirst = array[0]['index'] - # slice data by offset from the first index - # available in the passed datum set. - return array[lbar - ifirst:(rbar - ifirst) + 1] - def maxmin( self, name: Optional[str] = None, diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 743460b9..a9bb5406 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -1191,6 +1191,7 @@ async def display_symbol_data( hist_chart = hist_linked.plot_ohlc_main( symbol, hist_ohlcv, + flume, # in the case of history chart we explicitly set `False` # to avoid internal pane creation. # sidepane=False, @@ -1204,6 +1205,7 @@ async def display_symbol_data( rt_chart = rt_linked.plot_ohlc_main( symbol, ohlcv, + flume, # in the case of history chart we explicitly set `False` # to avoid internal pane creation. sidepane=pp_pane, @@ -1275,9 +1277,10 @@ async def display_symbol_data( hist_pi.hideAxis('left') hist_pi.hideAxis('bottom') - curve, _ = hist_chart.draw_curve( - name=fqsn, - shm=hist_ohlcv, + flow = hist_chart.draw_curve( + fqsn, + hist_ohlcv, + flume, array_key=fqsn, overlay=hist_pi, pi=hist_pi, @@ -1307,9 +1310,10 @@ async def display_symbol_data( rt_pi.hideAxis('left') rt_pi.hideAxis('bottom') - curve, _ = rt_chart.draw_curve( - name=fqsn, - shm=ohlcv, + flow = rt_chart.draw_curve( + fqsn, + ohlcv, + flume, array_key=fqsn, overlay=rt_pi, pi=rt_pi, diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py index a2908905..2e04bb37 100644 --- a/piker/ui/_flows.py +++ b/piker/ui/_flows.py @@ -36,6 +36,7 @@ from PyQt5.QtCore import QLineF from ..data._sharedmem import ( ShmArray, ) +from ..data.feed import Flume from .._profile import ( pg_profile_enabled, # ms_slower_then, @@ -208,13 +209,16 @@ class Flow(msgspec.Struct): # , frozen=True): ''' name: str plot: pg.PlotItem - graphics: Curve | BarItems _shm: ShmArray + flume: Flume + graphics: Curve | BarItems + + # for tracking y-mn/mx for y-axis auto-ranging yrange: tuple[float, float] = None # in some cases a flow may want to change its - # graphical "type" or, "form" when downsampling, - # normally this is just a plain line. + # graphical "type" or, "form" when downsampling, to + # start this is only ever an interpolation line. ds_graphics: Optional[Curve] = None is_ohlc: bool = False @@ -249,9 +253,9 @@ class Flow(msgspec.Struct): # , frozen=True): # TODO: remove this and only allow setting through # private ``._shm`` attr? - @shm.setter - def shm(self, shm: ShmArray) -> ShmArray: - self._shm = shm + # @shm.setter + # def shm(self, shm: ShmArray) -> ShmArray: + # self._shm = shm def maxmin( self, @@ -318,9 +322,15 @@ class Flow(msgspec.Struct): # , frozen=True): ''' vr = self.plot.viewRect() - return int(vr.left()), int(vr.right()) + return ( + vr.left(), + vr.right(), + ) - def datums_range(self) -> tuple[ + def datums_range( + self, + index_field: str = 'index', + ) -> tuple[ int, int, int, int, int, int ]: ''' @@ -328,6 +338,8 @@ class Flow(msgspec.Struct): # , frozen=True): ''' l, r = self.view_range() + l = round(l) + r = round(r) # TODO: avoid this and have shm passed # in earlier. @@ -348,15 +360,23 @@ class Flow(msgspec.Struct): # , frozen=True): def read( self, array_field: Optional[str] = None, + index_field: str = 'index', ) -> tuple[ int, int, np.ndarray, int, int, np.ndarray, ]: - # read call + ''' + Read the underlying shm array buffer and + return the data plus indexes for the first + and last + which has been written to. + + ''' + # readable data array = self.shm.array - indexes = array['index'] + indexes = array[index_field] ifirst = indexes[0] ilast = indexes[-1] diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 370eeb02..29162635 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -43,6 +43,7 @@ from ..data._sharedmem import ( try_read, ) from ..data.feed import Flume +from ..data._source import Symbol from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -213,7 +214,7 @@ async def open_fsp_actor_cluster( async def run_fsp_ui( linkedsplits: LinkedSplits, - shm: ShmArray, + flume: Flume, started: trio.Event, target: Fsp, conf: dict[str, dict], @@ -250,9 +251,11 @@ async def run_fsp_ui( else: chart = linkedsplits.subplots[overlay_with] + shm = flume.rt_shm chart.draw_curve( - name=name, - shm=shm, + name, + shm, + flume, overlay=True, color='default_light', array_key=name, @@ -262,8 +265,9 @@ async def run_fsp_ui( else: # create a new sub-chart widget for this fsp chart = linkedsplits.add_plot( - name=name, - shm=shm, + name, + shm, + flume, array_key=name, sidepane=sidepane, @@ -353,6 +357,9 @@ async def run_fsp_ui( # last = time.time() +# TODO: maybe this should be our ``Flow`` type since it maps +# one flume to the next? The machinery for task/actor mgmt should +# be part of the instantiation API? class FspAdmin: ''' Client API for orchestrating FSP actors and displaying @@ -376,6 +383,10 @@ class FspAdmin: tuple[tractor.MsgStream, ShmArray] ] = {} self._flow_registry: dict[_Token, str] = {} + + # TODO: make this a `.src_flume` and add + # a `dst_flume`? + # (=> but then wouldn't this be the most basic `Flow`?) self.flume = flume def rr_next_portal(self) -> tractor.Portal: @@ -389,7 +400,7 @@ class FspAdmin: complete: trio.Event, started: trio.Event, fqsn: str, - dst_shm: ShmArray, + dst_fsp_flume: Flume, conf: dict, target: Fsp, loglevel: str, @@ -410,9 +421,10 @@ class FspAdmin: # data feed key fqsn=fqsn, + # TODO: pass `Flume.to_msg()`s here? # mems src_shm_token=self.flume.rt_shm.token, - dst_shm_token=dst_shm.token, + dst_shm_token=dst_fsp_flume.rt_shm.token, # target ns_path=ns_path, @@ -429,12 +441,14 @@ class FspAdmin: ctx.open_stream() as stream, ): + dst_fsp_flume.stream: tractor.MsgStream = stream + # register output data self._registry[ (fqsn, ns_path) ] = ( stream, - dst_shm, + dst_fsp_flume.rt_shm, complete ) @@ -469,7 +483,7 @@ class FspAdmin: worker_name: Optional[str] = None, loglevel: str = 'info', - ) -> (ShmArray, trio.Event): + ) -> (Flume, trio.Event): fqsn = self.flume.symbol.fqsn @@ -479,6 +493,26 @@ class FspAdmin: target=target, readonly=True, ) + + portal = self.cluster.get(worker_name) or self.rr_next_portal() + provider_tag = portal.channel.uid + + symbol = Symbol( + key=key, + broker_info={ + provider_tag: {'asset_type': 'fsp'}, + }, + ) + dst_fsp_flume = Flume( + symbol=symbol, + _rt_shm_token=dst_shm.token, + first_quote={}, + + # set to 0 presuming for now that we can't load + # FSP history (though we should eventually). + izero_hist=0, + izero_rt=0, + ) self._flow_registry[( self.flume.rt_shm._token, target.name @@ -489,7 +523,6 @@ class FspAdmin: # f'Already started FSP `{fqsn}:{func_name}`' # ) - portal = self.cluster.get(worker_name) or self.rr_next_portal() complete = trio.Event() started = trio.Event() self.tn.start_soon( @@ -498,13 +531,13 @@ class FspAdmin: complete, started, fqsn, - dst_shm, + dst_fsp_flume, conf, target, loglevel, ) - return dst_shm, started + return dst_fsp_flume, started async def open_fsp_chart( self, @@ -516,7 +549,7 @@ class FspAdmin: ) -> (trio.Event, ChartPlotWidget): - shm, started = await self.start_engine_task( + flume, started = await self.start_engine_task( target, conf, loglevel, @@ -528,7 +561,7 @@ class FspAdmin: run_fsp_ui, self.linked, - shm, + flume, started, target, @@ -636,6 +669,7 @@ async def open_vlm_displays( chart = linked.add_plot( name='volume', shm=shm, + flume=flume, array_key='volume', sidepane=sidepane, @@ -715,7 +749,7 @@ async def open_vlm_displays( tasks_ready = [] # spawn and overlay $ vlm on the same subchart - dvlm_shm, started = await admin.start_engine_task( + dvlm_flume, started = await admin.start_engine_task( dolla_vlm, { # fsp engine conf @@ -812,9 +846,13 @@ async def open_vlm_displays( else: color = 'bracket' - curve, _ = chart.draw_curve( - name=name, - shm=shm, + assert isinstance(shm, ShmArray) + assert isinstance(flume, Flume) + + flow = chart.draw_curve( + name, + shm, + flume, array_key=name, overlay=pi, color=color, @@ -827,20 +865,20 @@ async def open_vlm_displays( # specially store ref to shm for lookup in display loop # since only a placeholder of `None` is entered in # ``.draw_curve()``. - flow = chart._flows[name] + # flow = chart._flows[name] assert flow.plot is pi chart_curves( fields, dvlm_pi, - dvlm_shm, + dvlm_flume.rt_shm, step_mode=True, ) # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is # up since this one depends on it. - fr_shm, started = await admin.start_engine_task( + fr_flume, started = await admin.start_engine_task( flow_rates, { # fsp engine conf 'func_name': 'flow_rates', @@ -853,7 +891,7 @@ async def open_vlm_displays( # chart_curves( # dvlm_rate_fields, # dvlm_pi, - # fr_shm, + # fr_flume.rt_shm, # ) # TODO: is there a way to "sync" the dual axes such that only @@ -901,7 +939,7 @@ async def open_vlm_displays( chart_curves( trade_rate_fields, tr_pi, - fr_shm, + fr_flume.rt_shm, # step_mode=True, # dashed line to represent "individual trades" being From bc7fe6114d61c850d9c48980daa1e223b504aa85 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Nov 2022 15:04:16 -0500 Subject: [PATCH 11/13] Adjust order mode to use `Flume.get_index()` --- piker/ui/order_mode.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 7e4ae066..c7b3cc59 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -930,7 +930,6 @@ async def process_trade_msg( ) -> tuple[Dialog, Status]: - get_index = mode.chart.get_index fmsg = pformat(msg) log.debug(f'Received order msg:\n{fmsg}') name = msg['name'] @@ -965,6 +964,9 @@ async def process_trade_msg( oid = msg.oid dialog: Dialog = mode.dialogs.get(oid) + fqsn = dialog.symbol.front_fqsn() + flume = mode.feed.flumes[fqsn] + match msg: case Status( resp='dark_open' | 'open', @@ -1034,10 +1036,11 @@ async def process_trade_msg( # should only be one "fill" for an alert # add a triangle and remove the level line req = Order(**req) + index = flume.get_index(time.time()) mode.on_fill( oid, price=req.price, - arrow_index=get_index(time.time()), + arrow_index=index, ) mode.lines.remove_line(uuid=oid) msg.req = req @@ -1065,26 +1068,27 @@ async def process_trade_msg( action = order.action details = msg.brokerd_msg + # TODO: put the actual exchange timestamp? + # NOTE: currently the ``kraken`` openOrders sub + # doesn't deliver their engine timestamp as part of + # it's schema, so this value is **not** from them + # (see our backend code). We should probably either + # include all provider-engine timestamps in the + # summary 'closed' status msg and/or figure out + # a way to indicate what is a `brokerd` stamp versus + # a true backend one? This will require finagling + # with how each backend tracks/summarizes time + # stamps for the downstream API. + index = flume.get_index( + details['broker_time'] + ) + # TODO: some kinda progress system mode.on_fill( oid, price=details['price'], + arrow_index=index, pointing='up' if action == 'buy' else 'down', - - # TODO: put the actual exchange timestamp - arrow_index=get_index( - # TODO: note currently the ``kraken`` openOrders sub - # doesn't deliver their engine timestamp as part of - # it's schema, so this value is **not** from them - # (see our backend code). We should probably either - # include all provider-engine timestamps in the - # summary 'closed' status msg and/or figure out - # a way to indicate what is a `brokerd` stamp versus - # a true backend one? This will require finagling - # with how each backend tracks/summarizes time - # stamps for the downstream API. - details['broker_time'] - ), ) # TODO: append these fill events to the position's clear From 414866fc6b82dc33c20dddecc83cb8bddb95ca72 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Nov 2022 13:38:57 -0500 Subject: [PATCH 12/13] Assign pnl calc output for use when debugging --- piker/ui/_position.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 6eb1d962..9baca8ee 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -41,7 +41,11 @@ from ._anchors import ( pp_tight_and_right, # wanna keep it straight in the long run gpath_pin, ) -from ..calc import humanize, pnl, puterize +from ..calc import ( + humanize, + pnl, + puterize, +) from ..clearing._allocate import Allocator from ..pp import Position from ..data._normalize import iterticks @@ -105,6 +109,7 @@ async def update_pnl_from_feed( # period = now - last_tick for sym, quote in quotes.items(): + # print(f'{key} PnL: sym:{sym}') # TODO: uggggh we probably want a better state # management then this sincce we want to enable @@ -113,6 +118,10 @@ async def update_pnl_from_feed( if sym != key: continue + # watch out for wrong quote msg-data if you muck + # with backend feed subs code.. + # assert sym == quote['fqsn'] + for tick in iterticks(quote, types): # print(f'{1/period} Hz') @@ -125,14 +134,17 @@ async def update_pnl_from_feed( else: # compute and display pnl status - # print(f'formatting PNL {sym}: {quote}') - order_mode.pane.pnl_label.format( - pnl=copysign(1, size) * pnl( - # live.ppu, - order_mode.current_pp.live_pp.ppu, - tick['price'], - ), - ) + pnl_val = ( + copysign(1, size) + * + pnl( + # live.ppu, + order_mode.current_pp.live_pp.ppu, + tick['price'], + ) + ) + # print(f'formatting PNL {sym} => {pnl_val}') + order_mode.pane.pnl_label.format(pnl=pnl_val) # last_tick = time.time() finally: From 07ab853d3d03bb31c7c887318e6913c71d0e182c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Nov 2022 16:41:26 -0500 Subject: [PATCH 13/13] `Order.symbol` is a `str`.. --- piker/ui/order_mode.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index c7b3cc59..1dd49872 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -88,7 +88,7 @@ class Dialog(Struct): # TODO: use ``pydantic.UUID4`` field uuid: str order: Order - symbol: Symbol + symbol: str lines: list[LevelLine] last_status_close: Callable = lambda: None msgs: dict[str, dict] = {} @@ -379,7 +379,7 @@ class OrderMode: dialog = Dialog( uuid=order.oid, order=order, - symbol=order.symbol, + symbol=order.symbol, # XXX: always a str? lines=lines, last_status_close=self.multistatus.open_status( f'submitting {order.exec_mode}-{order.action}', @@ -964,8 +964,9 @@ async def process_trade_msg( oid = msg.oid dialog: Dialog = mode.dialogs.get(oid) - fqsn = dialog.symbol.front_fqsn() - flume = mode.feed.flumes[fqsn] + if dialog: + fqsn = dialog.symbol + flume = mode.feed.flumes[fqsn] match msg: case Status(