Compare commits
11 Commits
f274c3db3b
...
1f9a497637
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 1f9a497637 | |
Tyler Goodlet | 40c5d88a9b | |
Tyler Goodlet | 8989c73a93 | |
Tyler Goodlet | 3639f360c3 | |
Tyler Goodlet | afd0781b62 | |
Tyler Goodlet | ba154ef413 | |
Tyler Goodlet | 97e2403fb1 | |
Tyler Goodlet | a4084d6a0b | |
Tyler Goodlet | 83bdca46a2 | |
Tyler Goodlet | c129f5bb4a | |
Tyler Goodlet | c4853a3fee |
|
@ -401,7 +401,15 @@ class Client:
|
||||||
# => we recursively call this method until we get at least
|
# => we recursively call this method until we get at least
|
||||||
# as many bars such that they sum in aggregate to the
|
# as many bars such that they sum in aggregate to the
|
||||||
# desired total time (duration) at most.
|
# desired total time (duration) at most.
|
||||||
if end_dt:
|
# XXX XXX XXX
|
||||||
|
# WHY DID WE EVEN NEED THIS ORIGINALLY!?
|
||||||
|
# XXX XXX XXX
|
||||||
|
# - if you query over a gap and get no data
|
||||||
|
# that may short circuit the history
|
||||||
|
if (
|
||||||
|
end_dt
|
||||||
|
and False
|
||||||
|
):
|
||||||
nparr: np.ndarray = bars_to_np(bars)
|
nparr: np.ndarray = bars_to_np(bars)
|
||||||
times: np.ndarray = nparr['time']
|
times: np.ndarray = nparr['time']
|
||||||
first: float = times[0]
|
first: float = times[0]
|
||||||
|
@ -410,6 +418,7 @@ class Client:
|
||||||
if (
|
if (
|
||||||
# len(bars) * sample_period_s) < dt_duration.in_seconds()
|
# len(bars) * sample_period_s) < dt_duration.in_seconds()
|
||||||
tdiff < dt_duration.in_seconds()
|
tdiff < dt_duration.in_seconds()
|
||||||
|
# and False
|
||||||
):
|
):
|
||||||
end_dt: DateTime = from_timestamp(first)
|
end_dt: DateTime = from_timestamp(first)
|
||||||
log.warning(
|
log.warning(
|
||||||
|
@ -859,6 +868,9 @@ class Client:
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
)
|
)
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
|
import pdbp
|
||||||
|
pdbp.set_trace()
|
||||||
|
|
||||||
if raise_on_timeout:
|
if raise_on_timeout:
|
||||||
raise
|
raise
|
||||||
return None
|
return None
|
||||||
|
|
|
@ -174,8 +174,15 @@ async def open_history_client(
|
||||||
start_dt: datetime | None = None,
|
start_dt: datetime | None = None,
|
||||||
|
|
||||||
) -> tuple[np.ndarray, str]:
|
) -> tuple[np.ndarray, str]:
|
||||||
|
|
||||||
nonlocal max_timeout, mean, count
|
nonlocal max_timeout, mean, count
|
||||||
|
|
||||||
|
if (
|
||||||
|
start_dt
|
||||||
|
and start_dt.timestamp() == 0
|
||||||
|
):
|
||||||
|
await tractor.pause()
|
||||||
|
|
||||||
query_start = time.time()
|
query_start = time.time()
|
||||||
out, timedout = await get_bars(
|
out, timedout = await get_bars(
|
||||||
proxy,
|
proxy,
|
||||||
|
@ -403,35 +410,55 @@ async def get_bars(
|
||||||
|
|
||||||
bars, bars_array, dt_duration = out
|
bars, bars_array, dt_duration = out
|
||||||
|
|
||||||
# not enough bars signal, likely due to venue
|
|
||||||
# operational gaps.
|
|
||||||
too_little: bool = False
|
|
||||||
if (
|
|
||||||
end_dt
|
|
||||||
and (
|
|
||||||
not bars
|
|
||||||
or (too_little :=
|
|
||||||
start_dt
|
|
||||||
and (len(bars) * timeframe)
|
|
||||||
< dt_duration.in_seconds()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
):
|
|
||||||
if (
|
|
||||||
end_dt
|
|
||||||
or too_little
|
|
||||||
):
|
|
||||||
log.warning(
|
|
||||||
f'History is blank for {dt_duration} from {end_dt}'
|
|
||||||
)
|
|
||||||
end_dt -= dt_duration
|
|
||||||
continue
|
|
||||||
|
|
||||||
raise NoData(f'{end_dt}')
|
|
||||||
|
|
||||||
if bars_array is None:
|
if bars_array is None:
|
||||||
raise SymbolNotFound(fqme)
|
raise SymbolNotFound(fqme)
|
||||||
|
|
||||||
|
# not enough bars signal, likely due to venue
|
||||||
|
# operational gaps.
|
||||||
|
# too_little: bool = False
|
||||||
|
if end_dt:
|
||||||
|
if not bars:
|
||||||
|
# no data returned?
|
||||||
|
log.warning(
|
||||||
|
'History frame is blank?\n'
|
||||||
|
f'start_dt: {start_dt}\n'
|
||||||
|
f'end_dt: {end_dt}\n'
|
||||||
|
f'duration: {dt_duration}\n'
|
||||||
|
)
|
||||||
|
raise NoData(f'{end_dt}')
|
||||||
|
|
||||||
|
else:
|
||||||
|
dur_s: float = len(bars) * timeframe
|
||||||
|
bars_dur = pendulum.Duration(seconds=dur_s)
|
||||||
|
dt_dur_s: float = dt_duration.in_seconds()
|
||||||
|
if dur_s < dt_dur_s:
|
||||||
|
log.warning(
|
||||||
|
'History frame is shorter then expected?\n'
|
||||||
|
f'start_dt: {start_dt}\n'
|
||||||
|
f'end_dt: {end_dt}\n'
|
||||||
|
f'duration: {dt_dur_s}\n'
|
||||||
|
f'frame duration seconds: {dur_s}\n'
|
||||||
|
f'dur diff: {dt_duration - bars_dur}\n'
|
||||||
|
)
|
||||||
|
# NOTE: we used to try to get a minimal
|
||||||
|
# set of bars by recursing but this ran
|
||||||
|
# into possible infinite query loops
|
||||||
|
# when logic in the `Client.bars()` dt
|
||||||
|
# diffing went bad. So instead for now
|
||||||
|
# we just return the
|
||||||
|
# shorter-than-expected history with
|
||||||
|
# a warning.
|
||||||
|
# TODO: in the future it prolly makes
|
||||||
|
# the most sense to do venue operating
|
||||||
|
# hours lookup and
|
||||||
|
# timestamp-in-operating-range set
|
||||||
|
# checking to know for sure if we can
|
||||||
|
# safely and quickly ignore non-uniform history
|
||||||
|
# frame timestamp gaps..
|
||||||
|
# end_dt -= dt_duration
|
||||||
|
# continue
|
||||||
|
# await tractor.pause()
|
||||||
|
|
||||||
first_dt = pendulum.from_timestamp(
|
first_dt = pendulum.from_timestamp(
|
||||||
bars[0].date.timestamp())
|
bars[0].date.timestamp())
|
||||||
|
|
||||||
|
@ -854,7 +881,13 @@ async def stream_quotes(
|
||||||
init_msgs.append(init_msg)
|
init_msgs.append(init_msg)
|
||||||
|
|
||||||
con: Contract = details.contract
|
con: Contract = details.contract
|
||||||
first_ticker: Ticker = await proxy.get_quote(contract=con)
|
first_ticker: Ticker | None = None
|
||||||
|
with trio.move_on_after(1):
|
||||||
|
first_ticker: Ticker = await proxy.get_quote(
|
||||||
|
contract=con,
|
||||||
|
raise_on_timeout=False,
|
||||||
|
)
|
||||||
|
|
||||||
if first_ticker:
|
if first_ticker:
|
||||||
first_quote: dict = normalize(first_ticker)
|
first_quote: dict = normalize(first_ticker)
|
||||||
log.info(
|
log.info(
|
||||||
|
@ -862,18 +895,6 @@ async def stream_quotes(
|
||||||
f'{pformat(first_quote)}'
|
f'{pformat(first_quote)}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: we should instead spawn a task that waits on a feed
|
|
||||||
# to start and let it wait indefinitely..instead of this
|
|
||||||
# hard coded stuff.
|
|
||||||
# async def wait_for_first_quote():
|
|
||||||
# with trio.CancelScope() as cs:
|
|
||||||
|
|
||||||
with trio.move_on_after(1):
|
|
||||||
first_ticker = await proxy.get_quote(
|
|
||||||
contract=con,
|
|
||||||
raise_on_timeout=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
# NOTE: it might be outside regular trading hours for
|
# NOTE: it might be outside regular trading hours for
|
||||||
# assets with "standard venue operating hours" so we
|
# assets with "standard venue operating hours" so we
|
||||||
# only "pretend the feed is live" when the dst asset
|
# only "pretend the feed is live" when the dst asset
|
||||||
|
@ -884,6 +905,8 @@ async def stream_quotes(
|
||||||
# (equities, futes, bonds etc.) we at least try to
|
# (equities, futes, bonds etc.) we at least try to
|
||||||
# grab the OHLC history.
|
# grab the OHLC history.
|
||||||
if (
|
if (
|
||||||
|
first_ticker
|
||||||
|
and
|
||||||
isnan(first_ticker.last)
|
isnan(first_ticker.last)
|
||||||
# SO, if the last quote price value is NaN we ONLY
|
# SO, if the last quote price value is NaN we ONLY
|
||||||
# "pretend to do" `feed_is_live.set()` if it's a known
|
# "pretend to do" `feed_is_live.set()` if it's a known
|
||||||
|
@ -907,6 +930,19 @@ async def stream_quotes(
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
return # we never expect feed to come up?
|
return # we never expect feed to come up?
|
||||||
|
|
||||||
|
# TODO: we should instead spawn a task that waits on a feed
|
||||||
|
# to start and let it wait indefinitely..instead of this
|
||||||
|
# hard coded stuff.
|
||||||
|
# async def wait_for_first_quote():
|
||||||
|
# with trio.CancelScope() as cs:
|
||||||
|
|
||||||
|
# XXX: MUST acquire a ticker + first quote before starting
|
||||||
|
# the live quotes loop!
|
||||||
|
# with trio.move_on_after(1):
|
||||||
|
first_ticker = await proxy.get_quote(
|
||||||
|
contract=con,
|
||||||
|
raise_on_timeout=True,
|
||||||
|
)
|
||||||
cs: trio.CancelScope | None = None
|
cs: trio.CancelScope | None = None
|
||||||
startup: bool = True
|
startup: bool = True
|
||||||
while (
|
while (
|
||||||
|
|
|
@ -41,7 +41,6 @@ from typing import (
|
||||||
import wsproto
|
import wsproto
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from rapidfuzz import process as fuzzy
|
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
import asks
|
import asks
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
|
@ -416,8 +415,7 @@ class Client:
|
||||||
await self.get_mkt_pairs()
|
await self.get_mkt_pairs()
|
||||||
assert self._pairs, '`Client.get_mkt_pairs()` was never called!?'
|
assert self._pairs, '`Client.get_mkt_pairs()` was never called!?'
|
||||||
|
|
||||||
|
matches: dict[str, KucoinMktPair] = match_from_pairs(
|
||||||
matches: dict[str, Pair] = match_from_pairs(
|
|
||||||
pairs=self._pairs,
|
pairs=self._pairs,
|
||||||
# query=pattern.upper(),
|
# query=pattern.upper(),
|
||||||
query=pattern.upper(),
|
query=pattern.upper(),
|
||||||
|
|
|
@ -31,6 +31,8 @@ from pathlib import Path
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
|
Sequence,
|
||||||
|
Hashable,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
@ -128,8 +130,8 @@ class SymbologyCache(Struct):
|
||||||
- `.get_mkt_pairs()`: returning a table of pair-`Struct`
|
- `.get_mkt_pairs()`: returning a table of pair-`Struct`
|
||||||
types, custom defined by the particular backend.
|
types, custom defined by the particular backend.
|
||||||
|
|
||||||
AND, the required `.get_mkt_info()` module-level endpoint which
|
AND, the required `.get_mkt_info()` module-level endpoint
|
||||||
maps `fqme: str` -> `MktPair`s.
|
which maps `fqme: str` -> `MktPair`s.
|
||||||
|
|
||||||
These tables are then used to fill out the `.assets`, `.pairs` and
|
These tables are then used to fill out the `.assets`, `.pairs` and
|
||||||
`.mktmaps` tables on this cache instance, respectively.
|
`.mktmaps` tables on this cache instance, respectively.
|
||||||
|
@ -500,7 +502,7 @@ def match_from_pairs(
|
||||||
)
|
)
|
||||||
|
|
||||||
# pop and repack pairs in output dict
|
# pop and repack pairs in output dict
|
||||||
matched_pairs: dict[str, Pair] = {}
|
matched_pairs: dict[str, Struct] = {}
|
||||||
for item in matches:
|
for item in matches:
|
||||||
pair_key: str = item[0]
|
pair_key: str = item[0]
|
||||||
matched_pairs[pair_key] = pairs[pair_key]
|
matched_pairs[pair_key] = pairs[pair_key]
|
||||||
|
|
|
@ -16,19 +16,26 @@
|
||||||
# <https://www.gnu.org/licenses/>.
|
# <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Historical data business logic for load, backfill and tsdb storage.
|
Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic for,
|
||||||
|
|
||||||
|
- hi-level biz logics using the `.storage` subpkg APIs for (I/O)
|
||||||
|
orchestration and mgmt of tsdb data sets.
|
||||||
|
- core data-provider history backfilling middleware (as task-funcs) via
|
||||||
|
(what will eventually be `datad`, but rn is the) `.brokers` backend
|
||||||
|
APIs.
|
||||||
|
- various data set cleaning, repairing and issue-detection/analysis
|
||||||
|
routines to ensure consistent series whether in shm or when
|
||||||
|
stored offline (in a tsdb).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
# from collections import (
|
|
||||||
# Counter,
|
|
||||||
# )
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from functools import partial
|
from functools import partial
|
||||||
# import time
|
from pathlib import Path
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from typing import (
|
from typing import (
|
||||||
Callable,
|
Callable,
|
||||||
|
Generator,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -36,6 +43,7 @@ import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
from pendulum import (
|
from pendulum import (
|
||||||
|
DateTime,
|
||||||
Duration,
|
Duration,
|
||||||
from_timestamp,
|
from_timestamp,
|
||||||
)
|
)
|
||||||
|
@ -56,7 +64,14 @@ from ._source import def_iohlcv_fields
|
||||||
from ._sampling import (
|
from ._sampling import (
|
||||||
open_sample_stream,
|
open_sample_stream,
|
||||||
)
|
)
|
||||||
from . import tsp
|
from .tsp import (
|
||||||
|
dedupe,
|
||||||
|
get_null_segs,
|
||||||
|
iter_null_segs,
|
||||||
|
sort_diff,
|
||||||
|
Frame,
|
||||||
|
# Seq,
|
||||||
|
)
|
||||||
from ..brokers._util import (
|
from ..brokers._util import (
|
||||||
DataUnavailable,
|
DataUnavailable,
|
||||||
)
|
)
|
||||||
|
@ -110,6 +125,7 @@ def diff_history(
|
||||||
return array[times >= prepend_until_dt.timestamp()]
|
return array[times >= prepend_until_dt.timestamp()]
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: can't we just make this a sync func now?
|
||||||
async def shm_push_in_between(
|
async def shm_push_in_between(
|
||||||
shm: ShmArray,
|
shm: ShmArray,
|
||||||
to_push: np.ndarray,
|
to_push: np.ndarray,
|
||||||
|
@ -118,6 +134,10 @@ async def shm_push_in_between(
|
||||||
update_start_on_prepend: bool = False,
|
update_start_on_prepend: bool = False,
|
||||||
|
|
||||||
) -> int:
|
) -> int:
|
||||||
|
# XXX: extremely important, there can be no checkpoints
|
||||||
|
# in the body of this func to avoid entering new ``frames``
|
||||||
|
# values while we're pipelining the current ones to
|
||||||
|
# memory...
|
||||||
shm.push(
|
shm.push(
|
||||||
to_push,
|
to_push,
|
||||||
prepend=True,
|
prepend=True,
|
||||||
|
@ -138,10 +158,102 @@ async def shm_push_in_between(
|
||||||
else None
|
else None
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
# XXX: extremely important, there can be no checkpoints
|
|
||||||
# in the block above to avoid entering new ``frames``
|
|
||||||
# values while we're pipelining the current ones to
|
async def maybe_fill_null_segments(
|
||||||
# memory...
|
shm: ShmArray,
|
||||||
|
timeframe: float,
|
||||||
|
get_hist: Callable,
|
||||||
|
sampler_stream: tractor.MsgStream,
|
||||||
|
mkt: MktPair,
|
||||||
|
|
||||||
|
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
) -> list[Frame]:
|
||||||
|
|
||||||
|
null_segs_detected = trio.Event()
|
||||||
|
task_status.started(null_segs_detected)
|
||||||
|
|
||||||
|
frame: Frame = shm.array
|
||||||
|
|
||||||
|
null_segs: tuple | None = get_null_segs(
|
||||||
|
frame,
|
||||||
|
period=timeframe,
|
||||||
|
)
|
||||||
|
for (
|
||||||
|
absi_start, absi_end,
|
||||||
|
fi_start, fi_end,
|
||||||
|
start_t, end_t,
|
||||||
|
start_dt, end_dt,
|
||||||
|
) in iter_null_segs(
|
||||||
|
null_segs=null_segs,
|
||||||
|
frame=frame,
|
||||||
|
timeframe=timeframe,
|
||||||
|
):
|
||||||
|
|
||||||
|
# XXX NOTE: ?if we get a badly ordered timestamp
|
||||||
|
# pair, immediately stop backfilling?
|
||||||
|
if (
|
||||||
|
start_dt
|
||||||
|
and end_dt < start_dt
|
||||||
|
):
|
||||||
|
await tractor.pause()
|
||||||
|
break
|
||||||
|
|
||||||
|
(
|
||||||
|
array,
|
||||||
|
next_start_dt,
|
||||||
|
next_end_dt,
|
||||||
|
) = await get_hist(
|
||||||
|
timeframe,
|
||||||
|
start_dt=start_dt,
|
||||||
|
end_dt=end_dt,
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
|
||||||
|
# and mnq.cme.ib this causes a Qt crash XXDDD
|
||||||
|
|
||||||
|
# make sure we don't overrun the buffer start
|
||||||
|
len_to_push: int = min(absi_end, array.size)
|
||||||
|
to_push: np.ndarray = array[-len_to_push:]
|
||||||
|
|
||||||
|
await shm_push_in_between(
|
||||||
|
shm,
|
||||||
|
to_push,
|
||||||
|
prepend_index=absi_end,
|
||||||
|
update_start_on_prepend=False,
|
||||||
|
)
|
||||||
|
# TODO: UI side needs IPC event to update..
|
||||||
|
# - make sure the UI actually always handles
|
||||||
|
# this update!
|
||||||
|
# - remember that in the display side, only refresh this
|
||||||
|
# if the respective history is actually "in view".
|
||||||
|
# loop
|
||||||
|
await sampler_stream.send({
|
||||||
|
'broadcast_all': {
|
||||||
|
|
||||||
|
# XXX NOTE XXX: see the
|
||||||
|
# `.ui._display.increment_history_view()` if block
|
||||||
|
# that looks for this info to FORCE a hard viz
|
||||||
|
# redraw!
|
||||||
|
'backfilling': (mkt.fqme, timeframe),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
null_segs_detected.set()
|
||||||
|
# RECHECK for more null-gaps
|
||||||
|
frame: Frame = shm.array
|
||||||
|
null_segs: tuple | None = get_null_segs(
|
||||||
|
frame,
|
||||||
|
period=timeframe,
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
null_segs
|
||||||
|
and
|
||||||
|
len(null_segs[-1])
|
||||||
|
):
|
||||||
|
await tractor.pause()
|
||||||
|
|
||||||
array = shm.array
|
array = shm.array
|
||||||
zeros = array[array['low'] == 0]
|
zeros = array[array['low'] == 0]
|
||||||
|
|
||||||
|
@ -155,10 +267,24 @@ async def shm_push_in_between(
|
||||||
'low',
|
'low',
|
||||||
'close',
|
'close',
|
||||||
]] = shm._array[zeros['index'][0] - 1]['close']
|
]] = shm._array[zeros['index'][0] - 1]['close']
|
||||||
# await tractor.pause()
|
|
||||||
|
# TODO: iteratively step through any remaining
|
||||||
|
# time-gaps/null-segments and spawn piecewise backfiller
|
||||||
|
# tasks in a nursery?
|
||||||
|
# -[ ] not sure that's going to work so well on the ib
|
||||||
|
# backend but worth a shot?
|
||||||
|
# -[ ] mk new history connections to make it properly
|
||||||
|
# parallel possible no matter the backend?
|
||||||
|
# -[ ] fill algo: do queries in alternating "latest, then
|
||||||
|
# earliest, then latest.. etc?"
|
||||||
|
# if (
|
||||||
|
# next_end_dt not in frame[
|
||||||
|
# ):
|
||||||
|
# pass
|
||||||
|
|
||||||
|
|
||||||
async def start_backfill(
|
async def start_backfill(
|
||||||
|
tn: trio.Nursery,
|
||||||
get_hist,
|
get_hist,
|
||||||
mod: ModuleType,
|
mod: ModuleType,
|
||||||
mkt: MktPair,
|
mkt: MktPair,
|
||||||
|
@ -213,27 +339,20 @@ async def start_backfill(
|
||||||
backfill_until_dt = backfill_from_dt.subtract(**period_duration)
|
backfill_until_dt = backfill_from_dt.subtract(**period_duration)
|
||||||
|
|
||||||
|
|
||||||
# TODO: can we drop this? without conc i don't think this
|
# STAGE NOTE: "backward history gap filling":
|
||||||
# is necessary any more?
|
# - we push to the shm buffer until we have history back
|
||||||
# configure async query throttling
|
# until the latest entry loaded from the tsdb's table B)
|
||||||
# rate = config.get('rate', 1)
|
# - after this loop continue to check for other gaps in the
|
||||||
# XXX: legacy from ``trimeter`` code but unsupported now.
|
# (tsdb) history and (at least report) maybe fill them
|
||||||
# erlangs = config.get('erlangs', 1)
|
# from new frame queries to the backend?
|
||||||
# avoid duplicate history frames with a set of datetime frame
|
|
||||||
# starts and associated counts of how many duplicates we see
|
|
||||||
# per time stamp.
|
|
||||||
# starts: Counter[datetime] = Counter()
|
|
||||||
|
|
||||||
# conduct "backward history gap filling" where we push to
|
|
||||||
# the shm buffer until we have history back until the
|
|
||||||
# latest entry loaded from the tsdb's table B)
|
|
||||||
last_start_dt: datetime = backfill_from_dt
|
last_start_dt: datetime = backfill_from_dt
|
||||||
next_prepend_index: int = backfill_from_shm_index
|
next_prepend_index: int = backfill_from_shm_index
|
||||||
|
|
||||||
while last_start_dt > backfill_until_dt:
|
while last_start_dt > backfill_until_dt:
|
||||||
|
log.info(
|
||||||
log.debug(
|
f'Requesting {timeframe}s frame:\n'
|
||||||
f'Requesting {timeframe}s frame ending in {last_start_dt}'
|
f'backfill_until_dt: {backfill_until_dt}\n'
|
||||||
|
f'last_start_dt: {last_start_dt}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -261,36 +380,18 @@ async def start_backfill(
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
return
|
return
|
||||||
|
|
||||||
# TODO: drop this? see todo above..
|
assert (
|
||||||
# if (
|
array['time'][0]
|
||||||
# next_start_dt in starts
|
==
|
||||||
# and starts[next_start_dt] <= 6
|
next_start_dt.timestamp()
|
||||||
# ):
|
)
|
||||||
# start_dt = min(starts)
|
|
||||||
# log.warning(
|
|
||||||
# f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
|
|
||||||
# )
|
|
||||||
# starts[start_dt] += 1
|
|
||||||
# await tractor.pause()
|
|
||||||
# continue
|
|
||||||
|
|
||||||
# elif starts[next_start_dt] > 6:
|
|
||||||
# log.warning(
|
|
||||||
# f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
|
|
||||||
# )
|
|
||||||
# return
|
|
||||||
|
|
||||||
# # only update new start point if not-yet-seen
|
|
||||||
# starts[next_start_dt] += 1
|
|
||||||
|
|
||||||
assert array['time'][0] == next_start_dt.timestamp()
|
|
||||||
|
|
||||||
diff = last_start_dt - next_start_dt
|
diff = last_start_dt - next_start_dt
|
||||||
frame_time_diff_s = diff.seconds
|
frame_time_diff_s = diff.seconds
|
||||||
|
|
||||||
# frame's worth of sample-period-steps, in seconds
|
# frame's worth of sample-period-steps, in seconds
|
||||||
frame_size_s = len(array) * timeframe
|
frame_size_s: float = len(array) * timeframe
|
||||||
expected_frame_size_s = frame_size_s + timeframe
|
expected_frame_size_s: float = frame_size_s + timeframe
|
||||||
if frame_time_diff_s > expected_frame_size_s:
|
if frame_time_diff_s > expected_frame_size_s:
|
||||||
|
|
||||||
# XXX: query result includes a start point prior to our
|
# XXX: query result includes a start point prior to our
|
||||||
|
@ -298,8 +399,10 @@ async def start_backfill(
|
||||||
# history gap (eg. market closed period, outage, etc.)
|
# history gap (eg. market closed period, outage, etc.)
|
||||||
# so just report it to console for now.
|
# so just report it to console for now.
|
||||||
log.warning(
|
log.warning(
|
||||||
f'History frame ending @ {last_start_dt} appears to have a gap:\n'
|
'GAP DETECTED:\n'
|
||||||
f'{diff} ~= {frame_time_diff_s} seconds'
|
f'last_start_dt: {last_start_dt}\n'
|
||||||
|
f'diff: {diff}\n'
|
||||||
|
f'frame_time_diff_s: {frame_time_diff_s}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
to_push = diff_history(
|
to_push = diff_history(
|
||||||
|
@ -415,74 +518,15 @@ async def start_backfill(
|
||||||
gaps,
|
gaps,
|
||||||
deduped,
|
deduped,
|
||||||
diff,
|
diff,
|
||||||
) = tsp.dedupe(df)
|
) = dedupe(df)
|
||||||
if diff:
|
if diff:
|
||||||
tsp.sort_diff(df)
|
sort_diff(df)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# finally filled gap
|
# finally filled gap
|
||||||
log.info(
|
log.info(
|
||||||
f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
|
f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
|
||||||
)
|
)
|
||||||
# conduct tsdb timestamp gap detection and backfill any
|
|
||||||
# seemingly missing sequence segments..
|
|
||||||
# TODO: ideally these never exist but somehow it seems
|
|
||||||
# sometimes we're writing zero-ed segments on certain
|
|
||||||
# (teardown) cases?
|
|
||||||
from .tsp import detect_null_time_gap
|
|
||||||
|
|
||||||
gap_indices: tuple | None = detect_null_time_gap(shm)
|
|
||||||
while gap_indices:
|
|
||||||
(
|
|
||||||
istart,
|
|
||||||
start,
|
|
||||||
end,
|
|
||||||
iend,
|
|
||||||
) = gap_indices
|
|
||||||
|
|
||||||
start_dt = from_timestamp(start)
|
|
||||||
end_dt = from_timestamp(end)
|
|
||||||
|
|
||||||
# if we get a baddly ordered timestamp
|
|
||||||
# pair, imeeditately stop backfilling.
|
|
||||||
if end_dt < start_dt:
|
|
||||||
break
|
|
||||||
|
|
||||||
(
|
|
||||||
array,
|
|
||||||
next_start_dt,
|
|
||||||
next_end_dt,
|
|
||||||
) = await get_hist(
|
|
||||||
timeframe,
|
|
||||||
start_dt=start_dt,
|
|
||||||
end_dt=end_dt,
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
|
|
||||||
# and mnq.cme.ib this causes a Qt crash XXDDD
|
|
||||||
|
|
||||||
# make sure we don't overrun the buffer start
|
|
||||||
len_to_push: int = min(iend, array.size)
|
|
||||||
to_push: np.ndarray = array[-len_to_push:]
|
|
||||||
await shm_push_in_between(
|
|
||||||
shm,
|
|
||||||
to_push,
|
|
||||||
prepend_index=iend,
|
|
||||||
update_start_on_prepend=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: UI side needs IPC event to update..
|
|
||||||
# - make sure the UI actually always handles
|
|
||||||
# this update!
|
|
||||||
# - remember that in the display side, only refresh this
|
|
||||||
# if the respective history is actually "in view".
|
|
||||||
# loop
|
|
||||||
await sampler_stream.send({
|
|
||||||
'broadcast_all': {
|
|
||||||
'backfilling': (mkt.fqme, timeframe),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
gap_indices: tuple | None = detect_null_time_gap(shm)
|
|
||||||
|
|
||||||
# XXX: extremely important, there can be no checkpoints
|
# XXX: extremely important, there can be no checkpoints
|
||||||
# in the block above to avoid entering new ``frames``
|
# in the block above to avoid entering new ``frames``
|
||||||
|
@ -494,6 +538,12 @@ async def start_backfill(
|
||||||
bf_done.set()
|
bf_done.set()
|
||||||
|
|
||||||
|
|
||||||
|
# NOTE: originally this was used to cope with a tsdb (marketstore)
|
||||||
|
# which could not deliver very large frames of history over gRPC
|
||||||
|
# (thanks goolag) due to corruption issues. NOW, using apache
|
||||||
|
# parquet (by default in the local filesys) we don't have this
|
||||||
|
# requirement since the files can be loaded very quickly in
|
||||||
|
# entirety to memory via
|
||||||
async def back_load_from_tsdb(
|
async def back_load_from_tsdb(
|
||||||
storemod: ModuleType,
|
storemod: ModuleType,
|
||||||
storage: StorageClient,
|
storage: StorageClient,
|
||||||
|
@ -631,10 +681,94 @@ async def back_load_from_tsdb(
|
||||||
# await sampler_stream.send('broadcast_all')
|
# await sampler_stream.send('broadcast_all')
|
||||||
|
|
||||||
|
|
||||||
|
async def push_latest_frame(
|
||||||
|
dt_eps: list[DateTime, DateTime],
|
||||||
|
shm: ShmArray,
|
||||||
|
get_hist: Callable[
|
||||||
|
[int, datetime, datetime],
|
||||||
|
tuple[np.ndarray, str]
|
||||||
|
],
|
||||||
|
timeframe: float,
|
||||||
|
config: dict,
|
||||||
|
|
||||||
|
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||||
|
):
|
||||||
|
# get latest query's worth of history all the way
|
||||||
|
# back to what is recorded in the tsdb
|
||||||
|
try:
|
||||||
|
(
|
||||||
|
array,
|
||||||
|
mr_start_dt,
|
||||||
|
mr_end_dt,
|
||||||
|
) = await get_hist(
|
||||||
|
timeframe,
|
||||||
|
end_dt=None,
|
||||||
|
)
|
||||||
|
# so caller can access these ep values
|
||||||
|
dt_eps.extend([
|
||||||
|
mr_start_dt,
|
||||||
|
mr_end_dt,
|
||||||
|
])
|
||||||
|
|
||||||
|
# XXX: timeframe not supported for backend (since
|
||||||
|
# above exception type), terminate immediately since
|
||||||
|
# there's no backfilling possible.
|
||||||
|
except DataUnavailable:
|
||||||
|
task_status.started()
|
||||||
|
|
||||||
|
if timeframe > 1:
|
||||||
|
await tractor.pause()
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
# NOTE: on the first history, most recent history
|
||||||
|
# frame we PREPEND from the current shm ._last index
|
||||||
|
# and thus a gap between the earliest datum loaded here
|
||||||
|
# and the latest loaded from the tsdb may exist!
|
||||||
|
log.info(f'Pushing {array.size} to shm!')
|
||||||
|
shm.push(
|
||||||
|
array,
|
||||||
|
prepend=True, # append on first frame
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def load_tsdb_hist(
|
||||||
|
storage: StorageClient,
|
||||||
|
mkt: MktPair,
|
||||||
|
timeframe: float,
|
||||||
|
) -> tuple[
|
||||||
|
np.ndarray,
|
||||||
|
DateTime,
|
||||||
|
DateTime,
|
||||||
|
] | None:
|
||||||
|
# loads a (large) frame of data from the tsdb depending
|
||||||
|
# on the db's query size limit; our "nativedb" (using
|
||||||
|
# parquet) generally can load the entire history into mem
|
||||||
|
# but if not then below the remaining history can be lazy
|
||||||
|
# loaded?
|
||||||
|
fqme: str = mkt.fqme
|
||||||
|
tsdb_entry: tuple[
|
||||||
|
np.ndarray,
|
||||||
|
DateTime,
|
||||||
|
DateTime,
|
||||||
|
]
|
||||||
|
try:
|
||||||
|
tsdb_entry: tuple | None = await storage.load(
|
||||||
|
fqme,
|
||||||
|
timeframe=timeframe,
|
||||||
|
)
|
||||||
|
return tsdb_entry
|
||||||
|
|
||||||
|
except TimeseriesNotFound:
|
||||||
|
log.warning(
|
||||||
|
f'No timeseries yet for {timeframe}@{fqme}'
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def tsdb_backfill(
|
async def tsdb_backfill(
|
||||||
mod: ModuleType,
|
mod: ModuleType,
|
||||||
storemod: ModuleType,
|
storemod: ModuleType,
|
||||||
tn: trio.Nursery,
|
|
||||||
|
|
||||||
storage: StorageClient,
|
storage: StorageClient,
|
||||||
mkt: MktPair,
|
mkt: MktPair,
|
||||||
|
@ -649,192 +783,193 @@ async def tsdb_backfill(
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
if timeframe not in (1, 60):
|
||||||
|
raise ValueError(
|
||||||
|
'`piker` only needs to support 1m and 1s sampling '
|
||||||
|
'but ur api is trying to deliver a longer '
|
||||||
|
f'timeframe of {timeframe} seconds..\n'
|
||||||
|
'So yuh.. dun do dat brudder.'
|
||||||
|
)
|
||||||
|
|
||||||
get_hist: Callable[
|
get_hist: Callable[
|
||||||
[int, datetime, datetime],
|
[int, datetime, datetime],
|
||||||
tuple[np.ndarray, str]
|
tuple[np.ndarray, str]
|
||||||
]
|
]
|
||||||
config: dict[str, int]
|
config: dict[str, int]
|
||||||
async with mod.open_history_client(
|
async with (
|
||||||
mkt,
|
mod.open_history_client(
|
||||||
) as (get_hist, config):
|
mkt,
|
||||||
|
) as (get_hist, config),
|
||||||
|
|
||||||
|
# NOTE: this sub-nursery splits to tasks for the given
|
||||||
|
# sampling rate to concurrently load offline tsdb
|
||||||
|
# timeseries as well as new data from the venue backend!
|
||||||
|
):
|
||||||
log.info(
|
log.info(
|
||||||
f'`{mod}` history client returned backfill config:\n'
|
f'`{mod}` history client returned backfill config:\n'
|
||||||
f'{config}\n'
|
f'{config}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# get latest query's worth of history all the way
|
dt_eps: list[DateTime, DateTime] = []
|
||||||
# back to what is recorded in the tsdb
|
async with trio.open_nursery() as tn:
|
||||||
try:
|
tn.start_soon(
|
||||||
(
|
push_latest_frame,
|
||||||
array,
|
dt_eps,
|
||||||
mr_start_dt,
|
shm,
|
||||||
mr_end_dt,
|
get_hist,
|
||||||
) = await get_hist(
|
|
||||||
timeframe,
|
timeframe,
|
||||||
end_dt=None,
|
config,
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX: timeframe not supported for backend (since
|
# tell parent task to continue
|
||||||
# above exception type), terminate immediately since
|
# TODO: really we'd want this the other way with the
|
||||||
# there's no backfilling possible.
|
# tsdb load happening asap since the latest
|
||||||
except DataUnavailable:
|
# frame query will normally be the main source of
|
||||||
|
# latency?
|
||||||
task_status.started()
|
task_status.started()
|
||||||
await tractor.pause()
|
|
||||||
return
|
|
||||||
|
|
||||||
# TODO: fill in non-zero epoch time values ALWAYS!
|
tsdb_entry: tuple = await load_tsdb_hist(
|
||||||
# hist_shm._array['time'] = np.arange(
|
storage,
|
||||||
# start=
|
mkt,
|
||||||
|
timeframe,
|
||||||
|
)
|
||||||
|
|
||||||
# NOTE: removed for now since it'll always break
|
# NOTE: iabs to start backfilling from, reverse chronological,
|
||||||
# on the first 60s of the venue open..
|
# ONLY AFTER the first history frame has been pushed to
|
||||||
# times: np.ndarray = array['time']
|
# mem!
|
||||||
# # sample period step size in seconds
|
|
||||||
# step_size_s = (
|
|
||||||
# from_timestamp(times[-1])
|
|
||||||
# - from_timestamp(times[-2])
|
|
||||||
# ).seconds
|
|
||||||
|
|
||||||
# if step_size_s not in (1, 60):
|
|
||||||
# log.error(f'Last 2 sample period is off!? -> {step_size_s}')
|
|
||||||
# step_size_s = (
|
|
||||||
# from_timestamp(times[-2])
|
|
||||||
# - from_timestamp(times[-3])
|
|
||||||
# ).seconds
|
|
||||||
|
|
||||||
# NOTE: on the first history, most recent history
|
|
||||||
# frame we PREPEND from the current shm ._last index
|
|
||||||
# and thus a gap between the earliest datum loaded here
|
|
||||||
# and the latest loaded from the tsdb may exist!
|
|
||||||
log.info(f'Pushing {array.size} to shm!')
|
|
||||||
shm.push(
|
|
||||||
array,
|
|
||||||
prepend=True, # append on first frame
|
|
||||||
)
|
|
||||||
backfill_gap_from_shm_index: int = shm._first.value + 1
|
backfill_gap_from_shm_index: int = shm._first.value + 1
|
||||||
|
|
||||||
# tell parent task to continue
|
(
|
||||||
task_status.started()
|
mr_start_dt,
|
||||||
|
mr_end_dt,
|
||||||
|
) = dt_eps
|
||||||
|
|
||||||
# loads a (large) frame of data from the tsdb depending
|
async with trio.open_nursery() as tn:
|
||||||
# on the db's query size limit; our "nativedb" (using
|
|
||||||
# parquet) generally can load the entire history into mem
|
|
||||||
# but if not then below the remaining history can be lazy
|
|
||||||
# loaded?
|
|
||||||
fqme: str = mkt.fqme
|
|
||||||
last_tsdb_dt: datetime | None = None
|
|
||||||
try:
|
|
||||||
tsdb_entry: tuple | None = await storage.load(
|
|
||||||
fqme,
|
|
||||||
timeframe=timeframe,
|
|
||||||
)
|
|
||||||
except TimeseriesNotFound:
|
|
||||||
log.warning(
|
|
||||||
f'No timeseries yet for {timeframe}@{fqme}'
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
(
|
|
||||||
tsdb_history,
|
|
||||||
first_tsdb_dt,
|
|
||||||
last_tsdb_dt,
|
|
||||||
) = tsdb_entry
|
|
||||||
|
|
||||||
# calc the index from which the tsdb data should be
|
# Prepend any tsdb history to the shm buffer which should
|
||||||
# prepended, presuming there is a gap between the
|
# now be full of the most recent history pulled from the
|
||||||
# latest frame (loaded/read above) and the latest
|
# backend's last frame.
|
||||||
# sample loaded from the tsdb.
|
if tsdb_entry:
|
||||||
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
|
(
|
||||||
offset_s: float = backfill_diff.in_seconds()
|
tsdb_history,
|
||||||
|
first_tsdb_dt,
|
||||||
|
last_tsdb_dt,
|
||||||
|
) = tsdb_entry
|
||||||
|
|
||||||
# XXX EDGE CASEs: the most recent frame overlaps with
|
# if there is a gap to backfill from the first
|
||||||
# prior tsdb history!!
|
# history frame until the last datum loaded from the tsdb
|
||||||
# - so the latest frame's start time is earlier than
|
# continue that now in the background
|
||||||
# the tsdb's latest sample.
|
bf_done = await tn.start(
|
||||||
# - alternatively this may also more generally occur
|
partial(
|
||||||
# when the venue was closed (say over the weekend)
|
start_backfill,
|
||||||
# causing a timeseries gap, AND the query frames size
|
tn=tn,
|
||||||
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
|
get_hist=get_hist,
|
||||||
# GREATER THAN the current venue-market's operating
|
mod=mod,
|
||||||
# session (time) we will receive datums from BEFORE THE
|
mkt=mkt,
|
||||||
# CLOSURE GAP and thus the `offset_s` value will be
|
shm=shm,
|
||||||
# NEGATIVE! In this case we need to ensure we don't try
|
timeframe=timeframe,
|
||||||
# to push datums that have already been recorded in the
|
|
||||||
# tsdb. In this case we instead only retrieve and push
|
|
||||||
# the series portion missing from the db's data set.
|
|
||||||
# if offset_s < 0:
|
|
||||||
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
|
|
||||||
# non_overlap_offset_s: float = backfill_diff.in_seconds()
|
|
||||||
|
|
||||||
offset_samples: int = round(offset_s / timeframe)
|
backfill_from_shm_index=backfill_gap_from_shm_index,
|
||||||
|
backfill_from_dt=mr_start_dt,
|
||||||
# TODO: see if there's faster multi-field reads:
|
sampler_stream=sampler_stream,
|
||||||
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
backfill_until_dt=last_tsdb_dt,
|
||||||
# re-index with a `time` and index field
|
storage=storage,
|
||||||
if offset_s > 0:
|
write_tsdb=True,
|
||||||
# NOTE XXX: ONLY when there is an actual gap
|
)
|
||||||
# between the earliest sample in the latest history
|
|
||||||
# frame do we want to NOT stick the latest tsdb
|
|
||||||
# history adjacent to that latest frame!
|
|
||||||
prepend_start = shm._first.value - offset_samples + 1
|
|
||||||
to_push = tsdb_history[-prepend_start:]
|
|
||||||
else:
|
|
||||||
# when there is overlap we want to remove the
|
|
||||||
# overlapping samples from the tsdb portion (taking
|
|
||||||
# instead the latest frame's values since THEY
|
|
||||||
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
|
|
||||||
# to the latest frame!
|
|
||||||
# TODO: assert the overlap segment array contains
|
|
||||||
# the same values!?!
|
|
||||||
prepend_start = shm._first.value
|
|
||||||
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
|
|
||||||
|
|
||||||
# tsdb history is so far in the past we can't fit it in
|
|
||||||
# shm buffer space so simply don't load it!
|
|
||||||
if prepend_start > 0:
|
|
||||||
shm.push(
|
|
||||||
to_push,
|
|
||||||
|
|
||||||
# insert the history pre a "days worth" of samples
|
|
||||||
# to leave some real-time buffer space at the end.
|
|
||||||
prepend=True,
|
|
||||||
# update_first=False,
|
|
||||||
start=prepend_start,
|
|
||||||
field_map=storemod.ohlc_key_map,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
log.info(f'Loaded {to_push.shape} datums from storage')
|
# calc the index from which the tsdb data should be
|
||||||
|
# prepended, presuming there is a gap between the
|
||||||
|
# latest frame (loaded/read above) and the latest
|
||||||
|
# sample loaded from the tsdb.
|
||||||
|
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
|
||||||
|
offset_s: float = backfill_diff.in_seconds()
|
||||||
|
|
||||||
|
# XXX EDGE CASEs: the most recent frame overlaps with
|
||||||
|
# prior tsdb history!!
|
||||||
|
# - so the latest frame's start time is earlier than
|
||||||
|
# the tsdb's latest sample.
|
||||||
|
# - alternatively this may also more generally occur
|
||||||
|
# when the venue was closed (say over the weekend)
|
||||||
|
# causing a timeseries gap, AND the query frames size
|
||||||
|
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
|
||||||
|
# GREATER THAN the current venue-market's operating
|
||||||
|
# session (time) we will receive datums from BEFORE THE
|
||||||
|
# CLOSURE GAP and thus the `offset_s` value will be
|
||||||
|
# NEGATIVE! In this case we need to ensure we don't try
|
||||||
|
# to push datums that have already been recorded in the
|
||||||
|
# tsdb. In this case we instead only retrieve and push
|
||||||
|
# the series portion missing from the db's data set.
|
||||||
|
# if offset_s < 0:
|
||||||
|
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
|
||||||
|
# non_overlap_offset_s: float = backfill_diff.in_seconds()
|
||||||
|
|
||||||
|
offset_samples: int = round(offset_s / timeframe)
|
||||||
|
|
||||||
|
# TODO: see if there's faster multi-field reads:
|
||||||
|
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
||||||
|
# re-index with a `time` and index field
|
||||||
|
if offset_s > 0:
|
||||||
|
# NOTE XXX: ONLY when there is an actual gap
|
||||||
|
# between the earliest sample in the latest history
|
||||||
|
# frame do we want to NOT stick the latest tsdb
|
||||||
|
# history adjacent to that latest frame!
|
||||||
|
prepend_start = shm._first.value - offset_samples + 1
|
||||||
|
to_push = tsdb_history[-prepend_start:]
|
||||||
|
else:
|
||||||
|
# when there is overlap we want to remove the
|
||||||
|
# overlapping samples from the tsdb portion (taking
|
||||||
|
# instead the latest frame's values since THEY
|
||||||
|
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
|
||||||
|
# to the latest frame!
|
||||||
|
# TODO: assert the overlap segment array contains
|
||||||
|
# the same values!?!
|
||||||
|
prepend_start = shm._first.value
|
||||||
|
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
|
||||||
|
|
||||||
|
# tsdb history is so far in the past we can't fit it in
|
||||||
|
# shm buffer space so simply don't load it!
|
||||||
|
if prepend_start > 0:
|
||||||
|
shm.push(
|
||||||
|
to_push,
|
||||||
|
|
||||||
|
# insert the history pre a "days worth" of samples
|
||||||
|
# to leave some real-time buffer space at the end.
|
||||||
|
prepend=True,
|
||||||
|
# update_first=False,
|
||||||
|
start=prepend_start,
|
||||||
|
field_map=storemod.ohlc_key_map,
|
||||||
|
)
|
||||||
|
|
||||||
|
log.info(f'Loaded {to_push.shape} datums from storage')
|
||||||
|
|
||||||
|
# NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
|
||||||
|
# seemingly missing (null-time) segments..
|
||||||
|
# TODO: ideally these can never exist!
|
||||||
|
# -[ ] somehow it seems sometimes we're writing zero-ed
|
||||||
|
# segments to tsdbs during teardown?
|
||||||
|
# -[ ] can we ensure that the backfiller tasks do this
|
||||||
|
# work PREVENTATIVELY instead?
|
||||||
|
# -[ ] fill in non-zero epoch time values ALWAYS!
|
||||||
|
# await maybe_fill_null_segments(
|
||||||
|
nulls_detected: trio.Event = await tn.start(partial(
|
||||||
|
maybe_fill_null_segments,
|
||||||
|
|
||||||
|
shm=shm,
|
||||||
|
timeframe=timeframe,
|
||||||
|
get_hist=get_hist,
|
||||||
|
sampler_stream=sampler_stream,
|
||||||
|
mkt=mkt,
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: who would want to?
|
||||||
|
await nulls_detected.wait()
|
||||||
|
|
||||||
|
await bf_done.wait()
|
||||||
# TODO: maybe start history anal and load missing "history
|
# TODO: maybe start history anal and load missing "history
|
||||||
# gaps" via backend..
|
# gaps" via backend..
|
||||||
|
|
||||||
if timeframe not in (1, 60):
|
|
||||||
raise ValueError(
|
|
||||||
'`piker` only needs to support 1m and 1s sampling '
|
|
||||||
'but ur api is trying to deliver a longer '
|
|
||||||
f'timeframe of {timeframe} seconds..\n'
|
|
||||||
'So yuh.. dun do dat brudder.'
|
|
||||||
)
|
|
||||||
|
|
||||||
# if there is a gap to backfill from the first
|
|
||||||
# history frame until the last datum loaded from the tsdb
|
|
||||||
# continue that now in the background
|
|
||||||
bf_done = await tn.start(
|
|
||||||
partial(
|
|
||||||
start_backfill,
|
|
||||||
get_hist=get_hist,
|
|
||||||
mod=mod,
|
|
||||||
mkt=mkt,
|
|
||||||
shm=shm,
|
|
||||||
timeframe=timeframe,
|
|
||||||
backfill_from_shm_index=backfill_gap_from_shm_index,
|
|
||||||
backfill_from_dt=mr_start_dt,
|
|
||||||
sampler_stream=sampler_stream,
|
|
||||||
backfill_until_dt=last_tsdb_dt,
|
|
||||||
storage=storage,
|
|
||||||
write_tsdb=True,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# if len(hist_shm.array) < 2:
|
# if len(hist_shm.array) < 2:
|
||||||
# TODO: there's an edge case here to solve where if the last
|
# TODO: there's an edge case here to solve where if the last
|
||||||
# frame before market close (at least on ib) was pushed and
|
# frame before market close (at least on ib) was pushed and
|
||||||
|
@ -848,32 +983,29 @@ async def tsdb_backfill(
|
||||||
# backload any further data from tsdb (concurrently per
|
# backload any further data from tsdb (concurrently per
|
||||||
# timeframe) if not all data was able to be loaded (in memory)
|
# timeframe) if not all data was able to be loaded (in memory)
|
||||||
# from the ``StorageClient.load()`` call above.
|
# from the ``StorageClient.load()`` call above.
|
||||||
try:
|
await trio.sleep_forever()
|
||||||
await trio.sleep_forever()
|
|
||||||
finally:
|
|
||||||
return
|
|
||||||
|
|
||||||
# XXX NOTE: this is legacy from when we were using
|
# XXX NOTE: this is legacy from when we were using
|
||||||
# marketstore and we needed to continue backloading
|
# marketstore and we needed to continue backloading
|
||||||
# incrementally from the tsdb client.. (bc it couldn't
|
# incrementally from the tsdb client.. (bc it couldn't
|
||||||
# handle a single large query with gRPC for some
|
# handle a single large query with gRPC for some
|
||||||
# reason.. classic goolag pos)
|
# reason.. classic goolag pos)
|
||||||
tn.start_soon(
|
# tn.start_soon(
|
||||||
back_load_from_tsdb,
|
# back_load_from_tsdb,
|
||||||
|
|
||||||
storemod,
|
# storemod,
|
||||||
storage,
|
# storage,
|
||||||
fqme,
|
# fqme,
|
||||||
|
|
||||||
tsdb_history,
|
# tsdb_history,
|
||||||
last_tsdb_dt,
|
# last_tsdb_dt,
|
||||||
mr_start_dt,
|
# mr_start_dt,
|
||||||
mr_end_dt,
|
# mr_end_dt,
|
||||||
bf_done,
|
# bf_done,
|
||||||
|
|
||||||
timeframe,
|
# timeframe,
|
||||||
shm,
|
# shm,
|
||||||
)
|
# )
|
||||||
|
|
||||||
|
|
||||||
async def manage_history(
|
async def manage_history(
|
||||||
|
@ -981,6 +1113,11 @@ async def manage_history(
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
storage.open_storage_client() as (storemod, client),
|
storage.open_storage_client() as (storemod, client),
|
||||||
|
|
||||||
|
# NOTE: this nursery spawns a task per "timeframe" (aka
|
||||||
|
# sampling period) data set since normally differently
|
||||||
|
# sampled timeseries can be loaded / process independently
|
||||||
|
# ;)
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
log.info(
|
log.info(
|
||||||
|
@ -1030,7 +1167,6 @@ async def manage_history(
|
||||||
tsdb_backfill,
|
tsdb_backfill,
|
||||||
mod=mod,
|
mod=mod,
|
||||||
storemod=storemod,
|
storemod=storemod,
|
||||||
tn=tn,
|
|
||||||
# bus,
|
# bus,
|
||||||
storage=client,
|
storage=client,
|
||||||
mkt=mkt,
|
mkt=mkt,
|
||||||
|
@ -1059,3 +1195,70 @@ async def manage_history(
|
||||||
# and thus a small RPC-prot for remotely controlling
|
# and thus a small RPC-prot for remotely controlling
|
||||||
# what data is loaded for viewing.
|
# what data is loaded for viewing.
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
def iter_dfs_from_shms(
|
||||||
|
fqme: str
|
||||||
|
) -> Generator[
|
||||||
|
tuple[Path, ShmArray, pl.DataFrame],
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
]:
|
||||||
|
# shm buffer size table based on known sample rates
|
||||||
|
sizes: dict[str, int] = {
|
||||||
|
'hist': _default_hist_size,
|
||||||
|
'rt': _default_rt_size,
|
||||||
|
}
|
||||||
|
|
||||||
|
# load all detected shm buffer files which have the
|
||||||
|
# passed FQME pattern in the file name.
|
||||||
|
shmfiles: list[Path] = []
|
||||||
|
shmdir = Path('/dev/shm/')
|
||||||
|
|
||||||
|
for shmfile in shmdir.glob(f'*{fqme}*'):
|
||||||
|
filename: str = shmfile.name
|
||||||
|
|
||||||
|
# skip index files
|
||||||
|
if (
|
||||||
|
'_first' in filename
|
||||||
|
or '_last' in filename
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
|
||||||
|
assert shmfile.is_file()
|
||||||
|
log.debug(f'Found matching shm buffer file: {filename}')
|
||||||
|
shmfiles.append(shmfile)
|
||||||
|
|
||||||
|
for shmfile in shmfiles:
|
||||||
|
|
||||||
|
# lookup array buffer size based on file suffix
|
||||||
|
# being either .rt or .hist
|
||||||
|
key: str = shmfile.name.rsplit('.')[-1]
|
||||||
|
|
||||||
|
# skip FSP buffers for now..
|
||||||
|
if key not in sizes:
|
||||||
|
continue
|
||||||
|
|
||||||
|
size: int = sizes[key]
|
||||||
|
|
||||||
|
# attach to any shm buffer, load array into polars df,
|
||||||
|
# write to local parquet file.
|
||||||
|
shm, opened = maybe_open_shm_array(
|
||||||
|
key=shmfile.name,
|
||||||
|
size=size,
|
||||||
|
dtype=def_iohlcv_fields,
|
||||||
|
readonly=True,
|
||||||
|
)
|
||||||
|
assert not opened
|
||||||
|
ohlcv = shm.array
|
||||||
|
|
||||||
|
from ..data import tsp
|
||||||
|
df: pl.DataFrame = tsp.np2pl(ohlcv)
|
||||||
|
|
||||||
|
yield (
|
||||||
|
shmfile,
|
||||||
|
shm,
|
||||||
|
df,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -29,12 +29,19 @@ from math import (
|
||||||
floor,
|
floor,
|
||||||
)
|
)
|
||||||
import time
|
import time
|
||||||
from typing import Literal
|
from typing import (
|
||||||
|
Literal,
|
||||||
|
# AsyncGenerator,
|
||||||
|
Generator,
|
||||||
|
)
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import polars as pl
|
import polars as pl
|
||||||
|
from pendulum import (
|
||||||
|
DateTime,
|
||||||
|
from_timestamp,
|
||||||
|
)
|
||||||
|
|
||||||
from ._sharedmem import ShmArray
|
|
||||||
from ..toolz.profile import (
|
from ..toolz.profile import (
|
||||||
Profiler,
|
Profiler,
|
||||||
pg_profile_enabled,
|
pg_profile_enabled,
|
||||||
|
@ -53,6 +60,14 @@ get_console_log = partial(
|
||||||
name=subsys,
|
name=subsys,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# NOTE: union type-defs to handle generic `numpy` and `polars` types
|
||||||
|
# side-by-side Bo
|
||||||
|
# |_ TODO: schema spec typing?
|
||||||
|
# -[ ] nptyping!
|
||||||
|
# -[ ] wtv we can with polars?
|
||||||
|
Frame = pl.DataFrame | np.ndarray
|
||||||
|
Seq = pl.Series | np.ndarray
|
||||||
|
|
||||||
|
|
||||||
def slice_from_time(
|
def slice_from_time(
|
||||||
arr: np.ndarray,
|
arr: np.ndarray,
|
||||||
|
@ -209,51 +224,282 @@ def slice_from_time(
|
||||||
return read_slc
|
return read_slc
|
||||||
|
|
||||||
|
|
||||||
def detect_null_time_gap(
|
def get_null_segs(
|
||||||
shm: ShmArray,
|
frame: Frame,
|
||||||
|
period: float, # sampling step in seconds
|
||||||
imargin: int = 1,
|
imargin: int = 1,
|
||||||
|
col: str = 'time',
|
||||||
|
|
||||||
) -> tuple[float, float] | None:
|
) -> tuple[
|
||||||
|
# Seq, # TODO: can we make it an array-type instead?
|
||||||
|
list[
|
||||||
|
list[int, int],
|
||||||
|
],
|
||||||
|
Seq,
|
||||||
|
Frame
|
||||||
|
] | None:
|
||||||
'''
|
'''
|
||||||
Detect if there are any zero-epoch stamped rows in
|
Detect if there are any zero(-epoch stamped) valued
|
||||||
the presumed 'time' field-column.
|
rows for the provided `col: str` column; by default
|
||||||
|
presume the 'time' field/column.
|
||||||
|
|
||||||
Filter to the gap and return a surrounding index range.
|
Filter to all such zero (time) segments and return
|
||||||
|
the corresponding frame zeroed segment's,
|
||||||
|
|
||||||
NOTE: for now presumes only ONE gap XD
|
- gap absolute (in buffer terms) indices-endpoints as
|
||||||
|
`absi_zsegs`
|
||||||
|
- abs indices of all rows with zeroed `col` values as `absi_zeros`
|
||||||
|
- the corresponding frame's row-entries (view) which are
|
||||||
|
zeroed for the `col` as `zero_t`
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# ensure we read buffer state only once so that ShmArray rt
|
times: Seq = frame['time']
|
||||||
# circular-buffer updates don't cause a indexing/size mismatch.
|
zero_pred: Seq = (times == 0)
|
||||||
array: np.ndarray = shm.array
|
|
||||||
|
|
||||||
zero_pred: np.ndarray = array['time'] == 0
|
if isinstance(frame, np.ndarray):
|
||||||
zero_t: np.ndarray = array[zero_pred]
|
tis_zeros: int = zero_pred.any()
|
||||||
|
else:
|
||||||
|
tis_zeros: int = zero_pred.any()
|
||||||
|
|
||||||
if zero_t.size:
|
if not tis_zeros:
|
||||||
istart, iend = zero_t['index'][[0, -1]]
|
return None
|
||||||
start, end = shm._array['time'][
|
|
||||||
[istart - imargin, iend + imargin]
|
# TODO: use ndarray for this?!
|
||||||
]
|
absi_zsegs: list[list[int, int]] = []
|
||||||
return (
|
|
||||||
istart - imargin,
|
if isinstance(frame, np.ndarray):
|
||||||
start,
|
# view of ONLY the zero segments as one continuous chunk
|
||||||
end,
|
zero_t: np.ndarray = frame[zero_pred]
|
||||||
iend + imargin,
|
# abs indices of said zeroed rows
|
||||||
|
absi_zeros = zero_t['index']
|
||||||
|
# diff of abs index steps between each zeroed row
|
||||||
|
absi_zdiff: np.ndarray = np.diff(absi_zeros)
|
||||||
|
|
||||||
|
# scan for all frame-indices where the
|
||||||
|
# zeroed-row-abs-index-step-diff is greater than the
|
||||||
|
# expected increment of 1.
|
||||||
|
# data 1st zero seg data zeros
|
||||||
|
# ---- ------------ ---- ----- ------ ----
|
||||||
|
# ||||..000000000000..||||..00000..||||||..0000
|
||||||
|
# ---- ------------ ---- ----- ------ ----
|
||||||
|
# ^zero_t[0] ^zero_t[-1]
|
||||||
|
# ^fi_zgaps[0] ^fi_zgaps[1]
|
||||||
|
# ^absi_zsegs[0][0] ^---^ => absi_zsegs[1]: tuple
|
||||||
|
# absi_zsegs[0][1]^
|
||||||
|
#
|
||||||
|
# NOTE: the first entry in `fi_zgaps` is where
|
||||||
|
# the first (absolute) index step diff is > 1.
|
||||||
|
# and it is a frame-relative index into `zero_t`.
|
||||||
|
fi_zgaps = np.argwhere(
|
||||||
|
absi_zdiff > 1
|
||||||
|
# NOTE: +1 here is to ensure we index to the "start" of each
|
||||||
|
# segment (if we didn't the below loop needs to be
|
||||||
|
# re-written to expect `fi_end_rows`!
|
||||||
|
) + 1
|
||||||
|
# the rows from the contiguous zeroed segments which have
|
||||||
|
# abs-index steps >1 compared to the previous zero row
|
||||||
|
# (indicating an end of zeroed segment).
|
||||||
|
fi_zseg_start_rows = zero_t[fi_zgaps]
|
||||||
|
|
||||||
|
# TODO: equiv for pl.DataFrame case!
|
||||||
|
else:
|
||||||
|
izeros: pl.Series = zero_pred.arg_true()
|
||||||
|
zero_t: pl.DataFrame = frame[izeros]
|
||||||
|
|
||||||
|
absi_zeros = zero_t['index']
|
||||||
|
absi_zdiff: pl.Series = absi_zeros.diff()
|
||||||
|
fi_zgaps = (absi_zdiff > 1).arg_true()
|
||||||
|
|
||||||
|
# XXX: our goal (in this func) is to select out slice index
|
||||||
|
# pairs (zseg0_start, zseg_end) in abs index units for each
|
||||||
|
# null-segment portion detected throughout entire input frame.
|
||||||
|
|
||||||
|
# only up to one null-segment in entire frame?
|
||||||
|
num_gaps: int = fi_zgaps.size + 1
|
||||||
|
if num_gaps < 1:
|
||||||
|
if absi_zeros.size > 1:
|
||||||
|
absi_zsegs = [[
|
||||||
|
# see `get_hist()` in backend, should ALWAYS be
|
||||||
|
# able to handle a `start_dt=None`!
|
||||||
|
# None,
|
||||||
|
max(
|
||||||
|
absi_zeros[0] - 1,
|
||||||
|
0,
|
||||||
|
),
|
||||||
|
# NOTE: need the + 1 to guarantee we index "up to"
|
||||||
|
# the next non-null row-datum.
|
||||||
|
min(
|
||||||
|
absi_zeros[-1] + 1,
|
||||||
|
frame['index'][-1],
|
||||||
|
),
|
||||||
|
]]
|
||||||
|
else:
|
||||||
|
# XXX EDGE CASE: only one null-datum found so
|
||||||
|
# mark the start abs index as None to trigger
|
||||||
|
# a full frame-len query to the respective backend?
|
||||||
|
absi_zsegs = [[
|
||||||
|
# see `get_hist()` in backend, should ALWAYS be
|
||||||
|
# able to handle a `start_dt=None`!
|
||||||
|
# None,
|
||||||
|
None,
|
||||||
|
absi_zeros[0] + 1,
|
||||||
|
]]
|
||||||
|
|
||||||
|
# XXX NOTE XXX: if >= 2 zeroed segments are found, there should
|
||||||
|
# ALWAYS be more than one zero-segment-abs-index-step-diff row
|
||||||
|
# in `absi_zdiff`, so loop through all such
|
||||||
|
# abs-index-step-diffs >1 (i.e. the entries of `absi_zdiff`)
|
||||||
|
# and add them as the "end index" entries for each segment.
|
||||||
|
# Then, iif NOT iterating the first such segment end, look back
|
||||||
|
# for the prior segment's zero-segment start index by relative
|
||||||
|
# indexing the `zero_t` frame by -1 and grabbing the abs index
|
||||||
|
# of what should be the prior zero-segment abs start index.
|
||||||
|
else:
|
||||||
|
# NOTE: since `absi_zdiff` will never have a row
|
||||||
|
# corresponding to the first zero-segment's row, we add it
|
||||||
|
# manually here.
|
||||||
|
absi_zsegs.append([
|
||||||
|
absi_zeros[0] - 1,
|
||||||
|
None,
|
||||||
|
])
|
||||||
|
|
||||||
|
# TODO: can we do it with vec ops?
|
||||||
|
for i, (
|
||||||
|
fi, # frame index of zero-seg start
|
||||||
|
zseg_start_row, # full row for ^
|
||||||
|
) in enumerate(zip(
|
||||||
|
fi_zgaps,
|
||||||
|
fi_zseg_start_rows,
|
||||||
|
)):
|
||||||
|
assert (zseg_start_row == zero_t[fi]).all()
|
||||||
|
iabs: int = zseg_start_row['index'][0]
|
||||||
|
absi_zsegs.append([
|
||||||
|
iabs - 1,
|
||||||
|
None, # backfilled on next iter
|
||||||
|
])
|
||||||
|
|
||||||
|
# row = zero_t[fi]
|
||||||
|
# absi_pre_zseg = row['index'][0] - 1
|
||||||
|
# absi_pre_zseg = absi - 1
|
||||||
|
|
||||||
|
# final iter case, backfill FINAL end iabs!
|
||||||
|
if (i + 1) == fi_zgaps.size:
|
||||||
|
absi_zsegs[-1][1] = absi_zeros[-1] + 1
|
||||||
|
|
||||||
|
# NOTE: only after the first segment (due to `.diff()`
|
||||||
|
# usage above) can we do a lookback to the prior
|
||||||
|
# segment's end row and determine its abs index to
|
||||||
|
# retroactively insert to the prior
|
||||||
|
# `absi_zsegs[i-1][1]` entry Bo
|
||||||
|
last_end: int = absi_zsegs[i][1]
|
||||||
|
if last_end is None:
|
||||||
|
prev_zseg_row = zero_t[fi - 1]
|
||||||
|
absi_post_zseg = prev_zseg_row['index'][0] + 1
|
||||||
|
# XXX: MUST BACKFILL previous end iabs!
|
||||||
|
absi_zsegs[i][1] = absi_post_zseg
|
||||||
|
|
||||||
|
else:
|
||||||
|
if 0 < num_gaps < 2:
|
||||||
|
absi_zsegs[-1][1] = absi_zeros[-1] + 1
|
||||||
|
|
||||||
|
iabs_first: int = frame['index'][0]
|
||||||
|
for start, end in absi_zsegs:
|
||||||
|
ts_start: float = times[start - iabs_first]
|
||||||
|
ts_end: float = times[end - iabs_first]
|
||||||
|
if (
|
||||||
|
ts_start == 0
|
||||||
|
or
|
||||||
|
ts_end == 0
|
||||||
|
):
|
||||||
|
import pdbp
|
||||||
|
pdbp.set_trace()
|
||||||
|
|
||||||
|
assert end
|
||||||
|
assert start < end
|
||||||
|
|
||||||
|
log.warning(
|
||||||
|
f'Frame has {len(absi_zsegs)} NULL GAPS!?\n'
|
||||||
|
f'period: {period}\n'
|
||||||
|
f'total null samples: {len(zero_t)}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
return (
|
||||||
|
absi_zsegs, # [start, end] abs slice indices of seg
|
||||||
|
absi_zeros, # all abs indices within all null-segs
|
||||||
|
zero_t, # sliced-view of all null-segment rows-datums
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def iter_null_segs(
|
||||||
|
timeframe: float,
|
||||||
|
frame: Frame | None = None,
|
||||||
|
null_segs: tuple | None = None,
|
||||||
|
|
||||||
|
) -> Generator[
|
||||||
|
tuple[
|
||||||
|
int, int,
|
||||||
|
int, int,
|
||||||
|
float, float,
|
||||||
|
float, float,
|
||||||
|
|
||||||
|
# Seq, # TODO: can we make it an array-type instead?
|
||||||
|
# list[
|
||||||
|
# list[int, int],
|
||||||
|
# ],
|
||||||
|
# Seq,
|
||||||
|
# Frame
|
||||||
|
],
|
||||||
|
None,
|
||||||
|
]:
|
||||||
|
if null_segs is None:
|
||||||
|
null_segs: tuple = get_null_segs(
|
||||||
|
frame,
|
||||||
|
period=timeframe,
|
||||||
)
|
)
|
||||||
|
|
||||||
return None
|
absi_pairs_zsegs: list[list[float, float]]
|
||||||
|
izeros: Seq
|
||||||
|
zero_t: Frame
|
||||||
|
(
|
||||||
|
absi_pairs_zsegs,
|
||||||
|
izeros,
|
||||||
|
zero_t,
|
||||||
|
) = null_segs
|
||||||
|
|
||||||
|
absi_first: int = frame[0]['index']
|
||||||
|
for (
|
||||||
|
absi_start,
|
||||||
|
absi_end,
|
||||||
|
) in absi_pairs_zsegs:
|
||||||
|
|
||||||
t_unit: Literal = Literal[
|
fi_end: int = absi_end - absi_first
|
||||||
'days',
|
end_row: Seq = frame[fi_end]
|
||||||
'hours',
|
end_t: float = end_row['time']
|
||||||
'minutes',
|
end_dt: DateTime = from_timestamp(end_t)
|
||||||
'seconds',
|
|
||||||
'miliseconds',
|
fi_start = None
|
||||||
'microseconds',
|
start_row = None
|
||||||
'nanoseconds',
|
start_t = None
|
||||||
]
|
start_dt = None
|
||||||
|
if (
|
||||||
|
absi_start is not None
|
||||||
|
and start_t != 0
|
||||||
|
):
|
||||||
|
fi_start: int = absi_start - absi_first
|
||||||
|
start_row: Seq = frame[fi_start]
|
||||||
|
start_t: float = start_row['time']
|
||||||
|
start_dt: DateTime = from_timestamp(start_t)
|
||||||
|
|
||||||
|
if absi_start < 0:
|
||||||
|
import pdbp
|
||||||
|
pdbp.set_trace()
|
||||||
|
|
||||||
|
yield (
|
||||||
|
absi_start, absi_end, # abs indices
|
||||||
|
fi_start, fi_end, # relative "frame" indices
|
||||||
|
start_t, end_t,
|
||||||
|
start_dt, end_dt,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def with_dts(
|
def with_dts(
|
||||||
|
@ -292,6 +538,17 @@ def dedup_dt(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
t_unit: Literal = Literal[
|
||||||
|
'days',
|
||||||
|
'hours',
|
||||||
|
'minutes',
|
||||||
|
'seconds',
|
||||||
|
'miliseconds',
|
||||||
|
'microseconds',
|
||||||
|
'nanoseconds',
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def detect_time_gaps(
|
def detect_time_gaps(
|
||||||
df: pl.DataFrame,
|
df: pl.DataFrame,
|
||||||
|
|
||||||
|
@ -406,10 +663,6 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
|
||||||
f'Gaps found:\n{gaps}\n'
|
f'Gaps found:\n{gaps}\n'
|
||||||
f'deduped Gaps found:\n{deduped_gaps}'
|
f'deduped Gaps found:\n{deduped_gaps}'
|
||||||
)
|
)
|
||||||
# TODO: rewrite this in polars and/or convert to
|
|
||||||
# ndarray to detect and remove?
|
|
||||||
# null_gaps = detect_null_time_gap()
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
df,
|
df,
|
||||||
gaps,
|
gaps,
|
||||||
|
@ -428,14 +681,19 @@ def sort_diff(
|
||||||
list[int], # indices of segments that are out-of-order
|
list[int], # indices of segments that are out-of-order
|
||||||
]:
|
]:
|
||||||
ser: pl.Series = src_df[col]
|
ser: pl.Series = src_df[col]
|
||||||
|
|
||||||
diff: pl.Series = ser.diff()
|
|
||||||
sortd: pl.DataFrame = ser.sort()
|
sortd: pl.DataFrame = ser.sort()
|
||||||
|
diff: pl.Series = ser.diff()
|
||||||
|
|
||||||
sortd_diff: pl.Series = sortd.diff()
|
sortd_diff: pl.Series = sortd.diff()
|
||||||
i_step_diff = (diff != sortd_diff).arg_true()
|
i_step_diff = (diff != sortd_diff).arg_true()
|
||||||
if i_step_diff.len():
|
frame_reorders: int = i_step_diff.len()
|
||||||
import pdbp
|
if frame_reorders:
|
||||||
pdbp.set_trace()
|
log.warn(
|
||||||
|
f'Resorted frame on col: {col}\n'
|
||||||
|
f'{frame_reorders}'
|
||||||
|
|
||||||
|
)
|
||||||
|
# import pdbp; pdbp.set_trace()
|
||||||
|
|
||||||
# NOTE: thanks to this SO answer for the below conversion routines
|
# NOTE: thanks to this SO answer for the below conversion routines
|
||||||
# to go from numpy struct-arrays to polars dataframes and back:
|
# to go from numpy struct-arrays to polars dataframes and back:
|
||||||
|
|
|
@ -21,8 +21,6 @@ Storage middle-ware CLIs.
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import time
|
import time
|
||||||
from typing import Generator
|
|
||||||
# from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
import polars as pl
|
import polars as pl
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
@ -37,14 +35,11 @@ from piker.service import open_piker_runtime
|
||||||
from piker.cli import cli
|
from piker.cli import cli
|
||||||
from piker.config import get_conf_dir
|
from piker.config import get_conf_dir
|
||||||
from piker.data import (
|
from piker.data import (
|
||||||
maybe_open_shm_array,
|
|
||||||
def_iohlcv_fields,
|
|
||||||
ShmArray,
|
ShmArray,
|
||||||
tsp,
|
tsp,
|
||||||
)
|
)
|
||||||
from piker.data.history import (
|
from piker.data.history import (
|
||||||
_default_hist_size,
|
iter_dfs_from_shms,
|
||||||
_default_rt_size,
|
|
||||||
)
|
)
|
||||||
from . import (
|
from . import (
|
||||||
log,
|
log,
|
||||||
|
@ -190,6 +185,13 @@ def anal(
|
||||||
)
|
)
|
||||||
assert first_dt < last_dt
|
assert first_dt < last_dt
|
||||||
|
|
||||||
|
null_segs: tuple = tsp.get_null_segs(
|
||||||
|
frame=history,
|
||||||
|
period=period,
|
||||||
|
)
|
||||||
|
if null_segs:
|
||||||
|
await tractor.pause()
|
||||||
|
|
||||||
shm_df: pl.DataFrame = await client.as_df(
|
shm_df: pl.DataFrame = await client.as_df(
|
||||||
fqme,
|
fqme,
|
||||||
period,
|
period,
|
||||||
|
@ -204,6 +206,7 @@ def anal(
|
||||||
diff,
|
diff,
|
||||||
) = tsp.dedupe(shm_df)
|
) = tsp.dedupe(shm_df)
|
||||||
|
|
||||||
|
|
||||||
if diff:
|
if diff:
|
||||||
await client.write_ohlcv(
|
await client.write_ohlcv(
|
||||||
fqme,
|
fqme,
|
||||||
|
@ -219,69 +222,6 @@ def anal(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
def iter_dfs_from_shms(fqme: str) -> Generator[
|
|
||||||
tuple[Path, ShmArray, pl.DataFrame],
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
]:
|
|
||||||
# shm buffer size table based on known sample rates
|
|
||||||
sizes: dict[str, int] = {
|
|
||||||
'hist': _default_hist_size,
|
|
||||||
'rt': _default_rt_size,
|
|
||||||
}
|
|
||||||
|
|
||||||
# load all detected shm buffer files which have the
|
|
||||||
# passed FQME pattern in the file name.
|
|
||||||
shmfiles: list[Path] = []
|
|
||||||
shmdir = Path('/dev/shm/')
|
|
||||||
|
|
||||||
for shmfile in shmdir.glob(f'*{fqme}*'):
|
|
||||||
filename: str = shmfile.name
|
|
||||||
|
|
||||||
# skip index files
|
|
||||||
if (
|
|
||||||
'_first' in filename
|
|
||||||
or '_last' in filename
|
|
||||||
):
|
|
||||||
continue
|
|
||||||
|
|
||||||
assert shmfile.is_file()
|
|
||||||
log.debug(f'Found matching shm buffer file: {filename}')
|
|
||||||
shmfiles.append(shmfile)
|
|
||||||
|
|
||||||
for shmfile in shmfiles:
|
|
||||||
|
|
||||||
# lookup array buffer size based on file suffix
|
|
||||||
# being either .rt or .hist
|
|
||||||
key: str = shmfile.name.rsplit('.')[-1]
|
|
||||||
|
|
||||||
# skip FSP buffers for now..
|
|
||||||
if key not in sizes:
|
|
||||||
continue
|
|
||||||
|
|
||||||
size: int = sizes[key]
|
|
||||||
|
|
||||||
# attach to any shm buffer, load array into polars df,
|
|
||||||
# write to local parquet file.
|
|
||||||
shm, opened = maybe_open_shm_array(
|
|
||||||
key=shmfile.name,
|
|
||||||
size=size,
|
|
||||||
dtype=def_iohlcv_fields,
|
|
||||||
readonly=True,
|
|
||||||
)
|
|
||||||
assert not opened
|
|
||||||
ohlcv = shm.array
|
|
||||||
|
|
||||||
from ..data import tsp
|
|
||||||
df: pl.DataFrame = tsp.np2pl(ohlcv)
|
|
||||||
|
|
||||||
yield (
|
|
||||||
shmfile,
|
|
||||||
shm,
|
|
||||||
df,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@store.command()
|
@store.command()
|
||||||
def ldshm(
|
def ldshm(
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
@ -307,8 +247,8 @@ def ldshm(
|
||||||
|
|
||||||
# compute ohlc properties for naming
|
# compute ohlc properties for naming
|
||||||
times: np.ndarray = shm.array['time']
|
times: np.ndarray = shm.array['time']
|
||||||
secs: float = times[-1] - times[-2]
|
period_s: float = times[-1] - times[-2]
|
||||||
if secs < 1.:
|
if period_s < 1.:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f'Something is wrong with time period for {shm}:\n{times}'
|
f'Something is wrong with time period for {shm}:\n{times}'
|
||||||
)
|
)
|
||||||
|
@ -323,17 +263,22 @@ def ldshm(
|
||||||
diff,
|
diff,
|
||||||
) = tsp.dedupe(shm_df)
|
) = tsp.dedupe(shm_df)
|
||||||
|
|
||||||
|
null_segs: tuple = tsp.get_null_segs(
|
||||||
|
frame=shm.array,
|
||||||
|
period=period_s,
|
||||||
|
)
|
||||||
|
|
||||||
# TODO: maybe only optionally enter this depending
|
# TODO: maybe only optionally enter this depending
|
||||||
# on some CLI flags and/or gap detection?
|
# on some CLI flags and/or gap detection?
|
||||||
if (
|
if not gaps.is_empty():
|
||||||
not gaps.is_empty()
|
await tractor.pause()
|
||||||
or secs > 2
|
|
||||||
):
|
if null_segs:
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
|
|
||||||
# write to parquet file?
|
# write to parquet file?
|
||||||
if write_parquet:
|
if write_parquet:
|
||||||
timeframe: str = f'{secs}s'
|
timeframe: str = f'{period_s}s'
|
||||||
|
|
||||||
datadir: Path = get_conf_dir() / 'nativedb'
|
datadir: Path = get_conf_dir() / 'nativedb'
|
||||||
if not datadir.is_dir():
|
if not datadir.is_dir():
|
||||||
|
|
|
@ -342,7 +342,6 @@ class NativeStorageClient:
|
||||||
)
|
)
|
||||||
return path
|
return path
|
||||||
|
|
||||||
|
|
||||||
async def write_ohlcv(
|
async def write_ohlcv(
|
||||||
self,
|
self,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
|
@ -207,9 +207,10 @@ class ContentsLabel(pg.LabelItem):
|
||||||
# this being "html" is the dumbest shit :eyeroll:
|
# this being "html" is the dumbest shit :eyeroll:
|
||||||
|
|
||||||
self.setText(
|
self.setText(
|
||||||
"<b>i</b>:{index}<br/>"
|
"<b>i_arr</b>:{index}<br/>"
|
||||||
# NB: these fields must be indexed in the correct order via
|
# NB: these fields must be indexed in the correct order via
|
||||||
# the slice syntax below.
|
# the slice syntax below.
|
||||||
|
"<b>i_shm</b>:{}<br/>"
|
||||||
"<b>epoch</b>:{}<br/>"
|
"<b>epoch</b>:{}<br/>"
|
||||||
"<b>O</b>:{}<br/>"
|
"<b>O</b>:{}<br/>"
|
||||||
"<b>H</b>:{}<br/>"
|
"<b>H</b>:{}<br/>"
|
||||||
|
@ -219,6 +220,7 @@ class ContentsLabel(pg.LabelItem):
|
||||||
# "<b>wap</b>:{}".format(
|
# "<b>wap</b>:{}".format(
|
||||||
*array[ix][
|
*array[ix][
|
||||||
[
|
[
|
||||||
|
'index',
|
||||||
'time',
|
'time',
|
||||||
'open',
|
'open',
|
||||||
'high',
|
'high',
|
||||||
|
|
|
@ -248,25 +248,27 @@ async def increment_history_view(
|
||||||
# - samplerd could emit the actual update range via
|
# - samplerd could emit the actual update range via
|
||||||
# tuple and then we only enter the below block if that
|
# tuple and then we only enter the below block if that
|
||||||
# range is detected as in-view?
|
# range is detected as in-view?
|
||||||
if (
|
match msg:
|
||||||
(bf_wut := msg.get('backfilling', False))
|
case {
|
||||||
):
|
'backfilling': (viz_name, timeframe),
|
||||||
viz_name, timeframe = bf_wut
|
} if (
|
||||||
if (
|
|
||||||
viz_name == name
|
viz_name == name
|
||||||
|
):
|
||||||
|
log.warning(
|
||||||
|
f'Forcing HARD REDRAW:\n'
|
||||||
|
f'name: {name}\n'
|
||||||
|
f'timeframe: {timeframe}\n'
|
||||||
|
)
|
||||||
# TODO: only allow this when the data is IN VIEW!
|
# TODO: only allow this when the data is IN VIEW!
|
||||||
# also, we probably can do this more efficiently
|
# also, we probably can do this more efficiently
|
||||||
# / smarter by only redrawing the portion of the
|
# / smarter by only redrawing the portion of the
|
||||||
# path necessary?
|
# path necessary?
|
||||||
and False
|
{
|
||||||
):
|
60: hist_viz,
|
||||||
log.info(f'Forcing hard redraw -> {name}@{timeframe}')
|
1: viz,
|
||||||
match timeframe:
|
}[timeframe].update_graphics(
|
||||||
case 60:
|
force_redraw=True
|
||||||
hist_viz.update_graphics(force_redraw=True)
|
)
|
||||||
case 1:
|
|
||||||
viz.update_graphics(force_redraw=True)
|
|
||||||
|
|
||||||
# check if slow chart needs an x-domain shift and/or
|
# check if slow chart needs an x-domain shift and/or
|
||||||
# y-range resize.
|
# y-range resize.
|
||||||
|
|
Loading…
Reference in New Issue