From ff22f2d240878c39dca0b01ea6e9e7067812f056 Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Mon, 27 Mar 2023 21:51:54 -0400 Subject: [PATCH] Format and ensure we're only grabbing the most closest bid and ask --- piker/brokers/kucoin.py | 293 ++++++++++++++++++++-------------------- 1 file changed, 143 insertions(+), 150 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 901b24c9..e42ed37d 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -13,10 +13,10 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Kucoin broker backend -""" +''' from random import randint from typing import Any, Callable, Optional, Literal, AsyncGenerator @@ -51,14 +51,14 @@ from ..data._web_bs import ( log = get_logger(__name__) _ohlc_dtype = [ - ("index", int), - ("time", int), - ("open", float), - ("high", float), - ("low", float), - ("close", float), - ("volume", float), - ("bar_wap", float), # will be zeroed by sampler if not filled + ('index', int), + ('time', int), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('volume', float), + ('bar_wap', float), # will be zeroed by sampler if not filled ] @@ -67,6 +67,7 @@ class KucoinMktPair(Struct, frozen=True): Kucoin's pair format ''' + baseCurrency: str baseIncrement: float baseMaxSize: float @@ -91,6 +92,7 @@ class AccountTrade(Struct, frozen=True): Historical trade format ''' + id: str currency: str amount: float @@ -98,7 +100,7 @@ class AccountTrade(Struct, frozen=True): balance: float accountType: str bizType: str - direction: Literal["in", "out"] + direction: Literal['in', 'out'] createdAt: float context: list[str] @@ -116,6 +118,7 @@ class KucoinTrade(Struct, frozen=True): Real-time trade format ''' + bestAsk: float bestAskSize: float bestBid: float @@ -131,6 +134,7 @@ class KucoinL2(Struct, frozen=True): Real-time L2 order book format ''' + asks: list[list[float]] bids: list[list[float]] timestamp: float @@ -152,10 +156,10 @@ class BrokerConfig(Struct, frozen=True): def get_config() -> BrokerConfig | None: conf, path = config.load() - section = conf.get("kucoin") + section = conf.get('kucoin') if section is None: - log.warning("No config section found for kucoin in config") + log.warning('No config section found for kucoin in config') return None return BrokerConfig(**section) @@ -172,9 +176,7 @@ class Client: config: BrokerConfig | None = get_config() - if ( - config and config.key_id and config.key_secret and config.key_passphrase - ): + if config and config.key_id and config.key_secret and config.key_passphrase: self._authenticated = True self._key_id = config.key_id self._key_secret = config.key_secret @@ -182,9 +184,9 @@ class Client: def _gen_auth_req_headers( self, - action: Literal["POST", "GET"], + action: Literal['POST', 'GET'], endpoint: str, - api_v: str = "v2", + api_v: str = 'v2', ) -> dict[str, str | bytes]: ''' Generate authenticated request headers @@ -193,39 +195,39 @@ class Client: ''' breakpoint() now = int(time.time() * 1000) - path = f"/api/{api_v}{endpoint}" + path = f'/api/{api_v}{endpoint}' str_to_sign = str(now) + action + path signature = base64.b64encode( hmac.new( - self._key_secret.encode("utf-8"), - str_to_sign.encode("utf-8"), + self._key_secret.encode('utf-8'), + str_to_sign.encode('utf-8'), hashlib.sha256, ).digest() ) passphrase = base64.b64encode( hmac.new( - self._key_secret.encode("utf-8"), - self._key_passphrase.encode("utf-8"), + self._key_secret.encode('utf-8'), + self._key_passphrase.encode('utf-8'), hashlib.sha256, ).digest() ) return { - "KC-API-SIGN": signature, - "KC-API-TIMESTAMP": str(now), - "KC-API-KEY": self._key_id, - "KC-API-PASSPHRASE": passphrase, + 'KC-API-SIGN': signature, + 'KC-API-TIMESTAMP': str(now), + 'KC-API-KEY': self._key_id, + 'KC-API-PASSPHRASE': passphrase, # XXX: Even if using the v1 api - this stays the same - "KC-API-KEY-VERSION": "2", + 'KC-API-KEY-VERSION': '2', } async def _request( self, - action: Literal["POST", "GET"], + action: Literal['POST', 'GET'], endpoint: str, - api_v: str = "v2", + api_v: str = 'v2', headers: dict = {}, ) -> Any: ''' @@ -235,34 +237,29 @@ class Client: if self._authenticated: headers = self._gen_auth_req_headers(action, endpoint, api_v) - api_url = f"https://api.kucoin.com/api/{api_v}{endpoint}" + api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}' res = await asks.request(action, api_url, headers=headers) - if "data" in res.json(): - return res.json()["data"] + if 'data' in res.json(): + return res.json()['data'] else: log.error(f'Error making request to {api_url} -> {res.json()["msg"]}') - return res.json()["msg"] + return res.json()['msg'] - async def _get_ws_token( - self, - private: bool = False - ) -> tuple[str, int] | None: + async def _get_ws_token(self, private: bool = False) -> tuple[str, int] | None: ''' Fetch ws token needed for sub access ''' - token_type = "private" if private else "public" + token_type = 'private' if private else 'public' data: dict[str, Any] | None = await self._request( - "POST", - f"/bullet-{token_type}", - "v1" + 'POST', f'/bullet-{token_type}', 'v1' ) - if data and "token" in data: - ping_interval: int = data["instanceServers"][0]["pingInterval"] - return data["token"], ping_interval + if data and 'token' in data: + ping_interval: int = data['instanceServers'][0]['pingInterval'] + return data['token'], ping_interval elif data: log.error( f'Error making request for Kucoin ws token -> {data.json()["msg"]}' @@ -274,8 +271,8 @@ class Client: if self._pairs: return self._pairs - entries = await self._request("GET", "/symbols") - syms = {item["name"]: KucoinMktPair(**item) for item in entries} + entries = await self._request('GET', '/symbols') + syms = {item['name']: KucoinMktPair(**item) for item in entries} return syms async def cache_pairs( @@ -295,14 +292,14 @@ class Client: def _normalize_pairs( self, pairs: dict[str, KucoinMktPair] ) -> dict[str, KucoinMktPair]: - """ + ''' Map kucoin pairs to fqsn strings - """ + ''' norm_pairs = {} for key, value in pairs.items(): - fqsn = key.lower().replace("-", "") + fqsn = key.lower().replace('-', '') norm_pairs[fqsn] = value return norm_pairs @@ -319,7 +316,7 @@ class Client: return {kucoin_sym_to_fqsn(item[0].name): item[0] for item in matches} async def last_trades(self, sym: str) -> list[AccountTrade]: - trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1") + trades = await self._request('GET', f'/accounts/ledgers?currency={sym}', 'v1') trades = AccountResponse(**trades) return trades.items @@ -330,7 +327,7 @@ class Client: end_dt: Optional[datetime] = None, limit: int = 1000, as_np: bool = True, - type: str = "1min", + type: str = '1min', ) -> np.ndarray: ''' Get OHLC data and convert to numpy array for perffff @@ -339,24 +336,24 @@ class Client: # Generate generic end and start time if values not passed # Currently gives us 12hrs of data if end_dt is None: - end_dt = pendulum.now("UTC").add(minutes=1) + end_dt = pendulum.now('UTC').add(minutes=1) if start_dt is None: - start_dt = end_dt.start_of("minute").subtract(minutes=limit) + start_dt = end_dt.start_of('minute').subtract(minutes=limit) start_dt = int(start_dt.timestamp()) end_dt = int(end_dt.timestamp()) kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) - url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}" + url = f'/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}' bars = [] for i in range(10): data = await self._request( - "GET", + 'GET', url, - api_v="v1", + api_v='v1', ) if not isinstance(data, list): @@ -371,28 +368,26 @@ class Client: # Map to OHLC values to dict then to np array new_bars = [] for i, bar in enumerate(bars[::-1]): - data = { - "index": i, - "time": bar[0], - "open": bar[1], - "close": bar[2], - "high": bar[3], - "low": bar[4], - "volume": bar[5], - "amount": bar[6], - "bar_wap": 0.0, + 'index': i, + 'time': bar[0], + 'open': bar[1], + 'close': bar[2], + 'high': bar[3], + 'low': bar[4], + 'volume': bar[5], + 'amount': bar[6], + 'bar_wap': 0.0, } row = [] for _, (field_name, field_type) in enumerate(_ohlc_dtype): - value = data[field_name] match field_name: - case "index": + case 'index': row.append(int(value)) - case "time": + case 'time': # row.append(int(value) + (3600 * 4)) row.append(value) case _: @@ -408,33 +403,28 @@ def kucoin_timestamp(dt: datetime): return math.trunc(time.mktime(dt.timetuple())) -def fqsn_to_kucoin_sym( - fqsn: str, - pairs: dict[str, KucoinMktPair] - -) -> str: +def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str: pair_data = pairs[fqsn] - return pair_data.baseCurrency + "-" + pair_data.quoteCurrency + return pair_data.baseCurrency + '-' + pair_data.quoteCurrency def kucoin_sym_to_fqsn(sym: str) -> str: - return sym.lower().replace("-", "") + return sym.lower().replace('-', '') -@ acm +@acm async def get_client() -> AsyncGenerator[Client, None]: - client = Client() await client.cache_pairs() yield client -@ tractor.context +@tractor.context async def open_symbol_search( ctx: tractor.Context, ) -> None: - async with open_cached_client("kucoin") as client: + async with open_cached_client('kucoin') as client: # load all symbols locally for fast search await client.cache_pairs() await ctx.started() @@ -442,7 +432,7 @@ async def open_symbol_search( async with ctx.open_stream() as stream: async for pattern in stream: await stream.send(await client.search_symbols(pattern)) - log.info("Kucoin symbol search opened") + log.info('Kucoin symbol search opened') async def stream_quotes( @@ -460,11 +450,10 @@ async def stream_quotes( ''' connect_id = str(uuid4()) - async with open_cached_client("kucoin") as client: - log.info("Starting up quote stream") + async with open_cached_client('kucoin') as client: + log.info('Starting up quote stream') # loop through symbols and sub to feedz for sym in symbols: - token, ping_interval = await client._get_ws_token() pairs = await client.cache_pairs() kucoin_sym = pairs[sym].symbol @@ -473,19 +462,18 @@ async def stream_quotes( # pass back token, and bool, signalling if we're the writer # and that history has been written sym: { - "symbol_info": { - "asset_type": "crypto", - "price_tick_size": 0.0005, - "lot_tick_size": 0.1, + 'symbol_info': { + 'asset_type': 'crypto', + 'price_tick_size': 0.0005, + 'lot_tick_size': 0.1, }, - "shm_write_opts": {"sum_tick_vml": False}, - "fqsn": sym, + 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, }, } @acm async def subscribe(ws: wsproto.WSConnection): - @acm async def open_ping_task(ws: wsproto.WSConnection): ''' @@ -499,7 +487,7 @@ async def stream_quotes( async def ping_server(): while True: await trio.sleep((ping_interval - 1000) / 1000) - await ws.send_msg({"id": connect_id, "type": "ping"}) + await ws.send_msg({'id': connect_id, 'type': 'ping'}) n.start_soon(ping_server) @@ -523,22 +511,22 @@ async def stream_quotes( log.info(f'Unsubscribing to {kucoin_sym} feed') await ws.send_msg( { - "id": connect_id, - "type": "unsubscribe", - "topic": f"/market/ticker:{sym}", - "privateChannel": False, - "response": True, + 'id': connect_id, + 'type': 'unsubscribe', + 'topic': f'/market/ticker:{sym}', + 'privateChannel': False, + 'response': True, } ) async with open_autorecon_ws( - f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]", + f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]', fixture=subscribe, ) as ws: msg_gen = stream_messages(ws, sym) typ, quote = await msg_gen.__anext__() - while typ != "trade": + while typ != 'trade': # TODO: use ``anext()`` when it lands in 3.10! typ, quote = await msg_gen.__anext__() @@ -553,29 +541,25 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]: match level: case 'l1': return { - "id": connect_id, - "type": "subscribe", - "topic": f"/spotMarket/level2Depth5:{sym}", - "privateChannel": False, - "response": True, + 'id': connect_id, + 'type': 'subscribe', + 'topic': f'/spotMarket/level2Depth5:{sym}', + 'privateChannel': False, + 'response': True, } case 'l3': return { - "id": connect_id, - "type": "subscribe", - "topic": f"/market/ticker:{sym}", - "privateChannel": False, - "response": True, + 'id': connect_id, + 'type': 'subscribe', + 'topic': f'/market/ticker:{sym}', + 'privateChannel': False, + 'response': True, } case _: return {} -async def stream_messages( - ws: NoBsWs, - sym: str -) -> AsyncGenerator[NoBsWs, dict]: - +async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: timeouts = 0 while True: @@ -584,67 +568,77 @@ async def stream_messages( if cs.cancelled_caught: timeouts += 1 if timeouts > 2: - log.error("kucoin feed is sh**ing the bed... rebooting...") + log.error('kucoin feed is sh**ing the bed... rebooting...') await ws._connect() continue - if msg.get("subject") != None: - + if msg.get('subject') != None: msg = KucoinMsg(**msg) match msg.subject: - case "trade.ticker": - + case 'trade.ticker': trade_data = KucoinTrade(**msg.data) - yield "trade", { - "symbol": sym, - "last": trade_data.price, - "brokerd_ts": trade_data.time, - "ticks": [ + yield 'trade', { + 'symbol': sym, + 'last': trade_data.price, + 'brokerd_ts': trade_data.time, + 'ticks': [ { - "type": "trade", - "price": float(trade_data.price), - "size": float(trade_data.size), - "broker_ts": trade_data.time, + 'type': 'trade', + 'price': float(trade_data.price), + 'size': float(trade_data.size), + 'broker_ts': trade_data.time, } ], } - case "level2": - + case 'level2': l2_data = KucoinL2(**msg.data) - - ticks = [] - for trade in l2_data.bids: - tick = {'type': 'bid', 'price': float(trade[0]), 'size': float(trade[1])} - ticks.append(tick) - for trade in l2_data.asks: - tick = {'type': 'ask', 'price': float(trade[0]), 'size': float(trade[1])} - ticks.append(tick) + first_ask = l2_data.asks[0] + first_bid = l2_data.bids[0] yield 'l1', { 'symbol': sym, - 'ticks': ticks, + 'ticks': [ + { + 'type': 'bid', + 'price': float(first_bid[0]), + 'size': float(first_bid[1]), + }, + { + 'type': 'bsize', + 'price': float(first_bid[0]), + 'size': float(first_bid[1]), + }, + { + 'type': 'ask', + 'price': float(first_ask[0]), + 'size': float(first_ask[1]), + }, + { + 'type': 'asize', + 'price': float(first_ask[0]), + 'size': float(first_ask[1]), + }, + ], } + @acm async def open_history_client( symbol: str, - type: str = "1m", + type: str = '1m', ) -> AsyncGenerator[Callable, None]: - async with open_cached_client("kucoin") as client: - log.info("Attempting to open kucoin history client") + async with open_cached_client('kucoin') as client: + log.info('Attempting to open kucoin history client') async def get_ohlc_history( - timeframe: float, end_dt: datetime | None = None, start_dt: datetime | None = None, - ) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end - if timeframe != 60: - raise DataUnavailable("Only 1m bars are supported") + raise DataUnavailable('Only 1m bars are supported') array = await client._get_bars( symbol, @@ -652,13 +646,12 @@ async def open_history_client( end_dt=end_dt, ) - times = array["time"] + times = array['time'] if end_dt is None: - inow = round(time.time()) print( - f"difference in time between load and processing {inow - times[-1]}" + f'difference in time between load and processing {inow - times[-1]}' ) if (inow - times[-1]) > 60: