From 9cd63ffc9951380f68b01ee653c62cc95bb6b70d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Oct 2021 08:27:03 -0400 Subject: [PATCH] Move "desynced" logic into a predicate --- piker/fsp/_engine.py | 40 +++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 2a44ce3d..83d3ca32 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -270,21 +270,23 @@ async def cascade( async for msg in stream: - # respawn the compute task if the source - # array has been updated such that we compute - # new history from the (prepended) source. - diff = src.index - dst.index + def desynced(src: ShmArray, dst: ShmArray) -> bool: + diff = src.index - dst.index + len_diff = abs(len(src.array) - len(dst.array)) + return ( + # the source is likely backfilling and we must + # sync history calculations + len_diff > 2 or - # new_len = len(src.array) + # we aren't step synced to the source and may be + # leading/lagging by a step + diff > 1 or + diff < 0 + ) async def poll_and_sync_to_step(tracker): - diff = src.index - dst.index - while True: - if diff in (0, 1): - break - + while desynced(src, dst): tracker, index = await resync(tracker) - diff = src.index - dst.index # log.info( # '\n'.join(( # f'history index after sync: {index}', @@ -294,18 +296,10 @@ async def cascade( return tracker, diff - # log.debug(f'diff {diff}') - - if ( - # the source is likely backfilling and we must - # sync history calculations - abs(len(src.array) - len(dst.array)) > 0 or - - # we aren't step synced to the source and may be - # leading/lagging by a step - diff > 1 or - diff < 0 - ): + # respawn the compute task if the source + # array has been updated such that we compute + # new history from the (prepended) source. + if desynced(src, dst): tracker, diff = await poll_and_sync_to_step(tracker) # skip adding a last bar since we should be