diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index f9a4f797..a78308a4 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -21,7 +21,9 @@ core task logic for processing chains from dataclasses import dataclass from functools import partial from typing import ( - AsyncIterator, Callable, Optional, + AsyncIterator, + Callable, + Optional, Union, ) @@ -386,7 +388,7 @@ async def cascade( ) -> tuple[TaskTracker, int]: # TODO: adopt an incremental update engine/approach # where possible here eventually! - log.debug(f're-syncing fsp {func_name} to source') + log.info(f're-syncing fsp {func_name} to source') tracker.cs.cancel() await tracker.complete.wait() tracker, index = await n.start(fsp_target) @@ -429,6 +431,7 @@ async def cascade( tracker: TaskTracker, src: ShmArray, dst: ShmArray, + ) -> tuple[TaskTracker, int]: synced, step_diff, _ = is_synced(src, dst) @@ -444,7 +447,8 @@ async def cascade( # signal times = src.array['time'] if len(times) > 1: - delay_s = float(times[-1] - times[times != times[-1]][-1]) + last_ts = times[-1] + delay_s = float(last_ts - times[times != last_ts][-1]) else: # our default "HFT" sample rate. delay_s = _default_delay_s