kucoin: port to new `NoBsWs` api semantics

No longer need to implement connection timeout logic in the streaming
code, instead we just `async for` that bby B)

Further refining:
- better `KucoinTrade` msg parsing and handling with object cases.
- make `subscribe()` do sub request in a loop wand wair for acks.
master
Tyler Goodlet 2023-05-10 16:22:09 -04:00
parent c6e5368520
commit e06f9dc5c0
1 changed files with 105 additions and 98 deletions

View File

@ -1,4 +1,6 @@
# Copyright (C) Jared Goldman (in stewardship for pikers) # Copyright (C) (in stewardship for pikers)
# - Jared Goldman
# - Tyler Goodlet
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -693,7 +695,6 @@ async def stream_quotes(
), ),
) as ws, ) as ws,
open_ping_task(ws, ping_interval, connect_id), open_ping_task(ws, ping_interval, connect_id),
# subscribe(ws, connect_id, kucoin_sym),
aclosing(stream_messages(ws, sym_str)) as msg_gen, aclosing(stream_messages(ws, sym_str)) as msg_gen,
): ):
typ, quote = await anext(msg_gen) typ, quote = await anext(msg_gen)
@ -716,43 +717,48 @@ async def subscribe(
connect_id, connect_id,
bs_mktid, bs_mktid,
) -> AsyncGenerator[None, None]: # subs are filled in with `bs_mktid` from avbove
# level 2 sub topics: list[str] = [
await ws.send_msg( '/market/ticker:{bs_mktid}', # clearing events
{ '/spotMarket/level2Depth5:{bs_mktid}', # level 2
'id': connect_id, ],
'type': 'subscribe',
'topic': f'/spotMarket/level2Depth5:{bs_mktid}',
'privateChannel': False,
'response': True,
}
)
# watch trades ) -> AsyncGenerator[None, None]:
await ws.send_msg(
{ eps: list[str] = []
'id': connect_id, for topic in topics:
'type': 'subscribe', ep: str = topic.format(bs_mktid=bs_mktid)
'topic': f'/market/ticker:{bs_mktid}', eps.append(ep)
'privateChannel': False, await ws.send_msg(
'response': True, {
} 'id': connect_id,
) 'type': 'subscribe',
'topic': ep,
# 'topic': f'/spotMarket/level2Depth5:{bs_mktid}',
'privateChannel': False,
'response': True,
}
)
for _ in topics:
ack_msg = await ws.recv_msg()
log.info(f'Sub ACK: {ack_msg}')
yield yield
# unsub # unsub
if ws.connected(): if ws.connected():
log.info(f'Unsubscribing to {bs_mktid} feed') log.info(f'Unsubscribing to {bs_mktid} feed')
await ws.send_msg( for ep in eps:
{ await ws.send_msg(
'id': connect_id, {
'type': 'unsubscribe', 'id': connect_id,
'topic': f'/market/ticker:{bs_mktid}', 'type': 'unsubscribe',
'privateChannel': False, 'topic': ep,
'response': True, 'privateChannel': False,
} 'response': True,
) }
)
async def stream_messages( async def stream_messages(
@ -760,80 +766,81 @@ async def stream_messages(
sym: str, sym: str,
) -> AsyncGenerator[tuple[str, dict], None]: ) -> AsyncGenerator[tuple[str, dict], None]:
timeouts = 0 '''
last_trade_ts = 0 Core (live) feed msg handler: relay market events
to the piker-ized tick-stream format.
while True: '''
with trio.move_on_after(3) as cs: last_trade_ts: float = 0
msg = await ws.recv_msg()
if cs.cancelled_caught:
timeouts += 1
if timeouts > 2:
log.error(
'kucoin feed is sh**ing the bed... rebooting...')
await ws._connect()
async for dict_msg in ws:
if 'subject' not in dict_msg:
log.warn(f'Unhandled message: {dict_msg}')
continue continue
if msg.get('subject'):
msg = KucoinMsg(**msg)
match msg.subject:
case 'trade.ticker':
trade_data = KucoinTrade(**msg.data)
# XXX: Filter for duplicate messages as ws feed will msg = KucoinMsg(**dict_msg)
# send duplicate market state match msg:
# https://docs.kucoin.com/#level2-5-best-ask-bid-orders case KucoinMsg(
if trade_data.time == last_trade_ts: subject='trade.ticker',
continue ):
trade_data = KucoinTrade(**msg.data)
last_trade_ts = trade_data.time # XXX: Filter for duplicate messages as ws feed will
# send duplicate market state
# https://docs.kucoin.com/#level2-5-best-ask-bid-orders
if trade_data.time == last_trade_ts:
continue
yield 'trade', { last_trade_ts = trade_data.time
'symbol': sym,
'last': trade_data.price,
'brokerd_ts': last_trade_ts,
'ticks': [
{
'type': 'trade',
'price': float(trade_data.price),
'size': float(trade_data.size),
'broker_ts': last_trade_ts,
}
],
}
case 'level2': yield 'trade', {
l2_data = KucoinL2(**msg.data) 'symbol': sym,
first_ask = l2_data.asks[0] 'last': trade_data.price,
first_bid = l2_data.bids[0] 'brokerd_ts': last_trade_ts,
yield 'l1', { 'ticks': [
'symbol': sym, {
'ticks': [ 'type': 'trade',
{ 'price': float(trade_data.price),
'type': 'bid', 'size': float(trade_data.size),
'price': float(first_bid[0]), 'broker_ts': last_trade_ts,
'size': float(first_bid[1]), }
}, ],
{ }
'type': 'bsize',
'price': float(first_bid[0]),
'size': float(first_bid[1]),
},
{
'type': 'ask',
'price': float(first_ask[0]),
'size': float(first_ask[1]),
},
{
'type': 'asize',
'price': float(first_ask[0]),
'size': float(first_ask[1]),
},
],
}
case _: case KucoinMsg(
log.warn(f'Unhandled message: {msg}') subject='level2',
):
l2_data = KucoinL2(**msg.data)
first_ask = l2_data.asks[0]
first_bid = l2_data.bids[0]
yield 'l1', {
'symbol': sym,
'ticks': [
{
'type': 'bid',
'price': float(first_bid[0]),
'size': float(first_bid[1]),
},
{
'type': 'bsize',
'price': float(first_bid[0]),
'size': float(first_bid[1]),
},
{
'type': 'ask',
'price': float(first_ask[0]),
'size': float(first_ask[1]),
},
{
'type': 'asize',
'price': float(first_ask[0]),
'size': float(first_ask[1]),
},
],
}
case _:
log.warn(f'Unhandled message: {msg}')
@acm @acm