From 590e08a4d48b0baeeed621be2cdcc8e691336956 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Sep 2021 07:56:14 -0400 Subject: [PATCH] Process framed ticks by type in main graphics loop We are already packing framed ticks in extended lists from the `.data._sampling.uniform_rate_send()` task so the natural solution to avoid needless graphics cycles for HFT-ish feeds (like binance) is to unpack those frames and for most cases only update graphics with the "latest" data per loop iteration. Unpacking in this way also lessens nested-iterations per tick type. Btw, this also effectively solves all remaining issues of fast tick feeds over-triggering the graphics loop renders as long as the original quote stream is throttled appropriately, usually to the local display rate. Relates to #183, #192 Dirty deats: - drop all per-tick rate checks, they were always somewhat pointless when iterating a frame of ticks per render cycle XD. - unpack tick frame into ticks per frame type, and last of each type; the lasts are used to update each part of the UI/graphics by class. - only skip the label update if we can't retrieve the last from from a graphics source array; it seems `chart.update_curve_from_array()` already does a `len` check internally. - add some draft commented code for tick type classes and a possible wire framed tick data structure. - move `chart_maxmin()` range computer to module level, bind a chart to it with a `partial.` - only check rate limits in main quote loop thus reporting actual overages - add in commented logic for only updating the "last" cleared price from the most recent framed value if we want to eventually (right now seems like this is only relevant to ib and it's dark trades: `utrade`). - rename `_clear_throttle_rate` -> `_quote_throttle_rate`, drop `_book_throttle_rate`. --- piker/ui/_app.py | 6 +- piker/ui/_display.py | 325 ++++++++++++++++++++++++++----------------- 2 files changed, 201 insertions(+), 130 deletions(-) diff --git a/piker/ui/_app.py b/piker/ui/_app.py index ab96f45d..78db608c 100644 --- a/piker/ui/_app.py +++ b/piker/ui/_app.py @@ -85,11 +85,11 @@ async def _async_main( screen = godwidget.window.current_screen() # configure graphics update throttling based on display refresh rate - _display._clear_throttle_rate = min( + _display._quote_throttle_rate = min( round(screen.refreshRate()), - _display._clear_throttle_rate, + _display._quote_throttle_rate, ) - log.info(f'Set graphics update rate to {_display._clear_throttle_rate} Hz') + log.info(f'Set graphics update rate to {_display._quote_throttle_rate} Hz') # TODO: do styling / themeing setup # _style.style_ze_sheets(godwidget) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index b51db7d8..e3c6f342 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -15,11 +15,13 @@ # along with this program. If not, see . ''' -Real-time display tasks for charting / graphics. +real-time display tasks for charting graphics update. + +this module ties together quote and computational (fsp) streams with +graphics update methods via our custom ``pyqtgraph`` charting api. ''' from contextlib import asynccontextmanager -# from pprint import pformat from functools import partial import time from types import ModuleType @@ -54,25 +56,46 @@ from ..log import get_logger log = get_logger(__name__) -# TODO: load these from a config.toml! -_clear_throttle_rate: int = 58 # Hz -_book_throttle_rate: int = 16 # Hz +# TODO: load this from a config.toml! +_quote_throttle_rate: int = 58 # Hz def update_fsp_chart( chart: ChartPlotWidget, shm: ShmArray, - display_name: str, + graphics_name: str, array_key: Optional[str], ) -> None: array = shm.array - # XXX: is this a problem any more after porting to the - # ``tractor.Context`` api or can we remove it? + # update graphics + # NOTE: this does a length check internally which allows it + # staying above the last row check below.. + chart.update_curve_from_array( + graphics_name, + array, + array_key=array_key or graphics_name, + ) - # TODO: provide a read sync mechanism to avoid this polling. the + try: + last_row = array[-1] + except IndexError: + # XXX: race condition with backfilling shm. + # + # the underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the indices may be updated in such a way that + # a read delivers an empty array (though it seems like we + # *should* be able to prevent that?). also, as and alt and + # something we need anyway, maybe there should be some kind of + # signal that a prepend is taking place and this consumer can + # respond (eg. redrawing graphics) accordingly. + log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') + return + + # TODO: provide a read sync mechanism to avoid this polling. the # underlying issue is that a backfill (aka prepend) and subsequent # shm array first/last index update could result in an empty array # read here since the stream is never torn down on the re-compute @@ -89,23 +112,75 @@ def update_fsp_chart( # read_tries -= 1 # continue - # update graphics - chart.update_curve_from_array( - display_name, - array, - array_key=array_key or display_name, - ) + # XXX: re: ``array_key``: fsp func names must be unique meaning we + # can't have duplicates of the underlying data even if multiple + # sub-charts reference it under different 'named charts'. - last_val_sticky = chart._ysticks.get(display_name) + # read from last calculated value and update any label + last_val_sticky = chart._ysticks.get(graphics_name) if last_val_sticky: - # read from last calculated value - # XXX: fsp func names must be unique meaning we don't have - # duplicates of the underlying data even if multiple - # sub-charts reference it under different 'named charts'. - array = shm.array[array_key] - if len(array): - value = array[-1] - last_val_sticky.update_from_data(-1, value) + # array = shm.array[array_key] + # if len(array): + # value = array[-1] + last = last_row[array_key] + last_val_sticky.update_from_data(-1, last) + + +# _clses = { +# 'clears': {'trade', 'utrade', 'last'}, +# 'last': {'last'}, +# 'bids': {'bid', 'bsize'}, +# 'asks': {'ask', 'asize'}, +# } + +# XXX: idea for frame type data structure we could use on the +# wire instead of doing it here? +# frames = { +# 'index': ['type_a', 'type_c', 'type_n', 'type_n', 'type_c'], + +# 'type_a': [tick0, tick1, tick2, .., tickn], +# 'type_b': [tick0, tick1, tick2, .., tickn], +# 'type_c': [tick0, tick1, tick2, .., tickn], +# ... +# 'type_n': [tick0, tick1, tick2, .., tickn], +# } + + +def chart_maxmin( + chart: ChartPlotWidget, + vlm_chart: Optional[ChartPlotWidget] = None, + +) -> tuple[ + tuple[int, int, int, int], + float, + float, + float, +]: + # TODO: implement this + # https://arxiv.org/abs/cs/0610046 + # https://github.com/lemire/pythonmaxmin + + array = chart._arrays['ohlc'] + ifirst = array[0]['index'] + + last_bars_range = chart.bars_range() + l, lbar, rbar, r = last_bars_range + in_view = array[lbar - ifirst:rbar - ifirst + 1] + + assert in_view.size + + mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) + + # TODO: when we start using line charts, probably want to make + # this an overloaded call on our `DataView + # sym = chart.name + # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym]) + + mx_vlm_in_view = 0 + if vlm_chart: + mx_vlm_in_view = np.max(in_view['volume']) + + return last_bars_range, mx, max(mn, 0), mx_vlm_in_view async def update_chart_from_quotes( @@ -144,32 +219,7 @@ async def update_chart_from_quotes( if vlm_chart: vlm_sticky = vlm_chart._ysticks['volume'] - def maxmin(): - # TODO: implement this - # https://arxiv.org/abs/cs/0610046 - # https://github.com/lemire/pythonmaxmin - - array = chart._arrays['ohlc'] - ifirst = array[0]['index'] - - last_bars_range = chart.bars_range() - l, lbar, rbar, r = last_bars_range - in_view = array[lbar - ifirst:rbar - ifirst + 1] - - assert in_view.size - - mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) - - # TODO: when we start using line charts, probably want to make - # this an overloaded call on our `DataView - # sym = chart.name - # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym]) - - mx_vlm_in_view = 0 - if vlm_chart: - mx_vlm_in_view = np.max(in_view['volume']) - - return last_bars_range, mx, max(mn, 0), mx_vlm_in_view + maxmin = partial(chart_maxmin, chart, vlm_chart) chart.default_view() @@ -203,21 +253,27 @@ async def update_chart_from_quotes( tick_size = chart.linked.symbol.tick_size tick_margin = 3 * tick_size - last_ask = last_bid = last_clear = time.time() chart.show() + last_quote = time.time() + # NOTE: all code below this loop is expected to be synchronous + # and thus draw instructions are not picked up jntil the next + # wait / iteration. async for quotes in stream: - # chart isn't actively shown so just skip render cycle + now = time.time() + quote_period = now - last_quote + if quote_period <= 1/_quote_throttle_rate: + log.warning(f'TOO FAST: {1/quote_period}') + last_quote = now + + # chart isn't active/shown so skip render cycle and pause feed(s) if chart.linked.isHidden(): await chart.pause_all_feeds() continue for sym, quote in quotes.items(): - now = time.time() - - # brange, mx_in_view, mn_in_view = maxmin() ( brange, mx_in_view, @@ -254,30 +310,75 @@ async def update_chart_from_quotes( vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375)) last_mx_vlm = mx_vlm_in_view - for tick in quote.get('ticks', ()): + ticks_frame = quote.get('ticks', ()) - # log.info( - # f"quotes: {pformat(quote['symbol'])}: {pformat(tick)}") - ticktype = tick.get('type') + frames_by_type: dict[str, dict] = {} + lasts = {} + + # build tick-type "frames" of tick sequences since + # likely the tick arrival rate is higher then our + # (throttled) quote stream rate. + for tick in ticks_frame: price = tick.get('price') - size = tick.get('size') + ticktype = tick.get('type') if ticktype == 'n/a' or price == -1: # okkk.. continue - # clearing price event - if ticktype in ('trade', 'utrade', 'last'): + # keys are entered in olded-event-inserted-first order + # since we iterate ``ticks_frame`` in standard order + # above. in other words the order of the keys is the order + # of tick events by type from the provider feed. + frames_by_type.setdefault(ticktype, []).append(tick) - # throttle clearing price updates to ~ max 60 FPS - period = now - last_clear - if period <= 1/_clear_throttle_rate: - # faster then display refresh rate - continue + # overwrites so the last tick per type is the entry + lasts[ticktype] = tick - # print(f'passthrough {tick}\n{1/(now-last_clear)}') - # set time of last graphics update - last_clear = now + # from pprint import pformat + # frame_counts = { + # typ: len(frame) for typ, frame in frames_by_type.items() + # } + # print(f'{pformat(frame_counts)}') + # print(f'framed: {pformat(frames_by_type)}') + # print(f'lasts: {pformat(lasts)}') + + # TODO: eventually we want to separate out the utrade (aka + # dark vlm prices) here and show them as an additional + # graphic. + clear_types = {'trade', 'utrade', 'last'} + + # XXX: if we wanted to iterate in "latest" (i.e. most + # current) tick first order as an optimization where we only + # update from the last tick from each type class. + # last_clear_updated: bool = False + # for typ, tick in reversed(lasts.items()): + + # iterate in FIFO order per frame + for typ, tick in lasts.items(): + + price = tick.get('price') + size = tick.get('size') + + # compute max and min prices (including bid/ask) from + # tick frames to determine the y-range for chart + # auto-scaling. + # TODO: we need a streaming minmax algo here, see def above. + mx = max(price + tick_margin, mx) + mn = min(price - tick_margin, mn) + + if typ in clear_types: + + # XXX: if we only wanted to update graphics from the + # "current"/"latest received" clearing price tick + # once (see alt iteration order above). + # if last_clear_updated: + # continue + + # last_clear_updated = True + # we only want to update grahpics from the *last* + # tick event that falls under the "clearing price" + # set. # update price sticky(s) end = array[-1] @@ -295,33 +396,11 @@ async def update_chart_from_quotes( # update vwap overlay line chart.update_curve_from_array('bar_wap', ohlcv.array) - # l1 book events - # throttle the book graphics updates at a lower rate - # since they aren't as critical for a manual user - # viewing the chart - - elif ticktype in ('ask', 'asize'): - if (now - last_ask) <= 1/_book_throttle_rate: - # print(f'skipping\n{tick}') - continue - - # print(f'passthrough {tick}\n{1/(now-last_ask)}') - last_ask = now - - elif ticktype in ('bid', 'bsize'): - if (now - last_bid) <= 1/_book_throttle_rate: - continue - - # print(f'passthrough {tick}\n{1/(now-last_bid)}') - last_bid = now - - # compute max and min trade values to display in view - # TODO: we need a streaming minmax algorithm here, see - # def above. - - # XXX: prettty sure this is correct? + # L1 book label-line updates + # XXX: is this correct for ib? # if ticktype in ('trade', 'last'): - if ticktype in ('last',): # 'size'): + # if ticktype in ('last',): # 'size'): + if typ in ('last',): # 'size'): label = { l1.ask_label.fields['level']: l1.ask_label, @@ -331,38 +410,34 @@ async def update_chart_from_quotes( if label is not None: label.update_fields({'level': price, 'size': size}) - # on trades should we be knocking down + # TODO: on trades should we be knocking down # the relevant L1 queue? # label.size -= size - elif ticktype in ('ask', 'asize'): + # elif ticktype in ('ask', 'asize'): + elif typ in ('ask', 'asize'): l1.ask_label.update_fields({'level': price, 'size': size}) - elif ticktype in ('bid', 'bsize'): + # elif ticktype in ('bid', 'bsize'): + elif typ in ('bid', 'bsize'): l1.bid_label.update_fields({'level': price, 'size': size}) - # in view y-range checking for auto-scale - # update the max/min price in view to keep bid/ask on screen - mx = max(price + tick_margin, mx) - mn = min(price - tick_margin, mn) - if (mx > last_mx) or ( - mn < last_mn - ): - # print(f'new y range: {(mn, mx)}') - chart._set_yrange( - yrange=(mn, mx), - # TODO: we should probably scale - # the view margin based on the size - # of the true range? This way you can - # slap in orders outside the current - # L1 (only) book range. - # range_margin=0.1, - ) + # check for y-range re-size + if (mx > last_mx) or (mn < last_mn): + # print(f'new y range: {(mn, mx)}') + chart._set_yrange( + yrange=(mn, mx), + # TODO: we should probably scale + # the view margin based on the size + # of the true range? This way you can + # slap in orders outside the current + # L1 (only) book range. + # range_margin=0.1, + ) - last_mx, last_mn = mx, mn + last_mx, last_mn = mx, mn # run synchronous update on all derived fsp subplots - # print(f'subplots: {linked.subplots.keys()}') for name, subchart in linked.subplots.items(): update_fsp_chart( subchart, @@ -376,19 +451,15 @@ async def update_chart_from_quotes( # TODO: all overlays on all subplots.. # run synchronous update on all derived overlays - # print(f'overlays: {chart._overlays}') - for name, shm in chart._overlays.items(): + for curve_name, shm in chart._overlays.items(): update_fsp_chart( chart, shm, - - # XXX: do we really needs seperate names here? - name, - array_key=name, + curve_name, + array_key=curve_name, ) - def maybe_mk_fsp_shm( sym: str, field_name: str, @@ -697,7 +768,7 @@ async def update_chart_from_fsp( now = time.time() period = now - last - if period <= 1/_clear_throttle_rate: + if period <= 1/_quote_throttle_rate: # faster then display refresh rate print(f'fsp too fast: {1/period}') continue @@ -915,7 +986,7 @@ async def display_symbol_data( loglevel=loglevel, # 60 FPS to limit context switches - tick_throttle=_clear_throttle_rate, + tick_throttle=_quote_throttle_rate, ) as feed, ):