From b7f27f201fe67c43fa31e4bca751f718cfc1b0f4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 30 Dec 2021 14:29:57 -0500 Subject: [PATCH 1/9] Add `try_read()` to shm mod --- piker/data/_sharedmem.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 9c515ce3..c741ba1c 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -505,3 +505,35 @@ def maybe_open_shm_array( # to fail if a block has been allocated # on the OS by someone else. return open_shm_array(key=key, dtype=dtype, **kwargs), True + + +def try_read( + array: np.ndarray + +) -> Optional[np.ndarray]: + ''' + Try to read the last row from a shared mem array or ``None`` + if the array read returns a zero-length array result. + + Can be used to check for backfilling race conditions where an array + is currently being (re-)written by a writer actor but the reader is + unaware and reads during the window where the first and last indexes + are being updated. + + ''' + try: + return array[-1] + except IndexError: + # XXX: race condition with backfilling shm. + # + # the underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the indices may be updated in such a way that + # a read delivers an empty array (though it seems like we + # *should* be able to prevent that?). also, as and alt and + # something we need anyway, maybe there should be some kind of + # signal that a prepend is taking place and this consumer can + # respond (eg. redrawing graphics) accordingly. + + # the array read was emtpy + return None From 7a41c83f84906b1b200cd73d34c1328fbbb893bf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 30 Dec 2021 14:30:33 -0500 Subject: [PATCH 2/9] Move FSP related graphics management into new mod --- piker/ui/_display.py | 294 +++---------------------------------------- piker/ui/_fsp.py | 272 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 288 insertions(+), 278 deletions(-) create mode 100644 piker/ui/_fsp.py diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 657d203a..b4b9618f 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -29,7 +29,6 @@ from types import ModuleType from typing import Optional, AsyncGenerator import numpy as np -from pydantic import create_model import pyqtgraph as pg import tractor import trio @@ -45,12 +44,17 @@ from ._chart import ( ) from .. import fsp from ._l1 import L1Labels -from ..data._sharedmem import ShmArray, maybe_open_shm_array +from ._fsp import ( + update_fsp_chart, + maybe_mk_fsp_shm, + open_fsp_sidepane, + has_vlm, + maybe_open_vlm_display, +) +from ..data._sharedmem import ShmArray, try_read from ._forms import ( FieldsForm, - mk_form, mk_order_pane_layout, - open_form_input_handling, ) from .order_mode import open_order_mode from ..log import get_logger @@ -61,78 +65,6 @@ log = get_logger(__name__) _quote_throttle_rate: int = 58 # Hz -def try_read( - array: np.ndarray - -) -> Optional[np.ndarray]: - ''' - Try to read the last row from a shared mem array or ``None`` - if the array read returns a zero-length array result. - - Can be used to check for backfilling race conditions where an array - is currently being (re-)written by a writer actor but the reader is - unaware and reads during the window where the first and last indexes - are being updated. - - ''' - try: - return array[-1] - except IndexError: - # XXX: race condition with backfilling shm. - # - # the underlying issue is that a backfill (aka prepend) and subsequent - # shm array first/last index update could result in an empty array - # read here since the indices may be updated in such a way that - # a read delivers an empty array (though it seems like we - # *should* be able to prevent that?). also, as and alt and - # something we need anyway, maybe there should be some kind of - # signal that a prepend is taking place and this consumer can - # respond (eg. redrawing graphics) accordingly. - - # the array read was emtpy - return None - - -def update_fsp_chart( - chart: ChartPlotWidget, - shm: ShmArray, - graphics_name: str, - array_key: Optional[str], - -) -> None: - - array = shm.array - last_row = try_read(array) - - # guard against unreadable case - if not last_row: - log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') - return - - # update graphics - # NOTE: this does a length check internally which allows it - # staying above the last row check below.. - chart.update_curve_from_array( - graphics_name, - array, - array_key=array_key or graphics_name, - ) - chart.cv._set_yrange() - - # XXX: re: ``array_key``: fsp func names must be unique meaning we - # can't have duplicates of the underlying data even if multiple - # sub-charts reference it under different 'named charts'. - - # read from last calculated value and update any label - last_val_sticky = chart._ysticks.get(graphics_name) - if last_val_sticky: - # array = shm.array[array_key] - # if len(array): - # value = array[-1] - last = last_row[array_key] - last_val_sticky.update_from_data(-1, last) - - # a working tick-type-classes template _tick_groups = { 'clears': {'trade', 'utrade', 'last'}, @@ -155,7 +87,7 @@ def chart_maxmin( # https://arxiv.org/abs/cs/0610046 # https://github.com/lemire/pythonmaxmin - array = chart._arrays[chart.name] + array = chart._arrays['ohlc'] ifirst = array[0]['index'] last_bars_range = chart.bars_range() @@ -212,7 +144,6 @@ async def update_chart_from_quotes( if vlm_chart: vlm_sticky = vlm_chart._ysticks['volume'] - vlm_view = vlm_chart.view maxmin = partial(chart_maxmin, chart, vlm_chart) @@ -249,7 +180,6 @@ async def update_chart_from_quotes( tick_margin = 3 * tick_size chart.show() - view = chart.view last_quote = time.time() async for quotes in stream: @@ -297,10 +227,8 @@ async def update_chart_from_quotes( mx_vlm_in_view != last_mx_vlm or mx_vlm_in_view > last_mx_vlm ): - print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') - vlm_view._set_yrange( - yrange=(0, mx_vlm_in_view * 1.375) - ) + # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') + vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375)) last_mx_vlm = mx_vlm_in_view ticks_frame = quote.get('ticks', ()) @@ -416,12 +344,9 @@ async def update_chart_from_quotes( l1.bid_label.update_fields({'level': price, 'size': size}) # check for y-range re-size - if ( - (mx > last_mx) or (mn < last_mn) - and not chart._static_yrange == 'axis' - ): - print(f'new y range: {(mn, mx)}') - view._set_yrange( + if (mx > last_mx) or (mn < last_mn): + # print(f'new y range: {(mn, mx)}') + chart._set_yrange( yrange=(mn, mx), # TODO: we should probably scale # the view margin based on the size @@ -443,7 +368,6 @@ async def update_chart_from_quotes( name, array_key=name, ) - subchart.cv._set_yrange() # TODO: all overlays on all subplots.. @@ -455,105 +379,6 @@ async def update_chart_from_quotes( curve_name, array_key=curve_name, ) - # chart._set_yrange() - - -def maybe_mk_fsp_shm( - sym: str, - field_name: str, - display_name: Optional[str] = None, - readonly: bool = True, - -) -> (ShmArray, bool): - ''' - Allocate a single row shm array for an symbol-fsp pair if none - exists, otherwise load the shm already existing for that token. - - ''' - uid = tractor.current_actor().uid - if not display_name: - display_name = field_name - - # TODO: load function here and introspect - # return stream type(s) - - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (field_name, float)]) - - key = f'{sym}.fsp.{display_name}.{".".join(uid)}' - - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, - readonly=True, - ) - return shm, opened - - -@acm -async def open_fsp_sidepane( - linked: LinkedSplits, - conf: dict[str, dict[str, str]], - -) -> FieldsForm: - - schema = {} - - assert len(conf) == 1 # for now - - # add (single) selection widget - for display_name, config in conf.items(): - schema[display_name] = { - 'label': '**fsp**:', - 'type': 'select', - 'default_value': [display_name], - } - - # add parameters for selection "options" - params = config.get('params', {}) - for name, config in params.items(): - - default = config['default_value'] - kwargs = config.get('widget_kwargs', {}) - - # add to ORM schema - schema.update({ - name: { - 'label': f'**{name}**:', - 'type': 'edit', - 'default_value': default, - 'kwargs': kwargs, - }, - }) - - sidepane: FieldsForm = mk_form( - parent=linked.godwidget, - fields_schema=schema, - ) - - # https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation - FspConfig = create_model( - 'FspConfig', - name=display_name, - **params, - ) - sidepane.model = FspConfig() - - # just a logger for now until we get fsp configs up and running. - async def settings_change(key: str, value: str) -> bool: - print(f'{key}: {value}') - return True - - # TODO: - async with ( - open_form_input_handling( - sidepane, - focus_next=linked.godwidget, - on_value_change=settings_change, - ) - ): - yield sidepane @acm @@ -799,7 +624,7 @@ async def update_chart_from_fsp( level_line(chart, 70, orient_v='bottom') level_line(chart, 80, orient_v='top') - chart.cv._set_yrange() + chart._set_yrange() done() # status updates profiler(f'fsp:{func_name} starting update loop') @@ -908,93 +733,6 @@ async def check_for_new_bars( price_chart.increment_view() -def has_vlm(ohlcv: ShmArray) -> bool: - # make sure that the instrument supports volume history - # (sometimes this is not the case for some commodities and - # derivatives) - volm = ohlcv.array['volume'] - return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) - - -@acm -async def maybe_open_vlm_display( - linked: LinkedSplits, - ohlcv: ShmArray, - -) -> ChartPlotWidget: - - if not has_vlm(ohlcv): - log.warning(f"{linked.symbol.key} does not seem to have volume info") - yield - return - else: - - shm, opened = maybe_mk_fsp_shm( - linked.symbol.key, - 'vlm', - readonly=True, - ) - - async with open_fsp_sidepane( - linked, { - 'vlm': { - 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. - 'widget_kwargs': {'readonly': True}, - }, - }, - } - }, - ) as sidepane: - - # built-in $vlm - shm = ohlcv - chart = linked.add_plot( - name='volume', - array=shm.array, - - array_key='volume', - sidepane=sidepane, - - # curve by default - ohlc=False, - - # Draw vertical bars from zero. - # we do this internally ourselves since - # the curve item internals are pretty convoluted. - style='step', - ) - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linked.chart.name - - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] - - # read from last calculated value - value = shm.array['volume'][-1] - - last_val_sticky.update_from_data(-1, value) - - chart.update_curve_from_array( - 'volume', - shm.array, - ) - - # size view to data once at outset - chart.cv._set_yrange() - - yield chart - - async def display_symbol_data( godwidget: GodWidget, provider: str, @@ -1079,7 +817,7 @@ async def display_symbol_data( ) # size view to data once at outset - chart.cv._set_yrange() + chart._set_yrange() # TODO: a data view api that makes this less shit chart._shm = ohlcv diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py new file mode 100644 index 00000000..7d45fef2 --- /dev/null +++ b/piker/ui/_fsp.py @@ -0,0 +1,272 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +FSP UI and graphics components. + +Financial signal processing cluster and real-time graphics management. + +''' +from contextlib import asynccontextmanager as acm +from typing import Optional + +import numpy as np +from pydantic import create_model +import tractor + +from ..data._sharedmem import ( + ShmArray, + maybe_open_shm_array, + try_read, +) +from ._chart import ( + ChartPlotWidget, + LinkedSplits, + # GodWidget, +) +from ._forms import ( + FieldsForm, + mk_form, + open_form_input_handling, +) +from ..log import get_logger + +log = get_logger(__name__) + + +def maybe_mk_fsp_shm( + sym: str, + field_name: str, + display_name: Optional[str] = None, + readonly: bool = True, + +) -> (ShmArray, bool): + ''' + Allocate a single row shm array for an symbol-fsp pair if none + exists, otherwise load the shm already existing for that token. + + ''' + uid = tractor.current_actor().uid + if not display_name: + display_name = field_name + + # TODO: load function here and introspect + # return stream type(s) + + # TODO: should `index` be a required internal field? + fsp_dtype = np.dtype([('index', int), (field_name, float)]) + + key = f'{sym}.fsp.{display_name}.{".".join(uid)}' + + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) + return shm, opened + + +def has_vlm(ohlcv: ShmArray) -> bool: + # make sure that the instrument supports volume history + # (sometimes this is not the case for some commodities and + # derivatives) + volm = ohlcv.array['volume'] + return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) + + +@acm +async def maybe_open_vlm_display( + linked: LinkedSplits, + ohlcv: ShmArray, + +) -> ChartPlotWidget: + + if not has_vlm(ohlcv): + log.warning(f"{linked.symbol.key} does not seem to have volume info") + yield + return + else: + + shm, opened = maybe_mk_fsp_shm( + linked.symbol.key, + 'vlm', + readonly=True, + ) + + async with open_fsp_sidepane( + linked, { + 'vlm': { + 'params': { + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, + } + }, + ) as sidepane: + + # built-in $vlm + shm = ohlcv + chart = linked.add_plot( + name='volume', + array=shm.array, + + array_key='volume', + sidepane=sidepane, + + # curve by default + ohlc=False, + + # Draw vertical bars from zero. + # we do this internally ourselves since + # the curve item internals are pretty convoluted. + style='step', + ) + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linked.chart.name + + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] + + # read from last calculated value + value = shm.array['volume'][-1] + + last_val_sticky.update_from_data(-1, value) + + chart.update_curve_from_array( + 'volume', + shm.array, + ) + + # size view to data once at outset + chart._set_yrange() + + yield chart + + +def update_fsp_chart( + chart: ChartPlotWidget, + shm: ShmArray, + graphics_name: str, + array_key: Optional[str], + +) -> None: + + array = shm.array + last_row = try_read(array) + + # guard against unreadable case + if not last_row: + log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') + return + + # update graphics + # NOTE: this does a length check internally which allows it + # staying above the last row check below.. + chart.update_curve_from_array( + graphics_name, + array, + array_key=array_key or graphics_name, + ) + chart._set_yrange() + + # XXX: re: ``array_key``: fsp func names must be unique meaning we + # can't have duplicates of the underlying data even if multiple + # sub-charts reference it under different 'named charts'. + + # read from last calculated value and update any label + last_val_sticky = chart._ysticks.get(graphics_name) + if last_val_sticky: + # array = shm.array[array_key] + # if len(array): + # value = array[-1] + last = last_row[array_key] + last_val_sticky.update_from_data(-1, last) + + +@acm +async def open_fsp_sidepane( + linked: LinkedSplits, + conf: dict[str, dict[str, str]], + +) -> FieldsForm: + + schema = {} + + assert len(conf) == 1 # for now + + # add (single) selection widget + for display_name, config in conf.items(): + schema[display_name] = { + 'label': '**fsp**:', + 'type': 'select', + 'default_value': [display_name], + } + + # add parameters for selection "options" + params = config.get('params', {}) + for name, config in params.items(): + + default = config['default_value'] + kwargs = config.get('widget_kwargs', {}) + + # add to ORM schema + schema.update({ + name: { + 'label': f'**{name}**:', + 'type': 'edit', + 'default_value': default, + 'kwargs': kwargs, + }, + }) + + sidepane: FieldsForm = mk_form( + parent=linked.godwidget, + fields_schema=schema, + ) + + # https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation + FspConfig = create_model( + 'FspConfig', + name=display_name, + **params, + ) + sidepane.model = FspConfig() + + # just a logger for now until we get fsp configs up and running. + async def settings_change(key: str, value: str) -> bool: + print(f'{key}: {value}') + return True + + # TODO: + async with ( + open_form_input_handling( + sidepane, + focus_next=linked.godwidget, + on_value_change=settings_change, + ) + ): + yield sidepane From 06fe2bd1bec1a590d2b09db044e993db3ea860d6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 24 Jan 2022 15:24:46 -0500 Subject: [PATCH 3/9] Support "volume" and "dollar volume" on same chart This is a huge commit which moves a bunch of code around in order to simplify some of our UI modules as well as support our first official mult-axis chart: overlaid volume and "dollar volume". A good deal of this change set is to make startup fast such that volume data which is often shipped alongside OHLC history is loaded and shown asap and FSPs are loaded in an actor cluster with their graphics overlayed concurrently as each responsible worker generates plottable output. For everything to work this commit requires use of a draft `pyqtgraph` PR: https://github.com/pyqtgraph/pyqtgraph/pull/2162 Change summary: - move remaining FSP actor cluster helpers into `.ui._fsp` mod as well as fsp specific UI managers (`maybe_open_vlm_display()`, `start_fsp_displays()`). - add an `FspAdmin` API for starting fsp chains on the cluster concurrently allowing for future work toward reload/unloading. - bring FSP config dict into `start_fsp_displays()` and `.started()`-deliver both the fsp admin and any volume chart back up to the calling display loop code. ToDo: - repair `ChartView` click-drag interactions - auto-range on $ vlm needs to use `ChartPlotWidget._set_yrange()` - a lot better styling for the $_vlm overlay XD --- piker/fsp/_engine.py | 6 +- piker/ui/_display.py | 381 ++----------------------- piker/ui/_fsp.py | 646 +++++++++++++++++++++++++++++++++++++------ 3 files changed, 593 insertions(+), 440 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index bc55e154..aafaf76c 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -186,15 +186,17 @@ async def fsp_compute( async def cascade( ctx: tractor.Context, + + # data feed key brokername: str, + symbol: str, src_shm_token: dict, dst_shm_token: tuple[str, np.dtype], - symbol: str, func_name: str, - zero_on_step: bool = False, + zero_on_step: bool = False, loglevel: Optional[str] = None, ) -> None: diff --git a/piker/ui/_display.py b/piker/ui/_display.py index b4b9618f..3ffcb4b0 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -21,35 +21,25 @@ this module ties together quote and computational (fsp) streams with graphics update methods via our custom ``pyqtgraph`` charting api. ''' -from contextlib import asynccontextmanager as acm from functools import partial -from itertools import cycle import time -from types import ModuleType -from typing import Optional, AsyncGenerator +from typing import Optional import numpy as np -import pyqtgraph as pg import tractor import trio from .. import brokers -from .._cacheables import maybe_open_context -from tractor.trionics import gather_contexts from ..data.feed import open_feed, Feed from ._chart import ( ChartPlotWidget, LinkedSplits, GodWidget, ) -from .. import fsp from ._l1 import L1Labels from ._fsp import ( update_fsp_chart, - maybe_mk_fsp_shm, - open_fsp_sidepane, - has_vlm, - maybe_open_vlm_display, + start_fsp_displays, ) from ..data._sharedmem import ShmArray, try_read from ._forms import ( @@ -83,6 +73,10 @@ def chart_maxmin( float, float, ]: + ''' + Compute max and min datums "in view" for range limits. + + ''' # TODO: implement this # https://arxiv.org/abs/cs/0610046 # https://github.com/lemire/pythonmaxmin @@ -110,7 +104,7 @@ def chart_maxmin( return last_bars_range, mx, max(mn, 0), mx_vlm_in_view -async def update_chart_from_quotes( +async def update_linked_charts_graphics( linked: LinkedSplits, stream: tractor.MsgStream, ohlcv: np.ndarray, @@ -121,7 +115,8 @@ async def update_chart_from_quotes( ) -> None: '''The 'main' (price) chart real-time update loop. - Receive from the quote stream and update the OHLC chart. + Receive from the primary instrument quote stream and update the OHLC + chart. ''' # TODO: bunch of stuff: @@ -231,6 +226,14 @@ async def update_chart_from_quotes( vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375)) last_mx_vlm = mx_vlm_in_view + for curve_name, shm in vlm_chart._overlays.items(): + update_fsp_chart( + vlm_chart, + shm, + curve_name, + array_key=curve_name, + ) + ticks_frame = quote.get('ticks', ()) frames_by_type: dict[str, dict] = {} @@ -381,284 +384,6 @@ async def update_chart_from_quotes( ) -@acm -async def open_fsp_cluster( - workers: int = 2 - -) -> AsyncGenerator[int, dict[str, tractor.Portal]]: - - from tractor._clustering import open_actor_cluster - - profiler = pg.debug.Profiler( - delayed=False, - disabled=False - ) - async with open_actor_cluster( - count=2, - names=['fsp_0', 'fsp_1'], - modules=['piker.fsp._engine'], - ) as cluster_map: - profiler('started fsp cluster') - yield cluster_map - - -@acm -async def maybe_open_fsp_cluster( - workers: int = 2, - **kwargs, - -) -> AsyncGenerator[int, dict[str, tractor.Portal]]: - - kwargs.update( - {'workers': workers} - ) - - async with maybe_open_context( - # for now make a cluster per client? - acm_func=open_fsp_cluster, - kwargs=kwargs, - ) as (cache_hit, cluster_map): - if cache_hit: - log.info('re-using existing fsp cluster') - yield cluster_map - else: - yield cluster_map - - -async def start_fsp_displays( - cluster_map: dict[str, tractor.Portal], - linkedsplits: LinkedSplits, - fsps: dict[str, str], - sym: str, - src_shm: list, - brokermod: ModuleType, - group_status_key: str, - loglevel: str, - -) -> None: - ''' - Create sub-actors (under flat tree) - for each entry in config and attach to local graphics update tasks. - - Pass target entrypoint and historical data. - - ''' - linkedsplits.focus() - - profiler = pg.debug.Profiler( - delayed=False, - disabled=False - ) - - async with trio.open_nursery() as n: - # Currently we spawn an actor per fsp chain but - # likely we'll want to pool them eventually to - # scale horizonatlly once cores are used up. - for (display_name, conf), (name, portal) in zip( - fsps.items(), - - # round robin to cluster for now.. - cycle(cluster_map.items()), - ): - func_name = conf['func_name'] - shm, opened = maybe_mk_fsp_shm( - sym, - field_name=func_name, - display_name=display_name, - readonly=True, - ) - - profiler(f'created shm for fsp actor: {display_name}') - - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" - - profiler(f'attached to fsp portal: {display_name}') - - # init async - n.start_soon( - partial( - update_chart_from_fsp, - - portal, - linkedsplits, - brokermod, - sym, - src_shm, - func_name, - display_name, - conf=conf, - shm=shm, - is_overlay=conf.get('overlay', False), - group_status_key=group_status_key, - loglevel=loglevel, - profiler=profiler, - ) - ) - - # blocks here until all fsp actors complete - - -async def update_chart_from_fsp( - portal: tractor.Portal, - linkedsplits: LinkedSplits, - brokermod: ModuleType, - sym: str, - src_shm: ShmArray, - func_name: str, - display_name: str, - conf: dict[str, dict], - - shm: ShmArray, - is_overlay: bool, - - group_status_key: str, - loglevel: str, - profiler: pg.debug.Profiler, - -) -> None: - ''' - FSP stream chart update loop. - - This is called once for each entry in the fsp - config map. - - ''' - - profiler(f'started chart task for fsp: {func_name}') - - done = linkedsplits.window().status_bar.open_status( - f'loading fsp, {display_name}..', - group_key=group_status_key, - ) - - async with ( - portal.open_context( - - # chaining entrypoint - fsp.cascade, - - # name as title of sub-chart - brokername=brokermod.name, - src_shm_token=src_shm.token, - dst_shm_token=shm.token, - symbol=sym, - func_name=func_name, - loglevel=loglevel, - zero_on_step=conf.get('zero_on_step', False), - - ) as (ctx, last_index), - ctx.open_stream() as stream, - - open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, - ): - profiler(f'fsp:{func_name} attached to fsp ctx-stream') - - if is_overlay: - chart = linkedsplits.chart - chart.draw_curve( - name=display_name, - data=shm.array, - overlay=True, - color='default_light', - ) - # specially store ref to shm for lookup in display loop - chart._overlays[display_name] = shm - - else: - chart = linkedsplits.add_plot( - name=display_name, - array=shm.array, - - array_key=conf['func_name'], - sidepane=sidepane, - - # curve by default - ohlc=False, - - # settings passed down to ``ChartPlotWidget`` - **conf.get('chart_kwargs', {}) - # static_yrange=(0, 100), - ) - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linkedsplits.chart.name - - array_key = func_name - - profiler(f'fsp:{func_name} chart created') - - # first UI update, usually from shm pushed history - update_fsp_chart( - chart, - shm, - display_name, - array_key=array_key, - ) - - chart.linked.focus() - - # TODO: figure out if we can roll our own `FillToThreshold` to - # get brush filled polygons for OS/OB conditions. - # ``pg.FillBetweenItems`` seems to be one technique using - # generic fills between curve types while ``PlotCurveItem`` has - # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which - # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[func_name]) - # graphics.curve.setBrush(50, 50, 200, 100) - # graphics.curve.setFillLevel(50) - - if func_name == 'rsi': - from ._lines import level_line - # add moveable over-[sold/bought] lines - # and labels only for the 70/30 lines - level_line(chart, 20) - level_line(chart, 30, orient_v='top') - level_line(chart, 70, orient_v='bottom') - level_line(chart, 80, orient_v='top') - - chart._set_yrange() - done() # status updates - - profiler(f'fsp:{func_name} starting update loop') - profiler.finish() - - # update chart graphics - last = time.time() - async for value in stream: - - # chart isn't actively shown so just skip render cycle - if chart.linked.isHidden(): - continue - - else: - now = time.time() - period = now - last - - if period <= 1/_quote_throttle_rate: - # faster then display refresh rate - print(f'fsp too fast: {1/period}') - continue - - # run synchronous update - update_fsp_chart( - chart, - shm, - display_name, - array_key=func_name, - ) - - # set time of last graphics update - last = time.time() - - async def check_for_new_bars( feed: Feed, ohlcv: np.ndarray, @@ -822,57 +547,6 @@ async def display_symbol_data( # TODO: a data view api that makes this less shit chart._shm = ohlcv - # TODO: eventually we'll support some kind of n-compose syntax - fsp_conf = { - - # 'dolla_vlm': { - # 'func_name': 'dolla_vlm', - # 'zero_on_step': True, - # 'params': { - # 'price_func': { - # 'default_value': 'chl3', - # # tell target ``Edit`` widget to not allow - # # edits for now. - # 'widget_kwargs': {'readonly': True}, - # }, - # }, - # 'chart_kwargs': {'style': 'step'} - # }, - - # 'rsi': { - # 'func_name': 'rsi', # literal python func ref lookup name - - # # map of parameters to place on the fsp sidepane widget - # # which should map to dynamic inputs available to the - # # fsp function at runtime. - # 'params': { - # 'period': { - # 'default_value': 14, - # 'widget_kwargs': {'readonly': True}, - # }, - # }, - - # # ``ChartPlotWidget`` options passthrough - # 'chart_kwargs': { - # 'static_yrange': (0, 100), - # }, - # }, - } - - if has_vlm(ohlcv): # and provider != 'binance': - # binance is too fast atm for FSPs until we wrap - # the fsp streams as throttled ``Feeds``, see - # - - # add VWAP to fsp config for downstream loading - fsp_conf.update({ - 'vwap': { - 'func_name': 'vwap', - 'overlay': True, - 'anchor': 'session', - }, - }) - # NOTE: we must immediately tell Qt to show the OHLC chart # to avoid a race where the subplots get added/shown to # the linked set *before* the main price chart! @@ -880,32 +554,24 @@ async def display_symbol_data( linkedsplits.focus() await trio.sleep(0) - vlm_chart = None - - async with gather_contexts( - ( - trio.open_nursery(), - maybe_open_vlm_display(linkedsplits, ohlcv), - maybe_open_fsp_cluster(), - ) - ) as (ln, vlm_chart, cluster_map): + vlm_chart: Optional[ChartPlotWidget] = None + async with trio.open_nursery() as ln: # load initial fsp chain (otherwise known as "indicators") - ln.start_soon( + admin, vlm_chart = await ln.start( start_fsp_displays, - cluster_map, + linkedsplits, - fsp_conf, + brokermod, sym, ohlcv, - brokermod, loading_sym_key, loglevel, ) # start graphics update loop after receiving first live quote ln.start_soon( - update_chart_from_quotes, + update_linked_charts_graphics, linkedsplits, feed.stream, ohlcv, @@ -913,6 +579,7 @@ async def display_symbol_data( vlm_chart, ) + # start sample step incrementer ln.start_soon( check_for_new_bars, feed, diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 7d45fef2..22b99f2c 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -21,12 +21,20 @@ Financial signal processing cluster and real-time graphics management. ''' from contextlib import asynccontextmanager as acm -from typing import Optional +from functools import partial +from itertools import cycle +from types import ModuleType +from typing import Optional, AsyncGenerator, Any import numpy as np from pydantic import create_model import tractor +from tractor.trionics import gather_contexts +import pyqtgraph as pg +import trio +from trio_typing import TaskStatus +from .._cacheables import maybe_open_context from ..data._sharedmem import ( ShmArray, maybe_open_shm_array, @@ -35,8 +43,8 @@ from ..data._sharedmem import ( from ._chart import ( ChartPlotWidget, LinkedSplits, - # GodWidget, ) +from .. import fsp from ._forms import ( FieldsForm, mk_form, @@ -88,85 +96,6 @@ def has_vlm(ohlcv: ShmArray) -> bool: return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) -@acm -async def maybe_open_vlm_display( - linked: LinkedSplits, - ohlcv: ShmArray, - -) -> ChartPlotWidget: - - if not has_vlm(ohlcv): - log.warning(f"{linked.symbol.key} does not seem to have volume info") - yield - return - else: - - shm, opened = maybe_mk_fsp_shm( - linked.symbol.key, - 'vlm', - readonly=True, - ) - - async with open_fsp_sidepane( - linked, { - 'vlm': { - 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. - 'widget_kwargs': {'readonly': True}, - }, - }, - } - }, - ) as sidepane: - - # built-in $vlm - shm = ohlcv - chart = linked.add_plot( - name='volume', - array=shm.array, - - array_key='volume', - sidepane=sidepane, - - # curve by default - ohlc=False, - - # Draw vertical bars from zero. - # we do this internally ourselves since - # the curve item internals are pretty convoluted. - style='step', - ) - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linked.chart.name - - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] - - # read from last calculated value - value = shm.array['volume'][-1] - - last_val_sticky.update_from_data(-1, value) - - chart.update_curve_from_array( - 'volume', - shm.array, - ) - - # size view to data once at outset - chart._set_yrange() - - yield chart - - def update_fsp_chart( chart: ChartPlotWidget, shm: ShmArray, @@ -270,3 +199,558 @@ async def open_fsp_sidepane( ) ): yield sidepane + + +@acm +async def open_fsp_actor_cluster( + names: list[str] = ['fsp_0', 'fsp_1'], + +) -> AsyncGenerator[int, dict[str, tractor.Portal]]: + + from tractor._clustering import open_actor_cluster + + profiler = pg.debug.Profiler( + delayed=False, + disabled=False + ) + async with open_actor_cluster( + count=2, + names=names, + modules=['piker.fsp._engine'], + + ) as cluster_map: + profiler('started fsp cluster') + yield cluster_map + + +class FspAdmin: + ''' + Client API for orchestrating FSP actors and displaying + real-time graphics output. + + ''' + def __init__( + self, + tn: trio.Nursery, + cluster: dict[str, tractor.Portal], + + ) -> None: + self.tn = tn + self.cluster = cluster + self._rr_next_actor = cycle(cluster.items()) + self._registry: dict[ + tuple, + tuple[tractor.MsgStream, ShmArray] + ] = {} + + def rr_next_portal(self) -> tractor.Portal: + name, portal = next(self._rr_next_actor) + return portal + + async def open_remote_fsp( + self, + + portal: tractor.Portal, + complete: trio.Event, + started: trio.Event, + + brokername: str, + sym: str, + + src_shm: ShmArray, + dst_shm: ShmArray, + + conf: dict, + func_name: str, + loglevel: str, + + ) -> None: + ''' + Task which opens a remote FSP endpoint in the managed + cluster and sleeps until signalled to exit. + + ''' + async with ( + portal.open_context( + + # chaining entrypoint + fsp.cascade, + + # data feed key + brokername=brokername, + symbol=sym, + + # mems + src_shm_token=src_shm.token, + dst_shm_token=dst_shm.token, + + # target + func_name=func_name, + + loglevel=loglevel, + zero_on_step=conf.get('zero_on_step', False), + + ) as (ctx, last_index), + ctx.open_stream() as stream, + ): + # register output data + self._registry[(brokername, sym, func_name)] = ( + stream, dst_shm, complete) + + started.set() + + # wait for graceful shutdown signal + await complete.wait() + + async def start_fsp( + self, + + display_name: str, + feed_key: tuple[str, str], + src_shm: ShmArray, + conf: dict[str, dict[str, Any]], + + worker_name: Optional[str] = None, + loglevel: str = 'error', + + ) -> (ShmArray, trio.Event): + + # unpack FSP details from config dict + func_name = conf['func_name'] + + # allocate an output shm array + dst_shm, opened = maybe_mk_fsp_shm( + feed_key, + field_name=func_name, + display_name=display_name, + readonly=True, + ) + if not opened: + raise RuntimeError("Already started FSP {func_name}") + + portal = self.cluster.get(worker_name) or self.rr_next_portal() + complete = trio.Event() + started = trio.Event() + + brokername, sym = feed_key + self.tn.start_soon( + self.open_remote_fsp, + portal, + complete, + started, + + brokername, + sym, + + src_shm, + dst_shm, + + conf, + func_name, + loglevel, + ) + + return dst_shm, started + + +@acm +async def open_fsp_admin( + **kwargs, + +) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: + + async with ( + maybe_open_context( + # for now make a cluster per client? + acm_func=open_fsp_actor_cluster, + kwargs=kwargs, + ) as (cache_hit, cluster_map), + + trio.open_nursery() as tn, + ): + if cache_hit: + log.info('re-using existing fsp cluster') + + admin = FspAdmin(tn, cluster_map) + try: + yield admin + finally: + # terminate all tasks via signals + for key, entry in admin._registry.items(): + _, _, event = entry + event.set() + + +@acm +async def maybe_open_vlm_display( + linked: LinkedSplits, + ohlcv: ShmArray, + +) -> ChartPlotWidget: + ''' + Volume subchart helper. + + Since "volume" is often included directly alongside OHLCV price + data, we don't really need a separate FSP-actor + shm array for it + since it's likely already directly adjacent to OHLC samples from the + data provider. + + ''' + if not has_vlm(ohlcv): + log.warning(f"{linked.symbol.key} does not seem to have volume info") + yield + return + else: + + # shm, opened = maybe_mk_fsp_shm( + # linked.symbol.key, + # 'vlm', + # readonly=True, + # ) + + async with open_fsp_sidepane( + linked, { + 'vlm': { + 'params': { + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, + } + }, + ) as sidepane: + + # built-in $vlm + shm = ohlcv + chart = linked.add_plot( + name='volume', + array=shm.array, + + array_key='volume', + sidepane=sidepane, + + # curve by default + ohlc=False, + + # Draw vertical bars from zero. + # we do this internally ourselves since + # the curve item internals are pretty convoluted. + style='step', + ) + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linked.chart.name + + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] + + # read from last calculated value + value = shm.array['volume'][-1] + + last_val_sticky.update_from_data(-1, value) + + chart.update_curve_from_array( + 'volume', + shm.array, + ) + + # size view to data once at outset + chart._set_yrange() + + yield chart + + +async def run_fsp_ui( + + shm: ShmArray, + started: trio.Event, + linkedsplits: LinkedSplits, + func_name: str, + display_name: str, + conf: dict[str, dict], + group_status_key: str, + loglevel: str, + profiler: pg.debug.Profiler, + + _quote_throttle_rate: int = 58, + +) -> None: + ''' + FSP stream chart update loop. + + This is called once for each entry in the fsp + config map. + + ''' + profiler(f'started UI task for fsp: {func_name}') + + async with ( + # side UI for parameters/controls + open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, + ): + await started.wait() + profiler(f'fsp:{func_name} attached to fsp ctx-stream') + + overlay_with = conf.get('overlay', False) + if overlay_with: + if overlay_with == 'ohlc': + chart = linkedsplits.chart + else: + chart = linkedsplits.subplots[overlay_with] + + chart.draw_curve( + name=display_name, + data=shm.array, + overlay=True, + color='default_light', + separate_axes=conf.get('separate_axes', False), + **conf.get('chart_kwargs', {}) + ) + # specially store ref to shm for lookup in display loop + chart._overlays[display_name] = shm + + else: + chart = linkedsplits.add_plot( + name=display_name, + array=shm.array, + + array_key=func_name, + sidepane=sidepane, + + # curve by default + ohlc=False, + + # settings passed down to ``ChartPlotWidget`` + **conf.get('chart_kwargs', {}) + # static_yrange=(0, 100), + ) + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linkedsplits.chart.name + + array_key = func_name + + profiler(f'fsp:{func_name} chart created') + + # first UI update, usually from shm pushed history + update_fsp_chart( + chart, + shm, + display_name, + array_key=array_key, + ) + + chart.linked.focus() + + # TODO: figure out if we can roll our own `FillToThreshold` to + # get brush filled polygons for OS/OB conditions. + # ``pg.FillBetweenItems`` seems to be one technique using + # generic fills between curve types while ``PlotCurveItem`` has + # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which + # might be the best solution? + + # graphics = chart.update_from_array(chart.name, array[func_name]) + # graphics.curve.setBrush(50, 50, 200, 100) + # graphics.curve.setFillLevel(50) + + if func_name == 'rsi': + from ._lines import level_line + # add moveable over-[sold/bought] lines + # and labels only for the 70/30 lines + level_line(chart, 20) + level_line(chart, 30, orient_v='top') + level_line(chart, 70, orient_v='bottom') + level_line(chart, 80, orient_v='top') + + chart._set_yrange() + # done() # status updates + + profiler(f'fsp:{func_name} starting update loop') + profiler.finish() + + # update chart graphics + # last = time.time() + + # XXX: this currently doesn't loop since + # the FSP engine does **not** push updates atm + # since we do graphics update in the main loop + # in ``._display. + # async for value in stream: + # print(value) + + # # chart isn't actively shown so just skip render cycle + # if chart.linked.isHidden(): + # continue + + # else: + # now = time.time() + # period = now - last + + # if period <= 1/_quote_throttle_rate: + # # faster then display refresh rate + # print(f'fsp too fast: {1/period}') + # continue + + # # run synchronous update + # update_fsp_chart( + # chart, + # shm, + # display_name, + # array_key=func_name, + # ) + + # # set time of last graphics update + # last = time.time() + + +async def start_fsp_displays( + + linkedsplits: LinkedSplits, + brokermod: ModuleType, + sym: str, + ohlcv: ShmArray, + group_status_key: str, + loglevel: str, + + task_status: TaskStatus[ + tuple[FspAdmin, 'ChartPlotWidet'] + ] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Create sub-actors (under flat tree) + for each entry in config and attach to local graphics update tasks. + + Pass target entrypoint and historical data. + + ''' + # TODO: eventually we'll support some kind of n-compose syntax + fsp_conf = { + + # 'rsi': { + # 'func_name': 'rsi', # literal python func ref lookup name + + # # map of parameters to place on the fsp sidepane widget + # # which should map to dynamic inputs available to the + # # fsp function at runtime. + # 'params': { + # 'period': { + # 'default_value': 14, + # 'widget_kwargs': {'readonly': True}, + # }, + # }, + + # # ``ChartPlotWidget`` options passthrough + # 'chart_kwargs': { + # 'static_yrange': (0, 100), + # }, + # }, + } + + if has_vlm(ohlcv): + fsp_conf.update({ + 'vwap': { + 'func_name': 'vwap', + 'overlay': 'ohlc', + 'anchor': 'session', + }, + + 'dolla_vlm': { + + 'func_name': 'dolla_vlm', + 'zero_on_step': True, + 'overlay': 'volume', + 'separate_axes': True, + 'params': { + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, + 'chart_kwargs': {'step_mode': True} + }, + + }) + + linkedsplits.focus() + + profiler = pg.debug.Profiler( + delayed=False, + disabled=False + ) + + async with gather_contexts(( + + # NOTE: this admin internally opens an actor pool. + open_fsp_admin(), + + trio.open_nursery(), + + # TODO: fast startup of volume overlayed with $_vlm + maybe_open_vlm_display(linkedsplits, ohlcv), + + )) as (admin, n, vlm_chart): + + task_status.started((admin, vlm_chart)) + + for display_name, conf in fsp_conf.items(): + func_name = conf['func_name'] + + done = linkedsplits.window().status_bar.open_status( + f'loading fsp, {display_name}..', + group_key=group_status_key, + ) + + shm, started = await admin.start_fsp( + display_name, + (brokermod.name, sym), + ohlcv, + fsp_conf[display_name], + loglevel, + ) + + profiler(f'created shm for fsp actor: {display_name}') + + # XXX: fsp may have been opened by a duplicate chart. + # Error for now until we figure out how to wrap fsps as + # "feeds". assert opened, f"A chart for {key} likely + # already exists?" + + profiler(f'attached to fsp portal: {display_name}') + + # init async + n.start_soon( + partial( + run_fsp_ui, + + shm, + started, + linkedsplits, + func_name, + display_name, + + conf=conf, + group_status_key=group_status_key, + loglevel=loglevel, + profiler=profiler, + ) + ) + await started.wait() + done() + + # blocks on nursery until all fsp actors complete From e69af9e29137f622eb78d578c8b864202f607f44 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Jan 2022 11:36:30 -0500 Subject: [PATCH 4/9] Show unit vlm on LHS for now --- piker/ui/_fsp.py | 117 ++++++++++++++++++++++++----------------------- 1 file changed, 59 insertions(+), 58 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 22b99f2c..047d7e80 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -120,7 +120,6 @@ def update_fsp_chart( array, array_key=array_key or graphics_name, ) - chart._set_yrange() # XXX: re: ``array_key``: fsp func names must be unique meaning we # can't have duplicates of the underlying data even if multiple @@ -400,72 +399,69 @@ async def maybe_open_vlm_display( log.warning(f"{linked.symbol.key} does not seem to have volume info") yield return - else: - # shm, opened = maybe_mk_fsp_shm( - # linked.symbol.key, - # 'vlm', - # readonly=True, - # ) - - async with open_fsp_sidepane( - linked, { - 'vlm': { - 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. - 'widget_kwargs': {'readonly': True}, - }, + async with open_fsp_sidepane( + linked, { + 'vlm': { + 'params': { + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, }, - } - }, - ) as sidepane: + }, + } + }, + ) as sidepane: - # built-in $vlm - shm = ohlcv - chart = linked.add_plot( - name='volume', - array=shm.array, + # built-in $vlm + shm = ohlcv + chart = linked.add_plot( + name='volume', + array=shm.array, - array_key='volume', - sidepane=sidepane, + array_key='volume', + sidepane=sidepane, - # curve by default - ohlc=False, + # curve by default + ohlc=False, - # Draw vertical bars from zero. - # we do this internally ourselves since - # the curve item internals are pretty convoluted. - style='step', - ) + # Draw vertical bars from zero. + # we do this internally ourselves since + # the curve item internals are pretty convoluted. + style='step', + ) - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm + # show volume units value on LHS (for dinkus) + chart.hideAxis('right') + chart.showAxis('left') - # should **not** be the same sub-chart widget - assert chart.name != linked.chart.name + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] + # should **not** be the same sub-chart widget + assert chart.name != linked.chart.name - # read from last calculated value - value = shm.array['volume'][-1] + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] - last_val_sticky.update_from_data(-1, value) + # read from last calculated value + value = shm.array['volume'][-1] - chart.update_curve_from_array( - 'volume', - shm.array, - ) + last_val_sticky.update_from_data(-1, value) - # size view to data once at outset - chart._set_yrange() + chart.update_curve_from_array( + 'volume', + shm.array, + ) - yield chart + # size view to data once at outset + chart.view._set_yrange() + + yield chart async def run_fsp_ui( @@ -494,7 +490,10 @@ async def run_fsp_ui( async with ( # side UI for parameters/controls - open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, + open_fsp_sidepane( + linkedsplits, + {display_name: conf}, + ) as sidepane, ): await started.wait() profiler(f'fsp:{func_name} attached to fsp ctx-stream') @@ -511,6 +510,7 @@ async def run_fsp_ui( data=shm.array, overlay=True, color='default_light', + array_key=func_name, separate_axes=conf.get('separate_axes', False), **conf.get('chart_kwargs', {}) ) @@ -575,7 +575,7 @@ async def run_fsp_ui( level_line(chart, 70, orient_v='bottom') level_line(chart, 80, orient_v='top') - chart._set_yrange() + chart.view._set_yrange() # done() # status updates profiler(f'fsp:{func_name} starting update loop') @@ -669,7 +669,6 @@ async def start_fsp_displays( }, 'dolla_vlm': { - 'func_name': 'dolla_vlm', 'zero_on_step': True, 'overlay': 'volume', @@ -701,8 +700,10 @@ async def start_fsp_displays( trio.open_nursery(), - # TODO: fast startup of volume overlayed with $_vlm - maybe_open_vlm_display(linkedsplits, ohlcv), + maybe_open_vlm_display( + linkedsplits, + ohlcv, + ), )) as (admin, n, vlm_chart): From 09fc901b0d55c2c2da35b49257c94e7949da8b61 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Jan 2022 16:06:51 -0500 Subject: [PATCH 5/9] Handle left axis case for x-axis label placement For wtv cucked reason all the viewbox/scene coordinate calcs do **not** include a left axis in the geo (likely because it's a hacked in widget + layout thing managed by `PlotItem`). Detect if there's a left axis and if so use it in the label placement scene coords calc. ToDo: probably make this a non-move calc and only recompute any time the axis changes. Other: - rate limit mouse events down to the 60 (ish) Hz for now - change one last lingering `'ohlc'` array lookup - fix `.mouseMoved()` "event" type annot --- piker/ui/_cursor.py | 62 ++++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/piker/ui/_cursor.py b/piker/ui/_cursor.py index d9a4e45a..3833a123 100644 --- a/piker/ui/_cursor.py +++ b/piker/ui/_cursor.py @@ -24,7 +24,7 @@ from typing import Optional, Callable import inspect import numpy as np import pyqtgraph as pg -from PyQt5 import QtCore, QtGui, QtWidgets +from PyQt5 import QtCore, QtWidgets from PyQt5.QtCore import QPointF, QRectF from ._style import ( @@ -43,8 +43,8 @@ log = get_logger(__name__) # latency (in terms of perceived lag in cross hair) so really be sure # there's an improvement if you want to change it! -_mouse_rate_limit = 120 # TODO; should we calc current screen refresh rate? -_debounce_delay = 1 / 40 +_mouse_rate_limit = 58 # TODO; should we calc current screen refresh rate? +_debounce_delay = 1 / 60 _ch_label_opac = 1 @@ -64,7 +64,7 @@ class LineDot(pg.CurvePoint): ) -> None: # scale from dpi aware font size - size = int(_font.px_size * 0.375) + size = int(_font.px_size * 0.375) pg.CurvePoint.__init__( self, @@ -246,12 +246,16 @@ class ContentsLabels: # for name, (label, update) in self._labels.items(): for chart, name, label, update in self._labels: - if not (index >= 0 and index < chart._arrays['ohlc'][-1]['index']): + array = chart._arrays[name] + if not ( + index >= 0 + and index < array[-1]['index'] + ): # out of range print('out of range?') continue - array = chart._arrays[name] + # array = chart._arrays[name] # call provided update func with data point try: @@ -462,12 +466,15 @@ class Cursor(pg.GraphicsObject): def mouseMoved( self, - evt: 'tuple[QMouseEvent]', # noqa - ) -> None: # noqa - """Update horizonal and vertical lines when mouse moves inside + coords: tuple[QPointF], # noqa + + ) -> None: + ''' + Update horizonal and vertical lines when mouse moves inside either the main chart or any indicator subplot. - """ - pos = evt[0] + + ''' + pos = coords[0] # find position inside active plot try: @@ -516,28 +523,37 @@ class Cursor(pg.GraphicsObject): # with cursor movement self.contents_labels.update_labels(ix) + vl_x = ix + line_offset for plot, opts in self.graphics.items(): - # update the chart's "contents" label - # plot.update_contents_labels(ix) - # move the vertical line to the current "center of bar" - opts['vl'].setX(ix + line_offset) + opts['vl'].setX(vl_x) # update all subscribed curve dots for cursor in opts.get('cursors', ()): cursor.setIndex(ix) # update the label on the bottom of the crosshair - if 'bottom' in plot.plotItem.axes: + axes = plot.plotItem.axes + + # TODO: make this an up-front calc that we update + # on axis-widget resize events. + # left axis offset width for calcuating + # absolute x-axis label placement. + left_axis_width = 0 + + if 'bottom' in axes: + + left = axes.get('left') + if left: + left_axis_width = left['item'].width() + + # map back to abs (label-local) coordinates self.xaxis_label.update_label( - - # XXX: requires: - # https://github.com/pyqtgraph/pyqtgraph/pull/1418 - # otherwise gobbles tons of CPU.. - - # map back to abs (label-local) coordinates - abs_pos=plot.mapFromView(QPointF(ix + line_offset, iy)), + abs_pos=( + plot.mapFromView(QPointF(vl_x, iy)) - + QPointF(left_axis_width, 0) + ), value=ix, ) From e22a6528528590e7a7a0c020ea6f1085355631df Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Jan 2022 18:29:07 -0500 Subject: [PATCH 6/9] Move plotitem overlaying into a `.overlay_plotitem()` --- piker/ui/_chart.py | 168 +++++++++++++++++++++------------------------ 1 file changed, 79 insertions(+), 89 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 3ac2cf14..dd665fde 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -19,8 +19,6 @@ High level chart-widget apis. ''' from __future__ import annotations -from functools import partial -from dataclasses import dataclass from typing import Optional from PyQt5 import QtCore, QtWidgets @@ -383,8 +381,9 @@ class LinkedSplits(QWidget): style: str = 'bar', - ) -> 'ChartPlotWidget': - '''Start up and show main (price) chart and all linked subcharts. + ) -> ChartPlotWidget: + ''' + Start up and show main (price) chart and all linked subcharts. The data input struct array must include OHLC fields. @@ -537,7 +536,7 @@ class LinkedSplits(QWidget): ) self.cursor.contents_labels.add_label( cpw, - 'ohlc', + name, anchor_at=('top', 'left'), update_func=ContentsLabel.update_from_ohlc, ) @@ -611,11 +610,11 @@ class LinkedSplits(QWidget): # import pydantic -# class ArrayScene(pydantic.BaseModel): +# class Graphics(pydantic.BaseModel): # ''' # Data-AGGRegate: high level API onto multiple (categorized) -# ``ShmArray``s with high level processing routines mostly for -# graphics summary and display. +# ``ShmArray``s with high level processing routines for +# graphics computations and display. # ''' # arrays: dict[str, np.ndarray] = {} @@ -738,7 +737,7 @@ class ChartPlotWidget(pg.PlotWidget): self.default_view() self.cv.enable_auto_yrange() - self.overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem) + self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem) def resume_all_feeds(self): for feed in self._feeds.values(): @@ -849,7 +848,7 @@ class ChartPlotWidget(pg.PlotWidget): # adds all bar/candle graphics objects for each data point in # the np array buffer to be drawn on next render cycle - self.addItem(graphics) + self.plotItem.addItem(graphics) # draw after to allow self.scene() to work... graphics.draw_from_data(data) @@ -860,6 +859,57 @@ class ChartPlotWidget(pg.PlotWidget): return graphics, data_key + def overlay_plotitem( + self, + name: str, + + ) -> pg.PlotItem: + # Custom viewbox impl + cv = self.mk_vb(name) + cv.chart = self + + # xaxis = DynamicDateAxis( + # orientation='bottom', + # linkedsplits=self.linked, + # ) + yaxis = PriceAxis( + orientation='right', + linkedsplits=self.linked, + ) + + plotitem = pg.PlotItem( + parent=self.plotItem, + name=name, + enableMenu=False, + viewBox=cv, + axisItems={ + # 'bottom': xaxis, + 'right': yaxis, + }, + default_axes=[], + ) + # plotitem.setAxisItems( + # add_to_layout=False, + # axisItems={ + # 'bottom': xaxis, + # 'right': yaxis, + # }, + # ) + # plotite.hideAxis('right') + # plotite.hideAxis('bottom') + # plotitem.addItem(curve) + cv.enable_auto_yrange() + + # plotitem.enableAutoRange(axis='y') + plotitem.hideButtons() + + self.pi_overlay.add_plotitem( + plotitem, + # only link x-axes, + link_axes=(0,), + ) + return plotitem + def draw_curve( self, @@ -868,7 +918,6 @@ class ChartPlotWidget(pg.PlotWidget): array_key: Optional[str] = None, overlay: bool = False, - separate_axes: bool = False, color: Optional[str] = None, add_label: bool = True, @@ -907,11 +956,11 @@ class ChartPlotWidget(pg.PlotWidget): **pdi_kwargs, ) - # XXX: see explanation for differenct caching modes: + # XXX: see explanation for different caching modes: # https://stackoverflow.com/a/39410081 # seems to only be useful if we don't re-generate the entire # QPainterPath every time - # curve.curve.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) + # curve.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) # don't ever use this - it's a colossal nightmare of artefacts # and is disastrous for performance. @@ -921,83 +970,28 @@ class ChartPlotWidget(pg.PlotWidget): self._graphics[name] = curve self._arrays[data_key] = data + pi = self.plotItem + + # TODO: this probably needs its own method? if overlay: # anchor_at = ('bottom', 'left') self._overlays[name] = None - if separate_axes: + if isinstance(overlay, pg.PlotItem): + if overlay not in self.pi_overlay.overlays: + raise RuntimeError( + f'{overlay} must be from `.plotitem_overlay()`' + ) + pi = overlay - # Custom viewbox impl - cv = self.mk_vb(name) - - def maxmin(): - return self.maxmin(name=data_key) - - # ensure view maxmin is computed from correct array - # cv._maxmin = partial(self.maxmin, name=data_key) - - cv._maxmin = maxmin - - cv.chart = self - - # xaxis = DynamicDateAxis( - # orientation='bottom', - # linkedsplits=self.linked, - # ) - yaxis = PriceAxis( - orientation='right', - linkedsplits=self.linked, - ) - - plotitem = pg.PlotItem( - parent=self.plotItem, - name=name, - enableMenu=False, - viewBox=cv, - axisItems={ - # 'bottom': xaxis, - 'right': yaxis, - }, - default_axes=[], - ) - # plotitem.setAxisItems( - # add_to_layout=False, - # axisItems={ - # 'bottom': xaxis, - # 'right': yaxis, - # }, - # ) - # plotite.hideAxis('right') - # plotite.hideAxis('bottom') - plotitem.addItem(curve) - cv.enable_auto_yrange() - - # config - # plotitem.setAutoVisible(y=True) - # plotitem.enableAutoRange(axis='y') - plotitem.hideButtons() - - self.overlay.add_plotitem( - plotitem, - # only link x-axes, - link_axes=(0,), - ) - - else: - # this intnernally calls `PlotItem.addItem()` on the - # graphics object - self.addItem(curve) else: - # this intnernally calls `PlotItem.addItem()` on the - # graphics object - self.addItem(curve) - # anchor_at = ('top', 'left') # TODO: something instead of stickies for overlays # (we need something that avoids clutter on x-axis). self._add_sticky(name, bg_color=color) + pi.addItem(curve) return curve, data_key # TODO: make this a ctx mngr @@ -1036,7 +1030,8 @@ class ChartPlotWidget(pg.PlotWidget): **kwargs, ) -> pg.GraphicsObject: - '''Update the named internal graphics from ``array``. + ''' + Update the named internal graphics from ``array``. ''' self._arrays[self.name] = array @@ -1053,7 +1048,8 @@ class ChartPlotWidget(pg.PlotWidget): **kwargs, ) -> pg.GraphicsObject: - '''Update the named internal graphics from ``array``. + ''' + Update the named internal graphics from ``array``. ''' assert len(array) @@ -1123,7 +1119,7 @@ class ChartPlotWidget(pg.PlotWidget): def get_index(self, time: float) -> int: # TODO: this should go onto some sort of - # data-view strimg thinger..right? + # data-view thinger..right? ohlc = self._shm.array # XXX: not sure why the time is so off here @@ -1178,15 +1174,9 @@ class ChartPlotWidget(pg.PlotWidget): yhigh = np.nanmax(bars['high']) else: - try: - view = bars[name or self.data_key] - except: - breakpoint() - # if self.data_key != 'volume': - # else: - # view = bars + view = bars[name or self.data_key] ylow = np.nanmin(view) yhigh = np.nanmax(view) - # print(f'{(ylow, yhigh)}') + # print(f'{(ylow, yhigh)}') return ylow, yhigh From 404f5d6d23d0ddd8497fe8217ed0ac14b51bb797 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Jan 2022 18:48:47 -0500 Subject: [PATCH 7/9] Factor (sub-)chart spawning into a admin method Adds `FspAdmin.open_fsp_chart()` which allows adding a real time graphics display of an fsp's output with different options for where (which chart or make a new one) to place it. Further, - change some method naming, namely the other fsp engine task methods to `.open_chain()` and `.start_engine_task()`. - make `run_fsp_ui()` a lone task function for now with the default config parsing and chart setup logic (and it still includes a buncha commented out stuff for doing graphics update which is now done in the main loop to avoid task switching overhead). - move all vlm related fsp config entries into the `open_vlm_displays()` task for dedicated setup with the fsp admin api such as special auto-yrange handling and graph overlays. - `start_fsp_displays()` is now just a small loop through config entries with synced startup status messages. --- piker/ui/_fsp.py | 787 +++++++++++++++++++++++++---------------------- 1 file changed, 421 insertions(+), 366 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 047d7e80..12fe9a69 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -23,13 +23,12 @@ Financial signal processing cluster and real-time graphics management. from contextlib import asynccontextmanager as acm from functools import partial from itertools import cycle -from types import ModuleType from typing import Optional, AsyncGenerator, Any import numpy as np from pydantic import create_model import tractor -from tractor.trionics import gather_contexts +# from tractor.trionics import gather_contexts import pyqtgraph as pg import trio from trio_typing import TaskStatus @@ -92,8 +91,8 @@ def has_vlm(ohlcv: ShmArray) -> bool: # make sure that the instrument supports volume history # (sometimes this is not the case for some commodities and # derivatives) - volm = ohlcv.array['volume'] - return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) + vlm = ohlcv.array['volume'] + return not bool(np.all(np.isin(vlm, -1)) or np.all(np.isnan(vlm))) def update_fsp_chart( @@ -208,285 +207,42 @@ async def open_fsp_actor_cluster( from tractor._clustering import open_actor_cluster - profiler = pg.debug.Profiler( - delayed=False, - disabled=False - ) + # profiler = pg.debug.Profiler( + # delayed=False, + # disabled=False + # ) async with open_actor_cluster( count=2, names=names, modules=['piker.fsp._engine'], ) as cluster_map: - profiler('started fsp cluster') + # profiler('started fsp cluster') yield cluster_map -class FspAdmin: - ''' - Client API for orchestrating FSP actors and displaying - real-time graphics output. - - ''' - def __init__( - self, - tn: trio.Nursery, - cluster: dict[str, tractor.Portal], - - ) -> None: - self.tn = tn - self.cluster = cluster - self._rr_next_actor = cycle(cluster.items()) - self._registry: dict[ - tuple, - tuple[tractor.MsgStream, ShmArray] - ] = {} - - def rr_next_portal(self) -> tractor.Portal: - name, portal = next(self._rr_next_actor) - return portal - - async def open_remote_fsp( - self, - - portal: tractor.Portal, - complete: trio.Event, - started: trio.Event, - - brokername: str, - sym: str, - - src_shm: ShmArray, - dst_shm: ShmArray, - - conf: dict, - func_name: str, - loglevel: str, - - ) -> None: - ''' - Task which opens a remote FSP endpoint in the managed - cluster and sleeps until signalled to exit. - - ''' - async with ( - portal.open_context( - - # chaining entrypoint - fsp.cascade, - - # data feed key - brokername=brokername, - symbol=sym, - - # mems - src_shm_token=src_shm.token, - dst_shm_token=dst_shm.token, - - # target - func_name=func_name, - - loglevel=loglevel, - zero_on_step=conf.get('zero_on_step', False), - - ) as (ctx, last_index), - ctx.open_stream() as stream, - ): - # register output data - self._registry[(brokername, sym, func_name)] = ( - stream, dst_shm, complete) - - started.set() - - # wait for graceful shutdown signal - await complete.wait() - - async def start_fsp( - self, - - display_name: str, - feed_key: tuple[str, str], - src_shm: ShmArray, - conf: dict[str, dict[str, Any]], - - worker_name: Optional[str] = None, - loglevel: str = 'error', - - ) -> (ShmArray, trio.Event): - - # unpack FSP details from config dict - func_name = conf['func_name'] - - # allocate an output shm array - dst_shm, opened = maybe_mk_fsp_shm( - feed_key, - field_name=func_name, - display_name=display_name, - readonly=True, - ) - if not opened: - raise RuntimeError("Already started FSP {func_name}") - - portal = self.cluster.get(worker_name) or self.rr_next_portal() - complete = trio.Event() - started = trio.Event() - - brokername, sym = feed_key - self.tn.start_soon( - self.open_remote_fsp, - portal, - complete, - started, - - brokername, - sym, - - src_shm, - dst_shm, - - conf, - func_name, - loglevel, - ) - - return dst_shm, started - - -@acm -async def open_fsp_admin( - **kwargs, - -) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: - - async with ( - maybe_open_context( - # for now make a cluster per client? - acm_func=open_fsp_actor_cluster, - kwargs=kwargs, - ) as (cache_hit, cluster_map), - - trio.open_nursery() as tn, - ): - if cache_hit: - log.info('re-using existing fsp cluster') - - admin = FspAdmin(tn, cluster_map) - try: - yield admin - finally: - # terminate all tasks via signals - for key, entry in admin._registry.items(): - _, _, event = entry - event.set() - - -@acm -async def maybe_open_vlm_display( - linked: LinkedSplits, - ohlcv: ShmArray, - -) -> ChartPlotWidget: - ''' - Volume subchart helper. - - Since "volume" is often included directly alongside OHLCV price - data, we don't really need a separate FSP-actor + shm array for it - since it's likely already directly adjacent to OHLC samples from the - data provider. - - ''' - if not has_vlm(ohlcv): - log.warning(f"{linked.symbol.key} does not seem to have volume info") - yield - return - - async with open_fsp_sidepane( - linked, { - 'vlm': { - 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. - 'widget_kwargs': {'readonly': True}, - }, - }, - } - }, - ) as sidepane: - - # built-in $vlm - shm = ohlcv - chart = linked.add_plot( - name='volume', - array=shm.array, - - array_key='volume', - sidepane=sidepane, - - # curve by default - ohlc=False, - - # Draw vertical bars from zero. - # we do this internally ourselves since - # the curve item internals are pretty convoluted. - style='step', - ) - - # show volume units value on LHS (for dinkus) - chart.hideAxis('right') - chart.showAxis('left') - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linked.chart.name - - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] - - # read from last calculated value - value = shm.array['volume'][-1] - - last_val_sticky.update_from_data(-1, value) - - chart.update_curve_from_array( - 'volume', - shm.array, - ) - - # size view to data once at outset - chart.view._set_yrange() - - yield chart - - async def run_fsp_ui( + linkedsplits: LinkedSplits, shm: ShmArray, started: trio.Event, - linkedsplits: LinkedSplits, func_name: str, display_name: str, conf: dict[str, dict], - group_status_key: str, loglevel: str, - profiler: pg.debug.Profiler, - - _quote_throttle_rate: int = 58, + # profiler: pg.debug.Profiler, + # _quote_throttle_rate: int = 58, ) -> None: ''' - FSP stream chart update loop. + Taskf for UI spawning around a ``LinkedSplits`` chart for fsp + related graphics/UX management. - This is called once for each entry in the fsp - config map. + This is normally spawned/called once for each entry in the fsp + config. ''' - profiler(f'started UI task for fsp: {func_name}') + # profiler(f'started UI task for fsp: {func_name}') async with ( # side UI for parameters/controls @@ -496,7 +252,7 @@ async def run_fsp_ui( ) as sidepane, ): await started.wait() - profiler(f'fsp:{func_name} attached to fsp ctx-stream') + # profiler(f'fsp:{func_name} attached to fsp ctx-stream') overlay_with = conf.get('overlay', False) if overlay_with: @@ -518,6 +274,7 @@ async def run_fsp_ui( chart._overlays[display_name] = shm else: + # create a new sub-chart widget for this fsp chart = linkedsplits.add_plot( name=display_name, array=shm.array, @@ -530,7 +287,6 @@ async def run_fsp_ui( # settings passed down to ``ChartPlotWidget`` **conf.get('chart_kwargs', {}) - # static_yrange=(0, 100), ) # XXX: ONLY for sub-chart fsps, overlays have their @@ -543,7 +299,7 @@ async def run_fsp_ui( array_key = func_name - profiler(f'fsp:{func_name} chart created') + # profiler(f'fsp:{func_name} chart created') # first UI update, usually from shm pushed history update_fsp_chart( @@ -566,20 +322,20 @@ async def run_fsp_ui( # graphics.curve.setBrush(50, 50, 200, 100) # graphics.curve.setFillLevel(50) - if func_name == 'rsi': - from ._lines import level_line - # add moveable over-[sold/bought] lines - # and labels only for the 70/30 lines - level_line(chart, 20) - level_line(chart, 30, orient_v='top') - level_line(chart, 70, orient_v='bottom') - level_line(chart, 80, orient_v='top') + # if func_name == 'rsi': + # from ._lines import level_line + # # add moveable over-[sold/bought] lines + # # and labels only for the 70/30 lines + # level_line(chart, 20) + # level_line(chart, 30, orient_v='top') + # level_line(chart, 70, orient_v='bottom') + # level_line(chart, 80, orient_v='top') chart.view._set_yrange() # done() # status updates - profiler(f'fsp:{func_name} starting update loop') - profiler.finish() + # profiler(f'fsp:{func_name} starting update loop') + # profiler.finish() # update chart graphics # last = time.time() @@ -616,30 +372,396 @@ async def run_fsp_ui( # last = time.time() +class FspAdmin: + ''' + Client API for orchestrating FSP actors and displaying + real-time graphics output. + + ''' + def __init__( + self, + tn: trio.Nursery, + cluster: dict[str, tractor.Portal], + linked: LinkedSplits, + src_shm: ShmArray, + + ) -> None: + self.tn = tn + self.cluster = cluster + self.linked = linked + self._rr_next_actor = cycle(cluster.items()) + self._registry: dict[ + tuple, + tuple[tractor.MsgStream, ShmArray] + ] = {} + self.src_shm = src_shm + + def rr_next_portal(self) -> tractor.Portal: + name, portal = next(self._rr_next_actor) + return portal + + async def open_chain( + self, + + portal: tractor.Portal, + complete: trio.Event, + started: trio.Event, + dst_shm: ShmArray, + conf: dict, + func_name: str, + loglevel: str, + + ) -> None: + ''' + Task which opens a remote FSP endpoint in the managed + cluster and sleeps until signalled to exit. + + ''' + brokername, sym = self.linked.symbol.front_feed() + async with ( + portal.open_context( + + # chaining entrypoint + fsp.cascade, + + # data feed key + brokername=brokername, + symbol=sym, + + # mems + src_shm_token=self.src_shm.token, + dst_shm_token=dst_shm.token, + + # target + func_name=func_name, + + loglevel=loglevel, + zero_on_step=conf.get('zero_on_step', False), + + ) as (ctx, last_index), + ctx.open_stream() as stream, + ): + # register output data + self._registry[(brokername, sym, func_name)] = ( + stream, dst_shm, complete) + + started.set() + + # wait for graceful shutdown signal + await complete.wait() + + async def start_engine_task( + self, + + display_name: str, + conf: dict[str, dict[str, Any]], + + worker_name: Optional[str] = None, + loglevel: str = 'error', + + ) -> (ShmArray, trio.Event): + + # unpack FSP details from config dict + func_name = conf['func_name'] + + # allocate an output shm array + dst_shm, opened = maybe_mk_fsp_shm( + self.linked.symbol.front_feed(), + field_name=func_name, + display_name=display_name, + readonly=True, + ) + if not opened: + raise RuntimeError(f'Already started FSP {func_name}') + + portal = self.cluster.get(worker_name) or self.rr_next_portal() + complete = trio.Event() + started = trio.Event() + self.tn.start_soon( + self.open_chain, + + portal, + complete, + started, + dst_shm, + conf, + func_name, + loglevel, + ) + + return dst_shm, started + + async def open_fsp_chart( + self, + display_name: str, + conf: dict, # yeah probably dumb.. + loglevel: str = 'error', + + ) -> (trio.Event, ChartPlotWidget): + + func_name = conf['func_name'] + + shm, started = await self.start_engine_task( + display_name, + conf, + loglevel, + ) + + # init async + self.tn.start_soon( + partial( + run_fsp_ui, + + self.linked, + shm, + started, + func_name, + display_name, + + conf=conf, + loglevel=loglevel, + ) + ) + return started + + +@acm +async def open_fsp_admin( + linked: LinkedSplits, + src_shm: ShmArray, + **kwargs, + +) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: + + async with ( + maybe_open_context( + # for now make a cluster per client? + acm_func=open_fsp_actor_cluster, + kwargs=kwargs, + ) as (cache_hit, cluster_map), + + trio.open_nursery() as tn, + ): + if cache_hit: + log.info('re-using existing fsp cluster') + + admin = FspAdmin( + tn, + cluster_map, + linked, + src_shm, + ) + try: + yield admin + finally: + # terminate all tasks via signals + for key, entry in admin._registry.items(): + _, _, event = entry + event.set() + + +async def open_vlm_displays( + + linked: LinkedSplits, + ohlcv: ShmArray, + dvlm: bool = True, + + task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED, + +) -> ChartPlotWidget: + ''' + Volume subchart displays. + + Since "volume" is often included directly alongside OHLCV price + data, we don't really need a separate FSP-actor + shm array for it + since it's likely already directly adjacent to OHLC samples from the + data provider. + + Further only if volume data is detected (it sometimes isn't provided + eg. forex, certain commodities markets) will volume dependent FSPs + be spawned here. + + ''' + async with ( + open_fsp_sidepane( + linked, { + 'vlm': { + 'params': { + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, + } + }, + ) as sidepane, + open_fsp_admin(linked, ohlcv) as admin, + ): + # built-in vlm which we plot ASAP since it's + # usually data provided directly with OHLC history. + shm = ohlcv + chart = linked.add_plot( + name='volume', + array=shm.array, + + array_key='volume', + sidepane=sidepane, + + # curve by default + ohlc=False, + + # Draw vertical bars from zero. + # we do this internally ourselves since + # the curve item internals are pretty convoluted. + style='step', + ) + + # force 0 to always be in view + def maxmin(name) -> tuple[float, float]: + mxmn = chart.maxmin(name=name) + if mxmn: + return 0, mxmn[1] + + return 0, 0 + + chart.view._maxmin = partial(maxmin, name='volume') + + # TODO: fix the x-axis label issue where if you put + # the axis on the left it's totally not lined up... + # show volume units value on LHS (for dinkus) + # chart.hideAxis('right') + # chart.showAxis('left') + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # send back new chart to caller + task_status.started(chart) + + # should **not** be the same sub-chart widget + assert chart.name != linked.chart.name + + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] + + # read from last calculated value + value = shm.array['volume'][-1] + + last_val_sticky.update_from_data(-1, value) + + chart.update_curve_from_array( + 'volume', + shm.array, + ) + + # size view to data once at outset + chart.view._set_yrange() + + if not dvlm: + return + + # spawn and overlay $ vlm on the same subchart + shm, started = await admin.start_engine_task( + 'dolla_vlm', + # linked.symbol.front_feed(), # data-feed symbol key + { # fsp engine conf + 'func_name': 'dolla_vlm', + 'zero_on_step': True, + 'params': { + 'price_func': { + 'default_value': 'chl3', + }, + }, + }, + # loglevel, + ) + # profiler(f'created shm for fsp actor: {display_name}') + + await started.wait() + + pi = chart.overlay_plotitem( + 'dolla_vlm', + ) + # add custom auto range handler + pi.vb._maxmin = partial(maxmin, name='dolla_vlm') + + curve, _ = chart.draw_curve( + + name='dolla_vlm', + data=shm.array, + + array_key='dolla_vlm', + overlay=pi, + color='charcoal', + step_mode=True, + # **conf.get('chart_kwargs', {}) + ) + # TODO: is there a way to "sync" the dual axes such that only + # one curve is needed? + # curve.hide() + + # TODO: we need a better API to do this.. + # specially store ref to shm for lookup in display loop + # since only a placeholder of `None` is entered in + # ``.draw_curve()``. + chart._overlays['dolla_vlm'] = shm + + # XXX: old dict-style config before it was moved into the helper task + # 'dolla_vlm': { + # 'func_name': 'dolla_vlm', + # 'zero_on_step': True, + # 'overlay': 'volume', + # 'separate_axes': True, + # 'params': { + # 'price_func': { + # 'default_value': 'chl3', + # # tell target ``Edit`` widget to not allow + # # edits for now. + # 'widget_kwargs': {'readonly': True}, + # }, + # }, + # 'chart_kwargs': {'step_mode': True} + # }, + + # } + + # built-in vlm fsps + for display_name, conf in { + 'vwap': { + 'func_name': 'vwap', + 'overlay': 'ohlc', # overlays with OHLCV (main) chart + 'anchor': 'session', + }, + }.items(): + started = await admin.open_fsp_chart( + display_name, + conf, + ) + + async def start_fsp_displays( - linkedsplits: LinkedSplits, - brokermod: ModuleType, - sym: str, + linked: LinkedSplits, ohlcv: ShmArray, group_status_key: str, loglevel: str, - task_status: TaskStatus[ - tuple[FspAdmin, 'ChartPlotWidet'] - ] = trio.TASK_STATUS_IGNORED, - ) -> None: ''' - Create sub-actors (under flat tree) - for each entry in config and attach to local graphics update tasks. + Create fsp charts from a config input attached to a local actor + compute cluster. - Pass target entrypoint and historical data. + Pass target entrypoint and historical data via ``ShmArray``. ''' + linked.focus() + # TODO: eventually we'll support some kind of n-compose syntax fsp_conf = { - # 'rsi': { # 'func_name': 'rsi', # literal python func ref lookup name @@ -659,99 +781,32 @@ async def start_fsp_displays( # }, # }, } - - if has_vlm(ohlcv): - fsp_conf.update({ - 'vwap': { - 'func_name': 'vwap', - 'overlay': 'ohlc', - 'anchor': 'session', - }, - - 'dolla_vlm': { - 'func_name': 'dolla_vlm', - 'zero_on_step': True, - 'overlay': 'volume', - 'separate_axes': True, - 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. - 'widget_kwargs': {'readonly': True}, - }, - }, - 'chart_kwargs': {'step_mode': True} - }, - - }) - - linkedsplits.focus() - profiler = pg.debug.Profiler( delayed=False, disabled=False ) - async with gather_contexts(( - - # NOTE: this admin internally opens an actor pool. - open_fsp_admin(), - - trio.open_nursery(), - - maybe_open_vlm_display( - linkedsplits, - ohlcv, - ), - - )) as (admin, n, vlm_chart): - - task_status.started((admin, vlm_chart)) + # async with gather_contexts(( + async with ( + # NOTE: this admin internally opens an actor cluster + open_fsp_admin(linked, ohlcv) as admin, + ): + statuses = [] for display_name, conf in fsp_conf.items(): - func_name = conf['func_name'] - - done = linkedsplits.window().status_bar.open_status( + started = await admin.open_fsp_chart( + display_name, + conf, + ) + done = linked.window().status_bar.open_status( f'loading fsp, {display_name}..', group_key=group_status_key, ) + statuses.append((started, done)) - shm, started = await admin.start_fsp( - display_name, - (brokermod.name, sym), - ohlcv, - fsp_conf[display_name], - loglevel, - ) - - profiler(f'created shm for fsp actor: {display_name}') - - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" - + for fsp_loaded, status_cb in statuses: + await fsp_loaded.wait() profiler(f'attached to fsp portal: {display_name}') - - # init async - n.start_soon( - partial( - run_fsp_ui, - - shm, - started, - linkedsplits, - func_name, - display_name, - - conf=conf, - group_status_key=group_status_key, - loglevel=loglevel, - profiler=profiler, - ) - ) - await started.wait() - done() + status_cb() # blocks on nursery until all fsp actors complete From c3c1e14cf4ff39e7167cab9b729176ea03666cb9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Jan 2022 13:38:36 -0500 Subject: [PATCH 8/9] Start vlm and other fsps as separate tasks --- piker/ui/_display.py | 46 +++++++++++++++++++++++++++++++++----------- piker/ui/_window.py | 10 ++++++---- 2 files changed, 41 insertions(+), 15 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 3ffcb4b0..1e1855bb 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -40,6 +40,8 @@ from ._l1 import L1Labels from ._fsp import ( update_fsp_chart, start_fsp_displays, + has_vlm, + open_vlm_displays, ) from ..data._sharedmem import ShmArray, try_read from ._forms import ( @@ -81,7 +83,7 @@ def chart_maxmin( # https://arxiv.org/abs/cs/0610046 # https://github.com/lemire/pythonmaxmin - array = chart._arrays['ohlc'] + array = chart._arrays[chart.name] ifirst = array[0]['index'] last_bars_range = chart.bars_range() @@ -139,6 +141,7 @@ async def update_linked_charts_graphics( if vlm_chart: vlm_sticky = vlm_chart._ysticks['volume'] + vlm_view = vlm_chart.view maxmin = partial(chart_maxmin, chart, vlm_chart) @@ -175,6 +178,7 @@ async def update_linked_charts_graphics( tick_margin = 3 * tick_size chart.show() + view = chart.view last_quote = time.time() async for quotes in stream: @@ -186,7 +190,10 @@ async def update_linked_charts_graphics( if ( quote_period <= 1/_quote_throttle_rate - and quote_rate > _quote_throttle_rate * 1.5 + + # in the absolute worst case we shouldn't see more then + # twice the expected throttle rate right!? + and quote_rate >= _quote_throttle_rate * 1.5 ): log.warning(f'High quote rate {symbol.key}: {quote_rate}') last_quote = now @@ -223,7 +230,9 @@ async def update_linked_charts_graphics( mx_vlm_in_view > last_mx_vlm ): # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') - vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375)) + vlm_view._set_yrange( + yrange=(0, mx_vlm_in_view * 1.375) + ) last_mx_vlm = mx_vlm_in_view for curve_name, shm in vlm_chart._overlays.items(): @@ -347,9 +356,12 @@ async def update_linked_charts_graphics( l1.bid_label.update_fields({'level': price, 'size': size}) # check for y-range re-size - if (mx > last_mx) or (mn < last_mn): + if ( + (mx > last_mx) or (mn < last_mn) + and not chart._static_yrange == 'axis' + ): # print(f'new y range: {(mn, mx)}') - chart._set_yrange( + view._set_yrange( yrange=(mn, mx), # TODO: we should probably scale # the view margin based on the size @@ -371,6 +383,7 @@ async def update_linked_charts_graphics( name, array_key=name, ) + subchart.cv._set_yrange() # TODO: all overlays on all subplots.. @@ -382,6 +395,7 @@ async def update_linked_charts_graphics( curve_name, array_key=curve_name, ) + # chart._set_yrange() async def check_for_new_bars( @@ -542,7 +556,7 @@ async def display_symbol_data( ) # size view to data once at outset - chart._set_yrange() + chart.cv._set_yrange() # TODO: a data view api that makes this less shit chart._shm = ohlcv @@ -557,13 +571,19 @@ async def display_symbol_data( vlm_chart: Optional[ChartPlotWidget] = None async with trio.open_nursery() as ln: - # load initial fsp chain (otherwise known as "indicators") - admin, vlm_chart = await ln.start( - start_fsp_displays, + # if available load volume related built-in display(s) + if has_vlm(ohlcv): + vlm_chart = await ln.start( + open_vlm_displays, + linkedsplits, + ohlcv, + ) + # load (user's) FSP set (otherwise known as "indicators") + # from an input config. + ln.start_soon( + start_fsp_displays, linkedsplits, - brokermod, - sym, ohlcv, loading_sym_key, loglevel, @@ -601,5 +621,9 @@ async def display_symbol_data( await trio.sleep(0) linkedsplits.resize_sidepanes() + # TODO: make this not so shit XD + # close group status + sbar._status_groups[loading_sym_key][1]() + # let the app run. await trio.sleep_forever() diff --git a/piker/ui/_window.py b/piker/ui/_window.py index 17c97411..6a0c1d8b 100644 --- a/piker/ui/_window.py +++ b/piker/ui/_window.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) +# Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -25,7 +25,7 @@ from typing import Callable, Optional, Union import uuid from pyqtgraph import QtGui -from PyQt5 import QtCore, QtWidgets +from PyQt5 import QtCore from PyQt5.QtWidgets import QLabel, QStatusBar from ..log import get_logger @@ -55,7 +55,8 @@ class MultiStatus: group_key: Optional[Union[bool, str]] = False, ) -> Union[Callable[..., None], str]: - '''Add a status to the status bar and return a close callback which + ''' + Add a status to the status bar and return a close callback which when called will remove the status ``msg``. ''' @@ -137,7 +138,8 @@ class MultiStatus: return ret def render(self) -> None: - '''Display all open statuses to bar. + ''' + Display all open statuses to bar. ''' if self.statuses: From 9813cf41694f234dde442deb1fe058bf90cabfd0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Jan 2022 16:20:26 -0500 Subject: [PATCH 9/9] Add a symbol "front feed" helper --- piker/data/_source.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/piker/data/_source.py b/piker/data/_source.py index 8ec92dfd..9b9b323d 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -106,6 +106,18 @@ class Symbol(BaseModel): mult = 1 / self.tick_size return round(value * mult) / mult + def front_feed(self) -> tuple[str, str]: + ''' + Return the "current" feed key for this symbol. + + (i.e. the broker + symbol key in a tuple). + + ''' + return ( + list(self.broker_info.keys())[0], + self.key, + ) + @validate_arguments def mk_symbol(