# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)

# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either
# version 3 of the License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see
# <https://www.gnu.org/licenses/>.

'''
Historical TSP (time-series processing) lowlevel mgmt machinery and biz
logic for,

- hi-level biz logic using the `.storage` subpkg APIs for (I/O)
  orchestration and mgmt of tsdb data sets.

- core data-provider history backfilling middleware (as task-funcs) via
  (what will eventually be `datad`, but for now is the) `.brokers`
  backend APIs.

- various data set cleaning, repairing and issue-detection/analysis
  routines to ensure consistent series whether in shm or when
  stored offline (in a tsdb).

'''
from __future__ import annotations
from datetime import datetime
from functools import partial
from pathlib import Path
from pprint import pformat
from types import ModuleType
from typing import (
    Callable,
    Generator,
    TYPE_CHECKING,
)

import trio
from trio_typing import TaskStatus
import tractor
from pendulum import (
    DateTime,
    Duration,
    from_timestamp,
)
import numpy as np
import polars as pl

from ..accounting import (
    MktPair,
)
from ..data._util import (
    log,
)
from ..data._sharedmem import (
    maybe_open_shm_array,
    ShmArray,
)
from ..data._source import def_iohlcv_fields
from ..data._sampling import (
    open_sample_stream,
)
from ._anal import (

    get_null_segs as get_null_segs,
    iter_null_segs as iter_null_segs,
    Frame as Frame,
    Seq as Seq,

    # codec-ish
    np2pl as np2pl,
    pl2np as pl2np,

    # `numpy` only
    slice_from_time as slice_from_time,

    # `polars` specific
    dedupe as dedupe,
    with_dts as with_dts,
    detect_time_gaps as detect_time_gaps,
    sort_diff as sort_diff,

    # TODO:
    detect_price_gaps as detect_price_gaps
)

# TODO: break up all this shite into submods!
from ..brokers._util import (
    DataUnavailable,
)
from ..storage import TimeseriesNotFound

if TYPE_CHECKING:
    from bidict import bidict
    from ..service.marketstore import StorageClient
    # from .feed import _FeedsBus


# `ShmArray` buffer sizing configuration:
_mins_in_day = int(60 * 24)
# how much is probably dependent on lifestyle
# but we reco a buncha times (but only on a
# run-every-other-day kinda week).
_secs_in_day = int(60 * _mins_in_day)
_days_in_week: int = 7

_days_worth: int = 3
_default_hist_size: int = 6 * 365 * _mins_in_day
_hist_buffer_start = int(
    _default_hist_size - round(7 * _mins_in_day)
)

_default_rt_size: int = _days_worth * _secs_in_day
# NOTE: start the append index in rt buffer such that 1 day's worth
# can be appended before overrun.
_rt_buffer_start = int((_days_worth - 1) * _secs_in_day)
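
# NOTE (illustrative sizing arithmetic only, not used at runtime): the
# defaults above, as consumed by `maybe_open_shm_array()` calls in
# `manage_history()` below, work out to roughly,
# - hist buffer: 6 * 365 * 1440 ~= 3.15M rows of 1m bars (~6 years)
#   with the append index seated 7 * 1440 = 10080 rows (~1 week of 1m
#   samples) before the end of the buffer.
# - rt buffer: 3 * 86400 = 259200 rows of 1s bars (~3 days) with the
#   append index seated 86400 rows (1 day of 1s samples) before the
#   end, matching the overrun note above.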


def diff_history(
    array: np.ndarray,
    append_until_dt: datetime | None = None,
    prepend_until_dt: datetime | None = None,

) -> np.ndarray:

    # no diffing with tsdb dt index possible..
    if (
        prepend_until_dt is None
        and append_until_dt is None
    ):
        return array

    times = array['time']

    if append_until_dt:
        return array[times < append_until_dt.timestamp()]
    else:
        return array[times >= prepend_until_dt.timestamp()]
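
# A minimal usage sketch of `diff_history()` (hypothetical values, for
# illustration only): given a frame queried from a backend, trim it
# against a datetime boundary before pushing to shm / the tsdb,
#
#   frame: np.ndarray = ...  # ohlcv frame with a 'time' epoch field
#   to_push = diff_history(
#       frame,
#       # keep only rows at-or-after the backfill target; anything
#       # older than that boundary is dropped
#       prepend_until_dt=backfill_until_dt,
#   )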


# TODO: can't we just make this a sync func now?
async def shm_push_in_between(
    shm: ShmArray,
    to_push: np.ndarray,
    prepend_index: int,

    update_start_on_prepend: bool = False,

) -> int:
    # XXX: extremely important, there can be no checkpoints
    # in the body of this func to avoid entering new ``frames``
    # values while we're pipelining the current ones to
    # memory...
    shm.push(
        to_push,
        prepend=True,

        # XXX: only update the ._first index if no tsdb
        # segment was previously prepended by the
        # parent task.
        update_first=update_start_on_prepend,

        # XXX: only prepend from a manually calculated shm
        # index if there was already a tsdb history
        # segment prepended (since then the
        # ._first.value is going to be wayyy in the
        # past!)
        start=(
            prepend_index
            if not update_start_on_prepend
            else None
        ),
    )


async def maybe_fill_null_segments(
    shm: ShmArray,
    timeframe: float,
    get_hist: Callable,
    sampler_stream: tractor.MsgStream,
    mkt: MktPair,

    task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,

) -> list[Frame]:

    null_segs_detected = trio.Event()
    task_status.started(null_segs_detected)

    frame: Frame = shm.array

    null_segs: tuple | None = get_null_segs(
        frame,
        period=timeframe,
    )
    for (
        absi_start, absi_end,
        fi_start, fi_end,
        start_t, end_t,
        start_dt, end_dt,
    ) in iter_null_segs(
        null_segs=null_segs,
        frame=frame,
        timeframe=timeframe,
    ):

        # XXX NOTE: ?if we get a badly ordered timestamp
        # pair, immediately stop backfilling?
        if (
            start_dt
            and end_dt < start_dt
        ):
            await tractor.pause()
            break

        (
            array,
            next_start_dt,
            next_end_dt,
        ) = await get_hist(
            timeframe,
            start_dt=start_dt,
            end_dt=end_dt,
        )

        # XXX TODO: pretty sure if i plot tsla, btcusdt.binance
        # and mnq.cme.ib this causes a Qt crash XXDDD

        # make sure we don't overrun the buffer start
        len_to_push: int = min(absi_end, array.size)
        to_push: np.ndarray = array[-len_to_push:]

        await shm_push_in_between(
            shm,
            to_push,
            prepend_index=absi_end,
            update_start_on_prepend=False,
        )
        # TODO: UI side needs IPC event to update..
        # - make sure the UI actually always handles
        #   this update!
        # - remember that in the display side, only refresh this
        #   if the respective history is actually "in view".
        # loop
        try:
            await sampler_stream.send({
                'broadcast_all': {

                    # XXX NOTE XXX: see the
                    # `.ui._display.increment_history_view()` if block
                    # that looks for this info to FORCE a hard viz
                    # redraw!
                    'backfilling': (mkt.fqme, timeframe),
                },
            })
        except tractor.ContextCancelled:
            # log.exception
            await tractor.pause()

    null_segs_detected.set()
    # RECHECK for more null-gaps
    frame: Frame = shm.array
    null_segs: tuple | None = get_null_segs(
        frame,
        period=timeframe,
    )
    if (
        null_segs
        and
        len(null_segs[-1])
    ):
        (
            iabs_slices,
            iabs_zero_rows,
            zero_t,
        ) = null_segs
        log.warning(
            f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n'
            f'{pformat(iabs_slices)}'
        )

        # TODO: always backfill gaps with the earliest (price) datum's
        # value to avoid the y-ranger including zeros and completely
        # stretching the y-axis..
        # array: np.ndarray = shm.array
        # zeros = array[array['low'] == 0]
        ohlc_fields: list[str] = [
            'open',
            'high',
            'low',
            'close',
        ]

        for istart, istop in iabs_slices:

            # get view into buffer for null-segment
            gap: np.ndarray = shm._array[istart:istop]

            # copy the oldest OHLC samples forward
            gap[ohlc_fields] = shm._array[istart]['close']

            start_t: float = shm._array[istart]['time']
            t_diff: float = (istop - istart)*timeframe
            gap['time'] = np.arange(
                start=start_t,
                stop=start_t + t_diff,
                step=timeframe,
            )

            await sampler_stream.send({
                'broadcast_all': {

                    # XXX NOTE XXX: see the
                    # `.ui._display.increment_history_view()` if block
                    # that looks for this info to FORCE a hard viz
                    # redraw!
                    'backfilling': (mkt.fqme, timeframe),
                },
            })

        # TODO: iteratively step through any remaining
        # time-gaps/null-segments and spawn piecewise backfiller
        # tasks in a nursery?
        # -[ ] not sure that's going to work so well on the ib
        #      backend but worth a shot?
        # -[ ] mk new history connections to make it properly
        #      parallel possible no matter the backend?
        # -[ ] fill algo: do queries in alternating "latest, then
        #      earliest, then latest.. etc?"
        # await tractor.pause()
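
# Illustrative note on the null-segment flat-fill above (hypothetical
# values, assuming `istart` indexes the sample whose close we fill
# from): for a 60s series with a 3-row null slice where the row at
# `istart` has close=101.5 and time=1_700_000_000, the loop writes,
#
#   gap['open'/'high'/'low'/'close'] -> 101.5
#   gap['time'] -> np.arange(1_700_000_000, 1_700_000_180, 60)
#               == [1_700_000_000, 1_700_000_060, 1_700_000_120]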


async def start_backfill(
    get_hist,
    mod: ModuleType,
    mkt: MktPair,
    shm: ShmArray,
    timeframe: float,

    backfill_from_shm_index: int,
    backfill_from_dt: datetime,

    sampler_stream: tractor.MsgStream,

    backfill_until_dt: datetime | None = None,
    storage: StorageClient | None = None,

    write_tsdb: bool = True,

    task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,

) -> int:

    # let caller unblock and deliver latest history frame
    # and use to signal that backfilling the shm gap until
    # the tsdb end is complete!
    bf_done = trio.Event()
    task_status.started(bf_done)

    # based on the sample step size, maybe load a certain amount of history
    update_start_on_prepend: bool = False
    if backfill_until_dt is None:

        # TODO: drop this right and just expose the backfill
        # limits inside a [storage] section in conf.toml?
        # when no tsdb "last datum" is provided, we just load
        # some near-term history.
        # periods = {
        #     1: {'days': 1},
        #     60: {'days': 14},
        # }

        # do a decently sized backfill and load it into storage.
        periods = {
            1: {'days': 2},
            60: {'years': 6},
        }
        period_duration: int = periods[timeframe]

        update_start_on_prepend = True

        # NOTE: manually set the "latest" datetime which we intend to
        # backfill history "until" so as to adhere to the history
        # settings above when the tsdb is detected as being empty.
        backfill_until_dt = backfill_from_dt.subtract(**period_duration)

    # STAGE NOTE: "backward history gap filling":
    # - we push to the shm buffer until we have history back
    #   until the latest entry loaded from the tsdb's table B)
    # - after this loop continue to check for other gaps in the
    #   (tsdb) history and (at least report) maybe fill them
    #   from new frame queries to the backend?
    last_start_dt: datetime = backfill_from_dt
    next_prepend_index: int = backfill_from_shm_index

    while last_start_dt > backfill_until_dt:
        log.info(
            f'Requesting {timeframe}s frame:\n'
            f'backfill_until_dt: {backfill_until_dt}\n'
            f'last_start_dt: {last_start_dt}\n'
        )

        try:
            (
                array,
                next_start_dt,
                next_end_dt,
            ) = await get_hist(
                timeframe,
                end_dt=last_start_dt,
            )

        # broker says there never was or is no more history to pull
        except DataUnavailable:
            log.warning(
                f'NO-MORE-DATA in range?\n'
                f'`{mod.name}` halted history:\n'
                f'tf@fqme: {timeframe}@{mkt.fqme}\n'
                'bf_until <- last_start_dt:\n'
                f'{backfill_until_dt} <- {last_start_dt}\n'
            )

            # ugh, what's a better way?
            # TODO: fwiw, we probably want a way to signal a throttle
            # condition (eg. with ib) so that we can halt the
            # request loop until the condition is resolved?
            if timeframe > 1:
                await tractor.pause()
            return

        assert (
            array['time'][0]
            ==
            next_start_dt.timestamp()
        )

        diff = last_start_dt - next_start_dt
        frame_time_diff_s = diff.seconds

        # frame's worth of sample-period-steps, in seconds
        frame_size_s: float = len(array) * timeframe
        expected_frame_size_s: float = frame_size_s + timeframe
        if frame_time_diff_s > expected_frame_size_s:

            # XXX: query result includes a start point prior to our
            # expected "frame size" and thus is likely some kind of
            # history gap (eg. market closed period, outage, etc.)
            # so just report it to console for now.
            log.warning(
                'GAP DETECTED:\n'
                f'last_start_dt: {last_start_dt}\n'
                f'diff: {diff}\n'
                f'frame_time_diff_s: {frame_time_diff_s}\n'
            )

        to_push = diff_history(
            array,
            prepend_until_dt=backfill_until_dt,
        )
        ln: int = len(to_push)
        if ln:
            log.info(
                f'{ln} bars for {next_start_dt} -> {last_start_dt}'
            )

        else:
            log.warning(
                '0 BARS TO PUSH after diff!?\n'
                f'{next_start_dt} -> {last_start_dt}'
            )

        # bail gracefully on shm allocation overrun/full
        # condition
        try:
            await shm_push_in_between(
                shm,
                to_push,
                prepend_index=next_prepend_index,
                update_start_on_prepend=update_start_on_prepend,
            )
            await sampler_stream.send({
                'broadcast_all': {
                    'backfilling': (mkt.fqme, timeframe),
                },
            })

            # decrement next prepend point
            next_prepend_index = next_prepend_index - ln
            last_start_dt = next_start_dt

        except ValueError as ve:
            _ve = ve
            log.error(
                f'Shm prepend OVERRUN on: {next_start_dt} -> {last_start_dt}?'
            )

            if next_prepend_index < ln:
                log.warning(
                    f'Shm buffer can only hold {next_prepend_index} more rows..\n'
                    f'Appending those from recent {ln}-sized frame, no more!'
                )

                to_push = to_push[-next_prepend_index + 1:]
                await shm_push_in_between(
                    shm,
                    to_push,
                    prepend_index=next_prepend_index,
                    update_start_on_prepend=update_start_on_prepend,
                )
                await sampler_stream.send({
                    'broadcast_all': {
                        'backfilling': (mkt.fqme, timeframe),
                    },
                })

            # can't push the entire frame? so
            # push only the amount that can fit..
            break

        log.info(
            f'Shm pushed {ln} frame:\n'
            f'{next_start_dt} -> {last_start_dt}'
        )

        # FINALLY, maybe write immediately to the tsdb backend for
        # long-term storage.
        if (
            storage is not None
            and write_tsdb
        ):
            log.info(
                f'Writing {ln} frame to storage:\n'
                f'{next_start_dt} -> {last_start_dt}'
            )

            # always drop the src asset token for
            # non-currency-pair like market types (for now)
            if mkt.dst.atype not in {
                'crypto',
                'crypto_currency',
                'fiat',  # a "forex pair"
            }:
                # for now, our table key schema is not including
                # the dst[/src] source asset token.
                col_sym_key: str = mkt.get_fqme(
                    delim_char='',
                    without_src=True,
                )
            else:
                col_sym_key: str = mkt.get_fqme(
                    delim_char='',
                )

            await storage.write_ohlcv(
                col_sym_key,
                shm.array,
                timeframe,
            )
            df: pl.DataFrame = await storage.as_df(
                fqme=mkt.fqme,
                period=timeframe,
                load_from_offline=False,
            )
            (
                wdts,
                deduped,
                diff,
            ) = dedupe(df)
            # if diff:
            #     sort_diff(df)

    else:
        # finally filled gap
        log.info(
            f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
        )

    # XXX: extremely important, there can be no checkpoints
    # in the block above to avoid entering new ``frames``
    # values while we're pipelining the current ones to
    # memory...
    # await sampler_stream.send('broadcast_all')

    # short-circuit (for now)
    bf_done.set()
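
# Note on the gap heuristic used inside `start_backfill()` above
# (worked numbers, purely illustrative): for a 60s timeframe and a
# 1000-row frame, frame_size_s = 1000 * 60 = 60000 and
# expected_frame_size_s = 60060; if the span between the requested
# end_dt and the frame's actual start (frame_time_diff_s) exceeds
# that, the frame cannot be contiguous (eg. a weekend/venue closure
# or outage) and a GAP warning is logged.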


# NOTE: originally this was used to cope with a tsdb (marketstore)
# which could not deliver very large frames of history over gRPC
# (thanks goolag) due to corruption issues. NOW, using apache
# parquet (by default in the local filesys) we don't have this
# requirement since the files can be loaded very quickly in
# entirety to memory via
async def back_load_from_tsdb(
    storemod: ModuleType,
    storage: StorageClient,

    fqme: str,

    tsdb_history: np.ndarray,

    last_tsdb_dt: datetime,
    latest_start_dt: datetime,
    latest_end_dt: datetime,

    bf_done: trio.Event,

    timeframe: int,
    shm: ShmArray,
):
    assert len(tsdb_history)

    # sync to backend history task's query/load completion
    # if bf_done:
    #     await bf_done.wait()

    # TODO: eventually it'd be nice to not require a shm array/buffer
    # to accomplish this.. maybe we can do some kind of tsdb direct to
    # graphics format eventually in a child-actor?
    if storemod.name == 'nativedb':
        return

    await tractor.pause()
    assert shm._first.value == 0

    array = shm.array

    # if timeframe == 1:
    #     times = shm.array['time']
    #     assert (times[1] - times[0]) == 1

    if len(array):
        shm_last_dt = from_timestamp(
            shm.array[0]['time']
        )
    else:
        shm_last_dt = None

    if last_tsdb_dt:
        assert shm_last_dt >= last_tsdb_dt

    # do diff against start index of last frame of history and only
    # fill in an amount of datums from the tsdb which allows for the
    # most recent to be loaded into mem *before* tsdb data.
    if (
        last_tsdb_dt
        and latest_start_dt
    ):
        backfilled_size_s = (
            latest_start_dt - last_tsdb_dt
        ).seconds
        # if the shm buffer len is not large enough to contain
        # all missing data between the most recent backend-queried frame
        # and the most recent dt-index in the db we warn that we only
        # want to load a portion of the next tsdb query to fill that
        # space.
        log.info(
            f'{backfilled_size_s} seconds worth of {timeframe}s loaded'
        )

    # Load TSDB history into shm buffer (for display) if there is
    # remaining buffer space.

    time_key: str = 'time'
    if getattr(storemod, 'ohlc_key_map', False):
        keymap: bidict = storemod.ohlc_key_map
        time_key: str = keymap.inverse['time']

    # if (
    #     not len(tsdb_history)
    # ):
    #     return

    tsdb_last_frame_start: datetime = last_tsdb_dt
    # load as much from storage into shm as possible (depends on
    # user's shm size settings).
    while shm._first.value > 0:

        tsdb_history = await storage.read_ohlcv(
            fqme,
            timeframe=timeframe,
            end=tsdb_last_frame_start,
        )

        # # empty query
        # if not len(tsdb_history):
        #     break

        next_start = tsdb_history[time_key][0]
        if next_start >= tsdb_last_frame_start:
            # no earlier data detected
            break

        else:
            tsdb_last_frame_start = next_start

        # TODO: see if there's faster multi-field reads:
        # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
        # re-index with a `time` and index field
        prepend_start = shm._first.value

        to_push = tsdb_history[-prepend_start:]
        shm.push(
            to_push,

            # insert the history pre a "days worth" of samples
            # to leave some real-time buffer space at the end.
            prepend=True,
            # update_first=False,
            # start=prepend_start,
            field_map=storemod.ohlc_key_map,
        )

        log.info(f'Loaded {to_push.shape} datums from storage')
        tsdb_last_frame_start = tsdb_history[time_key][0]

    # manually trigger step update to update charts/fsps
    # which need an incremental update.
    # NOTE: the way this works is super duper
    # un-intuitive right now:
    # - the broadcaster fires a msg to the fsp subsystem.
    # - fsp subsys then checks for a sample step diff and
    #   possibly recomputes prepended history.
    # - the fsp then sends back to the parent actor
    #   (usually a chart showing graphics for said fsp)
    #   which tells the chart to conduct a manual full
    #   graphics loop cycle.
    # await sampler_stream.send('broadcast_all')


async def push_latest_frame(
    # box-type only that should get packed with the datetime
    # objects received for the latest history frame
    dt_eps: list[DateTime, DateTime],
    shm: ShmArray,
    get_hist: Callable[
        [int, datetime, datetime],
        tuple[np.ndarray, str]
    ],
    timeframe: float,
    config: dict,

    task_status: TaskStatus[
        Exception | list[datetime, datetime]
    ] = trio.TASK_STATUS_IGNORED,

) -> list[datetime, datetime] | None:
    # get latest query's worth of history all the way
    # back to what is recorded in the tsdb
    try:
        (
            array,
            mr_start_dt,
            mr_end_dt,
        ) = await get_hist(
            timeframe,
            end_dt=None,
        )
        # so caller can access these ep values
        dt_eps.extend([
            mr_start_dt,
            mr_end_dt,
        ])
        task_status.started(dt_eps)

    # XXX: timeframe not supported for backend (since
    # above exception type), terminate immediately since
    # there's no backfilling possible.
    except DataUnavailable:
        task_status.started(None)

        if timeframe > 1:
            await tractor.pause()

        # prolly tf not supported
        return None

    # NOTE: for the first (i.e. most recent) history frame we PREPEND
    # from the current shm ._last index and thus a gap between the
    # earliest datum loaded here and the latest loaded from the tsdb
    # may exist!
    log.info(f'Pushing {array.size} to shm!')
    shm.push(
        array,
        prepend=True,  # append on first frame
    )

    return dt_eps


async def load_tsdb_hist(
    storage: StorageClient,
    mkt: MktPair,
    timeframe: float,

    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,

) -> tuple[
    np.ndarray,
    DateTime,
    DateTime,
] | None:
    # loads a (large) frame of data from the tsdb depending
    # on the db's query size limit; our "nativedb" (using
    # parquet) generally can load the entire history into mem
    # but if not then below the remaining history can be lazy
    # loaded?
    fqme: str = mkt.fqme
    tsdb_entry: tuple[
        np.ndarray,
        DateTime,
        DateTime,
    ]
    try:
        tsdb_entry: tuple | None = await storage.load(
            fqme,
            timeframe=timeframe,
        )
        return tsdb_entry

    except TimeseriesNotFound:
        log.warning(
            f'No timeseries yet for {timeframe}@{fqme}'
        )
        return None


async def tsdb_backfill(
    mod: ModuleType,
    storemod: ModuleType,

    storage: StorageClient,
    mkt: MktPair,
    shm: ShmArray,
    timeframe: float,

    sampler_stream: tractor.MsgStream,

    task_status: TaskStatus[
        tuple[ShmArray, ShmArray]
    ] = trio.TASK_STATUS_IGNORED,

) -> None:

    if timeframe not in (1, 60):
        raise ValueError(
            '`piker` only needs to support 1m and 1s sampling '
            'but ur api is trying to deliver a longer '
            f'timeframe of {timeframe} seconds..\n'
            'So yuh.. dun do dat brudder.'
        )

    get_hist: Callable[
        [int, datetime, datetime],
        tuple[np.ndarray, str]
    ]
    config: dict[str, int]
    async with (
        mod.open_history_client(
            mkt,
        ) as (get_hist, config),

        # NOTE: this sub-nursery splits to tasks for the given
        # sampling rate to concurrently load offline tsdb
        # timeseries as well as new data from the venue backend!
    ):
        log.info(
            f'`{mod}` history client returned backfill config:\n'
            f'{config}\n'
        )

        dt_eps: list[DateTime, DateTime] = []
        async with trio.open_nursery() as tn:
            tn.start_soon(
                push_latest_frame,
                dt_eps,
                shm,
                get_hist,
                timeframe,
                config,
            )

            tsdb_entry: tuple = await load_tsdb_hist(
                storage,
                mkt,
                timeframe,
            )

            # tell parent task to continue
            # TODO: really we'd want this the other way with the
            # tsdb load happening asap, since the latest
            # frame query will normally be the main source of
            # latency?
            task_status.started()

        # NOTE: iabs to start backfilling from, reverse chronological,
        # ONLY AFTER the first history frame has been pushed to
        # mem!
        backfill_gap_from_shm_index: int = shm._first.value + 1

        # Prepend any tsdb history into the rt-shm-buffer which
        # should NOW be getting filled with the most recent history
        # pulled from the data-backend.
        if dt_eps:
            # well then, unpack the latest (gap) backfilled frame dts
            (
                mr_start_dt,
                mr_end_dt,
            ) = dt_eps

            # NOTE: when there's no offline data, there's 2 cases:
            # - data backend doesn't support timeframe/sample
            #   period (in which case `dt_eps` should be `None` and
            #   we shouldn't be here!), or
            # - no prior history has been stored (yet) and we need
            #   to do a full backfill of the history now.
            if tsdb_entry is None:
                # indicate to backfill task to fill the whole
                # shm buffer as much as it can!
                last_tsdb_dt = None

            # there's existing tsdb history from (offline) storage
            # so only backfill the gap between the
            # most-recent-frame (mrf) and that latest sample.
            else:
                (
                    tsdb_history,
                    first_tsdb_dt,
                    last_tsdb_dt,
                ) = tsdb_entry

            # if there is a gap to backfill from the first
            # history frame until the last datum loaded from the tsdb
            # continue that now in the background
            async with trio.open_nursery() as tn:

                bf_done = await tn.start(
                    partial(
                        start_backfill,
                        get_hist=get_hist,
                        mod=mod,
                        mkt=mkt,
                        shm=shm,
                        timeframe=timeframe,

                        backfill_from_shm_index=backfill_gap_from_shm_index,
                        backfill_from_dt=mr_start_dt,

                        sampler_stream=sampler_stream,
                        backfill_until_dt=last_tsdb_dt,

                        storage=storage,
                        write_tsdb=True,
                    )
                )
                nulls_detected: trio.Event | None = None
                if last_tsdb_dt is not None:
                    # calc the index from which the tsdb data should be
                    # prepended, presuming there is a gap between the
                    # latest frame (loaded/read above) and the latest
                    # sample loaded from the tsdb.
                    backfill_diff: Duration = mr_start_dt - last_tsdb_dt
                    offset_s: float = backfill_diff.in_seconds()

                    # XXX EDGE CASEs: the most recent frame overlaps with
                    # prior tsdb history!!
                    # - so the latest frame's start time is earlier than
                    #   the tsdb's latest sample.
                    # - alternatively this may also more generally occur
                    #   when the venue was closed (say over the weekend)
                    #   causing a timeseries gap, AND the query frame's size
                    #   (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
                    #   GREATER THAN the current venue-market's operating
                    #   session (time) we will receive datums from BEFORE THE
                    #   CLOSURE GAP and thus the `offset_s` value will be
                    #   NEGATIVE! In this case we need to ensure we don't try
                    #   to push datums that have already been recorded in the
                    #   tsdb. In this case we instead only retrieve and push
                    #   the series portion missing from the db's data set.
                    # if offset_s < 0:
                    #     non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
                    #     non_overlap_offset_s: float = backfill_diff.in_seconds()

                    offset_samples: int = round(offset_s / timeframe)

                    # TODO: see if there's faster multi-field reads:
                    # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
                    # re-index with a `time` and index field
                    if offset_s > 0:
                        # NOTE XXX: ONLY when there is an actual gap
                        # between the earliest sample in the latest history
                        # frame do we want to NOT stick the latest tsdb
                        # history adjacent to that latest frame!
                        prepend_start = shm._first.value - offset_samples + 1
                        to_push = tsdb_history[-prepend_start:]
                    else:
                        # when there is overlap we want to remove the
                        # overlapping samples from the tsdb portion (taking
                        # instead the latest frame's values since THEY
                        # SHOULD BE THE SAME) and prepend DIRECTLY adjacent
                        # to the latest frame!
                        # TODO: assert the overlap segment array contains
                        # the same values!?!
                        prepend_start = shm._first.value
                        to_push = tsdb_history[-(shm._first.value):offset_samples - 1]

                    # tsdb history is so far in the past we can't fit it in
                    # shm buffer space so simply don't load it!
                    if prepend_start > 0:
                        shm.push(
                            to_push,

                            # insert the history pre a "days worth" of samples
                            # to leave some real-time buffer space at the end.
                            prepend=True,
                            # update_first=False,
                            start=prepend_start,
                            field_map=storemod.ohlc_key_map,
                        )

                        log.info(f'Loaded {to_push.shape} datums from storage')

                # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
                # seemingly missing (null-time) segments..
                # TODO: ideally these can never exist!
                # -[ ] somehow it seems sometimes we're writing zero-ed
                #      segments to tsdbs during teardown?
                # -[ ] can we ensure that the backfiller tasks do this
                #      work PREVENTIVELY instead?
                # -[ ] fill in non-zero epoch time values ALWAYS!
                # await maybe_fill_null_segments(
                nulls_detected: trio.Event = await tn.start(partial(
                    maybe_fill_null_segments,

                    shm=shm,
                    timeframe=timeframe,
                    get_hist=get_hist,
                    sampler_stream=sampler_stream,
                    mkt=mkt,
                ))

            # 2nd nursery END

            # TODO: who would want to?
            if nulls_detected:
                await nulls_detected.wait()

            await bf_done.wait()
            # TODO: maybe start history anal and load missing "history
            # gaps" via backend..

            # if len(hist_shm.array) < 2:
            # TODO: there's an edge case here to solve where if the last
            # frame before market close (at least on ib) was pushed and
            # there was only "1 new" row pushed from the first backfill
            # query-iteration, then the sample step sizing calcs will
            # break upstream from here since you can't diff on at least
            # 2 steps... probably should also add logic to compute from
            # the tsdb series and stash that somewhere as meta data on
            # the shm buffer?.. no se.

        # backload any further data from tsdb (concurrently per
        # timeframe) if not all data was able to be loaded (in memory)
        # from the ``StorageClient.load()`` call above.
        await trio.sleep_forever()

        # XXX NOTE: this is legacy from when we were using
        # marketstore and we needed to continue backloading
        # incrementally from the tsdb client.. (bc it couldn't
        # handle a single large query with gRPC for some
        # reason.. classic goolag pos)
        # tn.start_soon(
        #     back_load_from_tsdb,

        #     storemod,
        #     storage,
        #     fqme,

        #     tsdb_history,
        #     last_tsdb_dt,
        #     mr_start_dt,
        #     mr_end_dt,
        #     bf_done,

        #     timeframe,
        #     shm,
        # )
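
# Worked example (hypothetical values) of the tsdb-prepend offset
# arithmetic inside `tsdb_backfill()` above: with a 60s timeframe, if
# the most recent frame starts 3h after the tsdb's last stored sample
# then offset_s = 10800 and offset_samples = round(10800 / 60) = 180,
# so the tsdb history is seated roughly 180 rows earlier in the buffer
# than the frame's first row (leaving that gap for the backfiller)
# instead of being stuck directly adjacent to the frame.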


async def manage_history(
    mod: ModuleType,
    mkt: MktPair,
    some_data_ready: trio.Event,
    feed_is_live: trio.Event,
    timeframe: float = 60,  # in seconds

    task_status: TaskStatus[
        tuple[ShmArray, ShmArray]
    ] = trio.TASK_STATUS_IGNORED,

) -> None:
    '''
    Load and manage historical data including the loading of any
    available series from any connected tsdb as well as conduct
    real-time update of both that existing db and the allocated
    shared memory buffer.

    Init sequence:
    - allocate shm (numpy array) buffers for 60s & 1s sample rates
    - configure "zero index" for each buffer: the index where
      history will be prepended *to* and new live data will be
      appended *from*.
    - open a ``.storage.StorageClient`` and load any existing tsdb
      history as well as (async) start a backfill task which loads
      missing (newer) history from the data provider backend:
      - tsdb history is loaded first and pushed to shm ASAP.
      - the backfill task loads the most recent history before
        unblocking its parent task, so that the `ShmArray._last` is
        up to date to allow the OHLC sampler to begin writing new
        samples at the correct buffer index once the provider feed
        engages.

    '''
    # TODO: is there a way to make each shm file key
    # actor-tree-discovery-addr unique so we avoid collisions
    # when doing tests which also allocate shms for certain instruments
    # that may be in use on the system by some other running daemons?
    # from tractor._state import _runtime_vars
    # port = _runtime_vars['_root_mailbox'][1]

    uid: tuple = tractor.current_actor().uid
    name, uuid = uid
    service: str = name.rstrip(f'.{mod.name}')
    fqme: str = mkt.get_fqme(delim_char='')

    # (maybe) allocate shm array for this broker/symbol which will
    # be used for fast near-term history capture and processing.
    hist_shm, opened = maybe_open_shm_array(
        size=_default_hist_size,
        append_start_index=_hist_buffer_start,

        key=f'piker.{service}[{uuid[:16]}].{fqme}.hist',

        # use any broker defined ohlc dtype:
        dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields),

        # we expect the sub-actor to write
        readonly=False,
    )
    hist_zero_index = hist_shm.index - 1

    # TODO: history validation
    if not opened:
        raise RuntimeError(
            "Persistent shm for sym was already open?!"
        )

    rt_shm, opened = maybe_open_shm_array(
        size=_default_rt_size,
        append_start_index=_rt_buffer_start,
        key=f'piker.{service}[{uuid[:16]}].{fqme}.rt',

        # use any broker defined ohlc dtype:
        dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields),

        # we expect the sub-actor to write
        readonly=False,
    )

    # (for now) set the rt (hft) shm array with space to prepend
    # only a few days worth of 1s history.
    days: int = 2
    start_index: int = days*_secs_in_day
    rt_shm._first.value = start_index
    rt_shm._last.value = start_index
    rt_zero_index = rt_shm.index - 1

    if not opened:
        raise RuntimeError(
            "Persistent shm for sym was already open?!"
        )

    open_history_client = getattr(
        mod,
        'open_history_client',
    )
    assert open_history_client

    # TODO: maybe it should be a subpkg of `.data`?
    from piker import storage

    async with (
        storage.open_storage_client() as (storemod, client),

        # NOTE: this nursery spawns a task per "timeframe" (aka
        # sampling period) data set since normally differently
        # sampled timeseries can be loaded / processed independently
        # ;)
        trio.open_nursery() as tn,
    ):
        log.info(
            f'Connecting to storage backend `{storemod.name}`:\n'
            f'location: {client.address}\n'
            f'db cardinality: {client.cardinality}\n'
            # TODO: show backend config, eg:
            # - network settings
            # - storage size with compression
            # - number of loaded time series?
        )

        # NOTE: this call ONLY UNBLOCKS once the latest-most frame
        # (i.e. history just before the live feed latest datum) of
        # history has been loaded and written to the shm buffer:
        # - the backfiller task can write in reverse chronological
        #   to the shm and tsdb
        # - the tsdb data can be loaded immediately and the
        #   backfiller can do a single append from its end datum and
        #   then prepends backward to that from the current time
        #   step.
        tf2mem: dict = {
            1: rt_shm,
            60: hist_shm,
        }
        async with open_sample_stream(
            period_s=1.,
            shms_by_period={
                1.: rt_shm.token,
                60.: hist_shm.token,
            },

            # NOTE: we want to only open a stream for doing
            # broadcasts on backfill operations, not receive the
            # sample index-stream (since there's no code in this
            # data feed layer that needs to consume it).
            open_index_stream=True,
            sub_for_broadcasts=False,

        ) as sample_stream:
            # register 1s and 1m buffers with the global
            # incrementer task
            log.info(f'Connected to sampler stream: {sample_stream}')

            for timeframe in [60, 1]:
                await tn.start(partial(
                    tsdb_backfill,
                    mod=mod,
                    storemod=storemod,
                    storage=client,
                    mkt=mkt,
                    shm=tf2mem[timeframe],
                    timeframe=timeframe,
                    sampler_stream=sample_stream,
                ))

            # indicate to caller that feed can be delivered to
            # remote requesting client since we've loaded history
            # data that can be used.
            some_data_ready.set()

            # wait for a live feed before starting the sampler.
            await feed_is_live.wait()

            # yield back after client connect with filled shm
            task_status.started((
                hist_zero_index,
                hist_shm,
                rt_zero_index,
                rt_shm,
            ))

            # history retrieval loop depending on user interaction
            # and thus a small RPC-prot for remotely controlling
            # what data is loaded for viewing.
            await trio.sleep_forever()
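
# Example (hypothetical actor uid and fqme, illustration only) of the
# shm key naming scheme used by `manage_history()` above: for a
# 'brokerd.binance' actor with uuid prefix '3f2a0c' and an fqme of
# 'btcusdt.spot.binance' the two buffers would be backed by files
# roughly like,
#
#   /dev/shm/piker.brokerd[3f2a0c].btcusdt.spot.binance.hist
#   /dev/shm/piker.brokerd[3f2a0c].btcusdt.spot.binance.rt
#
# which is what `iter_dfs_from_shms()` below globs for.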


def iter_dfs_from_shms(
    fqme: str
) -> Generator[
    tuple[Path, ShmArray, pl.DataFrame],
    None,
    None,
]:
    # shm buffer size table based on known sample rates
    sizes: dict[str, int] = {
        'hist': _default_hist_size,
        'rt': _default_rt_size,
    }

    # load all detected shm buffer files which have the
    # passed FQME pattern in the file name.
    shmfiles: list[Path] = []
    shmdir = Path('/dev/shm/')

    for shmfile in shmdir.glob(f'*{fqme}*'):
        filename: str = shmfile.name

        # skip index files
        if (
            '_first' in filename
            or '_last' in filename
        ):
            continue

        assert shmfile.is_file()
        log.debug(f'Found matching shm buffer file: {filename}')
        shmfiles.append(shmfile)

    for shmfile in shmfiles:

        # lookup array buffer size based on file suffix
        # being either .rt or .hist
        key: str = shmfile.name.rsplit('.')[-1]

        # skip FSP buffers for now..
        if key not in sizes:
            continue

        size: int = sizes[key]

        # attach to any shm buffer, load array into polars df,
        # write to local parquet file.
        shm, opened = maybe_open_shm_array(
            key=shmfile.name,
            size=size,
            dtype=def_iohlcv_fields,
            readonly=True,
        )
        assert not opened
        ohlcv: np.ndarray = shm.array
        df: pl.DataFrame = np2pl(ohlcv)

        yield (
            shmfile,
            shm,
            df,
        )
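
# A minimal usage sketch (hypothetical fqme, illustration only) of the
# generator above, eg. from a REPL or offline maintenance script:
#
#   for shmfile, shm, df in iter_dfs_from_shms('btcusdt.spot.binance'):
#       print(shmfile.name, df.height)
#       # eg. hand each frame to the dedupe/gap-detection helpers
#       wdts, deduped, diff = dedupe(df)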