diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index bc55e154..aafaf76c 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -186,15 +186,17 @@ async def fsp_compute( async def cascade( ctx: tractor.Context, + + # data feed key brokername: str, + symbol: str, src_shm_token: dict, dst_shm_token: tuple[str, np.dtype], - symbol: str, func_name: str, - zero_on_step: bool = False, + zero_on_step: bool = False, loglevel: Optional[str] = None, ) -> None: diff --git a/piker/ui/_display.py b/piker/ui/_display.py index b4b9618f..3ffcb4b0 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -21,35 +21,25 @@ this module ties together quote and computational (fsp) streams with graphics update methods via our custom ``pyqtgraph`` charting api. ''' -from contextlib import asynccontextmanager as acm from functools import partial -from itertools import cycle import time -from types import ModuleType -from typing import Optional, AsyncGenerator +from typing import Optional import numpy as np -import pyqtgraph as pg import tractor import trio from .. import brokers -from .._cacheables import maybe_open_context -from tractor.trionics import gather_contexts from ..data.feed import open_feed, Feed from ._chart import ( ChartPlotWidget, LinkedSplits, GodWidget, ) -from .. import fsp from ._l1 import L1Labels from ._fsp import ( update_fsp_chart, - maybe_mk_fsp_shm, - open_fsp_sidepane, - has_vlm, - maybe_open_vlm_display, + start_fsp_displays, ) from ..data._sharedmem import ShmArray, try_read from ._forms import ( @@ -83,6 +73,10 @@ def chart_maxmin( float, float, ]: + ''' + Compute max and min datums "in view" for range limits. + + ''' # TODO: implement this # https://arxiv.org/abs/cs/0610046 # https://github.com/lemire/pythonmaxmin @@ -110,7 +104,7 @@ def chart_maxmin( return last_bars_range, mx, max(mn, 0), mx_vlm_in_view -async def update_chart_from_quotes( +async def update_linked_charts_graphics( linked: LinkedSplits, stream: tractor.MsgStream, ohlcv: np.ndarray, @@ -121,7 +115,8 @@ async def update_chart_from_quotes( ) -> None: '''The 'main' (price) chart real-time update loop. - Receive from the quote stream and update the OHLC chart. + Receive from the primary instrument quote stream and update the OHLC + chart. ''' # TODO: bunch of stuff: @@ -231,6 +226,14 @@ async def update_chart_from_quotes( vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375)) last_mx_vlm = mx_vlm_in_view + for curve_name, shm in vlm_chart._overlays.items(): + update_fsp_chart( + vlm_chart, + shm, + curve_name, + array_key=curve_name, + ) + ticks_frame = quote.get('ticks', ()) frames_by_type: dict[str, dict] = {} @@ -381,284 +384,6 @@ async def update_chart_from_quotes( ) -@acm -async def open_fsp_cluster( - workers: int = 2 - -) -> AsyncGenerator[int, dict[str, tractor.Portal]]: - - from tractor._clustering import open_actor_cluster - - profiler = pg.debug.Profiler( - delayed=False, - disabled=False - ) - async with open_actor_cluster( - count=2, - names=['fsp_0', 'fsp_1'], - modules=['piker.fsp._engine'], - ) as cluster_map: - profiler('started fsp cluster') - yield cluster_map - - -@acm -async def maybe_open_fsp_cluster( - workers: int = 2, - **kwargs, - -) -> AsyncGenerator[int, dict[str, tractor.Portal]]: - - kwargs.update( - {'workers': workers} - ) - - async with maybe_open_context( - # for now make a cluster per client? - acm_func=open_fsp_cluster, - kwargs=kwargs, - ) as (cache_hit, cluster_map): - if cache_hit: - log.info('re-using existing fsp cluster') - yield cluster_map - else: - yield cluster_map - - -async def start_fsp_displays( - cluster_map: dict[str, tractor.Portal], - linkedsplits: LinkedSplits, - fsps: dict[str, str], - sym: str, - src_shm: list, - brokermod: ModuleType, - group_status_key: str, - loglevel: str, - -) -> None: - ''' - Create sub-actors (under flat tree) - for each entry in config and attach to local graphics update tasks. - - Pass target entrypoint and historical data. - - ''' - linkedsplits.focus() - - profiler = pg.debug.Profiler( - delayed=False, - disabled=False - ) - - async with trio.open_nursery() as n: - # Currently we spawn an actor per fsp chain but - # likely we'll want to pool them eventually to - # scale horizonatlly once cores are used up. - for (display_name, conf), (name, portal) in zip( - fsps.items(), - - # round robin to cluster for now.. - cycle(cluster_map.items()), - ): - func_name = conf['func_name'] - shm, opened = maybe_mk_fsp_shm( - sym, - field_name=func_name, - display_name=display_name, - readonly=True, - ) - - profiler(f'created shm for fsp actor: {display_name}') - - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" - - profiler(f'attached to fsp portal: {display_name}') - - # init async - n.start_soon( - partial( - update_chart_from_fsp, - - portal, - linkedsplits, - brokermod, - sym, - src_shm, - func_name, - display_name, - conf=conf, - shm=shm, - is_overlay=conf.get('overlay', False), - group_status_key=group_status_key, - loglevel=loglevel, - profiler=profiler, - ) - ) - - # blocks here until all fsp actors complete - - -async def update_chart_from_fsp( - portal: tractor.Portal, - linkedsplits: LinkedSplits, - brokermod: ModuleType, - sym: str, - src_shm: ShmArray, - func_name: str, - display_name: str, - conf: dict[str, dict], - - shm: ShmArray, - is_overlay: bool, - - group_status_key: str, - loglevel: str, - profiler: pg.debug.Profiler, - -) -> None: - ''' - FSP stream chart update loop. - - This is called once for each entry in the fsp - config map. - - ''' - - profiler(f'started chart task for fsp: {func_name}') - - done = linkedsplits.window().status_bar.open_status( - f'loading fsp, {display_name}..', - group_key=group_status_key, - ) - - async with ( - portal.open_context( - - # chaining entrypoint - fsp.cascade, - - # name as title of sub-chart - brokername=brokermod.name, - src_shm_token=src_shm.token, - dst_shm_token=shm.token, - symbol=sym, - func_name=func_name, - loglevel=loglevel, - zero_on_step=conf.get('zero_on_step', False), - - ) as (ctx, last_index), - ctx.open_stream() as stream, - - open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, - ): - profiler(f'fsp:{func_name} attached to fsp ctx-stream') - - if is_overlay: - chart = linkedsplits.chart - chart.draw_curve( - name=display_name, - data=shm.array, - overlay=True, - color='default_light', - ) - # specially store ref to shm for lookup in display loop - chart._overlays[display_name] = shm - - else: - chart = linkedsplits.add_plot( - name=display_name, - array=shm.array, - - array_key=conf['func_name'], - sidepane=sidepane, - - # curve by default - ohlc=False, - - # settings passed down to ``ChartPlotWidget`` - **conf.get('chart_kwargs', {}) - # static_yrange=(0, 100), - ) - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linkedsplits.chart.name - - array_key = func_name - - profiler(f'fsp:{func_name} chart created') - - # first UI update, usually from shm pushed history - update_fsp_chart( - chart, - shm, - display_name, - array_key=array_key, - ) - - chart.linked.focus() - - # TODO: figure out if we can roll our own `FillToThreshold` to - # get brush filled polygons for OS/OB conditions. - # ``pg.FillBetweenItems`` seems to be one technique using - # generic fills between curve types while ``PlotCurveItem`` has - # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which - # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[func_name]) - # graphics.curve.setBrush(50, 50, 200, 100) - # graphics.curve.setFillLevel(50) - - if func_name == 'rsi': - from ._lines import level_line - # add moveable over-[sold/bought] lines - # and labels only for the 70/30 lines - level_line(chart, 20) - level_line(chart, 30, orient_v='top') - level_line(chart, 70, orient_v='bottom') - level_line(chart, 80, orient_v='top') - - chart._set_yrange() - done() # status updates - - profiler(f'fsp:{func_name} starting update loop') - profiler.finish() - - # update chart graphics - last = time.time() - async for value in stream: - - # chart isn't actively shown so just skip render cycle - if chart.linked.isHidden(): - continue - - else: - now = time.time() - period = now - last - - if period <= 1/_quote_throttle_rate: - # faster then display refresh rate - print(f'fsp too fast: {1/period}') - continue - - # run synchronous update - update_fsp_chart( - chart, - shm, - display_name, - array_key=func_name, - ) - - # set time of last graphics update - last = time.time() - - async def check_for_new_bars( feed: Feed, ohlcv: np.ndarray, @@ -822,57 +547,6 @@ async def display_symbol_data( # TODO: a data view api that makes this less shit chart._shm = ohlcv - # TODO: eventually we'll support some kind of n-compose syntax - fsp_conf = { - - # 'dolla_vlm': { - # 'func_name': 'dolla_vlm', - # 'zero_on_step': True, - # 'params': { - # 'price_func': { - # 'default_value': 'chl3', - # # tell target ``Edit`` widget to not allow - # # edits for now. - # 'widget_kwargs': {'readonly': True}, - # }, - # }, - # 'chart_kwargs': {'style': 'step'} - # }, - - # 'rsi': { - # 'func_name': 'rsi', # literal python func ref lookup name - - # # map of parameters to place on the fsp sidepane widget - # # which should map to dynamic inputs available to the - # # fsp function at runtime. - # 'params': { - # 'period': { - # 'default_value': 14, - # 'widget_kwargs': {'readonly': True}, - # }, - # }, - - # # ``ChartPlotWidget`` options passthrough - # 'chart_kwargs': { - # 'static_yrange': (0, 100), - # }, - # }, - } - - if has_vlm(ohlcv): # and provider != 'binance': - # binance is too fast atm for FSPs until we wrap - # the fsp streams as throttled ``Feeds``, see - # - - # add VWAP to fsp config for downstream loading - fsp_conf.update({ - 'vwap': { - 'func_name': 'vwap', - 'overlay': True, - 'anchor': 'session', - }, - }) - # NOTE: we must immediately tell Qt to show the OHLC chart # to avoid a race where the subplots get added/shown to # the linked set *before* the main price chart! @@ -880,32 +554,24 @@ async def display_symbol_data( linkedsplits.focus() await trio.sleep(0) - vlm_chart = None - - async with gather_contexts( - ( - trio.open_nursery(), - maybe_open_vlm_display(linkedsplits, ohlcv), - maybe_open_fsp_cluster(), - ) - ) as (ln, vlm_chart, cluster_map): + vlm_chart: Optional[ChartPlotWidget] = None + async with trio.open_nursery() as ln: # load initial fsp chain (otherwise known as "indicators") - ln.start_soon( + admin, vlm_chart = await ln.start( start_fsp_displays, - cluster_map, + linkedsplits, - fsp_conf, + brokermod, sym, ohlcv, - brokermod, loading_sym_key, loglevel, ) # start graphics update loop after receiving first live quote ln.start_soon( - update_chart_from_quotes, + update_linked_charts_graphics, linkedsplits, feed.stream, ohlcv, @@ -913,6 +579,7 @@ async def display_symbol_data( vlm_chart, ) + # start sample step incrementer ln.start_soon( check_for_new_bars, feed, diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 7d45fef2..22b99f2c 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -21,12 +21,20 @@ Financial signal processing cluster and real-time graphics management. ''' from contextlib import asynccontextmanager as acm -from typing import Optional +from functools import partial +from itertools import cycle +from types import ModuleType +from typing import Optional, AsyncGenerator, Any import numpy as np from pydantic import create_model import tractor +from tractor.trionics import gather_contexts +import pyqtgraph as pg +import trio +from trio_typing import TaskStatus +from .._cacheables import maybe_open_context from ..data._sharedmem import ( ShmArray, maybe_open_shm_array, @@ -35,8 +43,8 @@ from ..data._sharedmem import ( from ._chart import ( ChartPlotWidget, LinkedSplits, - # GodWidget, ) +from .. import fsp from ._forms import ( FieldsForm, mk_form, @@ -88,85 +96,6 @@ def has_vlm(ohlcv: ShmArray) -> bool: return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) -@acm -async def maybe_open_vlm_display( - linked: LinkedSplits, - ohlcv: ShmArray, - -) -> ChartPlotWidget: - - if not has_vlm(ohlcv): - log.warning(f"{linked.symbol.key} does not seem to have volume info") - yield - return - else: - - shm, opened = maybe_mk_fsp_shm( - linked.symbol.key, - 'vlm', - readonly=True, - ) - - async with open_fsp_sidepane( - linked, { - 'vlm': { - 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. - 'widget_kwargs': {'readonly': True}, - }, - }, - } - }, - ) as sidepane: - - # built-in $vlm - shm = ohlcv - chart = linked.add_plot( - name='volume', - array=shm.array, - - array_key='volume', - sidepane=sidepane, - - # curve by default - ohlc=False, - - # Draw vertical bars from zero. - # we do this internally ourselves since - # the curve item internals are pretty convoluted. - style='step', - ) - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linked.chart.name - - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] - - # read from last calculated value - value = shm.array['volume'][-1] - - last_val_sticky.update_from_data(-1, value) - - chart.update_curve_from_array( - 'volume', - shm.array, - ) - - # size view to data once at outset - chart._set_yrange() - - yield chart - - def update_fsp_chart( chart: ChartPlotWidget, shm: ShmArray, @@ -270,3 +199,558 @@ async def open_fsp_sidepane( ) ): yield sidepane + + +@acm +async def open_fsp_actor_cluster( + names: list[str] = ['fsp_0', 'fsp_1'], + +) -> AsyncGenerator[int, dict[str, tractor.Portal]]: + + from tractor._clustering import open_actor_cluster + + profiler = pg.debug.Profiler( + delayed=False, + disabled=False + ) + async with open_actor_cluster( + count=2, + names=names, + modules=['piker.fsp._engine'], + + ) as cluster_map: + profiler('started fsp cluster') + yield cluster_map + + +class FspAdmin: + ''' + Client API for orchestrating FSP actors and displaying + real-time graphics output. + + ''' + def __init__( + self, + tn: trio.Nursery, + cluster: dict[str, tractor.Portal], + + ) -> None: + self.tn = tn + self.cluster = cluster + self._rr_next_actor = cycle(cluster.items()) + self._registry: dict[ + tuple, + tuple[tractor.MsgStream, ShmArray] + ] = {} + + def rr_next_portal(self) -> tractor.Portal: + name, portal = next(self._rr_next_actor) + return portal + + async def open_remote_fsp( + self, + + portal: tractor.Portal, + complete: trio.Event, + started: trio.Event, + + brokername: str, + sym: str, + + src_shm: ShmArray, + dst_shm: ShmArray, + + conf: dict, + func_name: str, + loglevel: str, + + ) -> None: + ''' + Task which opens a remote FSP endpoint in the managed + cluster and sleeps until signalled to exit. + + ''' + async with ( + portal.open_context( + + # chaining entrypoint + fsp.cascade, + + # data feed key + brokername=brokername, + symbol=sym, + + # mems + src_shm_token=src_shm.token, + dst_shm_token=dst_shm.token, + + # target + func_name=func_name, + + loglevel=loglevel, + zero_on_step=conf.get('zero_on_step', False), + + ) as (ctx, last_index), + ctx.open_stream() as stream, + ): + # register output data + self._registry[(brokername, sym, func_name)] = ( + stream, dst_shm, complete) + + started.set() + + # wait for graceful shutdown signal + await complete.wait() + + async def start_fsp( + self, + + display_name: str, + feed_key: tuple[str, str], + src_shm: ShmArray, + conf: dict[str, dict[str, Any]], + + worker_name: Optional[str] = None, + loglevel: str = 'error', + + ) -> (ShmArray, trio.Event): + + # unpack FSP details from config dict + func_name = conf['func_name'] + + # allocate an output shm array + dst_shm, opened = maybe_mk_fsp_shm( + feed_key, + field_name=func_name, + display_name=display_name, + readonly=True, + ) + if not opened: + raise RuntimeError("Already started FSP {func_name}") + + portal = self.cluster.get(worker_name) or self.rr_next_portal() + complete = trio.Event() + started = trio.Event() + + brokername, sym = feed_key + self.tn.start_soon( + self.open_remote_fsp, + portal, + complete, + started, + + brokername, + sym, + + src_shm, + dst_shm, + + conf, + func_name, + loglevel, + ) + + return dst_shm, started + + +@acm +async def open_fsp_admin( + **kwargs, + +) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: + + async with ( + maybe_open_context( + # for now make a cluster per client? + acm_func=open_fsp_actor_cluster, + kwargs=kwargs, + ) as (cache_hit, cluster_map), + + trio.open_nursery() as tn, + ): + if cache_hit: + log.info('re-using existing fsp cluster') + + admin = FspAdmin(tn, cluster_map) + try: + yield admin + finally: + # terminate all tasks via signals + for key, entry in admin._registry.items(): + _, _, event = entry + event.set() + + +@acm +async def maybe_open_vlm_display( + linked: LinkedSplits, + ohlcv: ShmArray, + +) -> ChartPlotWidget: + ''' + Volume subchart helper. + + Since "volume" is often included directly alongside OHLCV price + data, we don't really need a separate FSP-actor + shm array for it + since it's likely already directly adjacent to OHLC samples from the + data provider. + + ''' + if not has_vlm(ohlcv): + log.warning(f"{linked.symbol.key} does not seem to have volume info") + yield + return + else: + + # shm, opened = maybe_mk_fsp_shm( + # linked.symbol.key, + # 'vlm', + # readonly=True, + # ) + + async with open_fsp_sidepane( + linked, { + 'vlm': { + 'params': { + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, + } + }, + ) as sidepane: + + # built-in $vlm + shm = ohlcv + chart = linked.add_plot( + name='volume', + array=shm.array, + + array_key='volume', + sidepane=sidepane, + + # curve by default + ohlc=False, + + # Draw vertical bars from zero. + # we do this internally ourselves since + # the curve item internals are pretty convoluted. + style='step', + ) + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linked.chart.name + + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] + + # read from last calculated value + value = shm.array['volume'][-1] + + last_val_sticky.update_from_data(-1, value) + + chart.update_curve_from_array( + 'volume', + shm.array, + ) + + # size view to data once at outset + chart._set_yrange() + + yield chart + + +async def run_fsp_ui( + + shm: ShmArray, + started: trio.Event, + linkedsplits: LinkedSplits, + func_name: str, + display_name: str, + conf: dict[str, dict], + group_status_key: str, + loglevel: str, + profiler: pg.debug.Profiler, + + _quote_throttle_rate: int = 58, + +) -> None: + ''' + FSP stream chart update loop. + + This is called once for each entry in the fsp + config map. + + ''' + profiler(f'started UI task for fsp: {func_name}') + + async with ( + # side UI for parameters/controls + open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, + ): + await started.wait() + profiler(f'fsp:{func_name} attached to fsp ctx-stream') + + overlay_with = conf.get('overlay', False) + if overlay_with: + if overlay_with == 'ohlc': + chart = linkedsplits.chart + else: + chart = linkedsplits.subplots[overlay_with] + + chart.draw_curve( + name=display_name, + data=shm.array, + overlay=True, + color='default_light', + separate_axes=conf.get('separate_axes', False), + **conf.get('chart_kwargs', {}) + ) + # specially store ref to shm for lookup in display loop + chart._overlays[display_name] = shm + + else: + chart = linkedsplits.add_plot( + name=display_name, + array=shm.array, + + array_key=func_name, + sidepane=sidepane, + + # curve by default + ohlc=False, + + # settings passed down to ``ChartPlotWidget`` + **conf.get('chart_kwargs', {}) + # static_yrange=(0, 100), + ) + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linkedsplits.chart.name + + array_key = func_name + + profiler(f'fsp:{func_name} chart created') + + # first UI update, usually from shm pushed history + update_fsp_chart( + chart, + shm, + display_name, + array_key=array_key, + ) + + chart.linked.focus() + + # TODO: figure out if we can roll our own `FillToThreshold` to + # get brush filled polygons for OS/OB conditions. + # ``pg.FillBetweenItems`` seems to be one technique using + # generic fills between curve types while ``PlotCurveItem`` has + # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which + # might be the best solution? + + # graphics = chart.update_from_array(chart.name, array[func_name]) + # graphics.curve.setBrush(50, 50, 200, 100) + # graphics.curve.setFillLevel(50) + + if func_name == 'rsi': + from ._lines import level_line + # add moveable over-[sold/bought] lines + # and labels only for the 70/30 lines + level_line(chart, 20) + level_line(chart, 30, orient_v='top') + level_line(chart, 70, orient_v='bottom') + level_line(chart, 80, orient_v='top') + + chart._set_yrange() + # done() # status updates + + profiler(f'fsp:{func_name} starting update loop') + profiler.finish() + + # update chart graphics + # last = time.time() + + # XXX: this currently doesn't loop since + # the FSP engine does **not** push updates atm + # since we do graphics update in the main loop + # in ``._display. + # async for value in stream: + # print(value) + + # # chart isn't actively shown so just skip render cycle + # if chart.linked.isHidden(): + # continue + + # else: + # now = time.time() + # period = now - last + + # if period <= 1/_quote_throttle_rate: + # # faster then display refresh rate + # print(f'fsp too fast: {1/period}') + # continue + + # # run synchronous update + # update_fsp_chart( + # chart, + # shm, + # display_name, + # array_key=func_name, + # ) + + # # set time of last graphics update + # last = time.time() + + +async def start_fsp_displays( + + linkedsplits: LinkedSplits, + brokermod: ModuleType, + sym: str, + ohlcv: ShmArray, + group_status_key: str, + loglevel: str, + + task_status: TaskStatus[ + tuple[FspAdmin, 'ChartPlotWidet'] + ] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Create sub-actors (under flat tree) + for each entry in config and attach to local graphics update tasks. + + Pass target entrypoint and historical data. + + ''' + # TODO: eventually we'll support some kind of n-compose syntax + fsp_conf = { + + # 'rsi': { + # 'func_name': 'rsi', # literal python func ref lookup name + + # # map of parameters to place on the fsp sidepane widget + # # which should map to dynamic inputs available to the + # # fsp function at runtime. + # 'params': { + # 'period': { + # 'default_value': 14, + # 'widget_kwargs': {'readonly': True}, + # }, + # }, + + # # ``ChartPlotWidget`` options passthrough + # 'chart_kwargs': { + # 'static_yrange': (0, 100), + # }, + # }, + } + + if has_vlm(ohlcv): + fsp_conf.update({ + 'vwap': { + 'func_name': 'vwap', + 'overlay': 'ohlc', + 'anchor': 'session', + }, + + 'dolla_vlm': { + + 'func_name': 'dolla_vlm', + 'zero_on_step': True, + 'overlay': 'volume', + 'separate_axes': True, + 'params': { + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, + 'chart_kwargs': {'step_mode': True} + }, + + }) + + linkedsplits.focus() + + profiler = pg.debug.Profiler( + delayed=False, + disabled=False + ) + + async with gather_contexts(( + + # NOTE: this admin internally opens an actor pool. + open_fsp_admin(), + + trio.open_nursery(), + + # TODO: fast startup of volume overlayed with $_vlm + maybe_open_vlm_display(linkedsplits, ohlcv), + + )) as (admin, n, vlm_chart): + + task_status.started((admin, vlm_chart)) + + for display_name, conf in fsp_conf.items(): + func_name = conf['func_name'] + + done = linkedsplits.window().status_bar.open_status( + f'loading fsp, {display_name}..', + group_key=group_status_key, + ) + + shm, started = await admin.start_fsp( + display_name, + (brokermod.name, sym), + ohlcv, + fsp_conf[display_name], + loglevel, + ) + + profiler(f'created shm for fsp actor: {display_name}') + + # XXX: fsp may have been opened by a duplicate chart. + # Error for now until we figure out how to wrap fsps as + # "feeds". assert opened, f"A chart for {key} likely + # already exists?" + + profiler(f'attached to fsp portal: {display_name}') + + # init async + n.start_soon( + partial( + run_fsp_ui, + + shm, + started, + linkedsplits, + func_name, + display_name, + + conf=conf, + group_status_key=group_status_key, + loglevel=loglevel, + profiler=profiler, + ) + ) + await started.wait() + done() + + # blocks on nursery until all fsp actors complete