Add L1 data feed and correct history issue
parent
dfd030a6aa
commit
52aadb374b
|
@ -1,4 +1,3 @@
|
||||||
# piker: trading gear for hackers
|
|
||||||
# Copyright (C) Jared Goldman (in stewardship for pikers)
|
# Copyright (C) Jared Goldman (in stewardship for pikers)
|
||||||
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
@ -127,19 +126,29 @@ class KucoinTrade(Struct, frozen=True):
|
||||||
time: float
|
time: float
|
||||||
|
|
||||||
|
|
||||||
|
class KucoinL2(Struct, frozen=True):
|
||||||
|
'''
|
||||||
|
Real-time L2 order book format
|
||||||
|
|
||||||
|
'''
|
||||||
|
asks: list[list[float]]
|
||||||
|
bids: list[list[float]]
|
||||||
|
timestamp: float
|
||||||
|
|
||||||
|
|
||||||
|
class KucoinMsg(Struct, frozen=True):
|
||||||
|
type: str
|
||||||
|
topic: str
|
||||||
|
subject: str
|
||||||
|
data: list[KucoinTrade | KucoinL2]
|
||||||
|
|
||||||
|
|
||||||
class BrokerConfig(Struct, frozen=True):
|
class BrokerConfig(Struct, frozen=True):
|
||||||
key_id: str
|
key_id: str
|
||||||
key_secret: str
|
key_secret: str
|
||||||
key_passphrase: str
|
key_passphrase: str
|
||||||
|
|
||||||
|
|
||||||
class KucoinTradeMsg(Struct, frozen=True):
|
|
||||||
type: str
|
|
||||||
topic: str
|
|
||||||
subject: str
|
|
||||||
data: list[KucoinTrade]
|
|
||||||
|
|
||||||
|
|
||||||
def get_config() -> BrokerConfig | None:
|
def get_config() -> BrokerConfig | None:
|
||||||
conf, path = config.load()
|
conf, path = config.load()
|
||||||
|
|
||||||
|
@ -182,6 +191,7 @@ class Client:
|
||||||
https://docs.kucoin.com/#authentication
|
https://docs.kucoin.com/#authentication
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
breakpoint()
|
||||||
now = int(time.time() * 1000)
|
now = int(time.time() * 1000)
|
||||||
path = f"/api/{api_v}{endpoint}"
|
path = f"/api/{api_v}{endpoint}"
|
||||||
str_to_sign = str(now) + action + path
|
str_to_sign = str(now) + action + path
|
||||||
|
@ -327,22 +337,22 @@ class Client:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# Generate generic end and start time if values not passed
|
# Generate generic end and start time if values not passed
|
||||||
|
# Currently gives us 12hrs of data
|
||||||
if end_dt is None:
|
if end_dt is None:
|
||||||
end_dt = pendulum.now("UTC").add(minutes=1)
|
end_dt = pendulum.now("UTC").add(minutes=1)
|
||||||
|
|
||||||
if start_dt is None:
|
if start_dt is None:
|
||||||
start_dt = end_dt.start_of("minute").subtract(minutes=limit)
|
start_dt = end_dt.start_of("minute").subtract(minutes=limit)
|
||||||
|
|
||||||
# Format datetime to unix timestamp
|
start_dt = int(start_dt.timestamp())
|
||||||
start_dt = math.trunc(time.mktime(start_dt.timetuple()))
|
end_dt = int(end_dt.timestamp())
|
||||||
end_dt = math.trunc(time.mktime(end_dt.timetuple()))
|
|
||||||
kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs)
|
kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs)
|
||||||
|
|
||||||
url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}"
|
url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}"
|
||||||
bars = []
|
bars = []
|
||||||
|
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
|
|
||||||
data = await self._request(
|
data = await self._request(
|
||||||
"GET",
|
"GET",
|
||||||
url,
|
url,
|
||||||
|
@ -375,7 +385,7 @@ class Client:
|
||||||
}
|
}
|
||||||
|
|
||||||
row = []
|
row = []
|
||||||
for j, (field_name, field_type) in enumerate(_ohlc_dtype):
|
for _, (field_name, field_type) in enumerate(_ohlc_dtype):
|
||||||
|
|
||||||
value = data[field_name]
|
value = data[field_name]
|
||||||
|
|
||||||
|
@ -383,16 +393,21 @@ class Client:
|
||||||
case "index":
|
case "index":
|
||||||
row.append(int(value))
|
row.append(int(value))
|
||||||
case "time":
|
case "time":
|
||||||
|
# row.append(int(value) + (3600 * 4))
|
||||||
row.append(value)
|
row.append(value)
|
||||||
case _:
|
case _:
|
||||||
row.append(float(value))
|
row.append(float(value))
|
||||||
|
|
||||||
new_bars.append(tuple(row))
|
new_bars.append(tuple(row))
|
||||||
|
|
||||||
self._bars = array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
|
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
|
||||||
return array
|
return array
|
||||||
|
|
||||||
|
|
||||||
|
def kucoin_timestamp(dt: datetime):
|
||||||
|
return math.trunc(time.mktime(dt.timetuple()))
|
||||||
|
|
||||||
|
|
||||||
def fqsn_to_kucoin_sym(
|
def fqsn_to_kucoin_sym(
|
||||||
fqsn: str,
|
fqsn: str,
|
||||||
pairs: dict[str, KucoinMktPair]
|
pairs: dict[str, KucoinMktPair]
|
||||||
|
@ -496,7 +511,9 @@ async def stream_quotes(
|
||||||
async with open_ping_task(ws) as ws:
|
async with open_ping_task(ws) as ws:
|
||||||
# subscribe to market feedz here
|
# subscribe to market feedz here
|
||||||
log.info(f'Subscribing to {kucoin_sym} feed')
|
log.info(f'Subscribing to {kucoin_sym} feed')
|
||||||
l1_sub = make_sub(kucoin_sym, connect_id)
|
trade_sub = make_sub(kucoin_sym, connect_id, level='l3')
|
||||||
|
l1_sub = make_sub(kucoin_sym, connect_id, level='l1')
|
||||||
|
await ws.send_msg(trade_sub)
|
||||||
await ws.send_msg(l1_sub)
|
await ws.send_msg(l1_sub)
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
@ -520,7 +537,7 @@ async def stream_quotes(
|
||||||
) as ws:
|
) as ws:
|
||||||
msg_gen = stream_messages(ws, sym)
|
msg_gen = stream_messages(ws, sym)
|
||||||
typ, quote = await msg_gen.__anext__()
|
typ, quote = await msg_gen.__anext__()
|
||||||
#
|
|
||||||
while typ != "trade":
|
while typ != "trade":
|
||||||
# TODO: use ``anext()`` when it lands in 3.10!
|
# TODO: use ``anext()`` when it lands in 3.10!
|
||||||
typ, quote = await msg_gen.__anext__()
|
typ, quote = await msg_gen.__anext__()
|
||||||
|
@ -532,14 +549,26 @@ async def stream_quotes(
|
||||||
await send_chan.send({sym: msg})
|
await send_chan.send({sym: msg})
|
||||||
|
|
||||||
|
|
||||||
def make_sub(sym, connect_id) -> dict[str, str | bool]:
|
def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]:
|
||||||
return {
|
match level:
|
||||||
"id": connect_id,
|
case 'l1':
|
||||||
"type": "subscribe",
|
return {
|
||||||
"topic": f"/market/ticker:{sym}",
|
"id": connect_id,
|
||||||
"privateChannel": False,
|
"type": "subscribe",
|
||||||
"response": True,
|
"topic": f"/spotMarket/level2Depth5:{sym}",
|
||||||
}
|
"privateChannel": False,
|
||||||
|
"response": True,
|
||||||
|
}
|
||||||
|
case 'l3':
|
||||||
|
return {
|
||||||
|
"id": connect_id,
|
||||||
|
"type": "subscribe",
|
||||||
|
"topic": f"/market/ticker:{sym}",
|
||||||
|
"privateChannel": False,
|
||||||
|
"response": True,
|
||||||
|
}
|
||||||
|
case _:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
async def stream_messages(
|
async def stream_messages(
|
||||||
|
@ -560,28 +589,43 @@ async def stream_messages(
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if "subject" in msg and msg["subject"] == "trade.ticker":
|
if msg.get("subject") != None:
|
||||||
|
|
||||||
trade_msg = KucoinTradeMsg(**msg)
|
msg = KucoinMsg(**msg)
|
||||||
trade_data = KucoinTrade(**trade_msg.data)
|
|
||||||
|
|
||||||
yield "trade", {
|
match msg.subject:
|
||||||
"symbol": sym,
|
case "trade.ticker":
|
||||||
"last": trade_data.price,
|
|
||||||
"brokerd_ts": trade_data.time,
|
trade_data = KucoinTrade(**msg.data)
|
||||||
"ticks": [
|
yield "trade", {
|
||||||
{
|
"symbol": sym,
|
||||||
"type": "trade",
|
"last": trade_data.price,
|
||||||
"price": float(trade_data.price),
|
"brokerd_ts": trade_data.time,
|
||||||
"size": float(trade_data.size),
|
"ticks": [
|
||||||
"broker_ts": trade_data.time,
|
{
|
||||||
|
"type": "trade",
|
||||||
|
"price": float(trade_data.price),
|
||||||
|
"size": float(trade_data.size),
|
||||||
|
"broker_ts": trade_data.time,
|
||||||
|
}
|
||||||
|
],
|
||||||
}
|
}
|
||||||
],
|
|
||||||
}
|
|
||||||
|
|
||||||
else:
|
case "level2":
|
||||||
continue
|
|
||||||
|
|
||||||
|
l2_data = KucoinL2(**msg.data)
|
||||||
|
|
||||||
|
ticks = []
|
||||||
|
for trade in l2_data.bids:
|
||||||
|
tick = {'type': 'bid', 'price': float(trade[0]), 'size': float(trade[1])}
|
||||||
|
ticks.append(tick)
|
||||||
|
for trade in l2_data.asks:
|
||||||
|
tick = {'type': 'ask', 'price': float(trade[0]), 'size': float(trade[1])}
|
||||||
|
ticks.append(tick)
|
||||||
|
yield 'l1', {
|
||||||
|
'symbol': sym,
|
||||||
|
'ticks': ticks,
|
||||||
|
}
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_history_client(
|
async def open_history_client(
|
||||||
|
@ -609,7 +653,6 @@ async def open_history_client(
|
||||||
)
|
)
|
||||||
|
|
||||||
times = array["time"]
|
times = array["time"]
|
||||||
|
|
||||||
if end_dt is None:
|
if end_dt is None:
|
||||||
|
|
||||||
inow = round(time.time())
|
inow = round(time.time())
|
||||||
|
@ -620,10 +663,13 @@ async def open_history_client(
|
||||||
|
|
||||||
if (inow - times[-1]) > 60:
|
if (inow - times[-1]) > 60:
|
||||||
await tractor.breakpoint()
|
await tractor.breakpoint()
|
||||||
|
|
||||||
start_dt = pendulum.from_timestamp(times[0])
|
start_dt = pendulum.from_timestamp(times[0])
|
||||||
end_dt = pendulum.from_timestamp(times[-1])
|
end_dt = pendulum.from_timestamp(times[-1])
|
||||||
log.info('History succesfully fetched baby')
|
log.info('History succesfully fetched baby')
|
||||||
|
# breakpoint()
|
||||||
|
# print(f'OUTPUTTED END TIME: {time.ctime(kucoin_timestamp(end_dt))}')
|
||||||
|
# print(f'OUTPUTTED START TIME: {time.ctime(kucoin_timestamp(start_dt))}')
|
||||||
|
# print(f'DIFFERENCE IN MINUTES {(end_dt - start_dt).in_minutes()}')
|
||||||
return array, start_dt, end_dt
|
return array, start_dt, end_dt
|
||||||
|
|
||||||
yield get_ohlc_history, {"erlangs": 3, "rate": 3}
|
yield get_ohlc_history, {}
|
||||||
|
|
Loading…
Reference in New Issue