From a3c7bec5768c4345e436a0efb90dccc96991dd3b Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Wed, 15 Mar 2023 20:03:16 -0400 Subject: [PATCH] Implement working message streaming --- piker/brokers/kucoin.py | 90 ++++++++++++++++++++++++----------------- 1 file changed, 53 insertions(+), 37 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 4472bee7..50a4ba5e 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -1,4 +1,3 @@ - # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -152,7 +151,6 @@ class Client: async def _get_ws_token(self, private: bool = False) -> str | None: token_type = "private" if private else "public" data = await self._request("POST", f"/bullet-{token_type}", "v1") - breakpoint() if "token" in data: return data["token"] else: @@ -308,15 +306,13 @@ async def stream_quotes( # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ): - # TODO: Add multi-symbol functionality here sym = symbols[0] - connect_id = 0 + connect_id = str(uuid4()) async with open_cached_client("kucoin") as client: - pairs = await client.cache_pairs() - kucoin_sym = pairs[sym]['symbol'] + kucoin_sym = pairs[sym]["symbol"] init_msgs = { # pass back token, and bool, signalling if we're the writer # and that history has been written @@ -335,40 +331,47 @@ async def stream_quotes( @acm async def subscribe(ws: wsproto.WSConnection): - await ws.send_msg({ - "id": connect_id, - "type": "ping" - }) - res = await ws.recv_msg() - breakpoint() - yield - # l1_sub = make_sub(kucoin_sym, sub_id) - # await ws.send_msg(l1_sub) + # await ws.send_msg({"id": connect_id, "type": "ping"}) # res = await ws.recv_msg() + l1_sub = make_sub(kucoin_sym, connect_id) + await ws.send_msg(l1_sub) + res = await ws.recv_msg() # breakpoint() - # assert res['id'] == connect_id - # - # yield - # - # # unsub - # ws.send_msg({ - # "id": sub_id, - # "type": "unsubscribe", - # "topic": f"/market/ticker:{sym}", - # "privateChannel": False, - # "response": True, - # }) + # assert res["id"] == connect_id + + yield + + # unsub + await ws.send_msg( + { + "id": connect_id, + "type": "unsubscribe", + "topic": f"/market/ticker:{sym}", + "privateChannel": False, + "response": True, + } + ) token = await client._get_ws_token() - breakpoint() async with open_autorecon_ws( - f"wss://ws-api-spot.kucoin.com/?token=={token}&[connectId={connect_id}]", + f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]", fixture=subscribe, ) as ws: - msg_gen = stream_messages(ws) + msg_gen = stream_messages(ws, sym) + typ, quote = await msg_gen.__anext__() + # + while typ != "trade": + # TODO: use ``anext()`` when it lands in 3.10! + typ, quote = await msg_gen.__anext__() + + task_status.started((init_msgs, quote)) + feed_is_live.set() + + async for typ, msg in msg_gen: + await send_chan.send({sym: msg}) + def make_sub(sym, connect_id): - breakpoint() return { "id": connect_id, "type": "subscribe", @@ -378,25 +381,38 @@ def make_sub(sym, connect_id): } -async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: - +async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: timeouts = 0 while True: with trio.move_on_after(3) as cs: msg = await ws.recv_msg() - if cs.cancelled_caught: - timeouts += 1 - if timeouts > 2: log.error("kucoin feed is sh**ing the bed... rebooting...") await ws._connect() continue + if "subject" in msg and msg["subject"] == "trade.ticker": + # TODO: cast msg into class + trade_data = msg["data"] + yield "trade", { + "symbol": sym, + "last": trade_data["price"], + "brokerd_ts": trade_data["time"], + "ticks": [ + { + "type": "trade", + "price": float(trade_data["price"]), + "size": float(trade_data["size"]), + "broker_ts": trade_data["time"], + } + ], + } + else: + continue - breakpoint() @acm async def open_history_client(