Compare commits

...

11 Commits

Author SHA1 Message Date
Tyler Goodlet 1f9a497637 Fixup symcache annot for kucoin as well 2023-12-15 16:01:31 -05:00
Tyler Goodlet 40c5d88a9b Fixup symcache type annots; no more `Pair` type 2023-12-15 16:00:51 -05:00
Tyler Goodlet 8989c73a93 Move `iter_dfs_from_shms` into `.data.history`
Thinking about just moving all of that module (after a content breakup)
to a new `.piker.tsp` which will mostly depend on the `.data` and
`.storage` sub-pkgs; the idea is to move biz-logic for tsdb IO/mgmt and
orchestration with real-time (shm) buffers and the graphics layer into
a common spot for both manual analysis/research work and better
separation of low level data structure primitives from their higher
level usage.

Add a better `data.history` mod doc string in prep for this move
as well as clean out a bunch of legacy commented cruft from the
`trimeter` and `marketstore` days.

TO CHERRY #486 (if we can)
2023-12-15 15:53:02 -05:00
Tyler Goodlet 3639f360c3 Reactivate forced viz updates from sampler broadcasts in hist display loop 2023-12-15 13:59:19 -05:00
Tyler Goodlet afd0781b62 Add (shm) abs index to `ContextLabel` 2023-12-15 13:57:10 -05:00
Tyler Goodlet ba154ef413 ib: don't bother with recursive not-enough-bars queries for now, causes more problems than it solves.. 2023-12-15 13:56:42 -05:00
Tyler Goodlet 97e2403fb1 Rework backfiller and null-segment task conc
For each timeframe open a sub-nursery to do the backfilling + tsdb load
+ null-segment scanning in an effort to both speed up load time (though
we need to reverse the current order to really make it faster right now,
since moving to the much faster parquet file backend) and do concurrent
time-gap/null-segment checking of tsdb history while mrf (most recent
frame) history is backfilling.

The details are more or less just `trio` related task-func composition
tricks and a reordering of said funcs for optimal startup latency.
Also commented the `back_load_from_tsdb()` task for now since it's
unused.
2023-12-15 13:11:00 -05:00
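
A minimal sketch of the per-timeframe task composition described above,
using `trio`; the `backfill()`, `load_tsdb()` and `scan_null_segs()` coros
are hypothetical stand-ins for the real task-funcs, not piker's actual API:

import trio

async def backfill(timeframe: float) -> None:
    await trio.sleep(0.1)   # stand-in for most-recent-frame backend queries

async def load_tsdb(timeframe: float) -> None:
    await trio.sleep(0.05)  # stand-in for the (parquet) tsdb history load

async def scan_null_segs(timeframe: float) -> None:
    await trio.sleep(0.05)  # stand-in for time-gap/null-segment checks

async def backfill_timeframe(timeframe: float) -> None:
    # one sub-nursery per sampling period so the tsdb load and
    # null-segment scanning run concurrently with the backfill
    async with trio.open_nursery() as tn:
        tn.start_soon(backfill, timeframe)
        tn.start_soon(load_tsdb, timeframe)
        tn.start_soon(scan_null_segs, timeframe)

async def main() -> None:
    async with trio.open_nursery() as tn:
        for tf in (1, 60):  # 1s and 1m sampling periods
            tn.start_soon(backfill_timeframe, tf)

trio.run(main)
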
Tyler Goodlet a4084d6a0b Bleh, fix another off-by-one issue in `np.argwhere()`
Apparently it returns the index of the prior zero-row (probably because we
do the backward difference) so ensure `fi_zgaps += 1`.

Also fix the remaining edge case handling when there are only 2 zero-segs,
which was borked after a refactor to the special case blocks (like
a single zero row) prior to the `absi_zsegs` building loop. Further, make
sure to always return abs indices OUTSIDE the zero seg, i.e. the indices
of the non-zero rows just before and just after it, so that the history
backfiller can use non-zero timestamps to generate range datetimes for
backend frame queries.

Add much more detailed doc-comments with a small ascii diagram to
explain how all these somewhat subtle vec ops work. Also toss in some
sanity checks on the output indices to ensure they don't point to
zero (time) valued rows when used to read the frame.
2023-12-15 12:48:50 -05:00
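
To make the off-by-one concrete, a toy `numpy` demo (with made-up index
values) showing that `np.argwhere()` on the backward difference lands on
the last row of the prior zero-run, hence the `+ 1` to index the start of
the next segment:

import numpy as np

# made-up abs (shm) indices of zeroed rows: two null-segments,
# one spanning rows 3-5 and one spanning rows 10-11
absi_zeros = np.array([3, 4, 5, 10, 11])
absi_zdiff = np.diff(absi_zeros)        # -> [1, 1, 5, 1]

fi_zgaps = np.argwhere(absi_zdiff > 1)  # -> [[2]]: absi_zeros[2] == 5,
                                        # the END of the prior zero-seg
fi_zgaps += 1                           # -> [[3]]: absi_zeros[3] == 10,
                                        # the START of the next zero-seg
assert absi_zeros[fi_zgaps[0, 0]] == 10
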
Tyler Goodlet 83bdca46a2 Wrap null-gap detect and fill in async gen
Call it `iter_null_segs()` (for now?) and use in the final (sequential)
stage of the `.history.start_backfill()` task-func. Delivers absolute,
frame-relative, and equivalent timestamps on each iteration pertaining to
each detected null-segment to make it easy to do piece-wise history
queries for each.

Further,
- handle edge case in `get_null_segs()` where there is only 1 zeroed
  row value, in which case we deliver `absi_zsegs` as a single pair of
  the same index value and,
  - when this occurs `iter_null_segs()` delivers `None` for all the
    `start_` related indices/timestamps since all `get_hist()` routines
    (delivered by `open_history_client()`) should handle it as being a
    "get max history from this end_dt" type query.
- add note about needing to do time gap handling where there's a gap in
  the timeseries-history that isn't actually IN the data-history.
2023-12-13 18:29:06 -05:00
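
A hedged sketch of the caller-side handling described above: when only a
single zeroed row is detected the `start_`-related values come through as
`None` and the backend query is treated as "give me max history up to
`end_dt`". The import path and the `shm`/`get_hist` parameters are
assumptions for illustration only:

from piker.data.tsp import iter_null_segs  # assumed module path

async def fill_null_segs(shm, timeframe: float, get_hist) -> None:
    for (
        absi_start, absi_end,
        fi_start, fi_end,
        start_t, end_t,
        start_dt, end_dt,
    ) in iter_null_segs(
        timeframe=timeframe,
        frame=shm.array,
    ):
        # `start_dt=None` => the backend's `get_hist()` should return
        # its largest possible frame ending at `end_dt`.
        array, next_start_dt, next_end_dt = await get_hist(
            timeframe,
            start_dt=start_dt,
            end_dt=end_dt,
        )
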
Tyler Goodlet c129f5bb4a Finally write a general purpose null-gap detector!
Using a bunch of fancy `numpy` vec ops (and ideally eventually extending
the same to `polars`), this is a first draft of `get_null_segs()`,
a `col: str` field-value-is-zero detector which filters to all zero-valued
input frame segments and returns the corresponding useful slice-indexes:
- gap absolute (in shm buffer terms) index-endpoints as
  `absi_zsegs` for slicing to each null-segment in the src frame.
- ALL abs indices of rows with zeroed `col` values as `absi_zeros`.
- the full set of the input frame's row-entries (view) which are
  null valued for the chosen `col` as `zero_t`.

Use this new null-segment-detector in the
`.data.history.start_backfill()` task to attempt to fill null gaps that
might be extant from some prior backfill attempt. Since
`get_null_segs()` should now deliver a sequence of slices for each gap
we don't really need to have the `while gap_indices:` loop any more, so
just move that to the end-of-func and warn log (for now) if all gaps
aren't eventually filled.

TODO:
-[ ] do the null-seg detection and filling concurrently from
  most-recent-frame backfilling.
-[ ] offer the same detection in `.storage.cli` cmds for manual tsp
  analysis.
-[ ] make the graphics layer actually update correctly when null-segs
  are filled (currently still broken somehow in the `Viz` caching
  layer?)

CHERRY INTO #486
2023-12-13 15:26:33 -05:00
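
To illustrate the three return values on a toy frame (a self-contained
`numpy` rendition of the described outputs, not the real `get_null_segs()`
implementation):

import numpy as np

frame = np.zeros(8, dtype=[('index', int), ('time', float)])
frame['index'] = np.arange(100, 108)               # abs (shm) buffer indices
frame['time'] = [1., 2., 0., 0., 5., 6., 0., 8.]   # two null-segments

zero_t = frame[frame['time'] == 0]   # view of all rows with zeroed 'time'
absi_zeros = zero_t['index']         # -> [102, 103, 106]

# each gap's slice endpoints land just OUTSIDE its null-seg, i.e. on the
# non-zero rows immediately before and after it:
absi_zsegs = [[101, 104], [105, 107]]
ifirst: int = frame['index'][0]
for start, end in absi_zsegs:
    assert frame['time'][start - ifirst] != 0
    assert frame['time'][end - ifirst] != 0
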
Tyler Goodlet c4853a3fee Drop inter-method NL 2023-12-13 09:27:23 -05:00
10 changed files with 931 additions and 474 deletions

View File

@ -401,7 +401,15 @@ class Client:
# => we recursively call this method until we get at least # => we recursively call this method until we get at least
# as many bars such that they sum in aggregate to the the # as many bars such that they sum in aggregate to the the
# desired total time (duration) at most. # desired total time (duration) at most.
if end_dt: # XXX XXX XXX
# WHY DID WE EVEN NEED THIS ORIGINALLY!?
# XXX XXX XXX
# - if you query over a gap and get no data
# that may short circuit the history
if (
end_dt
and False
):
nparr: np.ndarray = bars_to_np(bars) nparr: np.ndarray = bars_to_np(bars)
times: np.ndarray = nparr['time'] times: np.ndarray = nparr['time']
first: float = times[0] first: float = times[0]
@ -410,6 +418,7 @@ class Client:
if ( if (
# len(bars) * sample_period_s) < dt_duration.in_seconds() # len(bars) * sample_period_s) < dt_duration.in_seconds()
tdiff < dt_duration.in_seconds() tdiff < dt_duration.in_seconds()
# and False
): ):
end_dt: DateTime = from_timestamp(first) end_dt: DateTime = from_timestamp(first)
log.warning( log.warning(
@ -859,6 +868,9 @@ class Client:
timeout=timeout, timeout=timeout,
) )
except TimeoutError: except TimeoutError:
import pdbp
pdbp.set_trace()
if raise_on_timeout: if raise_on_timeout:
raise raise
return None return None

View File

@ -174,8 +174,15 @@ async def open_history_client(
start_dt: datetime | None = None, start_dt: datetime | None = None,
) -> tuple[np.ndarray, str]: ) -> tuple[np.ndarray, str]:
nonlocal max_timeout, mean, count nonlocal max_timeout, mean, count
if (
start_dt
and start_dt.timestamp() == 0
):
await tractor.pause()
query_start = time.time() query_start = time.time()
out, timedout = await get_bars( out, timedout = await get_bars(
proxy, proxy,
@ -403,35 +410,55 @@ async def get_bars(
bars, bars_array, dt_duration = out bars, bars_array, dt_duration = out
# not enough bars signal, likely due to venue
# operational gaps.
too_little: bool = False
if (
end_dt
and (
not bars
or (too_little :=
start_dt
and (len(bars) * timeframe)
< dt_duration.in_seconds()
)
)
):
if (
end_dt
or too_little
):
log.warning(
f'History is blank for {dt_duration} from {end_dt}'
)
end_dt -= dt_duration
continue
raise NoData(f'{end_dt}')
if bars_array is None: if bars_array is None:
raise SymbolNotFound(fqme) raise SymbolNotFound(fqme)
# not enough bars signal, likely due to venue
# operational gaps.
# too_little: bool = False
if end_dt:
if not bars:
# no data returned?
log.warning(
'History frame is blank?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_duration}\n'
)
raise NoData(f'{end_dt}')
else:
dur_s: float = len(bars) * timeframe
bars_dur = pendulum.Duration(seconds=dur_s)
dt_dur_s: float = dt_duration.in_seconds()
if dur_s < dt_dur_s:
log.warning(
'History frame is shorter then expected?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_dur_s}\n'
f'frame duration seconds: {dur_s}\n'
f'dur diff: {dt_duration - bars_dur}\n'
)
# NOTE: we used to try to get a minimal
# set of bars by recursing but this ran
# into possible infinite query loops
# when logic in the `Client.bars()` dt
# diffing went bad. So instead for now
# we just return the
# shorter-then-expected history with
# a warning.
# TODO: in the future it prolly makes
# the most send to do venue operating
# hours lookup and
# timestamp-in-operating-range set
# checking to know for sure if we can
# safely and quickly ignore non-uniform history
# frame timestamp gaps..
# end_dt -= dt_duration
# continue
# await tractor.pause()
first_dt = pendulum.from_timestamp( first_dt = pendulum.from_timestamp(
bars[0].date.timestamp()) bars[0].date.timestamp())
@ -854,7 +881,13 @@ async def stream_quotes(
init_msgs.append(init_msg) init_msgs.append(init_msg)
con: Contract = details.contract con: Contract = details.contract
first_ticker: Ticker = await proxy.get_quote(contract=con) first_ticker: Ticker | None = None
with trio.move_on_after(1):
first_ticker: Ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=False,
)
if first_ticker: if first_ticker:
first_quote: dict = normalize(first_ticker) first_quote: dict = normalize(first_ticker)
log.info( log.info(
@ -862,18 +895,6 @@ async def stream_quotes(
f'{pformat(first_quote)}' f'{pformat(first_quote)}'
) )
# TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
# with trio.CancelScope() as cs:
with trio.move_on_after(1):
first_ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=True,
)
# NOTE: it might be outside regular trading hours for # NOTE: it might be outside regular trading hours for
# assets with "standard venue operating hours" so we # assets with "standard venue operating hours" so we
# only "pretend the feed is live" when the dst asset # only "pretend the feed is live" when the dst asset
@ -884,6 +905,8 @@ async def stream_quotes(
# (equitiies, futes, bonds etc.) we at least try to # (equitiies, futes, bonds etc.) we at least try to
# grab the OHLC history. # grab the OHLC history.
if ( if (
first_ticker
and
isnan(first_ticker.last) isnan(first_ticker.last)
# SO, if the last quote price value is NaN we ONLY # SO, if the last quote price value is NaN we ONLY
# "pretend to do" `feed_is_live.set()` if it's a known # "pretend to do" `feed_is_live.set()` if it's a known
@ -907,6 +930,19 @@ async def stream_quotes(
await trio.sleep_forever() await trio.sleep_forever()
return # we never expect feed to come up? return # we never expect feed to come up?
# TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
# with trio.CancelScope() as cs:
# XXX: MUST acquire a ticker + first quote before starting
# the live quotes loop!
# with trio.move_on_after(1):
first_ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=True,
)
cs: trio.CancelScope | None = None cs: trio.CancelScope | None = None
startup: bool = True startup: bool = True
while ( while (

View File

@ -41,7 +41,6 @@ from typing import (
import wsproto import wsproto
from uuid import uuid4 from uuid import uuid4
from rapidfuzz import process as fuzzy
from trio_typing import TaskStatus from trio_typing import TaskStatus
import asks import asks
from bidict import bidict from bidict import bidict
@ -416,8 +415,7 @@ class Client:
await self.get_mkt_pairs() await self.get_mkt_pairs()
assert self._pairs, '`Client.get_mkt_pairs()` was never called!?' assert self._pairs, '`Client.get_mkt_pairs()` was never called!?'
matches: dict[str, KucoinMktPair] = match_from_pairs(
matches: dict[str, Pair] = match_from_pairs(
pairs=self._pairs, pairs=self._pairs,
# query=pattern.upper(), # query=pattern.upper(),
query=pattern.upper(), query=pattern.upper(),

View File

@ -31,6 +31,8 @@ from pathlib import Path
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
Sequence,
Hashable,
TYPE_CHECKING, TYPE_CHECKING,
) )
from types import ModuleType from types import ModuleType
@ -128,8 +130,8 @@ class SymbologyCache(Struct):
- `.get_mkt_pairs()`: returning a table of pair-`Struct` - `.get_mkt_pairs()`: returning a table of pair-`Struct`
types, custom defined by the particular backend. types, custom defined by the particular backend.
AND, the required `.get_mkt_info()` module-level endpoint which AND, the required `.get_mkt_info()` module-level endpoint
maps `fqme: str` -> `MktPair`s. which maps `fqme: str` -> `MktPair`s.
These tables are then used to fill out the `.assets`, `.pairs` and These tables are then used to fill out the `.assets`, `.pairs` and
`.mktmaps` tables on this cache instance, respectively. `.mktmaps` tables on this cache instance, respectively.
@ -500,7 +502,7 @@ def match_from_pairs(
) )
# pop and repack pairs in output dict # pop and repack pairs in output dict
matched_pairs: dict[str, Pair] = {} matched_pairs: dict[str, Struct] = {}
for item in matches: for item in matches:
pair_key: str = item[0] pair_key: str = item[0]
matched_pairs[pair_key] = pairs[pair_key] matched_pairs[pair_key] = pairs[pair_key]

View File

@ -16,19 +16,26 @@
# <https://www.gnu.org/licenses/>. # <https://www.gnu.org/licenses/>.
''' '''
Historical data business logic for load, backfill and tsdb storage. Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic for,
- hi-level biz logics using the `.storage` subpkg APIs for (I/O)
orchestration and mgmt of tsdb data sets.
- core data-provider history backfilling middleware (as task-funcs) via
(what will eventually be `datad`, but are rn is the) `.brokers` backend
APIs.
- various data set cleaning, repairing and issue-detection/analysis
routines to ensure consistent series whether in shm or when
stored offline (in a tsdb).
''' '''
from __future__ import annotations from __future__ import annotations
# from collections import (
# Counter,
# )
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
# import time from pathlib import Path
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
Callable, Callable,
Generator,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -36,6 +43,7 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from pendulum import ( from pendulum import (
DateTime,
Duration, Duration,
from_timestamp, from_timestamp,
) )
@ -56,7 +64,14 @@ from ._source import def_iohlcv_fields
from ._sampling import ( from ._sampling import (
open_sample_stream, open_sample_stream,
) )
from . import tsp from .tsp import (
dedupe,
get_null_segs,
iter_null_segs,
sort_diff,
Frame,
# Seq,
)
from ..brokers._util import ( from ..brokers._util import (
DataUnavailable, DataUnavailable,
) )
@ -110,6 +125,7 @@ def diff_history(
return array[times >= prepend_until_dt.timestamp()] return array[times >= prepend_until_dt.timestamp()]
# TODO: can't we just make this a sync func now?
async def shm_push_in_between( async def shm_push_in_between(
shm: ShmArray, shm: ShmArray,
to_push: np.ndarray, to_push: np.ndarray,
@ -118,6 +134,10 @@ async def shm_push_in_between(
update_start_on_prepend: bool = False, update_start_on_prepend: bool = False,
) -> int: ) -> int:
# XXX: extremely important, there can be no checkpoints
# in the body of this func to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
shm.push( shm.push(
to_push, to_push,
prepend=True, prepend=True,
@ -138,10 +158,102 @@ async def shm_push_in_between(
else None else None
), ),
) )
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to async def maybe_fill_null_segments(
# memory... shm: ShmArray,
timeframe: float,
get_hist: Callable,
sampler_stream: tractor.MsgStream,
mkt: MktPair,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
) -> list[Frame]:
null_segs_detected = trio.Event()
task_status.started(null_segs_detected)
frame: Frame = shm.array
null_segs: tuple | None = get_null_segs(
frame,
period=timeframe,
)
for (
absi_start, absi_end,
fi_start, fi_end,
start_t, end_t,
start_dt, end_dt,
) in iter_null_segs(
null_segs=null_segs,
frame=frame,
timeframe=timeframe,
):
# XXX NOTE: ?if we get a badly ordered timestamp
# pair, immediately stop backfilling?
if (
start_dt
and end_dt < start_dt
):
await tractor.pause()
break
(
array,
next_start_dt,
next_end_dt,
) = await get_hist(
timeframe,
start_dt=start_dt,
end_dt=end_dt,
)
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
# and mnq.cme.ib this causes a Qt crash XXDDD
# make sure we don't overrun the buffer start
len_to_push: int = min(absi_end, array.size)
to_push: np.ndarray = array[-len_to_push:]
await shm_push_in_between(
shm,
to_push,
prepend_index=absi_end,
update_start_on_prepend=False,
)
# TODO: UI side needs IPC event to update..
# - make sure the UI actually always handles
# this update!
# - remember that in the display side, only refersh this
# if the respective history is actually "in view".
# loop
await sampler_stream.send({
'broadcast_all': {
# XXX NOTE XXX: see the
# `.ui._display.increment_history_view()` if block
# that looks for this info to FORCE a hard viz
# redraw!
'backfilling': (mkt.fqme, timeframe),
},
})
null_segs_detected.set()
# RECHECK for more null-gaps
frame: Frame = shm.array
null_segs: tuple | None = get_null_segs(
frame,
period=timeframe,
)
if (
null_segs
and
len(null_segs[-1])
):
await tractor.pause()
array = shm.array array = shm.array
zeros = array[array['low'] == 0] zeros = array[array['low'] == 0]
@ -155,10 +267,24 @@ async def shm_push_in_between(
'low', 'low',
'close', 'close',
]] = shm._array[zeros['index'][0] - 1]['close'] ]] = shm._array[zeros['index'][0] - 1]['close']
# await tractor.pause()
# TODO: interatively step through any remaining
# time-gaps/null-segments and spawn piecewise backfiller
# tasks in a nursery?
# -[ ] not sure that's going to work so well on the ib
# backend but worth a shot?
# -[ ] mk new history connections to make it properly
# parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?"
# if (
# next_end_dt not in frame[
# ):
# pass
async def start_backfill( async def start_backfill(
tn: trio.Nursery,
get_hist, get_hist,
mod: ModuleType, mod: ModuleType,
mkt: MktPair, mkt: MktPair,
@ -213,27 +339,20 @@ async def start_backfill(
backfill_until_dt = backfill_from_dt.subtract(**period_duration) backfill_until_dt = backfill_from_dt.subtract(**period_duration)
# TODO: can we drop this? without conc i don't think this # STAGE NOTE: "backward history gap filling":
# is necessary any more? # - we push to the shm buffer until we have history back
# configure async query throttling # until the latest entry loaded from the tsdb's table B)
# rate = config.get('rate', 1) # - after this loop continue to check for other gaps in the
# XXX: legacy from ``trimeter`` code but unsupported now. # (tsdb) history and (at least report) maybe fill them
# erlangs = config.get('erlangs', 1) # from new frame queries to the backend?
# avoid duplicate history frames with a set of datetime frame
# starts and associated counts of how many duplicates we see
# per time stamp.
# starts: Counter[datetime] = Counter()
# conduct "backward history gap filling" where we push to
# the shm buffer until we have history back until the
# latest entry loaded from the tsdb's table B)
last_start_dt: datetime = backfill_from_dt last_start_dt: datetime = backfill_from_dt
next_prepend_index: int = backfill_from_shm_index next_prepend_index: int = backfill_from_shm_index
while last_start_dt > backfill_until_dt: while last_start_dt > backfill_until_dt:
log.info(
log.debug( f'Requesting {timeframe}s frame:\n'
f'Requesting {timeframe}s frame ending in {last_start_dt}' f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n'
) )
try: try:
@ -261,36 +380,18 @@ async def start_backfill(
await tractor.pause() await tractor.pause()
return return
# TODO: drop this? see todo above.. assert (
# if ( array['time'][0]
# next_start_dt in starts ==
# and starts[next_start_dt] <= 6 next_start_dt.timestamp()
# ): )
# start_dt = min(starts)
# log.warning(
# f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
# )
# starts[start_dt] += 1
# await tractor.pause()
# continue
# elif starts[next_start_dt] > 6:
# log.warning(
# f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
# )
# return
# # only update new start point if not-yet-seen
# starts[next_start_dt] += 1
assert array['time'][0] == next_start_dt.timestamp()
diff = last_start_dt - next_start_dt diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds frame_time_diff_s = diff.seconds
# frame's worth of sample-period-steps, in seconds # frame's worth of sample-period-steps, in seconds
frame_size_s = len(array) * timeframe frame_size_s: float = len(array) * timeframe
expected_frame_size_s = frame_size_s + timeframe expected_frame_size_s: float = frame_size_s + timeframe
if frame_time_diff_s > expected_frame_size_s: if frame_time_diff_s > expected_frame_size_s:
# XXX: query result includes a start point prior to our # XXX: query result includes a start point prior to our
@ -298,8 +399,10 @@ async def start_backfill(
# history gap (eg. market closed period, outage, etc.) # history gap (eg. market closed period, outage, etc.)
# so just report it to console for now. # so just report it to console for now.
log.warning( log.warning(
f'History frame ending @ {last_start_dt} appears to have a gap:\n' 'GAP DETECTED:\n'
f'{diff} ~= {frame_time_diff_s} seconds' f'last_start_dt: {last_start_dt}\n'
f'diff: {diff}\n'
f'frame_time_diff_s: {frame_time_diff_s}\n'
) )
to_push = diff_history( to_push = diff_history(
@ -415,74 +518,15 @@ async def start_backfill(
gaps, gaps,
deduped, deduped,
diff, diff,
) = tsp.dedupe(df) ) = dedupe(df)
if diff: if diff:
tsp.sort_diff(df) sort_diff(df)
else: else:
# finally filled gap # finally filled gap
log.info( log.info(
f'Finished filling gap to tsdb start @ {backfill_until_dt}!' f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
) )
# conduct tsdb timestamp gap detection and backfill any
# seemingly missing sequence segments..
# TODO: ideally these never exist but somehow it seems
# sometimes we're writing zero-ed segments on certain
# (teardown) cases?
from .tsp import detect_null_time_gap
gap_indices: tuple | None = detect_null_time_gap(shm)
while gap_indices:
(
istart,
start,
end,
iend,
) = gap_indices
start_dt = from_timestamp(start)
end_dt = from_timestamp(end)
# if we get a baddly ordered timestamp
# pair, imeeditately stop backfilling.
if end_dt < start_dt:
break
(
array,
next_start_dt,
next_end_dt,
) = await get_hist(
timeframe,
start_dt=start_dt,
end_dt=end_dt,
)
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
# and mnq.cme.ib this causes a Qt crash XXDDD
# make sure we don't overrun the buffer start
len_to_push: int = min(iend, array.size)
to_push: np.ndarray = array[-len_to_push:]
await shm_push_in_between(
shm,
to_push,
prepend_index=iend,
update_start_on_prepend=False,
)
# TODO: UI side needs IPC event to update..
# - make sure the UI actually always handles
# this update!
# - remember that in the display side, only refersh this
# if the respective history is actually "in view".
# loop
await sampler_stream.send({
'broadcast_all': {
'backfilling': (mkt.fqme, timeframe),
},
})
gap_indices: tuple | None = detect_null_time_gap(shm)
# XXX: extremely important, there can be no checkpoints # XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames`` # in the block above to avoid entering new ``frames``
@ -494,6 +538,12 @@ async def start_backfill(
bf_done.set() bf_done.set()
# NOTE: originally this was used to cope with a tsdb (marketstore)
# which could not delivery very large frames of history over gRPC
# (thanks goolag) due to corruption issues. NOW, using apache
# parquet (by default in the local filesys) we don't have this
# requirement since the files can be loaded very quickly in
# entirety to memory via
async def back_load_from_tsdb( async def back_load_from_tsdb(
storemod: ModuleType, storemod: ModuleType,
storage: StorageClient, storage: StorageClient,
@ -631,10 +681,94 @@ async def back_load_from_tsdb(
# await sampler_stream.send('broadcast_all') # await sampler_stream.send('broadcast_all')
async def push_latest_frame(
dt_eps: list[DateTime, DateTime],
shm: ShmArray,
get_hist: Callable[
[int, datetime, datetime],
tuple[np.ndarray, str]
],
timeframe: float,
config: dict,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
):
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
(
array,
mr_start_dt,
mr_end_dt,
) = await get_hist(
timeframe,
end_dt=None,
)
# so caller can access these ep values
dt_eps.extend([
mr_start_dt,
mr_end_dt,
])
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
except DataUnavailable:
task_status.started()
if timeframe > 1:
await tractor.pause()
return
# NOTE: on the first history, most recent history
# frame we PREPEND from the current shm ._last index
# and thus a gap between the earliest datum loaded here
# and the latest loaded from the tsdb may exist!
log.info(f'Pushing {array.size} to shm!')
shm.push(
array,
prepend=True, # append on first frame
)
async def load_tsdb_hist(
storage: StorageClient,
mkt: MktPair,
timeframe: float,
) -> tuple[
np.ndarray,
DateTime,
DateTime,
] | None:
# loads a (large) frame of data from the tsdb depending
# on the db's query size limit; our "nativedb" (using
# parquet) generally can load the entire history into mem
# but if not then below the remaining history can be lazy
# loaded?
fqme: str = mkt.fqme
tsdb_entry: tuple[
np.ndarray,
DateTime,
DateTime,
]
try:
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
)
return tsdb_entry
except TimeseriesNotFound:
log.warning(
f'No timeseries yet for {timeframe}@{fqme}'
)
return None
async def tsdb_backfill( async def tsdb_backfill(
mod: ModuleType, mod: ModuleType,
storemod: ModuleType, storemod: ModuleType,
tn: trio.Nursery,
storage: StorageClient, storage: StorageClient,
mkt: MktPair, mkt: MktPair,
@ -649,96 +783,101 @@ async def tsdb_backfill(
) -> None: ) -> None:
if timeframe not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
get_hist: Callable[ get_hist: Callable[
[int, datetime, datetime], [int, datetime, datetime],
tuple[np.ndarray, str] tuple[np.ndarray, str]
] ]
config: dict[str, int] config: dict[str, int]
async with mod.open_history_client( async with (
mod.open_history_client(
mkt, mkt,
) as (get_hist, config): ) as (get_hist, config),
# NOTE: this sub-nursery splits to tasks for the given
# sampling rate to concurrently load offline tsdb
# timeseries as well as new data from the venue backend!
):
log.info( log.info(
f'`{mod}` history client returned backfill config:\n' f'`{mod}` history client returned backfill config:\n'
f'{config}\n' f'{config}\n'
) )
# get latest query's worth of history all the way dt_eps: list[DateTime, DateTime] = []
# back to what is recorded in the tsdb async with trio.open_nursery() as tn:
try: tn.start_soon(
( push_latest_frame,
array, dt_eps,
mr_start_dt, shm,
mr_end_dt, get_hist,
) = await get_hist(
timeframe, timeframe,
end_dt=None, config,
) )
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
except DataUnavailable:
task_status.started()
await tractor.pause()
return
# TODO: fill in non-zero epoch time values ALWAYS!
# hist_shm._array['time'] = np.arange(
# start=
# NOTE: removed for now since it'll always break
# on the first 60s of the venue open..
# times: np.ndarray = array['time']
# # sample period step size in seconds
# step_size_s = (
# from_timestamp(times[-1])
# - from_timestamp(times[-2])
# ).seconds
# if step_size_s not in (1, 60):
# log.error(f'Last 2 sample period is off!? -> {step_size_s}')
# step_size_s = (
# from_timestamp(times[-2])
# - from_timestamp(times[-3])
# ).seconds
# NOTE: on the first history, most recent history
# frame we PREPEND from the current shm ._last index
# and thus a gap between the earliest datum loaded here
# and the latest loaded from the tsdb may exist!
log.info(f'Pushing {array.size} to shm!')
shm.push(
array,
prepend=True, # append on first frame
)
backfill_gap_from_shm_index: int = shm._first.value + 1
# tell parent task to continue # tell parent task to continue
# TODO: really we'd want this the other way with the
# tsdb load happening asap and the since the latest
# frame query will normally be the main source of
# latency?
task_status.started() task_status.started()
# loads a (large) frame of data from the tsdb depending tsdb_entry: tuple = await load_tsdb_hist(
# on the db's query size limit; our "nativedb" (using storage,
# parquet) generally can load the entire history into mem mkt,
# but if not then below the remaining history can be lazy timeframe,
# loaded?
fqme: str = mkt.fqme
last_tsdb_dt: datetime | None = None
try:
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
) )
except TimeseriesNotFound:
log.warning( # NOTE: iabs to start backfilling from, reverse chronological,
f'No timeseries yet for {timeframe}@{fqme}' # ONLY AFTER the first history frame has been pushed to
) # mem!
else: backfill_gap_from_shm_index: int = shm._first.value + 1
(
mr_start_dt,
mr_end_dt,
) = dt_eps
async with trio.open_nursery() as tn:
# Prepend any tsdb history to the shm buffer which should
# now be full of the most recent history pulled from the
# backend's last frame.
if tsdb_entry:
( (
tsdb_history, tsdb_history,
first_tsdb_dt, first_tsdb_dt,
last_tsdb_dt, last_tsdb_dt,
) = tsdb_entry ) = tsdb_entry
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
tn=tn,
get_hist=get_hist,
mod=mod,
mkt=mkt,
shm=shm,
timeframe=timeframe,
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt,
storage=storage,
write_tsdb=True,
)
)
# calc the index from which the tsdb data should be # calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the # prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest # latest frame (loaded/read above) and the latest
@ -804,36 +943,32 @@ async def tsdb_backfill(
log.info(f'Loaded {to_push.shape} datums from storage') log.info(f'Loaded {to_push.shape} datums from storage')
# TODO: maybe start history anal and load missing "history # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
# gaps" via backend.. # seemingly missing (null-time) segments..
# TODO: ideally these can never exist!
# -[ ] somehow it seems sometimes we're writing zero-ed
# segments to tsdbs during teardown?
# -[ ] can we ensure that the backcfiller tasks do this
# work PREVENTAVELY instead?
# -[ ] fill in non-zero epoch time values ALWAYS!
# await maybe_fill_null_segments(
nulls_detected: trio.Event = await tn.start(partial(
maybe_fill_null_segments,
if timeframe not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
get_hist=get_hist,
mod=mod,
mkt=mkt,
shm=shm, shm=shm,
timeframe=timeframe, timeframe=timeframe,
backfill_from_shm_index=backfill_gap_from_shm_index, get_hist=get_hist,
backfill_from_dt=mr_start_dt,
sampler_stream=sampler_stream, sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt, mkt=mkt,
storage=storage, ))
write_tsdb=True,
)
) # TODO: who would want to?
await nulls_detected.wait()
await bf_done.wait()
# TODO: maybe start history anal and load missing "history
# gaps" via backend..
# if len(hist_shm.array) < 2: # if len(hist_shm.array) < 2:
# TODO: there's an edge case here to solve where if the last # TODO: there's an edge case here to solve where if the last
@ -848,32 +983,29 @@ async def tsdb_backfill(
# backload any further data from tsdb (concurrently per # backload any further data from tsdb (concurrently per
# timeframe) if not all data was able to be loaded (in memory) # timeframe) if not all data was able to be loaded (in memory)
# from the ``StorageClient.load()`` call above. # from the ``StorageClient.load()`` call above.
try:
await trio.sleep_forever() await trio.sleep_forever()
finally:
return
# XXX NOTE: this is legacy from when we were using # XXX NOTE: this is legacy from when we were using
# marketstore and we needed to continue backloading # marketstore and we needed to continue backloading
# incrementally from the tsdb client.. (bc it couldn't # incrementally from the tsdb client.. (bc it couldn't
# handle a single large query with gRPC for some # handle a single large query with gRPC for some
# reason.. classic goolag pos) # reason.. classic goolag pos)
tn.start_soon( # tn.start_soon(
back_load_from_tsdb, # back_load_from_tsdb,
storemod, # storemod,
storage, # storage,
fqme, # fqme,
tsdb_history, # tsdb_history,
last_tsdb_dt, # last_tsdb_dt,
mr_start_dt, # mr_start_dt,
mr_end_dt, # mr_end_dt,
bf_done, # bf_done,
timeframe, # timeframe,
shm, # shm,
) # )
async def manage_history( async def manage_history(
@ -981,6 +1113,11 @@ async def manage_history(
async with ( async with (
storage.open_storage_client() as (storemod, client), storage.open_storage_client() as (storemod, client),
# NOTE: this nursery spawns a task per "timeframe" (aka
# sampling period) data set since normally differently
# sampled timeseries can be loaded / process independently
# ;)
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
log.info( log.info(
@ -1030,7 +1167,6 @@ async def manage_history(
tsdb_backfill, tsdb_backfill,
mod=mod, mod=mod,
storemod=storemod, storemod=storemod,
tn=tn,
# bus, # bus,
storage=client, storage=client,
mkt=mkt, mkt=mkt,
@ -1059,3 +1195,70 @@ async def manage_history(
# and thus a small RPC-prot for remotely controllinlg # and thus a small RPC-prot for remotely controllinlg
# what data is loaded for viewing. # what data is loaded for viewing.
await trio.sleep_forever() await trio.sleep_forever()
def iter_dfs_from_shms(
fqme: str
) -> Generator[
tuple[Path, ShmArray, pl.DataFrame],
None,
None,
]:
# shm buffer size table based on known sample rates
sizes: dict[str, int] = {
'hist': _default_hist_size,
'rt': _default_rt_size,
}
# load all detected shm buffer files which have the
# passed FQME pattern in the file name.
shmfiles: list[Path] = []
shmdir = Path('/dev/shm/')
for shmfile in shmdir.glob(f'*{fqme}*'):
filename: str = shmfile.name
# skip index files
if (
'_first' in filename
or '_last' in filename
):
continue
assert shmfile.is_file()
log.debug(f'Found matching shm buffer file: {filename}')
shmfiles.append(shmfile)
for shmfile in shmfiles:
# lookup array buffer size based on file suffix
# being either .rt or .hist
key: str = shmfile.name.rsplit('.')[-1]
# skip FSP buffers for now..
if key not in sizes:
continue
size: int = sizes[key]
# attach to any shm buffer, load array into polars df,
# write to local parquet file.
shm, opened = maybe_open_shm_array(
key=shmfile.name,
size=size,
dtype=def_iohlcv_fields,
readonly=True,
)
assert not opened
ohlcv = shm.array
from ..data import tsp
df: pl.DataFrame = tsp.np2pl(ohlcv)
yield (
shmfile,
shm,
df,
)

View File

@ -29,12 +29,19 @@ from math import (
floor, floor,
) )
import time import time
from typing import Literal from typing import (
Literal,
# AsyncGenerator,
Generator,
)
import numpy as np import numpy as np
import polars as pl import polars as pl
from pendulum import (
DateTime,
from_timestamp,
)
from ._sharedmem import ShmArray
from ..toolz.profile import ( from ..toolz.profile import (
Profiler, Profiler,
pg_profile_enabled, pg_profile_enabled,
@ -53,6 +60,14 @@ get_console_log = partial(
name=subsys, name=subsys,
) )
# NOTE: union type-defs to handle generic `numpy` and `polars` types
# side-by-side Bo
# |_ TODO: schema spec typing?
# -[ ] nptyping!
# -[ ] wtv we can with polars?
Frame = pl.DataFrame | np.ndarray
Seq = pl.Series | np.ndarray
def slice_from_time( def slice_from_time(
arr: np.ndarray, arr: np.ndarray,
@ -209,51 +224,282 @@ def slice_from_time(
return read_slc return read_slc
def detect_null_time_gap( def get_null_segs(
shm: ShmArray, frame: Frame,
period: float, # sampling step in seconds
imargin: int = 1, imargin: int = 1,
col: str = 'time',
) -> tuple[float, float] | None: ) -> tuple[
# Seq, # TODO: can we make it an array-type instead?
list[
list[int, int],
],
Seq,
Frame
] | None:
''' '''
Detect if there are any zero-epoch stamped rows in Detect if there are any zero(-epoch stamped) valued
the presumed 'time' field-column. rows in for the provided `col: str` column; by default
presume the 'time' field/column.
Filter to the gap and return a surrounding index range. Filter to all such zero (time) segments and return
the corresponding frame zeroed segment's,
NOTE: for now presumes only ONE gap XD - gap absolute (in buffer terms) indices-endpoints as
`absi_zsegs`
- abs indices of all rows with zeroed `col` values as `absi_zeros`
- the corresponding frame's row-entries (view) which are
zeroed for the `col` as `zero_t`
''' '''
# ensure we read buffer state only once so that ShmArray rt times: Seq = frame['time']
# circular-buffer updates don't cause a indexing/size mismatch. zero_pred: Seq = (times == 0)
array: np.ndarray = shm.array
zero_pred: np.ndarray = array['time'] == 0 if isinstance(frame, np.ndarray):
zero_t: np.ndarray = array[zero_pred] tis_zeros: int = zero_pred.any()
else:
if zero_t.size: tis_zeros: int = zero_pred.any()
istart, iend = zero_t['index'][[0, -1]]
start, end = shm._array['time'][
[istart - imargin, iend + imargin]
]
return (
istart - imargin,
start,
end,
iend + imargin,
)
if not tis_zeros:
return None return None
# TODO: use ndarray for this?!
absi_zsegs: list[list[int, int]] = []
t_unit: Literal = Literal[ if isinstance(frame, np.ndarray):
'days', # view of ONLY the zero segments as one continuous chunk
'hours', zero_t: np.ndarray = frame[zero_pred]
'minutes', # abs indices of said zeroed rows
'seconds', absi_zeros = zero_t['index']
'miliseconds', # diff of abs index steps between each zeroed row
'microseconds', absi_zdiff: np.ndarray = np.diff(absi_zeros)
'nanoseconds',
] # scan for all frame-indices where the
# zeroed-row-abs-index-step-diff is greater then the
# expected increment of 1.
# data 1st zero seg data zeros
# ---- ------------ ---- ----- ------ ----
# ||||..000000000000..||||..00000..||||||..0000
# ---- ------------ ---- ----- ------ ----
# ^zero_t[0] ^zero_t[-1]
# ^fi_zgaps[0] ^fi_zgaps[1]
# ^absi_zsegs[0][0] ^---^ => absi_zsegs[1]: tuple
# absi_zsegs[0][1]^
#
# NOTE: the first entry in `fi_zgaps` is where
# the first (absolute) index step diff is > 1.
# and it is a frame-relative index into `zero_t`.
fi_zgaps = np.argwhere(
absi_zdiff > 1
# NOTE: +1 here is ensure we index to the "start" of each
# segment (if we didn't the below loop needs to be
# re-written to expect `fi_end_rows`!
) + 1
# the rows from the contiguous zeroed segments which have
# abs-index steps >1 compared to the previous zero row
# (indicating an end of zeroed segment).
fi_zseg_start_rows = zero_t[fi_zgaps]
# TODO: equiv for pl.DataFrame case!
else:
izeros: pl.Series = zero_pred.arg_true()
zero_t: pl.DataFrame = frame[izeros]
absi_zeros = zero_t['index']
absi_zdiff: pl.Series = absi_zeros.diff()
fi_zgaps = (absi_zdiff > 1).arg_true()
# XXX: our goal (in this func) is to select out slice index
# pairs (zseg0_start, zseg_end) in abs index units for each
# null-segment portion detected throughout entire input frame.
# only up to one null-segment in entire frame?
num_gaps: int = fi_zgaps.size + 1
if num_gaps < 1:
if absi_zeros.size > 1:
absi_zsegs = [[
# see `get_hist()` in backend, should ALWAYS be
# able to handle a `start_dt=None`!
# None,
max(
absi_zeros[0] - 1,
0,
),
# NOTE: need the + 1 to guarantee we index "up to"
# the next non-null row-datum.
min(
absi_zeros[-1] + 1,
frame['index'][-1],
),
]]
else:
# XXX EDGE CASE: only one null-datum found so
# mark the start abs index as None to trigger
# a full frame-len query to the respective backend?
absi_zsegs = [[
# see `get_hist()` in backend, should ALWAYS be
# able to handle a `start_dt=None`!
# None,
None,
absi_zeros[0] + 1,
]]
# XXX NOTE XXX: if >= 2 zeroed segments are found, there should
# ALWAYS be more then one zero-segment-abs-index-step-diff row
# in `absi_zdiff`, so loop through all such
# abs-index-step-diffs >1 (i.e. the entries of `absi_zdiff`)
# and add them as the "end index" entries for each segment.
# Then, iif NOT iterating the first such segment end, look back
# for the prior segments zero-segment start indext by relative
# indexing the `zero_t` frame by -1 and grabbing the abs index
# of what should be the prior zero-segment abs start index.
else:
# NOTE: since `absi_zdiff` will never have a row
# corresponding to the first zero-segment's row, we add it
# manually here.
absi_zsegs.append([
absi_zeros[0] - 1,
None,
])
# TODO: can we do it with vec ops?
for i, (
fi, # frame index of zero-seg start
zseg_start_row, # full row for ^
) in enumerate(zip(
fi_zgaps,
fi_zseg_start_rows,
)):
assert (zseg_start_row == zero_t[fi]).all()
iabs: int = zseg_start_row['index'][0]
absi_zsegs.append([
iabs - 1,
None, # backfilled on next iter
])
# row = zero_t[fi]
# absi_pre_zseg = row['index'][0] - 1
# absi_pre_zseg = absi - 1
# final iter case, backfill FINAL end iabs!
if (i + 1) == fi_zgaps.size:
absi_zsegs[-1][1] = absi_zeros[-1] + 1
# NOTE: only after the first segment (due to `.diff()`
# usage above) can we do a lookback to the prior
# segment's end row and determine it's abs index to
# retroactively insert to the prior
# `absi_zsegs[i-1][1]` entry Bo
last_end: int = absi_zsegs[i][1]
if last_end is None:
prev_zseg_row = zero_t[fi - 1]
absi_post_zseg = prev_zseg_row['index'][0] + 1
# XXX: MUST BACKFILL previous end iabs!
absi_zsegs[i][1] = absi_post_zseg
else:
if 0 < num_gaps < 2:
absi_zsegs[-1][1] = absi_zeros[-1] + 1
iabs_first: int = frame['index'][0]
for start, end in absi_zsegs:
ts_start: float = times[start - iabs_first]
ts_end: float = times[end - iabs_first]
if (
ts_start == 0
or
ts_end == 0
):
import pdbp
pdbp.set_trace()
assert end
assert start < end
log.warning(
f'Frame has {len(absi_zsegs)} NULL GAPS!?\n'
f'period: {period}\n'
f'total null samples: {len(zero_t)}\n'
)
return (
absi_zsegs, # [start, end] abs slice indices of seg
absi_zeros, # all abs indices within all null-segs
zero_t, # sliced-view of all null-segment rows-datums
)
def iter_null_segs(
timeframe: float,
frame: Frame | None = None,
null_segs: tuple | None = None,
) -> Generator[
tuple[
int, int,
int, int,
float, float,
float, float,
# Seq, # TODO: can we make it an array-type instead?
# list[
# list[int, int],
# ],
# Seq,
# Frame
],
None,
]:
if null_segs is None:
null_segs: tuple = get_null_segs(
frame,
period=timeframe,
)
absi_pairs_zsegs: list[list[float, float]]
izeros: Seq
zero_t: Frame
(
absi_pairs_zsegs,
izeros,
zero_t,
) = null_segs
absi_first: int = frame[0]['index']
for (
absi_start,
absi_end,
) in absi_pairs_zsegs:
fi_end: int = absi_end - absi_first
end_row: Seq = frame[fi_end]
end_t: float = end_row['time']
end_dt: DateTime = from_timestamp(end_t)
fi_start = None
start_row = None
start_t = None
start_dt = None
if (
absi_start is not None
and start_t != 0
):
fi_start: int = absi_start - absi_first
start_row: Seq = frame[fi_start]
start_t: float = start_row['time']
start_dt: DateTime = from_timestamp(start_t)
if absi_start < 0:
import pdbp
pdbp.set_trace()
yield (
absi_start, absi_end, # abs indices
fi_start, fi_end, # relative "frame" indices
start_t, end_t,
start_dt, end_dt,
)
def with_dts( def with_dts(
@ -292,6 +538,17 @@ def dedup_dt(
) )
t_unit: Literal = Literal[
'days',
'hours',
'minutes',
'seconds',
'miliseconds',
'microseconds',
'nanoseconds',
]
def detect_time_gaps( def detect_time_gaps(
df: pl.DataFrame, df: pl.DataFrame,
@ -406,10 +663,6 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
f'Gaps found:\n{gaps}\n' f'Gaps found:\n{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}' f'deduped Gaps found:\n{deduped_gaps}'
) )
# TODO: rewrite this in polars and/or convert to
# ndarray to detect and remove?
# null_gaps = detect_null_time_gap()
return ( return (
df, df,
gaps, gaps,
@ -428,14 +681,19 @@ def sort_diff(
list[int], # indices of segments that are out-of-order list[int], # indices of segments that are out-of-order
]: ]:
ser: pl.Series = src_df[col] ser: pl.Series = src_df[col]
diff: pl.Series = ser.diff()
sortd: pl.DataFrame = ser.sort() sortd: pl.DataFrame = ser.sort()
diff: pl.Series = ser.diff()
sortd_diff: pl.Series = sortd.diff() sortd_diff: pl.Series = sortd.diff()
i_step_diff = (diff != sortd_diff).arg_true() i_step_diff = (diff != sortd_diff).arg_true()
if i_step_diff.len(): frame_reorders: int = i_step_diff.len()
import pdbp if frame_reorders:
pdbp.set_trace() log.warn(
f'Resorted frame on col: {col}\n'
f'{frame_reorders}'
)
# import pdbp; pdbp.set_trace()
# NOTE: thanks to this SO answer for the below conversion routines # NOTE: thanks to this SO answer for the below conversion routines
# to go from numpy struct-arrays to polars dataframes and back: # to go from numpy struct-arrays to polars dataframes and back:

View File

@ -21,8 +21,6 @@ Storage middle-ware CLIs.
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
import time import time
from typing import Generator
# from typing import TYPE_CHECKING
import polars as pl import polars as pl
import numpy as np import numpy as np
@ -37,14 +35,11 @@ from piker.service import open_piker_runtime
from piker.cli import cli from piker.cli import cli
from piker.config import get_conf_dir from piker.config import get_conf_dir
from piker.data import ( from piker.data import (
maybe_open_shm_array,
def_iohlcv_fields,
ShmArray, ShmArray,
tsp, tsp,
) )
from piker.data.history import ( from piker.data.history import (
_default_hist_size, iter_dfs_from_shms,
_default_rt_size,
) )
from . import ( from . import (
log, log,
@ -190,6 +185,13 @@ def anal(
) )
assert first_dt < last_dt assert first_dt < last_dt
null_segs: tuple = tsp.get_null_segs(
frame=history,
period=period,
)
if null_segs:
await tractor.pause()
shm_df: pl.DataFrame = await client.as_df( shm_df: pl.DataFrame = await client.as_df(
fqme, fqme,
period, period,
@ -204,6 +206,7 @@ def anal(
diff, diff,
) = tsp.dedupe(shm_df) ) = tsp.dedupe(shm_df)
if diff: if diff:
await client.write_ohlcv( await client.write_ohlcv(
fqme, fqme,
@ -219,69 +222,6 @@ def anal(
trio.run(main) trio.run(main)
def iter_dfs_from_shms(fqme: str) -> Generator[
tuple[Path, ShmArray, pl.DataFrame],
None,
None,
]:
# shm buffer size table based on known sample rates
sizes: dict[str, int] = {
'hist': _default_hist_size,
'rt': _default_rt_size,
}
# load all detected shm buffer files which have the
# passed FQME pattern in the file name.
shmfiles: list[Path] = []
shmdir = Path('/dev/shm/')
for shmfile in shmdir.glob(f'*{fqme}*'):
filename: str = shmfile.name
# skip index files
if (
'_first' in filename
or '_last' in filename
):
continue
assert shmfile.is_file()
log.debug(f'Found matching shm buffer file: {filename}')
shmfiles.append(shmfile)
for shmfile in shmfiles:
# lookup array buffer size based on file suffix
# being either .rt or .hist
key: str = shmfile.name.rsplit('.')[-1]
# skip FSP buffers for now..
if key not in sizes:
continue
size: int = sizes[key]
# attach to any shm buffer, load array into polars df,
# write to local parquet file.
shm, opened = maybe_open_shm_array(
key=shmfile.name,
size=size,
dtype=def_iohlcv_fields,
readonly=True,
)
assert not opened
ohlcv = shm.array
from ..data import tsp
df: pl.DataFrame = tsp.np2pl(ohlcv)
yield (
shmfile,
shm,
df,
)
@store.command() @store.command()
def ldshm( def ldshm(
fqme: str, fqme: str,
@ -307,8 +247,8 @@ def ldshm(
# compute ohlc properties for naming # compute ohlc properties for naming
times: np.ndarray = shm.array['time'] times: np.ndarray = shm.array['time']
secs: float = times[-1] - times[-2] period_s: float = times[-1] - times[-2]
if secs < 1.: if period_s < 1.:
raise ValueError( raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}' f'Something is wrong with time period for {shm}:\n{times}'
) )
@ -323,17 +263,22 @@ def ldshm(
diff, diff,
) = tsp.dedupe(shm_df) ) = tsp.dedupe(shm_df)
null_segs: tuple = tsp.get_null_segs(
frame=shm.array,
period=period_s,
)
# TODO: maybe only optionally enter this depending # TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection? # on some CLI flags and/or gap detection?
if ( if not gaps.is_empty():
not gaps.is_empty() await tractor.pause()
or secs > 2
): if null_segs:
await tractor.pause() await tractor.pause()
# write to parquet file? # write to parquet file?
if write_parquet: if write_parquet:
timeframe: str = f'{secs}s' timeframe: str = f'{period_s}s'
datadir: Path = get_conf_dir() / 'nativedb' datadir: Path = get_conf_dir() / 'nativedb'
if not datadir.is_dir(): if not datadir.is_dir():

View File

@ -342,7 +342,6 @@ class NativeStorageClient:
) )
return path return path
async def write_ohlcv( async def write_ohlcv(
self, self,
fqme: str, fqme: str,

View File

@ -207,9 +207,10 @@ class ContentsLabel(pg.LabelItem):
# this being "html" is the dumbest shit :eyeroll: # this being "html" is the dumbest shit :eyeroll:
self.setText( self.setText(
"<b>i</b>:{index}<br/>" "<b>i_arr</b>:{index}<br/>"
# NB: these fields must be indexed in the correct order via # NB: these fields must be indexed in the correct order via
# the slice syntax below. # the slice syntax below.
"<b>i_shm</b>:{}<br/>"
"<b>epoch</b>:{}<br/>" "<b>epoch</b>:{}<br/>"
"<b>O</b>:{}<br/>" "<b>O</b>:{}<br/>"
"<b>H</b>:{}<br/>" "<b>H</b>:{}<br/>"
@ -219,6 +220,7 @@ class ContentsLabel(pg.LabelItem):
# "<b>wap</b>:{}".format( # "<b>wap</b>:{}".format(
*array[ix][ *array[ix][
[ [
'index',
'time', 'time',
'open', 'open',
'high', 'high',

View File

@ -248,25 +248,27 @@ async def increment_history_view(
# - samplerd could emit the actual update range via # - samplerd could emit the actual update range via
# tuple and then we only enter the below block if that # tuple and then we only enter the below block if that
# range is detected as in-view? # range is detected as in-view?
if ( match msg:
(bf_wut := msg.get('backfilling', False)) case {
): 'backfilling': (viz_name, timeframe),
viz_name, timeframe = bf_wut } if (
if (
viz_name == name viz_name == name
):
log.warning(
f'Forcing HARD REDRAW:\n'
f'name: {name}\n'
f'timeframe: {timeframe}\n'
)
# TODO: only allow this when the data is IN VIEW! # TODO: only allow this when the data is IN VIEW!
# also, we probably can do this more efficiently # also, we probably can do this more efficiently
# / smarter by only redrawing the portion of the # / smarter by only redrawing the portion of the
# path necessary? # path necessary?
and False {
): 60: hist_viz,
log.info(f'Forcing hard redraw -> {name}@{timeframe}') 1: viz,
match timeframe: }[timeframe].update_graphics(
case 60: force_redraw=True
hist_viz.update_graphics(force_redraw=True) )
case 1:
viz.update_graphics(force_redraw=True)
# check if slow chart needs an x-domain shift and/or # check if slow chart needs an x-domain shift and/or
# y-range resize. # y-range resize.