From ae1773d6e5a89315375c4453765df67b19ba09ab Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Nov 2022 14:48:30 -0500 Subject: [PATCH] Pass `Flume`s throughout FSP control APIs FSP management is much better suited to accessing to the new higher level `Flume` indexing apis; this adjusts some func sigs to pass through (and/or create) flume instances, particularly `.ui._fsp.open_fsp_admin()` and `FspAdmin.open_fsp_ui()` related methods. Now we wrap the destination fsp shm in a flume on the admin (aka normally UI/client) side and is returned from `.start_engine_method()`. --- piker/data/feed.py | 5 ++- piker/ui/_flows.py | 40 ++++++++++++++++------ piker/ui/_fsp.py | 84 +++++++++++++++++++++++++++++++++------------- 3 files changed, 93 insertions(+), 36 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 534aebc9..d91b890e 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1037,12 +1037,11 @@ async def allocate_persistent_feed( flume = Flume( symbol=symbol, - _hist_shm_token=hist_shm.token, - _rt_shm_token=rt_shm.token, first_quote=first_quote, + _rt_shm_token=rt_shm.token, + _hist_shm_token=hist_shm.token, izero_hist=izero_hist, izero_rt=izero_rt, - # throttle_rate=tick_throttle, ) # for ambiguous names we simply apply the retreived diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py index a2908905..2e04bb37 100644 --- a/piker/ui/_flows.py +++ b/piker/ui/_flows.py @@ -36,6 +36,7 @@ from PyQt5.QtCore import QLineF from ..data._sharedmem import ( ShmArray, ) +from ..data.feed import Flume from .._profile import ( pg_profile_enabled, # ms_slower_then, @@ -208,13 +209,16 @@ class Flow(msgspec.Struct): # , frozen=True): ''' name: str plot: pg.PlotItem - graphics: Curve | BarItems _shm: ShmArray + flume: Flume + graphics: Curve | BarItems + + # for tracking y-mn/mx for y-axis auto-ranging yrange: tuple[float, float] = None # in some cases a flow may want to change its - # graphical "type" or, "form" when downsampling, - # normally this is just a plain line. + # graphical "type" or, "form" when downsampling, to + # start this is only ever an interpolation line. ds_graphics: Optional[Curve] = None is_ohlc: bool = False @@ -249,9 +253,9 @@ class Flow(msgspec.Struct): # , frozen=True): # TODO: remove this and only allow setting through # private ``._shm`` attr? - @shm.setter - def shm(self, shm: ShmArray) -> ShmArray: - self._shm = shm + # @shm.setter + # def shm(self, shm: ShmArray) -> ShmArray: + # self._shm = shm def maxmin( self, @@ -318,9 +322,15 @@ class Flow(msgspec.Struct): # , frozen=True): ''' vr = self.plot.viewRect() - return int(vr.left()), int(vr.right()) + return ( + vr.left(), + vr.right(), + ) - def datums_range(self) -> tuple[ + def datums_range( + self, + index_field: str = 'index', + ) -> tuple[ int, int, int, int, int, int ]: ''' @@ -328,6 +338,8 @@ class Flow(msgspec.Struct): # , frozen=True): ''' l, r = self.view_range() + l = round(l) + r = round(r) # TODO: avoid this and have shm passed # in earlier. @@ -348,15 +360,23 @@ class Flow(msgspec.Struct): # , frozen=True): def read( self, array_field: Optional[str] = None, + index_field: str = 'index', ) -> tuple[ int, int, np.ndarray, int, int, np.ndarray, ]: - # read call + ''' + Read the underlying shm array buffer and + return the data plus indexes for the first + and last + which has been written to. + + ''' + # readable data array = self.shm.array - indexes = array['index'] + indexes = array[index_field] ifirst = indexes[0] ilast = indexes[-1] diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 370eeb02..29162635 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -43,6 +43,7 @@ from ..data._sharedmem import ( try_read, ) from ..data.feed import Flume +from ..data._source import Symbol from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -213,7 +214,7 @@ async def open_fsp_actor_cluster( async def run_fsp_ui( linkedsplits: LinkedSplits, - shm: ShmArray, + flume: Flume, started: trio.Event, target: Fsp, conf: dict[str, dict], @@ -250,9 +251,11 @@ async def run_fsp_ui( else: chart = linkedsplits.subplots[overlay_with] + shm = flume.rt_shm chart.draw_curve( - name=name, - shm=shm, + name, + shm, + flume, overlay=True, color='default_light', array_key=name, @@ -262,8 +265,9 @@ async def run_fsp_ui( else: # create a new sub-chart widget for this fsp chart = linkedsplits.add_plot( - name=name, - shm=shm, + name, + shm, + flume, array_key=name, sidepane=sidepane, @@ -353,6 +357,9 @@ async def run_fsp_ui( # last = time.time() +# TODO: maybe this should be our ``Flow`` type since it maps +# one flume to the next? The machinery for task/actor mgmt should +# be part of the instantiation API? class FspAdmin: ''' Client API for orchestrating FSP actors and displaying @@ -376,6 +383,10 @@ class FspAdmin: tuple[tractor.MsgStream, ShmArray] ] = {} self._flow_registry: dict[_Token, str] = {} + + # TODO: make this a `.src_flume` and add + # a `dst_flume`? + # (=> but then wouldn't this be the most basic `Flow`?) self.flume = flume def rr_next_portal(self) -> tractor.Portal: @@ -389,7 +400,7 @@ class FspAdmin: complete: trio.Event, started: trio.Event, fqsn: str, - dst_shm: ShmArray, + dst_fsp_flume: Flume, conf: dict, target: Fsp, loglevel: str, @@ -410,9 +421,10 @@ class FspAdmin: # data feed key fqsn=fqsn, + # TODO: pass `Flume.to_msg()`s here? # mems src_shm_token=self.flume.rt_shm.token, - dst_shm_token=dst_shm.token, + dst_shm_token=dst_fsp_flume.rt_shm.token, # target ns_path=ns_path, @@ -429,12 +441,14 @@ class FspAdmin: ctx.open_stream() as stream, ): + dst_fsp_flume.stream: tractor.MsgStream = stream + # register output data self._registry[ (fqsn, ns_path) ] = ( stream, - dst_shm, + dst_fsp_flume.rt_shm, complete ) @@ -469,7 +483,7 @@ class FspAdmin: worker_name: Optional[str] = None, loglevel: str = 'info', - ) -> (ShmArray, trio.Event): + ) -> (Flume, trio.Event): fqsn = self.flume.symbol.fqsn @@ -479,6 +493,26 @@ class FspAdmin: target=target, readonly=True, ) + + portal = self.cluster.get(worker_name) or self.rr_next_portal() + provider_tag = portal.channel.uid + + symbol = Symbol( + key=key, + broker_info={ + provider_tag: {'asset_type': 'fsp'}, + }, + ) + dst_fsp_flume = Flume( + symbol=symbol, + _rt_shm_token=dst_shm.token, + first_quote={}, + + # set to 0 presuming for now that we can't load + # FSP history (though we should eventually). + izero_hist=0, + izero_rt=0, + ) self._flow_registry[( self.flume.rt_shm._token, target.name @@ -489,7 +523,6 @@ class FspAdmin: # f'Already started FSP `{fqsn}:{func_name}`' # ) - portal = self.cluster.get(worker_name) or self.rr_next_portal() complete = trio.Event() started = trio.Event() self.tn.start_soon( @@ -498,13 +531,13 @@ class FspAdmin: complete, started, fqsn, - dst_shm, + dst_fsp_flume, conf, target, loglevel, ) - return dst_shm, started + return dst_fsp_flume, started async def open_fsp_chart( self, @@ -516,7 +549,7 @@ class FspAdmin: ) -> (trio.Event, ChartPlotWidget): - shm, started = await self.start_engine_task( + flume, started = await self.start_engine_task( target, conf, loglevel, @@ -528,7 +561,7 @@ class FspAdmin: run_fsp_ui, self.linked, - shm, + flume, started, target, @@ -636,6 +669,7 @@ async def open_vlm_displays( chart = linked.add_plot( name='volume', shm=shm, + flume=flume, array_key='volume', sidepane=sidepane, @@ -715,7 +749,7 @@ async def open_vlm_displays( tasks_ready = [] # spawn and overlay $ vlm on the same subchart - dvlm_shm, started = await admin.start_engine_task( + dvlm_flume, started = await admin.start_engine_task( dolla_vlm, { # fsp engine conf @@ -812,9 +846,13 @@ async def open_vlm_displays( else: color = 'bracket' - curve, _ = chart.draw_curve( - name=name, - shm=shm, + assert isinstance(shm, ShmArray) + assert isinstance(flume, Flume) + + flow = chart.draw_curve( + name, + shm, + flume, array_key=name, overlay=pi, color=color, @@ -827,20 +865,20 @@ async def open_vlm_displays( # specially store ref to shm for lookup in display loop # since only a placeholder of `None` is entered in # ``.draw_curve()``. - flow = chart._flows[name] + # flow = chart._flows[name] assert flow.plot is pi chart_curves( fields, dvlm_pi, - dvlm_shm, + dvlm_flume.rt_shm, step_mode=True, ) # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is # up since this one depends on it. - fr_shm, started = await admin.start_engine_task( + fr_flume, started = await admin.start_engine_task( flow_rates, { # fsp engine conf 'func_name': 'flow_rates', @@ -853,7 +891,7 @@ async def open_vlm_displays( # chart_curves( # dvlm_rate_fields, # dvlm_pi, - # fr_shm, + # fr_flume.rt_shm, # ) # TODO: is there a way to "sync" the dual axes such that only @@ -901,7 +939,7 @@ async def open_vlm_displays( chart_curves( trade_rate_fields, tr_pi, - fr_shm, + fr_flume.rt_shm, # step_mode=True, # dashed line to represent "individual trades" being