diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 46da1f37..9bb35b00 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -26,6 +26,7 @@ from typing import ( ) import time +from async_generator import aclosing import trio from trio_typing import TaskStatus import pendulum @@ -529,19 +530,21 @@ async def stream_quotes( # XXX: do we need to ack the unsub? # await ws.recv_msg() - async with open_autorecon_ws( - 'wss://stream.binance.com/ws', - # XXX: see api docs which show diff addr? - # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information - # 'wss://ws-api.binance.com:443/ws-api/v3', - fixture=subscribe, - ) as ws: - - # pull a first quote and deliver - msg_gen = stream_messages(ws) + async with ( + open_autorecon_ws( + # XXX: see api docs which show diff addr? + # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information + # 'wss://ws-api.binance.com:443/ws-api/v3', + 'wss://stream.binance.com/ws', + fixture=subscribe, + ) as ws, + # avoid stream-gen closure from breaking trio.. + aclosing(stream_messages(ws)) as msg_gen, + ): typ, quote = await anext(msg_gen) + # pull a first quote and deliver while typ != 'trade': typ, quote = await anext(msg_gen)