diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index e246e0b7..0494ef3c 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -123,20 +123,11 @@ async def fsp_compute( # is `index` aware such that historical data can be indexed # relative to the true first datum? Not sure if this is sane # for incremental compuations. - dst._first.value = src._first.value - dst._last.value = src._first.value - - # compare with source signal and time align - # check for data length mis-allignment and fill missing values - diff = len(src.array) - len(history) - if diff > 0: - log.warning(f"WTF DIFF fsp to ohlc history {diff}") - for _ in range(diff): - dst.push(history[:1]) + first = dst._first.value = src._first.value # TODO: can we use this `start` flag instead of the manual # setting above? - index = dst.push(history) #, start=src._first.value) + index = dst.push(history, start=first) profiler(f'{func_name} pushed history') profiler.finish() @@ -146,8 +137,8 @@ async def fsp_compute( task_status.started((cs, index)) profiler(f'{func_name} yield last index') - import time - last = time.time() + # import time + # last = time.time() # rt stream async for processed in out_stream: @@ -254,12 +245,12 @@ async def cascade( async for msg in stream: + # respawn the compute task if the source + # array has been updated such that we compute + # new history from the (prepended) source. new_len = len(src.array) - if new_len > last_len + 1: - # respawn the signal compute task if the source - # signal has been updated - log.warning(f'Re-spawning fsp {func_name}') + log.warning(f're-syncing fsp {func_name} to source') cs.cancel() cs, index = await n.start(fsp_target) @@ -268,8 +259,22 @@ async def cascade( # read out last shm row, copy and write new row array = dst.array + # TODO: some signals, like vlm should be reset to # zero every step. last = array[-1:].copy() dst.push(last) last_len = new_len + + # compare again with source and make sure + # histories are index aligned. + diff = src.index - dst.index + if diff: + if abs(diff) < 10: + log.warning( + f'syncing fsp to source by offset: {diff}') + history = dst.array + dst.push(history[:-1], start=src._first.value) + else: + log.warning( + f'large offset {diff} re-spawn ongoing?..')