Drop legacy backfilling, load a day's worth of data by default
parent
5294017891
commit
d39e1e9a46
|
@ -226,18 +226,18 @@ async def start_backfill(
|
||||||
shm: ShmArray,
|
shm: ShmArray,
|
||||||
|
|
||||||
last_tsdb_dt: Optional[datetime] = None,
|
last_tsdb_dt: Optional[datetime] = None,
|
||||||
do_legacy: bool = False,
|
# do_legacy: bool = False,
|
||||||
|
|
||||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> int:
|
) -> int:
|
||||||
|
|
||||||
if do_legacy:
|
# if do_legacy:
|
||||||
return await mod.backfill_bars(
|
# return await mod.backfill_bars(
|
||||||
bfqsn,
|
# bfqsn,
|
||||||
shm,
|
# shm,
|
||||||
task_status=task_status,
|
# task_status=task_status,
|
||||||
)
|
# )
|
||||||
|
|
||||||
async with mod.open_history_client(bfqsn) as hist:
|
async with mod.open_history_client(bfqsn) as hist:
|
||||||
|
|
||||||
|
@ -263,16 +263,16 @@ async def start_backfill(
|
||||||
|
|
||||||
if last_tsdb_dt is None:
|
if last_tsdb_dt is None:
|
||||||
# maybe a better default (they don't seem to define epoch?!)
|
# maybe a better default (they don't seem to define epoch?!)
|
||||||
last_tsdb_dt = pendulum.yesterday()
|
last_tsdb_dt = pendulum.now().subtract(days=1)
|
||||||
|
|
||||||
|
|
||||||
# pull new history frames until we hit latest
|
# pull new history frames until we hit latest
|
||||||
# already in the tsdb
|
# already in the tsdb or a max count.
|
||||||
mx_fills = 16
|
mx_fills = 16
|
||||||
count = 0
|
count = 0
|
||||||
while (
|
while (
|
||||||
start_dt > last_tsdb_dt
|
start_dt > last_tsdb_dt
|
||||||
and count > mx_fills
|
# and count < mx_fills
|
||||||
):
|
):
|
||||||
# while True:
|
# while True:
|
||||||
count += 1
|
count += 1
|
||||||
|
@ -286,6 +286,7 @@ async def start_backfill(
|
||||||
# XXX: hacky, just run indefinitely
|
# XXX: hacky, just run indefinitely
|
||||||
last_tsdb_dt=None,
|
last_tsdb_dt=None,
|
||||||
)
|
)
|
||||||
|
print("fPULLING {count}")
|
||||||
log.info(f'Pushing {to_push.size} to shm!')
|
log.info(f'Pushing {to_push.size} to shm!')
|
||||||
|
|
||||||
# bail on shm allocation overrun
|
# bail on shm allocation overrun
|
||||||
|
|
Loading…
Reference in New Issue