diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 651b5d90..1aadfccd 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -38,6 +38,7 @@ from uuid import uuid4 import asks import tractor import trio +from trio_util import trio_async_generator from trio_typing import TaskStatus from fuzzywuzzy import process as fuzzy import pendulum @@ -454,7 +455,6 @@ async def stream_quotes( 'fqsn': sym, }, } - @acm async def subscribe(ws: wsproto.WSConnection): @acm @@ -505,16 +505,19 @@ async def stream_quotes( } ) - async with open_autorecon_ws( - f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]', - fixture=subscribe, - ) as ws: - msg_gen = stream_messages(ws, sym) - typ, quote = await msg_gen.__anext__() + async with ( + open_autorecon_ws( + f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]', + fixture=subscribe, + ) as ws, + stream_messages(ws, sym) as msg_gen, + ): + typ, quote = await anext(msg_gen) while typ != 'trade': # TODO: use ``anext()`` when it lands in 3.10! - typ, quote = await msg_gen.__anext__() + typ, quote = await anext(msg_gen) + task_status.started((init_msgs, quote)) feed_is_live.set() @@ -543,7 +546,7 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool] | None: 'response': True, } - +@trio_async_generator async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: timeouts = 0 last_trade_ts = 0