Drop legacy back-filling logic

Use the new `open_history_client()` endpoint/API and expect backends to
provide a history "getter" routine that can be called to load historical
data into shm even when **not** using a tsdb. Add logic for filling in
data from the tsdb once the backend has provided data up to the last
recorded in the db. Add logic for avoiding overruns of the shm buffer
with more-then-necessary queries of tsdb data.
incr_update_backup
Tyler Goodlet 2022-04-27 17:13:15 -04:00
parent 48cce42c77
commit 8e11d79712
1 changed files with 98 additions and 27 deletions

View File

@ -203,20 +203,25 @@ def diff_history(
) -> np.ndarray: ) -> np.ndarray:
if last_tsdb_dt: if last_tsdb_dt:
s_diff = (last_tsdb_dt - start_dt).seconds s_diff = (start_dt - last_tsdb_dt).seconds
to_push = array[:s_diff]
# if we detect a partial frame's worth of data # if we detect a partial frame's worth of data
# that is new, slice out only that history and # that is new, slice out only that history and
# write to shm. # write to shm.
if s_diff > 0: if abs(s_diff) < len(array):
assert last_tsdb_dt > start_dt
selected = array['time'] > last_tsdb_dt.timestamp()
to_push = array[selected]
log.info( log.info(
f'Pushing partial frame {to_push.size} to shm' f'Pushing partial frame {to_push.size} to shm'
) )
# assert last_tsdb_dt > start_dt
# selected = array['time'] > last_tsdb_dt.timestamp()
# to_push = array[selected]
# return to_push
return to_push return to_push
else:
return array return array
@ -226,19 +231,11 @@ async def start_backfill(
shm: ShmArray, shm: ShmArray,
last_tsdb_dt: Optional[datetime] = None, last_tsdb_dt: Optional[datetime] = None,
# do_legacy: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> int: ) -> int:
# if do_legacy:
# return await mod.backfill_bars(
# bfqsn,
# shm,
# task_status=task_status,
# )
async with mod.open_history_client(bfqsn) as hist: async with mod.open_history_client(bfqsn) as hist:
# get latest query's worth of history all the way # get latest query's worth of history all the way
@ -258,23 +255,23 @@ async def start_backfill(
for delay_s in sampler.subscribers: for delay_s in sampler.subscribers:
await broadcast(delay_s) await broadcast(delay_s)
bf_done = trio.Event()
# let caller unblock and deliver latest history frame # let caller unblock and deliver latest history frame
task_status.started(shm) task_status.started((shm, start_dt, end_dt, bf_done))
if last_tsdb_dt is None: if last_tsdb_dt is None:
# maybe a better default (they don't seem to define epoch?!) # maybe a better default (they don't seem to define epoch?!)
last_tsdb_dt = pendulum.now().subtract(days=1) last_tsdb_dt = pendulum.now().subtract(days=1)
# pull new history frames until we hit latest # pull new history frames until we hit latest
# already in the tsdb or a max count. # already in the tsdb or a max count.
mx_fills = 16 # mx_fills = 16
count = 0 count = 0
# while True:
while ( while (
start_dt > last_tsdb_dt end_dt > last_tsdb_dt
# and count < mx_fills # and count < mx_fills
): ):
# while True:
count += 1 count += 1
array, start_dt, end_dt = await hist(end_dt=start_dt) array, start_dt, end_dt = await hist(end_dt=start_dt)
to_push = diff_history( to_push = diff_history(
@ -282,22 +279,31 @@ async def start_backfill(
start_dt, start_dt,
end_dt, end_dt,
# last_tsdb_dt=last_tsdb_dt, last_tsdb_dt=last_tsdb_dt,
# XXX: hacky, just run indefinitely # XXX: hacky, just run indefinitely
last_tsdb_dt=None, # last_tsdb_dt=None,
) )
print("fPULLING {count}") print(f"PULLING {count}")
log.info(f'Pushing {to_push.size} to shm!') log.info(f'Pushing {to_push.size} to shm!')
if to_push.size < 1:
break
# bail on shm allocation overrun # bail on shm allocation overrun
try: try:
shm.push(to_push, prepend=True) shm.push(to_push, prepend=True)
except ValueError: except ValueError:
await tractor.breakpoint()
break break
for delay_s in sampler.subscribers: for delay_s in sampler.subscribers:
await broadcast(delay_s) await broadcast(delay_s)
bf_done.set()
# update start index to include all tsdb history
# that was pushed in the caller parent task.
# shm._first.value = 0
async def manage_history( async def manage_history(
mod: ModuleType, mod: ModuleType,
@ -358,7 +364,12 @@ async def manage_history(
series, first_dt, last_dt = await storage.load(fqsn) series, first_dt, last_dt = await storage.load(fqsn)
broker, symbol, expiry = unpack_fqsn(fqsn) broker, symbol, expiry = unpack_fqsn(fqsn)
await bus.nursery.start( (
shm,
latest_start_dt,
latest_end_dt,
bf_done,
) = await bus.nursery.start(
partial( partial(
start_backfill, start_backfill,
mod, mod,
@ -370,19 +381,37 @@ async def manage_history(
task_status.started(shm) task_status.started(shm)
some_data_ready.set() some_data_ready.set()
await bf_done.wait()
# do diff against last start frame of history and only fill
# in from the tsdb an allotment that allows for most recent
# to be loaded into mem *before* tsdb data.
if last_dt:
dt_diff_s = (latest_start_dt - last_dt).seconds
else:
dt_diff_s = 0
# await trio.sleep_forever()
# TODO: see if there's faster multi-field reads: # TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field # re-index with a `time` and index field
prepend_start = shm._first.value
# sanity check on most-recent-data loading
assert prepend_start > dt_diff_s
history = list(series.values()) history = list(series.values())
if history: if history:
fastest = history[0] fastest = history[0]
to_push = fastest[:prepend_start]
shm.push( shm.push(
fastest[-shm._first.value:], to_push,
# insert the history pre a "days worth" of samples # insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end. # to leave some real-time buffer space at the end.
prepend=True, prepend=True,
# start=shm._len - _secs_in_day, # update_first=False,
# start=prepend_start,
field_map={ field_map={
'Epoch': 'time', 'Epoch': 'time',
'Open': 'open', 'Open': 'open',
@ -392,6 +421,49 @@ async def manage_history(
'Volume': 'volume', 'Volume': 'volume',
}, },
) )
# load as much from storage into shm as spacec will
# allow according to user's shm size settings.
count = 0
end = fastest['Epoch'][0]
while shm._first.value > 0:
count += 1
series = await storage.read_ohlcv(
fqsn,
end=end,
)
history = list(series.values())
fastest = history[0]
end = fastest['Epoch'][0]
prepend_start -= len(to_push)
to_push = fastest[:prepend_start]
shm.push(
to_push,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
# start=prepend_start,
field_map={
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
)
for delay_s in sampler.subscribers:
await broadcast(delay_s)
if count > 6:
break
log.info(f'Loaded {to_push.shape} datums from storage')
# TODO: write new data to tsdb to be ready to for next read. # TODO: write new data to tsdb to be ready to for next read.
if do_legacy_backfill: if do_legacy_backfill:
@ -407,7 +479,6 @@ async def manage_history(
mod, mod,
bfqsn, bfqsn,
shm, shm,
# do_legacy=True,
) )
) )