diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 466ef0e7..10dc43f6 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -142,11 +142,17 @@ async def broadcast( shm: Optional[ShmArray] = None, ) -> None: - # broadcast the buffer index step to any subscribers for - # a given sample period. + ''' + Broadcast the given ``shm: ShmArray``'s buffer index step to any + subscribers for a given sample period. + + The sent msg will include the first and last index which slice into + the buffer's non-empty data. + + ''' subs = sampler.subscribers.get(delay_s, ()) - last = -1 + first = last = -1 if shm is None: periods = sampler.ohlcv_shms.keys() @@ -156,11 +162,16 @@ async def broadcast( if periods: lowest = min(periods) shm = sampler.ohlcv_shms[lowest][0] + first = shm._first.value last = shm._last.value for stream in subs: try: - await stream.send({'index': last}) + await stream.send({ + 'first': first, + 'last': last, + 'index': last, + }) except ( trio.BrokenResourceError, trio.ClosedResourceError diff --git a/piker/data/feed.py b/piker/data/feed.py index 605349e9..848fcc10 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -795,6 +795,15 @@ async def manage_history( # manually trigger step update to update charts/fsps # which need an incremental update. + # NOTE: the way this works is super duper + # un-intuitive right now: + # - the broadcaster fires a msg to the fsp subsystem. + # - fsp subsys then checks for a sample step diff and + # possibly recomputes prepended history. + # - the fsp then sends back to the parent actor + # (usually a chart showing graphics for said fsp) + # which tells the chart to conduct a manual full + # graphics loop cycle. for delay_s in sampler.subscribers: await broadcast(delay_s) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 0776c7a2..cf45c40e 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -369,7 +369,12 @@ async def cascade( # always trigger UI refresh after history update, # see ``piker.ui._fsp.FspAdmin.open_chain()`` and # ``piker.ui._display.trigger_update()``. - await client_stream.send('update') + await client_stream.send({ + 'fsp_update': { + 'key': dst_shm_token, + 'first': dst._first.value, + 'last': dst._last.value, + }}) return tracker, index def is_synced( diff --git a/piker/ui/_display.py b/piker/ui/_display.py index aa7761db..d0654b10 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -309,6 +309,7 @@ def graphics_update_cycle( ds: DisplayState, wap_in_history: bool = False, trigger_all: bool = False, # flag used by prepend history updates + prepend_update_index: Optional[int] = None, ) -> None: # TODO: eventually optimize this whole graphics stack with ``numba`` @@ -368,6 +369,17 @@ def graphics_update_cycle( profiler('maxmin call') liv = r >= i_step # the last datum is in view + if ( + prepend_update_index is not None + and lbar > prepend_update_index + ): + # on a history update (usually from the FSP subsys) + # if the segment of history that is being prepended + # isn't in view there is no reason to do a graphics + # update. + log.debug('Skipping prepend graphics cycle: frame not in view') + return + # don't real-time "shift" the curve to the # left unless we get one of the following: if ( @@ -639,7 +651,7 @@ async def display_symbol_data( ) # historical data fetch - brokermod = brokers.get_brokermod(provider) + # brokermod = brokers.get_brokermod(provider) # ohlc_status_done = sbar.open_status( # 'retreiving OHLC history.. ', diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 5ed85d9b..3d90f014 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -435,12 +435,16 @@ class FspAdmin: # wait for graceful shutdown signal async with stream.subscribe() as stream: async for msg in stream: - if msg == 'update': + info = msg.get('fsp_update') + if info: # if the chart isn't hidden try to update # the data on screen. if not self.linked.isHidden(): log.info(f'Re-syncing graphics for fsp: {ns_path}') - self.linked.graphics_cycle(trigger_all=True) + self.linked.graphics_cycle( + trigger_all=True, + prepend_update_index=info['first'], + ) else: log.info(f'recved unexpected fsp engine msg: {msg}')