Ensure FSPs last 2 times are synced with its source

epoch_index_backup
Tyler Goodlet 2022-12-08 15:37:59 -05:00
parent de791e62c8
commit b4384209b6
1 changed files with 23 additions and 14 deletions

View File

@ -114,7 +114,7 @@ async def fsp_compute(
# HISTORY COMPUTE PHASE # HISTORY COMPUTE PHASE
# conduct a single iteration of fsp with historical bars input # conduct a single iteration of fsp with historical bars input
# and get historical output, pack into # and get historical output.
history_output: Union[ history_output: Union[
dict[str, np.ndarray], # multi-output case dict[str, np.ndarray], # multi-output case
np.ndarray, # single output case np.ndarray, # single output case
@ -415,12 +415,12 @@ async def cascade(
return not ( return not (
# the source is likely backfilling and we must # the source is likely backfilling and we must
# sync history calculations # sync history calculations
len_diff > 2 or len_diff > 2
# we aren't step synced to the source and may be # we aren't step synced to the source and may be
# leading/lagging by a step # leading/lagging by a step
step_diff > 1 or or step_diff > 1
step_diff < 0 or step_diff < 0
), step_diff, len_diff ), step_diff, len_diff
async def poll_and_sync_to_step( async def poll_and_sync_to_step(
@ -485,15 +485,24 @@ async def cascade(
else: else:
last = array[-1:].copy() last = array[-1:].copy()
# sync with source time step
src_t = src.array['time'][-1]
last['time'] = src_t
dst.push(last) dst.push(last)
# dst_t = dst.array['time'][-1] # sync with source buffer's time step
# print( src_l2 = src.array[-2:]
# f'{dst.token}\n' src_li, src_lt = src_l2[-1][['index', 'time']]
# f'src: {src_t}\n' src_2li, src_2lt = src_l2[-2][['index', 'time']]
# f'dst: {dst_t}\n' dst._array['time'][src_li] = src_lt
# ) dst._array['time'][src_2li] = src_2lt
# last2 = dst.array[-2:]
# if (
# last2[-1]['index'] != src_li
# or last2[-2]['index'] != src_2li
# ):
# dstl2 = list(last2)
# srcl2 = list(src_l2)
# print(
# # f'{dst.token}\n'
# f'src: {srcl2}\n'
# f'dst: {dstl2}\n'
# )