diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 669f624e..89228c96 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -15,7 +15,9 @@ # along with this program. If not, see . """ -Data buffers for fast shared humpy. +Sampling and broadcast machinery for (soft) real-time delivery of +financial data flows. + """ import time from typing import Dict, List @@ -48,7 +50,8 @@ async def increment_ohlc_buffer( delay_s: int, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ): - """Task which inserts new bars into the provide shared memory array + ''' + Task which inserts new bars into the provide shared memory array every ``delay_s`` seconds. This task fulfills 2 purposes: @@ -59,8 +62,8 @@ async def increment_ohlc_buffer( Note that if **no** actor has initiated this task then **none** of the underlying buffers will actually be incremented. - """ + ''' # # wait for brokerd to signal we should start sampling # await shm_incrementing(shm_token['shm_name']).wait() @@ -137,12 +140,12 @@ async def iter_ohlc_periods( delay_s: int, ) -> None: - """ + ''' Subscribe to OHLC sampling "step" events: when the time aggregation period increments, this event stream emits an index event. - """ + ''' # add our subscription global _subscribers subs = _subscribers.setdefault(delay_s, []) @@ -290,7 +293,10 @@ async def sample_and_broadcast( # so far seems like no since this should all # be single-threaded. Doing it anyway though # since there seems to be some kinda race.. - subs.remove((stream, tick_throttle)) + try: + subs.remove((stream, tick_throttle)) + except ValueError: + log.error(f'{stream} was already removed from subs!?') # TODO: a less naive throttler, here's some snippets: