Compare commits
4 Commits
b9af6176c5
...
cb941a5554
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | cb941a5554 | |
Tyler Goodlet | 2d72a052aa | |
Tyler Goodlet | 2eeef2a123 | |
Tyler Goodlet | b6d2550f33 |
|
@ -263,6 +263,19 @@ def with_dts(
|
|||
# )
|
||||
|
||||
|
||||
def dedup_dt(
|
||||
df: pl.DataFrame,
|
||||
) -> pl.DataFrame:
|
||||
'''
|
||||
Drop duplicate date-time rows (normally from an OHLC frame).
|
||||
|
||||
'''
|
||||
return df.unique(
|
||||
subset=['dt'],
|
||||
maintain_order=True,
|
||||
)
|
||||
|
||||
|
||||
def detect_time_gaps(
|
||||
df: pl.DataFrame,
|
||||
|
||||
|
@ -294,10 +307,12 @@ def detect_time_gaps(
|
|||
'''
|
||||
return (
|
||||
with_dts(df)
|
||||
# First by a seconds unit step size
|
||||
.filter(
|
||||
pl.col('s_diff').abs() > expect_period
|
||||
)
|
||||
.filter(
|
||||
# Second by an arbitrary dt-unit step size
|
||||
getattr(
|
||||
pl.col('dt_diff').dt,
|
||||
gap_dt_unit,
|
||||
|
|
|
@ -716,9 +716,14 @@ async def tsdb_backfill(
|
|||
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
|
||||
offset_s: float = backfill_diff.in_seconds()
|
||||
|
||||
# XXX EDGE CASE: when the venue was closed (say over
|
||||
# the weeknd) causing a timeseries gap, AND the query
|
||||
# frames size (eg. for 1s we rx 2k datums ~= 33.33m) IS
|
||||
# XXX EDGE CASEs: the most recent frame overlaps with
|
||||
# prior tsdb history!!
|
||||
# - so the latest frame's start time is earlier then
|
||||
# the tsdb's latest sample.
|
||||
# - alternatively this may also more generally occur
|
||||
# when the venue was closed (say over the weeknd)
|
||||
# causing a timeseries gap, AND the query frames size
|
||||
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
|
||||
# GREATER THAN the current venue-market's operating
|
||||
# session (time) we will receive datums from BEFORE THE
|
||||
# CLOSURE GAP and thus the `offset_s` value will be
|
||||
|
@ -727,20 +732,35 @@ async def tsdb_backfill(
|
|||
# tsdb. In this case we instead only retreive and push
|
||||
# the series portion missing from the db's data set.
|
||||
if offset_s < 0:
|
||||
backfill_diff: Duration = mr_end_dt - last_tsdb_dt
|
||||
offset_s: float = backfill_diff.in_seconds()
|
||||
non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
|
||||
non_overlap_offset_s: float = backfill_diff.in_seconds()
|
||||
|
||||
offset_samples: int = round(offset_s / timeframe)
|
||||
|
||||
# TODO: see if there's faster multi-field reads:
|
||||
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
||||
# re-index with a `time` and index field
|
||||
prepend_start = shm._first.value - offset_samples + 1
|
||||
if offset_s > 0:
|
||||
# NOTE XXX: ONLY when there is an actual gap
|
||||
# between the earliest sample in the latest history
|
||||
# frame do we want to NOT stick the latest tsdb
|
||||
# history adjacent to that latest frame!
|
||||
prepend_start = shm._first.value - offset_samples + 1
|
||||
to_push = tsdb_history[-prepend_start:]
|
||||
else:
|
||||
# when there is overlap we want to remove the
|
||||
# overlapping samples from the tsdb portion (taking
|
||||
# instead the latest frame's values since THEY
|
||||
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
|
||||
# to the latest frame!
|
||||
# TODO: assert the overlap segment array contains
|
||||
# the same values!?!
|
||||
prepend_start = shm._first.value
|
||||
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
|
||||
|
||||
# tsdb history is so far in the past we can't fit it in
|
||||
# shm buffer space so simply don't load it!
|
||||
if prepend_start > 0:
|
||||
to_push = tsdb_history[-prepend_start:]
|
||||
shm.push(
|
||||
to_push,
|
||||
|
||||
|
|
|
@ -282,28 +282,33 @@ async def maybe_open_pikerd(
|
|||
loglevel=loglevel,
|
||||
**kwargs,
|
||||
) as (actor, addrs),
|
||||
|
||||
# try to attach to any existing (host-local) `pikerd`
|
||||
tractor.find_actor(
|
||||
_root_dname,
|
||||
registry_addrs=registry_addrs,
|
||||
only_first=True,
|
||||
# raise_on_none=True,
|
||||
) as pikerd_portal,
|
||||
|
||||
):
|
||||
# connect to any existing remote daemon presuming its
|
||||
# registry socket was selected.
|
||||
if pikerd_portal is not None:
|
||||
|
||||
# sanity check that we are actually connecting to
|
||||
# a remote process and not ourselves.
|
||||
assert actor.uid != pikerd_portal.channel.uid
|
||||
assert registry_addrs
|
||||
|
||||
yield pikerd_portal
|
||||
if _root_dname in actor.uid:
|
||||
yield None
|
||||
return
|
||||
|
||||
# NOTE: IFF running in disti mode, try to attach to any
|
||||
# existing (host-local) `pikerd`.
|
||||
else:
|
||||
async with tractor.find_actor(
|
||||
_root_dname,
|
||||
registry_addrs=registry_addrs,
|
||||
only_first=True,
|
||||
# raise_on_none=True,
|
||||
) as pikerd_portal:
|
||||
|
||||
# connect to any existing remote daemon presuming its
|
||||
# registry socket was selected.
|
||||
if pikerd_portal is not None:
|
||||
|
||||
# sanity check that we are actually connecting to
|
||||
# a remote process and not ourselves.
|
||||
assert actor.uid != pikerd_portal.channel.uid
|
||||
assert registry_addrs
|
||||
|
||||
yield pikerd_portal
|
||||
return
|
||||
|
||||
# presume pikerd role since no daemon could be found at
|
||||
# configured address
|
||||
async with open_pikerd(
|
||||
|
|
|
@ -136,6 +136,53 @@ def delete(
|
|||
trio.run(main, symbols)
|
||||
|
||||
|
||||
def dedupe(src_df: pl.DataFrame) -> tuple[
|
||||
pl.DataFrame, # with dts
|
||||
pl.DataFrame, # gaps
|
||||
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
|
||||
bool,
|
||||
]:
|
||||
'''
|
||||
Check for time series gaps and if found
|
||||
de-duplicate any datetime entries, check for
|
||||
a frame height diff and return the newly
|
||||
dt-deduplicated frame.
|
||||
|
||||
'''
|
||||
from piker.data import _timeseries as tsp
|
||||
df: pl.DataFrame = tsp.with_dts(src_df)
|
||||
gaps: pl.DataFrame = tsp.detect_time_gaps(df)
|
||||
if not gaps.is_empty():
|
||||
|
||||
# remove duplicated datetime samples/sections
|
||||
deduped: pl.DataFrame = tsp.dedup_dt(df)
|
||||
deduped_gaps = tsp.detect_time_gaps(deduped)
|
||||
|
||||
log.warning(
|
||||
f'Gaps found:\n{gaps}\n'
|
||||
f'deduped Gaps found:\n{deduped_gaps}'
|
||||
)
|
||||
# TODO: rewrite this in polars and/or convert to
|
||||
# ndarray to detect and remove?
|
||||
# null_gaps = tsp.detect_null_time_gap()
|
||||
|
||||
diff: int = (
|
||||
df.height
|
||||
-
|
||||
deduped.height
|
||||
)
|
||||
was_deduped: bool = False
|
||||
if diff:
|
||||
deduped: bool = True
|
||||
|
||||
return (
|
||||
df,
|
||||
gaps,
|
||||
deduped,
|
||||
was_deduped,
|
||||
)
|
||||
|
||||
|
||||
@store.command()
|
||||
def anal(
|
||||
fqme: str,
|
||||
|
@ -144,7 +191,11 @@ def anal(
|
|||
|
||||
) -> np.ndarray:
|
||||
'''
|
||||
Anal-ysis is when you take the data do stuff to it, i think.
|
||||
Anal-ysis is when you take the data do stuff to it.
|
||||
|
||||
NOTE: This ONLY loads the offline timeseries data (by default
|
||||
from a parquet file) NOT the in-shm version you might be seeing
|
||||
in a chart.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
|
@ -162,6 +213,7 @@ def anal(
|
|||
syms: list[str] = await client.list_keys()
|
||||
log.info(f'{len(syms)} FOUND for {mod.name}')
|
||||
|
||||
history: ShmArray # np buffer format
|
||||
(
|
||||
history,
|
||||
first_dt,
|
||||
|
@ -172,13 +224,26 @@ def anal(
|
|||
)
|
||||
assert first_dt < last_dt
|
||||
|
||||
src_df = await client.as_df(fqme, period)
|
||||
from piker.data import _timeseries as tsmod
|
||||
df: pl.DataFrame = tsmod.with_dts(src_df)
|
||||
gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
|
||||
shm_df: pl.DataFrame = await client.as_df(
|
||||
fqme,
|
||||
period,
|
||||
)
|
||||
|
||||
if not gaps.is_empty():
|
||||
print(f'Gaps found:\n{gaps}')
|
||||
df: pl.DataFrame # with dts
|
||||
deduped: pl.DataFrame # deduplicated dts
|
||||
(
|
||||
df,
|
||||
gaps,
|
||||
deduped,
|
||||
shortened,
|
||||
) = dedupe(shm_df)
|
||||
|
||||
if shortened:
|
||||
await client.write_ohlcv(
|
||||
fqme,
|
||||
ohlcv=deduped,
|
||||
timeframe=period,
|
||||
)
|
||||
|
||||
# TODO: something better with tab completion..
|
||||
# is there something more minimal but nearly as
|
||||
|
@ -275,7 +340,7 @@ def ldshm(
|
|||
'''
|
||||
Linux ONLY: load any fqme file name matching shm buffer from
|
||||
/dev/shm/ into an OHLCV numpy array and polars DataFrame,
|
||||
optionally write to .parquet file.
|
||||
optionally write to offline storage via `.parquet` file.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
|
@ -287,7 +352,7 @@ def ldshm(
|
|||
),
|
||||
):
|
||||
df: pl.DataFrame | None = None
|
||||
for shmfile, shm, src_df in iter_dfs_from_shms(fqme):
|
||||
for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):
|
||||
|
||||
# compute ohlc properties for naming
|
||||
times: np.ndarray = shm.array['time']
|
||||
|
@ -297,9 +362,16 @@ def ldshm(
|
|||
f'Something is wrong with time period for {shm}:\n{times}'
|
||||
)
|
||||
|
||||
from piker.data import _timeseries as tsmod
|
||||
df: pl.DataFrame = tsmod.with_dts(src_df)
|
||||
gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
|
||||
|
||||
# over-write back to shm?
|
||||
df: pl.DataFrame # with dts
|
||||
deduped: pl.DataFrame # deduplicated dts
|
||||
(
|
||||
df,
|
||||
gaps,
|
||||
deduped,
|
||||
was_dded,
|
||||
) = dedupe(shm_df)
|
||||
|
||||
# TODO: maybe only optionally enter this depending
|
||||
# on some CLI flags and/or gap detection?
|
||||
|
|
|
@ -272,9 +272,21 @@ class NativeStorageClient:
|
|||
# limit: int = int(200e3),
|
||||
|
||||
) -> np.ndarray:
|
||||
path: Path = self.mk_path(fqme, period=int(timeframe))
|
||||
path: Path = self.mk_path(
|
||||
fqme,
|
||||
period=int(timeframe),
|
||||
)
|
||||
df: pl.DataFrame = pl.read_parquet(path)
|
||||
self._dfs.setdefault(timeframe, {})[fqme] = df
|
||||
|
||||
# cache df for later usage since we (currently) need to
|
||||
# convert to np.ndarrays to push to our `ShmArray` rt
|
||||
# buffers subsys but later we may operate entirely on
|
||||
# pyarrow arrays/buffers so keeping the dfs around for
|
||||
# a variety of purposes is handy.
|
||||
self._dfs.setdefault(
|
||||
timeframe,
|
||||
{},
|
||||
)[fqme] = df
|
||||
|
||||
# TODO: filter by end and limit inputs
|
||||
# times: pl.Series = df['time']
|
||||
|
@ -329,7 +341,7 @@ class NativeStorageClient:
|
|||
time.time() - start,
|
||||
ndigits=6,
|
||||
)
|
||||
print(
|
||||
log.info(
|
||||
f'parquet write took {delay} secs\n'
|
||||
f'file path: {path}'
|
||||
)
|
||||
|
@ -339,7 +351,7 @@ class NativeStorageClient:
|
|||
async def write_ohlcv(
|
||||
self,
|
||||
fqme: str,
|
||||
ohlcv: np.ndarray,
|
||||
ohlcv: np.ndarray | pl.DataFrame,
|
||||
timeframe: int,
|
||||
|
||||
) -> Path:
|
||||
|
|
Loading…
Reference in New Issue