Define a flow registry on `FspAdmin`, use it to update fsp engine clusters

windows_fixes_yo
Tyler Goodlet 2022-02-04 11:08:42 -05:00
parent d130f0449f
commit df6afe24a4
1 changed files with 22 additions and 9 deletions

View File

@ -37,6 +37,7 @@ from .._cacheables import maybe_open_context
from ..calc import humanize
from ..data._sharedmem import (
ShmArray,
_Token,
try_read,
)
from ._chart import (
@ -367,6 +368,7 @@ class FspAdmin:
tuple,
tuple[tractor.MsgStream, ShmArray]
] = {}
self._flow_registry: dict[_Token, str] = {}
self.src_shm = src_shm
def rr_next_portal(self) -> tractor.Portal:
@ -411,6 +413,11 @@ class FspAdmin:
loglevel=loglevel,
zero_on_step=conf.get('zero_on_step', False),
shm_registry=[
(token.as_msg(), fsp_name, dst_token.as_msg())
for (token, fsp_name), dst_token
in self._flow_registry.items()
],
) as (ctx, last_index),
ctx.open_stream() as stream,
@ -443,11 +450,15 @@ class FspAdmin:
fqsn = self.linked.symbol.front_feed()
# allocate an output shm array
dst_shm, opened = maybe_mk_fsp_shm(
key, dst_shm, opened = maybe_mk_fsp_shm(
'.'.join(fqsn),
target=target,
readonly=True,
)
self._flow_registry[
(self.src_shm._token, target.name)
] = dst_shm._token
# if not opened:
# raise RuntimeError(
# f'Already started FSP `{fqsn}:{func_name}`'
@ -675,14 +686,6 @@ async def open_vlm_displays(
# had this before??
# dolla_vlm,
# spawn and overlay $ vlm on the same subchart
fr_shm, started = await admin.start_engine_task(
flow_rates,
{ # fsp engine conf
'func_name': 'flow_rates',
},
# loglevel,
)
tasks_ready.append(started)
# profiler(f'created shm for fsp actor: {display_name}')
@ -753,7 +756,17 @@ async def open_vlm_displays(
##################
# Vlm rate overlay
##################
# spawn and overlay $ vlm on the same subchart
fr_shm, started = await admin.start_engine_task(
flow_rates,
{ # fsp engine conf
'func_name': 'flow_rates',
},
# loglevel,
)
await started.wait()
trade_rate_color = vlm_rate_color = 'i3'
fr_pi = chart.overlay_plotitem(
'vlm_rates',
index=0, # place axis on inside (nearest to chart)