Add back static API version in headers

emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-11 18:55:40 -05:00
parent 88f6f89cbb
commit f89b408f64
1 changed files with 62 additions and 33 deletions

View File

@ -18,7 +18,7 @@
Kucoin broker backend
"""
from typing import Any, Optional, Literal
from typing import Any, Optional, Literal, AsyncGenerator
from contextlib import asynccontextmanager as acm
from datetime import datetime
import time
@ -26,6 +26,7 @@ import math
import base64
import hmac
import hashlib
import wsproto
import asks
import tractor
@ -39,6 +40,10 @@ from piker._cacheables import open_cached_client
from piker.log import get_logger
from ._util import DataUnavailable
from piker.pp import config
from ..data._web_bs import (
open_autorecon_ws,
NoBsWs,
)
log = get_logger(__name__)
_ohlc_dtype = [
@ -53,6 +58,21 @@ _ohlc_dtype = [
]
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get("kucoin")
# TODO: document why we send this, basically because logging params for cryptofeed
conf["log"] = {}
conf["log"]["disabled"] = True
breakpoint()
if section is None:
log.warning("No config section found for kucoin in config")
return section
class Client:
def __init__(self) -> None:
self._pairs: dict[str, any] = {}
@ -64,9 +84,11 @@ class Client:
config = get_config()
if ("key_id" in config) and \
("key_secret" in config) and \
("key_passphrase" in config):
if (
("key_id" in config)
and ("key_secret" in config)
and ("key_passphrase" in config)
):
self._authenticated = True
self._key_id = config["key_id"]
self._key_secret = config["key_secret"]
@ -74,30 +96,30 @@ class Client:
def _gen_auth_req_headers(
self,
action: Literal["POST", "GET", "PUT", "DELETE"],
action: Literal["POST", "GET"],
endpoint: str,
api_v: str = "v2",
):
'''
"""
https://docs.kucoin.com/#authentication
'''
"""
now = int(time.time() * 1000)
path = f'/api/{api_v}{endpoint}'
path = f"/api/{api_v}{endpoint}"
str_to_sign = str(now) + action + path
signature = base64.b64encode(
hmac.new(
self._key_secret.encode('utf-8'),
str_to_sign.encode('utf-8'),
hashlib.sha256
self._key_secret.encode("utf-8"),
str_to_sign.encode("utf-8"),
hashlib.sha256,
).digest()
)
passphrase = base64.b64encode(
hmac.new(
self._key_secret.encode('utf-8'),
self._key_passphrase.encode('utf-8'),
hashlib.sha256
self._key_secret.encode("utf-8"),
self._key_passphrase.encode("utf-8"),
hashlib.sha256,
).digest()
)
@ -106,17 +128,16 @@ class Client:
"KC-API-TIMESTAMP": str(now),
"KC-API-KEY": self._key_id,
"KC-API-PASSPHRASE": passphrase,
"KC-API-KEY-VERSION": api_v[1]
"KC-API-KEY-VERSION": "2",
}
async def _request(
self,
action: Literal["POST", "GET", "PUT", "DELETE"],
action: Literal["POST", "GET"],
endpoint: str,
api_v: str = "v2",
headers: dict = {}
headers: dict = {},
) -> Any:
if self._authenticated:
headers = self._gen_auth_req_headers(action, endpoint, api_v)
@ -129,6 +150,11 @@ class Client:
print(f'KUCOIN ERROR: {res.json()["msg"]}')
breakpoint()
async def _get_ws_token(self, private: bool) -> str:
token_type = "private" if private else "public"
token = await self._request("POST", f"/bullet-{token_type}", "v1")
return token
async def get_pairs(
self,
) -> dict[str, Any]:
@ -175,6 +201,7 @@ class Client:
async def last_trades(self, sym: str):
trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1")
breakpoint()
return trades.items
async def get_bars(
@ -247,21 +274,6 @@ def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, any]) -> str:
return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"]
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('kucoin')
# TODO: document why we send this, basically because logging params for cryptofeed
conf["log"] = {}
conf["log"]["disabled"] = True
if section is None:
log.warning("No config section found for deribit in kucoin")
return section
@acm
async def get_client():
client = Client()
@ -312,6 +324,23 @@ async def stream_quotes(
last_trades = await client.last_trades(sym)
# @acm
# async def subscribe(ws: wsproto.WSConnection):
token = await client._get_ws_token(True)
async with open_autorecon_ws(
f"wss://ws-api-spot.kucoin.com/?token=={token}&[connectId={12345}]"
) as ws:
msg_gen = stream_messageS(ws)
async def stream_messageS(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0
while True:
with trio.move_on_after(3) as cs:
msg = await ws.recv_msg()
print(f"msg: {msg}")
@acm
async def open_history_client(