Use `aclosing()` on all msg async-gens

kraken_ws_orders
Tyler Goodlet 2022-07-15 16:26:18 -04:00
parent 0fca1b3e1a
commit b96b7a8b9c
1 changed files with 8 additions and 9 deletions

View File

@ -28,6 +28,7 @@ from typing import (
) )
import time import time
from async_generator import aclosing
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import pendulum import pendulum
@ -407,17 +408,15 @@ async def stream_quotes(
# see the tips on reconnection logic: # see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
ws: NoBsWs ws: NoBsWs
async with open_autorecon_ws( async with (
'wss://ws.kraken.com/', open_autorecon_ws(
fixture=subscribe, 'wss://ws.kraken.com/',
) as ws: fixture=subscribe,
) as ws,
aclosing(process_data_feed_msgs(ws)) as msg_gen,
):
# pull a first quote and deliver # pull a first quote and deliver
msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await anext(msg_gen) typ, ohlc_last = await anext(msg_gen)
topic, quote = normalize(ohlc_last) topic, quote = normalize(ohlc_last)
task_status.started((init_msgs, quote)) task_status.started((init_msgs, quote))