Implement duplicate filtering at message level
parent
93e7d54c5e
commit
3bed3a64c3
|
@ -546,6 +546,7 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]:
|
|||
|
||||
async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
|
||||
timeouts = 0
|
||||
last_trade_data: KucoinTrade | dict = {}
|
||||
|
||||
while True:
|
||||
with trio.move_on_after(3) as cs:
|
||||
|
@ -558,12 +559,19 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
|
|||
|
||||
continue
|
||||
|
||||
|
||||
if msg.get('subject') != None:
|
||||
msg = KucoinMsg(**msg)
|
||||
|
||||
match msg.subject:
|
||||
case 'trade.ticker':
|
||||
trade_data = KucoinTrade(**msg.data)
|
||||
|
||||
# Filter for duplicate messages
|
||||
if last_trade_data and trade_data.time == last_trade_data.time:
|
||||
continue
|
||||
|
||||
last_trade_data = trade_data
|
||||
|
||||
yield 'trade', {
|
||||
'symbol': sym,
|
||||
'last': trade_data.price,
|
||||
|
@ -580,6 +588,7 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
|
|||
|
||||
case 'level2':
|
||||
l2_data = KucoinL2(**msg.data)
|
||||
breakpoint()
|
||||
first_ask = l2_data.asks[0]
|
||||
first_bid = l2_data.bids[0]
|
||||
yield 'l1', {
|
||||
|
|
Loading…
Reference in New Issue