Compare commits

..

No commits in common. "9ea960303719b6731bff5ee349b30a284acfb740" and "b72f85433abfc800bd25c0609698ef4862c76e45" have entirely different histories.

2 changed files with 163 additions and 213 deletions

View File

@ -543,7 +543,7 @@ async def stream_quotes(
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
task_status.started((init_msgs, quote)) task_status.started((init_msgs, quote))
# signal to caller feed is ready for consumption # signal to caller feed is ready for consumption
feed_is_live.set() feed_is_live.set()

View File

@ -1,3 +1,4 @@
# piker: trading gear for hackers
# Copyright (C) Jared Goldman (in stewardship for pikers) # Copyright (C) Jared Goldman (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
@ -13,10 +14,10 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' """
Kucoin broker backend Kucoin broker backend
''' """
from random import randint from random import randint
from typing import Any, Callable, Optional, Literal, AsyncGenerator from typing import Any, Callable, Optional, Literal, AsyncGenerator
@ -51,14 +52,14 @@ from ..data._web_bs import (
log = get_logger(__name__) log = get_logger(__name__)
_ohlc_dtype = [ _ohlc_dtype = [
('index', int), ("index", int),
('time', int), ("time", int),
('open', float), ("open", float),
('high', float), ("high", float),
('low', float), ("low", float),
('close', float), ("close", float),
('volume', float), ("volume", float),
('bar_wap', float), # will be zeroed by sampler if not filled ("bar_wap", float), # will be zeroed by sampler if not filled
] ]
@ -67,7 +68,6 @@ class KucoinMktPair(Struct, frozen=True):
Kucoin's pair format Kucoin's pair format
''' '''
baseCurrency: str baseCurrency: str
baseIncrement: float baseIncrement: float
baseMaxSize: float baseMaxSize: float
@ -92,7 +92,6 @@ class AccountTrade(Struct, frozen=True):
Historical trade format Historical trade format
''' '''
id: str id: str
currency: str currency: str
amount: float amount: float
@ -100,7 +99,7 @@ class AccountTrade(Struct, frozen=True):
balance: float balance: float
accountType: str accountType: str
bizType: str bizType: str
direction: Literal['in', 'out'] direction: Literal["in", "out"]
createdAt: float createdAt: float
context: list[str] context: list[str]
@ -118,7 +117,6 @@ class KucoinTrade(Struct, frozen=True):
Real-time trade format Real-time trade format
''' '''
bestAsk: float bestAsk: float
bestAskSize: float bestAskSize: float
bestBid: float bestBid: float
@ -129,37 +127,26 @@ class KucoinTrade(Struct, frozen=True):
time: float time: float
class KucoinL2(Struct, frozen=True):
'''
Real-time L2 order book format
'''
asks: list[list[float]]
bids: list[list[float]]
timestamp: float
class KucoinMsg(Struct, frozen=True):
type: str
topic: str
subject: str
data: list[KucoinTrade | KucoinL2]
class BrokerConfig(Struct, frozen=True): class BrokerConfig(Struct, frozen=True):
key_id: str key_id: str
key_secret: str key_secret: str
key_passphrase: str key_passphrase: str
class KucoinTradeMsg(Struct, frozen=True):
type: str
topic: str
subject: str
data: list[KucoinTrade]
def get_config() -> BrokerConfig | None: def get_config() -> BrokerConfig | None:
conf, path = config.load() conf, path = config.load()
section = conf.get('kucoin') section = conf.get("kucoin")
if section is None: if section is None:
log.warning('No config section found for kucoin in config') log.warning("No config section found for kucoin in config")
return None return None
return BrokerConfig(**section) return BrokerConfig(**section)
@ -176,18 +163,19 @@ class Client:
config: BrokerConfig | None = get_config() config: BrokerConfig | None = get_config()
if config and config.key_id and config.key_secret and config.key_passphrase: if (
config and float(config.key_id) and config.key_secret and config.key_passphrase
):
self._authenticated = True self._authenticated = True
self._key_id = config.key_id self._key_id = config.key_id
self._key_secret = config.key_secret self._key_secret = config.key_secret
self._key_passphrase = config.key_passphrase self._key_passphrase = config.key_passphrase
log.info('User credentials added')
def _gen_auth_req_headers( def _gen_auth_req_headers(
self, self,
action: Literal['POST', 'GET'], action: Literal["POST", "GET"],
endpoint: str, endpoint: str,
api_v: str = 'v2', api_v: str = "v2",
) -> dict[str, str | bytes]: ) -> dict[str, str | bytes]:
''' '''
Generate authenticated request headers Generate authenticated request headers
@ -195,39 +183,39 @@ class Client:
''' '''
now = int(time.time() * 1000) now = int(time.time() * 1000)
path = f'/api/{api_v}{endpoint}' path = f"/api/{api_v}{endpoint}"
str_to_sign = str(now) + action + path str_to_sign = str(now) + action + path
signature = base64.b64encode( signature = base64.b64encode(
hmac.new( hmac.new(
self._key_secret.encode('utf-8'), self._key_secret.encode("utf-8"),
str_to_sign.encode('utf-8'), str_to_sign.encode("utf-8"),
hashlib.sha256, hashlib.sha256,
).digest() ).digest()
) )
passphrase = base64.b64encode( passphrase = base64.b64encode(
hmac.new( hmac.new(
self._key_secret.encode('utf-8'), self._key_secret.encode("utf-8"),
self._key_passphrase.encode('utf-8'), self._key_passphrase.encode("utf-8"),
hashlib.sha256, hashlib.sha256,
).digest() ).digest()
) )
return { return {
'KC-API-SIGN': signature, "KC-API-SIGN": signature,
'KC-API-TIMESTAMP': str(now), "KC-API-TIMESTAMP": str(now),
'KC-API-KEY': self._key_id, "KC-API-KEY": self._key_id,
'KC-API-PASSPHRASE': passphrase, "KC-API-PASSPHRASE": passphrase,
# XXX: Even if using the v1 api - this stays the same # XXX: Even if using the v1 api - this stays the same
'KC-API-KEY-VERSION': '2', "KC-API-KEY-VERSION": "2",
} }
async def _request( async def _request(
self, self,
action: Literal['POST', 'GET'], action: Literal["POST", "GET"],
endpoint: str, endpoint: str,
api_v: str = 'v2', api_v: str = "v2",
headers: dict = {}, headers: dict = {},
) -> Any: ) -> Any:
''' '''
@ -237,29 +225,34 @@ class Client:
if self._authenticated: if self._authenticated:
headers = self._gen_auth_req_headers(action, endpoint, api_v) headers = self._gen_auth_req_headers(action, endpoint, api_v)
api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}' api_url = f"https://api.kucoin.com/api/{api_v}{endpoint}"
res = await asks.request(action, api_url, headers=headers) res = await asks.request(action, api_url, headers=headers)
if 'data' in res.json(): if "data" in res.json():
return res.json()['data'] return res.json()["data"]
else: else:
log.error(f'Error making request to {api_url} -> {res.json()["msg"]}') log.error(f'Error making request to {api_url} -> {res.json()["msg"]}')
return res.json()['msg'] return res.json()["msg"]
async def _get_ws_token(self, private: bool = False) -> tuple[str, int] | None: async def _get_ws_token(
self,
private: bool = False
) -> tuple[str, int] | None:
''' '''
Fetch ws token needed for sub access Fetch ws token needed for sub access
''' '''
token_type = 'private' if private else 'public' token_type = "private" if private else "public"
data: dict[str, Any] | None = await self._request( data: dict[str, Any] | None = await self._request(
'POST', f'/bullet-{token_type}', 'v1' "POST",
f"/bullet-{token_type}",
"v1"
) )
if data and 'token' in data: if data and "token" in data:
ping_interval: int = data['instanceServers'][0]['pingInterval'] ping_interval: int = data["instanceServers"][0]["pingInterval"]
return data['token'], ping_interval return data["token"], ping_interval
elif data: elif data:
log.error( log.error(
f'Error making request for Kucoin ws token -> {data.json()["msg"]}' f'Error making request for Kucoin ws token -> {data.json()["msg"]}'
@ -271,10 +264,8 @@ class Client:
if self._pairs: if self._pairs:
return self._pairs return self._pairs
entries = await self._request('GET', '/symbols') entries = await self._request("GET", "/symbols")
syms = {item['name']: KucoinMktPair(**item) for item in entries} syms = {item["name"]: KucoinMktPair(**item) for item in entries}
log.info('Kucoin market pairs fetches')
return syms return syms
async def cache_pairs( async def cache_pairs(
@ -294,14 +285,14 @@ class Client:
def _normalize_pairs( def _normalize_pairs(
self, pairs: dict[str, KucoinMktPair] self, pairs: dict[str, KucoinMktPair]
) -> dict[str, KucoinMktPair]: ) -> dict[str, KucoinMktPair]:
''' """
Map kucoin pairs to fqsn strings Map kucoin pairs to fqsn strings
''' """
norm_pairs = {} norm_pairs = {}
for key, value in pairs.items(): for key, value in pairs.items():
fqsn = key.lower().replace('-', '') fqsn = key.lower().replace("-", "")
norm_pairs[fqsn] = value norm_pairs[fqsn] = value
return norm_pairs return norm_pairs
@ -318,7 +309,7 @@ class Client:
return {kucoin_sym_to_fqsn(item[0].name): item[0] for item in matches} return {kucoin_sym_to_fqsn(item[0].name): item[0] for item in matches}
async def last_trades(self, sym: str) -> list[AccountTrade]: async def last_trades(self, sym: str) -> list[AccountTrade]:
trades = await self._request('GET', f'/accounts/ledgers?currency={sym}', 'v1') trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1")
trades = AccountResponse(**trades) trades = AccountResponse(**trades)
return trades.items return trades.items
@ -329,38 +320,38 @@ class Client:
end_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None,
limit: int = 1000, limit: int = 1000,
as_np: bool = True, as_np: bool = True,
type: str = '1min', type: str = "1min",
) -> np.ndarray: ) -> np.ndarray:
''' '''
Get OHLC data and convert to numpy array for perffff Get OHLC data and convert to numpy array for perffff
''' '''
# Generate generic end and start time if values not passed # Generate generic end and start time if values not passed
# Currently gives us 12hrs of data
if end_dt is None: if end_dt is None:
end_dt = pendulum.now('UTC').add(minutes=1) end_dt = pendulum.now("UTC").add(minutes=1)
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of('minute').subtract(minutes=limit) start_dt = end_dt.start_of("minute").subtract(minutes=limit)
start_dt = int(start_dt.timestamp())
end_dt = int(end_dt.timestamp())
# Format datetime to unix timestamp
start_dt = math.trunc(time.mktime(start_dt.timetuple()))
end_dt = math.trunc(time.mktime(end_dt.timetuple()))
kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs)
url = f'/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}' url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}"
bars = [] bars = []
for i in range(10): for i in range(10):
data = await self._request( data = await self._request(
'GET', "GET",
url, url,
api_v='v1', api_v="v1",
) )
if not isinstance(data, list): if not isinstance(data, list):
# Do a gradual backoff if Kucoin is rate limiting us # Do a gradual backoff if Kucoin is rate limiting us
backoff_interval = i backoff_interval = i + (randint(0, 1000) / 1000)
log.warn(f'History call failed, backing off for {backoff_interval}s') log.warn(f'History call failed, backing off for {backoff_interval}s')
await trio.sleep(backoff_interval) await trio.sleep(backoff_interval)
else: else:
@ -370,58 +361,64 @@ class Client:
# Map to OHLC values to dict then to np array # Map to OHLC values to dict then to np array
new_bars = [] new_bars = []
for i, bar in enumerate(bars[::-1]): for i, bar in enumerate(bars[::-1]):
data = { data = {
'index': i, "index": i,
'time': bar[0], "time": bar[0],
'open': bar[1], "open": bar[1],
'close': bar[2], "close": bar[2],
'high': bar[3], "high": bar[3],
'low': bar[4], "low": bar[4],
'volume': bar[5], "volume": bar[5],
'amount': bar[6], "amount": bar[6],
'bar_wap': 0.0, "bar_wap": 0.0,
} }
row = [] row = []
for _, (field_name, field_type) in enumerate(_ohlc_dtype): for j, (field_name, field_type) in enumerate(_ohlc_dtype):
value = data[field_name] value = data[field_name]
match field_name: match field_name:
case 'index': case "index" | "time":
row.append(int(value)) row.append(int(value))
case 'time':
row.append(value)
case _: case _:
row.append(float(value)) row.append(float(value))
new_bars.append(tuple(row)) new_bars.append(tuple(row))
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars self._bars = array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array return array
def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str: def fqsn_to_kucoin_sym(
fqsn: str,
pairs: dict[str, KucoinMktPair]
) -> str:
pair_data = pairs[fqsn] pair_data = pairs[fqsn]
return pair_data.baseCurrency + '-' + pair_data.quoteCurrency return pair_data.baseCurrency + "-" + pair_data.quoteCurrency
def kucoin_sym_to_fqsn(sym: str) -> str: def kucoin_sym_to_fqsn(sym: str) -> str:
return sym.lower().replace('-', '') return sym.lower().replace("-", "")
@acm @ acm
async def get_client() -> AsyncGenerator[Client, None]: async def get_client() -> AsyncGenerator[Client, None]:
client = Client() client = Client()
await client.cache_pairs() await client.cache_pairs()
yield client yield client
@tractor.context @ tractor.context
async def open_symbol_search( async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
) -> None: ) -> None:
async with open_cached_client('kucoin') as client: async with open_cached_client("kucoin") as client:
# load all symbols locally for fast search # load all symbols locally for fast search
await client.cache_pairs() await client.cache_pairs()
await ctx.started() await ctx.started()
@ -429,7 +426,6 @@ async def open_symbol_search(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for pattern in stream: async for pattern in stream:
await stream.send(await client.search_symbols(pattern)) await stream.send(await client.search_symbols(pattern))
log.info('Kucoin symbol search opened')
async def stream_quotes( async def stream_quotes(
@ -447,10 +443,11 @@ async def stream_quotes(
''' '''
connect_id = str(uuid4()) connect_id = str(uuid4())
async with open_cached_client('kucoin') as client: async with open_cached_client("kucoin") as client:
log.info('Starting up quote stream')
# loop through symbols and sub to feedz # loop through symbols and sub to feedz
for sym in symbols: for sym in symbols:
token, ping_interval = await client._get_ws_token() token, ping_interval = await client._get_ws_token()
pairs = await client.cache_pairs() pairs = await client.cache_pairs()
kucoin_sym = pairs[sym].symbol kucoin_sym = pairs[sym].symbol
@ -459,18 +456,19 @@ async def stream_quotes(
# pass back token, and bool, signalling if we're the writer # pass back token, and bool, signalling if we're the writer
# and that history has been written # and that history has been written
sym: { sym: {
'symbol_info': { "symbol_info": {
'asset_type': 'crypto', "asset_type": "crypto",
'price_tick_size': 0.0005, "price_tick_size": 0.0005,
'lot_tick_size': 0.1, "lot_tick_size": 0.1,
}, },
'shm_write_opts': {'sum_tick_vml': False}, "shm_write_opts": {"sum_tick_vml": False},
'fqsn': sym, "fqsn": sym,
}, },
} }
@acm @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
@acm @acm
async def open_ping_task(ws: wsproto.WSConnection): async def open_ping_task(ws: wsproto.WSConnection):
''' '''
@ -480,13 +478,12 @@ async def stream_quotes(
''' '''
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
# TODO: cache this task so it's only called once
async def ping_server(): async def ping_server():
while True: while True:
await trio.sleep((ping_interval - 1000) / 1000) await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'}) await ws.send_msg({"id": connect_id, "type": "ping"})
log.info(f'Starting ping task for {sym}')
n.start_soon(ping_server) n.start_soon(ping_server)
yield ws yield ws
@ -495,37 +492,32 @@ async def stream_quotes(
# Spawn the ping task here # Spawn the ping task here
async with open_ping_task(ws) as ws: async with open_ping_task(ws) as ws:
tasks = [] # subscribe to market feedz here
tasks.append(make_sub(kucoin_sym, connect_id, level='l3')) l1_sub = make_sub(kucoin_sym, connect_id)
tasks.append(make_sub(kucoin_sym, connect_id, level='l1')) await ws.send_msg(l1_sub)
for task in tasks:
log.info(f'Subscribing to {task["topic"]} feed for {sym}')
await ws.send_msg(task)
yield yield
# unsub # unsub
if ws.connected(): if ws.connected():
log.info(f'Unsubscribing to {kucoin_sym} feed')
await ws.send_msg( await ws.send_msg(
{ {
'id': connect_id, "id": connect_id,
'type': 'unsubscribe', "type": "unsubscribe",
'topic': f'/market/ticker:{sym}', "topic": f"/market/ticker:{sym}",
'privateChannel': False, "privateChannel": False,
'response': True, "response": True,
} }
) )
async with open_autorecon_ws( async with open_autorecon_ws(
f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]', f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]",
fixture=subscribe, fixture=subscribe,
) as ws: ) as ws:
msg_gen = stream_messages(ws, sym) msg_gen = stream_messages(ws, sym)
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
#
while typ != 'trade': while typ != "trade":
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
@ -536,30 +528,22 @@ async def stream_quotes(
await send_chan.send({sym: msg}) await send_chan.send({sym: msg})
def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]: def make_sub(sym, connect_id) -> dict[str, str | bool]:
match level: return {
case 'l1': "id": connect_id,
return { "type": "subscribe",
'id': connect_id, "topic": f"/market/ticker:{sym}",
'type': 'subscribe', "privateChannel": False,
'topic': f'/spotMarket/level2Depth5:{sym}', "response": True,
'privateChannel': False, }
'response': True,
}
case 'l3':
return {
'id': connect_id,
'type': 'subscribe',
'topic': f'/market/ticker:{sym}',
'privateChannel': False,
'response': True,
}
async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: async def stream_messages(
ws: NoBsWs,
sym: str
) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0 timeouts = 0
last_ts: int = 0
while True: while True:
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
@ -567,84 +551,52 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
if cs.cancelled_caught: if cs.cancelled_caught:
timeouts += 1 timeouts += 1
if timeouts > 2: if timeouts > 2:
log.error('kucoin feed is sh**ing the bed... rebooting...') log.error("kucoin feed is sh**ing the bed... rebooting...")
await ws._connect() await ws._connect()
continue continue
if msg.get('subject'): if "subject" in msg and msg["subject"] == "trade.ticker":
msg = KucoinMsg(**msg)
match msg.subject: trade_msg = KucoinTradeMsg(**msg)
case 'trade.ticker': trade_data = KucoinTrade(**trade_msg.data)
trade_data = KucoinTrade(**msg.data)
ts = trade_data.time
if ts <= last_ts:
continue
last_ts = ts yield "trade", {
yield 'trade', { "symbol": sym,
'symbol': sym, "last": trade_data.price,
'last': trade_data.price, "brokerd_ts": trade_data.time,
'brokerd_ts': trade_data.time, "ticks": [
'ticks': [ {
{ "type": "trade",
'type': 'trade', "price": float(trade_data.price),
'price': float(trade_data.price), "size": float(trade_data.size),
'size': float(trade_data.size), "broker_ts": trade_data.time,
'broker_ts': ts,
}
],
} }
],
}
case 'level2': else:
l2_data = KucoinL2(**msg.data) continue
first_ask = l2_data.asks[0]
first_bid = l2_data.bids[0]
yield 'l1', {
'symbol': sym,
'ticks': [
{
'type': 'bid',
'price': float(first_bid[0]),
'size': float(first_bid[1]),
},
{
'type': 'bsize',
'price': float(first_bid[0]),
'size': float(first_bid[1]),
},
{
'type': 'ask',
'price': float(first_ask[0]),
'size': float(first_ask[1]),
},
{
'type': 'asize',
'price': float(first_ask[0]),
'size': float(first_ask[1]),
},
],
}
@acm @acm
async def open_history_client( async def open_history_client(
symbol: str, symbol: str,
type: str = '1m', type: str = "1m",
) -> AsyncGenerator[Callable, None]: ) -> AsyncGenerator[Callable, None]:
async with open_cached_client('kucoin') as client: async with open_cached_client("kucoin") as client:
log.info("Attempting to open kucoin history client")
log.info('Attempting to open kucoin history client')
async def get_ohlc_history( async def get_ohlc_history(
timeframe: float, timeframe: float,
end_dt: datetime | None = None, end_dt: datetime | None = None,
start_dt: datetime | None = None, start_dt: datetime | None = None,
) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end ) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end
if timeframe != 60: if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported') raise DataUnavailable("Only 1m bars are supported")
array = await client._get_bars( array = await client._get_bars(
symbol, symbol,
@ -652,13 +604,14 @@ async def open_history_client(
end_dt=end_dt, end_dt=end_dt,
) )
times = array['time'] times = array["time"]
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
print( print(
f'difference in time between load and processing {inow - times[-1]}' f"difference in time between load and processing {inow - times[-1]}"
) )
if (inow - times[-1]) > 60: if (inow - times[-1]) > 60:
@ -666,9 +619,6 @@ async def open_history_client(
start_dt = pendulum.from_timestamp(times[0]) start_dt = pendulum.from_timestamp(times[0])
end_dt = pendulum.from_timestamp(times[-1]) end_dt = pendulum.from_timestamp(times[-1])
log.info('History succesfully fetched baby')
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc_history, {} yield get_ohlc_history, {"erlangs": 3, "rate": 3}