Clean up broker code,

Add typecasting for messages/rt-data and historcal user trades
ensure we're fetching all history
add multi-symbol support
'
emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-19 13:11:33 -04:00
parent a4195fccc6
commit 61bb60a810
1 changed files with 213 additions and 111 deletions

View File

@ -1,3 +1,6 @@
# piker: trading gear for hackers
# Copyright (C) Jared Goldman (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or # the Free Software Foundation, either version 3 of the License, or
@ -13,9 +16,10 @@
""" """
Kucoin broker backend Kucoin broker backend
""" """
from typing import Any, Optional, Literal, AsyncGenerator from typing import Any, Callable, Optional, Literal, AsyncGenerator
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
import time import time
@ -23,13 +27,11 @@ import math
import base64 import base64
import hmac import hmac
import hashlib import hashlib
import wsproto import wsproto
from uuid import uuid4 from uuid import uuid4
import asks import asks
import tractor import tractor
from tractor.trionics import maybe_open_context
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
@ -40,12 +42,14 @@ from piker._cacheables import open_cached_client
from piker.log import get_logger from piker.log import get_logger
from ._util import DataUnavailable from ._util import DataUnavailable
from piker.pp import config from piker.pp import config
from ..data.types import Struct
from ..data._web_bs import ( from ..data._web_bs import (
open_autorecon_ws, open_autorecon_ws,
NoBsWs, NoBsWs,
) )
log = get_logger(__name__) log = get_logger(__name__)
_ohlc_dtype = [ _ohlc_dtype = [
("index", int), ("index", int),
("time", int), ("time", int),
@ -58,24 +62,92 @@ _ohlc_dtype = [
] ]
def get_config() -> dict[str, Any]: def get_config() -> dict[str, dict]:
conf, path = config.load() conf, path = config.load()
section = conf.get("kucoin") section = conf.get("kucoin")
# TODO: document why we send this, basically because logging params for cryptofeed
conf["log"] = {}
conf["log"]["disabled"] = True
if section is None: if section is None:
log.warning("No config section found for kucoin in config") log.warning("No config section found for kucoin in config")
return section return section
class KucoinMktPair(Struct, frozen=True):
'''
Kucoin's pair format
'''
baseCurrency: str
baseIncrement: float
baseMaxSize: float
baseMinSize: float
enableTrading: bool
feeCurrency: str
isMarginEnabled: bool
market: str
minFunds: float
name: str
priceIncrement: float
priceLimitRate: float
quoteCurrency: str
quoteIncrement: float
quoteMaxSize: float
quoteMinSize: float
symbol: str
class AccountTrade(Struct, frozen=True):
'''
Historical trade format
'''
id: str
currency: str
amount: float
fee: float
balance: float
accountType: str
bizType: str
direction: Literal["in", "out"]
createdAt: float
context: list[str]
class AccountResponse(Struct, frozen=True):
currentPage: int
pageSize: int
totalNum: int
totalPage: int
items: list[AccountTrade]
class KucoinTrade(Struct, frozen=True):
'''
Real-time trade format
'''
bestAsk: float
bestAskSize: float
bestBid: float
bestBidSize: float
price: float
sequence: float
size: float
time: float
class KucoinTradeMsg(Struct, frozen=True):
type: str
topic: str
subject: str
data: list[KucoinTrade]
class Client: class Client:
def __init__(self) -> None: def __init__(self) -> None:
self._pairs: dict[str, any] = {} self._pairs: dict[str, KucoinMktPair] = {}
self._bars: list[list] = [] self._bars: list[list[float]] = []
self._key_id: str self._key_id: str
self._key_secret: str self._key_secret: str
self._key_passphrase: str self._key_passphrase: str
@ -84,7 +156,7 @@ class Client:
config = get_config() config = get_config()
if ( if (
("key_id" in config) float("key_id" in config)
and ("key_secret" in config) and ("key_secret" in config)
and ("key_passphrase" in config) and ("key_passphrase" in config)
): ):
@ -98,10 +170,12 @@ class Client:
action: Literal["POST", "GET"], action: Literal["POST", "GET"],
endpoint: str, endpoint: str,
api_v: str = "v2", api_v: str = "v2",
): ) -> dict[str, str]:
""" '''
Generate authenticated request headers
https://docs.kucoin.com/#authentication https://docs.kucoin.com/#authentication
"""
'''
now = int(time.time() * 1000) now = int(time.time() * 1000)
path = f"/api/{api_v}{endpoint}" path = f"/api/{api_v}{endpoint}"
str_to_sign = str(now) + action + path str_to_sign = str(now) + action + path
@ -127,6 +201,7 @@ class Client:
"KC-API-TIMESTAMP": str(now), "KC-API-TIMESTAMP": str(now),
"KC-API-KEY": self._key_id, "KC-API-KEY": self._key_id,
"KC-API-PASSPHRASE": passphrase, "KC-API-PASSPHRASE": passphrase,
# XXX: Even if using the v1 api - this stays the same
"KC-API-KEY-VERSION": "2", "KC-API-KEY-VERSION": "2",
} }
@ -136,7 +211,11 @@ class Client:
endpoint: str, endpoint: str,
api_v: str = "v2", api_v: str = "v2",
headers: dict = {}, headers: dict = {},
) -> Any: ) -> dict[str, Any]:
'''
Generic request wrapper for Kucoin API
'''
if self._authenticated: if self._authenticated:
headers = self._gen_auth_req_headers(action, endpoint, api_v) headers = self._gen_auth_req_headers(action, endpoint, api_v)
@ -146,43 +225,56 @@ class Client:
if "data" in res.json(): if "data" in res.json():
return res.json()["data"] return res.json()["data"]
else: else:
print(f'KUCOIN ERROR: {res.json()["msg"]}') log.error(f'Error making request to {api_url} -> {res.json()["msg"]}')
breakpoint()
async def _get_ws_token(self, private: bool = False) -> tuple[str, int] | None: async def _get_ws_token(
self,
private: bool = False
) -> tuple[str, int] | None:
'''
Fetch ws token needed for sub access
'''
token_type = "private" if private else "public" token_type = "private" if private else "public"
data = await self._request("POST", f"/bullet-{token_type}", "v1") data = await self._request("POST", f"/bullet-{token_type}", "v1")
if "token" in data: if "token" in data:
# return token and ping interval
ping_interval = data["instanceServers"][0]["pingInterval"] ping_interval = data["instanceServers"][0]["pingInterval"]
return data["token"], ping_interval return data["token"], ping_interval
else: else:
print(f'KUCOIN ERROR: {data.json()["msg"]}') log.error(
breakpoint() f'Error making request for Kucoin ws token -> {res.json()["msg"]}'
)
async def get_pairs( async def get_pairs(
self, self,
) -> dict[str, Any]: ) -> dict[str, KucoinMktPair]:
if self._pairs: if self._pairs:
return self._pairs return self._pairs
entries = await self._request("GET", "/symbols") entries = await self._request("GET", "/symbols")
syms = {item["name"]: item for item in entries} syms = {item["name"]: KucoinMktPair(**item) for item in entries}
return syms return syms
async def cache_pairs( async def cache_pairs(
self, self,
normalize: bool = True, normalize: bool = True,
) -> dict[str, any]: ) -> dict[str, KucoinMktPair]:
'''
Get cached pairs and convert keyed symbols into fqsns if ya want
'''
if not self._pairs: if not self._pairs:
self._pairs = await self.get_pairs() self._pairs = await self.get_pairs()
if normalize: if normalize:
self._pairs = self.normalize_pairs(self._pairs) self._pairs = self.normalize_pairs(self._pairs)
return self._pairs return self._pairs
def normalize_pairs(self, pairs: dict[str, any]) -> dict[str, any]: def normalize_pairs(
self, pairs: dict[str, KucoinMktPair]
) -> dict[str, KucoinMktPair]:
""" """
Map crypfeeds symbols to fqsn strings Map kucoin pairs to fqsn strings
""" """
norm_pairs = {} norm_pairs = {}
@ -197,14 +289,14 @@ class Client:
self, self,
pattern: str, pattern: str,
limit: int = 30, limit: int = 30,
) -> dict[str, Any]: ) -> dict[str, KucoinMktPair]:
data = await self.get_pairs() data = await self.get_pairs()
matches = fuzzy.extractBests(pattern, data, score_cutoff=35, limit=limit) matches = fuzzy.extractBests(pattern, data, score_cutoff=35, limit=limit)
# repack in dict form # repack in dict form
return {item[0]["name"].lower(): item[0] for item in matches} return {kucoin_sym_to_fqsn(item[0].name): item[0] for item in matches}
async def last_trades(self, sym: str): async def last_trades(self, sym: str) -> AccountResponse:
trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1") trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1")
return trades.items return trades.items
@ -216,20 +308,23 @@ class Client:
limit: int = 1000, limit: int = 1000,
as_np: bool = True, as_np: bool = True,
type: str = "1min", type: str = "1min",
): ) -> np.ndarray:
if len(self._bars): '''
return self._bars Get OHLC data and convert to numpy array for perffff
'''
# Generate generic end and start time if values not passed
if end_dt is None: if end_dt is None:
end_dt = pendulum.now("UTC").add(minutes=1) end_dt = pendulum.now("UTC").add(minutes=1)
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of("minute").subtract(minutes=limit) start_dt = end_dt.start_of("minute").subtract(minutes=limit)
# Format datetime to unix # Format datetime to unix timestamp
start_dt = math.trunc(time.mktime(start_dt.timetuple())) start_dt = math.trunc(time.mktime(start_dt.timetuple()))
end_dt = math.trunc(time.mktime(end_dt.timetuple())) end_dt = math.trunc(time.mktime(end_dt.timetuple()))
kucoin_sym = fqsn_to_cf_sym(fqsn, self._pairs) kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs)
url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}" url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}"
bars = await self._request( bars = await self._request(
@ -238,9 +333,9 @@ class Client:
api_v="v1", api_v="v1",
) )
# Map to OHLC values to dict then to np array
new_bars = [] new_bars = []
for i, bar in enumerate(bars[::-1]): for i, bar in enumerate(bars[::-1]):
# TODO: implement struct/typecasting/validation here
data = { data = {
"index": i, "index": i,
@ -256,14 +351,12 @@ class Client:
row = [] row = []
for j, (field_name, field_type) in enumerate(_ohlc_dtype): for j, (field_name, field_type) in enumerate(_ohlc_dtype):
value = data[field_name] value = data[field_name]
match field_name: match field_name:
case "index" | "time": case "index" | "time":
row.append(int(value)) row.append(int(value))
# case 'time':
# dt_from_unix_ts = datetime.utcfromtimestamp(int(value))
# # convert unix time to epoch seconds
# row.append(int(dt_from_unix_ts.timestamp()))
case _: case _:
row.append(float(value)) row.append(float(value))
@ -273,23 +366,31 @@ class Client:
return array return array
def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, any]) -> str: def fqsn_to_kucoin_sym(
fqsn: str,
pairs: dict[str, KucoinMktPair]
) -> str:
pair_data = pairs[fqsn] pair_data = pairs[fqsn]
return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"] return pair_data.baseCurrency + "-" + pair_data.quoteCurrency
def kucoin_sym_to_fqsn(sym: str) -> str:
return sym.lower().replace("-", "")
@acm @acm
async def get_client(): async def get_client() -> AsyncGenerator[Client, None]:
client = Client() client = Client()
# Do we need to open a nursery here?
await client.cache_pairs() await client.cache_pairs()
yield client yield client
@tractor.context @tractor.context
async def open_symbol_search( async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
): ) -> None:
async with open_cached_client("kucoin") as client: async with open_cached_client("kucoin") as client:
# load all symbols locally for fast search # load all symbols locally for fast search
await client.cache_pairs() await client.cache_pairs()
@ -297,7 +398,6 @@ async def open_symbol_search(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for pattern in stream: async for pattern in stream:
# repack in dict form
await stream.send(await client.search_symbols(pattern)) await stream.send(await client.search_symbols(pattern))
@ -308,91 +408,93 @@ async def stream_quotes(
loglevel: str = None, loglevel: str = None,
# startup sync # startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
): ) -> None:
# TODO: Add multi-symbol functionality here '''
sym = symbols[0] Required piker api to stream real-time data.
Where the rubber hits the road baby
'''
connect_id = str(uuid4()) connect_id = str(uuid4())
async with open_cached_client("kucoin") as client: async with open_cached_client("kucoin") as client:
token, ping_interval = await client._get_ws_token()
pairs = await client.cache_pairs() # map through symbols and sub to feedz
kucoin_sym = pairs[sym]["symbol"] for sym in symbols:
init_msgs = {
# pass back token, and bool, signalling if we're the writer token, ping_interval = await client._get_ws_token()
# and that history has been written pairs = await client.cache_pairs()
sym: { kucoin_sym = pairs[sym].symbol
"symbol_info": {
"asset_type": "crypto", init_msgs = {
"price_tick_size": 0.0005, # pass back token, and bool, signalling if we're the writer
"lot_tick_size": 0.1, # and that history has been written
sym: {
"symbol_info": {
"asset_type": "crypto",
"price_tick_size": 0.0005,
"lot_tick_size": 0.1,
},
"shm_write_opts": {"sum_tick_vml": False},
"fqsn": sym,
}, },
"shm_write_opts": {"sum_tick_vml": False}, }
"fqsn": sym,
},
}
last_trades = await client.last_trades(sym)
@acm
async def subscribe(ws: wsproto.WSConnection):
@acm @acm
async def open_ping_task(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
async with trio.open_nursery() as n: @acm
async def open_ping_task(ws: wsproto.WSConnection):
async with trio.open_nursery() as n:
async def ping_server(): async def ping_server():
while True: while True:
await trio.sleep((ping_interval - 1000) / 1000) await trio.sleep((ping_interval - 1000) / 1000)
print("PINGING") await ws.send_msg({"id": connect_id, "type": "ping"})
await ws.send_msg({"id": connect_id, "type": "ping"})
n.start_soon(ping_server) n.start_soon(ping_server)
yield ws yield ws
n.cancel_scope.cancel() n.cancel_scope.cancel()
# Spawn the ping task here # Spawn the ping task here
async with open_ping_task(ws) as _ws: async with open_ping_task(ws) as ws:
# subscribe to market feedz here
l1_sub = make_sub(kucoin_sym, connect_id)
await ws.send_msg(l1_sub)
# subscribe to market feedz here yield
l1_sub = make_sub(kucoin_sym, connect_id)
await _ws.send_msg(l1_sub)
res = await _ws.recv_msg()
yield # unsub
if ws.connected():
await ws.send_msg(
{
"id": connect_id,
"type": "unsubscribe",
"topic": f"/market/ticker:{sym}",
"privateChannel": False,
"response": True,
}
)
# unsub async with open_autorecon_ws(
if _ws.connected(): f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]",
await _ws.send_msg( fixture=subscribe,
{ ) as ws:
"id": connect_id, msg_gen = stream_messages(ws, sym)
"type": "unsubscribe",
"topic": f"/market/ticker:{sym}",
"privateChannel": False,
"response": True,
}
)
async with open_autorecon_ws(
f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]",
fixture=subscribe,
) as ws:
msg_gen = stream_messages(ws, sym)
typ, quote = await msg_gen.__anext__()
#
while typ != "trade":
# TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
#
while typ != "trade":
# TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__()
task_status.started((init_msgs, quote)) task_status.started((init_msgs, quote))
feed_is_live.set() feed_is_live.set()
async for typ, msg in msg_gen: async for typ, msg in msg_gen:
await send_chan.send({sym: msg}) await send_chan.send({sym: msg})
def make_sub(sym, connect_id): def make_sub(sym, connect_id) -> dict[str, str | bool]:
return { return {
"id": connect_id, "id": connect_id,
"type": "subscribe", "type": "subscribe",
@ -441,7 +543,7 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
async def open_history_client( async def open_history_client(
symbol: str, symbol: str,
type: str = "1m", type: str = "1m",
): ) -> AsyncGenerator[Callable, None]:
async with open_cached_client("kucoin") as client: async with open_cached_client("kucoin") as client:
# call bars on kucoin # call bars on kucoin
async def get_ohlc_history( async def get_ohlc_history(