From e57a2649d13745a86bb436a72c9311cb930f86db Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 13 Jan 2023 18:57:20 -0500 Subject: [PATCH] Only handle hist discrepancies when market is open We obviously don't want to be debugging a sample-index issue if/when the market for the asset is closed (since we'll be guaranteed to have a mismatch, lul). Pass in the `feed_is_live: trio.Event` throughout the backfilling routines to allow first checking for the live feed being active so as to avoid breakpointing on false +ves. Also, add a detailed warning log message for when *actually* investigating a mismatch. --- piker/data/feed.py | 43 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 56a0391e..0287eff0 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -207,7 +207,7 @@ def get_feed_bus( ) -> _FeedsBus: ''' - Retreive broker-daemon-local data feeds bus from process global + Retrieve broker-daemon-local data feeds bus from process global scope. Serialize task access to lock. ''' @@ -250,6 +250,7 @@ async def start_backfill( shm: ShmArray, timeframe: float, sampler_stream: tractor.MsgStream, + feed_is_live: trio.Event, last_tsdb_dt: Optional[datetime] = None, storage: Optional[Storage] = None, @@ -281,9 +282,30 @@ async def start_backfill( - pendulum.from_timestamp(times[-2]) ).seconds - if step_size_s == 60: + # if the market is open (aka we have a live feed) but the + # history sample step index seems off we report the surrounding + # data and drop into a bp. this case shouldn't really ever + # happen if we're doing history retrieval correctly. + if ( + step_size_s == 60 + and feed_is_live.is_set() + ): inow = round(time.time()) - if (inow - times[-1]) > 60: + diff = inow - times[-1] + if abs(diff) > 60: + surr = array[-6:] + diff_in_mins = round(diff/60., ndigits=2) + log.warning( + f'STEP ERROR `{bfqsn}` for period {step_size_s}s:\n' + f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' + 'Surrounding 6 time stamps:\n' + f'{list(surr["time"])}\n' + 'Here is surrounding 6 samples:\n' + f'{surr}\nn' + ) + + # for now we expect a hacker to investigate this case + # manually.. await tractor.breakpoint() # frame's worth of sample-period-steps, in seconds @@ -485,6 +507,7 @@ async def basic_backfill( bfqsn: str, shms: dict[int, ShmArray], sampler_stream: tractor.MsgStream, + feed_is_live: trio.Event, ) -> None: @@ -504,6 +527,7 @@ async def basic_backfill( shm, timeframe, sampler_stream, + feed_is_live, ) ) except DataUnavailable: @@ -520,6 +544,7 @@ async def tsdb_backfill( bfqsn: str, shms: dict[int, ShmArray], sampler_stream: tractor.MsgStream, + feed_is_live: trio.Event, task_status: TaskStatus[ tuple[ShmArray, ShmArray] @@ -554,6 +579,8 @@ async def tsdb_backfill( shm, timeframe, sampler_stream, + feed_is_live, + last_tsdb_dt=last_tsdb_dt, tsdb_is_up=True, storage=storage, @@ -856,6 +883,7 @@ async def manage_history( 60: hist_shm, }, sample_stream, + feed_is_live, ) # yield back after client connect with filled shm @@ -890,6 +918,7 @@ async def manage_history( 60: hist_shm, }, sample_stream, + feed_is_live, ) task_status.started(( hist_zero_index, @@ -1051,7 +1080,10 @@ async def allocate_persistent_feed( # seed the buffer with a history datum - this is most handy # for many backends which don't sample @ 1s OHLC but do have # slower data such as 1m OHLC. - if not len(rt_shm.array): + if ( + not len(rt_shm.array) + and hist_shm.array.size + ): rt_shm.push(hist_shm.array[-3:-1]) ohlckeys = ['open', 'high', 'low', 'close'] rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1] @@ -1062,6 +1094,9 @@ async def allocate_persistent_feed( rt_shm.array['time'][0] = ts rt_shm.array['time'][1] = ts + 1 + elif hist_shm.array.size == 0: + await tractor.breakpoint() + # wait the spawning parent task to register its subscriber # send-stream entry before we start the sample loop. await sub_registered.wait()