Pass `Flume`s throughout FSP-ui and charting APIs

Since higher level charting and fsp management need access to the
new `Flume` indexing apis this adjusts some func sigs to pass through
(and/or create) flume instances:
- `LinkedSplits.add_plot()` and dependents.
- `ChartPlotWidget.draw_curve()` and deps, and it now returns a `Flow`.
- `.ui._fsp.open_fsp_admin()` and `FspAdmin.open_fsp_ui()` related
  methods => now we wrap the destination fsp shm in a flume on the admin
  side and is returned from `.start_engine_method()`.

Drop a bunch of (unused) chart widget methods including some already
moved to flume methods: `.get_index()`, `.in_view()`,
`.last_bar_in_view()`, `.is_valid_index()`.
multi_symbol_input
Tyler Goodlet 2022-11-24 14:48:30 -05:00
parent 69ea296a9b
commit 8d592886fa
5 changed files with 143 additions and 97 deletions

View File

@ -1037,12 +1037,11 @@ async def allocate_persistent_feed(
flume = Flume(
symbol=symbol,
_hist_shm_token=hist_shm.token,
_rt_shm_token=rt_shm.token,
first_quote=first_quote,
_rt_shm_token=rt_shm.token,
_hist_shm_token=hist_shm.token,
izero_hist=izero_hist,
izero_rt=izero_rt,
# throttle_rate=tick_throttle,
)
# for ambiguous names we simply apply the retreived

View File

@ -38,7 +38,6 @@ from PyQt5.QtWidgets import (
QVBoxLayout,
QSplitter,
)
import numpy as np
import pyqtgraph as pg
import trio
@ -63,7 +62,10 @@ from ._style import (
_xaxis_at,
_min_points_to_show,
)
from ..data.feed import Feed
from ..data.feed import (
Feed,
Flume,
)
from ..data._source import Symbol
from ..log import get_logger
from ._interaction import ChartView
@ -538,6 +540,7 @@ class LinkedSplits(QWidget):
symbol: Symbol,
shm: ShmArray,
flume: Flume,
sidepane: FieldsForm,
style: str = 'ohlc_bar',
@ -562,6 +565,7 @@ class LinkedSplits(QWidget):
self.chart = self.add_plot(
name=symbol.fqsn,
shm=shm,
flume=flume,
style=style,
_is_main=True,
sidepane=sidepane,
@ -582,6 +586,7 @@ class LinkedSplits(QWidget):
name: str,
shm: ShmArray,
flume: Flume,
array_key: Optional[str] = None,
style: str = 'line',
@ -705,9 +710,11 @@ class LinkedSplits(QWidget):
# draw curve graphics
if style == 'ohlc_bar':
graphics, data_key = cpw.draw_ohlc(
# graphics, data_key = cpw.draw_ohlc(
flow = cpw.draw_ohlc(
name,
shm,
flume=flume,
array_key=array_key
)
self.cursor.contents_labels.add_label(
@ -719,18 +726,22 @@ class LinkedSplits(QWidget):
elif style == 'line':
add_label = True
graphics, data_key = cpw.draw_curve(
# graphics, data_key = cpw.draw_curve(
flow = cpw.draw_curve(
name,
shm,
flume,
array_key=array_key,
color='default_light',
)
elif style == 'step':
add_label = True
graphics, data_key = cpw.draw_curve(
# graphics, data_key = cpw.draw_curve(
flow = cpw.draw_curve(
name,
shm,
flume,
array_key=array_key,
step_mode=True,
color='davies',
@ -740,6 +751,9 @@ class LinkedSplits(QWidget):
else:
raise ValueError(f"Chart style {style} is currently unsupported")
graphics = flow.graphics
data_key = flow.name
if _is_main:
assert style == 'ohlc_bar', 'main chart must be OHLC'
else:
@ -937,12 +951,6 @@ class ChartPlotWidget(pg.PlotWidget):
def focus(self) -> None:
self.view.setFocus()
def last_bar_in_view(self) -> int:
self._arrays[self.name][-1]['index']
def is_valid_index(self, index: int) -> bool:
return index >= 0 and index < self._arrays[self.name][-1]['index']
def _set_xlimits(
self,
xfirst: int,
@ -1035,9 +1043,14 @@ class ChartPlotWidget(pg.PlotWidget):
log.warning(f'`Flow` for {self.name} not loaded yet?')
return
index = flow.shm.array['index']
arr = flow.shm.array
index = arr['index']
# times = arr['time']
# these will be epoch time floats
xfirst, xlast = index[0], index[-1]
l, lbar, rbar, r = self.bars_range()
view = self.view
if (
@ -1194,6 +1207,7 @@ class ChartPlotWidget(pg.PlotWidget):
name: str,
shm: ShmArray,
flume: Flume,
array_key: Optional[str] = None,
overlay: bool = False,
@ -1206,10 +1220,7 @@ class ChartPlotWidget(pg.PlotWidget):
**graphics_kwargs,
) -> tuple[
pg.GraphicsObject,
str,
]:
) -> Flow:
'''
Draw a "curve" (line plot graphics) for the provided data in
the input shm array ``shm``.
@ -1243,14 +1254,17 @@ class ChartPlotWidget(pg.PlotWidget):
**graphics_kwargs,
)
self._flows[data_key] = Flow(
name=name,
plot=pi,
_shm=shm,
flow = self._flows[data_key] = Flow(
data_key,
pi,
shm,
flume,
is_ohlc=is_ohlc,
# register curve graphics with this flow
graphics=graphics,
)
assert isinstance(flow.shm, ShmArray)
# TODO: this probably needs its own method?
if overlay:
@ -1307,24 +1321,26 @@ class ChartPlotWidget(pg.PlotWidget):
# understand.
pi.addItem(graphics)
return graphics, data_key
return flow
def draw_ohlc(
self,
name: str,
shm: ShmArray,
flume: Flume,
array_key: Optional[str] = None,
**draw_curve_kwargs,
) -> (pg.GraphicsObject, str):
) -> Flow:
'''
Draw OHLC datums to chart.
'''
return self.draw_curve(
name=name,
shm=shm,
name,
shm,
flume,
array_key=array_key,
is_ohlc=True,
**draw_curve_kwargs,
@ -1389,37 +1405,6 @@ class ChartPlotWidget(pg.PlotWidget):
self.sig_mouse_leave.emit(self)
self.scene().leaveEvent(ev)
def get_index(self, time: float) -> int:
# TODO: this should go onto some sort of
# data-view thinger..right?
ohlc = self._flows[self.name].shm.array
# XXX: not sure why the time is so off here
# looks like we're gonna have to do some fixing..
indexes = ohlc['time'] >= time
if any(indexes):
return ohlc['index'][indexes][-1]
else:
return ohlc['index'][-1]
def in_view(
self,
array: np.ndarray,
) -> np.ndarray:
'''
Slice an input struct array providing only datums
"in view" of this chart.
'''
l, lbar, rbar, r = self.bars_range()
ifirst = array[0]['index']
# slice data by offset from the first index
# available in the passed datum set.
return array[lbar - ifirst:(rbar - ifirst) + 1]
def maxmin(
self,
name: Optional[str] = None,

View File

@ -1191,6 +1191,7 @@ async def display_symbol_data(
hist_chart = hist_linked.plot_ohlc_main(
symbol,
hist_ohlcv,
flume,
# in the case of history chart we explicitly set `False`
# to avoid internal pane creation.
# sidepane=False,
@ -1204,6 +1205,7 @@ async def display_symbol_data(
rt_chart = rt_linked.plot_ohlc_main(
symbol,
ohlcv,
flume,
# in the case of history chart we explicitly set `False`
# to avoid internal pane creation.
sidepane=pp_pane,
@ -1275,9 +1277,10 @@ async def display_symbol_data(
hist_pi.hideAxis('left')
hist_pi.hideAxis('bottom')
curve, _ = hist_chart.draw_curve(
name=fqsn,
shm=hist_ohlcv,
flow = hist_chart.draw_curve(
fqsn,
hist_ohlcv,
flume,
array_key=fqsn,
overlay=hist_pi,
pi=hist_pi,
@ -1307,9 +1310,10 @@ async def display_symbol_data(
rt_pi.hideAxis('left')
rt_pi.hideAxis('bottom')
curve, _ = rt_chart.draw_curve(
name=fqsn,
shm=ohlcv,
flow = rt_chart.draw_curve(
fqsn,
ohlcv,
flume,
array_key=fqsn,
overlay=rt_pi,
pi=rt_pi,

View File

@ -36,6 +36,7 @@ from PyQt5.QtCore import QLineF
from ..data._sharedmem import (
ShmArray,
)
from ..data.feed import Flume
from .._profile import (
pg_profile_enabled,
# ms_slower_then,
@ -208,13 +209,16 @@ class Flow(msgspec.Struct): # , frozen=True):
'''
name: str
plot: pg.PlotItem
graphics: Curve | BarItems
_shm: ShmArray
flume: Flume
graphics: Curve | BarItems
# for tracking y-mn/mx for y-axis auto-ranging
yrange: tuple[float, float] = None
# in some cases a flow may want to change its
# graphical "type" or, "form" when downsampling,
# normally this is just a plain line.
# graphical "type" or, "form" when downsampling, to
# start this is only ever an interpolation line.
ds_graphics: Optional[Curve] = None
is_ohlc: bool = False
@ -249,9 +253,9 @@ class Flow(msgspec.Struct): # , frozen=True):
# TODO: remove this and only allow setting through
# private ``._shm`` attr?
@shm.setter
def shm(self, shm: ShmArray) -> ShmArray:
self._shm = shm
# @shm.setter
# def shm(self, shm: ShmArray) -> ShmArray:
# self._shm = shm
def maxmin(
self,
@ -318,9 +322,15 @@ class Flow(msgspec.Struct): # , frozen=True):
'''
vr = self.plot.viewRect()
return int(vr.left()), int(vr.right())
return (
vr.left(),
vr.right(),
)
def datums_range(self) -> tuple[
def datums_range(
self,
index_field: str = 'index',
) -> tuple[
int, int, int, int, int, int
]:
'''
@ -328,6 +338,8 @@ class Flow(msgspec.Struct): # , frozen=True):
'''
l, r = self.view_range()
l = round(l)
r = round(r)
# TODO: avoid this and have shm passed
# in earlier.
@ -348,15 +360,23 @@ class Flow(msgspec.Struct): # , frozen=True):
def read(
self,
array_field: Optional[str] = None,
index_field: str = 'index',
) -> tuple[
int, int, np.ndarray,
int, int, np.ndarray,
]:
# read call
'''
Read the underlying shm array buffer and
return the data plus indexes for the first
and last
which has been written to.
'''
# readable data
array = self.shm.array
indexes = array['index']
indexes = array[index_field]
ifirst = indexes[0]
ilast = indexes[-1]

View File

@ -43,6 +43,7 @@ from ..data._sharedmem import (
try_read,
)
from ..data.feed import Flume
from ..data._source import Symbol
from ._chart import (
ChartPlotWidget,
LinkedSplits,
@ -213,7 +214,7 @@ async def open_fsp_actor_cluster(
async def run_fsp_ui(
linkedsplits: LinkedSplits,
shm: ShmArray,
flume: Flume,
started: trio.Event,
target: Fsp,
conf: dict[str, dict],
@ -250,9 +251,11 @@ async def run_fsp_ui(
else:
chart = linkedsplits.subplots[overlay_with]
shm = flume.rt_shm
chart.draw_curve(
name=name,
shm=shm,
name,
shm,
flume,
overlay=True,
color='default_light',
array_key=name,
@ -262,8 +265,9 @@ async def run_fsp_ui(
else:
# create a new sub-chart widget for this fsp
chart = linkedsplits.add_plot(
name=name,
shm=shm,
name,
shm,
flume,
array_key=name,
sidepane=sidepane,
@ -353,6 +357,9 @@ async def run_fsp_ui(
# last = time.time()
# TODO: maybe this should be our ``Flow`` type since it maps
# one flume to the next? The machinery for task/actor mgmt should
# be part of the instantiation API?
class FspAdmin:
'''
Client API for orchestrating FSP actors and displaying
@ -376,6 +383,10 @@ class FspAdmin:
tuple[tractor.MsgStream, ShmArray]
] = {}
self._flow_registry: dict[_Token, str] = {}
# TODO: make this a `.src_flume` and add
# a `dst_flume`?
# (=> but then wouldn't this be the most basic `Flow`?)
self.flume = flume
def rr_next_portal(self) -> tractor.Portal:
@ -389,7 +400,7 @@ class FspAdmin:
complete: trio.Event,
started: trio.Event,
fqsn: str,
dst_shm: ShmArray,
dst_fsp_flume: Flume,
conf: dict,
target: Fsp,
loglevel: str,
@ -410,9 +421,10 @@ class FspAdmin:
# data feed key
fqsn=fqsn,
# TODO: pass `Flume.to_msg()`s here?
# mems
src_shm_token=self.flume.rt_shm.token,
dst_shm_token=dst_shm.token,
dst_shm_token=dst_fsp_flume.rt_shm.token,
# target
ns_path=ns_path,
@ -429,12 +441,14 @@ class FspAdmin:
ctx.open_stream() as stream,
):
dst_fsp_flume.stream: tractor.MsgStream = stream
# register output data
self._registry[
(fqsn, ns_path)
] = (
stream,
dst_shm,
dst_fsp_flume.rt_shm,
complete
)
@ -469,7 +483,7 @@ class FspAdmin:
worker_name: Optional[str] = None,
loglevel: str = 'info',
) -> (ShmArray, trio.Event):
) -> (Flume, trio.Event):
fqsn = self.flume.symbol.fqsn
@ -479,6 +493,26 @@ class FspAdmin:
target=target,
readonly=True,
)
portal = self.cluster.get(worker_name) or self.rr_next_portal()
provider_tag = portal.channel.uid
symbol = Symbol(
key=key,
broker_info={
provider_tag: {'asset_type': 'fsp'},
},
)
dst_fsp_flume = Flume(
symbol=symbol,
_rt_shm_token=dst_shm.token,
first_quote={},
# set to 0 presuming for now that we can't load
# FSP history (though we should eventually).
izero_hist=0,
izero_rt=0,
)
self._flow_registry[(
self.flume.rt_shm._token,
target.name
@ -489,7 +523,6 @@ class FspAdmin:
# f'Already started FSP `{fqsn}:{func_name}`'
# )
portal = self.cluster.get(worker_name) or self.rr_next_portal()
complete = trio.Event()
started = trio.Event()
self.tn.start_soon(
@ -498,13 +531,13 @@ class FspAdmin:
complete,
started,
fqsn,
dst_shm,
dst_fsp_flume,
conf,
target,
loglevel,
)
return dst_shm, started
return dst_fsp_flume, started
async def open_fsp_chart(
self,
@ -516,7 +549,7 @@ class FspAdmin:
) -> (trio.Event, ChartPlotWidget):
shm, started = await self.start_engine_task(
flume, started = await self.start_engine_task(
target,
conf,
loglevel,
@ -528,7 +561,7 @@ class FspAdmin:
run_fsp_ui,
self.linked,
shm,
flume,
started,
target,
@ -636,6 +669,7 @@ async def open_vlm_displays(
chart = linked.add_plot(
name='volume',
shm=shm,
flume=flume,
array_key='volume',
sidepane=sidepane,
@ -715,7 +749,7 @@ async def open_vlm_displays(
tasks_ready = []
# spawn and overlay $ vlm on the same subchart
dvlm_shm, started = await admin.start_engine_task(
dvlm_flume, started = await admin.start_engine_task(
dolla_vlm,
{ # fsp engine conf
@ -812,9 +846,13 @@ async def open_vlm_displays(
else:
color = 'bracket'
curve, _ = chart.draw_curve(
name=name,
shm=shm,
assert isinstance(shm, ShmArray)
assert isinstance(flume, Flume)
flow = chart.draw_curve(
name,
shm,
flume,
array_key=name,
overlay=pi,
color=color,
@ -827,20 +865,20 @@ async def open_vlm_displays(
# specially store ref to shm for lookup in display loop
# since only a placeholder of `None` is entered in
# ``.draw_curve()``.
flow = chart._flows[name]
# flow = chart._flows[name]
assert flow.plot is pi
chart_curves(
fields,
dvlm_pi,
dvlm_shm,
dvlm_flume.rt_shm,
step_mode=True,
)
# spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
# up since this one depends on it.
fr_shm, started = await admin.start_engine_task(
fr_flume, started = await admin.start_engine_task(
flow_rates,
{ # fsp engine conf
'func_name': 'flow_rates',
@ -853,7 +891,7 @@ async def open_vlm_displays(
# chart_curves(
# dvlm_rate_fields,
# dvlm_pi,
# fr_shm,
# fr_flume.rt_shm,
# )
# TODO: is there a way to "sync" the dual axes such that only
@ -901,7 +939,7 @@ async def open_vlm_displays(
chart_curves(
trade_rate_fields,
tr_pi,
fr_shm,
fr_flume.rt_shm,
# step_mode=True,
# dashed line to represent "individual trades" being