diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py
index 5bcc7336..4a63a0f1 100644
--- a/piker/brokers/ib/api.py
+++ b/piker/brokers/ib/api.py
@@ -1187,7 +1187,7 @@ async def load_aio_clients(
# the API TCP in `ib_insync` connection can be flaky af so instead
# retry a few times to get the client going..
connect_retries: int = 3,
- connect_timeout: float = 10,
+    connect_timeout: float = 30, # in case of a (slow) remote-host connection
disconnect_on_exit: bool = True,
) -> dict[str, Client]:
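The bumped default only matters inside the retry loop it feeds, so here is a minimal, generic sketch of a retry-with-timeout connect pattern; `connect_with_retries()` and its argument names are illustrative assumptions, not the actual `load_aio_clients()` internals.

import asyncio

async def connect_with_retries(
    connect,             # awaitable factory, e.g. the ib_insync connect call
    retries: int = 3,    # mirrors `connect_retries`
    timeout: float = 30, # mirrors the bumped `connect_timeout`
):
    for attempt in range(retries):
        try:
            return await asyncio.wait_for(connect(), timeout)
        except (asyncio.TimeoutError, OSError):
            if attempt == retries - 1:
                raise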
diff --git a/piker/storage/__init__.py b/piker/storage/__init__.py
index f32f40b6..361eaadc 100644
--- a/piker/storage/__init__.py
+++ b/piker/storage/__init__.py
@@ -43,7 +43,6 @@ from typing import (
import numpy as np
-
from .. import config
from ..service import (
check_for_service,
@@ -152,7 +151,10 @@ class StorageConnectionError(ConnectionError):
'''
-def get_storagemod(name: str) -> ModuleType:
+def get_storagemod(
+ name: str,
+
+) -> ModuleType:
mod: ModuleType = import_module(
'.' + name,
'piker.storage',
@@ -165,9 +167,12 @@ def get_storagemod(name: str) -> ModuleType:
@acm
async def open_storage_client(
- backend: str | None = None,
+ backend: str|None = None,
-) -> tuple[ModuleType, StorageClient]:
+) -> tuple[
+ ModuleType,
+ StorageClient,
+]:
'''
Load the ``StorageClient`` for named backend.
@@ -267,7 +272,10 @@ async def open_tsdb_client(
from ..data.feed import maybe_open_feed
async with (
- open_storage_client() as (_, storage),
+ open_storage_client() as (
+ _,
+ storage,
+ ),
maybe_open_feed(
[fqme],
@@ -275,7 +283,7 @@ async def open_tsdb_client(
) as feed,
):
- profiler(f'opened feed for {fqme}')
+ profiler(f'opened feed for {fqme!r}')
# to_append = feed.hist_shm.array
# to_prepend = None
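A standalone sketch of the relative `import_module()` pattern used by `get_storagemod()` above; the wrapper name below is hypothetical and `'nativedb'` is just one example backend name.

from importlib import import_module
from types import ModuleType

def load_storage_backend(name: str) -> ModuleType:
    # '.' + name resolves relative to the anchor package 'piker.storage',
    # so 'nativedb' -> 'piker.storage.nativedb'
    return import_module('.' + name, 'piker.storage')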
diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index 1c8ff11b..90d5baed 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -19,16 +19,10 @@ Storage middle-ware CLIs.
"""
from __future__ import annotations
-# from datetime import datetime
-# from contextlib import (
-# AsyncExitStack,
-# )
from pathlib import Path
-from math import copysign
import time
from types import ModuleType
from typing import (
- Any,
TYPE_CHECKING,
)
@@ -47,7 +41,6 @@ from piker.data import (
ShmArray,
)
from piker import tsp
-from piker.data._formatters import BGM
from . import log
from . import (
__tsdbs__,
@@ -242,122 +235,12 @@ def anal(
trio.run(main)
-async def markup_gaps(
- fqme: str,
- timeframe: float,
- actl: AnnotCtl,
- wdts: pl.DataFrame,
- gaps: pl.DataFrame,
-
-) -> dict[int, dict]:
- '''
- Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
- with rectangles.
-
- '''
- aids: dict[int] = {}
- for i in range(gaps.height):
-
- row: pl.DataFrame = gaps[i]
-
- # the gap's RIGHT-most bar's OPEN value
- # at that time (sample) step.
- iend: int = row['index'][0]
- # dt: datetime = row['dt'][0]
- # dt_prev: datetime = row['dt_prev'][0]
- # dt_end_t: float = dt.timestamp()
-
-
- # TODO: can we eventually remove this
- # once we figure out why the epoch cols
- # don't match?
- # TODO: FIX HOW/WHY these aren't matching
- # and are instead off by 4hours (EST
- # vs. UTC?!?!)
- # end_t: float = row['time']
- # assert (
- # dt.timestamp()
- # ==
- # end_t
- # )
-
- # the gap's LEFT-most bar's CLOSE value
- # at that time (sample) step.
- prev_r: pl.DataFrame = wdts.filter(
- pl.col('index') == iend - 1
- )
- # XXX: probably a gap in the (newly sorted or de-duplicated)
- # dt-df, so we might need to re-index first..
- if prev_r.is_empty():
- await tractor.pause()
-
- istart: int = prev_r['index'][0]
- # dt_start_t: float = dt_prev.timestamp()
-
- # start_t: float = prev_r['time']
- # assert (
- # dt_start_t
- # ==
- # start_t
- # )
-
- # TODO: implement px-col width measure
- # and ensure at least as many px-cols
- # shown per rect as configured by user.
- # gap_w: float = abs((iend - istart))
- # if gap_w < 6:
- # margin: float = 6
- # iend += margin
- # istart -= margin
-
- rect_gap: float = BGM*3/8
- opn: float = row['open'][0]
- ro: tuple[float, float] = (
- # dt_end_t,
- iend + rect_gap + 1,
- opn,
- )
- cls: float = prev_r['close'][0]
- lc: tuple[float, float] = (
- # dt_start_t,
- istart - rect_gap, # + 1 ,
- cls,
- )
-
- color: str = 'dad_blue'
- diff: float = cls - opn
- sgn: float = copysign(1, diff)
- color: str = {
- -1: 'buy_green',
- 1: 'sell_red',
- }[sgn]
-
- rect_kwargs: dict[str, Any] = dict(
- fqme=fqme,
- timeframe=timeframe,
- start_pos=lc,
- end_pos=ro,
- color=color,
- )
-
- aid: int = await actl.add_rect(**rect_kwargs)
- assert aid
- aids[aid] = rect_kwargs
-
- # tell chart to redraw all its
- # graphics view layers Bo
- await actl.redraw(
- fqme=fqme,
- timeframe=timeframe,
- )
- return aids
-
-
@store.command()
def ldshm(
fqme: str,
write_parquet: bool = True,
reload_parquet_to_shm: bool = True,
+ pdb: bool = False, # --pdb passed?
) -> None:
'''
@@ -377,7 +260,7 @@ def ldshm(
open_piker_runtime(
'polars_boi',
enable_modules=['piker.data._sharedmem'],
- debug_mode=True,
+ debug_mode=pdb,
),
open_storage_client() as (
mod,
@@ -397,17 +280,19 @@ def ldshm(
times: np.ndarray = shm.array['time']
d1: float = float(times[-1] - times[-2])
- d2: float = float(times[-2] - times[-3])
- med: float = np.median(np.diff(times))
- if (
- d1 < 1.
- and d2 < 1.
- and med < 1.
- ):
- raise ValueError(
- f'Something is wrong with time period for {shm}:\n{times}'
- )
-
+    d2: float = 0
+    med: float = 0  # fallback when there are too few samples to take a median
+ # XXX, take a median sample rate if sufficient data
+ if times.size > 2:
+ d2: float = float(times[-2] - times[-3])
+ med: float = np.median(np.diff(times))
+ if (
+ d1 < 1.
+ and d2 < 1.
+ and med < 1.
+ ):
+ raise ValueError(
+ f'Something is wrong with time period for {shm}:\n{times}'
+ )
period_s: float = float(max(d1, d2, med))
null_segs: tuple = tsp.get_null_segs(
@@ -417,7 +302,9 @@ def ldshm(
# TODO: call null-seg fixer somehow?
if null_segs:
- await tractor.pause()
+
+ if tractor._state.is_debug_mode():
+ await tractor.pause()
# async with (
# trio.open_nursery() as tn,
# mod.open_history_client(
@@ -441,11 +328,37 @@ def ldshm(
wdts,
deduped,
diff,
- ) = tsp.dedupe(
+ valid_races,
+ dq_issues,
+ ) = tsp.dedupe_ohlcv_smart(
shm_df,
- period=period_s,
)
+ # Report duplicate analysis
+ if diff > 0:
+ log.info(
+ f'Removed {diff} duplicate timestamp(s)\n'
+ )
+ if valid_races is not None:
+ identical: int = (
+ valid_races
+ .filter(pl.col('identical_bars'))
+ .height
+ )
+ monotonic: int = valid_races.height - identical
+ log.info(
+ f'Valid race conditions: {valid_races.height}\n'
+ f' - Identical bars: {identical}\n'
+ f' - Volume monotonic: {monotonic}\n'
+ )
+
+ if dq_issues is not None:
+ log.warning(
+ f'DATA QUALITY ISSUES from provider: '
+ f'{dq_issues.height} timestamp(s)\n'
+ f'{dq_issues}\n'
+ )
+
# detect gaps from in expected (uniform OHLC) sample period
step_gaps: pl.DataFrame = tsp.detect_time_gaps(
deduped,
@@ -460,7 +373,8 @@ def ldshm(
# TODO: actually pull the exact duration
# expected for each venue operational period?
- gap_dt_unit='days',
+ # gap_dt_unit='day',
+ gap_dt_unit='day',
gap_thresh=1,
)
@@ -471,8 +385,11 @@ def ldshm(
if (
not venue_gaps.is_empty()
or (
- period_s < 60
- and not step_gaps.is_empty()
+ not step_gaps.is_empty()
+ # XXX, i presume i put this bc i was guarding
+ # for ib venue gaps?
+ # and
+ # period_s < 60
)
):
# write repaired ts to parquet-file?
@@ -521,7 +438,7 @@ def ldshm(
do_markup_gaps: bool = True
if do_markup_gaps:
new_df: pl.DataFrame = tsp.np2pl(new)
- aids: dict = await markup_gaps(
+ aids: dict = await tsp._annotate.markup_gaps(
fqme,
period_s,
actl,
@@ -534,8 +451,13 @@ def ldshm(
tf2aids[period_s] = aids
else:
- # allow interaction even when no ts problems.
- assert not diff
+ # No significant gaps to handle, but may have had
+ # duplicates removed (valid race conditions are ok)
+ if diff > 0 and dq_issues is not None:
+ log.warning(
+ 'Found duplicates with data quality issues '
+ 'but no significant time gaps!\n'
+ )
await tractor.pause()
log.info('Exiting TSP shm anal-izer!')
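For clarity on the reworked sample-period guard in `ldshm()` above, a minimal numpy sketch using made-up data in place of `shm.array['time']`:

import numpy as np

times = np.array([0., 60., 120., 180.])  # hypothetical 1m-sampled epoch stamps

d1: float = float(times[-1] - times[-2])
d2: float = 0.
med: float = 0.
if times.size > 2:
    d2 = float(times[-2] - times[-3])
    med = float(np.median(np.diff(times)))
    if d1 < 1. and d2 < 1. and med < 1.:
        raise ValueError('sub-second sample period, likely a corrupt time col')

period_s: float = float(max(d1, d2, med))
assert period_s == 60.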
diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py
index 121fcbb7..baa28c82 100644
--- a/piker/tsp/__init__.py
+++ b/piker/tsp/__init__.py
@@ -28,1435 +28,25 @@ Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic fo
stored offline (in a tsdb).
'''
-from __future__ import annotations
-from datetime import datetime
-from functools import partial
-from pathlib import Path
-from pprint import pformat
-from types import ModuleType
-from typing import (
- Callable,
- Generator,
- TYPE_CHECKING,
-)
-
-import trio
-from trio_typing import TaskStatus
-import tractor
-from pendulum import (
- Interval,
- DateTime,
- Duration,
- duration as mk_duration,
- from_timestamp,
-)
-import numpy as np
-import polars as pl
-
-from piker.brokers import NoData
-from piker.accounting import (
- MktPair,
-)
-from piker.data._util import (
- log,
-)
-from ..data._sharedmem import (
- maybe_open_shm_array,
- ShmArray,
-)
-from ..data._source import def_iohlcv_fields
-from ..data._sampling import (
- open_sample_stream,
-)
from ._anal import (
-
get_null_segs as get_null_segs,
- iter_null_segs as iter_null_segs,
- Frame as Frame,
- Seq as Seq,
-
- # codec-ish
- np2pl as np2pl,
- pl2np as pl2np,
-
- # `numpy` only
- slice_from_time as slice_from_time,
# `polars` specific
dedupe as dedupe,
- with_dts as with_dts,
detect_time_gaps as detect_time_gaps,
- sort_diff as sort_diff,
+ pl2np as pl2np,
+ np2pl as np2pl,
- # TODO:
- detect_price_gaps as detect_price_gaps
+ # `numpy` only
+ slice_from_time as slice_from_time,
)
-
-# TODO: break up all this shite into submods!
-from ..brokers._util import (
- DataUnavailable,
+from ._dedupe_smart import (
+ dedupe_ohlcv_smart as dedupe_ohlcv_smart,
)
-from ..storage import TimeseriesNotFound
-
-if TYPE_CHECKING:
- from bidict import bidict
- from ..service.marketstore import StorageClient
- # from .feed import _FeedsBus
-
-
-# `ShmArray` buffer sizing configuration:
-_mins_in_day = int(60 * 24)
-# how much is probably dependent on lifestyle
-# but we reco a buncha times (but only on a
-# run-every-other-day kinda week).
-_secs_in_day = int(60 * _mins_in_day)
-_days_in_week: int = 7
-
-_days_worth: int = 3
-_default_hist_size: int = 6 * 365 * _mins_in_day
-_hist_buffer_start = int(
- _default_hist_size - round(7 * _mins_in_day)
+from ._history import (
+ iter_dfs_from_shms as iter_dfs_from_shms,
+ manage_history as manage_history,
+)
+from ._annotate import (
+ markup_gaps as markup_gaps,
)
-
-_default_rt_size: int = _days_worth * _secs_in_day
-# NOTE: start the append index in rt buffer such that 1 day's worth
-# can be appenened before overrun.
-_rt_buffer_start = int((_days_worth - 1) * _secs_in_day)
-
-
-def diff_history(
- array: np.ndarray,
- append_until_dt: datetime | None = None,
- prepend_until_dt: datetime | None = None,
-
-) -> np.ndarray:
-
- # no diffing with tsdb dt index possible..
- if (
- prepend_until_dt is None
- and append_until_dt is None
- ):
- return array
-
- times = array['time']
-
- if append_until_dt:
- return array[times < append_until_dt.timestamp()]
- else:
- return array[times >= prepend_until_dt.timestamp()]
-
-
-# TODO: can't we just make this a sync func now?
-async def shm_push_in_between(
- shm: ShmArray,
- to_push: np.ndarray,
- prepend_index: int,
-
- update_start_on_prepend: bool = False,
-
-) -> int:
- # XXX: extremely important, there can be no checkpoints
- # in the body of this func to avoid entering new ``frames``
- # values while we're pipelining the current ones to
- # memory...
- shm.push(
- to_push,
- prepend=True,
-
- # XXX: only update the ._first index if no tsdb
- # segment was previously prepended by the
- # parent task.
- update_first=update_start_on_prepend,
-
- # XXX: only prepend from a manually calculated shm
- # index if there was already a tsdb history
- # segment prepended (since then the
- # ._first.value is going to be wayyy in the
- # past!)
- start=(
- prepend_index
- if not update_start_on_prepend
- else None
- ),
- )
-
-
-async def maybe_fill_null_segments(
- shm: ShmArray,
- timeframe: float,
- get_hist: Callable,
- sampler_stream: tractor.MsgStream,
- mkt: MktPair,
-
- task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
-
-) -> list[Frame]:
-
- null_segs_detected = trio.Event()
- task_status.started(null_segs_detected)
-
- frame: Frame = shm.array
-
- null_segs: tuple | None = get_null_segs(
- frame,
- period=timeframe,
- )
- for (
- absi_start, absi_end,
- fi_start, fi_end,
- start_t, end_t,
- start_dt, end_dt,
- ) in iter_null_segs(
- null_segs=null_segs,
- frame=frame,
- timeframe=timeframe,
- ):
-
- # XXX NOTE: ?if we get a badly ordered timestamp
- # pair, immediately stop backfilling?
- if (
- start_dt
- and
- end_dt < start_dt
- ):
- await tractor.pause()
- break
-
- (
- array,
- next_start_dt,
- next_end_dt,
- ) = await get_hist(
- timeframe,
- start_dt=start_dt,
- end_dt=end_dt,
- )
-
- # XXX TODO: pretty sure if i plot tsla, btcusdt.binance
- # and mnq.cme.ib this causes a Qt crash XXDDD
-
- # make sure we don't overrun the buffer start
- len_to_push: int = min(absi_end, array.size)
- to_push: np.ndarray = array[-len_to_push:]
-
- await shm_push_in_between(
- shm,
- to_push,
- prepend_index=absi_end,
- update_start_on_prepend=False,
- )
- # TODO: UI side needs IPC event to update..
- # - make sure the UI actually always handles
- # this update!
- # - remember that in the display side, only refersh this
- # if the respective history is actually "in view".
- # loop
- try:
- await sampler_stream.send({
- 'broadcast_all': {
-
- # XXX NOTE XXX: see the
- # `.ui._display.increment_history_view()` if block
- # that looks for this info to FORCE a hard viz
- # redraw!
- 'backfilling': (mkt.fqme, timeframe),
- },
- })
- except tractor.ContextCancelled:
- # log.exception
- await tractor.pause()
- raise
-
- null_segs_detected.set()
- # RECHECK for more null-gaps
- frame: Frame = shm.array
- null_segs: tuple | None = get_null_segs(
- frame,
- period=timeframe,
- )
- if (
- null_segs
- and
- len(null_segs[-1])
- ):
- (
- iabs_slices,
- iabs_zero_rows,
- zero_t,
- ) = null_segs
- log.warning(
- f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n'
- f'{pformat(iabs_slices)}'
- )
-
- # TODO: always backfill gaps with the earliest (price) datum's
- # value to avoid the y-ranger including zeros and completely
- # stretching the y-axis..
- # array: np.ndarray = shm.array
- # zeros = array[array['low'] == 0]
- ohlc_fields: list[str] = [
- 'open',
- 'high',
- 'low',
- 'close',
- ]
-
- for istart, istop in iabs_slices:
-
- # get view into buffer for null-segment
- gap: np.ndarray = shm._array[istart:istop]
-
- # copy the oldest OHLC samples forward
- cls: float = shm._array[istart]['close']
-
- # TODO: how can we mark this range as being a gap tho?
- # -[ ] maybe pg finally supports nulls in ndarray to
- # show empty space somehow?
- # -[ ] we could put a special value in the vlm or
- # another col/field to denote?
- gap[ohlc_fields] = cls
-
- start_t: float = shm._array[istart]['time']
- t_diff: float = (istop - istart)*timeframe
-
- gap['time'] = np.arange(
- start=start_t,
- stop=start_t + t_diff,
- step=timeframe,
- )
-
- # TODO: reimpl using the new `.ui._remote_ctl` ctx
- # ideally using some kinda decent
- # tractory-reverse-lookup-connnection from some other
- # `Context` type thingy?
- await sampler_stream.send({
- 'broadcast_all': {
-
- # XXX NOTE XXX: see the
- # `.ui._display.increment_history_view()` if block
- # that looks for this info to FORCE a hard viz
- # redraw!
- 'backfilling': (mkt.fqme, timeframe),
- },
- })
-
- # TODO: interatively step through any remaining
- # time-gaps/null-segments and spawn piecewise backfiller
- # tasks in a nursery?
- # -[ ] not sure that's going to work so well on the ib
- # backend but worth a shot?
- # -[ ] mk new history connections to make it properly
- # parallel possible no matter the backend?
- # -[ ] fill algo: do queries in alternating "latest, then
- # earliest, then latest.. etc?"
-
-
-async def start_backfill(
- get_hist,
- def_frame_duration: Duration,
- mod: ModuleType,
- mkt: MktPair,
- shm: ShmArray,
- timeframe: float,
-
- backfill_from_shm_index: int,
- backfill_from_dt: datetime,
-
- sampler_stream: tractor.MsgStream,
-
- backfill_until_dt: datetime | None = None,
- storage: StorageClient | None = None,
-
- write_tsdb: bool = True,
-
- task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
-
-) -> int:
-
- # let caller unblock and deliver latest history frame
- # and use to signal that backfilling the shm gap until
- # the tsdb end is complete!
- bf_done = trio.Event()
- task_status.started(bf_done)
-
- # based on the sample step size, maybe load a certain amount history
- update_start_on_prepend: bool = False
- if backfill_until_dt is None:
-
- # TODO: per-provider default history-durations?
- # -[ ] inside the `open_history_client()` config allow
- # declaring the history duration limits instead of
- # guessing and/or applying the same limits to all?
- #
- # -[ ] allow declaring (default) per-provider backfill
- # limits inside a [storage] sub-section in conf.toml?
- #
- # NOTE, when no tsdb "last datum" is provided, we just
- # load some near-term history by presuming a "decently
- # large" 60s duration limit and a much shorter 1s range.
- periods = {
- 1: {'days': 2},
- 60: {'years': 6},
- }
- period_duration: int = periods[timeframe]
- update_start_on_prepend: bool = True
-
- # NOTE: manually set the "latest" datetime which we intend to
- # backfill history "until" so as to adhere to the history
- # settings above when the tsdb is detected as being empty.
- backfill_until_dt = backfill_from_dt.subtract(**period_duration)
-
- # STAGE NOTE: "backward history gap filling":
- # - we push to the shm buffer until we have history back
- # until the latest entry loaded from the tsdb's table B)
- # - after this loop continue to check for other gaps in the
- # (tsdb) history and (at least report) maybe fill them
- # from new frame queries to the backend?
- last_start_dt: datetime = backfill_from_dt
- next_prepend_index: int = backfill_from_shm_index
-
- while last_start_dt > backfill_until_dt:
- log.info(
- f'Requesting {timeframe}s frame:\n'
- f'backfill_until_dt: {backfill_until_dt}\n'
- f'last_start_dt: {last_start_dt}\n'
- )
- try:
- (
- array,
- next_start_dt,
- next_end_dt,
- ) = await get_hist(
- timeframe,
- end_dt=last_start_dt,
- )
- except NoData as _daterr:
- orig_last_start_dt: datetime = last_start_dt
- gap_report: str = (
- f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
- f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
- f'last_start_dt: {orig_last_start_dt}\n\n'
- f'bf_until: {backfill_until_dt}\n'
- )
- # EMPTY FRAME signal with 3 (likely) causes:
- #
- # 1. range contains legit gap in venue history
- # 2. history actually (edge case) **began** at the
- # value `last_start_dt`
- # 3. some other unknown error (ib blocking the
- # history-query bc they don't want you seeing how
- # they cucked all the tinas.. like with options
- # hist)
- #
- if def_frame_duration:
- # decrement by a duration's (frame) worth of time
- # as maybe indicated by the backend to see if we
- # can get older data before this possible
- # "history gap".
- last_start_dt: datetime = last_start_dt.subtract(
- seconds=def_frame_duration.total_seconds()
- )
- gap_report += (
- f'Decrementing `end_dt` and retrying with,\n'
- f'def_frame_duration: {def_frame_duration}\n'
- f'(new) last_start_dt: {last_start_dt}\n'
- )
- log.warning(gap_report)
- # skip writing to shm/tsdb and try the next
- # duration's worth of prior history.
- continue
-
- else:
- # await tractor.pause()
- raise DataUnavailable(gap_report)
-
- # broker says there never was or is no more history to pull
- except DataUnavailable as due:
- message: str = due.args[0]
- log.warning(
- f'Provider {mod.name!r} halted backfill due to,\n\n'
-
- f'{message}\n'
-
- f'fqme: {mkt.fqme}\n'
- f'timeframe: {timeframe}\n'
- f'last_start_dt: {last_start_dt}\n'
- f'bf_until: {backfill_until_dt}\n'
- )
- # UGH: what's a better way?
- # TODO: backends are responsible for being correct on
- # this right!?
- # -[ ] in the `ib` case we could maybe offer some way
- # to halt the request loop until the condition is
- # resolved or should the backend be entirely in
- # charge of solving such faults? yes, right?
- return
-
- time: np.ndarray = array['time']
- assert (
- time[0]
- ==
- next_start_dt.timestamp()
- )
-
- assert time[-1] == next_end_dt.timestamp()
-
- expected_dur: Interval = last_start_dt - next_start_dt
-
- # frame's worth of sample-period-steps, in seconds
- frame_size_s: float = len(array) * timeframe
- recv_frame_dur: Duration = (
- from_timestamp(array[-1]['time'])
- -
- from_timestamp(array[0]['time'])
- )
- if (
- (lt_frame := (recv_frame_dur < expected_dur))
- or
- (null_frame := (frame_size_s == 0))
- # ^XXX, should NEVER hit now!
- ):
- # XXX: query result includes a start point prior to our
- # expected "frame size" and thus is likely some kind of
- # history gap (eg. market closed period, outage, etc.)
- # so just report it to console for now.
- if lt_frame:
- reason = 'Possible GAP (or first-datum)'
- else:
- assert null_frame
- reason = 'NULL-FRAME'
-
- missing_dur: Interval = expected_dur.end - recv_frame_dur.end
- log.warning(
- f'{timeframe}s-series {reason} detected!\n'
- f'fqme: {mkt.fqme}\n'
- f'last_start_dt: {last_start_dt}\n\n'
- f'recv interval: {recv_frame_dur}\n'
- f'expected interval: {expected_dur}\n\n'
-
- f'Missing duration of history of {missing_dur.in_words()!r}\n'
- f'{missing_dur}\n'
- )
- # await tractor.pause()
-
- to_push = diff_history(
- array,
- prepend_until_dt=backfill_until_dt,
- )
- ln: int = len(to_push)
- if ln:
- log.info(
- f'{ln} bars for {next_start_dt} -> {last_start_dt}'
- )
-
- else:
- log.warning(
- '0 BARS TO PUSH after diff!?\n'
- f'{next_start_dt} -> {last_start_dt}'
- )
-
- # bail gracefully on shm allocation overrun/full
- # condition
- try:
- await shm_push_in_between(
- shm,
- to_push,
- prepend_index=next_prepend_index,
- update_start_on_prepend=update_start_on_prepend,
- )
- await sampler_stream.send({
- 'broadcast_all': {
- 'backfilling': (mkt.fqme, timeframe),
- },
- })
-
- # decrement next prepend point
- next_prepend_index = next_prepend_index - ln
- last_start_dt = next_start_dt
-
- except ValueError as ve:
- _ve = ve
- log.error(
- f'Shm prepend OVERRUN on: {next_start_dt} -> {last_start_dt}?'
- )
-
- if next_prepend_index < ln:
- log.warning(
- f'Shm buffer can only hold {next_prepend_index} more rows..\n'
- f'Appending those from recent {ln}-sized frame, no more!'
- )
-
- to_push = to_push[-next_prepend_index + 1:]
- await shm_push_in_between(
- shm,
- to_push,
- prepend_index=next_prepend_index,
- update_start_on_prepend=update_start_on_prepend,
- )
- await sampler_stream.send({
- 'broadcast_all': {
- 'backfilling': (mkt.fqme, timeframe),
- },
- })
-
- # can't push the entire frame? so
- # push only the amount that can fit..
- break
-
- log.info(
- f'Shm pushed {ln} frame:\n'
- f'{next_start_dt} -> {last_start_dt}'
- )
-
- # FINALLY, maybe write immediately to the tsdb backend for
- # long-term storage.
- if (
- storage is not None
- and
- write_tsdb
- ):
- log.info(
- f'Writing {ln} frame to storage:\n'
- f'{next_start_dt} -> {last_start_dt}'
- )
-
- # NOTE, always drop the src asset token for
- # non-currency-pair like market types (for now)
- #
- # THAT IS, for now our table key schema is NOT
- # including the dst[/src] source asset token. SO,
- # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
- # historical reasons ONLY.
- if mkt.dst.atype not in {
- 'crypto',
- 'crypto_currency',
- 'fiat', # a "forex pair"
- 'perpetual_future', # stupid "perps" from cex land
- }:
- col_sym_key: str = mkt.get_fqme(
- delim_char='',
- without_src=True,
- )
- else:
- col_sym_key: str = mkt.get_fqme(
- delim_char='',
- )
-
- await storage.write_ohlcv(
- col_sym_key,
- shm.array,
- timeframe,
- )
- df: pl.DataFrame = await storage.as_df(
- fqme=mkt.fqme,
- period=timeframe,
- load_from_offline=False,
- )
- (
- wdts,
- deduped,
- diff,
- ) = dedupe(df)
- # if diff:
- # sort_diff(df)
-
- else:
- # finally filled gap
- log.info(
- f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
- )
-
- # XXX: extremely important, there can be no checkpoints
- # in the block above to avoid entering new ``frames``
- # values while we're pipelining the current ones to
- # memory...
- # await sampler_stream.send('broadcast_all')
-
- # short-circuit (for now)
- bf_done.set()
-
-
-# NOTE: originally this was used to cope with a tsdb (marketstore)
-# which could not delivery very large frames of history over gRPC
-# (thanks goolag) due to corruption issues. NOW, using apache
-# parquet (by default in the local filesys) we don't have this
-# requirement since the files can be loaded very quickly in
-# entirety to memory via
-async def back_load_from_tsdb(
- storemod: ModuleType,
- storage: StorageClient,
-
- fqme: str,
-
- tsdb_history: np.ndarray,
-
- last_tsdb_dt: datetime,
- latest_start_dt: datetime,
- latest_end_dt: datetime,
-
- bf_done: trio.Event,
-
- timeframe: int,
- shm: ShmArray,
-):
- assert len(tsdb_history)
-
- # sync to backend history task's query/load completion
- # if bf_done:
- # await bf_done.wait()
-
- # TODO: eventually it'd be nice to not require a shm array/buffer
- # to accomplish this.. maybe we can do some kind of tsdb direct to
- # graphics format eventually in a child-actor?
- if storemod.name == 'nativedb':
- return
-
- await tractor.pause()
- assert shm._first.value == 0
-
- array = shm.array
-
- # if timeframe == 1:
- # times = shm.array['time']
- # assert (times[1] - times[0]) == 1
-
- if len(array):
- shm_last_dt = from_timestamp(
- shm.array[0]['time']
- )
- else:
- shm_last_dt = None
-
- if last_tsdb_dt:
- assert shm_last_dt >= last_tsdb_dt
-
- # do diff against start index of last frame of history and only
- # fill in an amount of datums from tsdb allows for most recent
- # to be loaded into mem *before* tsdb data.
- if (
- last_tsdb_dt
- and latest_start_dt
- ):
- backfilled_size_s: Duration = (
- latest_start_dt - last_tsdb_dt
- ).seconds
- # if the shm buffer len is not large enough to contain
- # all missing data between the most recent backend-queried frame
- # and the most recent dt-index in the db we warn that we only
- # want to load a portion of the next tsdb query to fill that
- # space.
- log.info(
- f'{backfilled_size_s} seconds worth of {timeframe}s loaded'
- )
-
- # Load TSDB history into shm buffer (for display) if there is
- # remaining buffer space.
-
- time_key: str = 'time'
- if getattr(storemod, 'ohlc_key_map', False):
- keymap: bidict = storemod.ohlc_key_map
- time_key: str = keymap.inverse['time']
-
- # if (
- # not len(tsdb_history)
- # ):
- # return
-
- tsdb_last_frame_start: datetime = last_tsdb_dt
- # load as much from storage into shm possible (depends on
- # user's shm size settings).
- while shm._first.value > 0:
-
- tsdb_history = await storage.read_ohlcv(
- fqme,
- timeframe=timeframe,
- end=tsdb_last_frame_start,
- )
-
- # # empty query
- # if not len(tsdb_history):
- # break
-
- next_start = tsdb_history[time_key][0]
- if next_start >= tsdb_last_frame_start:
- # no earlier data detected
- break
-
- else:
- tsdb_last_frame_start = next_start
-
- # TODO: see if there's faster multi-field reads:
- # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
- # re-index with a `time` and index field
- prepend_start = shm._first.value
-
- to_push = tsdb_history[-prepend_start:]
- shm.push(
- to_push,
-
- # insert the history pre a "days worth" of samples
- # to leave some real-time buffer space at the end.
- prepend=True,
- # update_first=False,
- # start=prepend_start,
- field_map=storemod.ohlc_key_map,
- )
-
- log.info(f'Loaded {to_push.shape} datums from storage')
- tsdb_last_frame_start = tsdb_history[time_key][0]
-
- # manually trigger step update to update charts/fsps
- # which need an incremental update.
- # NOTE: the way this works is super duper
- # un-intuitive right now:
- # - the broadcaster fires a msg to the fsp subsystem.
- # - fsp subsys then checks for a sample step diff and
- # possibly recomputes prepended history.
- # - the fsp then sends back to the parent actor
- # (usually a chart showing graphics for said fsp)
- # which tells the chart to conduct a manual full
- # graphics loop cycle.
- # await sampler_stream.send('broadcast_all')
-
-
-async def push_latest_frame(
- # box-type only that should get packed with the datetime
- # objects received for the latest history frame
- dt_eps: list[DateTime, DateTime],
- shm: ShmArray,
- get_hist: Callable[
- [int, datetime, datetime],
- tuple[np.ndarray, str]
- ],
- timeframe: float,
- config: dict,
-
- task_status: TaskStatus[
- Exception | list[datetime, datetime]
- ] = trio.TASK_STATUS_IGNORED,
-
-) -> list[datetime, datetime] | None:
- # get latest query's worth of history all the way
- # back to what is recorded in the tsdb
- try:
- (
- array,
- mr_start_dt,
- mr_end_dt,
- ) = await get_hist(
- timeframe,
- end_dt=None,
- )
- # so caller can access these ep values
- dt_eps.extend([
- mr_start_dt,
- mr_end_dt,
- ])
- task_status.started(dt_eps)
-
- # XXX: timeframe not supported for backend (since
- # above exception type), terminate immediately since
- # there's no backfilling possible.
- except DataUnavailable:
- task_status.started(None)
-
- if timeframe > 1:
- await tractor.pause()
-
- # prolly tf not supported
- return None
-
- # NOTE: on the first history, most recent history
- # frame we PREPEND from the current shm ._last index
- # and thus a gap between the earliest datum loaded here
- # and the latest loaded from the tsdb may exist!
- log.info(f'Pushing {array.size} to shm!')
- shm.push(
- array,
- prepend=True, # append on first frame
- )
-
- return dt_eps
-
-
-async def load_tsdb_hist(
- storage: StorageClient,
- mkt: MktPair,
- timeframe: float,
-
- task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
-
-) -> tuple[
- np.ndarray,
- DateTime,
- DateTime,
-] | None:
- # loads a (large) frame of data from the tsdb depending
- # on the db's query size limit; our "nativedb" (using
- # parquet) generally can load the entire history into mem
- # but if not then below the remaining history can be lazy
- # loaded?
- fqme: str = mkt.fqme
- tsdb_entry: tuple[
- np.ndarray,
- DateTime,
- DateTime,
- ]
- try:
- tsdb_entry: tuple | None = await storage.load(
- fqme,
- timeframe=timeframe,
- )
- return tsdb_entry
-
- except TimeseriesNotFound:
- log.warning(
- f'No timeseries yet for {timeframe}@{fqme}'
- )
- return None
-
-
-async def tsdb_backfill(
- mod: ModuleType,
- storemod: ModuleType,
-
- storage: StorageClient,
- mkt: MktPair,
- shm: ShmArray,
- timeframe: float,
-
- sampler_stream: tractor.MsgStream,
-
- task_status: TaskStatus[
- tuple[ShmArray, ShmArray]
- ] = trio.TASK_STATUS_IGNORED,
-
-) -> None:
-
- if timeframe not in (1, 60):
- raise ValueError(
- '`piker` only needs to support 1m and 1s sampling '
- 'but ur api is trying to deliver a longer '
- f'timeframe of {timeframe} seconds..\n'
- 'So yuh.. dun do dat brudder.'
- )
-
- get_hist: Callable[
- [int, datetime, datetime],
- tuple[np.ndarray, str]
- ]
- config: dict[str, int]
- async with (
- mod.open_history_client(
- mkt,
- ) as (get_hist, config),
-
- # NOTE: this sub-nursery splits to tasks for the given
- # sampling rate to concurrently load offline tsdb
- # timeseries as well as new data from the venue backend!
- ):
- log.info(
- f'`{mod}` history client returned backfill config:\n'
- f'{pformat(config)}\n'
- )
-
- # concurrently load the provider's most-recent-frame AND any
- # pre-existing tsdb history already saved in `piker` storage.
- dt_eps: list[DateTime, DateTime] = []
- async with (
- tractor.trionics.collapse_eg(),
- trio.open_nursery() as tn
- ):
- tn.start_soon(
- push_latest_frame,
- dt_eps,
- shm,
- get_hist,
- timeframe,
- config,
- )
- tsdb_entry: tuple = await load_tsdb_hist(
- storage,
- mkt,
- timeframe,
- )
-
- # tell parent task to continue
- # TODO: really we'd want this the other way with the
- # tsdb load happening asap and the since the latest
- # frame query will normally be the main source of
- # latency?
- task_status.started()
-
- # NOTE: iabs to start backfilling from, reverse chronological,
- # ONLY AFTER the first history frame has been pushed to
- # mem!
- backfill_gap_from_shm_index: int = shm._first.value + 1
-
- # Prepend any tsdb history into the rt-shm-buffer which
- # should NOW be getting filled with the most recent history
- # pulled from the data-backend.
- if dt_eps:
- # well then, unpack the latest (gap) backfilled frame dts
- (
- mr_start_dt,
- mr_end_dt,
- ) = dt_eps
-
- first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
- calced_frame_size: Duration = mk_duration(
- seconds=first_frame_dur_s,
- )
- # NOTE, attempt to use the backend declared default frame
- # sizing (as allowed by their time-series query APIs) and
- # if not provided try to construct a default from the
- # first frame received above.
- def_frame_durs: dict[
- int,
- Duration,
- ]|None = config.get('frame_types', None)
-
- if def_frame_durs:
- def_frame_size: Duration = def_frame_durs[timeframe]
-
- if def_frame_size != calced_frame_size:
- log.warning(
- f'Expected frame size {def_frame_size}\n'
- f'Rxed frame {calced_frame_size}\n'
- )
- # await tractor.pause()
- else:
- # use what we calced from first frame above.
- def_frame_size = calced_frame_size
-
- # NOTE: when there's no offline data, there's 2 cases:
- # - data backend doesn't support timeframe/sample
- # period (in which case `dt_eps` should be `None` and
- # we shouldn't be here!), or
- # - no prior history has been stored (yet) and we need
- # todo full backfill of the history now.
- if tsdb_entry is None:
- # indicate to backfill task to fill the whole
- # shm buffer as much as it can!
- last_tsdb_dt = None
-
- # there's existing tsdb history from (offline) storage
- # so only backfill the gap between the
- # most-recent-frame (mrf) and that latest sample.
- else:
- (
- tsdb_history,
- first_tsdb_dt,
- last_tsdb_dt,
- ) = tsdb_entry
-
- # if there is a gap to backfill from the first
- # history frame until the last datum loaded from the tsdb
- # continue that now in the background
- async with trio.open_nursery(
- strict_exception_groups=False,
- ) as tn:
-
- bf_done = await tn.start(
- partial(
- start_backfill,
- get_hist=get_hist,
- def_frame_duration=def_frame_size,
- mod=mod,
- mkt=mkt,
- shm=shm,
- timeframe=timeframe,
-
- backfill_from_shm_index=backfill_gap_from_shm_index,
- backfill_from_dt=mr_start_dt,
-
- sampler_stream=sampler_stream,
- backfill_until_dt=last_tsdb_dt,
-
- storage=storage,
- write_tsdb=True,
- )
- )
- nulls_detected: trio.Event | None = None
- if last_tsdb_dt is not None:
- # calc the index from which the tsdb data should be
- # prepended, presuming there is a gap between the
- # latest frame (loaded/read above) and the latest
- # sample loaded from the tsdb.
- backfill_diff: Duration = mr_start_dt - last_tsdb_dt
- offset_s: float = backfill_diff.in_seconds()
-
- # XXX EDGE CASEs: the most recent frame overlaps with
- # prior tsdb history!!
- # - so the latest frame's start time is earlier then
- # the tsdb's latest sample.
- # - alternatively this may also more generally occur
- # when the venue was closed (say over the weeknd)
- # causing a timeseries gap, AND the query frames size
- # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
- # GREATER THAN the current venue-market's operating
- # session (time) we will receive datums from BEFORE THE
- # CLOSURE GAP and thus the `offset_s` value will be
- # NEGATIVE! In this case we need to ensure we don't try
- # to push datums that have already been recorded in the
- # tsdb. In this case we instead only retreive and push
- # the series portion missing from the db's data set.
- # if offset_s < 0:
- # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
- # non_overlap_offset_s: float = backfill_diff.in_seconds()
-
- offset_samples: int = round(offset_s / timeframe)
-
- # TODO: see if there's faster multi-field reads:
- # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
- # re-index with a `time` and index field
- if offset_s > 0:
- # NOTE XXX: ONLY when there is an actual gap
- # between the earliest sample in the latest history
- # frame do we want to NOT stick the latest tsdb
- # history adjacent to that latest frame!
- prepend_start = shm._first.value - offset_samples + 1
- to_push = tsdb_history[-prepend_start:]
- else:
- # when there is overlap we want to remove the
- # overlapping samples from the tsdb portion (taking
- # instead the latest frame's values since THEY
- # SHOULD BE THE SAME) and prepend DIRECTLY adjacent
- # to the latest frame!
- # TODO: assert the overlap segment array contains
- # the same values!?!
- prepend_start = shm._first.value
- to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
-
- # tsdb history is so far in the past we can't fit it in
- # shm buffer space so simply don't load it!
- if prepend_start > 0:
- shm.push(
- to_push,
-
- # insert the history pre a "days worth" of samples
- # to leave some real-time buffer space at the end.
- prepend=True,
- # update_first=False,
- start=prepend_start,
- field_map=storemod.ohlc_key_map,
- )
-
- log.info(f'Loaded {to_push.shape} datums from storage')
-
- # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
- # seemingly missing (null-time) segments..
- # TODO: ideally these can never exist!
- # -[ ] somehow it seems sometimes we're writing zero-ed
- # segments to tsdbs during teardown?
- # -[ ] can we ensure that the backcfiller tasks do this
- # work PREVENTAVELY instead?
- # -[ ] fill in non-zero epoch time values ALWAYS!
- # await maybe_fill_null_segments(
- nulls_detected: trio.Event = await tn.start(partial(
- maybe_fill_null_segments,
-
- shm=shm,
- timeframe=timeframe,
- get_hist=get_hist,
- sampler_stream=sampler_stream,
- mkt=mkt,
- ))
-
- # 2nd nursery END
-
- # TODO: who would want to?
- if nulls_detected:
- await nulls_detected.wait()
-
- await bf_done.wait()
- # TODO: maybe start history anal and load missing "history
- # gaps" via backend..
-
- # if len(hist_shm.array) < 2:
- # TODO: there's an edge case here to solve where if the last
- # frame before market close (at least on ib) was pushed and
- # there was only "1 new" row pushed from the first backfill
- # query-iteration, then the sample step sizing calcs will
- # break upstream from here since you can't diff on at least
- # 2 steps... probably should also add logic to compute from
- # the tsdb series and stash that somewhere as meta data on
- # the shm buffer?.. no se.
-
- # backload any further data from tsdb (concurrently per
- # timeframe) if not all data was able to be loaded (in memory)
- # from the ``StorageClient.load()`` call above.
- await trio.sleep_forever()
-
- # XXX NOTE: this is legacy from when we were using
- # marketstore and we needed to continue backloading
- # incrementally from the tsdb client.. (bc it couldn't
- # handle a single large query with gRPC for some
- # reason.. classic goolag pos)
- # tn.start_soon(
- # back_load_from_tsdb,
-
- # storemod,
- # storage,
- # fqme,
-
- # tsdb_history,
- # last_tsdb_dt,
- # mr_start_dt,
- # mr_end_dt,
- # bf_done,
-
- # timeframe,
- # shm,
- # )
-
-
-async def manage_history(
- mod: ModuleType,
- mkt: MktPair,
- some_data_ready: trio.Event,
- feed_is_live: trio.Event,
- timeframe: float = 60, # in seconds
-
- task_status: TaskStatus[
- tuple[ShmArray, ShmArray]
- ] = trio.TASK_STATUS_IGNORED,
-
-) -> None:
- '''
- Load and manage historical data including the loading of any
- available series from any connected tsdb as well as conduct
- real-time update of both that existing db and the allocated
- shared memory buffer.
-
- Init sequence:
- - allocate shm (numpy array) buffers for 60s & 1s sample rates
- - configure "zero index" for each buffer: the index where
- history will prepended *to* and new live data will be
- appened *from*.
- - open a ``.storage.StorageClient`` and load any existing tsdb
- history as well as (async) start a backfill task which loads
- missing (newer) history from the data provider backend:
- - tsdb history is loaded first and pushed to shm ASAP.
- - the backfill task loads the most recent history before
- unblocking its parent task, so that the `ShmArray._last` is
- up to date to allow the OHLC sampler to begin writing new
- samples as the correct buffer index once the provider feed
- engages.
-
- '''
- # TODO: is there a way to make each shm file key
- # actor-tree-discovery-addr unique so we avoid collisions
- # when doing tests which also allocate shms for certain instruments
- # that may be in use on the system by some other running daemons?
- # from tractor._state import _runtime_vars
- # port = _runtime_vars['_root_mailbox'][1]
-
- uid: tuple = tractor.current_actor().uid
- name, uuid = uid
- service: str = name.rstrip(f'.{mod.name}')
- fqme: str = mkt.get_fqme(delim_char='')
-
- # (maybe) allocate shm array for this broker/symbol which will
- # be used for fast near-term history capture and processing.
- hist_shm, opened = maybe_open_shm_array(
- size=_default_hist_size,
- append_start_index=_hist_buffer_start,
-
- key=f'piker.{service}[{uuid[:16]}].{fqme}.hist',
-
- # use any broker defined ohlc dtype:
- dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields),
-
- # we expect the sub-actor to write
- readonly=False,
- )
- hist_zero_index = hist_shm.index - 1
-
- # TODO: history validation
- if not opened:
- raise RuntimeError(
- "Persistent shm for sym was already open?!"
- )
-
- rt_shm, opened = maybe_open_shm_array(
- size=_default_rt_size,
- append_start_index=_rt_buffer_start,
- key=f'piker.{service}[{uuid[:16]}].{fqme}.rt',
-
- # use any broker defined ohlc dtype:
- dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields),
-
- # we expect the sub-actor to write
- readonly=False,
- )
-
- # (for now) set the rt (hft) shm array with space to prepend
- # only a few days worth of 1s history.
- days: int = 2
- start_index: int = days*_secs_in_day
- rt_shm._first.value = start_index
- rt_shm._last.value = start_index
- rt_zero_index = rt_shm.index - 1
-
- if not opened:
- raise RuntimeError(
- "Persistent shm for sym was already open?!"
- )
-
- open_history_client = getattr(
- mod,
- 'open_history_client',
- )
- assert open_history_client
-
- # TODO: maybe it should be a subpkg of `.data`?
- from piker import storage
-
- async with (
- storage.open_storage_client() as (storemod, client),
-
- # NOTE: this nursery spawns a task per "timeframe" (aka
- # sampling period) data set since normally differently
- # sampled timeseries can be loaded / process independently
- # ;)
- tractor.trionics.collapse_eg(),
- trio.open_nursery() as tn,
- ):
- log.info(
- f'Connecting to storage backend `{storemod.name}`:\n'
- f'location: {client.address}\n'
- f'db cardinality: {client.cardinality}\n'
- # TODO: show backend config, eg:
- # - network settings
- # - storage size with compression
- # - number of loaded time series?
- )
-
- # NOTE: this call ONLY UNBLOCKS once the latest-most frame
- # (i.e. history just before the live feed latest datum) of
- # history has been loaded and written to the shm buffer:
- # - the backfiller task can write in reverse chronological
- # to the shm and tsdb
- # - the tsdb data can be loaded immediately and the
- # backfiller can do a single append from it's end datum and
- # then prepends backward to that from the current time
- # step.
- tf2mem: dict = {
- 1: rt_shm,
- 60: hist_shm,
- }
- async with open_sample_stream(
- period_s=1.,
- shms_by_period={
- 1.: rt_shm.token,
- 60.: hist_shm.token,
- },
-
- # NOTE: we want to only open a stream for doing
- # broadcasts on backfill operations, not receive the
- # sample index-stream (since there's no code in this
- # data feed layer that needs to consume it).
- open_index_stream=True,
- sub_for_broadcasts=False,
-
- ) as sample_stream:
- # register 1s and 1m buffers with the global
- # incrementer task
- log.info(f'Connected to sampler stream: {sample_stream}')
-
- for timeframe in [60, 1]:
- await tn.start(partial(
- tsdb_backfill,
- mod=mod,
- storemod=storemod,
- storage=client,
- mkt=mkt,
- shm=tf2mem[timeframe],
- timeframe=timeframe,
- sampler_stream=sample_stream,
- ))
-
- # indicate to caller that feed can be delivered to
- # remote requesting client since we've loaded history
- # data that can be used.
- some_data_ready.set()
-
- # wait for a live feed before starting the sampler.
- await feed_is_live.wait()
-
- # yield back after client connect with filled shm
- task_status.started((
- hist_zero_index,
- hist_shm,
- rt_zero_index,
- rt_shm,
- ))
-
- # history retreival loop depending on user interaction
- # and thus a small RPC-prot for remotely controllinlg
- # what data is loaded for viewing.
- await trio.sleep_forever()
-
-
-def iter_dfs_from_shms(
- fqme: str
-) -> Generator[
- tuple[Path, ShmArray, pl.DataFrame],
- None,
- None,
-]:
- # shm buffer size table based on known sample rates
- sizes: dict[str, int] = {
- 'hist': _default_hist_size,
- 'rt': _default_rt_size,
- }
-
- # load all detected shm buffer files which have the
- # passed FQME pattern in the file name.
- shmfiles: list[Path] = []
- shmdir = Path('/dev/shm/')
-
- for shmfile in shmdir.glob(f'*{fqme}*'):
- filename: str = shmfile.name
-
- # skip index files
- if (
- '_first' in filename
- or '_last' in filename
- ):
- continue
-
- assert shmfile.is_file()
- log.debug(f'Found matching shm buffer file: {filename}')
- shmfiles.append(shmfile)
-
- for shmfile in shmfiles:
-
- # lookup array buffer size based on file suffix
- # being either .rt or .hist
- key: str = shmfile.name.rsplit('.')[-1]
-
- # skip FSP buffers for now..
- if key not in sizes:
- continue
-
- size: int = sizes[key]
-
- # attach to any shm buffer, load array into polars df,
- # write to local parquet file.
- shm, opened = maybe_open_shm_array(
- key=shmfile.name,
- size=size,
- dtype=def_iohlcv_fields,
- readonly=True,
- )
- assert not opened
- ohlcv: np.ndarray = shm.array
- df: pl.DataFrame = np2pl(ohlcv)
-
- yield (
- shmfile,
- shm,
- df,
- )
diff --git a/piker/tsp/_anal.py b/piker/tsp/_anal.py
index 42c3aa6c..26c3740e 100644
--- a/piker/tsp/_anal.py
+++ b/piker/tsp/_anal.py
@@ -578,11 +578,22 @@ def detect_time_gaps(
# NOTE: this flag is to indicate that on this (sampling) time
# scale we expect to only be filtering against larger venue
# closures-scale time gaps.
+ #
+    # Map to the `total_*` accessor since `dt_diff` is a duration,
+    # not a datetime; modern polars requires `total_*` methods for
+    # duration types (e.g. `total_days()` not `day()`) and expects
+    # the plural unit form (e.g. 'day' -> 'days').
+ unit_plural: str = (
+ gap_dt_unit
+ if gap_dt_unit.endswith('s')
+ else f'{gap_dt_unit}s'
+ )
+ duration_method: str = f'total_{unit_plural}'
return step_gaps.filter(
# Second by an arbitrary dt-unit step size
getattr(
pl.col('dt_diff').dt,
- gap_dt_unit,
+ duration_method,
)().abs() > gap_thresh
)
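A quick standalone check of the polars duration accessors the new `total_{unit}` mapping targets; the two-row frame below is fabricated sample data:

import polars as pl
from datetime import datetime

df = pl.DataFrame({
    'dt': [datetime(2024, 1, 2), datetime(2024, 1, 8)],
    'dt_prev': [datetime(2024, 1, 1), datetime(2024, 1, 2)],
}).with_columns(
    (pl.col('dt') - pl.col('dt_prev')).alias('dt_diff')
)

gap_dt_unit: str = 'day'
unit_plural: str = (
    gap_dt_unit if gap_dt_unit.endswith('s') else f'{gap_dt_unit}s'
)
# duration exprs expose `total_days()`, `total_hours()`, etc, not `day()`
big_gaps: pl.DataFrame = df.filter(
    getattr(pl.col('dt_diff').dt, f'total_{unit_plural}')().abs() > 1
)
assert big_gaps.height == 1  # only the 6-day gap clears the 1-day thresh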
diff --git a/piker/tsp/_annotate.py b/piker/tsp/_annotate.py
new file mode 100644
index 00000000..797c38cf
--- /dev/null
+++ b/piker/tsp/_annotate.py
@@ -0,0 +1,166 @@
+# piker: trading gear for hackers
+# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""
+Time-series (remote) annotation APIs.
+
+"""
+from __future__ import annotations
+from math import copysign
+from typing import (
+ Any,
+ TYPE_CHECKING,
+)
+
+import polars as pl
+import tractor
+
+from piker.data._formatters import BGM
+from piker.storage import log
+
+if TYPE_CHECKING:
+ from piker.ui._remote_ctl import AnnotCtl
+
+
+async def markup_gaps(
+ fqme: str,
+ timeframe: float,
+ actl: AnnotCtl,
+ wdts: pl.DataFrame,
+ gaps: pl.DataFrame,
+
+) -> dict[int, dict]:
+ '''
+ Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
+ with rectangles.
+
+ '''
+ aids: dict[int] = {}
+ for i in range(gaps.height):
+
+ row: pl.DataFrame = gaps[i]
+
+ # the gap's RIGHT-most bar's OPEN value
+ # at that time (sample) step.
+ iend: int = row['index'][0]
+ # dt: datetime = row['dt'][0]
+ # dt_prev: datetime = row['dt_prev'][0]
+ # dt_end_t: float = dt.timestamp()
+
+
+ # TODO: can we eventually remove this
+ # once we figure out why the epoch cols
+ # don't match?
+ # TODO: FIX HOW/WHY these aren't matching
+ # and are instead off by 4hours (EST
+ # vs. UTC?!?!)
+ # end_t: float = row['time']
+ # assert (
+ # dt.timestamp()
+ # ==
+ # end_t
+ # )
+
+ # the gap's LEFT-most bar's CLOSE value
+ # at that time (sample) step.
+ prev_r: pl.DataFrame = wdts.filter(
+ pl.col('index') == iend - 1
+ )
+ # XXX: probably a gap in the (newly sorted or de-duplicated)
+ # dt-df, so we might need to re-index first..
+ dt: pl.Series = row['dt']
+ dt_prev: pl.Series = row['dt_prev']
+ if prev_r.is_empty():
+
+ # XXX, filter out any special ignore cases,
+ # - UNIX-epoch stamped datums
+ # - first row
+ if (
+ dt_prev.dt.epoch()[0] == 0
+ or
+ dt.dt.epoch()[0] == 0
+ ):
+ log.warning('Skipping row with UNIX epoch timestamp ??')
+ continue
+
+ if wdts[0]['index'][0] == iend: # first row
+ log.warning('Skipping first-row (has no previous obvi) !!')
+ continue
+
+ # XXX, if the previous-row by shm-index is missing,
+ # meaning there is a missing sample (set), get the prior
+ # row by df index and attempt to use it?
+ i_wdts: pl.DataFrame = wdts.with_row_index(name='i')
+ i_row: int = i_wdts.filter(pl.col('index') == iend)['i'][0]
+            prev_row_by_i = wdts[i_row - 1]  # row positionally *prior* to `iend`'s
+ prev_r: pl.DataFrame = prev_row_by_i
+
+ # debug any missing pre-row
+ if tractor._state.is_debug_mode():
+ await tractor.pause()
+
+ istart: int = prev_r['index'][0]
+
+ # TODO: implement px-col width measure
+ # and ensure at least as many px-cols
+ # shown per rect as configured by user.
+ # gap_w: float = abs((iend - istart))
+ # if gap_w < 6:
+ # margin: float = 6
+ # iend += margin
+ # istart -= margin
+
+ rect_gap: float = BGM*3/8
+ opn: float = row['open'][0]
+ ro: tuple[float, float] = (
+ # dt_end_t,
+ iend + rect_gap + 1,
+ opn,
+ )
+ cls: float = prev_r['close'][0]
+ lc: tuple[float, float] = (
+ # dt_start_t,
+ istart - rect_gap, # + 1 ,
+ cls,
+ )
+
+ color: str = 'dad_blue'
+ diff: float = cls - opn
+ sgn: float = copysign(1, diff)
+ color: str = {
+ -1: 'buy_green',
+ 1: 'sell_red',
+ }[sgn]
+
+ rect_kwargs: dict[str, Any] = dict(
+ fqme=fqme,
+ timeframe=timeframe,
+ start_pos=lc,
+ end_pos=ro,
+ color=color,
+ )
+
+ aid: int = await actl.add_rect(**rect_kwargs)
+ assert aid
+ aids[aid] = rect_kwargs
+
+ # tell chart to redraw all its
+ # graphics view layers Bo
+ await actl.redraw(
+ fqme=fqme,
+ timeframe=timeframe,
+ )
+ return aids
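To make the positional-row fallback added in `markup_gaps()` concrete, a small polars sketch with a deliberately missing shm index; the values are illustrative only:

import polars as pl

wdts = pl.DataFrame({
    'index': [10, 11, 13],   # shm index 12 is missing
    'close': [1.0, 2.0, 3.0],
})
iend: int = 13

# the usual lookup by `index == iend - 1` comes back empty..
assert wdts.filter(pl.col('index') == iend - 1).is_empty()

# ..so fall back to the row positionally prior to `iend`'s row.
i_wdts = wdts.with_row_index(name='i')
i_row: int = i_wdts.filter(pl.col('index') == iend)['i'][0]
prev_r: pl.DataFrame = wdts[i_row - 1]
assert prev_r['index'][0] == 11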
diff --git a/piker/tsp/_dedupe_smart.py b/piker/tsp/_dedupe_smart.py
new file mode 100644
index 00000000..8c0ac55a
--- /dev/null
+++ b/piker/tsp/_dedupe_smart.py
@@ -0,0 +1,206 @@
+'''
+Smart OHLCV deduplication with data quality validation.
+
+Handles concurrent write conflicts by keeping the most complete bar
+(highest volume) while detecting data quality anomalies.
+
+'''
+import polars as pl
+
+from ._anal import with_dts
+
+
+def dedupe_ohlcv_smart(
+ src_df: pl.DataFrame,
+ time_col: str = 'time',
+ volume_col: str = 'volume',
+ sort: bool = True,
+
+) -> tuple[
+ pl.DataFrame, # with dts
+ pl.DataFrame, # deduped (keeping higher volume bars)
+ int, # count of dupes removed
+ pl.DataFrame|None, # valid race conditions
+ pl.DataFrame|None, # data quality violations
+]:
+ '''
+ Smart OHLCV deduplication keeping most complete bars.
+
+ For duplicate timestamps, keeps bar with highest volume under
+ the assumption that higher volume indicates more complete/final
+ data from backfill vs partial live updates.
+
+ Returns
+ -------
+ Tuple of:
+ - wdts: original dataframe with datetime columns added
+ - deduped: deduplicated frame keeping highest-volume bars
+ - diff: number of duplicate rows removed
+ - valid_races: duplicates meeting expected race condition pattern
+ (volume monotonic, OHLC ranges valid)
+ - data_quality_issues: duplicates violating expected relationships
+ indicating provider data problems
+
+ '''
+ wdts: pl.DataFrame = with_dts(src_df)
+
+ # Find duplicate timestamps
+ dupes: pl.DataFrame = wdts.filter(
+ pl.col(time_col).is_duplicated()
+ )
+
+ if dupes.is_empty():
+ # No duplicates, return as-is
+ return (wdts, wdts, 0, None, None)
+
+ # Analyze duplicate groups for validation
+ dupe_analysis: pl.DataFrame = (
+ dupes
+ .sort([time_col, 'index'])
+ .group_by(time_col, maintain_order=True)
+ .agg([
+ pl.col('index').alias('indices'),
+ pl.col('volume').alias('volumes'),
+ pl.col('high').alias('highs'),
+ pl.col('low').alias('lows'),
+ pl.col('open').alias('opens'),
+ pl.col('close').alias('closes'),
+ pl.col('dt').first().alias('dt'),
+ pl.len().alias('count'),
+ ])
+ )
+
+ # Validate OHLCV monotonicity for each duplicate group
+ def check_ohlcv_validity(row) -> dict[str, bool]:
+ '''
+ Check if duplicate bars follow expected race condition pattern.
+
+ For a valid live-update → backfill race:
+ - volume should be monotonically increasing
+ - high should be monotonically non-decreasing
+ - low should be monotonically non-increasing
+ - open should be identical (fixed at bar start)
+
+ Returns dict of violation flags.
+
+ '''
+ vols: list = row['volumes']
+ highs: list = row['highs']
+ lows: list = row['lows']
+ opens: list = row['opens']
+
+ violations: dict[str, bool] = {
+ 'volume_non_monotonic': False,
+ 'high_decreased': False,
+ 'low_increased': False,
+ 'open_mismatch': False,
+ 'identical_bars': False,
+ }
+
+ # Check if all bars are identical (pure duplicate)
+ if (
+ len(set(vols)) == 1
+ and len(set(highs)) == 1
+ and len(set(lows)) == 1
+ and len(set(opens)) == 1
+ ):
+ violations['identical_bars'] = True
+ return violations
+
+ # Check volume monotonicity
+ for i in range(1, len(vols)):
+ if vols[i] < vols[i-1]:
+ violations['volume_non_monotonic'] = True
+ break
+
+ # Check high monotonicity (can only increase or stay same)
+ for i in range(1, len(highs)):
+ if highs[i] < highs[i-1]:
+ violations['high_decreased'] = True
+ break
+
+ # Check low monotonicity (can only decrease or stay same)
+ for i in range(1, len(lows)):
+ if lows[i] > lows[i-1]:
+ violations['low_increased'] = True
+ break
+
+ # Check open consistency (should be fixed)
+ if len(set(opens)) > 1:
+ violations['open_mismatch'] = True
+
+ return violations
+
+ # Apply validation
+ dupe_analysis = dupe_analysis.with_columns([
+ pl.struct(['volumes', 'highs', 'lows', 'opens'])
+ .map_elements(
+ check_ohlcv_validity,
+ return_dtype=pl.Struct([
+ pl.Field('volume_non_monotonic', pl.Boolean),
+ pl.Field('high_decreased', pl.Boolean),
+ pl.Field('low_increased', pl.Boolean),
+ pl.Field('open_mismatch', pl.Boolean),
+ pl.Field('identical_bars', pl.Boolean),
+ ])
+ )
+ .alias('validity')
+ ])
+
+ # Unnest validity struct
+ dupe_analysis = dupe_analysis.unnest('validity')
+
+ # Separate valid races from data quality issues
+ valid_races: pl.DataFrame|None = (
+ dupe_analysis
+ .filter(
+ # Valid if no violations OR just identical bars
+ ~pl.col('volume_non_monotonic')
+ & ~pl.col('high_decreased')
+ & ~pl.col('low_increased')
+ & ~pl.col('open_mismatch')
+ )
+ )
+ if valid_races.is_empty():
+ valid_races = None
+
+ data_quality_issues: pl.DataFrame|None = (
+ dupe_analysis
+ .filter(
+ # Issues if any non-identical violation exists
+ (
+ pl.col('volume_non_monotonic')
+ | pl.col('high_decreased')
+ | pl.col('low_increased')
+ | pl.col('open_mismatch')
+ )
+ & ~pl.col('identical_bars')
+ )
+ )
+ if data_quality_issues.is_empty():
+ data_quality_issues = None
+
+ # Deduplicate: keep highest volume bar for each timestamp
+ deduped: pl.DataFrame = (
+ wdts
+ .sort([time_col, volume_col])
+ .unique(
+ subset=[time_col],
+ keep='last',
+ maintain_order=False,
+ )
+ )
+
+    # Re-sort by time if requested
+ if sort:
+ deduped = deduped.sort(by=time_col)
+
+ diff: int = wdts.height - deduped.height
+
+ return (
+ wdts,
+ deduped,
+ diff,
+ valid_races,
+ data_quality_issues,
+ )
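To isolate the keep-highest-volume dedupe step at the end of `dedupe_ohlcv_smart()`, a minimal polars sketch with one duplicated timestamp; the frame is fabricated and omits most of piker's OHLCV schema:

import polars as pl

df = pl.DataFrame({
    'time':   [1.0, 2.0, 2.0, 3.0],  # ts 2.0 written twice (live vs backfill race)
    'volume': [10., 5., 7., 3.],     # the backfilled bar carries more volume
    'close':  [1.0, 2.1, 2.2, 3.0],
})

deduped: pl.DataFrame = (
    df
    .sort(['time', 'volume'])
    .unique(subset=['time'], keep='last', maintain_order=False)
    .sort(by='time')
)
assert deduped.height == 3
# the higher-volume (more complete) bar for ts 2.0 survives
assert deduped.filter(pl.col('time') == 2.0)['volume'][0] == 7.0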
diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py
new file mode 100644
index 00000000..361b0e23
--- /dev/null
+++ b/piker/tsp/_history.py
@@ -0,0 +1,1506 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public
+# License as published by the Free Software Foundation, either
+# version 3 of the License, or (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public
+# License along with this program. If not, see
+# <https://www.gnu.org/licenses/>.
+
+'''
+Historical TSP (time-series processing) low-level mgmt machinery and biz logic for,
+
+- hi-level biz logic using the `.storage` subpkg APIs for (I/O)
+  orchestration and mgmt of tsdb data sets.
+- core data-provider history backfilling middleware (as task-funcs) via
+  (what will eventually be `datad`, but rn is the) `.brokers` backend
+  APIs.
+- various data set cleaning, repairing and issue-detection/analysis
+ routines to ensure consistent series whether in shm or when
+ stored offline (in a tsdb).
+
+'''
+from __future__ import annotations
+from datetime import datetime
+from functools import partial
+from pathlib import Path
+from pprint import pformat
+from types import ModuleType
+from typing import (
+ Callable,
+ Generator,
+ TYPE_CHECKING,
+)
+
+import trio
+from trio_typing import TaskStatus
+import tractor
+from pendulum import (
+ Interval,
+ DateTime,
+ Duration,
+ duration as mk_duration,
+ from_timestamp,
+)
+import numpy as np
+import polars as pl
+
+from piker.brokers import NoData
+from piker.accounting import (
+ MktPair,
+)
+from piker.data._util import (
+ log,
+)
+from ..data._sharedmem import (
+ maybe_open_shm_array,
+ ShmArray,
+)
+from ..data._source import def_iohlcv_fields
+from ..data._sampling import (
+ open_sample_stream,
+)
+
+
+from piker.brokers._util import (
+ DataUnavailable,
+)
+from piker.storage import TimeseriesNotFound
+from ._anal import (
+ dedupe,
+ get_null_segs,
+ iter_null_segs,
+ Frame,
+
+ # codec-ish
+ np2pl as np2pl,
+
+ # `polars` specific
+ # with_dts,
+ # sort_diff,
+
+ # TODO, use this to correct conc-issues during backfill?
+ # detect_price_gaps,
+)
+
+if TYPE_CHECKING:
+ from bidict import bidict
+ from ..service.marketstore import StorageClient
+ # from .feed import _FeedsBus
+
+
+# `ShmArray` buffer sizing configuration:
+_mins_in_day = int(60 * 24)
+# how much is probably dependent on lifestyle
+# but we reco a buncha times (but only on a
+# run-every-other-day kinda week).
+_secs_in_day = int(60 * _mins_in_day)
+_days_in_week: int = 7
+
+_days_worth: int = 3
+_default_hist_size: int = 6 * 365 * _mins_in_day
+_hist_buffer_start = int(
+ _default_hist_size - round(7 * _mins_in_day)
+)
+
+_default_rt_size: int = _days_worth * _secs_in_day
+# NOTE: start the append index in the rt buffer such that 1 day's worth
+# can be appended before overrun.
+_rt_buffer_start = int((_days_worth - 1) * _secs_in_day)
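+# (worked arithmetic from the constants above: `_default_hist_size` is
+# 6*365*1440 == 3,153,600 1m rows, `_default_rt_size` is 3*86400 ==
+# 259,200 1s rows, and `_rt_buffer_start` == 2*86400 leaves ~1 day of
+# append room (and ~2 days of prepend room) in the rt buffer.)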
+
+
+def diff_history(
+ array: np.ndarray,
+ append_until_dt: datetime|None = None,
+ prepend_until_dt: datetime|None = None,
+
+) -> np.ndarray:
+
+ # no diffing with tsdb dt index possible..
+ if (
+ prepend_until_dt is None
+ and
+ append_until_dt is None
+ ):
+ return array
+
+ times = array['time']
+
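+    # e.g. with `append_until_dt` set, only rows strictly older than
+    # that dt are kept (any newer overlap is dropped); with
+    # `prepend_until_dt` set, only rows at-or-after it are kept.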
+ if append_until_dt:
+ return array[times < append_until_dt.timestamp()]
+ else:
+ return array[times >= prepend_until_dt.timestamp()]
+
+
+async def shm_push_in_between(
+ shm: ShmArray,
+ to_push: np.ndarray,
+ prepend_index: int,
+ backfill_until_dt: datetime,
+
+ update_start_on_prepend: bool = False,
+
+) -> None:
+
+ # XXX, try to catch bad inserts by peeking at the first/last
+ # times and ensure we don't violate order.
+ f_times: np.ndarray = to_push['time']
+ f_start: float = f_times[0]
+ f_start_dt = from_timestamp(f_start)
+ if (
+ f_start_dt < backfill_until_dt
+ ):
+ await tractor.pause()
+
+ # XXX: extremely important, there can be no checkpoints
+ # in the body of this func to avoid entering new ``frames``
+ # values while we're pipelining the current ones to
+ # memory...
+ shm.push(
+ to_push,
+ prepend=True,
+
+ # XXX: only update the ._first index if no tsdb
+ # segment was previously prepended by the
+ # parent task.
+ update_first=update_start_on_prepend,
+
+ # XXX: only prepend from a manually calculated shm
+ # index if there was already a tsdb history
+ # segment prepended (since then the
+ # ._first.value is going to be wayyy in the
+ # past!)
+ start=(
+ prepend_index
+ if not update_start_on_prepend
+ else None
+ ),
+ )
+
+
+async def maybe_fill_null_segments(
+ shm: ShmArray,
+ timeframe: float,
+ get_hist: Callable,
+ sampler_stream: tractor.MsgStream,
+ mkt: MktPair,
+ backfill_until_dt: datetime,
+
+ task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+
+ null_segs_detected = trio.Event()
+ task_status.started(null_segs_detected)
+
+ frame: Frame = shm.array
+
+ # TODO, put in parent task/daemon root!
+ import greenback
+ await greenback.ensure_portal()
+
+ null_segs: tuple|None = get_null_segs(
+ frame,
+ period=timeframe,
+ )
+ for (
+ absi_start, absi_end,
+ fi_start, fi_end,
+ start_t, end_t,
+ start_dt, end_dt,
+ ) in iter_null_segs(
+ null_segs=null_segs,
+ frame=frame,
+ timeframe=timeframe,
+ ):
+
+ # XXX NOTE: ?if we get a badly ordered timestamp
+ # pair, immediately stop backfilling?
+ if (
+ start_dt
+ and
+ end_dt < start_dt
+ ):
+ await tractor.pause()
+ break
+
+ (
+ array,
+ next_start_dt,
+ next_end_dt,
+ ) = await get_hist(
+ timeframe,
+ start_dt=start_dt,
+ end_dt=end_dt,
+ )
+
+ # XXX TODO: pretty sure if i plot tsla, btcusdt.binance
+ # and mnq.cme.ib this causes a Qt crash XXDDD
+
+ # make sure we don't overrun the buffer start
+ len_to_push: int = min(absi_end, array.size)
+ to_push: np.ndarray = array[-len_to_push:]
+
+ await shm_push_in_between(
+ shm,
+ to_push,
+ prepend_index=absi_end,
+ backfill_until_dt=backfill_until_dt,
+ update_start_on_prepend=False,
+ )
+ # TODO: UI side needs IPC event to update..
+ # - make sure the UI actually always handles
+ # this update!
+        # - remember that on the display side, only refresh this
+ # if the respective history is actually "in view".
+ # loop
+ try:
+ await sampler_stream.send({
+ 'broadcast_all': {
+
+ # XXX NOTE XXX: see the
+ # `.ui._display.increment_history_view()` if block
+ # that looks for this info to FORCE a hard viz
+ # redraw!
+ 'backfilling': (mkt.fqme, timeframe),
+ },
+ })
+ except tractor.ContextCancelled:
+ # log.exception
+ await tractor.pause()
+ raise
+
+ null_segs_detected.set()
+ # RECHECK for more null-gaps
+ frame: Frame = shm.array
+ null_segs: tuple | None = get_null_segs(
+ frame,
+ period=timeframe,
+ )
+ if (
+ null_segs
+ and
+ len(null_segs[-1])
+ ):
+ (
+ iabs_slices,
+ iabs_zero_rows,
+ zero_t,
+ ) = null_segs
+ log.warning(
+ f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n'
+ f'{pformat(iabs_slices)}'
+ )
+
+ # TODO: always backfill gaps with the earliest (price) datum's
+ # value to avoid the y-ranger including zeros and completely
+ # stretching the y-axis..
+ # array: np.ndarray = shm.array
+ # zeros = array[array['low'] == 0]
+ ohlc_fields: list[str] = [
+ 'open',
+ 'high',
+ 'low',
+ 'close',
+ ]
+
+ for istart, istop in iabs_slices:
+
+ # get view into buffer for null-segment
+ gap: np.ndarray = shm._array[istart:istop]
+
+ # copy the oldest OHLC samples forward
+ cls: float = shm._array[istart]['close']
+
+ # TODO: how can we mark this range as being a gap tho?
+ # -[ ] maybe pg finally supports nulls in ndarray to
+ # show empty space somehow?
+ # -[ ] we could put a special value in the vlm or
+ # another col/field to denote?
+ gap[ohlc_fields] = cls
+
+ start_t: float = shm._array[istart]['time']
+ t_diff: float = (istop - istart)*timeframe
+
+ gap['time'] = np.arange(
+ start=start_t,
+ stop=start_t + t_diff,
+ step=timeframe,
+ )
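+        # e.g. (illustrative): a 3-row null gap at `timeframe=60`
+        # gives `t_diff == 180` and fills times
+        # [start_t, start_t + 60, start_t + 120].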
+
+ # TODO: reimpl using the new `.ui._remote_ctl` ctx
+ # ideally using some kinda decent
+            # tractor-y-reverse-lookup-connection from some other
+ # `Context` type thingy?
+ await sampler_stream.send({
+ 'broadcast_all': {
+
+ # XXX NOTE XXX: see the
+ # `.ui._display.increment_history_view()` if block
+ # that looks for this info to FORCE a hard viz
+ # redraw!
+ 'backfilling': (mkt.fqme, timeframe),
+ },
+ })
+
+    # TODO: iteratively step through any remaining
+ # time-gaps/null-segments and spawn piecewise backfiller
+ # tasks in a nursery?
+ # -[ ] not sure that's going to work so well on the ib
+ # backend but worth a shot?
+ # -[ ] mk new history connections to make it properly
+ # parallel possible no matter the backend?
+ # -[ ] fill algo: do queries in alternating "latest, then
+ # earliest, then latest.. etc?"
+
+
+async def start_backfill(
+ get_hist,
+ def_frame_duration: Duration,
+ mod: ModuleType,
+ mkt: MktPair,
+ shm: ShmArray,
+ timeframe: float,
+ backfill_from_shm_index: int,
+ backfill_from_dt: datetime,
+ sampler_stream: tractor.MsgStream,
+
+ backfill_until_dt: datetime|None = None,
+ storage: StorageClient|None = None,
+ write_tsdb: bool = True,
+
+ task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
+
+) -> int:
+
+ # let caller unblock and deliver latest history frame
+ # and use to signal that backfilling the shm gap until
+ # the tsdb end is complete!
+ bf_done = trio.Event()
+ task_status.started(bf_done)
+
+    # based on the sample step size, maybe load a certain amount of history
+ update_start_on_prepend: bool = False
+ if backfill_until_dt is None:
+
+ # TODO: per-provider default history-durations?
+ # -[ ] inside the `open_history_client()` config allow
+ # declaring the history duration limits instead of
+ # guessing and/or applying the same limits to all?
+ #
+ # -[ ] allow declaring (default) per-provider backfill
+ # limits inside a [storage] sub-section in conf.toml?
+ #
+ # NOTE, when no tsdb "last datum" is provided, we just
+ # load some near-term history by presuming a "decently
+ # large" 60s duration limit and a much shorter 1s range.
+ periods = {
+ 1: {'days': 2},
+ 60: {'years': 6},
+ }
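+        # i.e. roughly 2 days of 1s bars (~172.8k samples) or ~6 years
+        # of 1m bars when no tsdb "last datum" exists yet.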
+        period_duration: dict[str, int] = periods[timeframe]
+ update_start_on_prepend: bool = True
+
+ # NOTE: manually set the "latest" datetime which we intend to
+ # backfill history "until" so as to adhere to the history
+ # settings above when the tsdb is detected as being empty.
+ backfill_until_dt = backfill_from_dt.subtract(**period_duration)
+
+ # STAGE NOTE: "backward history gap filling":
+ # - we push to the shm buffer until we have history back
+ # until the latest entry loaded from the tsdb's table B)
+ # - after this loop continue to check for other gaps in the
+ # (tsdb) history and (at least report) maybe fill them
+ # from new frame queries to the backend?
+ last_start_dt: datetime = backfill_from_dt
+ next_prepend_index: int = backfill_from_shm_index
+
+ while last_start_dt > backfill_until_dt:
+ log.info(
+ f'Requesting {timeframe}s frame:\n'
+ f'backfill_until_dt: {backfill_until_dt}\n'
+ f'last_start_dt: {last_start_dt}\n'
+ )
+ try:
+ (
+ array,
+ next_start_dt,
+ next_end_dt,
+ ) = await get_hist(
+ timeframe,
+ end_dt=last_start_dt,
+ )
+ except NoData as _daterr:
+ orig_last_start_dt: datetime = last_start_dt
+ gap_report: str = (
+ f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
+ f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
+ f'last_start_dt: {orig_last_start_dt}\n\n'
+ f'bf_until: {backfill_until_dt}\n'
+ )
+ # EMPTY FRAME signal with 3 (likely) causes:
+ #
+ # 1. range contains legit gap in venue history
+ # 2. history actually (edge case) **began** at the
+ # value `last_start_dt`
+ # 3. some other unknown error (ib blocking the
+ # history-query bc they don't want you seeing how
+ # they cucked all the tinas.. like with options
+ # hist)
+ #
+ if def_frame_duration:
+ # decrement by a duration's (frame) worth of time
+ # as maybe indicated by the backend to see if we
+ # can get older data before this possible
+ # "history gap".
+ last_start_dt: datetime = last_start_dt.subtract(
+ seconds=def_frame_duration.total_seconds()
+ )
+ gap_report += (
+ f'Decrementing `end_dt` and retrying with,\n'
+ f'def_frame_duration: {def_frame_duration}\n'
+ f'(new) last_start_dt: {last_start_dt}\n'
+ )
+ log.warning(gap_report)
+ # skip writing to shm/tsdb and try the next
+ # duration's worth of prior history.
+ continue
+
+ else:
+ # await tractor.pause()
+ raise DataUnavailable(gap_report)
+
+ # broker says there never was or is no more history to pull
+ except DataUnavailable as due:
+ message: str = due.args[0]
+ log.warning(
+ f'Provider {mod.name!r} halted backfill due to,\n\n'
+
+ f'{message}\n'
+
+ f'fqme: {mkt.fqme}\n'
+ f'timeframe: {timeframe}\n'
+ f'last_start_dt: {last_start_dt}\n'
+ f'bf_until: {backfill_until_dt}\n'
+ )
+ # UGH: what's a better way?
+ # TODO: backends are responsible for being correct on
+ # this right!?
+ # -[ ] in the `ib` case we could maybe offer some way
+ # to halt the request loop until the condition is
+ # resolved or should the backend be entirely in
+ # charge of solving such faults? yes, right?
+ return
+
+ time: np.ndarray = array['time']
+ assert (
+ time[0]
+ ==
+ next_start_dt.timestamp()
+ )
+
+ assert time[-1] == next_end_dt.timestamp()
+
+ expected_dur: Interval = (
+ last_start_dt.subtract(
+ seconds=timeframe
+ # ^XXX, always "up to" the bar *before*
+ )
+ -
+ next_start_dt
+ )
+
+ # frame's worth of sample-period-steps, in seconds
+ frame_size_s: float = len(array) * timeframe
+ recv_frame_dur: Duration = (
+ from_timestamp(array[-1]['time'])
+ -
+ from_timestamp(array[0]['time'])
+ )
+ if (
+ (lt_frame := (recv_frame_dur < expected_dur))
+ or
+ (null_frame := (frame_size_s == 0))
+ # ^XXX, should NEVER hit now!
+ ):
+ # XXX: query result includes a start point prior to our
+ # expected "frame size" and thus is likely some kind of
+ # history gap (eg. market closed period, outage, etc.)
+ # so just report it to console for now.
+ if lt_frame:
+ reason = 'Possible GAP (or first-datum)'
+ else:
+ assert null_frame
+ reason = 'NULL-FRAME'
+
+ missing_dur: Interval = expected_dur.end - recv_frame_dur.end
+ log.warning(
+ f'{timeframe}s-series {reason} detected!\n'
+ f'fqme: {mkt.fqme}\n'
+ f'last_start_dt: {last_start_dt}\n\n'
+ f'recv interval: {recv_frame_dur}\n'
+ f'expected interval: {expected_dur}\n\n'
+
+ f'Missing duration of history of {missing_dur.in_words()!r}\n'
+ f'{missing_dur}\n'
+ )
+ # await tractor.pause()
+
+ to_push = diff_history(
+ array,
+ prepend_until_dt=backfill_until_dt,
+ )
+ ln: int = len(to_push)
+ if ln:
+ log.info(
+ f'{ln} bars for {next_start_dt} -> {last_start_dt}'
+ )
+
+ else:
+ log.warning(
+ '0 BARS TO PUSH after diff!?\n'
+ f'{next_start_dt} -> {last_start_dt}'
+ )
+
+ # bail gracefully on shm allocation overrun/full
+ # condition
+ try:
+ await shm_push_in_between(
+ shm,
+ to_push,
+ prepend_index=next_prepend_index,
+ backfill_until_dt=backfill_until_dt,
+ update_start_on_prepend=update_start_on_prepend,
+ )
+ await sampler_stream.send({
+ 'broadcast_all': {
+ 'backfilling': (mkt.fqme, timeframe),
+ },
+ })
+
+ # decrement next prepend point
+ next_prepend_index = next_prepend_index - ln
+ last_start_dt = next_start_dt
+
+ except ValueError as ve:
+ _ve = ve
+ log.error(
+ f'Shm prepend OVERRUN on: {next_start_dt} -> {last_start_dt}?'
+ )
+
+ if next_prepend_index < ln:
+ log.warning(
+ f'Shm buffer can only hold {next_prepend_index} more rows..\n'
+ f'Appending those from recent {ln}-sized frame, no more!'
+ )
+
+ to_push = to_push[-next_prepend_index + 1:]
+ await shm_push_in_between(
+ shm,
+ to_push,
+ prepend_index=next_prepend_index,
+ backfill_until_dt=backfill_until_dt,
+ update_start_on_prepend=update_start_on_prepend,
+ )
+ await sampler_stream.send({
+ 'broadcast_all': {
+ 'backfilling': (mkt.fqme, timeframe),
+ },
+ })
+
+ # can't push the entire frame? so
+ # push only the amount that can fit..
+ break
+
+ log.info(
+ f'Shm pushed {ln} frame:\n'
+ f'{next_start_dt} -> {last_start_dt}'
+ )
+
+ # FINALLY, maybe write immediately to the tsdb backend for
+ # long-term storage.
+ if (
+ storage is not None
+ and
+ write_tsdb
+ ):
+ log.info(
+ f'Writing {ln} frame to storage:\n'
+ f'{next_start_dt} -> {last_start_dt}'
+ )
+
+ # NOTE, always drop the src asset token for
+ # non-currency-pair like market types (for now)
+ #
+ # THAT IS, for now our table key schema is NOT
+ # including the dst[/src] source asset token. SO,
+ # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
+ # historical reasons ONLY.
+ if mkt.dst.atype not in {
+ 'crypto',
+ 'crypto_currency',
+ 'fiat', # a "forex pair"
+ 'perpetual_future', # stupid "perps" from cex land
+ }:
+ col_sym_key: str = mkt.get_fqme(
+ delim_char='',
+ without_src=True,
+ )
+ else:
+ col_sym_key: str = mkt.get_fqme(
+ delim_char='',
+ )
+
+ await storage.write_ohlcv(
+ col_sym_key,
+ shm.array,
+ timeframe,
+ )
+ df: pl.DataFrame = await storage.as_df(
+ fqme=mkt.fqme,
+ period=timeframe,
+ load_from_offline=False,
+ )
+ (
+ wdts,
+ deduped,
+ diff,
+ ) = dedupe(df)
+ # if diff:
+ # sort_diff(df)
+
+ else:
+ # finally filled gap
+ log.info(
+ f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
+ )
+
+ # XXX: extremely important, there can be no checkpoints
+ # in the block above to avoid entering new ``frames``
+ # values while we're pipelining the current ones to
+ # memory...
+ # await sampler_stream.send('broadcast_all')
+
+ # short-circuit (for now)
+ bf_done.set()
+
+
+# NOTE: originally this was used to cope with a tsdb (marketstore)
+# which could not deliver very large frames of history over gRPC
+# (thanks goolag) due to corruption issues.
+#
+# NOW, using apache parquet (by default in the local filesys) we
+# don't have this requirement since the files can be loaded very
+# quickly in entirety to memory via `polars.read_parquet()`.
+#
+async def back_load_from_tsdb(
+ storemod: ModuleType,
+ storage: StorageClient,
+
+ fqme: str,
+
+ tsdb_history: np.ndarray,
+
+ last_tsdb_dt: datetime,
+ latest_start_dt: datetime,
+ latest_end_dt: datetime,
+
+ bf_done: trio.Event,
+
+ timeframe: int,
+ shm: ShmArray,
+):
+ assert len(tsdb_history)
+
+ # sync to backend history task's query/load completion
+ # if bf_done:
+ # await bf_done.wait()
+
+ # TODO: eventually it'd be nice to not require a shm array/buffer
+ # to accomplish this.. maybe we can do some kind of tsdb direct to
+ # graphics format eventually in a child-actor?
+ if storemod.name == 'nativedb':
+ return
+
+ await tractor.pause()
+ assert shm._first.value == 0
+
+ array = shm.array
+
+ # if timeframe == 1:
+ # times = shm.array['time']
+ # assert (times[1] - times[0]) == 1
+
+ if len(array):
+ shm_last_dt = from_timestamp(
+ shm.array[0]['time']
+ )
+ else:
+ shm_last_dt = None
+
+ if last_tsdb_dt:
+ assert shm_last_dt >= last_tsdb_dt
+
+    # do a diff against the start index of the last frame of history
+    # and only fill in the amount of datums from the tsdb that still
+    # allows the most recent (backend-queried) data to be loaded into
+    # mem *before* the tsdb data.
+ if (
+ last_tsdb_dt
+ and latest_start_dt
+ ):
+ backfilled_size_s: Duration = (
+ latest_start_dt - last_tsdb_dt
+ ).seconds
+ # if the shm buffer len is not large enough to contain
+ # all missing data between the most recent backend-queried frame
+ # and the most recent dt-index in the db we warn that we only
+ # want to load a portion of the next tsdb query to fill that
+ # space.
+ log.info(
+ f'{backfilled_size_s} seconds worth of {timeframe}s loaded'
+ )
+
+ # Load TSDB history into shm buffer (for display) if there is
+ # remaining buffer space.
+
+ time_key: str = 'time'
+ if getattr(storemod, 'ohlc_key_map', False):
+ keymap: bidict = storemod.ohlc_key_map
+ time_key: str = keymap.inverse['time']
+
+ # if (
+ # not len(tsdb_history)
+ # ):
+ # return
+
+ tsdb_last_frame_start: datetime = last_tsdb_dt
+    # load as much from storage into shm as possible (depends on
+ # user's shm size settings).
+ while shm._first.value > 0:
+
+ tsdb_history = await storage.read_ohlcv(
+ fqme,
+ timeframe=timeframe,
+ end=tsdb_last_frame_start,
+ )
+
+ # # empty query
+ # if not len(tsdb_history):
+ # break
+
+ next_start = tsdb_history[time_key][0]
+ if next_start >= tsdb_last_frame_start:
+ # no earlier data detected
+ break
+
+ else:
+ tsdb_last_frame_start = next_start
+
+ # TODO: see if there's faster multi-field reads:
+ # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+ # re-index with a `time` and index field
+ prepend_start = shm._first.value
+
+ to_push = tsdb_history[-prepend_start:]
+ shm.push(
+ to_push,
+
+ # insert the history pre a "days worth" of samples
+ # to leave some real-time buffer space at the end.
+ prepend=True,
+ # update_first=False,
+ # start=prepend_start,
+ field_map=storemod.ohlc_key_map,
+ )
+
+ log.info(f'Loaded {to_push.shape} datums from storage')
+ tsdb_last_frame_start = tsdb_history[time_key][0]
+
+ # manually trigger step update to update charts/fsps
+ # which need an incremental update.
+ # NOTE: the way this works is super duper
+ # un-intuitive right now:
+ # - the broadcaster fires a msg to the fsp subsystem.
+ # - fsp subsys then checks for a sample step diff and
+ # possibly recomputes prepended history.
+ # - the fsp then sends back to the parent actor
+ # (usually a chart showing graphics for said fsp)
+ # which tells the chart to conduct a manual full
+ # graphics loop cycle.
+ # await sampler_stream.send('broadcast_all')
+
+
+async def push_latest_frame(
+ # box-type only that should get packed with the datetime
+ # objects received for the latest history frame
+ dt_eps: list[DateTime, DateTime],
+ shm: ShmArray,
+ get_hist: Callable[
+ [int, datetime, datetime],
+ tuple[np.ndarray, str]
+ ],
+ timeframe: float,
+ config: dict,
+
+ task_status: TaskStatus[
+ Exception | list[datetime, datetime]
+ ] = trio.TASK_STATUS_IGNORED,
+
+) -> list[datetime, datetime] | None:
+ # get latest query's worth of history all the way
+ # back to what is recorded in the tsdb
+ try:
+ (
+ array,
+ mr_start_dt,
+ mr_end_dt,
+ ) = await get_hist(
+ timeframe,
+ end_dt=None,
+ )
+ # so caller can access these ep values
+ dt_eps.extend([
+ mr_start_dt,
+ mr_end_dt,
+ ])
+ task_status.started(dt_eps)
+
+ # XXX: timeframe not supported for backend (since
+ # above exception type), terminate immediately since
+ # there's no backfilling possible.
+ except DataUnavailable:
+ task_status.started(None)
+
+ if timeframe > 1:
+ await tractor.pause()
+
+ # prolly tf not supported
+ return None
+
+ # NOTE: on the first history, most recent history
+ # frame we PREPEND from the current shm ._last index
+ # and thus a gap between the earliest datum loaded here
+ # and the latest loaded from the tsdb may exist!
+ log.info(f'Pushing {array.size} to shm!')
+ shm.push(
+ array,
+ prepend=True, # append on first frame
+ )
+
+ return dt_eps
+
+
+async def load_tsdb_hist(
+ storage: StorageClient,
+ mkt: MktPair,
+ timeframe: float,
+
+ task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
+
+) -> tuple[
+ np.ndarray,
+ DateTime,
+ DateTime,
+]|None:
+ # loads a (large) frame of data from the tsdb depending
+ # on the db's query size limit; our "nativedb" (using
+ # parquet) generally can load the entire history into mem
+ # but if not then below the remaining history can be lazy
+ # loaded?
+ fqme: str = mkt.fqme
+ tsdb_entry: tuple[
+ np.ndarray,
+ DateTime,
+ DateTime,
+ ]
+ try:
+ tsdb_entry: tuple|None = await storage.load(
+ fqme,
+ timeframe=timeframe,
+ )
+ return tsdb_entry
+
+ except TimeseriesNotFound:
+ log.warning(
+ f'No timeseries yet for {timeframe}@{fqme}'
+ )
+ return None
+
+
+async def tsdb_backfill(
+ mod: ModuleType,
+ storemod: ModuleType,
+
+ storage: StorageClient,
+ mkt: MktPair,
+ shm: ShmArray,
+ timeframe: float,
+
+ sampler_stream: tractor.MsgStream,
+
+ task_status: TaskStatus[
+ tuple[ShmArray, ShmArray]
+ ] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+
+ if timeframe not in (1, 60):
+ raise ValueError(
+ '`piker` only needs to support 1m and 1s sampling '
+ 'but ur api is trying to deliver a longer '
+ f'timeframe of {timeframe} seconds..\n'
+ 'So yuh.. dun do dat brudder.'
+ )
+
+ get_hist: Callable[
+ [int, datetime, datetime],
+ tuple[np.ndarray, str]
+ ]
+ config: dict[str, int]
+ async with (
+ mod.open_history_client(
+ mkt,
+ ) as (get_hist, config),
+
+ # NOTE: this sub-nursery splits to tasks for the given
+ # sampling rate to concurrently load offline tsdb
+ # timeseries as well as new data from the venue backend!
+ ):
+ log.info(
+ f'`{mod}` history client returned backfill config:\n'
+ f'{pformat(config)}\n'
+ )
+
+ # concurrently load the provider's most-recent-frame AND any
+ # pre-existing tsdb history already saved in `piker` storage.
+ dt_eps: list[DateTime, DateTime] = []
+ async with (
+ tractor.trionics.collapse_eg(),
+ trio.open_nursery() as tn
+ ):
+ tn.start_soon(
+ push_latest_frame,
+ dt_eps,
+ shm,
+ get_hist,
+ timeframe,
+ config,
+ )
+ tsdb_entry: tuple = await load_tsdb_hist(
+ storage,
+ mkt,
+ timeframe,
+ )
+
+ # tell parent task to continue
+ # TODO: really we'd want this the other way with the
+            # tsdb load happening asap since the latest
+ # frame query will normally be the main source of
+ # latency?
+ task_status.started()
+
+ # NOTE: iabs to start backfilling from, reverse chronological,
+ # ONLY AFTER the first history frame has been pushed to
+ # mem!
+ backfill_gap_from_shm_index: int = shm._first.value + 1
+
+ # Prepend any tsdb history into the rt-shm-buffer which
+ # should NOW be getting filled with the most recent history
+ # pulled from the data-backend.
+ if dt_eps:
+ # well then, unpack the latest (gap) backfilled frame dts
+ (
+ mr_start_dt,
+ mr_end_dt,
+ ) = dt_eps
+
+ first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
+ calced_frame_size: Duration = mk_duration(
+ seconds=first_frame_dur_s,
+ )
+ # NOTE, attempt to use the backend declared default frame
+ # sizing (as allowed by their time-series query APIs) and
+ # if not provided try to construct a default from the
+ # first frame received above.
+ def_frame_durs: dict[
+ int,
+ Duration,
+ ]|None = config.get('frame_types', None)
+
+ if def_frame_durs:
+ def_frame_size: Duration = def_frame_durs[timeframe]
+
+ if def_frame_size != calced_frame_size:
+ log.warning(
+ f'Expected frame size {def_frame_size}\n'
+ f'Rxed frame {calced_frame_size}\n'
+ )
+ # await tractor.pause()
+ else:
+ # use what we calced from first frame above.
+ def_frame_size = calced_frame_size
+
+ # NOTE: when there's no offline data, there's 2 cases:
+ # - data backend doesn't support timeframe/sample
+ # period (in which case `dt_eps` should be `None` and
+ # we shouldn't be here!), or
+ # - no prior history has been stored (yet) and we need
+ # todo full backfill of the history now.
+ if tsdb_entry is None:
+ # indicate to backfill task to fill the whole
+ # shm buffer as much as it can!
+ last_tsdb_dt = None
+
+ # there's existing tsdb history from (offline) storage
+ # so only backfill the gap between the
+ # most-recent-frame (mrf) and that latest sample.
+ else:
+ (
+ tsdb_history,
+ first_tsdb_dt,
+ last_tsdb_dt,
+ ) = tsdb_entry
+
+ # await tractor.pause()
+
+ # if there is a gap to backfill from the first
+ # history frame until the last datum loaded from the tsdb
+ # continue that now in the background
+ async with (
+ tractor.trionics.collapse_eg(),
+ trio.open_nursery() as tn,
+ ):
+
+ bf_done: trio.Event = await tn.start(
+ partial(
+ start_backfill,
+ get_hist=get_hist,
+ def_frame_duration=def_frame_size,
+ mod=mod,
+ mkt=mkt,
+ shm=shm,
+ timeframe=timeframe,
+
+ backfill_from_shm_index=backfill_gap_from_shm_index,
+ backfill_from_dt=mr_start_dt,
+
+ sampler_stream=sampler_stream,
+ backfill_until_dt=last_tsdb_dt,
+
+ storage=storage,
+ write_tsdb=True,
+ )
+ )
+ nulls_detected: trio.Event|None = None
+
+ if last_tsdb_dt is not None:
+
+ # calc the index from which the tsdb data should be
+ # prepended, presuming there is a gap between the
+ # latest frame (loaded/read above) and the latest
+ # sample loaded from the tsdb.
+ backfill_diff: Duration = mr_start_dt - last_tsdb_dt
+ offset_s: float = backfill_diff.in_seconds()
+
+ # XXX EDGE CASEs: the most recent frame overlaps with
+ # prior tsdb history!!
+ # - so the latest frame's start time is earlier then
+                # - so the latest frame's start time is earlier than
+                #   the tsdb's latest sample.
+                # - alternatively this may also more generally occur
+                #   when the venue was closed (say over the weekend)
+ # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
+ # GREATER THAN the current venue-market's operating
+ # session (time) we will receive datums from BEFORE THE
+ # CLOSURE GAP and thus the `offset_s` value will be
+ # NEGATIVE! In this case we need to ensure we don't try
+ # to push datums that have already been recorded in the
+                # tsdb. In this case we instead only retrieve and push
+ # the series portion missing from the db's data set.
+ # if offset_s < 0:
+ # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
+ # non_overlap_offset_s: float = backfill_diff.in_seconds()
+
+ offset_samples: int = round(offset_s / timeframe)
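+                # e.g. (illustrative) a 120s gap at `timeframe=60`
+                # gives `offset_samples == 2`; the venue-closure
+                # overlap case above instead yields a negative
+                # `offset_s` so the overlap-trim branch below is taken.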
+
+ # TODO: see if there's faster multi-field reads:
+ # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+ # re-index with a `time` and index field
+ if offset_s > 0:
+ # NOTE XXX: ONLY when there is an actual gap
+ # between the earliest sample in the latest history
+ # frame do we want to NOT stick the latest tsdb
+ # history adjacent to that latest frame!
+ prepend_start = shm._first.value - offset_samples + 1
+ to_push = tsdb_history[-prepend_start:]
+ else:
+ # when there is overlap we want to remove the
+ # overlapping samples from the tsdb portion (taking
+ # instead the latest frame's values since THEY
+ # SHOULD BE THE SAME) and prepend DIRECTLY adjacent
+ # to the latest frame!
+ # TODO: assert the overlap segment array contains
+ # the same values!?!
+ prepend_start = shm._first.value
+ to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
+
+ # tsdb history is so far in the past we can't fit it in
+ # shm buffer space so simply don't load it!
+ if prepend_start > 0:
+ shm.push(
+ to_push,
+
+ # insert the history pre a "days worth" of samples
+ # to leave some real-time buffer space at the end.
+ prepend=True,
+ # update_first=False,
+ start=prepend_start,
+ field_map=storemod.ohlc_key_map,
+ )
+
+ log.info(f'Loaded {to_push.shape} datums from storage')
+
+ # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
+ # seemingly missing (null-time) segments..
+ # TODO: ideally these can never exist!
+ # -[ ] somehow it seems sometimes we're writing zero-ed
+ # segments to tsdbs during teardown?
+ # -[ ] can we ensure that the backfiller tasks do this
+            #    work PREVENTIVELY instead?
+ # -[ ] fill in non-zero epoch time values ALWAYS!
+ # await maybe_fill_null_segments(
+ nulls_detected: trio.Event = await tn.start(partial(
+ maybe_fill_null_segments,
+
+ shm=shm,
+ timeframe=timeframe,
+ get_hist=get_hist,
+ sampler_stream=sampler_stream,
+ mkt=mkt,
+ backfill_until_dt=last_tsdb_dt,
+ ))
+
+ # 2nd nursery END
+
+ # TODO: who would want to?
+ if nulls_detected:
+ await nulls_detected.wait()
+
+ await bf_done.wait()
+ # TODO: maybe start history anal and load missing "history
+ # gaps" via backend..
+
+ # if len(hist_shm.array) < 2:
+ # TODO: there's an edge case here to solve where if the last
+ # frame before market close (at least on ib) was pushed and
+ # there was only "1 new" row pushed from the first backfill
+ # query-iteration, then the sample step sizing calcs will
+ # break upstream from here since you can't diff on at least
+ # 2 steps... probably should also add logic to compute from
+ # the tsdb series and stash that somewhere as meta data on
+ # the shm buffer?.. no se.
+
+ # backload any further data from tsdb (concurrently per
+ # timeframe) if not all data was able to be loaded (in memory)
+ # from the ``StorageClient.load()`` call above.
+ await trio.sleep_forever()
+
+ # XXX NOTE: this is legacy from when we were using
+ # marketstore and we needed to continue backloading
+ # incrementally from the tsdb client.. (bc it couldn't
+ # handle a single large query with gRPC for some
+ # reason.. classic goolag pos)
+ # tn.start_soon(
+ # back_load_from_tsdb,
+
+ # storemod,
+ # storage,
+ # fqme,
+
+ # tsdb_history,
+ # last_tsdb_dt,
+ # mr_start_dt,
+ # mr_end_dt,
+ # bf_done,
+
+ # timeframe,
+ # shm,
+ # )
+
+
+async def manage_history(
+ mod: ModuleType,
+ mkt: MktPair,
+ some_data_ready: trio.Event,
+ feed_is_live: trio.Event,
+ timeframe: float = 60, # in seconds
+
+ task_status: TaskStatus[
+ tuple[ShmArray, ShmArray]
+ ] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+ '''
+ Load historical series data from offline-storage (tsdb) and any
+ missing new datums from data provider(s).
+
+ This is the primary "backfilling service" `trio.Task` entrypoint
+ and conducts,
+
+    - time-series retrieval for offline-data previously stored in
+ any (connected) tsdb,
+
+ - queries for missing new datums (compared with the latest found
+ from ^) onward to the present by pulling from available
+ `datad`-provider backends.
+
+ - real-time update of both the existing tsdb-records and the
+ allocated shared-memory-buffer as required by downstream
+ `piker.data`-layer consumer-wares.
+
+ Init sequence:
+ -------------
+ - allocate shm (numpy array) buffers for 60s & 1s sample rates
+ - configure "zero index" for each buffer: the index where
+      history will be prepended *to* and new live data will be
+      appended *from*.
+ - open a ``.storage.StorageClient`` and load any existing tsdb
+ history as well as (async) start a backfill task which loads
+ missing (newer) history from the data provider backend:
+ - tsdb history is loaded first and pushed to shm ASAP.
+ - the backfill task loads the most recent history before
+ unblocking its parent task, so that the `ShmArray._last` is
+ up to date to allow the OHLC sampler to begin writing new
+ samples as the correct buffer index once the provider feed
+ engages.
+
+ '''
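+    # (usage sketch; the caller's nursery `tn` and `brokermod` names
+    # below are illustrative assumptions, not from this module):
+    #
+    #   (
+    #       hist_zero_index,
+    #       hist_shm,
+    #       rt_zero_index,
+    #       rt_shm,
+    #   ) = await tn.start(
+    #       partial(
+    #           manage_history,
+    #           mod=brokermod,
+    #           mkt=mkt,
+    #           some_data_ready=some_data_ready,
+    #           feed_is_live=feed_is_live,
+    #       )
+    #   )
+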
+ # TODO: is there a way to make each shm file key
+ # actor-tree-discovery-addr unique so we avoid collisions
+ # when doing tests which also allocate shms for certain instruments
+ # that may be in use on the system by some other running daemons?
+ # from tractor._state import _runtime_vars
+ # port = _runtime_vars['_root_mailbox'][1]
+
+ uid: tuple = tractor.current_actor().uid
+ name, uuid = uid
+ service: str = name.rstrip(f'.{mod.name}')
+ fqme: str = mkt.get_fqme(delim_char='')
+
+ # (maybe) allocate shm array for this broker/symbol which will
+ # be used for fast near-term history capture and processing.
+ hist_shm, opened = maybe_open_shm_array(
+ size=_default_hist_size,
+ append_start_index=_hist_buffer_start,
+
+ key=f'piker.{service}[{uuid[:16]}].{fqme}.hist',
+
+ # use any broker defined ohlc dtype:
+ dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields),
+
+ # we expect the sub-actor to write
+ readonly=False,
+ )
+ hist_zero_index = hist_shm.index - 1
+
+ # TODO: history validation
+ if not opened:
+ raise RuntimeError(
+ "Persistent shm for sym was already open?!"
+ )
+
+ rt_shm, opened = maybe_open_shm_array(
+ size=_default_rt_size,
+ append_start_index=_rt_buffer_start,
+ key=f'piker.{service}[{uuid[:16]}].{fqme}.rt',
+
+ # use any broker defined ohlc dtype:
+ dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields),
+
+ # we expect the sub-actor to write
+ readonly=False,
+ )
+
+ # (for now) set the rt (hft) shm array with space to prepend
+ # only a few days worth of 1s history.
+ days: int = 2
+ start_index: int = days*_secs_in_day
+ rt_shm._first.value = start_index
+ rt_shm._last.value = start_index
+ rt_zero_index = rt_shm.index - 1
+
+ if not opened:
+ raise RuntimeError(
+ "Persistent shm for sym was already open?!"
+ )
+
+ open_history_client = getattr(
+ mod,
+ 'open_history_client',
+ )
+ assert open_history_client
+
+ # TODO: maybe it should be a subpkg of `.data`?
+ from piker import storage
+
+ storemod: ModuleType
+ client: StorageClient
+ tn: trio.Nursery
+ async with (
+ storage.open_storage_client() as (
+ storemod,
+ client,
+ ),
+
+ # NOTE: this nursery spawns a task per "timeframe" (aka
+ # sampling period) data set since normally differently
+ # sampled timeseries can be loaded / process independently
+ # ;)
+ tractor.trionics.collapse_eg(),
+ trio.open_nursery() as tn,
+ ):
+ log.info(
+ f'Connecting to storage backend `{storemod.name}`:\n'
+ f'location: {client.address}\n'
+ f'db cardinality: {client.cardinality}\n'
+ # TODO: show backend config, eg:
+ # - network settings
+ # - storage size with compression
+ # - number of loaded time series?
+ )
+
+ # NOTE: this call ONLY UNBLOCKS once the latest-most frame
+ # (i.e. history just before the live feed latest datum) of
+ # history has been loaded and written to the shm buffer:
+ # - the backfiller task can write in reverse chronological
+ # to the shm and tsdb
+ # - the tsdb data can be loaded immediately and the
+        # backfiller can do a single append from its end datum and
+ # then prepends backward to that from the current time
+ # step.
+ tf2mem: dict = {
+ 1: rt_shm,
+ 60: hist_shm,
+ }
+ async with open_sample_stream(
+ period_s=1.,
+ shms_by_period={
+ 1.: rt_shm.token,
+ 60.: hist_shm.token,
+ },
+
+ # NOTE: we want to only open a stream for doing
+ # broadcasts on backfill operations, not receive the
+ # sample index-stream (since there's no code in this
+ # data feed layer that needs to consume it).
+ open_index_stream=True,
+ sub_for_broadcasts=False,
+
+ ) as sample_stream:
+ # register 1s and 1m buffers with the global
+ # incrementer task
+ log.info(f'Connected to sampler stream: {sample_stream}')
+
+ for timeframe in [60, 1]:
+ await tn.start(partial(
+ tsdb_backfill,
+ mod=mod,
+ storemod=storemod,
+ storage=client,
+ mkt=mkt,
+ shm=tf2mem[timeframe],
+ timeframe=timeframe,
+ sampler_stream=sample_stream,
+ ))
+
+ # indicate to caller that feed can be delivered to
+ # remote requesting client since we've loaded history
+ # data that can be used.
+ some_data_ready.set()
+
+ # wait for a live feed before starting the sampler.
+ # await feed_is_live.wait()
+
+ # yield back after client connect with filled shm
+ task_status.started((
+ hist_zero_index,
+ hist_shm,
+ rt_zero_index,
+ rt_shm,
+ ))
+
+            # history retrieval loop depending on user interaction
+            # and thus a small RPC-prot for remotely controlling
+ # what data is loaded for viewing.
+ await trio.sleep_forever()
+
+
+def iter_dfs_from_shms(
+ fqme: str
+) -> Generator[
+ tuple[Path, ShmArray, pl.DataFrame],
+ None,
+ None,
+]:
+ # shm buffer size table based on known sample rates
+ sizes: dict[str, int] = {
+ 'hist': _default_hist_size,
+ 'rt': _default_rt_size,
+ }
+
+ # load all detected shm buffer files which have the
+ # passed FQME pattern in the file name.
+ shmfiles: list[Path] = []
+ shmdir = Path('/dev/shm/')
+
+ for shmfile in shmdir.glob(f'*{fqme}*'):
+ filename: str = shmfile.name
+
+ # skip index files
+ if (
+ '_first' in filename
+ or '_last' in filename
+ ):
+ continue
+
+ assert shmfile.is_file()
+ log.debug(f'Found matching shm buffer file: {filename}')
+ shmfiles.append(shmfile)
+
+ for shmfile in shmfiles:
+
+ # lookup array buffer size based on file suffix
+ # being either .rt or .hist
+ key: str = shmfile.name.rsplit('.')[-1]
+
+ # skip FSP buffers for now..
+ if key not in sizes:
+ continue
+
+ size: int = sizes[key]
+
+ # attach to any shm buffer, load array into polars df,
+ # write to local parquet file.
+ shm, opened = maybe_open_shm_array(
+ key=shmfile.name,
+ size=size,
+ dtype=def_iohlcv_fields,
+ readonly=True,
+ )
+ assert not opened
+ ohlcv: np.ndarray = shm.array
+ df: pl.DataFrame = np2pl(ohlcv)
+
+ yield (
+ shmfile,
+ shm,
+ df,
+ )
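+
+
+# (usage sketch, illustrative only): an offline analysis task might
+# iterate all matching shm buffers and dump each to parquet, e.g.
+#
+#   for shmfile, shm, df in iter_dfs_from_shms('xmrusdt.usdtm.perp.binance'):
+#       df.write_parquet(f'/tmp/{shmfile.name}.parquet')
+#
+# the fqme and output path above are assumptions, not from this patch.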
diff --git a/piker/ui/_l1.py b/piker/ui/_l1.py
index 8d29d90c..f557de4b 100644
--- a/piker/ui/_l1.py
+++ b/piker/ui/_l1.py
@@ -237,8 +237,8 @@ class LevelLabel(YAxisLabel):
class L1Label(LevelLabel):
text_flags = (
- QtCore.Qt.TextDontClip
- | QtCore.Qt.AlignLeft
+ QtCore.Qt.TextFlag.TextDontClip
+ | QtCore.Qt.AlignmentFlag.AlignLeft
)
def set_label_str(
diff --git a/snippets/claude_debug_helper.py b/snippets/claude_debug_helper.py
new file mode 100755
index 00000000..97467d8a
--- /dev/null
+++ b/snippets/claude_debug_helper.py
@@ -0,0 +1,256 @@
+#!/usr/bin/env python
+'''
+Programmatic debugging helper for `pdbp` REPL human-like
+interaction but built to allow `claude` to interact with
+crashes and `tractor.pause()` breakpoints along side a human dev.
+
+Originally written by `clauded` during a backfiller inspection
+session with @goodboy trying to resolve duplicate/gappy ohlcv ts
+issues discovered while testing the new `nativedb` tsdb.
+
+Allows `claude` to run `pdb` commands and capture output in an
+"offline" manner while generating similar output as if it were
+interacting with the debug REPL.
+
+The use of `pexpect` is heavily based on tractor's REPL UX test
+suite(s), namely various `tests/devx/test_debugger.py` patterns.
+
+'''
+import sys
+import os
+import time
+
+import pexpect
+from pexpect.exceptions import (
+ TIMEOUT,
+ EOF,
+)
+
+
+PROMPT: str = r'\(Pdb\+\)'
+
+
+def expect(
+ child: pexpect.spawn,
+ patt: str,
+ **kwargs,
+) -> None:
+ '''
+ Expect wrapper that prints last console data before failing.
+
+ '''
+ try:
+ child.expect(
+ patt,
+ **kwargs,
+ )
+ except TIMEOUT:
+ before: str = (
+ str(child.before.decode())
+ if isinstance(child.before, bytes)
+ else str(child.before)
+ )
+ print(
+ f'TIMEOUT waiting for pattern: {patt}\n'
+ f'Last seen output:\n{before}'
+ )
+ raise
+
+
+def run_pdb_commands(
+ commands: list[str],
+ initial_cmd: str = 'piker store ldshm xmrusdt.usdtm.perp.binance',
+ timeout: int = 30,
+ print_output: bool = True,
+) -> dict[str, str]:
+ '''
+ Spawn piker process, wait for pdb prompt, execute commands.
+
+ Returns dict mapping command -> output.
+
+ '''
+ results: dict[str, str] = {}
+
+ # Disable colored output for easier parsing
+ os.environ['PYTHON_COLORS'] = '0'
+
+ # Spawn the process
+ if print_output:
+ print(f'Spawning: {initial_cmd}')
+
+ child: pexpect.spawn = pexpect.spawn(
+ initial_cmd,
+ timeout=timeout,
+ encoding='utf-8',
+ echo=False,
+ )
+
+ # Wait for pdb prompt
+ try:
+ expect(child, PROMPT, timeout=timeout)
+ if print_output:
+ print('Reached pdb prompt!')
+
+ # Execute each command
+ for cmd in commands:
+ if print_output:
+ print(f'\n>>> {cmd}')
+
+ child.sendline(cmd)
+ time.sleep(0.1)
+
+ # Wait for next prompt
+ expect(child, PROMPT, timeout=timeout)
+
+ # Capture output (everything before the prompt)
+ output: str = (
+ str(child.before.decode())
+ if isinstance(child.before, bytes)
+ else str(child.before)
+ )
+ results[cmd] = output
+
+ if print_output:
+ print(output)
+
+ # Quit debugger gracefully
+ child.sendline('quit')
+ try:
+ child.expect(EOF, timeout=5)
+ except (TIMEOUT, EOF):
+ pass
+
+ except TIMEOUT as e:
+ print(f'Timeout: {e}')
+ if child.before:
+ before: str = (
+ str(child.before.decode())
+ if isinstance(child.before, bytes)
+ else str(child.before)
+ )
+ print(f'Buffer:\n{before}')
+ results['_error'] = str(e)
+
+ finally:
+ if child.isalive():
+ child.close(force=True)
+
+ return results
+
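+# (usage sketch): run a couple of inspection commands against the
+# default `piker store ldshm ...` breakpoint and collect their output;
+# the inspected names below are illustrative only,
+#
+#   outputs: dict[str, str] = run_pdb_commands(
+#       ['locals().keys()', 'deduped.shape'],
+#   )
+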
+
+class InteractivePdbSession:
+ '''
+ Interactive pdb session manager for incremental debugging.
+
+ '''
+ def __init__(
+ self,
+ cmd: str = 'piker store ldshm xmrusdt.usdtm.perp.binance',
+ timeout: int = 30,
+ ):
+ self.cmd: str = cmd
+ self.timeout: int = timeout
+ self.child: pexpect.spawn|None = None
+ self.history: list[tuple[str, str]] = []
+
+ def start(self) -> None:
+ '''
+ Start the piker process and wait for first prompt.
+
+ '''
+ os.environ['PYTHON_COLORS'] = '0'
+
+ print(f'Starting: {self.cmd}')
+ self.child = pexpect.spawn(
+ self.cmd,
+ timeout=self.timeout,
+ encoding='utf-8',
+ echo=False,
+ )
+
+ # Wait for initial prompt
+ expect(self.child, PROMPT, timeout=self.timeout)
+ print('Ready at pdb prompt!')
+
+ def run(
+ self,
+ cmd: str,
+ print_output: bool = True,
+ ) -> str:
+ '''
+ Execute a single pdb command and return output.
+
+ '''
+ if not self.child or not self.child.isalive():
+ raise RuntimeError('Session not started or dead')
+
+ if print_output:
+ print(f'\n>>> {cmd}')
+
+ self.child.sendline(cmd)
+ time.sleep(0.1)
+
+ # Wait for next prompt
+ expect(self.child, PROMPT, timeout=self.timeout)
+
+ output: str = (
+ str(self.child.before.decode())
+ if isinstance(self.child.before, bytes)
+ else str(self.child.before)
+ )
+ self.history.append((cmd, output))
+
+ if print_output:
+ print(output)
+
+ return output
+
+ def quit(self) -> None:
+ '''
+ Exit the debugger and cleanup.
+
+ '''
+ if self.child and self.child.isalive():
+ self.child.sendline('quit')
+ try:
+ self.child.expect(EOF, timeout=5)
+ except (TIMEOUT, EOF):
+ pass
+ self.child.close(force=True)
+
+ def __enter__(self):
+ self.start()
+ return self
+
+ def __exit__(self, *args):
+ self.quit()
+
+
+if __name__ == '__main__':
+ # Example inspection commands
+ inspect_cmds: list[str] = [
+ 'locals().keys()',
+ 'type(deduped)',
+ 'deduped.shape',
+ (
+ 'step_gaps.shape '
+ 'if "step_gaps" in locals() '
+ 'else "N/A"'
+ ),
+ (
+ 'venue_gaps.shape '
+ 'if "venue_gaps" in locals() '
+ 'else "N/A"'
+ ),
+ ]
+
+ # Allow commands from CLI args
+ if len(sys.argv) > 1:
+ inspect_cmds = sys.argv[1:]
+
+ # Interactive session example
+ with InteractivePdbSession() as session:
+ for cmd in inspect_cmds:
+ session.run(cmd)
+
+ print('\n=== Session Complete ===')