From c6a3c66e7eeb541ee155e7737e858052372edf28 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 26 Jan 2022 14:38:49 -0500 Subject: [PATCH] WIP start a `@piker.fsp` API for registering processors --- piker/fsp/_api.py | 178 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) create mode 100644 piker/fsp/_api.py diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py new file mode 100644 index 00000000..6c3c6cd7 --- /dev/null +++ b/piker/fsp/_api.py @@ -0,0 +1,178 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship of pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +FSP (financial signal processing) apis. + +''' +from __future__ import annotations +from functools import partial +from typing import ( + Any, + Callable, + Awaitable, + Optional, +) + +import numpy as np +import tractor +from tractor._portal import NamespacePath +# import wrapt + +from ..data._sharedmem import ( + ShmArray, + maybe_open_shm_array, +) + + +# global fsp registry filled out by @fsp decorator below +_fsp_builtins = {} + # 'rsi': _rsi, + # 'wma': _wma, + # 'vwap': _tina_vwap, + # 'dolla_vlm': dolla_vlm, +# } + +def _load_builtins() -> dict[tuple, Callable]: + from ._momo import _rsi, _wma + from ._volume import tina_vwap, dolla_vlm + + return _fsp_builtins + +# TODO: things to figure the heck out: +# - how to handle non-plottable values (pyqtgraph has facility for this +# now in `arrayToQPath()`) +# - composition of fsps / implicit chaining syntax (we need an issue) + + +class Fsp: + ''' + "Financial signal processor" decorator wrapped async function. + + ''' + + # TODO: checkout the advanced features from ``wrapt``: + # - dynamic enable toggling, + # https://wrapt.readthedocs.io/en/latest/decorators.html#dynamically-disabling-decorators + # - custom object proxies, might be useful for implementing n-compose + # https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-object-proxies + # - custom function wrappers, + # https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers + + def __init__( + self, + func: Callable[..., Awaitable], + *, + outputs: tuple[str] = (), + display_name: Optional[str] = None, + **config, + + ) -> None: + # if wrapped is not None: + # self.name = wrapped.__name__ + # TODO: should we make this a wrapt object proxy? + self.func = func + self.__name__ = func.__name__ + self.__module__ = func.__module__ + self.ns_path: tuple[str, str] = NamespacePath.from_ref(func) + _fsp_builtins[self.ns_path] = func + self.outputs = outputs + self.config: dict[str, Any] = config + + # @wrapt.decorator + def __call__( + self, + + # TODO: when we settle on py3.10 we should probably use the new + # type annots from pep 612: + # https://www.python.org/dev/peps/pep-0612/ + # instance, + *args, + **kwargs + ): + return self.func(*args, **kwargs) + # return wrapped(*args, **kwargs) + + +def fsp( + wrapped=None, + *, + outputs: tuple[str] = (), + display_name: Optional[str] = None, + **config, + +) -> Fsp: + # @wrapt.decorator + # def wrapper(wrapped, instance, args, kwargs): + # return wrapped(*args, **kwargs) + + if wrapped is None: + # return functools.partial(with_optional_arguments, + # myarg1=myarg1, myarg2=myarg2) + return partial( + Fsp, + outputs=outputs, + display_name=display_name, + **config, + ) + + # return wrapper(wrapped) + return Fsp(wrapped, outputs=(wrapped.__name__,)) + # outputs=outputs, + # display_name=display_name, + # **config, + # )(wrapped) + + +def maybe_mk_fsp_shm( + sym: str, + target: fsp, + # field_name: str, + # display_name: Optional[str] = None, + readonly: bool = True, + +) -> (ShmArray, bool): + ''' + Allocate a single row shm array for an symbol-fsp pair if none + exists, otherwise load the shm already existing for that token. + + ''' + uid = tractor.current_actor().uid + + # load declared fields from fsp and allocate in + # shm array. + # if not display_name: + # display_name = field_name + + # TODO: load function here and introspect + # return stream type(s) + display_name = target.__name__ + + # TODO: should `index` be a required internal field? + fsp_dtype = np.dtype( + [('index', int)] + + [(field_name, float) for field_name in target.outputs] + ) + + key = f'{sym}.fsp.{display_name}.{".".join(uid)}' + + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) + return shm, opened