Cast/validate streamed messages

Update comments

Minor formatting

Minor formatting
emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-19 14:22:56 -04:00
parent 6e3b132239
commit 57587c9d2d
1 changed files with 22 additions and 13 deletions

View File

@ -418,7 +418,7 @@ async def stream_quotes(
async with open_cached_client("kucoin") as client: async with open_cached_client("kucoin") as client:
# map through symbols and sub to feedz # loop through symbols and sub to feedz
for sym in symbols: for sym in symbols:
token, ping_interval = await client._get_ws_token() token, ping_interval = await client._get_ws_token()
@ -445,8 +445,9 @@ async def stream_quotes(
@acm @acm
async def open_ping_task(ws: wsproto.WSConnection): async def open_ping_task(ws: wsproto.WSConnection):
''' '''
Ping ws server every ping_interval Spawn a non-blocking task that pings the ws
so Kucoin doesn't drop our connection server every ping_interval so Kucoin doesn't drop
our connection
''' '''
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
@ -468,7 +469,7 @@ async def stream_quotes(
l1_sub = make_sub(kucoin_sym, connect_id) l1_sub = make_sub(kucoin_sym, connect_id)
await ws.send_msg(l1_sub) await ws.send_msg(l1_sub)
yield yieldhttps: // github.com/pikers/piker/pull/494
# unsub # unsub
if ws.connected(): if ws.connected():
@ -529,18 +530,20 @@ async def stream_messages(
continue continue
if "subject" in msg and msg["subject"] == "trade.ticker": if "subject" in msg and msg["subject"] == "trade.ticker":
# TODO: cast msg into class
trade_data = msg["data"] trade_msg = KucoinTradeMsg(**msg)
trade_data = KucoinTrade(**trade_msg.data)
yield "trade", { yield "trade", {
"symbol": sym, "symbol": sym,
"last": trade_data["price"], "last": trade_data.price,
"brokerd_ts": trade_data["time"], "brokerd_ts": trade_data.time,
"ticks": [ "ticks": [
{ {
"type": "trade", "type": "trade",
"price": float(trade_data["price"]), "price": float(trade_data.price),
"size": float(trade_data["size"]), "size": float(trade_data.size),
"broker_ts": trade_data["time"], "broker_ts": trade_data.time,
} }
], ],
} }
@ -555,12 +558,15 @@ async def open_history_client(
type: str = "1m", type: str = "1m",
) -> AsyncGenerator[Callable, None]: ) -> AsyncGenerator[Callable, None]:
async with open_cached_client("kucoin") as client: async with open_cached_client("kucoin") as client:
# call bars on kucoin
async def get_ohlc_history( async def get_ohlc_history(
timeframe: float, timeframe: float,
end_dt: datetime | None = None, end_dt: datetime | None = None,
start_dt: datetime | None = None, start_dt: datetime | None = None,
) -> tuple[np.ndarray, datetime | None, datetime | None,]: # start # end
) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end
if timeframe != 60: if timeframe != 60:
raise DataUnavailable("Only 1m bars are supported") raise DataUnavailable("Only 1m bars are supported")
@ -573,10 +579,13 @@ async def open_history_client(
times = array["time"] times = array["time"]
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
print( print(
f"difference in time between load and processing {inow - times[-1]}" f"difference in time between load and processing {inow - times[-1]}"
) )
if (inow - times[-1]) > 60: if (inow - times[-1]) > 60:
await tractor.breakpoint() await tractor.breakpoint()