Ensure FSPs last 2 times are synced with its source
parent
04c0d77595
commit
89095d4e9f
|
@ -114,7 +114,7 @@ async def fsp_compute(
|
|||
|
||||
# HISTORY COMPUTE PHASE
|
||||
# conduct a single iteration of fsp with historical bars input
|
||||
# and get historical output, pack into
|
||||
# and get historical output.
|
||||
history_output: Union[
|
||||
dict[str, np.ndarray], # multi-output case
|
||||
np.ndarray, # single output case
|
||||
|
@ -415,12 +415,12 @@ async def cascade(
|
|||
return not (
|
||||
# the source is likely backfilling and we must
|
||||
# sync history calculations
|
||||
len_diff > 2 or
|
||||
len_diff > 2
|
||||
|
||||
# we aren't step synced to the source and may be
|
||||
# leading/lagging by a step
|
||||
step_diff > 1 or
|
||||
step_diff < 0
|
||||
or step_diff > 1
|
||||
or step_diff < 0
|
||||
), step_diff, len_diff
|
||||
|
||||
async def poll_and_sync_to_step(
|
||||
|
@ -485,15 +485,24 @@ async def cascade(
|
|||
else:
|
||||
last = array[-1:].copy()
|
||||
|
||||
# sync with source time step
|
||||
src_t = src.array['time'][-1]
|
||||
last['time'] = src_t
|
||||
|
||||
dst.push(last)
|
||||
|
||||
# dst_t = dst.array['time'][-1]
|
||||
# print(
|
||||
# f'{dst.token}\n'
|
||||
# f'src: {src_t}\n'
|
||||
# f'dst: {dst_t}\n'
|
||||
# )
|
||||
# sync with source buffer's time step
|
||||
src_l2 = src.array[-2:]
|
||||
src_li, src_lt = src_l2[-1][['index', 'time']]
|
||||
src_2li, src_2lt = src_l2[-2][['index', 'time']]
|
||||
dst._array['time'][src_li] = src_lt
|
||||
dst._array['time'][src_2li] = src_2lt
|
||||
|
||||
# last2 = dst.array[-2:]
|
||||
# if (
|
||||
# last2[-1]['index'] != src_li
|
||||
# or last2[-2]['index'] != src_2li
|
||||
# ):
|
||||
# dstl2 = list(last2)
|
||||
# srcl2 = list(src_l2)
|
||||
# print(
|
||||
# # f'{dst.token}\n'
|
||||
# f'src: {srcl2}\n'
|
||||
# f'dst: {dstl2}\n'
|
||||
# )
|
||||
|
|
Loading…
Reference in New Issue