Compare commits


4 Commits

Author SHA1 Message Date
Tyler Goodlet cb941a5554 BABOSO.. fix last history frame overlap slicing!
I guess since I started supporting the whole "allow a gap between
the latest tsdb sample and the latest retrieved history frame" thing,
the overlap slicing has been completely borked XD: we've been sticking
in duplicate history samples and this has caused all sorts of
downstream time-series processing issues..

So fix that by ensuring that whenever there IS an overlap between
history in the latest frame and the tsdb, we always prefer the latest
frame's data and slice OUT the tsdb's duplicate indices..

CHERRY TO #486
2023-12-08 18:56:38 -05:00
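
A minimal standalone sketch of the overlap-slicing idea described above
(plain numpy with made-up epoch-second arrays, not piker's actual
`ShmArray`/tsdb machinery): when the latest retrieved frame overlaps the
tail of the tsdb history, the frame's rows win and the tsdb's duplicated
indices get sliced out before the two are joined.

import numpy as np

# hypothetical epoch-second stamps for the tsdb tail and the latest frame
tsdb_times = np.array([10, 11, 12, 13, 14, 15])
frame_times = np.array([13, 14, 15, 16, 17])

# keep only tsdb samples strictly OLDER than the frame's first sample so
# the frame's copies of 13/14/15 win over the tsdb's duplicates
non_overlap = tsdb_times[tsdb_times < frame_times[0]]
merged = np.concatenate([non_overlap, frame_times])

assert np.all(np.diff(merged) > 0)  # strictly increasing => no dupes left
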
Tyler Goodlet 2d72a052aa Woops, make sure non-disti mode still works when maybe getting `pikerd` XD 2023-12-08 17:43:52 -05:00
Tyler Goodlet 2eeef2a123 Add `dedupe()` to help with gap detection/resolution
Think I finally figured out the weird issue with out-of-order OHLC
history getting jammed in the wrong place:
- gap is detected in parquet/offline ts (likely due to a zero dt or
  other gap),
- query for history in the gap is made BUT that frame is then inserted
  in the shm buffer **at the end** (likely using array int-entry
  indexing) which inserts it at the wrong location,
- later this out-of-order frame is written to the storage layer
  (parquet) and then is repeated on further reboots with the original
  gap causing further queries for the same frame on every history
  backfill.

A set of tools for detecting these issues and annotating them nicely
on chart is part of this patch's intent (see the sketch after this
entry):
- `dedupe()` will detect any dt gaps, deduplicate datetime rows and
  return the de-duplicated df along with a gaps table.
- use this in both `piker store anal` and `piker store ldshm` such
  that we potentially resolve and backfill the gaps correctly if some
  rows were removed.
- possibly also use this to detect the backfilling logic error at the
  time of backfilling the frame instead of after the fact (which would
  require re-writing the shm array from something like `store ldshm`
  and would be a manual post-hoc solution, not a fix to the original
  issue..)
2023-12-08 15:11:34 -05:00
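
A standalone sketch of the gap-detect/de-duplicate flow from this patch
(a toy single-`dt`-column frame and a hand-rolled gap filter standing in
for piker's `with_dts()`/`detect_time_gaps()`; the `close` column and the
1s expected period are assumptions, not the real schema):

from datetime import datetime, timedelta
import polars as pl

df = pl.DataFrame({
    'dt': [
        datetime(2023, 12, 8, 9, 30, 0),
        datetime(2023, 12, 8, 9, 30, 1),
        datetime(2023, 12, 8, 9, 30, 1),  # repeated sample
        datetime(2023, 12, 8, 9, 30, 5),  # 4s jump => gap
    ],
    'close': [1.0, 1.1, 1.1, 1.2],
})
expect_period = timedelta(seconds=1)

# rows whose dt-step exceeds the expected sample period => gaps
gaps: pl.DataFrame = df.filter(pl.col('dt').diff() > expect_period)

# drop duplicate datetime rows, keeping the original ordering
deduped: pl.DataFrame = df.unique(subset=['dt'], maintain_order=True)

# frame shrank => duplicates were actually removed
was_deduped: bool = deduped.height < df.height
print(gaps, deduped, was_deduped)
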
Tyler Goodlet b6d2550f33 Add datetime col de-duplicator 2023-12-08 14:38:27 -05:00
5 changed files with 166 additions and 42 deletions

View File

@@ -263,6 +263,19 @@ def with_dts(
 # )

+def dedup_dt(
+    df: pl.DataFrame,
+) -> pl.DataFrame:
+    '''
+    Drop duplicate date-time rows (normally from an OHLC frame).
+    '''
+    return df.unique(
+        subset=['dt'],
+        maintain_order=True,
+    )
+
 def detect_time_gaps(
     df: pl.DataFrame,

@@ -294,10 +307,12 @@ def detect_time_gaps(
     '''
     return (
         with_dts(df)
+        # First by a seconds unit step size
         .filter(
             pl.col('s_diff').abs() > expect_period
         )
         .filter(
+            # Second by an arbitrary dt-unit step size
             getattr(
                 pl.col('dt_diff').dt,
                 gap_dt_unit,

View File

@@ -716,9 +716,14 @@ async def tsdb_backfill
     backfill_diff: Duration = mr_start_dt - last_tsdb_dt
     offset_s: float = backfill_diff.in_seconds()

-    # XXX EDGE CASE: when the venue was closed (say over
-    # the weeknd) causing a timeseries gap, AND the query
-    # frames size (eg. for 1s we rx 2k datums ~= 33.33m) IS
+    # XXX EDGE CASEs: the most recent frame overlaps with
+    # prior tsdb history!!
+    # - so the latest frame's start time is earlier then
+    #   the tsdb's latest sample.
+    # - alternatively this may also more generally occur
+    #   when the venue was closed (say over the weeknd)
+    #   causing a timeseries gap, AND the query frames size
+    #   (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
     # GREATER THAN the current venue-market's operating
     # session (time) we will receive datums from BEFORE THE
     # CLOSURE GAP and thus the `offset_s` value will be

@@ -727,20 +732,35 @@ async def tsdb_backfill
     # tsdb. In this case we instead only retreive and push
     # the series portion missing from the db's data set.
     if offset_s < 0:
-        backfill_diff: Duration = mr_end_dt - last_tsdb_dt
-        offset_s: float = backfill_diff.in_seconds()
+        non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
+        non_overlap_offset_s: float = backfill_diff.in_seconds()

     offset_samples: int = round(offset_s / timeframe)

     # TODO: see if there's faster multi-field reads:
     # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
     # re-index with a `time` and index field
-    prepend_start = shm._first.value - offset_samples + 1
+    if offset_s > 0:
+        # NOTE XXX: ONLY when there is an actual gap
+        # between the earliest sample in the latest history
+        # frame do we want to NOT stick the latest tsdb
+        # history adjacent to that latest frame!
+        prepend_start = shm._first.value - offset_samples + 1
+        to_push = tsdb_history[-prepend_start:]
+    else:
+        # when there is overlap we want to remove the
+        # overlapping samples from the tsdb portion (taking
+        # instead the latest frame's values since THEY
+        # SHOULD BE THE SAME) and prepend DIRECTLY adjacent
+        # to the latest frame!
+        # TODO: assert the overlap segment array contains
+        # the same values!?!
+        prepend_start = shm._first.value
+        to_push = tsdb_history[-(shm._first.value):offset_samples - 1]

     # tsdb history is so far in the past we can't fit it in
     # shm buffer space so simply don't load it!
     if prepend_start > 0:
-        to_push = tsdb_history[-prepend_start:]
         shm.push(
             to_push,

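A toy walk-through of the gap-vs-overlap branch above using made-up
numbers (the `shm._first.value`, durations and the simplified overlap
trim are all invented; only the `offset_samples`/`prepend_start`
arithmetic mirrors the hunk): a positive `offset_s` means a real gap, so
the tsdb slice is prepended `offset_samples` further back, while a
negative one means overlap, so the tsdb's duplicated tail is dropped and
the remainder is prepended flush against the frame.

timeframe: int = 1                  # 1s bars
shm_first: int = 5_000              # hypothetical shm._first.value
tsdb_history = list(range(10_000))  # stand-in for the tsdb sample rows

# gap case: latest frame starts 120s AFTER the tsdb's last sample
offset_s: float = 120.0
offset_samples: int = round(offset_s / timeframe)
prepend_start: int = shm_first - offset_samples + 1
to_push = tsdb_history[-prepend_start:]
print('gap case:', prepend_start, len(to_push))

# overlap case: frame starts 60s BEFORE the tsdb's last sample, so drop
# the 60 duplicated tsdb rows and prepend directly adjacent to the frame
overlap_samples: int = 60
prepend_start = shm_first
to_push = tsdb_history[:-overlap_samples]
print('overlap case:', prepend_start, len(to_push))
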
View File

@@ -282,16 +282,21 @@ async def maybe_open_pikerd(
             loglevel=loglevel,
             **kwargs,
         ) as (actor, addrs),
-        # try to attach to any existing (host-local) `pikerd`
-        tractor.find_actor(
-            _root_dname,
-            registry_addrs=registry_addrs,
-            only_first=True,
-            # raise_on_none=True,
-        ) as pikerd_portal,
-    ):
+    ):
+        if _root_dname in actor.uid:
+            yield None
+            return
+
+        # NOTE: IFF running in disti mode, try to attach to any
+        # existing (host-local) `pikerd`.
+        else:
+            async with tractor.find_actor(
+                _root_dname,
+                registry_addrs=registry_addrs,
+                only_first=True,
+                # raise_on_none=True,
+            ) as pikerd_portal:

                 # connect to any existing remote daemon presuming its
                 # registry socket was selected.
                 if pikerd_portal is not None:

View File

@@ -136,6 +136,53 @@ def delete(
     trio.run(main, symbols)

+def dedupe(src_df: pl.DataFrame) -> tuple[
+    pl.DataFrame,  # with dts
+    pl.DataFrame,  # gaps
+    pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
+    bool,
+]:
+    '''
+    Check for time series gaps and if found
+    de-duplicate any datetime entries, check for
+    a frame height diff and return the newly
+    dt-deduplicated frame.
+    '''
+    from piker.data import _timeseries as tsp
+    df: pl.DataFrame = tsp.with_dts(src_df)
+    gaps: pl.DataFrame = tsp.detect_time_gaps(df)
+
+    # default to the un-modified frame so the height diff below is
+    # always defined, even when no gaps are detected.
+    deduped: pl.DataFrame = df
+
+    if not gaps.is_empty():
+        # remove duplicated datetime samples/sections
+        deduped = tsp.dedup_dt(df)
+        deduped_gaps = tsp.detect_time_gaps(deduped)
+        log.warning(
+            f'Gaps found:\n{gaps}\n'
+            f'deduped Gaps found:\n{deduped_gaps}'
+        )
+        # TODO: rewrite this in polars and/or convert to
+        # ndarray to detect and remove?
+        # null_gaps = tsp.detect_null_time_gap()
+
+    diff: int = (
+        df.height
+        -
+        deduped.height
+    )
+    was_deduped: bool = False
+    if diff:
+        was_deduped = True
+
+    return (
+        df,
+        gaps,
+        deduped,
+        was_deduped,
+    )
+
+
 @store.command()
 def anal(
     fqme: str,
@@ -144,7 +191,11 @@ def anal(
 ) -> np.ndarray:
     '''
-    Anal-ysis is when you take the data do stuff to it, i think.
+    Anal-ysis is when you take the data do stuff to it.
+
+    NOTE: This ONLY loads the offline timeseries data (by default
+    from a parquet file) NOT the in-shm version you might be seeing
+    in a chart.

     '''
     async def main():

@@ -162,6 +213,7 @@ def anal(
         syms: list[str] = await client.list_keys()
         log.info(f'{len(syms)} FOUND for {mod.name}')

+        history: ShmArray  # np buffer format
         (
             history,
             first_dt,
@@ -172,13 +224,26 @@ def anal(
         )
         assert first_dt < last_dt

-        src_df = await client.as_df(fqme, period)
-        from piker.data import _timeseries as tsmod
-        df: pl.DataFrame = tsmod.with_dts(src_df)
-        gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
-        if not gaps.is_empty():
-            print(f'Gaps found:\n{gaps}')
+        shm_df: pl.DataFrame = await client.as_df(
+            fqme,
+            period,
+        )
+
+        df: pl.DataFrame  # with dts
+        deduped: pl.DataFrame  # deduplicated dts
+        (
+            df,
+            gaps,
+            deduped,
+            shortened,
+        ) = dedupe(shm_df)
+
+        if shortened:
+            await client.write_ohlcv(
+                fqme,
+                ohlcv=deduped,
+                timeframe=period,
+            )

         # TODO: something better with tab completion..
         # is there something more minimal but nearly as
@@ -275,7 +340,7 @@ def ldshm(
     '''
     Linux ONLY: load any fqme file name matching shm buffer from
     /dev/shm/ into an OHLCV numpy array and polars DataFrame,
-    optionally write to .parquet file.
+    optionally write to offline storage via `.parquet` file.

     '''
     async def main():

@@ -287,7 +352,7 @@ def ldshm(
         ),
     ):
         df: pl.DataFrame | None = None
-        for shmfile, shm, src_df in iter_dfs_from_shms(fqme):
+        for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):

             # compute ohlc properties for naming
             times: np.ndarray = shm.array['time']
@@ -297,9 +362,16 @@ def ldshm(
                     f'Something is wrong with time period for {shm}:\n{times}'
                 )

-                from piker.data import _timeseries as tsmod
-                df: pl.DataFrame = tsmod.with_dts(src_df)
-                gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
+                # over-write back to shm?
+                df: pl.DataFrame  # with dts
+                deduped: pl.DataFrame  # deduplicated dts
+                (
+                    df,
+                    gaps,
+                    deduped,
+                    was_dded,
+                ) = dedupe(shm_df)

                 # TODO: maybe only optionally enter this depending
                 # on some CLI flags and/or gap detection?

View File

@@ -272,9 +272,21 @@ class NativeStorageClient:
         # limit: int = int(200e3),
     ) -> np.ndarray:
-        path: Path = self.mk_path(fqme, period=int(timeframe))
+        path: Path = self.mk_path(
+            fqme,
+            period=int(timeframe),
+        )
         df: pl.DataFrame = pl.read_parquet(path)
-        self._dfs.setdefault(timeframe, {})[fqme] = df
+
+        # cache df for later usage since we (currently) need to
+        # convert to np.ndarrays to push to our `ShmArray` rt
+        # buffers subsys but later we may operate entirely on
+        # pyarrow arrays/buffers so keeping the dfs around for
+        # a variety of purposes is handy.
+        self._dfs.setdefault(
+            timeframe,
+            {},
+        )[fqme] = df

         # TODO: filter by end and limit inputs
         # times: pl.Series = df['time']
@@ -329,7 +341,7 @@ class NativeStorageClient:
             time.time() - start,
             ndigits=6,
         )
-        print(
+        log.info(
             f'parquet write took {delay} secs\n'
             f'file path: {path}'
         )

@@ -339,7 +351,7 @@ class NativeStorageClient:
     async def write_ohlcv(
         self,
         fqme: str,
-        ohlcv: np.ndarray,
+        ohlcv: np.ndarray | pl.DataFrame,
         timeframe: int,
     ) -> Path: