`binance`: wrap streamer async-gen in `aclosing()`

binance_ws_ep_update
Tyler Goodlet 2023-03-13 12:29:17 -04:00
parent 9197e6decb
commit 973e4b5f44
1 changed files with 13 additions and 10 deletions

View File

@ -26,6 +26,7 @@ from typing import (
) )
import time import time
from async_generator import aclosing
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import pendulum import pendulum
@ -529,19 +530,21 @@ async def stream_quotes(
# XXX: do we need to ack the unsub? # XXX: do we need to ack the unsub?
# await ws.recv_msg() # await ws.recv_msg()
async with open_autorecon_ws( async with (
'wss://stream.binance.com/ws', open_autorecon_ws(
# XXX: see api docs which show diff addr? # XXX: see api docs which show diff addr?
# https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information
# 'wss://ws-api.binance.com:443/ws-api/v3', # 'wss://ws-api.binance.com:443/ws-api/v3',
'wss://stream.binance.com/ws',
fixture=subscribe, fixture=subscribe,
) as ws: ) as ws,
# pull a first quote and deliver
msg_gen = stream_messages(ws)
# avoid stream-gen closure from breaking trio..
aclosing(stream_messages(ws)) as msg_gen,
):
typ, quote = await anext(msg_gen) typ, quote = await anext(msg_gen)
# pull a first quote and deliver
while typ != 'trade': while typ != 'trade':
typ, quote = await anext(msg_gen) typ, quote = await anext(msg_gen)