Mask out all the duplicate frame detection

parent c1546eb043
commit c8f8724887

piker/data
@@ -19,9 +19,9 @@ Historical data business logic for load, backfill and tsdb storage.
 
 '''
 from __future__ import annotations
-from collections import (
-    Counter,
-)
+# from collections import (
+#     Counter,
+# )
 from datetime import datetime
 from functools import partial
 # import time
@@ -86,6 +86,7 @@ def diff_history(
     else:
         return array[times >= prepend_until_dt.timestamp()]
 
+
 async def shm_push_in_between(
     shm: ShmArray,
     to_push: np.ndarray,
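Reviewer note: `shm_push_in_between()` (body unchanged, so not shown here) is the helper the gap-filling loop at the bottom of this commit relies on. Since the diff never shows its internals, below is only a minimal sketch of the splice idea its name describes, using a plain numpy array as a stand-in for `ShmArray`; everything in it is an illustrative assumption, not piker's implementation:

```python
import numpy as np

def push_in_between_sketch(
    buf: np.ndarray,       # preallocated history buffer (stand-in for shm)
    to_push: np.ndarray,   # frame fetched from the broker backend
    prepend_index: int,    # first index of already-valid history in `buf`
) -> int:
    '''
    Copy `to_push` into the region ending just before `prepend_index`,
    i.e. grow valid history backwards in time.
    '''
    ln = len(to_push)
    start = prepend_index - ln
    if start < 0:
        # not enough room in front of the valid region: overrun
        raise ValueError(f'prepend overrun: need {ln}, have {prepend_index}')
    buf[start:prepend_index] = to_push
    return start  # the new first-valid index
```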
@@ -191,7 +192,7 @@ async def start_backfill(
     # avoid duplicate history frames with a set of datetime frame
     # starts and associated counts of how many duplicates we see
     # per time stamp.
-    starts: Counter[datetime] = Counter()
+    # starts: Counter[datetime] = Counter()
 
     # conduct "backward history gap filling" where we push to
     # the shm buffer until we have history back until the
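The `Counter` masked out here tallied how many times each frame-start timestamp came back from the backend, so repeated frames could be skipped and the backfill aborted after too many repeats (the `> 6` cap is visible in the commented block a few hunks down). A self-contained sketch of that policy, reconstructed from the comments rather than from any removed implementation:

```python
from collections import Counter
from datetime import datetime

# tally of frame-start timestamps seen so far, as in the masked code
starts: Counter[datetime] = Counter()

def dup_policy(
    next_start_dt: datetime,
    max_repeats: int = 6,  # cap taken from the commented-out `> 6` check
) -> str:
    '''
    Classify a freshly fetched frame by its start timestamp:
    'fresh' -> first sighting, push it; 'skip' -> duplicate, drop it
    and keep looping; 'abort' -> the backend keeps returning the same
    frame, give up on this backfill.
    '''
    starts[next_start_dt] += 1
    n = starts[next_start_dt]
    if n == 1:
        return 'fresh'
    if n > max_repeats:
        return 'abort'
    return 'skip'
```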
@@ -201,11 +202,6 @@ async def start_backfill(
 
     while last_start_dt > backfill_until_dt:
 
-        # if timeframe == 60:
-        #     await tractor.breakpoint()
-        # else:
-        #     return
-
         log.debug(
             f'Requesting {timeframe}s frame ending in {last_start_dt}'
         )
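For orientation, the loop these deletions live in implements the "backward history gap filling" described above: each pass requests the frame ending where current history begins, prepends it, then steps the cursor further back until `backfill_until_dt`. A runnable toy of that loop shape; `get_hist` here is a stub standing in for the broker-backend coroutine of the same name:

```python
import asyncio
from datetime import datetime, timedelta

async def get_hist(timeframe: float, end_dt: datetime):
    # stub: pretend every frame covers 1000 samples of `timeframe` secs
    start_dt = end_dt - timedelta(seconds=timeframe * 1000)
    return [], start_dt, end_dt

async def backfill(
    timeframe: float,
    last_start_dt: datetime,      # oldest history currently held
    backfill_until_dt: datetime,  # target oldest timestamp
) -> None:
    while last_start_dt > backfill_until_dt:
        # request the frame *ending* at the oldest point we hold
        frame, next_start_dt, _ = await get_hist(timeframe, end_dt=last_start_dt)
        # (the real loop prepends `frame` into the shm buffer here)
        last_start_dt = next_start_dt  # step the cursor back in time

asyncio.run(backfill(60, datetime(2023, 7, 1), datetime(2023, 6, 1)))
```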
@@ -242,6 +238,7 @@ async def start_backfill(
             # f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
             # )
             # starts[start_dt] += 1
+            # await tractor.breakpoint()
             # continue
 
             # elif starts[next_start_dt] > 6:
@@ -250,13 +247,12 @@ async def start_backfill(
             # )
             # return
 
-        # only update new start point if not-yet-seen
-        start_dt: datetime = next_start_dt
-        starts[start_dt] += 1
+        # # only update new start point if not-yet-seen
+        # starts[next_start_dt] += 1
 
-        assert array['time'][0] == start_dt.timestamp()
+        assert array['time'][0] == next_start_dt.timestamp()
 
-        diff = last_start_dt - start_dt
+        diff = last_start_dt - next_start_dt
         frame_time_diff_s = diff.seconds
 
         # frame's worth of sample-period-steps, in seconds
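One flag on the unchanged `frame_time_diff_s = diff.seconds` line kept above: `timedelta.seconds` is only the sub-day seconds component (0-86399), not the full span, so any frame longer than 24 hours silently loses whole days; `total_seconds()` is the lossless variant:

```python
from datetime import timedelta

diff = timedelta(days=2, seconds=30)
print(diff.seconds)          # 30 -- just the sub-day component
print(diff.total_seconds())  # 172830.0 -- the full span in seconds
```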
@@ -279,11 +275,12 @@ async def start_backfill(
         )
         ln = len(to_push)
         if ln:
-            log.info(f'{ln} bars for {start_dt} -> {last_start_dt}')
+            log.info(f'{ln} bars for {next_start_dt} -> {last_start_dt}')
 
         else:
             log.warning(
-                f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {last_start_dt}'
+                '0 BARS TO PUSH after diff!?\n'
+                f'{next_start_dt} -> {last_start_dt}'
             )
 
         # bail gracefully on shm allocation overrun/full
@@ -308,7 +305,7 @@ async def start_backfill(
         except ValueError as ve:
             _ve = ve
             log.error(
-                f'Shm buffer prepend OVERRUN on: {start_dt} -> {last_start_dt}?'
+                f'Shm prepend OVERRUN on: {next_start_dt} -> {last_start_dt}?'
             )
 
             if next_prepend_index < ln:
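This `except ValueError` branch is the "bail gracefully on shm allocation overrun" path: the fetched frame is bigger than the room left in front of the buffer (`next_prepend_index < ln`), so at most a slice of it can be kept. Extending the splice sketch from earlier, one plausible degrade-gracefully policy (an assumption for illustration, not piker's recovery code):

```python
import numpy as np

def prepend_or_truncate(
    buf: np.ndarray,
    frame: np.ndarray,
    prepend_index: int,  # first currently-valid index in `buf`
) -> int:
    '''
    Prepend `frame` before `prepend_index`; on overrun keep only the
    newest bars that still fit instead of raising.
    '''
    ln = len(frame)
    if prepend_index < ln:
        # overrun: drop the oldest bars that cannot fit
        frame = frame[ln - prepend_index:]
        ln = prepend_index
    start = prepend_index - ln
    buf[start:prepend_index] = frame
    return start  # the new first-valid index

buf = np.zeros(8)
assert prepend_or_truncate(buf, np.arange(12.0), prepend_index=8) == 0
assert buf[0] == 4.0  # the oldest 4 bars were dropped
```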
@@ -336,7 +333,7 @@ async def start_backfill(
 
             log.info(
                 f'Shm pushed {ln} frame:\n'
-                f'{start_dt} -> {last_start_dt}'
+                f'{next_start_dt} -> {last_start_dt}'
             )
 
         # FINALLY, maybe write immediately to the tsdb backend for
@@ -347,7 +344,7 @@ async def start_backfill(
         ):
             log.info(
                 f'Writing {ln} frame to storage:\n'
-                f'{start_dt} -> {last_start_dt}'
+                f'{next_start_dt} -> {last_start_dt}'
             )
 
             if mkt.dst.atype not in {'crypto', 'crypto_currency'}:
@@ -372,50 +369,52 @@ async def start_backfill(
             f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
         )
 
     # conduct tsdb timestamp gap detection and backfill any
-    # seemingly missing portions!
+    # seemingly missing sequence segments..
+    # TODO: ideally these never exist but somehow it seems
+    # sometimes we're writing zero-ed segments on certain
+    # (teardown) cases?
     from ._timeseries import detect_null_time_gap
 
-    indices: tuple | None = detect_null_time_gap(shm)
-    if indices:
+    gap_indices: tuple | None = detect_null_time_gap(shm)
+    while gap_indices:
         (
             istart,
             start,
             end,
             iend,
-        ) = indices
+        ) = gap_indices
 
+        start_dt = from_timestamp(start)
+        end_dt = from_timestamp(end)
         (
             array,
             next_start_dt,
             next_end_dt,
         ) = await get_hist(
             timeframe,
-            start_dt=from_timestamp(start),
-            end_dt=from_timestamp(end),
+            start_dt=start_dt,
+            end_dt=end_dt,
         )
 
         await shm_push_in_between(
             shm,
             array,
             prepend_index=iend,
             update_start_on_prepend=False,
         )
 
+        # TODO: UI side needs IPC event to update..
+        # - make sure the UI actually always handles
+        #   this update!
+        # - remember that in the display side, only refersh this
+        #   if the respective history is actually "in view".
+        # loop
         await sampler_stream.send({
             'broadcast_all': {
                 'backfilling': True
             },
         })
-    indices: tuple | None = detect_null_time_gap(shm)
-    if indices:
-        (
-            istart,
-            start,
-            end,
-            iend,
-        ) = indices
-        await tractor.breakpoint()
-
-        # TODO: can we only trigger this if the respective
-        # history in "in view"?!?
+        gap_indices: tuple | None = detect_null_time_gap(shm)
 
     # XXX: extremely important, there can be no checkpoints
     # in the block above to avoid entering new ``frames``
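The substantive change in this last hunk is `if indices:` becoming `while gap_indices:`: instead of repairing only the first zero-ed segment and parking a `tractor.breakpoint()` for any second one, the loop now re-scans the buffer after every in-between fill until no gap remains. The real scanner is `piker.data._timeseries.detect_null_time_gap`; the sketch below is only a guess at its 4-tuple contract as unpacked above, assuming a single interior hole of zeroed timestamps:

```python
import numpy as np

def detect_null_time_gap_sketch(
    times: np.ndarray,
) -> tuple[int, float, float, int] | None:
    '''
    Find the first run of zero-ed timestamps and return
    (istart, start, end, iend): the index/timestamp pairs bracketing
    the hole, mirroring the 4-tuple unpacked in the diff; None when
    the series has no hole (which terminates the new `while` loop).
    '''
    zeros = np.flatnonzero(times == 0)
    if not zeros.size:
        return None
    first, last = zeros[0], zeros[-1]
    return (
        first - 1,         # istart: last valid index before the hole
        times[first - 1],  # start: its timestamp
        times[last + 1],   # end: first valid timestamp after the hole
        last + 1,          # iend: its index
    )

# re-scan after every fill, exactly what `while gap_indices:` now does:
times = np.array([1., 2., 0., 0., 5., 6.])
assert detect_null_time_gap_sketch(times) == (1, 2.0, 5.0, 4)
times[2:4] = (3., 4.)  # pretend get_hist + shm_push_in_between repaired it
assert detect_null_time_gap_sketch(times) is None
```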