diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 654404ef..bf9ecbba 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -118,8 +118,9 @@ async def increment_ohlc_buffer( shm.push(last) # broadcast the buffer index step - # yield {'index': shm._last.value} - for ctx in _subscribers.get(delay_s, ()): + subs = _subscribers.get(delay_s, ()) + + for ctx in subs: try: await ctx.send_yield({'index': shm._last.value}) except ( @@ -127,6 +128,7 @@ async def increment_ohlc_buffer( trio.ClosedResourceError ): log.error(f'{ctx.chan.uid} dropped connection') + subs.remove(ctx) @tractor.stream