diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 1eac6edb..371425ee 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -21,7 +21,7 @@ Binance backend from contextlib import asynccontextmanager, AsyncExitStack from dataclasses import asdict, field from types import ModuleType -from typing import List, Dict, Any, Tuple, Optional +from typing import List, Dict, Any, Tuple, Union, Optional import json import time @@ -52,13 +52,14 @@ from ..data import ShmArray log = get_logger(__name__) -_url = 'https://api.binance.com/' +_url = 'https://api.binance.com' -# Broker specific ohlc schema -_ohlc_dtype = [ - ('kline_start_time', int), - ('kline_close_time', int), +# Broker specific ohlc schema (websocket) +_websocket_ohlc_dtype = [ + ('index', int), + ('time', int), + ('close_time', int), ('symbol', str), ('interval', str), ('first_trade_id', int), @@ -76,30 +77,52 @@ _ohlc_dtype = [ ('ignore', int) ] +# Broker specific ohlc schema (rest) +_ohlc_dtype = [ + ('index', int), + ('time', int), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('volume', float), + ('close_time', int), + ('quote_vol', float), + ('num_trades', int), + ('buy_base_vol', float), + ('buy_quote_vol', float), + ('ignore', float) +] + # UI components allow this to be declared such that additional # (historical) fields can be exposed. ohlc_dtype = np.dtype(_ohlc_dtype) +_show_wap_in_history = False # https://binance-docs.github.io/apidocs/spot/en/#exchange-information class Pair(BaseModel): symbol: str status: str - base_asset: str - base_precision: int - quote_asset: str - quote_precision: int - quote_asset_precision: int + baseAsset: str + baseAssetPrecision: int + quoteAsset: str + quotePrecision: int + quoteAssetPrecision: int - order_types: List[str] + baseCommissionPrecision: int + quoteCommissionPrecision: int - iceberg_allowed: bool - oco_allowed: bool - is_spot_trading_allowed: bool - is_margin_trading_allowed: bool + orderTypes: List[str] + + icebergAllowed: bool + ocoAllowed: bool + quoteOrderQtyMarketAllowed: bool + isSpotTradingAllowed: bool + isMarginTradingAllowed: bool - filters: List[str] + filters: List[Dict[str, Union[str, int, float]]] permissions: List[str] @@ -117,15 +140,21 @@ class OHLC: first_id: int last_id: int open: float + close: float high: float low: float - close: float base_vol: float num_trades: int closed: bool quote_vol: float buy_base_vol: float buy_quote_vol: float + ignore: int + + +# convert arrow timestamp to unixtime in miliseconds +def binance_timestamp(when): + return int((when.timestamp * 1000) + (when.microsecond / 1000)) class Client: @@ -139,7 +168,7 @@ class Client: method: str, data: dict, ) -> Dict[str, Any]: - resp = await self._sesh.post( + resp = await self._sesh.get( path=f'/api/v3/{method}', params=data, timeout=float('inf') @@ -152,11 +181,11 @@ class Client: ): resp = await self._api('exchangeInfo', {}) if sym is not None: - return [ - sym_info - for sym_info in resp['symbols'] - if sym_info['symbol'] == sym - ] + for sym_info in resp['symbols']: + if sym_info['symbol'] == sym: + return sym_info + else: + raise BrokerError(f'{sym} not found') else: return resp['symbols'] @@ -169,13 +198,16 @@ class Client: as_np: bool = True, ) -> dict: if start_time is None: - start_time = int(arrow.utcnow().floor('minute').shift( - minutes=-limit).format('x')) + start_time = binance_timestamp( + arrow.utcnow() + .floor('minute') + .shift(minutes=-limit) + ) if end_time is None: - end_time = int(arrow.utcnow().format('x')) + end_time = binance_timestamp(arrow.utcnow()) - json = await self._api( + bars = await self._api( 'klines', { 'symbol': symbol, @@ -186,8 +218,14 @@ class Client: } ) - bars = next(iter(json)) - array = np.array(bars, dtype=_ohlc_dtype) if as_np else bars + new_bars = [ + (i,) + tuple( + ftype(bar[j]) + for j, (name, ftype) in enumerate(_ohlc_dtype[1:]) + ) for i, bar in enumerate(bars) + ] + + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars return array @@ -205,7 +243,9 @@ async def stream_messages(ws): with trio.move_on_after(5) as cs: msg = await ws.recv_msg() - breakpoint() + if msg.get('e') == 'kline': + + yield 'ohlc', OHLC(*msg['k'].values()) def normalize( @@ -215,9 +255,10 @@ def normalize( quote['broker_ts'] = quote['start_time'] quote['brokerd_ts'] = time.time() quote['last'] = quote['close'] + quote['time'] = quote['start_time'] # print(quote) - return topic, quote + return ohlc.symbol, quote def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: @@ -228,7 +269,7 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: return { 'method': 'SUBSCRIBE', 'params': [ - f'{pair}@{sub_name}' + f'{pair.lower()}@{sub_name}' for pair in pairs ], 'id': uid @@ -362,7 +403,8 @@ async def stream_quotes( # keep client cached for real-time section for sym in symbols: - syminfo = Pair(*await client.symbol_info(sym)) # validation + d = await client.symbol_info(sym) + syminfo = Pair(**d) # validation sym_infos[sym] = syminfo.dict() symbol = symbols[0] @@ -376,19 +418,21 @@ async def stream_quotes( }, } - async with open_autorecon_ws('wss://stream.binance.com:9443') as ws: + async with open_autorecon_ws('wss://stream.binance.com/ws') as ws: # XXX: setup subs ohlc_sub = make_sub(symbols, 'kline_1m', uid) uid += 1 await ws.send_msg(ohlc_sub) + res = await ws.recv_msg() # trade data (aka L1) l1_sub = make_sub(symbols, 'trade', uid) uid += 1 await ws.send_msg(l1_sub) + res = await ws.recv_msg() # pull a first quote and deliver msg_gen = stream_messages(ws)