Fix the drunk fix

This should finally get fsp src-to-dst array syncing correct.
There are a few edge cases, but mostly we need to be sure we both sync
back-filled history diffs and avoid current-step lags/leads. Use a
polling routine and the more stringent task re-spawn system to get
this right.
vlm_plotz_backup
Tyler Goodlet 2021-10-04 16:34:54 -04:00
parent 3aeb6e03f1
commit b3ed09249a
2 changed files with 63 additions and 33 deletions
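The technique the commit message describes — poll the src-to-dst index offset and cancel/re-spawn the compute task until the destination is within one step of the source — is sketched below in isolation. This is an illustrative sketch only, not piker code: it uses plain trio without tractor or shared memory, and `FakeBuffer`, `spawn_compute` and the `main` driver are made-up stand-ins; only the cancel-then-restart polling loop mirrors the `resync`/`poll_and_sync_to_step` helpers added in the diff below.

import trio


class FakeBuffer:
    # stand-in for a shared-memory array exposing a monotonic index
    def __init__(self, index: int = 0) -> None:
        self.index = index


async def spawn_compute(
    src: FakeBuffer,
    dst: FakeBuffer,
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    # pretend to (re)compute history by jumping dst up to the current src
    # index, then sit in a stand-in "real-time loop" until cancelled
    with trio.CancelScope() as cs:
        dst.index = src.index
        task_status.started(cs)
        await trio.sleep_forever()


async def poll_and_sync_to_step(
    n: trio.Nursery,
    cs: trio.CancelScope,
    src: FakeBuffer,
    dst: FakeBuffer,
) -> tuple[trio.CancelScope, int]:
    # cancel and re-spawn the compute task until dst trails src by at most
    # one step; the real code also waits for the old task to fully exit
    # (`tracker.complete.wait()`) before re-spawning, elided here
    diff = src.index - dst.index
    while diff not in (0, 1):
        cs.cancel()
        cs = await n.start(spawn_compute, src, dst)
        diff = src.index - dst.index
    return cs, diff


async def main() -> None:
    src, dst = FakeBuffer(index=105), FakeBuffer(index=100)
    async with trio.open_nursery() as n:
        cs = await n.start(spawn_compute, src, dst)
        # simulate the source backfilling more history while we computed
        src.index += 7
        cs, diff = await poll_and_sync_to_step(n, cs, src, dst)
        print(f'synced, diff={diff}')  # -> synced, diff=0
        n.cancel_scope.cancel()


trio.run(main)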


@@ -47,7 +47,7 @@ _root_modules = [
 class Services(BaseModel):
 
-    actor_n: tractor._trionics.ActorNursery
+    actor_n: tractor._supervise.ActorNursery
     service_n: trio.Nursery
     debug_mode: bool  # tractor sub-actor debug mode flag
     service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}


@@ -149,24 +149,27 @@ async def fsp_compute(
     # import time
     # last = time.time()
 
-    # rt stream
-    async for processed in out_stream:
+    try:
+        # rt stream
+        async for processed in out_stream:
 
-        log.debug(f"{func_name}: {processed}")
-        index = src.index
-        dst.array[-1][func_name] = processed
+            log.debug(f"{func_name}: {processed}")
+            index = src.index
+            dst.array[-1][func_name] = processed
 
-        # NOTE: for now we aren't streaming this to the consumer
-        # stream latest array index entry which basically just acts
-        # as trigger msg to tell the consumer to read from shm
-        if attach_stream:
-            await stream.send(index)
+            # NOTE: for now we aren't streaming this to the consumer
+            # stream latest array index entry which basically just acts
+            # as trigger msg to tell the consumer to read from shm
+            if attach_stream:
+                await stream.send(index)
 
-        # period = time.time() - last
-        # hz = 1/period if period else float('nan')
-        # if hz > 60:
-        #     log.info(f'FSP quote too fast: {hz}')
-        # last = time.time()
+            # period = time.time() - last
+            # hz = 1/period if period else float('nan')
+            # if hz > 60:
+            #     log.info(f'FSP quote too fast: {hz}')
+            # last = time.time()
+
+    finally:
+        tracker.complete.set()
 
 
 @tractor.context
@@ -218,7 +221,7 @@ async def cascade(
     profiler(f'{func_name}: feed up')
 
     assert src.token == feed.shm.token
-    last_len = new_len = len(src.array)
+    # last_len = new_len = len(src.array)
 
     async with (
         ctx.open_stream() as stream,
@@ -250,9 +253,16 @@ async def cascade(
         await ctx.started(index)
         profiler(f'{func_name}: fsp up')
 
+        async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]:
+            # TODO: adopt an incremental update engine/approach
+            # where possible here eventually!
+            log.warning(f're-syncing fsp {func_name} to source')
+            tracker.cs.cancel()
+            await tracker.complete.wait()
+            return await n.start(fsp_target)
+
         # Increment the underlying shared memory buffer on every
         # "increment" msg received from the underlying data feed.
         async with feed.index_stream() as stream:
 
             profiler(f'{func_name}: sample stream up')
@@ -264,23 +274,44 @@ async def cascade(
                 # array has been updated such that we compute
                 # new history from the (prepended) source.
                 diff = src.index - dst.index
-                new_len = len(src.array)
 
-                # log.debug(f'diff {diff}')
+                # XXX: ok no idea why this works but "drunk fix"
+                # says it don't matter.
+                # new_len = len(src.array)
+
+                async def poll_and_sync_to_step(tracker):
+                    diff = src.index - dst.index
+                    while True:
+                        if diff in (0, 1):
+                            break
+
+                        tracker, index = await resync(tracker)
+                        diff = src.index - dst.index
+                        # log.info(
+                        #     '\n'.join((
+                        #         f'history index after sync: {index}',
+                        #         f'diff after sync: {diff}',
+                        #     ))
+                        # )
+
+                    return tracker, diff
 
                 if (
-                    new_len > last_len + 1 or
-                    abs(diff) > 1
-                ):
-                    # TODO: adopt an incremental update engine/approach
-                    # where possible here eventually!
-                    log.warning(f're-syncing fsp {func_name} to source')
-                    tracker.cs.cancel()
-                    await tracker.complete.wait()
-                    tracker, index = await n.start(fsp_target)
+                    # the source is likely backfilling and we must
+                    # sync history calculations
+                    abs(len(src.array) - len(dst.array)) > 0 or
 
-                    # skip adding a new bar since we should be fully aligned.
-                    continue
+                    # we aren't step synced to the source and may be
+                    # leading/lagging by a step
+                    diff > 1 or
+                    diff < 0
+                ):
+                    tracker, diff = await poll_and_sync_to_step(tracker)
+
+                    # skip adding a last bar since we should be
+                    # source alinged
+                    if diff == 0:
+                        continue
 
                 # read out last shm row, copy and write new row
                 array = dst.array
@@ -293,4 +324,3 @@ async def cascade(
                 last = array[-1:].copy()
                 dst.push(last)
-                last_len = new_len
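As a reading aid for the new branch condition in `cascade()` above, here is a tiny toy predicate with assumed, purely illustrative values; `needs_resync` and its arguments are not part of the piker API.

def needs_resync(src_len: int, dst_len: int, diff: int) -> bool:
    # mirrors the new `if` condition: re-sync when the source has a
    # different history length (backfill) or when the per-step index
    # offset is anything other than 0 or 1
    return (
        abs(src_len - dst_len) > 0
        or diff > 1
        or diff < 0
    )

assert needs_resync(1000, 990, 1)        # source backfilled history
assert needs_resync(1000, 1000, 2)       # dst lags by more than one step
assert needs_resync(1000, 1000, -1)      # dst leads the source
assert not needs_resync(1000, 1000, 1)   # in step: just copy the last row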