diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 55e73f5d..aaa35f34 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -29,6 +29,7 @@ import hmac import hashlib import time from functools import partial +from pprint import pformat from typing import ( Any, Callable, @@ -48,20 +49,22 @@ import numpy as np from piker.accounting._mktinfo import ( Asset, + digits_to_dec, MktPair, ) +from piker.data.validate import FeedInit from piker import config from piker._cacheables import ( open_cached_client, async_lifo_cache, ) from piker.log import get_logger -from ._util import DataUnavailable -from ..data.types import Struct -from ..data._web_bs import ( +from piker.data.types import Struct +from piker.data._web_bs import ( open_autorecon_ws, NoBsWs, ) +from ._util import DataUnavailable log = get_logger(__name__) @@ -88,7 +91,7 @@ class KucoinMktPair(Struct, frozen=True): @property def price_tick(self) -> Decimal: - return Decimal(str(self.self.baseIncrement)) + return Decimal(str(self.baseIncrement)) baseMaxSize: float baseMinSize: float @@ -118,7 +121,6 @@ class AccountTrade(Struct, frozen=True): https://docs.kucoin.com/#get-account-ledgers ''' - id: str currency: str amount: float @@ -136,7 +138,6 @@ class AccountResponse(Struct, frozen=True): https://docs.kucoin.com/#get-account-ledgers ''' - currentPage: int pageSize: int totalNum: int @@ -150,7 +151,6 @@ class KucoinTrade(Struct, frozen=True): https://docs.kucoin.com/#symbol-ticker ''' - bestAsk: float bestAskSize: float bestBid: float @@ -178,13 +178,32 @@ class KucoinMsg(Struct, frozen=True): Generic outer-wrapper for any Kucoin ws msg ''' - type: str topic: str subject: str data: list[KucoinTrade | KucoinL2] +class Currency(Struct, frozen=True): + ''' + Currency (asset) info: + https://docs.kucoin.com/#get-currencies + + ''' + currency: str + name: str + fullName: str + precision: int + confirms: int + contractAddress: str + withdrawalMinSize: str + withdrawalMinFee: str + isWithdrawEnabled: bool + isDepositEnabled: bool + isMarginEnabled: bool + isDebitEnabled: bool + + class BrokerConfig(Struct, frozen=True): key_id: str key_secret: str @@ -205,15 +224,17 @@ def get_config() -> BrokerConfig | None: class Client: def __init__(self) -> None: + self._config: BrokerConfig | None = get_config() self._pairs: dict[str, KucoinMktPair] = {} self._bars: list[list[float]] = [] - self._config: BrokerConfig | None = get_config() + self._currencies: dict[str, Currency] = {} def _gen_auth_req_headers( self, action: Literal['POST', 'GET'], endpoint: str, - api_v: str = 'v2', + api: str = 'v2', + ) -> dict[str, str | bytes]: ''' Generate authenticated request headers @@ -227,7 +248,7 @@ class Client: str_to_sign = ( str(int(time.time() * 1000)) - + action + f'/api/{api_v}{endpoint}' + + action + f'/api/{api}/{endpoint.lstrip("/")}' ) signature = base64.b64encode( @@ -259,7 +280,7 @@ class Client: self, action: Literal['POST', 'GET'], endpoint: str, - api_v: str = 'v2', + api: str = 'v2', headers: dict = {}, ) -> Any: ''' @@ -268,19 +289,24 @@ class Client: ''' if self._config: headers = self._gen_auth_req_headers( - action, endpoint, api_v) + action, + endpoint, + api, + ) - api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}' + api_url = f'https://api.kucoin.com/api/{api}/{endpoint}' res = await asks.request(action, api_url, headers=headers) - if 'data' in res.json(): - return res.json()['data'] + json = res.json() + if 'data' in json: + return json['data'] else: log.error( - f'Error making request to {api_url} -> {res.json()["msg"]}' + f'Error making request to {api_url} ->\n' + f'{pformat(res)}' ) - return res.json()['msg'] + return json['msg'] async def _get_ws_token( self, @@ -296,7 +322,9 @@ class Client: token_type = 'private' if private else 'public' try: data: dict[str, Any] | None = await self._request( - 'POST', f'/bullet-{token_type}', 'v1' + 'POST', + endpoint=f'bullet-{token_type}', + api='v1' ) except Exception as e: log.error( @@ -313,10 +341,39 @@ class Client: f'{data.json()["msg"]}' ) + async def get_currencies( + self, + update: bool = False, + ) -> dict[str, Currency]: + ''' + Retrieve all "currency" info: + https://docs.kucoin.com/#get-currencies + + We use this for creating piker-interal ``Asset``s. + + ''' + if ( + not self._currencies + or update + ): + currencies: dict[str, Currency] = {} + entries: list[dict] = await self._request( + 'GET', + api='v1', + endpoint='currencies', + ) + for entry in entries: + curr = Currency(**entry).copy() + currencies[curr.name] = curr + + self._currencies.update(currencies) + + return self._currencies + async def _get_pairs( self, ) -> dict[str, KucoinMktPair]: - entries = await self._request('GET', '/symbols') + entries = await self._request('GET', 'symbols') syms = { item['name'].lower().replace('-', ''): KucoinMktPair(**item) for item in entries @@ -327,13 +384,18 @@ class Client: async def cache_pairs( self, + update: bool = False, + ) -> dict[str, KucoinMktPair]: ''' Get cached pairs and convert keyed symbols into fqsns if ya want ''' - if not self._pairs: - self._pairs = await self._get_pairs() + if ( + not self._pairs + or update + ): + self._pairs.update(await self._get_pairs()) return self._pairs @@ -341,7 +403,12 @@ class Client: self, pattern: str, limit: int = 30, + ) -> dict[str, KucoinMktPair]: + ''' + Use fuzzy search to match against all market names. + + ''' data = await self.cache_pairs() matches = fuzzy.extractBests( @@ -352,7 +419,9 @@ class Client: async def last_trades(self, sym: str) -> list[AccountTrade]: trades = await self._request( - 'GET', f'/accounts/ledgers?currency={sym}', 'v1' + 'GET', + endpoint=f'accounts/ledgers?currency={sym}', + api='v1' ) trades = AccountResponse(**trades) return trades.items @@ -360,11 +429,13 @@ class Client: async def _get_bars( self, fqsn: str, + start_dt: datetime | None = None, end_dt: datetime | None = None, limit: int = 1000, as_np: bool = True, type: str = '1min', + ) -> np.ndarray: ''' Get OHLC data and convert to numpy array for perffff: @@ -409,7 +480,7 @@ class Client: kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) url = ( - f'/market/candles?type={type}' + f'market/candles?type={type}' f'&symbol={kucoin_sym}' f'&startAt={start_dt}' f'&endAt={end_dt}' @@ -419,7 +490,7 @@ class Client: data: list[list[str]] | dict = await self._request( 'GET', url, - api_v='v1', + api='v1', ) if not isinstance(data, list): @@ -476,7 +547,10 @@ def fqsn_to_kucoin_sym( @acm async def get_client() -> AsyncGenerator[Client, None]: client = Client() - await client.cache_pairs() + + async with trio.open_nursery() as n: + n.start_soon(client.cache_pairs) + await client.get_currencies() yield client @@ -540,10 +614,24 @@ async def get_mkt_info( bs_mktid: str = pair.symbol # pair: KucoinMktPair = await client.pair_info(pair_str) + assets: dict[str, Currency] = client._currencies - # assets = client.assets - # dst_asset: Asset = assets[pair.base] - # src_asset: Asset = assets[pair.quote] + # TODO: maybe just do this processing in + # a .get_assets() method (see kraken)? + src: Currency = assets[pair.quoteCurrency] + src_asset = Asset( + name=src.name, + atype='crypto_currency', + tx_tick=digits_to_dec(src.precision), + info=src.to_dict(), + ) + dst: Currency = assets[pair.baseCurrency] + dst_asset = Asset( + name=dst.name, + atype='crypto_currency', + tx_tick=digits_to_dec(dst.precision), + info=dst.to_dict(), + ) mkt = MktPair( dst=dst_asset, @@ -573,30 +661,25 @@ async def stream_quotes( Where the rubber hits the road baby ''' + init_msgs: list[FeedInit] = [] + async with open_cached_client('kucoin') as client: - log.info('Starting up quote stream') - # loop through symbols and sub to feedz + log.info(f'Starting up quote stream(s) for {symbols}') for sym_str in symbols: mkt, pair = await get_mkt_info(sym_str) - - init_msgs = { - # pass back token, and bool, signalling if we're the - # writer and that history has been written - sym_str: { - 'symbol_info': { - 'asset_type': 'crypto', - 'price_tick_size': pair.baseIncrement, - 'lot_tick_size': pair.baseMinSize, + init_msgs.append( + FeedInit( + mkt_info=mkt, + shm_write_opts={ + 'sum_tick_vml': False, }, - 'shm_write_opts': {'sum_tick_vml': False}, - 'fqsn': sym_str, - } - } + ) + ) + ws: NoBsWs token, ping_interval = await client._get_ws_token() connect_id = str(uuid4()) - async with ( open_autorecon_ws( ( @@ -606,7 +689,7 @@ async def stream_quotes( fixture=partial( subscribe, connect_id=connect_id, - kucoin_sym=pair.sym, + bs_mktid=pair.symbol, ), ) as ws, open_ping_task(ws, ping_interval, connect_id), @@ -614,6 +697,7 @@ async def stream_quotes( aclosing(stream_messages(ws, sym_str)) as msg_gen, ): typ, quote = await anext(msg_gen) + while typ != 'trade': # take care to not unblock here until we get a real # trade quote