'Only emit `'trade'` ticks on msgs with new(er) timestamps from `kucoin`'
parent
f9d7892d69
commit
9ea9603037
|
@ -559,6 +559,7 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]:
|
||||||
|
|
||||||
async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
|
async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
|
||||||
timeouts = 0
|
timeouts = 0
|
||||||
|
last_ts: int = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
|
@ -571,12 +572,17 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if msg.get('subject') != None:
|
if msg.get('subject'):
|
||||||
msg = KucoinMsg(**msg)
|
msg = KucoinMsg(**msg)
|
||||||
|
|
||||||
match msg.subject:
|
match msg.subject:
|
||||||
case 'trade.ticker':
|
case 'trade.ticker':
|
||||||
trade_data = KucoinTrade(**msg.data)
|
trade_data = KucoinTrade(**msg.data)
|
||||||
|
ts = trade_data.time
|
||||||
|
if ts <= last_ts:
|
||||||
|
continue
|
||||||
|
|
||||||
|
last_ts = ts
|
||||||
yield 'trade', {
|
yield 'trade', {
|
||||||
'symbol': sym,
|
'symbol': sym,
|
||||||
'last': trade_data.price,
|
'last': trade_data.price,
|
||||||
|
@ -586,7 +592,7 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
|
||||||
'type': 'trade',
|
'type': 'trade',
|
||||||
'price': float(trade_data.price),
|
'price': float(trade_data.price),
|
||||||
'size': float(trade_data.size),
|
'size': float(trade_data.size),
|
||||||
'broker_ts': trade_data.time,
|
'broker_ts': ts,
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue