Abstract header gen to seperate function

emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-08 23:45:51 -05:00
parent 2a68ce0657
commit 88e51f6fde
1 changed files with 38 additions and 30 deletions

View File

@ -77,47 +77,55 @@ class Client:
self._key_secret = config["key_secret"]
self._key_passphrase = config["key_passphrase"]
def _gen_auth_req_headers(
self,
action: Literal["POST", "GET", "PUT", "DELETE"],
endpoint: str,
api_v: str = "v2",
):
now = int(time.time() * 1000)
path = f'/api/{api_v}{endpoint}'
str_to_sign = str(now) + action + path
# Add headers to request if authenticated
signature = base64.b64encode(
hmac.new(
self._key_secret.encode('utf-8'),
str_to_sign.encode('utf-8'),
hashlib.sha256
).digest()
)
passphrase = base64.b64encode(
hmac.new(
self._key_secret.encode('utf-8'),
self._key_passphrase.encode('utf-8'),
hashlib.sha256
).digest()
)
return {
"KC-API-SIGN": signature,
"KC-API-TIMESTAMP": str(now),
"KC-API-KEY": self._key_id,
"KC-API-PASSPHRASE": passphrase,
"KC-API-KEY-VERSION": "2"
}
async def _request(
self,
action: Literal["POST", "GET", "PUT", "DELETE"],
endpoint: str,
api_v: str = "v2",
) -> Any:
now = int(time.time() * 1000)
path = f'/api/{api_v}{endpoint}'
str_to_sign = str(now) + action + path
headers = {}
# Add headers to request if authenticated
if self._authenticated:
signature = base64.b64encode(
hmac.new(
self._key_secret.encode('utf-8'),
str_to_sign.encode('utf-8'),
hashlib.sha256
).digest()
)
headers = self._gen_auth_req_headers(action, endpoint, api_v)
passphrase = base64.b64encode(
hmac.new(
self._key_secret.encode('utf-8'),
self._key_passphrase.encode('utf-8'),
hashlib.sha256
).digest()
)
headers = {
"KC-API-SIGN": signature,
"KC-API-TIMESTAMP": str(now),
"KC-API-KEY": self._key_id,
"KC-API-PASSPHRASE": passphrase,
"KC-API-KEY-VERSION": "2"
}
api_url = f"https://api.kucoin.com{path}"
api_url = f"https://api.kucoin.com/api/{api_v}{endpoint}"
res = await asks.request(action, api_url, headers=headers)
# breakpoint()
if "data" in res.json():
return res.json()["data"]
else: