Compare commits


11 Commits

Author SHA1 Message Date
Tyler Goodlet 1f9a497637 Fixup symcache annot for kucoin as well 2023-12-15 16:01:31 -05:00
Tyler Goodlet 40c5d88a9b Fixup symcache type annots; no more `Pair` type 2023-12-15 16:00:51 -05:00
Tyler Goodlet 8989c73a93 Move `iter_dfs_from_shms` into `.data.history`
Thinking about just moving all of that module (after a content breakup)
to a new `.piker.tsp` which will mostly depend on the `.data` and
`.storage` sub-pkgs; the idea is to move biz-logic for tsdb IO/mgmt and
orchestration with real-time (shm) buffers and the graphics layer into
a common spot for both manual analysis/research work and better
separation of low level data structure primitives from their higher
level usage.

Add a better `data.history` mod doc string in prep for this move
as well as clean out a bunch of legacy commented cruft from the
`trimeter` and `marketstore` days.

TO CHERRY #486 (if we can)
2023-12-15 15:53:02 -05:00
Tyler Goodlet 3639f360c3 Reactivate forced viz updates from sampler broadcasts in hist display loop 2023-12-15 13:59:19 -05:00
Tyler Goodlet afd0781b62 Add (shm) abs index to `ContextLabel` 2023-12-15 13:57:10 -05:00
Tyler Goodlet ba154ef413 ib: don't bother with recursive not-enough-bars queries for now, causes more problems than it solves.. 2023-12-15 13:56:42 -05:00
Tyler Goodlet 97e2403fb1 Rework backfiller and null-segment task conc
For each timeframe open a sub-nursery to do the backfilling + tsdb load
+ null-segment scanning in an effort to both speed up load time (though
we need to reverse the current order to really make it faster rn since
moving to the much faster parquet file backend) and do concurrent
time-gap/null-segment checking of tsdb history while mrf (most recent
frame) history is backfilling.

The details are more or less just `trio` related task-func composition
tricks and a reordering of said funcs for optimal startup latency.
Also commented the `back_load_from_tsdb()` task for now since it's
unused.
2023-12-15 13:11:00 -05:00
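For readers skimming this log, a minimal `trio` sketch of the per-timeframe task composition described above; the task-func names (`backfill_frames()`, `load_tsdb_history()`, `scan_null_segments()`) are hypothetical stand-ins for illustration only, not the actual `piker` funcs:

import trio

async def backfill_frames(timeframe: float) -> None:
    await trio.sleep(0.1)  # pretend to query the data-provider backend

async def load_tsdb_history(timeframe: float) -> None:
    await trio.sleep(0.05)  # pretend to read the (parquet) tsdb

async def scan_null_segments(timeframe: float) -> None:
    await trio.sleep(0.02)  # pretend to scan for zeroed rows

async def per_timeframe(timeframe: float) -> None:
    # one sub-nursery per sampling period so tsdb loading and
    # null-segment scanning overlap with most-recent-frame backfill
    async with trio.open_nursery() as tn:
        tn.start_soon(backfill_frames, timeframe)
        tn.start_soon(load_tsdb_history, timeframe)
        tn.start_soon(scan_null_segments, timeframe)

async def main() -> None:
    async with trio.open_nursery() as tn:
        for tf in (1, 60):  # 1s and 1m sampling
            tn.start_soon(per_timeframe, tf)

trio.run(main)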
Tyler Goodlet a4084d6a0b Bleh, fix another off-by-one issue in `np.argwhere()`
Apparently it returns the index of the prior zero-row (prolly since we
do the backward difference) so ensure `fi_zgaps += 1`..

Also fix remaining edge case handling when there's only 2 zero-segs
which was borked after a refactor to the special case blocks (like
a single zero row) prior to the `absi_zsegs` building loop AND make sure
to always return abs indices OUTSIDE the zero seg, i.e. the indices of
the non-zero row just before and just after so that the history
backfiller can use non-zero timestamps to generate range datetimes for
backend frame queries.

Add much more detailed doc-comments with a small ascii diagram to
explain how all these somewhat subtle vec ops work. Also toss in some
sanity checks on the output indices to ensure they don't point to
zero (time) valued rows when used to read the frame.
2023-12-15 12:48:50 -05:00
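As an illustration of the off-by-one fixed here (a toy sketch with made-up index values, not the real `get_null_segs()` arrays): `np.diff()` is a backward difference, so `np.argwhere(diff > 1)` lands on the row just before each jump and needs the `+ 1` to index the start of the next zero-segment.

import numpy as np

# abs (shm) indices of rows whose 'time' column is zero, forming two
# separate zeroed segments: [10, 11, 12] and [20, 21]
absi_zeros = np.array([10, 11, 12, 20, 21])

absi_zdiff = np.diff(absi_zeros)            # -> [1, 1, 8, 1]
fi_zgaps = np.argwhere(absi_zdiff > 1) + 1  # -> [[3]]

# frame-relative index 3 points at abs index 20, the start of the
# second zero-segment; without the +1 we'd get 12, the *end* of the
# first segment (the "prior zero-row" mentioned above).
assert absi_zeros[fi_zgaps[0, 0]] == 20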
Tyler Goodlet 83bdca46a2 Wrap null-gap detect and fill in async gen
Call it `iter_null_segs()` (for now?) and use in the final (sequential)
stage of the `.history.start_backfill()` task-func. Delivers abs,
frame-relative, and equiv time stamps on each iteration pertaining to
each detected null-segment to make it easy to do piece-wise history
queries for each.

Further,
- handle edge case in `get_null_segs()` where there is only 1 zeroed
  row value, in which case we deliver `absi_zsegs` as a single pair of
  the same index value and,
  - when this occurs `iter_null_segs()` delivers `None` for all the
    `start_` related indices/timestamps since all `get_hist()` routines
    (delivered by `open_history_client()`) should handle it as being a
    "get max history from this end_dt" type query.
- add note about needing to do time gap handling where there's a gap in
  the timeseries-history that isn't actually IN the data-history.
2023-12-13 18:29:06 -05:00
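A hedged sketch of the per-segment delivery described above (this is not the real `iter_null_segs()`; the helper signature and the use of stdlib `datetime` in place of `pendulum` are assumptions for illustration):

from datetime import datetime, timezone
from typing import Generator, Optional

def iter_segs(
    absi_zsegs: list[list[Optional[int]]],
    times: list[float],   # per-row epoch stamps
    absi_first: int,      # abs index of the frame's first row
) -> Generator[tuple, None, None]:
    for absi_start, absi_end in absi_zsegs:
        fi_end = absi_end - absi_first
        end_t = times[fi_end]
        end_dt = datetime.fromtimestamp(end_t, tz=timezone.utc)

        # single null-datum edge case: no usable start bound, so the
        # caller should issue a "max history until end_dt" type query
        fi_start = start_t = start_dt = None
        if absi_start is not None:
            fi_start = absi_start - absi_first
            start_t = times[fi_start]
            start_dt = datetime.fromtimestamp(start_t, tz=timezone.utc)

        yield (
            absi_start, absi_end,
            fi_start, fi_end,
            start_t, end_t,
            start_dt, end_dt,
        )

# e.g. one segment in a frame whose first row has abs index 100:
for seg in iter_segs(
    [[102, 105]],
    times=[float(1e9 + i) for i in range(12)],
    absi_first=100,
):
    print(seg)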
Tyler Goodlet c129f5bb4a Finally write a general purpose null-gap detector!
Using a bunch of fancy `numpy` vec ops (and ideally eventually extending
the same to `polars`) this is a first draft of `get_null_segs()`,
a `col: str` field-value-is-zero detector which filters to all zero-valued
input frame segments and returns the corresponding useful slice-indexes:
- gap absolute (in shm buffer terms) index-endpoints as
  `absi_zsegs` for slicing to each null-segment in the src frame.
- ALL abs indices of rows with zeroed `col` values as `absi_zeros`.
- the full set of the input frame's row-entries (view) which are
  null valued for the chosen `col` as `zero_t`.

Use this new null-segment-detector in the
`.data.history.start_backfill()` task to attempt to fill null gaps that
might be extant from some prior backfill attempt. Since
`get_null_segs()` should now deliver a sequence of slices for each gap
we don't really need to have the `while gap_indices:` loop any more, so
just move that to the end-of-func and warn log (for now) if all gaps
aren't eventually filled.

TODO:
-[ ] do the null-seg detection and filling concurrently from
  most-recent-frame backfilling.
-[ ] offer the same detection in `.storage.cli` cmds for manual tsp
  anal.
-[ ] make the graphics layer actually update correctly when null-segs
  are filled (currently still broken somehow in the `Viz` caching
  layer?)

CHERRY INTO #486
2023-12-13 15:26:33 -05:00
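To make the vec-op approach concrete, a self-contained `numpy` sketch of the zero-valued-column segment detection (not the actual `get_null_segs()`; the frame contents and variable layout here are made up):

import numpy as np

dtype = np.dtype([('index', 'i8'), ('time', 'f8')])
frame = np.array(
    [(i, 0.0 if i in (103, 104, 108) else 1e9 + i) for i in range(100, 112)],
    dtype=dtype,
)

zero_pred = frame['time'] == 0
zero_t = frame[zero_pred]        # view of all zeroed rows
absi_zeros = zero_t['index']     # -> [103, 104, 108]

# split contiguous runs wherever the abs-index step is > 1
splits = np.argwhere(np.diff(absi_zeros) > 1).flatten() + 1
segs = np.split(absi_zeros, splits)   # -> [[103, 104], [108]]

# endpoints of the surrounding NON-zero rows, usable as query bounds
absi_zsegs = [[int(s[0]) - 1, int(s[-1]) + 1] for s in segs]
print(absi_zsegs)                # [[102, 105], [107, 109]]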
Tyler Goodlet c4853a3fee Drop inter-method NL 2023-12-13 09:27:23 -05:00
10 changed files with 931 additions and 474 deletions

View File

@ -401,7 +401,15 @@ class Client:
# => we recursively call this method until we get at least
# as many bars such that they sum in aggregate to the
# desired total time (duration) at most.
if end_dt:
# XXX XXX XXX
# WHY DID WE EVEN NEED THIS ORIGINALLY!?
# XXX XXX XXX
# - if you query over a gap and get no data
# that may short circuit the history
if (
end_dt
and False
):
nparr: np.ndarray = bars_to_np(bars)
times: np.ndarray = nparr['time']
first: float = times[0]
@ -410,6 +418,7 @@ class Client:
if (
# len(bars) * sample_period_s) < dt_duration.in_seconds()
tdiff < dt_duration.in_seconds()
# and False
):
end_dt: DateTime = from_timestamp(first)
log.warning(
@ -859,6 +868,9 @@ class Client:
timeout=timeout,
)
except TimeoutError:
import pdbp
pdbp.set_trace()
if raise_on_timeout:
raise
return None

View File

@ -174,8 +174,15 @@ async def open_history_client(
start_dt: datetime | None = None,
) -> tuple[np.ndarray, str]:
nonlocal max_timeout, mean, count
if (
start_dt
and start_dt.timestamp() == 0
):
await tractor.pause()
query_start = time.time()
out, timedout = await get_bars(
proxy,
@ -403,35 +410,55 @@ async def get_bars(
bars, bars_array, dt_duration = out
# not enough bars signal, likely due to venue
# operational gaps.
too_little: bool = False
if (
end_dt
and (
not bars
or (too_little :=
start_dt
and (len(bars) * timeframe)
< dt_duration.in_seconds()
)
)
):
if (
end_dt
or too_little
):
log.warning(
f'History is blank for {dt_duration} from {end_dt}'
)
end_dt -= dt_duration
continue
raise NoData(f'{end_dt}')
if bars_array is None:
raise SymbolNotFound(fqme)
# not enough bars signal, likely due to venue
# operational gaps.
# too_little: bool = False
if end_dt:
if not bars:
# no data returned?
log.warning(
'History frame is blank?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_duration}\n'
)
raise NoData(f'{end_dt}')
else:
dur_s: float = len(bars) * timeframe
bars_dur = pendulum.Duration(seconds=dur_s)
dt_dur_s: float = dt_duration.in_seconds()
if dur_s < dt_dur_s:
log.warning(
'History frame is shorter than expected?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_dur_s}\n'
f'frame duration seconds: {dur_s}\n'
f'dur diff: {dt_duration - bars_dur}\n'
)
# NOTE: we used to try to get a minimal
# set of bars by recursing but this ran
# into possible infinite query loops
# when logic in the `Client.bars()` dt
# diffing went bad. So instead for now
# we just return the
# shorter-then-expected history with
# a warning.
# TODO: in the future it prolly makes
# the most sense to do venue operating
# hours lookup and
# timestamp-in-operating-range set
# checking to know for sure if we can
# safely and quickly ignore non-uniform history
# frame timestamp gaps..
# end_dt -= dt_duration
# continue
# await tractor.pause()
first_dt = pendulum.from_timestamp(
bars[0].date.timestamp())
@ -854,7 +881,13 @@ async def stream_quotes(
init_msgs.append(init_msg)
con: Contract = details.contract
first_ticker: Ticker = await proxy.get_quote(contract=con)
first_ticker: Ticker | None = None
with trio.move_on_after(1):
first_ticker: Ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=False,
)
if first_ticker:
first_quote: dict = normalize(first_ticker)
log.info(
@ -862,18 +895,6 @@ async def stream_quotes(
f'{pformat(first_quote)}'
)
# TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
# with trio.CancelScope() as cs:
with trio.move_on_after(1):
first_ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=True,
)
# NOTE: it might be outside regular trading hours for
# assets with "standard venue operating hours" so we
# only "pretend the feed is live" when the dst asset
@ -884,6 +905,8 @@ async def stream_quotes(
# (equitiies, futes, bonds etc.) we at least try to
# grab the OHLC history.
if (
first_ticker
and
isnan(first_ticker.last)
# SO, if the last quote price value is NaN we ONLY
# "pretend to do" `feed_is_live.set()` if it's a known
@ -907,6 +930,19 @@ async def stream_quotes(
await trio.sleep_forever()
return # we never expect feed to come up?
# TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
# with trio.CancelScope() as cs:
# XXX: MUST acquire a ticker + first quote before starting
# the live quotes loop!
# with trio.move_on_after(1):
first_ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=True,
)
cs: trio.CancelScope | None = None
startup: bool = True
while (

View File

@ -41,7 +41,6 @@ from typing import (
import wsproto
from uuid import uuid4
from rapidfuzz import process as fuzzy
from trio_typing import TaskStatus
import asks
from bidict import bidict
@ -416,8 +415,7 @@ class Client:
await self.get_mkt_pairs()
assert self._pairs, '`Client.get_mkt_pairs()` was never called!?'
matches: dict[str, Pair] = match_from_pairs(
matches: dict[str, KucoinMktPair] = match_from_pairs(
pairs=self._pairs,
# query=pattern.upper(),
query=pattern.upper(),

View File

@ -31,6 +31,8 @@ from pathlib import Path
from pprint import pformat
from typing import (
Any,
Sequence,
Hashable,
TYPE_CHECKING,
)
from types import ModuleType
@ -128,8 +130,8 @@ class SymbologyCache(Struct):
- `.get_mkt_pairs()`: returning a table of pair-`Struct`
types, custom defined by the particular backend.
AND, the required `.get_mkt_info()` module-level endpoint which
maps `fqme: str` -> `MktPair`s.
AND, the required `.get_mkt_info()` module-level endpoint
which maps `fqme: str` -> `MktPair`s.
These tables are then used to fill out the `.assets`, `.pairs` and
`.mktmaps` tables on this cache instance, respectively.
@ -500,7 +502,7 @@ def match_from_pairs(
)
# pop and repack pairs in output dict
matched_pairs: dict[str, Pair] = {}
matched_pairs: dict[str, Struct] = {}
for item in matches:
pair_key: str = item[0]
matched_pairs[pair_key] = pairs[pair_key]

View File

@ -16,19 +16,26 @@
# <https://www.gnu.org/licenses/>.
'''
Historical data business logic for load, backfill and tsdb storage.
Historical TSP (time-series processing) low-level mgmt machinery and biz logic for:
- hi-level biz logics using the `.storage` subpkg APIs for (I/O)
orchestration and mgmt of tsdb data sets.
- core data-provider history backfilling middleware (as task-funcs) via
(what will eventually be `datad`, but rn are the) `.brokers` backend
APIs.
- various data set cleaning, repairing and issue-detection/analysis
routines to ensure consistent series whether in shm or when
stored offline (in a tsdb).
'''
from __future__ import annotations
# from collections import (
# Counter,
# )
from datetime import datetime
from functools import partial
# import time
from pathlib import Path
from types import ModuleType
from typing import (
Callable,
Generator,
TYPE_CHECKING,
)
@ -36,6 +43,7 @@ import trio
from trio_typing import TaskStatus
import tractor
from pendulum import (
DateTime,
Duration,
from_timestamp,
)
@ -56,7 +64,14 @@ from ._source import def_iohlcv_fields
from ._sampling import (
open_sample_stream,
)
from . import tsp
from .tsp import (
dedupe,
get_null_segs,
iter_null_segs,
sort_diff,
Frame,
# Seq,
)
from ..brokers._util import (
DataUnavailable,
)
@ -110,6 +125,7 @@ def diff_history(
return array[times >= prepend_until_dt.timestamp()]
# TODO: can't we just make this a sync func now?
async def shm_push_in_between(
shm: ShmArray,
to_push: np.ndarray,
@ -118,6 +134,10 @@ async def shm_push_in_between(
update_start_on_prepend: bool = False,
) -> int:
# XXX: extremely important, there can be no checkpoints
# in the body of this func to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
shm.push(
to_push,
prepend=True,
@ -138,10 +158,102 @@ async def shm_push_in_between(
else None
),
)
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
async def maybe_fill_null_segments(
shm: ShmArray,
timeframe: float,
get_hist: Callable,
sampler_stream: tractor.MsgStream,
mkt: MktPair,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
) -> list[Frame]:
null_segs_detected = trio.Event()
task_status.started(null_segs_detected)
frame: Frame = shm.array
null_segs: tuple | None = get_null_segs(
frame,
period=timeframe,
)
for (
absi_start, absi_end,
fi_start, fi_end,
start_t, end_t,
start_dt, end_dt,
) in iter_null_segs(
null_segs=null_segs,
frame=frame,
timeframe=timeframe,
):
# XXX NOTE: ?if we get a badly ordered timestamp
# pair, immediately stop backfilling?
if (
start_dt
and end_dt < start_dt
):
await tractor.pause()
break
(
array,
next_start_dt,
next_end_dt,
) = await get_hist(
timeframe,
start_dt=start_dt,
end_dt=end_dt,
)
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
# and mnq.cme.ib this causes a Qt crash XXDDD
# make sure we don't overrun the buffer start
len_to_push: int = min(absi_end, array.size)
to_push: np.ndarray = array[-len_to_push:]
await shm_push_in_between(
shm,
to_push,
prepend_index=absi_end,
update_start_on_prepend=False,
)
# TODO: UI side needs IPC event to update..
# - make sure the UI actually always handles
# this update!
# - remember that in the display side, only refresh this
# if the respective history is actually "in view".
# loop
await sampler_stream.send({
'broadcast_all': {
# XXX NOTE XXX: see the
# `.ui._display.increment_history_view()` if block
# that looks for this info to FORCE a hard viz
# redraw!
'backfilling': (mkt.fqme, timeframe),
},
})
null_segs_detected.set()
# RECHECK for more null-gaps
frame: Frame = shm.array
null_segs: tuple | None = get_null_segs(
frame,
period=timeframe,
)
if (
null_segs
and
len(null_segs[-1])
):
await tractor.pause()
array = shm.array
zeros = array[array['low'] == 0]
@ -155,10 +267,24 @@ async def shm_push_in_between(
'low',
'close',
]] = shm._array[zeros['index'][0] - 1]['close']
# await tractor.pause()
# TODO: iteratively step through any remaining
# time-gaps/null-segments and spawn piecewise backfiller
# tasks in a nursery?
# -[ ] not sure that's going to work so well on the ib
# backend but worth a shot?
# -[ ] mk new history connections to make it properly
# parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?"
# if (
# next_end_dt not in frame[
# ):
# pass
async def start_backfill(
tn: trio.Nursery,
get_hist,
mod: ModuleType,
mkt: MktPair,
@ -213,27 +339,20 @@ async def start_backfill(
backfill_until_dt = backfill_from_dt.subtract(**period_duration)
# TODO: can we drop this? without conc i don't think this
# is necessary any more?
# configure async query throttling
# rate = config.get('rate', 1)
# XXX: legacy from ``trimeter`` code but unsupported now.
# erlangs = config.get('erlangs', 1)
# avoid duplicate history frames with a set of datetime frame
# starts and associated counts of how many duplicates we see
# per time stamp.
# starts: Counter[datetime] = Counter()
# conduct "backward history gap filling" where we push to
# the shm buffer until we have history back until the
# latest entry loaded from the tsdb's table B)
# STAGE NOTE: "backward history gap filling":
# - we push to the shm buffer until we have history back
# until the latest entry loaded from the tsdb's table B)
# - after this loop continue to check for other gaps in the
# (tsdb) history and (at least report) maybe fill them
# from new frame queries to the backend?
last_start_dt: datetime = backfill_from_dt
next_prepend_index: int = backfill_from_shm_index
while last_start_dt > backfill_until_dt:
log.debug(
f'Requesting {timeframe}s frame ending in {last_start_dt}'
log.info(
f'Requesting {timeframe}s frame:\n'
f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n'
)
try:
@ -261,36 +380,18 @@ async def start_backfill(
await tractor.pause()
return
# TODO: drop this? see todo above..
# if (
# next_start_dt in starts
# and starts[next_start_dt] <= 6
# ):
# start_dt = min(starts)
# log.warning(
# f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
# )
# starts[start_dt] += 1
# await tractor.pause()
# continue
# elif starts[next_start_dt] > 6:
# log.warning(
# f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
# )
# return
# # only update new start point if not-yet-seen
# starts[next_start_dt] += 1
assert array['time'][0] == next_start_dt.timestamp()
assert (
array['time'][0]
==
next_start_dt.timestamp()
)
diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds
# frame's worth of sample-period-steps, in seconds
frame_size_s = len(array) * timeframe
expected_frame_size_s = frame_size_s + timeframe
frame_size_s: float = len(array) * timeframe
expected_frame_size_s: float = frame_size_s + timeframe
if frame_time_diff_s > expected_frame_size_s:
# XXX: query result includes a start point prior to our
@ -298,8 +399,10 @@ async def start_backfill(
# history gap (eg. market closed period, outage, etc.)
# so just report it to console for now.
log.warning(
f'History frame ending @ {last_start_dt} appears to have a gap:\n'
f'{diff} ~= {frame_time_diff_s} seconds'
'GAP DETECTED:\n'
f'last_start_dt: {last_start_dt}\n'
f'diff: {diff}\n'
f'frame_time_diff_s: {frame_time_diff_s}\n'
)
to_push = diff_history(
@ -415,74 +518,15 @@ async def start_backfill(
gaps,
deduped,
diff,
) = tsp.dedupe(df)
) = dedupe(df)
if diff:
tsp.sort_diff(df)
sort_diff(df)
else:
# finally filled gap
log.info(
f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
)
# conduct tsdb timestamp gap detection and backfill any
# seemingly missing sequence segments..
# TODO: ideally these never exist but somehow it seems
# sometimes we're writing zero-ed segments on certain
# (teardown) cases?
from .tsp import detect_null_time_gap
gap_indices: tuple | None = detect_null_time_gap(shm)
while gap_indices:
(
istart,
start,
end,
iend,
) = gap_indices
start_dt = from_timestamp(start)
end_dt = from_timestamp(end)
# if we get a badly ordered timestamp
# pair, immediately stop backfilling.
if end_dt < start_dt:
break
(
array,
next_start_dt,
next_end_dt,
) = await get_hist(
timeframe,
start_dt=start_dt,
end_dt=end_dt,
)
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
# and mnq.cme.ib this causes a Qt crash XXDDD
# make sure we don't overrun the buffer start
len_to_push: int = min(iend, array.size)
to_push: np.ndarray = array[-len_to_push:]
await shm_push_in_between(
shm,
to_push,
prepend_index=iend,
update_start_on_prepend=False,
)
# TODO: UI side needs IPC event to update..
# - make sure the UI actually always handles
# this update!
# - remember that in the display side, only refresh this
# if the respective history is actually "in view".
# loop
await sampler_stream.send({
'broadcast_all': {
'backfilling': (mkt.fqme, timeframe),
},
})
gap_indices: tuple | None = detect_null_time_gap(shm)
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
@ -494,6 +538,12 @@ async def start_backfill(
bf_done.set()
# NOTE: originally this was used to cope with a tsdb (marketstore)
# which could not deliver very large frames of history over gRPC
# (thanks goolag) due to corruption issues. NOW, using apache
# parquet (by default in the local filesys) we don't have this
# requirement since the files can be loaded very quickly in
# entirety to memory via
async def back_load_from_tsdb(
storemod: ModuleType,
storage: StorageClient,
@ -631,10 +681,94 @@ async def back_load_from_tsdb(
# await sampler_stream.send('broadcast_all')
async def push_latest_frame(
dt_eps: list[DateTime, DateTime],
shm: ShmArray,
get_hist: Callable[
[int, datetime, datetime],
tuple[np.ndarray, str]
],
timeframe: float,
config: dict,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
):
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
(
array,
mr_start_dt,
mr_end_dt,
) = await get_hist(
timeframe,
end_dt=None,
)
# so caller can access these ep values
dt_eps.extend([
mr_start_dt,
mr_end_dt,
])
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
except DataUnavailable:
task_status.started()
if timeframe > 1:
await tractor.pause()
return
# NOTE: on the first history, most recent history
# frame we PREPEND from the current shm ._last index
# and thus a gap between the earliest datum loaded here
# and the latest loaded from the tsdb may exist!
log.info(f'Pushing {array.size} to shm!')
shm.push(
array,
prepend=True, # append on first frame
)
async def load_tsdb_hist(
storage: StorageClient,
mkt: MktPair,
timeframe: float,
) -> tuple[
np.ndarray,
DateTime,
DateTime,
] | None:
# loads a (large) frame of data from the tsdb depending
# on the db's query size limit; our "nativedb" (using
# parquet) generally can load the entire history into mem
# but if not then below the remaining history can be lazy
# loaded?
fqme: str = mkt.fqme
tsdb_entry: tuple[
np.ndarray,
DateTime,
DateTime,
]
try:
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
)
return tsdb_entry
except TimeseriesNotFound:
log.warning(
f'No timeseries yet for {timeframe}@{fqme}'
)
return None
async def tsdb_backfill(
mod: ModuleType,
storemod: ModuleType,
tn: trio.Nursery,
storage: StorageClient,
mkt: MktPair,
@ -649,96 +783,101 @@ async def tsdb_backfill(
) -> None:
if timeframe not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
get_hist: Callable[
[int, datetime, datetime],
tuple[np.ndarray, str]
]
config: dict[str, int]
async with mod.open_history_client(
async with (
mod.open_history_client(
mkt,
) as (get_hist, config):
) as (get_hist, config),
# NOTE: this sub-nursery splits to tasks for the given
# sampling rate to concurrently load offline tsdb
# timeseries as well as new data from the venue backend!
):
log.info(
f'`{mod}` history client returned backfill config:\n'
f'{config}\n'
)
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
(
array,
mr_start_dt,
mr_end_dt,
) = await get_hist(
dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn:
tn.start_soon(
push_latest_frame,
dt_eps,
shm,
get_hist,
timeframe,
end_dt=None,
config,
)
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
except DataUnavailable:
task_status.started()
await tractor.pause()
return
# TODO: fill in non-zero epoch time values ALWAYS!
# hist_shm._array['time'] = np.arange(
# start=
# NOTE: removed for now since it'll always break
# on the first 60s of the venue open..
# times: np.ndarray = array['time']
# # sample period step size in seconds
# step_size_s = (
# from_timestamp(times[-1])
# - from_timestamp(times[-2])
# ).seconds
# if step_size_s not in (1, 60):
# log.error(f'Last 2 sample period is off!? -> {step_size_s}')
# step_size_s = (
# from_timestamp(times[-2])
# - from_timestamp(times[-3])
# ).seconds
# NOTE: on the first history, most recent history
# frame we PREPEND from the current shm ._last index
# and thus a gap between the earliest datum loaded here
# and the latest loaded from the tsdb may exist!
log.info(f'Pushing {array.size} to shm!')
shm.push(
array,
prepend=True, # append on first frame
)
backfill_gap_from_shm_index: int = shm._first.value + 1
# tell parent task to continue
# TODO: really we'd want this the other way with the
# tsdb load happening asap since the latest
# frame query will normally be the main source of
# latency?
task_status.started()
# loads a (large) frame of data from the tsdb depending
# on the db's query size limit; our "nativedb" (using
# parquet) generally can load the entire history into mem
# but if not then below the remaining history can be lazy
# loaded?
fqme: str = mkt.fqme
last_tsdb_dt: datetime | None = None
try:
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
timeframe,
)
except TimeseriesNotFound:
log.warning(
f'No timeseries yet for {timeframe}@{fqme}'
)
else:
# NOTE: iabs to start backfilling from, reverse chronological,
# ONLY AFTER the first history frame has been pushed to
# mem!
backfill_gap_from_shm_index: int = shm._first.value + 1
(
mr_start_dt,
mr_end_dt,
) = dt_eps
async with trio.open_nursery() as tn:
# Prepend any tsdb history to the shm buffer which should
# now be full of the most recent history pulled from the
# backend's last frame.
if tsdb_entry:
(
tsdb_history,
first_tsdb_dt,
last_tsdb_dt,
) = tsdb_entry
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
tn=tn,
get_hist=get_hist,
mod=mod,
mkt=mkt,
shm=shm,
timeframe=timeframe,
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt,
storage=storage,
write_tsdb=True,
)
)
# calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest
@ -804,36 +943,32 @@ async def tsdb_backfill(
log.info(f'Loaded {to_push.shape} datums from storage')
# TODO: maybe start history anal and load missing "history
# gaps" via backend..
# NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
# seemingly missing (null-time) segments..
# TODO: ideally these can never exist!
# -[ ] somehow it seems sometimes we're writing zero-ed
# segments to tsdbs during teardown?
# -[ ] can we ensure that the backfiller tasks do this
# work PREVENTATIVELY instead?
# -[ ] fill in non-zero epoch time values ALWAYS!
# await maybe_fill_null_segments(
nulls_detected: trio.Event = await tn.start(partial(
maybe_fill_null_segments,
if timeframe not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
get_hist=get_hist,
mod=mod,
mkt=mkt,
shm=shm,
timeframe=timeframe,
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
get_hist=get_hist,
sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt,
storage=storage,
write_tsdb=True,
)
)
mkt=mkt,
))
# TODO: who would want to?
await nulls_detected.wait()
await bf_done.wait()
# TODO: maybe start history anal and load missing "history
# gaps" via backend..
# if len(hist_shm.array) < 2:
# TODO: there's an edge case here to solve where if the last
@ -848,32 +983,29 @@ async def tsdb_backfill(
# backload any further data from tsdb (concurrently per
# timeframe) if not all data was able to be loaded (in memory)
# from the ``StorageClient.load()`` call above.
try:
await trio.sleep_forever()
finally:
return
# XXX NOTE: this is legacy from when we were using
# marketstore and we needed to continue backloading
# incrementally from the tsdb client.. (bc it couldn't
# handle a single large query with gRPC for some
# reason.. classic goolag pos)
tn.start_soon(
back_load_from_tsdb,
# tn.start_soon(
# back_load_from_tsdb,
storemod,
storage,
fqme,
# storemod,
# storage,
# fqme,
tsdb_history,
last_tsdb_dt,
mr_start_dt,
mr_end_dt,
bf_done,
# tsdb_history,
# last_tsdb_dt,
# mr_start_dt,
# mr_end_dt,
# bf_done,
timeframe,
shm,
)
# timeframe,
# shm,
# )
async def manage_history(
@ -981,6 +1113,11 @@ async def manage_history(
async with (
storage.open_storage_client() as (storemod, client),
# NOTE: this nursery spawns a task per "timeframe" (aka
# sampling period) data set since normally differently
# sampled timeseries can be loaded / process independently
# ;)
trio.open_nursery() as tn,
):
log.info(
@ -1030,7 +1167,6 @@ async def manage_history(
tsdb_backfill,
mod=mod,
storemod=storemod,
tn=tn,
# bus,
storage=client,
mkt=mkt,
@ -1059,3 +1195,70 @@ async def manage_history(
# and thus a small RPC-prot for remotely controllinlg
# what data is loaded for viewing.
await trio.sleep_forever()
def iter_dfs_from_shms(
fqme: str
) -> Generator[
tuple[Path, ShmArray, pl.DataFrame],
None,
None,
]:
# shm buffer size table based on known sample rates
sizes: dict[str, int] = {
'hist': _default_hist_size,
'rt': _default_rt_size,
}
# load all detected shm buffer files which have the
# passed FQME pattern in the file name.
shmfiles: list[Path] = []
shmdir = Path('/dev/shm/')
for shmfile in shmdir.glob(f'*{fqme}*'):
filename: str = shmfile.name
# skip index files
if (
'_first' in filename
or '_last' in filename
):
continue
assert shmfile.is_file()
log.debug(f'Found matching shm buffer file: {filename}')
shmfiles.append(shmfile)
for shmfile in shmfiles:
# lookup array buffer size based on file suffix
# being either .rt or .hist
key: str = shmfile.name.rsplit('.')[-1]
# skip FSP buffers for now..
if key not in sizes:
continue
size: int = sizes[key]
# attach to any shm buffer, load array into polars df,
# write to local parquet file.
shm, opened = maybe_open_shm_array(
key=shmfile.name,
size=size,
dtype=def_iohlcv_fields,
readonly=True,
)
assert not opened
ohlcv = shm.array
from ..data import tsp
df: pl.DataFrame = tsp.np2pl(ohlcv)
yield (
shmfile,
shm,
df,
)

View File

@ -29,12 +29,19 @@ from math import (
floor,
)
import time
from typing import Literal
from typing import (
Literal,
# AsyncGenerator,
Generator,
)
import numpy as np
import polars as pl
from pendulum import (
DateTime,
from_timestamp,
)
from ._sharedmem import ShmArray
from ..toolz.profile import (
Profiler,
pg_profile_enabled,
@ -53,6 +60,14 @@ get_console_log = partial(
name=subsys,
)
# NOTE: union type-defs to handle generic `numpy` and `polars` types
# side-by-side Bo
# |_ TODO: schema spec typing?
# -[ ] nptyping!
# -[ ] wtv we can with polars?
Frame = pl.DataFrame | np.ndarray
Seq = pl.Series | np.ndarray
def slice_from_time(
arr: np.ndarray,
@ -209,51 +224,282 @@ def slice_from_time(
return read_slc
def detect_null_time_gap(
shm: ShmArray,
def get_null_segs(
frame: Frame,
period: float, # sampling step in seconds
imargin: int = 1,
col: str = 'time',
) -> tuple[float, float] | None:
) -> tuple[
# Seq, # TODO: can we make it an array-type instead?
list[
list[int, int],
],
Seq,
Frame
] | None:
'''
Detect if there are any zero-epoch stamped rows in
the presumed 'time' field-column.
Detect if there are any zero(-epoch stamped) valued
rows for the provided `col: str` column; by default
presume the 'time' field/column.
Filter to the gap and return a surrounding index range.
Filter to all such zero (time) segments and return
the corresponding frame's zeroed segments:
NOTE: for now presumes only ONE gap XD
- gap absolute (in buffer terms) indices-endpoints as
`absi_zsegs`
- abs indices of all rows with zeroed `col` values as `absi_zeros`
- the corresponding frame's row-entries (view) which are
zeroed for the `col` as `zero_t`
'''
# ensure we read buffer state only once so that ShmArray rt
# circular-buffer updates don't cause an indexing/size mismatch.
array: np.ndarray = shm.array
times: Seq = frame['time']
zero_pred: Seq = (times == 0)
zero_pred: np.ndarray = array['time'] == 0
zero_t: np.ndarray = array[zero_pred]
if zero_t.size:
istart, iend = zero_t['index'][[0, -1]]
start, end = shm._array['time'][
[istart - imargin, iend + imargin]
]
return (
istart - imargin,
start,
end,
iend + imargin,
)
if isinstance(frame, np.ndarray):
tis_zeros: int = zero_pred.any()
else:
tis_zeros: int = zero_pred.any()
if not tis_zeros:
return None
# TODO: use ndarray for this?!
absi_zsegs: list[list[int, int]] = []
t_unit: Literal = Literal[
'days',
'hours',
'minutes',
'seconds',
'miliseconds',
'microseconds',
'nanoseconds',
]
if isinstance(frame, np.ndarray):
# view of ONLY the zero segments as one continuous chunk
zero_t: np.ndarray = frame[zero_pred]
# abs indices of said zeroed rows
absi_zeros = zero_t['index']
# diff of abs index steps between each zeroed row
absi_zdiff: np.ndarray = np.diff(absi_zeros)
# scan for all frame-indices where the
# zeroed-row-abs-index-step-diff is greater than the
# expected increment of 1.
# data 1st zero seg data zeros
# ---- ------------ ---- ----- ------ ----
# ||||..000000000000..||||..00000..||||||..0000
# ---- ------------ ---- ----- ------ ----
# ^zero_t[0] ^zero_t[-1]
# ^fi_zgaps[0] ^fi_zgaps[1]
# ^absi_zsegs[0][0] ^---^ => absi_zsegs[1]: tuple
# absi_zsegs[0][1]^
#
# NOTE: the first entry in `fi_zgaps` is where
# the first (absolute) index step diff is > 1.
# and it is a frame-relative index into `zero_t`.
fi_zgaps = np.argwhere(
absi_zdiff > 1
# NOTE: +1 here is to ensure we index to the "start" of each
# segment (if we didn't, the below loop would need to be
# re-written to expect `fi_end_rows`!)
) + 1
# the rows from the contiguous zeroed segments which have
# abs-index steps >1 compared to the previous zero row
# (indicating an end of zeroed segment).
fi_zseg_start_rows = zero_t[fi_zgaps]
# TODO: equiv for pl.DataFrame case!
else:
izeros: pl.Series = zero_pred.arg_true()
zero_t: pl.DataFrame = frame[izeros]
absi_zeros = zero_t['index']
absi_zdiff: pl.Series = absi_zeros.diff()
fi_zgaps = (absi_zdiff > 1).arg_true()
# XXX: our goal (in this func) is to select out slice index
# pairs (zseg0_start, zseg_end) in abs index units for each
# null-segment portion detected throughout entire input frame.
# only up to one null-segment in entire frame?
num_gaps: int = fi_zgaps.size + 1
if num_gaps < 1:
if absi_zeros.size > 1:
absi_zsegs = [[
# see `get_hist()` in backend, should ALWAYS be
# able to handle a `start_dt=None`!
# None,
max(
absi_zeros[0] - 1,
0,
),
# NOTE: need the + 1 to guarantee we index "up to"
# the next non-null row-datum.
min(
absi_zeros[-1] + 1,
frame['index'][-1],
),
]]
else:
# XXX EDGE CASE: only one null-datum found so
# mark the start abs index as None to trigger
# a full frame-len query to the respective backend?
absi_zsegs = [[
# see `get_hist()` in backend, should ALWAYS be
# able to handle a `start_dt=None`!
# None,
None,
absi_zeros[0] + 1,
]]
# XXX NOTE XXX: if >= 2 zeroed segments are found, there should
# ALWAYS be more then one zero-segment-abs-index-step-diff row
# in `absi_zdiff`, so loop through all such
# abs-index-step-diffs >1 (i.e. the entries of `absi_zdiff`)
# and add them as the "end index" entries for each segment.
# Then, if NOT iterating the first such segment end, look back
# for the prior segment's zero-segment start index by relative
# indexing the `zero_t` frame by -1 and grabbing the abs index
# of what should be the prior zero-segment abs start index.
else:
# NOTE: since `absi_zdiff` will never have a row
# corresponding to the first zero-segment's row, we add it
# manually here.
absi_zsegs.append([
absi_zeros[0] - 1,
None,
])
# TODO: can we do it with vec ops?
for i, (
fi, # frame index of zero-seg start
zseg_start_row, # full row for ^
) in enumerate(zip(
fi_zgaps,
fi_zseg_start_rows,
)):
assert (zseg_start_row == zero_t[fi]).all()
iabs: int = zseg_start_row['index'][0]
absi_zsegs.append([
iabs - 1,
None, # backfilled on next iter
])
# row = zero_t[fi]
# absi_pre_zseg = row['index'][0] - 1
# absi_pre_zseg = absi - 1
# final iter case, backfill FINAL end iabs!
if (i + 1) == fi_zgaps.size:
absi_zsegs[-1][1] = absi_zeros[-1] + 1
# NOTE: only after the first segment (due to `.diff()`
# usage above) can we do a lookback to the prior
# segment's end row and determine its abs index to
# retroactively insert to the prior
# `absi_zsegs[i-1][1]` entry Bo
last_end: int = absi_zsegs[i][1]
if last_end is None:
prev_zseg_row = zero_t[fi - 1]
absi_post_zseg = prev_zseg_row['index'][0] + 1
# XXX: MUST BACKFILL previous end iabs!
absi_zsegs[i][1] = absi_post_zseg
else:
if 0 < num_gaps < 2:
absi_zsegs[-1][1] = absi_zeros[-1] + 1
iabs_first: int = frame['index'][0]
for start, end in absi_zsegs:
ts_start: float = times[start - iabs_first]
ts_end: float = times[end - iabs_first]
if (
ts_start == 0
or
ts_end == 0
):
import pdbp
pdbp.set_trace()
assert end
assert start < end
log.warning(
f'Frame has {len(absi_zsegs)} NULL GAPS!?\n'
f'period: {period}\n'
f'total null samples: {len(zero_t)}\n'
)
return (
absi_zsegs, # [start, end] abs slice indices of seg
absi_zeros, # all abs indices within all null-segs
zero_t, # sliced-view of all null-segment rows-datums
)
def iter_null_segs(
timeframe: float,
frame: Frame | None = None,
null_segs: tuple | None = None,
) -> Generator[
tuple[
int, int,
int, int,
float, float,
float, float,
# Seq, # TODO: can we make it an array-type instead?
# list[
# list[int, int],
# ],
# Seq,
# Frame
],
None,
]:
if null_segs is None:
null_segs: tuple = get_null_segs(
frame,
period=timeframe,
)
absi_pairs_zsegs: list[list[float, float]]
izeros: Seq
zero_t: Frame
(
absi_pairs_zsegs,
izeros,
zero_t,
) = null_segs
absi_first: int = frame[0]['index']
for (
absi_start,
absi_end,
) in absi_pairs_zsegs:
fi_end: int = absi_end - absi_first
end_row: Seq = frame[fi_end]
end_t: float = end_row['time']
end_dt: DateTime = from_timestamp(end_t)
fi_start = None
start_row = None
start_t = None
start_dt = None
if (
absi_start is not None
and start_t != 0
):
fi_start: int = absi_start - absi_first
start_row: Seq = frame[fi_start]
start_t: float = start_row['time']
start_dt: DateTime = from_timestamp(start_t)
if absi_start < 0:
import pdbp
pdbp.set_trace()
yield (
absi_start, absi_end, # abs indices
fi_start, fi_end, # relative "frame" indices
start_t, end_t,
start_dt, end_dt,
)
def with_dts(
@ -292,6 +538,17 @@ def dedup_dt(
)
t_unit: Literal = Literal[
'days',
'hours',
'minutes',
'seconds',
'miliseconds',
'microseconds',
'nanoseconds',
]
def detect_time_gaps(
df: pl.DataFrame,
@ -406,10 +663,6 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
f'Gaps found:\n{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}'
)
# TODO: rewrite this in polars and/or convert to
# ndarray to detect and remove?
# null_gaps = detect_null_time_gap()
return (
df,
gaps,
@ -428,14 +681,19 @@ def sort_diff(
list[int], # indices of segments that are out-of-order
]:
ser: pl.Series = src_df[col]
diff: pl.Series = ser.diff()
sortd: pl.DataFrame = ser.sort()
diff: pl.Series = ser.diff()
sortd_diff: pl.Series = sortd.diff()
i_step_diff = (diff != sortd_diff).arg_true()
if i_step_diff.len():
import pdbp
pdbp.set_trace()
frame_reorders: int = i_step_diff.len()
if frame_reorders:
log.warn(
f'Resorted frame on col: {col}\n'
f'{frame_reorders}'
)
# import pdbp; pdbp.set_trace()
# NOTE: thanks to this SO answer for the below conversion routines
# to go from numpy struct-arrays to polars dataframes and back:

View File

@ -21,8 +21,6 @@ Storage middle-ware CLIs.
from __future__ import annotations
from pathlib import Path
import time
from typing import Generator
# from typing import TYPE_CHECKING
import polars as pl
import numpy as np
@ -37,14 +35,11 @@ from piker.service import open_piker_runtime
from piker.cli import cli
from piker.config import get_conf_dir
from piker.data import (
maybe_open_shm_array,
def_iohlcv_fields,
ShmArray,
tsp,
)
from piker.data.history import (
_default_hist_size,
_default_rt_size,
iter_dfs_from_shms,
)
from . import (
log,
@ -190,6 +185,13 @@ def anal(
)
assert first_dt < last_dt
null_segs: tuple = tsp.get_null_segs(
frame=history,
period=period,
)
if null_segs:
await tractor.pause()
shm_df: pl.DataFrame = await client.as_df(
fqme,
period,
@ -204,6 +206,7 @@ def anal(
diff,
) = tsp.dedupe(shm_df)
if diff:
await client.write_ohlcv(
fqme,
@ -219,69 +222,6 @@ def anal(
trio.run(main)
def iter_dfs_from_shms(fqme: str) -> Generator[
tuple[Path, ShmArray, pl.DataFrame],
None,
None,
]:
# shm buffer size table based on known sample rates
sizes: dict[str, int] = {
'hist': _default_hist_size,
'rt': _default_rt_size,
}
# load all detected shm buffer files which have the
# passed FQME pattern in the file name.
shmfiles: list[Path] = []
shmdir = Path('/dev/shm/')
for shmfile in shmdir.glob(f'*{fqme}*'):
filename: str = shmfile.name
# skip index files
if (
'_first' in filename
or '_last' in filename
):
continue
assert shmfile.is_file()
log.debug(f'Found matching shm buffer file: {filename}')
shmfiles.append(shmfile)
for shmfile in shmfiles:
# lookup array buffer size based on file suffix
# being either .rt or .hist
key: str = shmfile.name.rsplit('.')[-1]
# skip FSP buffers for now..
if key not in sizes:
continue
size: int = sizes[key]
# attach to any shm buffer, load array into polars df,
# write to local parquet file.
shm, opened = maybe_open_shm_array(
key=shmfile.name,
size=size,
dtype=def_iohlcv_fields,
readonly=True,
)
assert not opened
ohlcv = shm.array
from ..data import tsp
df: pl.DataFrame = tsp.np2pl(ohlcv)
yield (
shmfile,
shm,
df,
)
@store.command()
def ldshm(
fqme: str,
@ -307,8 +247,8 @@ def ldshm(
# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
secs: float = times[-1] - times[-2]
if secs < 1.:
period_s: float = times[-1] - times[-2]
if period_s < 1.:
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
@ -323,17 +263,22 @@ def ldshm(
diff,
) = tsp.dedupe(shm_df)
null_segs: tuple = tsp.get_null_segs(
frame=shm.array,
period=period_s,
)
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
if (
not gaps.is_empty()
or secs > 2
):
if not gaps.is_empty():
await tractor.pause()
if null_segs:
await tractor.pause()
# write to parquet file?
if write_parquet:
timeframe: str = f'{secs}s'
timeframe: str = f'{period_s}s'
datadir: Path = get_conf_dir() / 'nativedb'
if not datadir.is_dir():

View File

@ -342,7 +342,6 @@ class NativeStorageClient:
)
return path
async def write_ohlcv(
self,
fqme: str,

View File

@ -207,9 +207,10 @@ class ContentsLabel(pg.LabelItem):
# this being "html" is the dumbest shit :eyeroll:
self.setText(
"<b>i</b>:{index}<br/>"
"<b>i_arr</b>:{index}<br/>"
# NB: these fields must be indexed in the correct order via
# the slice syntax below.
"<b>i_shm</b>:{}<br/>"
"<b>epoch</b>:{}<br/>"
"<b>O</b>:{}<br/>"
"<b>H</b>:{}<br/>"
@ -219,6 +220,7 @@ class ContentsLabel(pg.LabelItem):
# "<b>wap</b>:{}".format(
*array[ix][
[
'index',
'time',
'open',
'high',

View File

@ -248,25 +248,27 @@ async def increment_history_view(
# - samplerd could emit the actual update range via
# tuple and then we only enter the below block if that
# range is detected as in-view?
if (
(bf_wut := msg.get('backfilling', False))
):
viz_name, timeframe = bf_wut
if (
match msg:
case {
'backfilling': (viz_name, timeframe),
} if (
viz_name == name
):
log.warning(
f'Forcing HARD REDRAW:\n'
f'name: {name}\n'
f'timeframe: {timeframe}\n'
)
# TODO: only allow this when the data is IN VIEW!
# also, we probably can do this more efficiently
# / smarter by only redrawing the portion of the
# path necessary?
and False
):
log.info(f'Forcing hard redraw -> {name}@{timeframe}')
match timeframe:
case 60:
hist_viz.update_graphics(force_redraw=True)
case 1:
viz.update_graphics(force_redraw=True)
{
60: hist_viz,
1: viz,
}[timeframe].update_graphics(
force_redraw=True
)
# check if slow chart needs an x-domain shift and/or
# y-range resize.