Finally backfilling is working, still need to work on realtime updates!

binance_backend
Guillermo Rodriguez 2021-05-05 10:17:52 -03:00
parent d327584039
commit 7e493625f6
No known key found for this signature in database
GPG Key ID: 3F61096EC7DF75A8
1 changed files with 79 additions and 35 deletions

View File

@ -21,7 +21,7 @@ Binance backend
from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import asdict, field
from types import ModuleType
from typing import List, Dict, Any, Tuple, Optional
from typing import List, Dict, Any, Tuple, Union, Optional
import json
import time
@ -52,13 +52,14 @@ from ..data import ShmArray
log = get_logger(__name__)
_url = 'https://api.binance.com/'
_url = 'https://api.binance.com'
# Broker specific ohlc schema
_ohlc_dtype = [
('kline_start_time', int),
('kline_close_time', int),
# Broker specific ohlc schema (websocket)
_websocket_ohlc_dtype = [
('index', int),
('time', int),
('close_time', int),
('symbol', str),
('interval', str),
('first_trade_id', int),
@ -76,30 +77,52 @@ _ohlc_dtype = [
('ignore', int)
]
# Broker specific ohlc schema (rest)
_ohlc_dtype = [
('index', int),
('time', int),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', float),
('close_time', int),
('quote_vol', float),
('num_trades', int),
('buy_base_vol', float),
('buy_quote_vol', float),
('ignore', float)
]
# UI components allow this to be declared such that additional
# (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = False
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
class Pair(BaseModel):
symbol: str
status: str
base_asset: str
base_precision: int
quote_asset: str
quote_precision: int
quote_asset_precision: int
baseAsset: str
baseAssetPrecision: int
quoteAsset: str
quotePrecision: int
quoteAssetPrecision: int
order_types: List[str]
baseCommissionPrecision: int
quoteCommissionPrecision: int
iceberg_allowed: bool
oco_allowed: bool
is_spot_trading_allowed: bool
is_margin_trading_allowed: bool
orderTypes: List[str]
icebergAllowed: bool
ocoAllowed: bool
quoteOrderQtyMarketAllowed: bool
isSpotTradingAllowed: bool
isMarginTradingAllowed: bool
filters: List[str]
filters: List[Dict[str, Union[str, int, float]]]
permissions: List[str]
@ -117,15 +140,21 @@ class OHLC:
first_id: int
last_id: int
open: float
close: float
high: float
low: float
close: float
base_vol: float
num_trades: int
closed: bool
quote_vol: float
buy_base_vol: float
buy_quote_vol: float
ignore: int
# convert arrow timestamp to unixtime in miliseconds
def binance_timestamp(when):
return int((when.timestamp * 1000) + (when.microsecond / 1000))
class Client:
@ -139,7 +168,7 @@ class Client:
method: str,
data: dict,
) -> Dict[str, Any]:
resp = await self._sesh.post(
resp = await self._sesh.get(
path=f'/api/v3/{method}',
params=data,
timeout=float('inf')
@ -152,11 +181,11 @@ class Client:
):
resp = await self._api('exchangeInfo', {})
if sym is not None:
return [
sym_info
for sym_info in resp['symbols']
if sym_info['symbol'] == sym
]
for sym_info in resp['symbols']:
if sym_info['symbol'] == sym:
return sym_info
else:
raise BrokerError(f'{sym} not found')
else:
return resp['symbols']
@ -169,13 +198,16 @@ class Client:
as_np: bool = True,
) -> dict:
if start_time is None:
start_time = int(arrow.utcnow().floor('minute').shift(
minutes=-limit).format('x'))
start_time = binance_timestamp(
arrow.utcnow()
.floor('minute')
.shift(minutes=-limit)
)
if end_time is None:
end_time = int(arrow.utcnow().format('x'))
end_time = binance_timestamp(arrow.utcnow())
json = await self._api(
bars = await self._api(
'klines',
{
'symbol': symbol,
@ -186,8 +218,14 @@ class Client:
}
)
bars = next(iter(json))
array = np.array(bars, dtype=_ohlc_dtype) if as_np else bars
new_bars = [
(i,) + tuple(
ftype(bar[j])
for j, (name, ftype) in enumerate(_ohlc_dtype[1:])
) for i, bar in enumerate(bars)
]
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array
@ -205,7 +243,9 @@ async def stream_messages(ws):
with trio.move_on_after(5) as cs:
msg = await ws.recv_msg()
breakpoint()
if msg.get('e') == 'kline':
yield 'ohlc', OHLC(*msg['k'].values())
def normalize(
@ -215,9 +255,10 @@ def normalize(
quote['broker_ts'] = quote['start_time']
quote['brokerd_ts'] = time.time()
quote['last'] = quote['close']
quote['time'] = quote['start_time']
# print(quote)
return topic, quote
return ohlc.symbol, quote
def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]:
@ -228,7 +269,7 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]:
return {
'method': 'SUBSCRIBE',
'params': [
f'{pair}@{sub_name}'
f'{pair.lower()}@{sub_name}'
for pair in pairs
],
'id': uid
@ -362,7 +403,8 @@ async def stream_quotes(
# keep client cached for real-time section
for sym in symbols:
syminfo = Pair(*await client.symbol_info(sym)) # validation
d = await client.symbol_info(sym)
syminfo = Pair(**d) # validation
sym_infos[sym] = syminfo.dict()
symbol = symbols[0]
@ -376,19 +418,21 @@ async def stream_quotes(
},
}
async with open_autorecon_ws('wss://stream.binance.com:9443') as ws:
async with open_autorecon_ws('wss://stream.binance.com/ws') as ws:
# XXX: setup subs
ohlc_sub = make_sub(symbols, 'kline_1m', uid)
uid += 1
await ws.send_msg(ohlc_sub)
res = await ws.recv_msg()
# trade data (aka L1)
l1_sub = make_sub(symbols, 'trade', uid)
uid += 1
await ws.send_msg(l1_sub)
res = await ws.recv_msg()
# pull a first quote and deliver
msg_gen = stream_messages(ws)