More prep for FSP feeds
The major change is moving the fsp "daemon" (more like wanna-be fspd) endpoint to use the newer `tractor.Portal.open_context()` and bi-directional streaming api. There's a few other things in here too: - make a helper for allocating single colume fsp shm arrays - rename some some fsp related functions to be more explicit on their purposessingle_display_update_loop
							parent
							
								
									30a5f32ef8
								
							
						
					
					
						commit
						4ea42a0a7e
					
				|  | @ -53,6 +53,7 @@ from ..log import get_logger | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
|  | # TODO: load these from a config.toml! | ||||||
| _clear_throttle_rate: int = 58  # Hz | _clear_throttle_rate: int = 58  # Hz | ||||||
| _book_throttle_rate: int = 16  # Hz | _book_throttle_rate: int = 16  # Hz | ||||||
| 
 | 
 | ||||||
|  | @ -184,6 +185,13 @@ async def update_chart_from_quotes( | ||||||
|             if vlm_chart: |             if vlm_chart: | ||||||
|                 # print(f"volume: {end['volume']}") |                 # print(f"volume: {end['volume']}") | ||||||
|                 vlm_chart.update_curve_from_array('volume', array) |                 vlm_chart.update_curve_from_array('volume', array) | ||||||
|  | 
 | ||||||
|  |                 # built-in tina $vlm FSP using chl3 typical price for ohlc step | ||||||
|  |                 # last = array[-1] | ||||||
|  |                 # chl3 = (last['close'] + last['high'] + last['low']) / 3 | ||||||
|  |                 # v = last['volume'] | ||||||
|  |                 # dv = last['volume'] * chl3 | ||||||
|  | 
 | ||||||
|                 vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) |                 vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) | ||||||
| 
 | 
 | ||||||
|                 if ( |                 if ( | ||||||
|  | @ -302,7 +310,38 @@ async def update_chart_from_quotes( | ||||||
|                 last_mx, last_mn = mx, mn |                 last_mx, last_mn = mx, mn | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def fan_out_spawn_fsp_daemons( | def maybe_mk_fsp_shm( | ||||||
|  |     sym: str, | ||||||
|  |     field_name: str, | ||||||
|  |     display_name: Optional[str] = None, | ||||||
|  |     readonly: bool = True, | ||||||
|  | 
 | ||||||
|  | ) -> (ShmArray, bool): | ||||||
|  |     '''Allocate a single row shm array for an symbol-fsp pair. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     uid = tractor.current_actor().uid | ||||||
|  |     if not display_name: | ||||||
|  |         display_name = field_name | ||||||
|  | 
 | ||||||
|  |     # TODO: load function here and introspect | ||||||
|  |     # return stream type(s) | ||||||
|  | 
 | ||||||
|  |     # TODO: should `index` be a required internal field? | ||||||
|  |     fsp_dtype = np.dtype([('index', int), (field_name, float)]) | ||||||
|  | 
 | ||||||
|  |     key = f'{sym}.fsp.{display_name}.{".".join(uid)}' | ||||||
|  | 
 | ||||||
|  |     shm, opened = maybe_open_shm_array( | ||||||
|  |         key, | ||||||
|  |         # TODO: create entry for each time frame | ||||||
|  |         dtype=fsp_dtype, | ||||||
|  |         readonly=True, | ||||||
|  |     ) | ||||||
|  |     return shm, opened | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def open_fspd_cluster( | ||||||
| 
 | 
 | ||||||
|     linkedsplits: LinkedSplits, |     linkedsplits: LinkedSplits, | ||||||
|     fsps: dict[str, str], |     fsps: dict[str, str], | ||||||
|  | @ -321,9 +360,8 @@ async def fan_out_spawn_fsp_daemons( | ||||||
|     ''' |     ''' | ||||||
|     linkedsplits.focus() |     linkedsplits.focus() | ||||||
| 
 | 
 | ||||||
|     uid = tractor.current_actor().uid |     # spawns sub-processes which execute cpu bound fsp work | ||||||
| 
 |     # which is streamed back to this parent. | ||||||
|     # spawns sub-processes which execute cpu bound FSP code |  | ||||||
|     async with ( |     async with ( | ||||||
|         tractor.open_nursery() as n, |         tractor.open_nursery() as n, | ||||||
|         trio.open_nursery() as ln, |         trio.open_nursery() as ln, | ||||||
|  | @ -334,21 +372,12 @@ async def fan_out_spawn_fsp_daemons( | ||||||
|         # scale horizonatlly once cores are used up. |         # scale horizonatlly once cores are used up. | ||||||
|         for display_name, conf in fsps.items(): |         for display_name, conf in fsps.items(): | ||||||
| 
 | 
 | ||||||
|             fsp_func_name = conf['fsp_func_name'] |             func_name = conf['func_name'] | ||||||
| 
 | 
 | ||||||
|             # TODO: load function here and introspect |             shm, opened = maybe_mk_fsp_shm( | ||||||
|             # return stream type(s) |                 sym, | ||||||
| 
 |                 field_name=func_name, | ||||||
|             # TODO: should `index` be a required internal field? |                 display_name=display_name, | ||||||
|             fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) |  | ||||||
| 
 |  | ||||||
|             key = f'{sym}.fsp.{display_name}.{".".join(uid)}' |  | ||||||
| 
 |  | ||||||
|             # this is all sync currently |  | ||||||
|             shm, opened = maybe_open_shm_array( |  | ||||||
|                 key, |  | ||||||
|                 # TODO: create entry for each time frame |  | ||||||
|                 dtype=fsp_dtype, |  | ||||||
|                 readonly=True, |                 readonly=True, | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|  | @ -366,13 +395,13 @@ async def fan_out_spawn_fsp_daemons( | ||||||
| 
 | 
 | ||||||
|             # init async |             # init async | ||||||
|             ln.start_soon( |             ln.start_soon( | ||||||
|                 run_fsp, |                 update_chart_from_fsp, | ||||||
|                 portal, |                 portal, | ||||||
|                 linkedsplits, |                 linkedsplits, | ||||||
|                 brokermod, |                 brokermod, | ||||||
|                 sym, |                 sym, | ||||||
|                 src_shm, |                 src_shm, | ||||||
|                 fsp_func_name, |                 func_name, | ||||||
|                 display_name, |                 display_name, | ||||||
|                 conf, |                 conf, | ||||||
|                 group_status_key, |                 group_status_key, | ||||||
|  | @ -383,7 +412,7 @@ async def fan_out_spawn_fsp_daemons( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @asynccontextmanager | @asynccontextmanager | ||||||
| async def open_sidepane( | async def open_fsp_sidepane( | ||||||
| 
 | 
 | ||||||
|     linked: LinkedSplits, |     linked: LinkedSplits, | ||||||
|     conf: dict[str, dict[str, str]], |     conf: dict[str, dict[str, str]], | ||||||
|  | @ -403,8 +432,11 @@ async def open_sidepane( | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|         # add parameters for selection "options" |         # add parameters for selection "options" | ||||||
|         defaults = config.get('params', {}) |         params = config.get('params', {}) | ||||||
|         for name, default in defaults.items(): |         for name, config in params.items(): | ||||||
|  | 
 | ||||||
|  |             default = config['default_value'] | ||||||
|  |             kwargs = config.get('widget_kwargs', {}) | ||||||
| 
 | 
 | ||||||
|             # add to ORM schema |             # add to ORM schema | ||||||
|             schema.update({ |             schema.update({ | ||||||
|  | @ -412,6 +444,7 @@ async def open_sidepane( | ||||||
|                     'label': f'**{name}**:', |                     'label': f'**{name}**:', | ||||||
|                     'type': 'edit', |                     'type': 'edit', | ||||||
|                     'default_value': default, |                     'default_value': default, | ||||||
|  |                     'kwargs': kwargs, | ||||||
|                 }, |                 }, | ||||||
|             }) |             }) | ||||||
| 
 | 
 | ||||||
|  | @ -424,7 +457,7 @@ async def open_sidepane( | ||||||
|     FspConfig = create_model( |     FspConfig = create_model( | ||||||
|         'FspConfig', |         'FspConfig', | ||||||
|         name=display_name, |         name=display_name, | ||||||
|         **defaults, |         **params, | ||||||
|     ) |     ) | ||||||
|     sidepane.model = FspConfig() |     sidepane.model = FspConfig() | ||||||
| 
 | 
 | ||||||
|  | @ -444,14 +477,15 @@ async def open_sidepane( | ||||||
|         yield sidepane |         yield sidepane | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def run_fsp( | async def update_chart_from_fsp( | ||||||
|  | 
 | ||||||
|  |     portal: tractor.Portal, | ||||||
| 
 | 
 | ||||||
|     portal: tractor._portal.Portal, |  | ||||||
|     linkedsplits: LinkedSplits, |     linkedsplits: LinkedSplits, | ||||||
|     brokermod: ModuleType, |     brokermod: ModuleType, | ||||||
|     sym: str, |     sym: str, | ||||||
|     src_shm: ShmArray, |     src_shm: ShmArray, | ||||||
|     fsp_func_name: str, |     func_name: str, | ||||||
|     display_name: str, |     display_name: str, | ||||||
|     conf: dict[str, dict], |     conf: dict[str, dict], | ||||||
|     group_status_key: str, |     group_status_key: str, | ||||||
|  | @ -480,17 +514,17 @@ async def run_fsp( | ||||||
|             src_shm_token=src_shm.token, |             src_shm_token=src_shm.token, | ||||||
|             dst_shm_token=conf['shm'].token, |             dst_shm_token=conf['shm'].token, | ||||||
|             symbol=sym, |             symbol=sym, | ||||||
|             func_name=fsp_func_name, |             func_name=func_name, | ||||||
|             loglevel=loglevel, |             loglevel=loglevel, | ||||||
| 
 | 
 | ||||||
|         ) as (ctx, last_index), |         ) as (ctx, last_index), | ||||||
|         ctx.open_stream() as stream, |         ctx.open_stream() as stream, | ||||||
|         open_sidepane( | 
 | ||||||
|  |         open_fsp_sidepane( | ||||||
|             linkedsplits, |             linkedsplits, | ||||||
|             {display_name: conf}, |             {display_name: conf}, | ||||||
|         ) as sidepane, |         ) as sidepane, | ||||||
|     ): |     ): | ||||||
| 
 |  | ||||||
|         shm = conf['shm'] |         shm = conf['shm'] | ||||||
| 
 | 
 | ||||||
|         if conf.get('overlay'): |         if conf.get('overlay'): | ||||||
|  | @ -509,7 +543,7 @@ async def run_fsp( | ||||||
|                 name=display_name, |                 name=display_name, | ||||||
|                 array=shm.array, |                 array=shm.array, | ||||||
| 
 | 
 | ||||||
|                 array_key=conf['fsp_func_name'], |                 array_key=conf['func_name'], | ||||||
|                 sidepane=sidepane, |                 sidepane=sidepane, | ||||||
| 
 | 
 | ||||||
|                 # curve by default |                 # curve by default | ||||||
|  | @ -537,7 +571,7 @@ async def run_fsp( | ||||||
|             # XXX: fsp func names must be unique meaning we don't have |             # XXX: fsp func names must be unique meaning we don't have | ||||||
|             # duplicates of the underlying data even if multiple |             # duplicates of the underlying data even if multiple | ||||||
|             # sub-charts reference it under different 'named charts'. |             # sub-charts reference it under different 'named charts'. | ||||||
|             value = array[fsp_func_name][-1] |             value = array[func_name][-1] | ||||||
| 
 | 
 | ||||||
|             last_val_sticky.update_from_data(-1, value) |             last_val_sticky.update_from_data(-1, value) | ||||||
| 
 | 
 | ||||||
|  | @ -548,7 +582,7 @@ async def run_fsp( | ||||||
|         chart.update_curve_from_array( |         chart.update_curve_from_array( | ||||||
|             display_name, |             display_name, | ||||||
|             shm.array, |             shm.array, | ||||||
|             array_key=fsp_func_name |             array_key=func_name | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         chart.linked.resize_sidepanes() |         chart.linked.resize_sidepanes() | ||||||
|  | @ -559,11 +593,11 @@ async def run_fsp( | ||||||
|         # generic fills between curve types while ``PlotCurveItem`` has |         # generic fills between curve types while ``PlotCurveItem`` has | ||||||
|         # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which |         # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which | ||||||
|         # might be the best solution? |         # might be the best solution? | ||||||
|         # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) |         # graphics = chart.update_from_array(chart.name, array[func_name]) | ||||||
|         # graphics.curve.setBrush(50, 50, 200, 100) |         # graphics.curve.setBrush(50, 50, 200, 100) | ||||||
|         # graphics.curve.setFillLevel(50) |         # graphics.curve.setFillLevel(50) | ||||||
| 
 | 
 | ||||||
|         if fsp_func_name == 'rsi': |         if func_name == 'rsi': | ||||||
|             from ._lines import level_line |             from ._lines import level_line | ||||||
|             # add moveable over-[sold/bought] lines |             # add moveable over-[sold/bought] lines | ||||||
|             # and labels only for the 70/30 lines |             # and labels only for the 70/30 lines | ||||||
|  | @ -607,7 +641,7 @@ async def run_fsp( | ||||||
|                 try: |                 try: | ||||||
|                     # read last |                     # read last | ||||||
|                     array = shm.array |                     array = shm.array | ||||||
|                     value = array[-1][fsp_func_name] |                     value = array[-1][func_name] | ||||||
|                     break |                     break | ||||||
| 
 | 
 | ||||||
|                 except IndexError: |                 except IndexError: | ||||||
|  | @ -621,7 +655,7 @@ async def run_fsp( | ||||||
|             chart.update_curve_from_array( |             chart.update_curve_from_array( | ||||||
|                 display_name, |                 display_name, | ||||||
|                 array, |                 array, | ||||||
|                 array_key=fsp_func_name, |                 array_key=func_name, | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             # set time of last graphics update |             # set time of last graphics update | ||||||
|  | @ -707,12 +741,26 @@ async def maybe_open_vlm_display( | ||||||
|         yield |         yield | ||||||
|         return |         return | ||||||
|     else: |     else: | ||||||
|         async with open_sidepane( | 
 | ||||||
|  |         shm, opened = maybe_mk_fsp_shm( | ||||||
|  |             linked.symbol.key, | ||||||
|  |             '$_vlm', | ||||||
|  |             readonly=True, | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         async with open_fsp_sidepane( | ||||||
|             linked, { |             linked, { | ||||||
|                 'volume': { |                 'volume': { | ||||||
|  | 
 | ||||||
|                     'params': { |                     'params': { | ||||||
|                         'price_func': 'ohl3' | 
 | ||||||
|                     } |                         'price_func': { | ||||||
|  |                             'default_value': 'ohl3', | ||||||
|  |                             # tell target ``Edit`` widget to not allow | ||||||
|  |                             # edits for now. | ||||||
|  |                             'widget_kwargs': {'readonly': True}, | ||||||
|  |                         }, | ||||||
|  |                     }, | ||||||
|                 } |                 } | ||||||
|             }, |             }, | ||||||
|         ) as sidepane: |         ) as sidepane: | ||||||
|  | @ -868,20 +916,39 @@ async def display_symbol_data( | ||||||
| 
 | 
 | ||||||
|         # TODO: eventually we'll support some kind of n-compose syntax |         # TODO: eventually we'll support some kind of n-compose syntax | ||||||
|         fsp_conf = { |         fsp_conf = { | ||||||
|  | 
 | ||||||
|             'rsi': { |             'rsi': { | ||||||
|                 'fsp_func_name': 'rsi', | 
 | ||||||
|                 'params': {'period': 14}, |                 # literal python func ref lookup name | ||||||
|  |                 'func_name': 'rsi', | ||||||
|  | 
 | ||||||
|  |                 # map of parameters to place on the fsp sidepane widget | ||||||
|  |                 # which should map to dynamic inputs available to the | ||||||
|  |                 # fsp function at runtime. | ||||||
|  |                 'params': { | ||||||
|  |                     'period': { | ||||||
|  |                         'default_value': 14, | ||||||
|  |                         'widget_kwargs': {'readonly': True}, | ||||||
|  |                     }, | ||||||
|  |                 }, | ||||||
|  | 
 | ||||||
|  |                 # ``ChartPlotWidget`` options passthrough | ||||||
|                 'chart_kwargs': { |                 'chart_kwargs': { | ||||||
|                     'static_yrange': (0, 100), |                     'static_yrange': (0, 100), | ||||||
|                 }, |                 }, | ||||||
|             }, |             }, | ||||||
|  | 
 | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         if has_vlm(ohlcv): |         if has_vlm(ohlcv):  # and provider != 'binance': | ||||||
|  |             # binance is too fast atm for FSPs until we wrap | ||||||
|  |             # the fsp streams as throttled ``Feeds``, see | ||||||
|  |             # | ||||||
|  | 
 | ||||||
|             # add VWAP to fsp config for downstream loading |             # add VWAP to fsp config for downstream loading | ||||||
|             fsp_conf.update({ |             fsp_conf.update({ | ||||||
|                 'vwap': { |                 'vwap': { | ||||||
|                     'fsp_func_name': 'vwap', |                     'func_name': 'vwap', | ||||||
|                     'overlay': True, |                     'overlay': True, | ||||||
|                     'anchor': 'session', |                     'anchor': 'session', | ||||||
|                 }, |                 }, | ||||||
|  | @ -900,7 +967,7 @@ async def display_symbol_data( | ||||||
|         ): |         ): | ||||||
|             # load initial fsp chain (otherwise known as "indicators") |             # load initial fsp chain (otherwise known as "indicators") | ||||||
|             ln.start_soon( |             ln.start_soon( | ||||||
|                 fan_out_spawn_fsp_daemons, |                 open_fspd_cluster, | ||||||
|                 linkedsplits, |                 linkedsplits, | ||||||
|                 fsp_conf, |                 fsp_conf, | ||||||
|                 sym, |                 sym, | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue