Add sub method to flow

Stash for checkout of master
emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-14 15:05:04 -04:00
parent e4a78eaeef
commit 9292e98d00
2 changed files with 67 additions and 15 deletions

View File

@ -1,5 +1,3 @@
# piker: trading gear for hackers
# Copyright (C) Jared Goldman (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -26,7 +24,8 @@ import math
import base64 import base64
import hmac import hmac
import hashlib import hashlib
# import wsproto
import wsproto
from uuid import uuid4 from uuid import uuid4
import asks import asks
@ -150,9 +149,10 @@ class Client:
print(f'KUCOIN ERROR: {res.json()["msg"]}') print(f'KUCOIN ERROR: {res.json()["msg"]}')
breakpoint() breakpoint()
async def _get_ws_token(self, private: bool) -> str: async def _get_ws_token(self, private: bool = False) -> str | None:
token_type = "private" if private else "public" token_type = "private" if private else "public"
data = await self._request("POST", f"/bullet-{token_type}", "v1") data = await self._request("POST", f"/bullet-{token_type}", "v1")
breakpoint()
if "token" in data: if "token" in data:
return data["token"] return data["token"]
else: else:
@ -308,15 +308,21 @@ async def stream_quotes(
# startup sync # startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
): ):
# TODO: Add multi-symbol functionality here
sym = symbols[0] sym = symbols[0]
connect_id = 0
async with open_cached_client("kucoin") as client: async with open_cached_client("kucoin") as client:
pairs = await client.cache_pairs()
kucoin_sym = pairs[sym]['symbol']
init_msgs = { init_msgs = {
# pass back token, and bool, signalling if we're the writer # pass back token, and bool, signalling if we're the writer
# and that history has been written # and that history has been written
sym: { sym: {
"symbol_info": { "symbol_info": {
"asset_type": "option", "asset_type": "crypto",
"price_tick_size": 0.0005, "price_tick_size": 0.0005,
"lot_tick_size": 0.1, "lot_tick_size": 0.1,
}, },
@ -327,25 +333,70 @@ async def stream_quotes(
last_trades = await client.last_trades(sym) last_trades = await client.last_trades(sym)
# @acm @acm
# async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
await ws.send_msg({
token = await client._get_ws_token(True) "id": connect_id,
connect_id = str(uuid4()) "type": "ping"
async with open_autorecon_ws( })
f"wss://ws-api-spot.kucoin.com/?token=={token}&[connectId={connect_id}]" res = await ws.recv_msg()
) as ws:
breakpoint() breakpoint()
yield
# l1_sub = make_sub(kucoin_sym, sub_id)
# await ws.send_msg(l1_sub)
# res = await ws.recv_msg()
# breakpoint()
# assert res['id'] == connect_id
#
# yield
#
# # unsub
# ws.send_msg({
# "id": sub_id,
# "type": "unsubscribe",
# "topic": f"/market/ticker:{sym}",
# "privateChannel": False,
# "response": True,
# })
token = await client._get_ws_token()
breakpoint()
async with open_autorecon_ws(
f"wss://ws-api-spot.kucoin.com/?token=={token}&[connectId={connect_id}]",
fixture=subscribe,
) as ws:
msg_gen = stream_messages(ws) msg_gen = stream_messages(ws)
def make_sub(sym, connect_id):
breakpoint()
return {
"id": connect_id,
"type": "subscribe",
"topic": f"/market/ticker:{sym}",
"privateChannel": False,
"response": True,
}
async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0 timeouts = 0
while True: while True:
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
msg = await ws.recv_msg() msg = await ws.recv_msg()
print(f"msg: {msg}")
if cs.cancelled_caught:
timeouts += 1
if timeouts > 2:
log.error("kucoin feed is sh**ing the bed... rebooting...")
await ws._connect()
continue
breakpoint()
@acm @acm
async def open_history_client( async def open_history_client(

View File

@ -100,6 +100,7 @@ class NoBsWs:
last_err = None last_err = None
for i in range(tries): for i in range(tries):
try: try:
breakpoint()
self._ws = await self._stack.enter_async_context( self._ws = await self._stack.enter_async_context(
trio_websocket.open_websocket_url(self.url) trio_websocket.open_websocket_url(self.url)
) )
@ -166,7 +167,7 @@ async def open_autorecon_ws(
# TODO: proper type cannot smh # TODO: proper type cannot smh
fixture: Optional[Callable] = None, fixture: Optional[Callable] = None,
) -> AsyncGenerator[tuple[...], NoBsWs]: ) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em. """Apparently we can QoS for all sorts of reasons..so catch em.
""" """