Drop legacy back-filling logic
Use the new `open_history_client()` endpoint/API and expect backends to provide a history "getter" routine that can be called to load historical data into shm even when **not** using a tsdb. Add logic for filling in data from the tsdb once the backend has provided data up to the last recorded in the db. Add logic for avoiding overruns of the shm buffer with more-then-necessary queries of tsdb data.m4_corrections
parent
66c20b80a5
commit
e4158dce01
|
@ -203,20 +203,25 @@ def diff_history(
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
|
|
||||||
if last_tsdb_dt:
|
if last_tsdb_dt:
|
||||||
s_diff = (last_tsdb_dt - start_dt).seconds
|
s_diff = (start_dt - last_tsdb_dt).seconds
|
||||||
|
|
||||||
|
to_push = array[:s_diff]
|
||||||
|
|
||||||
# if we detect a partial frame's worth of data
|
# if we detect a partial frame's worth of data
|
||||||
# that is new, slice out only that history and
|
# that is new, slice out only that history and
|
||||||
# write to shm.
|
# write to shm.
|
||||||
if s_diff > 0:
|
if abs(s_diff) < len(array):
|
||||||
assert last_tsdb_dt > start_dt
|
|
||||||
selected = array['time'] > last_tsdb_dt.timestamp()
|
|
||||||
to_push = array[selected]
|
|
||||||
log.info(
|
log.info(
|
||||||
f'Pushing partial frame {to_push.size} to shm'
|
f'Pushing partial frame {to_push.size} to shm'
|
||||||
)
|
)
|
||||||
|
# assert last_tsdb_dt > start_dt
|
||||||
|
# selected = array['time'] > last_tsdb_dt.timestamp()
|
||||||
|
# to_push = array[selected]
|
||||||
|
# return to_push
|
||||||
|
|
||||||
return to_push
|
return to_push
|
||||||
|
|
||||||
|
else:
|
||||||
return array
|
return array
|
||||||
|
|
||||||
|
|
||||||
|
@ -226,19 +231,11 @@ async def start_backfill(
|
||||||
shm: ShmArray,
|
shm: ShmArray,
|
||||||
|
|
||||||
last_tsdb_dt: Optional[datetime] = None,
|
last_tsdb_dt: Optional[datetime] = None,
|
||||||
# do_legacy: bool = False,
|
|
||||||
|
|
||||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> int:
|
) -> int:
|
||||||
|
|
||||||
# if do_legacy:
|
|
||||||
# return await mod.backfill_bars(
|
|
||||||
# bfqsn,
|
|
||||||
# shm,
|
|
||||||
# task_status=task_status,
|
|
||||||
# )
|
|
||||||
|
|
||||||
async with mod.open_history_client(bfqsn) as hist:
|
async with mod.open_history_client(bfqsn) as hist:
|
||||||
|
|
||||||
# get latest query's worth of history all the way
|
# get latest query's worth of history all the way
|
||||||
|
@ -258,23 +255,23 @@ async def start_backfill(
|
||||||
for delay_s in sampler.subscribers:
|
for delay_s in sampler.subscribers:
|
||||||
await broadcast(delay_s)
|
await broadcast(delay_s)
|
||||||
|
|
||||||
|
bf_done = trio.Event()
|
||||||
# let caller unblock and deliver latest history frame
|
# let caller unblock and deliver latest history frame
|
||||||
task_status.started(shm)
|
task_status.started((shm, start_dt, end_dt, bf_done))
|
||||||
|
|
||||||
if last_tsdb_dt is None:
|
if last_tsdb_dt is None:
|
||||||
# maybe a better default (they don't seem to define epoch?!)
|
# maybe a better default (they don't seem to define epoch?!)
|
||||||
last_tsdb_dt = pendulum.now().subtract(days=1)
|
last_tsdb_dt = pendulum.now().subtract(days=1)
|
||||||
|
|
||||||
|
|
||||||
# pull new history frames until we hit latest
|
# pull new history frames until we hit latest
|
||||||
# already in the tsdb or a max count.
|
# already in the tsdb or a max count.
|
||||||
mx_fills = 16
|
# mx_fills = 16
|
||||||
count = 0
|
count = 0
|
||||||
|
# while True:
|
||||||
while (
|
while (
|
||||||
start_dt > last_tsdb_dt
|
end_dt > last_tsdb_dt
|
||||||
# and count < mx_fills
|
# and count < mx_fills
|
||||||
):
|
):
|
||||||
# while True:
|
|
||||||
count += 1
|
count += 1
|
||||||
array, start_dt, end_dt = await hist(end_dt=start_dt)
|
array, start_dt, end_dt = await hist(end_dt=start_dt)
|
||||||
to_push = diff_history(
|
to_push = diff_history(
|
||||||
|
@ -282,22 +279,31 @@ async def start_backfill(
|
||||||
start_dt,
|
start_dt,
|
||||||
end_dt,
|
end_dt,
|
||||||
|
|
||||||
# last_tsdb_dt=last_tsdb_dt,
|
last_tsdb_dt=last_tsdb_dt,
|
||||||
# XXX: hacky, just run indefinitely
|
# XXX: hacky, just run indefinitely
|
||||||
last_tsdb_dt=None,
|
# last_tsdb_dt=None,
|
||||||
)
|
)
|
||||||
print("fPULLING {count}")
|
print(f"PULLING {count}")
|
||||||
log.info(f'Pushing {to_push.size} to shm!')
|
log.info(f'Pushing {to_push.size} to shm!')
|
||||||
|
|
||||||
|
if to_push.size < 1:
|
||||||
|
break
|
||||||
|
|
||||||
# bail on shm allocation overrun
|
# bail on shm allocation overrun
|
||||||
try:
|
try:
|
||||||
shm.push(to_push, prepend=True)
|
shm.push(to_push, prepend=True)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
await tractor.breakpoint()
|
||||||
break
|
break
|
||||||
|
|
||||||
for delay_s in sampler.subscribers:
|
for delay_s in sampler.subscribers:
|
||||||
await broadcast(delay_s)
|
await broadcast(delay_s)
|
||||||
|
|
||||||
|
bf_done.set()
|
||||||
|
# update start index to include all tsdb history
|
||||||
|
# that was pushed in the caller parent task.
|
||||||
|
# shm._first.value = 0
|
||||||
|
|
||||||
|
|
||||||
async def manage_history(
|
async def manage_history(
|
||||||
mod: ModuleType,
|
mod: ModuleType,
|
||||||
|
@ -358,7 +364,12 @@ async def manage_history(
|
||||||
series, first_dt, last_dt = await storage.load(fqsn)
|
series, first_dt, last_dt = await storage.load(fqsn)
|
||||||
|
|
||||||
broker, symbol, expiry = unpack_fqsn(fqsn)
|
broker, symbol, expiry = unpack_fqsn(fqsn)
|
||||||
await bus.nursery.start(
|
(
|
||||||
|
shm,
|
||||||
|
latest_start_dt,
|
||||||
|
latest_end_dt,
|
||||||
|
bf_done,
|
||||||
|
) = await bus.nursery.start(
|
||||||
partial(
|
partial(
|
||||||
start_backfill,
|
start_backfill,
|
||||||
mod,
|
mod,
|
||||||
|
@ -370,19 +381,37 @@ async def manage_history(
|
||||||
task_status.started(shm)
|
task_status.started(shm)
|
||||||
some_data_ready.set()
|
some_data_ready.set()
|
||||||
|
|
||||||
|
await bf_done.wait()
|
||||||
|
# do diff against last start frame of history and only fill
|
||||||
|
# in from the tsdb an allotment that allows for most recent
|
||||||
|
# to be loaded into mem *before* tsdb data.
|
||||||
|
if last_dt:
|
||||||
|
dt_diff_s = (latest_start_dt - last_dt).seconds
|
||||||
|
else:
|
||||||
|
dt_diff_s = 0
|
||||||
|
|
||||||
|
# await trio.sleep_forever()
|
||||||
# TODO: see if there's faster multi-field reads:
|
# TODO: see if there's faster multi-field reads:
|
||||||
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
||||||
# re-index with a `time` and index field
|
# re-index with a `time` and index field
|
||||||
|
prepend_start = shm._first.value
|
||||||
|
|
||||||
|
# sanity check on most-recent-data loading
|
||||||
|
assert prepend_start > dt_diff_s
|
||||||
|
|
||||||
history = list(series.values())
|
history = list(series.values())
|
||||||
if history:
|
if history:
|
||||||
fastest = history[0]
|
fastest = history[0]
|
||||||
|
to_push = fastest[:prepend_start]
|
||||||
|
|
||||||
shm.push(
|
shm.push(
|
||||||
fastest[-shm._first.value:],
|
to_push,
|
||||||
|
|
||||||
# insert the history pre a "days worth" of samples
|
# insert the history pre a "days worth" of samples
|
||||||
# to leave some real-time buffer space at the end.
|
# to leave some real-time buffer space at the end.
|
||||||
prepend=True,
|
prepend=True,
|
||||||
# start=shm._len - _secs_in_day,
|
# update_first=False,
|
||||||
|
# start=prepend_start,
|
||||||
field_map={
|
field_map={
|
||||||
'Epoch': 'time',
|
'Epoch': 'time',
|
||||||
'Open': 'open',
|
'Open': 'open',
|
||||||
|
@ -392,6 +421,49 @@ async def manage_history(
|
||||||
'Volume': 'volume',
|
'Volume': 'volume',
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# load as much from storage into shm as spacec will
|
||||||
|
# allow according to user's shm size settings.
|
||||||
|
count = 0
|
||||||
|
end = fastest['Epoch'][0]
|
||||||
|
|
||||||
|
while shm._first.value > 0:
|
||||||
|
count += 1
|
||||||
|
series = await storage.read_ohlcv(
|
||||||
|
fqsn,
|
||||||
|
end=end,
|
||||||
|
)
|
||||||
|
history = list(series.values())
|
||||||
|
fastest = history[0]
|
||||||
|
end = fastest['Epoch'][0]
|
||||||
|
prepend_start -= len(to_push)
|
||||||
|
to_push = fastest[:prepend_start]
|
||||||
|
|
||||||
|
shm.push(
|
||||||
|
to_push,
|
||||||
|
|
||||||
|
# insert the history pre a "days worth" of samples
|
||||||
|
# to leave some real-time buffer space at the end.
|
||||||
|
prepend=True,
|
||||||
|
# update_first=False,
|
||||||
|
# start=prepend_start,
|
||||||
|
field_map={
|
||||||
|
'Epoch': 'time',
|
||||||
|
'Open': 'open',
|
||||||
|
'High': 'high',
|
||||||
|
'Low': 'low',
|
||||||
|
'Close': 'close',
|
||||||
|
'Volume': 'volume',
|
||||||
|
},
|
||||||
|
)
|
||||||
|
for delay_s in sampler.subscribers:
|
||||||
|
await broadcast(delay_s)
|
||||||
|
|
||||||
|
if count > 6:
|
||||||
|
break
|
||||||
|
|
||||||
|
log.info(f'Loaded {to_push.shape} datums from storage')
|
||||||
|
|
||||||
# TODO: write new data to tsdb to be ready to for next read.
|
# TODO: write new data to tsdb to be ready to for next read.
|
||||||
|
|
||||||
if do_legacy_backfill:
|
if do_legacy_backfill:
|
||||||
|
@ -407,7 +479,6 @@ async def manage_history(
|
||||||
mod,
|
mod,
|
||||||
bfqsn,
|
bfqsn,
|
||||||
shm,
|
shm,
|
||||||
# do_legacy=True,
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue