Apparently `.storage.nativedb.mk_ohlcv_shm_keyed_filepath()` was always
kinda broken if you passed in a `period: float` with an actual non-`int`
to the format string? Fixed it to strictly cast to `int()` before
str-ifying so that you don't get weird `60.0s.parquet` in there..
Further this rejigs the `sotre ldshm` gap correction-annotation loop to,
- use `StorageClient.write_ohlcv()` instead of hackily re-implementing
it.. now that problem from above is fixed!
- use a `needs_correction: bool` var to determine if gap markup and
de-duplictated data should be pushed to the shm buffer,
- go back to using `AnnotCtl.add_rect()` for all detected gaps such that
they all persist (and thus are shown together) until the client
disconnects.
Also toss in a poll loop around the `hist_shm: ShmArray` backfill
read-check in the `.data.allocate_persisten_feed()` init to cope with
possible racy-ness from the increased tsdb history loading concurrency
now implemented.
Yet again these are (going to be) generally useful in the data proc
layer as well as going forward with (possibly) moving the history and
shm rt-processing layer to apache (arrow or other) shared-ds
equivalents.
Think i finally figured out the weird issue without out-of-order OHLC
history getting jammed in the wrong place:
- gap is detected in parquet/offline ts (likely due to a zero dt or
other gap),
- query for history in the gap is made BUT that frame is then inserted
in the shm buffer **at the end** (likely using array int-entry
indexing) which inserts it at the wrong location,
- later this out-of-order frame is written to the storage layer
(parquet) and then is repeated on further reboots with the original
gap causing further queries for the same frame on every history
backfill.
A set of tools useful for detecting these issues and annotating them
nicely on chart part of this patch's intent:
- `dedupe()` will detect any dt gaps, deduplicate datetime rows and
return the de-duplicated df along with gaps table.
- use this in both `piker store anal` such that we potentially
resolve and backfill the gaps correctly if some rows were removed.
- possibly also use this to detect the backfilling error in logic at
the time of backfilling the frame instead of after the fact (which
would require re-writing the shm array from something like `store
ldshm` and would be a manual post-hoc solution, not a fix to the
original issue..
A common usage error is to run `piker anal mnq.cme.ib` where the CLI
passed fqme is not actually fully-qualified (in this case missing an
expiry token) and we get an underlying `FileNotFoundError` from the
`StorageClient.read_ohlcv()` call. In such key misses, scan the existing
`StorageClient._index` for possible matches and report in a `raise from`
the new error.
CHERRY into #486
Since there's a growing list of top level mods which are more or less
utils/tools for working with the runtime; begin to move them into a new
subpkg starting with a new `.toolz.debug`.
Start with,
- a new `open_crash_handller()` for doing breakpoints around blocks that
might error.
- move in what was `piker._profile` into `.toolz.profile` and adjust all
importing appropriately.
Changed from the old `store clone` to instead simply load any shm buffer
matching a user provided `FQME: str` pattern; writing to parquet file is
only done if an explicit option flag is passed by user.
Implement new `iter_dfs_from_shms()` generator which allows interatively
loading both 1m and 1s buffers delivering the `Path`, `ShmArray` and
`polars.DataFrame` instances per matching file B)
Also add a todo for a `NativeStorageClient.clear_range()` method.
For OHLCV time series we normally presume a uniform sampling period
(1s or 60s by default) and it's handy to have tools to ensure a series
is gapless or contains expected gaps based on (legacy) market hours.
For this we leverage `polars`:
- add `.nativedb.with_dts()` a datetime-from-epoch-time-column frame
"column-expander" which inserts datetime-casted, epoch-diff and
dt-diff columns.
- add `.nativedb.detect_time_gaps()` which filters to any larger then
expected sampling period rows.
- wrap the above (for now) in a `piker store anal` (analysis) cmd which
atm always enters a breakpoint for tinkering.
Supporting storage client additions:
- add a `detect_period()` helper for extracting expected OHLC time step.
- add new `NativedbStorageClient` methods and attrs to provide for the above:
- `.mk_path()` to **only** deliver a parquet-file path for use in
other methods.
- `._dfs` to house cached `pl.DataFrame`s loaded from `.parquet` files.
- `.as_df()` which loads cached frames or loads them from disk and
then caches (for next use).
- `_write_ohlcv()` a private-sync version of the public equivalent
meth since we don't currently have any actual async file IO
underneath; add a flag for whether to return as a `numpy.ndarray`.
After much frustration with a particular tsdb (cough) this instead
implements a new native-file (and apache tech based) backend which
stores time series in parquet files (for now) using the `polars` apis
(since we plan to use that lib as well for processing).
Note this code is currently **very** rough and in draft mode.
Details:
- add conversion routines for going from `polars.DataFrame` to
`numpy.ndarray` and back.
- lay out a simple file-name as series key symbology:
`fqme.<datadescriptions>.parquet`, though probably it will evolve.
- implement the entire `StorageClient` interface as it stands.
- adjust `storage.cli` cmds to instead expect to use this new backend,
which means it's a complete mess XD
Main benefits/motivation:
- wayy faster load times with no "datums to load limit" required.
- smaller space footprint and we haven't even touched compression
settings yet!
- wayyy more compatible with other systems which can lever the apache
ecosystem.
- gives us finer grained control over the filesystem usage so we can
choose to swap out stuff like the replication system or networking
access.