From 61bb60a81063799a06d9b5460f237e8dce4a8bd7 Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Sun, 19 Mar 2023 13:11:33 -0400 Subject: [PATCH] Clean up broker code, Add typecasting for messages/rt-data and historcal user trades ensure we're fetching all history add multi-symbol support ' --- piker/brokers/kucoin.py | 324 ++++++++++++++++++++++++++-------------- 1 file changed, 213 insertions(+), 111 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index fc497eb0..35d8f690 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -1,3 +1,6 @@ +# piker: trading gear for hackers +# Copyright (C) Jared Goldman (in stewardship for piker0) + # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -13,9 +16,10 @@ """ Kucoin broker backend + """ -from typing import Any, Optional, Literal, AsyncGenerator +from typing import Any, Callable, Optional, Literal, AsyncGenerator from contextlib import asynccontextmanager as acm from datetime import datetime import time @@ -23,13 +27,11 @@ import math import base64 import hmac import hashlib - import wsproto from uuid import uuid4 import asks import tractor -from tractor.trionics import maybe_open_context import trio from trio_typing import TaskStatus from fuzzywuzzy import process as fuzzy @@ -40,12 +42,14 @@ from piker._cacheables import open_cached_client from piker.log import get_logger from ._util import DataUnavailable from piker.pp import config +from ..data.types import Struct from ..data._web_bs import ( open_autorecon_ws, NoBsWs, ) log = get_logger(__name__) + _ohlc_dtype = [ ("index", int), ("time", int), @@ -58,24 +62,92 @@ _ohlc_dtype = [ ] -def get_config() -> dict[str, Any]: +def get_config() -> dict[str, dict]: conf, path = config.load() section = conf.get("kucoin") - # TODO: document why we send this, basically because logging params for cryptofeed - conf["log"] = {} - conf["log"]["disabled"] = True if section is None: log.warning("No config section found for kucoin in config") return section +class KucoinMktPair(Struct, frozen=True): + ''' + Kucoin's pair format + + ''' + baseCurrency: str + baseIncrement: float + baseMaxSize: float + baseMinSize: float + enableTrading: bool + feeCurrency: str + isMarginEnabled: bool + market: str + minFunds: float + name: str + priceIncrement: float + priceLimitRate: float + quoteCurrency: str + quoteIncrement: float + quoteMaxSize: float + quoteMinSize: float + symbol: str + + +class AccountTrade(Struct, frozen=True): + ''' + Historical trade format + + ''' + id: str + currency: str + amount: float + fee: float + balance: float + accountType: str + bizType: str + direction: Literal["in", "out"] + createdAt: float + context: list[str] + + +class AccountResponse(Struct, frozen=True): + currentPage: int + pageSize: int + totalNum: int + totalPage: int + items: list[AccountTrade] + + +class KucoinTrade(Struct, frozen=True): + ''' + Real-time trade format + + ''' + bestAsk: float + bestAskSize: float + bestBid: float + bestBidSize: float + price: float + sequence: float + size: float + time: float + + +class KucoinTradeMsg(Struct, frozen=True): + type: str + topic: str + subject: str + data: list[KucoinTrade] + + class Client: def __init__(self) -> None: - self._pairs: dict[str, any] = {} - self._bars: list[list] = [] + self._pairs: dict[str, KucoinMktPair] = {} + self._bars: list[list[float]] = [] self._key_id: str self._key_secret: str self._key_passphrase: str @@ -84,7 +156,7 @@ class Client: config = get_config() if ( - ("key_id" in config) + float("key_id" in config) and ("key_secret" in config) and ("key_passphrase" in config) ): @@ -98,10 +170,12 @@ class Client: action: Literal["POST", "GET"], endpoint: str, api_v: str = "v2", - ): - """ + ) -> dict[str, str]: + ''' + Generate authenticated request headers https://docs.kucoin.com/#authentication - """ + + ''' now = int(time.time() * 1000) path = f"/api/{api_v}{endpoint}" str_to_sign = str(now) + action + path @@ -127,6 +201,7 @@ class Client: "KC-API-TIMESTAMP": str(now), "KC-API-KEY": self._key_id, "KC-API-PASSPHRASE": passphrase, + # XXX: Even if using the v1 api - this stays the same "KC-API-KEY-VERSION": "2", } @@ -136,7 +211,11 @@ class Client: endpoint: str, api_v: str = "v2", headers: dict = {}, - ) -> Any: + ) -> dict[str, Any]: + ''' + Generic request wrapper for Kucoin API + + ''' if self._authenticated: headers = self._gen_auth_req_headers(action, endpoint, api_v) @@ -146,43 +225,56 @@ class Client: if "data" in res.json(): return res.json()["data"] else: - print(f'KUCOIN ERROR: {res.json()["msg"]}') - breakpoint() + log.error(f'Error making request to {api_url} -> {res.json()["msg"]}') - async def _get_ws_token(self, private: bool = False) -> tuple[str, int] | None: + async def _get_ws_token( + self, + private: bool = False + ) -> tuple[str, int] | None: + ''' + Fetch ws token needed for sub access + + ''' token_type = "private" if private else "public" data = await self._request("POST", f"/bullet-{token_type}", "v1") + if "token" in data: - # return token and ping interval ping_interval = data["instanceServers"][0]["pingInterval"] return data["token"], ping_interval else: - print(f'KUCOIN ERROR: {data.json()["msg"]}') - breakpoint() + log.error( + f'Error making request for Kucoin ws token -> {res.json()["msg"]}' + ) async def get_pairs( self, - ) -> dict[str, Any]: + ) -> dict[str, KucoinMktPair]: if self._pairs: return self._pairs entries = await self._request("GET", "/symbols") - syms = {item["name"]: item for item in entries} + syms = {item["name"]: KucoinMktPair(**item) for item in entries} return syms async def cache_pairs( self, normalize: bool = True, - ) -> dict[str, any]: + ) -> dict[str, KucoinMktPair]: + ''' + Get cached pairs and convert keyed symbols into fqsns if ya want + + ''' if not self._pairs: self._pairs = await self.get_pairs() if normalize: self._pairs = self.normalize_pairs(self._pairs) return self._pairs - def normalize_pairs(self, pairs: dict[str, any]) -> dict[str, any]: + def normalize_pairs( + self, pairs: dict[str, KucoinMktPair] + ) -> dict[str, KucoinMktPair]: """ - Map crypfeeds symbols to fqsn strings + Map kucoin pairs to fqsn strings """ norm_pairs = {} @@ -197,14 +289,14 @@ class Client: self, pattern: str, limit: int = 30, - ) -> dict[str, Any]: + ) -> dict[str, KucoinMktPair]: data = await self.get_pairs() matches = fuzzy.extractBests(pattern, data, score_cutoff=35, limit=limit) # repack in dict form - return {item[0]["name"].lower(): item[0] for item in matches} + return {kucoin_sym_to_fqsn(item[0].name): item[0] for item in matches} - async def last_trades(self, sym: str): + async def last_trades(self, sym: str) -> AccountResponse: trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1") return trades.items @@ -216,20 +308,23 @@ class Client: limit: int = 1000, as_np: bool = True, type: str = "1min", - ): - if len(self._bars): - return self._bars + ) -> np.ndarray: + ''' + Get OHLC data and convert to numpy array for perffff + ''' + # Generate generic end and start time if values not passed if end_dt is None: end_dt = pendulum.now("UTC").add(minutes=1) if start_dt is None: start_dt = end_dt.start_of("minute").subtract(minutes=limit) - # Format datetime to unix + # Format datetime to unix timestamp start_dt = math.trunc(time.mktime(start_dt.timetuple())) end_dt = math.trunc(time.mktime(end_dt.timetuple())) - kucoin_sym = fqsn_to_cf_sym(fqsn, self._pairs) + kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) + url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}" bars = await self._request( @@ -238,9 +333,9 @@ class Client: api_v="v1", ) + # Map to OHLC values to dict then to np array new_bars = [] for i, bar in enumerate(bars[::-1]): - # TODO: implement struct/typecasting/validation here data = { "index": i, @@ -256,14 +351,12 @@ class Client: row = [] for j, (field_name, field_type) in enumerate(_ohlc_dtype): + value = data[field_name] + match field_name: case "index" | "time": row.append(int(value)) - # case 'time': - # dt_from_unix_ts = datetime.utcfromtimestamp(int(value)) - # # convert unix time to epoch seconds - # row.append(int(dt_from_unix_ts.timestamp())) case _: row.append(float(value)) @@ -273,23 +366,31 @@ class Client: return array -def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, any]) -> str: +def fqsn_to_kucoin_sym( + fqsn: str, + pairs: dict[str, KucoinMktPair] +) -> str: pair_data = pairs[fqsn] - return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"] + return pair_data.baseCurrency + "-" + pair_data.quoteCurrency + + +def kucoin_sym_to_fqsn(sym: str) -> str: + return sym.lower().replace("-", "") @acm -async def get_client(): +async def get_client() -> AsyncGenerator[Client, None]: + client = Client() - # Do we need to open a nursery here? await client.cache_pairs() + yield client @tractor.context async def open_symbol_search( ctx: tractor.Context, -): +) -> None: async with open_cached_client("kucoin") as client: # load all symbols locally for fast search await client.cache_pairs() @@ -297,7 +398,6 @@ async def open_symbol_search( async with ctx.open_stream() as stream: async for pattern in stream: - # repack in dict form await stream.send(await client.search_symbols(pattern)) @@ -308,91 +408,93 @@ async def stream_quotes( loglevel: str = None, # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, -): - # TODO: Add multi-symbol functionality here - sym = symbols[0] +) -> None: + ''' + Required piker api to stream real-time data. + Where the rubber hits the road baby + + ''' connect_id = str(uuid4()) async with open_cached_client("kucoin") as client: - token, ping_interval = await client._get_ws_token() - pairs = await client.cache_pairs() - kucoin_sym = pairs[sym]["symbol"] - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - "symbol_info": { - "asset_type": "crypto", - "price_tick_size": 0.0005, - "lot_tick_size": 0.1, + + # map through symbols and sub to feedz + for sym in symbols: + + token, ping_interval = await client._get_ws_token() + pairs = await client.cache_pairs() + kucoin_sym = pairs[sym].symbol + + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + "symbol_info": { + "asset_type": "crypto", + "price_tick_size": 0.0005, + "lot_tick_size": 0.1, + }, + "shm_write_opts": {"sum_tick_vml": False}, + "fqsn": sym, }, - "shm_write_opts": {"sum_tick_vml": False}, - "fqsn": sym, - }, - } - - last_trades = await client.last_trades(sym) - - @acm - async def subscribe(ws: wsproto.WSConnection): + } @acm - async def open_ping_task(ws: wsproto.WSConnection): - async with trio.open_nursery() as n: + async def subscribe(ws: wsproto.WSConnection): + @acm + async def open_ping_task(ws: wsproto.WSConnection): + async with trio.open_nursery() as n: - async def ping_server(): - while True: - await trio.sleep((ping_interval - 1000) / 1000) - print("PINGING") - await ws.send_msg({"id": connect_id, "type": "ping"}) + async def ping_server(): + while True: + await trio.sleep((ping_interval - 1000) / 1000) + await ws.send_msg({"id": connect_id, "type": "ping"}) - n.start_soon(ping_server) + n.start_soon(ping_server) - yield ws + yield ws - n.cancel_scope.cancel() + n.cancel_scope.cancel() - # Spawn the ping task here - async with open_ping_task(ws) as _ws: + # Spawn the ping task here + async with open_ping_task(ws) as ws: + # subscribe to market feedz here + l1_sub = make_sub(kucoin_sym, connect_id) + await ws.send_msg(l1_sub) - # subscribe to market feedz here - l1_sub = make_sub(kucoin_sym, connect_id) - await _ws.send_msg(l1_sub) - res = await _ws.recv_msg() + yield - yield + # unsub + if ws.connected(): + await ws.send_msg( + { + "id": connect_id, + "type": "unsubscribe", + "topic": f"/market/ticker:{sym}", + "privateChannel": False, + "response": True, + } + ) - # unsub - if _ws.connected(): - await _ws.send_msg( - { - "id": connect_id, - "type": "unsubscribe", - "topic": f"/market/ticker:{sym}", - "privateChannel": False, - "response": True, - } - ) - - async with open_autorecon_ws( - f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]", - fixture=subscribe, - ) as ws: - msg_gen = stream_messages(ws, sym) - typ, quote = await msg_gen.__anext__() - # - while typ != "trade": - # TODO: use ``anext()`` when it lands in 3.10! + async with open_autorecon_ws( + f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]", + fixture=subscribe, + ) as ws: + msg_gen = stream_messages(ws, sym) typ, quote = await msg_gen.__anext__() + # + while typ != "trade": + # TODO: use ``anext()`` when it lands in 3.10! + typ, quote = await msg_gen.__anext__() - task_status.started((init_msgs, quote)) - feed_is_live.set() + task_status.started((init_msgs, quote)) + feed_is_live.set() - async for typ, msg in msg_gen: - await send_chan.send({sym: msg}) + async for typ, msg in msg_gen: + await send_chan.send({sym: msg}) -def make_sub(sym, connect_id): +def make_sub(sym, connect_id) -> dict[str, str | bool]: return { "id": connect_id, "type": "subscribe", @@ -441,7 +543,7 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: async def open_history_client( symbol: str, type: str = "1m", -): +) -> AsyncGenerator[Callable, None]: async with open_cached_client("kucoin") as client: # call bars on kucoin async def get_ohlc_history(