Use `Fsp` abstraction layer through engine and UI

Instead of referencing the remote processing funcs by a `str` name, start
embracing the new `@fsp`/`Fsp` API such that wrapped processing
functions are first class APIs.

Summary of the changeset:
- move and load the fsp built-in set in the new `.fsp._api` module
- handle processors ("fsps") which want to yield multiple keyed-values
  (interleaved in time) by expecting both history that is keyed and
  assigned to the appropriate struct-array field, *and* real-time
  `yield`ed values in tuples of the form `tuple[str, float]` such that
  any one (async) processing function can deliver multiple outputs from
  the same base calculation.
- drop `maybe_mk_fsp_shm()` from UI module
- expect and manage `Fsp` instances (`@fsp` decorated funcs) throughout
  the UI code, particularly the `FspAdmin` layer.
dark_vlm
Tyler Goodlet 2022-01-27 18:57:16 -05:00
parent 72f4474273
commit cc5390376c
2 changed files with 117 additions and 103 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of piker0) # Copyright (C) Tyler Goodlet (in stewardship of pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -27,29 +27,19 @@ import pyqtgraph as pg
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor._portal import NamespacePath
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from .. import data from .. import data
from ..data import attach_shm_array from ..data import attach_shm_array
from ..data.feed import Feed from ..data.feed import Feed
from ..data._sharedmem import ShmArray from ..data._sharedmem import ShmArray
from ._momo import _rsi, _wma # from ._momo import _rsi, _wma
from ._volume import _tina_vwap, dolla_vlm # from ._volume import _tina_vwap, dolla_vlm
from ._api import _load_builtins
log = get_logger(__name__) log = get_logger(__name__)
_fsp_builtins = {
'rsi': _rsi,
'wma': _wma,
'vwap': _tina_vwap,
'dolla_vlm': dolla_vlm,
}
# TODO: things to figure the heck out:
# - how to handle non-plottable values (pyqtgraph has facility for this
# now in `arrayToQPath()`)
# - composition of fsps / implicit chaining syntax (we need an issue)
@dataclass @dataclass
class TaskTracker: class TaskTracker:
@ -88,7 +78,7 @@ async def fsp_compute(
src: ShmArray, src: ShmArray,
dst: ShmArray, dst: ShmArray,
func_name: str, # func_name: str,
func: Callable, func: Callable,
attach_stream: bool = False, attach_stream: bool = False,
@ -115,15 +105,27 @@ async def fsp_compute(
# and get historical output # and get historical output
history_output = await out_stream.__anext__() history_output = await out_stream.__anext__()
func_name = func.__name__
profiler(f'{func_name} generated history') profiler(f'{func_name} generated history')
# build a struct array which includes an 'index' field to push # build struct array with an 'index' field to push as history
# as history
history = np.array( history = np.array(
np.arange(len(history_output)), np.arange(len(history_output)),
dtype=dst.array.dtype dtype=dst.array.dtype
) )
history[func_name] = history_output
# TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no?
# if the output array is multi-field then push
# each respective field.
fields = getattr(history.dtype, 'fields', None)
if fields:
for key in fields.keys():
if key in history.dtype.fields:
history[func_name] = history_output
# single-key output stream
else:
history[func_name] = history_output
# TODO: XXX: # TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're # THERE'S A BIG BUG HERE WITH THE `index` field since we're
@ -164,8 +166,9 @@ async def fsp_compute(
async for processed in out_stream: async for processed in out_stream:
log.debug(f"{func_name}: {processed}") log.debug(f"{func_name}: {processed}")
key, output = processed
index = src.index index = src.index
dst.array[-1][func_name] = processed dst.array[-1][key] = output
# NOTE: for now we aren't streaming this to the consumer # NOTE: for now we aren't streaming this to the consumer
# stream latest array index entry which basically just acts # stream latest array index entry which basically just acts
@ -194,7 +197,7 @@ async def cascade(
src_shm_token: dict, src_shm_token: dict,
dst_shm_token: tuple[str, np.dtype], dst_shm_token: tuple[str, np.dtype],
func_name: str, ns_path: NamespacePath,
zero_on_step: bool = False, zero_on_step: bool = False,
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
@ -213,10 +216,14 @@ async def cascade(
src = attach_shm_array(token=src_shm_token) src = attach_shm_array(token=src_shm_token)
dst = attach_shm_array(readonly=False, token=dst_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token)
func: Callable = _fsp_builtins.get(func_name) # func: Callable = _fsp_builtins.get(tuple(ns_path))
func: Fsp = _load_builtins().get(
NamespacePath(ns_path)
)
if not func: if not func:
# TODO: assume it's a func target path # TODO: assume it's a func target path
raise ValueError('Unknown fsp target: {func_name}') raise ValueError(f'Unknown fsp target: {ns_path}')
# open a data feed stream with requested broker # open a data feed stream with requested broker
async with data.feed.maybe_open_feed( async with data.feed.maybe_open_feed(
@ -231,11 +238,12 @@ async def cascade(
) as (feed, quote_stream): ) as (feed, quote_stream):
profiler(f'{func_name}: feed up') profiler(f'{func}: feed up')
assert src.token == feed.shm.token assert src.token == feed.shm.token
# last_len = new_len = len(src.array) # last_len = new_len = len(src.array)
func_name = func.__name__
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
@ -252,7 +260,7 @@ async def cascade(
src=src, src=src,
dst=dst, dst=dst,
func_name=func_name, # func_name=func_name,
func=func func=func
) )

View File

@ -29,6 +29,7 @@ import numpy as np
from pydantic import create_model from pydantic import create_model
import tractor import tractor
# from tractor.trionics import gather_contexts # from tractor.trionics import gather_contexts
from tractor._portal import NamespacePath
import pyqtgraph as pg import pyqtgraph as pg
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -38,57 +39,25 @@ from .._cacheables import maybe_open_context
from ..calc import humanize from ..calc import humanize
from ..data._sharedmem import ( from ..data._sharedmem import (
ShmArray, ShmArray,
maybe_open_shm_array,
try_read, try_read,
) )
from ._chart import ( from ._chart import (
ChartPlotWidget, ChartPlotWidget,
LinkedSplits, LinkedSplits,
) )
from .. import fsp
from ._forms import ( from ._forms import (
FieldsForm, FieldsForm,
mk_form, mk_form,
open_form_input_handling, open_form_input_handling,
) )
from ..fsp._api import maybe_mk_fsp_shm, Fsp
from ..fsp import cascade
from ..fsp._volume import tina_vwap, dolla_vlm
from ..log import get_logger from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
def maybe_mk_fsp_shm(
sym: str,
field_name: str,
display_name: Optional[str] = None,
readonly: bool = True,
) -> (ShmArray, bool):
'''
Allocate a single row shm array for an symbol-fsp pair if none
exists, otherwise load the shm already existing for that token.
'''
uid = tractor.current_actor().uid
if not display_name:
display_name = field_name
# TODO: load function here and introspect
# return stream type(s)
# TODO: should `index` be a required internal field?
fsp_dtype = np.dtype([('index', int), (field_name, float)])
key = f'{sym}.fsp.{display_name}.{".".join(uid)}'
shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
dtype=fsp_dtype,
readonly=True,
)
return shm, opened
def has_vlm(ohlcv: ShmArray) -> bool: def has_vlm(ohlcv: ShmArray) -> bool:
# make sure that the instrument supports volume history # make sure that the instrument supports volume history
# (sometimes this is not the case for some commodities and # (sometimes this is not the case for some commodities and
@ -148,11 +117,13 @@ async def open_fsp_sidepane(
assert len(conf) == 1 # for now assert len(conf) == 1 # for now
# add (single) selection widget # add (single) selection widget
for display_name, config in conf.items(): for name, config in conf.items():
schema[display_name] = { # schema[display_name] = {
# name = target.__name__
schema[name] = {
'label': '**fsp**:', 'label': '**fsp**:',
'type': 'select', 'type': 'select',
'default_value': [display_name], 'default_value': [name],
} }
# add parameters for selection "options" # add parameters for selection "options"
@ -180,7 +151,7 @@ async def open_fsp_sidepane(
# https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation # https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation
FspConfig = create_model( FspConfig = create_model(
'FspConfig', 'FspConfig',
name=display_name, name=name,
**params, **params,
) )
sidepane.model = FspConfig() sidepane.model = FspConfig()
@ -228,8 +199,9 @@ async def run_fsp_ui(
linkedsplits: LinkedSplits, linkedsplits: LinkedSplits,
shm: ShmArray, shm: ShmArray,
started: trio.Event, started: trio.Event,
func_name: str, target: Fsp,
display_name: str, # func_name: str,
# display_name: str,
conf: dict[str, dict], conf: dict[str, dict],
loglevel: str, loglevel: str,
# profiler: pg.debug.Profiler, # profiler: pg.debug.Profiler,
@ -245,12 +217,14 @@ async def run_fsp_ui(
''' '''
# profiler(f'started UI task for fsp: {func_name}') # profiler(f'started UI task for fsp: {func_name}')
name = target.__name__
async with ( async with (
# side UI for parameters/controls # side UI for parameters/controls
open_fsp_sidepane( open_fsp_sidepane(
linkedsplits, linkedsplits,
{display_name: conf}, {name: conf},
# {display_name: conf},
) as sidepane, ) as sidepane,
): ):
await started.wait() await started.wait()
@ -264,24 +238,29 @@ async def run_fsp_ui(
chart = linkedsplits.subplots[overlay_with] chart = linkedsplits.subplots[overlay_with]
chart.draw_curve( chart.draw_curve(
name=display_name, # name=display_name,
name=name,
data=shm.array, data=shm.array,
overlay=True, overlay=True,
color='default_light', color='default_light',
array_key=func_name, # array_key=func_name,
array_key=name,
separate_axes=conf.get('separate_axes', False), separate_axes=conf.get('separate_axes', False),
**conf.get('chart_kwargs', {}) **conf.get('chart_kwargs', {})
) )
# specially store ref to shm for lookup in display loop # specially store ref to shm for lookup in display loop
chart._overlays[display_name] = shm # chart._overlays[display_name] = shm
chart._overlays[name] = shm
else: else:
# create a new sub-chart widget for this fsp # create a new sub-chart widget for this fsp
chart = linkedsplits.add_plot( chart = linkedsplits.add_plot(
name=display_name, name=name,
# name=display_name,
array=shm.array, array=shm.array,
array_key=func_name, # array_key=func_name,
array_key=name,
sidepane=sidepane, sidepane=sidepane,
# curve by default # curve by default
@ -299,7 +278,8 @@ async def run_fsp_ui(
# should **not** be the same sub-chart widget # should **not** be the same sub-chart widget
assert chart.name != linkedsplits.chart.name assert chart.name != linkedsplits.chart.name
array_key = func_name # array_key = func_name
array_key = name
# profiler(f'fsp:{func_name} chart created') # profiler(f'fsp:{func_name} chart created')
@ -307,7 +287,8 @@ async def run_fsp_ui(
update_fsp_chart( update_fsp_chart(
chart, chart,
shm, shm,
display_name, name,
# display_name,
array_key=array_key, array_key=array_key,
) )
@ -410,7 +391,7 @@ class FspAdmin:
started: trio.Event, started: trio.Event,
dst_shm: ShmArray, dst_shm: ShmArray,
conf: dict, conf: dict,
func_name: str, target: Fsp,
loglevel: str, loglevel: str,
) -> None: ) -> None:
@ -420,11 +401,12 @@ class FspAdmin:
''' '''
brokername, sym = self.linked.symbol.front_feed() brokername, sym = self.linked.symbol.front_feed()
ns_path = NamespacePath.from_ref(target)
async with ( async with (
portal.open_context( portal.open_context(
# chaining entrypoint # chaining entrypoint
fsp.cascade, cascade,
# data feed key # data feed key
brokername=brokername, brokername=brokername,
@ -435,7 +417,8 @@ class FspAdmin:
dst_shm_token=dst_shm.token, dst_shm_token=dst_shm.token,
# target # target
func_name=func_name, ns_path=str(ns_path),
# func_name=func_name,
loglevel=loglevel, loglevel=loglevel,
zero_on_step=conf.get('zero_on_step', False), zero_on_step=conf.get('zero_on_step', False),
@ -444,8 +427,13 @@ class FspAdmin:
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
# register output data # register output data
self._registry[(brokername, sym, func_name)] = ( self._registry[
stream, dst_shm, complete) (brokername, sym, ns_path)
] = (
stream,
dst_shm,
complete
)
started.set() started.set()
@ -455,7 +443,8 @@ class FspAdmin:
async def start_engine_task( async def start_engine_task(
self, self,
display_name: str, target: Fsp,
# display_name: str,
conf: dict[str, dict[str, Any]], conf: dict[str, dict[str, Any]],
worker_name: Optional[str] = None, worker_name: Optional[str] = None,
@ -464,17 +453,21 @@ class FspAdmin:
) -> (ShmArray, trio.Event): ) -> (ShmArray, trio.Event):
# unpack FSP details from config dict # unpack FSP details from config dict
func_name = conf['func_name'] # func_name = conf['func_name']
# func_name = target.__name__
fqsn = self.linked.symbol.front_feed()
# allocate an output shm array # allocate an output shm array
dst_shm, opened = maybe_mk_fsp_shm( dst_shm, opened = maybe_mk_fsp_shm(
self.linked.symbol.front_feed(), fqsn,
field_name=func_name, # field_name=func_name,
display_name=display_name, # display_name=display_name,
target=target,
readonly=True, readonly=True,
) )
if not opened: # if not opened:
raise RuntimeError(f'Already started FSP {func_name}') # raise RuntimeError(
# f'Already started FSP `{fqsn}:{func_name}`'
# )
portal = self.cluster.get(worker_name) or self.rr_next_portal() portal = self.cluster.get(worker_name) or self.rr_next_portal()
complete = trio.Event() complete = trio.Event()
@ -487,7 +480,8 @@ class FspAdmin:
started, started,
dst_shm, dst_shm,
conf, conf,
func_name, # func_name,
target,
loglevel, loglevel,
) )
@ -495,16 +489,21 @@ class FspAdmin:
async def open_fsp_chart( async def open_fsp_chart(
self, self,
display_name: str,
target: Fsp,
# display_name: str,
conf: dict, # yeah probably dumb.. conf: dict, # yeah probably dumb..
loglevel: str = 'error', loglevel: str = 'error',
) -> (trio.Event, ChartPlotWidget): ) -> (trio.Event, ChartPlotWidget):
func_name = conf['func_name'] # func_name = conf['func_name']
# func_name = target.__name__
shm, started = await self.start_engine_task( shm, started = await self.start_engine_task(
display_name, target,
# display_name,
conf, conf,
loglevel, loglevel,
) )
@ -517,8 +516,9 @@ class FspAdmin:
self.linked, self.linked,
shm, shm,
started, started,
func_name, # func_name,
display_name, # display_name,
target,
conf=conf, conf=conf,
loglevel=loglevel, loglevel=loglevel,
@ -671,8 +671,9 @@ async def open_vlm_displays(
# spawn and overlay $ vlm on the same subchart # spawn and overlay $ vlm on the same subchart
shm, started = await admin.start_engine_task( shm, started = await admin.start_engine_task(
'dolla_vlm', # 'dolla_vlm',
# linked.symbol.front_feed(), # data-feed symbol key dolla_vlm,
{ # fsp engine conf { # fsp engine conf
'func_name': 'dolla_vlm', 'func_name': 'dolla_vlm',
'zero_on_step': True, 'zero_on_step': True,
@ -759,15 +760,18 @@ async def open_vlm_displays(
axis.size_to_values() axis.size_to_values()
# built-in vlm fsps # built-in vlm fsps
for display_name, conf in { # for display_name, conf in {
'vwap': { for target, conf in {
'func_name': 'vwap', # 'vwap': {
tina_vwap: {
# 'func_name': 'vwap',
'overlay': 'ohlc', # overlays with OHLCV (main) chart 'overlay': 'ohlc', # overlays with OHLCV (main) chart
'anchor': 'session', 'anchor': 'session',
}, },
}.items(): }.items():
started = await admin.open_fsp_chart( started = await admin.open_fsp_chart(
display_name, # display_name,
target,
conf, conf,
) )
@ -822,20 +826,22 @@ async def start_fsp_displays(
open_fsp_admin(linked, ohlcv) as admin, open_fsp_admin(linked, ohlcv) as admin,
): ):
statuses = [] statuses = []
for display_name, conf in fsp_conf.items(): # for display_name, conf in fsp_conf.items():
for target, conf in fsp_conf.items():
started = await admin.open_fsp_chart( started = await admin.open_fsp_chart(
display_name, # display_name,
target,
conf, conf,
) )
done = linked.window().status_bar.open_status( done = linked.window().status_bar.open_status(
f'loading fsp, {display_name}..', f'loading fsp, {target}..',
group_key=group_status_key, group_key=group_status_key,
) )
statuses.append((started, done)) statuses.append((started, done))
for fsp_loaded, status_cb in statuses: for fsp_loaded, status_cb in statuses:
await fsp_loaded.wait() await fsp_loaded.wait()
profiler(f'attached to fsp portal: {display_name}') profiler(f'attached to fsp portal: {target}')
status_cb() status_cb()
# blocks on nursery until all fsp actors complete # blocks on nursery until all fsp actors complete