diff --git a/piker/ui/_display.py b/piker/ui/_display.py index bce3a091..432eaf14 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -20,6 +20,7 @@ Real-time display tasks for charting / graphics. ''' from contextlib import asynccontextmanager # from pprint import pformat +from functools import partial import time from types import ModuleType from typing import Optional @@ -341,76 +342,6 @@ def maybe_mk_fsp_shm( return shm, opened -async def open_fspd_cluster( - - linkedsplits: LinkedSplits, - fsps: dict[str, str], - sym: str, - src_shm: list, - brokermod: ModuleType, - group_status_key: str, - loglevel: str, - -) -> None: - '''Create financial signal processing sub-actors (under flat tree) - for each entry in config and attach to local graphics update tasks. - - Pass target entrypoint and historical data. - - ''' - linkedsplits.focus() - - # spawns sub-processes which execute cpu bound fsp work - # which is streamed back to this parent. - async with ( - tractor.open_nursery() as n, - trio.open_nursery() as ln, - ): - - # Currently we spawn an actor per fsp chain but - # likely we'll want to pool them eventually to - # scale horizonatlly once cores are used up. - for display_name, conf in fsps.items(): - - func_name = conf['func_name'] - - shm, opened = maybe_mk_fsp_shm( - sym, - field_name=func_name, - display_name=display_name, - readonly=True, - ) - - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" - - conf['shm'] = shm - - portal = await n.start_actor( - enable_modules=['piker.fsp._engine'], - name='fsp.' + display_name, - ) - - # init async - ln.start_soon( - update_chart_from_fsp, - portal, - linkedsplits, - brokermod, - sym, - src_shm, - func_name, - display_name, - conf, - group_status_key, - loglevel, - ) - - # blocks here until all fsp actors complete - - @asynccontextmanager async def open_fsp_sidepane( @@ -477,6 +408,84 @@ async def open_fsp_sidepane( yield sidepane +async def open_fspd_cluster( + + linkedsplits: LinkedSplits, + fsps: dict[str, str], + sym: str, + src_shm: list, + brokermod: ModuleType, + group_status_key: str, + loglevel: str, + + # this con + display_in_own_task: bool = False, + +) -> None: + '''Create sub-actors (under flat tree) + for each entry in config and attach to local graphics update tasks. + + Pass target entrypoint and historical data. + + ''' + linkedsplits.focus() + + # spawns sub-processes which execute cpu bound fsp work + # which is streamed back to this parent. + async with ( + tractor.open_nursery() as n, + trio.open_nursery() as ln, + ): + + # Currently we spawn an actor per fsp chain but + # likely we'll want to pool them eventually to + # scale horizonatlly once cores are used up. + for display_name, conf in fsps.items(): + + func_name = conf['func_name'] + + shm, opened = maybe_mk_fsp_shm( + sym, + field_name=func_name, + display_name=display_name, + readonly=True, + ) + + # XXX: fsp may have been opened by a duplicate chart. + # Error for now until we figure out how to wrap fsps as + # "feeds". assert opened, f"A chart for {key} likely + # already exists?" + + # conf['shm'] = shm + + portal = await n.start_actor( + enable_modules=['piker.fsp._engine'], + name='fsp.' + display_name, + ) + + # init async + ln.start_soon( + partial( + update_chart_from_fsp, + + portal, + linkedsplits, + brokermod, + sym, + src_shm, + func_name, + display_name, + conf=conf, + shm=shm, + is_overlay=conf.get('overlay', False), + group_status_key=group_status_key, + loglevel=loglevel, + ) + ) + + # blocks here until all fsp actors complete + + async def update_chart_from_fsp( portal: tractor.Portal, @@ -488,6 +497,10 @@ async def update_chart_from_fsp( func_name: str, display_name: str, conf: dict[str, dict], + + shm: ShmArray, + is_overlay: bool, + group_status_key: str, loglevel: str, @@ -512,7 +525,7 @@ async def update_chart_from_fsp( # name as title of sub-chart brokername=brokermod.name, src_shm_token=src_shm.token, - dst_shm_token=conf['shm'].token, + dst_shm_token=shm.token, symbol=sym, func_name=func_name, loglevel=loglevel, @@ -520,14 +533,9 @@ async def update_chart_from_fsp( ) as (ctx, last_index), ctx.open_stream() as stream, - open_fsp_sidepane( - linkedsplits, - {display_name: conf}, - ) as sidepane, + open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, ): - shm = conf['shm'] - - if conf.get('overlay'): + if is_overlay: chart = linkedsplits.chart chart.draw_curve( name=display_name, @@ -535,10 +543,8 @@ async def update_chart_from_fsp( overlay=True, color='default_light', ) - last_val_sticky = None else: - chart = linkedsplits.add_plot( name=display_name, array=shm.array, @@ -562,30 +568,17 @@ async def update_chart_from_fsp( # should **not** be the same sub-chart widget assert chart.name != linkedsplits.chart.name - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] + array_key = func_name - # read from last calculated value - array = shm.array - - # XXX: fsp func names must be unique meaning we don't have - # duplicates of the underlying data even if multiple - # sub-charts reference it under different 'named charts'. - value = array[func_name][-1] - - last_val_sticky.update_from_data(-1, value) - - chart.linked.focus() - - # works also for overlays in which case data is looked up from - # internal chart array set.... - chart.update_curve_from_array( + # first UI update, usually from shm pushed history + update_fsp_chart( + chart, + shm, display_name, - shm.array, - array_key=func_name + array_key=array_key, ) - chart.linked.resize_sidepanes() + chart.linked.focus() # TODO: figure out if we can roll our own `FillToThreshold` to # get brush filled polygons for OS/OB conditions. @@ -608,58 +601,85 @@ async def update_chart_from_fsp( chart._set_yrange() - last = time.time() - done() + chart.linked.resize_sidepanes() - # i = 0 # update chart graphics + i = 0 + last = time.time() async for value in stream: # chart isn't actively shown so just skip render cycle if chart.linked.isHidden(): - # print(f'{i} unseen fsp cyclce') - # i += 1 + print(f'{i} unseen fsp cyclce') + i += 1 continue - now = time.time() - period = now - last + else: + now = time.time() + period = now - last - # if period <= 1/30: - if period <= 1/_clear_throttle_rate: - # faster then display refresh rate - # print(f'fsp too fast: {1/period}') - continue - - # TODO: provide a read sync mechanism to avoid this polling. - # the underlying issue is that a backfill and subsequent shm - # array first/last index update could result in an empty array - # read here since the stream is never torn down on the - # re-compute steps. - read_tries = 2 - while read_tries > 0: - try: - # read last - array = shm.array - value = array[-1][func_name] - break - - except IndexError: - read_tries -= 1 + if period <= 1/_clear_throttle_rate: + # faster then display refresh rate + print(f'fsp too fast: {1/period}') continue - if last_val_sticky: - last_val_sticky.update_from_data(-1, value) + # run synchronous update + update_fsp_chart( + chart, + shm, + display_name, + array_key=func_name, + ) - # update graphics - chart.update_curve_from_array( - display_name, - array, - array_key=func_name, - ) + # set time of last graphics update + last = time.time() - # set time of last graphics update - last = time.time() + +def update_fsp_chart( + chart: ChartPlotWidget, + shm: ShmArray, + display_name: str, + array_key: str, + +) -> None: + + array = shm.array + + # XXX: is this a problem any more after porting to the + # ``tractor.Context`` api? + # TODO: provide a read sync mechanism to avoid this polling. the + # underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the stream is never torn down on the re-compute + # steps. + # read_tries = 2 + # while read_tries > 0: + # try: + # # read last + # array = shm.array + # value = array[-1][array_key] + # break + + # except IndexError: + # read_tries -= 1 + # continue + + # update graphics + chart.update_curve_from_array( + display_name, + array, + array_key=array_key, + ) + + last_val_sticky = chart._ysticks.get(display_name) + if last_val_sticky: + # read from last calculated value + # XXX: fsp func names must be unique meaning we don't have + # duplicates of the underlying data even if multiple + # sub-charts reference it under different 'named charts'. + value = shm.array[array_key][-1] + last_val_sticky.update_from_data(-1, value) async def check_for_new_bars(feed, ohlcv, linkedsplits): @@ -702,6 +722,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits): just_history=False, ) + # main chart overlays for name in price_chart._overlays: price_chart.update_curve_from_array( @@ -709,6 +730,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits): price_chart._arrays[name] ) + # each subplot for name, chart in linkedsplits.subplots.items(): chart.update_curve_from_array( chart.name,