More prep for FSP feeds
The major change is moving the fsp "daemon" (more like wanna-be fspd) endpoint to use the newer `tractor.Portal.open_context()` and bi-directional streaming api. There's a few other things in here too: - make a helper for allocating single colume fsp shm arrays - rename some some fsp related functions to be more explicit on their purposesvlm_plotz_backup
							parent
							
								
									429b6f6891
								
							
						
					
					
						commit
						6fffa071d2
					
				| 
						 | 
				
			
			@ -53,6 +53,7 @@ from ..log import get_logger
 | 
			
		|||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
 | 
			
		||||
# TODO: load these from a config.toml!
 | 
			
		||||
_clear_throttle_rate: int = 58  # Hz
 | 
			
		||||
_book_throttle_rate: int = 16  # Hz
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -184,6 +185,13 @@ async def update_chart_from_quotes(
 | 
			
		|||
            if vlm_chart:
 | 
			
		||||
                # print(f"volume: {end['volume']}")
 | 
			
		||||
                vlm_chart.update_curve_from_array('volume', array)
 | 
			
		||||
 | 
			
		||||
                # built-in tina $vlm FSP using chl3 typical price for ohlc step
 | 
			
		||||
                # last = array[-1]
 | 
			
		||||
                # chl3 = (last['close'] + last['high'] + last['low']) / 3
 | 
			
		||||
                # v = last['volume']
 | 
			
		||||
                # dv = last['volume'] * chl3
 | 
			
		||||
 | 
			
		||||
                vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
 | 
			
		||||
 | 
			
		||||
                if (
 | 
			
		||||
| 
						 | 
				
			
			@ -302,7 +310,38 @@ async def update_chart_from_quotes(
 | 
			
		|||
                last_mx, last_mn = mx, mn
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def fan_out_spawn_fsp_daemons(
 | 
			
		||||
def maybe_mk_fsp_shm(
 | 
			
		||||
    sym: str,
 | 
			
		||||
    field_name: str,
 | 
			
		||||
    display_name: Optional[str] = None,
 | 
			
		||||
    readonly: bool = True,
 | 
			
		||||
 | 
			
		||||
) -> (ShmArray, bool):
 | 
			
		||||
    '''Allocate a single row shm array for an symbol-fsp pair.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    uid = tractor.current_actor().uid
 | 
			
		||||
    if not display_name:
 | 
			
		||||
        display_name = field_name
 | 
			
		||||
 | 
			
		||||
    # TODO: load function here and introspect
 | 
			
		||||
    # return stream type(s)
 | 
			
		||||
 | 
			
		||||
    # TODO: should `index` be a required internal field?
 | 
			
		||||
    fsp_dtype = np.dtype([('index', int), (field_name, float)])
 | 
			
		||||
 | 
			
		||||
    key = f'{sym}.fsp.{display_name}.{".".join(uid)}'
 | 
			
		||||
 | 
			
		||||
    shm, opened = maybe_open_shm_array(
 | 
			
		||||
        key,
 | 
			
		||||
        # TODO: create entry for each time frame
 | 
			
		||||
        dtype=fsp_dtype,
 | 
			
		||||
        readonly=True,
 | 
			
		||||
    )
 | 
			
		||||
    return shm, opened
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def open_fspd_cluster(
 | 
			
		||||
 | 
			
		||||
    linkedsplits: LinkedSplits,
 | 
			
		||||
    fsps: dict[str, str],
 | 
			
		||||
| 
						 | 
				
			
			@ -321,9 +360,8 @@ async def fan_out_spawn_fsp_daemons(
 | 
			
		|||
    '''
 | 
			
		||||
    linkedsplits.focus()
 | 
			
		||||
 | 
			
		||||
    uid = tractor.current_actor().uid
 | 
			
		||||
 | 
			
		||||
    # spawns sub-processes which execute cpu bound FSP code
 | 
			
		||||
    # spawns sub-processes which execute cpu bound fsp work
 | 
			
		||||
    # which is streamed back to this parent.
 | 
			
		||||
    async with (
 | 
			
		||||
        tractor.open_nursery() as n,
 | 
			
		||||
        trio.open_nursery() as ln,
 | 
			
		||||
| 
						 | 
				
			
			@ -334,21 +372,12 @@ async def fan_out_spawn_fsp_daemons(
 | 
			
		|||
        # scale horizonatlly once cores are used up.
 | 
			
		||||
        for display_name, conf in fsps.items():
 | 
			
		||||
 | 
			
		||||
            fsp_func_name = conf['fsp_func_name']
 | 
			
		||||
            func_name = conf['func_name']
 | 
			
		||||
 | 
			
		||||
            # TODO: load function here and introspect
 | 
			
		||||
            # return stream type(s)
 | 
			
		||||
 | 
			
		||||
            # TODO: should `index` be a required internal field?
 | 
			
		||||
            fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
 | 
			
		||||
 | 
			
		||||
            key = f'{sym}.fsp.{display_name}.{".".join(uid)}'
 | 
			
		||||
 | 
			
		||||
            # this is all sync currently
 | 
			
		||||
            shm, opened = maybe_open_shm_array(
 | 
			
		||||
                key,
 | 
			
		||||
                # TODO: create entry for each time frame
 | 
			
		||||
                dtype=fsp_dtype,
 | 
			
		||||
            shm, opened = maybe_mk_fsp_shm(
 | 
			
		||||
                sym,
 | 
			
		||||
                field_name=func_name,
 | 
			
		||||
                display_name=display_name,
 | 
			
		||||
                readonly=True,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -366,13 +395,13 @@ async def fan_out_spawn_fsp_daemons(
 | 
			
		|||
 | 
			
		||||
            # init async
 | 
			
		||||
            ln.start_soon(
 | 
			
		||||
                run_fsp,
 | 
			
		||||
                update_chart_from_fsp,
 | 
			
		||||
                portal,
 | 
			
		||||
                linkedsplits,
 | 
			
		||||
                brokermod,
 | 
			
		||||
                sym,
 | 
			
		||||
                src_shm,
 | 
			
		||||
                fsp_func_name,
 | 
			
		||||
                func_name,
 | 
			
		||||
                display_name,
 | 
			
		||||
                conf,
 | 
			
		||||
                group_status_key,
 | 
			
		||||
| 
						 | 
				
			
			@ -383,7 +412,7 @@ async def fan_out_spawn_fsp_daemons(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
@asynccontextmanager
 | 
			
		||||
async def open_sidepane(
 | 
			
		||||
async def open_fsp_sidepane(
 | 
			
		||||
 | 
			
		||||
    linked: LinkedSplits,
 | 
			
		||||
    conf: dict[str, dict[str, str]],
 | 
			
		||||
| 
						 | 
				
			
			@ -403,8 +432,11 @@ async def open_sidepane(
 | 
			
		|||
            }
 | 
			
		||||
 | 
			
		||||
        # add parameters for selection "options"
 | 
			
		||||
        defaults = config.get('params', {})
 | 
			
		||||
        for name, default in defaults.items():
 | 
			
		||||
        params = config.get('params', {})
 | 
			
		||||
        for name, config in params.items():
 | 
			
		||||
 | 
			
		||||
            default = config['default_value']
 | 
			
		||||
            kwargs = config.get('widget_kwargs', {})
 | 
			
		||||
 | 
			
		||||
            # add to ORM schema
 | 
			
		||||
            schema.update({
 | 
			
		||||
| 
						 | 
				
			
			@ -412,6 +444,7 @@ async def open_sidepane(
 | 
			
		|||
                    'label': f'**{name}**:',
 | 
			
		||||
                    'type': 'edit',
 | 
			
		||||
                    'default_value': default,
 | 
			
		||||
                    'kwargs': kwargs,
 | 
			
		||||
                },
 | 
			
		||||
            })
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -424,7 +457,7 @@ async def open_sidepane(
 | 
			
		|||
    FspConfig = create_model(
 | 
			
		||||
        'FspConfig',
 | 
			
		||||
        name=display_name,
 | 
			
		||||
        **defaults,
 | 
			
		||||
        **params,
 | 
			
		||||
    )
 | 
			
		||||
    sidepane.model = FspConfig()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -444,14 +477,15 @@ async def open_sidepane(
 | 
			
		|||
        yield sidepane
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def run_fsp(
 | 
			
		||||
async def update_chart_from_fsp(
 | 
			
		||||
 | 
			
		||||
    portal: tractor.Portal,
 | 
			
		||||
 | 
			
		||||
    portal: tractor._portal.Portal,
 | 
			
		||||
    linkedsplits: LinkedSplits,
 | 
			
		||||
    brokermod: ModuleType,
 | 
			
		||||
    sym: str,
 | 
			
		||||
    src_shm: ShmArray,
 | 
			
		||||
    fsp_func_name: str,
 | 
			
		||||
    func_name: str,
 | 
			
		||||
    display_name: str,
 | 
			
		||||
    conf: dict[str, dict],
 | 
			
		||||
    group_status_key: str,
 | 
			
		||||
| 
						 | 
				
			
			@ -470,7 +504,7 @@ async def run_fsp(
 | 
			
		|||
    )
 | 
			
		||||
 | 
			
		||||
    async with (
 | 
			
		||||
        portal.open_stream_from(
 | 
			
		||||
        portal.open_context(
 | 
			
		||||
 | 
			
		||||
            # chaining entrypoint
 | 
			
		||||
            fsp.cascade,
 | 
			
		||||
| 
						 | 
				
			
			@ -480,21 +514,17 @@ async def run_fsp(
 | 
			
		|||
            src_shm_token=src_shm.token,
 | 
			
		||||
            dst_shm_token=conf['shm'].token,
 | 
			
		||||
            symbol=sym,
 | 
			
		||||
            fsp_func_name=fsp_func_name,
 | 
			
		||||
            func_name=func_name,
 | 
			
		||||
            loglevel=loglevel,
 | 
			
		||||
 | 
			
		||||
        ) as stream,
 | 
			
		||||
        ) as (ctx, last_index),
 | 
			
		||||
        ctx.open_stream() as stream,
 | 
			
		||||
 | 
			
		||||
        open_sidepane(
 | 
			
		||||
        open_fsp_sidepane(
 | 
			
		||||
            linkedsplits,
 | 
			
		||||
            {display_name: conf},
 | 
			
		||||
        ) as sidepane,
 | 
			
		||||
    ):
 | 
			
		||||
 | 
			
		||||
        # receive last index for processed historical
 | 
			
		||||
        # data-array as first msg
 | 
			
		||||
        _ = await stream.receive()
 | 
			
		||||
 | 
			
		||||
        shm = conf['shm']
 | 
			
		||||
 | 
			
		||||
        if conf.get('overlay'):
 | 
			
		||||
| 
						 | 
				
			
			@ -513,7 +543,7 @@ async def run_fsp(
 | 
			
		|||
                name=display_name,
 | 
			
		||||
                array=shm.array,
 | 
			
		||||
 | 
			
		||||
                array_key=conf['fsp_func_name'],
 | 
			
		||||
                array_key=conf['func_name'],
 | 
			
		||||
                sidepane=sidepane,
 | 
			
		||||
 | 
			
		||||
                # curve by default
 | 
			
		||||
| 
						 | 
				
			
			@ -541,7 +571,7 @@ async def run_fsp(
 | 
			
		|||
            # XXX: fsp func names must be unique meaning we don't have
 | 
			
		||||
            # duplicates of the underlying data even if multiple
 | 
			
		||||
            # sub-charts reference it under different 'named charts'.
 | 
			
		||||
            value = array[fsp_func_name][-1]
 | 
			
		||||
            value = array[func_name][-1]
 | 
			
		||||
 | 
			
		||||
            last_val_sticky.update_from_data(-1, value)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -552,7 +582,7 @@ async def run_fsp(
 | 
			
		|||
        chart.update_curve_from_array(
 | 
			
		||||
            display_name,
 | 
			
		||||
            shm.array,
 | 
			
		||||
            array_key=fsp_func_name
 | 
			
		||||
            array_key=func_name
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        chart.linked.resize_sidepanes()
 | 
			
		||||
| 
						 | 
				
			
			@ -563,11 +593,11 @@ async def run_fsp(
 | 
			
		|||
        # generic fills between curve types while ``PlotCurveItem`` has
 | 
			
		||||
        # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
 | 
			
		||||
        # might be the best solution?
 | 
			
		||||
        # graphics = chart.update_from_array(chart.name, array[fsp_func_name])
 | 
			
		||||
        # graphics = chart.update_from_array(chart.name, array[func_name])
 | 
			
		||||
        # graphics.curve.setBrush(50, 50, 200, 100)
 | 
			
		||||
        # graphics.curve.setFillLevel(50)
 | 
			
		||||
 | 
			
		||||
        if fsp_func_name == 'rsi':
 | 
			
		||||
        if func_name == 'rsi':
 | 
			
		||||
            from ._lines import level_line
 | 
			
		||||
            # add moveable over-[sold/bought] lines
 | 
			
		||||
            # and labels only for the 70/30 lines
 | 
			
		||||
| 
						 | 
				
			
			@ -611,7 +641,7 @@ async def run_fsp(
 | 
			
		|||
                try:
 | 
			
		||||
                    # read last
 | 
			
		||||
                    array = shm.array
 | 
			
		||||
                    value = array[-1][fsp_func_name]
 | 
			
		||||
                    value = array[-1][func_name]
 | 
			
		||||
                    break
 | 
			
		||||
 | 
			
		||||
                except IndexError:
 | 
			
		||||
| 
						 | 
				
			
			@ -625,7 +655,7 @@ async def run_fsp(
 | 
			
		|||
            chart.update_curve_from_array(
 | 
			
		||||
                display_name,
 | 
			
		||||
                array,
 | 
			
		||||
                array_key=fsp_func_name,
 | 
			
		||||
                array_key=func_name,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # set time of last graphics update
 | 
			
		||||
| 
						 | 
				
			
			@ -711,12 +741,26 @@ async def maybe_open_vlm_display(
 | 
			
		|||
        yield
 | 
			
		||||
        return
 | 
			
		||||
    else:
 | 
			
		||||
        async with open_sidepane(
 | 
			
		||||
 | 
			
		||||
        shm, opened = maybe_mk_fsp_shm(
 | 
			
		||||
            linked.symbol.key,
 | 
			
		||||
            '$_vlm',
 | 
			
		||||
            readonly=True,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        async with open_fsp_sidepane(
 | 
			
		||||
            linked, {
 | 
			
		||||
                'volume': {
 | 
			
		||||
 | 
			
		||||
                    'params': {
 | 
			
		||||
                        'price_func': 'ohl3'
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                        'price_func': {
 | 
			
		||||
                            'default_value': 'ohl3',
 | 
			
		||||
                            # tell target ``Edit`` widget to not allow
 | 
			
		||||
                            # edits for now.
 | 
			
		||||
                            'widget_kwargs': {'readonly': True},
 | 
			
		||||
                        },
 | 
			
		||||
                    },
 | 
			
		||||
                }
 | 
			
		||||
            },
 | 
			
		||||
        ) as sidepane:
 | 
			
		||||
| 
						 | 
				
			
			@ -872,20 +916,39 @@ async def display_symbol_data(
 | 
			
		|||
 | 
			
		||||
        # TODO: eventually we'll support some kind of n-compose syntax
 | 
			
		||||
        fsp_conf = {
 | 
			
		||||
 | 
			
		||||
            'rsi': {
 | 
			
		||||
                'fsp_func_name': 'rsi',
 | 
			
		||||
                'params': {'period': 14},
 | 
			
		||||
 | 
			
		||||
                # literal python func ref lookup name
 | 
			
		||||
                'func_name': 'rsi',
 | 
			
		||||
 | 
			
		||||
                # map of parameters to place on the fsp sidepane widget
 | 
			
		||||
                # which should map to dynamic inputs available to the
 | 
			
		||||
                # fsp function at runtime.
 | 
			
		||||
                'params': {
 | 
			
		||||
                    'period': {
 | 
			
		||||
                        'default_value': 14,
 | 
			
		||||
                        'widget_kwargs': {'readonly': True},
 | 
			
		||||
                    },
 | 
			
		||||
                },
 | 
			
		||||
 | 
			
		||||
                # ``ChartPlotWidget`` options passthrough
 | 
			
		||||
                'chart_kwargs': {
 | 
			
		||||
                    'static_yrange': (0, 100),
 | 
			
		||||
                },
 | 
			
		||||
            },
 | 
			
		||||
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if has_vlm(ohlcv):
 | 
			
		||||
        if has_vlm(ohlcv): # and provider != 'binance':
 | 
			
		||||
            # binance is too fast atm for FSPs until we wrap
 | 
			
		||||
            # the fsp streams as throttled ``Feeds``, see
 | 
			
		||||
            #
 | 
			
		||||
 | 
			
		||||
            # add VWAP to fsp config for downstream loading
 | 
			
		||||
            fsp_conf.update({
 | 
			
		||||
                'vwap': {
 | 
			
		||||
                    'fsp_func_name': 'vwap',
 | 
			
		||||
                    'func_name': 'vwap',
 | 
			
		||||
                    'overlay': True,
 | 
			
		||||
                    'anchor': 'session',
 | 
			
		||||
                },
 | 
			
		||||
| 
						 | 
				
			
			@ -904,7 +967,7 @@ async def display_symbol_data(
 | 
			
		|||
        ):
 | 
			
		||||
            # load initial fsp chain (otherwise known as "indicators")
 | 
			
		||||
            ln.start_soon(
 | 
			
		||||
                fan_out_spawn_fsp_daemons,
 | 
			
		||||
                open_fspd_cluster,
 | 
			
		||||
                linkedsplits,
 | 
			
		||||
                fsp_conf,
 | 
			
		||||
                sym,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue