Wrap null-gap detect and fill in async gen

Call it `iter_null_segs()` (for now?) and use it in the final
(sequential) stage of the `.history.start_backfill()` task-func. On
each iteration it delivers the absolute indices, frame-relative
indices, and equivalent timestamps for each detected null-segment,
making it easy to do piece-wise history queries for each (see the
consumption sketch below).

Further,
- handle the edge case in `get_null_segs()` where there is only 1 zeroed
  row value, in which case we deliver `absi_zsegs` as a single pair of
  the same index value and,
  - when this occurs `iter_null_segs()` delivers `None` for all the
    `start_`-related indices/timestamps since all `get_hist()` routines
    (delivered by `open_history_client()`) should handle it as a
    "get max history from this end_dt" type query.
- add a note about needing to do time-gap handling where there's a gap
  in the timeseries-history that isn't actually IN the data-history.
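
A minimal consumption sketch (the 8-tuple field order matches the
`yield` in the diff below, as does the `get_hist()` call shape; the
`fill_gaps()` wrapper name is purely illustrative):

    # assuming `from .tsp import iter_null_segs` per the import hunk below
    async def fill_gaps(frame, timeframe: float, get_hist) -> None:
        # hypothetical driver mirroring `maybe_fill_null_segments()`
        async for (
            absi_start, absi_end,   # absolute shm-buffer indices
            fi_start, fi_end,       # frame-relative indices
            start_t, end_t,         # float epoch timestamps
            start_dt, end_dt,       # `pendulum.DateTime` equivalents
        ) in iter_null_segs(frame, timeframe=timeframe):

            # single-zeroed-row edge case: every `start_*` field is
            # `None`, i.e. a "get max history from this end_dt" query.
            array, next_start_dt, next_end_dt = await get_hist(
                timeframe,
                start_dt=start_dt,  # may be `None`!
                end_dt=end_dt,
            )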
branch: distribute_dis
Tyler Goodlet 2023-12-13 18:29:06 -05:00
parent c129f5bb4a
commit 83bdca46a2
2 changed files with 154 additions and 74 deletions

@@ -59,9 +59,10 @@ from ._sampling import (
 from .tsp import (
     dedupe,
     get_null_segs,
+    iter_null_segs,
     sort_diff,
     Frame,
-    Seq,
+    # Seq,
 )
 from ..brokers._util import (
     DataUnavailable,
@@ -174,75 +175,71 @@ async def maybe_fill_null_segments(
 ) -> list[Frame]:

     frame: Frame = shm.array
-    null_segs: tuple | None = get_null_segs(
+    async for (
+        absi_start, absi_end,
+        fi_start, fi_end,
+        start_t, end_t,
+        start_dt, end_dt,
+    ) in iter_null_segs(
         frame,
-        period=timeframe,
-    )
-    if null_segs:
-        absi_pairs_zsegs: list[list[float, float]]
-        izeros: Seq
-        zero_t: Frame
+        timeframe=timeframe,
+    ):
+        # XXX NOTE: ?if we get a badly ordered timestamp
+        # pair, immediately stop backfilling?
+        if (
+            start_dt
+            and end_dt < start_dt
+        ):
+            break
+
         (
-            absi_pairs_zsegs,
-            izeros,
-            zero_t,
-        ) = null_segs
-
-        absi_first: int = frame[0]['index']
-        for absi_start, absi_end in absi_pairs_zsegs:
-            # await tractor.pause()
-            fi_start = absi_start - absi_first
-            fi_end = absi_end - absi_first
-            start_row: Seq = frame[fi_start]
-            end_row: Seq = frame[fi_end]
-            start_t: float = start_row['time']
-            end_t: float = end_row['time']
-            start_dt = from_timestamp(start_t)
-            end_dt = from_timestamp(end_t)
-
-            # if we get a badly ordered timestamp
-            # pair, immediately stop backfilling.
-            if end_dt < start_dt:
-                break
-
-            (
-                array,
-                next_start_dt,
-                next_end_dt,
-            ) = await get_hist(
-                timeframe,
-                start_dt=start_dt,
-                end_dt=end_dt,
-            )
-
-            # XXX TODO: pretty sure if i plot tsla, btcusdt.binance
-            # and mnq.cme.ib this causes a Qt crash XXDDD
-
-            # make sure we don't overrun the buffer start
-            len_to_push: int = min(absi_end, array.size)
-            to_push: np.ndarray = array[-len_to_push:]
-            await shm_push_in_between(
-                shm,
-                to_push,
-                prepend_index=absi_end,
-                update_start_on_prepend=False,
-            )
-
-            # TODO: UI side needs IPC event to update..
-            # - make sure the UI actually always handles
-            #   this update!
-            # - remember that in the display side, only refersh this
-            #   if the respective history is actually "in view".
-            # loop
-            await sampler_stream.send({
-                'broadcast_all': {
-                    'backfilling': (mkt.fqme, timeframe),
-                },
-            })
+            array,
+            next_start_dt,
+            next_end_dt,
+        ) = await get_hist(
+            timeframe,
+            start_dt=start_dt,
+            end_dt=end_dt,
+        )
+        await tractor.pause()
+
+        # XXX TODO: pretty sure if i plot tsla, btcusdt.binance
+        # and mnq.cme.ib this causes a Qt crash XXDDD
+
+        # make sure we don't overrun the buffer start
+        len_to_push: int = min(absi_end, array.size)
+        to_push: np.ndarray = array[-len_to_push:]
+        await shm_push_in_between(
+            shm,
+            to_push,
+            prepend_index=absi_end,
+            update_start_on_prepend=False,
+        )
+
+        # TODO: UI side needs IPC event to update..
+        # - make sure the UI actually always handles
+        #   this update!
+        # - remember that in the display side, only refersh this
+        #   if the respective history is actually "in view".
+        # loop
+        await sampler_stream.send({
+            'broadcast_all': {
+
+                # XXX NOTE XXX: see the
+                # `.ui._display.increment_history_view()` if block
+                # that looks for this info to FORCE a hard viz
+                # redraw!
+                'backfilling': (mkt.fqme, timeframe),
+            },
+        })
+
+    # TODO: interatively step through any remaining time gaps?
+    # if (
+    #     next_end_dt not in frame[
+    # ):
+    #     pass

     # RECHECK for more null-gaps
     frame: Frame = shm.array
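
To make the index arithmetic in this hunk concrete, a toy calculation
(all numbers invented, none come from the commit):

    absi_first = 1000                    # frame[0]['index']
    absi_start, absi_end = 1042, 1057    # one detected null-segment
    fi_start = absi_start - absi_first   # -> 42, frame-relative
    fi_end = absi_end - absi_first       # -> 57
    # clamp so the prepend never overruns the buffer start; with a
    # hypothetical backend response of 20 rows:
    len_to_push = min(absi_end, 20)      # -> 20 == array.size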

@@ -29,10 +29,17 @@ from math import (
     floor,
 )
 import time
-from typing import Literal
+from typing import (
+    Literal,
+    AsyncGenerator,
+)

 import numpy as np
 import polars as pl
+from pendulum import (
+    DateTime,
+    from_timestamp,
+)

 from ..toolz.profile import (
     Profiler,
@@ -223,7 +230,10 @@ def get_null_segs(
     col: str = 'time',

 ) -> tuple[
-    Seq,
+    # Seq,  # TODO: can we make it an array-type instead?
+    list[
+        list[int, int],
+    ],
     Seq,
     Frame
 ] | None:
@@ -285,13 +295,27 @@ def get_null_segs(
     # select out slice index pairs for each null-segment
     # portion detected throughout entire input frame.
     # import pdbp; pdbp.set_trace()

+    # only one null-segment in entire frame?
     if not fi_zgaps.size:
         # check for number null rows
         # TODO: use ndarray for this!
-        absi_zsegs = [[
-            absi_zeros[0],  # - 1,  # - ifirst,
-            # TODO: need the + 1 or no?
-            absi_zeros[-1] + 1,  # - ifirst,
-        ]]
+        if absi_zeros.size > 1:
+            absi_zsegs = [[
+                absi_zeros[0],  # - 1,  # - ifirst,
+                # TODO: need the + 1 or no?
+                absi_zeros[-1] + 1,  # - ifirst,
+            ]]
+        else:
+            absi_zsegs = [[
+                # absi_zeros[0] + 1,
+                # see `get_hist()` in backend, should ALWAYS be
+                # able to handle a `start_dt=None`!
+                None,
+                absi_zeros[0] + 1,
+            ]]
     else:
         absi_zsegs.append([
             absi_zeros[0] - 1,  # - ifirst,
@@ -305,15 +329,12 @@ def get_null_segs(
     ) in enumerate(zip(
         fi_zgaps,
         fi_zseg_start_rows,
-        # fi_zgaps,
-        # start=1,
     )):
         assert (zseg_start_row == zero_t[fi]).all()
         absi: int = zseg_start_row['index'][0]
-        # row = zero_t[fi]
-        # absi_pre_zseg = row['index'][0] - 1
-        absi_pre_zseg = absi - 1
+        # absi_pre_zseg = absi - 1

         if i > 0:
             prev_zseg_row = zero_t[fi - 1]
@@ -330,7 +351,6 @@ def get_null_segs(
         assert end
         assert start < end
-    # import pdbp; pdbp.set_trace()

     return (
         absi_zsegs,  # start indices of null
         absi_zeros,
@@ -338,6 +358,69 @@ def get_null_segs(
     )


+async def iter_null_segs(
+    frame: Frame,
+    timeframe: float,
+
+) -> AsyncGenerator[
+    tuple[
+        int, int,
+        int, int,
+        float, float,
+        float, float,
+        # Seq,  # TODO: can we make it an array-type instead?
+        # list[
+        #     list[int, int],
+        # ],
+        # Seq,
+        # Frame
+    ],
+    None,
+]:
+    if null_segs := get_null_segs(
+        frame,
+        period=timeframe,
+    ):
+        absi_pairs_zsegs: list[list[float, float]]
+        izeros: Seq
+        zero_t: Frame
+        (
+            absi_pairs_zsegs,
+            izeros,
+            zero_t,
+        ) = null_segs
+
+        absi_first: int = frame[0]['index']
+        for (
+            absi_start,
+            absi_end,
+        ) in absi_pairs_zsegs:
+
+            fi_end: int = absi_end - absi_first
+            end_row: Seq = frame[fi_end]
+            end_t: float = end_row['time']
+            end_dt: DateTime = from_timestamp(end_t)
+
+            if absi_start is not None:
+                fi_start: int = absi_start - absi_first
+                start_row: Seq = frame[fi_start]
+                start_t: float = start_row['time']
+                start_dt: DateTime = from_timestamp(start_t)
+            else:
+                fi_start = None
+                start_row = None
+                start_t = None
+                start_dt = None
+
+            yield (
+                absi_start, absi_end,  # abs indices
+                fi_start, fi_end,  # relative "frame" indices
+                start_t, end_t,
+                start_dt, end_dt,
+            )
+
+
 def with_dts(
     df: pl.DataFrame,
     time_col: str = 'time',
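
To see the single-zeroed-row edge case concretely, here is a tiny
standalone mock of just that branch (a sketch, not piker's actual
detection code; the toy frame and its values are invented):

    import numpy as np

    # toy frame: 6 rows with one zeroed 'time' entry at position 3
    frame = np.zeros(6, dtype=[('index', int), ('time', float)])
    frame['index'] = np.arange(100, 106)  # absolute shm indices
    frame['time'] = [1., 2., 3., 0., 5., 6.]

    absi_zeros = frame['index'][frame['time'] == 0.0]  # -> [103]

    if absi_zeros.size > 1:
        absi_zsegs = [[absi_zeros[0], absi_zeros[-1] + 1]]
    else:
        # single zeroed row: open-ended start so `get_hist()` treats
        # it as a "get max history from this end_dt" query
        absi_zsegs = [[None, absi_zeros[0] + 1]]

    assert absi_zsegs == [[None, 104]]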