Unify backfilling logic into common task-routine
parent 46c23e90db
commit 727d3cc027
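Both backfill paths, the legacy `backfill_bars()` call and the new tsdb-diffing loop, now run through the single `start_backfill()` task-routine, which is launched via `bus.nursery.start(partial(...))` (see the diff below). A minimal standalone sketch of that task-start pattern, using only `trio` and `functools`, with a hypothetical `source` argument standing in for the real `(mod, bfqsn, shm)` parameters:

    from functools import partial

    import trio


    async def start_backfill(
        source: str,  # hypothetical stand-in for the real (mod, bfqsn, shm) args
        do_legacy: bool = False,
        task_status=trio.TASK_STATUS_IGNORED,
    ) -> None:
        mode = 'legacy' if do_legacy else 'tsdb-diffed'
        # deliver the first frame back to the caller of ``nursery.start()``
        task_status.started(f'first {source} frame ({mode})')
        # ...then keep pulling older frames in the background
        await trio.sleep(0)


    async def main() -> None:
        async with trio.open_nursery() as nursery:
            first = await nursery.start(
                partial(start_backfill, 'demo.feed', do_legacy=True)
            )
            print(first)


    trio.run(main)

`partial()` is needed here because `nursery.start()` only forwards positional arguments to the target task, so keyword options like `do_legacy=True` have to be bound up front.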
@@ -32,7 +32,6 @@ from typing import (
     Awaitable,
 )

-import pendulum
 import trio
 from trio.abc import ReceiveChannel
 from trio_typing import TaskStatus
@@ -194,21 +193,98 @@ async def _setup_persistent_brokerd(
     await trio.sleep_forever()


+def diff_history(
+    array,
+    start_dt,
+    end_dt,
+    last_tsdb_dt: Optional[datetime] = None
+
+) -> np.ndarray:
+
+    if last_tsdb_dt:
+        s_diff = (last_tsdb_dt - start_dt).seconds
+
+        # if we detect a partial frame's worth of data
+        # that is new, slice out only that history and
+        # write to shm.
+        if s_diff > 0:
+            assert last_tsdb_dt > start_dt
+            selected = array['time'] > last_tsdb_dt.timestamp()
+            to_push = array[selected]
+            log.info(
+                f'Pushing partial frame {to_push.size} to shm'
+            )
+            return to_push
+
+    return array
+
+
 async def start_backfill(
     mod: ModuleType,
-    fqsn: str,
+    bfqsn: str,
     shm: ShmArray,

+    last_tsdb_dt: Optional[datetime] = None,
+    do_legacy: bool = False,
+
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

 ) -> int:

-    return await mod.backfill_bars(
-        fqsn,
-        shm,
-        task_status=task_status,
-    )
+    if do_legacy:
+        return await mod.backfill_bars(
+            bfqsn,
+            shm,
+            task_status=task_status,
+        )
+
+    async with mod.open_history_client(bfqsn) as hist:
+
+        # get latest query's worth of history all the way
+        # back to what is recorded in the tsdb
+        array, start_dt, end_dt = await hist(end_dt=None)
+
+        to_push = diff_history(
+            array,
+            start_dt,
+            end_dt,
+            last_tsdb_dt=last_tsdb_dt,
+        )
+
+        log.info(f'Pushing {to_push.size} to shm!')
+        shm.push(to_push)
+
+        for delay_s in sampler.subscribers:
+            await broadcast(delay_s)
+
+        # let caller unblock and deliver latest history frame
+        task_status.started(shm)
+
+        # pull new history frames until we hit latest
+        # already in the tsdb
+        # while start_dt > last_tsdb_dt:
+        while True:
+            array, start_dt, end_dt = await hist(end_dt=start_dt)
+            to_push = diff_history(
+                array,
+                start_dt,
+                end_dt,
+
+                # last_tsdb_dt=last_tsdb_dt,
+                # XXX: hacky, just run indefinitely
+                last_tsdb_dt=None,
+            )
+            log.info(f'Pushing {to_push.size} to shm!')
+
+            # bail on shm allocation overrun
+            try:
+                shm.push(to_push, prepend=True)
+            except ValueError:
+                break
+
+            for delay_s in sampler.subscribers:
+                await broadcast(delay_s)


 async def manage_history(
     mod: ModuleType,
@@ -251,108 +327,42 @@ async def manage_history(
     # for now only do backfilling if no tsdb can be found
     do_legacy_backfill = not is_up and opened

-    open_history_client = getattr(mod, 'open_history_client', None)
-
     bfqsn = fqsn.replace('.' + mod.name, '')
+    open_history_client = getattr(mod, 'open_history_client', None)

     if is_up and opened and open_history_client:

         log.info('Found existing `marketstored`')
         from . import marketstore

         async with marketstore.open_storage_client(
             fqsn,
         ) as storage:

-            tsdb_arrays = await storage.read_ohlcv(fqsn)
-
-            if not tsdb_arrays:
-                do_legacy_backfill = True
-
-            else:
-                log.info(f'Loaded tsdb history {tsdb_arrays}')
-
-                fastest = list(tsdb_arrays.values())[0]
-                times = fastest['Epoch']
-                first, last = times[0], times[-1]
-                first_tsdb_dt, last_tsdb_dt = map(
-                    pendulum.from_timestamp, [first, last]
-                )
-
             # TODO: this should be used verbatim for the pure
             # shm backfiller approach below.

-            def diff_history(
-                array,
-                start_dt,
-                end_dt,
-                last_tsdb_dt: Optional[datetime] = None
-
-            ) -> np.ndarray:
-
-                if last_tsdb_dt:
-                    s_diff = (last_tsdb_dt - start_dt).seconds
-
-                    # if we detect a partial frame's worth of data
-                    # that is new, slice out only that history and
-                    # write to shm.
-                    if s_diff > 0:
-                        assert last_tsdb_dt > start_dt
-                        selected = array['time'] > last_tsdb_dt.timestamp()
-                        to_push = array[selected]
-                        log.info(
-                            f'Pushing partial frame {to_push.size} to shm'
-                        )
-                        return to_push
-
-                return array
-
             # start history anal and load missing new data via backend.
+            series, first_dt, last_dt = await storage.load(fqsn)

             broker, symbol, expiry = unpack_fqsn(fqsn)
-            async with open_history_client(bfqsn) as hist:
-
-                # get latest query's worth of history all the way
-                # back to what is recorded in the tsdb
-                array, start_dt, end_dt = await hist(end_dt=None)
-
-                to_push = diff_history(
-                    array,
-                    start_dt,
-                    end_dt,
-                    last_tsdb_dt=last_tsdb_dt,
-                )
-                log.info(f'Pushing {to_push.size} to shm!')
-                shm.push(to_push)
-
-                for delay_s in sampler.subscribers:
-                    await broadcast(delay_s)
-
-                # let caller unblock and deliver latest history frame
-                task_status.started(shm)
-                some_data_ready.set()
-
-                # pull new history frames until we hit latest
-                # already in the tsdb
-                # while start_dt > last_tsdb_dt:
-                while True:
-                    array, start_dt, end_dt = await hist(end_dt=start_dt)
-                    to_push = diff_history(
-                        array,
-                        start_dt,
-                        end_dt,
-                        # last_tsdb_dt=last_tsdb_dt,
-                        # just run indefinitely
-                        last_tsdb_dt=None,
-                    )
-                    log.info(f'Pushing {to_push.size} to shm!')
-                    shm.push(to_push, prepend=True)
-                    for delay_s in sampler.subscribers:
-                        await broadcast(delay_s)
+            await bus.nursery.start(
+                partial(
+                    start_backfill,
+                    mod,
+                    bfqsn,
+                    shm,
+                    last_tsdb_dt=last_dt,
+                )
+            )
+
+            task_status.started(shm)
+            some_data_ready.set()

             # TODO: see if there's faster multi-field reads:
             # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
             # re-index with a `time` and index field
+            history = list(series.values())
+            if history:
+                fastest = history[0]
             shm.push(
                 fastest[-shm._first.value:],
@@ -369,9 +379,7 @@ async def manage_history(
                     'Volume': 'volume',
                 },
             )
-
-            # TODO: write new data to tsdb to be ready to for next
-            # read.
+            # TODO: write new data to tsdb to be ready to for next read.

     if do_legacy_backfill:
         # do a legacy incremental backfill from the provider.
@@ -385,6 +393,7 @@ async def manage_history(
             mod,
             bfqsn,
             shm,
+            do_legacy=True,
         )

     # yield back after client connect with filled shm
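For reference, the row-selection used by the new `diff_history()` above can be tried in isolation; a minimal standalone numpy sketch with invented sample bars and stdlib `datetime` standing in for the `pendulum` timestamps:

    from datetime import datetime, timezone

    import numpy as np

    # tiny OHLC-style structured array with an epoch-seconds 'time' field
    array = np.array(
        [
            (1_640_995_200.0, 4766.18),
            (1_640_995_260.0, 4769.50),
            (1_640_995_320.0, 4771.02),
        ],
        dtype=[('time', 'f8'), ('close', 'f8')],
    )

    # pretend the tsdb already holds everything up to the first bar
    last_tsdb_dt = datetime.fromtimestamp(1_640_995_200.0, tz=timezone.utc)

    # same boolean-mask selection as ``diff_history()``: keep only rows
    # newer than what the tsdb already recorded
    selected = array['time'] > last_tsdb_dt.timestamp()
    to_push = array[selected]
    print(to_push)  # -> the two newer rows only

Anything at or before `last_tsdb_dt` is dropped, so only the partial frame of genuinely new rows gets pushed to shm.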