From ac34ca7cad20c115dc3e73bc1940de368688e2a0 Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Tue, 14 Mar 2023 15:05:04 -0400 Subject: [PATCH] Add sub method to flow Stash for checkout of master --- piker/brokers/kucoin.py | 79 +++++++++++++++++++++++++++++++++-------- piker/data/_web_bs.py | 3 +- 2 files changed, 67 insertions(+), 15 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index ef70bfc0..4472bee7 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -1,5 +1,3 @@ -# piker: trading gear for hackers -# Copyright (C) Jared Goldman (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -26,7 +24,8 @@ import math import base64 import hmac import hashlib -# import wsproto + +import wsproto from uuid import uuid4 import asks @@ -150,9 +149,10 @@ class Client: print(f'KUCOIN ERROR: {res.json()["msg"]}') breakpoint() - async def _get_ws_token(self, private: bool) -> str: + async def _get_ws_token(self, private: bool = False) -> str | None: token_type = "private" if private else "public" data = await self._request("POST", f"/bullet-{token_type}", "v1") + breakpoint() if "token" in data: return data["token"] else: @@ -308,15 +308,21 @@ async def stream_quotes( # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ): + + # TODO: Add multi-symbol functionality here sym = symbols[0] + connect_id = 0 async with open_cached_client("kucoin") as client: + + pairs = await client.cache_pairs() + kucoin_sym = pairs[sym]['symbol'] init_msgs = { # pass back token, and bool, signalling if we're the writer # and that history has been written sym: { "symbol_info": { - "asset_type": "option", + "asset_type": "crypto", "price_tick_size": 0.0005, "lot_tick_size": 0.1, }, @@ -327,25 +333,70 @@ async def stream_quotes( last_trades = await client.last_trades(sym) - # @acm - # async def subscribe(ws: wsproto.WSConnection): - - token = await client._get_ws_token(True) - connect_id = str(uuid4()) - async with open_autorecon_ws( - f"wss://ws-api-spot.kucoin.com/?token=={token}&[connectId={connect_id}]" - ) as ws: + @acm + async def subscribe(ws: wsproto.WSConnection): + await ws.send_msg({ + "id": connect_id, + "type": "ping" + }) + res = await ws.recv_msg() breakpoint() + yield + # l1_sub = make_sub(kucoin_sym, sub_id) + # await ws.send_msg(l1_sub) + # res = await ws.recv_msg() + # breakpoint() + # assert res['id'] == connect_id + # + # yield + # + # # unsub + # ws.send_msg({ + # "id": sub_id, + # "type": "unsubscribe", + # "topic": f"/market/ticker:{sym}", + # "privateChannel": False, + # "response": True, + # }) + + token = await client._get_ws_token() + breakpoint() + async with open_autorecon_ws( + f"wss://ws-api-spot.kucoin.com/?token=={token}&[connectId={connect_id}]", + fixture=subscribe, + ) as ws: msg_gen = stream_messages(ws) +def make_sub(sym, connect_id): + breakpoint() + return { + "id": connect_id, + "type": "subscribe", + "topic": f"/market/ticker:{sym}", + "privateChannel": False, + "response": True, + } + async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: + timeouts = 0 + while True: with trio.move_on_after(3) as cs: msg = await ws.recv_msg() - print(f"msg: {msg}") + if cs.cancelled_caught: + + timeouts += 1 + + if timeouts > 2: + log.error("kucoin feed is sh**ing the bed... rebooting...") + await ws._connect() + + continue + + breakpoint() @acm async def open_history_client( diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 2dd7f4af..3a397f7e 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -100,6 +100,7 @@ class NoBsWs: last_err = None for i in range(tries): try: + breakpoint() self._ws = await self._stack.enter_async_context( trio_websocket.open_websocket_url(self.url) ) @@ -166,7 +167,7 @@ async def open_autorecon_ws( # TODO: proper type cannot smh fixture: Optional[Callable] = None, -) -> AsyncGenerator[tuple[...], NoBsWs]: +) -> AsyncGenerator[tuple[...], NoBsWs]: """Apparently we can QoS for all sorts of reasons..so catch em. """