Compare commits

..

No commits in common. "cb941a55549b1da48c6ae837865a48dbc1a63026" and "b9af6176c5e0929595fba4112e4eb605505ac1d3" have entirely different histories.

5 changed files with 42 additions and 166 deletions

View File

@ -263,19 +263,6 @@ def with_dts(
# )
def dedup_dt(
df: pl.DataFrame,
) -> pl.DataFrame:
'''
Drop duplicate date-time rows (normally from an OHLC frame).
'''
return df.unique(
subset=['dt'],
maintain_order=True,
)
def detect_time_gaps(
df: pl.DataFrame,
@ -307,12 +294,10 @@ def detect_time_gaps(
'''
return (
with_dts(df)
# First by a seconds unit step size
.filter(
pl.col('s_diff').abs() > expect_period
)
.filter(
# Second by an arbitrary dt-unit step size
getattr(
pl.col('dt_diff').dt,
gap_dt_unit,

View File

@ -716,14 +716,9 @@ async def tsdb_backfill(
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
offset_s: float = backfill_diff.in_seconds()
# XXX EDGE CASEs: the most recent frame overlaps with
# prior tsdb history!!
# - so the latest frame's start time is earlier then
# the tsdb's latest sample.
# - alternatively this may also more generally occur
# when the venue was closed (say over the weeknd)
# causing a timeseries gap, AND the query frames size
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
# XXX EDGE CASE: when the venue was closed (say over
# the weeknd) causing a timeseries gap, AND the query
# frames size (eg. for 1s we rx 2k datums ~= 33.33m) IS
# GREATER THAN the current venue-market's operating
# session (time) we will receive datums from BEFORE THE
# CLOSURE GAP and thus the `offset_s` value will be
@ -732,35 +727,20 @@ async def tsdb_backfill(
# tsdb. In this case we instead only retreive and push
# the series portion missing from the db's data set.
if offset_s < 0:
non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
non_overlap_offset_s: float = backfill_diff.in_seconds()
backfill_diff: Duration = mr_end_dt - last_tsdb_dt
offset_s: float = backfill_diff.in_seconds()
offset_samples: int = round(offset_s / timeframe)
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
if offset_s > 0:
# NOTE XXX: ONLY when there is an actual gap
# between the earliest sample in the latest history
# frame do we want to NOT stick the latest tsdb
# history adjacent to that latest frame!
prepend_start = shm._first.value - offset_samples + 1
to_push = tsdb_history[-prepend_start:]
else:
# when there is overlap we want to remove the
# overlapping samples from the tsdb portion (taking
# instead the latest frame's values since THEY
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
# to the latest frame!
# TODO: assert the overlap segment array contains
# the same values!?!
prepend_start = shm._first.value
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
prepend_start = shm._first.value - offset_samples + 1
# tsdb history is so far in the past we can't fit it in
# shm buffer space so simply don't load it!
if prepend_start > 0:
to_push = tsdb_history[-prepend_start:]
shm.push(
to_push,

View File

@ -282,33 +282,28 @@ async def maybe_open_pikerd(
loglevel=loglevel,
**kwargs,
) as (actor, addrs),
# try to attach to any existing (host-local) `pikerd`
tractor.find_actor(
_root_dname,
registry_addrs=registry_addrs,
only_first=True,
# raise_on_none=True,
) as pikerd_portal,
):
if _root_dname in actor.uid:
yield None
# connect to any existing remote daemon presuming its
# registry socket was selected.
if pikerd_portal is not None:
# sanity check that we are actually connecting to
# a remote process and not ourselves.
assert actor.uid != pikerd_portal.channel.uid
assert registry_addrs
yield pikerd_portal
return
# NOTE: IFF running in disti mode, try to attach to any
# existing (host-local) `pikerd`.
else:
async with tractor.find_actor(
_root_dname,
registry_addrs=registry_addrs,
only_first=True,
# raise_on_none=True,
) as pikerd_portal:
# connect to any existing remote daemon presuming its
# registry socket was selected.
if pikerd_portal is not None:
# sanity check that we are actually connecting to
# a remote process and not ourselves.
assert actor.uid != pikerd_portal.channel.uid
assert registry_addrs
yield pikerd_portal
return
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(

View File

@ -136,53 +136,6 @@ def delete(
trio.run(main, symbols)
def dedupe(src_df: pl.DataFrame) -> tuple[
pl.DataFrame, # with dts
pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
bool,
]:
'''
Check for time series gaps and if found
de-duplicate any datetime entries, check for
a frame height diff and return the newly
dt-deduplicated frame.
'''
from piker.data import _timeseries as tsp
df: pl.DataFrame = tsp.with_dts(src_df)
gaps: pl.DataFrame = tsp.detect_time_gaps(df)
if not gaps.is_empty():
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = tsp.dedup_dt(df)
deduped_gaps = tsp.detect_time_gaps(deduped)
log.warning(
f'Gaps found:\n{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}'
)
# TODO: rewrite this in polars and/or convert to
# ndarray to detect and remove?
# null_gaps = tsp.detect_null_time_gap()
diff: int = (
df.height
-
deduped.height
)
was_deduped: bool = False
if diff:
deduped: bool = True
return (
df,
gaps,
deduped,
was_deduped,
)
@store.command()
def anal(
fqme: str,
@ -191,11 +144,7 @@ def anal(
) -> np.ndarray:
'''
Anal-ysis is when you take the data do stuff to it.
NOTE: This ONLY loads the offline timeseries data (by default
from a parquet file) NOT the in-shm version you might be seeing
in a chart.
Anal-ysis is when you take the data do stuff to it, i think.
'''
async def main():
@ -213,7 +162,6 @@ def anal(
syms: list[str] = await client.list_keys()
log.info(f'{len(syms)} FOUND for {mod.name}')
history: ShmArray # np buffer format
(
history,
first_dt,
@ -224,26 +172,13 @@ def anal(
)
assert first_dt < last_dt
shm_df: pl.DataFrame = await client.as_df(
fqme,
period,
)
src_df = await client.as_df(fqme, period)
from piker.data import _timeseries as tsmod
df: pl.DataFrame = tsmod.with_dts(src_df)
gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
df: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
df,
gaps,
deduped,
shortened,
) = dedupe(shm_df)
if shortened:
await client.write_ohlcv(
fqme,
ohlcv=deduped,
timeframe=period,
)
if not gaps.is_empty():
print(f'Gaps found:\n{gaps}')
# TODO: something better with tab completion..
# is there something more minimal but nearly as
@ -340,7 +275,7 @@ def ldshm(
'''
Linux ONLY: load any fqme file name matching shm buffer from
/dev/shm/ into an OHLCV numpy array and polars DataFrame,
optionally write to offline storage via `.parquet` file.
optionally write to .parquet file.
'''
async def main():
@ -352,7 +287,7 @@ def ldshm(
),
):
df: pl.DataFrame | None = None
for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):
for shmfile, shm, src_df in iter_dfs_from_shms(fqme):
# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
@ -362,16 +297,9 @@ def ldshm(
f'Something is wrong with time period for {shm}:\n{times}'
)
# over-write back to shm?
df: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
df,
gaps,
deduped,
was_dded,
) = dedupe(shm_df)
from piker.data import _timeseries as tsmod
df: pl.DataFrame = tsmod.with_dts(src_df)
gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?

View File

@ -272,21 +272,9 @@ class NativeStorageClient:
# limit: int = int(200e3),
) -> np.ndarray:
path: Path = self.mk_path(
fqme,
period=int(timeframe),
)
path: Path = self.mk_path(fqme, period=int(timeframe))
df: pl.DataFrame = pl.read_parquet(path)
# cache df for later usage since we (currently) need to
# convert to np.ndarrays to push to our `ShmArray` rt
# buffers subsys but later we may operate entirely on
# pyarrow arrays/buffers so keeping the dfs around for
# a variety of purposes is handy.
self._dfs.setdefault(
timeframe,
{},
)[fqme] = df
self._dfs.setdefault(timeframe, {})[fqme] = df
# TODO: filter by end and limit inputs
# times: pl.Series = df['time']
@ -341,7 +329,7 @@ class NativeStorageClient:
time.time() - start,
ndigits=6,
)
log.info(
print(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)
@ -351,7 +339,7 @@ class NativeStorageClient:
async def write_ohlcv(
self,
fqme: str,
ohlcv: np.ndarray | pl.DataFrame,
ohlcv: np.ndarray,
timeframe: int,
) -> Path: