Compare commits
No commits in common. "1f9a4976378f863193572a0b64a5dba5e4aaf672" and "f274c3db3b5cc00abee8cec84ef98bd564e437d2" have entirely different histories.
1f9a497637
...
f274c3db3b
|
@ -401,15 +401,7 @@ class Client:
|
|||
# => we recursively call this method until we get at least
|
||||
# as many bars such that they sum in aggregate to the the
|
||||
# desired total time (duration) at most.
|
||||
# XXX XXX XXX
|
||||
# WHY DID WE EVEN NEED THIS ORIGINALLY!?
|
||||
# XXX XXX XXX
|
||||
# - if you query over a gap and get no data
|
||||
# that may short circuit the history
|
||||
if (
|
||||
end_dt
|
||||
and False
|
||||
):
|
||||
if end_dt:
|
||||
nparr: np.ndarray = bars_to_np(bars)
|
||||
times: np.ndarray = nparr['time']
|
||||
first: float = times[0]
|
||||
|
@ -418,7 +410,6 @@ class Client:
|
|||
if (
|
||||
# len(bars) * sample_period_s) < dt_duration.in_seconds()
|
||||
tdiff < dt_duration.in_seconds()
|
||||
# and False
|
||||
):
|
||||
end_dt: DateTime = from_timestamp(first)
|
||||
log.warning(
|
||||
|
@ -868,9 +859,6 @@ class Client:
|
|||
timeout=timeout,
|
||||
)
|
||||
except TimeoutError:
|
||||
import pdbp
|
||||
pdbp.set_trace()
|
||||
|
||||
if raise_on_timeout:
|
||||
raise
|
||||
return None
|
||||
|
|
|
@ -174,15 +174,8 @@ async def open_history_client(
|
|||
start_dt: datetime | None = None,
|
||||
|
||||
) -> tuple[np.ndarray, str]:
|
||||
|
||||
nonlocal max_timeout, mean, count
|
||||
|
||||
if (
|
||||
start_dt
|
||||
and start_dt.timestamp() == 0
|
||||
):
|
||||
await tractor.pause()
|
||||
|
||||
query_start = time.time()
|
||||
out, timedout = await get_bars(
|
||||
proxy,
|
||||
|
@ -410,54 +403,34 @@ async def get_bars(
|
|||
|
||||
bars, bars_array, dt_duration = out
|
||||
|
||||
if bars_array is None:
|
||||
raise SymbolNotFound(fqme)
|
||||
|
||||
# not enough bars signal, likely due to venue
|
||||
# operational gaps.
|
||||
# too_little: bool = False
|
||||
if end_dt:
|
||||
if not bars:
|
||||
# no data returned?
|
||||
log.warning(
|
||||
'History frame is blank?\n'
|
||||
f'start_dt: {start_dt}\n'
|
||||
f'end_dt: {end_dt}\n'
|
||||
f'duration: {dt_duration}\n'
|
||||
too_little: bool = False
|
||||
if (
|
||||
end_dt
|
||||
and (
|
||||
not bars
|
||||
or (too_little :=
|
||||
start_dt
|
||||
and (len(bars) * timeframe)
|
||||
< dt_duration.in_seconds()
|
||||
)
|
||||
)
|
||||
):
|
||||
if (
|
||||
end_dt
|
||||
or too_little
|
||||
):
|
||||
log.warning(
|
||||
f'History is blank for {dt_duration} from {end_dt}'
|
||||
)
|
||||
end_dt -= dt_duration
|
||||
continue
|
||||
|
||||
raise NoData(f'{end_dt}')
|
||||
|
||||
else:
|
||||
dur_s: float = len(bars) * timeframe
|
||||
bars_dur = pendulum.Duration(seconds=dur_s)
|
||||
dt_dur_s: float = dt_duration.in_seconds()
|
||||
if dur_s < dt_dur_s:
|
||||
log.warning(
|
||||
'History frame is shorter then expected?\n'
|
||||
f'start_dt: {start_dt}\n'
|
||||
f'end_dt: {end_dt}\n'
|
||||
f'duration: {dt_dur_s}\n'
|
||||
f'frame duration seconds: {dur_s}\n'
|
||||
f'dur diff: {dt_duration - bars_dur}\n'
|
||||
)
|
||||
# NOTE: we used to try to get a minimal
|
||||
# set of bars by recursing but this ran
|
||||
# into possible infinite query loops
|
||||
# when logic in the `Client.bars()` dt
|
||||
# diffing went bad. So instead for now
|
||||
# we just return the
|
||||
# shorter-then-expected history with
|
||||
# a warning.
|
||||
# TODO: in the future it prolly makes
|
||||
# the most send to do venue operating
|
||||
# hours lookup and
|
||||
# timestamp-in-operating-range set
|
||||
# checking to know for sure if we can
|
||||
# safely and quickly ignore non-uniform history
|
||||
# frame timestamp gaps..
|
||||
# end_dt -= dt_duration
|
||||
# continue
|
||||
# await tractor.pause()
|
||||
if bars_array is None:
|
||||
raise SymbolNotFound(fqme)
|
||||
|
||||
first_dt = pendulum.from_timestamp(
|
||||
bars[0].date.timestamp())
|
||||
|
@ -881,13 +854,7 @@ async def stream_quotes(
|
|||
init_msgs.append(init_msg)
|
||||
|
||||
con: Contract = details.contract
|
||||
first_ticker: Ticker | None = None
|
||||
with trio.move_on_after(1):
|
||||
first_ticker: Ticker = await proxy.get_quote(
|
||||
contract=con,
|
||||
raise_on_timeout=False,
|
||||
)
|
||||
|
||||
first_ticker: Ticker = await proxy.get_quote(contract=con)
|
||||
if first_ticker:
|
||||
first_quote: dict = normalize(first_ticker)
|
||||
log.info(
|
||||
|
@ -895,6 +862,18 @@ async def stream_quotes(
|
|||
f'{pformat(first_quote)}'
|
||||
)
|
||||
|
||||
# TODO: we should instead spawn a task that waits on a feed
|
||||
# to start and let it wait indefinitely..instead of this
|
||||
# hard coded stuff.
|
||||
# async def wait_for_first_quote():
|
||||
# with trio.CancelScope() as cs:
|
||||
|
||||
with trio.move_on_after(1):
|
||||
first_ticker = await proxy.get_quote(
|
||||
contract=con,
|
||||
raise_on_timeout=True,
|
||||
)
|
||||
|
||||
# NOTE: it might be outside regular trading hours for
|
||||
# assets with "standard venue operating hours" so we
|
||||
# only "pretend the feed is live" when the dst asset
|
||||
|
@ -905,8 +884,6 @@ async def stream_quotes(
|
|||
# (equitiies, futes, bonds etc.) we at least try to
|
||||
# grab the OHLC history.
|
||||
if (
|
||||
first_ticker
|
||||
and
|
||||
isnan(first_ticker.last)
|
||||
# SO, if the last quote price value is NaN we ONLY
|
||||
# "pretend to do" `feed_is_live.set()` if it's a known
|
||||
|
@ -930,19 +907,6 @@ async def stream_quotes(
|
|||
await trio.sleep_forever()
|
||||
return # we never expect feed to come up?
|
||||
|
||||
# TODO: we should instead spawn a task that waits on a feed
|
||||
# to start and let it wait indefinitely..instead of this
|
||||
# hard coded stuff.
|
||||
# async def wait_for_first_quote():
|
||||
# with trio.CancelScope() as cs:
|
||||
|
||||
# XXX: MUST acquire a ticker + first quote before starting
|
||||
# the live quotes loop!
|
||||
# with trio.move_on_after(1):
|
||||
first_ticker = await proxy.get_quote(
|
||||
contract=con,
|
||||
raise_on_timeout=True,
|
||||
)
|
||||
cs: trio.CancelScope | None = None
|
||||
startup: bool = True
|
||||
while (
|
||||
|
|
|
@ -41,6 +41,7 @@ from typing import (
|
|||
import wsproto
|
||||
from uuid import uuid4
|
||||
|
||||
from rapidfuzz import process as fuzzy
|
||||
from trio_typing import TaskStatus
|
||||
import asks
|
||||
from bidict import bidict
|
||||
|
@ -415,7 +416,8 @@ class Client:
|
|||
await self.get_mkt_pairs()
|
||||
assert self._pairs, '`Client.get_mkt_pairs()` was never called!?'
|
||||
|
||||
matches: dict[str, KucoinMktPair] = match_from_pairs(
|
||||
|
||||
matches: dict[str, Pair] = match_from_pairs(
|
||||
pairs=self._pairs,
|
||||
# query=pattern.upper(),
|
||||
query=pattern.upper(),
|
||||
|
|
|
@ -31,8 +31,6 @@ from pathlib import Path
|
|||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
Sequence,
|
||||
Hashable,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
from types import ModuleType
|
||||
|
@ -130,8 +128,8 @@ class SymbologyCache(Struct):
|
|||
- `.get_mkt_pairs()`: returning a table of pair-`Struct`
|
||||
types, custom defined by the particular backend.
|
||||
|
||||
AND, the required `.get_mkt_info()` module-level endpoint
|
||||
which maps `fqme: str` -> `MktPair`s.
|
||||
AND, the required `.get_mkt_info()` module-level endpoint which
|
||||
maps `fqme: str` -> `MktPair`s.
|
||||
|
||||
These tables are then used to fill out the `.assets`, `.pairs` and
|
||||
`.mktmaps` tables on this cache instance, respectively.
|
||||
|
@ -502,7 +500,7 @@ def match_from_pairs(
|
|||
)
|
||||
|
||||
# pop and repack pairs in output dict
|
||||
matched_pairs: dict[str, Struct] = {}
|
||||
matched_pairs: dict[str, Pair] = {}
|
||||
for item in matches:
|
||||
pair_key: str = item[0]
|
||||
matched_pairs[pair_key] = pairs[pair_key]
|
||||
|
|
|
@ -16,26 +16,19 @@
|
|||
# <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic for,
|
||||
|
||||
- hi-level biz logics using the `.storage` subpkg APIs for (I/O)
|
||||
orchestration and mgmt of tsdb data sets.
|
||||
- core data-provider history backfilling middleware (as task-funcs) via
|
||||
(what will eventually be `datad`, but are rn is the) `.brokers` backend
|
||||
APIs.
|
||||
- various data set cleaning, repairing and issue-detection/analysis
|
||||
routines to ensure consistent series whether in shm or when
|
||||
stored offline (in a tsdb).
|
||||
Historical data business logic for load, backfill and tsdb storage.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
# from collections import (
|
||||
# Counter,
|
||||
# )
|
||||
from datetime import datetime
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
# import time
|
||||
from types import ModuleType
|
||||
from typing import (
|
||||
Callable,
|
||||
Generator,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
|
@ -43,7 +36,6 @@ import trio
|
|||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from pendulum import (
|
||||
DateTime,
|
||||
Duration,
|
||||
from_timestamp,
|
||||
)
|
||||
|
@ -64,14 +56,7 @@ from ._source import def_iohlcv_fields
|
|||
from ._sampling import (
|
||||
open_sample_stream,
|
||||
)
|
||||
from .tsp import (
|
||||
dedupe,
|
||||
get_null_segs,
|
||||
iter_null_segs,
|
||||
sort_diff,
|
||||
Frame,
|
||||
# Seq,
|
||||
)
|
||||
from . import tsp
|
||||
from ..brokers._util import (
|
||||
DataUnavailable,
|
||||
)
|
||||
|
@ -125,7 +110,6 @@ def diff_history(
|
|||
return array[times >= prepend_until_dt.timestamp()]
|
||||
|
||||
|
||||
# TODO: can't we just make this a sync func now?
|
||||
async def shm_push_in_between(
|
||||
shm: ShmArray,
|
||||
to_push: np.ndarray,
|
||||
|
@ -134,10 +118,6 @@ async def shm_push_in_between(
|
|||
update_start_on_prepend: bool = False,
|
||||
|
||||
) -> int:
|
||||
# XXX: extremely important, there can be no checkpoints
|
||||
# in the body of this func to avoid entering new ``frames``
|
||||
# values while we're pipelining the current ones to
|
||||
# memory...
|
||||
shm.push(
|
||||
to_push,
|
||||
prepend=True,
|
||||
|
@ -158,102 +138,10 @@ async def shm_push_in_between(
|
|||
else None
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
async def maybe_fill_null_segments(
|
||||
shm: ShmArray,
|
||||
timeframe: float,
|
||||
get_hist: Callable,
|
||||
sampler_stream: tractor.MsgStream,
|
||||
mkt: MktPair,
|
||||
|
||||
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> list[Frame]:
|
||||
|
||||
null_segs_detected = trio.Event()
|
||||
task_status.started(null_segs_detected)
|
||||
|
||||
frame: Frame = shm.array
|
||||
|
||||
null_segs: tuple | None = get_null_segs(
|
||||
frame,
|
||||
period=timeframe,
|
||||
)
|
||||
for (
|
||||
absi_start, absi_end,
|
||||
fi_start, fi_end,
|
||||
start_t, end_t,
|
||||
start_dt, end_dt,
|
||||
) in iter_null_segs(
|
||||
null_segs=null_segs,
|
||||
frame=frame,
|
||||
timeframe=timeframe,
|
||||
):
|
||||
|
||||
# XXX NOTE: ?if we get a badly ordered timestamp
|
||||
# pair, immediately stop backfilling?
|
||||
if (
|
||||
start_dt
|
||||
and end_dt < start_dt
|
||||
):
|
||||
await tractor.pause()
|
||||
break
|
||||
|
||||
(
|
||||
array,
|
||||
next_start_dt,
|
||||
next_end_dt,
|
||||
) = await get_hist(
|
||||
timeframe,
|
||||
start_dt=start_dt,
|
||||
end_dt=end_dt,
|
||||
)
|
||||
|
||||
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
|
||||
# and mnq.cme.ib this causes a Qt crash XXDDD
|
||||
|
||||
# make sure we don't overrun the buffer start
|
||||
len_to_push: int = min(absi_end, array.size)
|
||||
to_push: np.ndarray = array[-len_to_push:]
|
||||
|
||||
await shm_push_in_between(
|
||||
shm,
|
||||
to_push,
|
||||
prepend_index=absi_end,
|
||||
update_start_on_prepend=False,
|
||||
)
|
||||
# TODO: UI side needs IPC event to update..
|
||||
# - make sure the UI actually always handles
|
||||
# this update!
|
||||
# - remember that in the display side, only refersh this
|
||||
# if the respective history is actually "in view".
|
||||
# loop
|
||||
await sampler_stream.send({
|
||||
'broadcast_all': {
|
||||
|
||||
# XXX NOTE XXX: see the
|
||||
# `.ui._display.increment_history_view()` if block
|
||||
# that looks for this info to FORCE a hard viz
|
||||
# redraw!
|
||||
'backfilling': (mkt.fqme, timeframe),
|
||||
},
|
||||
})
|
||||
|
||||
null_segs_detected.set()
|
||||
# RECHECK for more null-gaps
|
||||
frame: Frame = shm.array
|
||||
null_segs: tuple | None = get_null_segs(
|
||||
frame,
|
||||
period=timeframe,
|
||||
)
|
||||
if (
|
||||
null_segs
|
||||
and
|
||||
len(null_segs[-1])
|
||||
):
|
||||
await tractor.pause()
|
||||
|
||||
# XXX: extremely important, there can be no checkpoints
|
||||
# in the block above to avoid entering new ``frames``
|
||||
# values while we're pipelining the current ones to
|
||||
# memory...
|
||||
array = shm.array
|
||||
zeros = array[array['low'] == 0]
|
||||
|
||||
|
@ -267,24 +155,10 @@ async def maybe_fill_null_segments(
|
|||
'low',
|
||||
'close',
|
||||
]] = shm._array[zeros['index'][0] - 1]['close']
|
||||
|
||||
# TODO: interatively step through any remaining
|
||||
# time-gaps/null-segments and spawn piecewise backfiller
|
||||
# tasks in a nursery?
|
||||
# -[ ] not sure that's going to work so well on the ib
|
||||
# backend but worth a shot?
|
||||
# -[ ] mk new history connections to make it properly
|
||||
# parallel possible no matter the backend?
|
||||
# -[ ] fill algo: do queries in alternating "latest, then
|
||||
# earliest, then latest.. etc?"
|
||||
# if (
|
||||
# next_end_dt not in frame[
|
||||
# ):
|
||||
# pass
|
||||
# await tractor.pause()
|
||||
|
||||
|
||||
async def start_backfill(
|
||||
tn: trio.Nursery,
|
||||
get_hist,
|
||||
mod: ModuleType,
|
||||
mkt: MktPair,
|
||||
|
@ -339,20 +213,27 @@ async def start_backfill(
|
|||
backfill_until_dt = backfill_from_dt.subtract(**period_duration)
|
||||
|
||||
|
||||
# STAGE NOTE: "backward history gap filling":
|
||||
# - we push to the shm buffer until we have history back
|
||||
# until the latest entry loaded from the tsdb's table B)
|
||||
# - after this loop continue to check for other gaps in the
|
||||
# (tsdb) history and (at least report) maybe fill them
|
||||
# from new frame queries to the backend?
|
||||
# TODO: can we drop this? without conc i don't think this
|
||||
# is necessary any more?
|
||||
# configure async query throttling
|
||||
# rate = config.get('rate', 1)
|
||||
# XXX: legacy from ``trimeter`` code but unsupported now.
|
||||
# erlangs = config.get('erlangs', 1)
|
||||
# avoid duplicate history frames with a set of datetime frame
|
||||
# starts and associated counts of how many duplicates we see
|
||||
# per time stamp.
|
||||
# starts: Counter[datetime] = Counter()
|
||||
|
||||
# conduct "backward history gap filling" where we push to
|
||||
# the shm buffer until we have history back until the
|
||||
# latest entry loaded from the tsdb's table B)
|
||||
last_start_dt: datetime = backfill_from_dt
|
||||
next_prepend_index: int = backfill_from_shm_index
|
||||
|
||||
while last_start_dt > backfill_until_dt:
|
||||
log.info(
|
||||
f'Requesting {timeframe}s frame:\n'
|
||||
f'backfill_until_dt: {backfill_until_dt}\n'
|
||||
f'last_start_dt: {last_start_dt}\n'
|
||||
|
||||
log.debug(
|
||||
f'Requesting {timeframe}s frame ending in {last_start_dt}'
|
||||
)
|
||||
|
||||
try:
|
||||
|
@ -380,18 +261,36 @@ async def start_backfill(
|
|||
await tractor.pause()
|
||||
return
|
||||
|
||||
assert (
|
||||
array['time'][0]
|
||||
==
|
||||
next_start_dt.timestamp()
|
||||
)
|
||||
# TODO: drop this? see todo above..
|
||||
# if (
|
||||
# next_start_dt in starts
|
||||
# and starts[next_start_dt] <= 6
|
||||
# ):
|
||||
# start_dt = min(starts)
|
||||
# log.warning(
|
||||
# f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
|
||||
# )
|
||||
# starts[start_dt] += 1
|
||||
# await tractor.pause()
|
||||
# continue
|
||||
|
||||
# elif starts[next_start_dt] > 6:
|
||||
# log.warning(
|
||||
# f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
|
||||
# )
|
||||
# return
|
||||
|
||||
# # only update new start point if not-yet-seen
|
||||
# starts[next_start_dt] += 1
|
||||
|
||||
assert array['time'][0] == next_start_dt.timestamp()
|
||||
|
||||
diff = last_start_dt - next_start_dt
|
||||
frame_time_diff_s = diff.seconds
|
||||
|
||||
# frame's worth of sample-period-steps, in seconds
|
||||
frame_size_s: float = len(array) * timeframe
|
||||
expected_frame_size_s: float = frame_size_s + timeframe
|
||||
frame_size_s = len(array) * timeframe
|
||||
expected_frame_size_s = frame_size_s + timeframe
|
||||
if frame_time_diff_s > expected_frame_size_s:
|
||||
|
||||
# XXX: query result includes a start point prior to our
|
||||
|
@ -399,10 +298,8 @@ async def start_backfill(
|
|||
# history gap (eg. market closed period, outage, etc.)
|
||||
# so just report it to console for now.
|
||||
log.warning(
|
||||
'GAP DETECTED:\n'
|
||||
f'last_start_dt: {last_start_dt}\n'
|
||||
f'diff: {diff}\n'
|
||||
f'frame_time_diff_s: {frame_time_diff_s}\n'
|
||||
f'History frame ending @ {last_start_dt} appears to have a gap:\n'
|
||||
f'{diff} ~= {frame_time_diff_s} seconds'
|
||||
)
|
||||
|
||||
to_push = diff_history(
|
||||
|
@ -518,15 +415,74 @@ async def start_backfill(
|
|||
gaps,
|
||||
deduped,
|
||||
diff,
|
||||
) = dedupe(df)
|
||||
) = tsp.dedupe(df)
|
||||
if diff:
|
||||
sort_diff(df)
|
||||
tsp.sort_diff(df)
|
||||
|
||||
else:
|
||||
# finally filled gap
|
||||
log.info(
|
||||
f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
|
||||
)
|
||||
# conduct tsdb timestamp gap detection and backfill any
|
||||
# seemingly missing sequence segments..
|
||||
# TODO: ideally these never exist but somehow it seems
|
||||
# sometimes we're writing zero-ed segments on certain
|
||||
# (teardown) cases?
|
||||
from .tsp import detect_null_time_gap
|
||||
|
||||
gap_indices: tuple | None = detect_null_time_gap(shm)
|
||||
while gap_indices:
|
||||
(
|
||||
istart,
|
||||
start,
|
||||
end,
|
||||
iend,
|
||||
) = gap_indices
|
||||
|
||||
start_dt = from_timestamp(start)
|
||||
end_dt = from_timestamp(end)
|
||||
|
||||
# if we get a baddly ordered timestamp
|
||||
# pair, imeeditately stop backfilling.
|
||||
if end_dt < start_dt:
|
||||
break
|
||||
|
||||
(
|
||||
array,
|
||||
next_start_dt,
|
||||
next_end_dt,
|
||||
) = await get_hist(
|
||||
timeframe,
|
||||
start_dt=start_dt,
|
||||
end_dt=end_dt,
|
||||
)
|
||||
|
||||
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
|
||||
# and mnq.cme.ib this causes a Qt crash XXDDD
|
||||
|
||||
# make sure we don't overrun the buffer start
|
||||
len_to_push: int = min(iend, array.size)
|
||||
to_push: np.ndarray = array[-len_to_push:]
|
||||
await shm_push_in_between(
|
||||
shm,
|
||||
to_push,
|
||||
prepend_index=iend,
|
||||
update_start_on_prepend=False,
|
||||
)
|
||||
|
||||
# TODO: UI side needs IPC event to update..
|
||||
# - make sure the UI actually always handles
|
||||
# this update!
|
||||
# - remember that in the display side, only refersh this
|
||||
# if the respective history is actually "in view".
|
||||
# loop
|
||||
await sampler_stream.send({
|
||||
'broadcast_all': {
|
||||
'backfilling': (mkt.fqme, timeframe),
|
||||
},
|
||||
})
|
||||
gap_indices: tuple | None = detect_null_time_gap(shm)
|
||||
|
||||
# XXX: extremely important, there can be no checkpoints
|
||||
# in the block above to avoid entering new ``frames``
|
||||
|
@ -538,12 +494,6 @@ async def start_backfill(
|
|||
bf_done.set()
|
||||
|
||||
|
||||
# NOTE: originally this was used to cope with a tsdb (marketstore)
|
||||
# which could not delivery very large frames of history over gRPC
|
||||
# (thanks goolag) due to corruption issues. NOW, using apache
|
||||
# parquet (by default in the local filesys) we don't have this
|
||||
# requirement since the files can be loaded very quickly in
|
||||
# entirety to memory via
|
||||
async def back_load_from_tsdb(
|
||||
storemod: ModuleType,
|
||||
storage: StorageClient,
|
||||
|
@ -681,94 +631,10 @@ async def back_load_from_tsdb(
|
|||
# await sampler_stream.send('broadcast_all')
|
||||
|
||||
|
||||
async def push_latest_frame(
|
||||
dt_eps: list[DateTime, DateTime],
|
||||
shm: ShmArray,
|
||||
get_hist: Callable[
|
||||
[int, datetime, datetime],
|
||||
tuple[np.ndarray, str]
|
||||
],
|
||||
timeframe: float,
|
||||
config: dict,
|
||||
|
||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
# get latest query's worth of history all the way
|
||||
# back to what is recorded in the tsdb
|
||||
try:
|
||||
(
|
||||
array,
|
||||
mr_start_dt,
|
||||
mr_end_dt,
|
||||
) = await get_hist(
|
||||
timeframe,
|
||||
end_dt=None,
|
||||
)
|
||||
# so caller can access these ep values
|
||||
dt_eps.extend([
|
||||
mr_start_dt,
|
||||
mr_end_dt,
|
||||
])
|
||||
|
||||
# XXX: timeframe not supported for backend (since
|
||||
# above exception type), terminate immediately since
|
||||
# there's no backfilling possible.
|
||||
except DataUnavailable:
|
||||
task_status.started()
|
||||
|
||||
if timeframe > 1:
|
||||
await tractor.pause()
|
||||
|
||||
return
|
||||
|
||||
# NOTE: on the first history, most recent history
|
||||
# frame we PREPEND from the current shm ._last index
|
||||
# and thus a gap between the earliest datum loaded here
|
||||
# and the latest loaded from the tsdb may exist!
|
||||
log.info(f'Pushing {array.size} to shm!')
|
||||
shm.push(
|
||||
array,
|
||||
prepend=True, # append on first frame
|
||||
)
|
||||
|
||||
|
||||
async def load_tsdb_hist(
|
||||
storage: StorageClient,
|
||||
mkt: MktPair,
|
||||
timeframe: float,
|
||||
) -> tuple[
|
||||
np.ndarray,
|
||||
DateTime,
|
||||
DateTime,
|
||||
] | None:
|
||||
# loads a (large) frame of data from the tsdb depending
|
||||
# on the db's query size limit; our "nativedb" (using
|
||||
# parquet) generally can load the entire history into mem
|
||||
# but if not then below the remaining history can be lazy
|
||||
# loaded?
|
||||
fqme: str = mkt.fqme
|
||||
tsdb_entry: tuple[
|
||||
np.ndarray,
|
||||
DateTime,
|
||||
DateTime,
|
||||
]
|
||||
try:
|
||||
tsdb_entry: tuple | None = await storage.load(
|
||||
fqme,
|
||||
timeframe=timeframe,
|
||||
)
|
||||
return tsdb_entry
|
||||
|
||||
except TimeseriesNotFound:
|
||||
log.warning(
|
||||
f'No timeseries yet for {timeframe}@{fqme}'
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
async def tsdb_backfill(
|
||||
mod: ModuleType,
|
||||
storemod: ModuleType,
|
||||
tn: trio.Nursery,
|
||||
|
||||
storage: StorageClient,
|
||||
mkt: MktPair,
|
||||
|
@ -783,101 +649,96 @@ async def tsdb_backfill(
|
|||
|
||||
) -> None:
|
||||
|
||||
if timeframe not in (1, 60):
|
||||
raise ValueError(
|
||||
'`piker` only needs to support 1m and 1s sampling '
|
||||
'but ur api is trying to deliver a longer '
|
||||
f'timeframe of {timeframe} seconds..\n'
|
||||
'So yuh.. dun do dat brudder.'
|
||||
)
|
||||
|
||||
get_hist: Callable[
|
||||
[int, datetime, datetime],
|
||||
tuple[np.ndarray, str]
|
||||
]
|
||||
config: dict[str, int]
|
||||
async with (
|
||||
mod.open_history_client(
|
||||
async with mod.open_history_client(
|
||||
mkt,
|
||||
) as (get_hist, config),
|
||||
|
||||
# NOTE: this sub-nursery splits to tasks for the given
|
||||
# sampling rate to concurrently load offline tsdb
|
||||
# timeseries as well as new data from the venue backend!
|
||||
):
|
||||
) as (get_hist, config):
|
||||
log.info(
|
||||
f'`{mod}` history client returned backfill config:\n'
|
||||
f'{config}\n'
|
||||
)
|
||||
|
||||
dt_eps: list[DateTime, DateTime] = []
|
||||
async with trio.open_nursery() as tn:
|
||||
tn.start_soon(
|
||||
push_latest_frame,
|
||||
dt_eps,
|
||||
shm,
|
||||
get_hist,
|
||||
timeframe,
|
||||
config,
|
||||
)
|
||||
|
||||
# tell parent task to continue
|
||||
# TODO: really we'd want this the other way with the
|
||||
# tsdb load happening asap and the since the latest
|
||||
# frame query will normally be the main source of
|
||||
# latency?
|
||||
task_status.started()
|
||||
|
||||
tsdb_entry: tuple = await load_tsdb_hist(
|
||||
storage,
|
||||
mkt,
|
||||
timeframe,
|
||||
)
|
||||
|
||||
# NOTE: iabs to start backfilling from, reverse chronological,
|
||||
# ONLY AFTER the first history frame has been pushed to
|
||||
# mem!
|
||||
backfill_gap_from_shm_index: int = shm._first.value + 1
|
||||
|
||||
# get latest query's worth of history all the way
|
||||
# back to what is recorded in the tsdb
|
||||
try:
|
||||
(
|
||||
array,
|
||||
mr_start_dt,
|
||||
mr_end_dt,
|
||||
) = dt_eps
|
||||
) = await get_hist(
|
||||
timeframe,
|
||||
end_dt=None,
|
||||
)
|
||||
|
||||
async with trio.open_nursery() as tn:
|
||||
# XXX: timeframe not supported for backend (since
|
||||
# above exception type), terminate immediately since
|
||||
# there's no backfilling possible.
|
||||
except DataUnavailable:
|
||||
task_status.started()
|
||||
await tractor.pause()
|
||||
return
|
||||
|
||||
# Prepend any tsdb history to the shm buffer which should
|
||||
# now be full of the most recent history pulled from the
|
||||
# backend's last frame.
|
||||
if tsdb_entry:
|
||||
# TODO: fill in non-zero epoch time values ALWAYS!
|
||||
# hist_shm._array['time'] = np.arange(
|
||||
# start=
|
||||
|
||||
# NOTE: removed for now since it'll always break
|
||||
# on the first 60s of the venue open..
|
||||
# times: np.ndarray = array['time']
|
||||
# # sample period step size in seconds
|
||||
# step_size_s = (
|
||||
# from_timestamp(times[-1])
|
||||
# - from_timestamp(times[-2])
|
||||
# ).seconds
|
||||
|
||||
# if step_size_s not in (1, 60):
|
||||
# log.error(f'Last 2 sample period is off!? -> {step_size_s}')
|
||||
# step_size_s = (
|
||||
# from_timestamp(times[-2])
|
||||
# - from_timestamp(times[-3])
|
||||
# ).seconds
|
||||
|
||||
# NOTE: on the first history, most recent history
|
||||
# frame we PREPEND from the current shm ._last index
|
||||
# and thus a gap between the earliest datum loaded here
|
||||
# and the latest loaded from the tsdb may exist!
|
||||
log.info(f'Pushing {array.size} to shm!')
|
||||
shm.push(
|
||||
array,
|
||||
prepend=True, # append on first frame
|
||||
)
|
||||
backfill_gap_from_shm_index: int = shm._first.value + 1
|
||||
|
||||
# tell parent task to continue
|
||||
task_status.started()
|
||||
|
||||
# loads a (large) frame of data from the tsdb depending
|
||||
# on the db's query size limit; our "nativedb" (using
|
||||
# parquet) generally can load the entire history into mem
|
||||
# but if not then below the remaining history can be lazy
|
||||
# loaded?
|
||||
fqme: str = mkt.fqme
|
||||
last_tsdb_dt: datetime | None = None
|
||||
try:
|
||||
tsdb_entry: tuple | None = await storage.load(
|
||||
fqme,
|
||||
timeframe=timeframe,
|
||||
)
|
||||
except TimeseriesNotFound:
|
||||
log.warning(
|
||||
f'No timeseries yet for {timeframe}@{fqme}'
|
||||
)
|
||||
else:
|
||||
(
|
||||
tsdb_history,
|
||||
first_tsdb_dt,
|
||||
last_tsdb_dt,
|
||||
) = tsdb_entry
|
||||
|
||||
# if there is a gap to backfill from the first
|
||||
# history frame until the last datum loaded from the tsdb
|
||||
# continue that now in the background
|
||||
bf_done = await tn.start(
|
||||
partial(
|
||||
start_backfill,
|
||||
tn=tn,
|
||||
get_hist=get_hist,
|
||||
mod=mod,
|
||||
mkt=mkt,
|
||||
shm=shm,
|
||||
timeframe=timeframe,
|
||||
|
||||
backfill_from_shm_index=backfill_gap_from_shm_index,
|
||||
backfill_from_dt=mr_start_dt,
|
||||
sampler_stream=sampler_stream,
|
||||
backfill_until_dt=last_tsdb_dt,
|
||||
storage=storage,
|
||||
write_tsdb=True,
|
||||
)
|
||||
)
|
||||
|
||||
# calc the index from which the tsdb data should be
|
||||
# prepended, presuming there is a gap between the
|
||||
# latest frame (loaded/read above) and the latest
|
||||
|
@ -943,33 +804,37 @@ async def tsdb_backfill(
|
|||
|
||||
log.info(f'Loaded {to_push.shape} datums from storage')
|
||||
|
||||
# NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
|
||||
# seemingly missing (null-time) segments..
|
||||
# TODO: ideally these can never exist!
|
||||
# -[ ] somehow it seems sometimes we're writing zero-ed
|
||||
# segments to tsdbs during teardown?
|
||||
# -[ ] can we ensure that the backcfiller tasks do this
|
||||
# work PREVENTAVELY instead?
|
||||
# -[ ] fill in non-zero epoch time values ALWAYS!
|
||||
# await maybe_fill_null_segments(
|
||||
nulls_detected: trio.Event = await tn.start(partial(
|
||||
maybe_fill_null_segments,
|
||||
|
||||
shm=shm,
|
||||
timeframe=timeframe,
|
||||
get_hist=get_hist,
|
||||
sampler_stream=sampler_stream,
|
||||
mkt=mkt,
|
||||
))
|
||||
|
||||
|
||||
# TODO: who would want to?
|
||||
await nulls_detected.wait()
|
||||
|
||||
await bf_done.wait()
|
||||
# TODO: maybe start history anal and load missing "history
|
||||
# gaps" via backend..
|
||||
|
||||
if timeframe not in (1, 60):
|
||||
raise ValueError(
|
||||
'`piker` only needs to support 1m and 1s sampling '
|
||||
'but ur api is trying to deliver a longer '
|
||||
f'timeframe of {timeframe} seconds..\n'
|
||||
'So yuh.. dun do dat brudder.'
|
||||
)
|
||||
|
||||
# if there is a gap to backfill from the first
|
||||
# history frame until the last datum loaded from the tsdb
|
||||
# continue that now in the background
|
||||
bf_done = await tn.start(
|
||||
partial(
|
||||
start_backfill,
|
||||
get_hist=get_hist,
|
||||
mod=mod,
|
||||
mkt=mkt,
|
||||
shm=shm,
|
||||
timeframe=timeframe,
|
||||
backfill_from_shm_index=backfill_gap_from_shm_index,
|
||||
backfill_from_dt=mr_start_dt,
|
||||
sampler_stream=sampler_stream,
|
||||
backfill_until_dt=last_tsdb_dt,
|
||||
storage=storage,
|
||||
write_tsdb=True,
|
||||
)
|
||||
)
|
||||
|
||||
# if len(hist_shm.array) < 2:
|
||||
# TODO: there's an edge case here to solve where if the last
|
||||
# frame before market close (at least on ib) was pushed and
|
||||
|
@ -983,29 +848,32 @@ async def tsdb_backfill(
|
|||
# backload any further data from tsdb (concurrently per
|
||||
# timeframe) if not all data was able to be loaded (in memory)
|
||||
# from the ``StorageClient.load()`` call above.
|
||||
try:
|
||||
await trio.sleep_forever()
|
||||
finally:
|
||||
return
|
||||
|
||||
# XXX NOTE: this is legacy from when we were using
|
||||
# marketstore and we needed to continue backloading
|
||||
# incrementally from the tsdb client.. (bc it couldn't
|
||||
# handle a single large query with gRPC for some
|
||||
# reason.. classic goolag pos)
|
||||
# tn.start_soon(
|
||||
# back_load_from_tsdb,
|
||||
tn.start_soon(
|
||||
back_load_from_tsdb,
|
||||
|
||||
# storemod,
|
||||
# storage,
|
||||
# fqme,
|
||||
storemod,
|
||||
storage,
|
||||
fqme,
|
||||
|
||||
# tsdb_history,
|
||||
# last_tsdb_dt,
|
||||
# mr_start_dt,
|
||||
# mr_end_dt,
|
||||
# bf_done,
|
||||
tsdb_history,
|
||||
last_tsdb_dt,
|
||||
mr_start_dt,
|
||||
mr_end_dt,
|
||||
bf_done,
|
||||
|
||||
# timeframe,
|
||||
# shm,
|
||||
# )
|
||||
timeframe,
|
||||
shm,
|
||||
)
|
||||
|
||||
|
||||
async def manage_history(
|
||||
|
@ -1113,11 +981,6 @@ async def manage_history(
|
|||
|
||||
async with (
|
||||
storage.open_storage_client() as (storemod, client),
|
||||
|
||||
# NOTE: this nursery spawns a task per "timeframe" (aka
|
||||
# sampling period) data set since normally differently
|
||||
# sampled timeseries can be loaded / process independently
|
||||
# ;)
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
log.info(
|
||||
|
@ -1167,6 +1030,7 @@ async def manage_history(
|
|||
tsdb_backfill,
|
||||
mod=mod,
|
||||
storemod=storemod,
|
||||
tn=tn,
|
||||
# bus,
|
||||
storage=client,
|
||||
mkt=mkt,
|
||||
|
@ -1195,70 +1059,3 @@ async def manage_history(
|
|||
# and thus a small RPC-prot for remotely controllinlg
|
||||
# what data is loaded for viewing.
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
def iter_dfs_from_shms(
|
||||
fqme: str
|
||||
) -> Generator[
|
||||
tuple[Path, ShmArray, pl.DataFrame],
|
||||
None,
|
||||
None,
|
||||
]:
|
||||
# shm buffer size table based on known sample rates
|
||||
sizes: dict[str, int] = {
|
||||
'hist': _default_hist_size,
|
||||
'rt': _default_rt_size,
|
||||
}
|
||||
|
||||
# load all detected shm buffer files which have the
|
||||
# passed FQME pattern in the file name.
|
||||
shmfiles: list[Path] = []
|
||||
shmdir = Path('/dev/shm/')
|
||||
|
||||
for shmfile in shmdir.glob(f'*{fqme}*'):
|
||||
filename: str = shmfile.name
|
||||
|
||||
# skip index files
|
||||
if (
|
||||
'_first' in filename
|
||||
or '_last' in filename
|
||||
):
|
||||
continue
|
||||
|
||||
assert shmfile.is_file()
|
||||
log.debug(f'Found matching shm buffer file: {filename}')
|
||||
shmfiles.append(shmfile)
|
||||
|
||||
for shmfile in shmfiles:
|
||||
|
||||
# lookup array buffer size based on file suffix
|
||||
# being either .rt or .hist
|
||||
key: str = shmfile.name.rsplit('.')[-1]
|
||||
|
||||
# skip FSP buffers for now..
|
||||
if key not in sizes:
|
||||
continue
|
||||
|
||||
size: int = sizes[key]
|
||||
|
||||
# attach to any shm buffer, load array into polars df,
|
||||
# write to local parquet file.
|
||||
shm, opened = maybe_open_shm_array(
|
||||
key=shmfile.name,
|
||||
size=size,
|
||||
dtype=def_iohlcv_fields,
|
||||
readonly=True,
|
||||
)
|
||||
assert not opened
|
||||
ohlcv = shm.array
|
||||
|
||||
from ..data import tsp
|
||||
df: pl.DataFrame = tsp.np2pl(ohlcv)
|
||||
|
||||
yield (
|
||||
shmfile,
|
||||
shm,
|
||||
df,
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -29,19 +29,12 @@ from math import (
|
|||
floor,
|
||||
)
|
||||
import time
|
||||
from typing import (
|
||||
Literal,
|
||||
# AsyncGenerator,
|
||||
Generator,
|
||||
)
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
import polars as pl
|
||||
from pendulum import (
|
||||
DateTime,
|
||||
from_timestamp,
|
||||
)
|
||||
|
||||
from ._sharedmem import ShmArray
|
||||
from ..toolz.profile import (
|
||||
Profiler,
|
||||
pg_profile_enabled,
|
||||
|
@ -60,14 +53,6 @@ get_console_log = partial(
|
|||
name=subsys,
|
||||
)
|
||||
|
||||
# NOTE: union type-defs to handle generic `numpy` and `polars` types
|
||||
# side-by-side Bo
|
||||
# |_ TODO: schema spec typing?
|
||||
# -[ ] nptyping!
|
||||
# -[ ] wtv we can with polars?
|
||||
Frame = pl.DataFrame | np.ndarray
|
||||
Seq = pl.Series | np.ndarray
|
||||
|
||||
|
||||
def slice_from_time(
|
||||
arr: np.ndarray,
|
||||
|
@ -224,282 +209,51 @@ def slice_from_time(
|
|||
return read_slc
|
||||
|
||||
|
||||
def get_null_segs(
|
||||
frame: Frame,
|
||||
period: float, # sampling step in seconds
|
||||
def detect_null_time_gap(
|
||||
shm: ShmArray,
|
||||
imargin: int = 1,
|
||||
col: str = 'time',
|
||||
|
||||
) -> tuple[
|
||||
# Seq, # TODO: can we make it an array-type instead?
|
||||
list[
|
||||
list[int, int],
|
||||
],
|
||||
Seq,
|
||||
Frame
|
||||
] | None:
|
||||
) -> tuple[float, float] | None:
|
||||
'''
|
||||
Detect if there are any zero(-epoch stamped) valued
|
||||
rows in for the provided `col: str` column; by default
|
||||
presume the 'time' field/column.
|
||||
Detect if there are any zero-epoch stamped rows in
|
||||
the presumed 'time' field-column.
|
||||
|
||||
Filter to all such zero (time) segments and return
|
||||
the corresponding frame zeroed segment's,
|
||||
Filter to the gap and return a surrounding index range.
|
||||
|
||||
- gap absolute (in buffer terms) indices-endpoints as
|
||||
`absi_zsegs`
|
||||
- abs indices of all rows with zeroed `col` values as `absi_zeros`
|
||||
- the corresponding frame's row-entries (view) which are
|
||||
zeroed for the `col` as `zero_t`
|
||||
NOTE: for now presumes only ONE gap XD
|
||||
|
||||
'''
|
||||
times: Seq = frame['time']
|
||||
zero_pred: Seq = (times == 0)
|
||||
# ensure we read buffer state only once so that ShmArray rt
|
||||
# circular-buffer updates don't cause a indexing/size mismatch.
|
||||
array: np.ndarray = shm.array
|
||||
|
||||
if isinstance(frame, np.ndarray):
|
||||
tis_zeros: int = zero_pred.any()
|
||||
else:
|
||||
tis_zeros: int = zero_pred.any()
|
||||
zero_pred: np.ndarray = array['time'] == 0
|
||||
zero_t: np.ndarray = array[zero_pred]
|
||||
|
||||
if zero_t.size:
|
||||
istart, iend = zero_t['index'][[0, -1]]
|
||||
start, end = shm._array['time'][
|
||||
[istart - imargin, iend + imargin]
|
||||
]
|
||||
return (
|
||||
istart - imargin,
|
||||
start,
|
||||
end,
|
||||
iend + imargin,
|
||||
)
|
||||
|
||||
if not tis_zeros:
|
||||
return None
|
||||
|
||||
# TODO: use ndarray for this?!
|
||||
absi_zsegs: list[list[int, int]] = []
|
||||
|
||||
if isinstance(frame, np.ndarray):
|
||||
# view of ONLY the zero segments as one continuous chunk
|
||||
zero_t: np.ndarray = frame[zero_pred]
|
||||
# abs indices of said zeroed rows
|
||||
absi_zeros = zero_t['index']
|
||||
# diff of abs index steps between each zeroed row
|
||||
absi_zdiff: np.ndarray = np.diff(absi_zeros)
|
||||
|
||||
# scan for all frame-indices where the
|
||||
# zeroed-row-abs-index-step-diff is greater then the
|
||||
# expected increment of 1.
|
||||
# data 1st zero seg data zeros
|
||||
# ---- ------------ ---- ----- ------ ----
|
||||
# ||||..000000000000..||||..00000..||||||..0000
|
||||
# ---- ------------ ---- ----- ------ ----
|
||||
# ^zero_t[0] ^zero_t[-1]
|
||||
# ^fi_zgaps[0] ^fi_zgaps[1]
|
||||
# ^absi_zsegs[0][0] ^---^ => absi_zsegs[1]: tuple
|
||||
# absi_zsegs[0][1]^
|
||||
#
|
||||
# NOTE: the first entry in `fi_zgaps` is where
|
||||
# the first (absolute) index step diff is > 1.
|
||||
# and it is a frame-relative index into `zero_t`.
|
||||
fi_zgaps = np.argwhere(
|
||||
absi_zdiff > 1
|
||||
# NOTE: +1 here is ensure we index to the "start" of each
|
||||
# segment (if we didn't the below loop needs to be
|
||||
# re-written to expect `fi_end_rows`!
|
||||
) + 1
|
||||
# the rows from the contiguous zeroed segments which have
|
||||
# abs-index steps >1 compared to the previous zero row
|
||||
# (indicating an end of zeroed segment).
|
||||
fi_zseg_start_rows = zero_t[fi_zgaps]
|
||||
|
||||
# TODO: equiv for pl.DataFrame case!
|
||||
else:
|
||||
izeros: pl.Series = zero_pred.arg_true()
|
||||
zero_t: pl.DataFrame = frame[izeros]
|
||||
|
||||
absi_zeros = zero_t['index']
|
||||
absi_zdiff: pl.Series = absi_zeros.diff()
|
||||
fi_zgaps = (absi_zdiff > 1).arg_true()
|
||||
|
||||
# XXX: our goal (in this func) is to select out slice index
|
||||
# pairs (zseg0_start, zseg_end) in abs index units for each
|
||||
# null-segment portion detected throughout entire input frame.
|
||||
|
||||
# only up to one null-segment in entire frame?
|
||||
num_gaps: int = fi_zgaps.size + 1
|
||||
if num_gaps < 1:
|
||||
if absi_zeros.size > 1:
|
||||
absi_zsegs = [[
|
||||
# see `get_hist()` in backend, should ALWAYS be
|
||||
# able to handle a `start_dt=None`!
|
||||
# None,
|
||||
max(
|
||||
absi_zeros[0] - 1,
|
||||
0,
|
||||
),
|
||||
# NOTE: need the + 1 to guarantee we index "up to"
|
||||
# the next non-null row-datum.
|
||||
min(
|
||||
absi_zeros[-1] + 1,
|
||||
frame['index'][-1],
|
||||
),
|
||||
]]
|
||||
else:
|
||||
# XXX EDGE CASE: only one null-datum found so
|
||||
# mark the start abs index as None to trigger
|
||||
# a full frame-len query to the respective backend?
|
||||
absi_zsegs = [[
|
||||
# see `get_hist()` in backend, should ALWAYS be
|
||||
# able to handle a `start_dt=None`!
|
||||
# None,
|
||||
None,
|
||||
absi_zeros[0] + 1,
|
||||
]]
|
||||
|
||||
# XXX NOTE XXX: if >= 2 zeroed segments are found, there should
|
||||
# ALWAYS be more then one zero-segment-abs-index-step-diff row
|
||||
# in `absi_zdiff`, so loop through all such
|
||||
# abs-index-step-diffs >1 (i.e. the entries of `absi_zdiff`)
|
||||
# and add them as the "end index" entries for each segment.
|
||||
# Then, iif NOT iterating the first such segment end, look back
|
||||
# for the prior segments zero-segment start indext by relative
|
||||
# indexing the `zero_t` frame by -1 and grabbing the abs index
|
||||
# of what should be the prior zero-segment abs start index.
|
||||
else:
|
||||
# NOTE: since `absi_zdiff` will never have a row
|
||||
# corresponding to the first zero-segment's row, we add it
|
||||
# manually here.
|
||||
absi_zsegs.append([
|
||||
absi_zeros[0] - 1,
|
||||
None,
|
||||
])
|
||||
|
||||
# TODO: can we do it with vec ops?
|
||||
for i, (
|
||||
fi, # frame index of zero-seg start
|
||||
zseg_start_row, # full row for ^
|
||||
) in enumerate(zip(
|
||||
fi_zgaps,
|
||||
fi_zseg_start_rows,
|
||||
)):
|
||||
assert (zseg_start_row == zero_t[fi]).all()
|
||||
iabs: int = zseg_start_row['index'][0]
|
||||
absi_zsegs.append([
|
||||
iabs - 1,
|
||||
None, # backfilled on next iter
|
||||
])
|
||||
|
||||
# row = zero_t[fi]
|
||||
# absi_pre_zseg = row['index'][0] - 1
|
||||
# absi_pre_zseg = absi - 1
|
||||
|
||||
# final iter case, backfill FINAL end iabs!
|
||||
if (i + 1) == fi_zgaps.size:
|
||||
absi_zsegs[-1][1] = absi_zeros[-1] + 1
|
||||
|
||||
# NOTE: only after the first segment (due to `.diff()`
|
||||
# usage above) can we do a lookback to the prior
|
||||
# segment's end row and determine it's abs index to
|
||||
# retroactively insert to the prior
|
||||
# `absi_zsegs[i-1][1]` entry Bo
|
||||
last_end: int = absi_zsegs[i][1]
|
||||
if last_end is None:
|
||||
prev_zseg_row = zero_t[fi - 1]
|
||||
absi_post_zseg = prev_zseg_row['index'][0] + 1
|
||||
# XXX: MUST BACKFILL previous end iabs!
|
||||
absi_zsegs[i][1] = absi_post_zseg
|
||||
|
||||
else:
|
||||
if 0 < num_gaps < 2:
|
||||
absi_zsegs[-1][1] = absi_zeros[-1] + 1
|
||||
|
||||
iabs_first: int = frame['index'][0]
|
||||
for start, end in absi_zsegs:
|
||||
ts_start: float = times[start - iabs_first]
|
||||
ts_end: float = times[end - iabs_first]
|
||||
if (
|
||||
ts_start == 0
|
||||
or
|
||||
ts_end == 0
|
||||
):
|
||||
import pdbp
|
||||
pdbp.set_trace()
|
||||
|
||||
assert end
|
||||
assert start < end
|
||||
|
||||
log.warning(
|
||||
f'Frame has {len(absi_zsegs)} NULL GAPS!?\n'
|
||||
f'period: {period}\n'
|
||||
f'total null samples: {len(zero_t)}\n'
|
||||
)
|
||||
|
||||
return (
|
||||
absi_zsegs, # [start, end] abs slice indices of seg
|
||||
absi_zeros, # all abs indices within all null-segs
|
||||
zero_t, # sliced-view of all null-segment rows-datums
|
||||
)
|
||||
|
||||
|
||||
def iter_null_segs(
|
||||
timeframe: float,
|
||||
frame: Frame | None = None,
|
||||
null_segs: tuple | None = None,
|
||||
|
||||
) -> Generator[
|
||||
tuple[
|
||||
int, int,
|
||||
int, int,
|
||||
float, float,
|
||||
float, float,
|
||||
|
||||
# Seq, # TODO: can we make it an array-type instead?
|
||||
# list[
|
||||
# list[int, int],
|
||||
# ],
|
||||
# Seq,
|
||||
# Frame
|
||||
],
|
||||
None,
|
||||
]:
|
||||
if null_segs is None:
|
||||
null_segs: tuple = get_null_segs(
|
||||
frame,
|
||||
period=timeframe,
|
||||
)
|
||||
|
||||
absi_pairs_zsegs: list[list[float, float]]
|
||||
izeros: Seq
|
||||
zero_t: Frame
|
||||
(
|
||||
absi_pairs_zsegs,
|
||||
izeros,
|
||||
zero_t,
|
||||
) = null_segs
|
||||
|
||||
absi_first: int = frame[0]['index']
|
||||
for (
|
||||
absi_start,
|
||||
absi_end,
|
||||
) in absi_pairs_zsegs:
|
||||
|
||||
fi_end: int = absi_end - absi_first
|
||||
end_row: Seq = frame[fi_end]
|
||||
end_t: float = end_row['time']
|
||||
end_dt: DateTime = from_timestamp(end_t)
|
||||
|
||||
fi_start = None
|
||||
start_row = None
|
||||
start_t = None
|
||||
start_dt = None
|
||||
if (
|
||||
absi_start is not None
|
||||
and start_t != 0
|
||||
):
|
||||
fi_start: int = absi_start - absi_first
|
||||
start_row: Seq = frame[fi_start]
|
||||
start_t: float = start_row['time']
|
||||
start_dt: DateTime = from_timestamp(start_t)
|
||||
|
||||
if absi_start < 0:
|
||||
import pdbp
|
||||
pdbp.set_trace()
|
||||
|
||||
yield (
|
||||
absi_start, absi_end, # abs indices
|
||||
fi_start, fi_end, # relative "frame" indices
|
||||
start_t, end_t,
|
||||
start_dt, end_dt,
|
||||
)
|
||||
t_unit: Literal = Literal[
|
||||
'days',
|
||||
'hours',
|
||||
'minutes',
|
||||
'seconds',
|
||||
'miliseconds',
|
||||
'microseconds',
|
||||
'nanoseconds',
|
||||
]
|
||||
|
||||
|
||||
def with_dts(
|
||||
|
@ -538,17 +292,6 @@ def dedup_dt(
|
|||
)
|
||||
|
||||
|
||||
t_unit: Literal = Literal[
|
||||
'days',
|
||||
'hours',
|
||||
'minutes',
|
||||
'seconds',
|
||||
'miliseconds',
|
||||
'microseconds',
|
||||
'nanoseconds',
|
||||
]
|
||||
|
||||
|
||||
def detect_time_gaps(
|
||||
df: pl.DataFrame,
|
||||
|
||||
|
@ -663,6 +406,10 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
|
|||
f'Gaps found:\n{gaps}\n'
|
||||
f'deduped Gaps found:\n{deduped_gaps}'
|
||||
)
|
||||
# TODO: rewrite this in polars and/or convert to
|
||||
# ndarray to detect and remove?
|
||||
# null_gaps = detect_null_time_gap()
|
||||
|
||||
return (
|
||||
df,
|
||||
gaps,
|
||||
|
@ -681,19 +428,14 @@ def sort_diff(
|
|||
list[int], # indices of segments that are out-of-order
|
||||
]:
|
||||
ser: pl.Series = src_df[col]
|
||||
sortd: pl.DataFrame = ser.sort()
|
||||
diff: pl.Series = ser.diff()
|
||||
|
||||
diff: pl.Series = ser.diff()
|
||||
sortd: pl.DataFrame = ser.sort()
|
||||
sortd_diff: pl.Series = sortd.diff()
|
||||
i_step_diff = (diff != sortd_diff).arg_true()
|
||||
frame_reorders: int = i_step_diff.len()
|
||||
if frame_reorders:
|
||||
log.warn(
|
||||
f'Resorted frame on col: {col}\n'
|
||||
f'{frame_reorders}'
|
||||
|
||||
)
|
||||
# import pdbp; pdbp.set_trace()
|
||||
if i_step_diff.len():
|
||||
import pdbp
|
||||
pdbp.set_trace()
|
||||
|
||||
# NOTE: thanks to this SO answer for the below conversion routines
|
||||
# to go from numpy struct-arrays to polars dataframes and back:
|
||||
|
|
|
@ -21,6 +21,8 @@ Storage middle-ware CLIs.
|
|||
from __future__ import annotations
|
||||
from pathlib import Path
|
||||
import time
|
||||
from typing import Generator
|
||||
# from typing import TYPE_CHECKING
|
||||
|
||||
import polars as pl
|
||||
import numpy as np
|
||||
|
@ -35,11 +37,14 @@ from piker.service import open_piker_runtime
|
|||
from piker.cli import cli
|
||||
from piker.config import get_conf_dir
|
||||
from piker.data import (
|
||||
maybe_open_shm_array,
|
||||
def_iohlcv_fields,
|
||||
ShmArray,
|
||||
tsp,
|
||||
)
|
||||
from piker.data.history import (
|
||||
iter_dfs_from_shms,
|
||||
_default_hist_size,
|
||||
_default_rt_size,
|
||||
)
|
||||
from . import (
|
||||
log,
|
||||
|
@ -185,13 +190,6 @@ def anal(
|
|||
)
|
||||
assert first_dt < last_dt
|
||||
|
||||
null_segs: tuple = tsp.get_null_segs(
|
||||
frame=history,
|
||||
period=period,
|
||||
)
|
||||
if null_segs:
|
||||
await tractor.pause()
|
||||
|
||||
shm_df: pl.DataFrame = await client.as_df(
|
||||
fqme,
|
||||
period,
|
||||
|
@ -206,7 +204,6 @@ def anal(
|
|||
diff,
|
||||
) = tsp.dedupe(shm_df)
|
||||
|
||||
|
||||
if diff:
|
||||
await client.write_ohlcv(
|
||||
fqme,
|
||||
|
@ -222,6 +219,69 @@ def anal(
|
|||
trio.run(main)
|
||||
|
||||
|
||||
def iter_dfs_from_shms(fqme: str) -> Generator[
|
||||
tuple[Path, ShmArray, pl.DataFrame],
|
||||
None,
|
||||
None,
|
||||
]:
|
||||
# shm buffer size table based on known sample rates
|
||||
sizes: dict[str, int] = {
|
||||
'hist': _default_hist_size,
|
||||
'rt': _default_rt_size,
|
||||
}
|
||||
|
||||
# load all detected shm buffer files which have the
|
||||
# passed FQME pattern in the file name.
|
||||
shmfiles: list[Path] = []
|
||||
shmdir = Path('/dev/shm/')
|
||||
|
||||
for shmfile in shmdir.glob(f'*{fqme}*'):
|
||||
filename: str = shmfile.name
|
||||
|
||||
# skip index files
|
||||
if (
|
||||
'_first' in filename
|
||||
or '_last' in filename
|
||||
):
|
||||
continue
|
||||
|
||||
assert shmfile.is_file()
|
||||
log.debug(f'Found matching shm buffer file: {filename}')
|
||||
shmfiles.append(shmfile)
|
||||
|
||||
for shmfile in shmfiles:
|
||||
|
||||
# lookup array buffer size based on file suffix
|
||||
# being either .rt or .hist
|
||||
key: str = shmfile.name.rsplit('.')[-1]
|
||||
|
||||
# skip FSP buffers for now..
|
||||
if key not in sizes:
|
||||
continue
|
||||
|
||||
size: int = sizes[key]
|
||||
|
||||
# attach to any shm buffer, load array into polars df,
|
||||
# write to local parquet file.
|
||||
shm, opened = maybe_open_shm_array(
|
||||
key=shmfile.name,
|
||||
size=size,
|
||||
dtype=def_iohlcv_fields,
|
||||
readonly=True,
|
||||
)
|
||||
assert not opened
|
||||
ohlcv = shm.array
|
||||
|
||||
from ..data import tsp
|
||||
df: pl.DataFrame = tsp.np2pl(ohlcv)
|
||||
|
||||
yield (
|
||||
shmfile,
|
||||
shm,
|
||||
df,
|
||||
)
|
||||
|
||||
|
||||
@store.command()
|
||||
def ldshm(
|
||||
fqme: str,
|
||||
|
@ -247,8 +307,8 @@ def ldshm(
|
|||
|
||||
# compute ohlc properties for naming
|
||||
times: np.ndarray = shm.array['time']
|
||||
period_s: float = times[-1] - times[-2]
|
||||
if period_s < 1.:
|
||||
secs: float = times[-1] - times[-2]
|
||||
if secs < 1.:
|
||||
raise ValueError(
|
||||
f'Something is wrong with time period for {shm}:\n{times}'
|
||||
)
|
||||
|
@ -263,22 +323,17 @@ def ldshm(
|
|||
diff,
|
||||
) = tsp.dedupe(shm_df)
|
||||
|
||||
null_segs: tuple = tsp.get_null_segs(
|
||||
frame=shm.array,
|
||||
period=period_s,
|
||||
)
|
||||
|
||||
# TODO: maybe only optionally enter this depending
|
||||
# on some CLI flags and/or gap detection?
|
||||
if not gaps.is_empty():
|
||||
await tractor.pause()
|
||||
|
||||
if null_segs:
|
||||
if (
|
||||
not gaps.is_empty()
|
||||
or secs > 2
|
||||
):
|
||||
await tractor.pause()
|
||||
|
||||
# write to parquet file?
|
||||
if write_parquet:
|
||||
timeframe: str = f'{period_s}s'
|
||||
timeframe: str = f'{secs}s'
|
||||
|
||||
datadir: Path = get_conf_dir() / 'nativedb'
|
||||
if not datadir.is_dir():
|
||||
|
|
|
@ -342,6 +342,7 @@ class NativeStorageClient:
|
|||
)
|
||||
return path
|
||||
|
||||
|
||||
async def write_ohlcv(
|
||||
self,
|
||||
fqme: str,
|
||||
|
|
|
@ -207,10 +207,9 @@ class ContentsLabel(pg.LabelItem):
|
|||
# this being "html" is the dumbest shit :eyeroll:
|
||||
|
||||
self.setText(
|
||||
"<b>i_arr</b>:{index}<br/>"
|
||||
"<b>i</b>:{index}<br/>"
|
||||
# NB: these fields must be indexed in the correct order via
|
||||
# the slice syntax below.
|
||||
"<b>i_shm</b>:{}<br/>"
|
||||
"<b>epoch</b>:{}<br/>"
|
||||
"<b>O</b>:{}<br/>"
|
||||
"<b>H</b>:{}<br/>"
|
||||
|
@ -220,7 +219,6 @@ class ContentsLabel(pg.LabelItem):
|
|||
# "<b>wap</b>:{}".format(
|
||||
*array[ix][
|
||||
[
|
||||
'index',
|
||||
'time',
|
||||
'open',
|
||||
'high',
|
||||
|
|
|
@ -248,27 +248,25 @@ async def increment_history_view(
|
|||
# - samplerd could emit the actual update range via
|
||||
# tuple and then we only enter the below block if that
|
||||
# range is detected as in-view?
|
||||
match msg:
|
||||
case {
|
||||
'backfilling': (viz_name, timeframe),
|
||||
} if (
|
||||
viz_name == name
|
||||
if (
|
||||
(bf_wut := msg.get('backfilling', False))
|
||||
):
|
||||
log.warning(
|
||||
f'Forcing HARD REDRAW:\n'
|
||||
f'name: {name}\n'
|
||||
f'timeframe: {timeframe}\n'
|
||||
)
|
||||
viz_name, timeframe = bf_wut
|
||||
if (
|
||||
viz_name == name
|
||||
|
||||
# TODO: only allow this when the data is IN VIEW!
|
||||
# also, we probably can do this more efficiently
|
||||
# / smarter by only redrawing the portion of the
|
||||
# path necessary?
|
||||
{
|
||||
60: hist_viz,
|
||||
1: viz,
|
||||
}[timeframe].update_graphics(
|
||||
force_redraw=True
|
||||
)
|
||||
and False
|
||||
):
|
||||
log.info(f'Forcing hard redraw -> {name}@{timeframe}')
|
||||
match timeframe:
|
||||
case 60:
|
||||
hist_viz.update_graphics(force_redraw=True)
|
||||
case 1:
|
||||
viz.update_graphics(force_redraw=True)
|
||||
|
||||
# check if slow chart needs an x-domain shift and/or
|
||||
# y-range resize.
|
||||
|
|
Loading…
Reference in New Issue