Do fsp sync-to-source in sample step task

vlm_plotz_backup
Tyler Goodlet 2021-09-30 10:46:44 -04:00
parent 2a723ac994
commit 3ed0739bbe
1 changed files with 22 additions and 17 deletions

View File

@ -124,20 +124,11 @@ async def fsp_compute(
# is `index` aware such that historical data can be indexed # is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane # relative to the true first datum? Not sure if this is sane
# for incremental compuations. # for incremental compuations.
dst._first.value = src._first.value first = dst._first.value = src._first.value
dst._last.value = src._first.value
# compare with source signal and time align
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff > 0:
log.warning(f"WTF DIFF fsp to ohlc history {diff}")
for _ in range(diff):
dst.push(history[:1])
# TODO: can we use this `start` flag instead of the manual # TODO: can we use this `start` flag instead of the manual
# setting above? # setting above?
index = dst.push(history) #, start=src._first.value) index = dst.push(history, start=first)
profiler(f'{func_name} pushed history') profiler(f'{func_name} pushed history')
profiler.finish() profiler.finish()
@ -147,8 +138,8 @@ async def fsp_compute(
task_status.started((cs, index)) task_status.started((cs, index))
profiler(f'{func_name} yield last index') profiler(f'{func_name} yield last index')
import time # import time
last = time.time() # last = time.time()
# rt stream # rt stream
async for processed in out_stream: async for processed in out_stream:
@ -255,12 +246,12 @@ async def cascade(
async for msg in stream: async for msg in stream:
# respawn the compute task if the source
# array has been updated such that we compute
# new history from the (prepended) source.
new_len = len(src.array) new_len = len(src.array)
if new_len > last_len + 1: if new_len > last_len + 1:
# respawn the signal compute task if the source log.warning(f're-syncing fsp {func_name} to source')
# signal has been updated
log.warning(f'Re-spawning fsp {func_name}')
cs.cancel() cs.cancel()
cs, index = await n.start(fsp_target) cs, index = await n.start(fsp_target)
@ -269,8 +260,22 @@ async def cascade(
# read out last shm row, copy and write new row # read out last shm row, copy and write new row
array = dst.array array = dst.array
# TODO: some signals, like vlm should be reset to # TODO: some signals, like vlm should be reset to
# zero every step. # zero every step.
last = array[-1:].copy() last = array[-1:].copy()
dst.push(last) dst.push(last)
last_len = new_len last_len = new_len
# compare again with source and make sure
# histories are index aligned.
diff = src.index - dst.index
if diff:
if abs(diff) < 10:
log.warning(
f'syncing fsp to source by offset: {diff}')
history = dst.array
dst.push(history[:-1], start=src._first.value)
else:
log.warning(
f'large offset {diff} re-spawn ongoing?..')