Compare commits
10 Commits
b72f85433a
...
9ea9603037
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 9ea9603037 | |
jaredgoldman | f9d7892d69 | |
jaredgoldman | a7e668f852 | |
jaredgoldman | 8af5fe3c64 | |
jaredgoldman | ff22f2d240 | |
jaredgoldman | 480b8c591a | |
jaredgoldman | 79956abc5e | |
jaredgoldman | 3b1078bcba | |
jaredgoldman | 95d127cfe0 | |
jaredgoldman | 75eadd58f4 |
|
@ -543,7 +543,7 @@ async def stream_quotes(
|
||||||
# TODO: use ``anext()`` when it lands in 3.10!
|
# TODO: use ``anext()`` when it lands in 3.10!
|
||||||
typ, quote = await msg_gen.__anext__()
|
typ, quote = await msg_gen.__anext__()
|
||||||
|
|
||||||
task_status.started((init_msgs, quote))
|
task_status.started((init_msgs, quote))
|
||||||
|
|
||||||
# signal to caller feed is ready for consumption
|
# signal to caller feed is ready for consumption
|
||||||
feed_is_live.set()
|
feed_is_live.set()
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
# piker: trading gear for hackers
|
|
||||||
# Copyright (C) Jared Goldman (in stewardship for pikers)
|
# Copyright (C) Jared Goldman (in stewardship for pikers)
|
||||||
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
@ -14,10 +13,10 @@
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
"""
|
'''
|
||||||
Kucoin broker backend
|
Kucoin broker backend
|
||||||
|
|
||||||
"""
|
'''
|
||||||
|
|
||||||
from random import randint
|
from random import randint
|
||||||
from typing import Any, Callable, Optional, Literal, AsyncGenerator
|
from typing import Any, Callable, Optional, Literal, AsyncGenerator
|
||||||
|
@ -52,14 +51,14 @@ from ..data._web_bs import (
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
_ohlc_dtype = [
|
_ohlc_dtype = [
|
||||||
("index", int),
|
('index', int),
|
||||||
("time", int),
|
('time', int),
|
||||||
("open", float),
|
('open', float),
|
||||||
("high", float),
|
('high', float),
|
||||||
("low", float),
|
('low', float),
|
||||||
("close", float),
|
('close', float),
|
||||||
("volume", float),
|
('volume', float),
|
||||||
("bar_wap", float), # will be zeroed by sampler if not filled
|
('bar_wap', float), # will be zeroed by sampler if not filled
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -68,6 +67,7 @@ class KucoinMktPair(Struct, frozen=True):
|
||||||
Kucoin's pair format
|
Kucoin's pair format
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
baseCurrency: str
|
baseCurrency: str
|
||||||
baseIncrement: float
|
baseIncrement: float
|
||||||
baseMaxSize: float
|
baseMaxSize: float
|
||||||
|
@ -92,6 +92,7 @@ class AccountTrade(Struct, frozen=True):
|
||||||
Historical trade format
|
Historical trade format
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
id: str
|
id: str
|
||||||
currency: str
|
currency: str
|
||||||
amount: float
|
amount: float
|
||||||
|
@ -99,7 +100,7 @@ class AccountTrade(Struct, frozen=True):
|
||||||
balance: float
|
balance: float
|
||||||
accountType: str
|
accountType: str
|
||||||
bizType: str
|
bizType: str
|
||||||
direction: Literal["in", "out"]
|
direction: Literal['in', 'out']
|
||||||
createdAt: float
|
createdAt: float
|
||||||
context: list[str]
|
context: list[str]
|
||||||
|
|
||||||
|
@ -117,6 +118,7 @@ class KucoinTrade(Struct, frozen=True):
|
||||||
Real-time trade format
|
Real-time trade format
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
bestAsk: float
|
bestAsk: float
|
||||||
bestAskSize: float
|
bestAskSize: float
|
||||||
bestBid: float
|
bestBid: float
|
||||||
|
@ -127,26 +129,37 @@ class KucoinTrade(Struct, frozen=True):
|
||||||
time: float
|
time: float
|
||||||
|
|
||||||
|
|
||||||
|
class KucoinL2(Struct, frozen=True):
|
||||||
|
'''
|
||||||
|
Real-time L2 order book format
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
asks: list[list[float]]
|
||||||
|
bids: list[list[float]]
|
||||||
|
timestamp: float
|
||||||
|
|
||||||
|
|
||||||
|
class KucoinMsg(Struct, frozen=True):
|
||||||
|
type: str
|
||||||
|
topic: str
|
||||||
|
subject: str
|
||||||
|
data: list[KucoinTrade | KucoinL2]
|
||||||
|
|
||||||
|
|
||||||
class BrokerConfig(Struct, frozen=True):
|
class BrokerConfig(Struct, frozen=True):
|
||||||
key_id: str
|
key_id: str
|
||||||
key_secret: str
|
key_secret: str
|
||||||
key_passphrase: str
|
key_passphrase: str
|
||||||
|
|
||||||
|
|
||||||
class KucoinTradeMsg(Struct, frozen=True):
|
|
||||||
type: str
|
|
||||||
topic: str
|
|
||||||
subject: str
|
|
||||||
data: list[KucoinTrade]
|
|
||||||
|
|
||||||
|
|
||||||
def get_config() -> BrokerConfig | None:
|
def get_config() -> BrokerConfig | None:
|
||||||
conf, path = config.load()
|
conf, path = config.load()
|
||||||
|
|
||||||
section = conf.get("kucoin")
|
section = conf.get('kucoin')
|
||||||
|
|
||||||
if section is None:
|
if section is None:
|
||||||
log.warning("No config section found for kucoin in config")
|
log.warning('No config section found for kucoin in config')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return BrokerConfig(**section)
|
return BrokerConfig(**section)
|
||||||
|
@ -163,19 +176,18 @@ class Client:
|
||||||
|
|
||||||
config: BrokerConfig | None = get_config()
|
config: BrokerConfig | None = get_config()
|
||||||
|
|
||||||
if (
|
if config and config.key_id and config.key_secret and config.key_passphrase:
|
||||||
config and float(config.key_id) and config.key_secret and config.key_passphrase
|
|
||||||
):
|
|
||||||
self._authenticated = True
|
self._authenticated = True
|
||||||
self._key_id = config.key_id
|
self._key_id = config.key_id
|
||||||
self._key_secret = config.key_secret
|
self._key_secret = config.key_secret
|
||||||
self._key_passphrase = config.key_passphrase
|
self._key_passphrase = config.key_passphrase
|
||||||
|
log.info('User credentials added')
|
||||||
|
|
||||||
def _gen_auth_req_headers(
|
def _gen_auth_req_headers(
|
||||||
self,
|
self,
|
||||||
action: Literal["POST", "GET"],
|
action: Literal['POST', 'GET'],
|
||||||
endpoint: str,
|
endpoint: str,
|
||||||
api_v: str = "v2",
|
api_v: str = 'v2',
|
||||||
) -> dict[str, str | bytes]:
|
) -> dict[str, str | bytes]:
|
||||||
'''
|
'''
|
||||||
Generate authenticated request headers
|
Generate authenticated request headers
|
||||||
|
@ -183,39 +195,39 @@ class Client:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
now = int(time.time() * 1000)
|
now = int(time.time() * 1000)
|
||||||
path = f"/api/{api_v}{endpoint}"
|
path = f'/api/{api_v}{endpoint}'
|
||||||
str_to_sign = str(now) + action + path
|
str_to_sign = str(now) + action + path
|
||||||
|
|
||||||
signature = base64.b64encode(
|
signature = base64.b64encode(
|
||||||
hmac.new(
|
hmac.new(
|
||||||
self._key_secret.encode("utf-8"),
|
self._key_secret.encode('utf-8'),
|
||||||
str_to_sign.encode("utf-8"),
|
str_to_sign.encode('utf-8'),
|
||||||
hashlib.sha256,
|
hashlib.sha256,
|
||||||
).digest()
|
).digest()
|
||||||
)
|
)
|
||||||
|
|
||||||
passphrase = base64.b64encode(
|
passphrase = base64.b64encode(
|
||||||
hmac.new(
|
hmac.new(
|
||||||
self._key_secret.encode("utf-8"),
|
self._key_secret.encode('utf-8'),
|
||||||
self._key_passphrase.encode("utf-8"),
|
self._key_passphrase.encode('utf-8'),
|
||||||
hashlib.sha256,
|
hashlib.sha256,
|
||||||
).digest()
|
).digest()
|
||||||
)
|
)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"KC-API-SIGN": signature,
|
'KC-API-SIGN': signature,
|
||||||
"KC-API-TIMESTAMP": str(now),
|
'KC-API-TIMESTAMP': str(now),
|
||||||
"KC-API-KEY": self._key_id,
|
'KC-API-KEY': self._key_id,
|
||||||
"KC-API-PASSPHRASE": passphrase,
|
'KC-API-PASSPHRASE': passphrase,
|
||||||
# XXX: Even if using the v1 api - this stays the same
|
# XXX: Even if using the v1 api - this stays the same
|
||||||
"KC-API-KEY-VERSION": "2",
|
'KC-API-KEY-VERSION': '2',
|
||||||
}
|
}
|
||||||
|
|
||||||
async def _request(
|
async def _request(
|
||||||
self,
|
self,
|
||||||
action: Literal["POST", "GET"],
|
action: Literal['POST', 'GET'],
|
||||||
endpoint: str,
|
endpoint: str,
|
||||||
api_v: str = "v2",
|
api_v: str = 'v2',
|
||||||
headers: dict = {},
|
headers: dict = {},
|
||||||
) -> Any:
|
) -> Any:
|
||||||
'''
|
'''
|
||||||
|
@ -225,34 +237,29 @@ class Client:
|
||||||
if self._authenticated:
|
if self._authenticated:
|
||||||
headers = self._gen_auth_req_headers(action, endpoint, api_v)
|
headers = self._gen_auth_req_headers(action, endpoint, api_v)
|
||||||
|
|
||||||
api_url = f"https://api.kucoin.com/api/{api_v}{endpoint}"
|
api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}'
|
||||||
|
|
||||||
res = await asks.request(action, api_url, headers=headers)
|
res = await asks.request(action, api_url, headers=headers)
|
||||||
|
|
||||||
if "data" in res.json():
|
if 'data' in res.json():
|
||||||
return res.json()["data"]
|
return res.json()['data']
|
||||||
else:
|
else:
|
||||||
log.error(f'Error making request to {api_url} -> {res.json()["msg"]}')
|
log.error(f'Error making request to {api_url} -> {res.json()["msg"]}')
|
||||||
return res.json()["msg"]
|
return res.json()['msg']
|
||||||
|
|
||||||
async def _get_ws_token(
|
async def _get_ws_token(self, private: bool = False) -> tuple[str, int] | None:
|
||||||
self,
|
|
||||||
private: bool = False
|
|
||||||
) -> tuple[str, int] | None:
|
|
||||||
'''
|
'''
|
||||||
Fetch ws token needed for sub access
|
Fetch ws token needed for sub access
|
||||||
|
|
||||||
'''
|
'''
|
||||||
token_type = "private" if private else "public"
|
token_type = 'private' if private else 'public'
|
||||||
data: dict[str, Any] | None = await self._request(
|
data: dict[str, Any] | None = await self._request(
|
||||||
"POST",
|
'POST', f'/bullet-{token_type}', 'v1'
|
||||||
f"/bullet-{token_type}",
|
|
||||||
"v1"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if data and "token" in data:
|
if data and 'token' in data:
|
||||||
ping_interval: int = data["instanceServers"][0]["pingInterval"]
|
ping_interval: int = data['instanceServers'][0]['pingInterval']
|
||||||
return data["token"], ping_interval
|
return data['token'], ping_interval
|
||||||
elif data:
|
elif data:
|
||||||
log.error(
|
log.error(
|
||||||
f'Error making request for Kucoin ws token -> {data.json()["msg"]}'
|
f'Error making request for Kucoin ws token -> {data.json()["msg"]}'
|
||||||
|
@ -264,8 +271,10 @@ class Client:
|
||||||
if self._pairs:
|
if self._pairs:
|
||||||
return self._pairs
|
return self._pairs
|
||||||
|
|
||||||
entries = await self._request("GET", "/symbols")
|
entries = await self._request('GET', '/symbols')
|
||||||
syms = {item["name"]: KucoinMktPair(**item) for item in entries}
|
syms = {item['name']: KucoinMktPair(**item) for item in entries}
|
||||||
|
|
||||||
|
log.info('Kucoin market pairs fetches')
|
||||||
return syms
|
return syms
|
||||||
|
|
||||||
async def cache_pairs(
|
async def cache_pairs(
|
||||||
|
@ -285,14 +294,14 @@ class Client:
|
||||||
def _normalize_pairs(
|
def _normalize_pairs(
|
||||||
self, pairs: dict[str, KucoinMktPair]
|
self, pairs: dict[str, KucoinMktPair]
|
||||||
) -> dict[str, KucoinMktPair]:
|
) -> dict[str, KucoinMktPair]:
|
||||||
"""
|
'''
|
||||||
Map kucoin pairs to fqsn strings
|
Map kucoin pairs to fqsn strings
|
||||||
|
|
||||||
"""
|
'''
|
||||||
norm_pairs = {}
|
norm_pairs = {}
|
||||||
|
|
||||||
for key, value in pairs.items():
|
for key, value in pairs.items():
|
||||||
fqsn = key.lower().replace("-", "")
|
fqsn = key.lower().replace('-', '')
|
||||||
norm_pairs[fqsn] = value
|
norm_pairs[fqsn] = value
|
||||||
|
|
||||||
return norm_pairs
|
return norm_pairs
|
||||||
|
@ -309,7 +318,7 @@ class Client:
|
||||||
return {kucoin_sym_to_fqsn(item[0].name): item[0] for item in matches}
|
return {kucoin_sym_to_fqsn(item[0].name): item[0] for item in matches}
|
||||||
|
|
||||||
async def last_trades(self, sym: str) -> list[AccountTrade]:
|
async def last_trades(self, sym: str) -> list[AccountTrade]:
|
||||||
trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1")
|
trades = await self._request('GET', f'/accounts/ledgers?currency={sym}', 'v1')
|
||||||
trades = AccountResponse(**trades)
|
trades = AccountResponse(**trades)
|
||||||
return trades.items
|
return trades.items
|
||||||
|
|
||||||
|
@ -320,38 +329,38 @@ class Client:
|
||||||
end_dt: Optional[datetime] = None,
|
end_dt: Optional[datetime] = None,
|
||||||
limit: int = 1000,
|
limit: int = 1000,
|
||||||
as_np: bool = True,
|
as_np: bool = True,
|
||||||
type: str = "1min",
|
type: str = '1min',
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
'''
|
'''
|
||||||
Get OHLC data and convert to numpy array for perffff
|
Get OHLC data and convert to numpy array for perffff
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# Generate generic end and start time if values not passed
|
# Generate generic end and start time if values not passed
|
||||||
|
# Currently gives us 12hrs of data
|
||||||
if end_dt is None:
|
if end_dt is None:
|
||||||
end_dt = pendulum.now("UTC").add(minutes=1)
|
end_dt = pendulum.now('UTC').add(minutes=1)
|
||||||
|
|
||||||
if start_dt is None:
|
if start_dt is None:
|
||||||
start_dt = end_dt.start_of("minute").subtract(minutes=limit)
|
start_dt = end_dt.start_of('minute').subtract(minutes=limit)
|
||||||
|
|
||||||
|
start_dt = int(start_dt.timestamp())
|
||||||
|
end_dt = int(end_dt.timestamp())
|
||||||
|
|
||||||
# Format datetime to unix timestamp
|
|
||||||
start_dt = math.trunc(time.mktime(start_dt.timetuple()))
|
|
||||||
end_dt = math.trunc(time.mktime(end_dt.timetuple()))
|
|
||||||
kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs)
|
kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs)
|
||||||
|
|
||||||
url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}"
|
url = f'/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}'
|
||||||
bars = []
|
bars = []
|
||||||
|
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
|
|
||||||
data = await self._request(
|
data = await self._request(
|
||||||
"GET",
|
'GET',
|
||||||
url,
|
url,
|
||||||
api_v="v1",
|
api_v='v1',
|
||||||
)
|
)
|
||||||
|
|
||||||
if not isinstance(data, list):
|
if not isinstance(data, list):
|
||||||
# Do a gradual backoff if Kucoin is rate limiting us
|
# Do a gradual backoff if Kucoin is rate limiting us
|
||||||
backoff_interval = i + (randint(0, 1000) / 1000)
|
backoff_interval = i
|
||||||
log.warn(f'History call failed, backing off for {backoff_interval}s')
|
log.warn(f'History call failed, backing off for {backoff_interval}s')
|
||||||
await trio.sleep(backoff_interval)
|
await trio.sleep(backoff_interval)
|
||||||
else:
|
else:
|
||||||
|
@ -361,64 +370,58 @@ class Client:
|
||||||
# Map to OHLC values to dict then to np array
|
# Map to OHLC values to dict then to np array
|
||||||
new_bars = []
|
new_bars = []
|
||||||
for i, bar in enumerate(bars[::-1]):
|
for i, bar in enumerate(bars[::-1]):
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
"index": i,
|
'index': i,
|
||||||
"time": bar[0],
|
'time': bar[0],
|
||||||
"open": bar[1],
|
'open': bar[1],
|
||||||
"close": bar[2],
|
'close': bar[2],
|
||||||
"high": bar[3],
|
'high': bar[3],
|
||||||
"low": bar[4],
|
'low': bar[4],
|
||||||
"volume": bar[5],
|
'volume': bar[5],
|
||||||
"amount": bar[6],
|
'amount': bar[6],
|
||||||
"bar_wap": 0.0,
|
'bar_wap': 0.0,
|
||||||
}
|
}
|
||||||
|
|
||||||
row = []
|
row = []
|
||||||
for j, (field_name, field_type) in enumerate(_ohlc_dtype):
|
for _, (field_name, field_type) in enumerate(_ohlc_dtype):
|
||||||
|
|
||||||
value = data[field_name]
|
value = data[field_name]
|
||||||
|
|
||||||
match field_name:
|
match field_name:
|
||||||
case "index" | "time":
|
case 'index':
|
||||||
row.append(int(value))
|
row.append(int(value))
|
||||||
|
case 'time':
|
||||||
|
row.append(value)
|
||||||
case _:
|
case _:
|
||||||
row.append(float(value))
|
row.append(float(value))
|
||||||
|
|
||||||
new_bars.append(tuple(row))
|
new_bars.append(tuple(row))
|
||||||
|
|
||||||
self._bars = array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
|
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
|
||||||
return array
|
return array
|
||||||
|
|
||||||
|
|
||||||
def fqsn_to_kucoin_sym(
|
def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str:
|
||||||
fqsn: str,
|
|
||||||
pairs: dict[str, KucoinMktPair]
|
|
||||||
|
|
||||||
|
|
||||||
) -> str:
|
|
||||||
pair_data = pairs[fqsn]
|
pair_data = pairs[fqsn]
|
||||||
return pair_data.baseCurrency + "-" + pair_data.quoteCurrency
|
return pair_data.baseCurrency + '-' + pair_data.quoteCurrency
|
||||||
|
|
||||||
|
|
||||||
def kucoin_sym_to_fqsn(sym: str) -> str:
|
def kucoin_sym_to_fqsn(sym: str) -> str:
|
||||||
return sym.lower().replace("-", "")
|
return sym.lower().replace('-', '')
|
||||||
|
|
||||||
|
|
||||||
@ acm
|
@acm
|
||||||
async def get_client() -> AsyncGenerator[Client, None]:
|
async def get_client() -> AsyncGenerator[Client, None]:
|
||||||
|
|
||||||
client = Client()
|
client = Client()
|
||||||
await client.cache_pairs()
|
await client.cache_pairs()
|
||||||
|
|
||||||
yield client
|
yield client
|
||||||
|
|
||||||
|
|
||||||
@ tractor.context
|
@tractor.context
|
||||||
async def open_symbol_search(
|
async def open_symbol_search(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
) -> None:
|
) -> None:
|
||||||
async with open_cached_client("kucoin") as client:
|
async with open_cached_client('kucoin') as client:
|
||||||
# load all symbols locally for fast search
|
# load all symbols locally for fast search
|
||||||
await client.cache_pairs()
|
await client.cache_pairs()
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
|
@ -426,6 +429,7 @@ async def open_symbol_search(
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
async for pattern in stream:
|
async for pattern in stream:
|
||||||
await stream.send(await client.search_symbols(pattern))
|
await stream.send(await client.search_symbols(pattern))
|
||||||
|
log.info('Kucoin symbol search opened')
|
||||||
|
|
||||||
|
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
|
@ -443,11 +447,10 @@ async def stream_quotes(
|
||||||
'''
|
'''
|
||||||
connect_id = str(uuid4())
|
connect_id = str(uuid4())
|
||||||
|
|
||||||
async with open_cached_client("kucoin") as client:
|
async with open_cached_client('kucoin') as client:
|
||||||
|
log.info('Starting up quote stream')
|
||||||
# loop through symbols and sub to feedz
|
# loop through symbols and sub to feedz
|
||||||
for sym in symbols:
|
for sym in symbols:
|
||||||
|
|
||||||
token, ping_interval = await client._get_ws_token()
|
token, ping_interval = await client._get_ws_token()
|
||||||
pairs = await client.cache_pairs()
|
pairs = await client.cache_pairs()
|
||||||
kucoin_sym = pairs[sym].symbol
|
kucoin_sym = pairs[sym].symbol
|
||||||
|
@ -456,19 +459,18 @@ async def stream_quotes(
|
||||||
# pass back token, and bool, signalling if we're the writer
|
# pass back token, and bool, signalling if we're the writer
|
||||||
# and that history has been written
|
# and that history has been written
|
||||||
sym: {
|
sym: {
|
||||||
"symbol_info": {
|
'symbol_info': {
|
||||||
"asset_type": "crypto",
|
'asset_type': 'crypto',
|
||||||
"price_tick_size": 0.0005,
|
'price_tick_size': 0.0005,
|
||||||
"lot_tick_size": 0.1,
|
'lot_tick_size': 0.1,
|
||||||
},
|
},
|
||||||
"shm_write_opts": {"sum_tick_vml": False},
|
'shm_write_opts': {'sum_tick_vml': False},
|
||||||
"fqsn": sym,
|
'fqsn': sym,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def subscribe(ws: wsproto.WSConnection):
|
async def subscribe(ws: wsproto.WSConnection):
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_ping_task(ws: wsproto.WSConnection):
|
async def open_ping_task(ws: wsproto.WSConnection):
|
||||||
'''
|
'''
|
||||||
|
@ -478,12 +480,13 @@ async def stream_quotes(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
|
# TODO: cache this task so it's only called once
|
||||||
async def ping_server():
|
async def ping_server():
|
||||||
while True:
|
while True:
|
||||||
await trio.sleep((ping_interval - 1000) / 1000)
|
await trio.sleep((ping_interval - 1000) / 1000)
|
||||||
await ws.send_msg({"id": connect_id, "type": "ping"})
|
await ws.send_msg({'id': connect_id, 'type': 'ping'})
|
||||||
|
|
||||||
|
log.info(f'Starting ping task for {sym}')
|
||||||
n.start_soon(ping_server)
|
n.start_soon(ping_server)
|
||||||
|
|
||||||
yield ws
|
yield ws
|
||||||
|
@ -492,32 +495,37 @@ async def stream_quotes(
|
||||||
|
|
||||||
# Spawn the ping task here
|
# Spawn the ping task here
|
||||||
async with open_ping_task(ws) as ws:
|
async with open_ping_task(ws) as ws:
|
||||||
# subscribe to market feedz here
|
tasks = []
|
||||||
l1_sub = make_sub(kucoin_sym, connect_id)
|
tasks.append(make_sub(kucoin_sym, connect_id, level='l3'))
|
||||||
await ws.send_msg(l1_sub)
|
tasks.append(make_sub(kucoin_sym, connect_id, level='l1'))
|
||||||
|
|
||||||
|
for task in tasks:
|
||||||
|
log.info(f'Subscribing to {task["topic"]} feed for {sym}')
|
||||||
|
await ws.send_msg(task)
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
# unsub
|
# unsub
|
||||||
if ws.connected():
|
if ws.connected():
|
||||||
|
log.info(f'Unsubscribing to {kucoin_sym} feed')
|
||||||
await ws.send_msg(
|
await ws.send_msg(
|
||||||
{
|
{
|
||||||
"id": connect_id,
|
'id': connect_id,
|
||||||
"type": "unsubscribe",
|
'type': 'unsubscribe',
|
||||||
"topic": f"/market/ticker:{sym}",
|
'topic': f'/market/ticker:{sym}',
|
||||||
"privateChannel": False,
|
'privateChannel': False,
|
||||||
"response": True,
|
'response': True,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
async with open_autorecon_ws(
|
async with open_autorecon_ws(
|
||||||
f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]",
|
f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]',
|
||||||
fixture=subscribe,
|
fixture=subscribe,
|
||||||
) as ws:
|
) as ws:
|
||||||
msg_gen = stream_messages(ws, sym)
|
msg_gen = stream_messages(ws, sym)
|
||||||
typ, quote = await msg_gen.__anext__()
|
typ, quote = await msg_gen.__anext__()
|
||||||
#
|
|
||||||
while typ != "trade":
|
while typ != 'trade':
|
||||||
# TODO: use ``anext()`` when it lands in 3.10!
|
# TODO: use ``anext()`` when it lands in 3.10!
|
||||||
typ, quote = await msg_gen.__anext__()
|
typ, quote = await msg_gen.__anext__()
|
||||||
|
|
||||||
|
@ -528,22 +536,30 @@ async def stream_quotes(
|
||||||
await send_chan.send({sym: msg})
|
await send_chan.send({sym: msg})
|
||||||
|
|
||||||
|
|
||||||
def make_sub(sym, connect_id) -> dict[str, str | bool]:
|
def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]:
|
||||||
return {
|
match level:
|
||||||
"id": connect_id,
|
case 'l1':
|
||||||
"type": "subscribe",
|
return {
|
||||||
"topic": f"/market/ticker:{sym}",
|
'id': connect_id,
|
||||||
"privateChannel": False,
|
'type': 'subscribe',
|
||||||
"response": True,
|
'topic': f'/spotMarket/level2Depth5:{sym}',
|
||||||
}
|
'privateChannel': False,
|
||||||
|
'response': True,
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'l3':
|
||||||
|
return {
|
||||||
|
'id': connect_id,
|
||||||
|
'type': 'subscribe',
|
||||||
|
'topic': f'/market/ticker:{sym}',
|
||||||
|
'privateChannel': False,
|
||||||
|
'response': True,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
async def stream_messages(
|
async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
|
||||||
ws: NoBsWs,
|
|
||||||
sym: str
|
|
||||||
) -> AsyncGenerator[NoBsWs, dict]:
|
|
||||||
|
|
||||||
timeouts = 0
|
timeouts = 0
|
||||||
|
last_ts: int = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
|
@ -551,52 +567,84 @@ async def stream_messages(
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
timeouts += 1
|
timeouts += 1
|
||||||
if timeouts > 2:
|
if timeouts > 2:
|
||||||
log.error("kucoin feed is sh**ing the bed... rebooting...")
|
log.error('kucoin feed is sh**ing the bed... rebooting...')
|
||||||
await ws._connect()
|
await ws._connect()
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if "subject" in msg and msg["subject"] == "trade.ticker":
|
if msg.get('subject'):
|
||||||
|
msg = KucoinMsg(**msg)
|
||||||
|
|
||||||
trade_msg = KucoinTradeMsg(**msg)
|
match msg.subject:
|
||||||
trade_data = KucoinTrade(**trade_msg.data)
|
case 'trade.ticker':
|
||||||
|
trade_data = KucoinTrade(**msg.data)
|
||||||
|
ts = trade_data.time
|
||||||
|
if ts <= last_ts:
|
||||||
|
continue
|
||||||
|
|
||||||
yield "trade", {
|
last_ts = ts
|
||||||
"symbol": sym,
|
yield 'trade', {
|
||||||
"last": trade_data.price,
|
'symbol': sym,
|
||||||
"brokerd_ts": trade_data.time,
|
'last': trade_data.price,
|
||||||
"ticks": [
|
'brokerd_ts': trade_data.time,
|
||||||
{
|
'ticks': [
|
||||||
"type": "trade",
|
{
|
||||||
"price": float(trade_data.price),
|
'type': 'trade',
|
||||||
"size": float(trade_data.size),
|
'price': float(trade_data.price),
|
||||||
"broker_ts": trade_data.time,
|
'size': float(trade_data.size),
|
||||||
|
'broker_ts': ts,
|
||||||
|
}
|
||||||
|
],
|
||||||
}
|
}
|
||||||
],
|
|
||||||
}
|
|
||||||
|
|
||||||
else:
|
case 'level2':
|
||||||
continue
|
l2_data = KucoinL2(**msg.data)
|
||||||
|
first_ask = l2_data.asks[0]
|
||||||
|
first_bid = l2_data.bids[0]
|
||||||
|
yield 'l1', {
|
||||||
|
'symbol': sym,
|
||||||
|
'ticks': [
|
||||||
|
{
|
||||||
|
'type': 'bid',
|
||||||
|
'price': float(first_bid[0]),
|
||||||
|
'size': float(first_bid[1]),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'type': 'bsize',
|
||||||
|
'price': float(first_bid[0]),
|
||||||
|
'size': float(first_bid[1]),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'type': 'ask',
|
||||||
|
'price': float(first_ask[0]),
|
||||||
|
'size': float(first_ask[1]),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'type': 'asize',
|
||||||
|
'price': float(first_ask[0]),
|
||||||
|
'size': float(first_ask[1]),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_history_client(
|
async def open_history_client(
|
||||||
symbol: str,
|
symbol: str,
|
||||||
type: str = "1m",
|
type: str = '1m',
|
||||||
) -> AsyncGenerator[Callable, None]:
|
) -> AsyncGenerator[Callable, None]:
|
||||||
async with open_cached_client("kucoin") as client:
|
async with open_cached_client('kucoin') as client:
|
||||||
log.info("Attempting to open kucoin history client")
|
|
||||||
|
log.info('Attempting to open kucoin history client')
|
||||||
|
|
||||||
async def get_ohlc_history(
|
async def get_ohlc_history(
|
||||||
|
|
||||||
timeframe: float,
|
timeframe: float,
|
||||||
end_dt: datetime | None = None,
|
end_dt: datetime | None = None,
|
||||||
start_dt: datetime | None = None,
|
start_dt: datetime | None = None,
|
||||||
|
|
||||||
) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end
|
) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end
|
||||||
|
|
||||||
if timeframe != 60:
|
if timeframe != 60:
|
||||||
raise DataUnavailable("Only 1m bars are supported")
|
raise DataUnavailable('Only 1m bars are supported')
|
||||||
|
|
||||||
array = await client._get_bars(
|
array = await client._get_bars(
|
||||||
symbol,
|
symbol,
|
||||||
|
@ -604,14 +652,13 @@ async def open_history_client(
|
||||||
end_dt=end_dt,
|
end_dt=end_dt,
|
||||||
)
|
)
|
||||||
|
|
||||||
times = array["time"]
|
times = array['time']
|
||||||
|
|
||||||
if end_dt is None:
|
if end_dt is None:
|
||||||
|
|
||||||
inow = round(time.time())
|
inow = round(time.time())
|
||||||
|
|
||||||
print(
|
print(
|
||||||
f"difference in time between load and processing {inow - times[-1]}"
|
f'difference in time between load and processing {inow - times[-1]}'
|
||||||
)
|
)
|
||||||
|
|
||||||
if (inow - times[-1]) > 60:
|
if (inow - times[-1]) > 60:
|
||||||
|
@ -619,6 +666,9 @@ async def open_history_client(
|
||||||
|
|
||||||
start_dt = pendulum.from_timestamp(times[0])
|
start_dt = pendulum.from_timestamp(times[0])
|
||||||
end_dt = pendulum.from_timestamp(times[-1])
|
end_dt = pendulum.from_timestamp(times[-1])
|
||||||
|
|
||||||
|
log.info('History succesfully fetched baby')
|
||||||
|
|
||||||
return array, start_dt, end_dt
|
return array, start_dt, end_dt
|
||||||
|
|
||||||
yield get_ohlc_history, {"erlangs": 3, "rate": 3}
|
yield get_ohlc_history, {}
|
||||||
|
|
Loading…
Reference in New Issue