Spawn background ping task

emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-18 11:21:23 -04:00
parent afa68d2d59
commit a4195fccc6
1 changed files with 44 additions and 22 deletions

View File

@ -29,6 +29,7 @@ from uuid import uuid4
import asks import asks
import tractor import tractor
from tractor.trionics import maybe_open_context
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
@ -148,11 +149,13 @@ class Client:
print(f'KUCOIN ERROR: {res.json()["msg"]}') print(f'KUCOIN ERROR: {res.json()["msg"]}')
breakpoint() breakpoint()
async def _get_ws_token(self, private: bool = False) -> str | None: async def _get_ws_token(self, private: bool = False) -> tuple[str, int] | None:
token_type = "private" if private else "public" token_type = "private" if private else "public"
data = await self._request("POST", f"/bullet-{token_type}", "v1") data = await self._request("POST", f"/bullet-{token_type}", "v1")
if "token" in data: if "token" in data:
return data["token"] # return token and ping interval
ping_interval = data["instanceServers"][0]["pingInterval"]
return data["token"], ping_interval
else: else:
print(f'KUCOIN ERROR: {data.json()["msg"]}') print(f'KUCOIN ERROR: {data.json()["msg"]}')
breakpoint() breakpoint()
@ -311,6 +314,7 @@ async def stream_quotes(
connect_id = str(uuid4()) connect_id = str(uuid4())
async with open_cached_client("kucoin") as client: async with open_cached_client("kucoin") as client:
token, ping_interval = await client._get_ws_token()
pairs = await client.cache_pairs() pairs = await client.cache_pairs()
kucoin_sym = pairs[sym]["symbol"] kucoin_sym = pairs[sym]["symbol"]
init_msgs = { init_msgs = {
@ -331,19 +335,36 @@ async def stream_quotes(
@acm @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
# await ws.send_msg({"id": connect_id, "type": "ping"})
# res = await ws.recv_msg() @acm
async def open_ping_task(ws: wsproto.WSConnection):
async with trio.open_nursery() as n:
async def ping_server():
while True:
await trio.sleep((ping_interval - 1000) / 1000)
print("PINGING")
await ws.send_msg({"id": connect_id, "type": "ping"})
n.start_soon(ping_server)
yield ws
n.cancel_scope.cancel()
# Spawn the ping task here
async with open_ping_task(ws) as _ws:
# subscribe to market feedz here
l1_sub = make_sub(kucoin_sym, connect_id) l1_sub = make_sub(kucoin_sym, connect_id)
await ws.send_msg(l1_sub) await _ws.send_msg(l1_sub)
res = await ws.recv_msg() res = await _ws.recv_msg()
# breakpoint()
# assert res["id"] == connect_id
yield yield
# unsub # unsub
if ws.connected(): if _ws.connected():
await ws.send_msg( await _ws.send_msg(
{ {
"id": connect_id, "id": connect_id,
"type": "unsubscribe", "type": "unsubscribe",
@ -353,7 +374,6 @@ async def stream_quotes(
} }
) )
token = await client._get_ws_token()
async with open_autorecon_ws( async with open_autorecon_ws(
f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]", f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]",
fixture=subscribe, fixture=subscribe,
@ -395,6 +415,7 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
await ws._connect() await ws._connect()
continue continue
if "subject" in msg and msg["subject"] == "trade.ticker": if "subject" in msg and msg["subject"] == "trade.ticker":
# TODO: cast msg into class # TODO: cast msg into class
trade_data = msg["data"] trade_data = msg["data"]
@ -411,6 +432,7 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
} }
], ],
} }
else: else:
continue continue