From 973e4b5f445399348654d4729bb723e34b5352ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 13 Mar 2023 12:29:17 -0400 Subject: [PATCH] `binance`: wrap streamer async-gen in `aclosing()` --- piker/brokers/binance.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 46da1f37..9bb35b00 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -26,6 +26,7 @@ from typing import ( ) import time +from async_generator import aclosing import trio from trio_typing import TaskStatus import pendulum @@ -529,19 +530,21 @@ async def stream_quotes( # XXX: do we need to ack the unsub? # await ws.recv_msg() - async with open_autorecon_ws( - 'wss://stream.binance.com/ws', - # XXX: see api docs which show diff addr? - # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information - # 'wss://ws-api.binance.com:443/ws-api/v3', - fixture=subscribe, - ) as ws: - - # pull a first quote and deliver - msg_gen = stream_messages(ws) + async with ( + open_autorecon_ws( + # XXX: see api docs which show diff addr? + # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information + # 'wss://ws-api.binance.com:443/ws-api/v3', + 'wss://stream.binance.com/ws', + fixture=subscribe, + ) as ws, + # avoid stream-gen closure from breaking trio.. + aclosing(stream_messages(ws)) as msg_gen, + ): typ, quote = await anext(msg_gen) + # pull a first quote and deliver while typ != 'trade': typ, quote = await anext(msg_gen)