From e9d64ffee8620b56a246140e5c0e0a3e844a987d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Mar 2022 09:31:12 -0500 Subject: [PATCH] Use fqsn in `.manage_history()` Allocate and `.started()` return the `ShmArray` from here as well in prep for tsdb integration. --- piker/data/feed.py | 51 ++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 611d79ce..c9bb5e36 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -191,10 +191,8 @@ async def _setup_persistent_brokerd( async def manage_history( mod: ModuleType, - shm: ShmArray, bus: _FeedsBus, symbol: str, - we_opened_shm: bool, some_data_ready: trio.Event, feed_is_live: trio.Event, @@ -208,21 +206,30 @@ async def manage_history( buffer. ''' - task_status.started() + fqsn = mk_fqsn(mod.name, symbol) - opened = we_opened_shm - # TODO: history validation - # assert opened, f'Persistent shm for {symbol} was already open?!' - # if not opened: - # raise RuntimeError("Persistent shm for sym was already open?!") + # (maybe) allocate shm array for this broker/symbol which will + # be used for fast near-term history capture and processing. + shm, opened = maybe_open_shm_array( + key=fqsn, + + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), + + # we expect the sub-actor to write + readonly=False, + ) if opened: - # ask broker backend for new history + log.info('No existing `marketstored` found..') # start history backfill task ``backfill_bars()`` is # a required backend func this must block until shm is # filled with first set of ohlc bars - cs = await bus.nursery.start(mod.backfill_bars, symbol, shm) + _ = await bus.nursery.start(mod.backfill_bars, symbol, shm) + + # yield back after client connect with filled shm + task_status.started(shm) # indicate to caller that feed can be delivered to # remote requesting client since we've loaded history @@ -243,13 +250,12 @@ async def manage_history( # start shm incrementing for OHLC sampling at the current # detected sampling period if one dne. if sampler.incrementers.get(delay_s) is None: - cs = await bus.start_task( + await bus.start_task( increment_ohlc_buffer, delay_s, ) await trio.sleep_forever() - cs.cancel() async def allocate_persistent_feed( @@ -281,18 +287,6 @@ async def allocate_persistent_feed( fqsn = mk_fqsn(brokername, symbol) - # (maybe) allocate shm array for this broker/symbol which will - # be used for fast near-term history capture and processing. - shm, opened = maybe_open_shm_array( - key=fqsn, - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), - - # we expect the sub-actor to write - readonly=False, - ) - # mem chan handed to broker backend so it can push real-time # quotes to this task for sampling and history storage (see below). send, quote_stream = trio.open_memory_channel(10) @@ -311,13 +305,11 @@ async def allocate_persistent_feed( # bus.nursery.start_soon( # await bus.start_task( - await bus.nursery.start( + shm = await bus.nursery.start( manage_history, mod, - shm, bus, symbol, - opened, some_data_ready, feed_is_live, ) @@ -454,6 +446,11 @@ async def open_feed_bus( async with ( ctx.open_stream() as stream, ): + # re-send to trigger display loop cycle (necessary especially + # when the mkt is closed and no real-time messages are + # expected). + await stream.send(first_quotes) + if tick_throttle: # open a bg task which receives quotes over a mem chan