Add L1 data feed and correct history issue

emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-27 21:28:11 -04:00
parent 79956abc5e
commit 480b8c591a
1 changed files with 92 additions and 46 deletions

View File

@ -1,4 +1,3 @@
# piker: trading gear for hackers
# Copyright (C) Jared Goldman (in stewardship for pikers) # Copyright (C) Jared Goldman (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
@ -127,19 +126,29 @@ class KucoinTrade(Struct, frozen=True):
time: float time: float
class KucoinL2(Struct, frozen=True):
'''
Real-time L2 order book format
'''
asks: list[list[float]]
bids: list[list[float]]
timestamp: float
class KucoinMsg(Struct, frozen=True):
type: str
topic: str
subject: str
data: list[KucoinTrade | KucoinL2]
class BrokerConfig(Struct, frozen=True): class BrokerConfig(Struct, frozen=True):
key_id: str key_id: str
key_secret: str key_secret: str
key_passphrase: str key_passphrase: str
class KucoinTradeMsg(Struct, frozen=True):
type: str
topic: str
subject: str
data: list[KucoinTrade]
def get_config() -> BrokerConfig | None: def get_config() -> BrokerConfig | None:
conf, path = config.load() conf, path = config.load()
@ -182,6 +191,7 @@ class Client:
https://docs.kucoin.com/#authentication https://docs.kucoin.com/#authentication
''' '''
breakpoint()
now = int(time.time() * 1000) now = int(time.time() * 1000)
path = f"/api/{api_v}{endpoint}" path = f"/api/{api_v}{endpoint}"
str_to_sign = str(now) + action + path str_to_sign = str(now) + action + path
@ -327,22 +337,22 @@ class Client:
''' '''
# Generate generic end and start time if values not passed # Generate generic end and start time if values not passed
# Currently gives us 12hrs of data
if end_dt is None: if end_dt is None:
end_dt = pendulum.now("UTC").add(minutes=1) end_dt = pendulum.now("UTC").add(minutes=1)
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of("minute").subtract(minutes=limit) start_dt = end_dt.start_of("minute").subtract(minutes=limit)
# Format datetime to unix timestamp start_dt = int(start_dt.timestamp())
start_dt = math.trunc(time.mktime(start_dt.timetuple())) end_dt = int(end_dt.timestamp())
end_dt = math.trunc(time.mktime(end_dt.timetuple()))
kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs)
url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}" url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}"
bars = [] bars = []
for i in range(10): for i in range(10):
data = await self._request( data = await self._request(
"GET", "GET",
url, url,
@ -375,7 +385,7 @@ class Client:
} }
row = [] row = []
for j, (field_name, field_type) in enumerate(_ohlc_dtype): for _, (field_name, field_type) in enumerate(_ohlc_dtype):
value = data[field_name] value = data[field_name]
@ -383,16 +393,21 @@ class Client:
case "index": case "index":
row.append(int(value)) row.append(int(value))
case "time": case "time":
# row.append(int(value) + (3600 * 4))
row.append(value) row.append(value)
case _: case _:
row.append(float(value)) row.append(float(value))
new_bars.append(tuple(row)) new_bars.append(tuple(row))
self._bars = array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array return array
def kucoin_timestamp(dt: datetime):
return math.trunc(time.mktime(dt.timetuple()))
def fqsn_to_kucoin_sym( def fqsn_to_kucoin_sym(
fqsn: str, fqsn: str,
pairs: dict[str, KucoinMktPair] pairs: dict[str, KucoinMktPair]
@ -496,7 +511,9 @@ async def stream_quotes(
async with open_ping_task(ws) as ws: async with open_ping_task(ws) as ws:
# subscribe to market feedz here # subscribe to market feedz here
log.info(f'Subscribing to {kucoin_sym} feed') log.info(f'Subscribing to {kucoin_sym} feed')
l1_sub = make_sub(kucoin_sym, connect_id) trade_sub = make_sub(kucoin_sym, connect_id, level='l3')
l1_sub = make_sub(kucoin_sym, connect_id, level='l1')
await ws.send_msg(trade_sub)
await ws.send_msg(l1_sub) await ws.send_msg(l1_sub)
yield yield
@ -520,7 +537,7 @@ async def stream_quotes(
) as ws: ) as ws:
msg_gen = stream_messages(ws, sym) msg_gen = stream_messages(ws, sym)
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
#
while typ != "trade": while typ != "trade":
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
@ -532,7 +549,17 @@ async def stream_quotes(
await send_chan.send({sym: msg}) await send_chan.send({sym: msg})
def make_sub(sym, connect_id) -> dict[str, str | bool]: def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]:
match level:
case 'l1':
return {
"id": connect_id,
"type": "subscribe",
"topic": f"/spotMarket/level2Depth5:{sym}",
"privateChannel": False,
"response": True,
}
case 'l3':
return { return {
"id": connect_id, "id": connect_id,
"type": "subscribe", "type": "subscribe",
@ -540,6 +567,8 @@ def make_sub(sym, connect_id) -> dict[str, str | bool]:
"privateChannel": False, "privateChannel": False,
"response": True, "response": True,
} }
case _:
return {}
async def stream_messages( async def stream_messages(
@ -560,11 +589,14 @@ async def stream_messages(
continue continue
if "subject" in msg and msg["subject"] == "trade.ticker": if msg.get("subject") != None:
trade_msg = KucoinTradeMsg(**msg) msg = KucoinMsg(**msg)
trade_data = KucoinTrade(**trade_msg.data)
match msg.subject:
case "trade.ticker":
trade_data = KucoinTrade(**msg.data)
yield "trade", { yield "trade", {
"symbol": sym, "symbol": sym,
"last": trade_data.price, "last": trade_data.price,
@ -579,9 +611,21 @@ async def stream_messages(
], ],
} }
else: case "level2":
continue
l2_data = KucoinL2(**msg.data)
ticks = []
for trade in l2_data.bids:
tick = {'type': 'bid', 'price': float(trade[0]), 'size': float(trade[1])}
ticks.append(tick)
for trade in l2_data.asks:
tick = {'type': 'ask', 'price': float(trade[0]), 'size': float(trade[1])}
ticks.append(tick)
yield 'l1', {
'symbol': sym,
'ticks': ticks,
}
@acm @acm
async def open_history_client( async def open_history_client(
@ -609,7 +653,6 @@ async def open_history_client(
) )
times = array["time"] times = array["time"]
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
@ -620,10 +663,13 @@ async def open_history_client(
if (inow - times[-1]) > 60: if (inow - times[-1]) > 60:
await tractor.breakpoint() await tractor.breakpoint()
start_dt = pendulum.from_timestamp(times[0]) start_dt = pendulum.from_timestamp(times[0])
end_dt = pendulum.from_timestamp(times[-1]) end_dt = pendulum.from_timestamp(times[-1])
log.info('History succesfully fetched baby') log.info('History succesfully fetched baby')
# breakpoint()
# print(f'OUTPUTTED END TIME: {time.ctime(kucoin_timestamp(end_dt))}')
# print(f'OUTPUTTED START TIME: {time.ctime(kucoin_timestamp(start_dt))}')
# print(f'DIFFERENCE IN MINUTES {(end_dt - start_dt).in_minutes()}')
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc_history, {"erlangs": 3, "rate": 3} yield get_ohlc_history, {}