From 2778ee14010075ced4c8f443c0e32fdec59da665 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 5 Jan 2023 10:12:42 -0500 Subject: [PATCH] Support not registering for sample-index msgs via `sub_for_broadcasts: bool` flag --- piker/data/_sampling.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index f70e4113..f1bbc500 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -162,7 +162,6 @@ class Sampler: if shm_period_s not in broadcasted: sub_pair = self.subscribers[shm_period_s] sub_pair[0] = i_epoch - print(f'skipping `{shm_period_s}s` sample update') broadcasted.add(shm_period_s) # TODO: ``numba`` this! @@ -174,8 +173,8 @@ class Sampler: array = shm.array last = array[-1:][shm._write_fields].copy() - # guard against startup backfilling race with - # empty buffers. + # guard against startup backfilling races where + # the buffer has not yet been filled. if not last.size: continue @@ -288,7 +287,9 @@ async def register_with_sampler( ctx: tractor.Context, period_s: float, shms_by_period: dict[float, dict] | None = None, - open_index_stream: bool = True, + + open_index_stream: bool = True, # open a 2way stream for sample step msgs? + sub_for_broadcasts: bool = True, # sampler side to send step updates? ) -> None: @@ -341,15 +342,16 @@ async def register_with_sampler( if open_index_stream: try: async with ctx.open_stream() as stream: - subs.add(stream) + if sub_for_broadcasts: + subs.add(stream) # except broadcast requests from the subscriber async for msg in stream: if msg == 'broadcast_all': await Sampler.broadcast_all() - finally: - subs.remove(stream) + if sub_for_broadcasts: + subs.remove(stream) else: # if no shms are passed in we just wait until cancelled # by caller. @@ -404,6 +406,7 @@ async def spawn_samplerd( portal, register_with_sampler, period_s=1, + sub_for_broadcasts=False, ) return True @@ -437,9 +440,10 @@ async def maybe_open_samplerd( @acm async def open_sample_stream( - period_s: int, + period_s: float, shms_by_period: dict[float, dict] | None = None, open_index_stream: bool = True, + sub_for_broadcasts: bool = True, cache_key: str | None = None, allow_new_sampler: bool = True, @@ -482,6 +486,7 @@ async def open_sample_stream( 'period_s': period_s, 'shms_by_period': shms_by_period, 'open_index_stream': open_index_stream, + 'sub_for_broadcasts': sub_for_broadcasts, }, ) as (ctx, first) ):