diff --git a/piker/data/__init__.py b/piker/data/__init__.py index afc74d75..907dd6fe 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -53,9 +53,10 @@ from ._sharedmem import ( ) from ._source import base_iohlc_dtype, Symbol from ._buffer import ( + _shms, + _incrementers, increment_ohlc_buffer, - subscribe_ohlc_for_increment, - shm_incrementing, + iter_ohlc_periods, ) __all__ = [ @@ -64,7 +65,7 @@ __all__ = [ 'attach_shm_array', 'open_shm_array', 'get_shm_token', - 'subscribe_ohlc_for_increment', + # 'subscribe_ohlc_for_increment', ] @@ -181,10 +182,10 @@ async def allocate_persistent_feed( readonly=False, ) - # assert opened - if not opened: - # do history validation? - pass + # do history validation? + assert opened, "Persistent shm for sym was already open?!" + # if not opened: + # raise RuntimeError("Persistent shm for sym was already open?!") send, quote_stream = trio.open_memory_channel(2**8) feed_is_live = trio.Event() @@ -204,34 +205,48 @@ async def allocate_persistent_feed( init_msg[symbol]['shm_token'] = shm.token cs = trio.CancelScope() + + # TODO: make this into a composed type which also + # contains the backfiller cs for individual super-based + # resspawns when needed. bus.feeds[symbol] = (cs, init_msg, first_quote) with cs: if opened: - # start history backfill task - # ``backfill_bars()`` is a required backend func - await bus.nursery.start(mod.backfill_bars, symbol, shm) - # yield back control to starting nursery - task_status.started((init_msg, first_quote)) + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + await bus.nursery.start(mod.backfill_bars, symbol, shm) times = shm.array['time'] delay_s = times[-1] - times[times != times[-1]][-1] + # pass OHLC sample rate in seconds + init_msg[symbol]['sample_rate'] = delay_s + + # yield back control to starting nursery + task_status.started((init_msg, first_quote)) + await feed_is_live.wait() - # tell incrementer task it can start - shm_incrementing(shm.token['shm_name']).set() + # # tell incrementer task it can start + # shm_incrementing(shm.token['shm_name']).set() # start shm incrementingn for OHLC sampling - subscribe_ohlc_for_increment(shm, delay_s) + # subscribe_ohlc_for_increment(shm, delay_s) - # begin shm write loop and broadcast to subscribers + if opened: + _shms.setdefault(delay_s, []).append(shm) + + if _incrementers.get(delay_s) is None: + cs = await bus.nursery.start(increment_ohlc_buffer, delay_s) sum_tick_vlm: bool = init_msg.get( 'shm_write_opts', {} ).get('sum_tick_vlm', True) + # begin shm write loop and broadcast to subscribers async with quote_stream: log.info("Started shared mem bar writer") @@ -372,6 +387,7 @@ class Feed: _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[int]] = None _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None + _max_sample_rate: int = 0 # cache of symbol info messages received as first message when # a stream startsc. @@ -380,15 +396,19 @@ class Feed: async def receive(self) -> dict: return await self.stream.__anext__() - async def index_stream(self) -> AsyncIterator[int]: + async def index_stream( + self, + delay_s: Optional[int] = None + + ) -> AsyncIterator[int]: + if not self._index_stream: # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be # created for all practical purposes self._index_stream = await self._brokerd_portal.run( - increment_ohlc_buffer, - shm_token=self.shm.token, - topics=['index'], + iter_ohlc_periods, + delay_s=delay_s or self._max_sample_rate, ) return self._index_stream @@ -459,9 +479,9 @@ async def open_feed( loglevel=loglevel, ) - # TODO: we can't do this **and** be compate with - # ``tractor.msg.pub``, should we maybe just drop this after - # tests are in? + # TODO: can we make this work better with the proposed + # context based bidirectional streaming style api proposed in: + # https://github.com/goodboy/tractor/issues/53 init_msg = await stream.receive() shm = attach_shm_array( @@ -478,10 +498,12 @@ async def open_feed( mod=mod, _brokerd_portal=portal, ) + ohlc_sample_rates = [] for sym, data in init_msg.items(): si = data['symbol_info'] + ohlc_sample_rates.append(data['sample_rate']) symbol = Symbol( key=sym, @@ -493,9 +515,11 @@ async def open_feed( feed.symbols[sym] = symbol + # cast shm dtype to list... can't member why we need this shm_token = data['shm_token'] + shm_token['dtype_descr'] = list(shm_token['dtype_descr']) + assert shm_token == shm.token # sanity - shm_token['dtype_descr'] = list(shm_token['dtype_descr']) - assert shm_token == shm.token # sanity + feed._max_sample_rate = max(ohlc_sample_rates) yield feed