diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 357934d8..93bf0388 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -114,7 +114,7 @@ async def fsp_compute( # HISTORY COMPUTE PHASE # conduct a single iteration of fsp with historical bars input - # and get historical output, pack into + # and get historical output. history_output: Union[ dict[str, np.ndarray], # multi-output case np.ndarray, # single output case @@ -415,12 +415,12 @@ async def cascade( return not ( # the source is likely backfilling and we must # sync history calculations - len_diff > 2 or + len_diff > 2 # we aren't step synced to the source and may be # leading/lagging by a step - step_diff > 1 or - step_diff < 0 + or step_diff > 1 + or step_diff < 0 ), step_diff, len_diff async def poll_and_sync_to_step( @@ -485,15 +485,24 @@ async def cascade( else: last = array[-1:].copy() - # sync with source time step - src_t = src.array['time'][-1] - last['time'] = src_t - dst.push(last) - # dst_t = dst.array['time'][-1] - # print( - # f'{dst.token}\n' - # f'src: {src_t}\n' - # f'dst: {dst_t}\n' - # ) + # sync with source buffer's time step + src_l2 = src.array[-2:] + src_li, src_lt = src_l2[-1][['index', 'time']] + src_2li, src_2lt = src_l2[-2][['index', 'time']] + dst._array['time'][src_li] = src_lt + dst._array['time'][src_2li] = src_2lt + + # last2 = dst.array[-2:] + # if ( + # last2[-1]['index'] != src_li + # or last2[-2]['index'] != src_2li + # ): + # dstl2 = list(last2) + # srcl2 = list(src_l2) + # print( + # # f'{dst.token}\n' + # f'src: {srcl2}\n' + # f'dst: {dstl2}\n' + # )