diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index dd713a04..780262fc 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -276,6 +276,8 @@ class ChartnPane(QFrame): hbox.setContentsMargins(0, 0, 0, 0) hbox.setSpacing(3) + # self.setMaximumWidth() + class LinkedSplits(QWidget): ''' @@ -339,7 +341,8 @@ class LinkedSplits(QWidget): def set_split_sizes( self, - prop: float = 0.375 # proportion allocated to consumer subcharts + # prop: float = 0.375, # proportion allocated to consumer subcharts + prop: float = 5/8, ) -> None: '''Set the proportion of space allocated for linked subcharts. @@ -450,7 +453,6 @@ class LinkedSplits(QWidget): self.xaxis = xaxis qframe = ChartnPane(sidepane=sidepane, parent=self.splitter) - cpw = ChartPlotWidget( # this name will be used to register the primary @@ -522,10 +524,10 @@ class LinkedSplits(QWidget): # track by name self.subplots[name] = cpw - if sidepane: - # TODO: use a "panes" collection to manage this? - sidepane.setMinimumWidth(self.chart.sidepane.width()) - sidepane.setMaximumWidth(self.chart.sidepane.width()) + # if sidepane: + # # TODO: use a "panes" collection to manage this? + # qframe.setMaximumWidth(self.chart.sidepane.width()) + # qframe.setMinimumWidth(self.chart.sidepane.width()) self.splitter.addWidget(qframe) @@ -537,6 +539,16 @@ class LinkedSplits(QWidget): return cpw + def resize_sidepanes( + self, + ) -> None: + '''Size all sidepanes based on the OHLC "main" plot. + + ''' + for name, cpw in self.subplots.items(): + cpw.sidepane.setMinimumWidth(self.chart.sidepane.width()) + cpw.sidepane.setMaximumWidth(self.chart.sidepane.width()) + class ChartPlotWidget(pg.PlotWidget): ''' @@ -681,9 +693,9 @@ class ChartPlotWidget(pg.PlotWidget): """Return a range tuple for the bars present in view. """ l, r = self.view_range() - a = self._arrays['ohlc'] - lbar = max(l, a[0]['index']) - rbar = min(r, a[-1]['index']) + array = self._arrays['ohlc'] + lbar = max(l, array[0]['index']) + rbar = min(r, array[-1]['index']) return l, lbar, rbar, r def default_view( @@ -991,22 +1003,19 @@ class ChartPlotWidget(pg.PlotWidget): a = self._arrays['ohlc'] ifirst = a[0]['index'] bars = a[lbar - ifirst:rbar - ifirst + 1] - if not len(bars): # likely no data loaded yet or extreme scrolling? log.error(f"WTF bars_range = {lbar}:{rbar}") return - # TODO: should probably just have some kinda attr mark - # that determines this behavior based on array type - try: + if self.data_key != self.linked.symbol.key: + bars = a[self.data_key] + ylow = np.nanmin(bars) + yhigh = np.nanmax((bars)) + else: + # just the std ohlc bars ylow = np.nanmin(bars['low']) yhigh = np.nanmax(bars['high']) - except (IndexError, ValueError): - # likely non-ohlc array? - bars = bars[self.name] - ylow = np.nanmin(bars) - yhigh = np.nanmax(bars) if set_range: # view margins: stay within a % of the "true range" diff --git a/piker/ui/_display.py b/piker/ui/_display.py index dcc04f47..254fee76 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -18,6 +18,7 @@ Real-time display tasks for charting / graphics. ''' +from contextlib import asynccontextmanager import time from typing import Any from types import ModuleType @@ -264,7 +265,7 @@ async def chart_from_quotes( last_mx, last_mn = mx, mn -async def spawn_fsps( +async def fan_out_spawn_fsp_daemons( linkedsplits: LinkedSplits, fsps: dict[str, str], @@ -275,109 +276,93 @@ async def spawn_fsps( loglevel: str, ) -> None: - """Start financial signal processing in subactor. + '''Create financial signal processing sub-actors (under flat tree) + for each entry in config and attach to local graphics update tasks. Pass target entrypoint and historical data. - """ - + ''' linkedsplits.focus() uid = tractor.current_actor().uid # spawns sub-processes which execute cpu bound FSP code - async with tractor.open_nursery(loglevel=loglevel) as n: + async with ( + tractor.open_nursery() as n, + trio.open_nursery() as ln, + ): - # spawns local task that consume and chart data streams from - # sub-procs - async with trio.open_nursery() as ln: + # Currently we spawn an actor per fsp chain but + # likely we'll want to pool them eventually to + # scale horizonatlly once cores are used up. + for display_name, conf in fsps.items(): - # Currently we spawn an actor per fsp chain but - # likely we'll want to pool them eventually to - # scale horizonatlly once cores are used up. - for display_name, conf in fsps.items(): + fsp_func_name = conf['fsp_func_name'] - fsp_func_name = conf['fsp_func_name'] + # TODO: load function here and introspect + # return stream type(s) - # TODO: load function here and introspect - # return stream type(s) + # TODO: should `index` be a required internal field? + fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) + key = f'{sym}.fsp.{display_name}.{".".join(uid)}' - key = f'{sym}.fsp.{display_name}.{".".join(uid)}' + # this is all sync currently + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) - # this is all sync currently - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, - readonly=True, - ) + # XXX: fsp may have been opened by a duplicate chart. + # Error for now until we figure out how to wrap fsps as + # "feeds". assert opened, f"A chart for {key} likely + # already exists?" - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" + conf['shm'] = shm - conf['shm'] = shm + portal = await n.start_actor( + enable_modules=['piker.fsp'], + name='fsp.' + display_name, + ) - portal = await n.start_actor( - enable_modules=['piker.fsp'], - name='fsp.' + display_name, - ) + # init async + ln.start_soon( + run_fsp, + portal, + linkedsplits, + brokermod, + sym, + src_shm, + fsp_func_name, + display_name, + conf, + group_status_key, + loglevel, + ) - # init async - ln.start_soon( - run_fsp, - portal, - linkedsplits, - brokermod, - sym, - src_shm, - fsp_func_name, - display_name, - conf, - group_status_key, - ) - - # blocks here until all fsp actors complete + # blocks here until all fsp actors complete -async def run_fsp( +class FspConfig(BaseModel): + class Config: + validate_assignment = True - portal: tractor._portal.Portal, - linkedsplits: LinkedSplits, - brokermod: ModuleType, - sym: str, - src_shm: ShmArray, - fsp_func_name: str, + name: str + period: int + + +@asynccontextmanager +async def open_sidepane( + + linked: LinkedSplits, display_name: str, - conf: dict[str, Any], - group_status_key: str, -) -> None: - """FSP stream chart update loop. - - This is called once for each entry in the fsp - config map. - """ - done = linkedsplits.window().status_bar.open_status( - f'loading fsp, {display_name}..', - group_key=group_status_key, - ) - - # make sidepane config widget - class FspConfig(BaseModel): - - class Config: - validate_assignment = True - - name: str - period: int +) -> FspConfig: sidepane: FieldsForm = mk_form( - parent=linkedsplits.godwidget, + parent=linked.godwidget, fields_schema={ 'name': { 'label': '**fsp**:', @@ -386,6 +371,8 @@ async def run_fsp( f'{display_name}' ], }, + + # TODO: generate this from input map 'period': { 'label': '**period**:', 'type': 'edit', @@ -403,10 +390,46 @@ async def run_fsp( print(f'{key}: {value}') return True + # TODO: + async with ( + open_form_input_handling( + sidepane, + focus_next=linked.godwidget, + on_value_change=settings_change, + ) + ): + yield sidepane + + +async def run_fsp( + + portal: tractor._portal.Portal, + linkedsplits: LinkedSplits, + brokermod: ModuleType, + sym: str, + src_shm: ShmArray, + fsp_func_name: str, + display_name: str, + conf: dict[str, Any], + group_status_key: str, + loglevel: str, + +) -> None: + '''FSP stream chart update loop. + + This is called once for each entry in the fsp + config map. + + ''' + done = linkedsplits.window().status_bar.open_status( + f'loading fsp, {display_name}..', + group_key=group_status_key, + ) + async with ( portal.open_stream_from( - # subactor entrypoint + # chaining entrypoint fsp.cascade, # name as title of sub-chart @@ -415,15 +438,14 @@ async def run_fsp( dst_shm_token=conf['shm'].token, symbol=sym, fsp_func_name=fsp_func_name, + loglevel=loglevel, ) as stream, - # TODO: - open_form_input_handling( - sidepane, - focus_next=linkedsplits.godwidget, - on_value_change=settings_change, - ), + open_sidepane( + linkedsplits, + display_name, + ) as sidepane, ): # receive last index for processed historical @@ -472,7 +494,7 @@ async def run_fsp( # read from last calculated value array = shm.array - # XXX: fsp func names are unique meaning we don't have + # XXX: fsp func names must be unique meaning we don't have # duplicates of the underlying data even if multiple # sub-charts reference it under different 'named charts'. value = array[fsp_func_name][-1] @@ -489,6 +511,8 @@ async def run_fsp( array_key=fsp_func_name ) + chart.linked.resize_sidepanes() + # TODO: figure out if we can roll our own `FillToThreshold` to # get brush filled polygons for OS/OB conditions. # ``pg.FillBetweenItems`` seems to be one technique using @@ -622,6 +646,73 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits): price_chart.increment_view() +def has_vlm(ohlcv: ShmArray) -> bool: + # make sure that the instrument supports volume history + # (sometimes this is not the case for some commodities and + # derivatives) + volm = ohlcv.array['volume'] + return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) + + +@asynccontextmanager +async def maybe_open_vlm_display( + + linked: LinkedSplits, + ohlcv: ShmArray, + +) -> ChartPlotWidget: + + # make sure that the instrument supports volume history + # (sometimes this is not the case for some commodities and + # derivatives) + # volm = ohlcv.array['volume'] + # if ( + # np.all(np.isin(volm, -1)) or + # np.all(np.isnan(volm)) + # ): + if not has_vlm(ohlcv): + log.warning(f"{linked.symbol.key} does not seem to have volume info") + else: + async with open_sidepane(linked, 'volume') as sidepane: + # built-in $vlm + shm = ohlcv + chart = linked.add_plot( + name='vlm', + array=shm.array, + + array_key='volume', + sidepane=sidepane, + + # curve by default + ohlc=False, + + # vertical bars + # stepMode=True, + # static_yrange=(0, 100), + ) + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linked.chart.name + + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] + + # read from last calculated value + value = shm.array['volume'][-1] + + last_val_sticky.update_from_data(-1, value) + + # size view to data once at outset + chart._set_yrange() + + yield chart + + async def display_symbol_data( godwidget: GodWidget, @@ -686,6 +777,7 @@ async def display_symbol_data( # add as next-to-y-axis singleton pane godwidget.pp_pane = pp_pane + # create main OHLC chart chart = linkedsplits.plot_ohlc_main( symbol, bars, @@ -722,7 +814,7 @@ async def display_symbol_data( 'static_yrange': (0, 100), }, }, - # test for duplicate fsps on same chart + # # test for duplicate fsps on same chart # 'rsi2': { # 'fsp_func_name': 'rsi', # 'period': 14, @@ -733,18 +825,8 @@ async def display_symbol_data( } - # make sure that the instrument supports volume history - # (sometimes this is not the case for some commodities and - # derivatives) - volm = ohlcv.array['volume'] - if ( - np.all(np.isin(volm, -1)) or - np.all(np.isnan(volm)) - ): - log.warning( - f"{sym} does not seem to have volume info," - " dropping volume signals") - else: + if has_vlm(ohlcv): + # add VWAP to fsp config for downstream loading fsp_conf.update({ 'vwap': { 'fsp_func_name': 'vwap', @@ -756,11 +838,10 @@ async def display_symbol_data( async with ( trio.open_nursery() as ln, - ): # load initial fsp chain (otherwise known as "indicators") ln.start_soon( - spawn_fsps, + fan_out_spawn_fsp_daemons, linkedsplits, fsp_conf, sym, @@ -787,6 +868,7 @@ async def display_symbol_data( ) async with ( + maybe_open_vlm_display(linkedsplits, ohlcv), open_order_mode( feed,