From 4ac65a93ae3b3da0d3f39105acc0e58fd873676d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 6 Mar 2022 17:02:49 -0500 Subject: [PATCH] Factor sync part of graphics update into func, add `trigger_update()`` --- piker/ui/_display.py | 469 ++++++++++++++++++++++++------------------- 1 file changed, 267 insertions(+), 202 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 9957baa4..02c3b83e 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -23,7 +23,7 @@ graphics update methods via our custom ``pyqtgraph`` charting api. ''' from functools import partial import time -from typing import Optional +from typing import Optional, Any import numpy as np import tractor @@ -109,6 +109,12 @@ def chart_maxmin( return last_bars_range, mx, max(mn, 0), mx_vlm_in_view +# actor-local graphics state that can be passed +# to a ``graphic_update_cycle()`` call by any task +# wishing to update the UI. +_ux_state: dict[str, Any] = {} + + async def graphics_update_loop( linked: LinkedSplits, @@ -147,7 +153,7 @@ async def graphics_update_loop( if vlm_chart: vlm_sticky = vlm_chart._ysticks['volume'] - vlm_view = vlm_chart.view + # vlm_view = vlm_chart.view maxmin = partial(chart_maxmin, chart, vlm_chart) chart.default_view() @@ -183,7 +189,7 @@ async def graphics_update_loop( tick_margin = 3 * tick_size chart.show() - view = chart.view + # view = chart.view last_quote = time.time() i_last = ohlcv.index @@ -210,7 +216,27 @@ async def graphics_update_loop( # async for quotes in iter_drain_quotes(): + _ux_state.update({ + 'quotes': {}, + 'linked': linked, + 'maxmin': maxmin, + 'tick_margin': tick_margin, + 'ohlcv': ohlcv, + 'chart': chart, + 'last_price_sticky': last_price_sticky, + 'vlm_chart': vlm_chart, + 'i_last': i_last, + 'last_mx_vlm': last_mx_vlm, + 'vlm_sticky': vlm_sticky, + 'l1': l1, + 'last_mx': last_mx, + 'last_mn': last_mn, + }) + async for quotes in stream: + + _ux_state['quotes'] = quotes + quote_period = time.time() - last_quote quote_rate = round( 1/quote_period, 1) if quote_period > 0 else float('inf') @@ -231,222 +257,259 @@ async def graphics_update_loop( chart.pause_all_feeds() continue - for sym, quote in quotes.items(): + # sync call to update all graphics/UX components. + graphics_update_cycle(**_ux_state) - ( - brange, - mx_in_view, - mn_in_view, - mx_vlm_in_view, - ) = maxmin() - l, lbar, rbar, r = brange - mx = mx_in_view + tick_margin - mn = mn_in_view - tick_margin - # NOTE: vlm may be written by the ``brokerd`` backend - # event though a tick sample is not emitted. - # TODO: show dark trades differently - # https://github.com/pikers/piker/issues/116 - array = ohlcv.array +def trigger_update() -> None: + ''' + Manually trigger a graphics update from global state. - # NOTE: this used to be implemented in a dedicated - # "increment tas": ``check_for_new_bars()`` but it doesn't - # make sense to do a whole task switch when we can just do - # this simple index-diff and all the fsp sub-curve graphics - # are diffed on each draw cycle anyway; so updates to the - # "curve" length is already automatic. + Generally used from remote actors who wish to trigger a UI refresh. - # increment the view position by the sample offset. - i_step = ohlcv.index - i_diff = i_step - i_last - if i_diff > 0: - chart.increment_view( - steps=i_diff, - ) - i_last = i_step + ''' + assert _ux_state is not None, 'graphics engine not initialized?' + graphics_update_cycle(**_ux_state) - if vlm_chart: - vlm_chart.update_curve_from_array('volume', array) - vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) - if ( - mx_vlm_in_view != last_mx_vlm or - mx_vlm_in_view > last_mx_vlm - ): - # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') - vlm_view._set_yrange( - yrange=(0, mx_vlm_in_view * 1.375) - ) - last_mx_vlm = mx_vlm_in_view +def graphics_update_cycle( + quotes, + linked, + maxmin, + tick_margin, + ohlcv, + chart, + last_price_sticky, + vlm_chart, + i_last, + last_mx_vlm, + vlm_sticky, + l1, - for curve_name, flow in vlm_chart._flows.items(): - update_fsp_chart( - vlm_chart, - flow.shm, - curve_name, - array_key=curve_name, - ) - # is this even doing anything? - flow.plot.vb._set_yrange( - autoscale_linked_plots=False, - name=curve_name, - ) + last_mx, + last_mn, - ticks_frame = quote.get('ticks', ()) + wap_in_history: bool = False, + # vlm_view, - frames_by_type: dict[str, dict] = {} - lasts = {} +) -> None: - # build tick-type "frames" of tick sequences since - # likely the tick arrival rate is higher then our - # (throttled) quote stream rate. - for tick in ticks_frame: - price = tick.get('price') - ticktype = tick.get('type') + for sym, quote in quotes.items(): - if ticktype == 'n/a' or price == -1: - # okkk.. - continue + ( + brange, + mx_in_view, + mn_in_view, + mx_vlm_in_view, + ) = maxmin() + l, lbar, rbar, r = brange + mx = mx_in_view + tick_margin + mn = mn_in_view - tick_margin - # keys are entered in olded-event-inserted-first order - # since we iterate ``ticks_frame`` in standard order - # above. in other words the order of the keys is the order - # of tick events by type from the provider feed. - frames_by_type.setdefault(ticktype, []).append(tick) + # NOTE: vlm may be written by the ``brokerd`` backend + # event though a tick sample is not emitted. + # TODO: show dark trades differently + # https://github.com/pikers/piker/issues/116 + array = ohlcv.array - # overwrites so the last tick per type is the entry - lasts[ticktype] = tick + # NOTE: this used to be implemented in a dedicated + # "increment tas": ``check_for_new_bars()`` but it doesn't + # make sense to do a whole task switch when we can just do + # this simple index-diff and all the fsp sub-curve graphics + # are diffed on each draw cycle anyway; so updates to the + # "curve" length is already automatic. - # from pprint import pformat - # frame_counts = { - # typ: len(frame) for typ, frame in frames_by_type.items() - # } - # print(f'{pformat(frame_counts)}') - # print(f'framed: {pformat(frames_by_type)}') - # print(f'lasts: {pformat(lasts)}') + # increment the view position by the sample offset. + i_step = ohlcv.index + i_diff = i_step - i_last + if i_diff > 0: + chart.increment_view( + steps=i_diff, + ) + i_last = i_step - # TODO: eventually we want to separate out the utrade (aka - # dark vlm prices) here and show them as an additional - # graphic. - clear_types = _tick_groups['clears'] + if vlm_chart: + vlm_chart.update_curve_from_array('volume', array) + vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) - # XXX: if we wanted to iterate in "latest" (i.e. most - # current) tick first order as an optimization where we only - # update from the last tick from each type class. - # last_clear_updated: bool = False - # for typ, tick in reversed(lasts.items()): - - # iterate in FIFO order per frame - for typ, tick in lasts.items(): - - price = tick.get('price') - size = tick.get('size') - - # compute max and min prices (including bid/ask) from - # tick frames to determine the y-range for chart - # auto-scaling. - # TODO: we need a streaming minmax algo here, see def above. - mx = max(price + tick_margin, mx) - mn = min(price - tick_margin, mn) - - if typ in clear_types: - - # XXX: if we only wanted to update graphics from the - # "current"/"latest received" clearing price tick - # once (see alt iteration order above). - # if last_clear_updated: - # continue - - # last_clear_updated = True - # we only want to update grahpics from the *last* - # tick event that falls under the "clearing price" - # set. - - # update price sticky(s) - end = array[-1] - last_price_sticky.update_from_data( - *end[['index', 'close']] - ) - - # update ohlc sampled price bars - chart.update_ohlc_from_array( - chart.name, - array, - ) - - if wap_in_history: - # update vwap overlay line - chart.update_curve_from_array('bar_wap', ohlcv.array) - - # L1 book label-line updates - # XXX: is this correct for ib? - # if ticktype in ('trade', 'last'): - # if ticktype in ('last',): # 'size'): - if typ in ('last',): # 'size'): - - label = { - l1.ask_label.fields['level']: l1.ask_label, - l1.bid_label.fields['level']: l1.bid_label, - }.get(price) - - if label is not None: - label.update_fields({'level': price, 'size': size}) - - # TODO: on trades should we be knocking down - # the relevant L1 queue? - # label.size -= size - - # elif ticktype in ('ask', 'asize'): - elif typ in _tick_groups['asks']: - l1.ask_label.update_fields({'level': price, 'size': size}) - - # elif ticktype in ('bid', 'bsize'): - elif typ in _tick_groups['bids']: - l1.bid_label.update_fields({'level': price, 'size': size}) - - # check for y-range re-size if ( - (mx > last_mx) or (mn < last_mn) - and not chart._static_yrange == 'axis' + mx_vlm_in_view != last_mx_vlm or + mx_vlm_in_view > last_mx_vlm ): - # print(f'new y range: {(mn, mx)}') - view._set_yrange( - yrange=(mn, mx), - # TODO: we should probably scale - # the view margin based on the size - # of the true range? This way you can - # slap in orders outside the current - # L1 (only) book range. - # range_margin=0.1, + # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') + vlm_chart.view._set_yrange( + yrange=(0, mx_vlm_in_view * 1.375) ) + last_mx_vlm = mx_vlm_in_view - last_mx, last_mn = mx, mn - - # run synchronous update on all derived fsp subplots - for name, subchart in linked.subplots.items(): + for curve_name, flow in vlm_chart._flows.items(): update_fsp_chart( - subchart, - subchart._shm, - - # XXX: do we really needs seperate names here? - name, - array_key=name, - ) - subchart.cv._set_yrange() - - # TODO: all overlays on all subplots.. - - # run synchronous update on all derived overlays - for curve_name, flow in chart._flows.items(): - update_fsp_chart( - chart, + vlm_chart, flow.shm, curve_name, array_key=curve_name, ) - # chart.view._set_yrange() + # is this even doing anything? + flow.plot.vb._set_yrange( + autoscale_linked_plots=False, + name=curve_name, + ) - # loop end + ticks_frame = quote.get('ticks', ()) + + frames_by_type: dict[str, dict] = {} + lasts = {} + + # build tick-type "frames" of tick sequences since + # likely the tick arrival rate is higher then our + # (throttled) quote stream rate. + for tick in ticks_frame: + price = tick.get('price') + ticktype = tick.get('type') + + if ticktype == 'n/a' or price == -1: + # okkk.. + continue + + # keys are entered in olded-event-inserted-first order + # since we iterate ``ticks_frame`` in standard order + # above. in other words the order of the keys is the order + # of tick events by type from the provider feed. + frames_by_type.setdefault(ticktype, []).append(tick) + + # overwrites so the last tick per type is the entry + lasts[ticktype] = tick + + # from pprint import pformat + # frame_counts = { + # typ: len(frame) for typ, frame in frames_by_type.items() + # } + # print(f'{pformat(frame_counts)}') + # print(f'framed: {pformat(frames_by_type)}') + # print(f'lasts: {pformat(lasts)}') + + # TODO: eventually we want to separate out the utrade (aka + # dark vlm prices) here and show them as an additional + # graphic. + clear_types = _tick_groups['clears'] + + # XXX: if we wanted to iterate in "latest" (i.e. most + # current) tick first order as an optimization where we only + # update from the last tick from each type class. + # last_clear_updated: bool = False + # for typ, tick in reversed(lasts.items()): + + # iterate in FIFO order per frame + for typ, tick in lasts.items(): + + price = tick.get('price') + size = tick.get('size') + + # compute max and min prices (including bid/ask) from + # tick frames to determine the y-range for chart + # auto-scaling. + # TODO: we need a streaming minmax algo here, see def above. + mx = max(price + tick_margin, mx) + mn = min(price - tick_margin, mn) + + if typ in clear_types: + + # XXX: if we only wanted to update graphics from the + # "current"/"latest received" clearing price tick + # once (see alt iteration order above). + # if last_clear_updated: + # continue + + # last_clear_updated = True + # we only want to update grahpics from the *last* + # tick event that falls under the "clearing price" + # set. + + # update price sticky(s) + end = array[-1] + last_price_sticky.update_from_data( + *end[['index', 'close']] + ) + + # update ohlc sampled price bars + chart.update_ohlc_from_array( + chart.name, + array, + ) + + if wap_in_history: + # update vwap overlay line + chart.update_curve_from_array('bar_wap', ohlcv.array) + + # L1 book label-line updates + # XXX: is this correct for ib? + # if ticktype in ('trade', 'last'): + # if ticktype in ('last',): # 'size'): + if typ in ('last',): # 'size'): + + label = { + l1.ask_label.fields['level']: l1.ask_label, + l1.bid_label.fields['level']: l1.bid_label, + }.get(price) + + if label is not None: + label.update_fields({'level': price, 'size': size}) + + # TODO: on trades should we be knocking down + # the relevant L1 queue? + # label.size -= size + + # elif ticktype in ('ask', 'asize'): + elif typ in _tick_groups['asks']: + l1.ask_label.update_fields({'level': price, 'size': size}) + + # elif ticktype in ('bid', 'bsize'): + elif typ in _tick_groups['bids']: + l1.bid_label.update_fields({'level': price, 'size': size}) + + # check for y-range re-size + if ( + (mx > last_mx) or (mn < last_mn) + and not chart._static_yrange == 'axis' + ): + # print(f'new y range: {(mn, mx)}') + chart.view._set_yrange( + yrange=(mn, mx), + # TODO: we should probably scale + # the view margin based on the size + # of the true range? This way you can + # slap in orders outside the current + # L1 (only) book range. + # range_margin=0.1, + ) + + last_mx, last_mn = mx, mn + + # run synchronous update on all derived fsp subplots + for name, subchart in linked.subplots.items(): + update_fsp_chart( + subchart, + subchart._shm, + + # XXX: do we really needs seperate names here? + name, + array_key=name, + ) + subchart.cv._set_yrange() + + # TODO: all overlays on all subplots.. + + # run synchronous update on all derived overlays + for curve_name, flow in chart._flows.items(): + update_fsp_chart( + chart, + flow.shm, + curve_name, + array_key=curve_name, + ) + # chart.view._set_yrange() + + # loop end async def display_symbol_data( @@ -479,23 +542,24 @@ async def display_symbol_data( # clear_on_next=True, # group_key=loading_sym_key, # ) - async with open_feed( - ['.'.join((sym, provider))], - loglevel=loglevel, - # limit to at least display's FPS - # avoiding needless Qt-in-guest-mode context switches - tick_throttle=_quote_throttle_rate, + async with open_feed( + provider, + [sym], + loglevel=loglevel, + + # limit to at least display's FPS + # avoiding needless Qt-in-guest-mode context switches + tick_throttle=_quote_throttle_rate, ) as feed: ohlcv: ShmArray = feed.shm bars = ohlcv.array symbol = feed.symbols[sym] - fqsn = symbol.front_fqsn() # load in symbol's ohlc data godwidget.window.setWindowTitle( - f'{fqsn} ' + f'{symbol.key}@{symbol.brokers} ' f'tick:{symbol.tick_size} ' f'step:1s ' ) @@ -580,7 +644,8 @@ async def display_symbol_data( open_order_mode( feed, chart, - fqsn, + symbol, + provider, order_mode_started ) ):