Finally write a general purpose null-gap detector!

Using a bunch of fancy `numpy` vec ops (and ideally eventually extending
the same to `polars`) this is a first draft of `get_null_segs()`
a `col: str` field-value-is-zero detector which filters to all zero-valued
input frame segments and returns the corresponding useful slice-indexes:
- gap absolute (in shm buffer terms) index-endpoints as
  `absi_zsegs` for slicing to each null-segment in the src frame.
- ALL abs indices of rows with zeroed `col` values as `absi_zeros`.
- the full set of the input frame's row-entries (view) which are
  null valued for the chosen `col` as `zero_t`.

Use this new null-segment-detector in the
`.data.history.start_backfill()` task to attempt to fill null gaps that
might be extant from some prior backfill attempt. Since
`get_null_segs()` should now deliver a sequence of slices for each gap
we don't really need to have the `while gap_indices:` loop any more, so
just move that to the end-of-func and warn log (for now) if all gaps
aren't eventually filled.

TODO:
-[ ] do the null-seg detection and filling concurrently from
  most-recent-frame backfilling.
-[ ] offer the same detection in `.storage.cli` cmds for manual tsp
  anal.
-[ ] make the graphics layer actually update correctly when null-segs
  are filled (currently still broken somehow in the `Viz` caching
  layer?)

CHERRY INTO #486
distribute_dis
Tyler Goodlet 2023-12-13 15:08:42 -05:00
parent c4853a3fee
commit c129f5bb4a
2 changed files with 278 additions and 155 deletions

View File

@ -56,7 +56,13 @@ from ._source import def_iohlcv_fields
from ._sampling import ( from ._sampling import (
open_sample_stream, open_sample_stream,
) )
from . import tsp from .tsp import (
dedupe,
get_null_segs,
sort_diff,
Frame,
Seq,
)
from ..brokers._util import ( from ..brokers._util import (
DataUnavailable, DataUnavailable,
) )
@ -158,6 +164,100 @@ async def shm_push_in_between(
# await tractor.pause() # await tractor.pause()
async def maybe_fill_null_segments(
shm: ShmArray,
timeframe: float,
get_hist: Callable,
sampler_stream: tractor.MsgStream,
mkt: MktPair,
) -> list[Frame]:
frame: Frame = shm.array
null_segs: tuple | None = get_null_segs(
frame,
period=timeframe,
)
if null_segs:
absi_pairs_zsegs: list[list[float, float]]
izeros: Seq
zero_t: Frame
(
absi_pairs_zsegs,
izeros,
zero_t,
) = null_segs
absi_first: int = frame[0]['index']
for absi_start, absi_end in absi_pairs_zsegs:
# await tractor.pause()
fi_start = absi_start - absi_first
fi_end = absi_end - absi_first
start_row: Seq = frame[fi_start]
end_row: Seq = frame[fi_end]
start_t: float = start_row['time']
end_t: float = end_row['time']
start_dt = from_timestamp(start_t)
end_dt = from_timestamp(end_t)
# if we get a badly ordered timestamp
# pair, immediately stop backfilling.
if end_dt < start_dt:
break
(
array,
next_start_dt,
next_end_dt,
) = await get_hist(
timeframe,
start_dt=start_dt,
end_dt=end_dt,
)
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
# and mnq.cme.ib this causes a Qt crash XXDDD
# make sure we don't overrun the buffer start
len_to_push: int = min(absi_end, array.size)
to_push: np.ndarray = array[-len_to_push:]
await shm_push_in_between(
shm,
to_push,
prepend_index=absi_end,
update_start_on_prepend=False,
)
# TODO: UI side needs IPC event to update..
# - make sure the UI actually always handles
# this update!
# - remember that in the display side, only refersh this
# if the respective history is actually "in view".
# loop
await sampler_stream.send({
'broadcast_all': {
'backfilling': (mkt.fqme, timeframe),
},
})
# RECHECK for more null-gaps
frame: Frame = shm.array
null_segs: tuple | None = get_null_segs(
frame,
period=timeframe,
)
if (
null_segs
and
len(null_segs[-1])
):
await tractor.pause()
async def start_backfill( async def start_backfill(
get_hist, get_hist,
mod: ModuleType, mod: ModuleType,
@ -224,16 +324,20 @@ async def start_backfill(
# per time stamp. # per time stamp.
# starts: Counter[datetime] = Counter() # starts: Counter[datetime] = Counter()
# conduct "backward history gap filling" where we push to # STAGE NOTE: "backward history gap filling":
# the shm buffer until we have history back until the # - we push to the shm buffer until we have history back
# latest entry loaded from the tsdb's table B) # until the latest entry loaded from the tsdb's table B)
# - after this loop continue to check for other gaps in the
# (tsdb) history and (at least report) maybe fill them
# from new frame queries to the backend?
last_start_dt: datetime = backfill_from_dt last_start_dt: datetime = backfill_from_dt
next_prepend_index: int = backfill_from_shm_index next_prepend_index: int = backfill_from_shm_index
while last_start_dt > backfill_until_dt: while last_start_dt > backfill_until_dt:
log.info(
log.debug( f'Requesting {timeframe}s frame:\n'
f'Requesting {timeframe}s frame ending in {last_start_dt}' f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n'
) )
try: try:
@ -261,36 +365,18 @@ async def start_backfill(
await tractor.pause() await tractor.pause()
return return
# TODO: drop this? see todo above.. assert (
# if ( array['time'][0]
# next_start_dt in starts ==
# and starts[next_start_dt] <= 6 next_start_dt.timestamp()
# ): )
# start_dt = min(starts)
# log.warning(
# f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
# )
# starts[start_dt] += 1
# await tractor.pause()
# continue
# elif starts[next_start_dt] > 6:
# log.warning(
# f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
# )
# return
# # only update new start point if not-yet-seen
# starts[next_start_dt] += 1
assert array['time'][0] == next_start_dt.timestamp()
diff = last_start_dt - next_start_dt diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds frame_time_diff_s = diff.seconds
# frame's worth of sample-period-steps, in seconds # frame's worth of sample-period-steps, in seconds
frame_size_s = len(array) * timeframe frame_size_s: float = len(array) * timeframe
expected_frame_size_s = frame_size_s + timeframe expected_frame_size_s: float = frame_size_s + timeframe
if frame_time_diff_s > expected_frame_size_s: if frame_time_diff_s > expected_frame_size_s:
# XXX: query result includes a start point prior to our # XXX: query result includes a start point prior to our
@ -298,8 +384,10 @@ async def start_backfill(
# history gap (eg. market closed period, outage, etc.) # history gap (eg. market closed period, outage, etc.)
# so just report it to console for now. # so just report it to console for now.
log.warning( log.warning(
f'History frame ending @ {last_start_dt} appears to have a gap:\n' 'GAP DETECTED:\n'
f'{diff} ~= {frame_time_diff_s} seconds' f'last_start_dt: {last_start_dt}\n'
f'diff: {diff}\n'
f'frame_time_diff_s: {frame_time_diff_s}\n'
) )
to_push = diff_history( to_push = diff_history(
@ -415,75 +503,33 @@ async def start_backfill(
gaps, gaps,
deduped, deduped,
diff, diff,
) = tsp.dedupe(df) ) = dedupe(df)
if diff: if diff:
tsp.sort_diff(df) sort_diff(df)
else: else:
# finally filled gap # finally filled gap
log.info( log.info(
f'Finished filling gap to tsdb start @ {backfill_until_dt}!' f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
) )
# conduct tsdb timestamp gap detection and backfill any
# seemingly missing sequence segments..
# TODO: ideally these never exist but somehow it seems
# sometimes we're writing zero-ed segments on certain
# (teardown) cases?
from .tsp import detect_null_time_gap
gap_indices: tuple | None = detect_null_time_gap(shm) # NOTE: conduct tsdb timestamp gap detection and backfill any
while gap_indices: # seemingly missing (null-time) segments..
(
istart,
start,
end,
iend,
) = gap_indices
start_dt = from_timestamp(start) # TODO: ideally these can never exist!
end_dt = from_timestamp(end) # -[ ] somehow it seems sometimes we're writing zero-ed
# segments to tsdbs during teardown?
# if we get a baddly ordered timestamp # -[ ] can we ensure that the backcfiller tasks do this
# pair, imeeditately stop backfilling. # work PREVENTAVELY instead?
if end_dt < start_dt: # -[ ] fill in non-zero epoch time values ALWAYS!
break await maybe_fill_null_segments(
shm=shm,
( timeframe=timeframe,
array, get_hist=get_hist,
next_start_dt, sampler_stream=sampler_stream,
next_end_dt, mkt=mkt,
) = await get_hist(
timeframe,
start_dt=start_dt,
end_dt=end_dt,
) )
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
# and mnq.cme.ib this causes a Qt crash XXDDD
# make sure we don't overrun the buffer start
len_to_push: int = min(iend, array.size)
to_push: np.ndarray = array[-len_to_push:]
await shm_push_in_between(
shm,
to_push,
prepend_index=iend,
update_start_on_prepend=False,
)
# TODO: UI side needs IPC event to update..
# - make sure the UI actually always handles
# this update!
# - remember that in the display side, only refersh this
# if the respective history is actually "in view".
# loop
await sampler_stream.send({
'broadcast_all': {
'backfilling': (mkt.fqme, timeframe),
},
})
gap_indices: tuple | None = detect_null_time_gap(shm)
# XXX: extremely important, there can be no checkpoints # XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames`` # in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to # values while we're pipelining the current ones to
@ -679,29 +725,12 @@ async def tsdb_backfill(
# there's no backfilling possible. # there's no backfilling possible.
except DataUnavailable: except DataUnavailable:
task_status.started() task_status.started()
if timeframe > 1:
await tractor.pause() await tractor.pause()
return return
# TODO: fill in non-zero epoch time values ALWAYS!
# hist_shm._array['time'] = np.arange(
# start=
# NOTE: removed for now since it'll always break
# on the first 60s of the venue open..
# times: np.ndarray = array['time']
# # sample period step size in seconds
# step_size_s = (
# from_timestamp(times[-1])
# - from_timestamp(times[-2])
# ).seconds
# if step_size_s not in (1, 60):
# log.error(f'Last 2 sample period is off!? -> {step_size_s}')
# step_size_s = (
# from_timestamp(times[-2])
# - from_timestamp(times[-3])
# ).seconds
# NOTE: on the first history, most recent history # NOTE: on the first history, most recent history
# frame we PREPEND from the current shm ._last index # frame we PREPEND from the current shm ._last index
# and thus a gap between the earliest datum loaded here # and thus a gap between the earliest datum loaded here

View File

@ -34,7 +34,6 @@ from typing import Literal
import numpy as np import numpy as np
import polars as pl import polars as pl
from ._sharedmem import ShmArray
from ..toolz.profile import ( from ..toolz.profile import (
Profiler, Profiler,
pg_profile_enabled, pg_profile_enabled,
@ -53,6 +52,14 @@ get_console_log = partial(
name=subsys, name=subsys,
) )
# NOTE: union type-defs to handle generic `numpy` and `polars` types
# side-by-side Bo
# |_ TODO: schema spec typing?
# -[ ] nptyping!
# -[ ] wtv we can with polars?
Frame = pl.DataFrame | np.ndarray
Seq = pl.Series | np.ndarray
def slice_from_time( def slice_from_time(
arr: np.ndarray, arr: np.ndarray,
@ -209,51 +216,126 @@ def slice_from_time(
return read_slc return read_slc
def detect_null_time_gap( def get_null_segs(
shm: ShmArray, frame: Frame,
period: float, # sampling step in seconds
imargin: int = 1, imargin: int = 1,
col: str = 'time',
) -> tuple[float, float] | None: ) -> tuple[
Seq,
Seq,
Frame
] | None:
''' '''
Detect if there are any zero-epoch stamped rows in Detect if there are any zero(-epoch stamped) valued
the presumed 'time' field-column. rows in for the provided `col: str` column; by default
presume the 'time' field/column.
Filter to the gap and return a surrounding index range. Filter to all such zero (time) segments and return
the corresponding frame zeroed segment's,
NOTE: for now presumes only ONE gap XD - gap absolute (in buffer terms) indices-endpoints as
`absi_zsegs`
- abs indices of all rows with zeroed `col` values as `absi_zeros`
- the corresponding frame's row-entries (view) which are
zeroed for the `col` as `zero_t`
''' '''
# ensure we read buffer state only once so that ShmArray rt # TODO: remove this?
# NOTE: ensure we read buffer state only once so that ShmArray rt
# circular-buffer updates don't cause a indexing/size mismatch. # circular-buffer updates don't cause a indexing/size mismatch.
array: np.ndarray = shm.array # frame: np.ndarray = shm.array
zero_pred: np.ndarray = array['time'] == 0 times: Seq = frame['time']
zero_t: np.ndarray = array[zero_pred] zero_pred: Seq = (times == 0)
if zero_t.size: if isinstance(frame, np.ndarray):
istart, iend = zero_t['index'][[0, -1]] tis_zeros: int = zero_pred.any()
start, end = shm._array['time'][ else:
[istart - imargin, iend + imargin] tis_zeros: int = zero_pred.any()
]
return (
istart - imargin,
start,
end,
iend + imargin,
)
if not tis_zeros:
return None return None
absi_zsegs: list[list[int, int]] = []
t_unit: Literal = Literal[ if isinstance(frame, np.ndarray):
'days', # ifirst: int = frame[0]['index']
'hours', zero_t: np.ndarray = frame[zero_pred]
'minutes',
'seconds', absi_zeros = zero_t['index']
'miliseconds', # relative frame-indexes of zeros
'microseconds', # fizeros = np.ndarray = zero_t['index'] - ifirst
'nanoseconds', absi_zdiff: np.ndarray = np.diff(absi_zeros)
] fi_zgaps = np.argwhere(
absi_zdiff > 1
# OR null / inf?
# OR is 0? for first zero-row entry?
)
fi_zseg_start_rows = zero_t[fi_zgaps]
else: # pl.DataFrame case
izeros: pl.Series = zero_pred.arg_true()
zero_t: pl.DataFrame = frame[izeros]
absi_zeros = zero_t['index']
absi_zdiff: pl.Series = absi_zeros.diff()
fi_zgaps = (absi_zdiff > 1).arg_true()
# select out slice index pairs for each null-segment
# portion detected throughout entire input frame.
if not fi_zgaps.size:
# TODO: use ndarray for this!
absi_zsegs = [[
absi_zeros[0], # - 1, # - ifirst,
# TODO: need the + 1 or no?
absi_zeros[-1] + 1, # - ifirst,
]]
else:
absi_zsegs.append([
absi_zeros[0] - 1, # - ifirst,
None,
])
# TODO: can we do it with vec ops?
for i, (
fi,
zseg_start_row,
) in enumerate(zip(
fi_zgaps,
fi_zseg_start_rows,
# fi_zgaps,
# start=1,
)):
assert (zseg_start_row == zero_t[fi]).all()
absi: int = zseg_start_row['index'][0]
# row = zero_t[fi]
# absi_pre_zseg = row['index'][0] - 1
absi_pre_zseg = absi - 1
if i > 0:
prev_zseg_row = zero_t[fi - 1]
absi_post_zseg = prev_zseg_row['index'][0] + 1
absi_zsegs[i - 1][1] = absi_post_zseg
if (i + 1) < fi_zgaps.size:
absi_zsegs.append([
absi,
None,
])
else:
for start, end in absi_zsegs:
assert end
assert start < end
# import pdbp; pdbp.set_trace()
return (
absi_zsegs, # start indices of null
absi_zeros,
zero_t,
)
def with_dts( def with_dts(
@ -292,6 +374,17 @@ def dedup_dt(
) )
t_unit: Literal = Literal[
'days',
'hours',
'minutes',
'seconds',
'miliseconds',
'microseconds',
'nanoseconds',
]
def detect_time_gaps( def detect_time_gaps(
df: pl.DataFrame, df: pl.DataFrame,
@ -406,10 +499,6 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
f'Gaps found:\n{gaps}\n' f'Gaps found:\n{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}' f'deduped Gaps found:\n{deduped_gaps}'
) )
# TODO: rewrite this in polars and/or convert to
# ndarray to detect and remove?
# null_gaps = detect_null_time_gap()
return ( return (
df, df,
gaps, gaps,
@ -428,14 +517,19 @@ def sort_diff(
list[int], # indices of segments that are out-of-order list[int], # indices of segments that are out-of-order
]: ]:
ser: pl.Series = src_df[col] ser: pl.Series = src_df[col]
diff: pl.Series = ser.diff()
sortd: pl.DataFrame = ser.sort() sortd: pl.DataFrame = ser.sort()
diff: pl.Series = ser.diff()
sortd_diff: pl.Series = sortd.diff() sortd_diff: pl.Series = sortd.diff()
i_step_diff = (diff != sortd_diff).arg_true() i_step_diff = (diff != sortd_diff).arg_true()
if i_step_diff.len(): frame_reorders: int = i_step_diff.len()
import pdbp if frame_reorders:
pdbp.set_trace() log.warn(
f'Resorted frame on col: {col}\n'
f'{frame_reorders}'
)
# import pdbp; pdbp.set_trace()
# NOTE: thanks to this SO answer for the below conversion routines # NOTE: thanks to this SO answer for the below conversion routines
# to go from numpy struct-arrays to polars dataframes and back: # to go from numpy struct-arrays to polars dataframes and back: