Factor (sub-)chart spawning into a admin method
Adds `FspAdmin.open_fsp_chart()` which allows adding a real time graphics display of an fsp's output with different options for where (which chart or make a new one) to place it. Further, - change some method naming, namely the other fsp engine task methods to `.open_chain()` and `.start_engine_task()`. - make `run_fsp_ui()` a lone task function for now with the default config parsing and chart setup logic (and it still includes a buncha commented out stuff for doing graphics update which is now done in the main loop to avoid task switching overhead). - move all vlm related fsp config entries into the `open_vlm_displays()` task for dedicated setup with the fsp admin api such as special auto-yrange handling and graph overlays. - `start_fsp_displays()` is now just a small loop through config entries with synced startup status messages.fsp_ui_mod
parent
e22a652852
commit
404f5d6d23
787
piker/ui/_fsp.py
787
piker/ui/_fsp.py
|
@ -23,13 +23,12 @@ Financial signal processing cluster and real-time graphics management.
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from itertools import cycle
|
from itertools import cycle
|
||||||
from types import ModuleType
|
|
||||||
from typing import Optional, AsyncGenerator, Any
|
from typing import Optional, AsyncGenerator, Any
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from pydantic import create_model
|
from pydantic import create_model
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.trionics import gather_contexts
|
# from tractor.trionics import gather_contexts
|
||||||
import pyqtgraph as pg
|
import pyqtgraph as pg
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
@ -92,8 +91,8 @@ def has_vlm(ohlcv: ShmArray) -> bool:
|
||||||
# make sure that the instrument supports volume history
|
# make sure that the instrument supports volume history
|
||||||
# (sometimes this is not the case for some commodities and
|
# (sometimes this is not the case for some commodities and
|
||||||
# derivatives)
|
# derivatives)
|
||||||
volm = ohlcv.array['volume']
|
vlm = ohlcv.array['volume']
|
||||||
return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm)))
|
return not bool(np.all(np.isin(vlm, -1)) or np.all(np.isnan(vlm)))
|
||||||
|
|
||||||
|
|
||||||
def update_fsp_chart(
|
def update_fsp_chart(
|
||||||
|
@ -208,285 +207,42 @@ async def open_fsp_actor_cluster(
|
||||||
|
|
||||||
from tractor._clustering import open_actor_cluster
|
from tractor._clustering import open_actor_cluster
|
||||||
|
|
||||||
profiler = pg.debug.Profiler(
|
# profiler = pg.debug.Profiler(
|
||||||
delayed=False,
|
# delayed=False,
|
||||||
disabled=False
|
# disabled=False
|
||||||
)
|
# )
|
||||||
async with open_actor_cluster(
|
async with open_actor_cluster(
|
||||||
count=2,
|
count=2,
|
||||||
names=names,
|
names=names,
|
||||||
modules=['piker.fsp._engine'],
|
modules=['piker.fsp._engine'],
|
||||||
|
|
||||||
) as cluster_map:
|
) as cluster_map:
|
||||||
profiler('started fsp cluster')
|
# profiler('started fsp cluster')
|
||||||
yield cluster_map
|
yield cluster_map
|
||||||
|
|
||||||
|
|
||||||
class FspAdmin:
|
|
||||||
'''
|
|
||||||
Client API for orchestrating FSP actors and displaying
|
|
||||||
real-time graphics output.
|
|
||||||
|
|
||||||
'''
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
tn: trio.Nursery,
|
|
||||||
cluster: dict[str, tractor.Portal],
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
self.tn = tn
|
|
||||||
self.cluster = cluster
|
|
||||||
self._rr_next_actor = cycle(cluster.items())
|
|
||||||
self._registry: dict[
|
|
||||||
tuple,
|
|
||||||
tuple[tractor.MsgStream, ShmArray]
|
|
||||||
] = {}
|
|
||||||
|
|
||||||
def rr_next_portal(self) -> tractor.Portal:
|
|
||||||
name, portal = next(self._rr_next_actor)
|
|
||||||
return portal
|
|
||||||
|
|
||||||
async def open_remote_fsp(
|
|
||||||
self,
|
|
||||||
|
|
||||||
portal: tractor.Portal,
|
|
||||||
complete: trio.Event,
|
|
||||||
started: trio.Event,
|
|
||||||
|
|
||||||
brokername: str,
|
|
||||||
sym: str,
|
|
||||||
|
|
||||||
src_shm: ShmArray,
|
|
||||||
dst_shm: ShmArray,
|
|
||||||
|
|
||||||
conf: dict,
|
|
||||||
func_name: str,
|
|
||||||
loglevel: str,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
'''
|
|
||||||
Task which opens a remote FSP endpoint in the managed
|
|
||||||
cluster and sleeps until signalled to exit.
|
|
||||||
|
|
||||||
'''
|
|
||||||
async with (
|
|
||||||
portal.open_context(
|
|
||||||
|
|
||||||
# chaining entrypoint
|
|
||||||
fsp.cascade,
|
|
||||||
|
|
||||||
# data feed key
|
|
||||||
brokername=brokername,
|
|
||||||
symbol=sym,
|
|
||||||
|
|
||||||
# mems
|
|
||||||
src_shm_token=src_shm.token,
|
|
||||||
dst_shm_token=dst_shm.token,
|
|
||||||
|
|
||||||
# target
|
|
||||||
func_name=func_name,
|
|
||||||
|
|
||||||
loglevel=loglevel,
|
|
||||||
zero_on_step=conf.get('zero_on_step', False),
|
|
||||||
|
|
||||||
) as (ctx, last_index),
|
|
||||||
ctx.open_stream() as stream,
|
|
||||||
):
|
|
||||||
# register output data
|
|
||||||
self._registry[(brokername, sym, func_name)] = (
|
|
||||||
stream, dst_shm, complete)
|
|
||||||
|
|
||||||
started.set()
|
|
||||||
|
|
||||||
# wait for graceful shutdown signal
|
|
||||||
await complete.wait()
|
|
||||||
|
|
||||||
async def start_fsp(
|
|
||||||
self,
|
|
||||||
|
|
||||||
display_name: str,
|
|
||||||
feed_key: tuple[str, str],
|
|
||||||
src_shm: ShmArray,
|
|
||||||
conf: dict[str, dict[str, Any]],
|
|
||||||
|
|
||||||
worker_name: Optional[str] = None,
|
|
||||||
loglevel: str = 'error',
|
|
||||||
|
|
||||||
) -> (ShmArray, trio.Event):
|
|
||||||
|
|
||||||
# unpack FSP details from config dict
|
|
||||||
func_name = conf['func_name']
|
|
||||||
|
|
||||||
# allocate an output shm array
|
|
||||||
dst_shm, opened = maybe_mk_fsp_shm(
|
|
||||||
feed_key,
|
|
||||||
field_name=func_name,
|
|
||||||
display_name=display_name,
|
|
||||||
readonly=True,
|
|
||||||
)
|
|
||||||
if not opened:
|
|
||||||
raise RuntimeError("Already started FSP {func_name}")
|
|
||||||
|
|
||||||
portal = self.cluster.get(worker_name) or self.rr_next_portal()
|
|
||||||
complete = trio.Event()
|
|
||||||
started = trio.Event()
|
|
||||||
|
|
||||||
brokername, sym = feed_key
|
|
||||||
self.tn.start_soon(
|
|
||||||
self.open_remote_fsp,
|
|
||||||
portal,
|
|
||||||
complete,
|
|
||||||
started,
|
|
||||||
|
|
||||||
brokername,
|
|
||||||
sym,
|
|
||||||
|
|
||||||
src_shm,
|
|
||||||
dst_shm,
|
|
||||||
|
|
||||||
conf,
|
|
||||||
func_name,
|
|
||||||
loglevel,
|
|
||||||
)
|
|
||||||
|
|
||||||
return dst_shm, started
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
|
||||||
async def open_fsp_admin(
|
|
||||||
**kwargs,
|
|
||||||
|
|
||||||
) -> AsyncGenerator[dict, dict[str, tractor.Portal]]:
|
|
||||||
|
|
||||||
async with (
|
|
||||||
maybe_open_context(
|
|
||||||
# for now make a cluster per client?
|
|
||||||
acm_func=open_fsp_actor_cluster,
|
|
||||||
kwargs=kwargs,
|
|
||||||
) as (cache_hit, cluster_map),
|
|
||||||
|
|
||||||
trio.open_nursery() as tn,
|
|
||||||
):
|
|
||||||
if cache_hit:
|
|
||||||
log.info('re-using existing fsp cluster')
|
|
||||||
|
|
||||||
admin = FspAdmin(tn, cluster_map)
|
|
||||||
try:
|
|
||||||
yield admin
|
|
||||||
finally:
|
|
||||||
# terminate all tasks via signals
|
|
||||||
for key, entry in admin._registry.items():
|
|
||||||
_, _, event = entry
|
|
||||||
event.set()
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
|
||||||
async def maybe_open_vlm_display(
|
|
||||||
linked: LinkedSplits,
|
|
||||||
ohlcv: ShmArray,
|
|
||||||
|
|
||||||
) -> ChartPlotWidget:
|
|
||||||
'''
|
|
||||||
Volume subchart helper.
|
|
||||||
|
|
||||||
Since "volume" is often included directly alongside OHLCV price
|
|
||||||
data, we don't really need a separate FSP-actor + shm array for it
|
|
||||||
since it's likely already directly adjacent to OHLC samples from the
|
|
||||||
data provider.
|
|
||||||
|
|
||||||
'''
|
|
||||||
if not has_vlm(ohlcv):
|
|
||||||
log.warning(f"{linked.symbol.key} does not seem to have volume info")
|
|
||||||
yield
|
|
||||||
return
|
|
||||||
|
|
||||||
async with open_fsp_sidepane(
|
|
||||||
linked, {
|
|
||||||
'vlm': {
|
|
||||||
'params': {
|
|
||||||
'price_func': {
|
|
||||||
'default_value': 'chl3',
|
|
||||||
# tell target ``Edit`` widget to not allow
|
|
||||||
# edits for now.
|
|
||||||
'widget_kwargs': {'readonly': True},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
) as sidepane:
|
|
||||||
|
|
||||||
# built-in $vlm
|
|
||||||
shm = ohlcv
|
|
||||||
chart = linked.add_plot(
|
|
||||||
name='volume',
|
|
||||||
array=shm.array,
|
|
||||||
|
|
||||||
array_key='volume',
|
|
||||||
sidepane=sidepane,
|
|
||||||
|
|
||||||
# curve by default
|
|
||||||
ohlc=False,
|
|
||||||
|
|
||||||
# Draw vertical bars from zero.
|
|
||||||
# we do this internally ourselves since
|
|
||||||
# the curve item internals are pretty convoluted.
|
|
||||||
style='step',
|
|
||||||
)
|
|
||||||
|
|
||||||
# show volume units value on LHS (for dinkus)
|
|
||||||
chart.hideAxis('right')
|
|
||||||
chart.showAxis('left')
|
|
||||||
|
|
||||||
# XXX: ONLY for sub-chart fsps, overlays have their
|
|
||||||
# data looked up from the chart's internal array set.
|
|
||||||
# TODO: we must get a data view api going STAT!!
|
|
||||||
chart._shm = shm
|
|
||||||
|
|
||||||
# should **not** be the same sub-chart widget
|
|
||||||
assert chart.name != linked.chart.name
|
|
||||||
|
|
||||||
# sticky only on sub-charts atm
|
|
||||||
last_val_sticky = chart._ysticks[chart.name]
|
|
||||||
|
|
||||||
# read from last calculated value
|
|
||||||
value = shm.array['volume'][-1]
|
|
||||||
|
|
||||||
last_val_sticky.update_from_data(-1, value)
|
|
||||||
|
|
||||||
chart.update_curve_from_array(
|
|
||||||
'volume',
|
|
||||||
shm.array,
|
|
||||||
)
|
|
||||||
|
|
||||||
# size view to data once at outset
|
|
||||||
chart.view._set_yrange()
|
|
||||||
|
|
||||||
yield chart
|
|
||||||
|
|
||||||
|
|
||||||
async def run_fsp_ui(
|
async def run_fsp_ui(
|
||||||
|
|
||||||
|
linkedsplits: LinkedSplits,
|
||||||
shm: ShmArray,
|
shm: ShmArray,
|
||||||
started: trio.Event,
|
started: trio.Event,
|
||||||
linkedsplits: LinkedSplits,
|
|
||||||
func_name: str,
|
func_name: str,
|
||||||
display_name: str,
|
display_name: str,
|
||||||
conf: dict[str, dict],
|
conf: dict[str, dict],
|
||||||
group_status_key: str,
|
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
profiler: pg.debug.Profiler,
|
# profiler: pg.debug.Profiler,
|
||||||
|
# _quote_throttle_rate: int = 58,
|
||||||
_quote_throttle_rate: int = 58,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
FSP stream chart update loop.
|
Taskf for UI spawning around a ``LinkedSplits`` chart for fsp
|
||||||
|
related graphics/UX management.
|
||||||
|
|
||||||
This is called once for each entry in the fsp
|
This is normally spawned/called once for each entry in the fsp
|
||||||
config map.
|
config.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
profiler(f'started UI task for fsp: {func_name}')
|
# profiler(f'started UI task for fsp: {func_name}')
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
# side UI for parameters/controls
|
# side UI for parameters/controls
|
||||||
|
@ -496,7 +252,7 @@ async def run_fsp_ui(
|
||||||
) as sidepane,
|
) as sidepane,
|
||||||
):
|
):
|
||||||
await started.wait()
|
await started.wait()
|
||||||
profiler(f'fsp:{func_name} attached to fsp ctx-stream')
|
# profiler(f'fsp:{func_name} attached to fsp ctx-stream')
|
||||||
|
|
||||||
overlay_with = conf.get('overlay', False)
|
overlay_with = conf.get('overlay', False)
|
||||||
if overlay_with:
|
if overlay_with:
|
||||||
|
@ -518,6 +274,7 @@ async def run_fsp_ui(
|
||||||
chart._overlays[display_name] = shm
|
chart._overlays[display_name] = shm
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
# create a new sub-chart widget for this fsp
|
||||||
chart = linkedsplits.add_plot(
|
chart = linkedsplits.add_plot(
|
||||||
name=display_name,
|
name=display_name,
|
||||||
array=shm.array,
|
array=shm.array,
|
||||||
|
@ -530,7 +287,6 @@ async def run_fsp_ui(
|
||||||
|
|
||||||
# settings passed down to ``ChartPlotWidget``
|
# settings passed down to ``ChartPlotWidget``
|
||||||
**conf.get('chart_kwargs', {})
|
**conf.get('chart_kwargs', {})
|
||||||
# static_yrange=(0, 100),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX: ONLY for sub-chart fsps, overlays have their
|
# XXX: ONLY for sub-chart fsps, overlays have their
|
||||||
|
@ -543,7 +299,7 @@ async def run_fsp_ui(
|
||||||
|
|
||||||
array_key = func_name
|
array_key = func_name
|
||||||
|
|
||||||
profiler(f'fsp:{func_name} chart created')
|
# profiler(f'fsp:{func_name} chart created')
|
||||||
|
|
||||||
# first UI update, usually from shm pushed history
|
# first UI update, usually from shm pushed history
|
||||||
update_fsp_chart(
|
update_fsp_chart(
|
||||||
|
@ -566,20 +322,20 @@ async def run_fsp_ui(
|
||||||
# graphics.curve.setBrush(50, 50, 200, 100)
|
# graphics.curve.setBrush(50, 50, 200, 100)
|
||||||
# graphics.curve.setFillLevel(50)
|
# graphics.curve.setFillLevel(50)
|
||||||
|
|
||||||
if func_name == 'rsi':
|
# if func_name == 'rsi':
|
||||||
from ._lines import level_line
|
# from ._lines import level_line
|
||||||
# add moveable over-[sold/bought] lines
|
# # add moveable over-[sold/bought] lines
|
||||||
# and labels only for the 70/30 lines
|
# # and labels only for the 70/30 lines
|
||||||
level_line(chart, 20)
|
# level_line(chart, 20)
|
||||||
level_line(chart, 30, orient_v='top')
|
# level_line(chart, 30, orient_v='top')
|
||||||
level_line(chart, 70, orient_v='bottom')
|
# level_line(chart, 70, orient_v='bottom')
|
||||||
level_line(chart, 80, orient_v='top')
|
# level_line(chart, 80, orient_v='top')
|
||||||
|
|
||||||
chart.view._set_yrange()
|
chart.view._set_yrange()
|
||||||
# done() # status updates
|
# done() # status updates
|
||||||
|
|
||||||
profiler(f'fsp:{func_name} starting update loop')
|
# profiler(f'fsp:{func_name} starting update loop')
|
||||||
profiler.finish()
|
# profiler.finish()
|
||||||
|
|
||||||
# update chart graphics
|
# update chart graphics
|
||||||
# last = time.time()
|
# last = time.time()
|
||||||
|
@ -616,30 +372,396 @@ async def run_fsp_ui(
|
||||||
# last = time.time()
|
# last = time.time()
|
||||||
|
|
||||||
|
|
||||||
|
class FspAdmin:
|
||||||
|
'''
|
||||||
|
Client API for orchestrating FSP actors and displaying
|
||||||
|
real-time graphics output.
|
||||||
|
|
||||||
|
'''
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
tn: trio.Nursery,
|
||||||
|
cluster: dict[str, tractor.Portal],
|
||||||
|
linked: LinkedSplits,
|
||||||
|
src_shm: ShmArray,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
self.tn = tn
|
||||||
|
self.cluster = cluster
|
||||||
|
self.linked = linked
|
||||||
|
self._rr_next_actor = cycle(cluster.items())
|
||||||
|
self._registry: dict[
|
||||||
|
tuple,
|
||||||
|
tuple[tractor.MsgStream, ShmArray]
|
||||||
|
] = {}
|
||||||
|
self.src_shm = src_shm
|
||||||
|
|
||||||
|
def rr_next_portal(self) -> tractor.Portal:
|
||||||
|
name, portal = next(self._rr_next_actor)
|
||||||
|
return portal
|
||||||
|
|
||||||
|
async def open_chain(
|
||||||
|
self,
|
||||||
|
|
||||||
|
portal: tractor.Portal,
|
||||||
|
complete: trio.Event,
|
||||||
|
started: trio.Event,
|
||||||
|
dst_shm: ShmArray,
|
||||||
|
conf: dict,
|
||||||
|
func_name: str,
|
||||||
|
loglevel: str,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Task which opens a remote FSP endpoint in the managed
|
||||||
|
cluster and sleeps until signalled to exit.
|
||||||
|
|
||||||
|
'''
|
||||||
|
brokername, sym = self.linked.symbol.front_feed()
|
||||||
|
async with (
|
||||||
|
portal.open_context(
|
||||||
|
|
||||||
|
# chaining entrypoint
|
||||||
|
fsp.cascade,
|
||||||
|
|
||||||
|
# data feed key
|
||||||
|
brokername=brokername,
|
||||||
|
symbol=sym,
|
||||||
|
|
||||||
|
# mems
|
||||||
|
src_shm_token=self.src_shm.token,
|
||||||
|
dst_shm_token=dst_shm.token,
|
||||||
|
|
||||||
|
# target
|
||||||
|
func_name=func_name,
|
||||||
|
|
||||||
|
loglevel=loglevel,
|
||||||
|
zero_on_step=conf.get('zero_on_step', False),
|
||||||
|
|
||||||
|
) as (ctx, last_index),
|
||||||
|
ctx.open_stream() as stream,
|
||||||
|
):
|
||||||
|
# register output data
|
||||||
|
self._registry[(brokername, sym, func_name)] = (
|
||||||
|
stream, dst_shm, complete)
|
||||||
|
|
||||||
|
started.set()
|
||||||
|
|
||||||
|
# wait for graceful shutdown signal
|
||||||
|
await complete.wait()
|
||||||
|
|
||||||
|
async def start_engine_task(
|
||||||
|
self,
|
||||||
|
|
||||||
|
display_name: str,
|
||||||
|
conf: dict[str, dict[str, Any]],
|
||||||
|
|
||||||
|
worker_name: Optional[str] = None,
|
||||||
|
loglevel: str = 'error',
|
||||||
|
|
||||||
|
) -> (ShmArray, trio.Event):
|
||||||
|
|
||||||
|
# unpack FSP details from config dict
|
||||||
|
func_name = conf['func_name']
|
||||||
|
|
||||||
|
# allocate an output shm array
|
||||||
|
dst_shm, opened = maybe_mk_fsp_shm(
|
||||||
|
self.linked.symbol.front_feed(),
|
||||||
|
field_name=func_name,
|
||||||
|
display_name=display_name,
|
||||||
|
readonly=True,
|
||||||
|
)
|
||||||
|
if not opened:
|
||||||
|
raise RuntimeError(f'Already started FSP {func_name}')
|
||||||
|
|
||||||
|
portal = self.cluster.get(worker_name) or self.rr_next_portal()
|
||||||
|
complete = trio.Event()
|
||||||
|
started = trio.Event()
|
||||||
|
self.tn.start_soon(
|
||||||
|
self.open_chain,
|
||||||
|
|
||||||
|
portal,
|
||||||
|
complete,
|
||||||
|
started,
|
||||||
|
dst_shm,
|
||||||
|
conf,
|
||||||
|
func_name,
|
||||||
|
loglevel,
|
||||||
|
)
|
||||||
|
|
||||||
|
return dst_shm, started
|
||||||
|
|
||||||
|
async def open_fsp_chart(
|
||||||
|
self,
|
||||||
|
display_name: str,
|
||||||
|
conf: dict, # yeah probably dumb..
|
||||||
|
loglevel: str = 'error',
|
||||||
|
|
||||||
|
) -> (trio.Event, ChartPlotWidget):
|
||||||
|
|
||||||
|
func_name = conf['func_name']
|
||||||
|
|
||||||
|
shm, started = await self.start_engine_task(
|
||||||
|
display_name,
|
||||||
|
conf,
|
||||||
|
loglevel,
|
||||||
|
)
|
||||||
|
|
||||||
|
# init async
|
||||||
|
self.tn.start_soon(
|
||||||
|
partial(
|
||||||
|
run_fsp_ui,
|
||||||
|
|
||||||
|
self.linked,
|
||||||
|
shm,
|
||||||
|
started,
|
||||||
|
func_name,
|
||||||
|
display_name,
|
||||||
|
|
||||||
|
conf=conf,
|
||||||
|
loglevel=loglevel,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return started
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def open_fsp_admin(
|
||||||
|
linked: LinkedSplits,
|
||||||
|
src_shm: ShmArray,
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> AsyncGenerator[dict, dict[str, tractor.Portal]]:
|
||||||
|
|
||||||
|
async with (
|
||||||
|
maybe_open_context(
|
||||||
|
# for now make a cluster per client?
|
||||||
|
acm_func=open_fsp_actor_cluster,
|
||||||
|
kwargs=kwargs,
|
||||||
|
) as (cache_hit, cluster_map),
|
||||||
|
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
):
|
||||||
|
if cache_hit:
|
||||||
|
log.info('re-using existing fsp cluster')
|
||||||
|
|
||||||
|
admin = FspAdmin(
|
||||||
|
tn,
|
||||||
|
cluster_map,
|
||||||
|
linked,
|
||||||
|
src_shm,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
yield admin
|
||||||
|
finally:
|
||||||
|
# terminate all tasks via signals
|
||||||
|
for key, entry in admin._registry.items():
|
||||||
|
_, _, event = entry
|
||||||
|
event.set()
|
||||||
|
|
||||||
|
|
||||||
|
async def open_vlm_displays(
|
||||||
|
|
||||||
|
linked: LinkedSplits,
|
||||||
|
ohlcv: ShmArray,
|
||||||
|
dvlm: bool = True,
|
||||||
|
|
||||||
|
task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
) -> ChartPlotWidget:
|
||||||
|
'''
|
||||||
|
Volume subchart displays.
|
||||||
|
|
||||||
|
Since "volume" is often included directly alongside OHLCV price
|
||||||
|
data, we don't really need a separate FSP-actor + shm array for it
|
||||||
|
since it's likely already directly adjacent to OHLC samples from the
|
||||||
|
data provider.
|
||||||
|
|
||||||
|
Further only if volume data is detected (it sometimes isn't provided
|
||||||
|
eg. forex, certain commodities markets) will volume dependent FSPs
|
||||||
|
be spawned here.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async with (
|
||||||
|
open_fsp_sidepane(
|
||||||
|
linked, {
|
||||||
|
'vlm': {
|
||||||
|
'params': {
|
||||||
|
'price_func': {
|
||||||
|
'default_value': 'chl3',
|
||||||
|
# tell target ``Edit`` widget to not allow
|
||||||
|
# edits for now.
|
||||||
|
'widget_kwargs': {'readonly': True},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
) as sidepane,
|
||||||
|
open_fsp_admin(linked, ohlcv) as admin,
|
||||||
|
):
|
||||||
|
# built-in vlm which we plot ASAP since it's
|
||||||
|
# usually data provided directly with OHLC history.
|
||||||
|
shm = ohlcv
|
||||||
|
chart = linked.add_plot(
|
||||||
|
name='volume',
|
||||||
|
array=shm.array,
|
||||||
|
|
||||||
|
array_key='volume',
|
||||||
|
sidepane=sidepane,
|
||||||
|
|
||||||
|
# curve by default
|
||||||
|
ohlc=False,
|
||||||
|
|
||||||
|
# Draw vertical bars from zero.
|
||||||
|
# we do this internally ourselves since
|
||||||
|
# the curve item internals are pretty convoluted.
|
||||||
|
style='step',
|
||||||
|
)
|
||||||
|
|
||||||
|
# force 0 to always be in view
|
||||||
|
def maxmin(name) -> tuple[float, float]:
|
||||||
|
mxmn = chart.maxmin(name=name)
|
||||||
|
if mxmn:
|
||||||
|
return 0, mxmn[1]
|
||||||
|
|
||||||
|
return 0, 0
|
||||||
|
|
||||||
|
chart.view._maxmin = partial(maxmin, name='volume')
|
||||||
|
|
||||||
|
# TODO: fix the x-axis label issue where if you put
|
||||||
|
# the axis on the left it's totally not lined up...
|
||||||
|
# show volume units value on LHS (for dinkus)
|
||||||
|
# chart.hideAxis('right')
|
||||||
|
# chart.showAxis('left')
|
||||||
|
|
||||||
|
# XXX: ONLY for sub-chart fsps, overlays have their
|
||||||
|
# data looked up from the chart's internal array set.
|
||||||
|
# TODO: we must get a data view api going STAT!!
|
||||||
|
chart._shm = shm
|
||||||
|
|
||||||
|
# send back new chart to caller
|
||||||
|
task_status.started(chart)
|
||||||
|
|
||||||
|
# should **not** be the same sub-chart widget
|
||||||
|
assert chart.name != linked.chart.name
|
||||||
|
|
||||||
|
# sticky only on sub-charts atm
|
||||||
|
last_val_sticky = chart._ysticks[chart.name]
|
||||||
|
|
||||||
|
# read from last calculated value
|
||||||
|
value = shm.array['volume'][-1]
|
||||||
|
|
||||||
|
last_val_sticky.update_from_data(-1, value)
|
||||||
|
|
||||||
|
chart.update_curve_from_array(
|
||||||
|
'volume',
|
||||||
|
shm.array,
|
||||||
|
)
|
||||||
|
|
||||||
|
# size view to data once at outset
|
||||||
|
chart.view._set_yrange()
|
||||||
|
|
||||||
|
if not dvlm:
|
||||||
|
return
|
||||||
|
|
||||||
|
# spawn and overlay $ vlm on the same subchart
|
||||||
|
shm, started = await admin.start_engine_task(
|
||||||
|
'dolla_vlm',
|
||||||
|
# linked.symbol.front_feed(), # data-feed symbol key
|
||||||
|
{ # fsp engine conf
|
||||||
|
'func_name': 'dolla_vlm',
|
||||||
|
'zero_on_step': True,
|
||||||
|
'params': {
|
||||||
|
'price_func': {
|
||||||
|
'default_value': 'chl3',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
# loglevel,
|
||||||
|
)
|
||||||
|
# profiler(f'created shm for fsp actor: {display_name}')
|
||||||
|
|
||||||
|
await started.wait()
|
||||||
|
|
||||||
|
pi = chart.overlay_plotitem(
|
||||||
|
'dolla_vlm',
|
||||||
|
)
|
||||||
|
# add custom auto range handler
|
||||||
|
pi.vb._maxmin = partial(maxmin, name='dolla_vlm')
|
||||||
|
|
||||||
|
curve, _ = chart.draw_curve(
|
||||||
|
|
||||||
|
name='dolla_vlm',
|
||||||
|
data=shm.array,
|
||||||
|
|
||||||
|
array_key='dolla_vlm',
|
||||||
|
overlay=pi,
|
||||||
|
color='charcoal',
|
||||||
|
step_mode=True,
|
||||||
|
# **conf.get('chart_kwargs', {})
|
||||||
|
)
|
||||||
|
# TODO: is there a way to "sync" the dual axes such that only
|
||||||
|
# one curve is needed?
|
||||||
|
# curve.hide()
|
||||||
|
|
||||||
|
# TODO: we need a better API to do this..
|
||||||
|
# specially store ref to shm for lookup in display loop
|
||||||
|
# since only a placeholder of `None` is entered in
|
||||||
|
# ``.draw_curve()``.
|
||||||
|
chart._overlays['dolla_vlm'] = shm
|
||||||
|
|
||||||
|
# XXX: old dict-style config before it was moved into the helper task
|
||||||
|
# 'dolla_vlm': {
|
||||||
|
# 'func_name': 'dolla_vlm',
|
||||||
|
# 'zero_on_step': True,
|
||||||
|
# 'overlay': 'volume',
|
||||||
|
# 'separate_axes': True,
|
||||||
|
# 'params': {
|
||||||
|
# 'price_func': {
|
||||||
|
# 'default_value': 'chl3',
|
||||||
|
# # tell target ``Edit`` widget to not allow
|
||||||
|
# # edits for now.
|
||||||
|
# 'widget_kwargs': {'readonly': True},
|
||||||
|
# },
|
||||||
|
# },
|
||||||
|
# 'chart_kwargs': {'step_mode': True}
|
||||||
|
# },
|
||||||
|
|
||||||
|
# }
|
||||||
|
|
||||||
|
# built-in vlm fsps
|
||||||
|
for display_name, conf in {
|
||||||
|
'vwap': {
|
||||||
|
'func_name': 'vwap',
|
||||||
|
'overlay': 'ohlc', # overlays with OHLCV (main) chart
|
||||||
|
'anchor': 'session',
|
||||||
|
},
|
||||||
|
}.items():
|
||||||
|
started = await admin.open_fsp_chart(
|
||||||
|
display_name,
|
||||||
|
conf,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def start_fsp_displays(
|
async def start_fsp_displays(
|
||||||
|
|
||||||
linkedsplits: LinkedSplits,
|
linked: LinkedSplits,
|
||||||
brokermod: ModuleType,
|
|
||||||
sym: str,
|
|
||||||
ohlcv: ShmArray,
|
ohlcv: ShmArray,
|
||||||
group_status_key: str,
|
group_status_key: str,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
|
||||||
task_status: TaskStatus[
|
|
||||||
tuple[FspAdmin, 'ChartPlotWidet']
|
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Create sub-actors (under flat tree)
|
Create fsp charts from a config input attached to a local actor
|
||||||
for each entry in config and attach to local graphics update tasks.
|
compute cluster.
|
||||||
|
|
||||||
Pass target entrypoint and historical data.
|
Pass target entrypoint and historical data via ``ShmArray``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
linked.focus()
|
||||||
|
|
||||||
# TODO: eventually we'll support some kind of n-compose syntax
|
# TODO: eventually we'll support some kind of n-compose syntax
|
||||||
fsp_conf = {
|
fsp_conf = {
|
||||||
|
|
||||||
# 'rsi': {
|
# 'rsi': {
|
||||||
# 'func_name': 'rsi', # literal python func ref lookup name
|
# 'func_name': 'rsi', # literal python func ref lookup name
|
||||||
|
|
||||||
|
@ -659,99 +781,32 @@ async def start_fsp_displays(
|
||||||
# },
|
# },
|
||||||
# },
|
# },
|
||||||
}
|
}
|
||||||
|
|
||||||
if has_vlm(ohlcv):
|
|
||||||
fsp_conf.update({
|
|
||||||
'vwap': {
|
|
||||||
'func_name': 'vwap',
|
|
||||||
'overlay': 'ohlc',
|
|
||||||
'anchor': 'session',
|
|
||||||
},
|
|
||||||
|
|
||||||
'dolla_vlm': {
|
|
||||||
'func_name': 'dolla_vlm',
|
|
||||||
'zero_on_step': True,
|
|
||||||
'overlay': 'volume',
|
|
||||||
'separate_axes': True,
|
|
||||||
'params': {
|
|
||||||
'price_func': {
|
|
||||||
'default_value': 'chl3',
|
|
||||||
# tell target ``Edit`` widget to not allow
|
|
||||||
# edits for now.
|
|
||||||
'widget_kwargs': {'readonly': True},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
'chart_kwargs': {'step_mode': True}
|
|
||||||
},
|
|
||||||
|
|
||||||
})
|
|
||||||
|
|
||||||
linkedsplits.focus()
|
|
||||||
|
|
||||||
profiler = pg.debug.Profiler(
|
profiler = pg.debug.Profiler(
|
||||||
delayed=False,
|
delayed=False,
|
||||||
disabled=False
|
disabled=False
|
||||||
)
|
)
|
||||||
|
|
||||||
async with gather_contexts((
|
# async with gather_contexts((
|
||||||
|
async with (
|
||||||
# NOTE: this admin internally opens an actor pool.
|
|
||||||
open_fsp_admin(),
|
|
||||||
|
|
||||||
trio.open_nursery(),
|
|
||||||
|
|
||||||
maybe_open_vlm_display(
|
|
||||||
linkedsplits,
|
|
||||||
ohlcv,
|
|
||||||
),
|
|
||||||
|
|
||||||
)) as (admin, n, vlm_chart):
|
|
||||||
|
|
||||||
task_status.started((admin, vlm_chart))
|
|
||||||
|
|
||||||
|
# NOTE: this admin internally opens an actor cluster
|
||||||
|
open_fsp_admin(linked, ohlcv) as admin,
|
||||||
|
):
|
||||||
|
statuses = []
|
||||||
for display_name, conf in fsp_conf.items():
|
for display_name, conf in fsp_conf.items():
|
||||||
func_name = conf['func_name']
|
started = await admin.open_fsp_chart(
|
||||||
|
display_name,
|
||||||
done = linkedsplits.window().status_bar.open_status(
|
conf,
|
||||||
|
)
|
||||||
|
done = linked.window().status_bar.open_status(
|
||||||
f'loading fsp, {display_name}..',
|
f'loading fsp, {display_name}..',
|
||||||
group_key=group_status_key,
|
group_key=group_status_key,
|
||||||
)
|
)
|
||||||
|
statuses.append((started, done))
|
||||||
|
|
||||||
shm, started = await admin.start_fsp(
|
for fsp_loaded, status_cb in statuses:
|
||||||
display_name,
|
await fsp_loaded.wait()
|
||||||
(brokermod.name, sym),
|
|
||||||
ohlcv,
|
|
||||||
fsp_conf[display_name],
|
|
||||||
loglevel,
|
|
||||||
)
|
|
||||||
|
|
||||||
profiler(f'created shm for fsp actor: {display_name}')
|
|
||||||
|
|
||||||
# XXX: fsp may have been opened by a duplicate chart.
|
|
||||||
# Error for now until we figure out how to wrap fsps as
|
|
||||||
# "feeds". assert opened, f"A chart for {key} likely
|
|
||||||
# already exists?"
|
|
||||||
|
|
||||||
profiler(f'attached to fsp portal: {display_name}')
|
profiler(f'attached to fsp portal: {display_name}')
|
||||||
|
status_cb()
|
||||||
# init async
|
|
||||||
n.start_soon(
|
|
||||||
partial(
|
|
||||||
run_fsp_ui,
|
|
||||||
|
|
||||||
shm,
|
|
||||||
started,
|
|
||||||
linkedsplits,
|
|
||||||
func_name,
|
|
||||||
display_name,
|
|
||||||
|
|
||||||
conf=conf,
|
|
||||||
group_status_key=group_status_key,
|
|
||||||
loglevel=loglevel,
|
|
||||||
profiler=profiler,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
await started.wait()
|
|
||||||
done()
|
|
||||||
|
|
||||||
# blocks on nursery until all fsp actors complete
|
# blocks on nursery until all fsp actors complete
|
||||||
|
|
Loading…
Reference in New Issue