kucoin: handle ws welcome, subs-ack and pong msgs

Previously the subscription response handling was a bit sloppy what with
ignoring the welcome msg; this now correctly expects the correct startup
sequence. Also this avoids warn logging on pong messages by expecting
them in the msg loop and further drops the `KucoinMsg` struct and
instead changes the msg loop to expect `dict`s and only cast to structs
on live feed msgs that we actually process/relay.
master
Tyler Goodlet 2023-05-17 11:59:19 -04:00
parent d0ba9a0a58
commit a44e926c2f
1 changed files with 21 additions and 28 deletions

View File

@ -176,17 +176,6 @@ class KucoinL2(Struct, frozen=True):
timestamp: float timestamp: float
class KucoinMsg(Struct, frozen=True):
'''
Generic outer-wrapper for any Kucoin ws msg
'''
type: str
topic: str
subject: str
data: list[KucoinTrade | KucoinL2]
class Currency(Struct, frozen=True): class Currency(Struct, frozen=True):
''' '''
Currency (asset) info: Currency (asset) info:
@ -743,12 +732,14 @@ async def subscribe(
'id': connect_id, 'id': connect_id,
'type': 'subscribe', 'type': 'subscribe',
'topic': ep, 'topic': ep,
# 'topic': f'/spotMarket/level2Depth5:{bs_mktid}',
'privateChannel': False, 'privateChannel': False,
'response': True, 'response': True,
} }
) )
welcome_msg = await ws.recv_msg()
log.info(f'WS welcome: {welcome_msg}')
for _ in topics: for _ in topics:
ack_msg = await ws.recv_msg() ack_msg = await ws.recv_msg()
log.info(f'Sub ACK: {ack_msg}') log.info(f'Sub ACK: {ack_msg}')
@ -782,19 +773,16 @@ async def stream_messages(
''' '''
last_trade_ts: float = 0 last_trade_ts: float = 0
dict_msg: dict[str, Any]
async for dict_msg in ws: async for dict_msg in ws:
if 'subject' not in dict_msg: match dict_msg:
log.warn(f'Unhandled message: {dict_msg}') case {
continue 'subject': 'trade.ticker',
'data': trade_data_dict,
}:
trade_data = KucoinTrade(**trade_data_dict)
msg = KucoinMsg(**dict_msg) # XXX: Filter out duplicate messages as ws feed will
match msg:
case KucoinMsg(
subject='trade.ticker',
):
trade_data = KucoinTrade(**msg.data)
# XXX: Filter for duplicate messages as ws feed will
# send duplicate market state # send duplicate market state
# https://docs.kucoin.com/#level2-5-best-ask-bid-orders # https://docs.kucoin.com/#level2-5-best-ask-bid-orders
if trade_data.time == last_trade_ts: if trade_data.time == last_trade_ts:
@ -816,10 +804,11 @@ async def stream_messages(
], ],
} }
case KucoinMsg( case {
subject='level2', 'subject': 'level2',
): 'data': trade_data_dict,
l2_data = KucoinL2(**msg.data) }:
l2_data = KucoinL2(**trade_data_dict)
first_ask = l2_data.asks[0] first_ask = l2_data.asks[0]
first_bid = l2_data.bids[0] first_bid = l2_data.bids[0]
yield 'l1', { yield 'l1', {
@ -848,8 +837,12 @@ async def stream_messages(
], ],
} }
case {'type': 'pong'}:
# resp to ping task req
continue
case _: case _:
log.warn(f'Unhandled message: {msg}') log.warn(f'Unhandled message: {dict_msg}')
@acm @acm