From 549ff4ef119eacea0b01c448fff97e0cf633682a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Sep 2021 12:26:09 -0400 Subject: [PATCH] Factor FSP subplot update code into func This is in prep toward doing fsp graphics updates from the main quotes update loop (where OHLC and volume are done). Updating fsp output from that task should, for the majority of cases, be fine presuming the processing is derived from the quote stream as a source. Further, calling an update function on each fsp subplot/overlay is of course faster then a full task switch - which is how it currently works with a separate stream for every fsp output. This also will let us delay adding full `Feed` support around fsp streams for the moment while still getting quote throttling dictated by the quote stream. Going forward, We can still support a separate task/fsp stream for updates as needed (ex. some kind of fast external data source that isn't synced with price data) but it should be enabled as needed required by the user. --- piker/ui/_display.py | 300 +++++++++++++++++++++++-------------------- 1 file changed, 161 insertions(+), 139 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index f4bfe72a..432eaf14 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -20,6 +20,7 @@ Real-time display tasks for charting / graphics. ''' from contextlib import asynccontextmanager # from pprint import pformat +from functools import partial import time from types import ModuleType from typing import Optional @@ -341,76 +342,6 @@ def maybe_mk_fsp_shm( return shm, opened -async def open_fspd_cluster( - - linkedsplits: LinkedSplits, - fsps: dict[str, str], - sym: str, - src_shm: list, - brokermod: ModuleType, - group_status_key: str, - loglevel: str, - -) -> None: - '''Create financial signal processing sub-actors (under flat tree) - for each entry in config and attach to local graphics update tasks. - - Pass target entrypoint and historical data. - - ''' - linkedsplits.focus() - - # spawns sub-processes which execute cpu bound fsp work - # which is streamed back to this parent. - async with ( - tractor.open_nursery() as n, - trio.open_nursery() as ln, - ): - - # Currently we spawn an actor per fsp chain but - # likely we'll want to pool them eventually to - # scale horizonatlly once cores are used up. - for display_name, conf in fsps.items(): - - func_name = conf['func_name'] - - shm, opened = maybe_mk_fsp_shm( - sym, - field_name=func_name, - display_name=display_name, - readonly=True, - ) - - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" - - conf['shm'] = shm - - portal = await n.start_actor( - enable_modules=['piker.fsp._engine'], - name='fsp.' + display_name, - ) - - # init async - ln.start_soon( - update_chart_from_fsp, - portal, - linkedsplits, - brokermod, - sym, - src_shm, - func_name, - display_name, - conf, - group_status_key, - loglevel, - ) - - # blocks here until all fsp actors complete - - @asynccontextmanager async def open_fsp_sidepane( @@ -477,6 +408,84 @@ async def open_fsp_sidepane( yield sidepane +async def open_fspd_cluster( + + linkedsplits: LinkedSplits, + fsps: dict[str, str], + sym: str, + src_shm: list, + brokermod: ModuleType, + group_status_key: str, + loglevel: str, + + # this con + display_in_own_task: bool = False, + +) -> None: + '''Create sub-actors (under flat tree) + for each entry in config and attach to local graphics update tasks. + + Pass target entrypoint and historical data. + + ''' + linkedsplits.focus() + + # spawns sub-processes which execute cpu bound fsp work + # which is streamed back to this parent. + async with ( + tractor.open_nursery() as n, + trio.open_nursery() as ln, + ): + + # Currently we spawn an actor per fsp chain but + # likely we'll want to pool them eventually to + # scale horizonatlly once cores are used up. + for display_name, conf in fsps.items(): + + func_name = conf['func_name'] + + shm, opened = maybe_mk_fsp_shm( + sym, + field_name=func_name, + display_name=display_name, + readonly=True, + ) + + # XXX: fsp may have been opened by a duplicate chart. + # Error for now until we figure out how to wrap fsps as + # "feeds". assert opened, f"A chart for {key} likely + # already exists?" + + # conf['shm'] = shm + + portal = await n.start_actor( + enable_modules=['piker.fsp._engine'], + name='fsp.' + display_name, + ) + + # init async + ln.start_soon( + partial( + update_chart_from_fsp, + + portal, + linkedsplits, + brokermod, + sym, + src_shm, + func_name, + display_name, + conf=conf, + shm=shm, + is_overlay=conf.get('overlay', False), + group_status_key=group_status_key, + loglevel=loglevel, + ) + ) + + # blocks here until all fsp actors complete + + async def update_chart_from_fsp( portal: tractor.Portal, @@ -488,6 +497,10 @@ async def update_chart_from_fsp( func_name: str, display_name: str, conf: dict[str, dict], + + shm: ShmArray, + is_overlay: bool, + group_status_key: str, loglevel: str, @@ -512,7 +525,7 @@ async def update_chart_from_fsp( # name as title of sub-chart brokername=brokermod.name, src_shm_token=src_shm.token, - dst_shm_token=conf['shm'].token, + dst_shm_token=shm.token, symbol=sym, func_name=func_name, loglevel=loglevel, @@ -520,14 +533,9 @@ async def update_chart_from_fsp( ) as (ctx, last_index), ctx.open_stream() as stream, - open_fsp_sidepane( - linkedsplits, - {display_name: conf}, - ) as sidepane, + open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, ): - shm = conf['shm'] - - if conf.get('overlay'): + if is_overlay: chart = linkedsplits.chart chart.draw_curve( name=display_name, @@ -535,10 +543,8 @@ async def update_chart_from_fsp( overlay=True, color='default_light', ) - last_val_sticky = None else: - chart = linkedsplits.add_plot( name=display_name, array=shm.array, @@ -562,30 +568,17 @@ async def update_chart_from_fsp( # should **not** be the same sub-chart widget assert chart.name != linkedsplits.chart.name - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] + array_key = func_name - # read from last calculated value - array = shm.array - - # XXX: fsp func names must be unique meaning we don't have - # duplicates of the underlying data even if multiple - # sub-charts reference it under different 'named charts'. - value = array[func_name][-1] - - last_val_sticky.update_from_data(-1, value) - - chart.linked.focus() - - # works also for overlays in which case data is looked up from - # internal chart array set.... - chart.update_curve_from_array( + # first UI update, usually from shm pushed history + update_fsp_chart( + chart, + shm, display_name, - shm.array, - array_key=func_name + array_key=array_key, ) - chart.linked.resize_sidepanes() + chart.linked.focus() # TODO: figure out if we can roll our own `FillToThreshold` to # get brush filled polygons for OS/OB conditions. @@ -608,58 +601,85 @@ async def update_chart_from_fsp( chart._set_yrange() - last = time.time() - done() + chart.linked.resize_sidepanes() - # i = 0 # update chart graphics + i = 0 + last = time.time() async for value in stream: # chart isn't actively shown so just skip render cycle if chart.linked.isHidden(): - # print(f'{i} unseen fsp cyclce') - # i += 1 + print(f'{i} unseen fsp cyclce') + i += 1 continue - now = time.time() - period = now - last + else: + now = time.time() + period = now - last - # if period <= 1/30: - if period <= 1/_clear_throttle_rate: - # faster then display refresh rate - # print(f'fsp too fast: {1/period}') - continue - - # TODO: provide a read sync mechanism to avoid this polling. - # the underlying issue is that a backfill and subsequent shm - # array first/last index update could result in an empty array - # read here since the stream is never torn down on the - # re-compute steps. - read_tries = 2 - while read_tries > 0: - try: - # read last - array = shm.array - value = array[-1][func_name] - break - - except IndexError: - read_tries -= 1 + if period <= 1/_clear_throttle_rate: + # faster then display refresh rate + print(f'fsp too fast: {1/period}') continue - if last_val_sticky: - last_val_sticky.update_from_data(-1, value) + # run synchronous update + update_fsp_chart( + chart, + shm, + display_name, + array_key=func_name, + ) - # update graphics - chart.update_curve_from_array( - display_name, - array, - array_key=func_name, - ) + # set time of last graphics update + last = time.time() - # set time of last graphics update - last = time.time() + +def update_fsp_chart( + chart: ChartPlotWidget, + shm: ShmArray, + display_name: str, + array_key: str, + +) -> None: + + array = shm.array + + # XXX: is this a problem any more after porting to the + # ``tractor.Context`` api? + # TODO: provide a read sync mechanism to avoid this polling. the + # underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the stream is never torn down on the re-compute + # steps. + # read_tries = 2 + # while read_tries > 0: + # try: + # # read last + # array = shm.array + # value = array[-1][array_key] + # break + + # except IndexError: + # read_tries -= 1 + # continue + + # update graphics + chart.update_curve_from_array( + display_name, + array, + array_key=array_key, + ) + + last_val_sticky = chart._ysticks.get(display_name) + if last_val_sticky: + # read from last calculated value + # XXX: fsp func names must be unique meaning we don't have + # duplicates of the underlying data even if multiple + # sub-charts reference it under different 'named charts'. + value = shm.array[array_key][-1] + last_val_sticky.update_from_data(-1, value) async def check_for_new_bars(feed, ohlcv, linkedsplits): @@ -702,6 +722,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits): just_history=False, ) + # main chart overlays for name in price_chart._overlays: price_chart.update_curve_from_array( @@ -709,6 +730,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits): price_chart._arrays[name] ) + # each subplot for name, chart in linkedsplits.subplots.items(): chart.update_curve_from_array( chart.name, @@ -940,7 +962,7 @@ async def display_symbol_data( } - if has_vlm(ohlcv): # and provider != 'binance': + if has_vlm(ohlcv): # and provider != 'binance': # binance is too fast atm for FSPs until we wrap # the fsp streams as throttled ``Feeds``, see #