diff --git a/piker/data/history.py b/piker/data/history.py index aef3a15f..51e19c5a 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -24,7 +24,7 @@ from collections import ( ) from datetime import datetime from functools import partial -import time +# import time from types import ModuleType from typing import ( Callable, @@ -34,7 +34,10 @@ from typing import ( import trio from trio_typing import TaskStatus import tractor -import pendulum +from pendulum import ( + Duration, + from_timestamp, +) import numpy as np from ..accounting import ( @@ -64,113 +67,171 @@ if TYPE_CHECKING: def diff_history( array: np.ndarray, - timeframe: int, - start_dt: datetime, - end_dt: datetime, - last_tsdb_dt: datetime | None = None + # timeframe: int, + # start_dt: datetime, + # end_dt: datetime, + + append_until_dt: datetime | None = None, + prepend_until_dt: datetime | None = None, ) -> np.ndarray: # no diffing with tsdb dt index possible.. - if last_tsdb_dt is None: + if ( + prepend_until_dt is None + and append_until_dt is None + ): return array - time = array['time'] - return array[time > last_tsdb_dt.timestamp()] + times = array['time'] + + if append_until_dt: + return array[times < append_until_dt.timestamp()] + else: + return array[times >= prepend_until_dt.timestamp()] + + +# async def open_history_mngr( +# mod: ModuleType, +# mkt: MktPair, +# # shm: ShmArray, +# # timeframes: list[float] = [60, 1], +# timeframes: float, + +# ) -> Callable[ +# [int, datetime, datetime], +# tuple[np.ndarray, str] +# ]: +# ''' +# Open a "history manager" for the backend data provider, +# get the latest "frames worth" of ohlcv history, +# push the history to shm and deliver +# the start datum's datetime value so that further history loading +# can be done until synchronized with the tsdb loaded time series. + +# ''' +# hist: Callable[ +# [int, datetime, datetime], +# tuple[np.ndarray, str] +# ] +# config: dict[str, int] + +# async with mod.open_history_client( +# mkt, +# ) as (hist, config): +# log.info(f'{mod} history client returned backfill config: {config}') + +# # get latest query's worth of history all the way +# # back to what is recorded in the tsdb +# array, mr_start_dt, mr_end_dt = await hist( +# timeframe, +# end_dt=None, +# ) +# times: np.ndarray = array['time'] + +# # sample period step size in seconds +# step_size_s = ( +# from_timestamp(times[-1]) +# - from_timestamp(times[-2]) +# ).seconds + +# if step_size_s not in (1, 60): +# log.error(f'Last 2 sample period is off!? -> {step_size_s}') +# step_size_s = ( +# from_timestamp(times[-2]) +# - from_timestamp(times[-3]) +# ).seconds + +# # NOTE: on the first history, most recent history +# # frame we PREPEND from the current shm ._last index +# # and thus a gap between the earliest datum loaded here +# # and the latest loaded from the tsdb may exist! +# log.info(f'Pushing {to_push.size} to shm!') +# shm.push( +# to_push, +# prepend=True, +# # start= +# ) + + +# # if the market is open (aka we have a live feed) but the +# # history sample step index seems off we report the surrounding +# # data and drop into a bp. this case shouldn't really ever +# # happen if we're doing history retrieval correctly. +# # if ( +# # step_size_s == 60 +# # and feed_is_live.is_set() +# # ): +# # inow = round(time.time()) +# # diff = inow - times[-1] +# # if abs(diff) > 60: +# # surr = array[-6:] +# # diff_in_mins = round(diff/60., ndigits=2) +# # log.warning( +# # f'STEP ERROR `{mkt.fqme}` for period {step_size_s}s:\n' +# # f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' +# # 'Surrounding 6 time stamps:\n' +# # f'{list(surr["time"])}\n' +# # 'Here is surrounding 6 samples:\n' +# # f'{surr}\nn' +# # ) + +# # uncomment this for a hacker who wants to investigate +# # this case manually.. +# # await tractor.breakpoint() + +# # frame's worth of sample-period-steps, in seconds +# # frame_size_s = len(array) * step_size_s + +# to_push = array +# # to_push = diff_history( +# # array, +# # # timeframe, +# # # mr_start_dt, +# # # mr_end_dt, + +# # # backfill scenario for "most recent" frame +# # prepend_until_dt=last_tsdb_dt, +# # ) + +# # NOTE: on the first history, most recent history +# # frame we PREPEND from the current shm ._last index +# # and thus a gap between the earliest datum loaded here +# # and the latest loaded from the tsdb may exist! +# log.info(f'Pushing {to_push.size} to shm!') +# shm.push( +# to_push, +# prepend=True, +# # start= +# ) +# # TODO: should we wrap this in a "history frame" type or +# # something? +# yield hist, mr_start_dt, mr_end_dt async def start_backfill( + get_hist, mod: ModuleType, mkt: MktPair, shm: ShmArray, timeframe: float, - # sampler_stream: tractor.MsgStream, - feed_is_live: trio.Event, - last_tsdb_dt: datetime | None = None, + backfill_from_shm_index: int, + backfill_from_dt: datetime, + + sampler_stream: tractor.MsgStream, + + + backfill_until_dt: datetime | None = None, storage: StorageClient | None = None, + write_tsdb: bool = True, - tsdb_is_up: bool = True, + # tsdb_is_up: bool = True, task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED, ) -> int: - hist: Callable[ - [int, datetime, datetime], - tuple[np.ndarray, str] - ] - config: dict[str, int] - - async with mod.open_history_client( - mkt, - ) as (hist, config): - log.info(f'{mod} history client returned backfill config: {config}') - - # get latest query's worth of history all the way - # back to what is recorded in the tsdb - array, start_dt, end_dt = await hist( - timeframe, - end_dt=None, - ) - times = array['time'] - - # sample period step size in seconds - step_size_s = ( - pendulum.from_timestamp(times[-1]) - - pendulum.from_timestamp(times[-2]) - ).seconds - - if step_size_s not in (1, 60): - log.error(f'Last 2 sample period is off!? -> {step_size_s}') - step_size_s = ( - pendulum.from_timestamp(times[-2]) - - pendulum.from_timestamp(times[-3]) - ).seconds - - # if the market is open (aka we have a live feed) but the - # history sample step index seems off we report the surrounding - # data and drop into a bp. this case shouldn't really ever - # happen if we're doing history retrieval correctly. - if ( - step_size_s == 60 - and feed_is_live.is_set() - ): - inow = round(time.time()) - diff = inow - times[-1] - if abs(diff) > 60: - surr = array[-6:] - diff_in_mins = round(diff/60., ndigits=2) - log.warning( - f'STEP ERROR `{mkt.fqme}` for period {step_size_s}s:\n' - f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' - 'Surrounding 6 time stamps:\n' - f'{list(surr["time"])}\n' - 'Here is surrounding 6 samples:\n' - f'{surr}\nn' - ) - - # uncomment this for a hacker who wants to investigate - # this case manually.. - # await tractor.breakpoint() - - # frame's worth of sample-period-steps, in seconds - frame_size_s = len(array) * step_size_s - - to_push = diff_history( - array, - timeframe, - start_dt, - end_dt, - last_tsdb_dt=last_tsdb_dt, - ) - - log.info(f'Pushing {to_push.size} to shm!') - shm.push( - to_push, - # prepend=True, - ) - # TODO: *** THIS IS A BUG *** # we need to only broadcast to subscribers for this fqme.. # otherwise all fsps get reset on every chart.. @@ -180,43 +241,45 @@ async def start_backfill( bf_done = trio.Event() # let caller unblock and deliver latest history frame - task_status.started(( - start_dt, - end_dt, + task_status.started( #( + # mr_start_dt, + # mr_end_dt, bf_done, - )) + )# ) # based on the sample step size, maybe load a certain amount history - if last_tsdb_dt is None: + update_start_on_prepend: bool = False + if backfill_until_dt is None: - if step_size_s not in (1, 60): + if timeframe not in (1, 60): raise ValueError( '`piker` only needs to support 1m and 1s sampling ' 'but ur api is trying to deliver a longer ' - f'timeframe of {step_size_s} seconds..\n' + f'timeframe of {timeframe} seconds..\n' 'So yuh.. dun do dat brudder.' ) # when no tsdb "last datum" is provided, we just load # some near-term history. + # periods = { + # 1: {'days': 1}, + # 60: {'days': 14}, + # } + + # if tsdb_is_up: + # do a decently sized backfill and load it into storage. periods = { - 1: {'days': 1}, - 60: {'days': 14}, + 1: {'days': 6}, + 60: {'years': 6}, } + period_duration: int = periods[timeframe] - if tsdb_is_up: - # do a decently sized backfill and load it into storage. - periods = { - 1: {'days': 6}, - 60: {'years': 6}, - } - - period_duration = periods[step_size_s] + update_start_on_prepend = True # NOTE: manually set the "latest" datetime which we intend to # backfill history "until" so as to adhere to the history # settings above when the tsdb is detected as being empty. - last_tsdb_dt = start_dt.subtract(**period_duration) + backfill_until_dt = backfill_from_dt.subtract(**period_duration) # configure async query throttling # rate = config.get('rate', 1) @@ -228,18 +291,39 @@ async def start_backfill( # per time stamp. starts: Counter[datetime] = Counter() - # inline sequential loop where we simply pass the - # last retrieved start dt to the next request as - # it's end dt. - while end_dt > last_tsdb_dt: + # conduct "backward history filling" since + # no tsdb history yet exists. + + # implemented via a simple inline sequential loop where we + # simply pass the last retrieved start dt to the next + # request as it's end dt. + # while end_dt < backfill_until_dt: + # while ( + # end_dt is None # init case + # or end_dt < mr_start_dt + # ): + + # conduct "forward filling" from the last time step + # loaded from the tsdb until the first step loaded + # just above + end_dt: datetime = backfill_from_dt + # start_dt: datetime = backfill_until_dt + next_prepend_index: int = backfill_from_shm_index + + while end_dt > backfill_until_dt: log.debug( - f'Requesting {step_size_s}s frame ending in {start_dt}' + f'Requesting {timeframe}s frame ending in {end_dt}' ) try: - array, next_start_dt, end_dt = await hist( + ( + array, + next_start_dt, + next_end_dt, + ) = await get_hist( timeframe, - end_dt=start_dt, + end_dt=end_dt, + # start_dt=start_dt, ) # broker says there never was or is no more history to pull @@ -272,15 +356,17 @@ async def start_backfill( return # only update new start point if not-yet-seen - start_dt = next_start_dt + start_dt: datetime = next_start_dt starts[start_dt] += 1 assert array['time'][0] == start_dt.timestamp() diff = end_dt - start_dt frame_time_diff_s = diff.seconds - expected_frame_size_s = frame_size_s + step_size_s + # frame's worth of sample-period-steps, in seconds + frame_size_s = len(array) * timeframe + expected_frame_size_s = frame_size_s + timeframe if frame_time_diff_s > expected_frame_size_s: # XXX: query result includes a start point prior to our @@ -294,10 +380,10 @@ async def start_backfill( to_push = diff_history( array, - timeframe, - start_dt, - end_dt, - last_tsdb_dt=last_tsdb_dt, + # timeframe, + # start_dt, + # end_dt, + prepend_until_dt=backfill_until_dt, ) ln = len(to_push) if ln: @@ -314,11 +400,52 @@ async def start_backfill( shm.push( to_push, prepend=True, + + # XXX: only update the ._first index if no tsdb + # segment was previously prepended by the + # parent task. + update_first=update_start_on_prepend, + + # XXX: only prepend from a manually calculated shm + # index if there was already a tsdb history + # segment prepended (since then the + # ._first.value is going to be wayyy in the + # past!) + start=( + next_prepend_index + if not update_start_on_prepend + else None + ), ) - except ValueError: + await sampler_stream.send({ + 'broadcast_all': { + 'backfilling': True + }, + }) + + # decrement next prepend point + next_prepend_index = next_prepend_index - ln + end_dt = next_start_dt + + # XXX: extremely important, there can be no checkpoints + # in the block above to avoid entering new ``frames`` + # values while we're pipelining the current ones to + # memory... + array = shm.array + zeros = array[array['low'] == 0] + if ( + 0 < zeros.size < 10 + ): + await tractor.breakpoint() + + + except ValueError as ve: + _ve = ve log.info( f'Shm buffer overrun on: {start_dt} -> {end_dt}?' ) + + await tractor.breakpoint() # can't push the entire frame? so # push only the amount that can fit.. break @@ -328,10 +455,12 @@ async def start_backfill( f'{start_dt} -> {end_dt}' ) + # FINALLY, maybe write immediately to the tsdb backend for + # long-term storage. if ( storage is not None and write_tsdb - # and False + and False ): log.info( f'Writing {ln} frame to storage:\n' @@ -372,19 +501,22 @@ def push_tsdb_history_to_shm( shm: ShmArray, tsdb_history: np.ndarray, time_field_key: str, + prepend: bool = False, + ) -> datetime: # TODO: see if there's faster multi-field reads: # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # re-index with a `time` and index field prepend_start = shm._first.value + to_push = tsdb_history[-prepend_start:] shm.push( to_push, # insert the history pre a "days worth" of samples # to leave some real-time buffer space at the end. - prepend=True, + prepend=prepend, # update_first=False, # start=prepend_start, field_map=storemod.ohlc_key_map, @@ -392,7 +524,7 @@ def push_tsdb_history_to_shm( log.info(f'Loaded {to_push.shape} datums from storage') tsdb_last_frame_start = tsdb_history[time_field_key][0] - return pendulum.from_timestamp(tsdb_last_frame_start) + return from_timestamp(tsdb_last_frame_start) async def back_load_from_tsdb( @@ -435,7 +567,7 @@ async def back_load_from_tsdb( # assert (times[1] - times[0]) == 1 if len(array): - shm_last_dt = pendulum.from_timestamp( + shm_last_dt = from_timestamp( shm.array[0]['time'] ) else: @@ -525,12 +657,16 @@ async def back_load_from_tsdb( async def tsdb_backfill( mod: ModuleType, storemod: ModuleType, - bus: _FeedsBus, + # bus: _FeedsBus, + tn: trio.Nursery, storage: StorageClient, mkt: MktPair, - shms: dict[int, ShmArray], - # sampler_stream: tractor.MsgStream, - feed_is_live: trio.Event, + # shms: dict[int, ShmArray], + shm: ShmArray, + timeframe: float, + + sampler_stream: tractor.MsgStream, + # feed_is_live: trio.Event, task_status: TaskStatus[ tuple[ShmArray, ShmArray] @@ -540,18 +676,75 @@ async def tsdb_backfill( # TODO: this should be used verbatim for the pure # shm backfiller approach below. - dts_per_tf: dict[int, datetime] = {} + # dts_per_tf: dict[int, datetime] = {} fqme: str = mkt.fqme - time_key: str = 'time' - if getattr(storemod, 'ohlc_key_map', False): - keymap: bidict = storemod.ohlc_key_map - time_key: str = keymap.inverse['time'] + # time_key: str = 'time' + # if getattr(storemod, 'ohlc_key_map', False): + # keymap: bidict = storemod.ohlc_key_map + # time_key: str = keymap.inverse['time'] - # start history anal and load missing new data via backend. - last_tsdb_dt: datetime | None = None - timeframe: int # OHLC sample period - for timeframe, shm in shms.items(): + get_hist: Callable[ + [int, datetime, datetime], + tuple[np.ndarray, str] + ] + config: dict[str, int] + + async with mod.open_history_client( + mkt, + ) as (get_hist, config): + log.info(f'{mod} history client returned backfill config: {config}') + + # get latest query's worth of history all the way + # back to what is recorded in the tsdb + try: + array, mr_start_dt, mr_end_dt = await get_hist( + timeframe, + end_dt=None, + ) + + # XXX: timeframe not supported for backend (since + # above exception type), terminate immediately since + # there's no backfilling possible. + except DataUnavailable: + task_status.started() + return + + times: np.ndarray = array['time'] + + # sample period step size in seconds + step_size_s = ( + from_timestamp(times[-1]) + - from_timestamp(times[-2]) + ).seconds + + if step_size_s not in (1, 60): + log.error(f'Last 2 sample period is off!? -> {step_size_s}') + step_size_s = ( + from_timestamp(times[-2]) + - from_timestamp(times[-3]) + ).seconds + + # NOTE: on the first history, most recent history + # frame we PREPEND from the current shm ._last index + # and thus a gap between the earliest datum loaded here + # and the latest loaded from the tsdb may exist! + log.info(f'Pushing {array.size} to shm!') + shm.push( + array, + prepend=True, # append on first frame + # start= + ) + backfill_gap_from_shm_index: int = shm._first.value + 1 + + # tell parent task to continue + task_status.started() + + # start history anal and load missing new data via backend. + # backfill_until_dt: datetime | None = None + # started_after_tsdb_load: bool = False + + # for timeframe, shm in shms.items(): # loads a (large) frame of data from the tsdb depending # on the db's query size limit; our "nativedb" (using @@ -563,6 +756,7 @@ async def tsdb_backfill( timeframe=timeframe, ) + last_tsdb_dt: datetime | None = None if tsdb_entry: ( tsdb_history, @@ -570,106 +764,160 @@ async def tsdb_backfill( last_tsdb_dt, ) = tsdb_entry - tsdb_last_frame_start: datetime = push_tsdb_history_to_shm( - storemod, + # calc the index from which the tsdb data should be + # prepended, presuming there is a gap between the + # latest frame (loaded/read above) and the latest + # sample loaded from the tsdb. + backfill_diff: Duration = mr_start_dt - last_tsdb_dt + offset_s: float = backfill_diff.in_seconds() + offset_samples: int = round(offset_s / timeframe) + + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + prepend_start = shm._first.value - offset_samples + 1 + + to_push = tsdb_history[-prepend_start:] + shm.push( + to_push, + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + start=prepend_start, + field_map=storemod.ohlc_key_map, + ) + + log.info(f'Loaded {to_push.shape} datums from storage') + + # tsdb_last_frame_start: datetime = push_tsdb_history_to_shm( + # storemod, + # shm, + # tsdb_history, + # time_key, + # prepend=True, + # ) + # assert tsdb_last_frame_start == first_tsdb_dt + + # unblock the feed bus management task + # assert len(shms[1].array) + # if not started_after_tsdb_load: + # task_status.started() + # started_after_tsdb_load = True + + # begin backfiller task ASAP + # try: + + # if there is a gap to backfill from the first + # history frame until the last datum loaded from the tsdb + # continue that now in the background + # try: + # ( + # latest_start_dt, + # latest_end_dt, + bf_done = await tn.start( + partial( + start_backfill, + get_hist, + mod, + mkt, shm, - tsdb_history, - time_key, - ) - assert tsdb_last_frame_start == first_tsdb_dt - - # begin backfiller task ASAP - try: - ( - latest_start_dt, - latest_end_dt, - bf_done, - ) = await bus.nursery.start( - partial( - start_backfill, - mod, - mkt, - shm, - timeframe, - # sampler_stream, - feed_is_live, - - last_tsdb_dt=last_tsdb_dt, - tsdb_is_up=True, - storage=storage, - ) - ) - if tsdb_entry: - dts_per_tf[timeframe] = ( - tsdb_history, - last_tsdb_dt, - latest_start_dt, - latest_end_dt, - bf_done, - ) - except DataUnavailable: - # XXX: timeframe not supported for backend (since - # above exception type), so skip and move on to next. - continue - - # tsdb_history = series.get(timeframe) - - # if len(hist_shm.array) < 2: - # TODO: there's an edge case here to solve where if the last - # frame before market close (at least on ib) was pushed and - # there was only "1 new" row pushed from the first backfill - # query-iteration, then the sample step sizing calcs will - # break upstream from here since you can't diff on at least - # 2 steps... probably should also add logic to compute from - # the tsdb series and stash that somewhere as meta data on - # the shm buffer?.. no se. - - # unblock the feed bus management task - # assert len(shms[1].array) - task_status.started() - - # backload any further data from tsdb (concurrently per - # timeframe) if not all data was able to be loaded (in memory) - # from the ``StorageClient.load()`` call above. - async with trio.open_nursery() as nurse: - for timeframe, shm in shms.items(): - - entry = dts_per_tf.get(timeframe) - if not entry: - continue - - ( - tsdb_history, - last_tsdb_dt, - latest_start_dt, - latest_end_dt, - bf_done, - ) = entry - - if not tsdb_history.size: - continue - - nurse.start_soon( - back_load_from_tsdb, - - storemod, - storage, - fqme, - - tsdb_history, - last_tsdb_dt, - latest_start_dt, - latest_end_dt, - bf_done, - timeframe, - shm, - ) - # try: - # await trio.sleep_forever() - # finally: - # write_ohlcv + backfill_from_shm_index=backfill_gap_from_shm_index, + backfill_from_dt=mr_start_dt, + backfill_until_dt=last_tsdb_dt, + sampler_stream=sampler_stream, + + # feed_is_live, + + storage=storage, + # tsdb_is_up=True, + ) + ) + + # if tsdb_entry: + # dts_per_tf[timeframe] = ( + # tsdb_history, + # last_tsdb_dt, + # latest_start_dt, + # latest_end_dt, + # bf_done, + # ) + # elif not started_after_tsdb_load: + # task_status.started() + # started_after_tsdb_load = True + + # XXX: timeframe not supported for backend (since + # above exception type), terminate immediately since + # there's no backfilling possible. + # except DataUnavailable: + # return + # continue + + # tsdb_history = series.get(timeframe) + + # if len(hist_shm.array) < 2: + # TODO: there's an edge case here to solve where if the last + # frame before market close (at least on ib) was pushed and + # there was only "1 new" row pushed from the first backfill + # query-iteration, then the sample step sizing calcs will + # break upstream from here since you can't diff on at least + # 2 steps... probably should also add logic to compute from + # the tsdb series and stash that somewhere as meta data on + # the shm buffer?.. no se. + + # backload any further data from tsdb (concurrently per + # timeframe) if not all data was able to be loaded (in memory) + # from the ``StorageClient.load()`` call above. + try: + await trio.sleep_forever() + finally: + return + # write_ohlcv + + # IF we need to continue backloading incrementall from the + # tsdb client.. + tn.start_soon( + back_load_from_tsdb, + + storemod, + storage, + fqme, + + tsdb_history, + last_tsdb_dt, + mr_start_dt, + mr_end_dt, + bf_done, + + timeframe, + shm, + ) + # async with trio.open_nursery() as nurse: + # for timeframe, shm in shms.items(): + + # entry = dts_per_tf.get(timeframe) + # if not entry: + # continue + + # ( + # tsdb_history, + # last_tsdb_dt, + # latest_start_dt, + # latest_end_dt, + # bf_done, + # ) = entry + + # if not tsdb_history.size: + # continue + + + # try: + # await trio.sleep_forever() + # finally: + # write_ohlcv async def manage_history( @@ -773,7 +1021,10 @@ async def manage_history( # TODO: maybe it should be a subpkg of `.data`? from piker import storage - async with storage.open_storage_client() as (storemod, client): + async with ( + storage.open_storage_client() as (storemod, client), + trio.open_nursery() as tn, + ): log.info( f'Connecting to storage backend `{storemod.name}`:\n' f'location: {client.address}\n' @@ -793,30 +1044,10 @@ async def manage_history( # backfiller can do a single append from it's end datum and # then prepends backward to that from the current time # step. - await bus.nursery.start( - tsdb_backfill, - mod, - storemod, - bus, - client, - mkt, - { - 1: rt_shm, - 60: hist_shm, - }, - # sample_stream, - feed_is_live, - ) - - # indicate to caller that feed can be delivered to - # remote requesting client since we've loaded history - # data that can be used. - some_data_ready.set() - - # wait for a live feed before starting the sampler. - await feed_is_live.wait() - - # register 1s and 1m buffers with the global incrementer task + tf2mem: dict = { + 1: rt_shm, + 60: hist_shm, + } async with open_sample_stream( period_s=1., shms_by_period={ @@ -832,8 +1063,33 @@ async def manage_history( sub_for_broadcasts=False, ) as sample_stream: + # register 1s and 1m buffers with the global incrementer task log.info(f'Connected to sampler stream: {sample_stream}') + for timeframe in [60, 1]: + await tn.start( + tsdb_backfill, + mod, + storemod, + tn, + # bus, + client, + mkt, + tf2mem[timeframe], + timeframe, + + sample_stream, + # feed_is_live, + ) + + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. + some_data_ready.set() + + # wait for a live feed before starting the sampler. + await feed_is_live.wait() + # yield back after client connect with filled shm task_status.started(( hist_zero_index,