diff --git a/piker/ui/_app.py b/piker/ui/_app.py index ab96f45d..78db608c 100644 --- a/piker/ui/_app.py +++ b/piker/ui/_app.py @@ -85,11 +85,11 @@ async def _async_main( screen = godwidget.window.current_screen() # configure graphics update throttling based on display refresh rate - _display._clear_throttle_rate = min( + _display._quote_throttle_rate = min( round(screen.refreshRate()), - _display._clear_throttle_rate, + _display._quote_throttle_rate, ) - log.info(f'Set graphics update rate to {_display._clear_throttle_rate} Hz') + log.info(f'Set graphics update rate to {_display._quote_throttle_rate} Hz') # TODO: do styling / themeing setup # _style.style_ze_sheets(godwidget) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index b51db7d8..e3c6f342 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -15,11 +15,13 @@ # along with this program. If not, see . ''' -Real-time display tasks for charting / graphics. +real-time display tasks for charting graphics update. + +this module ties together quote and computational (fsp) streams with +graphics update methods via our custom ``pyqtgraph`` charting api. ''' from contextlib import asynccontextmanager -# from pprint import pformat from functools import partial import time from types import ModuleType @@ -54,25 +56,46 @@ from ..log import get_logger log = get_logger(__name__) -# TODO: load these from a config.toml! -_clear_throttle_rate: int = 58 # Hz -_book_throttle_rate: int = 16 # Hz +# TODO: load this from a config.toml! +_quote_throttle_rate: int = 58 # Hz def update_fsp_chart( chart: ChartPlotWidget, shm: ShmArray, - display_name: str, + graphics_name: str, array_key: Optional[str], ) -> None: array = shm.array - # XXX: is this a problem any more after porting to the - # ``tractor.Context`` api or can we remove it? + # update graphics + # NOTE: this does a length check internally which allows it + # staying above the last row check below.. + chart.update_curve_from_array( + graphics_name, + array, + array_key=array_key or graphics_name, + ) - # TODO: provide a read sync mechanism to avoid this polling. the + try: + last_row = array[-1] + except IndexError: + # XXX: race condition with backfilling shm. + # + # the underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the indices may be updated in such a way that + # a read delivers an empty array (though it seems like we + # *should* be able to prevent that?). also, as and alt and + # something we need anyway, maybe there should be some kind of + # signal that a prepend is taking place and this consumer can + # respond (eg. redrawing graphics) accordingly. + log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') + return + + # TODO: provide a read sync mechanism to avoid this polling. the # underlying issue is that a backfill (aka prepend) and subsequent # shm array first/last index update could result in an empty array # read here since the stream is never torn down on the re-compute @@ -89,23 +112,75 @@ def update_fsp_chart( # read_tries -= 1 # continue - # update graphics - chart.update_curve_from_array( - display_name, - array, - array_key=array_key or display_name, - ) + # XXX: re: ``array_key``: fsp func names must be unique meaning we + # can't have duplicates of the underlying data even if multiple + # sub-charts reference it under different 'named charts'. - last_val_sticky = chart._ysticks.get(display_name) + # read from last calculated value and update any label + last_val_sticky = chart._ysticks.get(graphics_name) if last_val_sticky: - # read from last calculated value - # XXX: fsp func names must be unique meaning we don't have - # duplicates of the underlying data even if multiple - # sub-charts reference it under different 'named charts'. - array = shm.array[array_key] - if len(array): - value = array[-1] - last_val_sticky.update_from_data(-1, value) + # array = shm.array[array_key] + # if len(array): + # value = array[-1] + last = last_row[array_key] + last_val_sticky.update_from_data(-1, last) + + +# _clses = { +# 'clears': {'trade', 'utrade', 'last'}, +# 'last': {'last'}, +# 'bids': {'bid', 'bsize'}, +# 'asks': {'ask', 'asize'}, +# } + +# XXX: idea for frame type data structure we could use on the +# wire instead of doing it here? +# frames = { +# 'index': ['type_a', 'type_c', 'type_n', 'type_n', 'type_c'], + +# 'type_a': [tick0, tick1, tick2, .., tickn], +# 'type_b': [tick0, tick1, tick2, .., tickn], +# 'type_c': [tick0, tick1, tick2, .., tickn], +# ... +# 'type_n': [tick0, tick1, tick2, .., tickn], +# } + + +def chart_maxmin( + chart: ChartPlotWidget, + vlm_chart: Optional[ChartPlotWidget] = None, + +) -> tuple[ + tuple[int, int, int, int], + float, + float, + float, +]: + # TODO: implement this + # https://arxiv.org/abs/cs/0610046 + # https://github.com/lemire/pythonmaxmin + + array = chart._arrays['ohlc'] + ifirst = array[0]['index'] + + last_bars_range = chart.bars_range() + l, lbar, rbar, r = last_bars_range + in_view = array[lbar - ifirst:rbar - ifirst + 1] + + assert in_view.size + + mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) + + # TODO: when we start using line charts, probably want to make + # this an overloaded call on our `DataView + # sym = chart.name + # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym]) + + mx_vlm_in_view = 0 + if vlm_chart: + mx_vlm_in_view = np.max(in_view['volume']) + + return last_bars_range, mx, max(mn, 0), mx_vlm_in_view async def update_chart_from_quotes( @@ -144,32 +219,7 @@ async def update_chart_from_quotes( if vlm_chart: vlm_sticky = vlm_chart._ysticks['volume'] - def maxmin(): - # TODO: implement this - # https://arxiv.org/abs/cs/0610046 - # https://github.com/lemire/pythonmaxmin - - array = chart._arrays['ohlc'] - ifirst = array[0]['index'] - - last_bars_range = chart.bars_range() - l, lbar, rbar, r = last_bars_range - in_view = array[lbar - ifirst:rbar - ifirst + 1] - - assert in_view.size - - mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) - - # TODO: when we start using line charts, probably want to make - # this an overloaded call on our `DataView - # sym = chart.name - # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym]) - - mx_vlm_in_view = 0 - if vlm_chart: - mx_vlm_in_view = np.max(in_view['volume']) - - return last_bars_range, mx, max(mn, 0), mx_vlm_in_view + maxmin = partial(chart_maxmin, chart, vlm_chart) chart.default_view() @@ -203,21 +253,27 @@ async def update_chart_from_quotes( tick_size = chart.linked.symbol.tick_size tick_margin = 3 * tick_size - last_ask = last_bid = last_clear = time.time() chart.show() + last_quote = time.time() + # NOTE: all code below this loop is expected to be synchronous + # and thus draw instructions are not picked up jntil the next + # wait / iteration. async for quotes in stream: - # chart isn't actively shown so just skip render cycle + now = time.time() + quote_period = now - last_quote + if quote_period <= 1/_quote_throttle_rate: + log.warning(f'TOO FAST: {1/quote_period}') + last_quote = now + + # chart isn't active/shown so skip render cycle and pause feed(s) if chart.linked.isHidden(): await chart.pause_all_feeds() continue for sym, quote in quotes.items(): - now = time.time() - - # brange, mx_in_view, mn_in_view = maxmin() ( brange, mx_in_view, @@ -254,30 +310,75 @@ async def update_chart_from_quotes( vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375)) last_mx_vlm = mx_vlm_in_view - for tick in quote.get('ticks', ()): + ticks_frame = quote.get('ticks', ()) - # log.info( - # f"quotes: {pformat(quote['symbol'])}: {pformat(tick)}") - ticktype = tick.get('type') + frames_by_type: dict[str, dict] = {} + lasts = {} + + # build tick-type "frames" of tick sequences since + # likely the tick arrival rate is higher then our + # (throttled) quote stream rate. + for tick in ticks_frame: price = tick.get('price') - size = tick.get('size') + ticktype = tick.get('type') if ticktype == 'n/a' or price == -1: # okkk.. continue - # clearing price event - if ticktype in ('trade', 'utrade', 'last'): + # keys are entered in olded-event-inserted-first order + # since we iterate ``ticks_frame`` in standard order + # above. in other words the order of the keys is the order + # of tick events by type from the provider feed. + frames_by_type.setdefault(ticktype, []).append(tick) - # throttle clearing price updates to ~ max 60 FPS - period = now - last_clear - if period <= 1/_clear_throttle_rate: - # faster then display refresh rate - continue + # overwrites so the last tick per type is the entry + lasts[ticktype] = tick - # print(f'passthrough {tick}\n{1/(now-last_clear)}') - # set time of last graphics update - last_clear = now + # from pprint import pformat + # frame_counts = { + # typ: len(frame) for typ, frame in frames_by_type.items() + # } + # print(f'{pformat(frame_counts)}') + # print(f'framed: {pformat(frames_by_type)}') + # print(f'lasts: {pformat(lasts)}') + + # TODO: eventually we want to separate out the utrade (aka + # dark vlm prices) here and show them as an additional + # graphic. + clear_types = {'trade', 'utrade', 'last'} + + # XXX: if we wanted to iterate in "latest" (i.e. most + # current) tick first order as an optimization where we only + # update from the last tick from each type class. + # last_clear_updated: bool = False + # for typ, tick in reversed(lasts.items()): + + # iterate in FIFO order per frame + for typ, tick in lasts.items(): + + price = tick.get('price') + size = tick.get('size') + + # compute max and min prices (including bid/ask) from + # tick frames to determine the y-range for chart + # auto-scaling. + # TODO: we need a streaming minmax algo here, see def above. + mx = max(price + tick_margin, mx) + mn = min(price - tick_margin, mn) + + if typ in clear_types: + + # XXX: if we only wanted to update graphics from the + # "current"/"latest received" clearing price tick + # once (see alt iteration order above). + # if last_clear_updated: + # continue + + # last_clear_updated = True + # we only want to update grahpics from the *last* + # tick event that falls under the "clearing price" + # set. # update price sticky(s) end = array[-1] @@ -295,33 +396,11 @@ async def update_chart_from_quotes( # update vwap overlay line chart.update_curve_from_array('bar_wap', ohlcv.array) - # l1 book events - # throttle the book graphics updates at a lower rate - # since they aren't as critical for a manual user - # viewing the chart - - elif ticktype in ('ask', 'asize'): - if (now - last_ask) <= 1/_book_throttle_rate: - # print(f'skipping\n{tick}') - continue - - # print(f'passthrough {tick}\n{1/(now-last_ask)}') - last_ask = now - - elif ticktype in ('bid', 'bsize'): - if (now - last_bid) <= 1/_book_throttle_rate: - continue - - # print(f'passthrough {tick}\n{1/(now-last_bid)}') - last_bid = now - - # compute max and min trade values to display in view - # TODO: we need a streaming minmax algorithm here, see - # def above. - - # XXX: prettty sure this is correct? + # L1 book label-line updates + # XXX: is this correct for ib? # if ticktype in ('trade', 'last'): - if ticktype in ('last',): # 'size'): + # if ticktype in ('last',): # 'size'): + if typ in ('last',): # 'size'): label = { l1.ask_label.fields['level']: l1.ask_label, @@ -331,38 +410,34 @@ async def update_chart_from_quotes( if label is not None: label.update_fields({'level': price, 'size': size}) - # on trades should we be knocking down + # TODO: on trades should we be knocking down # the relevant L1 queue? # label.size -= size - elif ticktype in ('ask', 'asize'): + # elif ticktype in ('ask', 'asize'): + elif typ in ('ask', 'asize'): l1.ask_label.update_fields({'level': price, 'size': size}) - elif ticktype in ('bid', 'bsize'): + # elif ticktype in ('bid', 'bsize'): + elif typ in ('bid', 'bsize'): l1.bid_label.update_fields({'level': price, 'size': size}) - # in view y-range checking for auto-scale - # update the max/min price in view to keep bid/ask on screen - mx = max(price + tick_margin, mx) - mn = min(price - tick_margin, mn) - if (mx > last_mx) or ( - mn < last_mn - ): - # print(f'new y range: {(mn, mx)}') - chart._set_yrange( - yrange=(mn, mx), - # TODO: we should probably scale - # the view margin based on the size - # of the true range? This way you can - # slap in orders outside the current - # L1 (only) book range. - # range_margin=0.1, - ) + # check for y-range re-size + if (mx > last_mx) or (mn < last_mn): + # print(f'new y range: {(mn, mx)}') + chart._set_yrange( + yrange=(mn, mx), + # TODO: we should probably scale + # the view margin based on the size + # of the true range? This way you can + # slap in orders outside the current + # L1 (only) book range. + # range_margin=0.1, + ) - last_mx, last_mn = mx, mn + last_mx, last_mn = mx, mn # run synchronous update on all derived fsp subplots - # print(f'subplots: {linked.subplots.keys()}') for name, subchart in linked.subplots.items(): update_fsp_chart( subchart, @@ -376,19 +451,15 @@ async def update_chart_from_quotes( # TODO: all overlays on all subplots.. # run synchronous update on all derived overlays - # print(f'overlays: {chart._overlays}') - for name, shm in chart._overlays.items(): + for curve_name, shm in chart._overlays.items(): update_fsp_chart( chart, shm, - - # XXX: do we really needs seperate names here? - name, - array_key=name, + curve_name, + array_key=curve_name, ) - def maybe_mk_fsp_shm( sym: str, field_name: str, @@ -697,7 +768,7 @@ async def update_chart_from_fsp( now = time.time() period = now - last - if period <= 1/_clear_throttle_rate: + if period <= 1/_quote_throttle_rate: # faster then display refresh rate print(f'fsp too fast: {1/period}') continue @@ -915,7 +986,7 @@ async def display_symbol_data( loglevel=loglevel, # 60 FPS to limit context switches - tick_throttle=_clear_throttle_rate, + tick_throttle=_quote_throttle_rate, ) as feed, ):