From fcdddadec1aa8e4659a8c2ce44321152d4b9c37a Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Tue, 18 Apr 2023 10:42:30 -0400 Subject: [PATCH] Use singlequotes --- piker/brokers/kucoin.py | 308 ++++++++++++++++++++-------------------- 1 file changed, 154 insertions(+), 154 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index d326f376..d1c3c1c7 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -13,10 +13,10 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Kucoin broker backend -""" +''' from typing import Any, Callable, Literal, AsyncGenerator from contextlib import asynccontextmanager as acm @@ -50,23 +50,23 @@ from ..data._web_bs import ( log = get_logger(__name__) _ohlc_dtype = [ - ("index", int), - ("time", int), - ("open", float), - ("high", float), - ("low", float), - ("close", float), - ("volume", float), - ("bar_wap", float), # will be zeroed by sampler if not filled + ('index', int), + ('time', int), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('volume', float), + ('bar_wap', float), # will be zeroed by sampler if not filled ] class KucoinMktPair(Struct, frozen=True): - """ + ''' Kucoin's pair format: https://docs.kucoin.com/#get-symbols-list - """ + ''' baseCurrency: str baseIncrement: float @@ -88,11 +88,11 @@ class KucoinMktPair(Struct, frozen=True): class AccountTrade(Struct, frozen=True): - """ + ''' Historical trade format: https://docs.kucoin.com/#get-account-ledgers - """ + ''' id: str currency: str @@ -101,16 +101,16 @@ class AccountTrade(Struct, frozen=True): balance: float accountType: str bizType: str - direction: Literal["in", "out"] + direction: Literal['in', 'out'] createdAt: float context: list[str] class AccountResponse(Struct, frozen=True): - """ + ''' https://docs.kucoin.com/#get-account-ledgers - """ + ''' currentPage: int pageSize: int @@ -120,11 +120,11 @@ class AccountResponse(Struct, frozen=True): class KucoinTrade(Struct, frozen=True): - """ + ''' Real-time trade format: https://docs.kucoin.com/#symbol-ticker - """ + ''' bestAsk: float bestAskSize: float @@ -137,11 +137,11 @@ class KucoinTrade(Struct, frozen=True): class KucoinL2(Struct, frozen=True): - """ + ''' Real-time L2 order book format: https://docs.kucoin.com/#level2-5-best-ask-bid-orders - """ + ''' asks: list[list[float]] bids: list[list[float]] @@ -149,10 +149,10 @@ class KucoinL2(Struct, frozen=True): class KucoinMsg(Struct, frozen=True): - """ + ''' Generic outer-wrapper for any Kucoin ws msg - """ + ''' type: str topic: str @@ -169,10 +169,10 @@ class BrokerConfig(Struct, frozen=True): def get_config() -> BrokerConfig | None: conf, _ = config.load() - section = conf.get("kucoin") + section = conf.get('kucoin') if section is None: - log.warning("No config section found for kucoin in config") + log.warning('No config section found for kucoin in config') return None return BrokerConfig(**section).copy() @@ -186,94 +186,94 @@ class Client: def _gen_auth_req_headers( self, - action: Literal["POST", "GET"], + action: Literal['POST', 'GET'], endpoint: str, - api_v: str = "v2", + api_v: str = 'v2', ) -> dict[str, str | bytes]: - """ + ''' Generate authenticated request headers https://docs.kucoin.com/#authentication - """ + ''' str_to_sign = ( - str(int(time.time() * 1000)) + action + f"/api/{api_v}{endpoint}" + str(int(time.time() * 1000)) + action + f'/api/{api_v}{endpoint}' ) signature = base64.b64encode( hmac.new( - self._config.key_secret.encode("utf-8"), - str_to_sign.encode("utf-8"), + self._config.key_secret.encode('utf-8'), + str_to_sign.encode('utf-8'), hashlib.sha256, ).digest() ) passphrase = base64.b64encode( hmac.new( - self._config.key_secret.encode("utf-8"), - self._config.key_passphrase.encode("utf-8"), + self._config.key_secret.encode('utf-8'), + self._config.key_passphrase.encode('utf-8'), hashlib.sha256, ).digest() ) return { - "KC-API-SIGN": signature, - "KC-API-TIMESTAMP": str(pendulum.now().int_timestamp * 1000), - "KC-API-KEY": self._config.key_id, - "KC-API-PASSPHRASE": passphrase, + 'KC-API-SIGN': signature, + 'KC-API-TIMESTAMP': str(pendulum.now().int_timestamp * 1000), + 'KC-API-KEY': self._config.key_id, + 'KC-API-PASSPHRASE': passphrase, # XXX: Even if using the v1 api - this stays the same - "KC-API-KEY-VERSION": "2", + 'KC-API-KEY-VERSION': '2', } async def _request( self, - action: Literal["POST", "GET"], + action: Literal['POST', 'GET'], endpoint: str, - api_v: str = "v2", + api_v: str = 'v2', headers: dict = {}, ) -> Any: - """ + ''' Generic request wrapper for Kucoin API - """ + ''' if self._config: headers = self._gen_auth_req_headers(action, endpoint, api_v) - api_url = f"https://api.kucoin.com/api/{api_v}{endpoint}" + api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}' res = await asks.request(action, api_url, headers=headers) - if "data" in res.json(): - return res.json()["data"] + if 'data' in res.json(): + return res.json()['data'] else: log.error( f'Error making request to {api_url} -> {res.json()["msg"]}' ) - return res.json()["msg"] + return res.json()['msg'] async def _get_ws_token( self, private: bool = False, ) -> tuple[str, int] | None: - """ + ''' Fetch ws token needed for sub access: https://docs.kucoin.com/#apply-connect-token returns a token and the interval we must ping the server at to keep the connection alive - """ - token_type = "private" if private else "public" + ''' + token_type = 'private' if private else 'public' try: data: dict[str, Any] | None = await self._request( - "POST", f"/bullet-{token_type}", "v1" + 'POST', f'/bullet-{token_type}', 'v1' ) except Exception as e: - log.error(f"Error making request for Kucoin ws token -> {str(e)}") + log.error(f'Error making request for Kucoin ws token -> {str(e)}') return None - if data and "token" in data: + if data and 'token' in data: # ping_interval is in ms - ping_interval: int = data["instanceServers"][0]["pingInterval"] - return data["token"], ping_interval + ping_interval: int = data['instanceServers'][0]['pingInterval'] + return data['token'], ping_interval elif data: log.error( 'Error making request for Kucoin ws token' @@ -283,22 +283,22 @@ class Client: async def _get_pairs( self, ) -> dict[str, KucoinMktPair]: - entries = await self._request("GET", "/symbols") + entries = await self._request('GET', '/symbols') syms = { - kucoin_sym_to_fqsn(item["name"]): KucoinMktPair(**item) + kucoin_sym_to_fqsn(item['name']): KucoinMktPair(**item) for item in entries } - log.info(f" {len(syms)} Kucoin market pairs fetched") + log.info(f' {len(syms)} Kucoin market pairs fetched') return syms async def cache_pairs( self, ) -> dict[str, KucoinMktPair]: - """ + ''' Get cached pairs and convert keyed symbols into fqsns if ya want - """ + ''' if not self._pairs: self._pairs = await self._get_pairs() @@ -319,7 +319,7 @@ class Client: async def last_trades(self, sym: str) -> list[AccountTrade]: trades = await self._request( - "GET", f"/accounts/ledgers?currency={sym}", "v1" + 'GET', f'/accounts/ledgers?currency={sym}', 'v1' ) trades = AccountResponse(**trades) return trades.items @@ -331,21 +331,21 @@ class Client: end_dt: datetime | None = None, limit: int = 1000, as_np: bool = True, - type: str = "1min", + type: str = '1min', ) -> np.ndarray: - """ + ''' Get OHLC data and convert to numpy array for perffff: https://docs.kucoin.com/#get-klines Kucoin bar data format: [ - "1545904980", //Start time of the candle cycle 0 - "0.058", //opening price 1 - "0.049", //closing price 2 - "0.058", //highest price 3 - "0.049", //lowest price 4 - "0.018", //Transaction volume 5 - "0.000945" //Transaction amount 6 + '1545904980', //Start time of the candle cycle 0 + '0.058', //opening price 1 + '0.049', //closing price 2 + '0.058', //highest price 3 + '0.049', //lowest price 4 + '0.018', //Transaction volume 5 + '0.000945' //Transaction amount 6 ], piker ohlc numpy array format: @@ -360,14 +360,14 @@ class Client: ('bar_wap', float), # will be zeroed by sampler if not filled ] - """ + ''' # Generate generic end and start time if values not passed # Currently gives us 12hrs of data if end_dt is None: - end_dt = pendulum.now("UTC").add(minutes=1) + end_dt = pendulum.now('UTC').add(minutes=1) if start_dt is None: - start_dt = end_dt.start_of("minute").subtract(minutes=limit) + start_dt = end_dt.start_of('minute').subtract(minutes=limit) start_dt = int(start_dt.timestamp()) end_dt = int(end_dt.timestamp()) @@ -375,24 +375,24 @@ class Client: kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) url = ( - f"/market/candles?type={type}" - f"&symbol={kucoin_sym}" - f"&startAt={start_dt}" - f"&endAt={end_dt}" + f'/market/candles?type={type}' + f'&symbol={kucoin_sym}' + f'&startAt={start_dt}' + f'&endAt={end_dt}' ) for i in range(10): data = await self._request( - "GET", + 'GET', url, - api_v="v1", + api_v='v1', ) if not isinstance(data, list): # Do a gradual backoff if Kucoin is rate limiting us backoff_interval = i log.warn( - f"History call failed, backing off for {backoff_interval}s" + f'History call failed, backing off for {backoff_interval}s' ) await trio.sleep(backoff_interval) else: @@ -431,11 +431,11 @@ class Client: def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str: pair_data = pairs[fqsn] - return pair_data.baseCurrency + "-" + pair_data.quoteCurrency + return pair_data.baseCurrency + '-' + pair_data.quoteCurrency def kucoin_sym_to_fqsn(sym: str) -> str: - return sym.lower().replace("-", "") + return sym.lower().replace('-', '') @acm @@ -450,7 +450,7 @@ async def get_client() -> AsyncGenerator[Client, None]: async def open_symbol_search( ctx: tractor.Context, ) -> None: - async with open_cached_client("kucoin") as client: + async with open_cached_client('kucoin') as client: # load all symbols locally for fast search await client.cache_pairs() await ctx.started() @@ -458,25 +458,25 @@ async def open_symbol_search( async with ctx.open_stream() as stream: async for pattern in stream: await stream.send(await client.search_symbols(pattern)) - log.info("Kucoin symbol search opened") + log.info('Kucoin symbol search opened') @acm async def open_ping_task(ws: wsproto.WSConnection, ping_interval, connect_id): - """ + ''' Spawn a non-blocking task that pings the ws server every ping_interval so Kucoin doesn't drop our connection - """ + ''' async with trio.open_nursery() as n: # TODO: cache this task so it's only called once async def ping_server(): while True: await trio.sleep((ping_interval - 1000) / 1000) - await ws.send_msg({"id": connect_id, "type": "ping"}) + await ws.send_msg({'id': connect_id, 'type': 'ping'}) - log.info("Starting ping task for kucoin ws connection") + log.info('Starting ping task for kucoin ws connection') n.start_soon(ping_server) yield @@ -488,22 +488,22 @@ async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, - loglevel: str = "", + loglevel: str = '', # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - """ + ''' Required piker api to stream real-time data. Where the rubber hits the road baby - """ - async with open_cached_client("kucoin") as client: + ''' + async with open_cached_client('kucoin') as client: token, ping_interval = await client._get_ws_token() connect_id = str(uuid4()) pairs = await client.cache_pairs() ws_url = ( - f"wss://ws-api-spot.kucoin.com/?" - f"token={token}&[connectId={connect_id}]" + f'wss://ws-api-spot.kucoin.com/?' + f'token={token}&[connectId={connect_id}]' ) # open ping task @@ -511,7 +511,7 @@ async def stream_quotes( open_autorecon_ws(ws_url) as ws, open_ping_task(ws, ping_interval, connect_id), ): - log.info("Starting up quote stream") + log.info('Starting up quote stream') # loop through symbols and sub to feedz for sym in symbols: pair: KucoinMktPair = pairs[sym] @@ -521,13 +521,13 @@ async def stream_quotes( # pass back token, and bool, signalling if we're the writer # and that history has been written sym: { - "symbol_info": { - "asset_type": "crypto", - "price_tick_size": float(pair.baseIncrement), - "lot_tick_size": float(pair.baseMinSize), + 'symbol_info': { + 'asset_type': 'crypto', + 'price_tick_size': float(pair.baseIncrement), + 'lot_tick_size': float(pair.baseMinSize), }, - "shm_write_opts": {"sum_tick_vml": False}, - "fqsn": sym, + 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, } } @@ -536,7 +536,7 @@ async def stream_quotes( stream_messages(ws, sym) as msg_gen, ): typ, quote = await anext(msg_gen) - while typ != "trade": + while typ != 'trade': # take care to not unblock here until we get a real # trade quote typ, quote = await anext(msg_gen) @@ -553,22 +553,22 @@ async def subscribe(ws: wsproto.WSConnection, connect_id, sym): # level 2 sub await ws.send_msg( { - "id": connect_id, - "type": "subscribe", - "topic": f"/spotMarket/level2Depth5:{sym}", - "privateChannel": False, - "response": True, + 'id': connect_id, + 'type': 'subscribe', + 'topic': f'/spotMarket/level2Depth5:{sym}', + 'privateChannel': False, + 'response': True, } ) # watch trades await ws.send_msg( { - "id": connect_id, - "type": "subscribe", - "topic": f"/market/ticker:{sym}", - "privateChannel": False, - "response": True, + 'id': connect_id, + 'type': 'subscribe', + 'topic': f'/market/ticker:{sym}', + 'privateChannel': False, + 'response': True, } ) @@ -576,14 +576,14 @@ async def subscribe(ws: wsproto.WSConnection, connect_id, sym): # unsub if ws.connected(): - log.info(f"Unsubscribing to {sym} feed") + log.info(f'Unsubscribing to {sym} feed') await ws.send_msg( { - "id": connect_id, - "type": "unsubscribe", - "topic": f"/market/ticker:{sym}", - "privateChannel": False, - "response": True, + 'id': connect_id, + 'type': 'unsubscribe', + 'topic': f'/market/ticker:{sym}', + 'privateChannel': False, + 'response': True, } ) @@ -601,15 +601,15 @@ async def stream_messages( if cs.cancelled_caught: timeouts += 1 if timeouts > 2: - log.error("kucoin feed is sh**ing the bed... rebooting...") + log.error('kucoin feed is sh**ing the bed... rebooting...') await ws._connect() continue - if msg.get("subject"): + if msg.get('subject'): msg = KucoinMsg(**msg) match msg.subject: - case "trade.ticker": + case 'trade.ticker': trade_data = KucoinTrade(**msg.data) # XXX: Filter for duplicate messages as ws feed will @@ -620,46 +620,46 @@ async def stream_messages( last_trade_ts = trade_data.time - yield "trade", { - "symbol": sym, - "last": trade_data.price, - "brokerd_ts": last_trade_ts, - "ticks": [ + yield 'trade', { + 'symbol': sym, + 'last': trade_data.price, + 'brokerd_ts': last_trade_ts, + 'ticks': [ { - "type": "trade", - "price": float(trade_data.price), - "size": float(trade_data.size), - "broker_ts": last_trade_ts, + 'type': 'trade', + 'price': float(trade_data.price), + 'size': float(trade_data.size), + 'broker_ts': last_trade_ts, } ], } - case "level2": + case 'level2': l2_data = KucoinL2(**msg.data) first_ask = l2_data.asks[0] first_bid = l2_data.bids[0] - yield "l1", { - "symbol": sym, - "ticks": [ + yield 'l1', { + 'symbol': sym, + 'ticks': [ { - "type": "bid", - "price": float(first_bid[0]), - "size": float(first_bid[1]), + 'type': 'bid', + 'price': float(first_bid[0]), + 'size': float(first_bid[1]), }, { - "type": "bsize", - "price": float(first_bid[0]), - "size": float(first_bid[1]), + 'type': 'bsize', + 'price': float(first_bid[0]), + 'size': float(first_bid[1]), }, { - "type": "ask", - "price": float(first_ask[0]), - "size": float(first_ask[1]), + 'type': 'ask', + 'price': float(first_ask[0]), + 'size': float(first_ask[1]), }, { - "type": "asize", - "price": float(first_ask[0]), - "size": float(first_ask[1]), + 'type': 'asize', + 'price': float(first_ask[0]), + 'size': float(first_ask[1]), }, ], } @@ -668,10 +668,10 @@ async def stream_messages( @acm async def open_history_client( symbol: str, - type: str = "1m", + type: str = '1m', ) -> AsyncGenerator[Callable, None]: - async with open_cached_client("kucoin") as client: - log.info("Attempting to open kucoin history client") + async with open_cached_client('kucoin') as client: + log.info('Attempting to open kucoin history client') async def get_ohlc_history( timeframe: float, @@ -681,7 +681,7 @@ async def open_history_client( np.ndarray, datetime | None, datetime | None ]: # start # end if timeframe != 60: - raise DataUnavailable("Only 1m bars are supported") + raise DataUnavailable('Only 1m bars are supported') array = await client._get_bars( symbol, @@ -689,14 +689,14 @@ async def open_history_client( end_dt=end_dt, ) - times = array["time"] + times = array['time'] if end_dt is None: inow = round(time.time()) print( - f"difference in time between load and processing" - f"{inow - times[-1]}" + f'difference in time between load and processing' + f'{inow - times[-1]}' ) if (inow - times[-1]) > 60: @@ -705,7 +705,7 @@ async def open_history_client( start_dt = pendulum.from_timestamp(times[0]) end_dt = pendulum.from_timestamp(times[-1]) - log.info("History succesfully fetched baby") + log.info('History succesfully fetched baby') return array, start_dt, end_dt