diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 77b15d7f..428540a8 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -37,6 +37,9 @@ if TYPE_CHECKING: log = get_logger(__name__) +_default_delay_s: float = 1.0 + + class sampler: ''' Global sampling engine registry. @@ -104,14 +107,18 @@ async def increment_ohlc_buffer( # TODO: do we want to support dynamically # adding a "lower" lowest increment period? await trio.sleep(ad) - total_s += lowest + total_s += delay_s # increment all subscribed shm arrays # TODO: # - this in ``numba`` # - just lookup shms for this step instead of iterating? - for delay_s, shms in sampler.ohlcv_shms.items(): - if total_s % delay_s != 0: + for this_delay_s, shms in sampler.ohlcv_shms.items(): + + # short-circuit on any not-ready because slower sample + # rate consuming shm buffers. + if total_s % this_delay_s != 0: + # print(f'skipping `{this_delay_s}s` sample update') continue # TODO: ``numba`` this! @@ -152,7 +159,6 @@ async def broadcast( ''' subs = sampler.subscribers.get(delay_s, ()) - first = last = -1 if shm is None: