diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index 121fcbb7..3c49e71b 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -28,1435 +28,18 @@ Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic fo stored offline (in a tsdb). ''' -from __future__ import annotations -from datetime import datetime -from functools import partial -from pathlib import Path -from pprint import pformat -from types import ModuleType -from typing import ( - Callable, - Generator, - TYPE_CHECKING, -) - -import trio -from trio_typing import TaskStatus -import tractor -from pendulum import ( - Interval, - DateTime, - Duration, - duration as mk_duration, - from_timestamp, -) -import numpy as np -import polars as pl - -from piker.brokers import NoData -from piker.accounting import ( - MktPair, -) -from piker.data._util import ( - log, -) -from ..data._sharedmem import ( - maybe_open_shm_array, - ShmArray, -) -from ..data._source import def_iohlcv_fields -from ..data._sampling import ( - open_sample_stream, -) from ._anal import ( - get_null_segs as get_null_segs, - iter_null_segs as iter_null_segs, - Frame as Frame, - Seq as Seq, - # codec-ish - np2pl as np2pl, + # `polars` specific + dedupe as dedupe, + detect_time_gaps as detect_time_gaps, pl2np as pl2np, # `numpy` only slice_from_time as slice_from_time, - - # `polars` specific - dedupe as dedupe, - with_dts as with_dts, - detect_time_gaps as detect_time_gaps, - sort_diff as sort_diff, - - # TODO: - detect_price_gaps as detect_price_gaps ) - -# TODO: break up all this shite into submods! -from ..brokers._util import ( - DataUnavailable, +from ._history import ( + iter_dfs_from_shms as iter_dfs_from_shms, + manage_history as manage_history, ) -from ..storage import TimeseriesNotFound - -if TYPE_CHECKING: - from bidict import bidict - from ..service.marketstore import StorageClient - # from .feed import _FeedsBus - - -# `ShmArray` buffer sizing configuration: -_mins_in_day = int(60 * 24) -# how much is probably dependent on lifestyle -# but we reco a buncha times (but only on a -# run-every-other-day kinda week). -_secs_in_day = int(60 * _mins_in_day) -_days_in_week: int = 7 - -_days_worth: int = 3 -_default_hist_size: int = 6 * 365 * _mins_in_day -_hist_buffer_start = int( - _default_hist_size - round(7 * _mins_in_day) -) - -_default_rt_size: int = _days_worth * _secs_in_day -# NOTE: start the append index in rt buffer such that 1 day's worth -# can be appenened before overrun. -_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) - - -def diff_history( - array: np.ndarray, - append_until_dt: datetime | None = None, - prepend_until_dt: datetime | None = None, - -) -> np.ndarray: - - # no diffing with tsdb dt index possible.. - if ( - prepend_until_dt is None - and append_until_dt is None - ): - return array - - times = array['time'] - - if append_until_dt: - return array[times < append_until_dt.timestamp()] - else: - return array[times >= prepend_until_dt.timestamp()] - - -# TODO: can't we just make this a sync func now? -async def shm_push_in_between( - shm: ShmArray, - to_push: np.ndarray, - prepend_index: int, - - update_start_on_prepend: bool = False, - -) -> int: - # XXX: extremely important, there can be no checkpoints - # in the body of this func to avoid entering new ``frames`` - # values while we're pipelining the current ones to - # memory... - shm.push( - to_push, - prepend=True, - - # XXX: only update the ._first index if no tsdb - # segment was previously prepended by the - # parent task. 
- update_first=update_start_on_prepend, - - # XXX: only prepend from a manually calculated shm - # index if there was already a tsdb history - # segment prepended (since then the - # ._first.value is going to be wayyy in the - # past!) - start=( - prepend_index - if not update_start_on_prepend - else None - ), - ) - - -async def maybe_fill_null_segments( - shm: ShmArray, - timeframe: float, - get_hist: Callable, - sampler_stream: tractor.MsgStream, - mkt: MktPair, - - task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, - -) -> list[Frame]: - - null_segs_detected = trio.Event() - task_status.started(null_segs_detected) - - frame: Frame = shm.array - - null_segs: tuple | None = get_null_segs( - frame, - period=timeframe, - ) - for ( - absi_start, absi_end, - fi_start, fi_end, - start_t, end_t, - start_dt, end_dt, - ) in iter_null_segs( - null_segs=null_segs, - frame=frame, - timeframe=timeframe, - ): - - # XXX NOTE: ?if we get a badly ordered timestamp - # pair, immediately stop backfilling? - if ( - start_dt - and - end_dt < start_dt - ): - await tractor.pause() - break - - ( - array, - next_start_dt, - next_end_dt, - ) = await get_hist( - timeframe, - start_dt=start_dt, - end_dt=end_dt, - ) - - # XXX TODO: pretty sure if i plot tsla, btcusdt.binance - # and mnq.cme.ib this causes a Qt crash XXDDD - - # make sure we don't overrun the buffer start - len_to_push: int = min(absi_end, array.size) - to_push: np.ndarray = array[-len_to_push:] - - await shm_push_in_between( - shm, - to_push, - prepend_index=absi_end, - update_start_on_prepend=False, - ) - # TODO: UI side needs IPC event to update.. - # - make sure the UI actually always handles - # this update! - # - remember that in the display side, only refersh this - # if the respective history is actually "in view". - # loop - try: - await sampler_stream.send({ - 'broadcast_all': { - - # XXX NOTE XXX: see the - # `.ui._display.increment_history_view()` if block - # that looks for this info to FORCE a hard viz - # redraw! - 'backfilling': (mkt.fqme, timeframe), - }, - }) - except tractor.ContextCancelled: - # log.exception - await tractor.pause() - raise - - null_segs_detected.set() - # RECHECK for more null-gaps - frame: Frame = shm.array - null_segs: tuple | None = get_null_segs( - frame, - period=timeframe, - ) - if ( - null_segs - and - len(null_segs[-1]) - ): - ( - iabs_slices, - iabs_zero_rows, - zero_t, - ) = null_segs - log.warning( - f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n' - f'{pformat(iabs_slices)}' - ) - - # TODO: always backfill gaps with the earliest (price) datum's - # value to avoid the y-ranger including zeros and completely - # stretching the y-axis.. - # array: np.ndarray = shm.array - # zeros = array[array['low'] == 0] - ohlc_fields: list[str] = [ - 'open', - 'high', - 'low', - 'close', - ] - - for istart, istop in iabs_slices: - - # get view into buffer for null-segment - gap: np.ndarray = shm._array[istart:istop] - - # copy the oldest OHLC samples forward - cls: float = shm._array[istart]['close'] - - # TODO: how can we mark this range as being a gap tho? - # -[ ] maybe pg finally supports nulls in ndarray to - # show empty space somehow? - # -[ ] we could put a special value in the vlm or - # another col/field to denote? 
- gap[ohlc_fields] = cls - - start_t: float = shm._array[istart]['time'] - t_diff: float = (istop - istart)*timeframe - - gap['time'] = np.arange( - start=start_t, - stop=start_t + t_diff, - step=timeframe, - ) - - # TODO: reimpl using the new `.ui._remote_ctl` ctx - # ideally using some kinda decent - # tractory-reverse-lookup-connnection from some other - # `Context` type thingy? - await sampler_stream.send({ - 'broadcast_all': { - - # XXX NOTE XXX: see the - # `.ui._display.increment_history_view()` if block - # that looks for this info to FORCE a hard viz - # redraw! - 'backfilling': (mkt.fqme, timeframe), - }, - }) - - # TODO: interatively step through any remaining - # time-gaps/null-segments and spawn piecewise backfiller - # tasks in a nursery? - # -[ ] not sure that's going to work so well on the ib - # backend but worth a shot? - # -[ ] mk new history connections to make it properly - # parallel possible no matter the backend? - # -[ ] fill algo: do queries in alternating "latest, then - # earliest, then latest.. etc?" - - -async def start_backfill( - get_hist, - def_frame_duration: Duration, - mod: ModuleType, - mkt: MktPair, - shm: ShmArray, - timeframe: float, - - backfill_from_shm_index: int, - backfill_from_dt: datetime, - - sampler_stream: tractor.MsgStream, - - backfill_until_dt: datetime | None = None, - storage: StorageClient | None = None, - - write_tsdb: bool = True, - - task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED, - -) -> int: - - # let caller unblock and deliver latest history frame - # and use to signal that backfilling the shm gap until - # the tsdb end is complete! - bf_done = trio.Event() - task_status.started(bf_done) - - # based on the sample step size, maybe load a certain amount history - update_start_on_prepend: bool = False - if backfill_until_dt is None: - - # TODO: per-provider default history-durations? - # -[ ] inside the `open_history_client()` config allow - # declaring the history duration limits instead of - # guessing and/or applying the same limits to all? - # - # -[ ] allow declaring (default) per-provider backfill - # limits inside a [storage] sub-section in conf.toml? - # - # NOTE, when no tsdb "last datum" is provided, we just - # load some near-term history by presuming a "decently - # large" 60s duration limit and a much shorter 1s range. - periods = { - 1: {'days': 2}, - 60: {'years': 6}, - } - period_duration: int = periods[timeframe] - update_start_on_prepend: bool = True - - # NOTE: manually set the "latest" datetime which we intend to - # backfill history "until" so as to adhere to the history - # settings above when the tsdb is detected as being empty. - backfill_until_dt = backfill_from_dt.subtract(**period_duration) - - # STAGE NOTE: "backward history gap filling": - # - we push to the shm buffer until we have history back - # until the latest entry loaded from the tsdb's table B) - # - after this loop continue to check for other gaps in the - # (tsdb) history and (at least report) maybe fill them - # from new frame queries to the backend? 
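The per-timeframe lookback defaults above (2 days of 1s data, 6 years of 1m data) turn into a concrete `backfill_until_dt` via `pendulum`'s keyword-argument subtraction. A tiny standalone sketch of just that arithmetic, with a made-up start date (only the `periods` table mirrors the code above):

from pendulum import datetime as mk_dt

# per-sample-period lookback defaults used when no tsdb "last datum" exists
periods = {
    1: {'days': 2},     # 1s bars -> 2 days back
    60: {'years': 6},   # 1m bars -> 6 years back
}

backfill_from_dt = mk_dt(2024, 1, 15)  # hypothetical most-recent-frame start
for timeframe, dur_kwargs in periods.items():
    backfill_until_dt = backfill_from_dt.subtract(**dur_kwargs)
    print(f'{timeframe}s history loads back until {backfill_until_dt.to_date_string()}')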
- last_start_dt: datetime = backfill_from_dt - next_prepend_index: int = backfill_from_shm_index - - while last_start_dt > backfill_until_dt: - log.info( - f'Requesting {timeframe}s frame:\n' - f'backfill_until_dt: {backfill_until_dt}\n' - f'last_start_dt: {last_start_dt}\n' - ) - try: - ( - array, - next_start_dt, - next_end_dt, - ) = await get_hist( - timeframe, - end_dt=last_start_dt, - ) - except NoData as _daterr: - orig_last_start_dt: datetime = last_start_dt - gap_report: str = ( - f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' - f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n' - f'last_start_dt: {orig_last_start_dt}\n\n' - f'bf_until: {backfill_until_dt}\n' - ) - # EMPTY FRAME signal with 3 (likely) causes: - # - # 1. range contains legit gap in venue history - # 2. history actually (edge case) **began** at the - # value `last_start_dt` - # 3. some other unknown error (ib blocking the - # history-query bc they don't want you seeing how - # they cucked all the tinas.. like with options - # hist) - # - if def_frame_duration: - # decrement by a duration's (frame) worth of time - # as maybe indicated by the backend to see if we - # can get older data before this possible - # "history gap". - last_start_dt: datetime = last_start_dt.subtract( - seconds=def_frame_duration.total_seconds() - ) - gap_report += ( - f'Decrementing `end_dt` and retrying with,\n' - f'def_frame_duration: {def_frame_duration}\n' - f'(new) last_start_dt: {last_start_dt}\n' - ) - log.warning(gap_report) - # skip writing to shm/tsdb and try the next - # duration's worth of prior history. - continue - - else: - # await tractor.pause() - raise DataUnavailable(gap_report) - - # broker says there never was or is no more history to pull - except DataUnavailable as due: - message: str = due.args[0] - log.warning( - f'Provider {mod.name!r} halted backfill due to,\n\n' - - f'{message}\n' - - f'fqme: {mkt.fqme}\n' - f'timeframe: {timeframe}\n' - f'last_start_dt: {last_start_dt}\n' - f'bf_until: {backfill_until_dt}\n' - ) - # UGH: what's a better way? - # TODO: backends are responsible for being correct on - # this right!? - # -[ ] in the `ib` case we could maybe offer some way - # to halt the request loop until the condition is - # resolved or should the backend be entirely in - # charge of solving such faults? yes, right? - return - - time: np.ndarray = array['time'] - assert ( - time[0] - == - next_start_dt.timestamp() - ) - - assert time[-1] == next_end_dt.timestamp() - - expected_dur: Interval = last_start_dt - next_start_dt - - # frame's worth of sample-period-steps, in seconds - frame_size_s: float = len(array) * timeframe - recv_frame_dur: Duration = ( - from_timestamp(array[-1]['time']) - - - from_timestamp(array[0]['time']) - ) - if ( - (lt_frame := (recv_frame_dur < expected_dur)) - or - (null_frame := (frame_size_s == 0)) - # ^XXX, should NEVER hit now! - ): - # XXX: query result includes a start point prior to our - # expected "frame size" and thus is likely some kind of - # history gap (eg. market closed period, outage, etc.) - # so just report it to console for now. 
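The interval comparison above can be reproduced in isolation. A minimal sketch of the same sanity check using `pendulum`, with synthetic timestamps and an assumed 60-bar request size (purely illustrative, not the backend's real return values):

import numpy as np
from pendulum import from_timestamp

timeframe: int = 60
# synthetic frame: only 30 bars came back even though 60 were requested
times = np.arange(
    1_700_000_000,
    1_700_000_000 + 30 * timeframe,
    timeframe,
    dtype='f8',
)

next_start_dt = from_timestamp(float(times[0]))
# the `end_dt` we asked the provider for: 60 bars past the frame start
last_start_dt = from_timestamp(float(times[0]) + 60 * timeframe)

expected_dur = last_start_dt - next_start_dt
recv_frame_dur = from_timestamp(float(times[-1])) - from_timestamp(float(times[0]))

if recv_frame_dur < expected_dur:
    # same conclusion the backfiller logs: a venue gap (or the series start)
    print(
        f'short frame: got {recv_frame_dur.in_words()}, '
        f'expected {expected_dur.in_words()}'
    )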
- if lt_frame: - reason = 'Possible GAP (or first-datum)' - else: - assert null_frame - reason = 'NULL-FRAME' - - missing_dur: Interval = expected_dur.end - recv_frame_dur.end - log.warning( - f'{timeframe}s-series {reason} detected!\n' - f'fqme: {mkt.fqme}\n' - f'last_start_dt: {last_start_dt}\n\n' - f'recv interval: {recv_frame_dur}\n' - f'expected interval: {expected_dur}\n\n' - - f'Missing duration of history of {missing_dur.in_words()!r}\n' - f'{missing_dur}\n' - ) - # await tractor.pause() - - to_push = diff_history( - array, - prepend_until_dt=backfill_until_dt, - ) - ln: int = len(to_push) - if ln: - log.info( - f'{ln} bars for {next_start_dt} -> {last_start_dt}' - ) - - else: - log.warning( - '0 BARS TO PUSH after diff!?\n' - f'{next_start_dt} -> {last_start_dt}' - ) - - # bail gracefully on shm allocation overrun/full - # condition - try: - await shm_push_in_between( - shm, - to_push, - prepend_index=next_prepend_index, - update_start_on_prepend=update_start_on_prepend, - ) - await sampler_stream.send({ - 'broadcast_all': { - 'backfilling': (mkt.fqme, timeframe), - }, - }) - - # decrement next prepend point - next_prepend_index = next_prepend_index - ln - last_start_dt = next_start_dt - - except ValueError as ve: - _ve = ve - log.error( - f'Shm prepend OVERRUN on: {next_start_dt} -> {last_start_dt}?' - ) - - if next_prepend_index < ln: - log.warning( - f'Shm buffer can only hold {next_prepend_index} more rows..\n' - f'Appending those from recent {ln}-sized frame, no more!' - ) - - to_push = to_push[-next_prepend_index + 1:] - await shm_push_in_between( - shm, - to_push, - prepend_index=next_prepend_index, - update_start_on_prepend=update_start_on_prepend, - ) - await sampler_stream.send({ - 'broadcast_all': { - 'backfilling': (mkt.fqme, timeframe), - }, - }) - - # can't push the entire frame? so - # push only the amount that can fit.. - break - - log.info( - f'Shm pushed {ln} frame:\n' - f'{next_start_dt} -> {last_start_dt}' - ) - - # FINALLY, maybe write immediately to the tsdb backend for - # long-term storage. - if ( - storage is not None - and - write_tsdb - ): - log.info( - f'Writing {ln} frame to storage:\n' - f'{next_start_dt} -> {last_start_dt}' - ) - - # NOTE, always drop the src asset token for - # non-currency-pair like market types (for now) - # - # THAT IS, for now our table key schema is NOT - # including the dst[/src] source asset token. SO, - # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for - # historical reasons ONLY. - if mkt.dst.atype not in { - 'crypto', - 'crypto_currency', - 'fiat', # a "forex pair" - 'perpetual_future', # stupid "perps" from cex land - }: - col_sym_key: str = mkt.get_fqme( - delim_char='', - without_src=True, - ) - else: - col_sym_key: str = mkt.get_fqme( - delim_char='', - ) - - await storage.write_ohlcv( - col_sym_key, - shm.array, - timeframe, - ) - df: pl.DataFrame = await storage.as_df( - fqme=mkt.fqme, - period=timeframe, - load_from_offline=False, - ) - ( - wdts, - deduped, - diff, - ) = dedupe(df) - # if diff: - # sort_diff(df) - - else: - # finally filled gap - log.info( - f'Finished filling gap to tsdb start @ {backfill_until_dt}!' - ) - - # XXX: extremely important, there can be no checkpoints - # in the block above to avoid entering new ``frames`` - # values while we're pipelining the current ones to - # memory... 
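The `dedupe()` helper called just above (after the tsdb write) is imported from `._anal` and isn't shown in this patch; conceptually it drops duplicate time rows from the freshly written series. A rough `polars` approximation of that idea, not the actual implementation:

import polars as pl

# toy OHLCV frame with one duplicated timestamp row
df = pl.DataFrame({
    'time':  [1_700_000_000, 1_700_000_060, 1_700_000_060, 1_700_000_120],
    'close': [100.0, 101.0, 101.0, 102.0],
})

deduped: pl.DataFrame = (
    df
    .unique(subset='time', keep='first', maintain_order=True)
    .sort('time')
)
diff: int = df.height - deduped.height
print(f'dropped {diff} duplicate row(s)')  # -> dropped 1 duplicate row(s)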
- # await sampler_stream.send('broadcast_all') - - # short-circuit (for now) - bf_done.set() - - -# NOTE: originally this was used to cope with a tsdb (marketstore) -# which could not delivery very large frames of history over gRPC -# (thanks goolag) due to corruption issues. NOW, using apache -# parquet (by default in the local filesys) we don't have this -# requirement since the files can be loaded very quickly in -# entirety to memory via -async def back_load_from_tsdb( - storemod: ModuleType, - storage: StorageClient, - - fqme: str, - - tsdb_history: np.ndarray, - - last_tsdb_dt: datetime, - latest_start_dt: datetime, - latest_end_dt: datetime, - - bf_done: trio.Event, - - timeframe: int, - shm: ShmArray, -): - assert len(tsdb_history) - - # sync to backend history task's query/load completion - # if bf_done: - # await bf_done.wait() - - # TODO: eventually it'd be nice to not require a shm array/buffer - # to accomplish this.. maybe we can do some kind of tsdb direct to - # graphics format eventually in a child-actor? - if storemod.name == 'nativedb': - return - - await tractor.pause() - assert shm._first.value == 0 - - array = shm.array - - # if timeframe == 1: - # times = shm.array['time'] - # assert (times[1] - times[0]) == 1 - - if len(array): - shm_last_dt = from_timestamp( - shm.array[0]['time'] - ) - else: - shm_last_dt = None - - if last_tsdb_dt: - assert shm_last_dt >= last_tsdb_dt - - # do diff against start index of last frame of history and only - # fill in an amount of datums from tsdb allows for most recent - # to be loaded into mem *before* tsdb data. - if ( - last_tsdb_dt - and latest_start_dt - ): - backfilled_size_s: Duration = ( - latest_start_dt - last_tsdb_dt - ).seconds - # if the shm buffer len is not large enough to contain - # all missing data between the most recent backend-queried frame - # and the most recent dt-index in the db we warn that we only - # want to load a portion of the next tsdb query to fill that - # space. - log.info( - f'{backfilled_size_s} seconds worth of {timeframe}s loaded' - ) - - # Load TSDB history into shm buffer (for display) if there is - # remaining buffer space. - - time_key: str = 'time' - if getattr(storemod, 'ohlc_key_map', False): - keymap: bidict = storemod.ohlc_key_map - time_key: str = keymap.inverse['time'] - - # if ( - # not len(tsdb_history) - # ): - # return - - tsdb_last_frame_start: datetime = last_tsdb_dt - # load as much from storage into shm possible (depends on - # user's shm size settings). - while shm._first.value > 0: - - tsdb_history = await storage.read_ohlcv( - fqme, - timeframe=timeframe, - end=tsdb_last_frame_start, - ) - - # # empty query - # if not len(tsdb_history): - # break - - next_start = tsdb_history[time_key][0] - if next_start >= tsdb_last_frame_start: - # no earlier data detected - break - - else: - tsdb_last_frame_start = next_start - - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - prepend_start = shm._first.value - - to_push = tsdb_history[-prepend_start:] - shm.push( - to_push, - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. 
- prepend=True, - # update_first=False, - # start=prepend_start, - field_map=storemod.ohlc_key_map, - ) - - log.info(f'Loaded {to_push.shape} datums from storage') - tsdb_last_frame_start = tsdb_history[time_key][0] - - # manually trigger step update to update charts/fsps - # which need an incremental update. - # NOTE: the way this works is super duper - # un-intuitive right now: - # - the broadcaster fires a msg to the fsp subsystem. - # - fsp subsys then checks for a sample step diff and - # possibly recomputes prepended history. - # - the fsp then sends back to the parent actor - # (usually a chart showing graphics for said fsp) - # which tells the chart to conduct a manual full - # graphics loop cycle. - # await sampler_stream.send('broadcast_all') - - -async def push_latest_frame( - # box-type only that should get packed with the datetime - # objects received for the latest history frame - dt_eps: list[DateTime, DateTime], - shm: ShmArray, - get_hist: Callable[ - [int, datetime, datetime], - tuple[np.ndarray, str] - ], - timeframe: float, - config: dict, - - task_status: TaskStatus[ - Exception | list[datetime, datetime] - ] = trio.TASK_STATUS_IGNORED, - -) -> list[datetime, datetime] | None: - # get latest query's worth of history all the way - # back to what is recorded in the tsdb - try: - ( - array, - mr_start_dt, - mr_end_dt, - ) = await get_hist( - timeframe, - end_dt=None, - ) - # so caller can access these ep values - dt_eps.extend([ - mr_start_dt, - mr_end_dt, - ]) - task_status.started(dt_eps) - - # XXX: timeframe not supported for backend (since - # above exception type), terminate immediately since - # there's no backfilling possible. - except DataUnavailable: - task_status.started(None) - - if timeframe > 1: - await tractor.pause() - - # prolly tf not supported - return None - - # NOTE: on the first history, most recent history - # frame we PREPEND from the current shm ._last index - # and thus a gap between the earliest datum loaded here - # and the latest loaded from the tsdb may exist! - log.info(f'Pushing {array.size} to shm!') - shm.push( - array, - prepend=True, # append on first frame - ) - - return dt_eps - - -async def load_tsdb_hist( - storage: StorageClient, - mkt: MktPair, - timeframe: float, - - task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, - -) -> tuple[ - np.ndarray, - DateTime, - DateTime, -] | None: - # loads a (large) frame of data from the tsdb depending - # on the db's query size limit; our "nativedb" (using - # parquet) generally can load the entire history into mem - # but if not then below the remaining history can be lazy - # loaded? - fqme: str = mkt.fqme - tsdb_entry: tuple[ - np.ndarray, - DateTime, - DateTime, - ] - try: - tsdb_entry: tuple | None = await storage.load( - fqme, - timeframe=timeframe, - ) - return tsdb_entry - - except TimeseriesNotFound: - log.warning( - f'No timeseries yet for {timeframe}@{fqme}' - ) - return None - - -async def tsdb_backfill( - mod: ModuleType, - storemod: ModuleType, - - storage: StorageClient, - mkt: MktPair, - shm: ShmArray, - timeframe: float, - - sampler_stream: tractor.MsgStream, - - task_status: TaskStatus[ - tuple[ShmArray, ShmArray] - ] = trio.TASK_STATUS_IGNORED, - -) -> None: - - if timeframe not in (1, 60): - raise ValueError( - '`piker` only needs to support 1m and 1s sampling ' - 'but ur api is trying to deliver a longer ' - f'timeframe of {timeframe} seconds..\n' - 'So yuh.. dun do dat brudder.' 
- ) - - get_hist: Callable[ - [int, datetime, datetime], - tuple[np.ndarray, str] - ] - config: dict[str, int] - async with ( - mod.open_history_client( - mkt, - ) as (get_hist, config), - - # NOTE: this sub-nursery splits to tasks for the given - # sampling rate to concurrently load offline tsdb - # timeseries as well as new data from the venue backend! - ): - log.info( - f'`{mod}` history client returned backfill config:\n' - f'{pformat(config)}\n' - ) - - # concurrently load the provider's most-recent-frame AND any - # pre-existing tsdb history already saved in `piker` storage. - dt_eps: list[DateTime, DateTime] = [] - async with ( - tractor.trionics.collapse_eg(), - trio.open_nursery() as tn - ): - tn.start_soon( - push_latest_frame, - dt_eps, - shm, - get_hist, - timeframe, - config, - ) - tsdb_entry: tuple = await load_tsdb_hist( - storage, - mkt, - timeframe, - ) - - # tell parent task to continue - # TODO: really we'd want this the other way with the - # tsdb load happening asap and the since the latest - # frame query will normally be the main source of - # latency? - task_status.started() - - # NOTE: iabs to start backfilling from, reverse chronological, - # ONLY AFTER the first history frame has been pushed to - # mem! - backfill_gap_from_shm_index: int = shm._first.value + 1 - - # Prepend any tsdb history into the rt-shm-buffer which - # should NOW be getting filled with the most recent history - # pulled from the data-backend. - if dt_eps: - # well then, unpack the latest (gap) backfilled frame dts - ( - mr_start_dt, - mr_end_dt, - ) = dt_eps - - first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds - calced_frame_size: Duration = mk_duration( - seconds=first_frame_dur_s, - ) - # NOTE, attempt to use the backend declared default frame - # sizing (as allowed by their time-series query APIs) and - # if not provided try to construct a default from the - # first frame received above. - def_frame_durs: dict[ - int, - Duration, - ]|None = config.get('frame_types', None) - - if def_frame_durs: - def_frame_size: Duration = def_frame_durs[timeframe] - - if def_frame_size != calced_frame_size: - log.warning( - f'Expected frame size {def_frame_size}\n' - f'Rxed frame {calced_frame_size}\n' - ) - # await tractor.pause() - else: - # use what we calced from first frame above. - def_frame_size = calced_frame_size - - # NOTE: when there's no offline data, there's 2 cases: - # - data backend doesn't support timeframe/sample - # period (in which case `dt_eps` should be `None` and - # we shouldn't be here!), or - # - no prior history has been stored (yet) and we need - # todo full backfill of the history now. - if tsdb_entry is None: - # indicate to backfill task to fill the whole - # shm buffer as much as it can! - last_tsdb_dt = None - - # there's existing tsdb history from (offline) storage - # so only backfill the gap between the - # most-recent-frame (mrf) and that latest sample. 
- else: - ( - tsdb_history, - first_tsdb_dt, - last_tsdb_dt, - ) = tsdb_entry - - # if there is a gap to backfill from the first - # history frame until the last datum loaded from the tsdb - # continue that now in the background - async with trio.open_nursery( - strict_exception_groups=False, - ) as tn: - - bf_done = await tn.start( - partial( - start_backfill, - get_hist=get_hist, - def_frame_duration=def_frame_size, - mod=mod, - mkt=mkt, - shm=shm, - timeframe=timeframe, - - backfill_from_shm_index=backfill_gap_from_shm_index, - backfill_from_dt=mr_start_dt, - - sampler_stream=sampler_stream, - backfill_until_dt=last_tsdb_dt, - - storage=storage, - write_tsdb=True, - ) - ) - nulls_detected: trio.Event | None = None - if last_tsdb_dt is not None: - # calc the index from which the tsdb data should be - # prepended, presuming there is a gap between the - # latest frame (loaded/read above) and the latest - # sample loaded from the tsdb. - backfill_diff: Duration = mr_start_dt - last_tsdb_dt - offset_s: float = backfill_diff.in_seconds() - - # XXX EDGE CASEs: the most recent frame overlaps with - # prior tsdb history!! - # - so the latest frame's start time is earlier then - # the tsdb's latest sample. - # - alternatively this may also more generally occur - # when the venue was closed (say over the weeknd) - # causing a timeseries gap, AND the query frames size - # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS - # GREATER THAN the current venue-market's operating - # session (time) we will receive datums from BEFORE THE - # CLOSURE GAP and thus the `offset_s` value will be - # NEGATIVE! In this case we need to ensure we don't try - # to push datums that have already been recorded in the - # tsdb. In this case we instead only retreive and push - # the series portion missing from the db's data set. - # if offset_s < 0: - # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt - # non_overlap_offset_s: float = backfill_diff.in_seconds() - - offset_samples: int = round(offset_s / timeframe) - - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - if offset_s > 0: - # NOTE XXX: ONLY when there is an actual gap - # between the earliest sample in the latest history - # frame do we want to NOT stick the latest tsdb - # history adjacent to that latest frame! - prepend_start = shm._first.value - offset_samples + 1 - to_push = tsdb_history[-prepend_start:] - else: - # when there is overlap we want to remove the - # overlapping samples from the tsdb portion (taking - # instead the latest frame's values since THEY - # SHOULD BE THE SAME) and prepend DIRECTLY adjacent - # to the latest frame! - # TODO: assert the overlap segment array contains - # the same values!?! - prepend_start = shm._first.value - to_push = tsdb_history[-(shm._first.value):offset_samples - 1] - - # tsdb history is so far in the past we can't fit it in - # shm buffer space so simply don't load it! - if prepend_start > 0: - shm.push( - to_push, - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # update_first=False, - start=prepend_start, - field_map=storemod.ohlc_key_map, - ) - - log.info(f'Loaded {to_push.shape} datums from storage') - - # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any - # seemingly missing (null-time) segments.. - # TODO: ideally these can never exist! 
- # -[ ] somehow it seems sometimes we're writing zero-ed - # segments to tsdbs during teardown? - # -[ ] can we ensure that the backcfiller tasks do this - # work PREVENTAVELY instead? - # -[ ] fill in non-zero epoch time values ALWAYS! - # await maybe_fill_null_segments( - nulls_detected: trio.Event = await tn.start(partial( - maybe_fill_null_segments, - - shm=shm, - timeframe=timeframe, - get_hist=get_hist, - sampler_stream=sampler_stream, - mkt=mkt, - )) - - # 2nd nursery END - - # TODO: who would want to? - if nulls_detected: - await nulls_detected.wait() - - await bf_done.wait() - # TODO: maybe start history anal and load missing "history - # gaps" via backend.. - - # if len(hist_shm.array) < 2: - # TODO: there's an edge case here to solve where if the last - # frame before market close (at least on ib) was pushed and - # there was only "1 new" row pushed from the first backfill - # query-iteration, then the sample step sizing calcs will - # break upstream from here since you can't diff on at least - # 2 steps... probably should also add logic to compute from - # the tsdb series and stash that somewhere as meta data on - # the shm buffer?.. no se. - - # backload any further data from tsdb (concurrently per - # timeframe) if not all data was able to be loaded (in memory) - # from the ``StorageClient.load()`` call above. - await trio.sleep_forever() - - # XXX NOTE: this is legacy from when we were using - # marketstore and we needed to continue backloading - # incrementally from the tsdb client.. (bc it couldn't - # handle a single large query with gRPC for some - # reason.. classic goolag pos) - # tn.start_soon( - # back_load_from_tsdb, - - # storemod, - # storage, - # fqme, - - # tsdb_history, - # last_tsdb_dt, - # mr_start_dt, - # mr_end_dt, - # bf_done, - - # timeframe, - # shm, - # ) - - -async def manage_history( - mod: ModuleType, - mkt: MktPair, - some_data_ready: trio.Event, - feed_is_live: trio.Event, - timeframe: float = 60, # in seconds - - task_status: TaskStatus[ - tuple[ShmArray, ShmArray] - ] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Load and manage historical data including the loading of any - available series from any connected tsdb as well as conduct - real-time update of both that existing db and the allocated - shared memory buffer. - - Init sequence: - - allocate shm (numpy array) buffers for 60s & 1s sample rates - - configure "zero index" for each buffer: the index where - history will prepended *to* and new live data will be - appened *from*. - - open a ``.storage.StorageClient`` and load any existing tsdb - history as well as (async) start a backfill task which loads - missing (newer) history from the data provider backend: - - tsdb history is loaded first and pushed to shm ASAP. - - the backfill task loads the most recent history before - unblocking its parent task, so that the `ShmArray._last` is - up to date to allow the OHLC sampler to begin writing new - samples as the correct buffer index once the provider feed - engages. - - ''' - # TODO: is there a way to make each shm file key - # actor-tree-discovery-addr unique so we avoid collisions - # when doing tests which also allocate shms for certain instruments - # that may be in use on the system by some other running daemons? 
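Before the shm allocations below, it helps to see what the module-level sizing constants (defined near the top of this file) actually amount to; a small arithmetic check, for illustration only:

# mirror the module-level `ShmArray` sizing constants
_mins_in_day = int(60 * 24)            # 1,440 one-minute bars per day
_secs_in_day = int(60 * _mins_in_day)  # 86,400 one-second bars per day
_days_worth = 3

_default_hist_size = 6 * 365 * _mins_in_day              # ~6 years of 1m bars
_hist_buffer_start = _default_hist_size - round(7 * _mins_in_day)

_default_rt_size = _days_worth * _secs_in_day             # 3 days of 1s bars
_rt_buffer_start = (_days_worth - 1) * _secs_in_day

print(f'hist buffer rows: {_default_hist_size:,}')   # 3,153,600
print(f'hist append idx:  {_hist_buffer_start:,}')   # ~1 week of append headroom
print(f'rt buffer rows:   {_default_rt_size:,}')     # 259,200
print(f'rt append idx:    {_rt_buffer_start:,}')     # 1 day of appends before overrun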
- # from tractor._state import _runtime_vars - # port = _runtime_vars['_root_mailbox'][1] - - uid: tuple = tractor.current_actor().uid - name, uuid = uid - service: str = name.rstrip(f'.{mod.name}') - fqme: str = mkt.get_fqme(delim_char='') - - # (maybe) allocate shm array for this broker/symbol which will - # be used for fast near-term history capture and processing. - hist_shm, opened = maybe_open_shm_array( - size=_default_hist_size, - append_start_index=_hist_buffer_start, - - key=f'piker.{service}[{uuid[:16]}].{fqme}.hist', - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields), - - # we expect the sub-actor to write - readonly=False, - ) - hist_zero_index = hist_shm.index - 1 - - # TODO: history validation - if not opened: - raise RuntimeError( - "Persistent shm for sym was already open?!" - ) - - rt_shm, opened = maybe_open_shm_array( - size=_default_rt_size, - append_start_index=_rt_buffer_start, - key=f'piker.{service}[{uuid[:16]}].{fqme}.rt', - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields), - - # we expect the sub-actor to write - readonly=False, - ) - - # (for now) set the rt (hft) shm array with space to prepend - # only a few days worth of 1s history. - days: int = 2 - start_index: int = days*_secs_in_day - rt_shm._first.value = start_index - rt_shm._last.value = start_index - rt_zero_index = rt_shm.index - 1 - - if not opened: - raise RuntimeError( - "Persistent shm for sym was already open?!" - ) - - open_history_client = getattr( - mod, - 'open_history_client', - ) - assert open_history_client - - # TODO: maybe it should be a subpkg of `.data`? - from piker import storage - - async with ( - storage.open_storage_client() as (storemod, client), - - # NOTE: this nursery spawns a task per "timeframe" (aka - # sampling period) data set since normally differently - # sampled timeseries can be loaded / process independently - # ;) - tractor.trionics.collapse_eg(), - trio.open_nursery() as tn, - ): - log.info( - f'Connecting to storage backend `{storemod.name}`:\n' - f'location: {client.address}\n' - f'db cardinality: {client.cardinality}\n' - # TODO: show backend config, eg: - # - network settings - # - storage size with compression - # - number of loaded time series? - ) - - # NOTE: this call ONLY UNBLOCKS once the latest-most frame - # (i.e. history just before the live feed latest datum) of - # history has been loaded and written to the shm buffer: - # - the backfiller task can write in reverse chronological - # to the shm and tsdb - # - the tsdb data can be loaded immediately and the - # backfiller can do a single append from it's end datum and - # then prepends backward to that from the current time - # step. - tf2mem: dict = { - 1: rt_shm, - 60: hist_shm, - } - async with open_sample_stream( - period_s=1., - shms_by_period={ - 1.: rt_shm.token, - 60.: hist_shm.token, - }, - - # NOTE: we want to only open a stream for doing - # broadcasts on backfill operations, not receive the - # sample index-stream (since there's no code in this - # data feed layer that needs to consume it). 
- open_index_stream=True, - sub_for_broadcasts=False, - - ) as sample_stream: - # register 1s and 1m buffers with the global - # incrementer task - log.info(f'Connected to sampler stream: {sample_stream}') - - for timeframe in [60, 1]: - await tn.start(partial( - tsdb_backfill, - mod=mod, - storemod=storemod, - storage=client, - mkt=mkt, - shm=tf2mem[timeframe], - timeframe=timeframe, - sampler_stream=sample_stream, - )) - - # indicate to caller that feed can be delivered to - # remote requesting client since we've loaded history - # data that can be used. - some_data_ready.set() - - # wait for a live feed before starting the sampler. - await feed_is_live.wait() - - # yield back after client connect with filled shm - task_status.started(( - hist_zero_index, - hist_shm, - rt_zero_index, - rt_shm, - )) - - # history retreival loop depending on user interaction - # and thus a small RPC-prot for remotely controllinlg - # what data is loaded for viewing. - await trio.sleep_forever() - - -def iter_dfs_from_shms( - fqme: str -) -> Generator[ - tuple[Path, ShmArray, pl.DataFrame], - None, - None, -]: - # shm buffer size table based on known sample rates - sizes: dict[str, int] = { - 'hist': _default_hist_size, - 'rt': _default_rt_size, - } - - # load all detected shm buffer files which have the - # passed FQME pattern in the file name. - shmfiles: list[Path] = [] - shmdir = Path('/dev/shm/') - - for shmfile in shmdir.glob(f'*{fqme}*'): - filename: str = shmfile.name - - # skip index files - if ( - '_first' in filename - or '_last' in filename - ): - continue - - assert shmfile.is_file() - log.debug(f'Found matching shm buffer file: {filename}') - shmfiles.append(shmfile) - - for shmfile in shmfiles: - - # lookup array buffer size based on file suffix - # being either .rt or .hist - key: str = shmfile.name.rsplit('.')[-1] - - # skip FSP buffers for now.. - if key not in sizes: - continue - - size: int = sizes[key] - - # attach to any shm buffer, load array into polars df, - # write to local parquet file. - shm, opened = maybe_open_shm_array( - key=shmfile.name, - size=size, - dtype=def_iohlcv_fields, - readonly=True, - ) - assert not opened - ohlcv: np.ndarray = shm.array - df: pl.DataFrame = np2pl(ohlcv) - - yield ( - shmfile, - shm, - df, - ) diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py new file mode 100644 index 00000000..a4ee04c2 --- /dev/null +++ b/piker/tsp/_history.py @@ -0,0 +1,1471 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or +# modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public +# License along with this program. If not, see +# . + +''' +Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic for, + +- hi-level biz logics using the `.storage` subpkg APIs for (I/O) + orchestration and mgmt of tsdb data sets. +- core data-provider history backfilling middleware (as task-funcs) via + (what will eventually be `datad`, but are rn is the) `.brokers` backend + APIs. 
+- various data set cleaning, repairing and issue-detection/analysis + routines to ensure consistent series whether in shm or when + stored offline (in a tsdb). + +''' +from __future__ import annotations +from datetime import datetime +from functools import partial +from pathlib import Path +from pprint import pformat +from types import ModuleType +from typing import ( + Callable, + Generator, + TYPE_CHECKING, +) + +import trio +from trio_typing import TaskStatus +import tractor +from pendulum import ( + Interval, + DateTime, + Duration, + duration as mk_duration, + from_timestamp, +) +import numpy as np +import polars as pl + +from piker.brokers import NoData +from piker.accounting import ( + MktPair, +) +from piker.data._util import ( + log, +) +from ..data._sharedmem import ( + maybe_open_shm_array, + ShmArray, +) +from ..data._source import def_iohlcv_fields +from ..data._sampling import ( + open_sample_stream, +) + + +from piker.brokers._util import ( + DataUnavailable, +) +from piker.storage import TimeseriesNotFound +from ._anal import ( + + dedupe, + get_null_segs, + iter_null_segs, + Frame, + + # codec-ish + np2pl as np2pl, + + # `polars` specific + # with_dts, + # sort_diff, + + # TODO, use this to correct conc-issues during backfill? + # detect_price_gaps, +) + +if TYPE_CHECKING: + from bidict import bidict + from ..service.marketstore import StorageClient + # from .feed import _FeedsBus + + +# `ShmArray` buffer sizing configuration: +_mins_in_day = int(60 * 24) +# how much is probably dependent on lifestyle +# but we reco a buncha times (but only on a +# run-every-other-day kinda week). +_secs_in_day = int(60 * _mins_in_day) +_days_in_week: int = 7 + +_days_worth: int = 3 +_default_hist_size: int = 6 * 365 * _mins_in_day +_hist_buffer_start = int( + _default_hist_size - round(7 * _mins_in_day) +) + +_default_rt_size: int = _days_worth * _secs_in_day +# NOTE: start the append index in rt buffer such that 1 day's worth +# can be appenened before overrun. +_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) + + +def diff_history( + array: np.ndarray, + append_until_dt: datetime | None = None, + prepend_until_dt: datetime | None = None, + +) -> np.ndarray: + + # no diffing with tsdb dt index possible.. + if ( + prepend_until_dt is None + and append_until_dt is None + ): + return array + + times = array['time'] + + if append_until_dt: + return array[times < append_until_dt.timestamp()] + else: + return array[times >= prepend_until_dt.timestamp()] + + +# TODO: can't we just make this a sync func now? +async def shm_push_in_between( + shm: ShmArray, + to_push: np.ndarray, + prepend_index: int, + + update_start_on_prepend: bool = False, + +) -> int: + # XXX: extremely important, there can be no checkpoints + # in the body of this func to avoid entering new ``frames`` + # values while we're pipelining the current ones to + # memory... + shm.push( + to_push, + prepend=True, + + # XXX: only update the ._first index if no tsdb + # segment was previously prepended by the + # parent task. + update_first=update_start_on_prepend, + + # XXX: only prepend from a manually calculated shm + # index if there was already a tsdb history + # segment prepended (since then the + # ._first.value is going to be wayyy in the + # past!) 
+ start=( + prepend_index + if not update_start_on_prepend + else None + ), + ) + + +async def maybe_fill_null_segments( + shm: ShmArray, + timeframe: float, + get_hist: Callable, + sampler_stream: tractor.MsgStream, + mkt: MktPair, + + task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, + +) -> list[Frame]: + + null_segs_detected = trio.Event() + task_status.started(null_segs_detected) + + frame: Frame = shm.array + + null_segs: tuple | None = get_null_segs( + frame, + period=timeframe, + ) + for ( + absi_start, absi_end, + fi_start, fi_end, + start_t, end_t, + start_dt, end_dt, + ) in iter_null_segs( + null_segs=null_segs, + frame=frame, + timeframe=timeframe, + ): + + # XXX NOTE: ?if we get a badly ordered timestamp + # pair, immediately stop backfilling? + if ( + start_dt + and + end_dt < start_dt + ): + await tractor.pause() + break + + ( + array, + next_start_dt, + next_end_dt, + ) = await get_hist( + timeframe, + start_dt=start_dt, + end_dt=end_dt, + ) + + # XXX TODO: pretty sure if i plot tsla, btcusdt.binance + # and mnq.cme.ib this causes a Qt crash XXDDD + + # make sure we don't overrun the buffer start + len_to_push: int = min(absi_end, array.size) + to_push: np.ndarray = array[-len_to_push:] + + await shm_push_in_between( + shm, + to_push, + prepend_index=absi_end, + update_start_on_prepend=False, + ) + # TODO: UI side needs IPC event to update.. + # - make sure the UI actually always handles + # this update! + # - remember that in the display side, only refersh this + # if the respective history is actually "in view". + # loop + try: + await sampler_stream.send({ + 'broadcast_all': { + + # XXX NOTE XXX: see the + # `.ui._display.increment_history_view()` if block + # that looks for this info to FORCE a hard viz + # redraw! + 'backfilling': (mkt.fqme, timeframe), + }, + }) + except tractor.ContextCancelled: + # log.exception + await tractor.pause() + raise + + null_segs_detected.set() + # RECHECK for more null-gaps + frame: Frame = shm.array + null_segs: tuple | None = get_null_segs( + frame, + period=timeframe, + ) + if ( + null_segs + and + len(null_segs[-1]) + ): + ( + iabs_slices, + iabs_zero_rows, + zero_t, + ) = null_segs + log.warning( + f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n' + f'{pformat(iabs_slices)}' + ) + + # TODO: always backfill gaps with the earliest (price) datum's + # value to avoid the y-ranger including zeros and completely + # stretching the y-axis.. + # array: np.ndarray = shm.array + # zeros = array[array['low'] == 0] + ohlc_fields: list[str] = [ + 'open', + 'high', + 'low', + 'close', + ] + + for istart, istop in iabs_slices: + + # get view into buffer for null-segment + gap: np.ndarray = shm._array[istart:istop] + + # copy the oldest OHLC samples forward + cls: float = shm._array[istart]['close'] + + # TODO: how can we mark this range as being a gap tho? + # -[ ] maybe pg finally supports nulls in ndarray to + # show empty space somehow? + # -[ ] we could put a special value in the vlm or + # another col/field to denote? + gap[ohlc_fields] = cls + + start_t: float = shm._array[istart]['time'] + t_diff: float = (istop - istart)*timeframe + + gap['time'] = np.arange( + start=start_t, + stop=start_t + t_diff, + step=timeframe, + ) + + # TODO: reimpl using the new `.ui._remote_ctl` ctx + # ideally using some kinda decent + # tractory-reverse-lookup-connnection from some other + # `Context` type thingy? 
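For reference, `get_null_segs()` / `iter_null_segs()` live in `._anal` and are not part of this patch; conceptually they locate runs of zeroed OHLC rows like the ones being forward-filled here. A rough standalone sketch of that detection idea (not the real implementation):

import numpy as np

# toy OHLC buffer with a zeroed ("null") segment at rows 4..6
ohlc_dtype = np.dtype([
    ('time', 'f8'),
    ('open', 'f8'), ('high', 'f8'), ('low', 'f8'), ('close', 'f8'),
])
arr = np.zeros(10, dtype=ohlc_dtype)
arr['time'] = np.arange(1_700_000_000, 1_700_000_000 + 10 * 60, 60)
for field in ('open', 'high', 'low', 'close'):
    arr[field] = 101.0
    arr[field][4:7] = 0.0

# rows whose price fields are all zero are treated as "null" samples
null_mask = (arr['close'] == 0) & (arr['low'] == 0)
idxs = np.flatnonzero(null_mask)

# group consecutive indices into (start, stop) slices
breaks = np.where(np.diff(idxs) > 1)[0] + 1
segs = [
    (int(run[0]), int(run[-1]) + 1)
    for run in np.split(idxs, breaks)
    if run.size
]
print(segs)  # -> [(4, 7)]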
+ await sampler_stream.send({ + 'broadcast_all': { + + # XXX NOTE XXX: see the + # `.ui._display.increment_history_view()` if block + # that looks for this info to FORCE a hard viz + # redraw! + 'backfilling': (mkt.fqme, timeframe), + }, + }) + + # TODO: interatively step through any remaining + # time-gaps/null-segments and spawn piecewise backfiller + # tasks in a nursery? + # -[ ] not sure that's going to work so well on the ib + # backend but worth a shot? + # -[ ] mk new history connections to make it properly + # parallel possible no matter the backend? + # -[ ] fill algo: do queries in alternating "latest, then + # earliest, then latest.. etc?" + + +async def start_backfill( + get_hist, + def_frame_duration: Duration, + mod: ModuleType, + mkt: MktPair, + shm: ShmArray, + timeframe: float, + + backfill_from_shm_index: int, + backfill_from_dt: datetime, + + sampler_stream: tractor.MsgStream, + + backfill_until_dt: datetime | None = None, + storage: StorageClient | None = None, + + write_tsdb: bool = True, + + task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED, + +) -> int: + + # let caller unblock and deliver latest history frame + # and use to signal that backfilling the shm gap until + # the tsdb end is complete! + bf_done = trio.Event() + task_status.started(bf_done) + + # based on the sample step size, maybe load a certain amount history + update_start_on_prepend: bool = False + if backfill_until_dt is None: + + # TODO: per-provider default history-durations? + # -[ ] inside the `open_history_client()` config allow + # declaring the history duration limits instead of + # guessing and/or applying the same limits to all? + # + # -[ ] allow declaring (default) per-provider backfill + # limits inside a [storage] sub-section in conf.toml? + # + # NOTE, when no tsdb "last datum" is provided, we just + # load some near-term history by presuming a "decently + # large" 60s duration limit and a much shorter 1s range. + periods = { + 1: {'days': 2}, + 60: {'years': 6}, + } + period_duration: int = periods[timeframe] + update_start_on_prepend: bool = True + + # NOTE: manually set the "latest" datetime which we intend to + # backfill history "until" so as to adhere to the history + # settings above when the tsdb is detected as being empty. + backfill_until_dt = backfill_from_dt.subtract(**period_duration) + + # STAGE NOTE: "backward history gap filling": + # - we push to the shm buffer until we have history back + # until the latest entry loaded from the tsdb's table B) + # - after this loop continue to check for other gaps in the + # (tsdb) history and (at least report) maybe fill them + # from new frame queries to the backend? + last_start_dt: datetime = backfill_from_dt + next_prepend_index: int = backfill_from_shm_index + + while last_start_dt > backfill_until_dt: + log.info( + f'Requesting {timeframe}s frame:\n' + f'backfill_until_dt: {backfill_until_dt}\n' + f'last_start_dt: {last_start_dt}\n' + ) + try: + ( + array, + next_start_dt, + next_end_dt, + ) = await get_hist( + timeframe, + end_dt=last_start_dt, + ) + except NoData as _daterr: + orig_last_start_dt: datetime = last_start_dt + gap_report: str = ( + f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' + f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n' + f'last_start_dt: {orig_last_start_dt}\n\n' + f'bf_until: {backfill_until_dt}\n' + ) + # EMPTY FRAME signal with 3 (likely) causes: + # + # 1. range contains legit gap in venue history + # 2. history actually (edge case) **began** at the + # value `last_start_dt` + # 3. 
some other unknown error (ib blocking the + # history-query bc they don't want you seeing how + # they cucked all the tinas.. like with options + # hist) + # + if def_frame_duration: + # decrement by a duration's (frame) worth of time + # as maybe indicated by the backend to see if we + # can get older data before this possible + # "history gap". + last_start_dt: datetime = last_start_dt.subtract( + seconds=def_frame_duration.total_seconds() + ) + gap_report += ( + f'Decrementing `end_dt` and retrying with,\n' + f'def_frame_duration: {def_frame_duration}\n' + f'(new) last_start_dt: {last_start_dt}\n' + ) + log.warning(gap_report) + # skip writing to shm/tsdb and try the next + # duration's worth of prior history. + continue + + else: + # await tractor.pause() + raise DataUnavailable(gap_report) + + # broker says there never was or is no more history to pull + except DataUnavailable as due: + message: str = due.args[0] + log.warning( + f'Provider {mod.name!r} halted backfill due to,\n\n' + + f'{message}\n' + + f'fqme: {mkt.fqme}\n' + f'timeframe: {timeframe}\n' + f'last_start_dt: {last_start_dt}\n' + f'bf_until: {backfill_until_dt}\n' + ) + # UGH: what's a better way? + # TODO: backends are responsible for being correct on + # this right!? + # -[ ] in the `ib` case we could maybe offer some way + # to halt the request loop until the condition is + # resolved or should the backend be entirely in + # charge of solving such faults? yes, right? + return + + time: np.ndarray = array['time'] + assert ( + time[0] + == + next_start_dt.timestamp() + ) + + assert time[-1] == next_end_dt.timestamp() + + expected_dur: Interval = last_start_dt - next_start_dt + + # frame's worth of sample-period-steps, in seconds + frame_size_s: float = len(array) * timeframe + recv_frame_dur: Duration = ( + from_timestamp(array[-1]['time']) + - + from_timestamp(array[0]['time']) + ) + if ( + (lt_frame := (recv_frame_dur < expected_dur)) + or + (null_frame := (frame_size_s == 0)) + # ^XXX, should NEVER hit now! + ): + # XXX: query result includes a start point prior to our + # expected "frame size" and thus is likely some kind of + # history gap (eg. market closed period, outage, etc.) + # so just report it to console for now. 
+ if lt_frame: + reason = 'Possible GAP (or first-datum)' + else: + assert null_frame + reason = 'NULL-FRAME' + + missing_dur: Interval = expected_dur.end - recv_frame_dur.end + log.warning( + f'{timeframe}s-series {reason} detected!\n' + f'fqme: {mkt.fqme}\n' + f'last_start_dt: {last_start_dt}\n\n' + f'recv interval: {recv_frame_dur}\n' + f'expected interval: {expected_dur}\n\n' + + f'Missing duration of history of {missing_dur.in_words()!r}\n' + f'{missing_dur}\n' + ) + # await tractor.pause() + + to_push = diff_history( + array, + prepend_until_dt=backfill_until_dt, + ) + ln: int = len(to_push) + if ln: + log.info( + f'{ln} bars for {next_start_dt} -> {last_start_dt}' + ) + + else: + log.warning( + '0 BARS TO PUSH after diff!?\n' + f'{next_start_dt} -> {last_start_dt}' + ) + + # bail gracefully on shm allocation overrun/full + # condition + try: + await shm_push_in_between( + shm, + to_push, + prepend_index=next_prepend_index, + update_start_on_prepend=update_start_on_prepend, + ) + await sampler_stream.send({ + 'broadcast_all': { + 'backfilling': (mkt.fqme, timeframe), + }, + }) + + # decrement next prepend point + next_prepend_index = next_prepend_index - ln + last_start_dt = next_start_dt + + except ValueError as ve: + _ve = ve + log.error( + f'Shm prepend OVERRUN on: {next_start_dt} -> {last_start_dt}?' + ) + + if next_prepend_index < ln: + log.warning( + f'Shm buffer can only hold {next_prepend_index} more rows..\n' + f'Appending those from recent {ln}-sized frame, no more!' + ) + + to_push = to_push[-next_prepend_index + 1:] + await shm_push_in_between( + shm, + to_push, + prepend_index=next_prepend_index, + update_start_on_prepend=update_start_on_prepend, + ) + await sampler_stream.send({ + 'broadcast_all': { + 'backfilling': (mkt.fqme, timeframe), + }, + }) + + # can't push the entire frame? so + # push only the amount that can fit.. + break + + log.info( + f'Shm pushed {ln} frame:\n' + f'{next_start_dt} -> {last_start_dt}' + ) + + # FINALLY, maybe write immediately to the tsdb backend for + # long-term storage. + if ( + storage is not None + and + write_tsdb + ): + log.info( + f'Writing {ln} frame to storage:\n' + f'{next_start_dt} -> {last_start_dt}' + ) + + # NOTE, always drop the src asset token for + # non-currency-pair like market types (for now) + # + # THAT IS, for now our table key schema is NOT + # including the dst[/src] source asset token. SO, + # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for + # historical reasons ONLY. + if mkt.dst.atype not in { + 'crypto', + 'crypto_currency', + 'fiat', # a "forex pair" + 'perpetual_future', # stupid "perps" from cex land + }: + col_sym_key: str = mkt.get_fqme( + delim_char='', + without_src=True, + ) + else: + col_sym_key: str = mkt.get_fqme( + delim_char='', + ) + + await storage.write_ohlcv( + col_sym_key, + shm.array, + timeframe, + ) + df: pl.DataFrame = await storage.as_df( + fqme=mkt.fqme, + period=timeframe, + load_from_offline=False, + ) + ( + wdts, + deduped, + diff, + ) = dedupe(df) + # if diff: + # sort_diff(df) + + else: + # finally filled gap + log.info( + f'Finished filling gap to tsdb start @ {backfill_until_dt}!' + ) + + # XXX: extremely important, there can be no checkpoints + # in the block above to avoid entering new ``frames`` + # values while we're pipelining the current ones to + # memory... 
+ # await sampler_stream.send('broadcast_all') + + # short-circuit (for now) + bf_done.set() + + +# NOTE: originally this was used to cope with a tsdb (marketstore) +# which could not delivery very large frames of history over gRPC +# (thanks goolag) due to corruption issues. +# +# NOW, using apache parquet (by default in the local filesys) we +# don't have this requirement since the files can be loaded very +# quickly in entirety to memory via `polars.read_parquet()`. +# +async def back_load_from_tsdb( + storemod: ModuleType, + storage: StorageClient, + + fqme: str, + + tsdb_history: np.ndarray, + + last_tsdb_dt: datetime, + latest_start_dt: datetime, + latest_end_dt: datetime, + + bf_done: trio.Event, + + timeframe: int, + shm: ShmArray, +): + assert len(tsdb_history) + + # sync to backend history task's query/load completion + # if bf_done: + # await bf_done.wait() + + # TODO: eventually it'd be nice to not require a shm array/buffer + # to accomplish this.. maybe we can do some kind of tsdb direct to + # graphics format eventually in a child-actor? + if storemod.name == 'nativedb': + return + + await tractor.pause() + assert shm._first.value == 0 + + array = shm.array + + # if timeframe == 1: + # times = shm.array['time'] + # assert (times[1] - times[0]) == 1 + + if len(array): + shm_last_dt = from_timestamp( + shm.array[0]['time'] + ) + else: + shm_last_dt = None + + if last_tsdb_dt: + assert shm_last_dt >= last_tsdb_dt + + # do diff against start index of last frame of history and only + # fill in an amount of datums from tsdb allows for most recent + # to be loaded into mem *before* tsdb data. + if ( + last_tsdb_dt + and latest_start_dt + ): + backfilled_size_s: Duration = ( + latest_start_dt - last_tsdb_dt + ).seconds + # if the shm buffer len is not large enough to contain + # all missing data between the most recent backend-queried frame + # and the most recent dt-index in the db we warn that we only + # want to load a portion of the next tsdb query to fill that + # space. + log.info( + f'{backfilled_size_s} seconds worth of {timeframe}s loaded' + ) + + # Load TSDB history into shm buffer (for display) if there is + # remaining buffer space. + + time_key: str = 'time' + if getattr(storemod, 'ohlc_key_map', False): + keymap: bidict = storemod.ohlc_key_map + time_key: str = keymap.inverse['time'] + + # if ( + # not len(tsdb_history) + # ): + # return + + tsdb_last_frame_start: datetime = last_tsdb_dt + # load as much from storage into shm possible (depends on + # user's shm size settings). + while shm._first.value > 0: + + tsdb_history = await storage.read_ohlcv( + fqme, + timeframe=timeframe, + end=tsdb_last_frame_start, + ) + + # # empty query + # if not len(tsdb_history): + # break + + next_start = tsdb_history[time_key][0] + if next_start >= tsdb_last_frame_start: + # no earlier data detected + break + + else: + tsdb_last_frame_start = next_start + + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + prepend_start = shm._first.value + + to_push = tsdb_history[-prepend_start:] + shm.push( + to_push, + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. 
+ prepend=True, + # update_first=False, + # start=prepend_start, + field_map=storemod.ohlc_key_map, + ) + + log.info(f'Loaded {to_push.shape} datums from storage') + tsdb_last_frame_start = tsdb_history[time_key][0] + + # manually trigger step update to update charts/fsps + # which need an incremental update. + # NOTE: the way this works is super duper + # un-intuitive right now: + # - the broadcaster fires a msg to the fsp subsystem. + # - fsp subsys then checks for a sample step diff and + # possibly recomputes prepended history. + # - the fsp then sends back to the parent actor + # (usually a chart showing graphics for said fsp) + # which tells the chart to conduct a manual full + # graphics loop cycle. + # await sampler_stream.send('broadcast_all') + + +async def push_latest_frame( + # box-type only that should get packed with the datetime + # objects received for the latest history frame + dt_eps: list[DateTime, DateTime], + shm: ShmArray, + get_hist: Callable[ + [int, datetime, datetime], + tuple[np.ndarray, str] + ], + timeframe: float, + config: dict, + + task_status: TaskStatus[ + Exception | list[datetime, datetime] + ] = trio.TASK_STATUS_IGNORED, + +) -> list[datetime, datetime] | None: + # get latest query's worth of history all the way + # back to what is recorded in the tsdb + try: + ( + array, + mr_start_dt, + mr_end_dt, + ) = await get_hist( + timeframe, + end_dt=None, + ) + # so caller can access these ep values + dt_eps.extend([ + mr_start_dt, + mr_end_dt, + ]) + task_status.started(dt_eps) + + # XXX: timeframe not supported for backend (since + # above exception type), terminate immediately since + # there's no backfilling possible. + except DataUnavailable: + task_status.started(None) + + if timeframe > 1: + await tractor.pause() + + # prolly tf not supported + return None + + # NOTE: on the first history, most recent history + # frame we PREPEND from the current shm ._last index + # and thus a gap between the earliest datum loaded here + # and the latest loaded from the tsdb may exist! + log.info(f'Pushing {array.size} to shm!') + shm.push( + array, + prepend=True, # append on first frame + ) + + return dt_eps + + +async def load_tsdb_hist( + storage: StorageClient, + mkt: MktPair, + timeframe: float, + + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + +) -> tuple[ + np.ndarray, + DateTime, + DateTime, +] | None: + # loads a (large) frame of data from the tsdb depending + # on the db's query size limit; our "nativedb" (using + # parquet) generally can load the entire history into mem + # but if not then below the remaining history can be lazy + # loaded? + fqme: str = mkt.fqme + tsdb_entry: tuple[ + np.ndarray, + DateTime, + DateTime, + ] + try: + tsdb_entry: tuple | None = await storage.load( + fqme, + timeframe=timeframe, + ) + return tsdb_entry + + except TimeseriesNotFound: + log.warning( + f'No timeseries yet for {timeframe}@{fqme}' + ) + return None + + +async def tsdb_backfill( + mod: ModuleType, + storemod: ModuleType, + + storage: StorageClient, + mkt: MktPair, + shm: ShmArray, + timeframe: float, + + sampler_stream: tractor.MsgStream, + + task_status: TaskStatus[ + tuple[ShmArray, ShmArray] + ] = trio.TASK_STATUS_IGNORED, + +) -> None: + + if timeframe not in (1, 60): + raise ValueError( + '`piker` only needs to support 1m and 1s sampling ' + 'but ur api is trying to deliver a longer ' + f'timeframe of {timeframe} seconds..\n' + 'So yuh.. dun do dat brudder.' 
+        )
+
+    get_hist: Callable[
+        [int, datetime, datetime],
+        tuple[np.ndarray, str]
+    ]
+    config: dict[str, int]
+    async with (
+        mod.open_history_client(
+            mkt,
+        ) as (get_hist, config),
+
+        # NOTE: this sub-nursery splits into tasks for the given
+        # sampling rate to concurrently load offline tsdb
+        # timeseries as well as new data from the venue backend!
+    ):
+        log.info(
+            f'`{mod}` history client returned backfill config:\n'
+            f'{pformat(config)}\n'
+        )
+
+        # concurrently load the provider's most-recent-frame AND any
+        # pre-existing tsdb history already saved in `piker` storage.
+        dt_eps: list[DateTime, DateTime] = []
+        async with (
+            tractor.trionics.collapse_eg(),
+            trio.open_nursery() as tn
+        ):
+            tn.start_soon(
+                push_latest_frame,
+                dt_eps,
+                shm,
+                get_hist,
+                timeframe,
+                config,
+            )
+            tsdb_entry: tuple = await load_tsdb_hist(
+                storage,
+                mkt,
+                timeframe,
+            )
+
+        # tell parent task to continue
+        # TODO: really we'd want this the other way around, with the
+        # tsdb load happening asap, since the latest frame query will
+        # normally be the main source of latency?
+        task_status.started()
+
+        # NOTE: iabs to start backfilling from, reverse chronological,
+        # ONLY AFTER the first history frame has been pushed to
+        # mem!
+        backfill_gap_from_shm_index: int = shm._first.value + 1
+
+        # Prepend any tsdb history into the rt-shm-buffer which
+        # should NOW be getting filled with the most recent history
+        # pulled from the data-backend.
+        if dt_eps:
+            # well then, unpack the latest (gap) backfilled frame dts
+            (
+                mr_start_dt,
+                mr_end_dt,
+            ) = dt_eps
+
+            first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
+            calced_frame_size: Duration = mk_duration(
+                seconds=first_frame_dur_s,
+            )
+            # NOTE, attempt to use the backend declared default frame
+            # sizing (as allowed by their time-series query APIs) and
+            # if not provided try to construct a default from the
+            # first frame received above.
+            def_frame_durs: dict[
+                int,
+                Duration,
+            ]|None = config.get('frame_types', None)
+
+            if def_frame_durs:
+                def_frame_size: Duration = def_frame_durs[timeframe]
+
+                if def_frame_size != calced_frame_size:
+                    log.warning(
+                        f'Expected frame size {def_frame_size}\n'
+                        f'Rxed frame {calced_frame_size}\n'
+                    )
+                    # await tractor.pause()
+            else:
+                # use what we calced from first frame above.
+                def_frame_size = calced_frame_size
+
+            # NOTE: when there's no offline data, there are 2 cases:
+            # - data backend doesn't support timeframe/sample
+            #   period (in which case `dt_eps` should be `None` and
+            #   we shouldn't be here!), or
+            # - no prior history has been stored (yet) and we need
+            #   to do a full backfill of the history now.
+            if tsdb_entry is None:
+                # indicate to backfill task to fill the whole
+                # shm buffer as much as it can!
+                last_tsdb_dt = None
+
+            # there's existing tsdb history from (offline) storage
+            # so only backfill the gap between the
+            # most-recent-frame (mrf) and that latest sample.
+            else:
+                (
+                    tsdb_history,
+                    first_tsdb_dt,
+                    last_tsdb_dt,
+                ) = tsdb_entry
+
+            # if there is a gap to backfill from the first
+            # history frame until the last datum loaded from the tsdb
+            # continue that now in the background
+            async with trio.open_nursery(
+                strict_exception_groups=False,
+            ) as tn:
+
+                bf_done = await tn.start(
+                    partial(
+                        start_backfill,
+                        get_hist=get_hist,
+                        def_frame_duration=def_frame_size,
+                        mod=mod,
+                        mkt=mkt,
+                        shm=shm,
+                        timeframe=timeframe,
+
+                        backfill_from_shm_index=backfill_gap_from_shm_index,
+                        backfill_from_dt=mr_start_dt,
+
+                        sampler_stream=sampler_stream,
+                        backfill_until_dt=last_tsdb_dt,
+
+                        storage=storage,
+                        write_tsdb=True,
+                    )
+                )
+                nulls_detected: trio.Event | None = None
+                if last_tsdb_dt is not None:
+                    # calc the index from which the tsdb data should be
+                    # prepended, presuming there is a gap between the
+                    # latest frame (loaded/read above) and the latest
+                    # sample loaded from the tsdb.
+                    backfill_diff: Duration = mr_start_dt - last_tsdb_dt
+                    offset_s: float = backfill_diff.in_seconds()
+
+                    # XXX EDGE CASEs: the most recent frame overlaps with
+                    # prior tsdb history!!
+                    # - so the latest frame's start time is earlier than
+                    #   the tsdb's latest sample.
+                    # - alternatively this may also more generally occur
+                    #   when the venue was closed (say over the weekend)
+                    #   causing a timeseries gap, AND the query frame's size
+                    #   (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
+                    #   GREATER THAN the current venue-market's operating
+                    #   session (time), we will receive datums from BEFORE THE
+                    #   CLOSURE GAP and thus the `offset_s` value will be
+                    #   NEGATIVE! In this case we need to ensure we don't try
+                    #   to push datums that have already been recorded in the
+                    #   tsdb. In this case we instead only retrieve and push
+                    #   the series portion missing from the db's data set.
+                    # if offset_s < 0:
+                    #     non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
+                    #     non_overlap_offset_s: float = backfill_diff.in_seconds()
+
+                    offset_samples: int = round(offset_s / timeframe)
+
+                    # TODO: see if there's faster multi-field reads:
+                    # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+                    # re-index with a `time` and index field
+                    if offset_s > 0:
+                        # NOTE XXX: ONLY when there is an actual gap
+                        # between the earliest sample in the latest history
+                        # frame do we want to NOT stick the latest tsdb
+                        # history adjacent to that latest frame!
+                        prepend_start = shm._first.value - offset_samples + 1
+                        to_push = tsdb_history[-prepend_start:]
+                    else:
+                        # when there is overlap we want to remove the
+                        # overlapping samples from the tsdb portion (taking
+                        # instead the latest frame's values since THEY
+                        # SHOULD BE THE SAME) and prepend DIRECTLY adjacent
+                        # to the latest frame!
+                        # TODO: assert the overlap segment array contains
+                        # the same values!?!
+                        prepend_start = shm._first.value
+                        to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
+
+                    # tsdb history is so far in the past we can't fit it in
+                    # shm buffer space so simply don't load it!
+                    if prepend_start > 0:
+                        shm.push(
+                            to_push,
+
+                            # insert the history pre a "days worth" of samples
+                            # to leave some real-time buffer space at the end.
+                            prepend=True,
+                            # update_first=False,
+                            start=prepend_start,
+                            field_map=storemod.ohlc_key_map,
+                        )
+
+                        log.info(f'Loaded {to_push.shape} datums from storage')
+
+                    # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
+                    # seemingly missing (null-time) segments..
+                    # TODO: ideally these can never exist!
+                    # -[ ] somehow it seems sometimes we're writing zero-ed
+                    #      segments to tsdbs during teardown?
+                    # -[ ] can we ensure that the backfiller tasks do this
+                    #      work PREVENTIVELY instead?
+                    # -[ ] fill in non-zero epoch time values ALWAYS!
+                    # await maybe_fill_null_segments(
+                    nulls_detected: trio.Event = await tn.start(partial(
+                        maybe_fill_null_segments,
+
+                        shm=shm,
+                        timeframe=timeframe,
+                        get_hist=get_hist,
+                        sampler_stream=sampler_stream,
+                        mkt=mkt,
+                    ))
+
+            # 2nd nursery END
+
+            # TODO: who would want to?
+            if nulls_detected:
+                await nulls_detected.wait()
+
+            await bf_done.wait()
+            # TODO: maybe start history analysis and load missing
+            # "history gaps" via backend..
+
+            # if len(hist_shm.array) < 2:
+            # TODO: there's an edge case here to solve where if the last
+            # frame before market close (at least on ib) was pushed and
+            # there was only "1 new" row pushed from the first backfill
+            # query-iteration, then the sample step sizing calcs will
+            # break upstream from here since you can't diff without at
+            # least 2 steps... probably should also add logic to compute
+            # from the tsdb series and stash that somewhere as meta data
+            # on the shm buffer?.. dunno.
+
+        # backload any further data from tsdb (concurrently per
+        # timeframe) if not all data was able to be loaded (in memory)
+        # from the ``StorageClient.load()`` call above.
+        await trio.sleep_forever()
+
+        # XXX NOTE: this is legacy from when we were using
+        # marketstore and we needed to continue backloading
+        # incrementally from the tsdb client.. (bc it couldn't
+        # handle a single large query with gRPC for some
+        # reason.. classic goolag pos)
+        # tn.start_soon(
+        #     back_load_from_tsdb,
+
+        #     storemod,
+        #     storage,
+        #     fqme,
+
+        #     tsdb_history,
+        #     last_tsdb_dt,
+        #     mr_start_dt,
+        #     mr_end_dt,
+        #     bf_done,
+
+        #     timeframe,
+        #     shm,
+        # )
+
+
+async def manage_history(
+    mod: ModuleType,
+    mkt: MktPair,
+    some_data_ready: trio.Event,
+    feed_is_live: trio.Event,
+    timeframe: float = 60,  # in seconds
+
+    task_status: TaskStatus[
+        tuple[ShmArray, ShmArray]
+    ] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+    '''
+    Load historical series data from offline-storage (tsdb) and any
+    missing new datums from data provider(s).
+
+    This is the primary "backfilling service" `trio.Task` entrypoint
+    and conducts,
+
+    - time-series retrieval for offline-data previously stored in
+      any (connected) tsdb,
+
+    - queries for missing new datums (compared with the latest found
+      from ^) onward to the present by pulling from available
+      `datad`-provider backends.
+
+    - real-time update of both the existing tsdb-records and the
+      allocated shared-memory-buffer as required by downstream
+      `piker.data`-layer consumer-wares.
+
+    Init sequence:
+    --------------
+    - allocate shm (numpy array) buffers for 60s & 1s sample rates
+    - configure "zero index" for each buffer: the index where
+      history will be prepended *to* and new live data will be
+      appended *from*.
+    - open a ``.storage.StorageClient`` and load any existing tsdb
+      history as well as (async) start a backfill task which loads
+      missing (newer) history from the data provider backend:
+      - tsdb history is loaded first and pushed to shm ASAP.
+      - the backfill task loads the most recent history before
+        unblocking its parent task, so that the `ShmArray._last` is
+        up to date to allow the OHLC sampler to begin writing new
+        samples at the correct buffer index once the provider feed
+        engages.
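+
+    As a purely illustrative example of the "zero index" layout: given
+    a buffer of length N with zero-index Z, offline/tsdb history is
+    prepended into slots [.., Z) growing backwards in time, while new
+    live samples from the provider feed are appended at [Z, ..) going
+    forward.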
+
+    '''
+    # TODO: is there a way to make each shm file key
+    # actor-tree-discovery-addr unique so we avoid collisions
+    # when doing tests which also allocate shms for certain instruments
+    # that may be in use on the system by some other running daemons?
+    # from tractor._state import _runtime_vars
+    # port = _runtime_vars['_root_mailbox'][1]
+
+    uid: tuple = tractor.current_actor().uid
+    name, uuid = uid
+    service: str = name.rstrip(f'.{mod.name}')
+    fqme: str = mkt.get_fqme(delim_char='')
+
+    # (maybe) allocate shm array for this broker/symbol which will
+    # be used for fast near-term history capture and processing.
+    hist_shm, opened = maybe_open_shm_array(
+        size=_default_hist_size,
+        append_start_index=_hist_buffer_start,
+
+        key=f'piker.{service}[{uuid[:16]}].{fqme}.hist',
+
+        # use any broker defined ohlc dtype:
+        dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields),
+
+        # we expect the sub-actor to write
+        readonly=False,
+    )
+    hist_zero_index = hist_shm.index - 1
+
+    # TODO: history validation
+    if not opened:
+        raise RuntimeError(
+            "Persistent shm for sym was already open?!"
+        )
+
+    rt_shm, opened = maybe_open_shm_array(
+        size=_default_rt_size,
+        append_start_index=_rt_buffer_start,
+        key=f'piker.{service}[{uuid[:16]}].{fqme}.rt',
+
+        # use any broker defined ohlc dtype:
+        dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields),
+
+        # we expect the sub-actor to write
+        readonly=False,
+    )
+
+    # (for now) set the rt (hft) shm array with space to prepend
+    # only a few days worth of 1s history.
+    days: int = 2
+    start_index: int = days*_secs_in_day
+    rt_shm._first.value = start_index
+    rt_shm._last.value = start_index
+    rt_zero_index = rt_shm.index - 1
+
+    if not opened:
+        raise RuntimeError(
+            "Persistent shm for sym was already open?!"
+        )
+
+    open_history_client = getattr(
+        mod,
+        'open_history_client',
+    )
+    assert open_history_client
+
+    # TODO: maybe it should be a subpkg of `.data`?
+    from piker import storage
+
+    async with (
+        storage.open_storage_client() as (storemod, client),
+
+        # NOTE: this nursery spawns a task per "timeframe" (aka
+        # sampling period) data set since normally differently
+        # sampled timeseries can be loaded / processed independently
+        # ;)
+        tractor.trionics.collapse_eg(),
+        trio.open_nursery() as tn,
+    ):
+        log.info(
+            f'Connecting to storage backend `{storemod.name}`:\n'
+            f'location: {client.address}\n'
+            f'db cardinality: {client.cardinality}\n'
+            # TODO: show backend config, eg:
+            # - network settings
+            # - storage size with compression
+            # - number of loaded time series?
+        )
+
+        # NOTE: this call ONLY UNBLOCKS once the latest-most frame
+        # (i.e. history just before the live feed latest datum) of
+        # history has been loaded and written to the shm buffer:
+        # - the backfiller task can write in reverse-chronological
+        #   order to the shm and tsdb
+        # - the tsdb data can be loaded immediately and the
+        #   backfiller can do a single append from its end datum and
+        #   then prepends backward to that from the current time
+        #   step.
+        tf2mem: dict = {
+            1: rt_shm,
+            60: hist_shm,
+        }
+        async with open_sample_stream(
+            period_s=1.,
+            shms_by_period={
+                1.: rt_shm.token,
+                60.: hist_shm.token,
+            },
+
+            # NOTE: we want to only open a stream for doing
+            # broadcasts on backfill operations, not receive the
+            # sample index-stream (since there's no code in this
+            # data feed layer that needs to consume it).
+            open_index_stream=True,
+            sub_for_broadcasts=False,
+
+        ) as sample_stream:
+            # register 1s and 1m buffers with the global
+            # incrementer task
+            log.info(f'Connected to sampler stream: {sample_stream}')
+
+            for timeframe in [60, 1]:
+                await tn.start(partial(
+                    tsdb_backfill,
+                    mod=mod,
+                    storemod=storemod,
+                    storage=client,
+                    mkt=mkt,
+                    shm=tf2mem[timeframe],
+                    timeframe=timeframe,
+                    sampler_stream=sample_stream,
+                ))
+
+            # indicate to caller that feed can be delivered to
+            # remote requesting client since we've loaded history
+            # data that can be used.
+            some_data_ready.set()
+
+            # wait for a live feed before starting the sampler.
+            await feed_is_live.wait()
+
+            # yield back after client connect with filled shm
+            task_status.started((
+                hist_zero_index,
+                hist_shm,
+                rt_zero_index,
+                rt_shm,
+            ))
+
+            # history retrieval loop depending on user interaction
+            # and thus a small RPC-protocol for remotely controlling
+            # what data is loaded for viewing.
+            await trio.sleep_forever()
+
+
+def iter_dfs_from_shms(
+    fqme: str
+) -> Generator[
+    tuple[Path, ShmArray, pl.DataFrame],
+    None,
+    None,
+]:
+    # shm buffer size table based on known sample rates
+    sizes: dict[str, int] = {
+        'hist': _default_hist_size,
+        'rt': _default_rt_size,
+    }
+
+    # load all detected shm buffer files which have the
+    # passed FQME pattern in the file name.
+    shmfiles: list[Path] = []
+    shmdir = Path('/dev/shm/')
+
+    for shmfile in shmdir.glob(f'*{fqme}*'):
+        filename: str = shmfile.name
+
+        # skip index files
+        if (
+            '_first' in filename
+            or '_last' in filename
+        ):
+            continue
+
+        assert shmfile.is_file()
+        log.debug(f'Found matching shm buffer file: {filename}')
+        shmfiles.append(shmfile)
+
+    for shmfile in shmfiles:
+
+        # lookup array buffer size based on file suffix
+        # being either .rt or .hist
+        key: str = shmfile.name.rsplit('.')[-1]
+
+        # skip FSP buffers for now..
+        if key not in sizes:
+            continue
+
+        size: int = sizes[key]
+
+        # attach to any shm buffer, load array into polars df,
+        # write to local parquet file.
+        shm, opened = maybe_open_shm_array(
+            key=shmfile.name,
+            size=size,
+            dtype=def_iohlcv_fields,
+            readonly=True,
+        )
+        assert not opened
+        ohlcv: np.ndarray = shm.array
+        df: pl.DataFrame = np2pl(ohlcv)
+
+        yield (
+            shmfile,
+            shm,
+            df,
+        )
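+
+
+# Usage sketch (illustrative only): dump each matching shm buffer for
+# a (hypothetical) fqme to a parquet file via `polars`:
+#
+#   for shmfile, shm, df in iter_dfs_from_shms('mnq.cme.ib'):
+#       df.write_parquet(f'/tmp/{shmfile.name}.parquet')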