Cast/validate streamed messages
Update comments Minor formatting Minor formattingsmall_kucoin_fixes
parent
6ad1e3da38
commit
5ff0cc7905
|
@ -418,7 +418,7 @@ async def stream_quotes(
|
||||||
|
|
||||||
async with open_cached_client("kucoin") as client:
|
async with open_cached_client("kucoin") as client:
|
||||||
|
|
||||||
# map through symbols and sub to feedz
|
# loop through symbols and sub to feedz
|
||||||
for sym in symbols:
|
for sym in symbols:
|
||||||
|
|
||||||
token, ping_interval = await client._get_ws_token()
|
token, ping_interval = await client._get_ws_token()
|
||||||
|
@ -445,8 +445,9 @@ async def stream_quotes(
|
||||||
@acm
|
@acm
|
||||||
async def open_ping_task(ws: wsproto.WSConnection):
|
async def open_ping_task(ws: wsproto.WSConnection):
|
||||||
'''
|
'''
|
||||||
Ping ws server every ping_interval
|
Spawn a non-blocking task that pings the ws
|
||||||
so Kucoin doesn't drop our connection
|
server every ping_interval so Kucoin doesn't drop
|
||||||
|
our connection
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
|
@ -468,7 +469,7 @@ async def stream_quotes(
|
||||||
l1_sub = make_sub(kucoin_sym, connect_id)
|
l1_sub = make_sub(kucoin_sym, connect_id)
|
||||||
await ws.send_msg(l1_sub)
|
await ws.send_msg(l1_sub)
|
||||||
|
|
||||||
yield
|
yieldhttps: // github.com/pikers/piker/pull/494
|
||||||
|
|
||||||
# unsub
|
# unsub
|
||||||
if ws.connected():
|
if ws.connected():
|
||||||
|
@ -529,18 +530,20 @@ async def stream_messages(
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if "subject" in msg and msg["subject"] == "trade.ticker":
|
if "subject" in msg and msg["subject"] == "trade.ticker":
|
||||||
# TODO: cast msg into class
|
|
||||||
trade_data = msg["data"]
|
trade_msg = KucoinTradeMsg(**msg)
|
||||||
|
trade_data = KucoinTrade(**trade_msg.data)
|
||||||
|
|
||||||
yield "trade", {
|
yield "trade", {
|
||||||
"symbol": sym,
|
"symbol": sym,
|
||||||
"last": trade_data["price"],
|
"last": trade_data.price,
|
||||||
"brokerd_ts": trade_data["time"],
|
"brokerd_ts": trade_data.time,
|
||||||
"ticks": [
|
"ticks": [
|
||||||
{
|
{
|
||||||
"type": "trade",
|
"type": "trade",
|
||||||
"price": float(trade_data["price"]),
|
"price": float(trade_data.price),
|
||||||
"size": float(trade_data["size"]),
|
"size": float(trade_data.size),
|
||||||
"broker_ts": trade_data["time"],
|
"broker_ts": trade_data.time,
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
@ -555,12 +558,15 @@ async def open_history_client(
|
||||||
type: str = "1m",
|
type: str = "1m",
|
||||||
) -> AsyncGenerator[Callable, None]:
|
) -> AsyncGenerator[Callable, None]:
|
||||||
async with open_cached_client("kucoin") as client:
|
async with open_cached_client("kucoin") as client:
|
||||||
# call bars on kucoin
|
|
||||||
async def get_ohlc_history(
|
async def get_ohlc_history(
|
||||||
|
|
||||||
timeframe: float,
|
timeframe: float,
|
||||||
end_dt: datetime | None = None,
|
end_dt: datetime | None = None,
|
||||||
start_dt: datetime | None = None,
|
start_dt: datetime | None = None,
|
||||||
) -> tuple[np.ndarray, datetime | None, datetime | None,]: # start # end
|
|
||||||
|
) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end
|
||||||
|
|
||||||
if timeframe != 60:
|
if timeframe != 60:
|
||||||
raise DataUnavailable("Only 1m bars are supported")
|
raise DataUnavailable("Only 1m bars are supported")
|
||||||
|
|
||||||
|
@ -573,10 +579,13 @@ async def open_history_client(
|
||||||
times = array["time"]
|
times = array["time"]
|
||||||
|
|
||||||
if end_dt is None:
|
if end_dt is None:
|
||||||
|
|
||||||
inow = round(time.time())
|
inow = round(time.time())
|
||||||
|
|
||||||
print(
|
print(
|
||||||
f"difference in time between load and processing {inow - times[-1]}"
|
f"difference in time between load and processing {inow - times[-1]}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if (inow - times[-1]) > 60:
|
if (inow - times[-1]) > 60:
|
||||||
await tractor.breakpoint()
|
await tractor.breakpoint()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue