diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 2d0efd7d..fc497eb0 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -29,6 +29,7 @@ from uuid import uuid4 import asks import tractor +from tractor.trionics import maybe_open_context import trio from trio_typing import TaskStatus from fuzzywuzzy import process as fuzzy @@ -148,11 +149,13 @@ class Client: print(f'KUCOIN ERROR: {res.json()["msg"]}') breakpoint() - async def _get_ws_token(self, private: bool = False) -> str | None: + async def _get_ws_token(self, private: bool = False) -> tuple[str, int] | None: token_type = "private" if private else "public" data = await self._request("POST", f"/bullet-{token_type}", "v1") if "token" in data: - return data["token"] + # return token and ping interval + ping_interval = data["instanceServers"][0]["pingInterval"] + return data["token"], ping_interval else: print(f'KUCOIN ERROR: {data.json()["msg"]}') breakpoint() @@ -311,6 +314,7 @@ async def stream_quotes( connect_id = str(uuid4()) async with open_cached_client("kucoin") as client: + token, ping_interval = await client._get_ws_token() pairs = await client.cache_pairs() kucoin_sym = pairs[sym]["symbol"] init_msgs = { @@ -331,29 +335,45 @@ async def stream_quotes( @acm async def subscribe(ws: wsproto.WSConnection): - # await ws.send_msg({"id": connect_id, "type": "ping"}) - # res = await ws.recv_msg() - l1_sub = make_sub(kucoin_sym, connect_id) - await ws.send_msg(l1_sub) - res = await ws.recv_msg() - # breakpoint() - # assert res["id"] == connect_id - yield + @acm + async def open_ping_task(ws: wsproto.WSConnection): + async with trio.open_nursery() as n: - # unsub - if ws.connected(): - await ws.send_msg( - { - "id": connect_id, - "type": "unsubscribe", - "topic": f"/market/ticker:{sym}", - "privateChannel": False, - "response": True, - } - ) + async def ping_server(): + while True: + await trio.sleep((ping_interval - 1000) / 1000) + print("PINGING") + await ws.send_msg({"id": connect_id, "type": "ping"}) + + n.start_soon(ping_server) + + yield ws + + n.cancel_scope.cancel() + + # Spawn the ping task here + async with open_ping_task(ws) as _ws: + + # subscribe to market feedz here + l1_sub = make_sub(kucoin_sym, connect_id) + await _ws.send_msg(l1_sub) + res = await _ws.recv_msg() + + yield + + # unsub + if _ws.connected(): + await _ws.send_msg( + { + "id": connect_id, + "type": "unsubscribe", + "topic": f"/market/ticker:{sym}", + "privateChannel": False, + "response": True, + } + ) - token = await client._get_ws_token() async with open_autorecon_ws( f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]", fixture=subscribe, @@ -395,6 +415,7 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: await ws._connect() continue + if "subject" in msg and msg["subject"] == "trade.ticker": # TODO: cast msg into class trade_data = msg["data"] @@ -411,6 +432,7 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: } ], } + else: continue