Implement working message streaming

small_kucoin_fixes
jaredgoldman 2023-03-15 20:03:16 -04:00
parent ac34ca7cad
commit a3c7bec576
1 changed files with 53 additions and 37 deletions

View File

@ -1,4 +1,3 @@
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or # the Free Software Foundation, either version 3 of the License, or
@ -152,7 +151,6 @@ class Client:
async def _get_ws_token(self, private: bool = False) -> str | None: async def _get_ws_token(self, private: bool = False) -> str | None:
token_type = "private" if private else "public" token_type = "private" if private else "public"
data = await self._request("POST", f"/bullet-{token_type}", "v1") data = await self._request("POST", f"/bullet-{token_type}", "v1")
breakpoint()
if "token" in data: if "token" in data:
return data["token"] return data["token"]
else: else:
@ -308,15 +306,13 @@ async def stream_quotes(
# startup sync # startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
): ):
# TODO: Add multi-symbol functionality here # TODO: Add multi-symbol functionality here
sym = symbols[0] sym = symbols[0]
connect_id = 0 connect_id = str(uuid4())
async with open_cached_client("kucoin") as client: async with open_cached_client("kucoin") as client:
pairs = await client.cache_pairs() pairs = await client.cache_pairs()
kucoin_sym = pairs[sym]['symbol'] kucoin_sym = pairs[sym]["symbol"]
init_msgs = { init_msgs = {
# pass back token, and bool, signalling if we're the writer # pass back token, and bool, signalling if we're the writer
# and that history has been written # and that history has been written
@ -335,40 +331,47 @@ async def stream_quotes(
@acm @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
await ws.send_msg({ # await ws.send_msg({"id": connect_id, "type": "ping"})
"id": connect_id,
"type": "ping"
})
res = await ws.recv_msg()
breakpoint()
yield
# l1_sub = make_sub(kucoin_sym, sub_id)
# await ws.send_msg(l1_sub)
# res = await ws.recv_msg() # res = await ws.recv_msg()
l1_sub = make_sub(kucoin_sym, connect_id)
await ws.send_msg(l1_sub)
res = await ws.recv_msg()
# breakpoint() # breakpoint()
# assert res['id'] == connect_id # assert res["id"] == connect_id
#
# yield yield
#
# # unsub # unsub
# ws.send_msg({ await ws.send_msg(
# "id": sub_id, {
# "type": "unsubscribe", "id": connect_id,
# "topic": f"/market/ticker:{sym}", "type": "unsubscribe",
# "privateChannel": False, "topic": f"/market/ticker:{sym}",
# "response": True, "privateChannel": False,
# }) "response": True,
}
)
token = await client._get_ws_token() token = await client._get_ws_token()
breakpoint()
async with open_autorecon_ws( async with open_autorecon_ws(
f"wss://ws-api-spot.kucoin.com/?token=={token}&[connectId={connect_id}]", f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]",
fixture=subscribe, fixture=subscribe,
) as ws: ) as ws:
msg_gen = stream_messages(ws) msg_gen = stream_messages(ws, sym)
typ, quote = await msg_gen.__anext__()
#
while typ != "trade":
# TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__()
task_status.started((init_msgs, quote))
feed_is_live.set()
async for typ, msg in msg_gen:
await send_chan.send({sym: msg})
def make_sub(sym, connect_id): def make_sub(sym, connect_id):
breakpoint()
return { return {
"id": connect_id, "id": connect_id,
"type": "subscribe", "type": "subscribe",
@ -378,25 +381,38 @@ def make_sub(sym, connect_id):
} }
async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0 timeouts = 0
while True: while True:
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
msg = await ws.recv_msg() msg = await ws.recv_msg()
if cs.cancelled_caught: if cs.cancelled_caught:
timeouts += 1 timeouts += 1
if timeouts > 2: if timeouts > 2:
log.error("kucoin feed is sh**ing the bed... rebooting...") log.error("kucoin feed is sh**ing the bed... rebooting...")
await ws._connect() await ws._connect()
continue continue
if "subject" in msg and msg["subject"] == "trade.ticker":
# TODO: cast msg into class
trade_data = msg["data"]
yield "trade", {
"symbol": sym,
"last": trade_data["price"],
"brokerd_ts": trade_data["time"],
"ticks": [
{
"type": "trade",
"price": float(trade_data["price"]),
"size": float(trade_data["size"]),
"broker_ts": trade_data["time"],
}
],
}
else:
continue
breakpoint()
@acm @acm
async def open_history_client( async def open_history_client(