Move sync diffing helpers out of index loop

fsp_drunken_alignment
Tyler Goodlet 2021-10-05 12:27:53 -04:00
parent 53dedbd645
commit dd9f6e8a7c
1 changed files with 61 additions and 35 deletions

View File

@ -44,6 +44,11 @@ _fsp_builtins = {
'vwap': _tina_vwap,
}
# TODO: things to figure the heck out:
# - how to handle non-plottable values (pyqtgraph has facility for this
# now in `arrayToQPath()`)
# - composition of fsps / implicit chaining syntax (we need an issue)
@dataclass
class TaskTracker:
@ -54,10 +59,11 @@ class TaskTracker:
async def filter_quotes_by_sym(
sym: str,
quote_stream,
quote_stream: tractor.MsgStream,
) -> AsyncIterator[dict]:
'''Filter quote stream by target symbol.
'''
Filter quote stream by target symbol.
'''
# TODO: make this the actual first quote from feed
@ -187,8 +193,9 @@ async def cascade(
loglevel: Optional[str] = None,
) -> None:
'''Chain streaming signal processors and deliver output to
destination mem buf.
'''
Chain streaming signal processors and deliver output to
destination shm array buffer.
'''
profiler = pg.debug.Profiler(delayed=False, disabled=False)
@ -260,6 +267,46 @@ async def cascade(
await tracker.complete.wait()
return await n.start(fsp_target)
def is_synced(
src: ShmArray,
dst: ShmArray
) -> tuple[bool, int, int]:
'''Predicate to dertmine if a destination FSP
output array is aligned to its source array.
'''
step_diff = src.index - dst.index
len_diff = abs(len(src.array) - len(dst.array))
return not (
# the source is likely backfilling and we must
# sync history calculations
len_diff > 2 or
# we aren't step synced to the source and may be
# leading/lagging by a step
step_diff > 1 or
step_diff < 0
), step_diff, len_diff
async def poll_and_sync_to_step(
tracker: TaskTracker,
src: ShmArray,
dst: ShmArray,
) -> tuple[TaskTracker, int]:
synced, step_diff, _ = is_synced(src, dst)
while not synced:
tracker, index = await resync(tracker)
synced, step_diff, _ = is_synced(src, dst)
return tracker, step_diff
s, step, ld = is_synced(src, dst)
if step or ld:
await tractor.breakpoint()
# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
async with feed.index_stream() as stream:
@ -269,41 +316,20 @@ async def cascade(
async for msg in stream:
def desynced(src: ShmArray, dst: ShmArray) -> bool:
diff = src.index - dst.index
len_diff = abs(len(src.array) - len(dst.array))
return (
# the source is likely backfilling and we must
# sync history calculations
len_diff > 2 or
# we aren't step synced to the source and may be
# leading/lagging by a step
diff > 1 or
diff < 0
)
async def poll_and_sync_to_step(tracker):
while desynced(src, dst):
tracker, index = await resync(tracker)
# log.info(
# '\n'.join((
# f'history index after sync: {index}',
# f'diff after sync: {diff}',
# ))
# )
return tracker, diff
# respawn the compute task if the source
# array has been updated such that we compute
# new history from the (prepended) source.
if desynced(src, dst):
tracker, diff = await poll_and_sync_to_step(tracker)
synced, step_diff, _ = is_synced(src, dst)
if not synced:
tracker, step_diff = await poll_and_sync_to_step(
tracker,
src,
dst,
)
# skip adding a last bar since we should be
# source alinged
if diff == 0:
# skip adding a last bar since we should already
# be step alinged
if step_diff == 0:
continue
# read out last shm row, copy and write new row