diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index aafaf76c..dac672bb 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship of piker0) +# Copyright (C) Tyler Goodlet (in stewardship of pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -27,29 +27,19 @@ import pyqtgraph as pg import trio from trio_typing import TaskStatus import tractor +from tractor._portal import NamespacePath from ..log import get_logger, get_console_log from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray -from ._momo import _rsi, _wma -from ._volume import _tina_vwap, dolla_vlm +# from ._momo import _rsi, _wma +# from ._volume import _tina_vwap, dolla_vlm +from ._api import _load_builtins log = get_logger(__name__) -_fsp_builtins = { - 'rsi': _rsi, - 'wma': _wma, - 'vwap': _tina_vwap, - 'dolla_vlm': dolla_vlm, -} - -# TODO: things to figure the heck out: -# - how to handle non-plottable values (pyqtgraph has facility for this -# now in `arrayToQPath()`) -# - composition of fsps / implicit chaining syntax (we need an issue) - @dataclass class TaskTracker: @@ -88,7 +78,7 @@ async def fsp_compute( src: ShmArray, dst: ShmArray, - func_name: str, + # func_name: str, func: Callable, attach_stream: bool = False, @@ -115,15 +105,27 @@ async def fsp_compute( # and get historical output history_output = await out_stream.__anext__() + func_name = func.__name__ profiler(f'{func_name} generated history') - # build a struct array which includes an 'index' field to push - # as history + # build struct array with an 'index' field to push as history history = np.array( np.arange(len(history_output)), dtype=dst.array.dtype ) - history[func_name] = history_output + + # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? + # if the output array is multi-field then push + # each respective field. + fields = getattr(history.dtype, 'fields', None) + if fields: + for key in fields.keys(): + if key in history.dtype.fields: + history[func_name] = history_output + + # single-key output stream + else: + history[func_name] = history_output # TODO: XXX: # THERE'S A BIG BUG HERE WITH THE `index` field since we're @@ -164,8 +166,9 @@ async def fsp_compute( async for processed in out_stream: log.debug(f"{func_name}: {processed}") + key, output = processed index = src.index - dst.array[-1][func_name] = processed + dst.array[-1][key] = output # NOTE: for now we aren't streaming this to the consumer # stream latest array index entry which basically just acts @@ -194,7 +197,7 @@ async def cascade( src_shm_token: dict, dst_shm_token: tuple[str, np.dtype], - func_name: str, + ns_path: NamespacePath, zero_on_step: bool = False, loglevel: Optional[str] = None, @@ -213,10 +216,14 @@ async def cascade( src = attach_shm_array(token=src_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token) - func: Callable = _fsp_builtins.get(func_name) + # func: Callable = _fsp_builtins.get(tuple(ns_path)) + func: Fsp = _load_builtins().get( + NamespacePath(ns_path) + ) + if not func: # TODO: assume it's a func target path - raise ValueError('Unknown fsp target: {func_name}') + raise ValueError(f'Unknown fsp target: {ns_path}') # open a data feed stream with requested broker async with data.feed.maybe_open_feed( @@ -231,11 +238,12 @@ async def cascade( ) as (feed, quote_stream): - profiler(f'{func_name}: feed up') + profiler(f'{func}: feed up') assert src.token == feed.shm.token # last_len = new_len = len(src.array) + func_name = func.__name__ async with ( trio.open_nursery() as n, ): @@ -252,7 +260,7 @@ async def cascade( src=src, dst=dst, - func_name=func_name, + # func_name=func_name, func=func ) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 22196f23..97dd5022 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -29,6 +29,7 @@ import numpy as np from pydantic import create_model import tractor # from tractor.trionics import gather_contexts +from tractor._portal import NamespacePath import pyqtgraph as pg import trio from trio_typing import TaskStatus @@ -38,57 +39,25 @@ from .._cacheables import maybe_open_context from ..calc import humanize from ..data._sharedmem import ( ShmArray, - maybe_open_shm_array, try_read, ) from ._chart import ( ChartPlotWidget, LinkedSplits, ) -from .. import fsp from ._forms import ( FieldsForm, mk_form, open_form_input_handling, ) +from ..fsp._api import maybe_mk_fsp_shm, Fsp +from ..fsp import cascade +from ..fsp._volume import tina_vwap, dolla_vlm from ..log import get_logger log = get_logger(__name__) -def maybe_mk_fsp_shm( - sym: str, - field_name: str, - display_name: Optional[str] = None, - readonly: bool = True, - -) -> (ShmArray, bool): - ''' - Allocate a single row shm array for an symbol-fsp pair if none - exists, otherwise load the shm already existing for that token. - - ''' - uid = tractor.current_actor().uid - if not display_name: - display_name = field_name - - # TODO: load function here and introspect - # return stream type(s) - - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (field_name, float)]) - - key = f'{sym}.fsp.{display_name}.{".".join(uid)}' - - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, - readonly=True, - ) - return shm, opened - - def has_vlm(ohlcv: ShmArray) -> bool: # make sure that the instrument supports volume history # (sometimes this is not the case for some commodities and @@ -148,11 +117,13 @@ async def open_fsp_sidepane( assert len(conf) == 1 # for now # add (single) selection widget - for display_name, config in conf.items(): - schema[display_name] = { + for name, config in conf.items(): + # schema[display_name] = { + # name = target.__name__ + schema[name] = { 'label': '**fsp**:', 'type': 'select', - 'default_value': [display_name], + 'default_value': [name], } # add parameters for selection "options" @@ -180,7 +151,7 @@ async def open_fsp_sidepane( # https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation FspConfig = create_model( 'FspConfig', - name=display_name, + name=name, **params, ) sidepane.model = FspConfig() @@ -228,8 +199,9 @@ async def run_fsp_ui( linkedsplits: LinkedSplits, shm: ShmArray, started: trio.Event, - func_name: str, - display_name: str, + target: Fsp, + # func_name: str, + # display_name: str, conf: dict[str, dict], loglevel: str, # profiler: pg.debug.Profiler, @@ -245,12 +217,14 @@ async def run_fsp_ui( ''' # profiler(f'started UI task for fsp: {func_name}') + name = target.__name__ async with ( # side UI for parameters/controls open_fsp_sidepane( linkedsplits, - {display_name: conf}, + {name: conf}, + # {display_name: conf}, ) as sidepane, ): await started.wait() @@ -264,24 +238,29 @@ async def run_fsp_ui( chart = linkedsplits.subplots[overlay_with] chart.draw_curve( - name=display_name, + # name=display_name, + name=name, data=shm.array, overlay=True, color='default_light', - array_key=func_name, + # array_key=func_name, + array_key=name, separate_axes=conf.get('separate_axes', False), **conf.get('chart_kwargs', {}) ) # specially store ref to shm for lookup in display loop - chart._overlays[display_name] = shm + # chart._overlays[display_name] = shm + chart._overlays[name] = shm else: # create a new sub-chart widget for this fsp chart = linkedsplits.add_plot( - name=display_name, + name=name, + # name=display_name, array=shm.array, - array_key=func_name, + # array_key=func_name, + array_key=name, sidepane=sidepane, # curve by default @@ -299,7 +278,8 @@ async def run_fsp_ui( # should **not** be the same sub-chart widget assert chart.name != linkedsplits.chart.name - array_key = func_name + # array_key = func_name + array_key = name # profiler(f'fsp:{func_name} chart created') @@ -307,7 +287,8 @@ async def run_fsp_ui( update_fsp_chart( chart, shm, - display_name, + name, + # display_name, array_key=array_key, ) @@ -410,7 +391,7 @@ class FspAdmin: started: trio.Event, dst_shm: ShmArray, conf: dict, - func_name: str, + target: Fsp, loglevel: str, ) -> None: @@ -420,11 +401,12 @@ class FspAdmin: ''' brokername, sym = self.linked.symbol.front_feed() + ns_path = NamespacePath.from_ref(target) async with ( portal.open_context( # chaining entrypoint - fsp.cascade, + cascade, # data feed key brokername=brokername, @@ -435,7 +417,8 @@ class FspAdmin: dst_shm_token=dst_shm.token, # target - func_name=func_name, + ns_path=str(ns_path), + # func_name=func_name, loglevel=loglevel, zero_on_step=conf.get('zero_on_step', False), @@ -444,8 +427,13 @@ class FspAdmin: ctx.open_stream() as stream, ): # register output data - self._registry[(brokername, sym, func_name)] = ( - stream, dst_shm, complete) + self._registry[ + (brokername, sym, ns_path) + ] = ( + stream, + dst_shm, + complete + ) started.set() @@ -455,7 +443,8 @@ class FspAdmin: async def start_engine_task( self, - display_name: str, + target: Fsp, + # display_name: str, conf: dict[str, dict[str, Any]], worker_name: Optional[str] = None, @@ -464,17 +453,21 @@ class FspAdmin: ) -> (ShmArray, trio.Event): # unpack FSP details from config dict - func_name = conf['func_name'] - + # func_name = conf['func_name'] + # func_name = target.__name__ + fqsn = self.linked.symbol.front_feed() # allocate an output shm array dst_shm, opened = maybe_mk_fsp_shm( - self.linked.symbol.front_feed(), - field_name=func_name, - display_name=display_name, + fqsn, + # field_name=func_name, + # display_name=display_name, + target=target, readonly=True, ) - if not opened: - raise RuntimeError(f'Already started FSP {func_name}') + # if not opened: + # raise RuntimeError( + # f'Already started FSP `{fqsn}:{func_name}`' + # ) portal = self.cluster.get(worker_name) or self.rr_next_portal() complete = trio.Event() @@ -487,7 +480,8 @@ class FspAdmin: started, dst_shm, conf, - func_name, + # func_name, + target, loglevel, ) @@ -495,16 +489,21 @@ class FspAdmin: async def open_fsp_chart( self, - display_name: str, + + target: Fsp, + # display_name: str, + conf: dict, # yeah probably dumb.. loglevel: str = 'error', ) -> (trio.Event, ChartPlotWidget): - func_name = conf['func_name'] + # func_name = conf['func_name'] + # func_name = target.__name__ shm, started = await self.start_engine_task( - display_name, + target, + # display_name, conf, loglevel, ) @@ -517,8 +516,9 @@ class FspAdmin: self.linked, shm, started, - func_name, - display_name, + # func_name, + # display_name, + target, conf=conf, loglevel=loglevel, @@ -671,8 +671,9 @@ async def open_vlm_displays( # spawn and overlay $ vlm on the same subchart shm, started = await admin.start_engine_task( - 'dolla_vlm', - # linked.symbol.front_feed(), # data-feed symbol key + # 'dolla_vlm', + dolla_vlm, + { # fsp engine conf 'func_name': 'dolla_vlm', 'zero_on_step': True, @@ -759,15 +760,18 @@ async def open_vlm_displays( axis.size_to_values() # built-in vlm fsps - for display_name, conf in { - 'vwap': { - 'func_name': 'vwap', + # for display_name, conf in { + for target, conf in { + # 'vwap': { + tina_vwap: { + # 'func_name': 'vwap', 'overlay': 'ohlc', # overlays with OHLCV (main) chart 'anchor': 'session', }, }.items(): started = await admin.open_fsp_chart( - display_name, + # display_name, + target, conf, ) @@ -822,20 +826,22 @@ async def start_fsp_displays( open_fsp_admin(linked, ohlcv) as admin, ): statuses = [] - for display_name, conf in fsp_conf.items(): + # for display_name, conf in fsp_conf.items(): + for target, conf in fsp_conf.items(): started = await admin.open_fsp_chart( - display_name, + # display_name, + target, conf, ) done = linked.window().status_bar.open_status( - f'loading fsp, {display_name}..', + f'loading fsp, {target}..', group_key=group_status_key, ) statuses.append((started, done)) for fsp_loaded, status_cb in statuses: await fsp_loaded.wait() - profiler(f'attached to fsp portal: {display_name}') + profiler(f'attached to fsp portal: {target}') status_cb() # blocks on nursery until all fsp actors complete