Move "desynced" logic into a predicate
parent
3dd82c8d31
commit
53dedbd645
|
@ -269,21 +269,23 @@ async def cascade(
|
|||
|
||||
async for msg in stream:
|
||||
|
||||
# respawn the compute task if the source
|
||||
# array has been updated such that we compute
|
||||
# new history from the (prepended) source.
|
||||
def desynced(src: ShmArray, dst: ShmArray) -> bool:
|
||||
diff = src.index - dst.index
|
||||
len_diff = abs(len(src.array) - len(dst.array))
|
||||
return (
|
||||
# the source is likely backfilling and we must
|
||||
# sync history calculations
|
||||
len_diff > 2 or
|
||||
|
||||
# new_len = len(src.array)
|
||||
# we aren't step synced to the source and may be
|
||||
# leading/lagging by a step
|
||||
diff > 1 or
|
||||
diff < 0
|
||||
)
|
||||
|
||||
async def poll_and_sync_to_step(tracker):
|
||||
diff = src.index - dst.index
|
||||
while True:
|
||||
if diff in (0, 1):
|
||||
break
|
||||
|
||||
while desynced(src, dst):
|
||||
tracker, index = await resync(tracker)
|
||||
diff = src.index - dst.index
|
||||
# log.info(
|
||||
# '\n'.join((
|
||||
# f'history index after sync: {index}',
|
||||
|
@ -293,18 +295,10 @@ async def cascade(
|
|||
|
||||
return tracker, diff
|
||||
|
||||
# log.debug(f'diff {diff}')
|
||||
|
||||
if (
|
||||
# the source is likely backfilling and we must
|
||||
# sync history calculations
|
||||
abs(len(src.array) - len(dst.array)) > 0 or
|
||||
|
||||
# we aren't step synced to the source and may be
|
||||
# leading/lagging by a step
|
||||
diff > 1 or
|
||||
diff < 0
|
||||
):
|
||||
# respawn the compute task if the source
|
||||
# array has been updated such that we compute
|
||||
# new history from the (prepended) source.
|
||||
if desynced(src, dst):
|
||||
tracker, diff = await poll_and_sync_to_step(tracker)
|
||||
|
||||
# skip adding a last bar since we should be
|
||||
|
|
Loading…
Reference in New Issue