diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 7bc4510c..2df3c126 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -18,6 +18,7 @@ core task logic for processing chains ''' +from dataclasses import dataclass from functools import partial from typing import AsyncIterator, Callable, Optional @@ -44,6 +45,12 @@ _fsp_builtins = { } +@dataclass +class TaskTracker: + complete: trio.Event + cs: trio.CancelScope + + async def filter_quotes_by_sym( sym: str, @@ -134,7 +141,8 @@ async def fsp_compute( # setup a respawn handle with trio.CancelScope() as cs: - task_status.started((cs, index)) + tracker = TaskTracker(trio.Event(), cs) + task_status.started((tracker, index)) profiler(f'{func_name} yield last index') # import time @@ -232,7 +240,7 @@ async def cascade( func=func ) - cs, index = await n.start(fsp_target) + tracker, index = await n.start(fsp_target) if zero_on_step: last = dst.array[-1:] @@ -263,17 +271,20 @@ async def cascade( new_len > last_len + 1 or abs(diff) > 1 ): - log.warning(f're-syncing fsp {func_name} to source') - cs.cancel() - cs, index = await n.start(fsp_target) - # TODO: adopt an incremental update engine/approach # where possible here eventually! + log.warning(f're-syncing fsp {func_name} to source') + tracker.cs.cancel() + await tracker.complete.wait() + tracker, index = await n.start(fsp_target) + + # skip adding a new bar since we should be fully aligned. + continue # read out last shm row, copy and write new row array = dst.array - # some metrics, like vlm should be reset + # some metrics like vlm should be reset # to zero every step. if zero_on_step: last = zeroed