From 404f5d6d23d0ddd8497fe8217ed0ac14b51bb797 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Jan 2022 18:48:47 -0500 Subject: [PATCH] Factor (sub-)chart spawning into a admin method Adds `FspAdmin.open_fsp_chart()` which allows adding a real time graphics display of an fsp's output with different options for where (which chart or make a new one) to place it. Further, - change some method naming, namely the other fsp engine task methods to `.open_chain()` and `.start_engine_task()`. - make `run_fsp_ui()` a lone task function for now with the default config parsing and chart setup logic (and it still includes a buncha commented out stuff for doing graphics update which is now done in the main loop to avoid task switching overhead). - move all vlm related fsp config entries into the `open_vlm_displays()` task for dedicated setup with the fsp admin api such as special auto-yrange handling and graph overlays. - `start_fsp_displays()` is now just a small loop through config entries with synced startup status messages. --- piker/ui/_fsp.py | 787 +++++++++++++++++++++++++---------------------- 1 file changed, 421 insertions(+), 366 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 047d7e80..12fe9a69 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -23,13 +23,12 @@ Financial signal processing cluster and real-time graphics management. from contextlib import asynccontextmanager as acm from functools import partial from itertools import cycle -from types import ModuleType from typing import Optional, AsyncGenerator, Any import numpy as np from pydantic import create_model import tractor -from tractor.trionics import gather_contexts +# from tractor.trionics import gather_contexts import pyqtgraph as pg import trio from trio_typing import TaskStatus @@ -92,8 +91,8 @@ def has_vlm(ohlcv: ShmArray) -> bool: # make sure that the instrument supports volume history # (sometimes this is not the case for some commodities and # derivatives) - volm = ohlcv.array['volume'] - return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) + vlm = ohlcv.array['volume'] + return not bool(np.all(np.isin(vlm, -1)) or np.all(np.isnan(vlm))) def update_fsp_chart( @@ -208,285 +207,42 @@ async def open_fsp_actor_cluster( from tractor._clustering import open_actor_cluster - profiler = pg.debug.Profiler( - delayed=False, - disabled=False - ) + # profiler = pg.debug.Profiler( + # delayed=False, + # disabled=False + # ) async with open_actor_cluster( count=2, names=names, modules=['piker.fsp._engine'], ) as cluster_map: - profiler('started fsp cluster') + # profiler('started fsp cluster') yield cluster_map -class FspAdmin: - ''' - Client API for orchestrating FSP actors and displaying - real-time graphics output. - - ''' - def __init__( - self, - tn: trio.Nursery, - cluster: dict[str, tractor.Portal], - - ) -> None: - self.tn = tn - self.cluster = cluster - self._rr_next_actor = cycle(cluster.items()) - self._registry: dict[ - tuple, - tuple[tractor.MsgStream, ShmArray] - ] = {} - - def rr_next_portal(self) -> tractor.Portal: - name, portal = next(self._rr_next_actor) - return portal - - async def open_remote_fsp( - self, - - portal: tractor.Portal, - complete: trio.Event, - started: trio.Event, - - brokername: str, - sym: str, - - src_shm: ShmArray, - dst_shm: ShmArray, - - conf: dict, - func_name: str, - loglevel: str, - - ) -> None: - ''' - Task which opens a remote FSP endpoint in the managed - cluster and sleeps until signalled to exit. - - ''' - async with ( - portal.open_context( - - # chaining entrypoint - fsp.cascade, - - # data feed key - brokername=brokername, - symbol=sym, - - # mems - src_shm_token=src_shm.token, - dst_shm_token=dst_shm.token, - - # target - func_name=func_name, - - loglevel=loglevel, - zero_on_step=conf.get('zero_on_step', False), - - ) as (ctx, last_index), - ctx.open_stream() as stream, - ): - # register output data - self._registry[(brokername, sym, func_name)] = ( - stream, dst_shm, complete) - - started.set() - - # wait for graceful shutdown signal - await complete.wait() - - async def start_fsp( - self, - - display_name: str, - feed_key: tuple[str, str], - src_shm: ShmArray, - conf: dict[str, dict[str, Any]], - - worker_name: Optional[str] = None, - loglevel: str = 'error', - - ) -> (ShmArray, trio.Event): - - # unpack FSP details from config dict - func_name = conf['func_name'] - - # allocate an output shm array - dst_shm, opened = maybe_mk_fsp_shm( - feed_key, - field_name=func_name, - display_name=display_name, - readonly=True, - ) - if not opened: - raise RuntimeError("Already started FSP {func_name}") - - portal = self.cluster.get(worker_name) or self.rr_next_portal() - complete = trio.Event() - started = trio.Event() - - brokername, sym = feed_key - self.tn.start_soon( - self.open_remote_fsp, - portal, - complete, - started, - - brokername, - sym, - - src_shm, - dst_shm, - - conf, - func_name, - loglevel, - ) - - return dst_shm, started - - -@acm -async def open_fsp_admin( - **kwargs, - -) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: - - async with ( - maybe_open_context( - # for now make a cluster per client? - acm_func=open_fsp_actor_cluster, - kwargs=kwargs, - ) as (cache_hit, cluster_map), - - trio.open_nursery() as tn, - ): - if cache_hit: - log.info('re-using existing fsp cluster') - - admin = FspAdmin(tn, cluster_map) - try: - yield admin - finally: - # terminate all tasks via signals - for key, entry in admin._registry.items(): - _, _, event = entry - event.set() - - -@acm -async def maybe_open_vlm_display( - linked: LinkedSplits, - ohlcv: ShmArray, - -) -> ChartPlotWidget: - ''' - Volume subchart helper. - - Since "volume" is often included directly alongside OHLCV price - data, we don't really need a separate FSP-actor + shm array for it - since it's likely already directly adjacent to OHLC samples from the - data provider. - - ''' - if not has_vlm(ohlcv): - log.warning(f"{linked.symbol.key} does not seem to have volume info") - yield - return - - async with open_fsp_sidepane( - linked, { - 'vlm': { - 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. - 'widget_kwargs': {'readonly': True}, - }, - }, - } - }, - ) as sidepane: - - # built-in $vlm - shm = ohlcv - chart = linked.add_plot( - name='volume', - array=shm.array, - - array_key='volume', - sidepane=sidepane, - - # curve by default - ohlc=False, - - # Draw vertical bars from zero. - # we do this internally ourselves since - # the curve item internals are pretty convoluted. - style='step', - ) - - # show volume units value on LHS (for dinkus) - chart.hideAxis('right') - chart.showAxis('left') - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linked.chart.name - - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] - - # read from last calculated value - value = shm.array['volume'][-1] - - last_val_sticky.update_from_data(-1, value) - - chart.update_curve_from_array( - 'volume', - shm.array, - ) - - # size view to data once at outset - chart.view._set_yrange() - - yield chart - - async def run_fsp_ui( + linkedsplits: LinkedSplits, shm: ShmArray, started: trio.Event, - linkedsplits: LinkedSplits, func_name: str, display_name: str, conf: dict[str, dict], - group_status_key: str, loglevel: str, - profiler: pg.debug.Profiler, - - _quote_throttle_rate: int = 58, + # profiler: pg.debug.Profiler, + # _quote_throttle_rate: int = 58, ) -> None: ''' - FSP stream chart update loop. + Taskf for UI spawning around a ``LinkedSplits`` chart for fsp + related graphics/UX management. - This is called once for each entry in the fsp - config map. + This is normally spawned/called once for each entry in the fsp + config. ''' - profiler(f'started UI task for fsp: {func_name}') + # profiler(f'started UI task for fsp: {func_name}') async with ( # side UI for parameters/controls @@ -496,7 +252,7 @@ async def run_fsp_ui( ) as sidepane, ): await started.wait() - profiler(f'fsp:{func_name} attached to fsp ctx-stream') + # profiler(f'fsp:{func_name} attached to fsp ctx-stream') overlay_with = conf.get('overlay', False) if overlay_with: @@ -518,6 +274,7 @@ async def run_fsp_ui( chart._overlays[display_name] = shm else: + # create a new sub-chart widget for this fsp chart = linkedsplits.add_plot( name=display_name, array=shm.array, @@ -530,7 +287,6 @@ async def run_fsp_ui( # settings passed down to ``ChartPlotWidget`` **conf.get('chart_kwargs', {}) - # static_yrange=(0, 100), ) # XXX: ONLY for sub-chart fsps, overlays have their @@ -543,7 +299,7 @@ async def run_fsp_ui( array_key = func_name - profiler(f'fsp:{func_name} chart created') + # profiler(f'fsp:{func_name} chart created') # first UI update, usually from shm pushed history update_fsp_chart( @@ -566,20 +322,20 @@ async def run_fsp_ui( # graphics.curve.setBrush(50, 50, 200, 100) # graphics.curve.setFillLevel(50) - if func_name == 'rsi': - from ._lines import level_line - # add moveable over-[sold/bought] lines - # and labels only for the 70/30 lines - level_line(chart, 20) - level_line(chart, 30, orient_v='top') - level_line(chart, 70, orient_v='bottom') - level_line(chart, 80, orient_v='top') + # if func_name == 'rsi': + # from ._lines import level_line + # # add moveable over-[sold/bought] lines + # # and labels only for the 70/30 lines + # level_line(chart, 20) + # level_line(chart, 30, orient_v='top') + # level_line(chart, 70, orient_v='bottom') + # level_line(chart, 80, orient_v='top') chart.view._set_yrange() # done() # status updates - profiler(f'fsp:{func_name} starting update loop') - profiler.finish() + # profiler(f'fsp:{func_name} starting update loop') + # profiler.finish() # update chart graphics # last = time.time() @@ -616,30 +372,396 @@ async def run_fsp_ui( # last = time.time() +class FspAdmin: + ''' + Client API for orchestrating FSP actors and displaying + real-time graphics output. + + ''' + def __init__( + self, + tn: trio.Nursery, + cluster: dict[str, tractor.Portal], + linked: LinkedSplits, + src_shm: ShmArray, + + ) -> None: + self.tn = tn + self.cluster = cluster + self.linked = linked + self._rr_next_actor = cycle(cluster.items()) + self._registry: dict[ + tuple, + tuple[tractor.MsgStream, ShmArray] + ] = {} + self.src_shm = src_shm + + def rr_next_portal(self) -> tractor.Portal: + name, portal = next(self._rr_next_actor) + return portal + + async def open_chain( + self, + + portal: tractor.Portal, + complete: trio.Event, + started: trio.Event, + dst_shm: ShmArray, + conf: dict, + func_name: str, + loglevel: str, + + ) -> None: + ''' + Task which opens a remote FSP endpoint in the managed + cluster and sleeps until signalled to exit. + + ''' + brokername, sym = self.linked.symbol.front_feed() + async with ( + portal.open_context( + + # chaining entrypoint + fsp.cascade, + + # data feed key + brokername=brokername, + symbol=sym, + + # mems + src_shm_token=self.src_shm.token, + dst_shm_token=dst_shm.token, + + # target + func_name=func_name, + + loglevel=loglevel, + zero_on_step=conf.get('zero_on_step', False), + + ) as (ctx, last_index), + ctx.open_stream() as stream, + ): + # register output data + self._registry[(brokername, sym, func_name)] = ( + stream, dst_shm, complete) + + started.set() + + # wait for graceful shutdown signal + await complete.wait() + + async def start_engine_task( + self, + + display_name: str, + conf: dict[str, dict[str, Any]], + + worker_name: Optional[str] = None, + loglevel: str = 'error', + + ) -> (ShmArray, trio.Event): + + # unpack FSP details from config dict + func_name = conf['func_name'] + + # allocate an output shm array + dst_shm, opened = maybe_mk_fsp_shm( + self.linked.symbol.front_feed(), + field_name=func_name, + display_name=display_name, + readonly=True, + ) + if not opened: + raise RuntimeError(f'Already started FSP {func_name}') + + portal = self.cluster.get(worker_name) or self.rr_next_portal() + complete = trio.Event() + started = trio.Event() + self.tn.start_soon( + self.open_chain, + + portal, + complete, + started, + dst_shm, + conf, + func_name, + loglevel, + ) + + return dst_shm, started + + async def open_fsp_chart( + self, + display_name: str, + conf: dict, # yeah probably dumb.. + loglevel: str = 'error', + + ) -> (trio.Event, ChartPlotWidget): + + func_name = conf['func_name'] + + shm, started = await self.start_engine_task( + display_name, + conf, + loglevel, + ) + + # init async + self.tn.start_soon( + partial( + run_fsp_ui, + + self.linked, + shm, + started, + func_name, + display_name, + + conf=conf, + loglevel=loglevel, + ) + ) + return started + + +@acm +async def open_fsp_admin( + linked: LinkedSplits, + src_shm: ShmArray, + **kwargs, + +) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: + + async with ( + maybe_open_context( + # for now make a cluster per client? + acm_func=open_fsp_actor_cluster, + kwargs=kwargs, + ) as (cache_hit, cluster_map), + + trio.open_nursery() as tn, + ): + if cache_hit: + log.info('re-using existing fsp cluster') + + admin = FspAdmin( + tn, + cluster_map, + linked, + src_shm, + ) + try: + yield admin + finally: + # terminate all tasks via signals + for key, entry in admin._registry.items(): + _, _, event = entry + event.set() + + +async def open_vlm_displays( + + linked: LinkedSplits, + ohlcv: ShmArray, + dvlm: bool = True, + + task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED, + +) -> ChartPlotWidget: + ''' + Volume subchart displays. + + Since "volume" is often included directly alongside OHLCV price + data, we don't really need a separate FSP-actor + shm array for it + since it's likely already directly adjacent to OHLC samples from the + data provider. + + Further only if volume data is detected (it sometimes isn't provided + eg. forex, certain commodities markets) will volume dependent FSPs + be spawned here. + + ''' + async with ( + open_fsp_sidepane( + linked, { + 'vlm': { + 'params': { + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, + } + }, + ) as sidepane, + open_fsp_admin(linked, ohlcv) as admin, + ): + # built-in vlm which we plot ASAP since it's + # usually data provided directly with OHLC history. + shm = ohlcv + chart = linked.add_plot( + name='volume', + array=shm.array, + + array_key='volume', + sidepane=sidepane, + + # curve by default + ohlc=False, + + # Draw vertical bars from zero. + # we do this internally ourselves since + # the curve item internals are pretty convoluted. + style='step', + ) + + # force 0 to always be in view + def maxmin(name) -> tuple[float, float]: + mxmn = chart.maxmin(name=name) + if mxmn: + return 0, mxmn[1] + + return 0, 0 + + chart.view._maxmin = partial(maxmin, name='volume') + + # TODO: fix the x-axis label issue where if you put + # the axis on the left it's totally not lined up... + # show volume units value on LHS (for dinkus) + # chart.hideAxis('right') + # chart.showAxis('left') + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # send back new chart to caller + task_status.started(chart) + + # should **not** be the same sub-chart widget + assert chart.name != linked.chart.name + + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] + + # read from last calculated value + value = shm.array['volume'][-1] + + last_val_sticky.update_from_data(-1, value) + + chart.update_curve_from_array( + 'volume', + shm.array, + ) + + # size view to data once at outset + chart.view._set_yrange() + + if not dvlm: + return + + # spawn and overlay $ vlm on the same subchart + shm, started = await admin.start_engine_task( + 'dolla_vlm', + # linked.symbol.front_feed(), # data-feed symbol key + { # fsp engine conf + 'func_name': 'dolla_vlm', + 'zero_on_step': True, + 'params': { + 'price_func': { + 'default_value': 'chl3', + }, + }, + }, + # loglevel, + ) + # profiler(f'created shm for fsp actor: {display_name}') + + await started.wait() + + pi = chart.overlay_plotitem( + 'dolla_vlm', + ) + # add custom auto range handler + pi.vb._maxmin = partial(maxmin, name='dolla_vlm') + + curve, _ = chart.draw_curve( + + name='dolla_vlm', + data=shm.array, + + array_key='dolla_vlm', + overlay=pi, + color='charcoal', + step_mode=True, + # **conf.get('chart_kwargs', {}) + ) + # TODO: is there a way to "sync" the dual axes such that only + # one curve is needed? + # curve.hide() + + # TODO: we need a better API to do this.. + # specially store ref to shm for lookup in display loop + # since only a placeholder of `None` is entered in + # ``.draw_curve()``. + chart._overlays['dolla_vlm'] = shm + + # XXX: old dict-style config before it was moved into the helper task + # 'dolla_vlm': { + # 'func_name': 'dolla_vlm', + # 'zero_on_step': True, + # 'overlay': 'volume', + # 'separate_axes': True, + # 'params': { + # 'price_func': { + # 'default_value': 'chl3', + # # tell target ``Edit`` widget to not allow + # # edits for now. + # 'widget_kwargs': {'readonly': True}, + # }, + # }, + # 'chart_kwargs': {'step_mode': True} + # }, + + # } + + # built-in vlm fsps + for display_name, conf in { + 'vwap': { + 'func_name': 'vwap', + 'overlay': 'ohlc', # overlays with OHLCV (main) chart + 'anchor': 'session', + }, + }.items(): + started = await admin.open_fsp_chart( + display_name, + conf, + ) + + async def start_fsp_displays( - linkedsplits: LinkedSplits, - brokermod: ModuleType, - sym: str, + linked: LinkedSplits, ohlcv: ShmArray, group_status_key: str, loglevel: str, - task_status: TaskStatus[ - tuple[FspAdmin, 'ChartPlotWidet'] - ] = trio.TASK_STATUS_IGNORED, - ) -> None: ''' - Create sub-actors (under flat tree) - for each entry in config and attach to local graphics update tasks. + Create fsp charts from a config input attached to a local actor + compute cluster. - Pass target entrypoint and historical data. + Pass target entrypoint and historical data via ``ShmArray``. ''' + linked.focus() + # TODO: eventually we'll support some kind of n-compose syntax fsp_conf = { - # 'rsi': { # 'func_name': 'rsi', # literal python func ref lookup name @@ -659,99 +781,32 @@ async def start_fsp_displays( # }, # }, } - - if has_vlm(ohlcv): - fsp_conf.update({ - 'vwap': { - 'func_name': 'vwap', - 'overlay': 'ohlc', - 'anchor': 'session', - }, - - 'dolla_vlm': { - 'func_name': 'dolla_vlm', - 'zero_on_step': True, - 'overlay': 'volume', - 'separate_axes': True, - 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. - 'widget_kwargs': {'readonly': True}, - }, - }, - 'chart_kwargs': {'step_mode': True} - }, - - }) - - linkedsplits.focus() - profiler = pg.debug.Profiler( delayed=False, disabled=False ) - async with gather_contexts(( - - # NOTE: this admin internally opens an actor pool. - open_fsp_admin(), - - trio.open_nursery(), - - maybe_open_vlm_display( - linkedsplits, - ohlcv, - ), - - )) as (admin, n, vlm_chart): - - task_status.started((admin, vlm_chart)) + # async with gather_contexts(( + async with ( + # NOTE: this admin internally opens an actor cluster + open_fsp_admin(linked, ohlcv) as admin, + ): + statuses = [] for display_name, conf in fsp_conf.items(): - func_name = conf['func_name'] - - done = linkedsplits.window().status_bar.open_status( + started = await admin.open_fsp_chart( + display_name, + conf, + ) + done = linked.window().status_bar.open_status( f'loading fsp, {display_name}..', group_key=group_status_key, ) + statuses.append((started, done)) - shm, started = await admin.start_fsp( - display_name, - (brokermod.name, sym), - ohlcv, - fsp_conf[display_name], - loglevel, - ) - - profiler(f'created shm for fsp actor: {display_name}') - - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" - + for fsp_loaded, status_cb in statuses: + await fsp_loaded.wait() profiler(f'attached to fsp portal: {display_name}') - - # init async - n.start_soon( - partial( - run_fsp_ui, - - shm, - started, - linkedsplits, - func_name, - display_name, - - conf=conf, - group_status_key=group_status_key, - loglevel=loglevel, - profiler=profiler, - ) - ) - await started.wait() - done() + status_cb() # blocks on nursery until all fsp actors complete