From 31735f26d38a37cd4713e598634cca81b50e1af3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 1 Sep 2022 16:25:54 -0400 Subject: [PATCH] Poll for sampling info at startup, tolerate races Use the new `Feed.get_ds_info()` method in a poll loop to definitively get the inter-chart sampling info and avoid races with shm buffer backfilling. Also, factor the history increment closure-task into `graphics_update_loop()` which will make it clearer how to factor all the "should we update" logic into some `DisplayState` API. --- piker/ui/_display.py | 152 ++++++++++++++++++++++--------------------- 1 file changed, 78 insertions(+), 74 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index c220a68e..c24d1d9c 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -26,14 +26,15 @@ from functools import partial import time from typing import Optional, Any, Callable -import numpy as np import tractor import trio -import pendulum import pyqtgraph as pg # from .. import brokers -from ..data.feed import open_feed +from ..data.feed import ( + open_feed, + Feed, +) from ._axes import YAxisLabel from ._chart import ( ChartPlotWidget, @@ -153,11 +154,9 @@ class DisplayState: async def graphics_update_loop( - linked: LinkedSplits, - stream: tractor.MsgStream, - ohlcv: np.ndarray, - hist_ohlcv: np.ndarray, - + nurse: trio.Nursery, + godwidget: GodWidget, + feed: Feed, wap_in_history: bool = False, vlm_chart: Optional[ChartPlotWidget] = None, @@ -178,10 +177,14 @@ async def graphics_update_loop( # of copying it from last bar's close # - 1-5 sec bar lookback-autocorrection like tws does? # (would require a background history checker task) - godwidget = linked.godwidget + linked: LinkedSplits = godwidget.rt_linked display_rate = godwidget.window.current_screen().refreshRate() chart = linked.chart + hist_chart = godwidget.hist_linked.chart + + ohlcv = feed.rt_shm + hist_ohlcv = feed.hist_shm # update last price sticky last_price_sticky = chart._ysticks[chart.name] @@ -257,7 +260,44 @@ async def graphics_update_loop( chart.default_view() + # TODO: probably factor this into some kinda `DisplayState` + # API that can be reused at least in terms of pulling view + # params (eg ``.bars_range()``). + async def increment_history_view(): + i_last_append = i_last = hist_ohlcv.index + _, hist_step_size_s, _ = feed.get_ds_info() + + async with feed.index_stream( + int(hist_step_size_s) + ) as istream: + async for msg in istream: + + # increment the view position by the sample offset. + uppx = hist_chart.view.x_uppx() + l, lbar, rbar, r = hist_chart.bars_range() + + i_step = hist_ohlcv.index + i_diff = i_step - i_last + i_last = i_step + liv = r >= i_step + append_diff = i_step - i_last_append + do_append = (append_diff >= uppx) + + if do_append: + i_last_append = i_step + + if ( + # i_diff > 0 # no new sample step + do_append + # and uppx < 4 # chart is zoomed out very far + and liv + ): + hist_chart.increment_view(steps=i_diff) + + nurse.start_soon(increment_history_view) + # main real-time quotes update loop + stream: tractor.MsgStream = feed.stream async for quotes in stream: ds.quotes = quotes @@ -736,27 +776,13 @@ async def display_symbol_data( ohlcv: ShmArray = feed.rt_shm hist_ohlcv: ShmArray = feed.hist_shm - times = hist_ohlcv.array['time'] - end = pendulum.from_timestamp(times[-1]) - start = pendulum.from_timestamp(times[times != times[-1]][-1]) - hist_step_size_s = (end - start).seconds - - times = ohlcv.array['time'] - end = pendulum.from_timestamp(times[-1]) - start = pendulum.from_timestamp(times[times != times[-1]][-1]) - rt_step_size_s = (end - start).seconds - - ratio = hist_step_size_s / rt_step_size_s - # this value needs to be pulled once and only once during # startup end_index = feed.startup_hist_index - # bars = ohlcv.array symbol = feed.symbols[sym] fqsn = symbol.front_fqsn() - step_size_s = 1 tf_key = tf_in_1s[step_size_s] @@ -767,8 +793,8 @@ async def display_symbol_data( f'step:{tf_key} ' ) - linked = godwidget.rt_linked - linked._symbol = symbol + rt_linked = godwidget.rt_linked + rt_linked._symbol = symbol # generate order mode side-pane UI # A ``FieldsForm`` form to configure order entry @@ -793,7 +819,7 @@ async def display_symbol_data( ) # create main OHLC chart - chart = linked.plot_ohlc_main( + chart = rt_linked.plot_ohlc_main( symbol, ohlcv, # in the case of history chart we explicitly set `False` @@ -801,7 +827,6 @@ async def display_symbol_data( sidepane=pp_pane, ) - # chart.default_view() chart._feeds[symbol.key] = feed chart.setFocus() @@ -837,6 +862,18 @@ async def display_symbol_data( # a weird placement of the region on the way-far-left.. # region.setClipItem(flow.graphics) + # poll for datums load and timestep detection + for _ in range(10): + try: + _, _, ratio = feed.get_ds_info() + break + except IndexError: + await trio.sleep(0.001) + continue + else: + raise RuntimeError( + 'Failed to detect sampling periods from shm!?') + def update_pi_from_region(): region.setZValue(10) mn, mx = region.getRegion() @@ -877,14 +914,14 @@ async def display_symbol_data( # NOTE: we must immediately tell Qt to show the OHLC chart # to avoid a race where the subplots get added/shown to # the linked set *before* the main price chart! - linked.show() - linked.focus() + rt_linked.show() + rt_linked.focus() await trio.sleep(0) - linked.splitter.insertWidget(0, hist_linked) + rt_linked.splitter.insertWidget(0, hist_linked) # XXX: if we wanted it at the bottom? - # linked.splitter.addWidget(hist_linked) - linked.focus() + # rt_linked.splitter.addWidget(hist_linked) + rt_linked.focus() vlm_chart: Optional[ChartPlotWidget] = None async with trio.open_nursery() as ln: @@ -896,7 +933,7 @@ async def display_symbol_data( ): vlm_chart = await ln.start( open_vlm_displays, - linked, + rt_linked, ohlcv, ) @@ -904,7 +941,7 @@ async def display_symbol_data( # from an input config. ln.start_soon( start_fsp_displays, - linked, + rt_linked, ohlcv, loading_sym_key, loglevel, @@ -913,46 +950,13 @@ async def display_symbol_data( # start graphics update loop after receiving first live quote ln.start_soon( graphics_update_loop, - linked, - feed.stream, - ohlcv, - hist_ohlcv, + ln, + godwidget, + feed, wap_in_history, vlm_chart, ) - async def increment_history_view(): - i_last_append = i_last = hist_ohlcv.index - - async with feed.index_stream( - int(hist_step_size_s) - ) as istream: - async for msg in istream: - - # increment the view position by the sample offset. - uppx = hist_chart.view.x_uppx() - l, lbar, rbar, r = hist_chart.bars_range() - - i_step = hist_ohlcv.index - i_diff = i_step - i_last - i_last = i_step - liv = r >= i_step - append_diff = i_step - i_last_append - do_append = (append_diff >= uppx) - - if do_append: - i_last_append = i_step - - if ( - # i_diff > 0 # no new sample step - do_append - # and uppx < 4 # chart is zoomed out very far - and liv - ): - hist_chart.increment_view(steps=i_diff) - - ln.start_soon(increment_history_view) - await trio.sleep(0) # size view to data prior to order mode init @@ -973,19 +977,19 @@ async def display_symbol_data( # let Qt run to render all widgets and make sure the # sidepanes line up vertically. await trio.sleep(0) - linked.resize_sidepanes() - linked.set_split_sizes() + rt_linked.resize_sidepanes() + rt_linked.set_split_sizes() # NOTE: we pop the volume chart from the subplots set so # that it isn't double rendered in the display loop # above since we do a maxmin calc on the volume data to # determine if auto-range adjustements should be made. - # linked.subplots.pop('volume', None) + # rt_linked.subplots.pop('volume', None) # TODO: make this not so shit XD # close group status sbar._status_groups[loading_sym_key][1]() # let the app run.. bby - # linked.graphics_cycle() + # rt_linked.graphics_cycle() await trio.sleep_forever()