diff --git a/piker/data/feed.py b/piker/data/feed.py
index 91793440..0cfdb848 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -340,7 +340,7 @@ async def allocate_persistent_feed(
 
     # yield back control to starting nursery once we receive either
     # some history or a real-time quote.
-    log.info(f'waiting on history to load: {fqme}')
+    log.info(f'loading OHLCV history: {fqme}')
     await some_data_ready.wait()
 
     flume = Flume(
@@ -370,7 +370,8 @@ async def allocate_persistent_feed(
         mkt.bs_fqme: flume,
     })
 
-    # signal the ``open_feed_bus()`` caller task to continue
+    # signal the ``open_feed_bus()`` caller task to continue since
+    # we now have (some) history pushed to the shm buffer.
     task_status.started(init)
 
     if not start_stream:
diff --git a/piker/data/history.py b/piker/data/history.py
index 4a0ab29b..aef3a15f 100644
--- a/piker/data/history.py
+++ b/piker/data/history.py
@@ -57,6 +57,7 @@ from ..brokers._util import (
 )
 
 if TYPE_CHECKING:
+    from bidict import bidict
     from ..service.marketstore import StorageClient
     from .feed import _FeedsBus
 
@@ -83,13 +84,13 @@ async def start_backfill(
     mkt: MktPair,
     shm: ShmArray,
     timeframe: float,
-    sampler_stream: tractor.MsgStream,
+    # sampler_stream: tractor.MsgStream,
     feed_is_live: trio.Event,
 
     last_tsdb_dt: datetime | None = None,
     storage: StorageClient | None = None,
     write_tsdb: bool = True,
-    tsdb_is_up: bool = False,
+    tsdb_is_up: bool = True,
 
     task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
 
@@ -120,6 +121,13 @@ async def start_backfill(
         - pendulum.from_timestamp(times[-2])
     ).seconds
 
+    if step_size_s not in (1, 60):
+        log.error(f'Last 2 sample period is off!? -> {step_size_s}')
+        step_size_s = (
+            pendulum.from_timestamp(times[-2])
+            - pendulum.from_timestamp(times[-3])
+        ).seconds
+
     # if the market is open (aka we have a live feed) but the
     # history sample step index seems off we report the surrounding
     # data and drop into a bp. this case shouldn't really ever
@@ -158,12 +166,15 @@ async def start_backfill(
         )
 
         log.info(f'Pushing {to_push.size} to shm!')
-        shm.push(to_push, prepend=True)
+        shm.push(
+            to_push,
+            # prepend=True,
+        )
 
         # TODO: *** THIS IS A BUG ***
         # we need to only broadcast to subscribers for this fqme..
         # otherwise all fsps get reset on every chart..
-        await sampler_stream.send('broadcast_all')
+        # await sampler_stream.send('broadcast_all')
 
     # signal that backfilling to tsdb's end datum is complete
     bf_done = trio.Event()
@@ -297,9 +308,13 @@ async def start_backfill(
                     f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
                 )
 
-            # bail gracefully on shm allocation overrun/full condition
+            # bail gracefully on shm allocation overrun/full
+            # condition
            try:
-                shm.push(to_push, prepend=True)
+                shm.push(
+                    to_push,
+                    prepend=True,
+                )
            except ValueError:
                 log.info(
                     f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
@@ -316,6 +331,7 @@ async def start_backfill(
             if (
                 storage is not None
                 and write_tsdb
+                # and False
             ):
                 log.info(
                     f'Writing {ln} frame to storage:\n'
@@ -334,7 +350,7 @@ async def start_backfill(
 
                 await storage.write_ohlcv(
                     col_sym_key,
-                    to_push,
+                    shm.array,
                     timeframe,
                 )
 
@@ -345,44 +361,165 @@ async def start_backfill(
         # in the block above to avoid entering new ``frames``
         # values while we're pipelining the current ones to
         # memory...
-        await sampler_stream.send('broadcast_all')
+        # await sampler_stream.send('broadcast_all')
 
     # short-circuit (for now)
     bf_done.set()
 
 
-async def basic_backfill(
-    bus: _FeedsBus,
-    mod: ModuleType,
-    mkt: MktPair,
-    shms: dict[int, ShmArray],
-    sampler_stream: tractor.MsgStream,
-    feed_is_live: trio.Event,
+def push_tsdb_history_to_shm(
+    storemod: ModuleType,
+    shm: ShmArray,
+    tsdb_history: np.ndarray,
+    time_field_key: str,
+) -> datetime:
 
-) -> None:
+    # TODO: see if there's faster multi-field reads:
+    # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+    # re-index with a `time` and index field
+    prepend_start = shm._first.value
+    to_push = tsdb_history[-prepend_start:]
+    shm.push(
+        to_push,
 
-    # do a legacy incremental backfill from the provider.
-    log.info('No TSDB (marketstored) found, doing basic backfill..')
+        # insert the history ahead of a "day's worth" of samples
+        # to leave some real-time buffer space at the end.
+        prepend=True,
+        # update_first=False,
+        # start=prepend_start,
+        field_map=storemod.ohlc_key_map,
+    )
 
-    # start history backfill task ``backfill_bars()`` is
-    # a required backend func this must block until shm is
-    # filled with first set of ohlc bars
-    for timeframe, shm in shms.items():
-        try:
-            await bus.nursery.start(
-                partial(
-                    start_backfill,
-                    mod,
-                    mkt,
-                    shm,
-                    timeframe,
-                    sampler_stream,
-                    feed_is_live,
-                )
-            )
-        except DataUnavailable:
-            # XXX: timeframe not supported for backend
-            continue
+    log.info(f'Loaded {to_push.shape} datums from storage')
+    tsdb_last_frame_start = tsdb_history[time_field_key][0]
+    return pendulum.from_timestamp(tsdb_last_frame_start)
+
+
+async def back_load_from_tsdb(
+    storemod: ModuleType,
+    storage: StorageClient,
+
+    fqme: str,
+    # dts_per_tf: dict[int, datetime],
+
+    tsdb_history: np.ndarray,
+
+    last_tsdb_dt: datetime,
+    latest_start_dt: datetime,
+    latest_end_dt: datetime,
+
+    bf_done: trio.Event,
+
+    timeframe: int,
+    shm: ShmArray,
+):
+    assert len(tsdb_history)
+
+    # sync to backend history task's query/load completion
+    # if bf_done:
+    #     await bf_done.wait()
+
+    # TODO: eventually it'd be nice to not require a shm array/buffer
+    # to accomplish this.. maybe we can do some kind of tsdb direct to
+    # graphics format eventually in a child-actor?
+    if storemod.name == 'nativedb':
+        return
+
+    await tractor.breakpoint()
+    assert shm._first.value == 0
+
+    array = shm.array
+
+    # if timeframe == 1:
+    #     times = shm.array['time']
+    #     assert (times[1] - times[0]) == 1
+
+    if len(array):
+        shm_last_dt = pendulum.from_timestamp(
+            shm.array[0]['time']
+        )
+    else:
+        shm_last_dt = None
+
+    if last_tsdb_dt:
+        assert shm_last_dt >= last_tsdb_dt
+
+    # do a diff against the start index of the last frame of history
+    # and only fill in the amount of datums from the tsdb that allows
+    # the most recent data to be loaded into mem *before* the tsdb
+    # data.
+    if (
+        last_tsdb_dt
+        and latest_start_dt
+    ):
+        backfilled_size_s = (
+            latest_start_dt - last_tsdb_dt
+        ).seconds
+        # if the shm buffer len is not large enough to contain
+        # all missing data between the most recent backend-queried
+        # frame and the most recent dt-index in the db we warn that
+        # we only want to load a portion of the next tsdb query to
+        # fill that space.
+        log.info(
+            f'{backfilled_size_s} seconds worth of {timeframe}s loaded'
+        )
+
+    # Load TSDB history into the shm buffer (for display) if there is
+    # remaining buffer space.
+
+    time_key: str = 'time'
+    if getattr(storemod, 'ohlc_key_map', False):
+        keymap: bidict = storemod.ohlc_key_map
+        time_key: str = keymap.inverse['time']
+
+    # if (
+    #     not len(tsdb_history)
+    # ):
+    #     return
+
+    tsdb_last_frame_start: datetime = last_tsdb_dt
+    # load as much from storage into shm as possible (depends on
+    # user's shm size settings).
+    while shm._first.value > 0:
+
+        tsdb_history = await storage.read_ohlcv(
+            fqme,
+            timeframe=timeframe,
+            end=tsdb_last_frame_start,
+        )
+
+        # # empty query
+        # if not len(tsdb_history):
+        #     break
+
+        next_start = tsdb_history[time_key][0]
+        if next_start >= tsdb_last_frame_start:
+            # no earlier data detected
+            break
+
+        else:
+            tsdb_last_frame_start = next_start
+
+        tsdb_last_frame_start: datetime = push_tsdb_history_to_shm(
+            storemod,
+            shm,
+            tsdb_history,
+            time_key,
+        )
+
+        # manually trigger step update to update charts/fsps
+        # which need an incremental update.
+        # NOTE: the way this works is super duper
+        # un-intuitive right now:
+        # - the broadcaster fires a msg to the fsp subsystem.
+        # - fsp subsys then checks for a sample step diff and
+        #   possibly recomputes prepended history.
+        # - the fsp then sends back to the parent actor
+        #   (usually a chart showing graphics for said fsp)
+        #   which tells the chart to conduct a manual full
+        #   graphics loop cycle.
+        # await sampler_stream.send('broadcast_all')
+
+    # TODO: write new data to tsdb to be ready for the next read.
 
 
 async def tsdb_backfill(
@@ -392,7 +529,7 @@ async def tsdb_backfill(
     storage: StorageClient,
     mkt: MktPair,
     shms: dict[int, ShmArray],
-    sampler_stream: tractor.MsgStream,
+    # sampler_stream: tractor.MsgStream,
     feed_is_live: trio.Event,
 
     task_status: TaskStatus[
@@ -406,16 +543,42 @@ async def tsdb_backfill(
     dts_per_tf: dict[int, datetime] = {}
     fqme: str = mkt.fqme
 
+    time_key: str = 'time'
+    if getattr(storemod, 'ohlc_key_map', False):
+        keymap: bidict = storemod.ohlc_key_map
+        time_key: str = keymap.inverse['time']
+
     # start history anal and load missing new data via backend.
-    timeframe: int
+    last_tsdb_dt: datetime | None = None
+    timeframe: int  # OHLC sample period
     for timeframe, shm in shms.items():
+
         # loads a (large) frame of data from the tsdb depending
-        # on the db's query size limit.
-        tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
+        # on the db's query size limit; our "nativedb" (using
+        # parquet) can generally load the entire history into mem
+        # but, if not, the remaining history can be lazy loaded
+        # below?
+        tsdb_entry: tuple | None = await storage.load(
             fqme,
             timeframe=timeframe,
         )
+        if tsdb_entry:
+            (
+                tsdb_history,
+                first_tsdb_dt,
+                last_tsdb_dt,
+            ) = tsdb_entry
+
+            tsdb_last_frame_start: datetime = push_tsdb_history_to_shm(
+                storemod,
+                shm,
+                tsdb_history,
+                time_key,
+            )
+            assert tsdb_last_frame_start == first_tsdb_dt
+
+        # begin backfiller task ASAP
         try:
             (
                 latest_start_dt,
@@ -428,7 +591,7 @@ async def tsdb_backfill(
                     mkt,
                     shm,
                     timeframe,
-                    sampler_stream,
+                    # sampler_stream,
                     feed_is_live,
 
                     last_tsdb_dt=last_tsdb_dt,
@@ -436,25 +599,20 @@ async def tsdb_backfill(
                     storage=storage,
                 )
             )
+            if tsdb_entry:
+                dts_per_tf[timeframe] = (
+                    tsdb_history,
+                    last_tsdb_dt,
+                    latest_start_dt,
+                    latest_end_dt,
+                    bf_done,
+                )
         except DataUnavailable:
-            # XXX: timeframe not supported for backend
-            dts_per_tf[timeframe] = (
-                tsdb_history,
-                last_tsdb_dt,
-                None,
-                None,
-                None,
-            )
+            # XXX: timeframe not supported for backend (per the
+            # exception type above), so skip and move on to the next.
             continue
 
         # tsdb_history = series.get(timeframe)
-        dts_per_tf[timeframe] = (
-            tsdb_history,
-            last_tsdb_dt,
-            latest_start_dt,
-            latest_end_dt,
-            bf_done,
-        )
 
     # if len(hist_shm.array) < 2:
     # TODO: there's an edge case here to solve where if the last
@@ -470,143 +628,49 @@ async def tsdb_backfill(
     # assert len(shms[1].array)
     task_status.started()
 
-    async def back_load_from_tsdb(
-        timeframe: int,
-        shm: ShmArray,
-    ):
-        (
-            tsdb_history,
-            last_tsdb_dt,
-            latest_start_dt,
-            latest_end_dt,
-            bf_done,
-        ) = dts_per_tf[timeframe]
-
-        # sync to backend history task's query/load completion
-        if bf_done:
-            await bf_done.wait()
-
-        # TODO: eventually it'd be nice to not require a shm array/buffer
-        # to accomplish this.. maybe we can do some kind of tsdb direct to
-        # graphics format eventually in a child-actor?
-
-        # TODO: see if there's faster multi-field reads:
-        # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
-        # re-index with a `time` and index field
-        prepend_start = shm._first.value
-        array = shm.array
-        if len(array):
-            shm_last_dt = pendulum.from_timestamp(shm.array[0]['time'])
-        else:
-            shm_last_dt = None
-
-        if last_tsdb_dt:
-            assert shm_last_dt >= last_tsdb_dt
-
-        # do diff against start index of last frame of history and only
-        # fill in an amount of datums from tsdb allows for most recent
-        # to be loaded into mem *before* tsdb data.
-        if (
-            last_tsdb_dt
-            and latest_start_dt
-        ):
-            backfilled_size_s = (
-                latest_start_dt - last_tsdb_dt
-            ).seconds
-            # if the shm buffer len is not large enough to contain
-            # all missing data between the most recent backend-queried frame
-            # and the most recent dt-index in the db we warn that we only
-            # want to load a portion of the next tsdb query to fill that
-            # space.
-            log.info(
-                f'{backfilled_size_s} seconds worth of {timeframe}s loaded'
-            )
-
-        # Load TSDB history into shm buffer (for display) if there is
-        # remaining buffer space.
-
-        if (
-            len(tsdb_history)
-        ):
-            # load the first (smaller) bit of history originally loaded
-            # above from ``StorageClient.load()``.
-            to_push = tsdb_history[-prepend_start:]
-            shm.push(
-                to_push,
-
-                # insert the history pre a "days worth" of samples
-                # to leave some real-time buffer space at the end.
-                prepend=True,
-                # update_first=False,
-                # start=prepend_start,
-                field_map=storemod.ohlc_key_map,
-            )
-
-            tsdb_last_frame_start = tsdb_history['Epoch'][0]
-
-            if timeframe == 1:
-                times = shm.array['time']
-                assert (times[1] - times[0]) == 1
-
-            # load as much from storage into shm possible (depends on
-            # user's shm size settings).
-            while shm._first.value > 0:
-
-                tsdb_history = await storage.read_ohlcv(
-                    fqme,
-                    timeframe=timeframe,
-                    end=tsdb_last_frame_start,
-                )
-
-                # empty query
-                if not len(tsdb_history):
-                    break
-
-                next_start = tsdb_history['Epoch'][0]
-                if next_start >= tsdb_last_frame_start:
-                    # no earlier data detected
-                    break
-                else:
-                    tsdb_last_frame_start = next_start
-
-                prepend_start = shm._first.value
-                to_push = tsdb_history[-prepend_start:]
-
-                # insert the history pre a "days worth" of samples
-                # to leave some real-time buffer space at the end.
-                shm.push(
-                    to_push,
-                    prepend=True,
-                    field_map=storemod.ohlc_key_map,
-                )
-                log.info(f'Loaded {to_push.shape} datums from storage')
-
-                # manually trigger step update to update charts/fsps
-                # which need an incremental update.
-                # NOTE: the way this works is super duper
-                # un-intuitive right now:
-                # - the broadcaster fires a msg to the fsp subsystem.
-                # - fsp subsys then checks for a sample step diff and
-                #   possibly recomputes prepended history.
-                # - the fsp then sends back to the parent actor
-                #   (usually a chart showing graphics for said fsp)
-                #   which tells the chart to conduct a manual full
-                #   graphics loop cycle.
-                await sampler_stream.send('broadcast_all')
-
-        # TODO: write new data to tsdb to be ready to for next read.
-
-    # backload from db (concurrently per timeframe) once backfilling of
-    # recent dat a loaded from the backend provider (see
-    # ``bf_done.wait()`` call).
+    # backload any further data from tsdb (concurrently per
+    # timeframe) if not all data was able to be loaded (in memory)
+    # from the ``StorageClient.load()`` call above.
     async with trio.open_nursery() as nurse:
         for timeframe, shm in shms.items():
+
+            entry = dts_per_tf.get(timeframe)
+            if not entry:
+                continue
+
+            (
+                tsdb_history,
+                last_tsdb_dt,
+                latest_start_dt,
+                latest_end_dt,
+                bf_done,
+            ) = entry
+
+            if not tsdb_history.size:
+                continue
+
             nurse.start_soon(
                 back_load_from_tsdb,
+
+                storemod,
+                storage,
+                fqme,
+
+                tsdb_history,
+                last_tsdb_dt,
+                latest_start_dt,
+                latest_end_dt,
+                bf_done,
+
                 timeframe,
                 shm,
             )
 
+    # try:
+    #     await trio.sleep_forever()
+    # finally:
+    #     write_ohlcv
+
 
 async def manage_history(
     mod: ModuleType,
@@ -624,8 +688,23 @@ async def manage_history(
     '''
     Load and manage historical data including the loading of any
     available series from any connected tsdb as well as conduct
-    real-time update of both that existing db and the allocated shared
-    memory buffer.
+    real-time update of both that existing db and the allocated
+    shared memory buffer.
+
+    Init sequence:
+    - allocate shm (numpy array) buffers for 60s & 1s sample rates
+    - configure the "zero index" for each buffer: the index where
+      history will be prepended *to* and new live data will be
+      appended *from*.
+    - open a ``.storage.StorageClient`` and load any existing tsdb
+      history as well as (async) start a backfill task which loads
+      missing (newer) history from the data provider backend:
+      - tsdb history is loaded first and pushed to shm ASAP.
+      - the backfill task loads the most recent history before
+        unblocking its parent task, so that the `ShmArray._last` is
+        up to date to allow the OHLC sampler to begin writing new
+        samples at the correct buffer index once the provider feed
+        engages.
 
     '''
     # TODO: is there a way to make each shm file key
@@ -684,88 +763,86 @@ async def manage_history(
         "Persistent shm for sym was already open?!"
     )
 
-    # register 1s and 1m buffers with the global incrementer task
-    async with open_sample_stream(
-        period_s=1.,
-        shms_by_period={
-            1.: rt_shm.token,
-            60.: hist_shm.token,
-        },
+    open_history_client = getattr(
+        mod,
+        'open_history_client',
+        None,
+    )
+    assert open_history_client
 
-        # NOTE: we want to only open a stream for doing broadcasts on
-        # backfill operations, not receive the sample index-stream
-        # (since there's no code in this data feed layer that needs to
-        # consume it).
-        open_index_stream=True,
-        sub_for_broadcasts=False,
+    # TODO: maybe it should be a subpkg of `.data`?
+    from piker import storage
 
-    ) as sample_stream:
-
-        open_history_client = getattr(
-            mod,
-            'open_history_client',
-            None,
+    async with storage.open_storage_client() as (storemod, client):
+        log.info(
+            f'Connecting to storage backend `{storemod.name}`:\n'
+            f'location: {client.address}\n'
+            f'db cardinality: {client.cardinality}\n'
+            # TODO: show backend config, eg:
+            # - network settings
+            # - storage size with compression
+            # - number of loaded time series?
+        )
-        assert open_history_client
 
-        from .. import storage
-        try:
-            async with storage.open_storage_client() as (storemod, client):
-                log.info(f'Found existing `{storemod.name}`')
-                # TODO: drop returning the output that we pass in?
-                await bus.nursery.start(
-                    tsdb_backfill,
-                    mod,
-                    storemod,
-                    bus,
-                    client,
-                    mkt,
-                    {
-                        1: rt_shm,
-                        60: hist_shm,
-                    },
-                    sample_stream,
-                    feed_is_live,
-                )
 
-                # yield back after client connect with filled shm
-                task_status.started((
-                    hist_zero_index,
-                    hist_shm,
-                    rt_zero_index,
-                    rt_shm,
-                ))
+        # NOTE: this call ONLY UNBLOCKS once the latest-most frame
+        # (i.e. history just before the live feed's latest datum) of
+        # history has been loaded and written to the shm buffer:
+        # - the backfiller task can write in reverse chronological
+        #   order to the shm and tsdb
+        # - the tsdb data can be loaded immediately and the
+        #   backfiller can do a single append from its end datum and
+        #   then prepend backward to it from the current time
+        #   step.
+        await bus.nursery.start(
+            tsdb_backfill,
+            mod,
+            storemod,
+            bus,
+            client,
+            mkt,
+            {
+                1: rt_shm,
+                60: hist_shm,
+            },
+            # sample_stream,
+            feed_is_live,
+        )
 
-                # indicate to caller that feed can be delivered to
-                # remote requesting client since we've loaded history
-                # data that can be used.
-                some_data_ready.set()
+        # indicate to caller that feed can be delivered to
+        # remote requesting client since we've loaded history
+        # data that can be used.
+        some_data_ready.set()
 
-                # history retreival loop depending on user interaction
-                # and thus a small RPC-prot for remotely controllinlg
-                # what data is loaded for viewing.
-                await trio.sleep_forever()
+        # wait for a live feed before starting the sampler.
+        await feed_is_live.wait()
 
-        except storage.StorageConnectionError:
-            log.exception(
-                "Can't connect to tsdb backend!?\n"
-                'Starting basic backfille to shm..'
-            )
-            await basic_backfill(
-                bus,
-                mod,
-                mkt,
-                {
-                    1: rt_shm,
-                    60: hist_shm,
-                },
-                sample_stream,
-                feed_is_live,
-            )
+        # register 1s and 1m buffers with the global incrementer task
+        async with open_sample_stream(
+            period_s=1.,
+            shms_by_period={
+                1.: rt_shm.token,
+                60.: hist_shm.token,
+            },
+
+            # NOTE: we want to only open a stream for doing
+            # broadcasts on backfill operations, not receive the
+            # sample index-stream (since there's no code in this
+            # data feed layer that needs to consume it).
+            open_index_stream=True,
+            sub_for_broadcasts=False,
+
+        ) as sample_stream:
+            log.info(f'Connected to sampler stream: {sample_stream}')
+
+            # yield back after client connect with filled shm
             task_status.started((
                 hist_zero_index,
                 hist_shm,
                 rt_zero_index,
                 rt_shm,
             ))
-            some_data_ready.set()
+
+            # history retrieval loop depending on user interaction
+            # and thus a small RPC-prot for remotely controlling
+            # what data is loaded for viewing.
             await trio.sleep_forever()
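
For reference, a minimal self-contained sketch of the backward-fill loop that
``back_load_from_tsdb()`` implements above. ``MockShm`` and ``read_ohlcv()``
here are hypothetical plain-numpy stand-ins for piker's ``ShmArray`` and
``StorageClient.read_ohlcv()``, shown only to illustrate the control flow of
prepending ever-older frames until the buffer fills or storage runs out of
earlier data:

    import numpy as np

    ohlc_dtype = np.dtype([('time', 'i8'), ('close', 'f8')])

    class MockShm:
        '''
        Fixed-size buffer where `._first` tracks the next prepend slot,
        counting down towards 0 as history is inserted front-to-back.
        '''
        def __init__(self, size: int) -> None:
            self.array = np.zeros(size, dtype=ohlc_dtype)
            self._first = size

        def push(self, frame: np.ndarray) -> None:
            start = self._first - len(frame)
            if start < 0:
                raise ValueError('shm buffer overrun')
            self.array[start:self._first] = frame
            self._first = start

    def read_ohlcv(end: int, frame_size: int = 100) -> np.ndarray:
        '''
        Mock tsdb query: up to `frame_size` 1s bars strictly before `end`.
        '''
        times = np.arange(max(0, end - frame_size), end)
        frame = np.zeros(len(times), dtype=ohlc_dtype)
        frame['time'] = times
        return frame

    shm = MockShm(size=250)
    last_frame_start = 1_000  # epoch of the earliest datum already in shm

    # walk backwards through storage until the buffer is full or no
    # earlier data exists; mirrors the `while shm._first.value > 0` loop.
    while shm._first > 0:
        history = read_ohlcv(end=last_frame_start)
        if not len(history):
            break  # empty query: nothing older in storage

        next_start = history['time'][0]
        if next_start >= last_frame_start:
            break  # no earlier data detected

        # only prepend as much as the remaining buffer space allows,
        # taking the *most recent* datums from the queried frame.
        shm.push(history[-shm._first:])
        last_frame_start = next_start

    assert shm._first == 0  # buffer fully backfilled

The real loop differs in that each push goes through
``push_tsdb_history_to_shm()`` (which also handles the backend's
``ohlc_key_map`` field re-mapping) and each frame's size is whatever the
storage backend's query limit returns.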