kucoin: deliver `FeedInit` msgs on feed startup

To fit with the rest of the new requirements added in `.data.validate`
this adds `FeedInit` init including `MktPair` and `Asset` loading for
all spot currencies provided by `kucoin`.

Deatz:
- add a `Currency` struct and accompanying `Client.get_currencies()` for
  storing all asset infos.
- implement `.get_mkt_info()` which loads all necessary accounting and
  mkt meta-data structs including adding `.price/size_tick` fields to
  the `KucoinMktPair`.
- on client boot, async spawn requests to cache both symbols and currencies.
- pass `subscribe()` as the `fixture` arg to `open_autorecon_ws()`
  instead of opening it manually.

Other:
- tweak `Client._request` to not expect the prefixed `'/'` for the
  `endpoint: str`.
- change the `api_v` arg to just be `api: str`.
master
Tyler Goodlet 2023-05-09 18:17:50 -04:00
parent 80338e1ddd
commit f1f2ba2e02
1 changed files with 130 additions and 46 deletions

View File

@ -29,6 +29,7 @@ import hmac
import hashlib import hashlib
import time import time
from functools import partial from functools import partial
from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -48,20 +49,22 @@ import numpy as np
from piker.accounting._mktinfo import ( from piker.accounting._mktinfo import (
Asset, Asset,
digits_to_dec,
MktPair, MktPair,
) )
from piker.data.validate import FeedInit
from piker import config from piker import config
from piker._cacheables import ( from piker._cacheables import (
open_cached_client, open_cached_client,
async_lifo_cache, async_lifo_cache,
) )
from piker.log import get_logger from piker.log import get_logger
from ._util import DataUnavailable from piker.data.types import Struct
from ..data.types import Struct from piker.data._web_bs import (
from ..data._web_bs import (
open_autorecon_ws, open_autorecon_ws,
NoBsWs, NoBsWs,
) )
from ._util import DataUnavailable
log = get_logger(__name__) log = get_logger(__name__)
@ -88,7 +91,7 @@ class KucoinMktPair(Struct, frozen=True):
@property @property
def price_tick(self) -> Decimal: def price_tick(self) -> Decimal:
return Decimal(str(self.self.baseIncrement)) return Decimal(str(self.baseIncrement))
baseMaxSize: float baseMaxSize: float
baseMinSize: float baseMinSize: float
@ -118,7 +121,6 @@ class AccountTrade(Struct, frozen=True):
https://docs.kucoin.com/#get-account-ledgers https://docs.kucoin.com/#get-account-ledgers
''' '''
id: str id: str
currency: str currency: str
amount: float amount: float
@ -136,7 +138,6 @@ class AccountResponse(Struct, frozen=True):
https://docs.kucoin.com/#get-account-ledgers https://docs.kucoin.com/#get-account-ledgers
''' '''
currentPage: int currentPage: int
pageSize: int pageSize: int
totalNum: int totalNum: int
@ -150,7 +151,6 @@ class KucoinTrade(Struct, frozen=True):
https://docs.kucoin.com/#symbol-ticker https://docs.kucoin.com/#symbol-ticker
''' '''
bestAsk: float bestAsk: float
bestAskSize: float bestAskSize: float
bestBid: float bestBid: float
@ -178,13 +178,32 @@ class KucoinMsg(Struct, frozen=True):
Generic outer-wrapper for any Kucoin ws msg Generic outer-wrapper for any Kucoin ws msg
''' '''
type: str type: str
topic: str topic: str
subject: str subject: str
data: list[KucoinTrade | KucoinL2] data: list[KucoinTrade | KucoinL2]
class Currency(Struct, frozen=True):
'''
Currency (asset) info:
https://docs.kucoin.com/#get-currencies
'''
currency: str
name: str
fullName: str
precision: int
confirms: int
contractAddress: str
withdrawalMinSize: str
withdrawalMinFee: str
isWithdrawEnabled: bool
isDepositEnabled: bool
isMarginEnabled: bool
isDebitEnabled: bool
class BrokerConfig(Struct, frozen=True): class BrokerConfig(Struct, frozen=True):
key_id: str key_id: str
key_secret: str key_secret: str
@ -205,15 +224,17 @@ def get_config() -> BrokerConfig | None:
class Client: class Client:
def __init__(self) -> None: def __init__(self) -> None:
self._config: BrokerConfig | None = get_config()
self._pairs: dict[str, KucoinMktPair] = {} self._pairs: dict[str, KucoinMktPair] = {}
self._bars: list[list[float]] = [] self._bars: list[list[float]] = []
self._config: BrokerConfig | None = get_config() self._currencies: dict[str, Currency] = {}
def _gen_auth_req_headers( def _gen_auth_req_headers(
self, self,
action: Literal['POST', 'GET'], action: Literal['POST', 'GET'],
endpoint: str, endpoint: str,
api_v: str = 'v2', api: str = 'v2',
) -> dict[str, str | bytes]: ) -> dict[str, str | bytes]:
''' '''
Generate authenticated request headers Generate authenticated request headers
@ -227,7 +248,7 @@ class Client:
str_to_sign = ( str_to_sign = (
str(int(time.time() * 1000)) str(int(time.time() * 1000))
+ action + f'/api/{api_v}{endpoint}' + action + f'/api/{api}/{endpoint.lstrip("/")}'
) )
signature = base64.b64encode( signature = base64.b64encode(
@ -259,7 +280,7 @@ class Client:
self, self,
action: Literal['POST', 'GET'], action: Literal['POST', 'GET'],
endpoint: str, endpoint: str,
api_v: str = 'v2', api: str = 'v2',
headers: dict = {}, headers: dict = {},
) -> Any: ) -> Any:
''' '''
@ -268,19 +289,24 @@ class Client:
''' '''
if self._config: if self._config:
headers = self._gen_auth_req_headers( headers = self._gen_auth_req_headers(
action, endpoint, api_v) action,
endpoint,
api,
)
api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}' api_url = f'https://api.kucoin.com/api/{api}/{endpoint}'
res = await asks.request(action, api_url, headers=headers) res = await asks.request(action, api_url, headers=headers)
if 'data' in res.json(): json = res.json()
return res.json()['data'] if 'data' in json:
return json['data']
else: else:
log.error( log.error(
f'Error making request to {api_url} -> {res.json()["msg"]}' f'Error making request to {api_url} ->\n'
f'{pformat(res)}'
) )
return res.json()['msg'] return json['msg']
async def _get_ws_token( async def _get_ws_token(
self, self,
@ -296,7 +322,9 @@ class Client:
token_type = 'private' if private else 'public' token_type = 'private' if private else 'public'
try: try:
data: dict[str, Any] | None = await self._request( data: dict[str, Any] | None = await self._request(
'POST', f'/bullet-{token_type}', 'v1' 'POST',
endpoint=f'bullet-{token_type}',
api='v1'
) )
except Exception as e: except Exception as e:
log.error( log.error(
@ -313,10 +341,39 @@ class Client:
f'{data.json()["msg"]}' f'{data.json()["msg"]}'
) )
async def get_currencies(
self,
update: bool = False,
) -> dict[str, Currency]:
'''
Retrieve all "currency" info:
https://docs.kucoin.com/#get-currencies
We use this for creating piker-interal ``Asset``s.
'''
if (
not self._currencies
or update
):
currencies: dict[str, Currency] = {}
entries: list[dict] = await self._request(
'GET',
api='v1',
endpoint='currencies',
)
for entry in entries:
curr = Currency(**entry).copy()
currencies[curr.name] = curr
self._currencies.update(currencies)
return self._currencies
async def _get_pairs( async def _get_pairs(
self, self,
) -> dict[str, KucoinMktPair]: ) -> dict[str, KucoinMktPair]:
entries = await self._request('GET', '/symbols') entries = await self._request('GET', 'symbols')
syms = { syms = {
item['name'].lower().replace('-', ''): KucoinMktPair(**item) item['name'].lower().replace('-', ''): KucoinMktPair(**item)
for item in entries for item in entries
@ -327,13 +384,18 @@ class Client:
async def cache_pairs( async def cache_pairs(
self, self,
update: bool = False,
) -> dict[str, KucoinMktPair]: ) -> dict[str, KucoinMktPair]:
''' '''
Get cached pairs and convert keyed symbols into fqsns if ya want Get cached pairs and convert keyed symbols into fqsns if ya want
''' '''
if not self._pairs: if (
self._pairs = await self._get_pairs() not self._pairs
or update
):
self._pairs.update(await self._get_pairs())
return self._pairs return self._pairs
@ -341,7 +403,12 @@ class Client:
self, self,
pattern: str, pattern: str,
limit: int = 30, limit: int = 30,
) -> dict[str, KucoinMktPair]: ) -> dict[str, KucoinMktPair]:
'''
Use fuzzy search to match against all market names.
'''
data = await self.cache_pairs() data = await self.cache_pairs()
matches = fuzzy.extractBests( matches = fuzzy.extractBests(
@ -352,7 +419,9 @@ class Client:
async def last_trades(self, sym: str) -> list[AccountTrade]: async def last_trades(self, sym: str) -> list[AccountTrade]:
trades = await self._request( trades = await self._request(
'GET', f'/accounts/ledgers?currency={sym}', 'v1' 'GET',
endpoint=f'accounts/ledgers?currency={sym}',
api='v1'
) )
trades = AccountResponse(**trades) trades = AccountResponse(**trades)
return trades.items return trades.items
@ -360,11 +429,13 @@ class Client:
async def _get_bars( async def _get_bars(
self, self,
fqsn: str, fqsn: str,
start_dt: datetime | None = None, start_dt: datetime | None = None,
end_dt: datetime | None = None, end_dt: datetime | None = None,
limit: int = 1000, limit: int = 1000,
as_np: bool = True, as_np: bool = True,
type: str = '1min', type: str = '1min',
) -> np.ndarray: ) -> np.ndarray:
''' '''
Get OHLC data and convert to numpy array for perffff: Get OHLC data and convert to numpy array for perffff:
@ -409,7 +480,7 @@ class Client:
kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs)
url = ( url = (
f'/market/candles?type={type}' f'market/candles?type={type}'
f'&symbol={kucoin_sym}' f'&symbol={kucoin_sym}'
f'&startAt={start_dt}' f'&startAt={start_dt}'
f'&endAt={end_dt}' f'&endAt={end_dt}'
@ -419,7 +490,7 @@ class Client:
data: list[list[str]] | dict = await self._request( data: list[list[str]] | dict = await self._request(
'GET', 'GET',
url, url,
api_v='v1', api='v1',
) )
if not isinstance(data, list): if not isinstance(data, list):
@ -476,7 +547,10 @@ def fqsn_to_kucoin_sym(
@acm @acm
async def get_client() -> AsyncGenerator[Client, None]: async def get_client() -> AsyncGenerator[Client, None]:
client = Client() client = Client()
await client.cache_pairs()
async with trio.open_nursery() as n:
n.start_soon(client.cache_pairs)
await client.get_currencies()
yield client yield client
@ -540,10 +614,24 @@ async def get_mkt_info(
bs_mktid: str = pair.symbol bs_mktid: str = pair.symbol
# pair: KucoinMktPair = await client.pair_info(pair_str) # pair: KucoinMktPair = await client.pair_info(pair_str)
assets: dict[str, Currency] = client._currencies
# assets = client.assets # TODO: maybe just do this processing in
# dst_asset: Asset = assets[pair.base] # a .get_assets() method (see kraken)?
# src_asset: Asset = assets[pair.quote] src: Currency = assets[pair.quoteCurrency]
src_asset = Asset(
name=src.name,
atype='crypto_currency',
tx_tick=digits_to_dec(src.precision),
info=src.to_dict(),
)
dst: Currency = assets[pair.baseCurrency]
dst_asset = Asset(
name=dst.name,
atype='crypto_currency',
tx_tick=digits_to_dec(dst.precision),
info=dst.to_dict(),
)
mkt = MktPair( mkt = MktPair(
dst=dst_asset, dst=dst_asset,
@ -573,30 +661,25 @@ async def stream_quotes(
Where the rubber hits the road baby Where the rubber hits the road baby
''' '''
init_msgs: list[FeedInit] = []
async with open_cached_client('kucoin') as client: async with open_cached_client('kucoin') as client:
log.info('Starting up quote stream') log.info(f'Starting up quote stream(s) for {symbols}')
# loop through symbols and sub to feedz
for sym_str in symbols: for sym_str in symbols:
mkt, pair = await get_mkt_info(sym_str) mkt, pair = await get_mkt_info(sym_str)
init_msgs.append(
init_msgs = { FeedInit(
# pass back token, and bool, signalling if we're the mkt_info=mkt,
# writer and that history has been written shm_write_opts={
sym_str: { 'sum_tick_vml': False,
'symbol_info': {
'asset_type': 'crypto',
'price_tick_size': pair.baseIncrement,
'lot_tick_size': pair.baseMinSize,
}, },
'shm_write_opts': {'sum_tick_vml': False}, )
'fqsn': sym_str, )
}
}
ws: NoBsWs
token, ping_interval = await client._get_ws_token() token, ping_interval = await client._get_ws_token()
connect_id = str(uuid4()) connect_id = str(uuid4())
async with ( async with (
open_autorecon_ws( open_autorecon_ws(
( (
@ -606,7 +689,7 @@ async def stream_quotes(
fixture=partial( fixture=partial(
subscribe, subscribe,
connect_id=connect_id, connect_id=connect_id,
kucoin_sym=pair.sym, bs_mktid=pair.symbol,
), ),
) as ws, ) as ws,
open_ping_task(ws, ping_interval, connect_id), open_ping_task(ws, ping_interval, connect_id),
@ -614,6 +697,7 @@ async def stream_quotes(
aclosing(stream_messages(ws, sym_str)) as msg_gen, aclosing(stream_messages(ws, sym_str)) as msg_gen,
): ):
typ, quote = await anext(msg_gen) typ, quote = await anext(msg_gen)
while typ != 'trade': while typ != 'trade':
# take care to not unblock here until we get a real # take care to not unblock here until we get a real
# trade quote # trade quote