diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index ecad241d..e52fd93a 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -133,18 +133,20 @@ async def increment_ohlc_buffer( # a given sample period. subs = sampler.subscribers.get(delay_s, ()) - for ctx in subs: + for stream in subs: try: - await ctx.send_yield({'index': shm._last.value}) + await stream.send({'index': shm._last.value}) except ( trio.BrokenResourceError, trio.ClosedResourceError ): - log.error(f'{ctx.chan.uid} dropped connection') - subs.remove(ctx) + log.error( + f'{stream._ctx.chan.uid} dropped connection' + ) + subs.remove(stream) -@tractor.stream +@tractor.context async def iter_ohlc_periods( ctx: tractor.Context, delay_s: int, @@ -158,18 +160,20 @@ async def iter_ohlc_periods( ''' # add our subscription subs = sampler.subscribers.setdefault(delay_s, []) - subs.append(ctx) + await ctx.started() + async with ctx.open_stream() as stream: + subs.append(stream) - try: - # stream and block until cancelled - await trio.sleep_forever() - finally: try: - subs.remove(ctx) - except ValueError: - log.error( - f'iOHLC step stream was already dropped for {ctx.chan.uid}?' - ) + # stream and block until cancelled + await trio.sleep_forever() + finally: + try: + subs.remove(stream) + except ValueError: + log.error( + f'iOHLC step stream was already dropped {ctx.chan.uid}?' + ) async def sample_and_broadcast(