From dac93dd8f823a2b40c04a4513d0a92ec021e8f59 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 10 Jun 2023 18:25:22 -0400 Subject: [PATCH] Support USD-M futes live feeds and exchange info Add the usd-futes "Pair" type and thus ability to load all exchange (info for) contracts settled in USDT. Luckily we don't seem to have to modify anything in the `Client` interface (yet) other then a new `.mkt_mode: str` which determines which endpoint set to make requests. Obviously data received from endpoints will likely need diff handling as per below. Deats: - add a bunch more API and WSS top level domains to `.api` with comments - start a `.binance.schemas` module to house the structs for loading different `Pair` subtypes depending on target market: `SpotPair`, `FutesPair`, .. etc. and implement required `MktPair` fields on the new futes type for compatibility with the clearing layer. - add `Client.mkt_mode: str` and a method lookup for endpoint parent paths depending on market via `.mkt_req: dict` Also related to live feeds, - drop `Struct` typecasting instead opting for specific fields both for speed and simplicity atm. - breakout `subscribe()` into module level acm from being embedded closure. - for now swap over the ws feed to be strictly the futes ep (while testing) and set the `.mkt_mode = 'usd_futes'`. - hack in `Client._pairs` to only load `FutesPair`s until we figure out whether we want separate `Client` instances per market or not.. --- piker/brokers/binance/api.py | 222 ++++++++++++++++++------------- piker/brokers/binance/broker.py | 2 - piker/brokers/binance/feed.py | 139 ++++++++++--------- piker/brokers/binance/schemas.py | 114 ++++++++++++++++ 4 files changed, 324 insertions(+), 153 deletions(-) create mode 100644 piker/brokers/binance/schemas.py diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index 9ab9f835..fd0a0c82 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -27,10 +27,10 @@ from contextlib import ( asynccontextmanager as acm, ) from datetime import datetime -from decimal import Decimal from typing import ( Any, - Union, + Callable, + Literal, ) import hmac import hashlib @@ -52,6 +52,10 @@ from piker.brokers._util import ( SymbolNotFound, get_logger, ) +from .schemas import ( + SpotPair, + FutesPair, +) log = get_logger('piker.brokers.binance') @@ -74,9 +78,26 @@ def get_config() -> dict: log = get_logger(__name__) -_url = 'https://api.binance.com' -_sapi_url = 'https://api.binance.com' -_fapi_url = 'https://testnet.binancefuture.com' +_domain: str = 'binance.com' +_spot_url = _url = f'https://api.{_domain}' +_futes_url = f'https://fapi.{_domain}' + +# test nets +_testnet_futes_url = 'https://testnet.binancefuture.com' + +# WEBsocketz +# NOTE XXX: see api docs which show diff addr? +# https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information +_spot_ws: str = 'wss://stream.binance.com/ws' +# 'wss://ws-api.binance.com:443/ws-api/v3', + +# NOTE: spot test network only allows certain ep sets: +# https://testnet.binance.vision/ +_testnet_spot_ws: str = 'wss://testnet.binance.vision/ws-api/v3' + +# https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams +_futes_ws: str = f'wss://fstream.{_domain}/ws/' +_auth_futes_ws: str = 'wss://fstream-auth.{_domain}/ws/' # Broker specific ohlc schema (rest) @@ -92,61 +113,6 @@ _fapi_url = 'https://testnet.binancefuture.com' # ('ignore', float), # ] -# UI components allow this to be declared such that additional -# (historical) fields can be exposed. -# ohlc_dtype = np.dtype(_ohlc_dtype) - -_show_wap_in_history = False - - -# https://binance-docs.github.io/apidocs/spot/en/#exchange-information - -# TODO: make this frozen again by pre-processing the -# filters list to a dict at init time? -class Pair(Struct, frozen=True): - symbol: str - status: str - - baseAsset: str - baseAssetPrecision: int - cancelReplaceAllowed: bool - allowTrailingStop: bool - quoteAsset: str - quotePrecision: int - quoteAssetPrecision: int - - baseCommissionPrecision: int - quoteCommissionPrecision: int - - orderTypes: list[str] - - icebergAllowed: bool - ocoAllowed: bool - quoteOrderQtyMarketAllowed: bool - isSpotTradingAllowed: bool - isMarginTradingAllowed: bool - - defaultSelfTradePreventionMode: str - allowedSelfTradePreventionModes: list[str] - - filters: dict[ - str, - Union[str, int, float] - ] - permissions: list[str] - - @property - def price_tick(self) -> Decimal: - # XXX: lul, after manually inspecting the response format we - # just directly pick out the info we need - step_size: str = self.filters['PRICE_FILTER']['tickSize'].rstrip('0') - return Decimal(step_size) - - @property - def size_tick(self) -> Decimal: - step_size: str = self.filters['LOT_SIZE']['stepSize'].rstrip('0') - return Decimal(step_size) - class OHLC(Struct): ''' @@ -184,24 +150,43 @@ def binance_timestamp( return int((when.timestamp() * 1000) + (when.microsecond / 1000)) -class Client: +MarketType: Literal[ + 'spot', + 'margin', + 'usd_futes', + 'coin_futes', +] - def __init__(self) -> None: + +class Client: + ''' + Async ReST API client using ``trio`` + ``asks`` B) + + Supports all of the spot, margin and futures endpoints depending + on method. + + ''' + def __init__( + self, + mkt_mode: MarketType = 'spot', + ) -> None: self._pairs: dict[str, Pair] = {} # mkt info table - # live EP sesh + # spot EPs sesh self._sesh = asks.Session(connections=4) self._sesh.base_location: str = _url - # futes testnet rest EPs - self._fapi_sesh = asks.Session(connections=4) - self._fapi_sesh.base_location = _fapi_url - - # sync rest API + # margin and extended spot endpoints session. self._sapi_sesh = asks.Session(connections=4) - self._sapi_sesh.base_location = _sapi_url + self._sapi_sesh.base_location: str = _url + # futes EPs sesh + self._fapi_sesh = asks.Session(connections=4) + self._fapi_sesh.base_location: str = _futes_url + + # for creating API keys see, + # https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072 conf: dict = get_config() self.api_key: str = conf.get('api_key', '') self.api_secret: str = conf.get('api_secret', '') @@ -211,8 +196,16 @@ class Client: if self.api_key: api_key_header = {'X-MBX-APIKEY': self.api_key} self._sesh.headers.update(api_key_header) - self._fapi_sesh.headers.update(api_key_header) self._sapi_sesh.headers.update(api_key_header) + self._fapi_sesh.headers.update(api_key_header) + + self.mkt_mode: MarketType = mkt_mode + self.mkt_req: dict[str, Callable] = { + 'spot': self._api, + 'margin': self._sapi, + 'usd_futes': self._fapi, + # 'futes_coin': self._dapi, # TODO + } def _get_signature(self, data: OrderedDict) -> str: @@ -235,6 +228,9 @@ class Client: ) return msg_auth.hexdigest() + # TODO: factor all these _api methods into a single impl + # which looks up the parent path for eps depending on a + # mkt_mode: MarketType input! async def _api( self, method: str, @@ -243,7 +239,15 @@ class Client: action: str = 'get' ) -> dict[str, Any]: + ''' + Make a /api/v3/ SPOT account/market endpoint request. + For eg. rest market-data and spot-account-trade eps use + this endpoing parent path: + - https://binance-docs.github.io/apidocs/spot/en/#market-data-endpoints + - https://binance-docs.github.io/apidocs/spot/en/#spot-account-trade + + ''' if signed: params['signature'] = self._get_signature(params) @@ -258,11 +262,19 @@ class Client: async def _fapi( self, method: str, - params: Union[dict, OrderedDict], + params: dict | OrderedDict, signed: bool = False, action: str = 'get' - ) -> dict[str, Any]: + ) -> dict[str, Any]: + ''' + Make a /fapi/v3/ USD-M FUTURES account/market endpoint + request. + + For all USD-M futures endpoints use this parent path: + https://binance-docs.github.io/apidocs/futures/en/#market-data-endpoints + + ''' if signed: params['signature'] = self._get_signature(params) @@ -277,11 +289,21 @@ class Client: async def _sapi( self, method: str, - params: Union[dict, OrderedDict], + params: dict | OrderedDict, signed: bool = False, action: str = 'get' - ) -> dict[str, Any]: + ) -> dict[str, Any]: + ''' + Make a /api/v3/ SPOT/MARGIN account/market endpoint request. + + For eg. all margin and advancecd spot account eps use this + endpoing parent path: + - https://binance-docs.github.io/apidocs/spot/en/#margin-account-trade + - https://binance-docs.github.io/apidocs/spot/en/#listen-key-spot + - https://binance-docs.github.io/apidocs/spot/en/#spot-algo-endpoints + + ''' if signed: params['signature'] = self._get_signature(params) @@ -297,10 +319,19 @@ class Client: self, sym: str | None = None, + mkt_type: MarketType = 'spot', + ) -> dict[str, Pair] | Pair: ''' - Fresh exchange-pairs info query for symbol ``sym: str``: - https://binance-docs.github.io/apidocs/spot/en/#exchange-information + Fresh exchange-pairs info query for symbol ``sym: str``. + + Depending on `mkt_type` different api eps are used: + - spot: + https://binance-docs.github.io/apidocs/spot/en/#exchange-information + - usd futes: + https://binance-docs.github.io/apidocs/futures/en/#check-server-time + - coin futes: + https://binance-docs.github.io/apidocs/delivery/en/#exchange-information ''' cached_pair = self._pairs.get(sym) @@ -313,25 +344,33 @@ class Client: sym = sym.lower() params = {'symbol': sym} - resp = await self._api('exchangeInfo', params=params) + resp = await self.mkt_req[self.mkt_mode]('exchangeInfo', params=params) entries = resp['symbols'] if not entries: raise SymbolNotFound(f'{sym} not found:\n{resp}') - # pre-process .filters field into a table + # import tractor + # await tractor.breakpoint() pairs = {} for item in entries: symbol = item['symbol'] - filters = {} - filters_ls: list = item.pop('filters') - for entry in filters_ls: - ftype = entry['filterType'] - filters[ftype] = entry - pairs[symbol] = Pair( - filters=filters, - **item, - ) + # for spot mkts, pre-process .filters field into + # a table.. + filters_ls: list = item.pop('filters', False) + if filters_ls: + filters = {} + for entry in filters_ls: + ftype = entry['filterType'] + filters[ftype] = entry + + # TODO: lookup pair schema by mkt type + # pair_type = mkt_type + + # pairs[symbol] = SpotPair( + # filters=filters, + # ) + pairs[symbol] = FutesPair(**item) # pairs = { # item['symbol']: Pair(**item) for item in entries @@ -343,8 +382,6 @@ class Client: else: return self._pairs - symbol_info = exch_info - async def search_symbols( self, pattern: str, @@ -448,7 +485,8 @@ class Client: signed=True ) log.info(f'done. len {len(resp)}') - await trio.sleep(3) + + # await trio.sleep(3) return positions, volumes @@ -457,6 +495,8 @@ class Client: recv_window: int = 60000 ) -> list: + # TODO: can't we drop this since normal dicts are + # ordered implicitly in mordern python? params = OrderedDict([ ('recvWindow', recv_window), ('timestamp', binance_timestamp(now())) @@ -594,7 +634,7 @@ class Client: @acm async def get_client() -> Client: - client = Client() + client = Client(mkt_mode='usd_futes') log.info('Caching exchange infos..') await client.exch_info() yield client diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index 53dd7a64..d2edbd9a 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -184,5 +184,3 @@ async def trades_dialogue( breakpoint() await ems_stream.send(msg.dict()) - - diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py index 9ecda184..83e7bee0 100644 --- a/piker/brokers/binance/feed.py +++ b/piker/brokers/binance/feed.py @@ -24,11 +24,13 @@ from contextlib import ( aclosing, ) from datetime import datetime +from functools import partial import itertools from typing import ( Any, AsyncGenerator, Callable, + Generator, ) import time @@ -59,11 +61,12 @@ from piker.data._web_bs import ( from piker.brokers._util import ( DataUnavailable, get_logger, - get_console_log, ) from .api import ( Client, +) +from .schemas import ( Pair, ) @@ -94,7 +97,7 @@ class AggTrade(Struct, frozen=True): l: int # noqa Last trade ID T: int # Trade time m: bool # Is the buyer the market maker? - M: bool # Ignore + M: bool | None = None # Ignore async def stream_messages( @@ -134,7 +137,9 @@ async def stream_messages( ask=ask, asize=asize, ) - l1.typecast() + # for speed probably better to only specifically + # cast fields we need in numerical form? + # l1.typecast() # repack into piker's tick-quote format yield 'l1', { @@ -142,23 +147,23 @@ async def stream_messages( 'ticks': [ { 'type': 'bid', - 'price': l1.bid, - 'size': l1.bsize, + 'price': float(l1.bid), + 'size': float(l1.bsize), }, { 'type': 'bsize', - 'price': l1.bid, - 'size': l1.bsize, + 'price': float(l1.bid), + 'size': float(l1.bsize), }, { 'type': 'ask', - 'price': l1.ask, - 'size': l1.asize, + 'price': float(l1.ask), + 'size': float(l1.asize), }, { 'type': 'asize', - 'price': l1.ask, - 'size': l1.asize, + 'price': float(l1.ask), + 'size': float(l1.asize), } ] } @@ -281,6 +286,56 @@ async def get_mkt_info( return both +@acm +async def subscribe( + ws: NoBsWs, + symbols: list[str], + + # defined once at import time to keep a global state B) + iter_subids: Generator[int, None, None] = itertools.count(), + +): + # setup subs + + subid: int = next(iter_subids) + + # trade data (aka L1) + # https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker + l1_sub = make_sub(symbols, 'bookTicker', subid) + await ws.send_msg(l1_sub) + + # aggregate (each order clear by taker **not** by maker) + # trades data: + # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams + agg_trades_sub = make_sub(symbols, 'aggTrade', subid) + await ws.send_msg(agg_trades_sub) + + # might get ack from ws server, or maybe some + # other msg still in transit.. + res = await ws.recv_msg() + subid: str | None = res.get('id') + if subid: + assert res['id'] == subid + + yield + + subs = [] + for sym in symbols: + subs.append("{sym}@aggTrade") + subs.append("{sym}@bookTicker") + + # unsub from all pairs on teardown + if ws.connected(): + await ws.send_msg({ + "method": "UNSUBSCRIBE", + "params": subs, + "id": subid, + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + async def stream_quotes( send_chan: trio.abc.SendChannel, @@ -292,8 +347,6 @@ async def stream_quotes( task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) async with ( send_chan as send_chan, @@ -307,57 +360,21 @@ async def stream_quotes( FeedInit(mkt_info=mkt) ) - iter_subids = itertools.count() - @acm - async def subscribe(ws: NoBsWs): - # setup subs - - subid: int = next(iter_subids) - - # trade data (aka L1) - # https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker - l1_sub = make_sub(symbols, 'bookTicker', subid) - await ws.send_msg(l1_sub) - - # aggregate (each order clear by taker **not** by maker) - # trades data: - # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams - agg_trades_sub = make_sub(symbols, 'aggTrade', subid) - await ws.send_msg(agg_trades_sub) - - # might get ack from ws server, or maybe some - # other msg still in transit.. - res = await ws.recv_msg() - subid: str | None = res.get('id') - if subid: - assert res['id'] == subid - - yield - - subs = [] - for sym in symbols: - subs.append("{sym}@aggTrade") - subs.append("{sym}@bookTicker") - - # unsub from all pairs on teardown - if ws.connected(): - await ws.send_msg({ - "method": "UNSUBSCRIBE", - "params": subs, - "id": subid, - }) - - # XXX: do we need to ack the unsub? - # await ws.recv_msg() + # TODO: detect whether futes or spot contact was requested + from .api import ( + _futes_ws, + # _spot_ws, + ) + wsep: str = _futes_ws async with ( open_autorecon_ws( - # XXX: see api docs which show diff addr? - # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information - # 'wss://ws-api.binance.com:443/ws-api/v3', - 'wss://stream.binance.com/ws', - fixture=subscribe, + wsep, + fixture=partial( + subscribe, + symbols=symbols, + ), ) as ws, # avoid stream-gen closure from breaking trio.. @@ -387,6 +404,8 @@ async def stream_quotes( topic = msg['symbol'].lower() await send_chan.send({topic: msg}) # last = time.time() + + @tractor.context async def open_symbol_search( ctx: tractor.Context, diff --git a/piker/brokers/binance/schemas.py b/piker/brokers/binance/schemas.py new file mode 100644 index 00000000..df072ba1 --- /dev/null +++ b/piker/brokers/binance/schemas.py @@ -0,0 +1,114 @@ +# piker: trading gear for hackers +# Copyright (C) +# Guillermo Rodriguez (aka ze jefe) +# Tyler Goodlet +# (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Per market data-type definitions and schemas types. + +""" +from decimal import Decimal + +from piker.data.types import Struct + +class Pair(Struct, frozen=True): + symbol: str + status: str + orderTypes: list[str] + + # src + quoteAsset: str + quotePrecision: int + + # dst + baseAsset: str + baseAssetPrecision: int + + +class SpotPair(Struct, frozen=True): + + cancelReplaceAllowed: bool + allowTrailingStop: bool + quoteAssetPrecision: int + + baseCommissionPrecision: int + quoteCommissionPrecision: int + + icebergAllowed: bool + ocoAllowed: bool + quoteOrderQtyMarketAllowed: bool + isSpotTradingAllowed: bool + isMarginTradingAllowed: bool + + defaultSelfTradePreventionMode: str + allowedSelfTradePreventionModes: list[str] + + filters: dict[ + str, + str | int | float, + ] + permissions: list[str] + + @property + def price_tick(self) -> Decimal: + # XXX: lul, after manually inspecting the response format we + # just directly pick out the info we need + step_size: str = self.filters['PRICE_FILTER']['tickSize'].rstrip('0') + return Decimal(step_size) + + @property + def size_tick(self) -> Decimal: + step_size: str = self.filters['LOT_SIZE']['stepSize'].rstrip('0') + return Decimal(step_size) + + +class FutesPair(Pair): + symbol: str # 'BTCUSDT', + pair: str # 'BTCUSDT', + baseAssetPrecision: int # 8, + contractType: str # 'PERPETUAL', + deliveryDate: int # 4133404800000, + liquidationFee: float # '0.012500', + maintMarginPercent: float # '2.5000', + marginAsset: str # 'USDT', + marketTakeBound: float # '0.05', + maxMoveOrderLimit: int # 10000, + onboardDate: int # 1569398400000, + pricePrecision: int # 2, + quantityPrecision: int # 3, + quoteAsset: str # 'USDT', + quotePrecision: int # 8, + requiredMarginPercent: float # '5.0000', + settlePlan: int # 0, + timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'], + triggerProtect: float # '0.0500', + underlyingSubType: list[str] # ['PoW'], + underlyingType: str # 'COIN' + + # NOTE: for compat with spot pairs and `MktPair.src: Asset` + # processing.. + @property + def quoteAssetPrecision(self) -> int: + return self.quotePrecision + + @property + def price_tick(self) -> Decimal: + return Decimal(self.pricePrecision) + + @property + def size_tick(self) -> Decimal: + return Decimal(self.quantityPrecision)