From b75a3310fe711d780d5756c15288fae8d2e4cd7a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 6 Mar 2022 17:02:49 -0500 Subject: [PATCH 01/10] Factor sync part of graphics update into func, add `trigger_update()`` --- piker/ui/_display.py | 469 ++++++++++++++++++++++++------------------- 1 file changed, 267 insertions(+), 202 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 9957baa4..02c3b83e 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -23,7 +23,7 @@ graphics update methods via our custom ``pyqtgraph`` charting api. ''' from functools import partial import time -from typing import Optional +from typing import Optional, Any import numpy as np import tractor @@ -109,6 +109,12 @@ def chart_maxmin( return last_bars_range, mx, max(mn, 0), mx_vlm_in_view +# actor-local graphics state that can be passed +# to a ``graphic_update_cycle()`` call by any task +# wishing to update the UI. +_ux_state: dict[str, Any] = {} + + async def graphics_update_loop( linked: LinkedSplits, @@ -147,7 +153,7 @@ async def graphics_update_loop( if vlm_chart: vlm_sticky = vlm_chart._ysticks['volume'] - vlm_view = vlm_chart.view + # vlm_view = vlm_chart.view maxmin = partial(chart_maxmin, chart, vlm_chart) chart.default_view() @@ -183,7 +189,7 @@ async def graphics_update_loop( tick_margin = 3 * tick_size chart.show() - view = chart.view + # view = chart.view last_quote = time.time() i_last = ohlcv.index @@ -210,7 +216,27 @@ async def graphics_update_loop( # async for quotes in iter_drain_quotes(): + _ux_state.update({ + 'quotes': {}, + 'linked': linked, + 'maxmin': maxmin, + 'tick_margin': tick_margin, + 'ohlcv': ohlcv, + 'chart': chart, + 'last_price_sticky': last_price_sticky, + 'vlm_chart': vlm_chart, + 'i_last': i_last, + 'last_mx_vlm': last_mx_vlm, + 'vlm_sticky': vlm_sticky, + 'l1': l1, + 'last_mx': last_mx, + 'last_mn': last_mn, + }) + async for quotes in stream: + + _ux_state['quotes'] = quotes + quote_period = time.time() - last_quote quote_rate = round( 1/quote_period, 1) if quote_period > 0 else float('inf') @@ -231,222 +257,259 @@ async def graphics_update_loop( chart.pause_all_feeds() continue - for sym, quote in quotes.items(): + # sync call to update all graphics/UX components. + graphics_update_cycle(**_ux_state) - ( - brange, - mx_in_view, - mn_in_view, - mx_vlm_in_view, - ) = maxmin() - l, lbar, rbar, r = brange - mx = mx_in_view + tick_margin - mn = mn_in_view - tick_margin - # NOTE: vlm may be written by the ``brokerd`` backend - # event though a tick sample is not emitted. - # TODO: show dark trades differently - # https://github.com/pikers/piker/issues/116 - array = ohlcv.array +def trigger_update() -> None: + ''' + Manually trigger a graphics update from global state. - # NOTE: this used to be implemented in a dedicated - # "increment tas": ``check_for_new_bars()`` but it doesn't - # make sense to do a whole task switch when we can just do - # this simple index-diff and all the fsp sub-curve graphics - # are diffed on each draw cycle anyway; so updates to the - # "curve" length is already automatic. + Generally used from remote actors who wish to trigger a UI refresh. - # increment the view position by the sample offset. - i_step = ohlcv.index - i_diff = i_step - i_last - if i_diff > 0: - chart.increment_view( - steps=i_diff, - ) - i_last = i_step + ''' + assert _ux_state is not None, 'graphics engine not initialized?' + graphics_update_cycle(**_ux_state) - if vlm_chart: - vlm_chart.update_curve_from_array('volume', array) - vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) - if ( - mx_vlm_in_view != last_mx_vlm or - mx_vlm_in_view > last_mx_vlm - ): - # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') - vlm_view._set_yrange( - yrange=(0, mx_vlm_in_view * 1.375) - ) - last_mx_vlm = mx_vlm_in_view +def graphics_update_cycle( + quotes, + linked, + maxmin, + tick_margin, + ohlcv, + chart, + last_price_sticky, + vlm_chart, + i_last, + last_mx_vlm, + vlm_sticky, + l1, - for curve_name, flow in vlm_chart._flows.items(): - update_fsp_chart( - vlm_chart, - flow.shm, - curve_name, - array_key=curve_name, - ) - # is this even doing anything? - flow.plot.vb._set_yrange( - autoscale_linked_plots=False, - name=curve_name, - ) + last_mx, + last_mn, - ticks_frame = quote.get('ticks', ()) + wap_in_history: bool = False, + # vlm_view, - frames_by_type: dict[str, dict] = {} - lasts = {} +) -> None: - # build tick-type "frames" of tick sequences since - # likely the tick arrival rate is higher then our - # (throttled) quote stream rate. - for tick in ticks_frame: - price = tick.get('price') - ticktype = tick.get('type') + for sym, quote in quotes.items(): - if ticktype == 'n/a' or price == -1: - # okkk.. - continue + ( + brange, + mx_in_view, + mn_in_view, + mx_vlm_in_view, + ) = maxmin() + l, lbar, rbar, r = brange + mx = mx_in_view + tick_margin + mn = mn_in_view - tick_margin - # keys are entered in olded-event-inserted-first order - # since we iterate ``ticks_frame`` in standard order - # above. in other words the order of the keys is the order - # of tick events by type from the provider feed. - frames_by_type.setdefault(ticktype, []).append(tick) + # NOTE: vlm may be written by the ``brokerd`` backend + # event though a tick sample is not emitted. + # TODO: show dark trades differently + # https://github.com/pikers/piker/issues/116 + array = ohlcv.array - # overwrites so the last tick per type is the entry - lasts[ticktype] = tick + # NOTE: this used to be implemented in a dedicated + # "increment tas": ``check_for_new_bars()`` but it doesn't + # make sense to do a whole task switch when we can just do + # this simple index-diff and all the fsp sub-curve graphics + # are diffed on each draw cycle anyway; so updates to the + # "curve" length is already automatic. - # from pprint import pformat - # frame_counts = { - # typ: len(frame) for typ, frame in frames_by_type.items() - # } - # print(f'{pformat(frame_counts)}') - # print(f'framed: {pformat(frames_by_type)}') - # print(f'lasts: {pformat(lasts)}') + # increment the view position by the sample offset. + i_step = ohlcv.index + i_diff = i_step - i_last + if i_diff > 0: + chart.increment_view( + steps=i_diff, + ) + i_last = i_step - # TODO: eventually we want to separate out the utrade (aka - # dark vlm prices) here and show them as an additional - # graphic. - clear_types = _tick_groups['clears'] + if vlm_chart: + vlm_chart.update_curve_from_array('volume', array) + vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) - # XXX: if we wanted to iterate in "latest" (i.e. most - # current) tick first order as an optimization where we only - # update from the last tick from each type class. - # last_clear_updated: bool = False - # for typ, tick in reversed(lasts.items()): - - # iterate in FIFO order per frame - for typ, tick in lasts.items(): - - price = tick.get('price') - size = tick.get('size') - - # compute max and min prices (including bid/ask) from - # tick frames to determine the y-range for chart - # auto-scaling. - # TODO: we need a streaming minmax algo here, see def above. - mx = max(price + tick_margin, mx) - mn = min(price - tick_margin, mn) - - if typ in clear_types: - - # XXX: if we only wanted to update graphics from the - # "current"/"latest received" clearing price tick - # once (see alt iteration order above). - # if last_clear_updated: - # continue - - # last_clear_updated = True - # we only want to update grahpics from the *last* - # tick event that falls under the "clearing price" - # set. - - # update price sticky(s) - end = array[-1] - last_price_sticky.update_from_data( - *end[['index', 'close']] - ) - - # update ohlc sampled price bars - chart.update_ohlc_from_array( - chart.name, - array, - ) - - if wap_in_history: - # update vwap overlay line - chart.update_curve_from_array('bar_wap', ohlcv.array) - - # L1 book label-line updates - # XXX: is this correct for ib? - # if ticktype in ('trade', 'last'): - # if ticktype in ('last',): # 'size'): - if typ in ('last',): # 'size'): - - label = { - l1.ask_label.fields['level']: l1.ask_label, - l1.bid_label.fields['level']: l1.bid_label, - }.get(price) - - if label is not None: - label.update_fields({'level': price, 'size': size}) - - # TODO: on trades should we be knocking down - # the relevant L1 queue? - # label.size -= size - - # elif ticktype in ('ask', 'asize'): - elif typ in _tick_groups['asks']: - l1.ask_label.update_fields({'level': price, 'size': size}) - - # elif ticktype in ('bid', 'bsize'): - elif typ in _tick_groups['bids']: - l1.bid_label.update_fields({'level': price, 'size': size}) - - # check for y-range re-size if ( - (mx > last_mx) or (mn < last_mn) - and not chart._static_yrange == 'axis' + mx_vlm_in_view != last_mx_vlm or + mx_vlm_in_view > last_mx_vlm ): - # print(f'new y range: {(mn, mx)}') - view._set_yrange( - yrange=(mn, mx), - # TODO: we should probably scale - # the view margin based on the size - # of the true range? This way you can - # slap in orders outside the current - # L1 (only) book range. - # range_margin=0.1, + # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') + vlm_chart.view._set_yrange( + yrange=(0, mx_vlm_in_view * 1.375) ) + last_mx_vlm = mx_vlm_in_view - last_mx, last_mn = mx, mn - - # run synchronous update on all derived fsp subplots - for name, subchart in linked.subplots.items(): + for curve_name, flow in vlm_chart._flows.items(): update_fsp_chart( - subchart, - subchart._shm, - - # XXX: do we really needs seperate names here? - name, - array_key=name, - ) - subchart.cv._set_yrange() - - # TODO: all overlays on all subplots.. - - # run synchronous update on all derived overlays - for curve_name, flow in chart._flows.items(): - update_fsp_chart( - chart, + vlm_chart, flow.shm, curve_name, array_key=curve_name, ) - # chart.view._set_yrange() + # is this even doing anything? + flow.plot.vb._set_yrange( + autoscale_linked_plots=False, + name=curve_name, + ) - # loop end + ticks_frame = quote.get('ticks', ()) + + frames_by_type: dict[str, dict] = {} + lasts = {} + + # build tick-type "frames" of tick sequences since + # likely the tick arrival rate is higher then our + # (throttled) quote stream rate. + for tick in ticks_frame: + price = tick.get('price') + ticktype = tick.get('type') + + if ticktype == 'n/a' or price == -1: + # okkk.. + continue + + # keys are entered in olded-event-inserted-first order + # since we iterate ``ticks_frame`` in standard order + # above. in other words the order of the keys is the order + # of tick events by type from the provider feed. + frames_by_type.setdefault(ticktype, []).append(tick) + + # overwrites so the last tick per type is the entry + lasts[ticktype] = tick + + # from pprint import pformat + # frame_counts = { + # typ: len(frame) for typ, frame in frames_by_type.items() + # } + # print(f'{pformat(frame_counts)}') + # print(f'framed: {pformat(frames_by_type)}') + # print(f'lasts: {pformat(lasts)}') + + # TODO: eventually we want to separate out the utrade (aka + # dark vlm prices) here and show them as an additional + # graphic. + clear_types = _tick_groups['clears'] + + # XXX: if we wanted to iterate in "latest" (i.e. most + # current) tick first order as an optimization where we only + # update from the last tick from each type class. + # last_clear_updated: bool = False + # for typ, tick in reversed(lasts.items()): + + # iterate in FIFO order per frame + for typ, tick in lasts.items(): + + price = tick.get('price') + size = tick.get('size') + + # compute max and min prices (including bid/ask) from + # tick frames to determine the y-range for chart + # auto-scaling. + # TODO: we need a streaming minmax algo here, see def above. + mx = max(price + tick_margin, mx) + mn = min(price - tick_margin, mn) + + if typ in clear_types: + + # XXX: if we only wanted to update graphics from the + # "current"/"latest received" clearing price tick + # once (see alt iteration order above). + # if last_clear_updated: + # continue + + # last_clear_updated = True + # we only want to update grahpics from the *last* + # tick event that falls under the "clearing price" + # set. + + # update price sticky(s) + end = array[-1] + last_price_sticky.update_from_data( + *end[['index', 'close']] + ) + + # update ohlc sampled price bars + chart.update_ohlc_from_array( + chart.name, + array, + ) + + if wap_in_history: + # update vwap overlay line + chart.update_curve_from_array('bar_wap', ohlcv.array) + + # L1 book label-line updates + # XXX: is this correct for ib? + # if ticktype in ('trade', 'last'): + # if ticktype in ('last',): # 'size'): + if typ in ('last',): # 'size'): + + label = { + l1.ask_label.fields['level']: l1.ask_label, + l1.bid_label.fields['level']: l1.bid_label, + }.get(price) + + if label is not None: + label.update_fields({'level': price, 'size': size}) + + # TODO: on trades should we be knocking down + # the relevant L1 queue? + # label.size -= size + + # elif ticktype in ('ask', 'asize'): + elif typ in _tick_groups['asks']: + l1.ask_label.update_fields({'level': price, 'size': size}) + + # elif ticktype in ('bid', 'bsize'): + elif typ in _tick_groups['bids']: + l1.bid_label.update_fields({'level': price, 'size': size}) + + # check for y-range re-size + if ( + (mx > last_mx) or (mn < last_mn) + and not chart._static_yrange == 'axis' + ): + # print(f'new y range: {(mn, mx)}') + chart.view._set_yrange( + yrange=(mn, mx), + # TODO: we should probably scale + # the view margin based on the size + # of the true range? This way you can + # slap in orders outside the current + # L1 (only) book range. + # range_margin=0.1, + ) + + last_mx, last_mn = mx, mn + + # run synchronous update on all derived fsp subplots + for name, subchart in linked.subplots.items(): + update_fsp_chart( + subchart, + subchart._shm, + + # XXX: do we really needs seperate names here? + name, + array_key=name, + ) + subchart.cv._set_yrange() + + # TODO: all overlays on all subplots.. + + # run synchronous update on all derived overlays + for curve_name, flow in chart._flows.items(): + update_fsp_chart( + chart, + flow.shm, + curve_name, + array_key=curve_name, + ) + # chart.view._set_yrange() + + # loop end async def display_symbol_data( @@ -479,23 +542,24 @@ async def display_symbol_data( # clear_on_next=True, # group_key=loading_sym_key, # ) - async with open_feed( - ['.'.join((sym, provider))], - loglevel=loglevel, - # limit to at least display's FPS - # avoiding needless Qt-in-guest-mode context switches - tick_throttle=_quote_throttle_rate, + async with open_feed( + provider, + [sym], + loglevel=loglevel, + + # limit to at least display's FPS + # avoiding needless Qt-in-guest-mode context switches + tick_throttle=_quote_throttle_rate, ) as feed: ohlcv: ShmArray = feed.shm bars = ohlcv.array symbol = feed.symbols[sym] - fqsn = symbol.front_fqsn() # load in symbol's ohlc data godwidget.window.setWindowTitle( - f'{fqsn} ' + f'{symbol.key}@{symbol.brokers} ' f'tick:{symbol.tick_size} ' f'step:1s ' ) @@ -580,7 +644,8 @@ async def display_symbol_data( open_order_mode( feed, chart, - fqsn, + symbol, + provider, order_mode_started ) ): From 761b82393977f03397fc9015fb3f8922456be22d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 6 Mar 2022 17:03:37 -0500 Subject: [PATCH 02/10] Always fire a "step/update message" on every fsp history update --- piker/fsp/_engine.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 7e75c283..f1904000 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -127,8 +127,8 @@ async def fsp_compute( # each respective field. fields = getattr(dst.array.dtype, 'fields', None).copy() fields.pop('index') - # TODO: nptyping here! - history: Optional[np.ndarray] = None + history: Optional[np.ndarray] = None # TODO: nptyping here! + if fields and len(fields) > 1 and fields: if not isinstance(history_output, dict): raise ValueError( @@ -209,6 +209,12 @@ async def fsp_compute( try: # rt stream async with ctx.open_stream() as stream: + + # always trigger UI refresh after history update, + # see ``piker.ui._fsp.FspAdmin.open_chain()`` and + # ``piker.ui._display.trigger_update()``. + await stream.send(index) + async for processed in out_stream: log.debug(f"{func_name}: {processed}") From ad1bbe74ad14d6cca52fe5757406ef30b1be7a67 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 6 Mar 2022 17:04:03 -0500 Subject: [PATCH 03/10] Manually trigger graphics loops updates on msgs from the fsp chain --- piker/ui/_fsp.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index d56cc2d5..d1076981 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -437,7 +437,13 @@ class FspAdmin: started.set() + from ._display import trigger_update + # wait for graceful shutdown signal + async with stream.subscribe() as stream: + async for msg in stream: + trigger_update() + await complete.wait() async def start_engine_task( From b3efa2874b8fda02b62a2182dd87040aa1479794 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Mar 2022 07:20:17 -0500 Subject: [PATCH 04/10] Facepalm: display state must be linked charts specific --- piker/ui/_chart.py | 5 +++ piker/ui/_display.py | 77 ++++++++++++++++++++------------------------ 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 7938e0d8..adce3a40 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -346,6 +346,11 @@ class LinkedSplits(QWidget): self.layout.setContentsMargins(0, 0, 0, 0) self.layout.addWidget(self.splitter) + # chart-local graphics state that can be passed to + # a ``graphic_update_cycle()`` call by any task wishing to + # update the UI for a given "chart instance". + self.display_state: dict[str, dict] = {} + self._symbol: Symbol = None @property diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 02c3b83e..04d08fc4 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -109,12 +109,6 @@ def chart_maxmin( return last_bars_range, mx, max(mn, 0), mx_vlm_in_view -# actor-local graphics state that can be passed -# to a ``graphic_update_cycle()`` call by any task -# wishing to update the UI. -_ux_state: dict[str, Any] = {} - - async def graphics_update_loop( linked: LinkedSplits, @@ -153,7 +147,6 @@ async def graphics_update_loop( if vlm_chart: vlm_sticky = vlm_chart._ysticks['volume'] - # vlm_view = vlm_chart.view maxmin = partial(chart_maxmin, chart, vlm_chart) chart.default_view() @@ -216,26 +209,30 @@ async def graphics_update_loop( # async for quotes in iter_drain_quotes(): - _ux_state.update({ + ds = linked.display_state = { 'quotes': {}, 'linked': linked, 'maxmin': maxmin, - 'tick_margin': tick_margin, 'ohlcv': ohlcv, 'chart': chart, 'last_price_sticky': last_price_sticky, 'vlm_chart': vlm_chart, - 'i_last': i_last, - 'last_mx_vlm': last_mx_vlm, 'vlm_sticky': vlm_sticky, 'l1': l1, - 'last_mx': last_mx, - 'last_mn': last_mn, - }) + 'vars': { + 'tick_margin': tick_margin, + 'i_last': i_last, + 'last_mx_vlm': last_mx_vlm, + 'last_mx': last_mx, + 'last_mn': last_mn, + } + } + + # main loop async for quotes in stream: - _ux_state['quotes'] = quotes + ds['quotes'] = quotes quote_period = time.time() - last_quote quote_rate = round( @@ -258,42 +255,28 @@ async def graphics_update_loop( continue # sync call to update all graphics/UX components. - graphics_update_cycle(**_ux_state) - - -def trigger_update() -> None: - ''' - Manually trigger a graphics update from global state. - - Generally used from remote actors who wish to trigger a UI refresh. - - ''' - assert _ux_state is not None, 'graphics engine not initialized?' - graphics_update_cycle(**_ux_state) + graphics_update_cycle(**ds) def graphics_update_cycle( quotes, linked, maxmin, - tick_margin, ohlcv, chart, last_price_sticky, vlm_chart, - i_last, - last_mx_vlm, vlm_sticky, l1, - last_mx, - last_mn, + vars: dict[str, Any], wap_in_history: bool = False, - # vlm_view, ) -> None: + tick_margin = vars['tick_margin'] + for sym, quote in quotes.items(): ( @@ -321,26 +304,26 @@ def graphics_update_cycle( # increment the view position by the sample offset. i_step = ohlcv.index - i_diff = i_step - i_last + i_diff = i_step - vars['i_last'] if i_diff > 0: chart.increment_view( steps=i_diff, ) - i_last = i_step + vars['i_last'] = i_step if vlm_chart: vlm_chart.update_curve_from_array('volume', array) vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) if ( - mx_vlm_in_view != last_mx_vlm or - mx_vlm_in_view > last_mx_vlm + mx_vlm_in_view != vars['last_mx_vlm'] or + mx_vlm_in_view > vars['last_mx_vlm'] ): # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') vlm_chart.view._set_yrange( yrange=(0, mx_vlm_in_view * 1.375) ) - last_mx_vlm = mx_vlm_in_view + vars['last_mx_vlm'] = mx_vlm_in_view for curve_name, flow in vlm_chart._flows.items(): update_fsp_chart( @@ -469,7 +452,7 @@ def graphics_update_cycle( # check for y-range re-size if ( - (mx > last_mx) or (mn < last_mn) + (mx > vars['last_mx']) or (mn < vars['last_mn']) and not chart._static_yrange == 'axis' ): # print(f'new y range: {(mn, mx)}') @@ -483,7 +466,7 @@ def graphics_update_cycle( # range_margin=0.1, ) - last_mx, last_mn = mx, mn + vars['last_mx'], vars['last_mn'] = mx, mn # run synchronous update on all derived fsp subplots for name, subchart in linked.subplots.items(): @@ -507,9 +490,19 @@ def graphics_update_cycle( curve_name, array_key=curve_name, ) - # chart.view._set_yrange() - # loop end + +def trigger_update( + linked: LinkedSplits, + +) -> None: + ''' + Manually trigger a graphics update from global state. + + Generally used from remote actors who wish to trigger a UI refresh. + + ''' + graphics_update_cycle(**linked.display_state) async def display_symbol_data( From 2564acea1b452cffa49d713bc3b7626a7b77ce28 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Mar 2022 07:20:58 -0500 Subject: [PATCH 05/10] Facepalm**2: only update on special "update" msg --- piker/fsp/_engine.py | 2 +- piker/ui/_fsp.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index f1904000..951aa186 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -213,7 +213,7 @@ async def fsp_compute( # always trigger UI refresh after history update, # see ``piker.ui._fsp.FspAdmin.open_chain()`` and # ``piker.ui._display.trigger_update()``. - await stream.send(index) + await stream.send('update') async for processed in out_stream: diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index d1076981..41572a58 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -50,6 +50,7 @@ from ._forms import ( mk_form, open_form_input_handling, ) +from . import _display from ..fsp._api import maybe_mk_fsp_shm, Fsp from ..fsp import cascade from ..fsp._volume import ( @@ -437,12 +438,11 @@ class FspAdmin: started.set() - from ._display import trigger_update - # wait for graceful shutdown signal async with stream.subscribe() as stream: async for msg in stream: - trigger_update() + if msg == 'update': + _display.trigger_update(self.linked) await complete.wait() From 30656eda39c61544ddebc49e4ecc1dd3ac5a42ce Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Mar 2022 11:08:04 -0500 Subject: [PATCH 06/10] Use a `DisplayState` in the graphics update loop The graphics update loop is much easier to grok when all the UI components which potentially need to be updated on a cycle are arranged together in a high-level composite namespace, thus this new `DisplayState` addition. Create and set this state on each `LinkedSplits` chart set and add a new method `.graphics_cycle()` which let's a caller trigger a graphics loop update manually. Use this method in the fsp graphics manager such that a chain can update new history output even if there is no real-time feed driving the display loop (eg. when a market is "closed"). --- piker/ui/_chart.py | 11 +++- piker/ui/_display.py | 118 ++++++++++++++++++++++++------------------- piker/ui/_fsp.py | 2 +- 3 files changed, 76 insertions(+), 55 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index adce3a40..79abf215 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -19,7 +19,7 @@ High level chart-widget apis. ''' from __future__ import annotations -from typing import Optional +from typing import Optional, TYPE_CHECKING from PyQt5 import QtCore, QtWidgets from PyQt5.QtCore import Qt @@ -63,6 +63,8 @@ from ._interaction import ChartView from ._forms import FieldsForm from ._overlay import PlotItemOverlay +if TYPE_CHECKING: + from ._display import DisplayState log = get_logger(__name__) @@ -230,6 +232,7 @@ class GodWidget(QWidget): # chart is already in memory so just focus it linkedsplits.show() linkedsplits.focus() + linkedsplits.graphics_cycle() await trio.sleep(0) # resume feeds *after* rendering chart view asap @@ -349,10 +352,14 @@ class LinkedSplits(QWidget): # chart-local graphics state that can be passed to # a ``graphic_update_cycle()`` call by any task wishing to # update the UI for a given "chart instance". - self.display_state: dict[str, dict] = {} + self.display_state: Optional[DisplayState] = None self._symbol: Symbol = None + def graphics_cycle(self) -> None: + from . import _display + return _display.graphics_update_cycle(self.display_state) + @property def symbol(self) -> Symbol: return self._symbol diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 04d08fc4..ae6d0c77 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -21,9 +21,10 @@ this module ties together quote and computational (fsp) streams with graphics update methods via our custom ``pyqtgraph`` charting api. ''' +from dataclasses import dataclass from functools import partial import time -from typing import Optional, Any +from typing import Optional, Any, Callable import numpy as np import tractor @@ -31,6 +32,7 @@ import trio from .. import brokers from ..data.feed import open_feed +from ._axes import YAxisLabel from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -109,6 +111,33 @@ def chart_maxmin( return last_bars_range, mx, max(mn, 0), mx_vlm_in_view +@dataclass +class DisplayState: + ''' + Chart-local real-time graphics state container. + + ''' + quotes: dict[str, Any] + + maxmin: Callable + ohlcv: ShmArray + + # high level chart handles + linked: LinkedSplits + chart: ChartPlotWidget + vlm_chart: ChartPlotWidget + + # axis labels + l1: L1Labels + last_price_sticky: YAxisLabel + vlm_sticky: YAxisLabel + + # misc state tracking + vars: dict[str, Any] + + wap_in_history: bool = False + + async def graphics_update_loop( linked: LinkedSplits, @@ -209,7 +238,7 @@ async def graphics_update_loop( # async for quotes in iter_drain_quotes(): - ds = linked.display_state = { + ds = linked.display_state = DisplayState(**{ 'quotes': {}, 'linked': linked, 'maxmin': maxmin, @@ -227,12 +256,12 @@ async def graphics_update_loop( 'last_mx': last_mx, 'last_mn': last_mn, } - } + }) # main loop async for quotes in stream: - ds['quotes'] = quotes + ds.quotes = quotes quote_period = time.time() - last_quote quote_rate = round( @@ -255,36 +284,30 @@ async def graphics_update_loop( continue # sync call to update all graphics/UX components. - graphics_update_cycle(**ds) + graphics_update_cycle(ds) def graphics_update_cycle( - quotes, - linked, - maxmin, - ohlcv, - chart, - last_price_sticky, - vlm_chart, - vlm_sticky, - l1, - - vars: dict[str, Any], - + ds: DisplayState, wap_in_history: bool = False, ) -> None: + # TODO: eventually optimize this whole graphics stack with ``numba`` + # hopefully XD + + # unpack multi-referenced components + chart = ds.chart + vlm_chart = ds.vlm_chart + l1 = ds.l1 + + ohlcv = ds.ohlcv + array = ohlcv.array + vars = ds.vars tick_margin = vars['tick_margin'] - for sym, quote in quotes.items(): - - ( - brange, - mx_in_view, - mn_in_view, - mx_vlm_in_view, - ) = maxmin() + for sym, quote in ds.quotes.items(): + brange, mx_in_view, mn_in_view, mx_vlm_in_view = ds.maxmin() l, lbar, rbar, r = brange mx = mx_in_view + tick_margin mn = mn_in_view - tick_margin @@ -293,7 +316,6 @@ def graphics_update_cycle( # event though a tick sample is not emitted. # TODO: show dark trades differently # https://github.com/pikers/piker/issues/116 - array = ohlcv.array # NOTE: this used to be implemented in a dedicated # "increment tas": ``check_for_new_bars()`` but it doesn't @@ -313,11 +335,11 @@ def graphics_update_cycle( if vlm_chart: vlm_chart.update_curve_from_array('volume', array) - vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) + ds.vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) if ( - mx_vlm_in_view != vars['last_mx_vlm'] or - mx_vlm_in_view > vars['last_mx_vlm'] + mx_vlm_in_view != vars['last_mx_vlm'] + or mx_vlm_in_view > vars['last_mx_vlm'] ): # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') vlm_chart.view._set_yrange( @@ -382,6 +404,12 @@ def graphics_update_cycle( # last_clear_updated: bool = False # for typ, tick in reversed(lasts.items()): + # update ohlc sampled price bars + chart.update_ohlc_from_array( + chart.name, + array, + ) + # iterate in FIFO order per frame for typ, tick in lasts.items(): @@ -410,19 +438,16 @@ def graphics_update_cycle( # update price sticky(s) end = array[-1] - last_price_sticky.update_from_data( + ds.last_price_sticky.update_from_data( *end[['index', 'close']] ) - # update ohlc sampled price bars - chart.update_ohlc_from_array( - chart.name, - array, - ) - if wap_in_history: # update vwap overlay line - chart.update_curve_from_array('bar_wap', ohlcv.array) + chart.update_curve_from_array( + 'bar_wap', + array, + ) # L1 book label-line updates # XXX: is this correct for ib? @@ -436,7 +461,9 @@ def graphics_update_cycle( }.get(price) if label is not None: - label.update_fields({'level': price, 'size': size}) + label.update_fields( + {'level': price, 'size': size} + ) # TODO: on trades should we be knocking down # the relevant L1 queue? @@ -469,7 +496,7 @@ def graphics_update_cycle( vars['last_mx'], vars['last_mn'] = mx, mn # run synchronous update on all derived fsp subplots - for name, subchart in linked.subplots.items(): + for name, subchart in ds.linked.subplots.items(): update_fsp_chart( subchart, subchart._shm, @@ -492,19 +519,6 @@ def graphics_update_cycle( ) -def trigger_update( - linked: LinkedSplits, - -) -> None: - ''' - Manually trigger a graphics update from global state. - - Generally used from remote actors who wish to trigger a UI refresh. - - ''' - graphics_update_cycle(**linked.display_state) - - async def display_symbol_data( godwidget: GodWidget, provider: str, diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 41572a58..678b5d8e 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -442,7 +442,7 @@ class FspAdmin: async with stream.subscribe() as stream: async for msg in stream: if msg == 'update': - _display.trigger_update(self.linked) + self.linked.graphics_cycle() await complete.wait() From 8195fae289ebf172596fdafbe8d49e08c741feaa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Apr 2022 18:47:45 -0400 Subject: [PATCH 07/10] Add a `trigger_all` arg to update cycle func; allows hard history updates --- piker/ui/_chart.py | 6 ++++-- piker/ui/_display.py | 35 +++++++++++++++++++++++++++++------ piker/ui/_fsp.py | 9 +++++++-- 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 79abf215..3fcaae07 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -356,9 +356,11 @@ class LinkedSplits(QWidget): self._symbol: Symbol = None - def graphics_cycle(self) -> None: + def graphics_cycle(self, **kwargs) -> None: from . import _display - return _display.graphics_update_cycle(self.display_state) + ds = self.display_state + if ds: + return _display.graphics_update_cycle(ds, **kwargs) @property def symbol(self) -> Symbol: diff --git a/piker/ui/_display.py b/piker/ui/_display.py index ae6d0c77..1cb13f52 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -290,6 +290,7 @@ async def graphics_update_loop( def graphics_update_cycle( ds: DisplayState, wap_in_history: bool = False, + trigger_all: bool = False, # flag used by prepend history updates ) -> None: @@ -307,10 +308,6 @@ def graphics_update_cycle( tick_margin = vars['tick_margin'] for sym, quote in ds.quotes.items(): - brange, mx_in_view, mn_in_view, mx_vlm_in_view = ds.maxmin() - l, lbar, rbar, r = brange - mx = mx_in_view + tick_margin - mn = mn_in_view - tick_margin # NOTE: vlm may be written by the ``brokerd`` backend # event though a tick sample is not emitted. @@ -333,13 +330,39 @@ def graphics_update_cycle( ) vars['i_last'] = i_step + ( + brange, + mx_in_view, + mn_in_view, + mx_vlm_in_view, + ) = ds.maxmin() + + l, lbar, rbar, r = brange + mx = mx_in_view + tick_margin + mn = mn_in_view - tick_margin + liv = r > i_step # the last datum is in view + + # don't real-time "shift" the curve to the + # left under the following conditions: + if ( + ( + i_diff > 0 # no new sample step + and liv + ) + or trigger_all + ): + # TODO: we should track and compute whether the last + # pixel in a curve should show new data based on uppx + # and then iff update curves and shift? + chart.increment_view(steps=i_diff) + if vlm_chart: vlm_chart.update_curve_from_array('volume', array) ds.vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) if ( - mx_vlm_in_view != vars['last_mx_vlm'] - or mx_vlm_in_view > vars['last_mx_vlm'] + mx_vlm_in_view > vars['last_mx_vlm'] + or trigger_all ): # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') vlm_chart.view._set_yrange( diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 678b5d8e..a1193327 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -50,7 +50,6 @@ from ._forms import ( mk_form, open_form_input_handling, ) -from . import _display from ..fsp._api import maybe_mk_fsp_shm, Fsp from ..fsp import cascade from ..fsp._volume import ( @@ -442,7 +441,13 @@ class FspAdmin: async with stream.subscribe() as stream: async for msg in stream: if msg == 'update': - self.linked.graphics_cycle() + # if the chart isn't hidden try to update + # the data on screen. + if not self.linked.isHidden(): + log.info(f'Re-syncing graphics for fsp: {ns_path}') + self.linked.graphics_cycle(trigger_all=True) + else: + log.info(f'recved unexpected fsp engine msg: {msg}') await complete.wait() From 82f2fa2d377bbeb46cc182067e1f6fcc848a15b0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Mar 2022 15:07:48 -0400 Subject: [PATCH 08/10] Pass in fqsn from chart UI components --- piker/ui/_display.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 1cb13f52..398a180c 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -260,9 +260,7 @@ async def graphics_update_loop( # main loop async for quotes in stream: - ds.quotes = quotes - quote_period = time.time() - last_quote quote_rate = round( 1/quote_period, 1) if quote_period > 0 else float('inf') @@ -572,24 +570,25 @@ async def display_symbol_data( # clear_on_next=True, # group_key=loading_sym_key, # ) + fqsn = '.'.join((sym, provider)) async with open_feed( - provider, - [sym], - loglevel=loglevel, + [fqsn], + loglevel=loglevel, - # limit to at least display's FPS - # avoiding needless Qt-in-guest-mode context switches - tick_throttle=_quote_throttle_rate, + # limit to at least display's FPS + # avoiding needless Qt-in-guest-mode context switches + tick_throttle=_quote_throttle_rate, ) as feed: ohlcv: ShmArray = feed.shm bars = ohlcv.array symbol = feed.symbols[sym] + fqsn = symbol.front_fqsn() # load in symbol's ohlc data godwidget.window.setWindowTitle( - f'{symbol.key}@{symbol.brokers} ' + f'{fqsn} ' f'tick:{symbol.tick_size} ' f'step:1s ' ) @@ -674,8 +673,7 @@ async def display_symbol_data( open_order_mode( feed, chart, - symbol, - provider, + fqsn, order_mode_started ) ): From d8db9233c9cb199d95eecec1896243b0f07c8c15 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 3 Apr 2022 23:28:30 -0400 Subject: [PATCH 09/10] Establish stream before `fsp_compute` so that backfill updates work again.. --- piker/fsp/_engine.py | 218 ++++++++++++++++++++++++------------------- 1 file changed, 120 insertions(+), 98 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 951aa186..c9c53d60 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -76,7 +76,6 @@ async def filter_quotes_by_sym( async def fsp_compute( - ctx: tractor.Context, symbol: Symbol, feed: Feed, quote_stream: trio.abc.ReceiveChannel, @@ -86,7 +85,7 @@ async def fsp_compute( func: Callable, - attach_stream: bool = False, + # attach_stream: bool = False, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -193,46 +192,47 @@ async def fsp_compute( profiler(f'{func_name} pushed history') profiler.finish() - # TODO: UGH, what is the right way to do something like this? - if not ctx._started_called: - await ctx.started(index) - # setup a respawn handle with trio.CancelScope() as cs: + + # TODO: might be better to just make a "restart" method where + # the target task is spawned implicitly and then the event is + # set via some higher level api? At that poing we might as well + # be writing a one-cancels-one nursery though right? tracker = TaskTracker(trio.Event(), cs) task_status.started((tracker, index)) + profiler(f'{func_name} yield last index') # import time # last = time.time() try: - # rt stream - async with ctx.open_stream() as stream: - # always trigger UI refresh after history update, - # see ``piker.ui._fsp.FspAdmin.open_chain()`` and - # ``piker.ui._display.trigger_update()``. - await stream.send('update') + async for processed in out_stream: - async for processed in out_stream: + log.debug(f"{func_name}: {processed}") + key, output = processed + index = src.index + dst.array[-1][key] = output - log.debug(f"{func_name}: {processed}") - key, output = processed - index = src.index - dst.array[-1][key] = output + # NOTE: for now we aren't streaming this to the consumer + # stream latest array index entry which basically just acts + # as trigger msg to tell the consumer to read from shm + # TODO: further this should likely be implemented much + # like our `Feed` api where there is one background + # "service" task which computes output and then sends to + # N-consumers who subscribe for the real-time output, + # which we'll likely want to implement using local-mem + # chans for the fan out? + # if attach_stream: + # await client_stream.send(index) - # NOTE: for now we aren't streaming this to the consumer - # stream latest array index entry which basically just acts - # as trigger msg to tell the consumer to read from shm - if attach_stream: - await stream.send(index) - - # period = time.time() - last - # hz = 1/period if period else float('nan') - # if hz > 60: - # log.info(f'FSP quote too fast: {hz}') - # last = time.time() + # period = time.time() - last + # hz = 1/period if period else float('nan') + # if hz > 60: + # log.info(f'FSP quote too fast: {hz}') + # last = time.time() finally: tracker.complete.set() @@ -320,7 +320,6 @@ async def cascade( fsp_target = partial( fsp_compute, - ctx=ctx, symbol=symbol, feed=feed, quote_stream=quote_stream, @@ -329,7 +328,7 @@ async def cascade( src=src, dst=dst, - # func_name=func_name, + # target func=func ) @@ -341,90 +340,113 @@ async def cascade( profiler(f'{func_name}: fsp up') - async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: - # TODO: adopt an incremental update engine/approach - # where possible here eventually! - log.warning(f're-syncing fsp {func_name} to source') - tracker.cs.cancel() - await tracker.complete.wait() - return await n.start(fsp_target) + # sync client + await ctx.started(index) - def is_synced( - src: ShmArray, - dst: ShmArray - ) -> tuple[bool, int, int]: - '''Predicate to dertmine if a destination FSP - output array is aligned to its source array. + # XXX: rt stream with client which we MUST + # open here (and keep it open) in order to make + # incremental "updates" as history prepends take + # place. + async with ctx.open_stream() as client_stream: - ''' - step_diff = src.index - dst.index - len_diff = abs(len(src.array) - len(dst.array)) - return not ( - # the source is likely backfilling and we must - # sync history calculations - len_diff > 2 or + # TODO: these likely should all become + # methods of this ``TaskLifetime`` or wtv + # abstraction.. + async def resync( + tracker: TaskTracker, - # we aren't step synced to the source and may be - # leading/lagging by a step - step_diff > 1 or - step_diff < 0 - ), step_diff, len_diff + ) -> tuple[TaskTracker, int]: + # TODO: adopt an incremental update engine/approach + # where possible here eventually! + log.warning(f're-syncing fsp {func_name} to source') + tracker.cs.cancel() + await tracker.complete.wait() + tracker, index = await n.start(fsp_target) - async def poll_and_sync_to_step( + # always trigger UI refresh after history update, + # see ``piker.ui._fsp.FspAdmin.open_chain()`` and + # ``piker.ui._display.trigger_update()``. + await client_stream.send('update') + return tracker, index - tracker: TaskTracker, - src: ShmArray, - dst: ShmArray, + def is_synced( + src: ShmArray, + dst: ShmArray + ) -> tuple[bool, int, int]: + '''Predicate to dertmine if a destination FSP + output array is aligned to its source array. - ) -> tuple[TaskTracker, int]: + ''' + step_diff = src.index - dst.index + len_diff = abs(len(src.array) - len(dst.array)) + return not ( + # the source is likely backfilling and we must + # sync history calculations + len_diff > 2 or + + # we aren't step synced to the source and may be + # leading/lagging by a step + step_diff > 1 or + step_diff < 0 + ), step_diff, len_diff + + async def poll_and_sync_to_step( + + tracker: TaskTracker, + src: ShmArray, + dst: ShmArray, + + ) -> tuple[TaskTracker, int]: - synced, step_diff, _ = is_synced(src, dst) - while not synced: - tracker, index = await resync(tracker) synced, step_diff, _ = is_synced(src, dst) + while not synced: + tracker, index = await resync(tracker) + synced, step_diff, _ = is_synced(src, dst) - return tracker, step_diff + return tracker, step_diff - s, step, ld = is_synced(src, dst) + s, step, ld = is_synced(src, dst) - # detect sample period step for subscription to increment - # signal - times = src.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] + # detect sample period step for subscription to increment + # signal + times = src.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] - # Increment the underlying shared memory buffer on every - # "increment" msg received from the underlying data feed. - async with feed.index_stream(int(delay_s)) as istream: + # Increment the underlying shared memory buffer on every + # "increment" msg received from the underlying data feed. + async with feed.index_stream( + int(delay_s) + ) as istream: - profiler(f'{func_name}: sample stream up') - profiler.finish() + profiler(f'{func_name}: sample stream up') + profiler.finish() - async for _ in istream: + async for _ in istream: - # respawn the compute task if the source - # array has been updated such that we compute - # new history from the (prepended) source. - synced, step_diff, _ = is_synced(src, dst) - if not synced: - tracker, step_diff = await poll_and_sync_to_step( - tracker, - src, - dst, - ) + # respawn the compute task if the source + # array has been updated such that we compute + # new history from the (prepended) source. + synced, step_diff, _ = is_synced(src, dst) + if not synced: + tracker, step_diff = await poll_and_sync_to_step( + tracker, + src, + dst, + ) - # skip adding a last bar since we should already - # be step alinged - if step_diff == 0: - continue + # skip adding a last bar since we should already + # be step alinged + if step_diff == 0: + continue - # read out last shm row, copy and write new row - array = dst.array + # read out last shm row, copy and write new row + array = dst.array - # some metrics like vlm should be reset - # to zero every step. - if zero_on_step: - last = zeroed - else: - last = array[-1:].copy() + # some metrics like vlm should be reset + # to zero every step. + if zero_on_step: + last = zeroed + else: + last = array[-1:].copy() - dst.push(last) + dst.push(last) From bcb4fe8c503115529ea06165d1a7fc2605f24637 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Apr 2022 18:57:00 -0400 Subject: [PATCH 10/10] Indefinitely wait on feed hack for windows? --- piker/brokers/ib.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 65d6f4c0..f66f81c5 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -37,7 +37,6 @@ import asyncio from pprint import pformat import inspect import logging -import platform from random import randint import time @@ -1583,7 +1582,7 @@ async def backfill_bars( # on that until we have the `marketstore` daemon in place in which # case the shm size will be driven by user config and available sys # memory. - count: int = 100, + count: int = 16, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -1603,11 +1602,6 @@ async def backfill_bars( # async with open_history_client(fqsn) as proxy: async with open_client_proxy() as proxy: - if platform.system() == 'Windows': - log.warning( - 'Decreasing history query count to 4 since, windows...') - count = 4 - out, fails = await get_bars(proxy, fqsn) if out is None: