From 51f951166983f17cc6d2b3789a7a0376bbf8d96d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 24 Jan 2022 15:24:46 -0500 Subject: [PATCH] Support "volume" and "dollar volume" on same chart This is a huge commit which moves a bunch of code around in order to simplify some of our UI modules as well as support our first official mult-axis chart: overlaid volume and "dollar volume". A good deal of this change set is to make startup fast such that volume data which is often shipped alongside OHLC history is loaded and shown asap and FSPs are loaded in an actor cluster with their graphics overlayed concurrently as each responsible worker generates plottable output. For everything to work this commit requires use of a draft `pyqtgraph` PR: https://github.com/pyqtgraph/pyqtgraph/pull/2162 Change summary: - move remaining FSP actor cluster helpers into `.ui._fsp` mod as well as fsp specific UI managers (`maybe_open_vlm_display()`, `start_fsp_displays()`). - add an `FspAdmin` API for starting fsp chains on the cluster concurrently allowing for future work toward reload/unloading. - bring FSP config dict into `start_fsp_displays()` and `.started()`-deliver both the fsp admin and any volume chart back up to the calling display loop code. ToDo: - repair `ChartView` click-drag interactions - auto-range on $ vlm needs to use `ChartPlotWidget._set_yrange()` - a lot better styling for the $_vlm overlay XD --- piker/fsp/_engine.py | 6 +- piker/ui/_display.py | 381 ++----------------------- piker/ui/_fsp.py | 646 +++++++++++++++++++++++++++++++++++++------ 3 files changed, 593 insertions(+), 440 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index bc55e154..aafaf76c 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -186,15 +186,17 @@ async def fsp_compute( async def cascade( ctx: tractor.Context, + + # data feed key brokername: str, + symbol: str, src_shm_token: dict, dst_shm_token: tuple[str, np.dtype], - symbol: str, func_name: str, - zero_on_step: bool = False, + zero_on_step: bool = False, loglevel: Optional[str] = None, ) -> None: diff --git a/piker/ui/_display.py b/piker/ui/_display.py index b4b9618f..3ffcb4b0 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -21,35 +21,25 @@ this module ties together quote and computational (fsp) streams with graphics update methods via our custom ``pyqtgraph`` charting api. ''' -from contextlib import asynccontextmanager as acm from functools import partial -from itertools import cycle import time -from types import ModuleType -from typing import Optional, AsyncGenerator +from typing import Optional import numpy as np -import pyqtgraph as pg import tractor import trio from .. import brokers -from .._cacheables import maybe_open_context -from tractor.trionics import gather_contexts from ..data.feed import open_feed, Feed from ._chart import ( ChartPlotWidget, LinkedSplits, GodWidget, ) -from .. import fsp from ._l1 import L1Labels from ._fsp import ( update_fsp_chart, - maybe_mk_fsp_shm, - open_fsp_sidepane, - has_vlm, - maybe_open_vlm_display, + start_fsp_displays, ) from ..data._sharedmem import ShmArray, try_read from ._forms import ( @@ -83,6 +73,10 @@ def chart_maxmin( float, float, ]: + ''' + Compute max and min datums "in view" for range limits. + + ''' # TODO: implement this # https://arxiv.org/abs/cs/0610046 # https://github.com/lemire/pythonmaxmin @@ -110,7 +104,7 @@ def chart_maxmin( return last_bars_range, mx, max(mn, 0), mx_vlm_in_view -async def update_chart_from_quotes( +async def update_linked_charts_graphics( linked: LinkedSplits, stream: tractor.MsgStream, ohlcv: np.ndarray, @@ -121,7 +115,8 @@ async def update_chart_from_quotes( ) -> None: '''The 'main' (price) chart real-time update loop. - Receive from the quote stream and update the OHLC chart. + Receive from the primary instrument quote stream and update the OHLC + chart. ''' # TODO: bunch of stuff: @@ -231,6 +226,14 @@ async def update_chart_from_quotes( vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375)) last_mx_vlm = mx_vlm_in_view + for curve_name, shm in vlm_chart._overlays.items(): + update_fsp_chart( + vlm_chart, + shm, + curve_name, + array_key=curve_name, + ) + ticks_frame = quote.get('ticks', ()) frames_by_type: dict[str, dict] = {} @@ -381,284 +384,6 @@ async def update_chart_from_quotes( ) -@acm -async def open_fsp_cluster( - workers: int = 2 - -) -> AsyncGenerator[int, dict[str, tractor.Portal]]: - - from tractor._clustering import open_actor_cluster - - profiler = pg.debug.Profiler( - delayed=False, - disabled=False - ) - async with open_actor_cluster( - count=2, - names=['fsp_0', 'fsp_1'], - modules=['piker.fsp._engine'], - ) as cluster_map: - profiler('started fsp cluster') - yield cluster_map - - -@acm -async def maybe_open_fsp_cluster( - workers: int = 2, - **kwargs, - -) -> AsyncGenerator[int, dict[str, tractor.Portal]]: - - kwargs.update( - {'workers': workers} - ) - - async with maybe_open_context( - # for now make a cluster per client? - acm_func=open_fsp_cluster, - kwargs=kwargs, - ) as (cache_hit, cluster_map): - if cache_hit: - log.info('re-using existing fsp cluster') - yield cluster_map - else: - yield cluster_map - - -async def start_fsp_displays( - cluster_map: dict[str, tractor.Portal], - linkedsplits: LinkedSplits, - fsps: dict[str, str], - sym: str, - src_shm: list, - brokermod: ModuleType, - group_status_key: str, - loglevel: str, - -) -> None: - ''' - Create sub-actors (under flat tree) - for each entry in config and attach to local graphics update tasks. - - Pass target entrypoint and historical data. - - ''' - linkedsplits.focus() - - profiler = pg.debug.Profiler( - delayed=False, - disabled=False - ) - - async with trio.open_nursery() as n: - # Currently we spawn an actor per fsp chain but - # likely we'll want to pool them eventually to - # scale horizonatlly once cores are used up. - for (display_name, conf), (name, portal) in zip( - fsps.items(), - - # round robin to cluster for now.. - cycle(cluster_map.items()), - ): - func_name = conf['func_name'] - shm, opened = maybe_mk_fsp_shm( - sym, - field_name=func_name, - display_name=display_name, - readonly=True, - ) - - profiler(f'created shm for fsp actor: {display_name}') - - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" - - profiler(f'attached to fsp portal: {display_name}') - - # init async - n.start_soon( - partial( - update_chart_from_fsp, - - portal, - linkedsplits, - brokermod, - sym, - src_shm, - func_name, - display_name, - conf=conf, - shm=shm, - is_overlay=conf.get('overlay', False), - group_status_key=group_status_key, - loglevel=loglevel, - profiler=profiler, - ) - ) - - # blocks here until all fsp actors complete - - -async def update_chart_from_fsp( - portal: tractor.Portal, - linkedsplits: LinkedSplits, - brokermod: ModuleType, - sym: str, - src_shm: ShmArray, - func_name: str, - display_name: str, - conf: dict[str, dict], - - shm: ShmArray, - is_overlay: bool, - - group_status_key: str, - loglevel: str, - profiler: pg.debug.Profiler, - -) -> None: - ''' - FSP stream chart update loop. - - This is called once for each entry in the fsp - config map. - - ''' - - profiler(f'started chart task for fsp: {func_name}') - - done = linkedsplits.window().status_bar.open_status( - f'loading fsp, {display_name}..', - group_key=group_status_key, - ) - - async with ( - portal.open_context( - - # chaining entrypoint - fsp.cascade, - - # name as title of sub-chart - brokername=brokermod.name, - src_shm_token=src_shm.token, - dst_shm_token=shm.token, - symbol=sym, - func_name=func_name, - loglevel=loglevel, - zero_on_step=conf.get('zero_on_step', False), - - ) as (ctx, last_index), - ctx.open_stream() as stream, - - open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, - ): - profiler(f'fsp:{func_name} attached to fsp ctx-stream') - - if is_overlay: - chart = linkedsplits.chart - chart.draw_curve( - name=display_name, - data=shm.array, - overlay=True, - color='default_light', - ) - # specially store ref to shm for lookup in display loop - chart._overlays[display_name] = shm - - else: - chart = linkedsplits.add_plot( - name=display_name, - array=shm.array, - - array_key=conf['func_name'], - sidepane=sidepane, - - # curve by default - ohlc=False, - - # settings passed down to ``ChartPlotWidget`` - **conf.get('chart_kwargs', {}) - # static_yrange=(0, 100), - ) - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linkedsplits.chart.name - - array_key = func_name - - profiler(f'fsp:{func_name} chart created') - - # first UI update, usually from shm pushed history - update_fsp_chart( - chart, - shm, - display_name, - array_key=array_key, - ) - - chart.linked.focus() - - # TODO: figure out if we can roll our own `FillToThreshold` to - # get brush filled polygons for OS/OB conditions. - # ``pg.FillBetweenItems`` seems to be one technique using - # generic fills between curve types while ``PlotCurveItem`` has - # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which - # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[func_name]) - # graphics.curve.setBrush(50, 50, 200, 100) - # graphics.curve.setFillLevel(50) - - if func_name == 'rsi': - from ._lines import level_line - # add moveable over-[sold/bought] lines - # and labels only for the 70/30 lines - level_line(chart, 20) - level_line(chart, 30, orient_v='top') - level_line(chart, 70, orient_v='bottom') - level_line(chart, 80, orient_v='top') - - chart._set_yrange() - done() # status updates - - profiler(f'fsp:{func_name} starting update loop') - profiler.finish() - - # update chart graphics - last = time.time() - async for value in stream: - - # chart isn't actively shown so just skip render cycle - if chart.linked.isHidden(): - continue - - else: - now = time.time() - period = now - last - - if period <= 1/_quote_throttle_rate: - # faster then display refresh rate - print(f'fsp too fast: {1/period}') - continue - - # run synchronous update - update_fsp_chart( - chart, - shm, - display_name, - array_key=func_name, - ) - - # set time of last graphics update - last = time.time() - - async def check_for_new_bars( feed: Feed, ohlcv: np.ndarray, @@ -822,57 +547,6 @@ async def display_symbol_data( # TODO: a data view api that makes this less shit chart._shm = ohlcv - # TODO: eventually we'll support some kind of n-compose syntax - fsp_conf = { - - # 'dolla_vlm': { - # 'func_name': 'dolla_vlm', - # 'zero_on_step': True, - # 'params': { - # 'price_func': { - # 'default_value': 'chl3', - # # tell target ``Edit`` widget to not allow - # # edits for now. - # 'widget_kwargs': {'readonly': True}, - # }, - # }, - # 'chart_kwargs': {'style': 'step'} - # }, - - # 'rsi': { - # 'func_name': 'rsi', # literal python func ref lookup name - - # # map of parameters to place on the fsp sidepane widget - # # which should map to dynamic inputs available to the - # # fsp function at runtime. - # 'params': { - # 'period': { - # 'default_value': 14, - # 'widget_kwargs': {'readonly': True}, - # }, - # }, - - # # ``ChartPlotWidget`` options passthrough - # 'chart_kwargs': { - # 'static_yrange': (0, 100), - # }, - # }, - } - - if has_vlm(ohlcv): # and provider != 'binance': - # binance is too fast atm for FSPs until we wrap - # the fsp streams as throttled ``Feeds``, see - # - - # add VWAP to fsp config for downstream loading - fsp_conf.update({ - 'vwap': { - 'func_name': 'vwap', - 'overlay': True, - 'anchor': 'session', - }, - }) - # NOTE: we must immediately tell Qt to show the OHLC chart # to avoid a race where the subplots get added/shown to # the linked set *before* the main price chart! @@ -880,32 +554,24 @@ async def display_symbol_data( linkedsplits.focus() await trio.sleep(0) - vlm_chart = None - - async with gather_contexts( - ( - trio.open_nursery(), - maybe_open_vlm_display(linkedsplits, ohlcv), - maybe_open_fsp_cluster(), - ) - ) as (ln, vlm_chart, cluster_map): + vlm_chart: Optional[ChartPlotWidget] = None + async with trio.open_nursery() as ln: # load initial fsp chain (otherwise known as "indicators") - ln.start_soon( + admin, vlm_chart = await ln.start( start_fsp_displays, - cluster_map, + linkedsplits, - fsp_conf, + brokermod, sym, ohlcv, - brokermod, loading_sym_key, loglevel, ) # start graphics update loop after receiving first live quote ln.start_soon( - update_chart_from_quotes, + update_linked_charts_graphics, linkedsplits, feed.stream, ohlcv, @@ -913,6 +579,7 @@ async def display_symbol_data( vlm_chart, ) + # start sample step incrementer ln.start_soon( check_for_new_bars, feed, diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 7d45fef2..22b99f2c 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -21,12 +21,20 @@ Financial signal processing cluster and real-time graphics management. ''' from contextlib import asynccontextmanager as acm -from typing import Optional +from functools import partial +from itertools import cycle +from types import ModuleType +from typing import Optional, AsyncGenerator, Any import numpy as np from pydantic import create_model import tractor +from tractor.trionics import gather_contexts +import pyqtgraph as pg +import trio +from trio_typing import TaskStatus +from .._cacheables import maybe_open_context from ..data._sharedmem import ( ShmArray, maybe_open_shm_array, @@ -35,8 +43,8 @@ from ..data._sharedmem import ( from ._chart import ( ChartPlotWidget, LinkedSplits, - # GodWidget, ) +from .. import fsp from ._forms import ( FieldsForm, mk_form, @@ -88,85 +96,6 @@ def has_vlm(ohlcv: ShmArray) -> bool: return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) -@acm -async def maybe_open_vlm_display( - linked: LinkedSplits, - ohlcv: ShmArray, - -) -> ChartPlotWidget: - - if not has_vlm(ohlcv): - log.warning(f"{linked.symbol.key} does not seem to have volume info") - yield - return - else: - - shm, opened = maybe_mk_fsp_shm( - linked.symbol.key, - 'vlm', - readonly=True, - ) - - async with open_fsp_sidepane( - linked, { - 'vlm': { - 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. - 'widget_kwargs': {'readonly': True}, - }, - }, - } - }, - ) as sidepane: - - # built-in $vlm - shm = ohlcv - chart = linked.add_plot( - name='volume', - array=shm.array, - - array_key='volume', - sidepane=sidepane, - - # curve by default - ohlc=False, - - # Draw vertical bars from zero. - # we do this internally ourselves since - # the curve item internals are pretty convoluted. - style='step', - ) - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linked.chart.name - - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] - - # read from last calculated value - value = shm.array['volume'][-1] - - last_val_sticky.update_from_data(-1, value) - - chart.update_curve_from_array( - 'volume', - shm.array, - ) - - # size view to data once at outset - chart._set_yrange() - - yield chart - - def update_fsp_chart( chart: ChartPlotWidget, shm: ShmArray, @@ -270,3 +199,558 @@ async def open_fsp_sidepane( ) ): yield sidepane + + +@acm +async def open_fsp_actor_cluster( + names: list[str] = ['fsp_0', 'fsp_1'], + +) -> AsyncGenerator[int, dict[str, tractor.Portal]]: + + from tractor._clustering import open_actor_cluster + + profiler = pg.debug.Profiler( + delayed=False, + disabled=False + ) + async with open_actor_cluster( + count=2, + names=names, + modules=['piker.fsp._engine'], + + ) as cluster_map: + profiler('started fsp cluster') + yield cluster_map + + +class FspAdmin: + ''' + Client API for orchestrating FSP actors and displaying + real-time graphics output. + + ''' + def __init__( + self, + tn: trio.Nursery, + cluster: dict[str, tractor.Portal], + + ) -> None: + self.tn = tn + self.cluster = cluster + self._rr_next_actor = cycle(cluster.items()) + self._registry: dict[ + tuple, + tuple[tractor.MsgStream, ShmArray] + ] = {} + + def rr_next_portal(self) -> tractor.Portal: + name, portal = next(self._rr_next_actor) + return portal + + async def open_remote_fsp( + self, + + portal: tractor.Portal, + complete: trio.Event, + started: trio.Event, + + brokername: str, + sym: str, + + src_shm: ShmArray, + dst_shm: ShmArray, + + conf: dict, + func_name: str, + loglevel: str, + + ) -> None: + ''' + Task which opens a remote FSP endpoint in the managed + cluster and sleeps until signalled to exit. + + ''' + async with ( + portal.open_context( + + # chaining entrypoint + fsp.cascade, + + # data feed key + brokername=brokername, + symbol=sym, + + # mems + src_shm_token=src_shm.token, + dst_shm_token=dst_shm.token, + + # target + func_name=func_name, + + loglevel=loglevel, + zero_on_step=conf.get('zero_on_step', False), + + ) as (ctx, last_index), + ctx.open_stream() as stream, + ): + # register output data + self._registry[(brokername, sym, func_name)] = ( + stream, dst_shm, complete) + + started.set() + + # wait for graceful shutdown signal + await complete.wait() + + async def start_fsp( + self, + + display_name: str, + feed_key: tuple[str, str], + src_shm: ShmArray, + conf: dict[str, dict[str, Any]], + + worker_name: Optional[str] = None, + loglevel: str = 'error', + + ) -> (ShmArray, trio.Event): + + # unpack FSP details from config dict + func_name = conf['func_name'] + + # allocate an output shm array + dst_shm, opened = maybe_mk_fsp_shm( + feed_key, + field_name=func_name, + display_name=display_name, + readonly=True, + ) + if not opened: + raise RuntimeError("Already started FSP {func_name}") + + portal = self.cluster.get(worker_name) or self.rr_next_portal() + complete = trio.Event() + started = trio.Event() + + brokername, sym = feed_key + self.tn.start_soon( + self.open_remote_fsp, + portal, + complete, + started, + + brokername, + sym, + + src_shm, + dst_shm, + + conf, + func_name, + loglevel, + ) + + return dst_shm, started + + +@acm +async def open_fsp_admin( + **kwargs, + +) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: + + async with ( + maybe_open_context( + # for now make a cluster per client? + acm_func=open_fsp_actor_cluster, + kwargs=kwargs, + ) as (cache_hit, cluster_map), + + trio.open_nursery() as tn, + ): + if cache_hit: + log.info('re-using existing fsp cluster') + + admin = FspAdmin(tn, cluster_map) + try: + yield admin + finally: + # terminate all tasks via signals + for key, entry in admin._registry.items(): + _, _, event = entry + event.set() + + +@acm +async def maybe_open_vlm_display( + linked: LinkedSplits, + ohlcv: ShmArray, + +) -> ChartPlotWidget: + ''' + Volume subchart helper. + + Since "volume" is often included directly alongside OHLCV price + data, we don't really need a separate FSP-actor + shm array for it + since it's likely already directly adjacent to OHLC samples from the + data provider. + + ''' + if not has_vlm(ohlcv): + log.warning(f"{linked.symbol.key} does not seem to have volume info") + yield + return + else: + + # shm, opened = maybe_mk_fsp_shm( + # linked.symbol.key, + # 'vlm', + # readonly=True, + # ) + + async with open_fsp_sidepane( + linked, { + 'vlm': { + 'params': { + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, + } + }, + ) as sidepane: + + # built-in $vlm + shm = ohlcv + chart = linked.add_plot( + name='volume', + array=shm.array, + + array_key='volume', + sidepane=sidepane, + + # curve by default + ohlc=False, + + # Draw vertical bars from zero. + # we do this internally ourselves since + # the curve item internals are pretty convoluted. + style='step', + ) + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linked.chart.name + + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] + + # read from last calculated value + value = shm.array['volume'][-1] + + last_val_sticky.update_from_data(-1, value) + + chart.update_curve_from_array( + 'volume', + shm.array, + ) + + # size view to data once at outset + chart._set_yrange() + + yield chart + + +async def run_fsp_ui( + + shm: ShmArray, + started: trio.Event, + linkedsplits: LinkedSplits, + func_name: str, + display_name: str, + conf: dict[str, dict], + group_status_key: str, + loglevel: str, + profiler: pg.debug.Profiler, + + _quote_throttle_rate: int = 58, + +) -> None: + ''' + FSP stream chart update loop. + + This is called once for each entry in the fsp + config map. + + ''' + profiler(f'started UI task for fsp: {func_name}') + + async with ( + # side UI for parameters/controls + open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, + ): + await started.wait() + profiler(f'fsp:{func_name} attached to fsp ctx-stream') + + overlay_with = conf.get('overlay', False) + if overlay_with: + if overlay_with == 'ohlc': + chart = linkedsplits.chart + else: + chart = linkedsplits.subplots[overlay_with] + + chart.draw_curve( + name=display_name, + data=shm.array, + overlay=True, + color='default_light', + separate_axes=conf.get('separate_axes', False), + **conf.get('chart_kwargs', {}) + ) + # specially store ref to shm for lookup in display loop + chart._overlays[display_name] = shm + + else: + chart = linkedsplits.add_plot( + name=display_name, + array=shm.array, + + array_key=func_name, + sidepane=sidepane, + + # curve by default + ohlc=False, + + # settings passed down to ``ChartPlotWidget`` + **conf.get('chart_kwargs', {}) + # static_yrange=(0, 100), + ) + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linkedsplits.chart.name + + array_key = func_name + + profiler(f'fsp:{func_name} chart created') + + # first UI update, usually from shm pushed history + update_fsp_chart( + chart, + shm, + display_name, + array_key=array_key, + ) + + chart.linked.focus() + + # TODO: figure out if we can roll our own `FillToThreshold` to + # get brush filled polygons for OS/OB conditions. + # ``pg.FillBetweenItems`` seems to be one technique using + # generic fills between curve types while ``PlotCurveItem`` has + # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which + # might be the best solution? + + # graphics = chart.update_from_array(chart.name, array[func_name]) + # graphics.curve.setBrush(50, 50, 200, 100) + # graphics.curve.setFillLevel(50) + + if func_name == 'rsi': + from ._lines import level_line + # add moveable over-[sold/bought] lines + # and labels only for the 70/30 lines + level_line(chart, 20) + level_line(chart, 30, orient_v='top') + level_line(chart, 70, orient_v='bottom') + level_line(chart, 80, orient_v='top') + + chart._set_yrange() + # done() # status updates + + profiler(f'fsp:{func_name} starting update loop') + profiler.finish() + + # update chart graphics + # last = time.time() + + # XXX: this currently doesn't loop since + # the FSP engine does **not** push updates atm + # since we do graphics update in the main loop + # in ``._display. + # async for value in stream: + # print(value) + + # # chart isn't actively shown so just skip render cycle + # if chart.linked.isHidden(): + # continue + + # else: + # now = time.time() + # period = now - last + + # if period <= 1/_quote_throttle_rate: + # # faster then display refresh rate + # print(f'fsp too fast: {1/period}') + # continue + + # # run synchronous update + # update_fsp_chart( + # chart, + # shm, + # display_name, + # array_key=func_name, + # ) + + # # set time of last graphics update + # last = time.time() + + +async def start_fsp_displays( + + linkedsplits: LinkedSplits, + brokermod: ModuleType, + sym: str, + ohlcv: ShmArray, + group_status_key: str, + loglevel: str, + + task_status: TaskStatus[ + tuple[FspAdmin, 'ChartPlotWidet'] + ] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Create sub-actors (under flat tree) + for each entry in config and attach to local graphics update tasks. + + Pass target entrypoint and historical data. + + ''' + # TODO: eventually we'll support some kind of n-compose syntax + fsp_conf = { + + # 'rsi': { + # 'func_name': 'rsi', # literal python func ref lookup name + + # # map of parameters to place on the fsp sidepane widget + # # which should map to dynamic inputs available to the + # # fsp function at runtime. + # 'params': { + # 'period': { + # 'default_value': 14, + # 'widget_kwargs': {'readonly': True}, + # }, + # }, + + # # ``ChartPlotWidget`` options passthrough + # 'chart_kwargs': { + # 'static_yrange': (0, 100), + # }, + # }, + } + + if has_vlm(ohlcv): + fsp_conf.update({ + 'vwap': { + 'func_name': 'vwap', + 'overlay': 'ohlc', + 'anchor': 'session', + }, + + 'dolla_vlm': { + + 'func_name': 'dolla_vlm', + 'zero_on_step': True, + 'overlay': 'volume', + 'separate_axes': True, + 'params': { + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, + 'chart_kwargs': {'step_mode': True} + }, + + }) + + linkedsplits.focus() + + profiler = pg.debug.Profiler( + delayed=False, + disabled=False + ) + + async with gather_contexts(( + + # NOTE: this admin internally opens an actor pool. + open_fsp_admin(), + + trio.open_nursery(), + + # TODO: fast startup of volume overlayed with $_vlm + maybe_open_vlm_display(linkedsplits, ohlcv), + + )) as (admin, n, vlm_chart): + + task_status.started((admin, vlm_chart)) + + for display_name, conf in fsp_conf.items(): + func_name = conf['func_name'] + + done = linkedsplits.window().status_bar.open_status( + f'loading fsp, {display_name}..', + group_key=group_status_key, + ) + + shm, started = await admin.start_fsp( + display_name, + (brokermod.name, sym), + ohlcv, + fsp_conf[display_name], + loglevel, + ) + + profiler(f'created shm for fsp actor: {display_name}') + + # XXX: fsp may have been opened by a duplicate chart. + # Error for now until we figure out how to wrap fsps as + # "feeds". assert opened, f"A chart for {key} likely + # already exists?" + + profiler(f'attached to fsp portal: {display_name}') + + # init async + n.start_soon( + partial( + run_fsp_ui, + + shm, + started, + linkedsplits, + func_name, + display_name, + + conf=conf, + group_status_key=group_status_key, + loglevel=loglevel, + profiler=profiler, + ) + ) + await started.wait() + done() + + # blocks on nursery until all fsp actors complete