`.data.history`: run `.tsp.dedupe()` in backloader

In an effort to catch out-of-order and/or partial-frame-duplicated
segments, add `.tsp` calls throughout the backloader tasks, including
a call to the new `.sort_diff()` to flag the out-of-order history
cases.
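
For illustration, a self-contained `polars` sketch (NOT the new `tsp`
code — the real call sites and signatures are in the diff below) of the
two conditions being checked for, partial-frame duplication and
out-of-order history:

```python
# hypothetical example only -- not `tsp.dedupe()`/`tsp.sort_diff()`
import polars as pl

df = pl.DataFrame({
    'time':  [3, 1, 1, 2],          # duplicated AND out-of-order stamps
    'close': [3.0, 1.0, 1.0, 2.0],
})

# partial-frame duplication: drop rows with repeated timestamps
deduped: pl.DataFrame = df.unique(
    subset='time',
    keep='first',
    maintain_order=True,
)
was_deduped: bool = deduped.height < df.height

# out-of-order history: a time-sorted copy differing from the
# original is the case `.sort_diff()` is meant to flag
resorted: pl.DataFrame = deduped.sort('time')
was_out_of_order: bool = bool(
    (resorted['time'] != deduped['time']).any()
)
```
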
branch: distribute_dis
Tyler Goodlet 2023-12-12 19:57:46 -05:00
parent e8bf4c6e04
commit b95932ea09
1 changed file with 70 additions and 39 deletions

@@ -1,18 +1,19 @@
 # piker: trading gear for hackers
 # Copyright (C) Tyler Goodlet (in stewardship for pikers)
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public
+# License as published by the Free Software Foundation, either
+# version 3 of the License, or (at your option) any later version.
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Affero General Public License for more details.
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
+# You should have received a copy of the GNU Affero General Public
+# License along with this program. If not, see
+# <https://www.gnu.org/licenses/>.
 '''
 Historical data business logic for load, backfill and tsdb storage.
@@ -39,6 +40,7 @@ from pendulum import (
     from_timestamp,
 )
 import numpy as np
+import polars as pl
 from ..accounting import (
     MktPair,
@@ -54,6 +56,7 @@ from ._source import def_iohlcv_fields
 from ._sampling import (
     open_sample_stream,
 )
+from . import tsp
 from ..brokers._util import (
     DataUnavailable,
 )
@@ -197,7 +200,7 @@
     # do a decently sized backfill and load it into storage.
     periods = {
-        1: {'days': 6},
+        1: {'days': 2},
         60: {'years': 6},
     }
     period_duration: int = periods[timeframe]
@@ -246,13 +249,16 @@
         # broker says there never was or is no more history to pull
         except DataUnavailable:
             log.warning(
-                f'NO-MORE-DATA: backend {mod.name} halted history!?'
+                f'NO-MORE-DATA: backend {mod.name} halted history:\n'
+                f'{timeframe}@{mkt.fqme}'
             )
-            # ugh, what's a better way?
+            # TODO: fwiw, we probably want a way to signal a throttle
+            # condition (eg. with ib) so that we can halt the
+            # request loop until the condition is resolved?
             if timeframe > 1:
                 await tractor.pause()
             return
         # TODO: drop this? see todo above..
@@ -300,9 +306,11 @@
                 array,
                 prepend_until_dt=backfill_until_dt,
             )
-            ln = len(to_push)
+            ln: int = len(to_push)
             if ln:
-                log.info(f'{ln} bars for {next_start_dt} -> {last_start_dt}')
+                log.info(
+                    f'{ln} bars for {next_start_dt} -> {last_start_dt}'
+                )
             else:
                 log.warning(
@@ -388,14 +396,29 @@
                     without_src=True,
                 )
             else:
-                col_sym_key: str = mkt.get_fqme(delim_char='')
+                col_sym_key: str = mkt.get_fqme(
+                    delim_char='',
+                )
                 # TODO: implement parquet append!?
                 await storage.write_ohlcv(
                     col_sym_key,
                     shm.array,
                     timeframe,
                 )
+                df: pl.DataFrame = await storage.as_df(
+                    fqme=mkt.fqme,
+                    period=timeframe,
+                    load_from_offline=False,
+                )
+                (
+                    df,
+                    gaps,
+                    deduped,
+                    diff,
+                ) = tsp.dedupe(df)
+                if diff:
+                    tsp.sort_diff(df)
         else:
             # finally filled gap
             log.info(
@@ -634,12 +657,19 @@
     async with mod.open_history_client(
         mkt,
     ) as (get_hist, config):
-        log.info(f'{mod} history client returned backfill config: {config}')
+        log.info(
+            f'`{mod}` history client returned backfill config:\n'
+            f'{config}\n'
+        )
         # get latest query's worth of history all the way
         # back to what is recorded in the tsdb
         try:
-            array, mr_start_dt, mr_end_dt = await get_hist(
+            (
+                array,
+                mr_start_dt,
+                mr_end_dt,
+            ) = await get_hist(
                 timeframe,
                 end_dt=None,
             )
@@ -649,6 +679,7 @@
         # there's no backfilling possible.
         except DataUnavailable:
             task_status.started()
+            await tractor.pause()
             return
         # TODO: fill in non-zero epoch time values ALWAYS!
@@ -699,9 +730,8 @@
             )
         except TimeseriesNotFound:
             log.warning(
-                f'No timeseries yet for {fqme}'
+                f'No timeseries yet for {timeframe}@{fqme}'
             )
         else:
             (
                 tsdb_history,
@@ -784,25 +814,24 @@
                 f'timeframe of {timeframe} seconds..\n'
                 'So yuh.. dun do dat brudder.'
             )
         # if there is a gap to backfill from the first
         # history frame until the last datum loaded from the tsdb
         # continue that now in the background
         bf_done = await tn.start(
             partial(
                 start_backfill,
-                get_hist,
-                mod,
-                mkt,
-                shm,
-                timeframe,
+                get_hist=get_hist,
+                mod=mod,
+                mkt=mkt,
+                shm=shm,
+                timeframe=timeframe,
                 backfill_from_shm_index=backfill_gap_from_shm_index,
                 backfill_from_dt=mr_start_dt,
                 sampler_stream=sampler_stream,
                 backfill_until_dt=last_tsdb_dt,
                 storage=storage,
                 write_tsdb=True,
             )
         )
@@ -824,8 +853,11 @@
         finally:
             return
-        # IF we need to continue backloading incrementally from the
-        # tsdb client..
+        # XXX NOTE: this is legacy from when we were using
+        # marketstore and we needed to continue backloading
+        # incrementally from the tsdb client.. (bc it couldn't
+        # handle a single large query with gRPC for some
+        # reason.. classic goolag pos)
         tn.start_soon(
             back_load_from_tsdb,
@@ -994,19 +1026,18 @@
         log.info(f'Connected to sampler stream: {sample_stream}')
         for timeframe in [60, 1]:
-            await tn.start(
+            await tn.start(partial(
                 tsdb_backfill,
-                mod,
-                storemod,
-                tn,
+                mod=mod,
+                storemod=storemod,
+                tn=tn,
                 # bus,
-                client,
-                mkt,
-                tf2mem[timeframe],
-                timeframe,
-                sample_stream,
-            )
+                storage=client,
+                mkt=mkt,
+                shm=tf2mem[timeframe],
+                timeframe=timeframe,
+                sampler_stream=sample_stream,
+            ))
         # indicate to caller that feed can be delivered to
         # remote requesting client since we've loaded history
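
Side note on the kwargs refactor in the last two hunks: `trio`'s
`Nursery.start()` only forwards *positional* args to the spawned task,
and any extra keywords are rejected as unexpected params of `.start()`
itself, so named arguments have to be pre-bound with
`functools.partial`. A minimal runnable sketch (task name hypothetical,
not from this commit):

```python
from functools import partial

import trio


async def backfill_task(
    *,
    timeframe: int,
    task_status=trio.TASK_STATUS_IGNORED,
):
    # signal the spawner that startup completed so `.start()` returns
    task_status.started()
    print(f'backfilling {timeframe}s bars..')


async def main():
    async with trio.open_nursery() as tn:
        # `tn.start(backfill_task, timeframe=60)` would raise a
        # TypeError; kwargs must be bound into the callable first:
        await tn.start(partial(backfill_task, timeframe=60))


trio.run(main)
```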