Only handle hist discrepancies when market is open
We obviously don't want to be debugging a sample-index issue if/when the market for the asset is closed (since we'll be guaranteed to have a mismatch, lul). Pass in the `feed_is_live: trio.Event` throughout the backfilling routines to allow first checking for the live feed being active so as to avoid breakpointing on false +ves. Also, add a detailed warning log message for when *actually* investigating a mismatch.multichartz
parent
702a13a278
commit
1f6ec98790
|
@ -207,7 +207,7 @@ def get_feed_bus(
|
||||||
|
|
||||||
) -> _FeedsBus:
|
) -> _FeedsBus:
|
||||||
'''
|
'''
|
||||||
Retreive broker-daemon-local data feeds bus from process global
|
Retrieve broker-daemon-local data feeds bus from process global
|
||||||
scope. Serialize task access to lock.
|
scope. Serialize task access to lock.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
@ -250,6 +250,7 @@ async def start_backfill(
|
||||||
shm: ShmArray,
|
shm: ShmArray,
|
||||||
timeframe: float,
|
timeframe: float,
|
||||||
sampler_stream: tractor.MsgStream,
|
sampler_stream: tractor.MsgStream,
|
||||||
|
feed_is_live: trio.Event,
|
||||||
|
|
||||||
last_tsdb_dt: Optional[datetime] = None,
|
last_tsdb_dt: Optional[datetime] = None,
|
||||||
storage: Optional[Storage] = None,
|
storage: Optional[Storage] = None,
|
||||||
|
@ -281,7 +282,14 @@ async def start_backfill(
|
||||||
- pendulum.from_timestamp(times[-2])
|
- pendulum.from_timestamp(times[-2])
|
||||||
).seconds
|
).seconds
|
||||||
|
|
||||||
if step_size_s == 60:
|
# if the market is open (aka we have a live feed) but the
|
||||||
|
# history sample step index seems off we report the surrounding
|
||||||
|
# data and drop into a bp. this case shouldn't really ever
|
||||||
|
# happen if we're doing history retrieval correctly.
|
||||||
|
if (
|
||||||
|
step_size_s == 60
|
||||||
|
and feed_is_live.is_set()
|
||||||
|
):
|
||||||
inow = round(time.time())
|
inow = round(time.time())
|
||||||
diff = inow - times[-1]
|
diff = inow - times[-1]
|
||||||
if abs(diff) > 60:
|
if abs(diff) > 60:
|
||||||
|
@ -499,6 +507,7 @@ async def basic_backfill(
|
||||||
bfqsn: str,
|
bfqsn: str,
|
||||||
shms: dict[int, ShmArray],
|
shms: dict[int, ShmArray],
|
||||||
sampler_stream: tractor.MsgStream,
|
sampler_stream: tractor.MsgStream,
|
||||||
|
feed_is_live: trio.Event,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
@ -518,6 +527,7 @@ async def basic_backfill(
|
||||||
shm,
|
shm,
|
||||||
timeframe,
|
timeframe,
|
||||||
sampler_stream,
|
sampler_stream,
|
||||||
|
feed_is_live,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
except DataUnavailable:
|
except DataUnavailable:
|
||||||
|
@ -534,6 +544,7 @@ async def tsdb_backfill(
|
||||||
bfqsn: str,
|
bfqsn: str,
|
||||||
shms: dict[int, ShmArray],
|
shms: dict[int, ShmArray],
|
||||||
sampler_stream: tractor.MsgStream,
|
sampler_stream: tractor.MsgStream,
|
||||||
|
feed_is_live: trio.Event,
|
||||||
|
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
tuple[ShmArray, ShmArray]
|
tuple[ShmArray, ShmArray]
|
||||||
|
@ -568,6 +579,8 @@ async def tsdb_backfill(
|
||||||
shm,
|
shm,
|
||||||
timeframe,
|
timeframe,
|
||||||
sampler_stream,
|
sampler_stream,
|
||||||
|
feed_is_live,
|
||||||
|
|
||||||
last_tsdb_dt=last_tsdb_dt,
|
last_tsdb_dt=last_tsdb_dt,
|
||||||
tsdb_is_up=True,
|
tsdb_is_up=True,
|
||||||
storage=storage,
|
storage=storage,
|
||||||
|
@ -870,6 +883,7 @@ async def manage_history(
|
||||||
60: hist_shm,
|
60: hist_shm,
|
||||||
},
|
},
|
||||||
sample_stream,
|
sample_stream,
|
||||||
|
feed_is_live,
|
||||||
)
|
)
|
||||||
|
|
||||||
# yield back after client connect with filled shm
|
# yield back after client connect with filled shm
|
||||||
|
@ -904,6 +918,7 @@ async def manage_history(
|
||||||
60: hist_shm,
|
60: hist_shm,
|
||||||
},
|
},
|
||||||
sample_stream,
|
sample_stream,
|
||||||
|
feed_is_live,
|
||||||
)
|
)
|
||||||
task_status.started((
|
task_status.started((
|
||||||
hist_zero_index,
|
hist_zero_index,
|
||||||
|
@ -1065,7 +1080,10 @@ async def allocate_persistent_feed(
|
||||||
# seed the buffer with a history datum - this is most handy
|
# seed the buffer with a history datum - this is most handy
|
||||||
# for many backends which don't sample @ 1s OHLC but do have
|
# for many backends which don't sample @ 1s OHLC but do have
|
||||||
# slower data such as 1m OHLC.
|
# slower data such as 1m OHLC.
|
||||||
if not len(rt_shm.array):
|
if (
|
||||||
|
not len(rt_shm.array)
|
||||||
|
and hist_shm.array.size
|
||||||
|
):
|
||||||
rt_shm.push(hist_shm.array[-3:-1])
|
rt_shm.push(hist_shm.array[-3:-1])
|
||||||
ohlckeys = ['open', 'high', 'low', 'close']
|
ohlckeys = ['open', 'high', 'low', 'close']
|
||||||
rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1]
|
rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1]
|
||||||
|
@ -1076,6 +1094,9 @@ async def allocate_persistent_feed(
|
||||||
rt_shm.array['time'][0] = ts
|
rt_shm.array['time'][0] = ts
|
||||||
rt_shm.array['time'][1] = ts + 1
|
rt_shm.array['time'][1] = ts + 1
|
||||||
|
|
||||||
|
elif hist_shm.array.size == 0:
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
# wait the spawning parent task to register its subscriber
|
# wait the spawning parent task to register its subscriber
|
||||||
# send-stream entry before we start the sample loop.
|
# send-stream entry before we start the sample loop.
|
||||||
await sub_registered.wait()
|
await sub_registered.wait()
|
||||||
|
|
Loading…
Reference in New Issue