For each timeframe open a sub-nursery to do the backfilling + tsdb load
+ null-segment scanning in an effort to both speed up load time (though
we need to reverse the current order to really make it faster rn since
moving to the much faster parquet file backend) and do concurrent
time-gap/null-segment checking of tsdb history while mrf (most recent
frame) history is backfilling.
The details are more or less just `trio` related task-func composition
tricks and a reordering of said funcs for optimal startup latency.
Also commented the `back_load_from_tsdb()` task for now since it's
unused.
Apparently it returns the index of the prior zero-row (prolly since we
do the backward difference) so ensure `fi_zgaps += 1`..
Also fix remaining edge case handling when there's only 2 zero-segs
which was borked after a refactor to the special case blocks (like
a single zero row) prior to the `absi_zsegs` building loop AND make sure
to always return abs indices OUTSIDE the zero seg, i.e. the indices of
the non-zero row just before and just after so that the history
backfiller can use non-zero timestamps to generate range datetimes for
backend frame queries.
Add much more detailed doc-comments with a small ascii diagram to
explain how all these somewhat subtle vec ops work. Also toss in some
sanity checks on the output indices to ensure they don't point to
zero (time) valued rows when used to read the frame.
Call it `iter_null_segs()` (for now?) and use in the final (sequential)
stage of the `.history.start_backfill()` task-func. Delivers abs,
frame-relative, and equiv time stamps on each iteration pertaining to
each detected null-segment to make it easy to do piece-wise history
queries for each.
Further,
- handle edge case in `get_null_segs()` where there is only 1 zeroed
row value, in which case we deliver `absi_zsegs` as a single pair of
the same index value and,
- when this occurs `iter_null_seqs()` delivers `None` for all the
`start_` related indices/timestamps since all `get_hist()` routines
(delivered by `open_history_client()`) should handle it as being a
"get max history from this end_dt" type query.
- add note about needing to do time gap handling where there's a gap in
the timeseries-history that isn't actually IN the data-history.
Using a bunch of fancy `numpy` vec ops (and ideally eventually extending
the same to `polars`) this is a first draft of `get_null_segs()`
a `col: str` field-value-is-zero detector which filters to all zero-valued
input frame segments and returns the corresponding useful slice-indexes:
- gap absolute (in shm buffer terms) index-endpoints as
`absi_zsegs` for slicing to each null-segment in the src frame.
- ALL abs indices of rows with zeroed `col` values as `absi_zeros`.
- the full set of the input frame's row-entries (view) which are
null valued for the chosen `col` as `zero_t`.
Use this new null-segment-detector in the
`.data.history.start_backfill()` task to attempt to fill null gaps that
might be extant from some prior backfill attempt. Since
`get_null_segs()` should now deliver a sequence of slices for each gap
we don't really need to have the `while gap_indices:` loop any more, so
just move that to the end-of-func and warn log (for now) if all gaps
aren't eventually filled.
TODO:
-[ ] do the null-seg detection and filling concurrently from
most-recent-frame backfilling.
-[ ] offer the same detection in `.storage.cli` cmds for manual tsp
anal.
-[ ] make the graphics layer actually update correctly when null-segs
are filled (currently still broken somehow in the `Viz` caching
layer?)
CHERRY INTO #486
In an effort to catch out-of-order and/or partial-frame-duplicated
segments, add some `.tsp` calls throughout the backloader tasks
including a call to the new `.sort_diff()` to catch the out-of-order
history cases.
Since the `diff: int` serves as a predicate anyway (when `0` nothing
duplicate was detected) might as well just return it directly since it's
likely also useful for the caller when doing deeper anal.
Also, handle the zero-diff case by just returning early with a copy of
the input frame and a `diff=0`.
CHERRY INTO #486
Turns out this was the main source of all sorts of gaps and overlaps
in history frame backfilling. The original idea was that when a gap
causes not enough (1m) bars to be delivered (like over a weekend or
holiday) when we just implicitly do another frame query to try and at
least fill out the default duration (normally 1-2 days). Doing the
recursion sloppily was causing all sorts of stupid problems..
It's kinda obvious now what was wrong in hindsight:
- always pass the sampling period (timeframe) when recursing
- adjust the logic to not be mutex with the no-data case (since it
already is mutex..)
- pack to the `numpy` array BEFORE the recursive call to ensure the
`end_dt: DateTime` is selected and passed correctly!
Toss in some other helpfuls:
- more explicit `pendulum` typing imports
- some masked out sorted-diffing checks (that can be enabled when
debugging out-of-order frame issues)
- always error log about less-than time step mismatches since we should never
have time-diff steps **smaller** then specified in the
`sample_period_s`!
Yet again these are (going to be) generally useful in the data proc
layer as well as going forward with (possibly) moving the history and
shm rt-processing layer to apache (arrow or other) shared-ds
equivalents.
Includes a rename of `.data._timeseries` -> `.data.tsp` for "time series
processing", making it a public sub-mod; it contains a highly useful set
of data-frame and `numpy.ndarray` ops routines in various subsystems Bo
I guess since i started supporting the whole "allow a gap between
the latest tsdb sample and the latest retrieved history frame" the
overlap slicing has been completely borked XD where we've been sticking
in duplicate history samples and this has caused all sorts of down
stream time-series processing issues..
So fix that but ensuring whenever there IS an overlap between history in
the latest frame and the tsdb that we always prefer the latest frame's
data and slice OUT the tsdb's duplicate indices..
CHERRY TO #486
Think i finally figured out the weird issue without out-of-order OHLC
history getting jammed in the wrong place:
- gap is detected in parquet/offline ts (likely due to a zero dt or
other gap),
- query for history in the gap is made BUT that frame is then inserted
in the shm buffer **at the end** (likely using array int-entry
indexing) which inserts it at the wrong location,
- later this out-of-order frame is written to the storage layer
(parquet) and then is repeated on further reboots with the original
gap causing further queries for the same frame on every history
backfill.
A set of tools useful for detecting these issues and annotating them
nicely on chart part of this patch's intent:
- `dedupe()` will detect any dt gaps, deduplicate datetime rows and
return the de-duplicated df along with gaps table.
- use this in both `piker store anal` such that we potentially
resolve and backfill the gaps correctly if some rows were removed.
- possibly also use this to detect the backfilling error in logic at
the time of backfilling the frame instead of after the fact (which
would require re-writing the shm array from something like `store
ldshm` and would be a manual post-hoc solution, not a fix to the
original issue..
Been meaning to this for a while, and there's still a few design
/ interface kinks (like `.mkt: MktPair` which should be better
generalized?) but this flips over all of the fsp chaining engine
to operate on the higher level `Flume` APIs via the newly cobbled
`Cascade` thinger..
Allows opening with `.from_msg(readonly=False)` for write permissions
making underlyig shm arrays readonly. Also, make sure to pop the
`ShmArray` field entries prior to msg-ization, not sure how that worked
with the `Feed.flumes` equivalent..but?
A common usage error is to run `piker anal mnq.cme.ib` where the CLI
passed fqme is not actually fully-qualified (in this case missing an
expiry token) and we get an underlying `FileNotFoundError` from the
`StorageClient.read_ohlcv()` call. In such key misses, scan the existing
`StorageClient._index` for possible matches and report in a `raise from`
the new error.
CHERRY into #486
Since it probably IS sane to just assume a root-actor-as-registrar
listening on the localhost as a default, AND allows NOT expecting every
caller of `open_piker_runtime()` to not have to pass an addr set XD
This makes a bucha CLI shit work again after breakage due to no
default..
For now def it `.cli.load_trans_eps()` just inside the pkg mod; only
loads the ep for `pikerd` which currently acts as the main service-actor
registrar per host. Delegate to this new `.load_trans_eps()`
as-it-was-used from the `pikerd` cmd body and add fresh support for
`piker chart --maddr <addr: str>` using the routine in the body of the
`piker.cli.cli` cmd group after loading the `conf.toml::network` section
B)
Also, toss in runtime debug mode wrapping around `piker chart` using the
new `tractor.devx.maybe_open_crash_handler()` and pull the switch from
a `--pdb` flag now factored into the `.cli.cli` click group.
Since `tractor` and our runtime internals is now moved to multihomed semantics,
do the same in the CLI / config entrypoints.
Also, try using the new `tractor.devx.maybe_open_crash_handler()` around
the `pikerd` CLI.
When a new (actor) caller opens the registry there are 2 possible cases:
1. - some task already opened the registry during init and set the global
superset of registrar addrs that are expected to be used,
2. - some task after the init task opens with a subset of addrs.
3. - some task after init opens with a disjoint set - should be an error?
In the 2nd case we don't want to error since the may just not need to
know about other registrar (multi-homed) addrs and thus only needs
specific access - so only warn about the diff in that case. If the
caller is requesting some disjoint set then we still runtime raise.
Adjust `find_service()` to allow a null `registry_addrs` input in which
case we fail over to using whatever pre-set the `Registry.addrs` has;
makes it simple for actors that don't want/need to know about the global
registrar set for their actor tree. Also, always set pass
`tractor.find_actor(only_first=True)` (for now).