diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index 65d6f4c0..f66f81c5 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -37,7 +37,6 @@ import asyncio
 from pprint import pformat
 import inspect
 import logging
-import platform
 from random import randint
 import time
 
@@ -1583,7 +1582,7 @@ async def backfill_bars(
     # on that until we have the `marketstore` daemon in place in which
     # case the shm size will be driven by user config and available sys
     # memory.
-    count: int = 100,
+    count: int = 16,
 
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 
@@ -1603,11 +1602,6 @@ async def backfill_bars(
     # async with open_history_client(fqsn) as proxy:
     async with open_client_proxy() as proxy:
 
-        if platform.system() == 'Windows':
-            log.warning(
-                'Decreasing history query count to 4 since, windows...')
-            count = 4
-
         out, fails = await get_bars(proxy, fqsn)
 
         if out is None:
diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py
index 7e75c283..c9c53d60 100644
--- a/piker/fsp/_engine.py
+++ b/piker/fsp/_engine.py
@@ -76,7 +76,6 @@ async def filter_quotes_by_sym(
 
 async def fsp_compute(
 
-    ctx: tractor.Context,
     symbol: Symbol,
     feed: Feed,
     quote_stream: trio.abc.ReceiveChannel,
@@ -86,7 +85,7 @@ async def fsp_compute(
 
     func: Callable,
 
-    attach_stream: bool = False,
+    # attach_stream: bool = False,
     task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
 
 ) -> None:
@@ -127,8 +126,8 @@ async def fsp_compute(
     # each respective field.
     fields = getattr(dst.array.dtype, 'fields', None).copy()
     fields.pop('index')
-    # TODO: nptyping here!
-    history: Optional[np.ndarray] = None
+    history: Optional[np.ndarray] = None  # TODO: nptyping here!
+
     if fields and len(fields) > 1 and fields:
         if not isinstance(history_output, dict):
             raise ValueError(
@@ -193,40 +192,47 @@ async def fsp_compute(
     profiler(f'{func_name} pushed history')
     profiler.finish()
 
-    # TODO: UGH, what is the right way to do something like this?
-    if not ctx._started_called:
-        await ctx.started(index)
-
     # setup a respawn handle
    with trio.CancelScope() as cs:
+
+        # TODO: might be better to just make a "restart" method where
+        # the target task is spawned implicitly and then the event is
+        # set via some higher level api? At that point we might as well
+        # be writing a one-cancels-one nursery though right?
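+
+        # NOTE: the tracker pairs this cancel scope with a
+        # completion event so ``resync()`` can cancel the running
+        # compute task and await its teardown before respawning.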
tracker = TaskTracker(trio.Event(), cs) task_status.started((tracker, index)) + profiler(f'{func_name} yield last index') # import time # last = time.time() try: - # rt stream - async with ctx.open_stream() as stream: - async for processed in out_stream: - log.debug(f"{func_name}: {processed}") - key, output = processed - index = src.index - dst.array[-1][key] = output + async for processed in out_stream: - # NOTE: for now we aren't streaming this to the consumer - # stream latest array index entry which basically just acts - # as trigger msg to tell the consumer to read from shm - if attach_stream: - await stream.send(index) + log.debug(f"{func_name}: {processed}") + key, output = processed + index = src.index + dst.array[-1][key] = output - # period = time.time() - last - # hz = 1/period if period else float('nan') - # if hz > 60: - # log.info(f'FSP quote too fast: {hz}') - # last = time.time() + # NOTE: for now we aren't streaming this to the consumer + # stream latest array index entry which basically just acts + # as trigger msg to tell the consumer to read from shm + # TODO: further this should likely be implemented much + # like our `Feed` api where there is one background + # "service" task which computes output and then sends to + # N-consumers who subscribe for the real-time output, + # which we'll likely want to implement using local-mem + # chans for the fan out? + # if attach_stream: + # await client_stream.send(index) + + # period = time.time() - last + # hz = 1/period if period else float('nan') + # if hz > 60: + # log.info(f'FSP quote too fast: {hz}') + # last = time.time() finally: tracker.complete.set() @@ -314,7 +320,6 @@ async def cascade( fsp_target = partial( fsp_compute, - ctx=ctx, symbol=symbol, feed=feed, quote_stream=quote_stream, @@ -323,7 +328,7 @@ async def cascade( src=src, dst=dst, - # func_name=func_name, + # target func=func ) @@ -335,90 +340,113 @@ async def cascade( profiler(f'{func_name}: fsp up') - async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: - # TODO: adopt an incremental update engine/approach - # where possible here eventually! - log.warning(f're-syncing fsp {func_name} to source') - tracker.cs.cancel() - await tracker.complete.wait() - return await n.start(fsp_target) + # sync client + await ctx.started(index) - def is_synced( - src: ShmArray, - dst: ShmArray - ) -> tuple[bool, int, int]: - '''Predicate to dertmine if a destination FSP - output array is aligned to its source array. + # XXX: rt stream with client which we MUST + # open here (and keep it open) in order to make + # incremental "updates" as history prepends take + # place. + async with ctx.open_stream() as client_stream: - ''' - step_diff = src.index - dst.index - len_diff = abs(len(src.array) - len(dst.array)) - return not ( - # the source is likely backfilling and we must - # sync history calculations - len_diff > 2 or + # TODO: these likely should all become + # methods of this ``TaskLifetime`` or wtv + # abstraction.. + async def resync( + tracker: TaskTracker, - # we aren't step synced to the source and may be - # leading/lagging by a step - step_diff > 1 or - step_diff < 0 - ), step_diff, len_diff + ) -> tuple[TaskTracker, int]: + # TODO: adopt an incremental update engine/approach + # where possible here eventually! 
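+            # until then: cancel the current compute task, await
+            # its teardown, then respawn it so history is fully
+            # recomputed from the (possibly prepended) source.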
+            log.warning(f're-syncing fsp {func_name} to source')
+            tracker.cs.cancel()
+            await tracker.complete.wait()
+            tracker, index = await n.start(fsp_target)
 
-    async def poll_and_sync_to_step(
+            # always trigger UI refresh after history update,
+            # see ``piker.ui._fsp.FspAdmin.open_chain()`` and
+            # ``piker.ui._display.trigger_update()``.
+            await client_stream.send('update')
+            return tracker, index
 
-        tracker: TaskTracker,
-        src: ShmArray,
-        dst: ShmArray,
+        def is_synced(
+            src: ShmArray,
+            dst: ShmArray
+        ) -> tuple[bool, int, int]:
+            '''Predicate to determine if a destination FSP
+            output array is aligned to its source array.
 
-    ) -> tuple[TaskTracker, int]:
+            '''
+            step_diff = src.index - dst.index
+            len_diff = abs(len(src.array) - len(dst.array))
+            return not (
+                # the source is likely backfilling and we must
+                # sync history calculations
+                len_diff > 2 or
+
+                # we aren't step synced to the source and may be
+                # leading/lagging by a step
+                step_diff > 1 or
+                step_diff < 0
+            ), step_diff, len_diff
+
+        async def poll_and_sync_to_step(
+
+            tracker: TaskTracker,
+            src: ShmArray,
+            dst: ShmArray,
+
+        ) -> tuple[TaskTracker, int]:
 
-        synced, step_diff, _ = is_synced(src, dst)
-        while not synced:
-            tracker, index = await resync(tracker)
             synced, step_diff, _ = is_synced(src, dst)
+            while not synced:
+                tracker, index = await resync(tracker)
+                synced, step_diff, _ = is_synced(src, dst)
 
-        return tracker, step_diff
+            return tracker, step_diff
 
-    s, step, ld = is_synced(src, dst)
+        s, step, ld = is_synced(src, dst)
 
-    # detect sample period step for subscription to increment
-    # signal
-    times = src.array['time']
-    delay_s = times[-1] - times[times != times[-1]][-1]
+        # detect sample period step for subscription to increment
+        # signal
+        times = src.array['time']
+        delay_s = times[-1] - times[times != times[-1]][-1]
 
-    # Increment the underlying shared memory buffer on every
-    # "increment" msg received from the underlying data feed.
-    async with feed.index_stream(int(delay_s)) as istream:
+        # Increment the underlying shared memory buffer on every
+        # "increment" msg received from the underlying data feed.
+        async with feed.index_stream(
+            int(delay_s)
+        ) as istream:
 
-        profiler(f'{func_name}: sample stream up')
-        profiler.finish()
+            profiler(f'{func_name}: sample stream up')
+            profiler.finish()
 
-        async for _ in istream:
+            async for _ in istream:
 
-            # respawn the compute task if the source
-            # array has been updated such that we compute
-            # new history from the (prepended) source.
-            synced, step_diff, _ = is_synced(src, dst)
-            if not synced:
-                tracker, step_diff = await poll_and_sync_to_step(
-                    tracker,
-                    src,
-                    dst,
-                )
+                # respawn the compute task if the source
+                # array has been updated such that we compute
+                # new history from the (prepended) source.
+                synced, step_diff, _ = is_synced(src, dst)
+                if not synced:
+                    tracker, step_diff = await poll_and_sync_to_step(
+                        tracker,
+                        src,
+                        dst,
+                    )
 
-            # skip adding a last bar since we should already
-            # be step alinged
-            if step_diff == 0:
-                continue
+                # skip adding a last bar since we should already
+                # be step aligned
+                if step_diff == 0:
+                    continue
 
-            # read out last shm row, copy and write new row
-            array = dst.array
+                # read out last shm row, copy and write new row
+                array = dst.array
 
-            # some metrics like vlm should be reset
-            # to zero every step.
-            if zero_on_step:
-                last = zeroed
-            else:
-                last = array[-1:].copy()
+                # some metrics like vlm should be reset
+                # to zero every step.
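+                # (other outputs instead carry the prior row's
+                # value forward via the copy below)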
+                if zero_on_step:
+                    last = zeroed
+                else:
+                    last = array[-1:].copy()
 
-            dst.push(last)
+                dst.push(last)
diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py
index 7938e0d8..3fcaae07 100644
--- a/piker/ui/_chart.py
+++ b/piker/ui/_chart.py
@@ -19,7 +19,7 @@ High level chart-widget apis.
 
 '''
 from __future__ import annotations
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
 
 from PyQt5 import QtCore, QtWidgets
 from PyQt5.QtCore import Qt
@@ -63,6 +63,8 @@ from ._interaction import ChartView
 from ._forms import FieldsForm
 from ._overlay import PlotItemOverlay
 
+if TYPE_CHECKING:
+    from ._display import DisplayState
 
 log = get_logger(__name__)
 
@@ -230,6 +232,7 @@ class GodWidget(QWidget):
             # chart is already in memory so just focus it
             linkedsplits.show()
             linkedsplits.focus()
+            linkedsplits.graphics_cycle()
             await trio.sleep(0)
 
             # resume feeds *after* rendering chart view asap
@@ -346,8 +349,19 @@ class LinkedSplits(QWidget):
         self.layout.setContentsMargins(0, 0, 0, 0)
         self.layout.addWidget(self.splitter)
 
+        # chart-local graphics state that can be passed to
+        # a ``graphics_update_cycle()`` call by any task wishing to
+        # update the UI for a given "chart instance".
+        self.display_state: Optional[DisplayState] = None
+
         self._symbol: Symbol = None
 
+    def graphics_cycle(self, **kwargs) -> None:
+        from . import _display
+        ds = self.display_state
+        if ds:
+            return _display.graphics_update_cycle(ds, **kwargs)
+
     @property
     def symbol(self) -> Symbol:
         return self._symbol
diff --git a/piker/ui/_display.py b/piker/ui/_display.py
index 9957baa4..398a180c 100644
--- a/piker/ui/_display.py
+++ b/piker/ui/_display.py
@@ -21,9 +21,10 @@ this module ties together quote and computational (fsp) streams with
 graphics update methods via our custom ``pyqtgraph`` charting api.
 
 '''
+from dataclasses import dataclass
 from functools import partial
 import time
-from typing import Optional
+from typing import Optional, Any, Callable
 
 import numpy as np
 import tractor
@@ -31,6 +32,7 @@ import trio
 
 from .. import brokers
 from ..data.feed import open_feed
+from ._axes import YAxisLabel
 from ._chart import (
     ChartPlotWidget,
     LinkedSplits,
@@ -109,6 +111,33 @@ def chart_maxmin(
     return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
 
 
+@dataclass
+class DisplayState:
+    '''
+    Chart-local real-time graphics state container.
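+
+    Bundles the chart handles, shm buffer and mutable update
+    ``vars`` consumed by ``graphics_update_cycle()`` so that any
+    task (not just the main quote loop) can drive a redraw.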
+
+    '''
+    quotes: dict[str, Any]
+
+    maxmin: Callable
+    ohlcv: ShmArray
+
+    # high level chart handles
+    linked: LinkedSplits
+    chart: ChartPlotWidget
+    vlm_chart: ChartPlotWidget
+
+    # axis labels
+    l1: L1Labels
+    last_price_sticky: YAxisLabel
+    vlm_sticky: YAxisLabel
+
+    # misc state tracking
+    vars: dict[str, Any]
+
+    wap_in_history: bool = False
+
+
 async def graphics_update_loop(
 
     linked: LinkedSplits,
@@ -147,7 +176,6 @@ async def graphics_update_loop(
 
     if vlm_chart:
         vlm_sticky = vlm_chart._ysticks['volume']
-        vlm_view = vlm_chart.view
 
     maxmin = partial(chart_maxmin, chart, vlm_chart)
     chart.default_view()
@@ -183,7 +211,7 @@ async def graphics_update_loop(
     tick_margin = 3 * tick_size
 
     chart.show()
-    view = chart.view
+    # view = chart.view
     last_quote = time.time()
     i_last = ohlcv.index
 
@@ -210,7 +238,29 @@ async def graphics_update_loop(
 
     # async for quotes in iter_drain_quotes():
 
+    ds = linked.display_state = DisplayState(**{
+        'quotes': {},
+        'linked': linked,
+        'maxmin': maxmin,
+        'ohlcv': ohlcv,
+        'chart': chart,
+        'last_price_sticky': last_price_sticky,
+        'vlm_chart': vlm_chart,
+        'vlm_sticky': vlm_sticky,
+        'l1': l1,
+
+        'vars': {
+            'tick_margin': tick_margin,
+            'i_last': i_last,
+            'last_mx_vlm': last_mx_vlm,
+            'last_mx': last_mx,
+            'last_mn': last_mn,
+        }
+    })
+
+    # main loop
     async for quotes in stream:
+        ds.quotes = quotes
         quote_period = time.time() - last_quote
         quote_rate = round(
             1/quote_period, 1) if quote_period > 0 else float('inf')
@@ -231,222 +281,263 @@ async def graphics_update_loop(
             chart.pause_all_feeds()
             continue
 
-        for sym, quote in quotes.items():
+        # sync call to update all graphics/UX components.
+        graphics_update_cycle(ds)
+
+
+def graphics_update_cycle(
+    ds: DisplayState,
+    wap_in_history: bool = False,
+    trigger_all: bool = False,  # flag used by prepend history updates
+
+) -> None:
+
+    # TODO: eventually optimize this whole graphics stack with ``numba``
+    # hopefully XD
+
+    # unpack multi-referenced components
+    chart = ds.chart
+    vlm_chart = ds.vlm_chart
+    l1 = ds.l1
+
+    ohlcv = ds.ohlcv
+    array = ohlcv.array
+    vars = ds.vars
+    tick_margin = vars['tick_margin']
+
+    for sym, quote in ds.quotes.items():
+
+        # NOTE: vlm may be written by the ``brokerd`` backend
+        # even though a tick sample is not emitted.
+        # TODO: show dark trades differently
+        # https://github.com/pikers/piker/issues/116
+
+        # NOTE: this used to be implemented in a dedicated
+        # "increment task": ``check_for_new_bars()`` but it doesn't
+        # make sense to do a whole task switch when we can just do
+        # this simple index-diff and all the fsp sub-curve graphics
+        # are diffed on each draw cycle anyway; so updates to the
+        # "curve" length are already automatic.
+
+        # increment the view position by the sample offset.
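+        # the diff vs. the last processed index tells us how
+        # many new samples arrived since the prior cycle.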
+        i_step = ohlcv.index
+        i_diff = i_step - vars['i_last']
+        if i_diff > 0:
+            chart.increment_view(
+                steps=i_diff,
+            )
+            vars['i_last'] = i_step
+
+        (
+            brange,
+            mx_in_view,
+            mn_in_view,
+            mx_vlm_in_view,
+        ) = ds.maxmin()
+
+        l, lbar, rbar, r = brange
+        mx = mx_in_view + tick_margin
+        mn = mn_in_view - tick_margin
+        liv = r > i_step  # the last datum is in view
+
+        # only real-time "shift" the curve to the
+        # left under the following conditions:
+        if (
            (
-                brange,
-                mx_in_view,
-                mn_in_view,
-                mx_vlm_in_view,
-            ) = maxmin()
-            l, lbar, rbar, r = brange
-            mx = mx_in_view + tick_margin
-            mn = mn_in_view - tick_margin
+                i_diff > 0  # a new sample step was added
+                and liv
+            )
+            or trigger_all
+        ):
+            # TODO: we should track and compute whether the last
+            # pixel in a curve should show new data based on uppx
+            # and then iff update curves and shift?
+            chart.increment_view(steps=i_diff)
 
-            # NOTE: vlm may be written by the ``brokerd`` backend
-            # event though a tick sample is not emitted.
-            # TODO: show dark trades differently
-            # https://github.com/pikers/piker/issues/116
-            array = ohlcv.array
+        if vlm_chart:
+            vlm_chart.update_curve_from_array('volume', array)
+            ds.vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
 
-            # NOTE: this used to be implemented in a dedicated
-            # "increment tas": ``check_for_new_bars()`` but it doesn't
-            # make sense to do a whole task switch when we can just do
-            # this simple index-diff and all the fsp sub-curve graphics
-            # are diffed on each draw cycle anyway; so updates to the
-            # "curve" length is already automatic.
-
-            # increment the view position by the sample offset.
-            i_step = ohlcv.index
-            i_diff = i_step - i_last
-            if i_diff > 0:
-                chart.increment_view(
-                    steps=i_diff,
-                )
-                i_last = i_step
-
-            if vlm_chart:
-                vlm_chart.update_curve_from_array('volume', array)
-                vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
-
-                if (
-                    mx_vlm_in_view != last_mx_vlm or
-                    mx_vlm_in_view > last_mx_vlm
-                ):
-                    # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
-                    vlm_view._set_yrange(
-                        yrange=(0, mx_vlm_in_view * 1.375)
-                    )
-                    last_mx_vlm = mx_vlm_in_view
-
-                for curve_name, flow in vlm_chart._flows.items():
-                    update_fsp_chart(
-                        vlm_chart,
-                        flow.shm,
-                        curve_name,
-                        array_key=curve_name,
-                    )
-                    # is this even doing anything?
-                    flow.plot.vb._set_yrange(
-                        autoscale_linked_plots=False,
-                        name=curve_name,
-                    )
-
-            ticks_frame = quote.get('ticks', ())
-
-            frames_by_type: dict[str, dict] = {}
-            lasts = {}
-
-            # build tick-type "frames" of tick sequences since
-            # likely the tick arrival rate is higher then our
-            # (throttled) quote stream rate.
-            for tick in ticks_frame:
-                price = tick.get('price')
-                ticktype = tick.get('type')
-
-                if ticktype == 'n/a' or price == -1:
-                    # okkk..
-                    continue
-
-                # keys are entered in olded-event-inserted-first order
-                # since we iterate ``ticks_frame`` in standard order
-                # above. in other words the order of the keys is the order
-                # of tick events by type from the provider feed.
-                frames_by_type.setdefault(ticktype, []).append(tick)
-
-                # overwrites so the last tick per type is the entry
-                lasts[ticktype] = tick
-
-            # from pprint import pformat
-            # frame_counts = {
-            #     typ: len(frame) for typ, frame in frames_by_type.items()
-            # }
-            # print(f'{pformat(frame_counts)}')
-            # print(f'framed: {pformat(frames_by_type)}')
-            # print(f'lasts: {pformat(lasts)}')
-
-            # TODO: eventually we want to separate out the utrade (aka
-            # dark vlm prices) here and show them as an additional
-            # graphic.
- clear_types = _tick_groups['clears'] - - # XXX: if we wanted to iterate in "latest" (i.e. most - # current) tick first order as an optimization where we only - # update from the last tick from each type class. - # last_clear_updated: bool = False - # for typ, tick in reversed(lasts.items()): - - # iterate in FIFO order per frame - for typ, tick in lasts.items(): - - price = tick.get('price') - size = tick.get('size') - - # compute max and min prices (including bid/ask) from - # tick frames to determine the y-range for chart - # auto-scaling. - # TODO: we need a streaming minmax algo here, see def above. - mx = max(price + tick_margin, mx) - mn = min(price - tick_margin, mn) - - if typ in clear_types: - - # XXX: if we only wanted to update graphics from the - # "current"/"latest received" clearing price tick - # once (see alt iteration order above). - # if last_clear_updated: - # continue - - # last_clear_updated = True - # we only want to update grahpics from the *last* - # tick event that falls under the "clearing price" - # set. - - # update price sticky(s) - end = array[-1] - last_price_sticky.update_from_data( - *end[['index', 'close']] - ) - - # update ohlc sampled price bars - chart.update_ohlc_from_array( - chart.name, - array, - ) - - if wap_in_history: - # update vwap overlay line - chart.update_curve_from_array('bar_wap', ohlcv.array) - - # L1 book label-line updates - # XXX: is this correct for ib? - # if ticktype in ('trade', 'last'): - # if ticktype in ('last',): # 'size'): - if typ in ('last',): # 'size'): - - label = { - l1.ask_label.fields['level']: l1.ask_label, - l1.bid_label.fields['level']: l1.bid_label, - }.get(price) - - if label is not None: - label.update_fields({'level': price, 'size': size}) - - # TODO: on trades should we be knocking down - # the relevant L1 queue? - # label.size -= size - - # elif ticktype in ('ask', 'asize'): - elif typ in _tick_groups['asks']: - l1.ask_label.update_fields({'level': price, 'size': size}) - - # elif ticktype in ('bid', 'bsize'): - elif typ in _tick_groups['bids']: - l1.bid_label.update_fields({'level': price, 'size': size}) - - # check for y-range re-size if ( - (mx > last_mx) or (mn < last_mn) - and not chart._static_yrange == 'axis' + mx_vlm_in_view > vars['last_mx_vlm'] + or trigger_all ): - # print(f'new y range: {(mn, mx)}') - view._set_yrange( - yrange=(mn, mx), - # TODO: we should probably scale - # the view margin based on the size - # of the true range? This way you can - # slap in orders outside the current - # L1 (only) book range. - # range_margin=0.1, + # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') + vlm_chart.view._set_yrange( + yrange=(0, mx_vlm_in_view * 1.375) ) + vars['last_mx_vlm'] = mx_vlm_in_view - last_mx, last_mn = mx, mn - - # run synchronous update on all derived fsp subplots - for name, subchart in linked.subplots.items(): + for curve_name, flow in vlm_chart._flows.items(): update_fsp_chart( - subchart, - subchart._shm, - - # XXX: do we really needs seperate names here? - name, - array_key=name, - ) - subchart.cv._set_yrange() - - # TODO: all overlays on all subplots.. - - # run synchronous update on all derived overlays - for curve_name, flow in chart._flows.items(): - update_fsp_chart( - chart, + vlm_chart, flow.shm, curve_name, array_key=curve_name, ) - # chart.view._set_yrange() + # is this even doing anything? 
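+            # (intent: autorange only this flow's sub-viewbox,
+            # leaving any linked plots untouched)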
+            flow.plot.vb._set_yrange(
+                autoscale_linked_plots=False,
+                name=curve_name,
+            )
 
+        ticks_frame = quote.get('ticks', ())
+
+        frames_by_type: dict[str, dict] = {}
+        lasts = {}
+
+        # build tick-type "frames" of tick sequences since
+        # likely the tick arrival rate is higher than our
+        # (throttled) quote stream rate.
+        for tick in ticks_frame:
+            price = tick.get('price')
+            ticktype = tick.get('type')
+
+            if ticktype == 'n/a' or price == -1:
+                # okkk..
+                continue
+
+            # keys are entered in oldest-event-inserted-first order
+            # since we iterate ``ticks_frame`` in standard order
+            # above. in other words the order of the keys is the order
+            # of tick events by type from the provider feed.
+            frames_by_type.setdefault(ticktype, []).append(tick)
+
+            # overwrites so the last tick per type is the entry
+            lasts[ticktype] = tick
+
+        # from pprint import pformat
+        # frame_counts = {
+        #     typ: len(frame) for typ, frame in frames_by_type.items()
+        # }
+        # print(f'{pformat(frame_counts)}')
+        # print(f'framed: {pformat(frames_by_type)}')
+        # print(f'lasts: {pformat(lasts)}')
+
+        # TODO: eventually we want to separate out the utrade (aka
+        # dark vlm prices) here and show them as an additional
+        # graphic.
+        clear_types = _tick_groups['clears']
+
+        # XXX: if we wanted to iterate in "latest" (i.e. most
+        # current) tick first order as an optimization where we only
+        # update from the last tick from each type class.
+        # last_clear_updated: bool = False
+        # for typ, tick in reversed(lasts.items()):
+
+        # update ohlc sampled price bars
+        chart.update_ohlc_from_array(
+            chart.name,
+            array,
+        )
+
+        # iterate in FIFO order per frame
+        for typ, tick in lasts.items():
+
+            price = tick.get('price')
+            size = tick.get('size')
+
+            # compute max and min prices (including bid/ask) from
+            # tick frames to determine the y-range for chart
+            # auto-scaling.
+            # TODO: we need a streaming minmax algo here, see def above.
+            mx = max(price + tick_margin, mx)
+            mn = min(price - tick_margin, mn)
+
+            if typ in clear_types:
+
+                # XXX: if we only wanted to update graphics from the
+                # "current"/"latest received" clearing price tick
+                # once (see alt iteration order above).
+                # if last_clear_updated:
+                #     continue
+
+                # last_clear_updated = True
+                # we only want to update graphics from the *last*
+                # tick event that falls under the "clearing price"
+                # set.
+
+                # update price sticky(s)
+                end = array[-1]
+                ds.last_price_sticky.update_from_data(
+                    *end[['index', 'close']]
+                )
+
+                if wap_in_history:
+                    # update vwap overlay line
+                    chart.update_curve_from_array(
+                        'bar_wap',
+                        array,
+                    )
+
+                # L1 book label-line updates
+                # XXX: is this correct for ib?
+                # if ticktype in ('trade', 'last'):
+                # if ticktype in ('last',): # 'size'):
+                if typ in ('last',): # 'size'):
+
+                    label = {
+                        l1.ask_label.fields['level']: l1.ask_label,
+                        l1.bid_label.fields['level']: l1.bid_label,
+                    }.get(price)
+
+                    if label is not None:
+                        label.update_fields(
+                            {'level': price, 'size': size}
+                        )
+
+                        # TODO: on trades should we be knocking down
+                        # the relevant L1 queue?
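+                        # i.e. decrement the displayed size at
+                        # that price level by the clearing size: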
+                        # label.size -= size
+
+            # elif ticktype in ('ask', 'asize'):
+            elif typ in _tick_groups['asks']:
+                l1.ask_label.update_fields({'level': price, 'size': size})
+
+            # elif ticktype in ('bid', 'bsize'):
+            elif typ in _tick_groups['bids']:
+                l1.bid_label.update_fields({'level': price, 'size': size})
+
+        # check for y-range re-size
+        if (
+            ((mx > vars['last_mx']) or (mn < vars['last_mn']))
+            and not chart._static_yrange == 'axis'
+        ):
+            # print(f'new y range: {(mn, mx)}')
+            chart.view._set_yrange(
+                yrange=(mn, mx),
+                # TODO: we should probably scale
+                # the view margin based on the size
+                # of the true range? This way you can
+                # slap in orders outside the current
+                # L1 (only) book range.
+                # range_margin=0.1,
+            )
+
+        vars['last_mx'], vars['last_mn'] = mx, mn
+
+        # run synchronous update on all derived fsp subplots
+        for name, subchart in ds.linked.subplots.items():
+            update_fsp_chart(
+                subchart,
+                subchart._shm,
+
+                # XXX: do we really need separate names here?
+                name,
+                array_key=name,
+            )
+            subchart.cv._set_yrange()
+
+        # TODO: all overlays on all subplots..
+
+        # run synchronous update on all derived overlays
+        for curve_name, flow in chart._flows.items():
+            update_fsp_chart(
+                chart,
+                flow.shm,
+                curve_name,
+                array_key=curve_name,
+            )
 
 
 async def display_symbol_data(
@@ -479,8 +570,10 @@ async def display_symbol_data(
     #     clear_on_next=True,
     #     group_key=loading_sym_key,
     # )
+    fqsn = '.'.join((sym, provider))
+
     async with open_feed(
-        ['.'.join((sym, provider))],
+        [fqsn],
         loglevel=loglevel,
 
         # limit to at least display's FPS
diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py
index d56cc2d5..a1193327 100644
--- a/piker/ui/_fsp.py
+++ b/piker/ui/_fsp.py
@@ -438,6 +438,17 @@ class FspAdmin:
             started.set()
 
             # wait for graceful shutdown signal
+            async with stream.subscribe() as stream:
+                async for msg in stream:
+                    if msg == 'update':
+                        # if the chart isn't hidden try to update
+                        # the data on screen.
+                        if not self.linked.isHidden():
+                            log.info(f'Re-syncing graphics for fsp: {ns_path}')
+                            self.linked.graphics_cycle(trigger_all=True)
+                    else:
+                        log.info(f'received unexpected fsp engine msg: {msg}')
+
             await complete.wait()
 
     async def start_engine_task(