Adjust FSP UI/mgmt apis to be `Flume` oriented

epoch_index_backup
Tyler Goodlet 2022-11-15 15:35:57 -05:00
parent eb1650197b
commit a6241a5a16
1 changed files with 23 additions and 13 deletions

View File

@ -42,6 +42,7 @@ from ..data._sharedmem import (
_Token, _Token,
try_read, try_read,
) )
from ..data.feed import Flume
from ._chart import ( from ._chart import (
ChartPlotWidget, ChartPlotWidget,
LinkedSplits, LinkedSplits,
@ -107,7 +108,8 @@ def update_fsp_chart(
# sub-charts reference it under different 'named charts'. # sub-charts reference it under different 'named charts'.
# read from last calculated value and update any label # read from last calculated value and update any label
last_val_sticky = chart._ysticks.get(graphics_name) last_val_sticky = chart.plotItem.getAxis(
'right')._stickies.get(graphics_name)
if last_val_sticky: if last_val_sticky:
last = last_row[array_key] last = last_row[array_key]
last_val_sticky.update_from_data(-1, last) last_val_sticky.update_from_data(-1, last)
@ -359,7 +361,7 @@ class FspAdmin:
tn: trio.Nursery, tn: trio.Nursery,
cluster: dict[str, tractor.Portal], cluster: dict[str, tractor.Portal],
linked: LinkedSplits, linked: LinkedSplits,
src_shm: ShmArray, flume: Flume,
) -> None: ) -> None:
self.tn = tn self.tn = tn
@ -371,7 +373,7 @@ class FspAdmin:
tuple[tractor.MsgStream, ShmArray] tuple[tractor.MsgStream, ShmArray]
] = {} ] = {}
self._flow_registry: dict[_Token, str] = {} self._flow_registry: dict[_Token, str] = {}
self.src_shm = src_shm self.flume = flume
def rr_next_portal(self) -> tractor.Portal: def rr_next_portal(self) -> tractor.Portal:
name, portal = next(self._rr_next_actor) name, portal = next(self._rr_next_actor)
@ -406,7 +408,7 @@ class FspAdmin:
fqsn=fqsn, fqsn=fqsn,
# mems # mems
src_shm_token=self.src_shm.token, src_shm_token=self.flume.rt_shm.token,
dst_shm_token=dst_shm.token, dst_shm_token=dst_shm.token,
# target # target
@ -466,7 +468,7 @@ class FspAdmin:
) -> (ShmArray, trio.Event): ) -> (ShmArray, trio.Event):
fqsn = self.linked.symbol.front_fqsn() fqsn = self.flume.symbol.fqsn
# allocate an output shm array # allocate an output shm array
key, dst_shm, opened = maybe_mk_fsp_shm( key, dst_shm, opened = maybe_mk_fsp_shm(
@ -475,7 +477,7 @@ class FspAdmin:
readonly=True, readonly=True,
) )
self._flow_registry[( self._flow_registry[(
self.src_shm._token, self.flume.rt_shm._token,
target.name target.name
)] = dst_shm._token )] = dst_shm._token
@ -537,7 +539,7 @@ class FspAdmin:
@acm @acm
async def open_fsp_admin( async def open_fsp_admin(
linked: LinkedSplits, linked: LinkedSplits,
src_shm: ShmArray, flume: Flume,
**kwargs, **kwargs,
) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: ) -> AsyncGenerator[dict, dict[str, tractor.Portal]]:
@ -558,7 +560,7 @@ async def open_fsp_admin(
tn, tn,
cluster_map, cluster_map,
linked, linked,
src_shm, flume,
) )
try: try:
yield admin yield admin
@ -572,7 +574,7 @@ async def open_fsp_admin(
async def open_vlm_displays( async def open_vlm_displays(
linked: LinkedSplits, linked: LinkedSplits,
ohlcv: ShmArray, flume: Flume,
dvlm: bool = True, dvlm: bool = True,
task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED,
@ -594,6 +596,8 @@ async def open_vlm_displays(
sig = inspect.signature(flow_rates.func) sig = inspect.signature(flow_rates.func)
params = sig.parameters params = sig.parameters
ohlcv: ShmArray = flume.rt_shm
async with ( async with (
open_fsp_sidepane( open_fsp_sidepane(
linked, { linked, {
@ -613,7 +617,7 @@ async def open_vlm_displays(
} }
}, },
) as sidepane, ) as sidepane,
open_fsp_admin(linked, ohlcv) as admin, open_fsp_admin(linked, flume) as admin,
): ):
# TODO: support updates # TODO: support updates
# period_field = sidepane.fields['period'] # period_field = sidepane.fields['period']
@ -641,6 +645,8 @@ async def open_vlm_displays(
# the curve item internals are pretty convoluted. # the curve item internals are pretty convoluted.
style='step', style='step',
) )
# back-link the volume chart to trigger y-autoranging
# in the ohlc (parent) chart.
ohlc_chart.view.enable_auto_yrange( ohlc_chart.view.enable_auto_yrange(
src_vb=chart.view, src_vb=chart.view,
) )
@ -682,7 +688,8 @@ async def open_vlm_displays(
assert chart.name != linked.chart.name assert chart.name != linked.chart.name
# sticky only on sub-charts atm # sticky only on sub-charts atm
last_val_sticky = chart._ysticks[chart.name] last_val_sticky = chart.plotItem.getAxis(
'right')._stickies.get(chart.name)
# read from last calculated value # read from last calculated value
value = shm.array['volume'][-1] value = shm.array['volume'][-1]
@ -925,7 +932,7 @@ async def open_vlm_displays(
async def start_fsp_displays( async def start_fsp_displays(
linked: LinkedSplits, linked: LinkedSplits,
ohlcv: ShmArray, flume: Flume,
group_status_key: str, group_status_key: str,
loglevel: str, loglevel: str,
@ -968,7 +975,10 @@ async def start_fsp_displays(
async with ( async with (
# NOTE: this admin internally opens an actor cluster # NOTE: this admin internally opens an actor cluster
open_fsp_admin(linked, ohlcv) as admin, open_fsp_admin(
linked,
flume,
) as admin,
): ):
statuses = [] statuses = []
for target, conf in fsp_conf.items(): for target, conf in fsp_conf.items():