Add `dedupe()` to help with gap detection/resolution

Think I finally figured out the weird issue with out-of-order OHLC
history getting jammed in the wrong place:
- gap is detected in parquet/offline ts (likely due to a zero dt or
  other gap),
- query for history in the gap is made BUT that frame is then inserted
  in the shm buffer **at the end** (likely using array int-entry
  indexing) which inserts it at the wrong location,
- later this out-of-order frame is written to the storage layer
  (parquet) and then is repeated on further reboots with the original
  gap causing further queries for the same frame on every history
  backfill.

A set of tools useful for detecting these issues and annotating them
nicely on a chart is part of this patch's intent:
- `dedupe()` will detect any dt gaps, deduplicate datetime rows and
  return the de-duplicated df along with gaps table.
- use this in `piker store anal` so that we can potentially
  resolve and backfill the gaps correctly if some rows were removed.
- possibly also use this to detect the backfilling error in logic at
  the time of backfilling the frame instead of after the fact (which
  would require re-writing the shm array from something like `store
  ldshm` and would be a manual post-hoc solution, not a fix to the
  original issue..)
distribute_dis
Tyler Goodlet 2023-12-08 15:11:34 -05:00
parent b6d2550f33
commit 2eeef2a123
2 changed files with 100 additions and 16 deletions

View File

@ -136,6 +136,53 @@ def delete(
trio.run(main, symbols) trio.run(main, symbols)
def dedupe(src_df: pl.DataFrame) -> tuple[
    pl.DataFrame,  # with dts
    pl.DataFrame,  # gaps
    pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
    bool,  # whether deduplication actually dropped any rows
]:
    '''
    Check for time series gaps and if found de-duplicate any
    datetime entries, check for a frame height diff and return the
    newly dt-deduplicated frame.

    Returns a 4-tuple of:
    - the dt-annotated input frame,
    - the detected-gaps frame (may be empty),
    - the dt-deduplicated frame,
    - a bool flag, true iff rows were actually removed.

    '''
    from piker.data import _timeseries as tsp

    df: pl.DataFrame = tsp.with_dts(src_df)
    gaps: pl.DataFrame = tsp.detect_time_gaps(df)

    # ALWAYS compute the deduplicated frame (even when no gaps were
    # detected) so that the `diff` computation and return value below
    # never reference an unbound name.
    # remove duplicated datetime samples/sections
    deduped: pl.DataFrame = tsp.dedup_dt(df)

    if not gaps.is_empty():
        # report any gaps that remain even after dt-deduplication so
        # the caller can decide whether a backfill is needed.
        deduped_gaps = tsp.detect_time_gaps(deduped)
        log.warning(
            f'Gaps found:\n{gaps}\n'
            f'deduped Gaps found:\n{deduped_gaps}'
        )
        # TODO: rewrite this in polars and/or convert to
        # ndarray to detect and remove?
        # null_gaps = tsp.detect_null_time_gap()

    # a positive height diff means dt-duplicate rows were dropped.
    diff: int = (
        df.height
        -
        deduped.height
    )
    # NOTE: set the *flag*, NOT the frame! (previously this rebound
    # `deduped` to a bool, clobbering the deduplicated frame..)
    was_deduped: bool = diff > 0

    return (
        df,
        gaps,
        deduped,
        was_deduped,
    )
@store.command() @store.command()
def anal( def anal(
fqme: str, fqme: str,
@ -144,7 +191,11 @@ def anal(
) -> np.ndarray: ) -> np.ndarray:
''' '''
Anal-ysis is when you take the data do stuff to it, i think. Anal-ysis is when you take the data do stuff to it.
NOTE: This ONLY loads the offline timeseries data (by default
from a parquet file) NOT the in-shm version you might be seeing
in a chart.
''' '''
async def main(): async def main():
@ -162,6 +213,7 @@ def anal(
syms: list[str] = await client.list_keys() syms: list[str] = await client.list_keys()
log.info(f'{len(syms)} FOUND for {mod.name}') log.info(f'{len(syms)} FOUND for {mod.name}')
history: ShmArray # np buffer format
( (
history, history,
first_dt, first_dt,
@ -172,13 +224,26 @@ def anal(
) )
assert first_dt < last_dt assert first_dt < last_dt
src_df = await client.as_df(fqme, period) shm_df: pl.DataFrame = await client.as_df(
from piker.data import _timeseries as tsmod fqme,
df: pl.DataFrame = tsmod.with_dts(src_df) period,
gaps: pl.DataFrame = tsmod.detect_time_gaps(df) )
if not gaps.is_empty(): df: pl.DataFrame # with dts
print(f'Gaps found:\n{gaps}') deduped: pl.DataFrame # deduplicated dts
(
df,
gaps,
deduped,
shortened,
) = dedupe(shm_df)
if shortened:
await client.write_ohlcv(
fqme,
ohlcv=deduped,
timeframe=period,
)
# TODO: something better with tab completion.. # TODO: something better with tab completion..
# is there something more minimal but nearly as # is there something more minimal but nearly as
@ -275,7 +340,7 @@ def ldshm(
''' '''
Linux ONLY: load any fqme file name matching shm buffer from Linux ONLY: load any fqme file name matching shm buffer from
/dev/shm/ into an OHLCV numpy array and polars DataFrame, /dev/shm/ into an OHLCV numpy array and polars DataFrame,
optionally write to .parquet file. optionally write to offline storage via `.parquet` file.
''' '''
async def main(): async def main():
@ -287,7 +352,7 @@ def ldshm(
), ),
): ):
df: pl.DataFrame | None = None df: pl.DataFrame | None = None
for shmfile, shm, src_df in iter_dfs_from_shms(fqme): for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):
# compute ohlc properties for naming # compute ohlc properties for naming
times: np.ndarray = shm.array['time'] times: np.ndarray = shm.array['time']
@ -297,9 +362,16 @@ def ldshm(
f'Something is wrong with time period for {shm}:\n{times}' f'Something is wrong with time period for {shm}:\n{times}'
) )
from piker.data import _timeseries as tsmod
df: pl.DataFrame = tsmod.with_dts(src_df) # over-write back to shm?
gaps: pl.DataFrame = tsmod.detect_time_gaps(df) df: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
df,
gaps,
deduped,
was_dded,
) = dedupe(shm_df)
# TODO: maybe only optionally enter this depending # TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection? # on some CLI flags and/or gap detection?

View File

@ -272,9 +272,21 @@ class NativeStorageClient:
# limit: int = int(200e3), # limit: int = int(200e3),
) -> np.ndarray: ) -> np.ndarray:
path: Path = self.mk_path(fqme, period=int(timeframe)) path: Path = self.mk_path(
fqme,
period=int(timeframe),
)
df: pl.DataFrame = pl.read_parquet(path) df: pl.DataFrame = pl.read_parquet(path)
self._dfs.setdefault(timeframe, {})[fqme] = df
# cache df for later usage since we (currently) need to
# convert to np.ndarrays to push to our `ShmArray` rt
# buffers subsys but later we may operate entirely on
# pyarrow arrays/buffers so keeping the dfs around for
# a variety of purposes is handy.
self._dfs.setdefault(
timeframe,
{},
)[fqme] = df
# TODO: filter by end and limit inputs # TODO: filter by end and limit inputs
# times: pl.Series = df['time'] # times: pl.Series = df['time']
@ -329,7 +341,7 @@ class NativeStorageClient:
time.time() - start, time.time() - start,
ndigits=6, ndigits=6,
) )
print( log.info(
f'parquet write took {delay} secs\n' f'parquet write took {delay} secs\n'
f'file path: {path}' f'file path: {path}'
) )
@ -339,7 +351,7 @@ class NativeStorageClient:
async def write_ohlcv( async def write_ohlcv(
self, self,
fqme: str, fqme: str,
ohlcv: np.ndarray, ohlcv: np.ndarray | pl.DataFrame,
timeframe: int, timeframe: int,
) -> Path: ) -> Path: