Only handle hist discrepancies when market is open

We obviously don't want to be debugging a sample-index issue if/when the
market for the asset is closed, since in that case we would be guaranteed
to see a mismatch. Pass in the `feed_is_live: trio.Event` throughout the
backfilling routines to allow first checking for the live feed being active
so as to avoid breakpointing on false positives. Also, add a detailed warning
log message for when *actually* investigating a mismatch.
multichartz_backup
Tyler Goodlet 2023-01-13 18:57:20 -05:00
parent 22ff509b01
commit e6d38d4f94
1 changed files with 24 additions and 3 deletions

View File

@ -207,7 +207,7 @@ def get_feed_bus(
) -> _FeedsBus: ) -> _FeedsBus:
''' '''
Retreive broker-daemon-local data feeds bus from process global Retrieve broker-daemon-local data feeds bus from process global
scope. Serialize task access to lock. scope. Serialize task access to lock.
''' '''
@ -250,6 +250,7 @@ async def start_backfill(
shm: ShmArray, shm: ShmArray,
timeframe: float, timeframe: float,
sampler_stream: tractor.MsgStream, sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event,
last_tsdb_dt: Optional[datetime] = None, last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None, storage: Optional[Storage] = None,
@ -281,7 +282,14 @@ async def start_backfill(
- pendulum.from_timestamp(times[-2]) - pendulum.from_timestamp(times[-2])
).seconds ).seconds
if step_size_s == 60: # if the market is open (aka we have a live feed) but the
# history sample step index seems off we report the surrounding
# data and drop into a bp. this case shouldn't really ever
# happen if we're doing history retrieval correctly.
if (
step_size_s == 60
and feed_is_live.is_set()
):
inow = round(time.time()) inow = round(time.time())
diff = inow - times[-1] diff = inow - times[-1]
if abs(diff) > 60: if abs(diff) > 60:
@ -499,6 +507,7 @@ async def basic_backfill(
bfqsn: str, bfqsn: str,
shms: dict[int, ShmArray], shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream, sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event,
) -> None: ) -> None:
@ -518,6 +527,7 @@ async def basic_backfill(
shm, shm,
timeframe, timeframe,
sampler_stream, sampler_stream,
feed_is_live,
) )
) )
except DataUnavailable: except DataUnavailable:
@ -534,6 +544,7 @@ async def tsdb_backfill(
bfqsn: str, bfqsn: str,
shms: dict[int, ShmArray], shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream, sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event,
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ShmArray, ShmArray] tuple[ShmArray, ShmArray]
@ -568,6 +579,8 @@ async def tsdb_backfill(
shm, shm,
timeframe, timeframe,
sampler_stream, sampler_stream,
feed_is_live,
last_tsdb_dt=last_tsdb_dt, last_tsdb_dt=last_tsdb_dt,
tsdb_is_up=True, tsdb_is_up=True,
storage=storage, storage=storage,
@ -870,6 +883,7 @@ async def manage_history(
60: hist_shm, 60: hist_shm,
}, },
sample_stream, sample_stream,
feed_is_live,
) )
# yield back after client connect with filled shm # yield back after client connect with filled shm
@ -904,6 +918,7 @@ async def manage_history(
60: hist_shm, 60: hist_shm,
}, },
sample_stream, sample_stream,
feed_is_live,
) )
task_status.started(( task_status.started((
hist_zero_index, hist_zero_index,
@ -1065,7 +1080,10 @@ async def allocate_persistent_feed(
# seed the buffer with a history datum - this is most handy # seed the buffer with a history datum - this is most handy
# for many backends which don't sample @ 1s OHLC but do have # for many backends which don't sample @ 1s OHLC but do have
# slower data such as 1m OHLC. # slower data such as 1m OHLC.
if not len(rt_shm.array): if (
not len(rt_shm.array)
and hist_shm.array.size
):
rt_shm.push(hist_shm.array[-3:-1]) rt_shm.push(hist_shm.array[-3:-1])
ohlckeys = ['open', 'high', 'low', 'close'] ohlckeys = ['open', 'high', 'low', 'close']
rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1] rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1]
@ -1076,6 +1094,9 @@ async def allocate_persistent_feed(
rt_shm.array['time'][0] = ts rt_shm.array['time'][0] = ts
rt_shm.array['time'][1] = ts + 1 rt_shm.array['time'][1] = ts + 1
elif hist_shm.array.size == 0:
await tractor.breakpoint()
# wait the spawning parent task to register its subscriber # wait the spawning parent task to register its subscriber
# send-stream entry before we start the sample loop. # send-stream entry before we start the sample loop.
await sub_registered.wait() await sub_registered.wait()