Implement Kucoin auth and last trades call
parent
8e91e215b3
commit
7074ca7713
|
@ -18,32 +18,28 @@
|
|||
|
||||
"""
|
||||
|
||||
from dataclasses import field
|
||||
from logging import warning
|
||||
from typing import Any, Optional, Literal
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from datetime import datetime
|
||||
import time
|
||||
import math
|
||||
from os import path, walk
|
||||
import base64
|
||||
import hmac
|
||||
import hashlib
|
||||
|
||||
import asks
|
||||
import tractor
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from fuzzywuzzy import process as fuzzy
|
||||
from cryptofeed.defines import KUCOIN, TRADES, L2_BOOK
|
||||
from cryptofeed.symbols import Symbol
|
||||
import pendulum
|
||||
import numpy as np
|
||||
from piker.data.cryptofeeds import (
|
||||
fqsn_to_cf_sym,
|
||||
mk_stream_quotes,
|
||||
get_config,
|
||||
)
|
||||
|
||||
from piker._cacheables import open_cached_client
|
||||
from piker.log import get_logger
|
||||
from piker.pp import config
|
||||
from ._util import DataUnavailable
|
||||
from piker.pp import config
|
||||
|
||||
_spawn_kwargs = {
|
||||
"infect_asyncio": True,
|
||||
|
@ -51,45 +47,80 @@ _spawn_kwargs = {
|
|||
|
||||
log = get_logger(__name__)
|
||||
_ohlc_dtype = [
|
||||
('index', int),
|
||||
('time', int),
|
||||
('open', float),
|
||||
('high', float),
|
||||
('low', float),
|
||||
('close', float),
|
||||
('volume', float),
|
||||
('bar_wap', float), # will be zeroed by sampler if not filled
|
||||
("index", int),
|
||||
("time", int),
|
||||
("open", float),
|
||||
("high", float),
|
||||
("low", float),
|
||||
("close", float),
|
||||
("volume", float),
|
||||
("bar_wap", float), # will be zeroed by sampler if not filled
|
||||
]
|
||||
|
||||
|
||||
class Client:
|
||||
def __init__(self) -> None:
|
||||
self._pairs: dict[str, Symbol] = {}
|
||||
self._pairs: dict[str, any] = {}
|
||||
self._bars: list[list] = []
|
||||
# TODO" Shouldn't have to write kucoin twice here
|
||||
self._key_id: str
|
||||
self._key_secret: str
|
||||
self._key_passphrase: str
|
||||
self._authenticated: bool = False
|
||||
|
||||
config = get_config("kucoin").get("kucoin", {})
|
||||
#
|
||||
if ("key_id" in config) and ("key_secret" in config):
|
||||
config = get_config()
|
||||
breakpoint()
|
||||
if ("key_id" in config) and \
|
||||
("key_secret" in config) and \
|
||||
("key_passphrase" in config):
|
||||
self._authenticated = True
|
||||
self._key_id = config["key_id"]
|
||||
self._key_secret = config["key_secret"]
|
||||
|
||||
else:
|
||||
self._key_id = None
|
||||
self._key_secret = None
|
||||
self._key_passphrase = config["key_passphrase"]
|
||||
|
||||
async def _request(
|
||||
self,
|
||||
action: Literal["POST", "GET", "PUT", "DELETE"],
|
||||
route: str,
|
||||
endpoint: str,
|
||||
api_v: str = "v2",
|
||||
) -> Any:
|
||||
api_url = f"https://api.kucoin.com/api/{api_v}{route}"
|
||||
res = await asks.request(action, api_url)
|
||||
#breakpoint()
|
||||
try:
|
||||
|
||||
now = int(time.time() * 1000)
|
||||
path = f'/api/{api_v}{endpoint}'
|
||||
str_to_sign = str(now) + action + path
|
||||
headers = {}
|
||||
|
||||
# Add headers to request if authenticated
|
||||
if self._authenticated:
|
||||
signature = base64.b64encode(
|
||||
hmac.new(
|
||||
self._key_secret.encode('utf-8'),
|
||||
str_to_sign.encode('utf-8'),
|
||||
hashlib.sha256
|
||||
).digest()
|
||||
)
|
||||
|
||||
passphrase = base64.b64encode(
|
||||
hmac.new(
|
||||
self._key_secret.encode('utf-8'),
|
||||
self._key_passphrase.encode('utf-8'),
|
||||
hashlib.sha256
|
||||
).digest()
|
||||
)
|
||||
|
||||
headers = {
|
||||
"KC-API-SIGN": signature,
|
||||
"KC-API-TIMESTAMP": str(now),
|
||||
"KC-API-KEY": self._key_id,
|
||||
"KC-API-PASSPHRASE": passphrase,
|
||||
"KC-API-KEY-VERSION": "2"
|
||||
}
|
||||
|
||||
api_url = f"https://api.kucoin.com{path}"
|
||||
res = await asks.request(action, api_url, headers=headers)
|
||||
# breakpoint()
|
||||
if "data" in res.json():
|
||||
return res.json()["data"]
|
||||
except KeyError as e:
|
||||
else:
|
||||
print(f'KUCOIN ERROR: {res.json()["msg"]}')
|
||||
breakpoint()
|
||||
|
||||
|
@ -106,14 +137,14 @@ class Client:
|
|||
async def cache_pairs(
|
||||
self,
|
||||
normalize: bool = True,
|
||||
) -> dict[str, Symbol]:
|
||||
) -> dict[str, any]:
|
||||
if not self._pairs:
|
||||
self._pairs = await self.get_pairs()
|
||||
if normalize:
|
||||
self._pairs = self.normalize_pairs(self._pairs)
|
||||
return self._pairs
|
||||
|
||||
def normalize_pairs(self, pairs: dict[str, Symbol]) -> dict[str, Symbol]:
|
||||
def normalize_pairs(self, pairs: dict[str, any]) -> dict[str, any]:
|
||||
"""
|
||||
Map crypfeeds symbols to fqsn strings
|
||||
|
||||
|
@ -137,6 +168,10 @@ class Client:
|
|||
# repack in dict form
|
||||
return {item[0]["name"].lower(): item[0] for item in matches}
|
||||
|
||||
async def last_trades(self, sym: str):
|
||||
trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1")
|
||||
return trades.items
|
||||
|
||||
async def get_bars(
|
||||
self,
|
||||
fqsn: str,
|
||||
|
@ -172,28 +207,27 @@ class Client:
|
|||
# TODO: implement struct/typecasting/validation here
|
||||
|
||||
data = {
|
||||
'index': i,
|
||||
'time': bar[0],
|
||||
'open': bar[1],
|
||||
'close': bar[2],
|
||||
'high': bar[3],
|
||||
'low': bar[4],
|
||||
'volume': bar[5],
|
||||
'amount': bar [6],
|
||||
'bar_wap': 0.0,
|
||||
"index": i,
|
||||
"time": bar[0],
|
||||
"open": bar[1],
|
||||
"close": bar[2],
|
||||
"high": bar[3],
|
||||
"low": bar[4],
|
||||
"volume": bar[5],
|
||||
"amount": bar[6],
|
||||
"bar_wap": 0.0,
|
||||
}
|
||||
|
||||
row = []
|
||||
for j, (field_name, field_type) in enumerate(_ohlc_dtype):
|
||||
|
||||
value = data[field_name]
|
||||
match field_name:
|
||||
case 'index' | 'time':
|
||||
case "index" | "time":
|
||||
row.append(int(value))
|
||||
# case 'time':
|
||||
# dt_from_unix_ts = datetime.utcfromtimestamp(int(value))
|
||||
# # convert unix time to epoch seconds
|
||||
# row.append(int(dt_from_unix_ts.timestamp()))
|
||||
# dt_from_unix_ts = datetime.utcfromtimestamp(int(value))
|
||||
# # convert unix time to epoch seconds
|
||||
# row.append(int(dt_from_unix_ts.timestamp()))
|
||||
case _:
|
||||
row.append(float(value))
|
||||
|
||||
|
@ -203,6 +237,26 @@ class Client:
|
|||
return array
|
||||
|
||||
|
||||
def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, any]) -> str:
|
||||
pair_data = pairs[fqsn]
|
||||
return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"]
|
||||
|
||||
|
||||
def get_config() -> dict[str, Any]:
|
||||
conf, path = config.load()
|
||||
|
||||
section = conf.get('kucoin')
|
||||
|
||||
# TODO: document why we send this, basically because logging params for cryptofeed
|
||||
conf["log"] = {}
|
||||
conf["log"]["disabled"] = True
|
||||
|
||||
if section is None:
|
||||
log.warning("No config section found for deribit in kucoin")
|
||||
|
||||
return section
|
||||
|
||||
|
||||
@acm
|
||||
async def get_client():
|
||||
client = Client()
|
||||
|
@ -217,7 +271,7 @@ async def open_symbol_search(
|
|||
):
|
||||
async with open_cached_client("kucoin") as client:
|
||||
# load all symbols locally for fast search
|
||||
cache = await client.cache_pairs()
|
||||
await client.cache_pairs()
|
||||
await ctx.started()
|
||||
|
||||
async with ctx.open_stream() as stream:
|
||||
|
@ -234,15 +288,24 @@ async def stream_quotes(
|
|||
# startup sync
|
||||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
return await mk_stream_quotes(
|
||||
KUCOIN,
|
||||
[L2_BOOK, TRADES],
|
||||
send_chan,
|
||||
symbols,
|
||||
feed_is_live,
|
||||
loglevel,
|
||||
task_status,
|
||||
)
|
||||
sym = symbols[0]
|
||||
|
||||
async with open_cached_client("kucoin") as client:
|
||||
init_msgs = {
|
||||
# pass back token, and bool, signalling if we're the writer
|
||||
# and that history has been written
|
||||
sym: {
|
||||
"symbol_info": {
|
||||
"asset_type": "option",
|
||||
"price_tick_size": 0.0005,
|
||||
"lot_tick_size": 0.1,
|
||||
},
|
||||
"shm_write_opts": {"sum_tick_vml": False},
|
||||
"fqsn": sym,
|
||||
},
|
||||
}
|
||||
|
||||
last_trades = await client.last_trades(sym)
|
||||
|
||||
|
||||
@acm
|
||||
|
@ -256,13 +319,9 @@ async def open_history_client(
|
|||
timeframe: float,
|
||||
end_dt: datetime | None = None,
|
||||
start_dt: datetime | None = None,
|
||||
) -> tuple[
|
||||
np.ndarray,
|
||||
datetime | None, # start
|
||||
datetime | None, # end
|
||||
]:
|
||||
) -> tuple[np.ndarray, datetime | None, datetime | None,]: # start # end
|
||||
if timeframe != 60:
|
||||
raise DataUnavailable('Only 1m bars are supported')
|
||||
raise DataUnavailable("Only 1m bars are supported")
|
||||
|
||||
array = await client.get_bars(
|
||||
symbol,
|
||||
|
@ -270,13 +329,13 @@ async def open_history_client(
|
|||
end_dt=end_dt,
|
||||
)
|
||||
|
||||
times = array['time']
|
||||
times = array["time"]
|
||||
|
||||
if (
|
||||
end_dt is None
|
||||
):
|
||||
if end_dt is None:
|
||||
inow = round(time.time())
|
||||
print(f'difference in time between load and processing {inow - times[-1]}')
|
||||
print(
|
||||
f"difference in time between load and processing {inow - times[-1]}"
|
||||
)
|
||||
if (inow - times[-1]) > 60:
|
||||
await tractor.breakpoint()
|
||||
|
||||
|
@ -284,4 +343,4 @@ async def open_history_client(
|
|||
end_dt = pendulum.from_timestamp(times[-1])
|
||||
return array, start_dt, end_dt
|
||||
|
||||
yield get_ohlc_history, {'erlangs': 3, 'rate': 3}
|
||||
yield get_ohlc_history, {"erlangs": 3, "rate": 3}
|
||||
|
|
Loading…
Reference in New Issue