Pass `Flume`s throughout FSP control APIs
FSP management is much better suited to accessing to the new higher level `Flume` indexing apis; this adjusts some func sigs to pass through (and/or create) flume instances, particularly `.ui._fsp.open_fsp_admin()` and `FspAdmin.open_fsp_ui()` related methods. Now we wrap the destination fsp shm in a flume on the admin (aka normally UI/client) side and is returned from `.start_engine_method()`.fsps_and_flumes
parent
ca5665ae52
commit
ae1773d6e5
|
@ -1037,12 +1037,11 @@ async def allocate_persistent_feed(
|
||||||
|
|
||||||
flume = Flume(
|
flume = Flume(
|
||||||
symbol=symbol,
|
symbol=symbol,
|
||||||
_hist_shm_token=hist_shm.token,
|
|
||||||
_rt_shm_token=rt_shm.token,
|
|
||||||
first_quote=first_quote,
|
first_quote=first_quote,
|
||||||
|
_rt_shm_token=rt_shm.token,
|
||||||
|
_hist_shm_token=hist_shm.token,
|
||||||
izero_hist=izero_hist,
|
izero_hist=izero_hist,
|
||||||
izero_rt=izero_rt,
|
izero_rt=izero_rt,
|
||||||
# throttle_rate=tick_throttle,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# for ambiguous names we simply apply the retreived
|
# for ambiguous names we simply apply the retreived
|
||||||
|
|
|
@ -36,6 +36,7 @@ from PyQt5.QtCore import QLineF
|
||||||
from ..data._sharedmem import (
|
from ..data._sharedmem import (
|
||||||
ShmArray,
|
ShmArray,
|
||||||
)
|
)
|
||||||
|
from ..data.feed import Flume
|
||||||
from .._profile import (
|
from .._profile import (
|
||||||
pg_profile_enabled,
|
pg_profile_enabled,
|
||||||
# ms_slower_then,
|
# ms_slower_then,
|
||||||
|
@ -208,13 +209,16 @@ class Flow(msgspec.Struct): # , frozen=True):
|
||||||
'''
|
'''
|
||||||
name: str
|
name: str
|
||||||
plot: pg.PlotItem
|
plot: pg.PlotItem
|
||||||
graphics: Curve | BarItems
|
|
||||||
_shm: ShmArray
|
_shm: ShmArray
|
||||||
|
flume: Flume
|
||||||
|
graphics: Curve | BarItems
|
||||||
|
|
||||||
|
# for tracking y-mn/mx for y-axis auto-ranging
|
||||||
yrange: tuple[float, float] = None
|
yrange: tuple[float, float] = None
|
||||||
|
|
||||||
# in some cases a flow may want to change its
|
# in some cases a flow may want to change its
|
||||||
# graphical "type" or, "form" when downsampling,
|
# graphical "type" or, "form" when downsampling, to
|
||||||
# normally this is just a plain line.
|
# start this is only ever an interpolation line.
|
||||||
ds_graphics: Optional[Curve] = None
|
ds_graphics: Optional[Curve] = None
|
||||||
|
|
||||||
is_ohlc: bool = False
|
is_ohlc: bool = False
|
||||||
|
@ -249,9 +253,9 @@ class Flow(msgspec.Struct): # , frozen=True):
|
||||||
|
|
||||||
# TODO: remove this and only allow setting through
|
# TODO: remove this and only allow setting through
|
||||||
# private ``._shm`` attr?
|
# private ``._shm`` attr?
|
||||||
@shm.setter
|
# @shm.setter
|
||||||
def shm(self, shm: ShmArray) -> ShmArray:
|
# def shm(self, shm: ShmArray) -> ShmArray:
|
||||||
self._shm = shm
|
# self._shm = shm
|
||||||
|
|
||||||
def maxmin(
|
def maxmin(
|
||||||
self,
|
self,
|
||||||
|
@ -318,9 +322,15 @@ class Flow(msgspec.Struct): # , frozen=True):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
vr = self.plot.viewRect()
|
vr = self.plot.viewRect()
|
||||||
return int(vr.left()), int(vr.right())
|
return (
|
||||||
|
vr.left(),
|
||||||
|
vr.right(),
|
||||||
|
)
|
||||||
|
|
||||||
def datums_range(self) -> tuple[
|
def datums_range(
|
||||||
|
self,
|
||||||
|
index_field: str = 'index',
|
||||||
|
) -> tuple[
|
||||||
int, int, int, int, int, int
|
int, int, int, int, int, int
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
|
@ -328,6 +338,8 @@ class Flow(msgspec.Struct): # , frozen=True):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
l, r = self.view_range()
|
l, r = self.view_range()
|
||||||
|
l = round(l)
|
||||||
|
r = round(r)
|
||||||
|
|
||||||
# TODO: avoid this and have shm passed
|
# TODO: avoid this and have shm passed
|
||||||
# in earlier.
|
# in earlier.
|
||||||
|
@ -348,15 +360,23 @@ class Flow(msgspec.Struct): # , frozen=True):
|
||||||
def read(
|
def read(
|
||||||
self,
|
self,
|
||||||
array_field: Optional[str] = None,
|
array_field: Optional[str] = None,
|
||||||
|
index_field: str = 'index',
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
int, int, np.ndarray,
|
int, int, np.ndarray,
|
||||||
int, int, np.ndarray,
|
int, int, np.ndarray,
|
||||||
]:
|
]:
|
||||||
# read call
|
'''
|
||||||
|
Read the underlying shm array buffer and
|
||||||
|
return the data plus indexes for the first
|
||||||
|
and last
|
||||||
|
which has been written to.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# readable data
|
||||||
array = self.shm.array
|
array = self.shm.array
|
||||||
|
|
||||||
indexes = array['index']
|
indexes = array[index_field]
|
||||||
ifirst = indexes[0]
|
ifirst = indexes[0]
|
||||||
ilast = indexes[-1]
|
ilast = indexes[-1]
|
||||||
|
|
||||||
|
|
|
@ -43,6 +43,7 @@ from ..data._sharedmem import (
|
||||||
try_read,
|
try_read,
|
||||||
)
|
)
|
||||||
from ..data.feed import Flume
|
from ..data.feed import Flume
|
||||||
|
from ..data._source import Symbol
|
||||||
from ._chart import (
|
from ._chart import (
|
||||||
ChartPlotWidget,
|
ChartPlotWidget,
|
||||||
LinkedSplits,
|
LinkedSplits,
|
||||||
|
@ -213,7 +214,7 @@ async def open_fsp_actor_cluster(
|
||||||
async def run_fsp_ui(
|
async def run_fsp_ui(
|
||||||
|
|
||||||
linkedsplits: LinkedSplits,
|
linkedsplits: LinkedSplits,
|
||||||
shm: ShmArray,
|
flume: Flume,
|
||||||
started: trio.Event,
|
started: trio.Event,
|
||||||
target: Fsp,
|
target: Fsp,
|
||||||
conf: dict[str, dict],
|
conf: dict[str, dict],
|
||||||
|
@ -250,9 +251,11 @@ async def run_fsp_ui(
|
||||||
else:
|
else:
|
||||||
chart = linkedsplits.subplots[overlay_with]
|
chart = linkedsplits.subplots[overlay_with]
|
||||||
|
|
||||||
|
shm = flume.rt_shm
|
||||||
chart.draw_curve(
|
chart.draw_curve(
|
||||||
name=name,
|
name,
|
||||||
shm=shm,
|
shm,
|
||||||
|
flume,
|
||||||
overlay=True,
|
overlay=True,
|
||||||
color='default_light',
|
color='default_light',
|
||||||
array_key=name,
|
array_key=name,
|
||||||
|
@ -262,8 +265,9 @@ async def run_fsp_ui(
|
||||||
else:
|
else:
|
||||||
# create a new sub-chart widget for this fsp
|
# create a new sub-chart widget for this fsp
|
||||||
chart = linkedsplits.add_plot(
|
chart = linkedsplits.add_plot(
|
||||||
name=name,
|
name,
|
||||||
shm=shm,
|
shm,
|
||||||
|
flume,
|
||||||
|
|
||||||
array_key=name,
|
array_key=name,
|
||||||
sidepane=sidepane,
|
sidepane=sidepane,
|
||||||
|
@ -353,6 +357,9 @@ async def run_fsp_ui(
|
||||||
# last = time.time()
|
# last = time.time()
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: maybe this should be our ``Flow`` type since it maps
|
||||||
|
# one flume to the next? The machinery for task/actor mgmt should
|
||||||
|
# be part of the instantiation API?
|
||||||
class FspAdmin:
|
class FspAdmin:
|
||||||
'''
|
'''
|
||||||
Client API for orchestrating FSP actors and displaying
|
Client API for orchestrating FSP actors and displaying
|
||||||
|
@ -376,6 +383,10 @@ class FspAdmin:
|
||||||
tuple[tractor.MsgStream, ShmArray]
|
tuple[tractor.MsgStream, ShmArray]
|
||||||
] = {}
|
] = {}
|
||||||
self._flow_registry: dict[_Token, str] = {}
|
self._flow_registry: dict[_Token, str] = {}
|
||||||
|
|
||||||
|
# TODO: make this a `.src_flume` and add
|
||||||
|
# a `dst_flume`?
|
||||||
|
# (=> but then wouldn't this be the most basic `Flow`?)
|
||||||
self.flume = flume
|
self.flume = flume
|
||||||
|
|
||||||
def rr_next_portal(self) -> tractor.Portal:
|
def rr_next_portal(self) -> tractor.Portal:
|
||||||
|
@ -389,7 +400,7 @@ class FspAdmin:
|
||||||
complete: trio.Event,
|
complete: trio.Event,
|
||||||
started: trio.Event,
|
started: trio.Event,
|
||||||
fqsn: str,
|
fqsn: str,
|
||||||
dst_shm: ShmArray,
|
dst_fsp_flume: Flume,
|
||||||
conf: dict,
|
conf: dict,
|
||||||
target: Fsp,
|
target: Fsp,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
@ -410,9 +421,10 @@ class FspAdmin:
|
||||||
# data feed key
|
# data feed key
|
||||||
fqsn=fqsn,
|
fqsn=fqsn,
|
||||||
|
|
||||||
|
# TODO: pass `Flume.to_msg()`s here?
|
||||||
# mems
|
# mems
|
||||||
src_shm_token=self.flume.rt_shm.token,
|
src_shm_token=self.flume.rt_shm.token,
|
||||||
dst_shm_token=dst_shm.token,
|
dst_shm_token=dst_fsp_flume.rt_shm.token,
|
||||||
|
|
||||||
# target
|
# target
|
||||||
ns_path=ns_path,
|
ns_path=ns_path,
|
||||||
|
@ -429,12 +441,14 @@ class FspAdmin:
|
||||||
ctx.open_stream() as stream,
|
ctx.open_stream() as stream,
|
||||||
):
|
):
|
||||||
|
|
||||||
|
dst_fsp_flume.stream: tractor.MsgStream = stream
|
||||||
|
|
||||||
# register output data
|
# register output data
|
||||||
self._registry[
|
self._registry[
|
||||||
(fqsn, ns_path)
|
(fqsn, ns_path)
|
||||||
] = (
|
] = (
|
||||||
stream,
|
stream,
|
||||||
dst_shm,
|
dst_fsp_flume.rt_shm,
|
||||||
complete
|
complete
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -469,7 +483,7 @@ class FspAdmin:
|
||||||
worker_name: Optional[str] = None,
|
worker_name: Optional[str] = None,
|
||||||
loglevel: str = 'info',
|
loglevel: str = 'info',
|
||||||
|
|
||||||
) -> (ShmArray, trio.Event):
|
) -> (Flume, trio.Event):
|
||||||
|
|
||||||
fqsn = self.flume.symbol.fqsn
|
fqsn = self.flume.symbol.fqsn
|
||||||
|
|
||||||
|
@ -479,6 +493,26 @@ class FspAdmin:
|
||||||
target=target,
|
target=target,
|
||||||
readonly=True,
|
readonly=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
portal = self.cluster.get(worker_name) or self.rr_next_portal()
|
||||||
|
provider_tag = portal.channel.uid
|
||||||
|
|
||||||
|
symbol = Symbol(
|
||||||
|
key=key,
|
||||||
|
broker_info={
|
||||||
|
provider_tag: {'asset_type': 'fsp'},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
dst_fsp_flume = Flume(
|
||||||
|
symbol=symbol,
|
||||||
|
_rt_shm_token=dst_shm.token,
|
||||||
|
first_quote={},
|
||||||
|
|
||||||
|
# set to 0 presuming for now that we can't load
|
||||||
|
# FSP history (though we should eventually).
|
||||||
|
izero_hist=0,
|
||||||
|
izero_rt=0,
|
||||||
|
)
|
||||||
self._flow_registry[(
|
self._flow_registry[(
|
||||||
self.flume.rt_shm._token,
|
self.flume.rt_shm._token,
|
||||||
target.name
|
target.name
|
||||||
|
@ -489,7 +523,6 @@ class FspAdmin:
|
||||||
# f'Already started FSP `{fqsn}:{func_name}`'
|
# f'Already started FSP `{fqsn}:{func_name}`'
|
||||||
# )
|
# )
|
||||||
|
|
||||||
portal = self.cluster.get(worker_name) or self.rr_next_portal()
|
|
||||||
complete = trio.Event()
|
complete = trio.Event()
|
||||||
started = trio.Event()
|
started = trio.Event()
|
||||||
self.tn.start_soon(
|
self.tn.start_soon(
|
||||||
|
@ -498,13 +531,13 @@ class FspAdmin:
|
||||||
complete,
|
complete,
|
||||||
started,
|
started,
|
||||||
fqsn,
|
fqsn,
|
||||||
dst_shm,
|
dst_fsp_flume,
|
||||||
conf,
|
conf,
|
||||||
target,
|
target,
|
||||||
loglevel,
|
loglevel,
|
||||||
)
|
)
|
||||||
|
|
||||||
return dst_shm, started
|
return dst_fsp_flume, started
|
||||||
|
|
||||||
async def open_fsp_chart(
|
async def open_fsp_chart(
|
||||||
self,
|
self,
|
||||||
|
@ -516,7 +549,7 @@ class FspAdmin:
|
||||||
|
|
||||||
) -> (trio.Event, ChartPlotWidget):
|
) -> (trio.Event, ChartPlotWidget):
|
||||||
|
|
||||||
shm, started = await self.start_engine_task(
|
flume, started = await self.start_engine_task(
|
||||||
target,
|
target,
|
||||||
conf,
|
conf,
|
||||||
loglevel,
|
loglevel,
|
||||||
|
@ -528,7 +561,7 @@ class FspAdmin:
|
||||||
run_fsp_ui,
|
run_fsp_ui,
|
||||||
|
|
||||||
self.linked,
|
self.linked,
|
||||||
shm,
|
flume,
|
||||||
started,
|
started,
|
||||||
target,
|
target,
|
||||||
|
|
||||||
|
@ -636,6 +669,7 @@ async def open_vlm_displays(
|
||||||
chart = linked.add_plot(
|
chart = linked.add_plot(
|
||||||
name='volume',
|
name='volume',
|
||||||
shm=shm,
|
shm=shm,
|
||||||
|
flume=flume,
|
||||||
|
|
||||||
array_key='volume',
|
array_key='volume',
|
||||||
sidepane=sidepane,
|
sidepane=sidepane,
|
||||||
|
@ -715,7 +749,7 @@ async def open_vlm_displays(
|
||||||
|
|
||||||
tasks_ready = []
|
tasks_ready = []
|
||||||
# spawn and overlay $ vlm on the same subchart
|
# spawn and overlay $ vlm on the same subchart
|
||||||
dvlm_shm, started = await admin.start_engine_task(
|
dvlm_flume, started = await admin.start_engine_task(
|
||||||
dolla_vlm,
|
dolla_vlm,
|
||||||
|
|
||||||
{ # fsp engine conf
|
{ # fsp engine conf
|
||||||
|
@ -812,9 +846,13 @@ async def open_vlm_displays(
|
||||||
else:
|
else:
|
||||||
color = 'bracket'
|
color = 'bracket'
|
||||||
|
|
||||||
curve, _ = chart.draw_curve(
|
assert isinstance(shm, ShmArray)
|
||||||
name=name,
|
assert isinstance(flume, Flume)
|
||||||
shm=shm,
|
|
||||||
|
flow = chart.draw_curve(
|
||||||
|
name,
|
||||||
|
shm,
|
||||||
|
flume,
|
||||||
array_key=name,
|
array_key=name,
|
||||||
overlay=pi,
|
overlay=pi,
|
||||||
color=color,
|
color=color,
|
||||||
|
@ -827,20 +865,20 @@ async def open_vlm_displays(
|
||||||
# specially store ref to shm for lookup in display loop
|
# specially store ref to shm for lookup in display loop
|
||||||
# since only a placeholder of `None` is entered in
|
# since only a placeholder of `None` is entered in
|
||||||
# ``.draw_curve()``.
|
# ``.draw_curve()``.
|
||||||
flow = chart._flows[name]
|
# flow = chart._flows[name]
|
||||||
assert flow.plot is pi
|
assert flow.plot is pi
|
||||||
|
|
||||||
chart_curves(
|
chart_curves(
|
||||||
fields,
|
fields,
|
||||||
dvlm_pi,
|
dvlm_pi,
|
||||||
dvlm_shm,
|
dvlm_flume.rt_shm,
|
||||||
step_mode=True,
|
step_mode=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
# spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
|
# spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
|
||||||
# up since this one depends on it.
|
# up since this one depends on it.
|
||||||
|
|
||||||
fr_shm, started = await admin.start_engine_task(
|
fr_flume, started = await admin.start_engine_task(
|
||||||
flow_rates,
|
flow_rates,
|
||||||
{ # fsp engine conf
|
{ # fsp engine conf
|
||||||
'func_name': 'flow_rates',
|
'func_name': 'flow_rates',
|
||||||
|
@ -853,7 +891,7 @@ async def open_vlm_displays(
|
||||||
# chart_curves(
|
# chart_curves(
|
||||||
# dvlm_rate_fields,
|
# dvlm_rate_fields,
|
||||||
# dvlm_pi,
|
# dvlm_pi,
|
||||||
# fr_shm,
|
# fr_flume.rt_shm,
|
||||||
# )
|
# )
|
||||||
|
|
||||||
# TODO: is there a way to "sync" the dual axes such that only
|
# TODO: is there a way to "sync" the dual axes such that only
|
||||||
|
@ -901,7 +939,7 @@ async def open_vlm_displays(
|
||||||
chart_curves(
|
chart_curves(
|
||||||
trade_rate_fields,
|
trade_rate_fields,
|
||||||
tr_pi,
|
tr_pi,
|
||||||
fr_shm,
|
fr_flume.rt_shm,
|
||||||
# step_mode=True,
|
# step_mode=True,
|
||||||
|
|
||||||
# dashed line to represent "individual trades" being
|
# dashed line to represent "individual trades" being
|
||||||
|
|
Loading…
Reference in New Issue