Pass `Flume`s throughout FSP-ui and charting APIs
Since higher level charting and fsp management need access to the new `Flume` indexing apis this adjusts some func sigs to pass through (and/or create) flume instances: - `LinkedSplits.add_plot()` and dependents. - `ChartPlotWidget.draw_curve()` and deps, and it now returns a `Flow`. - `.ui._fsp.open_fsp_admin()` and `FspAdmin.open_fsp_ui()` related methods => now we wrap the destination fsp shm in a flume on the admin side and is returned from `.start_engine_method()`. Drop a bunch of (unused) chart widget methods including some already moved to flume methods: `.get_index()`, `.in_view()`, `.last_bar_in_view()`, `.is_valid_index()`.pre_viz_calls
							parent
							
								
									437fc511a3
								
							
						
					
					
						commit
						b92ff7caf9
					
				| 
						 | 
				
			
			@ -1023,12 +1023,11 @@ async def allocate_persistent_feed(
 | 
			
		|||
 | 
			
		||||
    flume = Flume(
 | 
			
		||||
        symbol=symbol,
 | 
			
		||||
        _hist_shm_token=hist_shm.token,
 | 
			
		||||
        _rt_shm_token=rt_shm.token,
 | 
			
		||||
        first_quote=first_quote,
 | 
			
		||||
        _rt_shm_token=rt_shm.token,
 | 
			
		||||
        _hist_shm_token=hist_shm.token,
 | 
			
		||||
        izero_hist=izero_hist,
 | 
			
		||||
        izero_rt=izero_rt,
 | 
			
		||||
        # throttle_rate=tick_throttle,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # for ambiguous names we simply apply the retreived
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -38,7 +38,6 @@ from PyQt5.QtWidgets import (
 | 
			
		|||
    QVBoxLayout,
 | 
			
		||||
    QSplitter,
 | 
			
		||||
)
 | 
			
		||||
import numpy as np
 | 
			
		||||
import pyqtgraph as pg
 | 
			
		||||
import trio
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -63,7 +62,10 @@ from ._style import (
 | 
			
		|||
    _xaxis_at,
 | 
			
		||||
    _min_points_to_show,
 | 
			
		||||
)
 | 
			
		||||
from ..data.feed import Feed
 | 
			
		||||
from ..data.feed import (
 | 
			
		||||
    Feed,
 | 
			
		||||
    Flume,
 | 
			
		||||
)
 | 
			
		||||
from ..data._source import Symbol
 | 
			
		||||
from ..log import get_logger
 | 
			
		||||
from ._interaction import ChartView
 | 
			
		||||
| 
						 | 
				
			
			@ -538,6 +540,7 @@ class LinkedSplits(QWidget):
 | 
			
		|||
 | 
			
		||||
        symbol: Symbol,
 | 
			
		||||
        shm: ShmArray,
 | 
			
		||||
        flume: Flume,
 | 
			
		||||
        sidepane: FieldsForm,
 | 
			
		||||
 | 
			
		||||
        style: str = 'ohlc_bar',
 | 
			
		||||
| 
						 | 
				
			
			@ -562,6 +565,7 @@ class LinkedSplits(QWidget):
 | 
			
		|||
        self.chart = self.add_plot(
 | 
			
		||||
            name=symbol.fqsn,
 | 
			
		||||
            shm=shm,
 | 
			
		||||
            flume=flume,
 | 
			
		||||
            style=style,
 | 
			
		||||
            _is_main=True,
 | 
			
		||||
            sidepane=sidepane,
 | 
			
		||||
| 
						 | 
				
			
			@ -582,6 +586,7 @@ class LinkedSplits(QWidget):
 | 
			
		|||
 | 
			
		||||
        name: str,
 | 
			
		||||
        shm: ShmArray,
 | 
			
		||||
        flume: Flume,
 | 
			
		||||
 | 
			
		||||
        array_key: Optional[str] = None,
 | 
			
		||||
        style: str = 'line',
 | 
			
		||||
| 
						 | 
				
			
			@ -705,9 +710,11 @@ class LinkedSplits(QWidget):
 | 
			
		|||
        # draw curve graphics
 | 
			
		||||
        if style == 'ohlc_bar':
 | 
			
		||||
 | 
			
		||||
            graphics, data_key = cpw.draw_ohlc(
 | 
			
		||||
            # graphics, data_key = cpw.draw_ohlc(
 | 
			
		||||
            flow = cpw.draw_ohlc(
 | 
			
		||||
                name,
 | 
			
		||||
                shm,
 | 
			
		||||
                flume=flume,
 | 
			
		||||
                array_key=array_key
 | 
			
		||||
            )
 | 
			
		||||
            self.cursor.contents_labels.add_label(
 | 
			
		||||
| 
						 | 
				
			
			@ -719,18 +726,22 @@ class LinkedSplits(QWidget):
 | 
			
		|||
 | 
			
		||||
        elif style == 'line':
 | 
			
		||||
            add_label = True
 | 
			
		||||
            graphics, data_key = cpw.draw_curve(
 | 
			
		||||
            # graphics, data_key = cpw.draw_curve(
 | 
			
		||||
            flow = cpw.draw_curve(
 | 
			
		||||
                name,
 | 
			
		||||
                shm,
 | 
			
		||||
                flume,
 | 
			
		||||
                array_key=array_key,
 | 
			
		||||
                color='default_light',
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        elif style == 'step':
 | 
			
		||||
            add_label = True
 | 
			
		||||
            graphics, data_key = cpw.draw_curve(
 | 
			
		||||
            # graphics, data_key = cpw.draw_curve(
 | 
			
		||||
            flow = cpw.draw_curve(
 | 
			
		||||
                name,
 | 
			
		||||
                shm,
 | 
			
		||||
                flume,
 | 
			
		||||
                array_key=array_key,
 | 
			
		||||
                step_mode=True,
 | 
			
		||||
                color='davies',
 | 
			
		||||
| 
						 | 
				
			
			@ -740,6 +751,9 @@ class LinkedSplits(QWidget):
 | 
			
		|||
        else:
 | 
			
		||||
            raise ValueError(f"Chart style {style} is currently unsupported")
 | 
			
		||||
 | 
			
		||||
        graphics = flow.graphics
 | 
			
		||||
        data_key = flow.name
 | 
			
		||||
 | 
			
		||||
        if _is_main:
 | 
			
		||||
            assert style == 'ohlc_bar', 'main chart must be OHLC'
 | 
			
		||||
        else:
 | 
			
		||||
| 
						 | 
				
			
			@ -937,12 +951,6 @@ class ChartPlotWidget(pg.PlotWidget):
 | 
			
		|||
    def focus(self) -> None:
 | 
			
		||||
        self.view.setFocus()
 | 
			
		||||
 | 
			
		||||
    def last_bar_in_view(self) -> int:
 | 
			
		||||
        self._arrays[self.name][-1]['index']
 | 
			
		||||
 | 
			
		||||
    def is_valid_index(self, index: int) -> bool:
 | 
			
		||||
        return index >= 0 and index < self._arrays[self.name][-1]['index']
 | 
			
		||||
 | 
			
		||||
    def _set_xlimits(
 | 
			
		||||
        self,
 | 
			
		||||
        xfirst: int,
 | 
			
		||||
| 
						 | 
				
			
			@ -1035,9 +1043,14 @@ class ChartPlotWidget(pg.PlotWidget):
 | 
			
		|||
            log.warning(f'`Flow` for {self.name} not loaded yet?')
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        index = flow.shm.array['index']
 | 
			
		||||
        arr = flow.shm.array
 | 
			
		||||
        index = arr['index']
 | 
			
		||||
        # times = arr['time']
 | 
			
		||||
 | 
			
		||||
        # these will be epoch time floats
 | 
			
		||||
        xfirst, xlast = index[0], index[-1]
 | 
			
		||||
        l, lbar, rbar, r = self.bars_range()
 | 
			
		||||
 | 
			
		||||
        view = self.view
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
| 
						 | 
				
			
			@ -1194,6 +1207,7 @@ class ChartPlotWidget(pg.PlotWidget):
 | 
			
		|||
 | 
			
		||||
        name: str,
 | 
			
		||||
        shm: ShmArray,
 | 
			
		||||
        flume: Flume,
 | 
			
		||||
 | 
			
		||||
        array_key: Optional[str] = None,
 | 
			
		||||
        overlay: bool = False,
 | 
			
		||||
| 
						 | 
				
			
			@ -1206,10 +1220,7 @@ class ChartPlotWidget(pg.PlotWidget):
 | 
			
		|||
 | 
			
		||||
        **graphics_kwargs,
 | 
			
		||||
 | 
			
		||||
    ) -> tuple[
 | 
			
		||||
        pg.GraphicsObject,
 | 
			
		||||
        str,
 | 
			
		||||
    ]:
 | 
			
		||||
    ) -> Flow:
 | 
			
		||||
        '''
 | 
			
		||||
        Draw a "curve" (line plot graphics) for the provided data in
 | 
			
		||||
        the input shm array ``shm``.
 | 
			
		||||
| 
						 | 
				
			
			@ -1243,14 +1254,17 @@ class ChartPlotWidget(pg.PlotWidget):
 | 
			
		|||
                **graphics_kwargs,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        self._flows[data_key] = Flow(
 | 
			
		||||
            name=name,
 | 
			
		||||
            plot=pi,
 | 
			
		||||
            _shm=shm,
 | 
			
		||||
        flow = self._flows[data_key] = Flow(
 | 
			
		||||
            data_key,
 | 
			
		||||
            pi,
 | 
			
		||||
            shm,
 | 
			
		||||
            flume,
 | 
			
		||||
 | 
			
		||||
            is_ohlc=is_ohlc,
 | 
			
		||||
            # register curve graphics with this flow
 | 
			
		||||
            graphics=graphics,
 | 
			
		||||
        )
 | 
			
		||||
        assert isinstance(flow.shm, ShmArray)
 | 
			
		||||
 | 
			
		||||
        # TODO: this probably needs its own method?
 | 
			
		||||
        if overlay:
 | 
			
		||||
| 
						 | 
				
			
			@ -1307,24 +1321,26 @@ class ChartPlotWidget(pg.PlotWidget):
 | 
			
		|||
        # understand.
 | 
			
		||||
        pi.addItem(graphics)
 | 
			
		||||
 | 
			
		||||
        return graphics, data_key
 | 
			
		||||
        return flow
 | 
			
		||||
 | 
			
		||||
    def draw_ohlc(
 | 
			
		||||
        self,
 | 
			
		||||
        name: str,
 | 
			
		||||
        shm: ShmArray,
 | 
			
		||||
        flume: Flume,
 | 
			
		||||
 | 
			
		||||
        array_key: Optional[str] = None,
 | 
			
		||||
        **draw_curve_kwargs,
 | 
			
		||||
 | 
			
		||||
    ) -> (pg.GraphicsObject, str):
 | 
			
		||||
    ) -> Flow:
 | 
			
		||||
        '''
 | 
			
		||||
        Draw OHLC datums to chart.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        return self.draw_curve(
 | 
			
		||||
            name=name,
 | 
			
		||||
            shm=shm,
 | 
			
		||||
            name,
 | 
			
		||||
            shm,
 | 
			
		||||
            flume,
 | 
			
		||||
            array_key=array_key,
 | 
			
		||||
            is_ohlc=True,
 | 
			
		||||
            **draw_curve_kwargs,
 | 
			
		||||
| 
						 | 
				
			
			@ -1389,37 +1405,6 @@ class ChartPlotWidget(pg.PlotWidget):
 | 
			
		|||
        self.sig_mouse_leave.emit(self)
 | 
			
		||||
        self.scene().leaveEvent(ev)
 | 
			
		||||
 | 
			
		||||
    def get_index(self, time: float) -> int:
 | 
			
		||||
 | 
			
		||||
        # TODO: this should go onto some sort of
 | 
			
		||||
        # data-view thinger..right?
 | 
			
		||||
        ohlc = self._flows[self.name].shm.array
 | 
			
		||||
 | 
			
		||||
        # XXX: not sure why the time is so off here
 | 
			
		||||
        # looks like we're gonna have to do some fixing..
 | 
			
		||||
        indexes = ohlc['time'] >= time
 | 
			
		||||
 | 
			
		||||
        if any(indexes):
 | 
			
		||||
            return ohlc['index'][indexes][-1]
 | 
			
		||||
        else:
 | 
			
		||||
            return ohlc['index'][-1]
 | 
			
		||||
 | 
			
		||||
    def in_view(
 | 
			
		||||
        self,
 | 
			
		||||
        array: np.ndarray,
 | 
			
		||||
 | 
			
		||||
    ) -> np.ndarray:
 | 
			
		||||
        '''
 | 
			
		||||
        Slice an input struct array providing only datums
 | 
			
		||||
        "in view" of this chart.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        l, lbar, rbar, r = self.bars_range()
 | 
			
		||||
        ifirst = array[0]['index']
 | 
			
		||||
        # slice data by offset from the first index
 | 
			
		||||
        # available in the passed datum set.
 | 
			
		||||
        return array[lbar - ifirst:(rbar - ifirst) + 1]
 | 
			
		||||
 | 
			
		||||
    def maxmin(
 | 
			
		||||
        self,
 | 
			
		||||
        name: Optional[str] = None,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1191,6 +1191,7 @@ async def display_symbol_data(
 | 
			
		|||
        hist_chart = hist_linked.plot_ohlc_main(
 | 
			
		||||
            symbol,
 | 
			
		||||
            hist_ohlcv,
 | 
			
		||||
            flume,
 | 
			
		||||
            # in the case of history chart we explicitly set `False`
 | 
			
		||||
            # to avoid internal pane creation.
 | 
			
		||||
            # sidepane=False,
 | 
			
		||||
| 
						 | 
				
			
			@ -1204,6 +1205,7 @@ async def display_symbol_data(
 | 
			
		|||
        rt_chart = rt_linked.plot_ohlc_main(
 | 
			
		||||
            symbol,
 | 
			
		||||
            ohlcv,
 | 
			
		||||
            flume,
 | 
			
		||||
            # in the case of history chart we explicitly set `False`
 | 
			
		||||
            # to avoid internal pane creation.
 | 
			
		||||
            sidepane=pp_pane,
 | 
			
		||||
| 
						 | 
				
			
			@ -1275,9 +1277,10 @@ async def display_symbol_data(
 | 
			
		|||
                hist_pi.hideAxis('left')
 | 
			
		||||
                hist_pi.hideAxis('bottom')
 | 
			
		||||
 | 
			
		||||
                curve, _ = hist_chart.draw_curve(
 | 
			
		||||
                    name=fqsn,
 | 
			
		||||
                    shm=hist_ohlcv,
 | 
			
		||||
                flow = hist_chart.draw_curve(
 | 
			
		||||
                    fqsn,
 | 
			
		||||
                    hist_ohlcv,
 | 
			
		||||
                    flume,
 | 
			
		||||
                    array_key=fqsn,
 | 
			
		||||
                    overlay=hist_pi,
 | 
			
		||||
                    pi=hist_pi,
 | 
			
		||||
| 
						 | 
				
			
			@ -1307,9 +1310,10 @@ async def display_symbol_data(
 | 
			
		|||
                rt_pi.hideAxis('left')
 | 
			
		||||
                rt_pi.hideAxis('bottom')
 | 
			
		||||
 | 
			
		||||
                curve, _ = rt_chart.draw_curve(
 | 
			
		||||
                    name=fqsn,
 | 
			
		||||
                    shm=ohlcv,
 | 
			
		||||
                flow = rt_chart.draw_curve(
 | 
			
		||||
                    fqsn,
 | 
			
		||||
                    ohlcv,
 | 
			
		||||
                    flume,
 | 
			
		||||
                    array_key=fqsn,
 | 
			
		||||
                    overlay=rt_pi,
 | 
			
		||||
                    pi=rt_pi,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -36,6 +36,7 @@ from PyQt5.QtCore import QLineF
 | 
			
		|||
from ..data._sharedmem import (
 | 
			
		||||
    ShmArray,
 | 
			
		||||
)
 | 
			
		||||
from ..data.feed import Flume
 | 
			
		||||
from .._profile import (
 | 
			
		||||
    pg_profile_enabled,
 | 
			
		||||
    # ms_slower_then,
 | 
			
		||||
| 
						 | 
				
			
			@ -208,13 +209,16 @@ class Flow(msgspec.Struct):  # , frozen=True):
 | 
			
		|||
    '''
 | 
			
		||||
    name: str
 | 
			
		||||
    plot: pg.PlotItem
 | 
			
		||||
    graphics: Curve | BarItems
 | 
			
		||||
    _shm: ShmArray
 | 
			
		||||
    flume: Flume
 | 
			
		||||
    graphics: Curve | BarItems
 | 
			
		||||
 | 
			
		||||
    # for tracking y-mn/mx for y-axis auto-ranging
 | 
			
		||||
    yrange: tuple[float, float] = None
 | 
			
		||||
 | 
			
		||||
    # in some cases a flow may want to change its
 | 
			
		||||
    # graphical "type" or, "form" when downsampling,
 | 
			
		||||
    # normally this is just a plain line.
 | 
			
		||||
    # graphical "type" or, "form" when downsampling, to
 | 
			
		||||
    # start this is only ever an interpolation line.
 | 
			
		||||
    ds_graphics: Optional[Curve] = None
 | 
			
		||||
 | 
			
		||||
    is_ohlc: bool = False
 | 
			
		||||
| 
						 | 
				
			
			@ -249,9 +253,9 @@ class Flow(msgspec.Struct):  # , frozen=True):
 | 
			
		|||
 | 
			
		||||
    # TODO: remove this and only allow setting through
 | 
			
		||||
    # private ``._shm`` attr?
 | 
			
		||||
    @shm.setter
 | 
			
		||||
    def shm(self, shm: ShmArray) -> ShmArray:
 | 
			
		||||
        self._shm = shm
 | 
			
		||||
    # @shm.setter
 | 
			
		||||
    # def shm(self, shm: ShmArray) -> ShmArray:
 | 
			
		||||
    #     self._shm = shm
 | 
			
		||||
 | 
			
		||||
    def maxmin(
 | 
			
		||||
        self,
 | 
			
		||||
| 
						 | 
				
			
			@ -318,9 +322,15 @@ class Flow(msgspec.Struct):  # , frozen=True):
 | 
			
		|||
 | 
			
		||||
        '''
 | 
			
		||||
        vr = self.plot.viewRect()
 | 
			
		||||
        return int(vr.left()), int(vr.right())
 | 
			
		||||
        return (
 | 
			
		||||
            vr.left(),
 | 
			
		||||
            vr.right(),
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def datums_range(self) -> tuple[
 | 
			
		||||
    def datums_range(
 | 
			
		||||
        self,
 | 
			
		||||
        index_field: str = 'index',
 | 
			
		||||
    ) -> tuple[
 | 
			
		||||
        int, int, int, int, int, int
 | 
			
		||||
    ]:
 | 
			
		||||
        '''
 | 
			
		||||
| 
						 | 
				
			
			@ -328,6 +338,8 @@ class Flow(msgspec.Struct):  # , frozen=True):
 | 
			
		|||
 | 
			
		||||
        '''
 | 
			
		||||
        l, r = self.view_range()
 | 
			
		||||
        l = round(l)
 | 
			
		||||
        r = round(r)
 | 
			
		||||
 | 
			
		||||
        # TODO: avoid this and have shm passed
 | 
			
		||||
        # in earlier.
 | 
			
		||||
| 
						 | 
				
			
			@ -348,15 +360,23 @@ class Flow(msgspec.Struct):  # , frozen=True):
 | 
			
		|||
    def read(
 | 
			
		||||
        self,
 | 
			
		||||
        array_field: Optional[str] = None,
 | 
			
		||||
        index_field: str = 'index',
 | 
			
		||||
 | 
			
		||||
    ) -> tuple[
 | 
			
		||||
            int, int, np.ndarray,
 | 
			
		||||
            int, int, np.ndarray,
 | 
			
		||||
    ]:
 | 
			
		||||
        # read call
 | 
			
		||||
        '''
 | 
			
		||||
        Read the underlying shm array buffer and
 | 
			
		||||
        return the data plus indexes for the first
 | 
			
		||||
        and last
 | 
			
		||||
        which has been written to.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        # readable data
 | 
			
		||||
        array = self.shm.array
 | 
			
		||||
 | 
			
		||||
        indexes = array['index']
 | 
			
		||||
        indexes = array[index_field]
 | 
			
		||||
        ifirst = indexes[0]
 | 
			
		||||
        ilast = indexes[-1]
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -43,6 +43,7 @@ from ..data._sharedmem import (
 | 
			
		|||
    try_read,
 | 
			
		||||
)
 | 
			
		||||
from ..data.feed import Flume
 | 
			
		||||
from ..data._source import Symbol
 | 
			
		||||
from ._chart import (
 | 
			
		||||
    ChartPlotWidget,
 | 
			
		||||
    LinkedSplits,
 | 
			
		||||
| 
						 | 
				
			
			@ -213,7 +214,7 @@ async def open_fsp_actor_cluster(
 | 
			
		|||
async def run_fsp_ui(
 | 
			
		||||
 | 
			
		||||
    linkedsplits: LinkedSplits,
 | 
			
		||||
    shm: ShmArray,
 | 
			
		||||
    flume: Flume,
 | 
			
		||||
    started: trio.Event,
 | 
			
		||||
    target: Fsp,
 | 
			
		||||
    conf: dict[str, dict],
 | 
			
		||||
| 
						 | 
				
			
			@ -250,9 +251,11 @@ async def run_fsp_ui(
 | 
			
		|||
            else:
 | 
			
		||||
                chart = linkedsplits.subplots[overlay_with]
 | 
			
		||||
 | 
			
		||||
            shm = flume.rt_shm
 | 
			
		||||
            chart.draw_curve(
 | 
			
		||||
                name=name,
 | 
			
		||||
                shm=shm,
 | 
			
		||||
                name,
 | 
			
		||||
                shm,
 | 
			
		||||
                flume,
 | 
			
		||||
                overlay=True,
 | 
			
		||||
                color='default_light',
 | 
			
		||||
                array_key=name,
 | 
			
		||||
| 
						 | 
				
			
			@ -262,8 +265,9 @@ async def run_fsp_ui(
 | 
			
		|||
        else:
 | 
			
		||||
            # create a new sub-chart widget for this fsp
 | 
			
		||||
            chart = linkedsplits.add_plot(
 | 
			
		||||
                name=name,
 | 
			
		||||
                shm=shm,
 | 
			
		||||
                name,
 | 
			
		||||
                shm,
 | 
			
		||||
                flume,
 | 
			
		||||
 | 
			
		||||
                array_key=name,
 | 
			
		||||
                sidepane=sidepane,
 | 
			
		||||
| 
						 | 
				
			
			@ -353,6 +357,9 @@ async def run_fsp_ui(
 | 
			
		|||
        #         last = time.time()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: maybe this should be our ``Flow`` type since it maps
 | 
			
		||||
# one flume to the next? The machinery for task/actor mgmt should
 | 
			
		||||
# be part of the instantiation API?
 | 
			
		||||
class FspAdmin:
 | 
			
		||||
    '''
 | 
			
		||||
    Client API for orchestrating FSP actors and displaying
 | 
			
		||||
| 
						 | 
				
			
			@ -376,6 +383,10 @@ class FspAdmin:
 | 
			
		|||
            tuple[tractor.MsgStream, ShmArray]
 | 
			
		||||
        ] = {}
 | 
			
		||||
        self._flow_registry: dict[_Token, str] = {}
 | 
			
		||||
 | 
			
		||||
        # TODO: make this a `.src_flume` and add
 | 
			
		||||
        # a `dst_flume`?
 | 
			
		||||
        # (=> but then wouldn't this be the most basic `Flow`?)
 | 
			
		||||
        self.flume = flume
 | 
			
		||||
 | 
			
		||||
    def rr_next_portal(self) -> tractor.Portal:
 | 
			
		||||
| 
						 | 
				
			
			@ -389,7 +400,7 @@ class FspAdmin:
 | 
			
		|||
        complete: trio.Event,
 | 
			
		||||
        started: trio.Event,
 | 
			
		||||
        fqsn: str,
 | 
			
		||||
        dst_shm: ShmArray,
 | 
			
		||||
        dst_fsp_flume: Flume,
 | 
			
		||||
        conf: dict,
 | 
			
		||||
        target: Fsp,
 | 
			
		||||
        loglevel: str,
 | 
			
		||||
| 
						 | 
				
			
			@ -410,9 +421,10 @@ class FspAdmin:
 | 
			
		|||
                # data feed key
 | 
			
		||||
                fqsn=fqsn,
 | 
			
		||||
 | 
			
		||||
                # TODO: pass `Flume.to_msg()`s here?
 | 
			
		||||
                # mems
 | 
			
		||||
                src_shm_token=self.flume.rt_shm.token,
 | 
			
		||||
                dst_shm_token=dst_shm.token,
 | 
			
		||||
                dst_shm_token=dst_fsp_flume.rt_shm.token,
 | 
			
		||||
 | 
			
		||||
                # target
 | 
			
		||||
                ns_path=ns_path,
 | 
			
		||||
| 
						 | 
				
			
			@ -429,12 +441,14 @@ class FspAdmin:
 | 
			
		|||
            ctx.open_stream() as stream,
 | 
			
		||||
        ):
 | 
			
		||||
 | 
			
		||||
            dst_fsp_flume.stream: tractor.MsgStream = stream
 | 
			
		||||
 | 
			
		||||
            # register output data
 | 
			
		||||
            self._registry[
 | 
			
		||||
                (fqsn, ns_path)
 | 
			
		||||
            ] = (
 | 
			
		||||
                stream,
 | 
			
		||||
                dst_shm,
 | 
			
		||||
                dst_fsp_flume.rt_shm,
 | 
			
		||||
                complete
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -469,7 +483,7 @@ class FspAdmin:
 | 
			
		|||
        worker_name: Optional[str] = None,
 | 
			
		||||
        loglevel: str = 'info',
 | 
			
		||||
 | 
			
		||||
    ) -> (ShmArray, trio.Event):
 | 
			
		||||
    ) -> (Flume, trio.Event):
 | 
			
		||||
 | 
			
		||||
        fqsn = self.flume.symbol.fqsn
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -479,6 +493,26 @@ class FspAdmin:
 | 
			
		|||
            target=target,
 | 
			
		||||
            readonly=True,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        portal = self.cluster.get(worker_name) or self.rr_next_portal()
 | 
			
		||||
        provider_tag = portal.channel.uid
 | 
			
		||||
 | 
			
		||||
        symbol = Symbol(
 | 
			
		||||
            key=key,
 | 
			
		||||
            broker_info={
 | 
			
		||||
                provider_tag: {'asset_type': 'fsp'},
 | 
			
		||||
            },
 | 
			
		||||
        )
 | 
			
		||||
        dst_fsp_flume = Flume(
 | 
			
		||||
            symbol=symbol,
 | 
			
		||||
            _rt_shm_token=dst_shm.token,
 | 
			
		||||
            first_quote={},
 | 
			
		||||
 | 
			
		||||
            # set to 0 presuming for now that we can't load
 | 
			
		||||
            # FSP history (though we should eventually).
 | 
			
		||||
            izero_hist=0,
 | 
			
		||||
            izero_rt=0,
 | 
			
		||||
        )
 | 
			
		||||
        self._flow_registry[(
 | 
			
		||||
            self.flume.rt_shm._token,
 | 
			
		||||
            target.name
 | 
			
		||||
| 
						 | 
				
			
			@ -489,7 +523,6 @@ class FspAdmin:
 | 
			
		|||
        #         f'Already started FSP `{fqsn}:{func_name}`'
 | 
			
		||||
        #     )
 | 
			
		||||
 | 
			
		||||
        portal = self.cluster.get(worker_name) or self.rr_next_portal()
 | 
			
		||||
        complete = trio.Event()
 | 
			
		||||
        started = trio.Event()
 | 
			
		||||
        self.tn.start_soon(
 | 
			
		||||
| 
						 | 
				
			
			@ -498,13 +531,13 @@ class FspAdmin:
 | 
			
		|||
            complete,
 | 
			
		||||
            started,
 | 
			
		||||
            fqsn,
 | 
			
		||||
            dst_shm,
 | 
			
		||||
            dst_fsp_flume,
 | 
			
		||||
            conf,
 | 
			
		||||
            target,
 | 
			
		||||
            loglevel,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        return dst_shm, started
 | 
			
		||||
        return dst_fsp_flume, started
 | 
			
		||||
 | 
			
		||||
    async def open_fsp_chart(
 | 
			
		||||
        self,
 | 
			
		||||
| 
						 | 
				
			
			@ -516,7 +549,7 @@ class FspAdmin:
 | 
			
		|||
 | 
			
		||||
    ) -> (trio.Event, ChartPlotWidget):
 | 
			
		||||
 | 
			
		||||
        shm, started = await self.start_engine_task(
 | 
			
		||||
        flume, started = await self.start_engine_task(
 | 
			
		||||
            target,
 | 
			
		||||
            conf,
 | 
			
		||||
            loglevel,
 | 
			
		||||
| 
						 | 
				
			
			@ -528,7 +561,7 @@ class FspAdmin:
 | 
			
		|||
                run_fsp_ui,
 | 
			
		||||
 | 
			
		||||
                self.linked,
 | 
			
		||||
                shm,
 | 
			
		||||
                flume,
 | 
			
		||||
                started,
 | 
			
		||||
                target,
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -636,6 +669,7 @@ async def open_vlm_displays(
 | 
			
		|||
        chart = linked.add_plot(
 | 
			
		||||
            name='volume',
 | 
			
		||||
            shm=shm,
 | 
			
		||||
            flume=flume,
 | 
			
		||||
 | 
			
		||||
            array_key='volume',
 | 
			
		||||
            sidepane=sidepane,
 | 
			
		||||
| 
						 | 
				
			
			@ -715,7 +749,7 @@ async def open_vlm_displays(
 | 
			
		|||
 | 
			
		||||
            tasks_ready = []
 | 
			
		||||
            # spawn and overlay $ vlm on the same subchart
 | 
			
		||||
            dvlm_shm, started = await admin.start_engine_task(
 | 
			
		||||
            dvlm_flume, started = await admin.start_engine_task(
 | 
			
		||||
                dolla_vlm,
 | 
			
		||||
 | 
			
		||||
                {  # fsp engine conf
 | 
			
		||||
| 
						 | 
				
			
			@ -812,9 +846,13 @@ async def open_vlm_displays(
 | 
			
		|||
                    else:
 | 
			
		||||
                        color = 'bracket'
 | 
			
		||||
 | 
			
		||||
                    curve, _ = chart.draw_curve(
 | 
			
		||||
                        name=name,
 | 
			
		||||
                        shm=shm,
 | 
			
		||||
                    assert isinstance(shm, ShmArray)
 | 
			
		||||
                    assert isinstance(flume, Flume)
 | 
			
		||||
 | 
			
		||||
                    flow = chart.draw_curve(
 | 
			
		||||
                        name,
 | 
			
		||||
                        shm,
 | 
			
		||||
                        flume,
 | 
			
		||||
                        array_key=name,
 | 
			
		||||
                        overlay=pi,
 | 
			
		||||
                        color=color,
 | 
			
		||||
| 
						 | 
				
			
			@ -827,20 +865,20 @@ async def open_vlm_displays(
 | 
			
		|||
                    # specially store ref to shm for lookup in display loop
 | 
			
		||||
                    # since only a placeholder of `None` is entered in
 | 
			
		||||
                    # ``.draw_curve()``.
 | 
			
		||||
                    flow = chart._flows[name]
 | 
			
		||||
                    # flow = chart._flows[name]
 | 
			
		||||
                    assert flow.plot is pi
 | 
			
		||||
 | 
			
		||||
            chart_curves(
 | 
			
		||||
                fields,
 | 
			
		||||
                dvlm_pi,
 | 
			
		||||
                dvlm_shm,
 | 
			
		||||
                dvlm_flume.rt_shm,
 | 
			
		||||
                step_mode=True,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
 | 
			
		||||
            # up since this one depends on it.
 | 
			
		||||
 | 
			
		||||
            fr_shm, started = await admin.start_engine_task(
 | 
			
		||||
            fr_flume, started = await admin.start_engine_task(
 | 
			
		||||
                flow_rates,
 | 
			
		||||
                {  # fsp engine conf
 | 
			
		||||
                    'func_name': 'flow_rates',
 | 
			
		||||
| 
						 | 
				
			
			@ -853,7 +891,7 @@ async def open_vlm_displays(
 | 
			
		|||
            # chart_curves(
 | 
			
		||||
            #     dvlm_rate_fields,
 | 
			
		||||
            #     dvlm_pi,
 | 
			
		||||
            #     fr_shm,
 | 
			
		||||
            #     fr_flume.rt_shm,
 | 
			
		||||
            # )
 | 
			
		||||
 | 
			
		||||
            # TODO: is there a way to "sync" the dual axes such that only
 | 
			
		||||
| 
						 | 
				
			
			@ -901,7 +939,7 @@ async def open_vlm_displays(
 | 
			
		|||
            chart_curves(
 | 
			
		||||
                trade_rate_fields,
 | 
			
		||||
                tr_pi,
 | 
			
		||||
                fr_shm,
 | 
			
		||||
                fr_flume.rt_shm,
 | 
			
		||||
                # step_mode=True,
 | 
			
		||||
 | 
			
		||||
                # dashed line to represent "individual trades" being
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue