From d327584039461551f0ec2335691ac6dca4471cc7 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Tue, 4 May 2021 22:52:53 -0300 Subject: [PATCH 1/5] Rough translation of kraken backend to binance API, still missing some important parts --- piker/brokers/binance.py | 449 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 449 insertions(+) create mode 100644 piker/brokers/binance.py diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py new file mode 100644 index 00000000..1eac6edb --- /dev/null +++ b/piker/brokers/binance.py @@ -0,0 +1,449 @@ +# piker: trading gear for hackers +# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Binance backend + +""" +from contextlib import asynccontextmanager, AsyncExitStack +from dataclasses import asdict, field +from types import ModuleType +from typing import List, Dict, Any, Tuple, Optional +import json +import time + +import trio_websocket +from trio_typing import TaskStatus +from trio_websocket._impl import ( + ConnectionClosed, + DisconnectionTimeout, + ConnectionRejected, + HandshakeError, + ConnectionTimeout, +) + +import arrow +import asks +import numpy as np +import trio +import tractor +from pydantic.dataclasses import dataclass +from pydantic import BaseModel + + +from .api import open_cached_client +from ._util import resproc, SymbolNotFound, BrokerError +from ..log import get_logger, get_console_log +from ..data import ShmArray + +log = get_logger(__name__) + + +_url = 'https://api.binance.com/' + + +# Broker specific ohlc schema +_ohlc_dtype = [ + ('kline_start_time', int), + ('kline_close_time', int), + ('symbol', str), + ('interval', str), + ('first_trade_id', int), + ('last_trade_id', int), + ('open', float), + ('close', float), + ('high', float), + ('low', float), + ('volume', float), + ('num_trades', int), + ('closed', bool), + ('quote_asset_volume', float), + ('taker_buy_base_asset_volume', float), + ('taker_buy_quote_asset_volume', float), + ('ignore', int) +] + +# UI components allow this to be declared such that additional +# (historical) fields can be exposed. +ohlc_dtype = np.dtype(_ohlc_dtype) + + +# https://binance-docs.github.io/apidocs/spot/en/#exchange-information +class Pair(BaseModel): + symbol: str + status: str + + base_asset: str + base_precision: int + quote_asset: str + quote_precision: int + quote_asset_precision: int + + order_types: List[str] + + iceberg_allowed: bool + oco_allowed: bool + is_spot_trading_allowed: bool + is_margin_trading_allowed: bool + + filters: List[str] + permissions: List[str] + + +@dataclass +class OHLC: + """Description of the flattened OHLC quote format. + + For schema details see: + https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams + """ + start_time: int + end_time: int + symbol: str + interval: str + first_id: int + last_id: int + open: float + high: float + low: float + close: float + base_vol: float + num_trades: int + closed: bool + quote_vol: float + buy_base_vol: float + buy_quote_vol: float + + +class Client: + + def __init__(self) -> None: + self._sesh = asks.Session(connections=4) + self._sesh.base_location = _url + + async def _api( + self, + method: str, + data: dict, + ) -> Dict[str, Any]: + resp = await self._sesh.post( + path=f'/api/v3/{method}', + params=data, + timeout=float('inf') + ) + return resproc(resp, log) + + async def symbol_info( + self, + sym: Optional[str] = None + ): + resp = await self._api('exchangeInfo', {}) + if sym is not None: + return [ + sym_info + for sym_info in resp['symbols'] + if sym_info['symbol'] == sym + ] + else: + return resp['symbols'] + + async def bars( + self, + symbol: str = 'BTCUSDT', + start_time: int = None, + end_time: int = None, + limit: int = 1000, # <- max allowed per query + as_np: bool = True, + ) -> dict: + if start_time is None: + start_time = int(arrow.utcnow().floor('minute').shift( + minutes=-limit).format('x')) + + if end_time is None: + end_time = int(arrow.utcnow().format('x')) + + json = await self._api( + 'klines', + { + 'symbol': symbol, + 'interval': '1m', + 'startTime': start_time, + 'endTime': end_time, + 'limit': limit + } + ) + + bars = next(iter(json)) + array = np.array(bars, dtype=_ohlc_dtype) if as_np else bars + return array + + +@asynccontextmanager +async def get_client() -> Client: + yield Client() + + +async def stream_messages(ws): + + too_slow_count = last_hb = 0 + + while True: + + with trio.move_on_after(5) as cs: + msg = await ws.recv_msg() + + breakpoint() + + +def normalize( + ohlc: OHLC, +) -> dict: + quote = asdict(ohlc) + quote['broker_ts'] = quote['start_time'] + quote['brokerd_ts'] = time.time() + quote['last'] = quote['close'] + + # print(quote) + return topic, quote + + +def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: + """Create a request subscription packet dict. + + https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams + """ + return { + 'method': 'SUBSCRIBE', + 'params': [ + f'{pair}@{sub_name}' + for pair in pairs + ], + 'id': uid + } + + +class AutoReconWs: + """Make ``trio_websocketw` sockets stay up no matter the bs. + + """ + recon_errors = ( + ConnectionClosed, + DisconnectionTimeout, + ConnectionRejected, + HandshakeError, + ConnectionTimeout, + ) + + def __init__( + self, + url: str, + stack: AsyncExitStack, + serializer: ModuleType = json, + ): + self.url = url + self._stack = stack + self._ws: 'WebSocketConnection' = None # noqa + + async def _connect( + self, + tries: int = 10000, + ) -> None: + try: + await self._stack.aclose() + except (DisconnectionTimeout, RuntimeError): + await trio.sleep(1) + + last_err = None + for i in range(tries): + try: + self._ws = await self._stack.enter_async_context( + trio_websocket.open_websocket_url(self.url) + ) + log.info(f'Connection success: {self.url}') + return + except self.recon_errors as err: + last_err = err + log.error( + f'{self} connection bail with ' + f'{type(err)}...retry attempt {i}' + ) + await trio.sleep(1) + continue + else: + log.exception('ws connection fail...') + raise last_err + + async def send_msg( + self, + data: Any, + ) -> None: + while True: + try: + return await self._ws.send_message(json.dumps(data)) + except self.recon_errors: + await self._connect() + + async def recv_msg( + self, + ) -> Any: + while True: + try: + return json.loads(await self._ws.get_message()) + except self.recon_errors: + await self._connect() + + +@asynccontextmanager +async def open_autorecon_ws(url): + """Apparently we can QoS for all sorts of reasons..so catch em. + + """ + async with AsyncExitStack() as stack: + ws = AutoReconWs(url, stack) + + await ws._connect() + try: + yield ws + + finally: + await stack.aclose() + + +async def backfill_bars( + sym: str, + shm: ShmArray, # type: ignore # noqa + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +) -> None: + """Fill historical bars into shared mem / storage afap. + """ + with trio.CancelScope() as cs: + async with open_cached_client('binance') as client: + bars = await client.bars(symbol=sym) + shm.push(bars) + task_status.started(cs) + + +async def stream_quotes( + + send_chan: trio.abc.SendChannel, + symbols: List[str], + shm: ShmArray, + feed_is_live: trio.Event, + loglevel: str = None, + + # startup sync + task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + +) -> None: + """Subscribe for ohlc stream of quotes for ``pairs``. + + ``pairs`` must be formatted . + """ + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + sym_infos = {} + uid = 0 + + async with open_cached_client('binance') as client, send_chan as send_chan: + + # keep client cached for real-time section + for sym in symbols: + syminfo = Pair(*await client.symbol_info(sym)) # validation + sym_infos[sym] = syminfo.dict() + + symbol = symbols[0] + + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + symbol: { + 'symbol_info': sym_infos[sym], + 'shm_write_opts': {'sum_tick_vml': False}, + }, + } + + async with open_autorecon_ws('wss://stream.binance.com:9443') as ws: + + # XXX: setup subs + ohlc_sub = make_sub(symbols, 'kline_1m', uid) + uid += 1 + + await ws.send_msg(ohlc_sub) + + # trade data (aka L1) + l1_sub = make_sub(symbols, 'trade', uid) + uid += 1 + + await ws.send_msg(l1_sub) + + # pull a first quote and deliver + msg_gen = stream_messages(ws) + + # TODO: use ``anext()`` when it lands in 3.10! + typ, ohlc_last = await msg_gen.__anext__() + + topic, quote = normalize(ohlc_last) + + first_quote = {topic: quote} + task_status.started((init_msgs, first_quote)) + + # lol, only "closes" when they're margin squeezing clients ;P + feed_is_live.set() + + # keep start of last interval for volume tracking + last_interval_start = ohlc_last.end_time + + # start streaming + async for typ, ohlc in msg_gen: + ... + # if typ == 'ohlc': + + # # TODO: can get rid of all this by using + # # ``trades`` subscription... + + # # generate tick values to match time & sales pane: + # # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + # volume = ohlc.volume + + # # new OHLC sample interval + # if ohlc.etime > last_interval_start: + # last_interval_start = ohlc.etime + # tick_volume = volume + + # else: + # # this is the tick volume *within the interval* + # tick_volume = volume - ohlc_last.volume + + # ohlc_last = ohlc + # last = ohlc.close + + # if tick_volume: + # ohlc.ticks.append({ + # 'type': 'trade', + # 'price': last, + # 'size': tick_volume, + # }) + + # topic, quote = normalize(ohlc) + + # elif typ == 'l1': + # quote = ohlc + # topic = quote['symbol'] + + # # XXX: format required by ``tractor.msg.pub`` + # # requires a ``Dict[topic: str, quote: dict]`` + # await send_chan.send({topic: quote}) From 7e493625f69dddd3c70a48fea1df030fe7b48670 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Wed, 5 May 2021 10:17:52 -0300 Subject: [PATCH 2/5] Finally backfilling is working, still need to work on realtime updates! --- piker/brokers/binance.py | 114 +++++++++++++++++++++++++++------------ 1 file changed, 79 insertions(+), 35 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 1eac6edb..371425ee 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -21,7 +21,7 @@ Binance backend from contextlib import asynccontextmanager, AsyncExitStack from dataclasses import asdict, field from types import ModuleType -from typing import List, Dict, Any, Tuple, Optional +from typing import List, Dict, Any, Tuple, Union, Optional import json import time @@ -52,13 +52,14 @@ from ..data import ShmArray log = get_logger(__name__) -_url = 'https://api.binance.com/' +_url = 'https://api.binance.com' -# Broker specific ohlc schema -_ohlc_dtype = [ - ('kline_start_time', int), - ('kline_close_time', int), +# Broker specific ohlc schema (websocket) +_websocket_ohlc_dtype = [ + ('index', int), + ('time', int), + ('close_time', int), ('symbol', str), ('interval', str), ('first_trade_id', int), @@ -76,30 +77,52 @@ _ohlc_dtype = [ ('ignore', int) ] +# Broker specific ohlc schema (rest) +_ohlc_dtype = [ + ('index', int), + ('time', int), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('volume', float), + ('close_time', int), + ('quote_vol', float), + ('num_trades', int), + ('buy_base_vol', float), + ('buy_quote_vol', float), + ('ignore', float) +] + # UI components allow this to be declared such that additional # (historical) fields can be exposed. ohlc_dtype = np.dtype(_ohlc_dtype) +_show_wap_in_history = False # https://binance-docs.github.io/apidocs/spot/en/#exchange-information class Pair(BaseModel): symbol: str status: str - base_asset: str - base_precision: int - quote_asset: str - quote_precision: int - quote_asset_precision: int + baseAsset: str + baseAssetPrecision: int + quoteAsset: str + quotePrecision: int + quoteAssetPrecision: int - order_types: List[str] + baseCommissionPrecision: int + quoteCommissionPrecision: int - iceberg_allowed: bool - oco_allowed: bool - is_spot_trading_allowed: bool - is_margin_trading_allowed: bool + orderTypes: List[str] + + icebergAllowed: bool + ocoAllowed: bool + quoteOrderQtyMarketAllowed: bool + isSpotTradingAllowed: bool + isMarginTradingAllowed: bool - filters: List[str] + filters: List[Dict[str, Union[str, int, float]]] permissions: List[str] @@ -117,15 +140,21 @@ class OHLC: first_id: int last_id: int open: float + close: float high: float low: float - close: float base_vol: float num_trades: int closed: bool quote_vol: float buy_base_vol: float buy_quote_vol: float + ignore: int + + +# convert arrow timestamp to unixtime in miliseconds +def binance_timestamp(when): + return int((when.timestamp * 1000) + (when.microsecond / 1000)) class Client: @@ -139,7 +168,7 @@ class Client: method: str, data: dict, ) -> Dict[str, Any]: - resp = await self._sesh.post( + resp = await self._sesh.get( path=f'/api/v3/{method}', params=data, timeout=float('inf') @@ -152,11 +181,11 @@ class Client: ): resp = await self._api('exchangeInfo', {}) if sym is not None: - return [ - sym_info - for sym_info in resp['symbols'] - if sym_info['symbol'] == sym - ] + for sym_info in resp['symbols']: + if sym_info['symbol'] == sym: + return sym_info + else: + raise BrokerError(f'{sym} not found') else: return resp['symbols'] @@ -169,13 +198,16 @@ class Client: as_np: bool = True, ) -> dict: if start_time is None: - start_time = int(arrow.utcnow().floor('minute').shift( - minutes=-limit).format('x')) + start_time = binance_timestamp( + arrow.utcnow() + .floor('minute') + .shift(minutes=-limit) + ) if end_time is None: - end_time = int(arrow.utcnow().format('x')) + end_time = binance_timestamp(arrow.utcnow()) - json = await self._api( + bars = await self._api( 'klines', { 'symbol': symbol, @@ -186,8 +218,14 @@ class Client: } ) - bars = next(iter(json)) - array = np.array(bars, dtype=_ohlc_dtype) if as_np else bars + new_bars = [ + (i,) + tuple( + ftype(bar[j]) + for j, (name, ftype) in enumerate(_ohlc_dtype[1:]) + ) for i, bar in enumerate(bars) + ] + + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars return array @@ -205,7 +243,9 @@ async def stream_messages(ws): with trio.move_on_after(5) as cs: msg = await ws.recv_msg() - breakpoint() + if msg.get('e') == 'kline': + + yield 'ohlc', OHLC(*msg['k'].values()) def normalize( @@ -215,9 +255,10 @@ def normalize( quote['broker_ts'] = quote['start_time'] quote['brokerd_ts'] = time.time() quote['last'] = quote['close'] + quote['time'] = quote['start_time'] # print(quote) - return topic, quote + return ohlc.symbol, quote def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: @@ -228,7 +269,7 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: return { 'method': 'SUBSCRIBE', 'params': [ - f'{pair}@{sub_name}' + f'{pair.lower()}@{sub_name}' for pair in pairs ], 'id': uid @@ -362,7 +403,8 @@ async def stream_quotes( # keep client cached for real-time section for sym in symbols: - syminfo = Pair(*await client.symbol_info(sym)) # validation + d = await client.symbol_info(sym) + syminfo = Pair(**d) # validation sym_infos[sym] = syminfo.dict() symbol = symbols[0] @@ -376,19 +418,21 @@ async def stream_quotes( }, } - async with open_autorecon_ws('wss://stream.binance.com:9443') as ws: + async with open_autorecon_ws('wss://stream.binance.com/ws') as ws: # XXX: setup subs ohlc_sub = make_sub(symbols, 'kline_1m', uid) uid += 1 await ws.send_msg(ohlc_sub) + res = await ws.recv_msg() # trade data (aka L1) l1_sub = make_sub(symbols, 'trade', uid) uid += 1 await ws.send_msg(l1_sub) + res = await ws.recv_msg() # pull a first quote and deliver msg_gen = stream_messages(ws) From 604e195bc06fbfafde65d817fc4934a35d32deec Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Fri, 7 May 2021 10:59:08 -0300 Subject: [PATCH 3/5] Got rid of websocket OHLC API, and added l1 tick streaming --- piker/brokers/binance.py | 122 +++++++++------------------------------ 1 file changed, 27 insertions(+), 95 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 371425ee..a56677ca 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -55,28 +55,6 @@ log = get_logger(__name__) _url = 'https://api.binance.com' -# Broker specific ohlc schema (websocket) -_websocket_ohlc_dtype = [ - ('index', int), - ('time', int), - ('close_time', int), - ('symbol', str), - ('interval', str), - ('first_trade_id', int), - ('last_trade_id', int), - ('open', float), - ('close', float), - ('high', float), - ('low', float), - ('volume', float), - ('num_trades', int), - ('closed', bool), - ('quote_asset_volume', float), - ('taker_buy_base_asset_volume', float), - ('taker_buy_quote_asset_volume', float), - ('ignore', int) -] - # Broker specific ohlc schema (rest) _ohlc_dtype = [ ('index', int), @@ -150,6 +128,8 @@ class OHLC: buy_base_vol: float buy_quote_vol: float ignore: int + # (sampled) generated tick data + ticks: List[Any] = field(default_factory=list) # convert arrow timestamp to unixtime in miliseconds @@ -235,30 +215,29 @@ async def get_client() -> Client: async def stream_messages(ws): - - too_slow_count = last_hb = 0 - while True: with trio.move_on_after(5) as cs: msg = await ws.recv_msg() - if msg.get('e') == 'kline': + # for l1 streams binance doesn't add an event type field so + # identify those messages by matching keys + if list(msg.keys()) == ['u', 's', 'b', 'B', 'a', 'A']: + sym = msg['s'] + bid = float(msg['b']) + bsize = float(msg['B']) + ask = float(msg['a']) + asize = float(msg['A']) - yield 'ohlc', OHLC(*msg['k'].values()) - - -def normalize( - ohlc: OHLC, -) -> dict: - quote = asdict(ohlc) - quote['broker_ts'] = quote['start_time'] - quote['brokerd_ts'] = time.time() - quote['last'] = quote['close'] - quote['time'] = quote['start_time'] - - # print(quote) - return ohlc.symbol, quote + yield 'l1', { + 'symbol': sym, + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize} + ] + } def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: @@ -389,10 +368,6 @@ async def stream_quotes( task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - """Subscribe for ohlc stream of quotes for ``pairs``. - - ``pairs`` must be formatted . - """ # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) @@ -421,14 +396,9 @@ async def stream_quotes( async with open_autorecon_ws('wss://stream.binance.com/ws') as ws: # XXX: setup subs - ohlc_sub = make_sub(symbols, 'kline_1m', uid) - uid += 1 - - await ws.send_msg(ohlc_sub) - res = await ws.recv_msg() # trade data (aka L1) - l1_sub = make_sub(symbols, 'trade', uid) + l1_sub = make_sub(symbols, 'bookTicker', uid) uid += 1 await ws.send_msg(l1_sub) @@ -438,56 +408,18 @@ async def stream_quotes( msg_gen = stream_messages(ws) # TODO: use ``anext()`` when it lands in 3.10! - typ, ohlc_last = await msg_gen.__anext__() + typ, tick = await msg_gen.__anext__() - topic, quote = normalize(ohlc_last) - - first_quote = {topic: quote} + first_quote = {tick['symbol']: tick} task_status.started((init_msgs, first_quote)) - # lol, only "closes" when they're margin squeezing clients ;P feed_is_live.set() - # keep start of last interval for volume tracking - last_interval_start = ohlc_last.end_time - # start streaming - async for typ, ohlc in msg_gen: - ... - # if typ == 'ohlc': + async for typ, msg in msg_gen: - # # TODO: can get rid of all this by using - # # ``trades`` subscription... + if typ == 'l1': + topic = msg['symbol'] + quote = msg - # # generate tick values to match time & sales pane: - # # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - # volume = ohlc.volume - - # # new OHLC sample interval - # if ohlc.etime > last_interval_start: - # last_interval_start = ohlc.etime - # tick_volume = volume - - # else: - # # this is the tick volume *within the interval* - # tick_volume = volume - ohlc_last.volume - - # ohlc_last = ohlc - # last = ohlc.close - - # if tick_volume: - # ohlc.ticks.append({ - # 'type': 'trade', - # 'price': last, - # 'size': tick_volume, - # }) - - # topic, quote = normalize(ohlc) - - # elif typ == 'l1': - # quote = ohlc - # topic = quote['symbol'] - - # # XXX: format required by ``tractor.msg.pub`` - # # requires a ``Dict[topic: str, quote: dict]`` - # await send_chan.send({topic: quote}) + await send_chan.send({topic: quote}) From 7b26bd45e245296e4213706340d6eac91edd47f3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 May 2021 15:25:18 -0400 Subject: [PATCH 4/5] Get binance OHLC history and quote format correct This gets the binance provider meeting the data feed schema requirements of both the OHLC sampling/charting machinery as well as proper formatting of historical OHLC history. Notably, - spec a minimal ohlc dtype based on the kline endpoint - use a dataclass to parse out OHLC bar datums and pack into np.ndarray/shm - add the ``aggTrade`` endpoint to get last clearing (traded) prices, validate with ``pydantic`` and then normalize these into our tick-quote format for delivery over the feed stream api. - a notable requirement is that the "first" quote from the feed must contain a 'last` field so the clearing system can start up correctly. --- piker/brokers/binance.py | 188 +++++++++++++++++++++++++++++---------- 1 file changed, 140 insertions(+), 48 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index a56677ca..e956f8c7 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -19,7 +19,6 @@ Binance backend """ from contextlib import asynccontextmanager, AsyncExitStack -from dataclasses import asdict, field from types import ModuleType from typing import List, Dict, Any, Tuple, Union, Optional import json @@ -45,7 +44,7 @@ from pydantic import BaseModel from .api import open_cached_client -from ._util import resproc, SymbolNotFound, BrokerError +from ._util import resproc, SymbolNotFound from ..log import get_logger, get_console_log from ..data import ShmArray @@ -64,12 +63,14 @@ _ohlc_dtype = [ ('low', float), ('close', float), ('volume', float), - ('close_time', int), - ('quote_vol', float), - ('num_trades', int), - ('buy_base_vol', float), - ('buy_quote_vol', float), - ('ignore', float) + # XXX: don't need these in shm history right? + # ('close_time', int), + # ('quote_vol', float), + # ('num_trades', int), + # ('buy_base_vol', float), + # ('buy_quote_vol', float), + # ('ignore', float), + ('bar_wap', float), # will be zeroed by sampler if not filled ] # UI components allow this to be declared such that additional @@ -78,9 +79,10 @@ ohlc_dtype = np.dtype(_ohlc_dtype) _show_wap_in_history = False + # https://binance-docs.github.io/apidocs/spot/en/#exchange-information class Pair(BaseModel): - symbol: str + symbol: str status: str baseAsset: str @@ -99,42 +101,67 @@ class Pair(BaseModel): quoteOrderQtyMarketAllowed: bool isSpotTradingAllowed: bool isMarginTradingAllowed: bool - + filters: List[Dict[str, Union[str, int, float]]] permissions: List[str] +# TODO: this isn't being used yet right? @dataclass class OHLC: """Description of the flattened OHLC quote format. For schema details see: - https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams + https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams + + documented format: + ``` + [ + [ + 1499040000000, // Open time + "0.01634790", // Open + "0.80000000", // High + "0.01575800", // Low + "0.01577100", // Close + "148976.11427815", // Volume + 1499644799999, // Close time + "2434.19055334", // Quote asset volume + 308, // Number of trades + "1756.87402397", // Taker buy base asset volume + "28.46694368", // Taker buy quote asset volume + "17928899.62484339" // Ignore. + ] + ] + ``` + """ - start_time: int - end_time: int - symbol: str - interval: str - first_id: int - last_id: int + time: int + open: float - close: float high: float low: float - base_vol: float - num_trades: int - closed: bool + close: float + volume: float + + close_time: int + quote_vol: float + num_trades: int buy_base_vol: float buy_quote_vol: float ignore: int + + # null the place holder for `bar_wap` until we + # figure out what to extract for this. + bar_wap: float = 0.0 + # (sampled) generated tick data - ticks: List[Any] = field(default_factory=list) + # ticks: List[Any] = field(default_factory=list) # convert arrow timestamp to unixtime in miliseconds def binance_timestamp(when): - return int((when.timestamp * 1000) + (when.microsecond / 1000)) + return int((when.timestamp() * 1000) + (when.microsecond / 1000)) class Client: @@ -158,14 +185,16 @@ class Client: async def symbol_info( self, sym: Optional[str] = None - ): + + ) -> dict: + resp = await self._api('exchangeInfo', {}) if sym is not None: for sym_info in resp['symbols']: if sym_info['symbol'] == sym: return sym_info else: - raise BrokerError(f'{sym} not found') + raise SymbolNotFound(f'{sym} not found') else: return resp['symbols'] @@ -176,17 +205,18 @@ class Client: end_time: int = None, limit: int = 1000, # <- max allowed per query as_np: bool = True, + ) -> dict: + if start_time is None: start_time = binance_timestamp( - arrow.utcnow() - .floor('minute') - .shift(minutes=-limit) + arrow.utcnow().floor('minute').shift(minutes=-limit) ) - + if end_time is None: end_time = binance_timestamp(arrow.utcnow()) + # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data bars = await self._api( 'klines', { @@ -198,12 +228,29 @@ class Client: } ) - new_bars = [ - (i,) + tuple( - ftype(bar[j]) - for j, (name, ftype) in enumerate(_ohlc_dtype[1:]) - ) for i, bar in enumerate(bars) - ] + # TODO: pack this bars scheme into a ``pydantic`` validator type: + # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data + + # TODO: we should port this to ``pydantic`` to avoid doing + # manual validation ourselves.. + new_bars = [] + for i, bar in enumerate(bars): + + bar = OHLC(*bar) + + row = [] + for j, (name, ftype) in enumerate(_ohlc_dtype[1:]): + + # TODO: maybe we should go nanoseconds on all + # history time stamps? + if name == 'time': + # convert to epoch seconds: float + row.append(bar.time / 1000.0) + + else: + row.append(getattr(bar, name)) + + new_bars.append((i,) + tuple(row)) array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars return array @@ -214,15 +261,32 @@ async def get_client() -> Client: yield Client() +# validation type +class AggTrade(BaseModel): + e: str # "aggTrade", # Event type + E: int # 123456789, # Event time + s: str # "BNBBTC", # Symbol + a: int # 12345, # Aggregate trade ID + p: float # "0.001", # Price + q: float # "100", # Quantity + f: int # 100, # First trade ID + l: int # 105, # Last trade ID + T: int # 123456785, # Trade time + m: bool # true, # Is the buyer the market maker? + M: bool # true # Ignore + + async def stream_messages(ws): while True: - with trio.move_on_after(5) as cs: + with trio.move_on_after(5): msg = await ws.recv_msg() # for l1 streams binance doesn't add an event type field so # identify those messages by matching keys - if list(msg.keys()) == ['u', 's', 'b', 'B', 'a', 'A']: + # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams + + if msg.get('u'): sym = msg['s'] bid = float(msg['b']) bsize = float(msg['B']) @@ -239,6 +303,25 @@ async def stream_messages(ws): ] } + elif msg.get('e') == 'aggTrade': + + # validate + msg = AggTrade(**msg) + + # TODO: type out and require this quote format + # from all backends! + yield 'trade', { + 'symbol': msg.s, + 'last': msg.p, + 'brokerd_ts': time.time(), + 'ticks': [{ + 'type': 'trade', + 'price': msg.p, + 'size': msg.q, + 'broker_ts': msg.T, + }], + } + def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: """Create a request subscription packet dict. @@ -395,31 +478,40 @@ async def stream_quotes( async with open_autorecon_ws('wss://stream.binance.com/ws') as ws: - # XXX: setup subs + # setup subs # trade data (aka L1) + # https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker l1_sub = make_sub(symbols, 'bookTicker', uid) - uid += 1 - await ws.send_msg(l1_sub) + + # aggregate (each order clear by taker **not** by maker) + # trades data: + # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams + agg_trades_sub = make_sub(symbols, 'aggTrade', uid) + await ws.send_msg(agg_trades_sub) + + # ack from ws server res = await ws.recv_msg() + assert res['id'] == uid # pull a first quote and deliver msg_gen = stream_messages(ws) - # TODO: use ``anext()`` when it lands in 3.10! - typ, tick = await msg_gen.__anext__() + typ, quote = await msg_gen.__anext__() - first_quote = {tick['symbol']: tick} + while typ != 'trade': + # TODO: use ``anext()`` when it lands in 3.10! + typ, quote = await msg_gen.__anext__() + + first_quote = {quote['symbol']: quote} task_status.started((init_msgs, first_quote)) + # signal to caller feed is ready for consumption feed_is_live.set() # start streaming async for typ, msg in msg_gen: - if typ == 'l1': - topic = msg['symbol'] - quote = msg - - await send_chan.send({topic: quote}) + topic = msg['symbol'] + await send_chan.send({topic: msg}) From a0dfdd935f7480a178479c4f266c4fc73e2a4f30 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Fri, 21 May 2021 22:23:35 -0300 Subject: [PATCH 5/5] Post @goodboy review commit --- piker/brokers/binance.py | 53 ++++++++++++---------------------------- 1 file changed, 16 insertions(+), 37 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index e956f8c7..d6a647ac 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -63,14 +63,17 @@ _ohlc_dtype = [ ('low', float), ('close', float), ('volume', float), - # XXX: don't need these in shm history right? + ('bar_wap', float), # will be zeroed by sampler if not filled + + # XXX: some additional fields are defined in the docs: + # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data + # ('close_time', int), # ('quote_vol', float), # ('num_trades', int), # ('buy_base_vol', float), # ('buy_quote_vol', float), # ('ignore', float), - ('bar_wap', float), # will be zeroed by sampler if not filled ] # UI components allow this to be declared such that additional @@ -106,7 +109,6 @@ class Pair(BaseModel): permissions: List[str] -# TODO: this isn't being used yet right? @dataclass class OHLC: """Description of the flattened OHLC quote format. @@ -114,26 +116,6 @@ class OHLC: For schema details see: https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams - documented format: - ``` - [ - [ - 1499040000000, // Open time - "0.01634790", // Open - "0.80000000", // High - "0.01575800", // Low - "0.01577100", // Close - "148976.11427815", // Volume - 1499644799999, // Close time - "2434.19055334", // Quote asset volume - 308, // Number of trades - "1756.87402397", // Taker buy base asset volume - "28.46694368", // Taker buy quote asset volume - "17928899.62484339" // Ignore. - ] - ] - ``` - """ time: int @@ -155,9 +137,6 @@ class OHLC: # figure out what to extract for this. bar_wap: float = 0.0 - # (sampled) generated tick data - # ticks: List[Any] = field(default_factory=list) - # convert arrow timestamp to unixtime in miliseconds def binance_timestamp(when): @@ -263,17 +242,17 @@ async def get_client() -> Client: # validation type class AggTrade(BaseModel): - e: str # "aggTrade", # Event type - E: int # 123456789, # Event time - s: str # "BNBBTC", # Symbol - a: int # 12345, # Aggregate trade ID - p: float # "0.001", # Price - q: float # "100", # Quantity - f: int # 100, # First trade ID - l: int # 105, # Last trade ID - T: int # 123456785, # Trade time - m: bool # true, # Is the buyer the market maker? - M: bool # true # Ignore + e: str # Event type + E: int # Event time + s: str # Symbol + a: int # Aggregate trade ID + p: float # Price + q: float # Quantity + f: int # First trade ID + l: int # Last trade ID + T: int # Trade time + m: bool # Is the buyer the market maker? + M: bool # Ignore async def stream_messages(ws):