From b95932ea0998ad2a167473c977bbe83f0be54218 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Dec 2023 19:57:46 -0500 Subject: [PATCH] `.data.history`: run `.tsp.dedupe()` in backloader In an effort to catch out-of-order and/or partial-frame-duplicated segments, add some `.tsp` calls throughout the backloader tasks including a call to the new `.sort_diff()` to catch the out-of-order history cases. --- piker/data/history.py | 109 +++++++++++++++++++++++++++--------------- 1 file changed, 70 insertions(+), 39 deletions(-) diff --git a/piker/data/history.py b/piker/data/history.py index 048769fa..0c2ecc25 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -1,18 +1,19 @@ # piker: trading gear for hackers # Copyright (C) Tyler Goodlet (in stewardship for pikers) -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# This program is free software: you can redistribute it and/or +# modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Affero General Public License for more details. -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . +# You should have received a copy of the GNU Affero General Public +# License along with this program. If not, see +# . ''' Historical data business logic for load, backfill and tsdb storage. @@ -39,6 +40,7 @@ from pendulum import ( from_timestamp, ) import numpy as np +import polars as pl from ..accounting import ( MktPair, @@ -54,6 +56,7 @@ from ._source import def_iohlcv_fields from ._sampling import ( open_sample_stream, ) +from . import tsp from ..brokers._util import ( DataUnavailable, ) @@ -197,7 +200,7 @@ async def start_backfill( # do a decently sized backfill and load it into storage. periods = { - 1: {'days': 6}, + 1: {'days': 2}, 60: {'years': 6}, } period_duration: int = periods[timeframe] @@ -246,13 +249,16 @@ async def start_backfill( # broker says there never was or is no more history to pull except DataUnavailable: log.warning( - f'NO-MORE-DATA: backend {mod.name} halted history!?' + f'NO-MORE-DATA: backend {mod.name} halted history:\n' + f'{timeframe}@{mkt.fqme}' ) # ugh, what's a better way? # TODO: fwiw, we probably want a way to signal a throttle # condition (eg. with ib) so that we can halt the # request loop until the condition is resolved? + if timeframe > 1: + await tractor.pause() return # TODO: drop this? see todo above.. @@ -300,9 +306,11 @@ async def start_backfill( array, prepend_until_dt=backfill_until_dt, ) - ln = len(to_push) + ln: int = len(to_push) if ln: - log.info(f'{ln} bars for {next_start_dt} -> {last_start_dt}') + log.info( + f'{ln} bars for {next_start_dt} -> {last_start_dt}' + ) else: log.warning( @@ -388,14 +396,29 @@ async def start_backfill( without_src=True, ) else: - col_sym_key: str = mkt.get_fqme(delim_char='') + col_sym_key: str = mkt.get_fqme( + delim_char='', + ) - # TODO: implement parquet append!? await storage.write_ohlcv( col_sym_key, shm.array, timeframe, ) + df: pl.DataFrame = await storage.as_df( + fqme=mkt.fqme, + period=timeframe, + load_from_offline=False, + ) + ( + df, + gaps, + deduped, + diff, + ) = tsp.dedupe(df) + if diff: + tsp.sort_diff(df) + else: # finally filled gap log.info( @@ -634,12 +657,19 @@ async def tsdb_backfill( async with mod.open_history_client( mkt, ) as (get_hist, config): - log.info(f'{mod} history client returned backfill config: {config}') + log.info( + f'`{mod}` history client returned backfill config:\n' + f'{config}\n' + ) # get latest query's worth of history all the way # back to what is recorded in the tsdb try: - array, mr_start_dt, mr_end_dt = await get_hist( + ( + array, + mr_start_dt, + mr_end_dt, + ) = await get_hist( timeframe, end_dt=None, ) @@ -649,6 +679,7 @@ async def tsdb_backfill( # there's no backfilling possible. except DataUnavailable: task_status.started() + await tractor.pause() return # TODO: fill in non-zero epoch time values ALWAYS! @@ -699,9 +730,8 @@ async def tsdb_backfill( ) except TimeseriesNotFound: log.warning( - f'No timeseries yet for {fqme}' + f'No timeseries yet for {timeframe}@{fqme}' ) - else: ( tsdb_history, @@ -784,25 +814,24 @@ async def tsdb_backfill( f'timeframe of {timeframe} seconds..\n' 'So yuh.. dun do dat brudder.' ) + # if there is a gap to backfill from the first # history frame until the last datum loaded from the tsdb # continue that now in the background bf_done = await tn.start( partial( start_backfill, - get_hist, - mod, - mkt, - shm, - timeframe, - + get_hist=get_hist, + mod=mod, + mkt=mkt, + shm=shm, + timeframe=timeframe, backfill_from_shm_index=backfill_gap_from_shm_index, backfill_from_dt=mr_start_dt, - sampler_stream=sampler_stream, - backfill_until_dt=last_tsdb_dt, storage=storage, + write_tsdb=True, ) ) @@ -824,8 +853,11 @@ async def tsdb_backfill( finally: return - # IF we need to continue backloading incrementally from the - # tsdb client.. + # XXX NOTE: this is legacy from when we were using + # marketstore and we needed to continue backloading + # incrementally from the tsdb client.. (bc it couldn't + # handle a single large query with gRPC for some + # reason.. classic goolag pos) tn.start_soon( back_load_from_tsdb, @@ -994,19 +1026,18 @@ async def manage_history( log.info(f'Connected to sampler stream: {sample_stream}') for timeframe in [60, 1]: - await tn.start( + await tn.start(partial( tsdb_backfill, - mod, - storemod, - tn, + mod=mod, + storemod=storemod, + tn=tn, # bus, - client, - mkt, - tf2mem[timeframe], - timeframe, - - sample_stream, - ) + storage=client, + mkt=mkt, + shm=tf2mem[timeframe], + timeframe=timeframe, + sampler_stream=sample_stream, + )) # indicate to caller that feed can be delivered to # remote requesting client since we've loaded history