diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 14929d53..e1e9ebd9 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -18,32 +18,28 @@ """ -from dataclasses import field +from logging import warning from typing import Any, Optional, Literal from contextlib import asynccontextmanager as acm from datetime import datetime import time import math -from os import path, walk +import base64 +import hmac +import hashlib import asks import tractor import trio from trio_typing import TaskStatus from fuzzywuzzy import process as fuzzy -from cryptofeed.defines import KUCOIN, TRADES, L2_BOOK -from cryptofeed.symbols import Symbol import pendulum import numpy as np -from piker.data.cryptofeeds import ( - fqsn_to_cf_sym, - mk_stream_quotes, - get_config, -) + from piker._cacheables import open_cached_client from piker.log import get_logger -from piker.pp import config from ._util import DataUnavailable +from piker.pp import config _spawn_kwargs = { "infect_asyncio": True, @@ -51,45 +47,80 @@ _spawn_kwargs = { log = get_logger(__name__) _ohlc_dtype = [ - ('index', int), - ('time', int), - ('open', float), - ('high', float), - ('low', float), - ('close', float), - ('volume', float), - ('bar_wap', float), # will be zeroed by sampler if not filled + ("index", int), + ("time", int), + ("open", float), + ("high", float), + ("low", float), + ("close", float), + ("volume", float), + ("bar_wap", float), # will be zeroed by sampler if not filled ] class Client: def __init__(self) -> None: - self._pairs: dict[str, Symbol] = {} + self._pairs: dict[str, any] = {} self._bars: list[list] = [] - # TODO" Shouldn't have to write kucoin twice here + self._key_id: str + self._key_secret: str + self._key_passphrase: str + self._authenticated: bool = False - config = get_config("kucoin").get("kucoin", {}) - # - if ("key_id" in config) and ("key_secret" in config): + config = get_config() + breakpoint() + if ("key_id" in config) and \ + ("key_secret" in config) and \ + ("key_passphrase" in config): + self._authenticated = True self._key_id = config["key_id"] self._key_secret = config["key_secret"] - - else: - self._key_id = None - self._key_secret = None + self._key_passphrase = config["key_passphrase"] async def _request( self, action: Literal["POST", "GET", "PUT", "DELETE"], - route: str, + endpoint: str, api_v: str = "v2", ) -> Any: - api_url = f"https://api.kucoin.com/api/{api_v}{route}" - res = await asks.request(action, api_url) - #breakpoint() - try: + + now = int(time.time() * 1000) + path = f'/api/{api_v}{endpoint}' + str_to_sign = str(now) + action + path + headers = {} + + # Add headers to request if authenticated + if self._authenticated: + signature = base64.b64encode( + hmac.new( + self._key_secret.encode('utf-8'), + str_to_sign.encode('utf-8'), + hashlib.sha256 + ).digest() + ) + + passphrase = base64.b64encode( + hmac.new( + self._key_secret.encode('utf-8'), + self._key_passphrase.encode('utf-8'), + hashlib.sha256 + ).digest() + ) + + headers = { + "KC-API-SIGN": signature, + "KC-API-TIMESTAMP": str(now), + "KC-API-KEY": self._key_id, + "KC-API-PASSPHRASE": passphrase, + "KC-API-KEY-VERSION": "2" + } + + api_url = f"https://api.kucoin.com{path}" + res = await asks.request(action, api_url, headers=headers) + # breakpoint() + if "data" in res.json(): return res.json()["data"] - except KeyError as e: + else: print(f'KUCOIN ERROR: {res.json()["msg"]}') breakpoint() @@ -106,14 +137,14 @@ class Client: async def cache_pairs( self, normalize: bool = True, - ) -> dict[str, Symbol]: + ) -> dict[str, any]: if not self._pairs: self._pairs = await self.get_pairs() if normalize: self._pairs = self.normalize_pairs(self._pairs) return self._pairs - def normalize_pairs(self, pairs: dict[str, Symbol]) -> dict[str, Symbol]: + def normalize_pairs(self, pairs: dict[str, any]) -> dict[str, any]: """ Map crypfeeds symbols to fqsn strings @@ -137,6 +168,10 @@ class Client: # repack in dict form return {item[0]["name"].lower(): item[0] for item in matches} + async def last_trades(self, sym: str): + trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1") + return trades.items + async def get_bars( self, fqsn: str, @@ -158,7 +193,7 @@ class Client: # Format datetime to unix start_dt = math.trunc(time.mktime(start_dt.timetuple())) end_dt = math.trunc(time.mktime(end_dt.timetuple())) - kucoin_sym = fqsn_to_cf_sym(fqsn, self._pairs) + kucoin_sym = fqsn_to_cf_sym(fqsn, self._pairs) url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}" bars = await self._request( @@ -170,39 +205,58 @@ class Client: new_bars = [] for i, bar in enumerate(bars[::-1]): # TODO: implement struct/typecasting/validation here - + data = { - 'index': i, - 'time': bar[0], - 'open': bar[1], - 'close': bar[2], - 'high': bar[3], - 'low': bar[4], - 'volume': bar[5], - 'amount': bar [6], - 'bar_wap': 0.0, + "index": i, + "time": bar[0], + "open": bar[1], + "close": bar[2], + "high": bar[3], + "low": bar[4], + "volume": bar[5], + "amount": bar[6], + "bar_wap": 0.0, } row = [] for j, (field_name, field_type) in enumerate(_ohlc_dtype): - - value = data[field_name] + value = data[field_name] match field_name: - case 'index' | 'time': + case "index" | "time": row.append(int(value)) # case 'time': - # dt_from_unix_ts = datetime.utcfromtimestamp(int(value)) - # # convert unix time to epoch seconds - # row.append(int(dt_from_unix_ts.timestamp())) - case _: + # dt_from_unix_ts = datetime.utcfromtimestamp(int(value)) + # # convert unix time to epoch seconds + # row.append(int(dt_from_unix_ts.timestamp())) + case _: row.append(float(value)) new_bars.append(tuple(row)) - + self._bars = array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars return array +def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, any]) -> str: + pair_data = pairs[fqsn] + return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"] + + +def get_config() -> dict[str, Any]: + conf, path = config.load() + + section = conf.get('kucoin') + + # TODO: document why we send this, basically because logging params for cryptofeed + conf["log"] = {} + conf["log"]["disabled"] = True + + if section is None: + log.warning("No config section found for deribit in kucoin") + + return section + + @acm async def get_client(): client = Client() @@ -217,7 +271,7 @@ async def open_symbol_search( ): async with open_cached_client("kucoin") as client: # load all symbols locally for fast search - cache = await client.cache_pairs() + await client.cache_pairs() await ctx.started() async with ctx.open_stream() as stream: @@ -234,15 +288,24 @@ async def stream_quotes( # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ): - return await mk_stream_quotes( - KUCOIN, - [L2_BOOK, TRADES], - send_chan, - symbols, - feed_is_live, - loglevel, - task_status, - ) + sym = symbols[0] + + async with open_cached_client("kucoin") as client: + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + "symbol_info": { + "asset_type": "option", + "price_tick_size": 0.0005, + "lot_tick_size": 0.1, + }, + "shm_write_opts": {"sum_tick_vml": False}, + "fqsn": sym, + }, + } + + last_trades = await client.last_trades(sym) @acm @@ -256,13 +319,9 @@ async def open_history_client( timeframe: float, end_dt: datetime | None = None, start_dt: datetime | None = None, - ) -> tuple[ - np.ndarray, - datetime | None, # start - datetime | None, # end - ]: + ) -> tuple[np.ndarray, datetime | None, datetime | None,]: # start # end if timeframe != 60: - raise DataUnavailable('Only 1m bars are supported') + raise DataUnavailable("Only 1m bars are supported") array = await client.get_bars( symbol, @@ -270,13 +329,13 @@ async def open_history_client( end_dt=end_dt, ) - times = array['time'] + times = array["time"] - if ( - end_dt is None - ): + if end_dt is None: inow = round(time.time()) - print(f'difference in time between load and processing {inow - times[-1]}') + print( + f"difference in time between load and processing {inow - times[-1]}" + ) if (inow - times[-1]) > 60: await tractor.breakpoint() @@ -284,4 +343,4 @@ async def open_history_client( end_dt = pendulum.from_timestamp(times[-1]) return array, start_dt, end_dt - yield get_ohlc_history, {'erlangs': 3, 'rate': 3} + yield get_ohlc_history, {"erlangs": 3, "rate": 3}