From 480b8c591a3b29cf9c342ab545eebfb0155fdc3c Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Mon, 27 Mar 2023 21:28:11 -0400 Subject: [PATCH] Add L1 data feed and correct history issue --- piker/brokers/kucoin.py | 138 ++++++++++++++++++++++++++-------------- 1 file changed, 92 insertions(+), 46 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 2fbfcbc5..901b24c9 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -1,4 +1,3 @@ -# piker: trading gear for hackers # Copyright (C) Jared Goldman (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify @@ -127,19 +126,29 @@ class KucoinTrade(Struct, frozen=True): time: float +class KucoinL2(Struct, frozen=True): + ''' + Real-time L2 order book format + + ''' + asks: list[list[float]] + bids: list[list[float]] + timestamp: float + + +class KucoinMsg(Struct, frozen=True): + type: str + topic: str + subject: str + data: list[KucoinTrade | KucoinL2] + + class BrokerConfig(Struct, frozen=True): key_id: str key_secret: str key_passphrase: str -class KucoinTradeMsg(Struct, frozen=True): - type: str - topic: str - subject: str - data: list[KucoinTrade] - - def get_config() -> BrokerConfig | None: conf, path = config.load() @@ -182,6 +191,7 @@ class Client: https://docs.kucoin.com/#authentication ''' + breakpoint() now = int(time.time() * 1000) path = f"/api/{api_v}{endpoint}" str_to_sign = str(now) + action + path @@ -327,22 +337,22 @@ class Client: ''' # Generate generic end and start time if values not passed + # Currently gives us 12hrs of data if end_dt is None: end_dt = pendulum.now("UTC").add(minutes=1) if start_dt is None: start_dt = end_dt.start_of("minute").subtract(minutes=limit) - # Format datetime to unix timestamp - start_dt = math.trunc(time.mktime(start_dt.timetuple())) - end_dt = math.trunc(time.mktime(end_dt.timetuple())) + start_dt = int(start_dt.timestamp()) + end_dt = int(end_dt.timestamp()) + kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}" bars = [] for i in range(10): - data = await self._request( "GET", url, @@ -375,7 +385,7 @@ class Client: } row = [] - for j, (field_name, field_type) in enumerate(_ohlc_dtype): + for _, (field_name, field_type) in enumerate(_ohlc_dtype): value = data[field_name] @@ -383,16 +393,21 @@ class Client: case "index": row.append(int(value)) case "time": + # row.append(int(value) + (3600 * 4)) row.append(value) case _: row.append(float(value)) new_bars.append(tuple(row)) - self._bars = array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars return array +def kucoin_timestamp(dt: datetime): + return math.trunc(time.mktime(dt.timetuple())) + + def fqsn_to_kucoin_sym( fqsn: str, pairs: dict[str, KucoinMktPair] @@ -474,8 +489,8 @@ async def stream_quotes( @acm async def open_ping_task(ws: wsproto.WSConnection): ''' - Spawn a non-blocking task that pings the ws - server every ping_interval so Kucoin doesn't drop + Spawn a non-blocking task that pings the ws + server every ping_interval so Kucoin doesn't drop our connection ''' @@ -496,7 +511,9 @@ async def stream_quotes( async with open_ping_task(ws) as ws: # subscribe to market feedz here log.info(f'Subscribing to {kucoin_sym} feed') - l1_sub = make_sub(kucoin_sym, connect_id) + trade_sub = make_sub(kucoin_sym, connect_id, level='l3') + l1_sub = make_sub(kucoin_sym, connect_id, level='l1') + await ws.send_msg(trade_sub) await ws.send_msg(l1_sub) yield @@ -520,7 +537,7 @@ async def stream_quotes( ) as ws: msg_gen = stream_messages(ws, sym) typ, quote = await msg_gen.__anext__() - # + while typ != "trade": # TODO: use ``anext()`` when it lands in 3.10! typ, quote = await msg_gen.__anext__() @@ -532,14 +549,26 @@ async def stream_quotes( await send_chan.send({sym: msg}) -def make_sub(sym, connect_id) -> dict[str, str | bool]: - return { - "id": connect_id, - "type": "subscribe", - "topic": f"/market/ticker:{sym}", - "privateChannel": False, - "response": True, - } +def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]: + match level: + case 'l1': + return { + "id": connect_id, + "type": "subscribe", + "topic": f"/spotMarket/level2Depth5:{sym}", + "privateChannel": False, + "response": True, + } + case 'l3': + return { + "id": connect_id, + "type": "subscribe", + "topic": f"/market/ticker:{sym}", + "privateChannel": False, + "response": True, + } + case _: + return {} async def stream_messages( @@ -560,28 +589,43 @@ async def stream_messages( continue - if "subject" in msg and msg["subject"] == "trade.ticker": + if msg.get("subject") != None: - trade_msg = KucoinTradeMsg(**msg) - trade_data = KucoinTrade(**trade_msg.data) + msg = KucoinMsg(**msg) - yield "trade", { - "symbol": sym, - "last": trade_data.price, - "brokerd_ts": trade_data.time, - "ticks": [ - { - "type": "trade", - "price": float(trade_data.price), - "size": float(trade_data.size), - "broker_ts": trade_data.time, + match msg.subject: + case "trade.ticker": + + trade_data = KucoinTrade(**msg.data) + yield "trade", { + "symbol": sym, + "last": trade_data.price, + "brokerd_ts": trade_data.time, + "ticks": [ + { + "type": "trade", + "price": float(trade_data.price), + "size": float(trade_data.size), + "broker_ts": trade_data.time, + } + ], } - ], - } - else: - continue + case "level2": + l2_data = KucoinL2(**msg.data) + + ticks = [] + for trade in l2_data.bids: + tick = {'type': 'bid', 'price': float(trade[0]), 'size': float(trade[1])} + ticks.append(tick) + for trade in l2_data.asks: + tick = {'type': 'ask', 'price': float(trade[0]), 'size': float(trade[1])} + ticks.append(tick) + yield 'l1', { + 'symbol': sym, + 'ticks': ticks, + } @acm async def open_history_client( @@ -609,7 +653,6 @@ async def open_history_client( ) times = array["time"] - if end_dt is None: inow = round(time.time()) @@ -620,10 +663,13 @@ async def open_history_client( if (inow - times[-1]) > 60: await tractor.breakpoint() - start_dt = pendulum.from_timestamp(times[0]) end_dt = pendulum.from_timestamp(times[-1]) log.info('History succesfully fetched baby') + # breakpoint() + # print(f'OUTPUTTED END TIME: {time.ctime(kucoin_timestamp(end_dt))}') + # print(f'OUTPUTTED START TIME: {time.ctime(kucoin_timestamp(start_dt))}') + # print(f'DIFFERENCE IN MINUTES {(end_dt - start_dt).in_minutes()}') return array, start_dt, end_dt - yield get_ohlc_history, {"erlangs": 3, "rate": 3} + yield get_ohlc_history, {}