From 30a5f32ef856e8fe7b324364f72464bf911f8075 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 Sep 2021 13:42:13 -0400 Subject: [PATCH 1/7] Add back in rsi --- piker/ui/_display.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 2b825bea..e5c4b710 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -868,13 +868,13 @@ async def display_symbol_data( # TODO: eventually we'll support some kind of n-compose syntax fsp_conf = { - # 'rsi': { - # 'fsp_func_name': 'rsi', - # 'params': {'period': 14}, - # 'chart_kwargs': { - # 'static_yrange': (0, 100), - # }, - # }, + 'rsi': { + 'fsp_func_name': 'rsi', + 'params': {'period': 14}, + 'chart_kwargs': { + 'static_yrange': (0, 100), + }, + }, } if has_vlm(ohlcv): From 4ea42a0a7e3478279845be8eb75d9d0885bd0c5f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 24 Sep 2021 18:17:15 -0400 Subject: [PATCH 2/7] More prep for FSP feeds The major change is moving the fsp "daemon" (more like wanna-be fspd) endpoint to use the newer `tractor.Portal.open_context()` and bi-directional streaming api. There's a few other things in here too: - make a helper for allocating single colume fsp shm arrays - rename some some fsp related functions to be more explicit on their purposes --- piker/ui/_display.py | 157 ++++++++++++++++++++++++++++++------------- 1 file changed, 112 insertions(+), 45 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index e5c4b710..bce3a091 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -53,6 +53,7 @@ from ..log import get_logger log = get_logger(__name__) +# TODO: load these from a config.toml! _clear_throttle_rate: int = 58 # Hz _book_throttle_rate: int = 16 # Hz @@ -184,6 +185,13 @@ async def update_chart_from_quotes( if vlm_chart: # print(f"volume: {end['volume']}") vlm_chart.update_curve_from_array('volume', array) + + # built-in tina $vlm FSP using chl3 typical price for ohlc step + # last = array[-1] + # chl3 = (last['close'] + last['high'] + last['low']) / 3 + # v = last['volume'] + # dv = last['volume'] * chl3 + vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) if ( @@ -302,7 +310,38 @@ async def update_chart_from_quotes( last_mx, last_mn = mx, mn -async def fan_out_spawn_fsp_daemons( +def maybe_mk_fsp_shm( + sym: str, + field_name: str, + display_name: Optional[str] = None, + readonly: bool = True, + +) -> (ShmArray, bool): + '''Allocate a single row shm array for an symbol-fsp pair. + + ''' + uid = tractor.current_actor().uid + if not display_name: + display_name = field_name + + # TODO: load function here and introspect + # return stream type(s) + + # TODO: should `index` be a required internal field? + fsp_dtype = np.dtype([('index', int), (field_name, float)]) + + key = f'{sym}.fsp.{display_name}.{".".join(uid)}' + + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) + return shm, opened + + +async def open_fspd_cluster( linkedsplits: LinkedSplits, fsps: dict[str, str], @@ -321,9 +360,8 @@ async def fan_out_spawn_fsp_daemons( ''' linkedsplits.focus() - uid = tractor.current_actor().uid - - # spawns sub-processes which execute cpu bound FSP code + # spawns sub-processes which execute cpu bound fsp work + # which is streamed back to this parent. 
async with ( tractor.open_nursery() as n, trio.open_nursery() as ln, @@ -334,21 +372,12 @@ async def fan_out_spawn_fsp_daemons( # scale horizonatlly once cores are used up. for display_name, conf in fsps.items(): - fsp_func_name = conf['fsp_func_name'] + func_name = conf['func_name'] - # TODO: load function here and introspect - # return stream type(s) - - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) - - key = f'{sym}.fsp.{display_name}.{".".join(uid)}' - - # this is all sync currently - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, + shm, opened = maybe_mk_fsp_shm( + sym, + field_name=func_name, + display_name=display_name, readonly=True, ) @@ -366,13 +395,13 @@ async def fan_out_spawn_fsp_daemons( # init async ln.start_soon( - run_fsp, + update_chart_from_fsp, portal, linkedsplits, brokermod, sym, src_shm, - fsp_func_name, + func_name, display_name, conf, group_status_key, @@ -383,7 +412,7 @@ async def fan_out_spawn_fsp_daemons( @asynccontextmanager -async def open_sidepane( +async def open_fsp_sidepane( linked: LinkedSplits, conf: dict[str, dict[str, str]], @@ -403,8 +432,11 @@ async def open_sidepane( } # add parameters for selection "options" - defaults = config.get('params', {}) - for name, default in defaults.items(): + params = config.get('params', {}) + for name, config in params.items(): + + default = config['default_value'] + kwargs = config.get('widget_kwargs', {}) # add to ORM schema schema.update({ @@ -412,6 +444,7 @@ async def open_sidepane( 'label': f'**{name}**:', 'type': 'edit', 'default_value': default, + 'kwargs': kwargs, }, }) @@ -424,7 +457,7 @@ async def open_sidepane( FspConfig = create_model( 'FspConfig', name=display_name, - **defaults, + **params, ) sidepane.model = FspConfig() @@ -444,14 +477,15 @@ async def open_sidepane( yield sidepane -async def run_fsp( +async def update_chart_from_fsp( + + portal: tractor.Portal, - portal: tractor._portal.Portal, linkedsplits: LinkedSplits, brokermod: ModuleType, sym: str, src_shm: ShmArray, - fsp_func_name: str, + func_name: str, display_name: str, conf: dict[str, dict], group_status_key: str, @@ -480,17 +514,17 @@ async def run_fsp( src_shm_token=src_shm.token, dst_shm_token=conf['shm'].token, symbol=sym, - func_name=fsp_func_name, + func_name=func_name, loglevel=loglevel, ) as (ctx, last_index), ctx.open_stream() as stream, - open_sidepane( + + open_fsp_sidepane( linkedsplits, {display_name: conf}, ) as sidepane, ): - shm = conf['shm'] if conf.get('overlay'): @@ -509,7 +543,7 @@ async def run_fsp( name=display_name, array=shm.array, - array_key=conf['fsp_func_name'], + array_key=conf['func_name'], sidepane=sidepane, # curve by default @@ -537,7 +571,7 @@ async def run_fsp( # XXX: fsp func names must be unique meaning we don't have # duplicates of the underlying data even if multiple # sub-charts reference it under different 'named charts'. - value = array[fsp_func_name][-1] + value = array[func_name][-1] last_val_sticky.update_from_data(-1, value) @@ -548,7 +582,7 @@ async def run_fsp( chart.update_curve_from_array( display_name, shm.array, - array_key=fsp_func_name + array_key=func_name ) chart.linked.resize_sidepanes() @@ -559,11 +593,11 @@ async def run_fsp( # generic fills between curve types while ``PlotCurveItem`` has # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which # might be the best solution? 
- # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) + # graphics = chart.update_from_array(chart.name, array[func_name]) # graphics.curve.setBrush(50, 50, 200, 100) # graphics.curve.setFillLevel(50) - if fsp_func_name == 'rsi': + if func_name == 'rsi': from ._lines import level_line # add moveable over-[sold/bought] lines # and labels only for the 70/30 lines @@ -607,7 +641,7 @@ async def run_fsp( try: # read last array = shm.array - value = array[-1][fsp_func_name] + value = array[-1][func_name] break except IndexError: @@ -621,7 +655,7 @@ async def run_fsp( chart.update_curve_from_array( display_name, array, - array_key=fsp_func_name, + array_key=func_name, ) # set time of last graphics update @@ -707,12 +741,26 @@ async def maybe_open_vlm_display( yield return else: - async with open_sidepane( + + shm, opened = maybe_mk_fsp_shm( + linked.symbol.key, + '$_vlm', + readonly=True, + ) + + async with open_fsp_sidepane( linked, { 'volume': { + 'params': { - 'price_func': 'ohl3' - } + + 'price_func': { + 'default_value': 'ohl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, } }, ) as sidepane: @@ -868,20 +916,39 @@ async def display_symbol_data( # TODO: eventually we'll support some kind of n-compose syntax fsp_conf = { + 'rsi': { - 'fsp_func_name': 'rsi', - 'params': {'period': 14}, + + # literal python func ref lookup name + 'func_name': 'rsi', + + # map of parameters to place on the fsp sidepane widget + # which should map to dynamic inputs available to the + # fsp function at runtime. + 'params': { + 'period': { + 'default_value': 14, + 'widget_kwargs': {'readonly': True}, + }, + }, + + # ``ChartPlotWidget`` options passthrough 'chart_kwargs': { 'static_yrange': (0, 100), }, }, + } - if has_vlm(ohlcv): + if has_vlm(ohlcv): # and provider != 'binance': + # binance is too fast atm for FSPs until we wrap + # the fsp streams as throttled ``Feeds``, see + # + # add VWAP to fsp config for downstream loading fsp_conf.update({ 'vwap': { - 'fsp_func_name': 'vwap', + 'func_name': 'vwap', 'overlay': True, 'anchor': 'session', }, @@ -900,7 +967,7 @@ async def display_symbol_data( ): # load initial fsp chain (otherwise known as "indicators") ln.start_soon( - fan_out_spawn_fsp_daemons, + open_fspd_cluster, linkedsplits, fsp_conf, sym, From 96937829eb9c00d350e3b64cb48092d76be29727 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Sep 2021 12:26:09 -0400 Subject: [PATCH 3/7] Factor FSP subplot update code into func This is in prep toward doing fsp graphics updates from the main quotes update loop (where OHLC and volume are done). Updating fsp output from that task should, for the majority of cases, be fine presuming the processing is derived from the quote stream as a source. Further, calling an update function on each fsp subplot/overlay is of course faster then a full task switch - which is how it currently works with a separate stream for every fsp output. This also will let us delay adding full `Feed` support around fsp streams for the moment while still getting quote throttling dictated by the quote stream. Going forward, We can still support a separate task/fsp stream for updates as needed (ex. some kind of fast external data source that isn't synced with price data) but it should be enabled as needed required by the user. 
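
As a rough sketch only (condensed from the diff below; the commented-out
shm read-race handling is elided), the factored helper boils down to:

    def update_fsp_chart(
        chart,               # ChartPlotWidget owning the curve
        shm,                 # ShmArray written by the fsp engine
        display_name: str,   # curve / named-chart key
        array_key: str,      # struct-array field to plot
    ) -> None:
        array = shm.array

        # redraw the curve from the fsp's shared-mem output
        chart.update_curve_from_array(
            display_name,
            array,
            array_key=array_key,
        )

        # refresh the y-axis last-value label when one exists
        # (sub-charts only; price-chart overlays have no sticky)
        last_val_sticky = chart._ysticks.get(display_name)
        if last_val_sticky:
            last_val_sticky.update_from_data(-1, array[array_key][-1])

The main quotes task can then call this once per subplot/overlay per
render cycle rather than waking a dedicated stream task for every fsp
output.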
--- piker/ui/_display.py | 298 +++++++++++++++++++++++-------------------- 1 file changed, 160 insertions(+), 138 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index bce3a091..432eaf14 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -20,6 +20,7 @@ Real-time display tasks for charting / graphics. ''' from contextlib import asynccontextmanager # from pprint import pformat +from functools import partial import time from types import ModuleType from typing import Optional @@ -341,76 +342,6 @@ def maybe_mk_fsp_shm( return shm, opened -async def open_fspd_cluster( - - linkedsplits: LinkedSplits, - fsps: dict[str, str], - sym: str, - src_shm: list, - brokermod: ModuleType, - group_status_key: str, - loglevel: str, - -) -> None: - '''Create financial signal processing sub-actors (under flat tree) - for each entry in config and attach to local graphics update tasks. - - Pass target entrypoint and historical data. - - ''' - linkedsplits.focus() - - # spawns sub-processes which execute cpu bound fsp work - # which is streamed back to this parent. - async with ( - tractor.open_nursery() as n, - trio.open_nursery() as ln, - ): - - # Currently we spawn an actor per fsp chain but - # likely we'll want to pool them eventually to - # scale horizonatlly once cores are used up. - for display_name, conf in fsps.items(): - - func_name = conf['func_name'] - - shm, opened = maybe_mk_fsp_shm( - sym, - field_name=func_name, - display_name=display_name, - readonly=True, - ) - - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" - - conf['shm'] = shm - - portal = await n.start_actor( - enable_modules=['piker.fsp._engine'], - name='fsp.' + display_name, - ) - - # init async - ln.start_soon( - update_chart_from_fsp, - portal, - linkedsplits, - brokermod, - sym, - src_shm, - func_name, - display_name, - conf, - group_status_key, - loglevel, - ) - - # blocks here until all fsp actors complete - - @asynccontextmanager async def open_fsp_sidepane( @@ -477,6 +408,84 @@ async def open_fsp_sidepane( yield sidepane +async def open_fspd_cluster( + + linkedsplits: LinkedSplits, + fsps: dict[str, str], + sym: str, + src_shm: list, + brokermod: ModuleType, + group_status_key: str, + loglevel: str, + + # this con + display_in_own_task: bool = False, + +) -> None: + '''Create sub-actors (under flat tree) + for each entry in config and attach to local graphics update tasks. + + Pass target entrypoint and historical data. + + ''' + linkedsplits.focus() + + # spawns sub-processes which execute cpu bound fsp work + # which is streamed back to this parent. + async with ( + tractor.open_nursery() as n, + trio.open_nursery() as ln, + ): + + # Currently we spawn an actor per fsp chain but + # likely we'll want to pool them eventually to + # scale horizonatlly once cores are used up. + for display_name, conf in fsps.items(): + + func_name = conf['func_name'] + + shm, opened = maybe_mk_fsp_shm( + sym, + field_name=func_name, + display_name=display_name, + readonly=True, + ) + + # XXX: fsp may have been opened by a duplicate chart. + # Error for now until we figure out how to wrap fsps as + # "feeds". assert opened, f"A chart for {key} likely + # already exists?" + + # conf['shm'] = shm + + portal = await n.start_actor( + enable_modules=['piker.fsp._engine'], + name='fsp.' 
+ display_name, + ) + + # init async + ln.start_soon( + partial( + update_chart_from_fsp, + + portal, + linkedsplits, + brokermod, + sym, + src_shm, + func_name, + display_name, + conf=conf, + shm=shm, + is_overlay=conf.get('overlay', False), + group_status_key=group_status_key, + loglevel=loglevel, + ) + ) + + # blocks here until all fsp actors complete + + async def update_chart_from_fsp( portal: tractor.Portal, @@ -488,6 +497,10 @@ async def update_chart_from_fsp( func_name: str, display_name: str, conf: dict[str, dict], + + shm: ShmArray, + is_overlay: bool, + group_status_key: str, loglevel: str, @@ -512,7 +525,7 @@ async def update_chart_from_fsp( # name as title of sub-chart brokername=brokermod.name, src_shm_token=src_shm.token, - dst_shm_token=conf['shm'].token, + dst_shm_token=shm.token, symbol=sym, func_name=func_name, loglevel=loglevel, @@ -520,14 +533,9 @@ async def update_chart_from_fsp( ) as (ctx, last_index), ctx.open_stream() as stream, - open_fsp_sidepane( - linkedsplits, - {display_name: conf}, - ) as sidepane, + open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, ): - shm = conf['shm'] - - if conf.get('overlay'): + if is_overlay: chart = linkedsplits.chart chart.draw_curve( name=display_name, @@ -535,10 +543,8 @@ async def update_chart_from_fsp( overlay=True, color='default_light', ) - last_val_sticky = None else: - chart = linkedsplits.add_plot( name=display_name, array=shm.array, @@ -562,30 +568,17 @@ async def update_chart_from_fsp( # should **not** be the same sub-chart widget assert chart.name != linkedsplits.chart.name - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] + array_key = func_name - # read from last calculated value - array = shm.array - - # XXX: fsp func names must be unique meaning we don't have - # duplicates of the underlying data even if multiple - # sub-charts reference it under different 'named charts'. - value = array[func_name][-1] - - last_val_sticky.update_from_data(-1, value) - - chart.linked.focus() - - # works also for overlays in which case data is looked up from - # internal chart array set.... - chart.update_curve_from_array( + # first UI update, usually from shm pushed history + update_fsp_chart( + chart, + shm, display_name, - shm.array, - array_key=func_name + array_key=array_key, ) - chart.linked.resize_sidepanes() + chart.linked.focus() # TODO: figure out if we can roll our own `FillToThreshold` to # get brush filled polygons for OS/OB conditions. @@ -608,58 +601,85 @@ async def update_chart_from_fsp( chart._set_yrange() - last = time.time() - done() + chart.linked.resize_sidepanes() - # i = 0 # update chart graphics + i = 0 + last = time.time() async for value in stream: # chart isn't actively shown so just skip render cycle if chart.linked.isHidden(): - # print(f'{i} unseen fsp cyclce') - # i += 1 + print(f'{i} unseen fsp cyclce') + i += 1 continue - now = time.time() - period = now - last + else: + now = time.time() + period = now - last - # if period <= 1/30: - if period <= 1/_clear_throttle_rate: - # faster then display refresh rate - # print(f'fsp too fast: {1/period}') - continue - - # TODO: provide a read sync mechanism to avoid this polling. - # the underlying issue is that a backfill and subsequent shm - # array first/last index update could result in an empty array - # read here since the stream is never torn down on the - # re-compute steps. 
- read_tries = 2 - while read_tries > 0: - try: - # read last - array = shm.array - value = array[-1][func_name] - break - - except IndexError: - read_tries -= 1 + if period <= 1/_clear_throttle_rate: + # faster then display refresh rate + print(f'fsp too fast: {1/period}') continue - if last_val_sticky: - last_val_sticky.update_from_data(-1, value) + # run synchronous update + update_fsp_chart( + chart, + shm, + display_name, + array_key=func_name, + ) - # update graphics - chart.update_curve_from_array( - display_name, - array, - array_key=func_name, - ) + # set time of last graphics update + last = time.time() - # set time of last graphics update - last = time.time() + +def update_fsp_chart( + chart: ChartPlotWidget, + shm: ShmArray, + display_name: str, + array_key: str, + +) -> None: + + array = shm.array + + # XXX: is this a problem any more after porting to the + # ``tractor.Context`` api? + # TODO: provide a read sync mechanism to avoid this polling. the + # underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the stream is never torn down on the re-compute + # steps. + # read_tries = 2 + # while read_tries > 0: + # try: + # # read last + # array = shm.array + # value = array[-1][array_key] + # break + + # except IndexError: + # read_tries -= 1 + # continue + + # update graphics + chart.update_curve_from_array( + display_name, + array, + array_key=array_key, + ) + + last_val_sticky = chart._ysticks.get(display_name) + if last_val_sticky: + # read from last calculated value + # XXX: fsp func names must be unique meaning we don't have + # duplicates of the underlying data even if multiple + # sub-charts reference it under different 'named charts'. + value = shm.array[array_key][-1] + last_val_sticky.update_from_data(-1, value) async def check_for_new_bars(feed, ohlcv, linkedsplits): @@ -702,6 +722,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits): just_history=False, ) + # main chart overlays for name in price_chart._overlays: price_chart.update_curve_from_array( @@ -709,6 +730,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits): price_chart._arrays[name] ) + # each subplot for name, chart in linkedsplits.subplots.items(): chart.update_curve_from_array( chart.name, From e8cd1a0e830752e57c7918dbe4eb1fa81762fd0f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 27 Sep 2021 10:25:42 -0400 Subject: [PATCH 4/7] Update fsps and overlays inside main OHLC chart update loop --- piker/ui/_display.py | 136 ++++++++++++++++++++++++++----------------- 1 file changed, 84 insertions(+), 52 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 432eaf14..b51db7d8 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -59,9 +59,58 @@ _clear_throttle_rate: int = 58 # Hz _book_throttle_rate: int = 16 # Hz +def update_fsp_chart( + chart: ChartPlotWidget, + shm: ShmArray, + display_name: str, + array_key: Optional[str], + +) -> None: + + array = shm.array + + # XXX: is this a problem any more after porting to the + # ``tractor.Context`` api or can we remove it? + + # TODO: provide a read sync mechanism to avoid this polling. the + # underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the stream is never torn down on the re-compute + # steps. 
+ # read_tries = 2 + # while read_tries > 0: + # try: + # # read last + # array = shm.array + # value = array[-1][array_key] + # break + + # except IndexError: + # read_tries -= 1 + # continue + + # update graphics + chart.update_curve_from_array( + display_name, + array, + array_key=array_key or display_name, + ) + + last_val_sticky = chart._ysticks.get(display_name) + if last_val_sticky: + # read from last calculated value + # XXX: fsp func names must be unique meaning we don't have + # duplicates of the underlying data even if multiple + # sub-charts reference it under different 'named charts'. + array = shm.array[array_key] + if len(array): + value = array[-1] + last_val_sticky.update_from_data(-1, value) + + async def update_chart_from_quotes( - chart: ChartPlotWidget, + linked: LinkedSplits, stream: tractor.MsgStream, ohlcv: np.ndarray, @@ -84,6 +133,8 @@ async def update_chart_from_quotes( # - 1-5 sec bar lookback-autocorrection like tws does? # (would require a background history checker task) + chart = linked.chart + # update last price sticky last_price_sticky = chart._ysticks[chart.name] last_price_sticky.update_from_data( @@ -310,6 +361,33 @@ async def update_chart_from_quotes( last_mx, last_mn = mx, mn + # run synchronous update on all derived fsp subplots + # print(f'subplots: {linked.subplots.keys()}') + for name, subchart in linked.subplots.items(): + update_fsp_chart( + subchart, + subchart._shm, + + # XXX: do we really needs seperate names here? + name, + array_key=name, + ) + + # TODO: all overlays on all subplots.. + + # run synchronous update on all derived overlays + # print(f'overlays: {chart._overlays}') + for name, shm in chart._overlays.items(): + update_fsp_chart( + chart, + shm, + + # XXX: do we really needs seperate names here? + name, + array_key=name, + ) + + def maybe_mk_fsp_shm( sym: str, @@ -318,7 +396,8 @@ def maybe_mk_fsp_shm( readonly: bool = True, ) -> (ShmArray, bool): - '''Allocate a single row shm array for an symbol-fsp pair. + '''Allocate a single row shm array for an symbol-fsp pair if none + exists, otherwise load the shm already existing for that token. ''' uid = tractor.current_actor().uid @@ -436,7 +515,6 @@ async def open_fspd_cluster( tractor.open_nursery() as n, trio.open_nursery() as ln, ): - # Currently we spawn an actor per fsp chain but # likely we'll want to pool them eventually to # scale horizonatlly once cores are used up. @@ -456,8 +534,6 @@ async def open_fspd_cluster( # "feeds". assert opened, f"A chart for {key} likely # already exists?" - # conf['shm'] = shm - portal = await n.start_actor( enable_modules=['piker.fsp._engine'], name='fsp.' + display_name, @@ -543,6 +619,8 @@ async def update_chart_from_fsp( overlay=True, color='default_light', ) + # specially store ref to shm for lookup in display loop + chart._overlays[display_name] = shm else: chart = linkedsplits.add_plot( @@ -636,52 +714,6 @@ async def update_chart_from_fsp( last = time.time() -def update_fsp_chart( - chart: ChartPlotWidget, - shm: ShmArray, - display_name: str, - array_key: str, - -) -> None: - - array = shm.array - - # XXX: is this a problem any more after porting to the - # ``tractor.Context`` api? - # TODO: provide a read sync mechanism to avoid this polling. the - # underlying issue is that a backfill (aka prepend) and subsequent - # shm array first/last index update could result in an empty array - # read here since the stream is never torn down on the re-compute - # steps. 
- # read_tries = 2 - # while read_tries > 0: - # try: - # # read last - # array = shm.array - # value = array[-1][array_key] - # break - - # except IndexError: - # read_tries -= 1 - # continue - - # update graphics - chart.update_curve_from_array( - display_name, - array, - array_key=array_key, - ) - - last_val_sticky = chart._ysticks.get(display_name) - if last_val_sticky: - # read from last calculated value - # XXX: fsp func names must be unique meaning we don't have - # duplicates of the underlying data even if multiple - # sub-charts reference it under different 'named charts'. - value = shm.array[array_key][-1] - last_val_sticky.update_from_data(-1, value) - - async def check_for_new_bars(feed, ohlcv, linkedsplits): """Task which updates from new bars in the shared ohlcv buffer every ``delay_s`` seconds. @@ -1002,7 +1034,7 @@ async def display_symbol_data( # start graphics update loop(s)after receiving first live quote ln.start_soon( update_chart_from_quotes, - chart, + linkedsplits, feed.stream, ohlcv, wap_in_history, From 853e8d4466340f01a400c1a7a07d7d52852fc506 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Sep 2021 07:56:14 -0400 Subject: [PATCH 5/7] Process framed ticks by type in main graphics loop We are already packing framed ticks in extended lists from the `.data._sampling.uniform_rate_send()` task so the natural solution to avoid needless graphics cycles for HFT-ish feeds (like binance) is to unpack those frames and for most cases only update graphics with the "latest" data per loop iteration. Unpacking in this way also lessens nested-iterations per tick type. Btw, this also effectively solves all remaining issues of fast tick feeds over-triggering the graphics loop renders as long as the original quote stream is throttled appropriately, usually to the local display rate. Relates to #183, #192 Dirty deats: - drop all per-tick rate checks, they were always somewhat pointless when iterating a frame of ticks per render cycle XD. - unpack tick frame into ticks per frame type, and last of each type; the lasts are used to update each part of the UI/graphics by class. - only skip the label update if we can't retrieve the last from from a graphics source array; it seems `chart.update_curve_from_array()` already does a `len` check internally. - add some draft commented code for tick type classes and a possible wire framed tick data structure. - move `chart_maxmin()` range computer to module level, bind a chart to it with a `partial.` - only check rate limits in main quote loop thus reporting actual overages - add in commented logic for only updating the "last" cleared price from the most recent framed value if we want to eventually (right now seems like this is only relevant to ib and it's dark trades: `utrade`). - rename `_clear_throttle_rate` -> `_quote_throttle_rate`, drop `_book_throttle_rate`. 
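
To make the frame unpacking concrete, here is the bucketing step as a
minimal sketch in isolation (names follow the diff below; the
clearing-price, L1-label and y-range handling happen in the per-type
loop that follows it):

    ticks_frame = quote.get('ticks', ())

    frames_by_type: dict[str, list] = {}
    lasts: dict[str, dict] = {}

    # bucket ticks by type in arrival order and keep only the most
    # recent tick per type; graphics are then updated once per tick
    # *type* per render cycle instead of once per tick.
    for tick in ticks_frame:
        ticktype = tick.get('type')
        if ticktype == 'n/a' or tick.get('price') == -1:
            continue
        frames_by_type.setdefault(ticktype, []).append(tick)
        lasts[ticktype] = tick

    # single FIFO per-type update pass
    for typ, tick in lasts.items():
        ...  # clearing price, L1 labels, y-range checks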
--- piker/ui/_app.py | 6 +- piker/ui/_display.py | 325 ++++++++++++++++++++++++++----------------- 2 files changed, 201 insertions(+), 130 deletions(-) diff --git a/piker/ui/_app.py b/piker/ui/_app.py index ab96f45d..78db608c 100644 --- a/piker/ui/_app.py +++ b/piker/ui/_app.py @@ -85,11 +85,11 @@ async def _async_main( screen = godwidget.window.current_screen() # configure graphics update throttling based on display refresh rate - _display._clear_throttle_rate = min( + _display._quote_throttle_rate = min( round(screen.refreshRate()), - _display._clear_throttle_rate, + _display._quote_throttle_rate, ) - log.info(f'Set graphics update rate to {_display._clear_throttle_rate} Hz') + log.info(f'Set graphics update rate to {_display._quote_throttle_rate} Hz') # TODO: do styling / themeing setup # _style.style_ze_sheets(godwidget) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index b51db7d8..e3c6f342 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -15,11 +15,13 @@ # along with this program. If not, see . ''' -Real-time display tasks for charting / graphics. +real-time display tasks for charting graphics update. + +this module ties together quote and computational (fsp) streams with +graphics update methods via our custom ``pyqtgraph`` charting api. ''' from contextlib import asynccontextmanager -# from pprint import pformat from functools import partial import time from types import ModuleType @@ -54,25 +56,46 @@ from ..log import get_logger log = get_logger(__name__) -# TODO: load these from a config.toml! -_clear_throttle_rate: int = 58 # Hz -_book_throttle_rate: int = 16 # Hz +# TODO: load this from a config.toml! +_quote_throttle_rate: int = 58 # Hz def update_fsp_chart( chart: ChartPlotWidget, shm: ShmArray, - display_name: str, + graphics_name: str, array_key: Optional[str], ) -> None: array = shm.array - # XXX: is this a problem any more after porting to the - # ``tractor.Context`` api or can we remove it? + # update graphics + # NOTE: this does a length check internally which allows it + # staying above the last row check below.. + chart.update_curve_from_array( + graphics_name, + array, + array_key=array_key or graphics_name, + ) - # TODO: provide a read sync mechanism to avoid this polling. the + try: + last_row = array[-1] + except IndexError: + # XXX: race condition with backfilling shm. + # + # the underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the indices may be updated in such a way that + # a read delivers an empty array (though it seems like we + # *should* be able to prevent that?). also, as and alt and + # something we need anyway, maybe there should be some kind of + # signal that a prepend is taking place and this consumer can + # respond (eg. redrawing graphics) accordingly. + log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') + return + + # TODO: provide a read sync mechanism to avoid this polling. 
the # underlying issue is that a backfill (aka prepend) and subsequent # shm array first/last index update could result in an empty array # read here since the stream is never torn down on the re-compute @@ -89,23 +112,75 @@ def update_fsp_chart( # read_tries -= 1 # continue - # update graphics - chart.update_curve_from_array( - display_name, - array, - array_key=array_key or display_name, - ) + # XXX: re: ``array_key``: fsp func names must be unique meaning we + # can't have duplicates of the underlying data even if multiple + # sub-charts reference it under different 'named charts'. - last_val_sticky = chart._ysticks.get(display_name) + # read from last calculated value and update any label + last_val_sticky = chart._ysticks.get(graphics_name) if last_val_sticky: - # read from last calculated value - # XXX: fsp func names must be unique meaning we don't have - # duplicates of the underlying data even if multiple - # sub-charts reference it under different 'named charts'. - array = shm.array[array_key] - if len(array): - value = array[-1] - last_val_sticky.update_from_data(-1, value) + # array = shm.array[array_key] + # if len(array): + # value = array[-1] + last = last_row[array_key] + last_val_sticky.update_from_data(-1, last) + + +# _clses = { +# 'clears': {'trade', 'utrade', 'last'}, +# 'last': {'last'}, +# 'bids': {'bid', 'bsize'}, +# 'asks': {'ask', 'asize'}, +# } + +# XXX: idea for frame type data structure we could use on the +# wire instead of doing it here? +# frames = { +# 'index': ['type_a', 'type_c', 'type_n', 'type_n', 'type_c'], + +# 'type_a': [tick0, tick1, tick2, .., tickn], +# 'type_b': [tick0, tick1, tick2, .., tickn], +# 'type_c': [tick0, tick1, tick2, .., tickn], +# ... +# 'type_n': [tick0, tick1, tick2, .., tickn], +# } + + +def chart_maxmin( + chart: ChartPlotWidget, + vlm_chart: Optional[ChartPlotWidget] = None, + +) -> tuple[ + tuple[int, int, int, int], + float, + float, + float, +]: + # TODO: implement this + # https://arxiv.org/abs/cs/0610046 + # https://github.com/lemire/pythonmaxmin + + array = chart._arrays['ohlc'] + ifirst = array[0]['index'] + + last_bars_range = chart.bars_range() + l, lbar, rbar, r = last_bars_range + in_view = array[lbar - ifirst:rbar - ifirst + 1] + + assert in_view.size + + mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) + + # TODO: when we start using line charts, probably want to make + # this an overloaded call on our `DataView + # sym = chart.name + # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym]) + + mx_vlm_in_view = 0 + if vlm_chart: + mx_vlm_in_view = np.max(in_view['volume']) + + return last_bars_range, mx, max(mn, 0), mx_vlm_in_view async def update_chart_from_quotes( @@ -144,32 +219,7 @@ async def update_chart_from_quotes( if vlm_chart: vlm_sticky = vlm_chart._ysticks['volume'] - def maxmin(): - # TODO: implement this - # https://arxiv.org/abs/cs/0610046 - # https://github.com/lemire/pythonmaxmin - - array = chart._arrays['ohlc'] - ifirst = array[0]['index'] - - last_bars_range = chart.bars_range() - l, lbar, rbar, r = last_bars_range - in_view = array[lbar - ifirst:rbar - ifirst + 1] - - assert in_view.size - - mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) - - # TODO: when we start using line charts, probably want to make - # this an overloaded call on our `DataView - # sym = chart.name - # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym]) - - mx_vlm_in_view = 0 - if vlm_chart: - mx_vlm_in_view = np.max(in_view['volume']) - - return last_bars_range, mx, max(mn, 0), 
mx_vlm_in_view + maxmin = partial(chart_maxmin, chart, vlm_chart) chart.default_view() @@ -203,21 +253,27 @@ async def update_chart_from_quotes( tick_size = chart.linked.symbol.tick_size tick_margin = 3 * tick_size - last_ask = last_bid = last_clear = time.time() chart.show() + last_quote = time.time() + # NOTE: all code below this loop is expected to be synchronous + # and thus draw instructions are not picked up jntil the next + # wait / iteration. async for quotes in stream: - # chart isn't actively shown so just skip render cycle + now = time.time() + quote_period = now - last_quote + if quote_period <= 1/_quote_throttle_rate: + log.warning(f'TOO FAST: {1/quote_period}') + last_quote = now + + # chart isn't active/shown so skip render cycle and pause feed(s) if chart.linked.isHidden(): await chart.pause_all_feeds() continue for sym, quote in quotes.items(): - now = time.time() - - # brange, mx_in_view, mn_in_view = maxmin() ( brange, mx_in_view, @@ -254,30 +310,75 @@ async def update_chart_from_quotes( vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375)) last_mx_vlm = mx_vlm_in_view - for tick in quote.get('ticks', ()): + ticks_frame = quote.get('ticks', ()) - # log.info( - # f"quotes: {pformat(quote['symbol'])}: {pformat(tick)}") - ticktype = tick.get('type') + frames_by_type: dict[str, dict] = {} + lasts = {} + + # build tick-type "frames" of tick sequences since + # likely the tick arrival rate is higher then our + # (throttled) quote stream rate. + for tick in ticks_frame: price = tick.get('price') - size = tick.get('size') + ticktype = tick.get('type') if ticktype == 'n/a' or price == -1: # okkk.. continue - # clearing price event - if ticktype in ('trade', 'utrade', 'last'): + # keys are entered in olded-event-inserted-first order + # since we iterate ``ticks_frame`` in standard order + # above. in other words the order of the keys is the order + # of tick events by type from the provider feed. + frames_by_type.setdefault(ticktype, []).append(tick) - # throttle clearing price updates to ~ max 60 FPS - period = now - last_clear - if period <= 1/_clear_throttle_rate: - # faster then display refresh rate - continue + # overwrites so the last tick per type is the entry + lasts[ticktype] = tick - # print(f'passthrough {tick}\n{1/(now-last_clear)}') - # set time of last graphics update - last_clear = now + # from pprint import pformat + # frame_counts = { + # typ: len(frame) for typ, frame in frames_by_type.items() + # } + # print(f'{pformat(frame_counts)}') + # print(f'framed: {pformat(frames_by_type)}') + # print(f'lasts: {pformat(lasts)}') + + # TODO: eventually we want to separate out the utrade (aka + # dark vlm prices) here and show them as an additional + # graphic. + clear_types = {'trade', 'utrade', 'last'} + + # XXX: if we wanted to iterate in "latest" (i.e. most + # current) tick first order as an optimization where we only + # update from the last tick from each type class. + # last_clear_updated: bool = False + # for typ, tick in reversed(lasts.items()): + + # iterate in FIFO order per frame + for typ, tick in lasts.items(): + + price = tick.get('price') + size = tick.get('size') + + # compute max and min prices (including bid/ask) from + # tick frames to determine the y-range for chart + # auto-scaling. + # TODO: we need a streaming minmax algo here, see def above. 
+ mx = max(price + tick_margin, mx) + mn = min(price - tick_margin, mn) + + if typ in clear_types: + + # XXX: if we only wanted to update graphics from the + # "current"/"latest received" clearing price tick + # once (see alt iteration order above). + # if last_clear_updated: + # continue + + # last_clear_updated = True + # we only want to update grahpics from the *last* + # tick event that falls under the "clearing price" + # set. # update price sticky(s) end = array[-1] @@ -295,33 +396,11 @@ async def update_chart_from_quotes( # update vwap overlay line chart.update_curve_from_array('bar_wap', ohlcv.array) - # l1 book events - # throttle the book graphics updates at a lower rate - # since they aren't as critical for a manual user - # viewing the chart - - elif ticktype in ('ask', 'asize'): - if (now - last_ask) <= 1/_book_throttle_rate: - # print(f'skipping\n{tick}') - continue - - # print(f'passthrough {tick}\n{1/(now-last_ask)}') - last_ask = now - - elif ticktype in ('bid', 'bsize'): - if (now - last_bid) <= 1/_book_throttle_rate: - continue - - # print(f'passthrough {tick}\n{1/(now-last_bid)}') - last_bid = now - - # compute max and min trade values to display in view - # TODO: we need a streaming minmax algorithm here, see - # def above. - - # XXX: prettty sure this is correct? + # L1 book label-line updates + # XXX: is this correct for ib? # if ticktype in ('trade', 'last'): - if ticktype in ('last',): # 'size'): + # if ticktype in ('last',): # 'size'): + if typ in ('last',): # 'size'): label = { l1.ask_label.fields['level']: l1.ask_label, @@ -331,38 +410,34 @@ async def update_chart_from_quotes( if label is not None: label.update_fields({'level': price, 'size': size}) - # on trades should we be knocking down + # TODO: on trades should we be knocking down # the relevant L1 queue? # label.size -= size - elif ticktype in ('ask', 'asize'): + # elif ticktype in ('ask', 'asize'): + elif typ in ('ask', 'asize'): l1.ask_label.update_fields({'level': price, 'size': size}) - elif ticktype in ('bid', 'bsize'): + # elif ticktype in ('bid', 'bsize'): + elif typ in ('bid', 'bsize'): l1.bid_label.update_fields({'level': price, 'size': size}) - # in view y-range checking for auto-scale - # update the max/min price in view to keep bid/ask on screen - mx = max(price + tick_margin, mx) - mn = min(price - tick_margin, mn) - if (mx > last_mx) or ( - mn < last_mn - ): - # print(f'new y range: {(mn, mx)}') - chart._set_yrange( - yrange=(mn, mx), - # TODO: we should probably scale - # the view margin based on the size - # of the true range? This way you can - # slap in orders outside the current - # L1 (only) book range. - # range_margin=0.1, - ) + # check for y-range re-size + if (mx > last_mx) or (mn < last_mn): + # print(f'new y range: {(mn, mx)}') + chart._set_yrange( + yrange=(mn, mx), + # TODO: we should probably scale + # the view margin based on the size + # of the true range? This way you can + # slap in orders outside the current + # L1 (only) book range. + # range_margin=0.1, + ) - last_mx, last_mn = mx, mn + last_mx, last_mn = mx, mn # run synchronous update on all derived fsp subplots - # print(f'subplots: {linked.subplots.keys()}') for name, subchart in linked.subplots.items(): update_fsp_chart( subchart, @@ -376,19 +451,15 @@ async def update_chart_from_quotes( # TODO: all overlays on all subplots.. 
# run synchronous update on all derived overlays - # print(f'overlays: {chart._overlays}') - for name, shm in chart._overlays.items(): + for curve_name, shm in chart._overlays.items(): update_fsp_chart( chart, shm, - - # XXX: do we really needs seperate names here? - name, - array_key=name, + curve_name, + array_key=curve_name, ) - def maybe_mk_fsp_shm( sym: str, field_name: str, @@ -697,7 +768,7 @@ async def update_chart_from_fsp( now = time.time() period = now - last - if period <= 1/_clear_throttle_rate: + if period <= 1/_quote_throttle_rate: # faster then display refresh rate print(f'fsp too fast: {1/period}') continue @@ -915,7 +986,7 @@ async def display_symbol_data( loglevel=loglevel, # 60 FPS to limit context switches - tick_throttle=_clear_throttle_rate, + tick_throttle=_quote_throttle_rate, ) as feed, ): From 21386f6c1ffa71908f886b8b1204f109a23d99f7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Oct 2021 13:02:04 -0400 Subject: [PATCH 6/7] Rename feed bus entrypoint --- piker/data/feed.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index fbf8035b..f8cb0e9b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -34,7 +34,6 @@ import trio from trio.abc import ReceiveChannel from trio_typing import TaskStatus import tractor -# from tractor import _broadcast from pydantic import BaseModel from ..brokers import get_brokermod @@ -252,7 +251,7 @@ async def allocate_persistent_feed( @tractor.context -async def attach_feed_bus( +async def open_feed_bus( ctx: tractor.Context, brokername: str, @@ -512,7 +511,7 @@ async def open_feed( portal.open_context( - attach_feed_bus, + open_feed_bus, brokername=brokername, symbol=sym, loglevel=loglevel, From 139eca47f7ea6bd0ed1a0f7cf196739f87156755 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Nov 2021 15:46:39 -0400 Subject: [PATCH 7/7] Don't push stream msgs in fsps by default --- piker/fsp/_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index dda27ccf..00cccdbd 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -90,7 +90,7 @@ async def fsp_compute( func_name: str, func: Callable, - attach_stream: bool = True, + attach_stream: bool = False, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: