From 3bed3a64c3038505709645d494774e603151127c Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Mon, 10 Apr 2023 19:59:50 -0400 Subject: [PATCH] Implement duplicate filtering at message level --- piker/brokers/kucoin.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 8e9d398c..a8d04337 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -546,6 +546,7 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]: async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: timeouts = 0 + last_trade_data: KucoinTrade | dict = {} while True: with trio.move_on_after(3) as cs: @@ -558,12 +559,19 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: continue + if msg.get('subject') != None: msg = KucoinMsg(**msg) - match msg.subject: case 'trade.ticker': trade_data = KucoinTrade(**msg.data) + + # Filter for duplicate messages + if last_trade_data and trade_data.time == last_trade_data.time: + continue + + last_trade_data = trade_data + yield 'trade', { 'symbol': sym, 'last': trade_data.price, @@ -580,6 +588,7 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: case 'level2': l2_data = KucoinL2(**msg.data) + breakpoint() first_ask = l2_data.asks[0] first_bid = l2_data.bids[0] yield 'l1', {