From c52e889fe5f834e31e306c124aed7c9dad622ee6 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 2 Jun 2023 12:17:31 -0400
Subject: [PATCH] First draft history loading rework

The previous impl was a concurrency-hack mess, largely due to the many
limitations imposed by marketstore (query size limits, strange
datetime/timestamp errors, slow table loads for large queries..); we
can drastically simplify. There are still some issues with getting new
backfills (not yet in storage) correctly prepended: there are
sometimes small gaps due to shm races between reading the history
index and the live-feed startup finishing.

We generally need tests for all this and likely a better rework of the
feed layer's init such that we show history in the chart ASAP instead
of waiting on backfills or the live feed to come up.

Much more to come B)

---
 piker/data/feed.py    |   5 +-
 piker/data/history.py | 595 ++++++++++++++++++++++++------------------
 2 files changed, 339 insertions(+), 261 deletions(-)

diff --git a/piker/data/feed.py b/piker/data/feed.py
index 91793440..0cfdb848 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -340,7 +340,7 @@ async def allocate_persistent_feed(
 
     # yield back control to starting nursery once we receive either
     # some history or a real-time quote.
-    log.info(f'waiting on history to load: {fqme}')
+    log.info(f'loading OHLCV history: {fqme}')
     await some_data_ready.wait()
 
     flume = Flume(
@@ -370,7 +370,8 @@ async def allocate_persistent_feed(
         mkt.bs_fqme: flume,
     })
 
-    # signal the ``open_feed_bus()`` caller task to continue
+    # signal the ``open_feed_bus()`` caller task to continue since
+    # we now have (some) history pushed to the shm buffer.
     task_status.started(init)
 
     if not start_stream:
diff --git a/piker/data/history.py b/piker/data/history.py
index 4a0ab29b..aef3a15f 100644
--- a/piker/data/history.py
+++ b/piker/data/history.py
@@ -57,6 +57,7 @@ from ..brokers._util import (
 )
 
 if TYPE_CHECKING:
+    from bidict import bidict
     from ..service.marketstore import StorageClient
     from .feed import _FeedsBus
 
@@ -83,13 +84,13 @@ async def start_backfill(
     mkt: MktPair,
     shm: ShmArray,
     timeframe: float,
-    sampler_stream: tractor.MsgStream,
+    # sampler_stream: tractor.MsgStream,
     feed_is_live: trio.Event,
 
     last_tsdb_dt: datetime | None = None,
     storage: StorageClient | None = None,
     write_tsdb: bool = True,
-    tsdb_is_up: bool = False,
+    tsdb_is_up: bool = True,
 
     task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
 
@@ -120,6 +121,13 @@ async def start_backfill(
         - pendulum.from_timestamp(times[-2])
     ).seconds
 
+    if step_size_s not in (1, 60):
+        log.error(f'Last 2-sample period is off!? -> {step_size_s}')
+        step_size_s = (
+            pendulum.from_timestamp(times[-2])
+            - pendulum.from_timestamp(times[-3])
+        ).seconds
+
     # if the market is open (aka we have a live feed) but the
     # history sample step index seems off we report the surrounding
     # data and drop into a bp. this case shouldn't really ever
@@ -158,12 +166,15 @@ async def start_backfill(
     )
 
     log.info(f'Pushing {to_push.size} to shm!')
-    shm.push(to_push, prepend=True)
+    shm.push(
+        to_push,
+        # prepend=True,
+    )
 
     # TODO: *** THIS IS A BUG ***
     # we need to only broadcast to subscribers for this fqme..
     # otherwise all fsps get reset on every chart..
-    await sampler_stream.send('broadcast_all')
+    # await sampler_stream.send('broadcast_all')
 
     # signal that backfilling to tsdb's end datum is complete
     bf_done = trio.Event()
@@ -297,9 +308,13 @@ async def start_backfill(
                     f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
                 )
 
-            # bail gracefully on shm allocation overrun/full condition
+            # bail gracefully on shm allocation overrun/full
+            # condition
            try:
-                shm.push(to_push, prepend=True)
+                shm.push(
+                    to_push,
+                    prepend=True,
+                )
             except ValueError:
                 log.info(
                     f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
@@ -316,6 +331,7 @@ async def start_backfill(
             if (
                 storage is not None
                 and write_tsdb
+                # and False
             ):
                 log.info(
                     f'Writing {ln} frame to storage:\n'
@@ -334,7 +350,7 @@ async def start_backfill(
 
                 await storage.write_ohlcv(
                     col_sym_key,
-                    to_push,
+                    shm.array,
                     timeframe,
                 )
 
@@ -345,44 +361,165 @@ async def start_backfill(
             # in the block above to avoid entering new ``frames``
             # values while we're pipelining the current ones to
             # memory...
-            await sampler_stream.send('broadcast_all')
+            # await sampler_stream.send('broadcast_all')
 
         # short-circuit (for now)
         bf_done.set()
 
 
-async def basic_backfill(
-    bus: _FeedsBus,
-    mod: ModuleType,
-    mkt: MktPair,
-    shms: dict[int, ShmArray],
-    sampler_stream: tractor.MsgStream,
-    feed_is_live: trio.Event,
+def push_tsdb_history_to_shm(
+    storemod: ModuleType,
+    shm: ShmArray,
+    tsdb_history: np.ndarray,
+    time_field_key: str,
+) -> datetime:
 
-) -> None:
+    # TODO: see if there's faster multi-field reads:
+    # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+    # re-index with a `time` and index field
+    prepend_start = shm._first.value
+    to_push = tsdb_history[-prepend_start:]
+    shm.push(
+        to_push,
 
-    # do a legacy incremental backfill from the provider.
-    log.info('No TSDB (marketstored) found, doing basic backfill..')
+        # insert the history prior to a "day's worth" of samples
+        # to leave some real-time buffer space at the end.
+        prepend=True,
+        # update_first=False,
+        # start=prepend_start,
+        field_map=storemod.ohlc_key_map,
+    )
 
-    # start history backfill task ``backfill_bars()`` is
-    # a required backend func this must block until shm is
-    # filled with first set of ohlc bars
-    for timeframe, shm in shms.items():
-        try:
-            await bus.nursery.start(
-                partial(
-                    start_backfill,
-                    mod,
-                    mkt,
-                    shm,
-                    timeframe,
-                    sampler_stream,
-                    feed_is_live,
-                )
-            )
-        except DataUnavailable:
-            # XXX: timeframe not supported for backend
-            continue
+    log.info(f'Loaded {to_push.shape} datums from storage')
+    tsdb_last_frame_start = tsdb_history[time_field_key][0]
+    return pendulum.from_timestamp(tsdb_last_frame_start)
+
+
+async def back_load_from_tsdb(
+    storemod: ModuleType,
+    storage: StorageClient,
+
+    fqme: str,
+    # dts_per_tf: dict[int, datetime],
+
+    tsdb_history: np.ndarray,
+
+    last_tsdb_dt: datetime,
+    latest_start_dt: datetime,
+    latest_end_dt: datetime,
+
+    bf_done: trio.Event,
+
+    timeframe: int,
+    shm: ShmArray,
+):
+    assert len(tsdb_history)
+
+    # sync to backend history task's query/load completion
+    # if bf_done:
+    #     await bf_done.wait()
+
+    # TODO: eventually it'd be nice to not require a shm array/buffer
+    # to accomplish this.. maybe we can do some kind of
+    # tsdb-direct-to-graphics format in a child-actor?
+    if storemod.name == 'nativedb':
+        return
+
+    await tractor.breakpoint()
+    assert shm._first.value == 0
+
+    array = shm.array
+
+    # if timeframe == 1:
+    #     times = shm.array['time']
+    #     assert (times[1] - times[0]) == 1
+
+    if len(array):
+        shm_last_dt = pendulum.from_timestamp(
+            shm.array[0]['time']
+        )
+    else:
+        shm_last_dt = None
+
+    if last_tsdb_dt:
+        assert shm_last_dt >= last_tsdb_dt
+
+    # do a diff against the start index of the last frame of history
+    # and only fill in as many datums from the tsdb as space allows,
+    # so the most recent data is loaded into mem *before* tsdb data.
+    if (
+        last_tsdb_dt
+        and latest_start_dt
+    ):
+        backfilled_size_s = (
+            latest_start_dt - last_tsdb_dt
+        ).seconds
+        # if the shm buffer len is not large enough to contain
+        # all missing data between the most recent backend-queried frame
+        # and the most recent dt-index in the db we warn that we only
+        # want to load a portion of the next tsdb query to fill that
+        # space.
+        log.info(
+            f'{backfilled_size_s} seconds worth of {timeframe}s loaded'
+        )
+
+    # Load TSDB history into shm buffer (for display) if there is
+    # remaining buffer space.
+
+    time_key: str = 'time'
+    if getattr(storemod, 'ohlc_key_map', False):
+        keymap: bidict = storemod.ohlc_key_map
+        time_key: str = keymap.inverse['time']
+
+    # if (
+    #     not len(tsdb_history)
+    # ):
+    #     return
+
+    tsdb_last_frame_start: datetime = last_tsdb_dt
+    # load as much from storage into shm as possible (depends on
+    # the user's shm size settings).
+    while shm._first.value > 0:
+
+        tsdb_history = await storage.read_ohlcv(
+            fqme,
+            timeframe=timeframe,
+            end=tsdb_last_frame_start,
+        )
+
+        # # empty query
+        # if not len(tsdb_history):
+        #     break
+
+        next_start = tsdb_history[time_key][0]
+        if next_start >= tsdb_last_frame_start:
+            # no earlier data detected
+            break
+
+        else:
+            tsdb_last_frame_start = next_start
+
+        tsdb_last_frame_start: datetime = push_tsdb_history_to_shm(
+            storemod,
+            shm,
+            tsdb_history,
+            time_key,
+        )
+
+    # manually trigger step update to update charts/fsps
+    # which need an incremental update.
+    # NOTE: the way this works is super duper
+    # un-intuitive right now:
+    # - the broadcaster fires a msg to the fsp subsystem.
+    # - fsp subsys then checks for a sample step diff and
+    #   possibly recomputes prepended history.
+    # - the fsp then sends back to the parent actor
+    #   (usually a chart showing graphics for said fsp)
+    #   which tells the chart to conduct a manual full
+    #   graphics loop cycle.
+    # await sampler_stream.send('broadcast_all')
+
+    # TODO: write new data to the tsdb so it's ready for the next read.
 
 
 async def tsdb_backfill(
@@ -392,7 +529,7 @@ async def tsdb_backfill(
     storage: StorageClient,
     mkt: MktPair,
     shms: dict[int, ShmArray],
-    sampler_stream: tractor.MsgStream,
+    # sampler_stream: tractor.MsgStream,
     feed_is_live: trio.Event,
 
     task_status: TaskStatus[
@@ -406,16 +543,42 @@ async def tsdb_backfill(
     dts_per_tf: dict[int, datetime] = {}
     fqme: str = mkt.fqme
 
+    time_key: str = 'time'
+    if getattr(storemod, 'ohlc_key_map', False):
+        keymap: bidict = storemod.ohlc_key_map
+        time_key: str = keymap.inverse['time']
+
     # start history anal and load missing new data via backend.
-    timeframe: int
+    last_tsdb_dt: datetime | None = None
+    timeframe: int  # OHLC sample period
     for timeframe, shm in shms.items():
+
         # loads a (large) frame of data from the tsdb depending
-        # on the db's query size limit.
-        tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
+        # on the db's query size limit; our "nativedb" (using
+        # parquet) can generally load the entire history into mem;
+        # if not, the remaining history can be lazy-loaded below.
+        tsdb_entry: tuple | None = await storage.load(
             fqme,
             timeframe=timeframe,
         )
+        if tsdb_entry:
+            (
+                tsdb_history,
+                first_tsdb_dt,
+                last_tsdb_dt,
+            ) = tsdb_entry
+
+            tsdb_last_frame_start: datetime = push_tsdb_history_to_shm(
+                storemod,
+                shm,
+                tsdb_history,
+                time_key,
+            )
+            assert tsdb_last_frame_start == first_tsdb_dt
+
+        # begin backfiller task ASAP
         try:
             (
                 latest_start_dt,
@@ -428,7 +591,7 @@ async def tsdb_backfill(
                     mkt,
                     shm,
                     timeframe,
-                    sampler_stream,
+                    # sampler_stream,
                     feed_is_live,
 
                     last_tsdb_dt=last_tsdb_dt,
@@ -436,25 +599,20 @@ async def tsdb_backfill(
                     storage=storage,
                 )
             )
+            if tsdb_entry:
+                dts_per_tf[timeframe] = (
+                    tsdb_history,
+                    last_tsdb_dt,
+                    latest_start_dt,
+                    latest_end_dt,
+                    bf_done,
+                )
         except DataUnavailable:
-            # XXX: timeframe not supported for backend
-            dts_per_tf[timeframe] = (
-                tsdb_history,
-                last_tsdb_dt,
-                None,
-                None,
-                None,
-            )
+            # XXX: timeframe not supported for this backend (per the
+            # exception type above), so skip and move on to the next.
             continue
 
         # tsdb_history = series.get(timeframe)
-        dts_per_tf[timeframe] = (
-            tsdb_history,
-            last_tsdb_dt,
-            latest_start_dt,
-            latest_end_dt,
-            bf_done,
-        )
 
     # if len(hist_shm.array) < 2:
     # TODO: there's an edge case here to solve where if the last
@@ -470,143 +628,49 @@ async def tsdb_backfill(
     # assert len(shms[1].array)
 
     task_status.started()
 
-    async def back_load_from_tsdb(
-        timeframe: int,
-        shm: ShmArray,
-    ):
-        (
-            tsdb_history,
-            last_tsdb_dt,
-            latest_start_dt,
-            latest_end_dt,
-            bf_done,
-        ) = dts_per_tf[timeframe]
-
-        # sync to backend history task's query/load completion
-        if bf_done:
-            await bf_done.wait()
-
-        # TODO: eventually it'd be nice to not require a shm array/buffer
-        # to accomplish this.. maybe we can do some kind of tsdb direct to
-        # graphics format eventually in a child-actor?
-
-        # TODO: see if there's faster multi-field reads:
-        # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
-        # re-index with a `time` and index field
-        prepend_start = shm._first.value
-        array = shm.array
-        if len(array):
-            shm_last_dt = pendulum.from_timestamp(shm.array[0]['time'])
-        else:
-            shm_last_dt = None
-
-        if last_tsdb_dt:
-            assert shm_last_dt >= last_tsdb_dt
-
-        # do diff against start index of last frame of history and only
-        # fill in an amount of datums from tsdb allows for most recent
-        # to be loaded into mem *before* tsdb data.
-        if (
-            last_tsdb_dt
-            and latest_start_dt
-        ):
-            backfilled_size_s = (
-                latest_start_dt - last_tsdb_dt
-            ).seconds
-            # if the shm buffer len is not large enough to contain
-            # all missing data between the most recent backend-queried frame
-            # and the most recent dt-index in the db we warn that we only
-            # want to load a portion of the next tsdb query to fill that
-            # space.
-            log.info(
-                f'{backfilled_size_s} seconds worth of {timeframe}s loaded'
-            )
-
-        # Load TSDB history into shm buffer (for display) if there is
-        # remaining buffer space.
-
-        if (
-            len(tsdb_history)
-        ):
-            # load the first (smaller) bit of history originally loaded
-            # above from ``StorageClient.load()``.
-            to_push = tsdb_history[-prepend_start:]
-            shm.push(
-                to_push,
-
-                # insert the history pre a "days worth" of samples
-                # to leave some real-time buffer space at the end.
-                prepend=True,
-                # update_first=False,
-                # start=prepend_start,
-                field_map=storemod.ohlc_key_map,
-            )
-
-            tsdb_last_frame_start = tsdb_history['Epoch'][0]
-
-            if timeframe == 1:
-                times = shm.array['time']
-                assert (times[1] - times[0]) == 1
-
-            # load as much from storage into shm possible (depends on
-            # user's shm size settings).
-            while shm._first.value > 0:
-
-                tsdb_history = await storage.read_ohlcv(
-                    fqme,
-                    timeframe=timeframe,
-                    end=tsdb_last_frame_start,
-                )
-
-                # empty query
-                if not len(tsdb_history):
-                    break
-
-                next_start = tsdb_history['Epoch'][0]
-                if next_start >= tsdb_last_frame_start:
-                    # no earlier data detected
-                    break
-                else:
-                    tsdb_last_frame_start = next_start
-
-                prepend_start = shm._first.value
-                to_push = tsdb_history[-prepend_start:]
-
-                # insert the history pre a "days worth" of samples
-                # to leave some real-time buffer space at the end.
-                shm.push(
-                    to_push,
-                    prepend=True,
-                    field_map=storemod.ohlc_key_map,
-                )
-                log.info(f'Loaded {to_push.shape} datums from storage')
-
-            # manually trigger step update to update charts/fsps
-            # which need an incremental update.
-            # NOTE: the way this works is super duper
-            # un-intuitive right now:
-            # - the broadcaster fires a msg to the fsp subsystem.
-            # - fsp subsys then checks for a sample step diff and
-            #   possibly recomputes prepended history.
-            # - the fsp then sends back to the parent actor
-            #   (usually a chart showing graphics for said fsp)
-            #   which tells the chart to conduct a manual full
-            #   graphics loop cycle.
-            await sampler_stream.send('broadcast_all')
-
-        # TODO: write new data to tsdb to be ready to for next read.
-
-    # backload from db (concurrently per timeframe) once backfilling of
-    # recent dat a loaded from the backend provider (see
-    # ``bf_done.wait()`` call).
+    # backload any further data from the tsdb (concurrently per
+    # timeframe) if not all of it could be loaded (into memory)
+    # from the ``StorageClient.load()`` call above.
     async with trio.open_nursery() as nurse:
         for timeframe, shm in shms.items():
+
+            entry = dts_per_tf.get(timeframe)
+            if not entry:
+                continue
+
+            (
+                tsdb_history,
+                last_tsdb_dt,
+                latest_start_dt,
+                latest_end_dt,
+                bf_done,
+            ) = entry
+
+            if not tsdb_history.size:
+                continue
+
             nurse.start_soon(
                 back_load_from_tsdb,
+
+                storemod,
+                storage,
+                fqme,
+
+                tsdb_history,
+                last_tsdb_dt,
+                latest_start_dt,
+                latest_end_dt,
+                bf_done,
+
                 timeframe,
                 shm,
             )
 
+    # try:
+    #     await trio.sleep_forever()
+    # finally:
+    #     write_ohlcv
+
 
 async def manage_history(
     mod: ModuleType,
@@ -624,8 +688,23 @@ async def manage_history(
     '''
     Load and manage historical data including the loading of any
    available series from any connected tsdb as well as conduct
-    real-time update of both that existing db and the allocated shared
-    memory buffer.
+    real-time update of both that existing db and the allocated
+    shared memory buffer.
+
+    Init sequence:
+    - allocate shm (numpy array) buffers for 60s & 1s sample rates
+    - configure "zero index" for each buffer: the index where
+      history will be prepended *to* and new live data will be
+      appended *from*.
+    - open a ``.storage.StorageClient`` and load any existing tsdb
+      history as well as (async) start a backfill task which loads
+      missing (newer) history from the data provider backend:
+      - tsdb history is loaded first and pushed to shm ASAP.
+      - the backfill task loads the most recent history before
+        unblocking its parent task, so that the `ShmArray._last` is
+        up to date to allow the OHLC sampler to begin writing new
+        samples at the correct buffer index once the provider feed
+        engages.
 
     '''
     # TODO: is there a way to make each shm file key
@@ -684,88 +763,86 @@ async def manage_history(
         "Persistent shm for sym was already open?!"
     )
 
-    # register 1s and 1m buffers with the global incrementer task
-    async with open_sample_stream(
-        period_s=1.,
-        shms_by_period={
-            1.: rt_shm.token,
-            60.: hist_shm.token,
-        },
+    open_history_client = getattr(
+        mod,
+        'open_history_client',
+        None,
+    )
+    assert open_history_client
 
-        # NOTE: we want to only open a stream for doing broadcasts on
-        # backfill operations, not receive the sample index-stream
-        # (since there's no code in this data feed layer that needs to
-        # consume it).
-        open_index_stream=True,
-        sub_for_broadcasts=False,
+    # TODO: maybe it should be a subpkg of `.data`?
+    from piker import storage
 
-    ) as sample_stream:
-
-        open_history_client = getattr(
-            mod,
-            'open_history_client',
-            None,
+    async with storage.open_storage_client() as (storemod, client):
+        log.info(
+            f'Connecting to storage backend `{storemod.name}`:\n'
+            f'location: {client.address}\n'
+            f'db cardinality: {client.cardinality}\n'
+            # TODO: show backend config, eg:
+            # - network settings
+            # - storage size with compression
+            # - number of loaded time series?
         )
-        assert open_history_client
 
-        from .. import storage
-        try:
-            async with storage.open_storage_client() as (storemod, client):
-                log.info(f'Found existing `{storemod.name}`')
-                # TODO: drop returning the output that we pass in?
-                await bus.nursery.start(
-                    tsdb_backfill,
-                    mod,
-                    storemod,
-                    bus,
-                    client,
-                    mkt,
-                    {
-                        1: rt_shm,
-                        60: hist_shm,
-                    },
-                    sample_stream,
-                    feed_is_live,
-                )
+        # NOTE: this call ONLY UNBLOCKS once the latest-most frame
+        # (i.e. history just before the live feed's latest datum) of
+        # history has been loaded and written to the shm buffer:
+        # - the backfiller task can write in reverse-chronological
+        #   order to the shm and tsdb
+        # - the tsdb data can be loaded immediately and the
+        #   backfiller can do a single append from its end datum and
+        #   then prepend backward to it from the current time
+        #   step.
+        await bus.nursery.start(
+            tsdb_backfill,
+            mod,
+            storemod,
+            bus,
+            client,
+            mkt,
+            {
+                1: rt_shm,
+                60: hist_shm,
+            },
+            # sample_stream,
+            feed_is_live,
+        )
 
-                # yield back after client connect with filled shm
-                task_status.started((
-                    hist_zero_index,
-                    hist_shm,
-                    rt_zero_index,
-                    rt_shm,
-                ))
+        # indicate to caller that feed can be delivered to
+        # remote requesting client since we've loaded history
+        # data that can be used.
+        some_data_ready.set()
 
-                # indicate to caller that feed can be delivered to
-                # remote requesting client since we've loaded history
-                # data that can be used.
-                some_data_ready.set()
+        # wait for a live feed before starting the sampler.
+        await feed_is_live.wait()
 
-                # history retreival loop depending on user interaction
-                # and thus a small RPC-prot for remotely controllinlg
-                # what data is loaded for viewing.
-                await trio.sleep_forever()
-
-        except storage.StorageConnectionError:
-            log.exception(
-                "Can't connect to tsdb backend!?\n"
-                'Starting basic backfille to shm..'
-            )
-            await basic_backfill(
-                bus,
-                mod,
-                mkt,
-                {
-                    1: rt_shm,
-                    60: hist_shm,
-                },
-                sample_stream,
-                feed_is_live,
-            )
+        # register 1s and 1m buffers with the global incrementer task
+        async with open_sample_stream(
+            period_s=1.,
+            shms_by_period={
+                1.: rt_shm.token,
+                60.: hist_shm.token,
+            },
+
+            # NOTE: we want to only open a stream for doing
+            # broadcasts on backfill operations, not receive the
+            # sample index-stream (since there's no code in this
+            # data feed layer that needs to consume it).
+            open_index_stream=True,
+            sub_for_broadcasts=False,
+
+        ) as sample_stream:
+            log.info(f'Connected to sampler stream: {sample_stream}')
+
+            # yield back after client connect with filled shm
             task_status.started((
                 hist_zero_index,
                 hist_shm,
                 rt_zero_index,
                 rt_shm,
             ))
-            some_data_ready.set()
+
+            # history retrieval loop depending on user interaction
+            # and thus a small RPC protocol for remotely controlling
+            # what data is loaded for viewing.
             await trio.sleep_forever()
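
For reference, the storage-to-shm backload that ``back_load_from_tsdb()``
implements above reduces to: repeatedly query storage for a frame ending
just before the earliest timestamp already in the buffer, prepend it, and
stop once the buffer's first index hits zero or no earlier data comes
back. Below is a minimal, runnable sketch of that loop using a plain
numpy array in place of piker's ``ShmArray`` and a synthetic
``read_frame()`` in place of ``StorageClient.read_ohlcv()``; both
stand-ins (and all the numbers) are illustrative assumptions, not piker
APIs.

    import numpy as np

    class RingBuffer:
        '''Fixed-size history buffer: older history is prepended
        toward index 0, live samples append at the end.'''
        def __init__(self, size: int, zero_index: int) -> None:
            self.array = np.zeros(
                size,
                dtype=[('time', 'f8'), ('close', 'f8')],
            )
            self.first = zero_index  # index where the next prepend lands

        def prepend(self, frame: np.ndarray) -> None:
            start = self.first - len(frame)
            if start < 0:
                # mirrors the shm overrun ``ValueError`` handled above
                raise ValueError('shm buffer overrun')
            self.array[start:self.first] = frame
            self.first = start

    def read_frame(end_ts: float, step: float, n: int) -> np.ndarray:
        '''Stand-in for ``storage.read_ohlcv()``: return up to ``n``
        samples strictly older than ``end_ts``.'''
        times = np.arange(end_ts - n * step, end_ts, step)
        out = np.zeros(len(times), dtype=[('time', 'f8'), ('close', 'f8')])
        out['time'] = times
        return out

    buf = RingBuffer(size=1000, zero_index=900)
    last_frame_start = 1_000_000.0  # earliest timestamp already in shm

    # the while-loop from ``back_load_from_tsdb()``: keep reading
    # earlier frames until the prepend space is exhausted.
    while buf.first > 0:
        frame = read_frame(last_frame_start, step=60.0, n=min(buf.first, 128))
        next_start = frame['time'][0]
        if next_start >= last_frame_start:
            break  # no earlier data detected
        last_frame_start = next_start
        buf.prepend(frame[-buf.first:])

    print(f'buffer filled back to index {buf.first}')

The ``next_start >= last_frame_start`` guard is what prevents an
infinite loop when the storage backend keeps returning the same
(oldest) frame instead of an empty result.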
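Similarly, the init ordering that ``manage_history()`` now enforces,
i.e. unblock chart consumers via ``some_data_ready`` as soon as any
history lands in shm but defer sampler registration until
``feed_is_live``, can be sketched with two ``trio.Event``s. Every name
below is local to this example and the sleeps merely stand in for real
tsdb/provider I/O.

    import trio

    async def tsdb_backfill(some_data_ready: trio.Event) -> None:
        await trio.sleep(0.1)  # stand-in: load + push tsdb history to shm
        some_data_ready.set()  # chart can render (some) history now

    async def provider_feed(feed_is_live: trio.Event) -> None:
        await trio.sleep(0.5)  # stand-in: websocket/API connect
        feed_is_live.set()

    async def manage_history() -> None:
        some_data_ready = trio.Event()
        feed_is_live = trio.Event()
        async with trio.open_nursery() as n:
            n.start_soon(tsdb_backfill, some_data_ready)
            n.start_soon(provider_feed, feed_is_live)

            # history shows in the chart ASAP..
            await some_data_ready.wait()
            print('history visible, live feed still pending')

            # ..but the sampler only starts once the feed engages.
            await feed_is_live.wait()
            print('live feed up: register shm buffers with the sampler')

    trio.run(manage_history)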