Format and ensure we're only grabbing the most closest bid and ask

emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-27 21:51:54 -04:00
parent 480b8c591a
commit ff22f2d240
1 changed files with 143 additions and 150 deletions

View File

@ -13,10 +13,10 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" '''
Kucoin broker backend Kucoin broker backend
""" '''
from random import randint from random import randint
from typing import Any, Callable, Optional, Literal, AsyncGenerator from typing import Any, Callable, Optional, Literal, AsyncGenerator
@ -51,14 +51,14 @@ from ..data._web_bs import (
log = get_logger(__name__) log = get_logger(__name__)
_ohlc_dtype = [ _ohlc_dtype = [
("index", int), ('index', int),
("time", int), ('time', int),
("open", float), ('open', float),
("high", float), ('high', float),
("low", float), ('low', float),
("close", float), ('close', float),
("volume", float), ('volume', float),
("bar_wap", float), # will be zeroed by sampler if not filled ('bar_wap', float), # will be zeroed by sampler if not filled
] ]
@ -67,6 +67,7 @@ class KucoinMktPair(Struct, frozen=True):
Kucoin's pair format Kucoin's pair format
''' '''
baseCurrency: str baseCurrency: str
baseIncrement: float baseIncrement: float
baseMaxSize: float baseMaxSize: float
@ -91,6 +92,7 @@ class AccountTrade(Struct, frozen=True):
Historical trade format Historical trade format
''' '''
id: str id: str
currency: str currency: str
amount: float amount: float
@ -98,7 +100,7 @@ class AccountTrade(Struct, frozen=True):
balance: float balance: float
accountType: str accountType: str
bizType: str bizType: str
direction: Literal["in", "out"] direction: Literal['in', 'out']
createdAt: float createdAt: float
context: list[str] context: list[str]
@ -116,6 +118,7 @@ class KucoinTrade(Struct, frozen=True):
Real-time trade format Real-time trade format
''' '''
bestAsk: float bestAsk: float
bestAskSize: float bestAskSize: float
bestBid: float bestBid: float
@ -131,6 +134,7 @@ class KucoinL2(Struct, frozen=True):
Real-time L2 order book format Real-time L2 order book format
''' '''
asks: list[list[float]] asks: list[list[float]]
bids: list[list[float]] bids: list[list[float]]
timestamp: float timestamp: float
@ -152,10 +156,10 @@ class BrokerConfig(Struct, frozen=True):
def get_config() -> BrokerConfig | None: def get_config() -> BrokerConfig | None:
conf, path = config.load() conf, path = config.load()
section = conf.get("kucoin") section = conf.get('kucoin')
if section is None: if section is None:
log.warning("No config section found for kucoin in config") log.warning('No config section found for kucoin in config')
return None return None
return BrokerConfig(**section) return BrokerConfig(**section)
@ -172,9 +176,7 @@ class Client:
config: BrokerConfig | None = get_config() config: BrokerConfig | None = get_config()
if ( if config and config.key_id and config.key_secret and config.key_passphrase:
config and config.key_id and config.key_secret and config.key_passphrase
):
self._authenticated = True self._authenticated = True
self._key_id = config.key_id self._key_id = config.key_id
self._key_secret = config.key_secret self._key_secret = config.key_secret
@ -182,9 +184,9 @@ class Client:
def _gen_auth_req_headers( def _gen_auth_req_headers(
self, self,
action: Literal["POST", "GET"], action: Literal['POST', 'GET'],
endpoint: str, endpoint: str,
api_v: str = "v2", api_v: str = 'v2',
) -> dict[str, str | bytes]: ) -> dict[str, str | bytes]:
''' '''
Generate authenticated request headers Generate authenticated request headers
@ -193,39 +195,39 @@ class Client:
''' '''
breakpoint() breakpoint()
now = int(time.time() * 1000) now = int(time.time() * 1000)
path = f"/api/{api_v}{endpoint}" path = f'/api/{api_v}{endpoint}'
str_to_sign = str(now) + action + path str_to_sign = str(now) + action + path
signature = base64.b64encode( signature = base64.b64encode(
hmac.new( hmac.new(
self._key_secret.encode("utf-8"), self._key_secret.encode('utf-8'),
str_to_sign.encode("utf-8"), str_to_sign.encode('utf-8'),
hashlib.sha256, hashlib.sha256,
).digest() ).digest()
) )
passphrase = base64.b64encode( passphrase = base64.b64encode(
hmac.new( hmac.new(
self._key_secret.encode("utf-8"), self._key_secret.encode('utf-8'),
self._key_passphrase.encode("utf-8"), self._key_passphrase.encode('utf-8'),
hashlib.sha256, hashlib.sha256,
).digest() ).digest()
) )
return { return {
"KC-API-SIGN": signature, 'KC-API-SIGN': signature,
"KC-API-TIMESTAMP": str(now), 'KC-API-TIMESTAMP': str(now),
"KC-API-KEY": self._key_id, 'KC-API-KEY': self._key_id,
"KC-API-PASSPHRASE": passphrase, 'KC-API-PASSPHRASE': passphrase,
# XXX: Even if using the v1 api - this stays the same # XXX: Even if using the v1 api - this stays the same
"KC-API-KEY-VERSION": "2", 'KC-API-KEY-VERSION': '2',
} }
async def _request( async def _request(
self, self,
action: Literal["POST", "GET"], action: Literal['POST', 'GET'],
endpoint: str, endpoint: str,
api_v: str = "v2", api_v: str = 'v2',
headers: dict = {}, headers: dict = {},
) -> Any: ) -> Any:
''' '''
@ -235,34 +237,29 @@ class Client:
if self._authenticated: if self._authenticated:
headers = self._gen_auth_req_headers(action, endpoint, api_v) headers = self._gen_auth_req_headers(action, endpoint, api_v)
api_url = f"https://api.kucoin.com/api/{api_v}{endpoint}" api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}'
res = await asks.request(action, api_url, headers=headers) res = await asks.request(action, api_url, headers=headers)
if "data" in res.json(): if 'data' in res.json():
return res.json()["data"] return res.json()['data']
else: else:
log.error(f'Error making request to {api_url} -> {res.json()["msg"]}') log.error(f'Error making request to {api_url} -> {res.json()["msg"]}')
return res.json()["msg"] return res.json()['msg']
async def _get_ws_token( async def _get_ws_token(self, private: bool = False) -> tuple[str, int] | None:
self,
private: bool = False
) -> tuple[str, int] | None:
''' '''
Fetch ws token needed for sub access Fetch ws token needed for sub access
''' '''
token_type = "private" if private else "public" token_type = 'private' if private else 'public'
data: dict[str, Any] | None = await self._request( data: dict[str, Any] | None = await self._request(
"POST", 'POST', f'/bullet-{token_type}', 'v1'
f"/bullet-{token_type}",
"v1"
) )
if data and "token" in data: if data and 'token' in data:
ping_interval: int = data["instanceServers"][0]["pingInterval"] ping_interval: int = data['instanceServers'][0]['pingInterval']
return data["token"], ping_interval return data['token'], ping_interval
elif data: elif data:
log.error( log.error(
f'Error making request for Kucoin ws token -> {data.json()["msg"]}' f'Error making request for Kucoin ws token -> {data.json()["msg"]}'
@ -274,8 +271,8 @@ class Client:
if self._pairs: if self._pairs:
return self._pairs return self._pairs
entries = await self._request("GET", "/symbols") entries = await self._request('GET', '/symbols')
syms = {item["name"]: KucoinMktPair(**item) for item in entries} syms = {item['name']: KucoinMktPair(**item) for item in entries}
return syms return syms
async def cache_pairs( async def cache_pairs(
@ -295,14 +292,14 @@ class Client:
def _normalize_pairs( def _normalize_pairs(
self, pairs: dict[str, KucoinMktPair] self, pairs: dict[str, KucoinMktPair]
) -> dict[str, KucoinMktPair]: ) -> dict[str, KucoinMktPair]:
""" '''
Map kucoin pairs to fqsn strings Map kucoin pairs to fqsn strings
""" '''
norm_pairs = {} norm_pairs = {}
for key, value in pairs.items(): for key, value in pairs.items():
fqsn = key.lower().replace("-", "") fqsn = key.lower().replace('-', '')
norm_pairs[fqsn] = value norm_pairs[fqsn] = value
return norm_pairs return norm_pairs
@ -319,7 +316,7 @@ class Client:
return {kucoin_sym_to_fqsn(item[0].name): item[0] for item in matches} return {kucoin_sym_to_fqsn(item[0].name): item[0] for item in matches}
async def last_trades(self, sym: str) -> list[AccountTrade]: async def last_trades(self, sym: str) -> list[AccountTrade]:
trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1") trades = await self._request('GET', f'/accounts/ledgers?currency={sym}', 'v1')
trades = AccountResponse(**trades) trades = AccountResponse(**trades)
return trades.items return trades.items
@ -330,7 +327,7 @@ class Client:
end_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None,
limit: int = 1000, limit: int = 1000,
as_np: bool = True, as_np: bool = True,
type: str = "1min", type: str = '1min',
) -> np.ndarray: ) -> np.ndarray:
''' '''
Get OHLC data and convert to numpy array for perffff Get OHLC data and convert to numpy array for perffff
@ -339,24 +336,24 @@ class Client:
# Generate generic end and start time if values not passed # Generate generic end and start time if values not passed
# Currently gives us 12hrs of data # Currently gives us 12hrs of data
if end_dt is None: if end_dt is None:
end_dt = pendulum.now("UTC").add(minutes=1) end_dt = pendulum.now('UTC').add(minutes=1)
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of("minute").subtract(minutes=limit) start_dt = end_dt.start_of('minute').subtract(minutes=limit)
start_dt = int(start_dt.timestamp()) start_dt = int(start_dt.timestamp())
end_dt = int(end_dt.timestamp()) end_dt = int(end_dt.timestamp())
kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs)
url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}" url = f'/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}'
bars = [] bars = []
for i in range(10): for i in range(10):
data = await self._request( data = await self._request(
"GET", 'GET',
url, url,
api_v="v1", api_v='v1',
) )
if not isinstance(data, list): if not isinstance(data, list):
@ -371,28 +368,26 @@ class Client:
# Map to OHLC values to dict then to np array # Map to OHLC values to dict then to np array
new_bars = [] new_bars = []
for i, bar in enumerate(bars[::-1]): for i, bar in enumerate(bars[::-1]):
data = { data = {
"index": i, 'index': i,
"time": bar[0], 'time': bar[0],
"open": bar[1], 'open': bar[1],
"close": bar[2], 'close': bar[2],
"high": bar[3], 'high': bar[3],
"low": bar[4], 'low': bar[4],
"volume": bar[5], 'volume': bar[5],
"amount": bar[6], 'amount': bar[6],
"bar_wap": 0.0, 'bar_wap': 0.0,
} }
row = [] row = []
for _, (field_name, field_type) in enumerate(_ohlc_dtype): for _, (field_name, field_type) in enumerate(_ohlc_dtype):
value = data[field_name] value = data[field_name]
match field_name: match field_name:
case "index": case 'index':
row.append(int(value)) row.append(int(value))
case "time": case 'time':
# row.append(int(value) + (3600 * 4)) # row.append(int(value) + (3600 * 4))
row.append(value) row.append(value)
case _: case _:
@ -408,33 +403,28 @@ def kucoin_timestamp(dt: datetime):
return math.trunc(time.mktime(dt.timetuple())) return math.trunc(time.mktime(dt.timetuple()))
def fqsn_to_kucoin_sym( def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str:
fqsn: str,
pairs: dict[str, KucoinMktPair]
) -> str:
pair_data = pairs[fqsn] pair_data = pairs[fqsn]
return pair_data.baseCurrency + "-" + pair_data.quoteCurrency return pair_data.baseCurrency + '-' + pair_data.quoteCurrency
def kucoin_sym_to_fqsn(sym: str) -> str: def kucoin_sym_to_fqsn(sym: str) -> str:
return sym.lower().replace("-", "") return sym.lower().replace('-', '')
@ acm @acm
async def get_client() -> AsyncGenerator[Client, None]: async def get_client() -> AsyncGenerator[Client, None]:
client = Client() client = Client()
await client.cache_pairs() await client.cache_pairs()
yield client yield client
@ tractor.context @tractor.context
async def open_symbol_search( async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
) -> None: ) -> None:
async with open_cached_client("kucoin") as client: async with open_cached_client('kucoin') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
await client.cache_pairs() await client.cache_pairs()
await ctx.started() await ctx.started()
@ -442,7 +432,7 @@ async def open_symbol_search(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for pattern in stream: async for pattern in stream:
await stream.send(await client.search_symbols(pattern)) await stream.send(await client.search_symbols(pattern))
log.info("Kucoin symbol search opened") log.info('Kucoin symbol search opened')
async def stream_quotes( async def stream_quotes(
@ -460,11 +450,10 @@ async def stream_quotes(
''' '''
connect_id = str(uuid4()) connect_id = str(uuid4())
async with open_cached_client("kucoin") as client: async with open_cached_client('kucoin') as client:
log.info("Starting up quote stream") log.info('Starting up quote stream')
# loop through symbols and sub to feedz # loop through symbols and sub to feedz
for sym in symbols: for sym in symbols:
token, ping_interval = await client._get_ws_token() token, ping_interval = await client._get_ws_token()
pairs = await client.cache_pairs() pairs = await client.cache_pairs()
kucoin_sym = pairs[sym].symbol kucoin_sym = pairs[sym].symbol
@ -473,19 +462,18 @@ async def stream_quotes(
# pass back token, and bool, signalling if we're the writer # pass back token, and bool, signalling if we're the writer
# and that history has been written # and that history has been written
sym: { sym: {
"symbol_info": { 'symbol_info': {
"asset_type": "crypto", 'asset_type': 'crypto',
"price_tick_size": 0.0005, 'price_tick_size': 0.0005,
"lot_tick_size": 0.1, 'lot_tick_size': 0.1,
}, },
"shm_write_opts": {"sum_tick_vml": False}, 'shm_write_opts': {'sum_tick_vml': False},
"fqsn": sym, 'fqsn': sym,
}, },
} }
@acm @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
@acm @acm
async def open_ping_task(ws: wsproto.WSConnection): async def open_ping_task(ws: wsproto.WSConnection):
''' '''
@ -499,7 +487,7 @@ async def stream_quotes(
async def ping_server(): async def ping_server():
while True: while True:
await trio.sleep((ping_interval - 1000) / 1000) await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({"id": connect_id, "type": "ping"}) await ws.send_msg({'id': connect_id, 'type': 'ping'})
n.start_soon(ping_server) n.start_soon(ping_server)
@ -523,22 +511,22 @@ async def stream_quotes(
log.info(f'Unsubscribing to {kucoin_sym} feed') log.info(f'Unsubscribing to {kucoin_sym} feed')
await ws.send_msg( await ws.send_msg(
{ {
"id": connect_id, 'id': connect_id,
"type": "unsubscribe", 'type': 'unsubscribe',
"topic": f"/market/ticker:{sym}", 'topic': f'/market/ticker:{sym}',
"privateChannel": False, 'privateChannel': False,
"response": True, 'response': True,
} }
) )
async with open_autorecon_ws( async with open_autorecon_ws(
f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]", f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]',
fixture=subscribe, fixture=subscribe,
) as ws: ) as ws:
msg_gen = stream_messages(ws, sym) msg_gen = stream_messages(ws, sym)
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
while typ != "trade": while typ != 'trade':
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
@ -553,29 +541,25 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]:
match level: match level:
case 'l1': case 'l1':
return { return {
"id": connect_id, 'id': connect_id,
"type": "subscribe", 'type': 'subscribe',
"topic": f"/spotMarket/level2Depth5:{sym}", 'topic': f'/spotMarket/level2Depth5:{sym}',
"privateChannel": False, 'privateChannel': False,
"response": True, 'response': True,
} }
case 'l3': case 'l3':
return { return {
"id": connect_id, 'id': connect_id,
"type": "subscribe", 'type': 'subscribe',
"topic": f"/market/ticker:{sym}", 'topic': f'/market/ticker:{sym}',
"privateChannel": False, 'privateChannel': False,
"response": True, 'response': True,
} }
case _: case _:
return {} return {}
async def stream_messages( async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
ws: NoBsWs,
sym: str
) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0 timeouts = 0
while True: while True:
@ -584,67 +568,77 @@ async def stream_messages(
if cs.cancelled_caught: if cs.cancelled_caught:
timeouts += 1 timeouts += 1
if timeouts > 2: if timeouts > 2:
log.error("kucoin feed is sh**ing the bed... rebooting...") log.error('kucoin feed is sh**ing the bed... rebooting...')
await ws._connect() await ws._connect()
continue continue
if msg.get("subject") != None: if msg.get('subject') != None:
msg = KucoinMsg(**msg) msg = KucoinMsg(**msg)
match msg.subject: match msg.subject:
case "trade.ticker": case 'trade.ticker':
trade_data = KucoinTrade(**msg.data) trade_data = KucoinTrade(**msg.data)
yield "trade", { yield 'trade', {
"symbol": sym, 'symbol': sym,
"last": trade_data.price, 'last': trade_data.price,
"brokerd_ts": trade_data.time, 'brokerd_ts': trade_data.time,
"ticks": [ 'ticks': [
{ {
"type": "trade", 'type': 'trade',
"price": float(trade_data.price), 'price': float(trade_data.price),
"size": float(trade_data.size), 'size': float(trade_data.size),
"broker_ts": trade_data.time, 'broker_ts': trade_data.time,
} }
], ],
} }
case "level2": case 'level2':
l2_data = KucoinL2(**msg.data) l2_data = KucoinL2(**msg.data)
first_ask = l2_data.asks[0]
ticks = [] first_bid = l2_data.bids[0]
for trade in l2_data.bids:
tick = {'type': 'bid', 'price': float(trade[0]), 'size': float(trade[1])}
ticks.append(tick)
for trade in l2_data.asks:
tick = {'type': 'ask', 'price': float(trade[0]), 'size': float(trade[1])}
ticks.append(tick)
yield 'l1', { yield 'l1', {
'symbol': sym, 'symbol': sym,
'ticks': ticks, 'ticks': [
{
'type': 'bid',
'price': float(first_bid[0]),
'size': float(first_bid[1]),
},
{
'type': 'bsize',
'price': float(first_bid[0]),
'size': float(first_bid[1]),
},
{
'type': 'ask',
'price': float(first_ask[0]),
'size': float(first_ask[1]),
},
{
'type': 'asize',
'price': float(first_ask[0]),
'size': float(first_ask[1]),
},
],
} }
@acm @acm
async def open_history_client( async def open_history_client(
symbol: str, symbol: str,
type: str = "1m", type: str = '1m',
) -> AsyncGenerator[Callable, None]: ) -> AsyncGenerator[Callable, None]:
async with open_cached_client("kucoin") as client: async with open_cached_client('kucoin') as client:
log.info("Attempting to open kucoin history client") log.info('Attempting to open kucoin history client')
async def get_ohlc_history( async def get_ohlc_history(
timeframe: float, timeframe: float,
end_dt: datetime | None = None, end_dt: datetime | None = None,
start_dt: datetime | None = None, start_dt: datetime | None = None,
) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end ) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end
if timeframe != 60: if timeframe != 60:
raise DataUnavailable("Only 1m bars are supported") raise DataUnavailable('Only 1m bars are supported')
array = await client._get_bars( array = await client._get_bars(
symbol, symbol,
@ -652,13 +646,12 @@ async def open_history_client(
end_dt=end_dt, end_dt=end_dt,
) )
times = array["time"] times = array['time']
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
print( print(
f"difference in time between load and processing {inow - times[-1]}" f'difference in time between load and processing {inow - times[-1]}'
) )
if (inow - times[-1]) > 60: if (inow - times[-1]) > 60: