diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 06c3ed46..778f3aac 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -26,8 +26,8 @@ from contextlib import ( aclosing, ) from datetime import datetime -# from functools import lru_cache from decimal import Decimal +import itertools from typing import ( Any, Union, Optional, AsyncGenerator, Callable, @@ -560,8 +560,6 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - uid = 0 - async with ( send_chan as send_chan, ): @@ -577,24 +575,31 @@ async def stream_quotes( ) ) + iter_subids = itertools.count() + @acm async def subscribe(ws: NoBsWs): # setup subs + subid: int = next(iter_subids) + # trade data (aka L1) # https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker - l1_sub = make_sub(symbols, 'bookTicker', uid) + l1_sub = make_sub(symbols, 'bookTicker', subid) await ws.send_msg(l1_sub) # aggregate (each order clear by taker **not** by maker) # trades data: # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams - agg_trades_sub = make_sub(symbols, 'aggTrade', uid) + agg_trades_sub = make_sub(symbols, 'aggTrade', subid) await ws.send_msg(agg_trades_sub) - # ack from ws server + # might get ack from ws server, or maybe some + # other msg still in transit.. res = await ws.recv_msg() - assert res['id'] == uid + subid: str | None = res.get('id') + if subid: + assert res['id'] == subid yield @@ -608,7 +613,7 @@ async def stream_quotes( await ws.send_msg({ "method": "UNSUBSCRIBE", "params": subs, - "id": uid, + "id": subid, }) # XXX: do we need to ack the unsub?