diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 70e86481..afd6aa2c 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -269,21 +269,23 @@ async def cascade( async for msg in stream: - # respawn the compute task if the source - # array has been updated such that we compute - # new history from the (prepended) source. - diff = src.index - dst.index + def desynced(src: ShmArray, dst: ShmArray) -> bool: + diff = src.index - dst.index + len_diff = abs(len(src.array) - len(dst.array)) + return ( + # the source is likely backfilling and we must + # sync history calculations + len_diff > 2 or - # new_len = len(src.array) + # we aren't step synced to the source and may be + # leading/lagging by a step + diff > 1 or + diff < 0 + ) async def poll_and_sync_to_step(tracker): - diff = src.index - dst.index - while True: - if diff in (0, 1): - break - + while desynced(src, dst): tracker, index = await resync(tracker) - diff = src.index - dst.index # log.info( # '\n'.join(( # f'history index after sync: {index}', @@ -293,18 +295,10 @@ async def cascade( return tracker, diff - # log.debug(f'diff {diff}') - - if ( - # the source is likely backfilling and we must - # sync history calculations - abs(len(src.array) - len(dst.array)) > 0 or - - # we aren't step synced to the source and may be - # leading/lagging by a step - diff > 1 or - diff < 0 - ): + # respawn the compute task if the source + # array has been updated such that we compute + # new history from the (prepended) source. + if desynced(src, dst): tracker, diff = await poll_and_sync_to_step(tracker) # skip adding a last bar since we should be