diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 2169e262..5d38cbbd 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -272,9 +272,8 @@ class ShmArray: return end except ValueError as err: - # shoudl raise if diff detected + # should raise if diff detected self.diff_err_fields(data) - raise err def diff_err_fields( diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 463f243d..f6b8dce3 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -61,18 +61,19 @@ log = get_logger(__name__) _quote_throttle_rate: int = 58 # Hz -def update_fsp_chart( - chart: ChartPlotWidget, - shm: ShmArray, - graphics_name: str, - array_key: Optional[str], +def try_read(array: np.ndarray) -> Optional[np.ndarray]: + ''' + Try to read the last row from a shared mem array or ``None`` + if the array read returns a zero-length array result. -) -> None: - - array = shm.array + Can be used to check for backfilling race conditions where an array + is currently being (re-)written by a writer actor but the reader is + unaware and reads during the window where the first and last indexes + are being updated. + ''' try: - last_row = array[-1] + return array[-1] except IndexError: # XXX: race condition with backfilling shm. # @@ -85,6 +86,23 @@ def update_fsp_chart( # signal that a prepend is taking place and this consumer can # respond (eg. redrawing graphics) accordingly. log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') + + # the array read was emtpy + return None + + +def update_fsp_chart( + chart: ChartPlotWidget, + shm: ShmArray, + graphics_name: str, + array_key: Optional[str], + +) -> None: + + array = shm.array + last_row = try_read(array) + # guard against unreadable case + if not last_row: return # update graphics @@ -850,13 +868,17 @@ async def check_for_new_bars( # current bar) and then either write the current bar manually # or place a cursor for visual cue of the current time step. + array = ohlcv.array + # avoid unreadable race case on backfills + while not try_read(array): + await trio.sleep(0.01) + # XXX: this puts a flat bar on the current time step # TODO: if we eventually have an x-axis time-step "cursor" # we can get rid of this since it is extra overhead. - price_chart.update_ohlc_from_array( price_chart.name, - ohlcv.array, + array, just_history=False, ) @@ -870,6 +892,9 @@ async def check_for_new_bars( # each subplot for name, chart in linkedsplits.subplots.items(): + + # TODO: do we need the same unreadable guard as for the + # price chart (above) here? chart.update_curve_from_array( chart.name, chart._shm.array,