Use anext() in kucoin stream_quotes

small_kucoin_fixes
jaredgoldman 2023-04-12 20:25:35 -04:00
parent d2f3a79c09
commit ace04af21a
1 changed files with 12 additions and 9 deletions

View File

@ -38,6 +38,7 @@ from uuid import uuid4
import asks import asks
import tractor import tractor
import trio import trio
from trio_util import trio_async_generator
from trio_typing import TaskStatus from trio_typing import TaskStatus
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import pendulum import pendulum
@ -454,7 +455,6 @@ async def stream_quotes(
'fqsn': sym, 'fqsn': sym,
}, },
} }
@acm @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
@acm @acm
@ -505,16 +505,19 @@ async def stream_quotes(
} }
) )
async with open_autorecon_ws( async with (
f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]', open_autorecon_ws(
fixture=subscribe, f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]',
) as ws: fixture=subscribe,
msg_gen = stream_messages(ws, sym) ) as ws,
typ, quote = await msg_gen.__anext__() stream_messages(ws, sym) as msg_gen,
):
typ, quote = await anext(msg_gen)
while typ != 'trade': while typ != 'trade':
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__() typ, quote = await anext(msg_gen)
task_status.started((init_msgs, quote)) task_status.started((init_msgs, quote))
feed_is_live.set() feed_is_live.set()
@ -543,7 +546,7 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool] | None:
'response': True, 'response': True,
} }
@trio_async_generator
async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0 timeouts = 0
last_trade_ts = 0 last_trade_ts = 0