diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 19726ff6..d326f376 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -11,20 +11,14 @@ # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see -# . +# along with this program. If not, see . -''' +""" Kucoin broker backend -''' +""" -from typing import ( - Any, - Callable, - Literal, - AsyncGenerator -) +from typing import Any, Callable, Literal, AsyncGenerator from contextlib import asynccontextmanager as acm from datetime import datetime import time @@ -56,23 +50,24 @@ from ..data._web_bs import ( log = get_logger(__name__) _ohlc_dtype = [ - ('index', int), - ('time', int), - ('open', float), - ('high', float), - ('low', float), - ('close', float), - ('volume', float), - ('bar_wap', float), # will be zeroed by sampler if not filled + ("index", int), + ("time", int), + ("open", float), + ("high", float), + ("low", float), + ("close", float), + ("volume", float), + ("bar_wap", float), # will be zeroed by sampler if not filled ] class KucoinMktPair(Struct, frozen=True): - ''' + """ Kucoin's pair format: https://docs.kucoin.com/#get-symbols-list - ''' + """ + baseCurrency: str baseIncrement: float baseMaxSize: float @@ -93,11 +88,12 @@ class KucoinMktPair(Struct, frozen=True): class AccountTrade(Struct, frozen=True): - ''' + """ Historical trade format: https://docs.kucoin.com/#get-account-ledgers - ''' + """ + id: str currency: str amount: float @@ -105,16 +101,17 @@ class AccountTrade(Struct, frozen=True): balance: float accountType: str bizType: str - direction: Literal['in', 'out'] + direction: Literal["in", "out"] createdAt: float context: list[str] class AccountResponse(Struct, frozen=True): - ''' + """ https://docs.kucoin.com/#get-account-ledgers - ''' + """ + currentPage: int pageSize: int totalNum: int @@ -123,11 +120,12 @@ class AccountResponse(Struct, frozen=True): class KucoinTrade(Struct, frozen=True): - ''' + """ Real-time trade format: https://docs.kucoin.com/#symbol-ticker - ''' + """ + bestAsk: float bestAskSize: float bestBid: float @@ -139,21 +137,23 @@ class KucoinTrade(Struct, frozen=True): class KucoinL2(Struct, frozen=True): - ''' + """ Real-time L2 order book format: https://docs.kucoin.com/#level2-5-best-ask-bid-orders - ''' + """ + asks: list[list[float]] bids: list[list[float]] timestamp: float class KucoinMsg(Struct, frozen=True): - ''' + """ Generic outer-wrapper for any Kucoin ws msg - ''' + """ + type: str topic: str subject: str @@ -169,10 +169,10 @@ class BrokerConfig(Struct, frozen=True): def get_config() -> BrokerConfig | None: conf, _ = config.load() - section = conf.get('kucoin') + section = conf.get("kucoin") if section is None: - log.warning('No config section found for kucoin in config') + log.warning("No config section found for kucoin in config") return None return BrokerConfig(**section).copy() @@ -186,118 +186,119 @@ class Client: def _gen_auth_req_headers( self, - action: Literal['POST', 'GET'], + action: Literal["POST", "GET"], endpoint: str, - api_v: str = 'v2', + api_v: str = "v2", ) -> dict[str, str | bytes]: - ''' + """ Generate authenticated request headers https://docs.kucoin.com/#authentication - ''' - str_to_sign = str(int(time.time() * 1000)) + \ - action + f'/api/{api_v}{endpoint}' + """ + str_to_sign = ( + str(int(time.time() * 1000)) + action + f"/api/{api_v}{endpoint}" + ) signature = base64.b64encode( hmac.new( - self._config.key_secret.encode('utf-8'), - str_to_sign.encode('utf-8'), + self._config.key_secret.encode("utf-8"), + str_to_sign.encode("utf-8"), hashlib.sha256, ).digest() ) passphrase = base64.b64encode( hmac.new( - self._config.key_secret.encode('utf-8'), - self._config.key_passphrase.encode('utf-8'), + self._config.key_secret.encode("utf-8"), + self._config.key_passphrase.encode("utf-8"), hashlib.sha256, ).digest() ) return { - 'KC-API-SIGN': signature, - 'KC-API-TIMESTAMP': str(pendulum.now().int_timestamp * 1000), - 'KC-API-KEY': self._config.key_id, - 'KC-API-PASSPHRASE': passphrase, + "KC-API-SIGN": signature, + "KC-API-TIMESTAMP": str(pendulum.now().int_timestamp * 1000), + "KC-API-KEY": self._config.key_id, + "KC-API-PASSPHRASE": passphrase, # XXX: Even if using the v1 api - this stays the same - 'KC-API-KEY-VERSION': '2', + "KC-API-KEY-VERSION": "2", } async def _request( self, - action: Literal['POST', 'GET'], + action: Literal["POST", "GET"], endpoint: str, - api_v: str = 'v2', + api_v: str = "v2", headers: dict = {}, ) -> Any: - ''' + """ Generic request wrapper for Kucoin API - ''' + """ if self._config: - headers = self._gen_auth_req_headers( - action, endpoint, api_v) + headers = self._gen_auth_req_headers(action, endpoint, api_v) - api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}' + api_url = f"https://api.kucoin.com/api/{api_v}{endpoint}" res = await asks.request(action, api_url, headers=headers) - if 'data' in res.json(): - return res.json()['data'] + if "data" in res.json(): + return res.json()["data"] else: log.error( - f'Error making request to {api_url} -> {res.json()["msg"]}') - return res.json()['msg'] + f'Error making request to {api_url} -> {res.json()["msg"]}' + ) + return res.json()["msg"] async def _get_ws_token( self, private: bool = False, ) -> tuple[str, int] | None: - ''' + """ Fetch ws token needed for sub access: https://docs.kucoin.com/#apply-connect-token returns a token and the interval we must ping the server at to keep the connection alive - ''' - token_type = 'private' if private else 'public' + """ + token_type = "private" if private else "public" try: data: dict[str, Any] | None = await self._request( - 'POST', f'/bullet-{token_type}', 'v1' + "POST", f"/bullet-{token_type}", "v1" ) except Exception as e: - log.error( - f'Error making request for Kucoin ws token -> {str(e)}') + log.error(f"Error making request for Kucoin ws token -> {str(e)}") return None - if data and 'token' in data: + if data and "token" in data: # ping_interval is in ms - ping_interval: int = data['instanceServers'][0]['pingInterval'] - return data['token'], ping_interval + ping_interval: int = data["instanceServers"][0]["pingInterval"] + return data["token"], ping_interval elif data: log.error( - f'Error making request for Kucoin ws token -> {data.json()["msg"]}' + 'Error making request for Kucoin ws token' + f'{data.json()["msg"]}' ) async def _get_pairs( self, ) -> dict[str, KucoinMktPair]: - entries = await self._request('GET', '/symbols') + entries = await self._request("GET", "/symbols") syms = { - kucoin_sym_to_fqsn( - item['name']): KucoinMktPair( - **item) for item in entries} + kucoin_sym_to_fqsn(item["name"]): KucoinMktPair(**item) + for item in entries + } - log.info(f' {len(syms)} Kucoin market pairs fetched') + log.info(f" {len(syms)} Kucoin market pairs fetched") return syms async def cache_pairs( self, ) -> dict[str, KucoinMktPair]: - ''' + """ Get cached pairs and convert keyed symbols into fqsns if ya want - ''' + """ if not self._pairs: self._pairs = await self._get_pairs() @@ -311,12 +312,15 @@ class Client: data = await self.cache_pairs() matches = fuzzy.extractBests( - pattern, data, score_cutoff=35, limit=limit) + pattern, data, score_cutoff=35, limit=limit + ) # repack in dict form return {item[0].name: item[0] for item in matches} async def last_trades(self, sym: str) -> list[AccountTrade]: - trades = await self._request('GET', f'/accounts/ledgers?currency={sym}', 'v1') + trades = await self._request( + "GET", f"/accounts/ledgers?currency={sym}", "v1" + ) trades = AccountResponse(**trades) return trades.items @@ -327,9 +331,9 @@ class Client: end_dt: datetime | None = None, limit: int = 1000, as_np: bool = True, - type: str = '1min', + type: str = "1min", ) -> np.ndarray: - ''' + """ Get OHLC data and convert to numpy array for perffff: https://docs.kucoin.com/#get-klines @@ -356,35 +360,40 @@ class Client: ('bar_wap', float), # will be zeroed by sampler if not filled ] - ''' + """ # Generate generic end and start time if values not passed # Currently gives us 12hrs of data if end_dt is None: - end_dt = pendulum.now('UTC').add(minutes=1) + end_dt = pendulum.now("UTC").add(minutes=1) if start_dt is None: - start_dt = end_dt.start_of( - 'minute').subtract(minutes=limit) + start_dt = end_dt.start_of("minute").subtract(minutes=limit) start_dt = int(start_dt.timestamp()) end_dt = int(end_dt.timestamp()) kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) - url = f'/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}' + url = ( + f"/market/candles?type={type}" + f"&symbol={kucoin_sym}" + f"&startAt={start_dt}" + f"&endAt={end_dt}" + ) for i in range(10): data = await self._request( - 'GET', + "GET", url, - api_v='v1', + api_v="v1", ) if not isinstance(data, list): # Do a gradual backoff if Kucoin is rate limiting us backoff_interval = i log.warn( - f'History call failed, backing off for {backoff_interval}s') + f"History call failed, backing off for {backoff_interval}s" + ) await trio.sleep(backoff_interval) else: bars: list[list[str]] = data @@ -416,20 +425,17 @@ class Client: ) ) - array = np.array( - new_bars, - dtype=_ohlc_dtype) if as_np else bars + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars return array -def fqsn_to_kucoin_sym( - fqsn: str, pairs: dict[str, KucoinMktPair]) -> str: +def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str: pair_data = pairs[fqsn] - return pair_data.baseCurrency + '-' + pair_data.quoteCurrency + return pair_data.baseCurrency + "-" + pair_data.quoteCurrency def kucoin_sym_to_fqsn(sym: str) -> str: - return sym.lower().replace('-', '') + return sym.lower().replace("-", "") @acm @@ -444,7 +450,7 @@ async def get_client() -> AsyncGenerator[Client, None]: async def open_symbol_search( ctx: tractor.Context, ) -> None: - async with open_cached_client('kucoin') as client: + async with open_cached_client("kucoin") as client: # load all symbols locally for fast search await client.cache_pairs() await ctx.started() @@ -452,25 +458,25 @@ async def open_symbol_search( async with ctx.open_stream() as stream: async for pattern in stream: await stream.send(await client.search_symbols(pattern)) - log.info('Kucoin symbol search opened') + log.info("Kucoin symbol search opened") @acm async def open_ping_task(ws: wsproto.WSConnection, ping_interval, connect_id): - ''' + """ Spawn a non-blocking task that pings the ws server every ping_interval so Kucoin doesn't drop our connection - ''' + """ async with trio.open_nursery() as n: # TODO: cache this task so it's only called once async def ping_server(): while True: await trio.sleep((ping_interval - 1000) / 1000) - await ws.send_msg({'id': connect_id, 'type': 'ping'}) + await ws.send_msg({"id": connect_id, "type": "ping"}) - log.info(f'Starting ping task for kucoin ws connection') + log.info("Starting ping task for kucoin ws connection") n.start_soon(ping_server) yield @@ -482,29 +488,30 @@ async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, - loglevel: str = '', + loglevel: str = "", # startup sync - task_status: TaskStatus[tuple[dict, dict] - ] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - ''' + """ Required piker api to stream real-time data. Where the rubber hits the road baby - ''' - async with open_cached_client('kucoin') as client: + """ + async with open_cached_client("kucoin") as client: token, ping_interval = await client._get_ws_token() connect_id = str(uuid4()) pairs = await client.cache_pairs() + ws_url = ( + f"wss://ws-api-spot.kucoin.com/?" + f"token={token}&[connectId={connect_id}]" + ) - # open ping task + # open ping task async with ( - open_autorecon_ws( - f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]' - ) as ws, + open_autorecon_ws(ws_url) as ws, open_ping_task(ws, ping_interval, connect_id), ): - log.info('Starting up quote stream') + log.info("Starting up quote stream") # loop through symbols and sub to feedz for sym in symbols: pair: KucoinMktPair = pairs[sym] @@ -514,13 +521,13 @@ async def stream_quotes( # pass back token, and bool, signalling if we're the writer # and that history has been written sym: { - 'symbol_info': { - 'asset_type': 'crypto', - 'price_tick_size': float(pair.baseIncrement), - 'lot_tick_size': float(pair.baseMinSize), + "symbol_info": { + "asset_type": "crypto", + "price_tick_size": float(pair.baseIncrement), + "lot_tick_size": float(pair.baseMinSize), }, - 'shm_write_opts': {'sum_tick_vml': False}, - 'fqsn': sym, + "shm_write_opts": {"sum_tick_vml": False}, + "fqsn": sym, } } @@ -529,7 +536,7 @@ async def stream_quotes( stream_messages(ws, sym) as msg_gen, ): typ, quote = await anext(msg_gen) - while typ != 'trade': + while typ != "trade": # take care to not unblock here until we get a real # trade quote typ, quote = await anext(msg_gen) @@ -544,41 +551,47 @@ async def stream_quotes( @acm async def subscribe(ws: wsproto.WSConnection, connect_id, sym): # level 2 sub - await ws.send_msg({ - 'id': connect_id, - 'type': 'subscribe', - 'topic': f'/spotMarket/level2Depth5:{sym}', - 'privateChannel': False, - 'response': True, - }) + await ws.send_msg( + { + "id": connect_id, + "type": "subscribe", + "topic": f"/spotMarket/level2Depth5:{sym}", + "privateChannel": False, + "response": True, + } + ) # watch trades - await ws.send_msg({ - 'id': connect_id, - 'type': 'subscribe', - 'topic': f'/market/ticker:{sym}', - 'privateChannel': False, - 'response': True, - }) + await ws.send_msg( + { + "id": connect_id, + "type": "subscribe", + "topic": f"/market/ticker:{sym}", + "privateChannel": False, + "response": True, + } + ) yield # unsub if ws.connected(): - log.info(f'Unsubscribing to {sym} feed') + log.info(f"Unsubscribing to {sym} feed") await ws.send_msg( { - 'id': connect_id, - 'type': 'unsubscribe', - 'topic': f'/market/ticker:{sym}', - 'privateChannel': False, - 'response': True, + "id": connect_id, + "type": "unsubscribe", + "topic": f"/market/ticker:{sym}", + "privateChannel": False, + "response": True, } ) @trio_async_generator -async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: +async def stream_messages( + ws: NoBsWs, sym: str +) -> AsyncGenerator[NoBsWs, dict]: timeouts = 0 last_trade_ts = 0 @@ -588,65 +601,65 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: if cs.cancelled_caught: timeouts += 1 if timeouts > 2: - log.error( - 'kucoin feed is sh**ing the bed... rebooting...') + log.error("kucoin feed is sh**ing the bed... rebooting...") await ws._connect() continue - if msg.get('subject'): + if msg.get("subject"): msg = KucoinMsg(**msg) match msg.subject: - case 'trade.ticker': + case "trade.ticker": trade_data = KucoinTrade(**msg.data) - # XXX: Filter for duplicate messages as ws feed will send duplicate market state + # XXX: Filter for duplicate messages as ws feed will + # send duplicate market state # https://docs.kucoin.com/#level2-5-best-ask-bid-orders if trade_data.time == last_trade_ts: continue last_trade_ts = trade_data.time - yield 'trade', { - 'symbol': sym, - 'last': trade_data.price, - 'brokerd_ts': last_trade_ts, - 'ticks': [ + yield "trade", { + "symbol": sym, + "last": trade_data.price, + "brokerd_ts": last_trade_ts, + "ticks": [ { - 'type': 'trade', - 'price': float(trade_data.price), - 'size': float(trade_data.size), - 'broker_ts': last_trade_ts, + "type": "trade", + "price": float(trade_data.price), + "size": float(trade_data.size), + "broker_ts": last_trade_ts, } ], } - case 'level2': + case "level2": l2_data = KucoinL2(**msg.data) first_ask = l2_data.asks[0] first_bid = l2_data.bids[0] - yield 'l1', { - 'symbol': sym, - 'ticks': [ + yield "l1", { + "symbol": sym, + "ticks": [ { - 'type': 'bid', - 'price': float(first_bid[0]), - 'size': float(first_bid[1]), + "type": "bid", + "price": float(first_bid[0]), + "size": float(first_bid[1]), }, { - 'type': 'bsize', - 'price': float(first_bid[0]), - 'size': float(first_bid[1]), + "type": "bsize", + "price": float(first_bid[0]), + "size": float(first_bid[1]), }, { - 'type': 'ask', - 'price': float(first_ask[0]), - 'size': float(first_ask[1]), + "type": "ask", + "price": float(first_ask[0]), + "size": float(first_ask[1]), }, { - 'type': 'asize', - 'price': float(first_ask[0]), - 'size': float(first_ask[1]), + "type": "asize", + "price": float(first_ask[0]), + "size": float(first_ask[1]), }, ], } @@ -655,20 +668,20 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: @acm async def open_history_client( symbol: str, - type: str = '1m', + type: str = "1m", ) -> AsyncGenerator[Callable, None]: - async with open_cached_client('kucoin') as client: - - log.info('Attempting to open kucoin history client') + async with open_cached_client("kucoin") as client: + log.info("Attempting to open kucoin history client") async def get_ohlc_history( timeframe: float, end_dt: datetime | None = None, start_dt: datetime | None = None, - ) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end - + ) -> tuple[ + np.ndarray, datetime | None, datetime | None + ]: # start # end if timeframe != 60: - raise DataUnavailable('Only 1m bars are supported') + raise DataUnavailable("Only 1m bars are supported") array = await client._get_bars( symbol, @@ -676,13 +689,14 @@ async def open_history_client( end_dt=end_dt, ) - times = array['time'] + times = array["time"] if end_dt is None: inow = round(time.time()) print( - f'difference in time between load and processing {inow - times[-1]}' + f"difference in time between load and processing" + f"{inow - times[-1]}" ) if (inow - times[-1]) > 60: @@ -691,7 +705,7 @@ async def open_history_client( start_dt = pendulum.from_timestamp(times[0]) end_dt = pendulum.from_timestamp(times[-1]) - log.info('History succesfully fetched baby') + log.info("History succesfully fetched baby") return array, start_dt, end_dt