Unify backfilling logic into common task-routine

m4_corrections
Tyler Goodlet 2022-04-17 15:13:07 -04:00
parent ba8f443bf9
commit 13c88a075d
1 changed files with 122 additions and 113 deletions

View File

@ -32,7 +32,6 @@ from typing import (
Awaitable, Awaitable,
) )
import pendulum
import trio import trio
from trio.abc import ReceiveChannel from trio.abc import ReceiveChannel
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -194,21 +193,98 @@ async def _setup_persistent_brokerd(
await trio.sleep_forever() await trio.sleep_forever()
def diff_history(
array,
start_dt,
end_dt,
last_tsdb_dt: Optional[datetime] = None
) -> np.ndarray:
if last_tsdb_dt:
s_diff = (last_tsdb_dt - start_dt).seconds
# if we detect a partial frame's worth of data
# that is new, slice out only that history and
# write to shm.
if s_diff > 0:
assert last_tsdb_dt > start_dt
selected = array['time'] > last_tsdb_dt.timestamp()
to_push = array[selected]
log.info(
f'Pushing partial frame {to_push.size} to shm'
)
return to_push
return array
async def start_backfill( async def start_backfill(
mod: ModuleType, mod: ModuleType,
fqsn: str, bfqsn: str,
shm: ShmArray, shm: ShmArray,
last_tsdb_dt: Optional[datetime] = None,
do_legacy: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> int: ) -> int:
if do_legacy:
return await mod.backfill_bars( return await mod.backfill_bars(
fqsn, bfqsn,
shm, shm,
task_status=task_status, task_status=task_status,
) )
async with mod.open_history_client(bfqsn) as hist:
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt=None)
to_push = diff_history(
array,
start_dt,
end_dt,
last_tsdb_dt=last_tsdb_dt,
)
log.info(f'Pushing {to_push.size} to shm!')
shm.push(to_push)
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# let caller unblock and deliver latest history frame
task_status.started(shm)
# pull new history frames until we hit latest
# already in the tsdb
# while start_dt > last_tsdb_dt:
while True:
array, start_dt, end_dt = await hist(end_dt=start_dt)
to_push = diff_history(
array,
start_dt,
end_dt,
# last_tsdb_dt=last_tsdb_dt,
# XXX: hacky, just run indefinitely
last_tsdb_dt=None,
)
log.info(f'Pushing {to_push.size} to shm!')
# bail on shm allocation overrun
try:
shm.push(to_push, prepend=True)
except ValueError:
break
for delay_s in sampler.subscribers:
await broadcast(delay_s)
async def manage_history( async def manage_history(
mod: ModuleType, mod: ModuleType,
@ -251,108 +327,42 @@ async def manage_history(
# for now only do backfilling if no tsdb can be found # for now only do backfilling if no tsdb can be found
do_legacy_backfill = not is_up and opened do_legacy_backfill = not is_up and opened
open_history_client = getattr(mod, 'open_history_client', None)
bfqsn = fqsn.replace('.' + mod.name, '') bfqsn = fqsn.replace('.' + mod.name, '')
open_history_client = getattr(mod, 'open_history_client', None)
if is_up and opened and open_history_client: if is_up and opened and open_history_client:
log.info('Found existing `marketstored`') log.info('Found existing `marketstored`')
from . import marketstore from . import marketstore
async with marketstore.open_storage_client( async with marketstore.open_storage_client(
fqsn, fqsn,
) as storage: ) as storage:
tsdb_arrays = await storage.read_ohlcv(fqsn)
if not tsdb_arrays:
do_legacy_backfill = True
else:
log.info(f'Loaded tsdb history {tsdb_arrays}')
fastest = list(tsdb_arrays.values())[0]
times = fastest['Epoch']
first, last = times[0], times[-1]
first_tsdb_dt, last_tsdb_dt = map(
pendulum.from_timestamp, [first, last]
)
# TODO: this should be used verbatim for the pure # TODO: this should be used verbatim for the pure
# shm backfiller approach below. # shm backfiller approach below.
def diff_history(
array,
start_dt,
end_dt,
last_tsdb_dt: Optional[datetime] = None
) -> np.ndarray:
if last_tsdb_dt:
s_diff = (last_tsdb_dt - start_dt).seconds
# if we detect a partial frame's worth of data
# that is new, slice out only that history and
# write to shm.
if s_diff > 0:
assert last_tsdb_dt > start_dt
selected = array['time'] > last_tsdb_dt.timestamp()
to_push = array[selected]
log.info(
f'Pushing partial frame {to_push.size} to shm'
)
return to_push
return array
# start history anal and load missing new data via backend. # start history anal and load missing new data via backend.
series, first_dt, last_dt = await storage.load(fqsn)
broker, symbol, expiry = unpack_fqsn(fqsn) broker, symbol, expiry = unpack_fqsn(fqsn)
await bus.nursery.start(
async with open_history_client(bfqsn) as hist: partial(
start_backfill,
# get latest query's worth of history all the way mod,
# back to what is recorded in the tsdb bfqsn,
array, start_dt, end_dt = await hist(end_dt=None) shm,
to_push = diff_history( last_tsdb_dt=last_dt,
array, )
start_dt,
end_dt,
last_tsdb_dt=last_tsdb_dt,
) )
log.info(f'Pushing {to_push.size} to shm!')
shm.push(to_push)
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# let caller unblock and deliver latest history frame
task_status.started(shm) task_status.started(shm)
some_data_ready.set() some_data_ready.set()
# pull new history frames until we hit latest
# already in the tsdb
# while start_dt > last_tsdb_dt:
while True:
array, start_dt, end_dt = await hist(end_dt=start_dt)
to_push = diff_history(
array,
start_dt,
end_dt,
# last_tsdb_dt=last_tsdb_dt,
# just run indefinitely
last_tsdb_dt=None,
)
log.info(f'Pushing {to_push.size} to shm!')
shm.push(to_push, prepend=True)
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# TODO: see if there's faster multi-field reads: # TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field # re-index with a `time` and index field
history = list(series.values())
if history:
fastest = history[0]
shm.push( shm.push(
fastest[-shm._first.value:], fastest[-shm._first.value:],
@ -369,9 +379,7 @@ async def manage_history(
'Volume': 'volume', 'Volume': 'volume',
}, },
) )
# TODO: write new data to tsdb to be ready to for next read.
# TODO: write new data to tsdb to be ready to for next
# read.
if do_legacy_backfill: if do_legacy_backfill:
# do a legacy incremental backfill from the provider. # do a legacy incremental backfill from the provider.
@ -385,6 +393,7 @@ async def manage_history(
mod, mod,
bfqsn, bfqsn,
shm, shm,
do_legacy=True,
) )
# yield back after client connect with filled shm # yield back after client connect with filled shm