From 604e195bc06fbfafde65d817fc4934a35d32deec Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Fri, 7 May 2021 10:59:08 -0300 Subject: [PATCH] Got rid of websocket OHLC API, and added l1 tick streaming --- piker/brokers/binance.py | 122 +++++++++------------------------------ 1 file changed, 27 insertions(+), 95 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 371425ee..a56677ca 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -55,28 +55,6 @@ log = get_logger(__name__) _url = 'https://api.binance.com' -# Broker specific ohlc schema (websocket) -_websocket_ohlc_dtype = [ - ('index', int), - ('time', int), - ('close_time', int), - ('symbol', str), - ('interval', str), - ('first_trade_id', int), - ('last_trade_id', int), - ('open', float), - ('close', float), - ('high', float), - ('low', float), - ('volume', float), - ('num_trades', int), - ('closed', bool), - ('quote_asset_volume', float), - ('taker_buy_base_asset_volume', float), - ('taker_buy_quote_asset_volume', float), - ('ignore', int) -] - # Broker specific ohlc schema (rest) _ohlc_dtype = [ ('index', int), @@ -150,6 +128,8 @@ class OHLC: buy_base_vol: float buy_quote_vol: float ignore: int + # (sampled) generated tick data + ticks: List[Any] = field(default_factory=list) # convert arrow timestamp to unixtime in miliseconds @@ -235,30 +215,29 @@ async def get_client() -> Client: async def stream_messages(ws): - - too_slow_count = last_hb = 0 - while True: with trio.move_on_after(5) as cs: msg = await ws.recv_msg() - if msg.get('e') == 'kline': + # for l1 streams binance doesn't add an event type field so + # identify those messages by matching keys + if list(msg.keys()) == ['u', 's', 'b', 'B', 'a', 'A']: + sym = msg['s'] + bid = float(msg['b']) + bsize = float(msg['B']) + ask = float(msg['a']) + asize = float(msg['A']) - yield 'ohlc', OHLC(*msg['k'].values()) - - -def normalize( - ohlc: OHLC, -) -> dict: - quote = asdict(ohlc) - quote['broker_ts'] = quote['start_time'] - quote['brokerd_ts'] = time.time() - quote['last'] = quote['close'] - quote['time'] = quote['start_time'] - - # print(quote) - return ohlc.symbol, quote + yield 'l1', { + 'symbol': sym, + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize} + ] + } def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: @@ -389,10 +368,6 @@ async def stream_quotes( task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - """Subscribe for ohlc stream of quotes for ``pairs``. - - ``pairs`` must be formatted . - """ # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) @@ -421,14 +396,9 @@ async def stream_quotes( async with open_autorecon_ws('wss://stream.binance.com/ws') as ws: # XXX: setup subs - ohlc_sub = make_sub(symbols, 'kline_1m', uid) - uid += 1 - - await ws.send_msg(ohlc_sub) - res = await ws.recv_msg() # trade data (aka L1) - l1_sub = make_sub(symbols, 'trade', uid) + l1_sub = make_sub(symbols, 'bookTicker', uid) uid += 1 await ws.send_msg(l1_sub) @@ -438,56 +408,18 @@ async def stream_quotes( msg_gen = stream_messages(ws) # TODO: use ``anext()`` when it lands in 3.10! - typ, ohlc_last = await msg_gen.__anext__() + typ, tick = await msg_gen.__anext__() - topic, quote = normalize(ohlc_last) - - first_quote = {topic: quote} + first_quote = {tick['symbol']: tick} task_status.started((init_msgs, first_quote)) - # lol, only "closes" when they're margin squeezing clients ;P feed_is_live.set() - # keep start of last interval for volume tracking - last_interval_start = ohlc_last.end_time - # start streaming - async for typ, ohlc in msg_gen: - ... - # if typ == 'ohlc': + async for typ, msg in msg_gen: - # # TODO: can get rid of all this by using - # # ``trades`` subscription... + if typ == 'l1': + topic = msg['symbol'] + quote = msg - # # generate tick values to match time & sales pane: - # # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - # volume = ohlc.volume - - # # new OHLC sample interval - # if ohlc.etime > last_interval_start: - # last_interval_start = ohlc.etime - # tick_volume = volume - - # else: - # # this is the tick volume *within the interval* - # tick_volume = volume - ohlc_last.volume - - # ohlc_last = ohlc - # last = ohlc.close - - # if tick_volume: - # ohlc.ticks.append({ - # 'type': 'trade', - # 'price': last, - # 'size': tick_volume, - # }) - - # topic, quote = normalize(ohlc) - - # elif typ == 'l1': - # quote = ohlc - # topic = quote['symbol'] - - # # XXX: format required by ``tractor.msg.pub`` - # # requires a ``Dict[topic: str, quote: dict]`` - # await send_chan.send({topic: quote}) + await send_chan.send({topic: quote})