From 0dcfcea6ee2ff8762f2b6abae322df982b25ce34 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 6 Jun 2023 23:59:59 -0400 Subject: [PATCH] Finally get partial backfills after tsdb load workinnn It took a little while (and a lot of commenting out of old no longer needed code) but, this gets tsdb (from parquet file) loading *before* final backfilling from the most recent history frame until the most recent tsdb time stamp! More or less all the convoluted concurrency shit we had for coping with `marketstore` IPC junk is no longer needed, particularly all the query size limits and accompanying load loops.. The recent frame loading technique/order *has* now changed though since we'd like to show charts asap once tsdb history loads. The new load sequence is as follows: - load mr (most recent) frame from backend. - load existing history (one shot) from the "tsdb" aka parquet files with `polars`. - backfill the gap part from the mr frame back to the tsdb start incrementally by making (hacky) `ShmArray.push(start=)` calls and *not* updating the `._first.value` while doing it XD Dirtier deatz: - make `tsdb_backfill()` run per timeframe in a separate task. - drop all the loop through timeframes and insert `dts_per_tf` crap. - only spawn a subtask for the `start_backfill()` call which in turn only does the gap backfilling as mentioned above. - mask out all the code related to being limited to certain query sizes (over gRPC) as was restricted by marketstore.. not gonna go through what all of that was since it's probably getting deleted in a follow up commit. - buncha off-by-one tweaks to do with backfilling the gap from mr frame to tsdb start.. mostly tinkered it to get it all right but seems to be working correctly B) - still use the `broadcast_all()` msg stuff when doing the gap backfill though don't have it really working yet on the UI side (since previously we were relying on the shm first/last values.. so this will be "coming soon" :) --- piker/data/history.py | 774 ++++++++++++++++++++++++++++-------------- 1 file changed, 515 insertions(+), 259 deletions(-) diff --git a/piker/data/history.py b/piker/data/history.py index aef3a15f..51e19c5a 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -24,7 +24,7 @@ from collections import ( ) from datetime import datetime from functools import partial -import time +# import time from types import ModuleType from typing import ( Callable, @@ -34,7 +34,10 @@ from typing import ( import trio from trio_typing import TaskStatus import tractor -import pendulum +from pendulum import ( + Duration, + from_timestamp, +) import numpy as np from ..accounting import ( @@ -64,113 +67,171 @@ if TYPE_CHECKING: def diff_history( array: np.ndarray, - timeframe: int, - start_dt: datetime, - end_dt: datetime, - last_tsdb_dt: datetime | None = None + # timeframe: int, + # start_dt: datetime, + # end_dt: datetime, + + append_until_dt: datetime | None = None, + prepend_until_dt: datetime | None = None, ) -> np.ndarray: # no diffing with tsdb dt index possible.. - if last_tsdb_dt is None: + if ( + prepend_until_dt is None + and append_until_dt is None + ): return array - time = array['time'] - return array[time > last_tsdb_dt.timestamp()] + times = array['time'] + + if append_until_dt: + return array[times < append_until_dt.timestamp()] + else: + return array[times >= prepend_until_dt.timestamp()] + + +# async def open_history_mngr( +# mod: ModuleType, +# mkt: MktPair, +# # shm: ShmArray, +# # timeframes: list[float] = [60, 1], +# timeframes: float, + +# ) -> Callable[ +# [int, datetime, datetime], +# tuple[np.ndarray, str] +# ]: +# ''' +# Open a "history manager" for the backend data provider, +# get the latest "frames worth" of ohlcv history, +# push the history to shm and deliver +# the start datum's datetime value so that further history loading +# can be done until synchronized with the tsdb loaded time series. + +# ''' +# hist: Callable[ +# [int, datetime, datetime], +# tuple[np.ndarray, str] +# ] +# config: dict[str, int] + +# async with mod.open_history_client( +# mkt, +# ) as (hist, config): +# log.info(f'{mod} history client returned backfill config: {config}') + +# # get latest query's worth of history all the way +# # back to what is recorded in the tsdb +# array, mr_start_dt, mr_end_dt = await hist( +# timeframe, +# end_dt=None, +# ) +# times: np.ndarray = array['time'] + +# # sample period step size in seconds +# step_size_s = ( +# from_timestamp(times[-1]) +# - from_timestamp(times[-2]) +# ).seconds + +# if step_size_s not in (1, 60): +# log.error(f'Last 2 sample period is off!? -> {step_size_s}') +# step_size_s = ( +# from_timestamp(times[-2]) +# - from_timestamp(times[-3]) +# ).seconds + +# # NOTE: on the first history, most recent history +# # frame we PREPEND from the current shm ._last index +# # and thus a gap between the earliest datum loaded here +# # and the latest loaded from the tsdb may exist! +# log.info(f'Pushing {to_push.size} to shm!') +# shm.push( +# to_push, +# prepend=True, +# # start= +# ) + + +# # if the market is open (aka we have a live feed) but the +# # history sample step index seems off we report the surrounding +# # data and drop into a bp. this case shouldn't really ever +# # happen if we're doing history retrieval correctly. +# # if ( +# # step_size_s == 60 +# # and feed_is_live.is_set() +# # ): +# # inow = round(time.time()) +# # diff = inow - times[-1] +# # if abs(diff) > 60: +# # surr = array[-6:] +# # diff_in_mins = round(diff/60., ndigits=2) +# # log.warning( +# # f'STEP ERROR `{mkt.fqme}` for period {step_size_s}s:\n' +# # f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' +# # 'Surrounding 6 time stamps:\n' +# # f'{list(surr["time"])}\n' +# # 'Here is surrounding 6 samples:\n' +# # f'{surr}\nn' +# # ) + +# # uncomment this for a hacker who wants to investigate +# # this case manually.. +# # await tractor.breakpoint() + +# # frame's worth of sample-period-steps, in seconds +# # frame_size_s = len(array) * step_size_s + +# to_push = array +# # to_push = diff_history( +# # array, +# # # timeframe, +# # # mr_start_dt, +# # # mr_end_dt, + +# # # backfill scenario for "most recent" frame +# # prepend_until_dt=last_tsdb_dt, +# # ) + +# # NOTE: on the first history, most recent history +# # frame we PREPEND from the current shm ._last index +# # and thus a gap between the earliest datum loaded here +# # and the latest loaded from the tsdb may exist! +# log.info(f'Pushing {to_push.size} to shm!') +# shm.push( +# to_push, +# prepend=True, +# # start= +# ) +# # TODO: should we wrap this in a "history frame" type or +# # something? +# yield hist, mr_start_dt, mr_end_dt async def start_backfill( + get_hist, mod: ModuleType, mkt: MktPair, shm: ShmArray, timeframe: float, - # sampler_stream: tractor.MsgStream, - feed_is_live: trio.Event, - last_tsdb_dt: datetime | None = None, + backfill_from_shm_index: int, + backfill_from_dt: datetime, + + sampler_stream: tractor.MsgStream, + + + backfill_until_dt: datetime | None = None, storage: StorageClient | None = None, + write_tsdb: bool = True, - tsdb_is_up: bool = True, + # tsdb_is_up: bool = True, task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED, ) -> int: - hist: Callable[ - [int, datetime, datetime], - tuple[np.ndarray, str] - ] - config: dict[str, int] - - async with mod.open_history_client( - mkt, - ) as (hist, config): - log.info(f'{mod} history client returned backfill config: {config}') - - # get latest query's worth of history all the way - # back to what is recorded in the tsdb - array, start_dt, end_dt = await hist( - timeframe, - end_dt=None, - ) - times = array['time'] - - # sample period step size in seconds - step_size_s = ( - pendulum.from_timestamp(times[-1]) - - pendulum.from_timestamp(times[-2]) - ).seconds - - if step_size_s not in (1, 60): - log.error(f'Last 2 sample period is off!? -> {step_size_s}') - step_size_s = ( - pendulum.from_timestamp(times[-2]) - - pendulum.from_timestamp(times[-3]) - ).seconds - - # if the market is open (aka we have a live feed) but the - # history sample step index seems off we report the surrounding - # data and drop into a bp. this case shouldn't really ever - # happen if we're doing history retrieval correctly. - if ( - step_size_s == 60 - and feed_is_live.is_set() - ): - inow = round(time.time()) - diff = inow - times[-1] - if abs(diff) > 60: - surr = array[-6:] - diff_in_mins = round(diff/60., ndigits=2) - log.warning( - f'STEP ERROR `{mkt.fqme}` for period {step_size_s}s:\n' - f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' - 'Surrounding 6 time stamps:\n' - f'{list(surr["time"])}\n' - 'Here is surrounding 6 samples:\n' - f'{surr}\nn' - ) - - # uncomment this for a hacker who wants to investigate - # this case manually.. - # await tractor.breakpoint() - - # frame's worth of sample-period-steps, in seconds - frame_size_s = len(array) * step_size_s - - to_push = diff_history( - array, - timeframe, - start_dt, - end_dt, - last_tsdb_dt=last_tsdb_dt, - ) - - log.info(f'Pushing {to_push.size} to shm!') - shm.push( - to_push, - # prepend=True, - ) - # TODO: *** THIS IS A BUG *** # we need to only broadcast to subscribers for this fqme.. # otherwise all fsps get reset on every chart.. @@ -180,43 +241,45 @@ async def start_backfill( bf_done = trio.Event() # let caller unblock and deliver latest history frame - task_status.started(( - start_dt, - end_dt, + task_status.started( #( + # mr_start_dt, + # mr_end_dt, bf_done, - )) + )# ) # based on the sample step size, maybe load a certain amount history - if last_tsdb_dt is None: + update_start_on_prepend: bool = False + if backfill_until_dt is None: - if step_size_s not in (1, 60): + if timeframe not in (1, 60): raise ValueError( '`piker` only needs to support 1m and 1s sampling ' 'but ur api is trying to deliver a longer ' - f'timeframe of {step_size_s} seconds..\n' + f'timeframe of {timeframe} seconds..\n' 'So yuh.. dun do dat brudder.' ) # when no tsdb "last datum" is provided, we just load # some near-term history. + # periods = { + # 1: {'days': 1}, + # 60: {'days': 14}, + # } + + # if tsdb_is_up: + # do a decently sized backfill and load it into storage. periods = { - 1: {'days': 1}, - 60: {'days': 14}, + 1: {'days': 6}, + 60: {'years': 6}, } + period_duration: int = periods[timeframe] - if tsdb_is_up: - # do a decently sized backfill and load it into storage. - periods = { - 1: {'days': 6}, - 60: {'years': 6}, - } - - period_duration = periods[step_size_s] + update_start_on_prepend = True # NOTE: manually set the "latest" datetime which we intend to # backfill history "until" so as to adhere to the history # settings above when the tsdb is detected as being empty. - last_tsdb_dt = start_dt.subtract(**period_duration) + backfill_until_dt = backfill_from_dt.subtract(**period_duration) # configure async query throttling # rate = config.get('rate', 1) @@ -228,18 +291,39 @@ async def start_backfill( # per time stamp. starts: Counter[datetime] = Counter() - # inline sequential loop where we simply pass the - # last retrieved start dt to the next request as - # it's end dt. - while end_dt > last_tsdb_dt: + # conduct "backward history filling" since + # no tsdb history yet exists. + + # implemented via a simple inline sequential loop where we + # simply pass the last retrieved start dt to the next + # request as it's end dt. + # while end_dt < backfill_until_dt: + # while ( + # end_dt is None # init case + # or end_dt < mr_start_dt + # ): + + # conduct "forward filling" from the last time step + # loaded from the tsdb until the first step loaded + # just above + end_dt: datetime = backfill_from_dt + # start_dt: datetime = backfill_until_dt + next_prepend_index: int = backfill_from_shm_index + + while end_dt > backfill_until_dt: log.debug( - f'Requesting {step_size_s}s frame ending in {start_dt}' + f'Requesting {timeframe}s frame ending in {end_dt}' ) try: - array, next_start_dt, end_dt = await hist( + ( + array, + next_start_dt, + next_end_dt, + ) = await get_hist( timeframe, - end_dt=start_dt, + end_dt=end_dt, + # start_dt=start_dt, ) # broker says there never was or is no more history to pull @@ -272,15 +356,17 @@ async def start_backfill( return # only update new start point if not-yet-seen - start_dt = next_start_dt + start_dt: datetime = next_start_dt starts[start_dt] += 1 assert array['time'][0] == start_dt.timestamp() diff = end_dt - start_dt frame_time_diff_s = diff.seconds - expected_frame_size_s = frame_size_s + step_size_s + # frame's worth of sample-period-steps, in seconds + frame_size_s = len(array) * timeframe + expected_frame_size_s = frame_size_s + timeframe if frame_time_diff_s > expected_frame_size_s: # XXX: query result includes a start point prior to our @@ -294,10 +380,10 @@ async def start_backfill( to_push = diff_history( array, - timeframe, - start_dt, - end_dt, - last_tsdb_dt=last_tsdb_dt, + # timeframe, + # start_dt, + # end_dt, + prepend_until_dt=backfill_until_dt, ) ln = len(to_push) if ln: @@ -314,11 +400,52 @@ async def start_backfill( shm.push( to_push, prepend=True, + + # XXX: only update the ._first index if no tsdb + # segment was previously prepended by the + # parent task. + update_first=update_start_on_prepend, + + # XXX: only prepend from a manually calculated shm + # index if there was already a tsdb history + # segment prepended (since then the + # ._first.value is going to be wayyy in the + # past!) + start=( + next_prepend_index + if not update_start_on_prepend + else None + ), ) - except ValueError: + await sampler_stream.send({ + 'broadcast_all': { + 'backfilling': True + }, + }) + + # decrement next prepend point + next_prepend_index = next_prepend_index - ln + end_dt = next_start_dt + + # XXX: extremely important, there can be no checkpoints + # in the block above to avoid entering new ``frames`` + # values while we're pipelining the current ones to + # memory... + array = shm.array + zeros = array[array['low'] == 0] + if ( + 0 < zeros.size < 10 + ): + await tractor.breakpoint() + + + except ValueError as ve: + _ve = ve log.info( f'Shm buffer overrun on: {start_dt} -> {end_dt}?' ) + + await tractor.breakpoint() # can't push the entire frame? so # push only the amount that can fit.. break @@ -328,10 +455,12 @@ async def start_backfill( f'{start_dt} -> {end_dt}' ) + # FINALLY, maybe write immediately to the tsdb backend for + # long-term storage. if ( storage is not None and write_tsdb - # and False + and False ): log.info( f'Writing {ln} frame to storage:\n' @@ -372,19 +501,22 @@ def push_tsdb_history_to_shm( shm: ShmArray, tsdb_history: np.ndarray, time_field_key: str, + prepend: bool = False, + ) -> datetime: # TODO: see if there's faster multi-field reads: # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # re-index with a `time` and index field prepend_start = shm._first.value + to_push = tsdb_history[-prepend_start:] shm.push( to_push, # insert the history pre a "days worth" of samples # to leave some real-time buffer space at the end. - prepend=True, + prepend=prepend, # update_first=False, # start=prepend_start, field_map=storemod.ohlc_key_map, @@ -392,7 +524,7 @@ def push_tsdb_history_to_shm( log.info(f'Loaded {to_push.shape} datums from storage') tsdb_last_frame_start = tsdb_history[time_field_key][0] - return pendulum.from_timestamp(tsdb_last_frame_start) + return from_timestamp(tsdb_last_frame_start) async def back_load_from_tsdb( @@ -435,7 +567,7 @@ async def back_load_from_tsdb( # assert (times[1] - times[0]) == 1 if len(array): - shm_last_dt = pendulum.from_timestamp( + shm_last_dt = from_timestamp( shm.array[0]['time'] ) else: @@ -525,12 +657,16 @@ async def back_load_from_tsdb( async def tsdb_backfill( mod: ModuleType, storemod: ModuleType, - bus: _FeedsBus, + # bus: _FeedsBus, + tn: trio.Nursery, storage: StorageClient, mkt: MktPair, - shms: dict[int, ShmArray], - # sampler_stream: tractor.MsgStream, - feed_is_live: trio.Event, + # shms: dict[int, ShmArray], + shm: ShmArray, + timeframe: float, + + sampler_stream: tractor.MsgStream, + # feed_is_live: trio.Event, task_status: TaskStatus[ tuple[ShmArray, ShmArray] @@ -540,18 +676,75 @@ async def tsdb_backfill( # TODO: this should be used verbatim for the pure # shm backfiller approach below. - dts_per_tf: dict[int, datetime] = {} + # dts_per_tf: dict[int, datetime] = {} fqme: str = mkt.fqme - time_key: str = 'time' - if getattr(storemod, 'ohlc_key_map', False): - keymap: bidict = storemod.ohlc_key_map - time_key: str = keymap.inverse['time'] + # time_key: str = 'time' + # if getattr(storemod, 'ohlc_key_map', False): + # keymap: bidict = storemod.ohlc_key_map + # time_key: str = keymap.inverse['time'] - # start history anal and load missing new data via backend. - last_tsdb_dt: datetime | None = None - timeframe: int # OHLC sample period - for timeframe, shm in shms.items(): + get_hist: Callable[ + [int, datetime, datetime], + tuple[np.ndarray, str] + ] + config: dict[str, int] + + async with mod.open_history_client( + mkt, + ) as (get_hist, config): + log.info(f'{mod} history client returned backfill config: {config}') + + # get latest query's worth of history all the way + # back to what is recorded in the tsdb + try: + array, mr_start_dt, mr_end_dt = await get_hist( + timeframe, + end_dt=None, + ) + + # XXX: timeframe not supported for backend (since + # above exception type), terminate immediately since + # there's no backfilling possible. + except DataUnavailable: + task_status.started() + return + + times: np.ndarray = array['time'] + + # sample period step size in seconds + step_size_s = ( + from_timestamp(times[-1]) + - from_timestamp(times[-2]) + ).seconds + + if step_size_s not in (1, 60): + log.error(f'Last 2 sample period is off!? -> {step_size_s}') + step_size_s = ( + from_timestamp(times[-2]) + - from_timestamp(times[-3]) + ).seconds + + # NOTE: on the first history, most recent history + # frame we PREPEND from the current shm ._last index + # and thus a gap between the earliest datum loaded here + # and the latest loaded from the tsdb may exist! + log.info(f'Pushing {array.size} to shm!') + shm.push( + array, + prepend=True, # append on first frame + # start= + ) + backfill_gap_from_shm_index: int = shm._first.value + 1 + + # tell parent task to continue + task_status.started() + + # start history anal and load missing new data via backend. + # backfill_until_dt: datetime | None = None + # started_after_tsdb_load: bool = False + + # for timeframe, shm in shms.items(): # loads a (large) frame of data from the tsdb depending # on the db's query size limit; our "nativedb" (using @@ -563,6 +756,7 @@ async def tsdb_backfill( timeframe=timeframe, ) + last_tsdb_dt: datetime | None = None if tsdb_entry: ( tsdb_history, @@ -570,106 +764,160 @@ async def tsdb_backfill( last_tsdb_dt, ) = tsdb_entry - tsdb_last_frame_start: datetime = push_tsdb_history_to_shm( - storemod, + # calc the index from which the tsdb data should be + # prepended, presuming there is a gap between the + # latest frame (loaded/read above) and the latest + # sample loaded from the tsdb. + backfill_diff: Duration = mr_start_dt - last_tsdb_dt + offset_s: float = backfill_diff.in_seconds() + offset_samples: int = round(offset_s / timeframe) + + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + prepend_start = shm._first.value - offset_samples + 1 + + to_push = tsdb_history[-prepend_start:] + shm.push( + to_push, + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + start=prepend_start, + field_map=storemod.ohlc_key_map, + ) + + log.info(f'Loaded {to_push.shape} datums from storage') + + # tsdb_last_frame_start: datetime = push_tsdb_history_to_shm( + # storemod, + # shm, + # tsdb_history, + # time_key, + # prepend=True, + # ) + # assert tsdb_last_frame_start == first_tsdb_dt + + # unblock the feed bus management task + # assert len(shms[1].array) + # if not started_after_tsdb_load: + # task_status.started() + # started_after_tsdb_load = True + + # begin backfiller task ASAP + # try: + + # if there is a gap to backfill from the first + # history frame until the last datum loaded from the tsdb + # continue that now in the background + # try: + # ( + # latest_start_dt, + # latest_end_dt, + bf_done = await tn.start( + partial( + start_backfill, + get_hist, + mod, + mkt, shm, - tsdb_history, - time_key, - ) - assert tsdb_last_frame_start == first_tsdb_dt - - # begin backfiller task ASAP - try: - ( - latest_start_dt, - latest_end_dt, - bf_done, - ) = await bus.nursery.start( - partial( - start_backfill, - mod, - mkt, - shm, - timeframe, - # sampler_stream, - feed_is_live, - - last_tsdb_dt=last_tsdb_dt, - tsdb_is_up=True, - storage=storage, - ) - ) - if tsdb_entry: - dts_per_tf[timeframe] = ( - tsdb_history, - last_tsdb_dt, - latest_start_dt, - latest_end_dt, - bf_done, - ) - except DataUnavailable: - # XXX: timeframe not supported for backend (since - # above exception type), so skip and move on to next. - continue - - # tsdb_history = series.get(timeframe) - - # if len(hist_shm.array) < 2: - # TODO: there's an edge case here to solve where if the last - # frame before market close (at least on ib) was pushed and - # there was only "1 new" row pushed from the first backfill - # query-iteration, then the sample step sizing calcs will - # break upstream from here since you can't diff on at least - # 2 steps... probably should also add logic to compute from - # the tsdb series and stash that somewhere as meta data on - # the shm buffer?.. no se. - - # unblock the feed bus management task - # assert len(shms[1].array) - task_status.started() - - # backload any further data from tsdb (concurrently per - # timeframe) if not all data was able to be loaded (in memory) - # from the ``StorageClient.load()`` call above. - async with trio.open_nursery() as nurse: - for timeframe, shm in shms.items(): - - entry = dts_per_tf.get(timeframe) - if not entry: - continue - - ( - tsdb_history, - last_tsdb_dt, - latest_start_dt, - latest_end_dt, - bf_done, - ) = entry - - if not tsdb_history.size: - continue - - nurse.start_soon( - back_load_from_tsdb, - - storemod, - storage, - fqme, - - tsdb_history, - last_tsdb_dt, - latest_start_dt, - latest_end_dt, - bf_done, - timeframe, - shm, - ) - # try: - # await trio.sleep_forever() - # finally: - # write_ohlcv + backfill_from_shm_index=backfill_gap_from_shm_index, + backfill_from_dt=mr_start_dt, + backfill_until_dt=last_tsdb_dt, + sampler_stream=sampler_stream, + + # feed_is_live, + + storage=storage, + # tsdb_is_up=True, + ) + ) + + # if tsdb_entry: + # dts_per_tf[timeframe] = ( + # tsdb_history, + # last_tsdb_dt, + # latest_start_dt, + # latest_end_dt, + # bf_done, + # ) + # elif not started_after_tsdb_load: + # task_status.started() + # started_after_tsdb_load = True + + # XXX: timeframe not supported for backend (since + # above exception type), terminate immediately since + # there's no backfilling possible. + # except DataUnavailable: + # return + # continue + + # tsdb_history = series.get(timeframe) + + # if len(hist_shm.array) < 2: + # TODO: there's an edge case here to solve where if the last + # frame before market close (at least on ib) was pushed and + # there was only "1 new" row pushed from the first backfill + # query-iteration, then the sample step sizing calcs will + # break upstream from here since you can't diff on at least + # 2 steps... probably should also add logic to compute from + # the tsdb series and stash that somewhere as meta data on + # the shm buffer?.. no se. + + # backload any further data from tsdb (concurrently per + # timeframe) if not all data was able to be loaded (in memory) + # from the ``StorageClient.load()`` call above. + try: + await trio.sleep_forever() + finally: + return + # write_ohlcv + + # IF we need to continue backloading incrementall from the + # tsdb client.. + tn.start_soon( + back_load_from_tsdb, + + storemod, + storage, + fqme, + + tsdb_history, + last_tsdb_dt, + mr_start_dt, + mr_end_dt, + bf_done, + + timeframe, + shm, + ) + # async with trio.open_nursery() as nurse: + # for timeframe, shm in shms.items(): + + # entry = dts_per_tf.get(timeframe) + # if not entry: + # continue + + # ( + # tsdb_history, + # last_tsdb_dt, + # latest_start_dt, + # latest_end_dt, + # bf_done, + # ) = entry + + # if not tsdb_history.size: + # continue + + + # try: + # await trio.sleep_forever() + # finally: + # write_ohlcv async def manage_history( @@ -773,7 +1021,10 @@ async def manage_history( # TODO: maybe it should be a subpkg of `.data`? from piker import storage - async with storage.open_storage_client() as (storemod, client): + async with ( + storage.open_storage_client() as (storemod, client), + trio.open_nursery() as tn, + ): log.info( f'Connecting to storage backend `{storemod.name}`:\n' f'location: {client.address}\n' @@ -793,30 +1044,10 @@ async def manage_history( # backfiller can do a single append from it's end datum and # then prepends backward to that from the current time # step. - await bus.nursery.start( - tsdb_backfill, - mod, - storemod, - bus, - client, - mkt, - { - 1: rt_shm, - 60: hist_shm, - }, - # sample_stream, - feed_is_live, - ) - - # indicate to caller that feed can be delivered to - # remote requesting client since we've loaded history - # data that can be used. - some_data_ready.set() - - # wait for a live feed before starting the sampler. - await feed_is_live.wait() - - # register 1s and 1m buffers with the global incrementer task + tf2mem: dict = { + 1: rt_shm, + 60: hist_shm, + } async with open_sample_stream( period_s=1., shms_by_period={ @@ -832,8 +1063,33 @@ async def manage_history( sub_for_broadcasts=False, ) as sample_stream: + # register 1s and 1m buffers with the global incrementer task log.info(f'Connected to sampler stream: {sample_stream}') + for timeframe in [60, 1]: + await tn.start( + tsdb_backfill, + mod, + storemod, + tn, + # bus, + client, + mkt, + tf2mem[timeframe], + timeframe, + + sample_stream, + # feed_is_live, + ) + + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. + some_data_ready.set() + + # wait for a live feed before starting the sampler. + await feed_is_live.wait() + # yield back after client connect with filled shm task_status.started(( hist_zero_index,