Move sync diffing helpers out of index loop
parent
9cd63ffc99
commit
80fa76e8a9
|
@ -45,6 +45,11 @@ _fsp_builtins = {
|
||||||
'dolla_vlm': dolla_vlm,
|
'dolla_vlm': dolla_vlm,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# TODO: things to figure the heck out:
|
||||||
|
# - how to handle non-plottable values (pyqtgraph has facility for this
|
||||||
|
# now in `arrayToQPath()`)
|
||||||
|
# - composition of fsps / implicit chaining syntax (we need an issue)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class TaskTracker:
|
class TaskTracker:
|
||||||
|
@ -55,10 +60,11 @@ class TaskTracker:
|
||||||
async def filter_quotes_by_sym(
|
async def filter_quotes_by_sym(
|
||||||
|
|
||||||
sym: str,
|
sym: str,
|
||||||
quote_stream,
|
quote_stream: tractor.MsgStream,
|
||||||
|
|
||||||
) -> AsyncIterator[dict]:
|
) -> AsyncIterator[dict]:
|
||||||
'''Filter quote stream by target symbol.
|
'''
|
||||||
|
Filter quote stream by target symbol.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# TODO: make this the actual first quote from feed
|
# TODO: make this the actual first quote from feed
|
||||||
|
@ -188,8 +194,9 @@ async def cascade(
|
||||||
loglevel: Optional[str] = None,
|
loglevel: Optional[str] = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''Chain streaming signal processors and deliver output to
|
'''
|
||||||
destination mem buf.
|
Chain streaming signal processors and deliver output to
|
||||||
|
destination shm array buffer.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
profiler = pg.debug.Profiler(delayed=False, disabled=False)
|
profiler = pg.debug.Profiler(delayed=False, disabled=False)
|
||||||
|
@ -261,6 +268,46 @@ async def cascade(
|
||||||
await tracker.complete.wait()
|
await tracker.complete.wait()
|
||||||
return await n.start(fsp_target)
|
return await n.start(fsp_target)
|
||||||
|
|
||||||
|
def is_synced(
|
||||||
|
src: ShmArray,
|
||||||
|
dst: ShmArray
|
||||||
|
) -> tuple[bool, int, int]:
|
||||||
|
'''Predicate to dertmine if a destination FSP
|
||||||
|
output array is aligned to its source array.
|
||||||
|
|
||||||
|
'''
|
||||||
|
step_diff = src.index - dst.index
|
||||||
|
len_diff = abs(len(src.array) - len(dst.array))
|
||||||
|
return not (
|
||||||
|
# the source is likely backfilling and we must
|
||||||
|
# sync history calculations
|
||||||
|
len_diff > 2 or
|
||||||
|
|
||||||
|
# we aren't step synced to the source and may be
|
||||||
|
# leading/lagging by a step
|
||||||
|
step_diff > 1 or
|
||||||
|
step_diff < 0
|
||||||
|
), step_diff, len_diff
|
||||||
|
|
||||||
|
async def poll_and_sync_to_step(
|
||||||
|
|
||||||
|
tracker: TaskTracker,
|
||||||
|
src: ShmArray,
|
||||||
|
dst: ShmArray,
|
||||||
|
|
||||||
|
) -> tuple[TaskTracker, int]:
|
||||||
|
|
||||||
|
synced, step_diff, _ = is_synced(src, dst)
|
||||||
|
while not synced:
|
||||||
|
tracker, index = await resync(tracker)
|
||||||
|
synced, step_diff, _ = is_synced(src, dst)
|
||||||
|
|
||||||
|
return tracker, step_diff
|
||||||
|
|
||||||
|
s, step, ld = is_synced(src, dst)
|
||||||
|
if step or ld:
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
# Increment the underlying shared memory buffer on every
|
# Increment the underlying shared memory buffer on every
|
||||||
# "increment" msg received from the underlying data feed.
|
# "increment" msg received from the underlying data feed.
|
||||||
async with feed.index_stream() as stream:
|
async with feed.index_stream() as stream:
|
||||||
|
@ -270,41 +317,20 @@ async def cascade(
|
||||||
|
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
|
|
||||||
def desynced(src: ShmArray, dst: ShmArray) -> bool:
|
|
||||||
diff = src.index - dst.index
|
|
||||||
len_diff = abs(len(src.array) - len(dst.array))
|
|
||||||
return (
|
|
||||||
# the source is likely backfilling and we must
|
|
||||||
# sync history calculations
|
|
||||||
len_diff > 2 or
|
|
||||||
|
|
||||||
# we aren't step synced to the source and may be
|
|
||||||
# leading/lagging by a step
|
|
||||||
diff > 1 or
|
|
||||||
diff < 0
|
|
||||||
)
|
|
||||||
|
|
||||||
async def poll_and_sync_to_step(tracker):
|
|
||||||
while desynced(src, dst):
|
|
||||||
tracker, index = await resync(tracker)
|
|
||||||
# log.info(
|
|
||||||
# '\n'.join((
|
|
||||||
# f'history index after sync: {index}',
|
|
||||||
# f'diff after sync: {diff}',
|
|
||||||
# ))
|
|
||||||
# )
|
|
||||||
|
|
||||||
return tracker, diff
|
|
||||||
|
|
||||||
# respawn the compute task if the source
|
# respawn the compute task if the source
|
||||||
# array has been updated such that we compute
|
# array has been updated such that we compute
|
||||||
# new history from the (prepended) source.
|
# new history from the (prepended) source.
|
||||||
if desynced(src, dst):
|
synced, step_diff, _ = is_synced(src, dst)
|
||||||
tracker, diff = await poll_and_sync_to_step(tracker)
|
if not synced:
|
||||||
|
tracker, step_diff = await poll_and_sync_to_step(
|
||||||
|
tracker,
|
||||||
|
src,
|
||||||
|
dst,
|
||||||
|
)
|
||||||
|
|
||||||
# skip adding a last bar since we should be
|
# skip adding a last bar since we should already
|
||||||
# source alinged
|
# be step alinged
|
||||||
if diff == 0:
|
if step_diff == 0:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# read out last shm row, copy and write new row
|
# read out last shm row, copy and write new row
|
||||||
|
|
Loading…
Reference in New Issue