Detect and fill time gaps in tsdb history

For now, just detect and fill in gaps (via fresh backend queries)
*in the shm buffer* but eventually i'm pretty sure we can just write
these direct to the parquet file as well.

Use the new `.data._timeseries.detect_null_time_gap()` to find and fill
in the `ShmArray` index range, re-check it and enter a prompt if it
didn't totally fill.

Also,
- do a massive cleanup and removal of all unused/commented code.
  - drop the duplicate frames tracking, don't think we need it after
    removing multi-frame concurrent queries.
- change backfill loop variable `end_dt` -> `last_start_dt` which is
  more semantically correct.
- fix logic to backfill any missing sub-sequence portion for any frame
  query that overruns the shm buffer prependable space by detecting
  the available rows left to insert and only push those.
  - add a new `shm_push_in_between()` helper to match.
basic_buy_bot
Tyler Goodlet 2023-06-08 11:16:19 -04:00
parent f25248c871
commit 8233d12afb
1 changed files with 208 additions and 373 deletions

View File

@ -67,10 +67,6 @@ if TYPE_CHECKING:
def diff_history(
array: np.ndarray,
# timeframe: int,
# start_dt: datetime,
# end_dt: datetime,
append_until_dt: datetime | None = None,
prepend_until_dt: datetime | None = None,
@ -90,123 +86,45 @@ def diff_history(
else:
return array[times >= prepend_until_dt.timestamp()]
async def shm_push_in_between(
shm: ShmArray,
to_push: np.ndarray,
prepend_index: int,
# async def open_history_mngr(
# mod: ModuleType,
# mkt: MktPair,
# # shm: ShmArray,
# # timeframes: list[float] = [60, 1],
# timeframes: float,
update_start_on_prepend: bool = False,
# ) -> Callable[
# [int, datetime, datetime],
# tuple[np.ndarray, str]
# ]:
# '''
# Open a "history manager" for the backend data provider,
# get the latest "frames worth" of ohlcv history,
# push the history to shm and deliver
# the start datum's datetime value so that further history loading
# can be done until synchronized with the tsdb loaded time series.
) -> int:
shm.push(
to_push,
prepend=True,
# '''
# hist: Callable[
# [int, datetime, datetime],
# tuple[np.ndarray, str]
# ]
# config: dict[str, int]
# XXX: only update the ._first index if no tsdb
# segment was previously prepended by the
# parent task.
update_first=update_start_on_prepend,
# async with mod.open_history_client(
# mkt,
# ) as (hist, config):
# log.info(f'{mod} history client returned backfill config: {config}')
# XXX: only prepend from a manually calculated shm
# index if there was already a tsdb history
# segment prepended (since then the
# ._first.value is going to be wayyy in the
# past!)
start=(
prepend_index
if not update_start_on_prepend
else None
),
)
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
array = shm.array
zeros = array[array['low'] == 0]
if (
0 < zeros.size < 1000
):
tractor.breakpoint()
# # get latest query's worth of history all the way
# # back to what is recorded in the tsdb
# array, mr_start_dt, mr_end_dt = await hist(
# timeframe,
# end_dt=None,
# )
# times: np.ndarray = array['time']
# # sample period step size in seconds
# step_size_s = (
# from_timestamp(times[-1])
# - from_timestamp(times[-2])
# ).seconds
# if step_size_s not in (1, 60):
# log.error(f'Last 2 sample period is off!? -> {step_size_s}')
# step_size_s = (
# from_timestamp(times[-2])
# - from_timestamp(times[-3])
# ).seconds
# # NOTE: on the first history, most recent history
# # frame we PREPEND from the current shm ._last index
# # and thus a gap between the earliest datum loaded here
# # and the latest loaded from the tsdb may exist!
# log.info(f'Pushing {to_push.size} to shm!')
# shm.push(
# to_push,
# prepend=True,
# # start=
# )
# # if the market is open (aka we have a live feed) but the
# # history sample step index seems off we report the surrounding
# # data and drop into a bp. this case shouldn't really ever
# # happen if we're doing history retrieval correctly.
# # if (
# # step_size_s == 60
# # and feed_is_live.is_set()
# # ):
# # inow = round(time.time())
# # diff = inow - times[-1]
# # if abs(diff) > 60:
# # surr = array[-6:]
# # diff_in_mins = round(diff/60., ndigits=2)
# # log.warning(
# # f'STEP ERROR `{mkt.fqme}` for period {step_size_s}s:\n'
# # f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n'
# # 'Surrounding 6 time stamps:\n'
# # f'{list(surr["time"])}\n'
# # 'Here is surrounding 6 samples:\n'
# # f'{surr}\nn'
# # )
# # uncomment this for a hacker who wants to investigate
# # this case manually..
# # await tractor.breakpoint()
# # frame's worth of sample-period-steps, in seconds
# # frame_size_s = len(array) * step_size_s
# to_push = array
# # to_push = diff_history(
# # array,
# # # timeframe,
# # # mr_start_dt,
# # # mr_end_dt,
# # # backfill scenario for "most recent" frame
# # prepend_until_dt=last_tsdb_dt,
# # )
# # NOTE: on the first history, most recent history
# # frame we PREPEND from the current shm ._last index
# # and thus a gap between the earliest datum loaded here
# # and the latest loaded from the tsdb may exist!
# log.info(f'Pushing {to_push.size} to shm!')
# shm.push(
# to_push,
# prepend=True,
# # start=
# )
# # TODO: should we wrap this in a "history frame" type or
# # something?
# yield hist, mr_start_dt, mr_end_dt
async def start_backfill(
@ -221,44 +139,27 @@ async def start_backfill(
sampler_stream: tractor.MsgStream,
backfill_until_dt: datetime | None = None,
storage: StorageClient | None = None,
write_tsdb: bool = True,
# tsdb_is_up: bool = True,
task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
) -> int:
# TODO: *** THIS IS A BUG ***
# we need to only broadcast to subscribers for this fqme..
# otherwise all fsps get reset on every chart..
# await sampler_stream.send('broadcast_all')
# signal that backfilling to tsdb's end datum is complete
bf_done = trio.Event()
# let caller unblock and deliver latest history frame
task_status.started( #(
# mr_start_dt,
# mr_end_dt,
bf_done,
)# )
# and use to signal that backfilling the shm gap until
# the tsdb end is complete!
bf_done = trio.Event()
task_status.started(bf_done)
# based on the sample step size, maybe load a certain amount history
update_start_on_prepend: bool = False
if backfill_until_dt is None:
if timeframe not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
# TODO: drop this right and just expose the backfill
# limits inside a [storage] section in conf.toml?
# when no tsdb "last datum" is provided, we just load
# some near-term history.
# periods = {
@ -266,7 +167,6 @@ async def start_backfill(
# 60: {'days': 14},
# }
# if tsdb_is_up:
# do a decently sized backfill and load it into storage.
periods = {
1: {'days': 6},
@ -281,38 +181,33 @@ async def start_backfill(
# settings above when the tsdb is detected as being empty.
backfill_until_dt = backfill_from_dt.subtract(**period_duration)
# TODO: can we drop this? without conc i don't think this
# is necessary any more?
# configure async query throttling
# rate = config.get('rate', 1)
# XXX: legacy from ``trimeter`` code but unsupported now.
# erlangs = config.get('erlangs', 1)
# avoid duplicate history frames with a set of datetime frame
# starts and associated counts of how many duplicates we see
# per time stamp.
starts: Counter[datetime] = Counter()
# conduct "backward history filling" since
# no tsdb history yet exists.
# implemented via a simple inline sequential loop where we
# simply pass the last retrieved start dt to the next
# request as it's end dt.
# while end_dt < backfill_until_dt:
# while (
# end_dt is None # init case
# or end_dt < mr_start_dt
# ):
# conduct "forward filling" from the last time step
# loaded from the tsdb until the first step loaded
# just above
end_dt: datetime = backfill_from_dt
# start_dt: datetime = backfill_until_dt
# conduct "backward history gap filling" where we push to
# the shm buffer until we have history back until the
# latest entry loaded from the tsdb's table B)
last_start_dt: datetime = backfill_from_dt
next_prepend_index: int = backfill_from_shm_index
while end_dt > backfill_until_dt:
while last_start_dt > backfill_until_dt:
# if timeframe == 60:
# await tractor.breakpoint()
# else:
# return
log.debug(
f'Requesting {timeframe}s frame ending in {end_dt}'
f'Requesting {timeframe}s frame ending in {last_start_dt}'
)
try:
@ -322,8 +217,7 @@ async def start_backfill(
next_end_dt,
) = await get_hist(
timeframe,
end_dt=end_dt,
# start_dt=start_dt,
end_dt=last_start_dt,
)
# broker says there never was or is no more history to pull
@ -338,22 +232,23 @@ async def start_backfill(
# request loop until the condition is resolved?
return
if (
next_start_dt in starts
and starts[next_start_dt] <= 6
):
start_dt = min(starts)
log.warning(
f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
)
starts[start_dt] += 1
continue
# TODO: drop this? see todo above..
# if (
# next_start_dt in starts
# and starts[next_start_dt] <= 6
# ):
# start_dt = min(starts)
# log.warning(
# f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
# )
# starts[start_dt] += 1
# continue
elif starts[next_start_dt] > 6:
log.warning(
f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
)
return
# elif starts[next_start_dt] > 6:
# log.warning(
# f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
# )
# return
# only update new start point if not-yet-seen
start_dt: datetime = next_start_dt
@ -361,7 +256,7 @@ async def start_backfill(
assert array['time'][0] == start_dt.timestamp()
diff = end_dt - start_dt
diff = last_start_dt - start_dt
frame_time_diff_s = diff.seconds
# frame's worth of sample-period-steps, in seconds
@ -374,48 +269,31 @@ async def start_backfill(
# history gap (eg. market closed period, outage, etc.)
# so just report it to console for now.
log.warning(
f'History frame ending @ {end_dt} appears to have a gap:\n'
f'History frame ending @ {last_start_dt} appears to have a gap:\n'
f'{diff} ~= {frame_time_diff_s} seconds'
)
to_push = diff_history(
array,
# timeframe,
# start_dt,
# end_dt,
prepend_until_dt=backfill_until_dt,
)
ln = len(to_push)
if ln:
log.info(f'{ln} bars for {start_dt} -> {end_dt}')
log.info(f'{ln} bars for {start_dt} -> {last_start_dt}')
else:
log.warning(
f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {last_start_dt}'
)
# bail gracefully on shm allocation overrun/full
# condition
try:
shm.push(
await shm_push_in_between(
shm,
to_push,
prepend=True,
# XXX: only update the ._first index if no tsdb
# segment was previously prepended by the
# parent task.
update_first=update_start_on_prepend,
# XXX: only prepend from a manually calculated shm
# index if there was already a tsdb history
# segment prepended (since then the
# ._first.value is going to be wayyy in the
# past!)
start=(
next_prepend_index
if not update_start_on_prepend
else None
),
prepend_index=next_prepend_index,
update_start_on_prepend=update_start_on_prepend,
)
await sampler_stream.send({
'broadcast_all': {
@ -425,34 +303,40 @@ async def start_backfill(
# decrement next prepend point
next_prepend_index = next_prepend_index - ln
end_dt = next_start_dt
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
array = shm.array
zeros = array[array['low'] == 0]
if (
0 < zeros.size < 10
):
await tractor.breakpoint()
last_start_dt = next_start_dt
except ValueError as ve:
_ve = ve
log.info(
f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
log.error(
f'Shm buffer prepend OVERRUN on: {start_dt} -> {last_start_dt}?'
)
await tractor.breakpoint()
if next_prepend_index < ln:
log.warning(
f'Shm buffer can only hold {next_prepend_index} more rows..\n'
f'Appending those from recent {ln}-sized frame, no more!'
)
to_push = to_push[-next_prepend_index + 1:]
await shm_push_in_between(
shm,
to_push,
prepend_index=next_prepend_index,
update_start_on_prepend=update_start_on_prepend,
)
await sampler_stream.send({
'broadcast_all': {
'backfilling': True
},
})
# can't push the entire frame? so
# push only the amount that can fit..
break
log.info(
f'Shm pushed {ln} frame:\n'
f'{start_dt} -> {end_dt}'
f'{start_dt} -> {last_start_dt}'
)
# FINALLY, maybe write immediately to the tsdb backend for
@ -460,11 +344,10 @@ async def start_backfill(
if (
storage is not None
and write_tsdb
and False
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{start_dt} -> {end_dt}'
f'{start_dt} -> {last_start_dt}'
)
if mkt.dst.atype not in {'crypto', 'crypto_currency'}:
@ -477,11 +360,59 @@ async def start_backfill(
else:
col_sym_key: str = mkt.get_fqme(delim_char='')
# TODO: implement parquet append!?
await storage.write_ohlcv(
col_sym_key,
shm.array,
timeframe,
)
else:
# finally filled gap
log.info(
f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
)
# conduct tsdb timestamp gap detection and backfill any
# seemingly missing portions!
from ._timeseries import detect_null_time_gap
indices: tuple | None = detect_null_time_gap(shm)
if indices:
(
istart,
start,
end,
iend,
) = indices
(
array,
next_start_dt,
next_end_dt,
) = await get_hist(
timeframe,
start_dt=from_timestamp(start),
end_dt=from_timestamp(end),
)
await shm_push_in_between(
shm,
array,
prepend_index=iend,
update_start_on_prepend=False,
)
await sampler_stream.send({
'broadcast_all': {
'backfilling': True
},
})
indices: tuple | None = detect_null_time_gap(shm)
if indices:
(
istart,
start,
end,
iend,
) = indices
await tractor.breakpoint()
# TODO: can we only trigger this if the respective
# history in "in view"?!?
@ -496,43 +427,11 @@ async def start_backfill(
bf_done.set()
def push_tsdb_history_to_shm(
storemod: ModuleType,
shm: ShmArray,
tsdb_history: np.ndarray,
time_field_key: str,
prepend: bool = False,
) -> datetime:
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
prepend_start = shm._first.value
to_push = tsdb_history[-prepend_start:]
shm.push(
to_push,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=prepend,
# update_first=False,
# start=prepend_start,
field_map=storemod.ohlc_key_map,
)
log.info(f'Loaded {to_push.shape} datums from storage')
tsdb_last_frame_start = tsdb_history[time_field_key][0]
return from_timestamp(tsdb_last_frame_start)
async def back_load_from_tsdb(
storemod: ModuleType,
storage: StorageClient,
fqme: str,
# dts_per_tf: dict[int, datetime],
tsdb_history: np.ndarray,
@ -631,13 +530,26 @@ async def back_load_from_tsdb(
else:
tsdb_last_frame_start = next_start
tsdb_last_frame_start: datetime = push_tsdb_history_to_shm(
storemod,
shm,
tsdb_history,
time_key,
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
prepend_start = shm._first.value
to_push = tsdb_history[-prepend_start:]
shm.push(
to_push,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
# start=prepend_start,
field_map=storemod.ohlc_key_map,
)
log.info(f'Loaded {to_push.shape} datums from storage')
tsdb_last_frame_start = tsdb_history[time_key][0]
# manually trigger step update to update charts/fsps
# which need an incremental update.
# NOTE: the way this works is super duper
@ -651,22 +563,18 @@ async def back_load_from_tsdb(
# graphics loop cycle.
# await sampler_stream.send('broadcast_all')
# TODO: write new data to tsdb to be ready to for next read.
async def tsdb_backfill(
mod: ModuleType,
storemod: ModuleType,
# bus: _FeedsBus,
tn: trio.Nursery,
storage: StorageClient,
mkt: MktPair,
# shms: dict[int, ShmArray],
shm: ShmArray,
timeframe: float,
sampler_stream: tractor.MsgStream,
# feed_is_live: trio.Event,
task_status: TaskStatus[
tuple[ShmArray, ShmArray]
@ -674,22 +582,11 @@ async def tsdb_backfill(
) -> None:
# TODO: this should be used verbatim for the pure
# shm backfiller approach below.
# dts_per_tf: dict[int, datetime] = {}
fqme: str = mkt.fqme
# time_key: str = 'time'
# if getattr(storemod, 'ohlc_key_map', False):
# keymap: bidict = storemod.ohlc_key_map
# time_key: str = keymap.inverse['time']
get_hist: Callable[
[int, datetime, datetime],
tuple[np.ndarray, str]
]
config: dict[str, int]
async with mod.open_history_client(
mkt,
) as (get_hist, config):
@ -733,24 +630,18 @@ async def tsdb_backfill(
shm.push(
array,
prepend=True, # append on first frame
# start=
)
backfill_gap_from_shm_index: int = shm._first.value + 1
# tell parent task to continue
task_status.started()
# start history anal and load missing new data via backend.
# backfill_until_dt: datetime | None = None
# started_after_tsdb_load: bool = False
# for timeframe, shm in shms.items():
# loads a (large) frame of data from the tsdb depending
# on the db's query size limit; our "nativedb" (using
# parquet) generally can load the entire history into mem
# but if not then below the remaining history can be lazy
# loaded?
fqme: str = mkt.fqme
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
@ -777,45 +668,36 @@ async def tsdb_backfill(
# re-index with a `time` and index field
prepend_start = shm._first.value - offset_samples + 1
to_push = tsdb_history[-prepend_start:]
shm.push(
to_push,
# tsdb history is so far in the past we can't fit it in
# shm buffer space so simply don't load it!
if prepend_start > 0:
to_push = tsdb_history[-prepend_start:]
shm.push(
to_push,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
start=prepend_start,
field_map=storemod.ohlc_key_map,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
start=prepend_start,
field_map=storemod.ohlc_key_map,
)
log.info(f'Loaded {to_push.shape} datums from storage')
# TODO: maybe start history anal and load missing "history
# gaps" via backend..
if timeframe not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
log.info(f'Loaded {to_push.shape} datums from storage')
# tsdb_last_frame_start: datetime = push_tsdb_history_to_shm(
# storemod,
# shm,
# tsdb_history,
# time_key,
# prepend=True,
# )
# assert tsdb_last_frame_start == first_tsdb_dt
# unblock the feed bus management task
# assert len(shms[1].array)
# if not started_after_tsdb_load:
# task_status.started()
# started_after_tsdb_load = True
# begin backfiller task ASAP
# try:
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
# try:
# (
# latest_start_dt,
# latest_end_dt,
bf_done = await tn.start(
partial(
start_backfill,
@ -827,46 +709,24 @@ async def tsdb_backfill(
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
backfill_until_dt=last_tsdb_dt,
sampler_stream=sampler_stream,
# feed_is_live,
backfill_until_dt=last_tsdb_dt,
storage=storage,
# tsdb_is_up=True,
)
)
# if tsdb_entry:
# dts_per_tf[timeframe] = (
# tsdb_history,
# last_tsdb_dt,
# latest_start_dt,
# latest_end_dt,
# bf_done,
# )
# elif not started_after_tsdb_load:
# task_status.started()
# started_after_tsdb_load = True
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
# except DataUnavailable:
# return
# continue
# tsdb_history = series.get(timeframe)
# if len(hist_shm.array) < 2:
# TODO: there's an edge case here to solve where if the last
# frame before market close (at least on ib) was pushed and
# there was only "1 new" row pushed from the first backfill
# query-iteration, then the sample step sizing calcs will
# break upstream from here since you can't diff on at least
# 2 steps... probably should also add logic to compute from
# the tsdb series and stash that somewhere as meta data on
# the shm buffer?.. no se.
# if len(hist_shm.array) < 2:
# TODO: there's an edge case here to solve where if the last
# frame before market close (at least on ib) was pushed and
# there was only "1 new" row pushed from the first backfill
# query-iteration, then the sample step sizing calcs will
# break upstream from here since you can't diff on at least
# 2 steps... probably should also add logic to compute from
# the tsdb series and stash that somewhere as meta data on
# the shm buffer?.. no se.
# backload any further data from tsdb (concurrently per
# timeframe) if not all data was able to be loaded (in memory)
@ -875,7 +735,6 @@ async def tsdb_backfill(
await trio.sleep_forever()
finally:
return
# write_ohlcv
# IF we need to continue backloading incrementall from the
# tsdb client..
@ -895,29 +754,6 @@ async def tsdb_backfill(
timeframe,
shm,
)
# async with trio.open_nursery() as nurse:
# for timeframe, shm in shms.items():
# entry = dts_per_tf.get(timeframe)
# if not entry:
# continue
# (
# tsdb_history,
# last_tsdb_dt,
# latest_start_dt,
# latest_end_dt,
# bf_done,
# ) = entry
# if not tsdb_history.size:
# continue
# try:
# await trio.sleep_forever()
# finally:
# write_ohlcv
async def manage_history(
@ -1079,7 +915,6 @@ async def manage_history(
timeframe,
sample_stream,
# feed_is_live,
)
# indicate to caller that feed can be delivered to