diff --git a/piker/data/feed.py b/piker/data/feed.py index 07df13a1..715cf9d1 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -325,11 +325,23 @@ async def attach_feed_bus( else: sub = (stream, tick_throttle) - bus._subscribers[symbol].append(sub) + subs = bus._subscribers[symbol] + subs.append(sub) try: - await trio.sleep_forever() + async for msg in stream: + if msg == 'pause': + log.info( + f'Pausing {symbol}.{brokername} feed for {ctx.chan.uid}') + subs.remove(sub) + + elif msg == 'resume': + log.info( + f'Resuming {symbol}.{brokername} feed for {ctx.chan.uid}') + subs.append(sub) + else: + raise ValueError(msg) finally: log.info( f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')