diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 4966db8a..e8cfd9b6 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -28,6 +28,7 @@ from typing import ( ) import time +from async_generator import aclosing from fuzzywuzzy import process as fuzzy import numpy as np import pendulum @@ -407,17 +408,15 @@ async def stream_quotes( # see the tips on reconnection logic: # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds ws: NoBsWs - async with open_autorecon_ws( - 'wss://ws.kraken.com/', - fixture=subscribe, - ) as ws: - + async with ( + open_autorecon_ws( + 'wss://ws.kraken.com/', + fixture=subscribe, + ) as ws, + aclosing(process_data_feed_msgs(ws)) as msg_gen, + ): # pull a first quote and deliver - msg_gen = process_data_feed_msgs(ws) - - # TODO: use ``anext()`` when it lands in 3.10! typ, ohlc_last = await anext(msg_gen) - topic, quote = normalize(ohlc_last) task_status.started((init_msgs, quote))