Guard against empty array read in step update task

fspd_cluster
Tyler Goodlet 2021-11-01 10:03:58 -04:00
parent 8f023cd66f
commit ca467f45b6
2 changed files with 37 additions and 13 deletions

View File

@ -272,9 +272,8 @@ class ShmArray:
return end return end
except ValueError as err: except ValueError as err:
# shoudl raise if diff detected # should raise if diff detected
self.diff_err_fields(data) self.diff_err_fields(data)
raise err raise err
def diff_err_fields( def diff_err_fields(

View File

@ -61,18 +61,19 @@ log = get_logger(__name__)
_quote_throttle_rate: int = 58 # Hz _quote_throttle_rate: int = 58 # Hz
def update_fsp_chart( def try_read(array: np.ndarray) -> Optional[np.ndarray]:
chart: ChartPlotWidget, '''
shm: ShmArray, Try to read the last row from a shared mem array or ``None``
graphics_name: str, if the array read returns a zero-length array result.
array_key: Optional[str],
) -> None: Can be used to check for backfilling race conditions where an array
is currently being (re-)written by a writer actor but the reader is
array = shm.array unaware and reads during the window where the first and last indexes
are being updated.
'''
try: try:
last_row = array[-1] return array[-1]
except IndexError: except IndexError:
# XXX: race condition with backfilling shm. # XXX: race condition with backfilling shm.
# #
@ -85,6 +86,23 @@ def update_fsp_chart(
# signal that a prepend is taking place and this consumer can # signal that a prepend is taking place and this consumer can
# respond (eg. redrawing graphics) accordingly. # respond (eg. redrawing graphics) accordingly.
log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
# the array read was emtpy
return None
def update_fsp_chart(
chart: ChartPlotWidget,
shm: ShmArray,
graphics_name: str,
array_key: Optional[str],
) -> None:
array = shm.array
last_row = try_read(array)
# guard against unreadable case
if not last_row:
return return
# update graphics # update graphics
@ -850,13 +868,17 @@ async def check_for_new_bars(
# current bar) and then either write the current bar manually # current bar) and then either write the current bar manually
# or place a cursor for visual cue of the current time step. # or place a cursor for visual cue of the current time step.
array = ohlcv.array
# avoid unreadable race case on backfills
while not try_read(array):
await trio.sleep(0.01)
# XXX: this puts a flat bar on the current time step # XXX: this puts a flat bar on the current time step
# TODO: if we eventually have an x-axis time-step "cursor" # TODO: if we eventually have an x-axis time-step "cursor"
# we can get rid of this since it is extra overhead. # we can get rid of this since it is extra overhead.
price_chart.update_ohlc_from_array( price_chart.update_ohlc_from_array(
price_chart.name, price_chart.name,
ohlcv.array, array,
just_history=False, just_history=False,
) )
@ -870,6 +892,9 @@ async def check_for_new_bars(
# each subplot # each subplot
for name, chart in linkedsplits.subplots.items(): for name, chart in linkedsplits.subplots.items():
# TODO: do we need the same unreadable guard as for the
# price chart (above) here?
chart.update_curve_from_array( chart.update_curve_from_array(
chart.name, chart.name,
chart._shm.array, chart._shm.array,