From b3d1b1aa639f4aca69ba2d53e84cf493dff96864 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 4 Jan 2023 23:03:43 -0500 Subject: [PATCH] Port feed layer to use new `samplerd` APIs Always use `open_sample_stream()` to register fast and slow quote feed buffers and get a sampler stream which we use to trigger `Sampler.broadcast_all()` calls on the service side after backfill events. --- piker/data/feed.py | 161 +++++++++++++++++++++++---------------------- 1 file changed, 82 insertions(+), 79 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 89330475..aa2a9a5a 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -74,10 +74,9 @@ from ._source import ( ) from ..ui import _search from ._sampling import ( - Sampler, + open_sample_stream, sample_and_broadcast, uniform_rate_send, - _default_delay_s, ) from ..brokers._util import ( DataUnavailable, @@ -277,6 +276,7 @@ async def start_backfill( bfqsn: str, shm: ShmArray, timeframe: float, + sampler_stream: tractor.MsgStream, last_tsdb_dt: Optional[datetime] = None, storage: Optional[Storage] = None, @@ -325,7 +325,7 @@ async def start_backfill( # TODO: *** THIS IS A BUG *** # we need to only broadcast to subscribers for this fqsn.. # otherwise all fsps get reset on every chart.. - await Sampler.broadcast_all() + await sampler_stream.send('broadcast_all') # signal that backfilling to tsdb's end datum is complete bf_done = trio.Event() @@ -493,7 +493,7 @@ async def start_backfill( # in the block above to avoid entering new ``frames`` # values while we're pipelining the current ones to # memory... - await Sampler.broadcast_all() + await sampler_stream.send('broadcast_all') # short-circuit (for now) bf_done.set() @@ -504,6 +504,7 @@ async def basic_backfill( mod: ModuleType, bfqsn: str, shms: dict[int, ShmArray], + sampler_stream: tractor.MsgStream, ) -> None: @@ -521,7 +522,8 @@ async def basic_backfill( mod, bfqsn, shm, - timeframe=timeframe, + timeframe, + sampler_stream, ) ) except DataUnavailable: @@ -537,6 +539,7 @@ async def tsdb_backfill( fqsn: str, bfqsn: str, shms: dict[int, ShmArray], + sampler_stream: tractor.MsgStream, task_status: TaskStatus[ tuple[ShmArray, ShmArray] @@ -569,7 +572,8 @@ async def tsdb_backfill( mod, bfqsn, shm, - timeframe=timeframe, + timeframe, + sampler_stream, last_tsdb_dt=last_tsdb_dt, tsdb_is_up=True, storage=storage, @@ -734,7 +738,7 @@ async def tsdb_backfill( # (usually a chart showing graphics for said fsp) # which tells the chart to conduct a manual full # graphics loop cycle. - await Sampler.broadcast_all() + await sampler_stream.send('broadcast_all') # TODO: write new data to tsdb to be ready to for next read. @@ -823,79 +827,96 @@ async def manage_history( "Persistent shm for sym was already open?!" ) - log.info('Scanning for existing `marketstored`') - tsdb_is_up = await check_for_service('marketstored') + # register 1s and 1m buffers with the global incrementer task + async with open_sample_stream( + period_s=1, + cache_key=fqsn, + shms_by_period={ + 1.: rt_shm.token, + 60.: hist_shm.token, + }, + open_index_stream=True, + ) as sample_stream: - bfqsn = fqsn.replace('.' + mod.name, '') - open_history_client = getattr(mod, 'open_history_client', None) - assert open_history_client + log.info('Scanning for existing `marketstored`') + tsdb_is_up = await check_for_service('marketstored') - if ( - tsdb_is_up - and opened - and open_history_client - ): - log.info('Found existing `marketstored`') + bfqsn = fqsn.replace('.' + mod.name, '') + open_history_client = getattr(mod, 'open_history_client', None) + assert open_history_client - from . import marketstore - async with ( - marketstore.open_storage_client(fqsn)as storage, + if ( + tsdb_is_up + and opened + and open_history_client ): - hist_shm, rt_shm = await bus.nursery.start( - tsdb_backfill, - mod, - marketstore, + log.info('Found existing `marketstored`') + + from . import marketstore + async with ( + marketstore.open_storage_client(fqsn)as storage, + ): + # TODO: drop returning the output that we pass in? + ( + hist_shm, + rt_shm, + ) = await bus.nursery.start( + tsdb_backfill, + mod, + marketstore, + bus, + storage, + fqsn, + bfqsn, + { + 1: rt_shm, + 60: hist_shm, + }, + sample_stream, + ) + + # yield back after client connect with filled shm + task_status.started(( + hist_zero_index, + hist_shm, + rt_zero_index, + rt_shm, + )) + + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. + some_data_ready.set() + + # history retreival loop depending on user interaction + # and thus a small RPC-prot for remotely controllinlg + # what data is loaded for viewing. + await trio.sleep_forever() + + # load less history if no tsdb can be found + elif ( + not tsdb_is_up + and opened + ): + await basic_backfill( bus, - storage, - fqsn, + mod, bfqsn, { 1: rt_shm, 60: hist_shm, }, + sample_stream, ) - - # yield back after client connect with filled shm task_status.started(( hist_zero_index, hist_shm, rt_zero_index, rt_shm, )) - - # indicate to caller that feed can be delivered to - # remote requesting client since we've loaded history - # data that can be used. some_data_ready.set() - - # history retreival loop depending on user interaction and thus - # a small RPC-prot for remotely controllinlg what data is loaded - # for viewing. await trio.sleep_forever() - # load less history if no tsdb can be found - elif ( - not tsdb_is_up - and opened - ): - await basic_backfill( - bus, - mod, - bfqsn, - shms={ - 1: rt_shm, - 60: hist_shm, - }, - ) - task_status.started(( - hist_zero_index, - hist_shm, - rt_zero_index, - rt_shm, - )) - some_data_ready.set() - await trio.sleep_forever() - async def allocate_persistent_feed( bus: _FeedsBus, @@ -997,6 +1018,7 @@ async def allocate_persistent_feed( # https://github.com/python-trio/trio/issues/2258 # bus.nursery.start_soon( # await bus.start_task( + ( izero_hist, hist_shm, @@ -1030,13 +1052,6 @@ async def allocate_persistent_feed( # feed to that name (for now). bus.feeds[symstr] = bus.feeds[bfqsn] = flume - # insert 1s ohlc into the increment buffer set - # to update and shift every second - Sampler.ohlcv_shms.setdefault( - 1, - [] - ).append(rt_shm) - task_status.started() if not start_stream: @@ -1046,18 +1061,6 @@ async def allocate_persistent_feed( # the backend will indicate when real-time quotes have begun. await feed_is_live.wait() - # insert 1m ohlc into the increment buffer set - # to shift every 60s. - Sampler.ohlcv_shms.setdefault(60, []).append(hist_shm) - - # create buffer a single incrementer task broker backend - # (aka `brokerd`) using the lowest sampler period. - if Sampler.incrementers.get(_default_delay_s) is None: - await bus.start_task( - Sampler.increment_ohlc_buffer, - _default_delay_s, - ) - sum_tick_vlm: bool = init_msg.get( 'shm_write_opts', {} ).get('sum_tick_vlm', True)