diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 9c515ce3..c741ba1c 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -505,3 +505,35 @@ def maybe_open_shm_array( # to fail if a block has been allocated # on the OS by someone else. return open_shm_array(key=key, dtype=dtype, **kwargs), True + + +def try_read( + array: np.ndarray + +) -> Optional[np.ndarray]: + ''' + Try to read the last row from a shared mem array or ``None`` + if the array read returns a zero-length array result. + + Can be used to check for backfilling race conditions where an array + is currently being (re-)written by a writer actor but the reader is + unaware and reads during the window where the first and last indexes + are being updated. + + ''' + try: + return array[-1] + except IndexError: + # XXX: race condition with backfilling shm. + # + # the underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the indices may be updated in such a way that + # a read delivers an empty array (though it seems like we + # *should* be able to prevent that?). also, as and alt and + # something we need anyway, maybe there should be some kind of + # signal that a prepend is taking place and this consumer can + # respond (eg. redrawing graphics) accordingly. + + # the array read was emtpy + return None diff --git a/piker/data/_source.py b/piker/data/_source.py index 8ec92dfd..9b9b323d 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -106,6 +106,18 @@ class Symbol(BaseModel): mult = 1 / self.tick_size return round(value * mult) / mult + def front_feed(self) -> tuple[str, str]: + ''' + Return the "current" feed key for this symbol. + + (i.e. the broker + symbol key in a tuple). + + ''' + return ( + list(self.broker_info.keys())[0], + self.key, + ) + @validate_arguments def mk_symbol( diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index bc55e154..aafaf76c 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -186,15 +186,17 @@ async def fsp_compute( async def cascade( ctx: tractor.Context, + + # data feed key brokername: str, + symbol: str, src_shm_token: dict, dst_shm_token: tuple[str, np.dtype], - symbol: str, func_name: str, - zero_on_step: bool = False, + zero_on_step: bool = False, loglevel: Optional[str] = None, ) -> None: diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 3ac2cf14..dd665fde 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -19,8 +19,6 @@ High level chart-widget apis. ''' from __future__ import annotations -from functools import partial -from dataclasses import dataclass from typing import Optional from PyQt5 import QtCore, QtWidgets @@ -383,8 +381,9 @@ class LinkedSplits(QWidget): style: str = 'bar', - ) -> 'ChartPlotWidget': - '''Start up and show main (price) chart and all linked subcharts. + ) -> ChartPlotWidget: + ''' + Start up and show main (price) chart and all linked subcharts. The data input struct array must include OHLC fields. @@ -537,7 +536,7 @@ class LinkedSplits(QWidget): ) self.cursor.contents_labels.add_label( cpw, - 'ohlc', + name, anchor_at=('top', 'left'), update_func=ContentsLabel.update_from_ohlc, ) @@ -611,11 +610,11 @@ class LinkedSplits(QWidget): # import pydantic -# class ArrayScene(pydantic.BaseModel): +# class Graphics(pydantic.BaseModel): # ''' # Data-AGGRegate: high level API onto multiple (categorized) -# ``ShmArray``s with high level processing routines mostly for -# graphics summary and display. +# ``ShmArray``s with high level processing routines for +# graphics computations and display. # ''' # arrays: dict[str, np.ndarray] = {} @@ -738,7 +737,7 @@ class ChartPlotWidget(pg.PlotWidget): self.default_view() self.cv.enable_auto_yrange() - self.overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem) + self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem) def resume_all_feeds(self): for feed in self._feeds.values(): @@ -849,7 +848,7 @@ class ChartPlotWidget(pg.PlotWidget): # adds all bar/candle graphics objects for each data point in # the np array buffer to be drawn on next render cycle - self.addItem(graphics) + self.plotItem.addItem(graphics) # draw after to allow self.scene() to work... graphics.draw_from_data(data) @@ -860,6 +859,57 @@ class ChartPlotWidget(pg.PlotWidget): return graphics, data_key + def overlay_plotitem( + self, + name: str, + + ) -> pg.PlotItem: + # Custom viewbox impl + cv = self.mk_vb(name) + cv.chart = self + + # xaxis = DynamicDateAxis( + # orientation='bottom', + # linkedsplits=self.linked, + # ) + yaxis = PriceAxis( + orientation='right', + linkedsplits=self.linked, + ) + + plotitem = pg.PlotItem( + parent=self.plotItem, + name=name, + enableMenu=False, + viewBox=cv, + axisItems={ + # 'bottom': xaxis, + 'right': yaxis, + }, + default_axes=[], + ) + # plotitem.setAxisItems( + # add_to_layout=False, + # axisItems={ + # 'bottom': xaxis, + # 'right': yaxis, + # }, + # ) + # plotite.hideAxis('right') + # plotite.hideAxis('bottom') + # plotitem.addItem(curve) + cv.enable_auto_yrange() + + # plotitem.enableAutoRange(axis='y') + plotitem.hideButtons() + + self.pi_overlay.add_plotitem( + plotitem, + # only link x-axes, + link_axes=(0,), + ) + return plotitem + def draw_curve( self, @@ -868,7 +918,6 @@ class ChartPlotWidget(pg.PlotWidget): array_key: Optional[str] = None, overlay: bool = False, - separate_axes: bool = False, color: Optional[str] = None, add_label: bool = True, @@ -907,11 +956,11 @@ class ChartPlotWidget(pg.PlotWidget): **pdi_kwargs, ) - # XXX: see explanation for differenct caching modes: + # XXX: see explanation for different caching modes: # https://stackoverflow.com/a/39410081 # seems to only be useful if we don't re-generate the entire # QPainterPath every time - # curve.curve.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) + # curve.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) # don't ever use this - it's a colossal nightmare of artefacts # and is disastrous for performance. @@ -921,83 +970,28 @@ class ChartPlotWidget(pg.PlotWidget): self._graphics[name] = curve self._arrays[data_key] = data + pi = self.plotItem + + # TODO: this probably needs its own method? if overlay: # anchor_at = ('bottom', 'left') self._overlays[name] = None - if separate_axes: + if isinstance(overlay, pg.PlotItem): + if overlay not in self.pi_overlay.overlays: + raise RuntimeError( + f'{overlay} must be from `.plotitem_overlay()`' + ) + pi = overlay - # Custom viewbox impl - cv = self.mk_vb(name) - - def maxmin(): - return self.maxmin(name=data_key) - - # ensure view maxmin is computed from correct array - # cv._maxmin = partial(self.maxmin, name=data_key) - - cv._maxmin = maxmin - - cv.chart = self - - # xaxis = DynamicDateAxis( - # orientation='bottom', - # linkedsplits=self.linked, - # ) - yaxis = PriceAxis( - orientation='right', - linkedsplits=self.linked, - ) - - plotitem = pg.PlotItem( - parent=self.plotItem, - name=name, - enableMenu=False, - viewBox=cv, - axisItems={ - # 'bottom': xaxis, - 'right': yaxis, - }, - default_axes=[], - ) - # plotitem.setAxisItems( - # add_to_layout=False, - # axisItems={ - # 'bottom': xaxis, - # 'right': yaxis, - # }, - # ) - # plotite.hideAxis('right') - # plotite.hideAxis('bottom') - plotitem.addItem(curve) - cv.enable_auto_yrange() - - # config - # plotitem.setAutoVisible(y=True) - # plotitem.enableAutoRange(axis='y') - plotitem.hideButtons() - - self.overlay.add_plotitem( - plotitem, - # only link x-axes, - link_axes=(0,), - ) - - else: - # this intnernally calls `PlotItem.addItem()` on the - # graphics object - self.addItem(curve) else: - # this intnernally calls `PlotItem.addItem()` on the - # graphics object - self.addItem(curve) - # anchor_at = ('top', 'left') # TODO: something instead of stickies for overlays # (we need something that avoids clutter on x-axis). self._add_sticky(name, bg_color=color) + pi.addItem(curve) return curve, data_key # TODO: make this a ctx mngr @@ -1036,7 +1030,8 @@ class ChartPlotWidget(pg.PlotWidget): **kwargs, ) -> pg.GraphicsObject: - '''Update the named internal graphics from ``array``. + ''' + Update the named internal graphics from ``array``. ''' self._arrays[self.name] = array @@ -1053,7 +1048,8 @@ class ChartPlotWidget(pg.PlotWidget): **kwargs, ) -> pg.GraphicsObject: - '''Update the named internal graphics from ``array``. + ''' + Update the named internal graphics from ``array``. ''' assert len(array) @@ -1123,7 +1119,7 @@ class ChartPlotWidget(pg.PlotWidget): def get_index(self, time: float) -> int: # TODO: this should go onto some sort of - # data-view strimg thinger..right? + # data-view thinger..right? ohlc = self._shm.array # XXX: not sure why the time is so off here @@ -1178,15 +1174,9 @@ class ChartPlotWidget(pg.PlotWidget): yhigh = np.nanmax(bars['high']) else: - try: - view = bars[name or self.data_key] - except: - breakpoint() - # if self.data_key != 'volume': - # else: - # view = bars + view = bars[name or self.data_key] ylow = np.nanmin(view) yhigh = np.nanmax(view) - # print(f'{(ylow, yhigh)}') + # print(f'{(ylow, yhigh)}') return ylow, yhigh diff --git a/piker/ui/_cursor.py b/piker/ui/_cursor.py index d9a4e45a..3833a123 100644 --- a/piker/ui/_cursor.py +++ b/piker/ui/_cursor.py @@ -24,7 +24,7 @@ from typing import Optional, Callable import inspect import numpy as np import pyqtgraph as pg -from PyQt5 import QtCore, QtGui, QtWidgets +from PyQt5 import QtCore, QtWidgets from PyQt5.QtCore import QPointF, QRectF from ._style import ( @@ -43,8 +43,8 @@ log = get_logger(__name__) # latency (in terms of perceived lag in cross hair) so really be sure # there's an improvement if you want to change it! -_mouse_rate_limit = 120 # TODO; should we calc current screen refresh rate? -_debounce_delay = 1 / 40 +_mouse_rate_limit = 58 # TODO; should we calc current screen refresh rate? +_debounce_delay = 1 / 60 _ch_label_opac = 1 @@ -64,7 +64,7 @@ class LineDot(pg.CurvePoint): ) -> None: # scale from dpi aware font size - size = int(_font.px_size * 0.375) + size = int(_font.px_size * 0.375) pg.CurvePoint.__init__( self, @@ -246,12 +246,16 @@ class ContentsLabels: # for name, (label, update) in self._labels.items(): for chart, name, label, update in self._labels: - if not (index >= 0 and index < chart._arrays['ohlc'][-1]['index']): + array = chart._arrays[name] + if not ( + index >= 0 + and index < array[-1]['index'] + ): # out of range print('out of range?') continue - array = chart._arrays[name] + # array = chart._arrays[name] # call provided update func with data point try: @@ -462,12 +466,15 @@ class Cursor(pg.GraphicsObject): def mouseMoved( self, - evt: 'tuple[QMouseEvent]', # noqa - ) -> None: # noqa - """Update horizonal and vertical lines when mouse moves inside + coords: tuple[QPointF], # noqa + + ) -> None: + ''' + Update horizonal and vertical lines when mouse moves inside either the main chart or any indicator subplot. - """ - pos = evt[0] + + ''' + pos = coords[0] # find position inside active plot try: @@ -516,28 +523,37 @@ class Cursor(pg.GraphicsObject): # with cursor movement self.contents_labels.update_labels(ix) + vl_x = ix + line_offset for plot, opts in self.graphics.items(): - # update the chart's "contents" label - # plot.update_contents_labels(ix) - # move the vertical line to the current "center of bar" - opts['vl'].setX(ix + line_offset) + opts['vl'].setX(vl_x) # update all subscribed curve dots for cursor in opts.get('cursors', ()): cursor.setIndex(ix) # update the label on the bottom of the crosshair - if 'bottom' in plot.plotItem.axes: + axes = plot.plotItem.axes + + # TODO: make this an up-front calc that we update + # on axis-widget resize events. + # left axis offset width for calcuating + # absolute x-axis label placement. + left_axis_width = 0 + + if 'bottom' in axes: + + left = axes.get('left') + if left: + left_axis_width = left['item'].width() + + # map back to abs (label-local) coordinates self.xaxis_label.update_label( - - # XXX: requires: - # https://github.com/pyqtgraph/pyqtgraph/pull/1418 - # otherwise gobbles tons of CPU.. - - # map back to abs (label-local) coordinates - abs_pos=plot.mapFromView(QPointF(ix + line_offset, iy)), + abs_pos=( + plot.mapFromView(QPointF(vl_x, iy)) - + QPointF(left_axis_width, 0) + ), value=ix, ) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 657d203a..1e1855bb 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -21,36 +21,32 @@ this module ties together quote and computational (fsp) streams with graphics update methods via our custom ``pyqtgraph`` charting api. ''' -from contextlib import asynccontextmanager as acm from functools import partial -from itertools import cycle import time -from types import ModuleType -from typing import Optional, AsyncGenerator +from typing import Optional import numpy as np -from pydantic import create_model -import pyqtgraph as pg import tractor import trio from .. import brokers -from .._cacheables import maybe_open_context -from tractor.trionics import gather_contexts from ..data.feed import open_feed, Feed from ._chart import ( ChartPlotWidget, LinkedSplits, GodWidget, ) -from .. import fsp from ._l1 import L1Labels -from ..data._sharedmem import ShmArray, maybe_open_shm_array +from ._fsp import ( + update_fsp_chart, + start_fsp_displays, + has_vlm, + open_vlm_displays, +) +from ..data._sharedmem import ShmArray, try_read from ._forms import ( FieldsForm, - mk_form, mk_order_pane_layout, - open_form_input_handling, ) from .order_mode import open_order_mode from ..log import get_logger @@ -61,78 +57,6 @@ log = get_logger(__name__) _quote_throttle_rate: int = 58 # Hz -def try_read( - array: np.ndarray - -) -> Optional[np.ndarray]: - ''' - Try to read the last row from a shared mem array or ``None`` - if the array read returns a zero-length array result. - - Can be used to check for backfilling race conditions where an array - is currently being (re-)written by a writer actor but the reader is - unaware and reads during the window where the first and last indexes - are being updated. - - ''' - try: - return array[-1] - except IndexError: - # XXX: race condition with backfilling shm. - # - # the underlying issue is that a backfill (aka prepend) and subsequent - # shm array first/last index update could result in an empty array - # read here since the indices may be updated in such a way that - # a read delivers an empty array (though it seems like we - # *should* be able to prevent that?). also, as and alt and - # something we need anyway, maybe there should be some kind of - # signal that a prepend is taking place and this consumer can - # respond (eg. redrawing graphics) accordingly. - - # the array read was emtpy - return None - - -def update_fsp_chart( - chart: ChartPlotWidget, - shm: ShmArray, - graphics_name: str, - array_key: Optional[str], - -) -> None: - - array = shm.array - last_row = try_read(array) - - # guard against unreadable case - if not last_row: - log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') - return - - # update graphics - # NOTE: this does a length check internally which allows it - # staying above the last row check below.. - chart.update_curve_from_array( - graphics_name, - array, - array_key=array_key or graphics_name, - ) - chart.cv._set_yrange() - - # XXX: re: ``array_key``: fsp func names must be unique meaning we - # can't have duplicates of the underlying data even if multiple - # sub-charts reference it under different 'named charts'. - - # read from last calculated value and update any label - last_val_sticky = chart._ysticks.get(graphics_name) - if last_val_sticky: - # array = shm.array[array_key] - # if len(array): - # value = array[-1] - last = last_row[array_key] - last_val_sticky.update_from_data(-1, last) - - # a working tick-type-classes template _tick_groups = { 'clears': {'trade', 'utrade', 'last'}, @@ -151,6 +75,10 @@ def chart_maxmin( float, float, ]: + ''' + Compute max and min datums "in view" for range limits. + + ''' # TODO: implement this # https://arxiv.org/abs/cs/0610046 # https://github.com/lemire/pythonmaxmin @@ -178,7 +106,7 @@ def chart_maxmin( return last_bars_range, mx, max(mn, 0), mx_vlm_in_view -async def update_chart_from_quotes( +async def update_linked_charts_graphics( linked: LinkedSplits, stream: tractor.MsgStream, ohlcv: np.ndarray, @@ -189,7 +117,8 @@ async def update_chart_from_quotes( ) -> None: '''The 'main' (price) chart real-time update loop. - Receive from the quote stream and update the OHLC chart. + Receive from the primary instrument quote stream and update the OHLC + chart. ''' # TODO: bunch of stuff: @@ -261,7 +190,10 @@ async def update_chart_from_quotes( if ( quote_period <= 1/_quote_throttle_rate - and quote_rate > _quote_throttle_rate * 1.5 + + # in the absolute worst case we shouldn't see more then + # twice the expected throttle rate right!? + and quote_rate >= _quote_throttle_rate * 1.5 ): log.warning(f'High quote rate {symbol.key}: {quote_rate}') last_quote = now @@ -297,12 +229,20 @@ async def update_chart_from_quotes( mx_vlm_in_view != last_mx_vlm or mx_vlm_in_view > last_mx_vlm ): - print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') + # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') vlm_view._set_yrange( yrange=(0, mx_vlm_in_view * 1.375) ) last_mx_vlm = mx_vlm_in_view + for curve_name, shm in vlm_chart._overlays.items(): + update_fsp_chart( + vlm_chart, + shm, + curve_name, + array_key=curve_name, + ) + ticks_frame = quote.get('ticks', ()) frames_by_type: dict[str, dict] = {} @@ -420,7 +360,7 @@ async def update_chart_from_quotes( (mx > last_mx) or (mn < last_mn) and not chart._static_yrange == 'axis' ): - print(f'new y range: {(mn, mx)}') + # print(f'new y range: {(mn, mx)}') view._set_yrange( yrange=(mn, mx), # TODO: we should probably scale @@ -458,382 +398,6 @@ async def update_chart_from_quotes( # chart._set_yrange() -def maybe_mk_fsp_shm( - sym: str, - field_name: str, - display_name: Optional[str] = None, - readonly: bool = True, - -) -> (ShmArray, bool): - ''' - Allocate a single row shm array for an symbol-fsp pair if none - exists, otherwise load the shm already existing for that token. - - ''' - uid = tractor.current_actor().uid - if not display_name: - display_name = field_name - - # TODO: load function here and introspect - # return stream type(s) - - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (field_name, float)]) - - key = f'{sym}.fsp.{display_name}.{".".join(uid)}' - - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, - readonly=True, - ) - return shm, opened - - -@acm -async def open_fsp_sidepane( - linked: LinkedSplits, - conf: dict[str, dict[str, str]], - -) -> FieldsForm: - - schema = {} - - assert len(conf) == 1 # for now - - # add (single) selection widget - for display_name, config in conf.items(): - schema[display_name] = { - 'label': '**fsp**:', - 'type': 'select', - 'default_value': [display_name], - } - - # add parameters for selection "options" - params = config.get('params', {}) - for name, config in params.items(): - - default = config['default_value'] - kwargs = config.get('widget_kwargs', {}) - - # add to ORM schema - schema.update({ - name: { - 'label': f'**{name}**:', - 'type': 'edit', - 'default_value': default, - 'kwargs': kwargs, - }, - }) - - sidepane: FieldsForm = mk_form( - parent=linked.godwidget, - fields_schema=schema, - ) - - # https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation - FspConfig = create_model( - 'FspConfig', - name=display_name, - **params, - ) - sidepane.model = FspConfig() - - # just a logger for now until we get fsp configs up and running. - async def settings_change(key: str, value: str) -> bool: - print(f'{key}: {value}') - return True - - # TODO: - async with ( - open_form_input_handling( - sidepane, - focus_next=linked.godwidget, - on_value_change=settings_change, - ) - ): - yield sidepane - - -@acm -async def open_fsp_cluster( - workers: int = 2 - -) -> AsyncGenerator[int, dict[str, tractor.Portal]]: - - from tractor._clustering import open_actor_cluster - - profiler = pg.debug.Profiler( - delayed=False, - disabled=False - ) - async with open_actor_cluster( - count=2, - names=['fsp_0', 'fsp_1'], - modules=['piker.fsp._engine'], - ) as cluster_map: - profiler('started fsp cluster') - yield cluster_map - - -@acm -async def maybe_open_fsp_cluster( - workers: int = 2, - **kwargs, - -) -> AsyncGenerator[int, dict[str, tractor.Portal]]: - - kwargs.update( - {'workers': workers} - ) - - async with maybe_open_context( - # for now make a cluster per client? - acm_func=open_fsp_cluster, - kwargs=kwargs, - ) as (cache_hit, cluster_map): - if cache_hit: - log.info('re-using existing fsp cluster') - yield cluster_map - else: - yield cluster_map - - -async def start_fsp_displays( - cluster_map: dict[str, tractor.Portal], - linkedsplits: LinkedSplits, - fsps: dict[str, str], - sym: str, - src_shm: list, - brokermod: ModuleType, - group_status_key: str, - loglevel: str, - -) -> None: - ''' - Create sub-actors (under flat tree) - for each entry in config and attach to local graphics update tasks. - - Pass target entrypoint and historical data. - - ''' - linkedsplits.focus() - - profiler = pg.debug.Profiler( - delayed=False, - disabled=False - ) - - async with trio.open_nursery() as n: - # Currently we spawn an actor per fsp chain but - # likely we'll want to pool them eventually to - # scale horizonatlly once cores are used up. - for (display_name, conf), (name, portal) in zip( - fsps.items(), - - # round robin to cluster for now.. - cycle(cluster_map.items()), - ): - func_name = conf['func_name'] - shm, opened = maybe_mk_fsp_shm( - sym, - field_name=func_name, - display_name=display_name, - readonly=True, - ) - - profiler(f'created shm for fsp actor: {display_name}') - - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" - - profiler(f'attached to fsp portal: {display_name}') - - # init async - n.start_soon( - partial( - update_chart_from_fsp, - - portal, - linkedsplits, - brokermod, - sym, - src_shm, - func_name, - display_name, - conf=conf, - shm=shm, - is_overlay=conf.get('overlay', False), - group_status_key=group_status_key, - loglevel=loglevel, - profiler=profiler, - ) - ) - - # blocks here until all fsp actors complete - - -async def update_chart_from_fsp( - portal: tractor.Portal, - linkedsplits: LinkedSplits, - brokermod: ModuleType, - sym: str, - src_shm: ShmArray, - func_name: str, - display_name: str, - conf: dict[str, dict], - - shm: ShmArray, - is_overlay: bool, - - group_status_key: str, - loglevel: str, - profiler: pg.debug.Profiler, - -) -> None: - ''' - FSP stream chart update loop. - - This is called once for each entry in the fsp - config map. - - ''' - - profiler(f'started chart task for fsp: {func_name}') - - done = linkedsplits.window().status_bar.open_status( - f'loading fsp, {display_name}..', - group_key=group_status_key, - ) - - async with ( - portal.open_context( - - # chaining entrypoint - fsp.cascade, - - # name as title of sub-chart - brokername=brokermod.name, - src_shm_token=src_shm.token, - dst_shm_token=shm.token, - symbol=sym, - func_name=func_name, - loglevel=loglevel, - zero_on_step=conf.get('zero_on_step', False), - - ) as (ctx, last_index), - ctx.open_stream() as stream, - - open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, - ): - profiler(f'fsp:{func_name} attached to fsp ctx-stream') - - if is_overlay: - chart = linkedsplits.chart - chart.draw_curve( - name=display_name, - data=shm.array, - overlay=True, - color='default_light', - ) - # specially store ref to shm for lookup in display loop - chart._overlays[display_name] = shm - - else: - chart = linkedsplits.add_plot( - name=display_name, - array=shm.array, - - array_key=conf['func_name'], - sidepane=sidepane, - - # curve by default - ohlc=False, - - # settings passed down to ``ChartPlotWidget`` - **conf.get('chart_kwargs', {}) - # static_yrange=(0, 100), - ) - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linkedsplits.chart.name - - array_key = func_name - - profiler(f'fsp:{func_name} chart created') - - # first UI update, usually from shm pushed history - update_fsp_chart( - chart, - shm, - display_name, - array_key=array_key, - ) - - chart.linked.focus() - - # TODO: figure out if we can roll our own `FillToThreshold` to - # get brush filled polygons for OS/OB conditions. - # ``pg.FillBetweenItems`` seems to be one technique using - # generic fills between curve types while ``PlotCurveItem`` has - # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which - # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[func_name]) - # graphics.curve.setBrush(50, 50, 200, 100) - # graphics.curve.setFillLevel(50) - - if func_name == 'rsi': - from ._lines import level_line - # add moveable over-[sold/bought] lines - # and labels only for the 70/30 lines - level_line(chart, 20) - level_line(chart, 30, orient_v='top') - level_line(chart, 70, orient_v='bottom') - level_line(chart, 80, orient_v='top') - - chart.cv._set_yrange() - done() # status updates - - profiler(f'fsp:{func_name} starting update loop') - profiler.finish() - - # update chart graphics - last = time.time() - async for value in stream: - - # chart isn't actively shown so just skip render cycle - if chart.linked.isHidden(): - continue - - else: - now = time.time() - period = now - last - - if period <= 1/_quote_throttle_rate: - # faster then display refresh rate - print(f'fsp too fast: {1/period}') - continue - - # run synchronous update - update_fsp_chart( - chart, - shm, - display_name, - array_key=func_name, - ) - - # set time of last graphics update - last = time.time() - - async def check_for_new_bars( feed: Feed, ohlcv: np.ndarray, @@ -908,93 +472,6 @@ async def check_for_new_bars( price_chart.increment_view() -def has_vlm(ohlcv: ShmArray) -> bool: - # make sure that the instrument supports volume history - # (sometimes this is not the case for some commodities and - # derivatives) - volm = ohlcv.array['volume'] - return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) - - -@acm -async def maybe_open_vlm_display( - linked: LinkedSplits, - ohlcv: ShmArray, - -) -> ChartPlotWidget: - - if not has_vlm(ohlcv): - log.warning(f"{linked.symbol.key} does not seem to have volume info") - yield - return - else: - - shm, opened = maybe_mk_fsp_shm( - linked.symbol.key, - 'vlm', - readonly=True, - ) - - async with open_fsp_sidepane( - linked, { - 'vlm': { - 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. - 'widget_kwargs': {'readonly': True}, - }, - }, - } - }, - ) as sidepane: - - # built-in $vlm - shm = ohlcv - chart = linked.add_plot( - name='volume', - array=shm.array, - - array_key='volume', - sidepane=sidepane, - - # curve by default - ohlc=False, - - # Draw vertical bars from zero. - # we do this internally ourselves since - # the curve item internals are pretty convoluted. - style='step', - ) - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linked.chart.name - - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] - - # read from last calculated value - value = shm.array['volume'][-1] - - last_val_sticky.update_from_data(-1, value) - - chart.update_curve_from_array( - 'volume', - shm.array, - ) - - # size view to data once at outset - chart.cv._set_yrange() - - yield chart - - async def display_symbol_data( godwidget: GodWidget, provider: str, @@ -1084,57 +561,6 @@ async def display_symbol_data( # TODO: a data view api that makes this less shit chart._shm = ohlcv - # TODO: eventually we'll support some kind of n-compose syntax - fsp_conf = { - - # 'dolla_vlm': { - # 'func_name': 'dolla_vlm', - # 'zero_on_step': True, - # 'params': { - # 'price_func': { - # 'default_value': 'chl3', - # # tell target ``Edit`` widget to not allow - # # edits for now. - # 'widget_kwargs': {'readonly': True}, - # }, - # }, - # 'chart_kwargs': {'style': 'step'} - # }, - - # 'rsi': { - # 'func_name': 'rsi', # literal python func ref lookup name - - # # map of parameters to place on the fsp sidepane widget - # # which should map to dynamic inputs available to the - # # fsp function at runtime. - # 'params': { - # 'period': { - # 'default_value': 14, - # 'widget_kwargs': {'readonly': True}, - # }, - # }, - - # # ``ChartPlotWidget`` options passthrough - # 'chart_kwargs': { - # 'static_yrange': (0, 100), - # }, - # }, - } - - if has_vlm(ohlcv): # and provider != 'binance': - # binance is too fast atm for FSPs until we wrap - # the fsp streams as throttled ``Feeds``, see - # - - # add VWAP to fsp config for downstream loading - fsp_conf.update({ - 'vwap': { - 'func_name': 'vwap', - 'overlay': True, - 'anchor': 'session', - }, - }) - # NOTE: we must immediately tell Qt to show the OHLC chart # to avoid a race where the subplots get added/shown to # the linked set *before* the main price chart! @@ -1142,32 +568,30 @@ async def display_symbol_data( linkedsplits.focus() await trio.sleep(0) - vlm_chart = None + vlm_chart: Optional[ChartPlotWidget] = None + async with trio.open_nursery() as ln: - async with gather_contexts( - ( - trio.open_nursery(), - maybe_open_vlm_display(linkedsplits, ohlcv), - maybe_open_fsp_cluster(), - ) - ) as (ln, vlm_chart, cluster_map): + # if available load volume related built-in display(s) + if has_vlm(ohlcv): + vlm_chart = await ln.start( + open_vlm_displays, + linkedsplits, + ohlcv, + ) - # load initial fsp chain (otherwise known as "indicators") + # load (user's) FSP set (otherwise known as "indicators") + # from an input config. ln.start_soon( start_fsp_displays, - cluster_map, linkedsplits, - fsp_conf, - sym, ohlcv, - brokermod, loading_sym_key, loglevel, ) # start graphics update loop after receiving first live quote ln.start_soon( - update_chart_from_quotes, + update_linked_charts_graphics, linkedsplits, feed.stream, ohlcv, @@ -1175,6 +599,7 @@ async def display_symbol_data( vlm_chart, ) + # start sample step incrementer ln.start_soon( check_for_new_bars, feed, @@ -1196,5 +621,9 @@ async def display_symbol_data( await trio.sleep(0) linkedsplits.resize_sidepanes() + # TODO: make this not so shit XD + # close group status + sbar._status_groups[loading_sym_key][1]() + # let the app run. await trio.sleep_forever() diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py new file mode 100644 index 00000000..12fe9a69 --- /dev/null +++ b/piker/ui/_fsp.py @@ -0,0 +1,812 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +FSP UI and graphics components. + +Financial signal processing cluster and real-time graphics management. + +''' +from contextlib import asynccontextmanager as acm +from functools import partial +from itertools import cycle +from typing import Optional, AsyncGenerator, Any + +import numpy as np +from pydantic import create_model +import tractor +# from tractor.trionics import gather_contexts +import pyqtgraph as pg +import trio +from trio_typing import TaskStatus + +from .._cacheables import maybe_open_context +from ..data._sharedmem import ( + ShmArray, + maybe_open_shm_array, + try_read, +) +from ._chart import ( + ChartPlotWidget, + LinkedSplits, +) +from .. import fsp +from ._forms import ( + FieldsForm, + mk_form, + open_form_input_handling, +) +from ..log import get_logger + +log = get_logger(__name__) + + +def maybe_mk_fsp_shm( + sym: str, + field_name: str, + display_name: Optional[str] = None, + readonly: bool = True, + +) -> (ShmArray, bool): + ''' + Allocate a single row shm array for an symbol-fsp pair if none + exists, otherwise load the shm already existing for that token. + + ''' + uid = tractor.current_actor().uid + if not display_name: + display_name = field_name + + # TODO: load function here and introspect + # return stream type(s) + + # TODO: should `index` be a required internal field? + fsp_dtype = np.dtype([('index', int), (field_name, float)]) + + key = f'{sym}.fsp.{display_name}.{".".join(uid)}' + + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) + return shm, opened + + +def has_vlm(ohlcv: ShmArray) -> bool: + # make sure that the instrument supports volume history + # (sometimes this is not the case for some commodities and + # derivatives) + vlm = ohlcv.array['volume'] + return not bool(np.all(np.isin(vlm, -1)) or np.all(np.isnan(vlm))) + + +def update_fsp_chart( + chart: ChartPlotWidget, + shm: ShmArray, + graphics_name: str, + array_key: Optional[str], + +) -> None: + + array = shm.array + last_row = try_read(array) + + # guard against unreadable case + if not last_row: + log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') + return + + # update graphics + # NOTE: this does a length check internally which allows it + # staying above the last row check below.. + chart.update_curve_from_array( + graphics_name, + array, + array_key=array_key or graphics_name, + ) + + # XXX: re: ``array_key``: fsp func names must be unique meaning we + # can't have duplicates of the underlying data even if multiple + # sub-charts reference it under different 'named charts'. + + # read from last calculated value and update any label + last_val_sticky = chart._ysticks.get(graphics_name) + if last_val_sticky: + # array = shm.array[array_key] + # if len(array): + # value = array[-1] + last = last_row[array_key] + last_val_sticky.update_from_data(-1, last) + + +@acm +async def open_fsp_sidepane( + linked: LinkedSplits, + conf: dict[str, dict[str, str]], + +) -> FieldsForm: + + schema = {} + + assert len(conf) == 1 # for now + + # add (single) selection widget + for display_name, config in conf.items(): + schema[display_name] = { + 'label': '**fsp**:', + 'type': 'select', + 'default_value': [display_name], + } + + # add parameters for selection "options" + params = config.get('params', {}) + for name, config in params.items(): + + default = config['default_value'] + kwargs = config.get('widget_kwargs', {}) + + # add to ORM schema + schema.update({ + name: { + 'label': f'**{name}**:', + 'type': 'edit', + 'default_value': default, + 'kwargs': kwargs, + }, + }) + + sidepane: FieldsForm = mk_form( + parent=linked.godwidget, + fields_schema=schema, + ) + + # https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation + FspConfig = create_model( + 'FspConfig', + name=display_name, + **params, + ) + sidepane.model = FspConfig() + + # just a logger for now until we get fsp configs up and running. + async def settings_change(key: str, value: str) -> bool: + print(f'{key}: {value}') + return True + + # TODO: + async with ( + open_form_input_handling( + sidepane, + focus_next=linked.godwidget, + on_value_change=settings_change, + ) + ): + yield sidepane + + +@acm +async def open_fsp_actor_cluster( + names: list[str] = ['fsp_0', 'fsp_1'], + +) -> AsyncGenerator[int, dict[str, tractor.Portal]]: + + from tractor._clustering import open_actor_cluster + + # profiler = pg.debug.Profiler( + # delayed=False, + # disabled=False + # ) + async with open_actor_cluster( + count=2, + names=names, + modules=['piker.fsp._engine'], + + ) as cluster_map: + # profiler('started fsp cluster') + yield cluster_map + + +async def run_fsp_ui( + + linkedsplits: LinkedSplits, + shm: ShmArray, + started: trio.Event, + func_name: str, + display_name: str, + conf: dict[str, dict], + loglevel: str, + # profiler: pg.debug.Profiler, + # _quote_throttle_rate: int = 58, + +) -> None: + ''' + Taskf for UI spawning around a ``LinkedSplits`` chart for fsp + related graphics/UX management. + + This is normally spawned/called once for each entry in the fsp + config. + + ''' + # profiler(f'started UI task for fsp: {func_name}') + + async with ( + # side UI for parameters/controls + open_fsp_sidepane( + linkedsplits, + {display_name: conf}, + ) as sidepane, + ): + await started.wait() + # profiler(f'fsp:{func_name} attached to fsp ctx-stream') + + overlay_with = conf.get('overlay', False) + if overlay_with: + if overlay_with == 'ohlc': + chart = linkedsplits.chart + else: + chart = linkedsplits.subplots[overlay_with] + + chart.draw_curve( + name=display_name, + data=shm.array, + overlay=True, + color='default_light', + array_key=func_name, + separate_axes=conf.get('separate_axes', False), + **conf.get('chart_kwargs', {}) + ) + # specially store ref to shm for lookup in display loop + chart._overlays[display_name] = shm + + else: + # create a new sub-chart widget for this fsp + chart = linkedsplits.add_plot( + name=display_name, + array=shm.array, + + array_key=func_name, + sidepane=sidepane, + + # curve by default + ohlc=False, + + # settings passed down to ``ChartPlotWidget`` + **conf.get('chart_kwargs', {}) + ) + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linkedsplits.chart.name + + array_key = func_name + + # profiler(f'fsp:{func_name} chart created') + + # first UI update, usually from shm pushed history + update_fsp_chart( + chart, + shm, + display_name, + array_key=array_key, + ) + + chart.linked.focus() + + # TODO: figure out if we can roll our own `FillToThreshold` to + # get brush filled polygons for OS/OB conditions. + # ``pg.FillBetweenItems`` seems to be one technique using + # generic fills between curve types while ``PlotCurveItem`` has + # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which + # might be the best solution? + + # graphics = chart.update_from_array(chart.name, array[func_name]) + # graphics.curve.setBrush(50, 50, 200, 100) + # graphics.curve.setFillLevel(50) + + # if func_name == 'rsi': + # from ._lines import level_line + # # add moveable over-[sold/bought] lines + # # and labels only for the 70/30 lines + # level_line(chart, 20) + # level_line(chart, 30, orient_v='top') + # level_line(chart, 70, orient_v='bottom') + # level_line(chart, 80, orient_v='top') + + chart.view._set_yrange() + # done() # status updates + + # profiler(f'fsp:{func_name} starting update loop') + # profiler.finish() + + # update chart graphics + # last = time.time() + + # XXX: this currently doesn't loop since + # the FSP engine does **not** push updates atm + # since we do graphics update in the main loop + # in ``._display. + # async for value in stream: + # print(value) + + # # chart isn't actively shown so just skip render cycle + # if chart.linked.isHidden(): + # continue + + # else: + # now = time.time() + # period = now - last + + # if period <= 1/_quote_throttle_rate: + # # faster then display refresh rate + # print(f'fsp too fast: {1/period}') + # continue + + # # run synchronous update + # update_fsp_chart( + # chart, + # shm, + # display_name, + # array_key=func_name, + # ) + + # # set time of last graphics update + # last = time.time() + + +class FspAdmin: + ''' + Client API for orchestrating FSP actors and displaying + real-time graphics output. + + ''' + def __init__( + self, + tn: trio.Nursery, + cluster: dict[str, tractor.Portal], + linked: LinkedSplits, + src_shm: ShmArray, + + ) -> None: + self.tn = tn + self.cluster = cluster + self.linked = linked + self._rr_next_actor = cycle(cluster.items()) + self._registry: dict[ + tuple, + tuple[tractor.MsgStream, ShmArray] + ] = {} + self.src_shm = src_shm + + def rr_next_portal(self) -> tractor.Portal: + name, portal = next(self._rr_next_actor) + return portal + + async def open_chain( + self, + + portal: tractor.Portal, + complete: trio.Event, + started: trio.Event, + dst_shm: ShmArray, + conf: dict, + func_name: str, + loglevel: str, + + ) -> None: + ''' + Task which opens a remote FSP endpoint in the managed + cluster and sleeps until signalled to exit. + + ''' + brokername, sym = self.linked.symbol.front_feed() + async with ( + portal.open_context( + + # chaining entrypoint + fsp.cascade, + + # data feed key + brokername=brokername, + symbol=sym, + + # mems + src_shm_token=self.src_shm.token, + dst_shm_token=dst_shm.token, + + # target + func_name=func_name, + + loglevel=loglevel, + zero_on_step=conf.get('zero_on_step', False), + + ) as (ctx, last_index), + ctx.open_stream() as stream, + ): + # register output data + self._registry[(brokername, sym, func_name)] = ( + stream, dst_shm, complete) + + started.set() + + # wait for graceful shutdown signal + await complete.wait() + + async def start_engine_task( + self, + + display_name: str, + conf: dict[str, dict[str, Any]], + + worker_name: Optional[str] = None, + loglevel: str = 'error', + + ) -> (ShmArray, trio.Event): + + # unpack FSP details from config dict + func_name = conf['func_name'] + + # allocate an output shm array + dst_shm, opened = maybe_mk_fsp_shm( + self.linked.symbol.front_feed(), + field_name=func_name, + display_name=display_name, + readonly=True, + ) + if not opened: + raise RuntimeError(f'Already started FSP {func_name}') + + portal = self.cluster.get(worker_name) or self.rr_next_portal() + complete = trio.Event() + started = trio.Event() + self.tn.start_soon( + self.open_chain, + + portal, + complete, + started, + dst_shm, + conf, + func_name, + loglevel, + ) + + return dst_shm, started + + async def open_fsp_chart( + self, + display_name: str, + conf: dict, # yeah probably dumb.. + loglevel: str = 'error', + + ) -> (trio.Event, ChartPlotWidget): + + func_name = conf['func_name'] + + shm, started = await self.start_engine_task( + display_name, + conf, + loglevel, + ) + + # init async + self.tn.start_soon( + partial( + run_fsp_ui, + + self.linked, + shm, + started, + func_name, + display_name, + + conf=conf, + loglevel=loglevel, + ) + ) + return started + + +@acm +async def open_fsp_admin( + linked: LinkedSplits, + src_shm: ShmArray, + **kwargs, + +) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: + + async with ( + maybe_open_context( + # for now make a cluster per client? + acm_func=open_fsp_actor_cluster, + kwargs=kwargs, + ) as (cache_hit, cluster_map), + + trio.open_nursery() as tn, + ): + if cache_hit: + log.info('re-using existing fsp cluster') + + admin = FspAdmin( + tn, + cluster_map, + linked, + src_shm, + ) + try: + yield admin + finally: + # terminate all tasks via signals + for key, entry in admin._registry.items(): + _, _, event = entry + event.set() + + +async def open_vlm_displays( + + linked: LinkedSplits, + ohlcv: ShmArray, + dvlm: bool = True, + + task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED, + +) -> ChartPlotWidget: + ''' + Volume subchart displays. + + Since "volume" is often included directly alongside OHLCV price + data, we don't really need a separate FSP-actor + shm array for it + since it's likely already directly adjacent to OHLC samples from the + data provider. + + Further only if volume data is detected (it sometimes isn't provided + eg. forex, certain commodities markets) will volume dependent FSPs + be spawned here. + + ''' + async with ( + open_fsp_sidepane( + linked, { + 'vlm': { + 'params': { + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, + } + }, + ) as sidepane, + open_fsp_admin(linked, ohlcv) as admin, + ): + # built-in vlm which we plot ASAP since it's + # usually data provided directly with OHLC history. + shm = ohlcv + chart = linked.add_plot( + name='volume', + array=shm.array, + + array_key='volume', + sidepane=sidepane, + + # curve by default + ohlc=False, + + # Draw vertical bars from zero. + # we do this internally ourselves since + # the curve item internals are pretty convoluted. + style='step', + ) + + # force 0 to always be in view + def maxmin(name) -> tuple[float, float]: + mxmn = chart.maxmin(name=name) + if mxmn: + return 0, mxmn[1] + + return 0, 0 + + chart.view._maxmin = partial(maxmin, name='volume') + + # TODO: fix the x-axis label issue where if you put + # the axis on the left it's totally not lined up... + # show volume units value on LHS (for dinkus) + # chart.hideAxis('right') + # chart.showAxis('left') + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # send back new chart to caller + task_status.started(chart) + + # should **not** be the same sub-chart widget + assert chart.name != linked.chart.name + + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] + + # read from last calculated value + value = shm.array['volume'][-1] + + last_val_sticky.update_from_data(-1, value) + + chart.update_curve_from_array( + 'volume', + shm.array, + ) + + # size view to data once at outset + chart.view._set_yrange() + + if not dvlm: + return + + # spawn and overlay $ vlm on the same subchart + shm, started = await admin.start_engine_task( + 'dolla_vlm', + # linked.symbol.front_feed(), # data-feed symbol key + { # fsp engine conf + 'func_name': 'dolla_vlm', + 'zero_on_step': True, + 'params': { + 'price_func': { + 'default_value': 'chl3', + }, + }, + }, + # loglevel, + ) + # profiler(f'created shm for fsp actor: {display_name}') + + await started.wait() + + pi = chart.overlay_plotitem( + 'dolla_vlm', + ) + # add custom auto range handler + pi.vb._maxmin = partial(maxmin, name='dolla_vlm') + + curve, _ = chart.draw_curve( + + name='dolla_vlm', + data=shm.array, + + array_key='dolla_vlm', + overlay=pi, + color='charcoal', + step_mode=True, + # **conf.get('chart_kwargs', {}) + ) + # TODO: is there a way to "sync" the dual axes such that only + # one curve is needed? + # curve.hide() + + # TODO: we need a better API to do this.. + # specially store ref to shm for lookup in display loop + # since only a placeholder of `None` is entered in + # ``.draw_curve()``. + chart._overlays['dolla_vlm'] = shm + + # XXX: old dict-style config before it was moved into the helper task + # 'dolla_vlm': { + # 'func_name': 'dolla_vlm', + # 'zero_on_step': True, + # 'overlay': 'volume', + # 'separate_axes': True, + # 'params': { + # 'price_func': { + # 'default_value': 'chl3', + # # tell target ``Edit`` widget to not allow + # # edits for now. + # 'widget_kwargs': {'readonly': True}, + # }, + # }, + # 'chart_kwargs': {'step_mode': True} + # }, + + # } + + # built-in vlm fsps + for display_name, conf in { + 'vwap': { + 'func_name': 'vwap', + 'overlay': 'ohlc', # overlays with OHLCV (main) chart + 'anchor': 'session', + }, + }.items(): + started = await admin.open_fsp_chart( + display_name, + conf, + ) + + +async def start_fsp_displays( + + linked: LinkedSplits, + ohlcv: ShmArray, + group_status_key: str, + loglevel: str, + +) -> None: + ''' + Create fsp charts from a config input attached to a local actor + compute cluster. + + Pass target entrypoint and historical data via ``ShmArray``. + + ''' + linked.focus() + + # TODO: eventually we'll support some kind of n-compose syntax + fsp_conf = { + # 'rsi': { + # 'func_name': 'rsi', # literal python func ref lookup name + + # # map of parameters to place on the fsp sidepane widget + # # which should map to dynamic inputs available to the + # # fsp function at runtime. + # 'params': { + # 'period': { + # 'default_value': 14, + # 'widget_kwargs': {'readonly': True}, + # }, + # }, + + # # ``ChartPlotWidget`` options passthrough + # 'chart_kwargs': { + # 'static_yrange': (0, 100), + # }, + # }, + } + profiler = pg.debug.Profiler( + delayed=False, + disabled=False + ) + + # async with gather_contexts(( + async with ( + + # NOTE: this admin internally opens an actor cluster + open_fsp_admin(linked, ohlcv) as admin, + ): + statuses = [] + for display_name, conf in fsp_conf.items(): + started = await admin.open_fsp_chart( + display_name, + conf, + ) + done = linked.window().status_bar.open_status( + f'loading fsp, {display_name}..', + group_key=group_status_key, + ) + statuses.append((started, done)) + + for fsp_loaded, status_cb in statuses: + await fsp_loaded.wait() + profiler(f'attached to fsp portal: {display_name}') + status_cb() + + # blocks on nursery until all fsp actors complete diff --git a/piker/ui/_window.py b/piker/ui/_window.py index 17c97411..6a0c1d8b 100644 --- a/piker/ui/_window.py +++ b/piker/ui/_window.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) +# Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -25,7 +25,7 @@ from typing import Callable, Optional, Union import uuid from pyqtgraph import QtGui -from PyQt5 import QtCore, QtWidgets +from PyQt5 import QtCore from PyQt5.QtWidgets import QLabel, QStatusBar from ..log import get_logger @@ -55,7 +55,8 @@ class MultiStatus: group_key: Optional[Union[bool, str]] = False, ) -> Union[Callable[..., None], str]: - '''Add a status to the status bar and return a close callback which + ''' + Add a status to the status bar and return a close callback which when called will remove the status ``msg``. ''' @@ -137,7 +138,8 @@ class MultiStatus: return ret def render(self) -> None: - '''Display all open statuses to bar. + ''' + Display all open statuses to bar. ''' if self.statuses: