From 9ea960303719b6731bff5ee349b30a284acfb740 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Apr 2023 23:10:48 -0400 Subject: [PATCH] 'Only emit `'trade'` ticks on msgs with new(er) timestamps from `kucoin`' --- piker/brokers/kucoin.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 0c3d1c7e..fe275435 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -559,6 +559,7 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]: async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: timeouts = 0 + last_ts: int = 0 while True: with trio.move_on_after(3) as cs: @@ -571,12 +572,17 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: continue - if msg.get('subject') != None: + if msg.get('subject'): msg = KucoinMsg(**msg) match msg.subject: case 'trade.ticker': trade_data = KucoinTrade(**msg.data) + ts = trade_data.time + if ts <= last_ts: + continue + + last_ts = ts yield 'trade', { 'symbol': sym, 'last': trade_data.price, @@ -586,7 +592,7 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: 'type': 'trade', 'price': float(trade_data.price), 'size': float(trade_data.size), - 'broker_ts': trade_data.time, + 'broker_ts': ts, } ], }