diff --git a/piker/data/_buffer.py b/piker/data/_buffer.py index fed6b965..9a496e7a 100644 --- a/piker/data/_buffer.py +++ b/piker/data/_buffer.py @@ -27,6 +27,12 @@ from ._sharedmem import ShmArray _shms: Dict[int, ShmArray] = {} +_start_increment: Dict[str, trio.Event] = {} + + +def shm_incrementing(shm_token_name: str) -> trio.Event: + global _start_increment + return _start_increment.setdefault(shm_token_name, trio.Event()) @tractor.msg.pub @@ -47,6 +53,10 @@ async def increment_ohlc_buffer( Note that if **no** actor has initiated this task then **none** of the underlying buffers will actually be incremented. """ + + # wait for brokerd to signal we should start sampling + await shm_incrementing(shm_token['shm_name']).wait() + # TODO: right now we'll spin printing bars if the last time stamp is # before a large period of no market activity. Likely the best way # to solve this is to make this task aware of the instrument's