binance: use new `int` sub-id for each request

marketstore_disable_snappy
Tyler Goodlet 2023-04-22 13:11:40 -04:00
parent 0a8dd7b6da
commit 363a2bbcc6
1 changed files with 13 additions and 8 deletions

View File

@ -26,8 +26,8 @@ from contextlib import (
aclosing,
)
from datetime import datetime
# from functools import lru_cache
from decimal import Decimal
import itertools
from typing import (
Any, Union, Optional,
AsyncGenerator, Callable,
@ -560,8 +560,6 @@ async def stream_quotes(
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
uid = 0
async with (
send_chan as send_chan,
):
@ -577,24 +575,31 @@ async def stream_quotes(
)
)
iter_subids = itertools.count()
@acm
async def subscribe(ws: NoBsWs):
# setup subs
subid: int = next(iter_subids)
# trade data (aka L1)
# https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker
l1_sub = make_sub(symbols, 'bookTicker', uid)
l1_sub = make_sub(symbols, 'bookTicker', subid)
await ws.send_msg(l1_sub)
# aggregate (each order clear by taker **not** by maker)
# trades data:
# https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
agg_trades_sub = make_sub(symbols, 'aggTrade', uid)
agg_trades_sub = make_sub(symbols, 'aggTrade', subid)
await ws.send_msg(agg_trades_sub)
# ack from ws server
# might get ack from ws server, or maybe some
# other msg still in transit..
res = await ws.recv_msg()
assert res['id'] == uid
subid: str | None = res.get('id')
if subid:
assert res['id'] == subid
yield
@ -608,7 +613,7 @@ async def stream_quotes(
await ws.send_msg({
"method": "UNSUBSCRIBE",
"params": subs,
"id": uid,
"id": subid,
})
# XXX: do we need to ack the unsub?