diff --git a/piker/data/feed.py b/piker/data/feed.py index 337a625c..0ee94ad9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -356,7 +356,10 @@ async def open_feed_bus( f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') if tick_throttle: n.cancel_scope.cancel() - bus._subscribers[symbol].remove(sub) + try: + bus._subscribers[symbol].remove(sub) + except ValueError: + log.warning(f'{sub} for {symbol} was already removed?') @asynccontextmanager @@ -520,7 +523,12 @@ async def open_feed( ) as (ctx, (init_msg, first_quotes)), - ctx.open_stream() as stream, + ctx.open_stream( + # XXX: be explicit about stream backpressure since we should + # **never** overrun on feeds being too fast, which will + # pretty much always happen with HFT XD + backpressure=True + ) as stream, ): # we can only read from shm