Drunkfix: finally solve the fsp alignment race?
parent
6f83e358fe
commit
1981b113b7
|
@ -254,8 +254,15 @@ async def cascade(
|
||||||
# respawn the compute task if the source
|
# respawn the compute task if the source
|
||||||
# array has been updated such that we compute
|
# array has been updated such that we compute
|
||||||
# new history from the (prepended) source.
|
# new history from the (prepended) source.
|
||||||
|
diff = src.index - dst.index
|
||||||
new_len = len(src.array)
|
new_len = len(src.array)
|
||||||
if new_len > last_len + 1:
|
|
||||||
|
# XXX: ok no idea why this works but "drunk fix"
|
||||||
|
# says it don't matter.
|
||||||
|
if (
|
||||||
|
new_len > last_len + 1 or
|
||||||
|
abs(diff) > 1
|
||||||
|
):
|
||||||
log.warning(f're-syncing fsp {func_name} to source')
|
log.warning(f're-syncing fsp {func_name} to source')
|
||||||
cs.cancel()
|
cs.cancel()
|
||||||
cs, index = await n.start(fsp_target)
|
cs, index = await n.start(fsp_target)
|
||||||
|
@ -266,24 +273,12 @@ async def cascade(
|
||||||
# read out last shm row, copy and write new row
|
# read out last shm row, copy and write new row
|
||||||
array = dst.array
|
array = dst.array
|
||||||
|
|
||||||
# TODO: some signals, like vlm should be reset to
|
# some metrics, like vlm should be reset
|
||||||
# zero every step.
|
# to zero every step.
|
||||||
last = array[-1:].copy()
|
|
||||||
if zero_on_step:
|
if zero_on_step:
|
||||||
last = zeroed
|
last = zeroed
|
||||||
|
else:
|
||||||
|
last = array[-1:].copy()
|
||||||
|
|
||||||
dst.push(last)
|
dst.push(last)
|
||||||
last_len = new_len
|
last_len = new_len
|
||||||
|
|
||||||
# compare again with source and make sure
|
|
||||||
# histories are index aligned.
|
|
||||||
diff = src.index - dst.index
|
|
||||||
if diff:
|
|
||||||
if abs(diff) < 10:
|
|
||||||
log.warning(
|
|
||||||
f'syncing fsp to source by offset: {diff}')
|
|
||||||
history = dst.array
|
|
||||||
dst.push(history[:-1], start=src._first.value)
|
|
||||||
else:
|
|
||||||
log.warning(
|
|
||||||
f'large offset {diff} re-spawn ongoing?..')
|
|
||||||
|
|
Loading…
Reference in New Issue