From 6751840568fbf4d1429ee3269da77e831077d835 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 24 Sep 2021 18:17:15 -0400 Subject: [PATCH] More prep for FSP feeds The major change is moving the fsp "daemon" (more like wanna-be fspd) endpoint to use the newer `tractor.Portal.open_context()` and bi-directional streaming api. There's a few other things in here too: - make a helper for allocating single colume fsp shm arrays - rename some some fsp related functions to be more explicit on their purposes --- piker/ui/_display.py | 165 ++++++++++++++++++++++++++++++------------- 1 file changed, 114 insertions(+), 51 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index fe3698a3..f4bfe72a 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -53,6 +53,7 @@ from ..log import get_logger log = get_logger(__name__) +# TODO: load these from a config.toml! _clear_throttle_rate: int = 58 # Hz _book_throttle_rate: int = 16 # Hz @@ -184,6 +185,13 @@ async def update_chart_from_quotes( if vlm_chart: # print(f"volume: {end['volume']}") vlm_chart.update_curve_from_array('volume', array) + + # built-in tina $vlm FSP using chl3 typical price for ohlc step + # last = array[-1] + # chl3 = (last['close'] + last['high'] + last['low']) / 3 + # v = last['volume'] + # dv = last['volume'] * chl3 + vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) if ( @@ -302,7 +310,38 @@ async def update_chart_from_quotes( last_mx, last_mn = mx, mn -async def fan_out_spawn_fsp_daemons( +def maybe_mk_fsp_shm( + sym: str, + field_name: str, + display_name: Optional[str] = None, + readonly: bool = True, + +) -> (ShmArray, bool): + '''Allocate a single row shm array for an symbol-fsp pair. + + ''' + uid = tractor.current_actor().uid + if not display_name: + display_name = field_name + + # TODO: load function here and introspect + # return stream type(s) + + # TODO: should `index` be a required internal field? + fsp_dtype = np.dtype([('index', int), (field_name, float)]) + + key = f'{sym}.fsp.{display_name}.{".".join(uid)}' + + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) + return shm, opened + + +async def open_fspd_cluster( linkedsplits: LinkedSplits, fsps: dict[str, str], @@ -321,9 +360,8 @@ async def fan_out_spawn_fsp_daemons( ''' linkedsplits.focus() - uid = tractor.current_actor().uid - - # spawns sub-processes which execute cpu bound FSP code + # spawns sub-processes which execute cpu bound fsp work + # which is streamed back to this parent. async with ( tractor.open_nursery() as n, trio.open_nursery() as ln, @@ -334,21 +372,12 @@ async def fan_out_spawn_fsp_daemons( # scale horizonatlly once cores are used up. for display_name, conf in fsps.items(): - fsp_func_name = conf['fsp_func_name'] + func_name = conf['func_name'] - # TODO: load function here and introspect - # return stream type(s) - - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) - - key = f'{sym}.fsp.{display_name}.{".".join(uid)}' - - # this is all sync currently - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, + shm, opened = maybe_mk_fsp_shm( + sym, + field_name=func_name, + display_name=display_name, readonly=True, ) @@ -366,13 +395,13 @@ async def fan_out_spawn_fsp_daemons( # init async ln.start_soon( - run_fsp, + update_chart_from_fsp, portal, linkedsplits, brokermod, sym, src_shm, - fsp_func_name, + func_name, display_name, conf, group_status_key, @@ -383,7 +412,7 @@ async def fan_out_spawn_fsp_daemons( @asynccontextmanager -async def open_sidepane( +async def open_fsp_sidepane( linked: LinkedSplits, conf: dict[str, dict[str, str]], @@ -403,8 +432,11 @@ async def open_sidepane( } # add parameters for selection "options" - defaults = config.get('params', {}) - for name, default in defaults.items(): + params = config.get('params', {}) + for name, config in params.items(): + + default = config['default_value'] + kwargs = config.get('widget_kwargs', {}) # add to ORM schema schema.update({ @@ -412,6 +444,7 @@ async def open_sidepane( 'label': f'**{name}**:', 'type': 'edit', 'default_value': default, + 'kwargs': kwargs, }, }) @@ -424,7 +457,7 @@ async def open_sidepane( FspConfig = create_model( 'FspConfig', name=display_name, - **defaults, + **params, ) sidepane.model = FspConfig() @@ -444,14 +477,15 @@ async def open_sidepane( yield sidepane -async def run_fsp( +async def update_chart_from_fsp( + + portal: tractor.Portal, - portal: tractor._portal.Portal, linkedsplits: LinkedSplits, brokermod: ModuleType, sym: str, src_shm: ShmArray, - fsp_func_name: str, + func_name: str, display_name: str, conf: dict[str, dict], group_status_key: str, @@ -470,7 +504,7 @@ async def run_fsp( ) async with ( - portal.open_stream_from( + portal.open_context( # chaining entrypoint fsp.cascade, @@ -480,21 +514,17 @@ async def run_fsp( src_shm_token=src_shm.token, dst_shm_token=conf['shm'].token, symbol=sym, - fsp_func_name=fsp_func_name, + func_name=func_name, loglevel=loglevel, - ) as stream, + ) as (ctx, last_index), + ctx.open_stream() as stream, - open_sidepane( + open_fsp_sidepane( linkedsplits, {display_name: conf}, ) as sidepane, ): - - # receive last index for processed historical - # data-array as first msg - _ = await stream.receive() - shm = conf['shm'] if conf.get('overlay'): @@ -513,7 +543,7 @@ async def run_fsp( name=display_name, array=shm.array, - array_key=conf['fsp_func_name'], + array_key=conf['func_name'], sidepane=sidepane, # curve by default @@ -541,7 +571,7 @@ async def run_fsp( # XXX: fsp func names must be unique meaning we don't have # duplicates of the underlying data even if multiple # sub-charts reference it under different 'named charts'. - value = array[fsp_func_name][-1] + value = array[func_name][-1] last_val_sticky.update_from_data(-1, value) @@ -552,7 +582,7 @@ async def run_fsp( chart.update_curve_from_array( display_name, shm.array, - array_key=fsp_func_name + array_key=func_name ) chart.linked.resize_sidepanes() @@ -563,11 +593,11 @@ async def run_fsp( # generic fills between curve types while ``PlotCurveItem`` has # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) + # graphics = chart.update_from_array(chart.name, array[func_name]) # graphics.curve.setBrush(50, 50, 200, 100) # graphics.curve.setFillLevel(50) - if fsp_func_name == 'rsi': + if func_name == 'rsi': from ._lines import level_line # add moveable over-[sold/bought] lines # and labels only for the 70/30 lines @@ -611,7 +641,7 @@ async def run_fsp( try: # read last array = shm.array - value = array[-1][fsp_func_name] + value = array[-1][func_name] break except IndexError: @@ -625,7 +655,7 @@ async def run_fsp( chart.update_curve_from_array( display_name, array, - array_key=fsp_func_name, + array_key=func_name, ) # set time of last graphics update @@ -711,12 +741,26 @@ async def maybe_open_vlm_display( yield return else: - async with open_sidepane( + + shm, opened = maybe_mk_fsp_shm( + linked.symbol.key, + '$_vlm', + readonly=True, + ) + + async with open_fsp_sidepane( linked, { 'volume': { + 'params': { - 'price_func': 'ohl3' - } + + 'price_func': { + 'default_value': 'ohl3', + # tell target ``Edit`` widget to not allow + # edits for now. + 'widget_kwargs': {'readonly': True}, + }, + }, } }, ) as sidepane: @@ -872,20 +916,39 @@ async def display_symbol_data( # TODO: eventually we'll support some kind of n-compose syntax fsp_conf = { + 'rsi': { - 'fsp_func_name': 'rsi', - 'params': {'period': 14}, + + # literal python func ref lookup name + 'func_name': 'rsi', + + # map of parameters to place on the fsp sidepane widget + # which should map to dynamic inputs available to the + # fsp function at runtime. + 'params': { + 'period': { + 'default_value': 14, + 'widget_kwargs': {'readonly': True}, + }, + }, + + # ``ChartPlotWidget`` options passthrough 'chart_kwargs': { 'static_yrange': (0, 100), }, }, + } - if has_vlm(ohlcv): + if has_vlm(ohlcv): # and provider != 'binance': + # binance is too fast atm for FSPs until we wrap + # the fsp streams as throttled ``Feeds``, see + # + # add VWAP to fsp config for downstream loading fsp_conf.update({ 'vwap': { - 'fsp_func_name': 'vwap', + 'func_name': 'vwap', 'overlay': True, 'anchor': 'session', }, @@ -904,7 +967,7 @@ async def display_symbol_data( ): # load initial fsp chain (otherwise known as "indicators") ln.start_soon( - fan_out_spawn_fsp_daemons, + open_fspd_cluster, linkedsplits, fsp_conf, sym,