Support not registering for sample-index msgs via `sub_for_broadcasts: bool` flag
parent
c2c9053ca6
commit
a4408fc740
|
@ -162,7 +162,6 @@ class Sampler:
|
|||
if shm_period_s not in broadcasted:
|
||||
sub_pair = self.subscribers[shm_period_s]
|
||||
sub_pair[0] = i_epoch
|
||||
print(f'skipping `{shm_period_s}s` sample update')
|
||||
broadcasted.add(shm_period_s)
|
||||
|
||||
# TODO: ``numba`` this!
|
||||
|
@ -174,8 +173,8 @@ class Sampler:
|
|||
array = shm.array
|
||||
last = array[-1:][shm._write_fields].copy()
|
||||
|
||||
# guard against startup backfilling race with
|
||||
# empty buffers.
|
||||
# guard against startup backfilling races where
|
||||
# the buffer has not yet been filled.
|
||||
if not last.size:
|
||||
continue
|
||||
|
||||
|
@ -288,7 +287,9 @@ async def register_with_sampler(
|
|||
ctx: tractor.Context,
|
||||
period_s: float,
|
||||
shms_by_period: dict[float, dict] | None = None,
|
||||
open_index_stream: bool = True,
|
||||
|
||||
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
|
||||
sub_for_broadcasts: bool = True, # sampler side to send step updates?
|
||||
|
||||
) -> None:
|
||||
|
||||
|
@ -341,15 +342,16 @@ async def register_with_sampler(
|
|||
if open_index_stream:
|
||||
try:
|
||||
async with ctx.open_stream() as stream:
|
||||
subs.add(stream)
|
||||
if sub_for_broadcasts:
|
||||
subs.add(stream)
|
||||
|
||||
# except broadcast requests from the subscriber
|
||||
async for msg in stream:
|
||||
if msg == 'broadcast_all':
|
||||
await Sampler.broadcast_all()
|
||||
|
||||
finally:
|
||||
subs.remove(stream)
|
||||
if sub_for_broadcasts:
|
||||
subs.remove(stream)
|
||||
else:
|
||||
# if no shms are passed in we just wait until cancelled
|
||||
# by caller.
|
||||
|
@ -404,6 +406,7 @@ async def spawn_samplerd(
|
|||
portal,
|
||||
register_with_sampler,
|
||||
period_s=1,
|
||||
sub_for_broadcasts=False,
|
||||
)
|
||||
return True
|
||||
|
||||
|
@ -437,9 +440,10 @@ async def maybe_open_samplerd(
|
|||
|
||||
@acm
|
||||
async def open_sample_stream(
|
||||
period_s: int,
|
||||
period_s: float,
|
||||
shms_by_period: dict[float, dict] | None = None,
|
||||
open_index_stream: bool = True,
|
||||
sub_for_broadcasts: bool = True,
|
||||
|
||||
cache_key: str | None = None,
|
||||
allow_new_sampler: bool = True,
|
||||
|
@ -482,6 +486,7 @@ async def open_sample_stream(
|
|||
'period_s': period_s,
|
||||
'shms_by_period': shms_by_period,
|
||||
'open_index_stream': open_index_stream,
|
||||
'sub_for_broadcasts': sub_for_broadcasts,
|
||||
},
|
||||
) as (ctx, first)
|
||||
):
|
||||
|
|
Loading…
Reference in New Issue