Only udpate prepended graphics when actually in view
parent
1fcb9233b4
commit
4f36743f64
|
@ -142,11 +142,17 @@ async def broadcast(
|
||||||
shm: Optional[ShmArray] = None,
|
shm: Optional[ShmArray] = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# broadcast the buffer index step to any subscribers for
|
'''
|
||||||
# a given sample period.
|
Broadcast the given ``shm: ShmArray``'s buffer index step to any
|
||||||
|
subscribers for a given sample period.
|
||||||
|
|
||||||
|
The sent msg will include the first and last index which slice into
|
||||||
|
the buffer's non-empty data.
|
||||||
|
|
||||||
|
'''
|
||||||
subs = sampler.subscribers.get(delay_s, ())
|
subs = sampler.subscribers.get(delay_s, ())
|
||||||
|
|
||||||
last = -1
|
first = last = -1
|
||||||
|
|
||||||
if shm is None:
|
if shm is None:
|
||||||
periods = sampler.ohlcv_shms.keys()
|
periods = sampler.ohlcv_shms.keys()
|
||||||
|
@ -156,11 +162,16 @@ async def broadcast(
|
||||||
if periods:
|
if periods:
|
||||||
lowest = min(periods)
|
lowest = min(periods)
|
||||||
shm = sampler.ohlcv_shms[lowest][0]
|
shm = sampler.ohlcv_shms[lowest][0]
|
||||||
|
first = shm._first.value
|
||||||
last = shm._last.value
|
last = shm._last.value
|
||||||
|
|
||||||
for stream in subs:
|
for stream in subs:
|
||||||
try:
|
try:
|
||||||
await stream.send({'index': last})
|
await stream.send({
|
||||||
|
'first': first,
|
||||||
|
'last': last,
|
||||||
|
'index': last,
|
||||||
|
})
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
trio.ClosedResourceError
|
trio.ClosedResourceError
|
||||||
|
|
|
@ -795,6 +795,15 @@ async def manage_history(
|
||||||
|
|
||||||
# manually trigger step update to update charts/fsps
|
# manually trigger step update to update charts/fsps
|
||||||
# which need an incremental update.
|
# which need an incremental update.
|
||||||
|
# NOTE: the way this works is super duper
|
||||||
|
# un-intuitive right now:
|
||||||
|
# - the broadcaster fires a msg to the fsp subsystem.
|
||||||
|
# - fsp subsys then checks for a sample step diff and
|
||||||
|
# possibly recomputes prepended history.
|
||||||
|
# - the fsp then sends back to the parent actor
|
||||||
|
# (usually a chart showing graphics for said fsp)
|
||||||
|
# which tells the chart to conduct a manual full
|
||||||
|
# graphics loop cycle.
|
||||||
for delay_s in sampler.subscribers:
|
for delay_s in sampler.subscribers:
|
||||||
await broadcast(delay_s)
|
await broadcast(delay_s)
|
||||||
|
|
||||||
|
|
|
@ -369,7 +369,12 @@ async def cascade(
|
||||||
# always trigger UI refresh after history update,
|
# always trigger UI refresh after history update,
|
||||||
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
|
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
|
||||||
# ``piker.ui._display.trigger_update()``.
|
# ``piker.ui._display.trigger_update()``.
|
||||||
await client_stream.send('update')
|
await client_stream.send({
|
||||||
|
'fsp_update': {
|
||||||
|
'key': dst_shm_token,
|
||||||
|
'first': dst._first.value,
|
||||||
|
'last': dst._last.value,
|
||||||
|
}})
|
||||||
return tracker, index
|
return tracker, index
|
||||||
|
|
||||||
def is_synced(
|
def is_synced(
|
||||||
|
|
|
@ -309,6 +309,7 @@ def graphics_update_cycle(
|
||||||
ds: DisplayState,
|
ds: DisplayState,
|
||||||
wap_in_history: bool = False,
|
wap_in_history: bool = False,
|
||||||
trigger_all: bool = False, # flag used by prepend history updates
|
trigger_all: bool = False, # flag used by prepend history updates
|
||||||
|
prepend_update_index: Optional[int] = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# TODO: eventually optimize this whole graphics stack with ``numba``
|
# TODO: eventually optimize this whole graphics stack with ``numba``
|
||||||
|
@ -368,6 +369,17 @@ def graphics_update_cycle(
|
||||||
profiler('maxmin call')
|
profiler('maxmin call')
|
||||||
liv = r >= i_step # the last datum is in view
|
liv = r >= i_step # the last datum is in view
|
||||||
|
|
||||||
|
if (
|
||||||
|
prepend_update_index is not None
|
||||||
|
and lbar > prepend_update_index
|
||||||
|
):
|
||||||
|
# on a history update (usually from the FSP subsys)
|
||||||
|
# if the segment of history that is being prepended
|
||||||
|
# isn't in view there is no reason to do a graphics
|
||||||
|
# update.
|
||||||
|
log.debug('Skipping prepend graphics cycle: frame not in view')
|
||||||
|
return
|
||||||
|
|
||||||
# don't real-time "shift" the curve to the
|
# don't real-time "shift" the curve to the
|
||||||
# left unless we get one of the following:
|
# left unless we get one of the following:
|
||||||
if (
|
if (
|
||||||
|
@ -639,7 +651,7 @@ async def display_symbol_data(
|
||||||
)
|
)
|
||||||
|
|
||||||
# historical data fetch
|
# historical data fetch
|
||||||
brokermod = brokers.get_brokermod(provider)
|
# brokermod = brokers.get_brokermod(provider)
|
||||||
|
|
||||||
# ohlc_status_done = sbar.open_status(
|
# ohlc_status_done = sbar.open_status(
|
||||||
# 'retreiving OHLC history.. ',
|
# 'retreiving OHLC history.. ',
|
||||||
|
|
|
@ -435,12 +435,16 @@ class FspAdmin:
|
||||||
# wait for graceful shutdown signal
|
# wait for graceful shutdown signal
|
||||||
async with stream.subscribe() as stream:
|
async with stream.subscribe() as stream:
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
if msg == 'update':
|
info = msg.get('fsp_update')
|
||||||
|
if info:
|
||||||
# if the chart isn't hidden try to update
|
# if the chart isn't hidden try to update
|
||||||
# the data on screen.
|
# the data on screen.
|
||||||
if not self.linked.isHidden():
|
if not self.linked.isHidden():
|
||||||
log.info(f'Re-syncing graphics for fsp: {ns_path}')
|
log.info(f'Re-syncing graphics for fsp: {ns_path}')
|
||||||
self.linked.graphics_cycle(trigger_all=True)
|
self.linked.graphics_cycle(
|
||||||
|
trigger_all=True,
|
||||||
|
prepend_update_index=info['first'],
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
log.info(f'recved unexpected fsp engine msg: {msg}')
|
log.info(f'recved unexpected fsp engine msg: {msg}')
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue