Implement working message streaming

emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-15 20:03:16 -04:00
parent 9292e98d00
commit 2d3c351ca4
1 changed files with 53 additions and 37 deletions

View File

@ -1,4 +1,3 @@
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
@ -152,7 +151,6 @@ class Client:
async def _get_ws_token(self, private: bool = False) -> str | None:
token_type = "private" if private else "public"
data = await self._request("POST", f"/bullet-{token_type}", "v1")
breakpoint()
if "token" in data:
return data["token"]
else:
@ -308,15 +306,13 @@ async def stream_quotes(
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
):
# TODO: Add multi-symbol functionality here
sym = symbols[0]
connect_id = 0
connect_id = str(uuid4())
async with open_cached_client("kucoin") as client:
pairs = await client.cache_pairs()
kucoin_sym = pairs[sym]['symbol']
kucoin_sym = pairs[sym]["symbol"]
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
@ -335,40 +331,47 @@ async def stream_quotes(
@acm
async def subscribe(ws: wsproto.WSConnection):
await ws.send_msg({
"id": connect_id,
"type": "ping"
})
res = await ws.recv_msg()
breakpoint()
yield
# l1_sub = make_sub(kucoin_sym, sub_id)
# await ws.send_msg(l1_sub)
# await ws.send_msg({"id": connect_id, "type": "ping"})
# res = await ws.recv_msg()
l1_sub = make_sub(kucoin_sym, connect_id)
await ws.send_msg(l1_sub)
res = await ws.recv_msg()
# breakpoint()
# assert res['id'] == connect_id
#
# yield
#
# # unsub
# ws.send_msg({
# "id": sub_id,
# "type": "unsubscribe",
# "topic": f"/market/ticker:{sym}",
# "privateChannel": False,
# "response": True,
# })
# assert res["id"] == connect_id
yield
# unsub
await ws.send_msg(
{
"id": connect_id,
"type": "unsubscribe",
"topic": f"/market/ticker:{sym}",
"privateChannel": False,
"response": True,
}
)
token = await client._get_ws_token()
breakpoint()
async with open_autorecon_ws(
f"wss://ws-api-spot.kucoin.com/?token=={token}&[connectId={connect_id}]",
f"wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]",
fixture=subscribe,
) as ws:
msg_gen = stream_messages(ws)
msg_gen = stream_messages(ws, sym)
typ, quote = await msg_gen.__anext__()
#
while typ != "trade":
# TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__()
task_status.started((init_msgs, quote))
feed_is_live.set()
async for typ, msg in msg_gen:
await send_chan.send({sym: msg})
def make_sub(sym, connect_id):
breakpoint()
return {
"id": connect_id,
"type": "subscribe",
@ -378,25 +381,38 @@ def make_sub(sym, connect_id):
}
async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0
while True:
with trio.move_on_after(3) as cs:
msg = await ws.recv_msg()
if cs.cancelled_caught:
timeouts += 1
if timeouts > 2:
log.error("kucoin feed is sh**ing the bed... rebooting...")
await ws._connect()
continue
if "subject" in msg and msg["subject"] == "trade.ticker":
# TODO: cast msg into class
trade_data = msg["data"]
yield "trade", {
"symbol": sym,
"last": trade_data["price"],
"brokerd_ts": trade_data["time"],
"ticks": [
{
"type": "trade",
"price": float(trade_data["price"]),
"size": float(trade_data["size"]),
"broker_ts": trade_data["time"],
}
],
}
else:
continue
breakpoint()
@acm
async def open_history_client(