Sync history recalcs to diff checks via a "task tracker"
parent
f68671b614
commit
086aaf1d16
|
@ -18,6 +18,7 @@
|
|||
core task logic for processing chains
|
||||
|
||||
'''
|
||||
from dataclasses import dataclass
|
||||
from functools import partial
|
||||
from typing import AsyncIterator, Callable, Optional
|
||||
|
||||
|
@ -44,6 +45,12 @@ _fsp_builtins = {
|
|||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class TaskTracker:
|
||||
complete: trio.Event
|
||||
cs: trio.CancelScope
|
||||
|
||||
|
||||
async def filter_quotes_by_sym(
|
||||
|
||||
sym: str,
|
||||
|
@ -134,7 +141,8 @@ async def fsp_compute(
|
|||
|
||||
# setup a respawn handle
|
||||
with trio.CancelScope() as cs:
|
||||
task_status.started((cs, index))
|
||||
tracker = TaskTracker(trio.Event(), cs)
|
||||
task_status.started((tracker, index))
|
||||
profiler(f'{func_name} yield last index')
|
||||
|
||||
# import time
|
||||
|
@ -232,7 +240,7 @@ async def cascade(
|
|||
func=func
|
||||
)
|
||||
|
||||
cs, index = await n.start(fsp_target)
|
||||
tracker, index = await n.start(fsp_target)
|
||||
|
||||
if zero_on_step:
|
||||
last = dst.array[-1:]
|
||||
|
@ -263,17 +271,20 @@ async def cascade(
|
|||
new_len > last_len + 1 or
|
||||
abs(diff) > 1
|
||||
):
|
||||
log.warning(f're-syncing fsp {func_name} to source')
|
||||
cs.cancel()
|
||||
cs, index = await n.start(fsp_target)
|
||||
|
||||
# TODO: adopt an incremental update engine/approach
|
||||
# where possible here eventually!
|
||||
log.warning(f're-syncing fsp {func_name} to source')
|
||||
tracker.cs.cancel()
|
||||
await tracker.complete.wait()
|
||||
tracker, index = await n.start(fsp_target)
|
||||
|
||||
# skip adding a new bar since we should be fully aligned.
|
||||
continue
|
||||
|
||||
# read out last shm row, copy and write new row
|
||||
array = dst.array
|
||||
|
||||
# some metrics, like vlm should be reset
|
||||
# some metrics like vlm should be reset
|
||||
# to zero every step.
|
||||
if zero_on_step:
|
||||
last = zeroed
|
||||
|
|
Loading…
Reference in New Issue