Compare commits

..

14 Commits

Author SHA1 Message Date
Gud Boi cd6bc105de Enable tracing back insert backfills
Namely insertion writes which over-fill the shm buffer past the latest
tsdb sample via `.tsp._history.shm_push_in_between()`.

Deats,
- check earliest `to_push` timestamp and enter pause point if it's
  earlier then the tsdb's `backfill_until_dt` stamp.
- requires actually passing the `backfill_until_dt: datetime` thru,
  * `get_null_segs()`
  * `maybe_fill_null_segments()`
  * `shm_push_in_between()` (obvi XD)
2026-01-21 22:38:42 -05:00
Gud Boi a8e4e1b2c5 Tolerate various "bad data" cases in `markup_gaps()`
Namely such that when the previous-df-row by our shm-abs-'index' doesn't
exist we ignore certain cases which are likely due to borked-but-benign
samples written to the tsdb or rt shm buffers prior.

Particularly we now ignore,
- any `dt`/`prev_dt` values which are UNIX-epoch timestamped (val of 0).
- any row-is-first-row in the df; there is no previous.
- any missing previous datum by 'index', in which case we lookup the
  `wdts` prior row and use that instead.
  * this would indicate a missing sample for the time-step but we can
    still detect a "gap" by looking at the prior row, by df-abs-index
    `i`, and use its timestamp to determine the period/size of missing
    samples (which need to likely still be retrieved).
  * in this case i'm leaving in a pause-point for introspecting these
    rarer cases when `--pdb` is passed via CLI.

Relatedly in the `piker store` CLI ep,
- add `--pdb` flag to `piker store`, pass it verbatim as `debug_mode`.
- when `times` has only a single row, don't calc a `period_s` median.
- only trace `null_segs` when in debug mode.
- always markup/dedupe gaps for `period_s==60`
2026-01-21 22:20:43 -05:00
Gud Boi caf2cc5a5b ib: up API timeout default for remote host conns 2026-01-21 22:20:43 -05:00
Gud Boi d4b46e0eda Fix `Qt6` types for new sub-namespaces 2026-01-21 22:20:43 -05:00
Gud Boi a1048c847b Add vlm-based "smart" OHLCV de-duping & bar validation
Using `claude`, add a `.tsp._dedupe_smart` module that attemps "smarter"
duplicate bars by attempting to distinguish between erroneous bars
partially written during concurrent backfill race conditions vs.
**actual** data quality issues from historical providers.

Problem:
--------
Concurrent writes (live updates vs. backfilling) can result in create
duplicate timestamped ohlcv vars with different values. Some
potential scenarios include,

- a market live feed is cancelled during live update resulting in the
  "last" datum being partially updated with all the ticks for the
  time step.
- when the feed is rebooted during charting, the backfiller will not
  finalize this bar since rn it presumes it should only fill data for
  time steps not already in the tsdb storage.

Our current naive  `.unique()` approach obvi keeps the incomplete bar
and a "smarter" approach is to compare the provider's final vlm
amount vs. the maybe-cancelled tsdb's bar; a higher vlm value from
the provider likely indicates the cancelled-during-live-write and
**not** a datum discrepancy from said data provider.

Analysis (with `claude`) of `zecusdt` data revealed:
- 1000 duplicate timestamps
- 999 identical bars (pure duplicates from 2022 backfill overlap)
- 1 volume-monotonic conflict (live partial vs backfill complete)

A soln from `claude` -> `tsp._dedupe_smart.dedupe_ohlcv_smart()`
which:
- sorts by vlm **before** deduplication and keep the most complete
  bar based on vlm monotonicity as well as the following OHLCV
  validation assumptions:
  * volume should always increase
  * high should be non-decreasing,
  * low should be non-increasing
  * open should be identical
- Separates valid race conditions from provider data quality issues
  and reports and returns both dfs.

Change summary by `claude`:
- `.tsp._dedupe_smart`: new module with validation logic
- `.tsp.__init__`: expose `dedupe_ohlcv_smart()`
- `.storage.cli`: integrate smart dedupe, add logging for:
  * duplicate counts (identical vs monotonic races)
  * data quality violations (non-monotonic, invalid OHLC ranges)
  * warnings for provider data issues
- Remove `assert not diff` (duplicates are valid now)

Verified on `zecusdt`: correctly keeps index 3143645
(volume=287.777) over 3143644 (volume=140.299) for
conflicting 2026-01-16 18:54 UTC bar.

`claude`'s Summary of reasoning
-------------------------------
- volume monotonicity is critical: a bar's volume only increases
  during its time window.
- a backfilled bar should always have volume >= live updated.
- violations indicate any of:
  * Provider data corruption
  * Non-OHLCV aggregation semantics
  * Timestamp misalignment

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-21 22:20:43 -05:00
Gud Boi 192fe0dc73 Add `pexpect`-based `pdbp`-REPL offline helper
Add a new `snippets/claude_debug_helper.py` to
provide a programmatic interface to `tractor.pause()` debugger
sessions for incremental data inspection matching the interactive UX
but able to be run by `claude` "offline" since it can't seem to feed
stdin (so it claims) to the `pdb` instance due to lack of ability to
allocate a tty internally.

The script-wrapper is based on `tractor`'s `tests/devx/` suite's use of
`pexpect` patterns for driving `pdbp` prompts and thus enables
automated-offline execution of REPL-inspection commands **without**
using incremental-realtime output capture (like a human would use it).

Features:
- `run_pdb_commands()`: batch command execution
- `InteractivePdbSession`: context manager for step-by-step REPL interaction
- `expect()` wrapper: timeout handling with buffer display
- Proper stdin/stdout handling via `pexpect.spawn()`

Example usage:
```python
from debug_helper import InteractivePdbSession

with InteractivePdbSession(
    cmd='piker store ldshm zecusdt.usdtm.perp.binance'
) as session:
    session.run('deduped.shape')
    session.run('step_gaps.shape')
```

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-21 22:20:43 -05:00
Gud Boi 4bfdd388bb Fix polars 1.36.0 duration API
Polars tightened type safety for `.dt` accessor methods requiring
`total_*` methods for duration types vs datetime component accessors
like `day()` which now only work on datetime dtypes.

`detect_time_gaps()` in `.tsp._anal` was calling `.dt.day()`
on `dt_diff` column (a duration from `.diff()`) which throws
`InvalidOperationError` on modern polars.

Changes:
- use f-string to add pluralization to map time unit strings to
  `total_<unit>s` form for the new duration API.
- Handle singular/plural forms: 'day' -> 'days' -> 'total_days'
- Ensure trailing 's' before applying 'total_' prefix

Also updates inline comments explaining the polars type distinction
between datetime components vs duration totals.

Fixes `piker store ldshm` crashes on datasets with time gaps.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-21 22:20:43 -05:00
Tyler Goodlet 534b13f755 `.storage.__init__`: code styling updates 2026-01-21 22:20:43 -05:00
Tyler Goodlet 108646fdfb `.tsp._history`: drop `feed_is_live` syncing, another seg flag
The `await feed_is_live.wait()` is more or less pointless and would only
cause slower startup afaig (as-far-as-i-grok) so i'm masking it here.
This also removes the final `strict_exception_groups=False` use from the
non-tests code base, flipping to the `tractor.trionics` collapser once
and for all!
2026-01-21 22:20:43 -05:00
Tyler Goodlet d6d4fec666 Woops, keep `np2pl` exposed from `.tsp` 2026-01-21 22:20:43 -05:00
Tyler Goodlet 14ac351a65 Factor to a new `.tsp._history` sub-mod
Cleaning out the `piker.tsp` pkg-mod to be only the (re)exports needed
for `._anal`/`._history` refs-use elsewhere!
2026-01-21 22:20:43 -05:00
Gud Boi d146060d5c Merge pull request 'binance: mk `AggTrade.nq` optional..'
Reviewed-on: #67
Reviewed-by: momo <dilarayalniz1@gmail.com>
2026-01-22 03:20:27 +00:00
Gud Boi fff9de9aec binance: add API changelog link 2026-01-21 22:19:13 -05:00
Gud Boi b7cdbd89d4 binance: mk `AggTrade.nq` optional..
Oof! my bad.
Turns out spot pairs don't provide the `.nq` field looks like..
i guess i should not just test `.perp.` pairs all the time!

Bp
2026-01-21 19:59:07 -05:00
2 changed files with 43 additions and 16 deletions

View File

@ -108,6 +108,7 @@ class AggTrade(Struct, frozen=True):
m: bool # Is the buyer the market maker?
M: bool|None = None # Ignore
nq: float|None = None # Normal quantity without the trades involving RPI orders
# ^XXX https://developers.binance.com/docs/derivatives/change-log#2025-12-29
async def stream_messages(

View File

@ -75,7 +75,6 @@ from piker.brokers._util import (
)
from piker.storage import TimeseriesNotFound
from ._anal import (
dedupe,
get_null_segs,
iter_null_segs,
@ -120,15 +119,16 @@ _rt_buffer_start = int((_days_worth - 1) * _secs_in_day)
def diff_history(
array: np.ndarray,
append_until_dt: datetime | None = None,
prepend_until_dt: datetime | None = None,
append_until_dt: datetime|None = None,
prepend_until_dt: datetime|None = None,
) -> np.ndarray:
# no diffing with tsdb dt index possible..
if (
prepend_until_dt is None
and append_until_dt is None
and
append_until_dt is None
):
return array
@ -140,15 +140,26 @@ def diff_history(
return array[times >= prepend_until_dt.timestamp()]
# TODO: can't we just make this a sync func now?
async def shm_push_in_between(
shm: ShmArray,
to_push: np.ndarray,
prepend_index: int,
backfill_until_dt: datetime,
update_start_on_prepend: bool = False,
) -> int:
# XXX, try to catch bad inserts by peeking at the first/last
# times and ensure we don't violate order.
f_times: np.ndarray = to_push['time']
f_start: float = f_times[0]
f_start_dt = from_timestamp(f_start)
if (
f_start_dt < backfill_until_dt
):
await tractor.pause()
# XXX: extremely important, there can be no checkpoints
# in the body of this func to avoid entering new ``frames``
# values while we're pipelining the current ones to
@ -181,6 +192,7 @@ async def maybe_fill_null_segments(
get_hist: Callable,
sampler_stream: tractor.MsgStream,
mkt: MktPair,
backfill_until_dt: datetime,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
@ -191,7 +203,11 @@ async def maybe_fill_null_segments(
frame: Frame = shm.array
null_segs: tuple | None = get_null_segs(
# TODO, put in parent task/daemon root!
import greenback
await greenback.ensure_portal()
null_segs: tuple|None = get_null_segs(
frame,
period=timeframe,
)
@ -237,6 +253,7 @@ async def maybe_fill_null_segments(
shm,
to_push,
prepend_index=absi_end,
backfill_until_dt=backfill_until_dt,
update_start_on_prepend=False,
)
# TODO: UI side needs IPC event to update..
@ -352,15 +369,12 @@ async def start_backfill(
mkt: MktPair,
shm: ShmArray,
timeframe: float,
backfill_from_shm_index: int,
backfill_from_dt: datetime,
sampler_stream: tractor.MsgStream,
backfill_until_dt: datetime | None = None,
storage: StorageClient | None = None,
backfill_until_dt: datetime|None = None,
storage: StorageClient|None = None,
write_tsdb: bool = True,
task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
@ -495,7 +509,14 @@ async def start_backfill(
assert time[-1] == next_end_dt.timestamp()
expected_dur: Interval = last_start_dt - next_start_dt
expected_dur: Interval = (
last_start_dt.subtract(
seconds=timeframe
# ^XXX, always "up to" the bar *before*
)
-
next_start_dt
)
# frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe
@ -556,6 +577,7 @@ async def start_backfill(
shm,
to_push,
prepend_index=next_prepend_index,
backfill_until_dt=backfill_until_dt,
update_start_on_prepend=update_start_on_prepend,
)
await sampler_stream.send({
@ -585,6 +607,7 @@ async def start_backfill(
shm,
to_push,
prepend_index=next_prepend_index,
backfill_until_dt=backfill_until_dt,
update_start_on_prepend=update_start_on_prepend,
)
await sampler_stream.send({
@ -1056,7 +1079,7 @@ async def tsdb_backfill(
trio.open_nursery() as tn,
):
bf_done = await tn.start(
bf_done: trio.Event = await tn.start(
partial(
start_backfill,
get_hist=get_hist,
@ -1076,8 +1099,10 @@ async def tsdb_backfill(
write_tsdb=True,
)
)
nulls_detected: trio.Event | None = None
nulls_detected: trio.Event|None = None
if last_tsdb_dt is not None:
# calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest
@ -1148,7 +1173,7 @@ async def tsdb_backfill(
# TODO: ideally these can never exist!
# -[ ] somehow it seems sometimes we're writing zero-ed
# segments to tsdbs during teardown?
# -[ ] can we ensure that the backcfiller tasks do this
# -[ ] can we ensure that the backfiller tasks do this
# work PREVENTAVELY instead?
# -[ ] fill in non-zero epoch time values ALWAYS!
# await maybe_fill_null_segments(
@ -1160,6 +1185,7 @@ async def tsdb_backfill(
get_hist=get_hist,
sampler_stream=sampler_stream,
mkt=mkt,
backfill_until_dt=last_tsdb_dt,
))
# 2nd nursery END