diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index f4ef0b97..64152b18 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -18,7 +18,7 @@ Kucoin broker backend """ -from typing import Any, Optional, Literal +from typing import Any, Optional, Literal, AsyncGenerator from contextlib import asynccontextmanager as acm from datetime import datetime import time @@ -26,6 +26,7 @@ import math import base64 import hmac import hashlib +import wsproto import asks import tractor @@ -39,6 +40,10 @@ from piker._cacheables import open_cached_client from piker.log import get_logger from ._util import DataUnavailable from piker.pp import config +from ..data._web_bs import ( + open_autorecon_ws, + NoBsWs, +) log = get_logger(__name__) _ohlc_dtype = [ @@ -53,6 +58,21 @@ _ohlc_dtype = [ ] +def get_config() -> dict[str, Any]: + conf, path = config.load() + + section = conf.get("kucoin") + + # TODO: document why we send this, basically because logging params for cryptofeed + conf["log"] = {} + conf["log"]["disabled"] = True + breakpoint() + if section is None: + log.warning("No config section found for kucoin in config") + + return section + + class Client: def __init__(self) -> None: self._pairs: dict[str, any] = {} @@ -64,9 +84,11 @@ class Client: config = get_config() - if ("key_id" in config) and \ - ("key_secret" in config) and \ - ("key_passphrase" in config): + if ( + ("key_id" in config) + and ("key_secret" in config) + and ("key_passphrase" in config) + ): self._authenticated = True self._key_id = config["key_id"] self._key_secret = config["key_secret"] @@ -74,30 +96,30 @@ class Client: def _gen_auth_req_headers( self, - action: Literal["POST", "GET", "PUT", "DELETE"], + action: Literal["POST", "GET"], endpoint: str, api_v: str = "v2", ): - ''' + """ https://docs.kucoin.com/#authentication - ''' + """ now = int(time.time() * 1000) - path = f'/api/{api_v}{endpoint}' + path = f"/api/{api_v}{endpoint}" str_to_sign = str(now) + action + path signature = base64.b64encode( hmac.new( - self._key_secret.encode('utf-8'), - str_to_sign.encode('utf-8'), - hashlib.sha256 + self._key_secret.encode("utf-8"), + str_to_sign.encode("utf-8"), + hashlib.sha256, ).digest() ) passphrase = base64.b64encode( hmac.new( - self._key_secret.encode('utf-8'), - self._key_passphrase.encode('utf-8'), - hashlib.sha256 + self._key_secret.encode("utf-8"), + self._key_passphrase.encode("utf-8"), + hashlib.sha256, ).digest() ) @@ -106,17 +128,16 @@ class Client: "KC-API-TIMESTAMP": str(now), "KC-API-KEY": self._key_id, "KC-API-PASSPHRASE": passphrase, - "KC-API-KEY-VERSION": api_v[1] + "KC-API-KEY-VERSION": "2", } async def _request( self, - action: Literal["POST", "GET", "PUT", "DELETE"], + action: Literal["POST", "GET"], endpoint: str, api_v: str = "v2", - headers: dict = {} + headers: dict = {}, ) -> Any: - if self._authenticated: headers = self._gen_auth_req_headers(action, endpoint, api_v) @@ -129,6 +150,11 @@ class Client: print(f'KUCOIN ERROR: {res.json()["msg"]}') breakpoint() + async def _get_ws_token(self, private: bool) -> str: + token_type = "private" if private else "public" + token = await self._request("POST", f"/bullet-{token_type}", "v1") + return token + async def get_pairs( self, ) -> dict[str, Any]: @@ -175,6 +201,7 @@ class Client: async def last_trades(self, sym: str): trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1") + breakpoint() return trades.items async def get_bars( @@ -247,21 +274,6 @@ def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, any]) -> str: return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"] -def get_config() -> dict[str, Any]: - conf, path = config.load() - - section = conf.get('kucoin') - - # TODO: document why we send this, basically because logging params for cryptofeed - conf["log"] = {} - conf["log"]["disabled"] = True - - if section is None: - log.warning("No config section found for deribit in kucoin") - - return section - - @acm async def get_client(): client = Client() @@ -312,6 +324,23 @@ async def stream_quotes( last_trades = await client.last_trades(sym) + # @acm + # async def subscribe(ws: wsproto.WSConnection): + + token = await client._get_ws_token(True) + async with open_autorecon_ws( + f"wss://ws-api-spot.kucoin.com/?token=={token}&[connectId={12345}]" + ) as ws: + msg_gen = stream_messageS(ws) + + +async def stream_messageS(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: + timeouts = 0 + while True: + with trio.move_on_after(3) as cs: + msg = await ws.recv_msg() + print(f"msg: {msg}") + @acm async def open_history_client(