Factor (sub-)chart spawning into a admin method

Adds `FspAdmin.open_fsp_chart()` which allows adding a real time graphics
display of an fsp's output with different options for where (which chart
or make a new one) to place it.

Further,
- change some method naming, namely the other fsp engine task methods to
  `.open_chain()` and `.start_engine_task()`.
- make `run_fsp_ui()` a lone task function for now with the default
  config parsing and chart setup logic (and it still includes a buncha
  commented out stuff for doing graphics update which is now done in the
  main loop to avoid task switching overhead).
- move all vlm related fsp config entries into the `open_vlm_displays()`
  task for dedicated setup with the fsp admin api such as special
  auto-yrange handling and graph overlays.
- `start_fsp_displays()` is now just a small loop through config entries
  with synced startup status messages.
vlm_plotz
Tyler Goodlet 2022-01-12 18:48:47 -05:00
parent ca9973e619
commit cfa9dbc906
1 changed files with 421 additions and 366 deletions

View File

@ -23,13 +23,12 @@ Financial signal processing cluster and real-time graphics management.
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
from itertools import cycle from itertools import cycle
from types import ModuleType
from typing import Optional, AsyncGenerator, Any from typing import Optional, AsyncGenerator, Any
import numpy as np import numpy as np
from pydantic import create_model from pydantic import create_model
import tractor import tractor
from tractor.trionics import gather_contexts # from tractor.trionics import gather_contexts
import pyqtgraph as pg import pyqtgraph as pg
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -92,8 +91,8 @@ def has_vlm(ohlcv: ShmArray) -> bool:
# make sure that the instrument supports volume history # make sure that the instrument supports volume history
# (sometimes this is not the case for some commodities and # (sometimes this is not the case for some commodities and
# derivatives) # derivatives)
volm = ohlcv.array['volume'] vlm = ohlcv.array['volume']
return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) return not bool(np.all(np.isin(vlm, -1)) or np.all(np.isnan(vlm)))
def update_fsp_chart( def update_fsp_chart(
@ -208,285 +207,42 @@ async def open_fsp_actor_cluster(
from tractor._clustering import open_actor_cluster from tractor._clustering import open_actor_cluster
profiler = pg.debug.Profiler( # profiler = pg.debug.Profiler(
delayed=False, # delayed=False,
disabled=False # disabled=False
) # )
async with open_actor_cluster( async with open_actor_cluster(
count=2, count=2,
names=names, names=names,
modules=['piker.fsp._engine'], modules=['piker.fsp._engine'],
) as cluster_map: ) as cluster_map:
profiler('started fsp cluster') # profiler('started fsp cluster')
yield cluster_map yield cluster_map
class FspAdmin:
'''
Client API for orchestrating FSP actors and displaying
real-time graphics output.
'''
def __init__(
self,
tn: trio.Nursery,
cluster: dict[str, tractor.Portal],
) -> None:
self.tn = tn
self.cluster = cluster
self._rr_next_actor = cycle(cluster.items())
self._registry: dict[
tuple,
tuple[tractor.MsgStream, ShmArray]
] = {}
def rr_next_portal(self) -> tractor.Portal:
name, portal = next(self._rr_next_actor)
return portal
async def open_remote_fsp(
self,
portal: tractor.Portal,
complete: trio.Event,
started: trio.Event,
brokername: str,
sym: str,
src_shm: ShmArray,
dst_shm: ShmArray,
conf: dict,
func_name: str,
loglevel: str,
) -> None:
'''
Task which opens a remote FSP endpoint in the managed
cluster and sleeps until signalled to exit.
'''
async with (
portal.open_context(
# chaining entrypoint
fsp.cascade,
# data feed key
brokername=brokername,
symbol=sym,
# mems
src_shm_token=src_shm.token,
dst_shm_token=dst_shm.token,
# target
func_name=func_name,
loglevel=loglevel,
zero_on_step=conf.get('zero_on_step', False),
) as (ctx, last_index),
ctx.open_stream() as stream,
):
# register output data
self._registry[(brokername, sym, func_name)] = (
stream, dst_shm, complete)
started.set()
# wait for graceful shutdown signal
await complete.wait()
async def start_fsp(
self,
display_name: str,
feed_key: tuple[str, str],
src_shm: ShmArray,
conf: dict[str, dict[str, Any]],
worker_name: Optional[str] = None,
loglevel: str = 'error',
) -> (ShmArray, trio.Event):
# unpack FSP details from config dict
func_name = conf['func_name']
# allocate an output shm array
dst_shm, opened = maybe_mk_fsp_shm(
feed_key,
field_name=func_name,
display_name=display_name,
readonly=True,
)
if not opened:
raise RuntimeError("Already started FSP {func_name}")
portal = self.cluster.get(worker_name) or self.rr_next_portal()
complete = trio.Event()
started = trio.Event()
brokername, sym = feed_key
self.tn.start_soon(
self.open_remote_fsp,
portal,
complete,
started,
brokername,
sym,
src_shm,
dst_shm,
conf,
func_name,
loglevel,
)
return dst_shm, started
@acm
async def open_fsp_admin(
**kwargs,
) -> AsyncGenerator[dict, dict[str, tractor.Portal]]:
async with (
maybe_open_context(
# for now make a cluster per client?
acm_func=open_fsp_actor_cluster,
kwargs=kwargs,
) as (cache_hit, cluster_map),
trio.open_nursery() as tn,
):
if cache_hit:
log.info('re-using existing fsp cluster')
admin = FspAdmin(tn, cluster_map)
try:
yield admin
finally:
# terminate all tasks via signals
for key, entry in admin._registry.items():
_, _, event = entry
event.set()
@acm
async def maybe_open_vlm_display(
linked: LinkedSplits,
ohlcv: ShmArray,
) -> ChartPlotWidget:
'''
Volume subchart helper.
Since "volume" is often included directly alongside OHLCV price
data, we don't really need a separate FSP-actor + shm array for it
since it's likely already directly adjacent to OHLC samples from the
data provider.
'''
if not has_vlm(ohlcv):
log.warning(f"{linked.symbol.key} does not seem to have volume info")
yield
return
async with open_fsp_sidepane(
linked, {
'vlm': {
'params': {
'price_func': {
'default_value': 'chl3',
# tell target ``Edit`` widget to not allow
# edits for now.
'widget_kwargs': {'readonly': True},
},
},
}
},
) as sidepane:
# built-in $vlm
shm = ohlcv
chart = linked.add_plot(
name='volume',
array=shm.array,
array_key='volume',
sidepane=sidepane,
# curve by default
ohlc=False,
# Draw vertical bars from zero.
# we do this internally ourselves since
# the curve item internals are pretty convoluted.
style='step',
)
# show volume units value on LHS (for dinkus)
chart.hideAxis('right')
chart.showAxis('left')
# XXX: ONLY for sub-chart fsps, overlays have their
# data looked up from the chart's internal array set.
# TODO: we must get a data view api going STAT!!
chart._shm = shm
# should **not** be the same sub-chart widget
assert chart.name != linked.chart.name
# sticky only on sub-charts atm
last_val_sticky = chart._ysticks[chart.name]
# read from last calculated value
value = shm.array['volume'][-1]
last_val_sticky.update_from_data(-1, value)
chart.update_curve_from_array(
'volume',
shm.array,
)
# size view to data once at outset
chart.view._set_yrange()
yield chart
async def run_fsp_ui( async def run_fsp_ui(
linkedsplits: LinkedSplits,
shm: ShmArray, shm: ShmArray,
started: trio.Event, started: trio.Event,
linkedsplits: LinkedSplits,
func_name: str, func_name: str,
display_name: str, display_name: str,
conf: dict[str, dict], conf: dict[str, dict],
group_status_key: str,
loglevel: str, loglevel: str,
profiler: pg.debug.Profiler, # profiler: pg.debug.Profiler,
# _quote_throttle_rate: int = 58,
_quote_throttle_rate: int = 58,
) -> None: ) -> None:
''' '''
FSP stream chart update loop. Taskf for UI spawning around a ``LinkedSplits`` chart for fsp
related graphics/UX management.
This is called once for each entry in the fsp This is normally spawned/called once for each entry in the fsp
config map. config.
''' '''
profiler(f'started UI task for fsp: {func_name}') # profiler(f'started UI task for fsp: {func_name}')
async with ( async with (
# side UI for parameters/controls # side UI for parameters/controls
@ -496,7 +252,7 @@ async def run_fsp_ui(
) as sidepane, ) as sidepane,
): ):
await started.wait() await started.wait()
profiler(f'fsp:{func_name} attached to fsp ctx-stream') # profiler(f'fsp:{func_name} attached to fsp ctx-stream')
overlay_with = conf.get('overlay', False) overlay_with = conf.get('overlay', False)
if overlay_with: if overlay_with:
@ -518,6 +274,7 @@ async def run_fsp_ui(
chart._overlays[display_name] = shm chart._overlays[display_name] = shm
else: else:
# create a new sub-chart widget for this fsp
chart = linkedsplits.add_plot( chart = linkedsplits.add_plot(
name=display_name, name=display_name,
array=shm.array, array=shm.array,
@ -530,7 +287,6 @@ async def run_fsp_ui(
# settings passed down to ``ChartPlotWidget`` # settings passed down to ``ChartPlotWidget``
**conf.get('chart_kwargs', {}) **conf.get('chart_kwargs', {})
# static_yrange=(0, 100),
) )
# XXX: ONLY for sub-chart fsps, overlays have their # XXX: ONLY for sub-chart fsps, overlays have their
@ -543,7 +299,7 @@ async def run_fsp_ui(
array_key = func_name array_key = func_name
profiler(f'fsp:{func_name} chart created') # profiler(f'fsp:{func_name} chart created')
# first UI update, usually from shm pushed history # first UI update, usually from shm pushed history
update_fsp_chart( update_fsp_chart(
@ -566,20 +322,20 @@ async def run_fsp_ui(
# graphics.curve.setBrush(50, 50, 200, 100) # graphics.curve.setBrush(50, 50, 200, 100)
# graphics.curve.setFillLevel(50) # graphics.curve.setFillLevel(50)
if func_name == 'rsi': # if func_name == 'rsi':
from ._lines import level_line # from ._lines import level_line
# add moveable over-[sold/bought] lines # # add moveable over-[sold/bought] lines
# and labels only for the 70/30 lines # # and labels only for the 70/30 lines
level_line(chart, 20) # level_line(chart, 20)
level_line(chart, 30, orient_v='top') # level_line(chart, 30, orient_v='top')
level_line(chart, 70, orient_v='bottom') # level_line(chart, 70, orient_v='bottom')
level_line(chart, 80, orient_v='top') # level_line(chart, 80, orient_v='top')
chart.view._set_yrange() chart.view._set_yrange()
# done() # status updates # done() # status updates
profiler(f'fsp:{func_name} starting update loop') # profiler(f'fsp:{func_name} starting update loop')
profiler.finish() # profiler.finish()
# update chart graphics # update chart graphics
# last = time.time() # last = time.time()
@ -616,30 +372,396 @@ async def run_fsp_ui(
# last = time.time() # last = time.time()
class FspAdmin:
'''
Client API for orchestrating FSP actors and displaying
real-time graphics output.
'''
def __init__(
self,
tn: trio.Nursery,
cluster: dict[str, tractor.Portal],
linked: LinkedSplits,
src_shm: ShmArray,
) -> None:
self.tn = tn
self.cluster = cluster
self.linked = linked
self._rr_next_actor = cycle(cluster.items())
self._registry: dict[
tuple,
tuple[tractor.MsgStream, ShmArray]
] = {}
self.src_shm = src_shm
def rr_next_portal(self) -> tractor.Portal:
name, portal = next(self._rr_next_actor)
return portal
async def open_chain(
self,
portal: tractor.Portal,
complete: trio.Event,
started: trio.Event,
dst_shm: ShmArray,
conf: dict,
func_name: str,
loglevel: str,
) -> None:
'''
Task which opens a remote FSP endpoint in the managed
cluster and sleeps until signalled to exit.
'''
brokername, sym = self.linked.symbol.front_feed()
async with (
portal.open_context(
# chaining entrypoint
fsp.cascade,
# data feed key
brokername=brokername,
symbol=sym,
# mems
src_shm_token=self.src_shm.token,
dst_shm_token=dst_shm.token,
# target
func_name=func_name,
loglevel=loglevel,
zero_on_step=conf.get('zero_on_step', False),
) as (ctx, last_index),
ctx.open_stream() as stream,
):
# register output data
self._registry[(brokername, sym, func_name)] = (
stream, dst_shm, complete)
started.set()
# wait for graceful shutdown signal
await complete.wait()
async def start_engine_task(
self,
display_name: str,
conf: dict[str, dict[str, Any]],
worker_name: Optional[str] = None,
loglevel: str = 'error',
) -> (ShmArray, trio.Event):
# unpack FSP details from config dict
func_name = conf['func_name']
# allocate an output shm array
dst_shm, opened = maybe_mk_fsp_shm(
self.linked.symbol.front_feed(),
field_name=func_name,
display_name=display_name,
readonly=True,
)
if not opened:
raise RuntimeError(f'Already started FSP {func_name}')
portal = self.cluster.get(worker_name) or self.rr_next_portal()
complete = trio.Event()
started = trio.Event()
self.tn.start_soon(
self.open_chain,
portal,
complete,
started,
dst_shm,
conf,
func_name,
loglevel,
)
return dst_shm, started
async def open_fsp_chart(
self,
display_name: str,
conf: dict, # yeah probably dumb..
loglevel: str = 'error',
) -> (trio.Event, ChartPlotWidget):
func_name = conf['func_name']
shm, started = await self.start_engine_task(
display_name,
conf,
loglevel,
)
# init async
self.tn.start_soon(
partial(
run_fsp_ui,
self.linked,
shm,
started,
func_name,
display_name,
conf=conf,
loglevel=loglevel,
)
)
return started
@acm
async def open_fsp_admin(
linked: LinkedSplits,
src_shm: ShmArray,
**kwargs,
) -> AsyncGenerator[dict, dict[str, tractor.Portal]]:
async with (
maybe_open_context(
# for now make a cluster per client?
acm_func=open_fsp_actor_cluster,
kwargs=kwargs,
) as (cache_hit, cluster_map),
trio.open_nursery() as tn,
):
if cache_hit:
log.info('re-using existing fsp cluster')
admin = FspAdmin(
tn,
cluster_map,
linked,
src_shm,
)
try:
yield admin
finally:
# terminate all tasks via signals
for key, entry in admin._registry.items():
_, _, event = entry
event.set()
async def open_vlm_displays(
linked: LinkedSplits,
ohlcv: ShmArray,
dvlm: bool = True,
task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED,
) -> ChartPlotWidget:
'''
Volume subchart displays.
Since "volume" is often included directly alongside OHLCV price
data, we don't really need a separate FSP-actor + shm array for it
since it's likely already directly adjacent to OHLC samples from the
data provider.
Further only if volume data is detected (it sometimes isn't provided
eg. forex, certain commodities markets) will volume dependent FSPs
be spawned here.
'''
async with (
open_fsp_sidepane(
linked, {
'vlm': {
'params': {
'price_func': {
'default_value': 'chl3',
# tell target ``Edit`` widget to not allow
# edits for now.
'widget_kwargs': {'readonly': True},
},
},
}
},
) as sidepane,
open_fsp_admin(linked, ohlcv) as admin,
):
# built-in vlm which we plot ASAP since it's
# usually data provided directly with OHLC history.
shm = ohlcv
chart = linked.add_plot(
name='volume',
array=shm.array,
array_key='volume',
sidepane=sidepane,
# curve by default
ohlc=False,
# Draw vertical bars from zero.
# we do this internally ourselves since
# the curve item internals are pretty convoluted.
style='step',
)
# force 0 to always be in view
def maxmin(name) -> tuple[float, float]:
mxmn = chart.maxmin(name=name)
if mxmn:
return 0, mxmn[1]
return 0, 0
chart.view._maxmin = partial(maxmin, name='volume')
# TODO: fix the x-axis label issue where if you put
# the axis on the left it's totally not lined up...
# show volume units value on LHS (for dinkus)
# chart.hideAxis('right')
# chart.showAxis('left')
# XXX: ONLY for sub-chart fsps, overlays have their
# data looked up from the chart's internal array set.
# TODO: we must get a data view api going STAT!!
chart._shm = shm
# send back new chart to caller
task_status.started(chart)
# should **not** be the same sub-chart widget
assert chart.name != linked.chart.name
# sticky only on sub-charts atm
last_val_sticky = chart._ysticks[chart.name]
# read from last calculated value
value = shm.array['volume'][-1]
last_val_sticky.update_from_data(-1, value)
chart.update_curve_from_array(
'volume',
shm.array,
)
# size view to data once at outset
chart.view._set_yrange()
if not dvlm:
return
# spawn and overlay $ vlm on the same subchart
shm, started = await admin.start_engine_task(
'dolla_vlm',
# linked.symbol.front_feed(), # data-feed symbol key
{ # fsp engine conf
'func_name': 'dolla_vlm',
'zero_on_step': True,
'params': {
'price_func': {
'default_value': 'chl3',
},
},
},
# loglevel,
)
# profiler(f'created shm for fsp actor: {display_name}')
await started.wait()
pi = chart.overlay_plotitem(
'dolla_vlm',
)
# add custom auto range handler
pi.vb._maxmin = partial(maxmin, name='dolla_vlm')
curve, _ = chart.draw_curve(
name='dolla_vlm',
data=shm.array,
array_key='dolla_vlm',
overlay=pi,
color='charcoal',
step_mode=True,
# **conf.get('chart_kwargs', {})
)
# TODO: is there a way to "sync" the dual axes such that only
# one curve is needed?
# curve.hide()
# TODO: we need a better API to do this..
# specially store ref to shm for lookup in display loop
# since only a placeholder of `None` is entered in
# ``.draw_curve()``.
chart._overlays['dolla_vlm'] = shm
# XXX: old dict-style config before it was moved into the helper task
# 'dolla_vlm': {
# 'func_name': 'dolla_vlm',
# 'zero_on_step': True,
# 'overlay': 'volume',
# 'separate_axes': True,
# 'params': {
# 'price_func': {
# 'default_value': 'chl3',
# # tell target ``Edit`` widget to not allow
# # edits for now.
# 'widget_kwargs': {'readonly': True},
# },
# },
# 'chart_kwargs': {'step_mode': True}
# },
# }
# built-in vlm fsps
for display_name, conf in {
'vwap': {
'func_name': 'vwap',
'overlay': 'ohlc', # overlays with OHLCV (main) chart
'anchor': 'session',
},
}.items():
started = await admin.open_fsp_chart(
display_name,
conf,
)
async def start_fsp_displays( async def start_fsp_displays(
linkedsplits: LinkedSplits, linked: LinkedSplits,
brokermod: ModuleType,
sym: str,
ohlcv: ShmArray, ohlcv: ShmArray,
group_status_key: str, group_status_key: str,
loglevel: str, loglevel: str,
task_status: TaskStatus[
tuple[FspAdmin, 'ChartPlotWidet']
] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
Create sub-actors (under flat tree) Create fsp charts from a config input attached to a local actor
for each entry in config and attach to local graphics update tasks. compute cluster.
Pass target entrypoint and historical data. Pass target entrypoint and historical data via ``ShmArray``.
''' '''
linked.focus()
# TODO: eventually we'll support some kind of n-compose syntax # TODO: eventually we'll support some kind of n-compose syntax
fsp_conf = { fsp_conf = {
# 'rsi': { # 'rsi': {
# 'func_name': 'rsi', # literal python func ref lookup name # 'func_name': 'rsi', # literal python func ref lookup name
@ -659,99 +781,32 @@ async def start_fsp_displays(
# }, # },
# }, # },
} }
if has_vlm(ohlcv):
fsp_conf.update({
'vwap': {
'func_name': 'vwap',
'overlay': 'ohlc',
'anchor': 'session',
},
'dolla_vlm': {
'func_name': 'dolla_vlm',
'zero_on_step': True,
'overlay': 'volume',
'separate_axes': True,
'params': {
'price_func': {
'default_value': 'chl3',
# tell target ``Edit`` widget to not allow
# edits for now.
'widget_kwargs': {'readonly': True},
},
},
'chart_kwargs': {'step_mode': True}
},
})
linkedsplits.focus()
profiler = pg.debug.Profiler( profiler = pg.debug.Profiler(
delayed=False, delayed=False,
disabled=False disabled=False
) )
async with gather_contexts(( # async with gather_contexts((
async with (
# NOTE: this admin internally opens an actor pool.
open_fsp_admin(),
trio.open_nursery(),
maybe_open_vlm_display(
linkedsplits,
ohlcv,
),
)) as (admin, n, vlm_chart):
task_status.started((admin, vlm_chart))
# NOTE: this admin internally opens an actor cluster
open_fsp_admin(linked, ohlcv) as admin,
):
statuses = []
for display_name, conf in fsp_conf.items(): for display_name, conf in fsp_conf.items():
func_name = conf['func_name'] started = await admin.open_fsp_chart(
display_name,
done = linkedsplits.window().status_bar.open_status( conf,
)
done = linked.window().status_bar.open_status(
f'loading fsp, {display_name}..', f'loading fsp, {display_name}..',
group_key=group_status_key, group_key=group_status_key,
) )
statuses.append((started, done))
shm, started = await admin.start_fsp( for fsp_loaded, status_cb in statuses:
display_name, await fsp_loaded.wait()
(brokermod.name, sym),
ohlcv,
fsp_conf[display_name],
loglevel,
)
profiler(f'created shm for fsp actor: {display_name}')
# XXX: fsp may have been opened by a duplicate chart.
# Error for now until we figure out how to wrap fsps as
# "feeds". assert opened, f"A chart for {key} likely
# already exists?"
profiler(f'attached to fsp portal: {display_name}') profiler(f'attached to fsp portal: {display_name}')
status_cb()
# init async
n.start_soon(
partial(
run_fsp_ui,
shm,
started,
linkedsplits,
func_name,
display_name,
conf=conf,
group_status_key=group_status_key,
loglevel=loglevel,
profiler=profiler,
)
)
await started.wait()
done()
# blocks on nursery until all fsp actors complete # blocks on nursery until all fsp actors complete