diff --git a/piker/data/feed.py b/piker/data/feed.py index 534aebc9..d91b890e 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1037,12 +1037,11 @@ async def allocate_persistent_feed( flume = Flume( symbol=symbol, - _hist_shm_token=hist_shm.token, - _rt_shm_token=rt_shm.token, first_quote=first_quote, + _rt_shm_token=rt_shm.token, + _hist_shm_token=hist_shm.token, izero_hist=izero_hist, izero_rt=izero_rt, - # throttle_rate=tick_throttle, ) # for ambiguous names we simply apply the retreived diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 8c1169b8..bfe1c110 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -38,7 +38,6 @@ from PyQt5.QtWidgets import ( QVBoxLayout, QSplitter, ) -import numpy as np import pyqtgraph as pg import trio @@ -63,7 +62,10 @@ from ._style import ( _xaxis_at, _min_points_to_show, ) -from ..data.feed import Feed +from ..data.feed import ( + Feed, + Flume, +) from ..data._source import Symbol from ..log import get_logger from ._interaction import ChartView @@ -538,6 +540,7 @@ class LinkedSplits(QWidget): symbol: Symbol, shm: ShmArray, + flume: Flume, sidepane: FieldsForm, style: str = 'ohlc_bar', @@ -562,6 +565,7 @@ class LinkedSplits(QWidget): self.chart = self.add_plot( name=symbol.fqsn, shm=shm, + flume=flume, style=style, _is_main=True, sidepane=sidepane, @@ -582,6 +586,7 @@ class LinkedSplits(QWidget): name: str, shm: ShmArray, + flume: Flume, array_key: Optional[str] = None, style: str = 'line', @@ -705,9 +710,11 @@ class LinkedSplits(QWidget): # draw curve graphics if style == 'ohlc_bar': - graphics, data_key = cpw.draw_ohlc( + # graphics, data_key = cpw.draw_ohlc( + flow = cpw.draw_ohlc( name, shm, + flume=flume, array_key=array_key ) self.cursor.contents_labels.add_label( @@ -719,18 +726,22 @@ class LinkedSplits(QWidget): elif style == 'line': add_label = True - graphics, data_key = cpw.draw_curve( + # graphics, data_key = cpw.draw_curve( + flow = cpw.draw_curve( name, shm, + flume, array_key=array_key, color='default_light', ) elif style == 'step': add_label = True - graphics, data_key = cpw.draw_curve( + # graphics, data_key = cpw.draw_curve( + flow = cpw.draw_curve( name, shm, + flume, array_key=array_key, step_mode=True, color='davies', @@ -740,6 +751,9 @@ class LinkedSplits(QWidget): else: raise ValueError(f"Chart style {style} is currently unsupported") + graphics = flow.graphics + data_key = flow.name + if _is_main: assert style == 'ohlc_bar', 'main chart must be OHLC' else: @@ -937,12 +951,6 @@ class ChartPlotWidget(pg.PlotWidget): def focus(self) -> None: self.view.setFocus() - def last_bar_in_view(self) -> int: - self._arrays[self.name][-1]['index'] - - def is_valid_index(self, index: int) -> bool: - return index >= 0 and index < self._arrays[self.name][-1]['index'] - def _set_xlimits( self, xfirst: int, @@ -1035,9 +1043,14 @@ class ChartPlotWidget(pg.PlotWidget): log.warning(f'`Flow` for {self.name} not loaded yet?') return - index = flow.shm.array['index'] + arr = flow.shm.array + index = arr['index'] + # times = arr['time'] + + # these will be epoch time floats xfirst, xlast = index[0], index[-1] l, lbar, rbar, r = self.bars_range() + view = self.view if ( @@ -1194,6 +1207,7 @@ class ChartPlotWidget(pg.PlotWidget): name: str, shm: ShmArray, + flume: Flume, array_key: Optional[str] = None, overlay: bool = False, @@ -1206,10 +1220,7 @@ class ChartPlotWidget(pg.PlotWidget): **graphics_kwargs, - ) -> tuple[ - pg.GraphicsObject, - str, - ]: + ) -> Flow: ''' Draw a "curve" (line plot graphics) for the provided data in the input shm array ``shm``. @@ -1243,14 +1254,17 @@ class ChartPlotWidget(pg.PlotWidget): **graphics_kwargs, ) - self._flows[data_key] = Flow( - name=name, - plot=pi, - _shm=shm, + flow = self._flows[data_key] = Flow( + data_key, + pi, + shm, + flume, + is_ohlc=is_ohlc, # register curve graphics with this flow graphics=graphics, ) + assert isinstance(flow.shm, ShmArray) # TODO: this probably needs its own method? if overlay: @@ -1307,24 +1321,26 @@ class ChartPlotWidget(pg.PlotWidget): # understand. pi.addItem(graphics) - return graphics, data_key + return flow def draw_ohlc( self, name: str, shm: ShmArray, + flume: Flume, array_key: Optional[str] = None, **draw_curve_kwargs, - ) -> (pg.GraphicsObject, str): + ) -> Flow: ''' Draw OHLC datums to chart. ''' return self.draw_curve( - name=name, - shm=shm, + name, + shm, + flume, array_key=array_key, is_ohlc=True, **draw_curve_kwargs, @@ -1389,37 +1405,6 @@ class ChartPlotWidget(pg.PlotWidget): self.sig_mouse_leave.emit(self) self.scene().leaveEvent(ev) - def get_index(self, time: float) -> int: - - # TODO: this should go onto some sort of - # data-view thinger..right? - ohlc = self._flows[self.name].shm.array - - # XXX: not sure why the time is so off here - # looks like we're gonna have to do some fixing.. - indexes = ohlc['time'] >= time - - if any(indexes): - return ohlc['index'][indexes][-1] - else: - return ohlc['index'][-1] - - def in_view( - self, - array: np.ndarray, - - ) -> np.ndarray: - ''' - Slice an input struct array providing only datums - "in view" of this chart. - - ''' - l, lbar, rbar, r = self.bars_range() - ifirst = array[0]['index'] - # slice data by offset from the first index - # available in the passed datum set. - return array[lbar - ifirst:(rbar - ifirst) + 1] - def maxmin( self, name: Optional[str] = None, diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 743460b9..a9bb5406 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -1191,6 +1191,7 @@ async def display_symbol_data( hist_chart = hist_linked.plot_ohlc_main( symbol, hist_ohlcv, + flume, # in the case of history chart we explicitly set `False` # to avoid internal pane creation. # sidepane=False, @@ -1204,6 +1205,7 @@ async def display_symbol_data( rt_chart = rt_linked.plot_ohlc_main( symbol, ohlcv, + flume, # in the case of history chart we explicitly set `False` # to avoid internal pane creation. sidepane=pp_pane, @@ -1275,9 +1277,10 @@ async def display_symbol_data( hist_pi.hideAxis('left') hist_pi.hideAxis('bottom') - curve, _ = hist_chart.draw_curve( - name=fqsn, - shm=hist_ohlcv, + flow = hist_chart.draw_curve( + fqsn, + hist_ohlcv, + flume, array_key=fqsn, overlay=hist_pi, pi=hist_pi, @@ -1307,9 +1310,10 @@ async def display_symbol_data( rt_pi.hideAxis('left') rt_pi.hideAxis('bottom') - curve, _ = rt_chart.draw_curve( - name=fqsn, - shm=ohlcv, + flow = rt_chart.draw_curve( + fqsn, + ohlcv, + flume, array_key=fqsn, overlay=rt_pi, pi=rt_pi, diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py index a2908905..2e04bb37 100644 --- a/piker/ui/_flows.py +++ b/piker/ui/_flows.py @@ -36,6 +36,7 @@ from PyQt5.QtCore import QLineF from ..data._sharedmem import ( ShmArray, ) +from ..data.feed import Flume from .._profile import ( pg_profile_enabled, # ms_slower_then, @@ -208,13 +209,16 @@ class Flow(msgspec.Struct): # , frozen=True): ''' name: str plot: pg.PlotItem - graphics: Curve | BarItems _shm: ShmArray + flume: Flume + graphics: Curve | BarItems + + # for tracking y-mn/mx for y-axis auto-ranging yrange: tuple[float, float] = None # in some cases a flow may want to change its - # graphical "type" or, "form" when downsampling, - # normally this is just a plain line. + # graphical "type" or, "form" when downsampling, to + # start this is only ever an interpolation line. ds_graphics: Optional[Curve] = None is_ohlc: bool = False @@ -249,9 +253,9 @@ class Flow(msgspec.Struct): # , frozen=True): # TODO: remove this and only allow setting through # private ``._shm`` attr? - @shm.setter - def shm(self, shm: ShmArray) -> ShmArray: - self._shm = shm + # @shm.setter + # def shm(self, shm: ShmArray) -> ShmArray: + # self._shm = shm def maxmin( self, @@ -318,9 +322,15 @@ class Flow(msgspec.Struct): # , frozen=True): ''' vr = self.plot.viewRect() - return int(vr.left()), int(vr.right()) + return ( + vr.left(), + vr.right(), + ) - def datums_range(self) -> tuple[ + def datums_range( + self, + index_field: str = 'index', + ) -> tuple[ int, int, int, int, int, int ]: ''' @@ -328,6 +338,8 @@ class Flow(msgspec.Struct): # , frozen=True): ''' l, r = self.view_range() + l = round(l) + r = round(r) # TODO: avoid this and have shm passed # in earlier. @@ -348,15 +360,23 @@ class Flow(msgspec.Struct): # , frozen=True): def read( self, array_field: Optional[str] = None, + index_field: str = 'index', ) -> tuple[ int, int, np.ndarray, int, int, np.ndarray, ]: - # read call + ''' + Read the underlying shm array buffer and + return the data plus indexes for the first + and last + which has been written to. + + ''' + # readable data array = self.shm.array - indexes = array['index'] + indexes = array[index_field] ifirst = indexes[0] ilast = indexes[-1] diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 370eeb02..29162635 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -43,6 +43,7 @@ from ..data._sharedmem import ( try_read, ) from ..data.feed import Flume +from ..data._source import Symbol from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -213,7 +214,7 @@ async def open_fsp_actor_cluster( async def run_fsp_ui( linkedsplits: LinkedSplits, - shm: ShmArray, + flume: Flume, started: trio.Event, target: Fsp, conf: dict[str, dict], @@ -250,9 +251,11 @@ async def run_fsp_ui( else: chart = linkedsplits.subplots[overlay_with] + shm = flume.rt_shm chart.draw_curve( - name=name, - shm=shm, + name, + shm, + flume, overlay=True, color='default_light', array_key=name, @@ -262,8 +265,9 @@ async def run_fsp_ui( else: # create a new sub-chart widget for this fsp chart = linkedsplits.add_plot( - name=name, - shm=shm, + name, + shm, + flume, array_key=name, sidepane=sidepane, @@ -353,6 +357,9 @@ async def run_fsp_ui( # last = time.time() +# TODO: maybe this should be our ``Flow`` type since it maps +# one flume to the next? The machinery for task/actor mgmt should +# be part of the instantiation API? class FspAdmin: ''' Client API for orchestrating FSP actors and displaying @@ -376,6 +383,10 @@ class FspAdmin: tuple[tractor.MsgStream, ShmArray] ] = {} self._flow_registry: dict[_Token, str] = {} + + # TODO: make this a `.src_flume` and add + # a `dst_flume`? + # (=> but then wouldn't this be the most basic `Flow`?) self.flume = flume def rr_next_portal(self) -> tractor.Portal: @@ -389,7 +400,7 @@ class FspAdmin: complete: trio.Event, started: trio.Event, fqsn: str, - dst_shm: ShmArray, + dst_fsp_flume: Flume, conf: dict, target: Fsp, loglevel: str, @@ -410,9 +421,10 @@ class FspAdmin: # data feed key fqsn=fqsn, + # TODO: pass `Flume.to_msg()`s here? # mems src_shm_token=self.flume.rt_shm.token, - dst_shm_token=dst_shm.token, + dst_shm_token=dst_fsp_flume.rt_shm.token, # target ns_path=ns_path, @@ -429,12 +441,14 @@ class FspAdmin: ctx.open_stream() as stream, ): + dst_fsp_flume.stream: tractor.MsgStream = stream + # register output data self._registry[ (fqsn, ns_path) ] = ( stream, - dst_shm, + dst_fsp_flume.rt_shm, complete ) @@ -469,7 +483,7 @@ class FspAdmin: worker_name: Optional[str] = None, loglevel: str = 'info', - ) -> (ShmArray, trio.Event): + ) -> (Flume, trio.Event): fqsn = self.flume.symbol.fqsn @@ -479,6 +493,26 @@ class FspAdmin: target=target, readonly=True, ) + + portal = self.cluster.get(worker_name) or self.rr_next_portal() + provider_tag = portal.channel.uid + + symbol = Symbol( + key=key, + broker_info={ + provider_tag: {'asset_type': 'fsp'}, + }, + ) + dst_fsp_flume = Flume( + symbol=symbol, + _rt_shm_token=dst_shm.token, + first_quote={}, + + # set to 0 presuming for now that we can't load + # FSP history (though we should eventually). + izero_hist=0, + izero_rt=0, + ) self._flow_registry[( self.flume.rt_shm._token, target.name @@ -489,7 +523,6 @@ class FspAdmin: # f'Already started FSP `{fqsn}:{func_name}`' # ) - portal = self.cluster.get(worker_name) or self.rr_next_portal() complete = trio.Event() started = trio.Event() self.tn.start_soon( @@ -498,13 +531,13 @@ class FspAdmin: complete, started, fqsn, - dst_shm, + dst_fsp_flume, conf, target, loglevel, ) - return dst_shm, started + return dst_fsp_flume, started async def open_fsp_chart( self, @@ -516,7 +549,7 @@ class FspAdmin: ) -> (trio.Event, ChartPlotWidget): - shm, started = await self.start_engine_task( + flume, started = await self.start_engine_task( target, conf, loglevel, @@ -528,7 +561,7 @@ class FspAdmin: run_fsp_ui, self.linked, - shm, + flume, started, target, @@ -636,6 +669,7 @@ async def open_vlm_displays( chart = linked.add_plot( name='volume', shm=shm, + flume=flume, array_key='volume', sidepane=sidepane, @@ -715,7 +749,7 @@ async def open_vlm_displays( tasks_ready = [] # spawn and overlay $ vlm on the same subchart - dvlm_shm, started = await admin.start_engine_task( + dvlm_flume, started = await admin.start_engine_task( dolla_vlm, { # fsp engine conf @@ -812,9 +846,13 @@ async def open_vlm_displays( else: color = 'bracket' - curve, _ = chart.draw_curve( - name=name, - shm=shm, + assert isinstance(shm, ShmArray) + assert isinstance(flume, Flume) + + flow = chart.draw_curve( + name, + shm, + flume, array_key=name, overlay=pi, color=color, @@ -827,20 +865,20 @@ async def open_vlm_displays( # specially store ref to shm for lookup in display loop # since only a placeholder of `None` is entered in # ``.draw_curve()``. - flow = chart._flows[name] + # flow = chart._flows[name] assert flow.plot is pi chart_curves( fields, dvlm_pi, - dvlm_shm, + dvlm_flume.rt_shm, step_mode=True, ) # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is # up since this one depends on it. - fr_shm, started = await admin.start_engine_task( + fr_flume, started = await admin.start_engine_task( flow_rates, { # fsp engine conf 'func_name': 'flow_rates', @@ -853,7 +891,7 @@ async def open_vlm_displays( # chart_curves( # dvlm_rate_fields, # dvlm_pi, - # fr_shm, + # fr_flume.rt_shm, # ) # TODO: is there a way to "sync" the dual axes such that only @@ -901,7 +939,7 @@ async def open_vlm_displays( chart_curves( trade_rate_fields, tr_pi, - fr_shm, + fr_flume.rt_shm, # step_mode=True, # dashed line to represent "individual trades" being