From 7b26bd45e245296e4213706340d6eac91edd47f3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 May 2021 15:25:18 -0400 Subject: [PATCH] Get binance OHLC history and quote format correct This gets the binance provider meeting the data feed schema requirements of both the OHLC sampling/charting machinery as well as proper formatting of historical OHLC history. Notably, - spec a minimal ohlc dtype based on the kline endpoint - use a dataclass to parse out OHLC bar datums and pack into np.ndarray/shm - add the ``aggTrade`` endpoint to get last clearing (traded) prices, validate with ``pydantic`` and then normalize these into our tick-quote format for delivery over the feed stream api. - a notable requirement is that the "first" quote from the feed must contain a 'last` field so the clearing system can start up correctly. --- piker/brokers/binance.py | 188 +++++++++++++++++++++++++++++---------- 1 file changed, 140 insertions(+), 48 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index a56677ca..e956f8c7 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -19,7 +19,6 @@ Binance backend """ from contextlib import asynccontextmanager, AsyncExitStack -from dataclasses import asdict, field from types import ModuleType from typing import List, Dict, Any, Tuple, Union, Optional import json @@ -45,7 +44,7 @@ from pydantic import BaseModel from .api import open_cached_client -from ._util import resproc, SymbolNotFound, BrokerError +from ._util import resproc, SymbolNotFound from ..log import get_logger, get_console_log from ..data import ShmArray @@ -64,12 +63,14 @@ _ohlc_dtype = [ ('low', float), ('close', float), ('volume', float), - ('close_time', int), - ('quote_vol', float), - ('num_trades', int), - ('buy_base_vol', float), - ('buy_quote_vol', float), - ('ignore', float) + # XXX: don't need these in shm history right? + # ('close_time', int), + # ('quote_vol', float), + # ('num_trades', int), + # ('buy_base_vol', float), + # ('buy_quote_vol', float), + # ('ignore', float), + ('bar_wap', float), # will be zeroed by sampler if not filled ] # UI components allow this to be declared such that additional @@ -78,9 +79,10 @@ ohlc_dtype = np.dtype(_ohlc_dtype) _show_wap_in_history = False + # https://binance-docs.github.io/apidocs/spot/en/#exchange-information class Pair(BaseModel): - symbol: str + symbol: str status: str baseAsset: str @@ -99,42 +101,67 @@ class Pair(BaseModel): quoteOrderQtyMarketAllowed: bool isSpotTradingAllowed: bool isMarginTradingAllowed: bool - + filters: List[Dict[str, Union[str, int, float]]] permissions: List[str] +# TODO: this isn't being used yet right? @dataclass class OHLC: """Description of the flattened OHLC quote format. For schema details see: - https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams + https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams + + documented format: + ``` + [ + [ + 1499040000000, // Open time + "0.01634790", // Open + "0.80000000", // High + "0.01575800", // Low + "0.01577100", // Close + "148976.11427815", // Volume + 1499644799999, // Close time + "2434.19055334", // Quote asset volume + 308, // Number of trades + "1756.87402397", // Taker buy base asset volume + "28.46694368", // Taker buy quote asset volume + "17928899.62484339" // Ignore. + ] + ] + ``` + """ - start_time: int - end_time: int - symbol: str - interval: str - first_id: int - last_id: int + time: int + open: float - close: float high: float low: float - base_vol: float - num_trades: int - closed: bool + close: float + volume: float + + close_time: int + quote_vol: float + num_trades: int buy_base_vol: float buy_quote_vol: float ignore: int + + # null the place holder for `bar_wap` until we + # figure out what to extract for this. + bar_wap: float = 0.0 + # (sampled) generated tick data - ticks: List[Any] = field(default_factory=list) + # ticks: List[Any] = field(default_factory=list) # convert arrow timestamp to unixtime in miliseconds def binance_timestamp(when): - return int((when.timestamp * 1000) + (when.microsecond / 1000)) + return int((when.timestamp() * 1000) + (when.microsecond / 1000)) class Client: @@ -158,14 +185,16 @@ class Client: async def symbol_info( self, sym: Optional[str] = None - ): + + ) -> dict: + resp = await self._api('exchangeInfo', {}) if sym is not None: for sym_info in resp['symbols']: if sym_info['symbol'] == sym: return sym_info else: - raise BrokerError(f'{sym} not found') + raise SymbolNotFound(f'{sym} not found') else: return resp['symbols'] @@ -176,17 +205,18 @@ class Client: end_time: int = None, limit: int = 1000, # <- max allowed per query as_np: bool = True, + ) -> dict: + if start_time is None: start_time = binance_timestamp( - arrow.utcnow() - .floor('minute') - .shift(minutes=-limit) + arrow.utcnow().floor('minute').shift(minutes=-limit) ) - + if end_time is None: end_time = binance_timestamp(arrow.utcnow()) + # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data bars = await self._api( 'klines', { @@ -198,12 +228,29 @@ class Client: } ) - new_bars = [ - (i,) + tuple( - ftype(bar[j]) - for j, (name, ftype) in enumerate(_ohlc_dtype[1:]) - ) for i, bar in enumerate(bars) - ] + # TODO: pack this bars scheme into a ``pydantic`` validator type: + # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data + + # TODO: we should port this to ``pydantic`` to avoid doing + # manual validation ourselves.. + new_bars = [] + for i, bar in enumerate(bars): + + bar = OHLC(*bar) + + row = [] + for j, (name, ftype) in enumerate(_ohlc_dtype[1:]): + + # TODO: maybe we should go nanoseconds on all + # history time stamps? + if name == 'time': + # convert to epoch seconds: float + row.append(bar.time / 1000.0) + + else: + row.append(getattr(bar, name)) + + new_bars.append((i,) + tuple(row)) array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars return array @@ -214,15 +261,32 @@ async def get_client() -> Client: yield Client() +# validation type +class AggTrade(BaseModel): + e: str # "aggTrade", # Event type + E: int # 123456789, # Event time + s: str # "BNBBTC", # Symbol + a: int # 12345, # Aggregate trade ID + p: float # "0.001", # Price + q: float # "100", # Quantity + f: int # 100, # First trade ID + l: int # 105, # Last trade ID + T: int # 123456785, # Trade time + m: bool # true, # Is the buyer the market maker? + M: bool # true # Ignore + + async def stream_messages(ws): while True: - with trio.move_on_after(5) as cs: + with trio.move_on_after(5): msg = await ws.recv_msg() # for l1 streams binance doesn't add an event type field so # identify those messages by matching keys - if list(msg.keys()) == ['u', 's', 'b', 'B', 'a', 'A']: + # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams + + if msg.get('u'): sym = msg['s'] bid = float(msg['b']) bsize = float(msg['B']) @@ -239,6 +303,25 @@ async def stream_messages(ws): ] } + elif msg.get('e') == 'aggTrade': + + # validate + msg = AggTrade(**msg) + + # TODO: type out and require this quote format + # from all backends! + yield 'trade', { + 'symbol': msg.s, + 'last': msg.p, + 'brokerd_ts': time.time(), + 'ticks': [{ + 'type': 'trade', + 'price': msg.p, + 'size': msg.q, + 'broker_ts': msg.T, + }], + } + def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: """Create a request subscription packet dict. @@ -395,31 +478,40 @@ async def stream_quotes( async with open_autorecon_ws('wss://stream.binance.com/ws') as ws: - # XXX: setup subs + # setup subs # trade data (aka L1) + # https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker l1_sub = make_sub(symbols, 'bookTicker', uid) - uid += 1 - await ws.send_msg(l1_sub) + + # aggregate (each order clear by taker **not** by maker) + # trades data: + # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams + agg_trades_sub = make_sub(symbols, 'aggTrade', uid) + await ws.send_msg(agg_trades_sub) + + # ack from ws server res = await ws.recv_msg() + assert res['id'] == uid # pull a first quote and deliver msg_gen = stream_messages(ws) - # TODO: use ``anext()`` when it lands in 3.10! - typ, tick = await msg_gen.__anext__() + typ, quote = await msg_gen.__anext__() - first_quote = {tick['symbol']: tick} + while typ != 'trade': + # TODO: use ``anext()`` when it lands in 3.10! + typ, quote = await msg_gen.__anext__() + + first_quote = {quote['symbol']: quote} task_status.started((init_msgs, first_quote)) + # signal to caller feed is ready for consumption feed_is_live.set() # start streaming async for typ, msg in msg_gen: - if typ == 'l1': - topic = msg['symbol'] - quote = msg - - await send_chan.send({topic: quote}) + topic = msg['symbol'] + await send_chan.send({topic: msg})