Compare commits


No commits in common. "1f9a4976378f863193572a0b64a5dba5e4aaf672" and "f274c3db3b5cc00abee8cec84ef98bd564e437d2" have entirely different histories.

10 changed files with 472 additions and 929 deletions

View File

@ -401,15 +401,7 @@ class Client:
# => we recursively call this method until we get at least
# as many bars such that they sum in aggregate to at most
# the desired total time (duration).
# XXX XXX XXX
# WHY DID WE EVEN NEED THIS ORIGINALLY!?
# XXX XXX XXX
# - if you query over a gap and get no data
# that may short circuit the history
if (
end_dt
and False
):
if end_dt:
nparr: np.ndarray = bars_to_np(bars)
times: np.ndarray = nparr['time']
first: float = times[0]
@ -418,7 +410,6 @@ class Client:
if (
# len(bars) * sample_period_s) < dt_duration.in_seconds()
tdiff < dt_duration.in_seconds()
# and False
):
end_dt: DateTime = from_timestamp(first)
log.warning(
@ -868,9 +859,6 @@ class Client:
timeout=timeout,
)
except TimeoutError:
import pdbp
pdbp.set_trace()
if raise_on_timeout:
raise
return None
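For reference, a minimal standalone sketch of the duration check the first hunk above keeps (the helper name `covers_duration` is illustrative, not part of the client): compare the span between `end_dt` and the frame's earliest timestamp against the requested duration, and re-anchor the next query's `end_dt` at that earliest datum when the frame comes up short.

import numpy as np
from pendulum import DateTime, Duration, from_timestamp

def covers_duration(
    times: np.ndarray,        # epoch-stamped 'time' column of the frame
    end_dt: DateTime,
    dt_duration: Duration,
) -> tuple[bool, DateTime]:
    first: float = float(times[0])
    tdiff: float = end_dt.timestamp() - first
    if tdiff < dt_duration.in_seconds():
        # frame is shorter than requested: re-anchor the next query
        # at the earliest datum actually received.
        return False, from_timestamp(first)
    return True, end_dt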

View File

@ -174,15 +174,8 @@ async def open_history_client(
start_dt: datetime | None = None,
) -> tuple[np.ndarray, str]:
nonlocal max_timeout, mean, count
if (
start_dt
and start_dt.timestamp() == 0
):
await tractor.pause()
query_start = time.time()
out, timedout = await get_bars(
proxy,
@ -410,54 +403,34 @@ async def get_bars(
bars, bars_array, dt_duration = out
if bars_array is None:
raise SymbolNotFound(fqme)
# not enough bars signal, likely due to venue
# operational gaps.
# too_little: bool = False
if end_dt:
if not bars:
# no data returned?
log.warning(
'History frame is blank?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_duration}\n'
too_little: bool = False
if (
end_dt
and (
not bars
or (too_little :=
start_dt
and (len(bars) * timeframe)
< dt_duration.in_seconds()
)
raise NoData(f'{end_dt}')
)
):
if (
end_dt
or too_little
):
log.warning(
f'History is blank for {dt_duration} from {end_dt}'
)
end_dt -= dt_duration
continue
else:
dur_s: float = len(bars) * timeframe
bars_dur = pendulum.Duration(seconds=dur_s)
dt_dur_s: float = dt_duration.in_seconds()
if dur_s < dt_dur_s:
log.warning(
'History frame is shorter than expected?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_dur_s}\n'
f'frame duration seconds: {dur_s}\n'
f'dur diff: {dt_duration - bars_dur}\n'
)
# NOTE: we used to try to get a minimal
# set of bars by recursing but this ran
# into possible infinite query loops
# when logic in the `Client.bars()` dt
# diffing went bad. So instead for now
# we just return the
# shorter-than-expected history with
# a warning.
# TODO: in the future it prolly makes
# the most sense to do venue operating
# hours lookup and
# timestamp-in-operating-range set
# checking to know for sure if we can
# safely and quickly ignore non-uniform history
# frame timestamp gaps..
# end_dt -= dt_duration
# continue
# await tractor.pause()
raise NoData(f'{end_dt}')
if bars_array is None:
raise SymbolNotFound(fqme)
first_dt = pendulum.from_timestamp(
bars[0].date.timestamp())
@ -881,13 +854,7 @@ async def stream_quotes(
init_msgs.append(init_msg)
con: Contract = details.contract
first_ticker: Ticker | None = None
with trio.move_on_after(1):
first_ticker: Ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=False,
)
first_ticker: Ticker = await proxy.get_quote(contract=con)
if first_ticker:
first_quote: dict = normalize(first_ticker)
log.info(
@ -895,6 +862,18 @@ async def stream_quotes(
f'{pformat(first_quote)}'
)
# TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
# with trio.CancelScope() as cs:
with trio.move_on_after(1):
first_ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=True,
)
# NOTE: it might be outside regular trading hours for
# assets with "standard venue operating hours" so we
# only "pretend the feed is live" when the dst asset
@ -905,8 +884,6 @@ async def stream_quotes(
# (equities, futes, bonds etc.) we at least try to
# grab the OHLC history.
if (
first_ticker
and
isnan(first_ticker.last)
# SO, if the last quote price value is NaN we ONLY
# "pretend to do" `feed_is_live.set()` if it's a known
@ -930,19 +907,6 @@ async def stream_quotes(
await trio.sleep_forever()
return # we never expect feed to come up?
# TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
# with trio.CancelScope() as cs:
# XXX: MUST acquire a ticker + first quote before starting
# the live quotes loop!
# with trio.move_on_after(1):
first_ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=True,
)
cs: trio.CancelScope | None = None
startup: bool = True
while (
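The `trio.move_on_after(1)` block above bounds the first-quote wait; a minimal standalone sketch of that pattern follows (the `getter` callable and `get_first_quote` name are stand-ins for `proxy.get_quote()`, not the backend's actual API):

import trio

async def get_first_quote(getter, contract, timeout: float = 1.0):
    ticker = None
    with trio.move_on_after(timeout) as cs:
        ticker = await getter(contract=contract)
    if cs.cancelled_caught:
        # no quote within `timeout`: likely outside venue hours, so
        # let the caller decide whether to "pretend the feed is live".
        return None
    return ticker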

View File

@ -41,6 +41,7 @@ from typing import (
import wsproto
from uuid import uuid4
from rapidfuzz import process as fuzzy
from trio_typing import TaskStatus
import asks
from bidict import bidict
@ -415,7 +416,8 @@ class Client:
await self.get_mkt_pairs()
assert self._pairs, '`Client.get_mkt_pairs()` was never called!?'
matches: dict[str, KucoinMktPair] = match_from_pairs(
matches: dict[str, Pair] = match_from_pairs(
pairs=self._pairs,
# query=pattern.upper(),
query=pattern.upper(),

View File

@ -31,8 +31,6 @@ from pathlib import Path
from pprint import pformat
from typing import (
Any,
Sequence,
Hashable,
TYPE_CHECKING,
)
from types import ModuleType
@ -130,8 +128,8 @@ class SymbologyCache(Struct):
- `.get_mkt_pairs()`: returning a table of pair-`Struct`
types, custom defined by the particular backend.
AND, the required `.get_mkt_info()` module-level endpoint
which maps `fqme: str` -> `MktPair`s.
AND, the required `.get_mkt_info()` module-level endpoint which
maps `fqme: str` -> `MktPair`s.
These tables are then used to fill out the `.assets`, `.pairs` and
`.mktmaps` tables on this cache instance, respectively.
@ -502,7 +500,7 @@ def match_from_pairs(
)
# pop and repack pairs in output dict
matched_pairs: dict[str, Struct] = {}
matched_pairs: dict[str, Pair] = {}
for item in matches:
pair_key: str = item[0]
matched_pairs[pair_key] = pairs[pair_key]
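For reference, a hedged sketch of the fuzzy-matching shape `match_from_pairs()` appears to wrap: `rapidfuzz.process.extract()` over the pair keys returns `(choice, score, index)` tuples whose first element is the matched key used to repack the table. The `fuzzy_match_pairs` name and parameters here are illustrative only.

from rapidfuzz import process

def fuzzy_match_pairs(
    pairs: dict[str, object],   # symbol key -> pair struct/record
    query: str,
    limit: int = 16,
) -> dict[str, object]:
    matches = process.extract(
        query.upper(),
        list(pairs),            # score against the symbol keys only
        limit=limit,
    )
    # each match is a (choice, score, index) tuple; `choice` is the key
    return {key: pairs[key] for key, _score, _idx in matches}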

View File

@ -16,26 +16,19 @@
# <https://www.gnu.org/licenses/>.
'''
Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic for,
- hi-level biz logics using the `.storage` subpkg APIs for (I/O)
orchestration and mgmt of tsdb data sets.
- core data-provider history backfilling middleware (as task-funcs) via
(what will eventually be `datad`, but for now are the) `.brokers` backend
APIs.
- various data set cleaning, repairing and issue-detection/analysis
routines to ensure consistent series whether in shm or when
stored offline (in a tsdb).
Historical data business logic for load, backfill and tsdb storage.
'''
from __future__ import annotations
# from collections import (
# Counter,
# )
from datetime import datetime
from functools import partial
from pathlib import Path
# import time
from types import ModuleType
from typing import (
Callable,
Generator,
TYPE_CHECKING,
)
@ -43,7 +36,6 @@ import trio
from trio_typing import TaskStatus
import tractor
from pendulum import (
DateTime,
Duration,
from_timestamp,
)
@ -64,14 +56,7 @@ from ._source import def_iohlcv_fields
from ._sampling import (
open_sample_stream,
)
from .tsp import (
dedupe,
get_null_segs,
iter_null_segs,
sort_diff,
Frame,
# Seq,
)
from . import tsp
from ..brokers._util import (
DataUnavailable,
)
@ -125,7 +110,6 @@ def diff_history(
return array[times >= prepend_until_dt.timestamp()]
# TODO: can't we just make this a sync func now?
async def shm_push_in_between(
shm: ShmArray,
to_push: np.ndarray,
@ -134,10 +118,6 @@ async def shm_push_in_between(
update_start_on_prepend: bool = False,
) -> int:
# XXX: extremely important, there can be no checkpoints
# in the body of this func to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
shm.push(
to_push,
prepend=True,
@ -158,102 +138,10 @@ async def shm_push_in_between(
else None
),
)
async def maybe_fill_null_segments(
shm: ShmArray,
timeframe: float,
get_hist: Callable,
sampler_stream: tractor.MsgStream,
mkt: MktPair,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
) -> list[Frame]:
null_segs_detected = trio.Event()
task_status.started(null_segs_detected)
frame: Frame = shm.array
null_segs: tuple | None = get_null_segs(
frame,
period=timeframe,
)
for (
absi_start, absi_end,
fi_start, fi_end,
start_t, end_t,
start_dt, end_dt,
) in iter_null_segs(
null_segs=null_segs,
frame=frame,
timeframe=timeframe,
):
# XXX NOTE: ?if we get a badly ordered timestamp
# pair, immediately stop backfilling?
if (
start_dt
and end_dt < start_dt
):
await tractor.pause()
break
(
array,
next_start_dt,
next_end_dt,
) = await get_hist(
timeframe,
start_dt=start_dt,
end_dt=end_dt,
)
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
# and mnq.cme.ib this causes a Qt crash XXDDD
# make sure we don't overrun the buffer start
len_to_push: int = min(absi_end, array.size)
to_push: np.ndarray = array[-len_to_push:]
await shm_push_in_between(
shm,
to_push,
prepend_index=absi_end,
update_start_on_prepend=False,
)
# TODO: UI side needs IPC event to update..
# - make sure the UI actually always handles
# this update!
# - remember that on the display side, only refresh this
# if the respective history is actually "in view".
# loop
await sampler_stream.send({
'broadcast_all': {
# XXX NOTE XXX: see the
# `.ui._display.increment_history_view()` if block
# that looks for this info to FORCE a hard viz
# redraw!
'backfilling': (mkt.fqme, timeframe),
},
})
null_segs_detected.set()
# RECHECK for more null-gaps
frame: Frame = shm.array
null_segs: tuple | None = get_null_segs(
frame,
period=timeframe,
)
if (
null_segs
and
len(null_segs[-1])
):
await tractor.pause()
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
array = shm.array
zeros = array[array['low'] == 0]
@ -267,24 +155,10 @@ async def maybe_fill_null_segments(
'low',
'close',
]] = shm._array[zeros['index'][0] - 1]['close']
# TODO: iteratively step through any remaining
# time-gaps/null-segments and spawn piecewise backfiller
# tasks in a nursery?
# -[ ] not sure that's going to work so well on the ib
# backend but worth a shot?
# -[ ] mk new history connections to make it properly
# parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?"
# if (
# next_end_dt not in frame[
# ):
# pass
# await tractor.pause()
async def start_backfill(
tn: trio.Nursery,
get_hist,
mod: ModuleType,
mkt: MktPair,
@ -339,20 +213,27 @@ async def start_backfill(
backfill_until_dt = backfill_from_dt.subtract(**period_duration)
# STAGE NOTE: "backward history gap filling":
# - we push to the shm buffer until we have history back
# until the latest entry loaded from the tsdb's table B)
# - after this loop continue to check for other gaps in the
# (tsdb) history and (at least report) maybe fill them
# from new frame queries to the backend?
# TODO: can we drop this? without conc i don't think this
# is necessary any more?
# configure async query throttling
# rate = config.get('rate', 1)
# XXX: legacy from ``trimeter`` code but unsupported now.
# erlangs = config.get('erlangs', 1)
# avoid duplicate history frames with a set of datetime frame
# starts and associated counts of how many duplicates we see
# per time stamp.
# starts: Counter[datetime] = Counter()
# conduct "backward history gap filling" where we push to
# the shm buffer until we have history back until the
# latest entry loaded from the tsdb's table B)
last_start_dt: datetime = backfill_from_dt
next_prepend_index: int = backfill_from_shm_index
while last_start_dt > backfill_until_dt:
log.info(
f'Requesting {timeframe}s frame:\n'
f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n'
log.debug(
f'Requesting {timeframe}s frame ending in {last_start_dt}'
)
try:
@ -380,18 +261,36 @@ async def start_backfill(
await tractor.pause()
return
assert (
array['time'][0]
==
next_start_dt.timestamp()
)
# TODO: drop this? see todo above..
# if (
# next_start_dt in starts
# and starts[next_start_dt] <= 6
# ):
# start_dt = min(starts)
# log.warning(
# f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
# )
# starts[start_dt] += 1
# await tractor.pause()
# continue
# elif starts[next_start_dt] > 6:
# log.warning(
# f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
# )
# return
# # only update new start point if not-yet-seen
# starts[next_start_dt] += 1
assert array['time'][0] == next_start_dt.timestamp()
diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds
# frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe
expected_frame_size_s: float = frame_size_s + timeframe
frame_size_s = len(array) * timeframe
expected_frame_size_s = frame_size_s + timeframe
if frame_time_diff_s > expected_frame_size_s:
# XXX: query result includes a start point prior to our
@ -399,10 +298,8 @@ async def start_backfill(
# history gap (eg. market closed period, outage, etc.)
# so just report it to console for now.
log.warning(
'GAP DETECTED:\n'
f'last_start_dt: {last_start_dt}\n'
f'diff: {diff}\n'
f'frame_time_diff_s: {frame_time_diff_s}\n'
f'History frame ending @ {last_start_dt} appears to have a gap:\n'
f'{diff} ~= {frame_time_diff_s} seconds'
)
to_push = diff_history(
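A tiny sketch of the gap heuristic logged above, using the same definitions (`frame_time_diff_s` is the span between consecutive frame start times, and the expected size is one sample period more than the frame's own span); the `frame_has_gap` name is illustrative:

def frame_has_gap(
    frame_time_diff_s: float,   # seconds between consecutive frame starts
    n_samples: int,             # len(array) returned for this frame
    timeframe: float,           # sample period in seconds
) -> bool:
    frame_size_s: float = n_samples * timeframe
    expected_frame_size_s: float = frame_size_s + timeframe
    return frame_time_diff_s > expected_frame_size_s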
@ -518,15 +415,74 @@ async def start_backfill(
gaps,
deduped,
diff,
) = dedupe(df)
) = tsp.dedupe(df)
if diff:
sort_diff(df)
tsp.sort_diff(df)
else:
# finally filled gap
log.info(
f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
)
# conduct tsdb timestamp gap detection and backfill any
# seemingly missing sequence segments..
# TODO: ideally these never exist but somehow it seems
# sometimes we're writing zero-ed segments in certain
# (teardown) cases?
from .tsp import detect_null_time_gap
gap_indices: tuple | None = detect_null_time_gap(shm)
while gap_indices:
(
istart,
start,
end,
iend,
) = gap_indices
start_dt = from_timestamp(start)
end_dt = from_timestamp(end)
# if we get a badly ordered timestamp
# pair, immediately stop backfilling.
if end_dt < start_dt:
break
(
array,
next_start_dt,
next_end_dt,
) = await get_hist(
timeframe,
start_dt=start_dt,
end_dt=end_dt,
)
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
# and mnq.cme.ib this causes a Qt crash XXDDD
# make sure we don't overrun the buffer start
len_to_push: int = min(iend, array.size)
to_push: np.ndarray = array[-len_to_push:]
await shm_push_in_between(
shm,
to_push,
prepend_index=iend,
update_start_on_prepend=False,
)
# TODO: UI side needs IPC event to update..
# - make sure the UI actually always handles
# this update!
# - remember that on the display side, only refresh this
# if the respective history is actually "in view".
# loop
await sampler_stream.send({
'broadcast_all': {
'backfilling': (mkt.fqme, timeframe),
},
})
gap_indices: tuple | None = detect_null_time_gap(shm)
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
@ -538,12 +494,6 @@ async def start_backfill(
bf_done.set()
# NOTE: originally this was used to cope with a tsdb (marketstore)
# which could not deliver very large frames of history over gRPC
# (thanks goolag) due to corruption issues. NOW, using apache
# parquet (by default in the local filesys) we don't have this
# requirement since the files can be loaded very quickly in
# entirety to memory via
async def back_load_from_tsdb(
storemod: ModuleType,
storage: StorageClient,
@ -681,94 +631,10 @@ async def back_load_from_tsdb(
# await sampler_stream.send('broadcast_all')
async def push_latest_frame(
dt_eps: list[DateTime, DateTime],
shm: ShmArray,
get_hist: Callable[
[int, datetime, datetime],
tuple[np.ndarray, str]
],
timeframe: float,
config: dict,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
):
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
(
array,
mr_start_dt,
mr_end_dt,
) = await get_hist(
timeframe,
end_dt=None,
)
# so caller can access these ep values
dt_eps.extend([
mr_start_dt,
mr_end_dt,
])
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
except DataUnavailable:
task_status.started()
if timeframe > 1:
await tractor.pause()
return
# NOTE: for the first (i.e. most recent) history frame
# we PREPEND from the current shm ._last index and thus
# a gap between the earliest datum loaded here and the
# latest loaded from the tsdb may exist!
log.info(f'Pushing {array.size} to shm!')
shm.push(
array,
prepend=True, # append on first frame
)
async def load_tsdb_hist(
storage: StorageClient,
mkt: MktPair,
timeframe: float,
) -> tuple[
np.ndarray,
DateTime,
DateTime,
] | None:
# loads a (large) frame of data from the tsdb depending
# on the db's query size limit; our "nativedb" (using
# parquet) generally can load the entire history into mem
# but if not then below the remaining history can be lazy
# loaded?
fqme: str = mkt.fqme
tsdb_entry: tuple[
np.ndarray,
DateTime,
DateTime,
]
try:
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
)
return tsdb_entry
except TimeseriesNotFound:
log.warning(
f'No timeseries yet for {timeframe}@{fqme}'
)
return None
async def tsdb_backfill(
mod: ModuleType,
storemod: ModuleType,
tn: trio.Nursery,
storage: StorageClient,
mkt: MktPair,
@ -783,193 +649,192 @@ async def tsdb_backfill(
) -> None:
if timeframe not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
get_hist: Callable[
[int, datetime, datetime],
tuple[np.ndarray, str]
]
config: dict[str, int]
async with (
mod.open_history_client(
mkt,
) as (get_hist, config),
# NOTE: this sub-nursery splits to tasks for the given
# sampling rate to concurrently load offline tsdb
# timeseries as well as new data from the venue backend!
):
async with mod.open_history_client(
mkt,
) as (get_hist, config):
log.info(
f'`{mod}` history client returned backfill config:\n'
f'{config}\n'
)
dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn:
tn.start_soon(
push_latest_frame,
dt_eps,
shm,
get_hist,
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
(
array,
mr_start_dt,
mr_end_dt,
) = await get_hist(
timeframe,
config,
end_dt=None,
)
# tell parent task to continue
# TODO: really we'd want this the other way with the
# tsdb load happening asap, since the latest
# frame query will normally be the main source of
# latency?
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
except DataUnavailable:
task_status.started()
await tractor.pause()
return
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
timeframe,
)
# TODO: fill in non-zero epoch time values ALWAYS!
# hist_shm._array['time'] = np.arange(
# start=
# NOTE: iabs to start backfilling from, reverse chronological,
# ONLY AFTER the first history frame has been pushed to
# mem!
# NOTE: removed for now since it'll always break
# on the first 60s of the venue open..
# times: np.ndarray = array['time']
# # sample period step size in seconds
# step_size_s = (
# from_timestamp(times[-1])
# - from_timestamp(times[-2])
# ).seconds
# if step_size_s not in (1, 60):
# log.error(f'Last 2 sample period is off!? -> {step_size_s}')
# step_size_s = (
# from_timestamp(times[-2])
# - from_timestamp(times[-3])
# ).seconds
# NOTE: for the first (i.e. most recent) history frame
# we PREPEND from the current shm ._last index and thus
# a gap between the earliest datum loaded here and the
# latest loaded from the tsdb may exist!
log.info(f'Pushing {array.size} to shm!')
shm.push(
array,
prepend=True, # append on first frame
)
backfill_gap_from_shm_index: int = shm._first.value + 1
(
mr_start_dt,
mr_end_dt,
) = dt_eps
# tell parent task to continue
task_status.started()
async with trio.open_nursery() as tn:
# loads a (large) frame of data from the tsdb depending
# on the db's query size limit; our "nativedb" (using
# parquet) generally can load the entire history into mem
# but if not then below the remaining history can be lazy
# loaded?
fqme: str = mkt.fqme
last_tsdb_dt: datetime | None = None
try:
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
)
except TimeseriesNotFound:
log.warning(
f'No timeseries yet for {timeframe}@{fqme}'
)
else:
(
tsdb_history,
first_tsdb_dt,
last_tsdb_dt,
) = tsdb_entry
# Prepend any tsdb history to the shm buffer which should
# now be full of the most recent history pulled from the
# backend's last frame.
if tsdb_entry:
(
tsdb_history,
first_tsdb_dt,
last_tsdb_dt,
) = tsdb_entry
# calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest
# sample loaded from the tsdb.
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
offset_s: float = backfill_diff.in_seconds()
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
tn=tn,
get_hist=get_hist,
mod=mod,
mkt=mkt,
shm=shm,
timeframe=timeframe,
# XXX EDGE CASEs: the most recent frame overlaps with
# prior tsdb history!!
# - so the latest frame's start time is earlier than
# the tsdb's latest sample.
# - alternatively this may also more generally occur
# when the venue was closed (say over the weekend)
# causing a timeseries gap, AND the query frames size
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
# GREATER THAN the current venue-market's operating
# session (time) we will receive datums from BEFORE THE
# CLOSURE GAP and thus the `offset_s` value will be
# NEGATIVE! In this case we need to ensure we don't try
# to push datums that have already been recorded in the
# tsdb. In this case we instead only retrieve and push
# the series portion missing from the db's data set.
# if offset_s < 0:
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
# non_overlap_offset_s: float = backfill_diff.in_seconds()
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt,
storage=storage,
write_tsdb=True,
)
offset_samples: int = round(offset_s / timeframe)
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
if offset_s > 0:
# NOTE XXX: ONLY when there is an actual gap
# between the earliest sample in the latest history
# frame do we want to NOT stick the latest tsdb
# history adjacent to that latest frame!
prepend_start = shm._first.value - offset_samples + 1
to_push = tsdb_history[-prepend_start:]
else:
# when there is overlap we want to remove the
# overlapping samples from the tsdb portion (taking
# instead the latest frame's values since THEY
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
# to the latest frame!
# TODO: assert the overlap segment array contains
# the same values!?!
prepend_start = shm._first.value
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
# tsdb history is so far in the past we can't fit it in
# shm buffer space so simply don't load it!
if prepend_start > 0:
shm.push(
to_push,
# insert the history prior to a "day's worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
start=prepend_start,
field_map=storemod.ohlc_key_map,
)
# calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest
# sample loaded from the tsdb.
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
offset_s: float = backfill_diff.in_seconds()
log.info(f'Loaded {to_push.shape} datums from storage')
# XXX EDGE CASEs: the most recent frame overlaps with
# prior tsdb history!!
# - so the latest frame's start time is earlier than
# the tsdb's latest sample.
# - alternatively this may also more generally occur
# when the venue was closed (say over the weekend)
# causing a timeseries gap, AND the query frames size
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
# GREATER THAN the current venue-market's operating
# session (time) we will receive datums from BEFORE THE
# CLOSURE GAP and thus the `offset_s` value will be
# NEGATIVE! In this case we need to ensure we don't try
# to push datums that have already been recorded in the
# tsdb. In this case we instead only retrieve and push
# the series portion missing from the db's data set.
# if offset_s < 0:
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
# non_overlap_offset_s: float = backfill_diff.in_seconds()
offset_samples: int = round(offset_s / timeframe)
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
if offset_s > 0:
# NOTE XXX: ONLY when there is an actual gap
# between the earliest sample in the latest history
# frame do we want to NOT stick the latest tsdb
# history adjacent to that latest frame!
prepend_start = shm._first.value - offset_samples + 1
to_push = tsdb_history[-prepend_start:]
else:
# when there is overlap we want to remove the
# overlapping samples from the tsdb portion (taking
# instead the latest frame's values since THEY
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
# to the latest frame!
# TODO: assert the overlap segment array contains
# the same values!?!
prepend_start = shm._first.value
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
# tsdb history is so far in the past we can't fit it in
# shm buffer space so simply don't load it!
if prepend_start > 0:
shm.push(
to_push,
# insert the history prior to a "day's worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
start=prepend_start,
field_map=storemod.ohlc_key_map,
)
log.info(f'Loaded {to_push.shape} datums from storage')
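The splice math above can be summarized in a standalone sketch (assuming `first_index` plays the role of `shm._first.value`; the `tsdb_splice_indices` name is illustrative and this only shows the index arithmetic, not the shm API itself):

def tsdb_splice_indices(
    offset_s: float,    # (mr_start_dt - last_tsdb_dt).in_seconds()
    timeframe: float,   # sample period in seconds
    first_index: int,   # stand-in for `shm._first.value`
) -> tuple[int, slice]:
    offset_samples: int = round(offset_s / timeframe)
    if offset_s > 0:
        # true gap: keep `offset_samples` slots between the tsdb
        # history and the earliest sample of the latest frame.
        prepend_start: int = first_index - offset_samples + 1
        return prepend_start, slice(-prepend_start, None)
    # overlap: drop the tsdb tail that duplicates the latest frame and
    # butt the remainder directly up against it.
    return first_index, slice(-first_index, offset_samples - 1)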
# NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
# seemingly missing (null-time) segments..
# TODO: ideally these can never exist!
# -[ ] somehow it seems sometimes we're writing zero-ed
# segments to tsdbs during teardown?
# -[ ] can we ensure that the backfiller tasks do this
# work PREVENTATIVELY instead?
# -[ ] fill in non-zero epoch time values ALWAYS!
# await maybe_fill_null_segments(
nulls_detected: trio.Event = await tn.start(partial(
maybe_fill_null_segments,
shm=shm,
timeframe=timeframe,
get_hist=get_hist,
sampler_stream=sampler_stream,
mkt=mkt,
))
# TODO: who would want to?
await nulls_detected.wait()
await bf_done.wait()
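Both waits above rely on trio's task-handoff idiom: the child creates an `Event`, passes it back through `task_status.started()`, and the parent receives it as the return value of `nursery.start()`. A minimal standalone example (the `child`/`main` names are illustrative):

import trio
from trio_typing import TaskStatus

async def child(
    task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
) -> None:
    done = trio.Event()
    # hand the event back; the parent's `await nursery.start()` returns it
    task_status.started(done)
    await trio.sleep(0.1)  # .. the actual (backfill) work ..
    done.set()

async def main() -> None:
    async with trio.open_nursery() as tn:
        done: trio.Event = await tn.start(child)
        await done.wait()  # blocks until the child signals completion

trio.run(main)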
# TODO: maybe start history anal and load missing "history
# gaps" via backend..
if timeframe not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
get_hist=get_hist,
mod=mod,
mkt=mkt,
shm=shm,
timeframe=timeframe,
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt,
storage=storage,
write_tsdb=True,
)
)
# if len(hist_shm.array) < 2:
# TODO: there's an edge case here to solve where if the last
# frame before market close (at least on ib) was pushed and
@ -983,29 +848,32 @@ async def tsdb_backfill(
# backload any further data from tsdb (concurrently per
# timeframe) if not all data was able to be loaded (in memory)
# from the ``StorageClient.load()`` call above.
await trio.sleep_forever()
try:
await trio.sleep_forever()
finally:
return
# XXX NOTE: this is legacy from when we were using
# marketstore and we needed to continue backloading
# incrementally from the tsdb client.. (bc it couldn't
# handle a single large query with gRPC for some
# reason.. classic goolag pos)
# tn.start_soon(
# back_load_from_tsdb,
tn.start_soon(
back_load_from_tsdb,
# storemod,
# storage,
# fqme,
storemod,
storage,
fqme,
# tsdb_history,
# last_tsdb_dt,
# mr_start_dt,
# mr_end_dt,
# bf_done,
tsdb_history,
last_tsdb_dt,
mr_start_dt,
mr_end_dt,
bf_done,
# timeframe,
# shm,
# )
timeframe,
shm,
)
async def manage_history(
@ -1113,11 +981,6 @@ async def manage_history(
async with (
storage.open_storage_client() as (storemod, client),
# NOTE: this nursery spawns a task per "timeframe" (aka
# sampling period) data set since normally differently
# sampled timeseries can be loaded / process independently
# ;)
trio.open_nursery() as tn,
):
log.info(
@ -1167,6 +1030,7 @@ async def manage_history(
tsdb_backfill,
mod=mod,
storemod=storemod,
tn=tn,
# bus,
storage=client,
mkt=mkt,
@ -1195,70 +1059,3 @@ async def manage_history(
# and thus a small RPC-prot for remotely controlling
# what data is loaded for viewing.
await trio.sleep_forever()
def iter_dfs_from_shms(
fqme: str
) -> Generator[
tuple[Path, ShmArray, pl.DataFrame],
None,
None,
]:
# shm buffer size table based on known sample rates
sizes: dict[str, int] = {
'hist': _default_hist_size,
'rt': _default_rt_size,
}
# load all detected shm buffer files which have the
# passed FQME pattern in the file name.
shmfiles: list[Path] = []
shmdir = Path('/dev/shm/')
for shmfile in shmdir.glob(f'*{fqme}*'):
filename: str = shmfile.name
# skip index files
if (
'_first' in filename
or '_last' in filename
):
continue
assert shmfile.is_file()
log.debug(f'Found matching shm buffer file: {filename}')
shmfiles.append(shmfile)
for shmfile in shmfiles:
# lookup array buffer size based on file suffix
# being either .rt or .hist
key: str = shmfile.name.rsplit('.')[-1]
# skip FSP buffers for now..
if key not in sizes:
continue
size: int = sizes[key]
# attach to any shm buffer, load array into polars df,
# write to local parquet file.
shm, opened = maybe_open_shm_array(
key=shmfile.name,
size=size,
dtype=def_iohlcv_fields,
readonly=True,
)
assert not opened
ohlcv = shm.array
from ..data import tsp
df: pl.DataFrame = tsp.np2pl(ohlcv)
yield (
shmfile,
shm,
df,
)

View File

@ -29,19 +29,12 @@ from math import (
floor,
)
import time
from typing import (
Literal,
# AsyncGenerator,
Generator,
)
from typing import Literal
import numpy as np
import polars as pl
from pendulum import (
DateTime,
from_timestamp,
)
from ._sharedmem import ShmArray
from ..toolz.profile import (
Profiler,
pg_profile_enabled,
@ -60,14 +53,6 @@ get_console_log = partial(
name=subsys,
)
# NOTE: union type-defs to handle generic `numpy` and `polars` types
# side-by-side Bo
# |_ TODO: schema spec typing?
# -[ ] nptyping!
# -[ ] wtv we can with polars?
Frame = pl.DataFrame | np.ndarray
Seq = pl.Series | np.ndarray
def slice_from_time(
arr: np.ndarray,
@ -224,282 +209,51 @@ def slice_from_time(
return read_slc
def get_null_segs(
frame: Frame,
period: float, # sampling step in seconds
def detect_null_time_gap(
shm: ShmArray,
imargin: int = 1,
col: str = 'time',
) -> tuple[
# Seq, # TODO: can we make it an array-type instead?
list[
list[int, int],
],
Seq,
Frame
] | None:
) -> tuple[float, float] | None:
'''
Detect if there are any zero(-epoch stamped) valued
rows for the provided `col: str` column; by default
presume the 'time' field/column.
Detect if there are any zero-epoch stamped rows in
the presumed 'time' field-column.
Filter to all such zero (time) segments and return
the corresponding frame's zeroed segments:
Filter to the gap and return a surrounding index range.
- gap absolute (in buffer terms) indices-endpoints as
`absi_zsegs`
- abs indices of all rows with zeroed `col` values as `absi_zeros`
- the corresponding frame's row-entries (view) which are
zeroed for the `col` as `zero_t`
NOTE: for now presumes only ONE gap XD
'''
times: Seq = frame['time']
zero_pred: Seq = (times == 0)
# ensure we read buffer state only once so that ShmArray rt
# circular-buffer updates don't cause a indexing/size mismatch.
array: np.ndarray = shm.array
if isinstance(frame, np.ndarray):
tis_zeros: int = zero_pred.any()
else:
tis_zeros: int = zero_pred.any()
zero_pred: np.ndarray = array['time'] == 0
zero_t: np.ndarray = array[zero_pred]
if not tis_zeros:
return None
# TODO: use ndarray for this?!
absi_zsegs: list[list[int, int]] = []
if isinstance(frame, np.ndarray):
# view of ONLY the zero segments as one continuous chunk
zero_t: np.ndarray = frame[zero_pred]
# abs indices of said zeroed rows
absi_zeros = zero_t['index']
# diff of abs index steps between each zeroed row
absi_zdiff: np.ndarray = np.diff(absi_zeros)
# scan for all frame-indices where the
# zeroed-row-abs-index-step-diff is greater then the
# expected increment of 1.
# data 1st zero seg data zeros
# ---- ------------ ---- ----- ------ ----
# ||||..000000000000..||||..00000..||||||..0000
# ---- ------------ ---- ----- ------ ----
# ^zero_t[0] ^zero_t[-1]
# ^fi_zgaps[0] ^fi_zgaps[1]
# ^absi_zsegs[0][0] ^---^ => absi_zsegs[1]: tuple
# absi_zsegs[0][1]^
#
# NOTE: the first entry in `fi_zgaps` is where
# the first (absolute) index step diff is > 1.
# and it is a frame-relative index into `zero_t`.
fi_zgaps = np.argwhere(
absi_zdiff > 1
# NOTE: +1 here is to ensure we index to the "start" of each
# segment (if we didn't, the below loop would need to be
# re-written to expect `fi_end_rows`!)
) + 1
# the rows from the contiguous zeroed segments which have
# abs-index steps >1 compared to the previous zero row
# (indicating an end of zeroed segment).
fi_zseg_start_rows = zero_t[fi_zgaps]
# TODO: equiv for pl.DataFrame case!
else:
izeros: pl.Series = zero_pred.arg_true()
zero_t: pl.DataFrame = frame[izeros]
absi_zeros = zero_t['index']
absi_zdiff: pl.Series = absi_zeros.diff()
fi_zgaps = (absi_zdiff > 1).arg_true()
# XXX: our goal (in this func) is to select out slice index
# pairs (zseg0_start, zseg_end) in abs index units for each
# null-segment portion detected throughout entire input frame.
# only up to one null-segment in entire frame?
num_gaps: int = fi_zgaps.size + 1
if num_gaps < 1:
if absi_zeros.size > 1:
absi_zsegs = [[
# see `get_hist()` in backend, should ALWAYS be
# able to handle a `start_dt=None`!
# None,
max(
absi_zeros[0] - 1,
0,
),
# NOTE: need the + 1 to guarantee we index "up to"
# the next non-null row-datum.
min(
absi_zeros[-1] + 1,
frame['index'][-1],
),
]]
else:
# XXX EDGE CASE: only one null-datum found so
# mark the start abs index as None to trigger
# a full frame-len query to the respective backend?
absi_zsegs = [[
# see `get_hist()` in backend, should ALWAYS be
# able to handle a `start_dt=None`!
# None,
None,
absi_zeros[0] + 1,
]]
# XXX NOTE XXX: if >= 2 zeroed segments are found, there should
# ALWAYS be more than one zero-segment-abs-index-step-diff row
# in `absi_zdiff`, so loop through all such
# abs-index-step-diffs >1 (i.e. the entries of `absi_zdiff`)
# and add them as the "end index" entries for each segment.
# Then, if NOT iterating the first such segment end, look back
# for the prior segment's zero-segment start index by relative
# indexing the `zero_t` frame by -1 and grabbing the abs index
# of what should be the prior zero-segment abs start index.
else:
# NOTE: since `absi_zdiff` will never have a row
# corresponding to the first zero-segment's row, we add it
# manually here.
absi_zsegs.append([
absi_zeros[0] - 1,
None,
])
# TODO: can we do it with vec ops?
for i, (
fi, # frame index of zero-seg start
zseg_start_row, # full row for ^
) in enumerate(zip(
fi_zgaps,
fi_zseg_start_rows,
)):
assert (zseg_start_row == zero_t[fi]).all()
iabs: int = zseg_start_row['index'][0]
absi_zsegs.append([
iabs - 1,
None, # backfilled on next iter
])
# row = zero_t[fi]
# absi_pre_zseg = row['index'][0] - 1
# absi_pre_zseg = absi - 1
# final iter case, backfill FINAL end iabs!
if (i + 1) == fi_zgaps.size:
absi_zsegs[-1][1] = absi_zeros[-1] + 1
# NOTE: only after the first segment (due to `.diff()`
# usage above) can we do a lookback to the prior
# segment's end row and determine its abs index to
# retroactively insert to the prior
# `absi_zsegs[i-1][1]` entry Bo
last_end: int = absi_zsegs[i][1]
if last_end is None:
prev_zseg_row = zero_t[fi - 1]
absi_post_zseg = prev_zseg_row['index'][0] + 1
# XXX: MUST BACKFILL previous end iabs!
absi_zsegs[i][1] = absi_post_zseg
else:
if 0 < num_gaps < 2:
absi_zsegs[-1][1] = absi_zeros[-1] + 1
iabs_first: int = frame['index'][0]
for start, end in absi_zsegs:
ts_start: float = times[start - iabs_first]
ts_end: float = times[end - iabs_first]
if (
ts_start == 0
or
ts_end == 0
):
import pdbp
pdbp.set_trace()
assert end
assert start < end
log.warning(
f'Frame has {len(absi_zsegs)} NULL GAPS!?\n'
f'period: {period}\n'
f'total null samples: {len(zero_t)}\n'
)
return (
absi_zsegs, # [start, end] abs slice indices of seg
absi_zeros, # all abs indices within all null-segs
zero_t, # sliced-view of all null-segment rows-datums
)
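The segment detection above boils down to: take the absolute indices of all zero-time rows and split them wherever the index step jumps by more than one. A compact standalone numpy sketch of just that splitting step (the `zero_time_segments` name is illustrative and this omits the function's frame/abs-index bookkeeping):

import numpy as np

def zero_time_segments(times: np.ndarray) -> list[tuple[int, int]]:
    '''
    Return [start, end) index pairs for each contiguous run of
    zero-valued entries in `times`.
    '''
    izeros: np.ndarray = np.flatnonzero(times == 0)
    if izeros.size == 0:
        return []
    # positions (into `izeros`) where a new zeroed run begins
    breaks: np.ndarray = np.flatnonzero(np.diff(izeros) > 1) + 1
    starts = np.concatenate(([0], breaks))
    ends = np.concatenate((breaks, [izeros.size]))
    return [
        (int(izeros[s]), int(izeros[e - 1]) + 1)
        for s, e in zip(starts, ends)
    ]

# eg. zero_time_segments(np.array([1, 2, 0, 0, 5, 6, 0, 8]))
# -> [(2, 4), (6, 7)]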
def iter_null_segs(
timeframe: float,
frame: Frame | None = None,
null_segs: tuple | None = None,
) -> Generator[
tuple[
int, int,
int, int,
float, float,
float, float,
# Seq, # TODO: can we make it an array-type instead?
# list[
# list[int, int],
# ],
# Seq,
# Frame
],
None,
]:
if null_segs is None:
null_segs: tuple = get_null_segs(
frame,
period=timeframe,
if zero_t.size:
istart, iend = zero_t['index'][[0, -1]]
start, end = shm._array['time'][
[istart - imargin, iend + imargin]
]
return (
istart - imargin,
start,
end,
iend + imargin,
)
absi_pairs_zsegs: list[list[float, float]]
izeros: Seq
zero_t: Frame
(
absi_pairs_zsegs,
izeros,
zero_t,
) = null_segs
return None
absi_first: int = frame[0]['index']
for (
absi_start,
absi_end,
) in absi_pairs_zsegs:
fi_end: int = absi_end - absi_first
end_row: Seq = frame[fi_end]
end_t: float = end_row['time']
end_dt: DateTime = from_timestamp(end_t)
fi_start = None
start_row = None
start_t = None
start_dt = None
if (
absi_start is not None
and start_t != 0
):
fi_start: int = absi_start - absi_first
start_row: Seq = frame[fi_start]
start_t: float = start_row['time']
start_dt: DateTime = from_timestamp(start_t)
if absi_start < 0:
import pdbp
pdbp.set_trace()
yield (
absi_start, absi_end, # abs indices
fi_start, fi_end, # relative "frame" indices
start_t, end_t,
start_dt, end_dt,
)
t_unit: Literal = Literal[
'days',
'hours',
'minutes',
'seconds',
'miliseconds',
'microseconds',
'nanoseconds',
]
def with_dts(
@ -538,17 +292,6 @@ def dedup_dt(
)
t_unit: Literal = Literal[
'days',
'hours',
'minutes',
'seconds',
'miliseconds',
'microseconds',
'nanoseconds',
]
def detect_time_gaps(
df: pl.DataFrame,
@ -663,6 +406,10 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
f'Gaps found:\n{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}'
)
# TODO: rewrite this in polars and/or convert to
# ndarray to detect and remove?
# null_gaps = detect_null_time_gap()
return (
df,
gaps,
@ -681,19 +428,14 @@ def sort_diff(
list[int], # indices of segments that are out-of-order
]:
ser: pl.Series = src_df[col]
sortd: pl.DataFrame = ser.sort()
diff: pl.Series = ser.diff()
diff: pl.Series = ser.diff()
sortd: pl.DataFrame = ser.sort()
sortd_diff: pl.Series = sortd.diff()
i_step_diff = (diff != sortd_diff).arg_true()
frame_reorders: int = i_step_diff.len()
if frame_reorders:
log.warn(
f'Resorted frame on col: {col}\n'
f'{frame_reorders}'
)
# import pdbp; pdbp.set_trace()
if i_step_diff.len():
import pdbp
pdbp.set_trace()
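A small standalone polars sketch of the out-of-order check `sort_diff()` performs: if a column's step-diffs differ from the step-diffs of its sorted copy, some rows must be out of order (the `out_of_order_count` name is illustrative):

import polars as pl

def out_of_order_count(
    df: pl.DataFrame,
    col: str = 'time',
) -> int:
    ser: pl.Series = df[col]
    # step-diffs disagree with the sorted column's step-diffs wherever
    # rows are out of order.
    mismatches: pl.Series = (ser.diff() != ser.sort().diff()).arg_true()
    return mismatches.len()

# eg. out_of_order_count(pl.DataFrame({'time': [1, 2, 4, 3, 5]})) -> 3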
# NOTE: thanks to this SO answer for the below conversion routines
# to go from numpy struct-arrays to polars dataframes and back:

View File

@ -21,6 +21,8 @@ Storage middle-ware CLIs.
from __future__ import annotations
from pathlib import Path
import time
from typing import Generator
# from typing import TYPE_CHECKING
import polars as pl
import numpy as np
@ -35,11 +37,14 @@ from piker.service import open_piker_runtime
from piker.cli import cli
from piker.config import get_conf_dir
from piker.data import (
maybe_open_shm_array,
def_iohlcv_fields,
ShmArray,
tsp,
)
from piker.data.history import (
iter_dfs_from_shms,
_default_hist_size,
_default_rt_size,
)
from . import (
log,
@ -185,13 +190,6 @@ def anal(
)
assert first_dt < last_dt
null_segs: tuple = tsp.get_null_segs(
frame=history,
period=period,
)
if null_segs:
await tractor.pause()
shm_df: pl.DataFrame = await client.as_df(
fqme,
period,
@ -206,7 +204,6 @@ def anal(
diff,
) = tsp.dedupe(shm_df)
if diff:
await client.write_ohlcv(
fqme,
@ -222,6 +219,69 @@ def anal(
trio.run(main)
def iter_dfs_from_shms(fqme: str) -> Generator[
tuple[Path, ShmArray, pl.DataFrame],
None,
None,
]:
# shm buffer size table based on known sample rates
sizes: dict[str, int] = {
'hist': _default_hist_size,
'rt': _default_rt_size,
}
# load all detected shm buffer files which have the
# passed FQME pattern in the file name.
shmfiles: list[Path] = []
shmdir = Path('/dev/shm/')
for shmfile in shmdir.glob(f'*{fqme}*'):
filename: str = shmfile.name
# skip index files
if (
'_first' in filename
or '_last' in filename
):
continue
assert shmfile.is_file()
log.debug(f'Found matching shm buffer file: {filename}')
shmfiles.append(shmfile)
for shmfile in shmfiles:
# lookup array buffer size based on file suffix
# being either .rt or .hist
key: str = shmfile.name.rsplit('.')[-1]
# skip FSP buffers for now..
if key not in sizes:
continue
size: int = sizes[key]
# attach to any shm buffer, load array into polars df,
# write to local parquet file.
shm, opened = maybe_open_shm_array(
key=shmfile.name,
size=size,
dtype=def_iohlcv_fields,
readonly=True,
)
assert not opened
ohlcv = shm.array
from ..data import tsp
df: pl.DataFrame = tsp.np2pl(ohlcv)
yield (
shmfile,
shm,
df,
)
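The `tsp.np2pl()` helper used in the loop above isn't shown in this diff; a hedged sketch of an equivalent conversion for a structured OHLCV array (column-wise dict construction, helper name illustrative):

import numpy as np
import polars as pl

def struct_array_to_df(arr: np.ndarray) -> pl.DataFrame:
    # one column per struct-dtype field ('index', 'time', 'open', ...)
    return pl.DataFrame({
        name: arr[name]
        for name in arr.dtype.names
    })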
@store.command()
def ldshm(
fqme: str,
@ -247,8 +307,8 @@ def ldshm(
# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
period_s: float = times[-1] - times[-2]
if period_s < 1.:
secs: float = times[-1] - times[-2]
if secs < 1.:
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
@ -263,22 +323,17 @@ def ldshm(
diff,
) = tsp.dedupe(shm_df)
null_segs: tuple = tsp.get_null_segs(
frame=shm.array,
period=period_s,
)
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
if not gaps.is_empty():
await tractor.pause()
if null_segs:
if (
not gaps.is_empty()
or secs > 2
):
await tractor.pause()
# write to parquet file?
if write_parquet:
timeframe: str = f'{period_s}s'
timeframe: str = f'{secs}s'
datadir: Path = get_conf_dir() / 'nativedb'
if not datadir.is_dir():

View File

@ -342,6 +342,7 @@ class NativeStorageClient:
)
return path
async def write_ohlcv(
self,
fqme: str,

View File

@ -207,10 +207,9 @@ class ContentsLabel(pg.LabelItem):
# this being "html" is the dumbest shit :eyeroll:
self.setText(
"<b>i_arr</b>:{index}<br/>"
"<b>i</b>:{index}<br/>"
# NB: these fields must be indexed in the correct order via
# the slice syntax below.
"<b>i_shm</b>:{}<br/>"
"<b>epoch</b>:{}<br/>"
"<b>O</b>:{}<br/>"
"<b>H</b>:{}<br/>"
@ -220,7 +219,6 @@ class ContentsLabel(pg.LabelItem):
# "<b>wap</b>:{}".format(
*array[ix][
[
'index',
'time',
'open',
'high',

View File

@ -248,27 +248,25 @@ async def increment_history_view(
# - samplerd could emit the actual update range via
# tuple and then we only enter the below block if that
# range is detected as in-view?
match msg:
case {
'backfilling': (viz_name, timeframe),
} if (
if (
(bf_wut := msg.get('backfilling', False))
):
viz_name, timeframe = bf_wut
if (
viz_name == name
):
log.warning(
f'Forcing HARD REDRAW:\n'
f'name: {name}\n'
f'timeframe: {timeframe}\n'
)
# TODO: only allow this when the data is IN VIEW!
# also, we probably can do this more efficiently
# / smarter by only redrawing the portion of the
# path necessary?
{
60: hist_viz,
1: viz,
}[timeframe].update_graphics(
force_redraw=True
)
and False
):
log.info(f'Forcing hard redraw -> {name}@{timeframe}')
match timeframe:
case 60:
hist_viz.update_graphics(force_redraw=True)
case 1:
viz.update_graphics(force_redraw=True)
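The rewritten dispatch above uses structural pattern matching; a minimal standalone sketch of matching the `'backfilling': (fqme, timeframe)` broadcast shape (message shape assumed from the backfiller's `sampler_stream.send()` calls, `handle_broadcast` name illustrative):

def handle_broadcast(
    msg: dict,
    name: str,
) -> tuple[str, float] | None:
    match msg:
        case {'backfilling': (viz_name, timeframe)} if viz_name == name:
            # this viz's history is being backfilled -> force a redraw
            return viz_name, timeframe
        case _:
            return None

# eg. handle_broadcast({'backfilling': ('mnq.cme.ib', 60)}, 'mnq.cme.ib')
# -> ('mnq.cme.ib', 60)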
# check if slow chart needs an x-domain shift and/or
# y-range resize.