kucoin: WIP moving to FeedInit API

marketstore_disable_snappy
Tyler Goodlet 2023-05-09 14:46:02 -04:00
parent f8c8f63e87
commit 80338e1ddd
1 changed files with 135 additions and 65 deletions

View File

@ -18,29 +18,45 @@ Kucoin broker backend
'''
from typing import Any, Callable, Literal, AsyncGenerator
from contextlib import asynccontextmanager as acm
from contextlib import (
asynccontextmanager as acm,
aclosing,
)
from datetime import datetime
import time
from decimal import Decimal
import base64
import hmac
import hashlib
import time
from functools import partial
from typing import (
Any,
Callable,
Literal,
AsyncGenerator,
)
import wsproto
from uuid import uuid4
import asks
import tractor
import trio
from trio_util import trio_async_generator
from trio_typing import TaskStatus
from fuzzywuzzy import process as fuzzy
import pendulum
import numpy as np
from piker._cacheables import open_cached_client
from piker.accounting._mktinfo import (
Asset,
MktPair,
)
from piker import config
from piker._cacheables import (
open_cached_client,
async_lifo_cache,
)
from piker.log import get_logger
from ._util import DataUnavailable
from piker.pp import config
from ..data.types import Struct
from ..data._web_bs import (
open_autorecon_ws,
@ -67,11 +83,20 @@ class KucoinMktPair(Struct, frozen=True):
https://docs.kucoin.com/#get-symbols-list
'''
baseCurrency: str
baseIncrement: float
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.self.baseIncrement))
baseMaxSize: float
baseMinSize: float
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.baseMinSize))
enableTrading: bool
feeCurrency: str
isMarginEnabled: bool
@ -84,7 +109,7 @@ class KucoinMktPair(Struct, frozen=True):
quoteIncrement: float
quoteMaxSize: float
quoteMinSize: float
symbol: str
symbol: str # our bs_mktid, kucoin's internal id
class AccountTrade(Struct, frozen=True):
@ -293,7 +318,7 @@ class Client:
) -> dict[str, KucoinMktPair]:
entries = await self._request('GET', '/symbols')
syms = {
kucoin_sym_to_fqsn(item['name']): KucoinMktPair(**item)
item['name'].lower().replace('-', ''): KucoinMktPair(**item)
for item in entries
}
@ -439,15 +464,15 @@ class Client:
return array
def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str:
def fqsn_to_kucoin_sym(
fqsn: str,
pairs: dict[str, KucoinMktPair],
) -> str:
pair_data = pairs[fqsn]
return pair_data.baseCurrency + '-' + pair_data.quoteCurrency
def kucoin_sym_to_fqsn(sym: str) -> str:
return sym.lower().replace('-', '')
@acm
async def get_client() -> AsyncGenerator[Client, None]:
client = Client()
@ -497,14 +522,51 @@ async def open_ping_task(
n.cancel_scope.cancel()
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, KucoinMktPair]:
'''
Query for and return a `MktPair` and `KucoinMktPair`.
'''
async with open_cached_client('kucoin') as client:
# split off any fqme broker part
bs_fqme, _, broker = fqme.partition('.')
pairs: dict[str, KucoinMktPair] = await client.cache_pairs()
pair: KucoinMktPair = pairs[bs_fqme]
bs_mktid: str = pair.symbol
# pair: KucoinMktPair = await client.pair_info(pair_str)
# assets = client.assets
# dst_asset: Asset = assets[pair.base]
# src_asset: Asset = assets[pair.quote]
mkt = MktPair(
dst=dst_asset,
src=src_asset,
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=bs_mktid,
broker='kucoin',
)
return mkt, pair
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = '',
# startup sync
task_status: TaskStatus[tuple[dict, dict]
] = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[
tuple[dict, dict]
] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Required piker api to stream real-time data.
@ -512,64 +574,71 @@ async def stream_quotes(
'''
async with open_cached_client('kucoin') as client:
log.info('Starting up quote stream')
# loop through symbols and sub to feedz
for sym_str in symbols:
mkt, pair = await get_mkt_info(sym_str)
init_msgs = {
# pass back token, and bool, signalling if we're the
# writer and that history has been written
sym_str: {
'symbol_info': {
'asset_type': 'crypto',
'price_tick_size': pair.baseIncrement,
'lot_tick_size': pair.baseMinSize,
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym_str,
}
}
token, ping_interval = await client._get_ws_token()
connect_id = str(uuid4())
pairs = await client.cache_pairs()
ws_url = (
f'wss://ws-api-spot.kucoin.com/?'
f'token={token}&[connectId={connect_id}]'
)
# open ping task
async with (
open_autorecon_ws(ws_url) as ws,
open_autorecon_ws(
(
f'wss://ws-api-spot.kucoin.com/?'
f'token={token}&[connectId={connect_id}]'
),
fixture=partial(
subscribe,
connect_id=connect_id,
kucoin_sym=pair.sym,
),
) as ws,
open_ping_task(ws, ping_interval, connect_id),
# subscribe(ws, connect_id, kucoin_sym),
aclosing(stream_messages(ws, sym_str)) as msg_gen,
):
log.info('Starting up quote stream')
# loop through symbols and sub to feedz
for sym in symbols:
pair: KucoinMktPair = pairs[sym]
kucoin_sym = pair.symbol
typ, quote = await anext(msg_gen)
while typ != 'trade':
# take care to not unblock here until we get a real
# trade quote
typ, quote = await anext(msg_gen)
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'crypto',
'price_tick_size': float(pair.baseIncrement),
'lot_tick_size': float(pair.baseMinSize),
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
}
}
task_status.started((init_msgs, quote))
feed_is_live.set()
async with (
subscribe(ws, connect_id, kucoin_sym),
stream_messages(ws, sym) as msg_gen,
):
typ, quote = await anext(msg_gen)
while typ != 'trade':
# take care to not unblock here until we get a real
# trade quote
typ, quote = await anext(msg_gen)
task_status.started((init_msgs, quote))
feed_is_live.set()
async for typ, msg in msg_gen:
await send_chan.send({sym: msg})
async for typ, msg in msg_gen:
await send_chan.send({sym_str: msg})
@acm
async def subscribe(ws: wsproto.WSConnection, connect_id, sym) -> AsyncGenerator[None, None]:
async def subscribe(
ws: NoBsWs,
connect_id,
bs_mktid,
) -> AsyncGenerator[None, None]:
# level 2 sub
await ws.send_msg(
{
'id': connect_id,
'type': 'subscribe',
'topic': f'/spotMarket/level2Depth5:{sym}',
'topic': f'/spotMarket/level2Depth5:{bs_mktid}',
'privateChannel': False,
'response': True,
}
@ -580,7 +649,7 @@ async def subscribe(ws: wsproto.WSConnection, connect_id, sym) -> AsyncGenerator
{
'id': connect_id,
'type': 'subscribe',
'topic': f'/market/ticker:{sym}',
'topic': f'/market/ticker:{bs_mktid}',
'privateChannel': False,
'response': True,
}
@ -590,21 +659,22 @@ async def subscribe(ws: wsproto.WSConnection, connect_id, sym) -> AsyncGenerator
# unsub
if ws.connected():
log.info(f'Unsubscribing to {sym} feed')
log.info(f'Unsubscribing to {bs_mktid} feed')
await ws.send_msg(
{
'id': connect_id,
'type': 'unsubscribe',
'topic': f'/market/ticker:{sym}',
'topic': f'/market/ticker:{bs_mktid}',
'privateChannel': False,
'response': True,
}
)
@trio_async_generator
async def stream_messages(
ws: NoBsWs, sym: str
ws: NoBsWs,
sym: str,
) -> AsyncGenerator[tuple[str, dict], None]:
timeouts = 0
last_trade_ts = 0