From 57587c9d2ddc13d57cac1e8b0d64df6a1312cd81 Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Sun, 19 Mar 2023 14:22:56 -0400 Subject: [PATCH] Cast/validate streamed messages Update comments Minor formatting Minor formatting --- piker/brokers/kucoin.py | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index d0fca3be..63502099 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -418,7 +418,7 @@ async def stream_quotes( async with open_cached_client("kucoin") as client: - # map through symbols and sub to feedz + # loop through symbols and sub to feedz for sym in symbols: token, ping_interval = await client._get_ws_token() @@ -445,8 +445,9 @@ async def stream_quotes( @acm async def open_ping_task(ws: wsproto.WSConnection): ''' - Ping ws server every ping_interval - so Kucoin doesn't drop our connection + Spawn a non-blocking task that pings the ws + server every ping_interval so Kucoin doesn't drop + our connection ''' async with trio.open_nursery() as n: @@ -468,7 +469,7 @@ async def stream_quotes( l1_sub = make_sub(kucoin_sym, connect_id) await ws.send_msg(l1_sub) - yield + yieldhttps: // github.com/pikers/piker/pull/494 # unsub if ws.connected(): @@ -529,18 +530,20 @@ async def stream_messages( continue if "subject" in msg and msg["subject"] == "trade.ticker": - # TODO: cast msg into class - trade_data = msg["data"] + + trade_msg = KucoinTradeMsg(**msg) + trade_data = KucoinTrade(**trade_msg.data) + yield "trade", { "symbol": sym, - "last": trade_data["price"], - "brokerd_ts": trade_data["time"], + "last": trade_data.price, + "brokerd_ts": trade_data.time, "ticks": [ { "type": "trade", - "price": float(trade_data["price"]), - "size": float(trade_data["size"]), - "broker_ts": trade_data["time"], + "price": float(trade_data.price), + "size": float(trade_data.size), + "broker_ts": trade_data.time, } ], } @@ -555,12 +558,15 @@ async def open_history_client( type: str = "1m", ) -> AsyncGenerator[Callable, None]: async with open_cached_client("kucoin") as client: - # call bars on kucoin + async def get_ohlc_history( + timeframe: float, end_dt: datetime | None = None, start_dt: datetime | None = None, - ) -> tuple[np.ndarray, datetime | None, datetime | None,]: # start # end + + ) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end + if timeframe != 60: raise DataUnavailable("Only 1m bars are supported") @@ -573,10 +579,13 @@ async def open_history_client( times = array["time"] if end_dt is None: + inow = round(time.time()) + print( f"difference in time between load and processing {inow - times[-1]}" ) + if (inow - times[-1]) > 60: await tractor.breakpoint()