Implement Kucoin auth and last trades call

emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-08 23:31:28 -05:00
parent dc02c115ba
commit 2a68ce0657
1 changed file with 135 additions and 76 deletions


@@ -18,32 +18,28 @@
 """
-from dataclasses import field
+from logging import warning
 from typing import Any, Optional, Literal
 from contextlib import asynccontextmanager as acm
 from datetime import datetime
 import time
 import math
-from os import path, walk
+import base64
+import hmac
+import hashlib

 import asks
 import tractor
 import trio
 from trio_typing import TaskStatus
 from fuzzywuzzy import process as fuzzy
-from cryptofeed.defines import KUCOIN, TRADES, L2_BOOK
-from cryptofeed.symbols import Symbol
 import pendulum
 import numpy as np

-from piker.data.cryptofeeds import (
-    fqsn_to_cf_sym,
-    mk_stream_quotes,
-    get_config,
-)
 from piker._cacheables import open_cached_client
 from piker.log import get_logger
-from piker.pp import config
 from ._util import DataUnavailable
+from piker.pp import config

 _spawn_kwargs = {
     "infect_asyncio": True,
@@ -51,45 +47,80 @@ _spawn_kwargs = {

 log = get_logger(__name__)

 _ohlc_dtype = [
-    ('index', int),
-    ('time', int),
-    ('open', float),
-    ('high', float),
-    ('low', float),
-    ('close', float),
-    ('volume', float),
-    ('bar_wap', float),  # will be zeroed by sampler if not filled
+    ("index", int),
+    ("time", int),
+    ("open", float),
+    ("high", float),
+    ("low", float),
+    ("close", float),
+    ("volume", float),
+    ("bar_wap", float),  # will be zeroed by sampler if not filled
 ]


 class Client:
     def __init__(self) -> None:
-        self._pairs: dict[str, Symbol] = {}
+        self._pairs: dict[str, any] = {}
         self._bars: list[list] = []
-        # TODO" Shouldn't have to write kucoin twice here
+        self._key_id: str
+        self._key_secret: str
+        self._key_passphrase: str
+        self._authenticated: bool = False

-        config = get_config("kucoin").get("kucoin", {})
-        # breakpoint()
+        config = get_config()

-        if ("key_id" in config) and ("key_secret" in config):
+        if ("key_id" in config) and \
+           ("key_secret" in config) and \
+           ("key_passphrase" in config):
+            self._authenticated = True
             self._key_id = config["key_id"]
             self._key_secret = config["key_secret"]
-        else:
-            self._key_id = None
-            self._key_secret = None
+            self._key_passphrase = config["key_passphrase"]

     async def _request(
         self,
         action: Literal["POST", "GET", "PUT", "DELETE"],
-        route: str,
+        endpoint: str,
         api_v: str = "v2",
     ) -> Any:
-        api_url = f"https://api.kucoin.com/api/{api_v}{route}"
-        res = await asks.request(action, api_url)
-        #breakpoint()
-        try:
+        now = int(time.time() * 1000)
+        path = f'/api/{api_v}{endpoint}'
+        str_to_sign = str(now) + action + path
+        headers = {}
+        # Add headers to request if authenticated
+        if self._authenticated:
+            signature = base64.b64encode(
+                hmac.new(
+                    self._key_secret.encode('utf-8'),
+                    str_to_sign.encode('utf-8'),
+                    hashlib.sha256
+                ).digest()
+            )
+            passphrase = base64.b64encode(
+                hmac.new(
+                    self._key_secret.encode('utf-8'),
+                    self._key_passphrase.encode('utf-8'),
+                    hashlib.sha256
+                ).digest()
+            )
+            headers = {
+                "KC-API-SIGN": signature,
+                "KC-API-TIMESTAMP": str(now),
+                "KC-API-KEY": self._key_id,
+                "KC-API-PASSPHRASE": passphrase,
+                "KC-API-KEY-VERSION": "2"
+            }
+        api_url = f"https://api.kucoin.com{path}"
+        res = await asks.request(action, api_url, headers=headers)
+        # breakpoint()
+        if "data" in res.json():
             return res.json()["data"]
-        except KeyError as e:
+        else:
             print(f'KUCOIN ERROR: {res.json()["msg"]}')
             breakpoint()
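For reference, the signing logic added above follows KuCoin's API key v2 scheme: the signature header is a base64-encoded HMAC-SHA256 of `timestamp + METHOD + path (+ body)` keyed by the API secret, and the passphrase header is the raw passphrase HMAC-signed the same way. A small standalone sketch of the same idea follows; the function name `sign_kucoin_headers` and the credential strings are illustrative placeholders, not part of the diff.

import base64
import hashlib
import hmac
import time


def sign_kucoin_headers(
    key_id: str,
    key_secret: str,
    key_passphrase: str,
    method: str,
    path: str,
    body: str = "",
) -> dict[str, str]:
    # millisecond timestamp, echoed back in KC-API-TIMESTAMP
    now = str(int(time.time() * 1000))

    # base64(HMAC-SHA256(secret, timestamp + METHOD + path + body))
    sig = base64.b64encode(
        hmac.new(
            key_secret.encode(),
            (now + method.upper() + path + body).encode(),
            hashlib.sha256,
        ).digest()
    ).decode()

    # for key version 2 the passphrase is itself HMAC-signed with the secret
    passphrase = base64.b64encode(
        hmac.new(
            key_secret.encode(),
            key_passphrase.encode(),
            hashlib.sha256,
        ).digest()
    ).decode()

    return {
        "KC-API-KEY": key_id,
        "KC-API-SIGN": sig,
        "KC-API-TIMESTAMP": now,
        "KC-API-PASSPHRASE": passphrase,
        "KC-API-KEY-VERSION": "2",
    }


# example: headers for a GET of the account ledgers endpoint
headers = sign_kucoin_headers(
    "my-key-id", "my-key-secret", "my-passphrase",
    "GET", "/api/v1/accounts/ledgers",
)

Note the sketch decodes the signatures to `str` before placing them in the header dict, whereas the diff passes the raw `bytes` returned by `base64.b64encode()`.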
@@ -106,14 +137,14 @@ class Client:
     async def cache_pairs(
         self,
         normalize: bool = True,
-    ) -> dict[str, Symbol]:
+    ) -> dict[str, any]:
         if not self._pairs:
             self._pairs = await self.get_pairs()
         if normalize:
             self._pairs = self.normalize_pairs(self._pairs)
         return self._pairs

-    def normalize_pairs(self, pairs: dict[str, Symbol]) -> dict[str, Symbol]:
+    def normalize_pairs(self, pairs: dict[str, any]) -> dict[str, any]:
         """
         Map cryptofeed symbols to fqsn strings
@@ -137,6 +168,10 @@ class Client:
         # repack in dict form
         return {item[0]["name"].lower(): item[0] for item in matches}

+    async def last_trades(self, sym: str) -> list[dict]:
+        # the ledgers payload is paginated; return just the entries
+        trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1")
+        return trades["items"]
+
     async def get_bars(
         self,
         fqsn: str,
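Worth noting for the new `last_trades()` helper: `/api/v1/accounts/ledgers` is an authenticated endpoint and its `data` payload is paginated, which is why the method picks out the `items` list. A tiny sketch of unpacking such a payload follows; the literal below is an invented illustration of the ledger schema as remembered from KuCoin's docs, not captured output, so field names should be double-checked.

# illustrative shape of the `data` payload handed back by Client._request()
# for the account-ledgers endpoint (values invented for the example)
data = {
    "currentPage": 1,
    "pageSize": 50,
    "totalNum": 2,
    "totalPage": 1,
    "items": [
        {"currency": "BTC", "amount": "0.001", "bizType": "Exchange",
         "direction": "out", "createdAt": 1678311088000},
        {"currency": "BTC", "amount": "0.002", "bizType": "Exchange",
         "direction": "in", "createdAt": 1678311090000},
    ],
}

# what last_trades() ultimately returns
for entry in data["items"]:
    print(entry["createdAt"], entry["direction"], entry["amount"])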
@@ -172,28 +207,27 @@ class Client:
             # TODO: implement struct/typecasting/validation here
             data = {
-                'index': i,
-                'time': bar[0],
-                'open': bar[1],
-                'close': bar[2],
-                'high': bar[3],
-                'low': bar[4],
-                'volume': bar[5],
-                'amount': bar [6],
-                'bar_wap': 0.0,
+                "index": i,
+                "time": bar[0],
+                "open": bar[1],
+                "close": bar[2],
+                "high": bar[3],
+                "low": bar[4],
+                "volume": bar[5],
+                "amount": bar[6],
+                "bar_wap": 0.0,
             }

             row = []
             for j, (field_name, field_type) in enumerate(_ohlc_dtype):
                 value = data[field_name]
                 match field_name:
-                    case 'index' | 'time':
+                    case "index" | "time":
                         row.append(int(value))
                     # case 'time':
                     #     dt_from_unix_ts = datetime.utcfromtimestamp(int(value))
                     #     # convert unix time to epoch seconds
                     #     row.append(int(dt_from_unix_ts.timestamp()))
                     case _:
                         row.append(float(value))
@@ -203,6 +237,26 @@ class Client:
         return array


+def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, any]) -> str:
+    pair_data = pairs[fqsn]
+    return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"]
+
+
+def get_config() -> dict[str, Any]:
+    conf, path = config.load()
+    section = conf.get('kucoin')
+
+    # TODO: document why we send this, basically because logging params for cryptofeed
+    conf["log"] = {}
+    conf["log"]["disabled"] = True
+
+    if section is None:
+        log.warning("No config section found for kucoin")
+
+    return section
+
+
 @acm
 async def get_client():
     client = Client()
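To make the new module-level helpers concrete: `fqsn_to_cf_sym()` just rebuilds KuCoin's dash-separated symbol from a cached pair entry, and `get_config()` is expected to hand back the `kucoin` section of the broker config containing `key_id`, `key_secret` and `key_passphrase`. A minimal sketch, with the helper repeated (signature lightly tidied) so it runs standalone and a pair entry trimmed to the fields the code actually touches:

def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, dict]) -> str:
    pair_data = pairs[fqsn]
    return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"]


# a trimmed-down pair entry as cached by Client.cache_pairs(); the key is
# whatever normalize_pairs() produced, i.e. the lower-cased "name" field
pairs = {
    "btc-usdt": {
        "name": "BTC-USDT",
        "baseCurrency": "BTC",
        "quoteCurrency": "USDT",
    },
}

assert fqsn_to_cf_sym("btc-usdt", pairs) == "BTC-USDT"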
@@ -217,7 +271,7 @@ async def open_symbol_search(
 ):
     async with open_cached_client("kucoin") as client:
         # load all symbols locally for fast search
-        cache = await client.cache_pairs()
+        await client.cache_pairs()
         await ctx.started()

         async with ctx.open_stream() as stream:
@@ -234,15 +288,24 @@ async def stream_quotes(
     # startup sync
     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
 ):
-    return await mk_stream_quotes(
-        KUCOIN,
-        [L2_BOOK, TRADES],
-        send_chan,
-        symbols,
-        feed_is_live,
-        loglevel,
-        task_status,
-    )
+    sym = symbols[0]
+
+    async with open_cached_client("kucoin") as client:
+        init_msgs = {
+            # pass back token, and bool, signalling if we're the writer
+            # and that history has been written
+            sym: {
+                "symbol_info": {
+                    "asset_type": "option",
+                    "price_tick_size": 0.0005,
+                    "lot_tick_size": 0.1,
+                },
+                "shm_write_opts": {"sum_tick_vml": False},
+                "fqsn": sym,
+            },
+        }
+
+        last_trades = await client.last_trades(sym)


 @acm
@@ -256,13 +319,9 @@ async def open_history_client(
         timeframe: float,
         end_dt: datetime | None = None,
         start_dt: datetime | None = None,
-    ) -> tuple[
-        np.ndarray,
-        datetime | None,  # start
-        datetime | None,  # end
-    ]:
+    ) -> tuple[np.ndarray, datetime | None, datetime | None,]:  # start  # end

         if timeframe != 60:
-            raise DataUnavailable('Only 1m bars are supported')
+            raise DataUnavailable("Only 1m bars are supported")

         array = await client.get_bars(
             symbol,
@@ -270,13 +329,13 @@ async def open_history_client(
             end_dt=end_dt,
         )

-        times = array['time']
-        if (
-            end_dt is None
-        ):
+        times = array["time"]
+
+        if end_dt is None:
             inow = round(time.time())
-            print(f'difference in time between load and processing {inow - times[-1]}')
+            print(
+                f"difference in time between load and processing {inow - times[-1]}"
+            )
             if (inow - times[-1]) > 60:
                 await tractor.breakpoint()
@@ -284,4 +343,4 @@ async def open_history_client(
             end_dt = pendulum.from_timestamp(times[-1])

         return array, start_dt, end_dt

-    yield get_ohlc_history, {'erlangs': 3, 'rate': 3}
+    yield get_ohlc_history, {"erlangs": 3, "rate": 3}