diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 432eaf14..b51db7d8 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -59,9 +59,58 @@ _clear_throttle_rate: int = 58 # Hz _book_throttle_rate: int = 16 # Hz +def update_fsp_chart( + chart: ChartPlotWidget, + shm: ShmArray, + display_name: str, + array_key: Optional[str], + +) -> None: + + array = shm.array + + # XXX: is this a problem any more after porting to the + # ``tractor.Context`` api or can we remove it? + + # TODO: provide a read sync mechanism to avoid this polling. the + # underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the stream is never torn down on the re-compute + # steps. + # read_tries = 2 + # while read_tries > 0: + # try: + # # read last + # array = shm.array + # value = array[-1][array_key] + # break + + # except IndexError: + # read_tries -= 1 + # continue + + # update graphics + chart.update_curve_from_array( + display_name, + array, + array_key=array_key or display_name, + ) + + last_val_sticky = chart._ysticks.get(display_name) + if last_val_sticky: + # read from last calculated value + # XXX: fsp func names must be unique meaning we don't have + # duplicates of the underlying data even if multiple + # sub-charts reference it under different 'named charts'. + array = shm.array[array_key] + if len(array): + value = array[-1] + last_val_sticky.update_from_data(-1, value) + + async def update_chart_from_quotes( - chart: ChartPlotWidget, + linked: LinkedSplits, stream: tractor.MsgStream, ohlcv: np.ndarray, @@ -84,6 +133,8 @@ async def update_chart_from_quotes( # - 1-5 sec bar lookback-autocorrection like tws does? # (would require a background history checker task) + chart = linked.chart + # update last price sticky last_price_sticky = chart._ysticks[chart.name] last_price_sticky.update_from_data( @@ -310,6 +361,33 @@ async def update_chart_from_quotes( last_mx, last_mn = mx, mn + # run synchronous update on all derived fsp subplots + # print(f'subplots: {linked.subplots.keys()}') + for name, subchart in linked.subplots.items(): + update_fsp_chart( + subchart, + subchart._shm, + + # XXX: do we really needs seperate names here? + name, + array_key=name, + ) + + # TODO: all overlays on all subplots.. + + # run synchronous update on all derived overlays + # print(f'overlays: {chart._overlays}') + for name, shm in chart._overlays.items(): + update_fsp_chart( + chart, + shm, + + # XXX: do we really needs seperate names here? + name, + array_key=name, + ) + + def maybe_mk_fsp_shm( sym: str, @@ -318,7 +396,8 @@ def maybe_mk_fsp_shm( readonly: bool = True, ) -> (ShmArray, bool): - '''Allocate a single row shm array for an symbol-fsp pair. + '''Allocate a single row shm array for an symbol-fsp pair if none + exists, otherwise load the shm already existing for that token. ''' uid = tractor.current_actor().uid @@ -436,7 +515,6 @@ async def open_fspd_cluster( tractor.open_nursery() as n, trio.open_nursery() as ln, ): - # Currently we spawn an actor per fsp chain but # likely we'll want to pool them eventually to # scale horizonatlly once cores are used up. @@ -456,8 +534,6 @@ async def open_fspd_cluster( # "feeds". assert opened, f"A chart for {key} likely # already exists?" - # conf['shm'] = shm - portal = await n.start_actor( enable_modules=['piker.fsp._engine'], name='fsp.' + display_name, @@ -543,6 +619,8 @@ async def update_chart_from_fsp( overlay=True, color='default_light', ) + # specially store ref to shm for lookup in display loop + chart._overlays[display_name] = shm else: chart = linkedsplits.add_plot( @@ -636,52 +714,6 @@ async def update_chart_from_fsp( last = time.time() -def update_fsp_chart( - chart: ChartPlotWidget, - shm: ShmArray, - display_name: str, - array_key: str, - -) -> None: - - array = shm.array - - # XXX: is this a problem any more after porting to the - # ``tractor.Context`` api? - # TODO: provide a read sync mechanism to avoid this polling. the - # underlying issue is that a backfill (aka prepend) and subsequent - # shm array first/last index update could result in an empty array - # read here since the stream is never torn down on the re-compute - # steps. - # read_tries = 2 - # while read_tries > 0: - # try: - # # read last - # array = shm.array - # value = array[-1][array_key] - # break - - # except IndexError: - # read_tries -= 1 - # continue - - # update graphics - chart.update_curve_from_array( - display_name, - array, - array_key=array_key, - ) - - last_val_sticky = chart._ysticks.get(display_name) - if last_val_sticky: - # read from last calculated value - # XXX: fsp func names must be unique meaning we don't have - # duplicates of the underlying data even if multiple - # sub-charts reference it under different 'named charts'. - value = shm.array[array_key][-1] - last_val_sticky.update_from_data(-1, value) - - async def check_for_new_bars(feed, ohlcv, linkedsplits): """Task which updates from new bars in the shared ohlcv buffer every ``delay_s`` seconds. @@ -1002,7 +1034,7 @@ async def display_symbol_data( # start graphics update loop(s)after receiving first live quote ln.start_soon( update_chart_from_quotes, - chart, + linkedsplits, feed.stream, ohlcv, wap_in_history,