diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index a8dba30b..98a7603f 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -230,6 +230,7 @@ class Sampler: self, period_s: float, time_stamp: float | None = None, + info: dict | None = None, ) -> None: ''' @@ -258,10 +259,14 @@ class Sampler: try: for stream in (subs - sent): try: - await stream.send({ + msg = { 'index': time_stamp or last_ts, 'period': period_s, - }) + } + if info: + msg.update(info) + + await stream.send(msg) sent.add(stream) except ( @@ -287,9 +292,15 @@ class Sampler: ) @classmethod - async def broadcast_all(self) -> None: + async def broadcast_all( + self, + info: dict | None = None, + ) -> None: for period_s in self.subscribers: - await self.broadcast(period_s) + await self.broadcast( + period_s, + info=info, + ) @tractor.context @@ -359,8 +370,10 @@ async def register_with_sampler( # except broadcast requests from the subscriber async for msg in stream: - if msg == 'broadcast_all': - await Sampler.broadcast_all() + if 'broadcast_all' in msg: + await Sampler.broadcast_all( + info=msg['broadcast_all'], + ) finally: if ( sub_for_broadcasts @@ -468,6 +481,8 @@ async def open_sample_stream( cache_key: str | None = None, allow_new_sampler: bool = True, + ensure_is_active: bool = False, + ) -> AsyncIterator[dict[str, float]]: ''' Subscribe to OHLC sampling "step" events: when the time aggregation @@ -510,12 +525,18 @@ async def open_sample_stream( }, ) as (ctx, first) ): - assert len(first) > 1 + if ensure_is_active: + assert len(first) > 1 + async with ( ctx.open_stream() as istream, - # TODO: we don't need this task-bcasting right? - # istream.subscribe() as istream, + # TODO: we DO need this task-bcasting so that + # for eg. the history chart update loop eventually + # receceives all backfilling event msgs such that + # the underlying graphics format arrays are + # re-allocated until all history is loaded! + istream.subscribe() as istream, ): yield istream