From c6d1007e663087d382690ae6dcb50d91792be448 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 16 Jun 2023 20:43:07 -0400 Subject: [PATCH] Load `Asset`s during echange info queries Since we need them for accounting and since we can get them directly from the usdtm futes `exchangeInfo` ep, just preload all asset info that we can during initial `Pair` caching. Cache the asset infos inside a new per venue `Client._venues2assets: dict[str, dict[str, Asset | None]]` and mostly be pedantic with the spot asset list for now since futes seems much smaller and doesn't include transaction precision info. Further: - load a testnet http session if `binance.use_testnet.futes = true`. - add testnet support for all non-data endpoints. - hardcode user stream methods to work for usdtm futes for the moment. - add logging around order request calls. --- piker/brokers/binance/api.py | 231 +++++++++++++++++++++++++---------- 1 file changed, 166 insertions(+), 65 deletions(-) diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index 0c88142b..39a3e711 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -30,6 +30,7 @@ from contextlib import ( asynccontextmanager as acm, ) from datetime import datetime +from pprint import pformat from typing import ( Any, Callable, @@ -48,6 +49,10 @@ from fuzzywuzzy import process as fuzzy import numpy as np from piker import config +from piker.accounting import ( + Asset, + digits_to_dec, +) from piker.data.types import Struct from piker.data import def_iohlcv_fields from piker.brokers._util import ( @@ -59,8 +64,11 @@ from .venues import ( PAIRTYPES, Pair, MarketType, + _spot_url, _futes_url, + + _testnet_futes_url, ) log = get_logger('piker.brokers.binance') @@ -144,15 +152,27 @@ class Client: mkt_mode: MarketType = 'spot', ) -> None: - # build out pair info tables for each market type # and wrap in a chain-map view for search / query. self._spot_pairs: dict[str, Pair] = {} # spot info table self._ufutes_pairs: dict[str, Pair] = {} # usd-futures table - self._mkt2pairs: dict[str, dict] = { + self._venue2pairs: dict[str, dict] = { 'spot': self._spot_pairs, 'usdtm_futes': self._ufutes_pairs, } + + self._venue2assets: dict[ + str, + dict[str, dict] | None, + ] = { + # NOTE: only the spot table contains a dict[str, Asset] + # since others (like futes, opts) can just do lookups + # from a list of names to the spot equivalent. + 'spot': {}, + 'usdtm_futes': {}, + # 'coinm_futes': {}, + } + # NOTE: only stick in the spot table for now until exchange info # is loaded, since at that point we'll suffix all the futes # market symbols for use by search. See `.exch_info()`. @@ -177,15 +197,32 @@ class Client: self.api_key: str = conf.get('api_key', '') self.api_secret: str = conf.get('api_secret', '') + self.use_testnet: bool = conf.get('use_testnet', False) + + if self.use_testnet: + self._test_fapi_sesh = asks.Session(connections=4) + self._test_fapi_sesh.base_location: str = _testnet_futes_url self.watchlist = conf.get('watchlist', []) if self.api_key: - api_key_header = {'X-MBX-APIKEY': self.api_key} + api_key_header: dict = { + # taken from official: + # https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47 + "Content-Type": "application/json;charset=utf-8", + + # TODO: prolly should just always query and copy + # in the real latest ver? + "User-Agent": "binance-connector/6.1.6smbz6", + "X-MBX-APIKEY": self.api_key, + } self._sesh.headers.update(api_key_header) self._sapi_sesh.headers.update(api_key_header) self._fapi_sesh.headers.update(api_key_header) + if self.use_testnet: + self._test_fapi_sesh.headers.update(api_key_header) + self.mkt_mode: MarketType = mkt_mode self.mkt_mode_req: dict[str, Callable] = { 'spot': self._api, @@ -204,11 +241,12 @@ class Client: "Can't generate a signature without setting up credentials" ) - query_str = '&'.join([ - f'{_key}={value}' - for _key, value in data.items()]) + query_str: str = '&'.join([ + f'{key}={value}' + for key, value in data.items() + ]) - log.info(query_str) + # log.info(query_str) msg_auth = hmac.new( self.api_secret.encode('utf-8'), @@ -253,7 +291,8 @@ class Client: method: str, params: dict | OrderedDict, signed: bool = False, - action: str = 'get' + action: str = 'get', + testnet: bool = True, ) -> dict[str, Any]: ''' @@ -267,7 +306,23 @@ class Client: if signed: params['signature'] = self._mk_sig(params) - resp = await getattr(self._fapi_sesh, action)( + # NOTE: only use testnet if user set brokers.toml config + # var to true **and** it's not one of the market data + # endpoints since we basically never want to display the + # test net feeds, we only are using it for testing order + # ctl machinery B) + if ( + self.use_testnet + and method not in { + 'klines', + 'exchangeInfo', + } + ): + meth = getattr(self._test_fapi_sesh, action) + else: + meth = getattr(self._fapi_sesh, action) + + resp = await meth( path=f'/fapi/v1/{method}', params=params, timeout=float('inf') @@ -306,22 +361,28 @@ class Client: async def _cache_pairs( self, - mkt_type: str, + venue: str, ) -> None: # lookup internal mkt-specific pair table to update - pair_table: dict[str, Pair] = self._mkt2pairs[mkt_type] + pair_table: dict[str, Pair] = self._venue2pairs[venue] + asset_table: dict[str, Asset] = self._venue2assets[venue] # make API request(s) - resp = await self.mkt_mode_req[mkt_type]( + resp = await self.mkt_mode_req[venue]( 'exchangeInfo', params={}, # NOTE: retrieve all symbols by default ) - entries = resp['symbols'] - if not entries: + mkt_pairs = resp['symbols'] + if not mkt_pairs: raise SymbolNotFound(f'No market pairs found!?:\n{resp}') - for item in entries: + pairs_view_subtable: dict[str, Pair] = {} + # if venue == 'spot': + # import tractor + # await tractor.breakpoint() + + for item in mkt_pairs: filters_ls: list = item.pop('filters', False) if filters_ls: filters = {} @@ -331,15 +392,50 @@ class Client: item['filters'] = filters - pair_type: Type = PAIRTYPES[mkt_type] + pair_type: Type = PAIRTYPES[venue] pair: Pair = pair_type(**item) pair_table[pair.symbol.upper()] = pair + # update an additional top-level-cross-venue-table + # `._pairs: ChainMap` for search B0 + pairs_view_subtable[pair.bs_fqme] = pair + + if venue == 'spot': + if (name := pair.quoteAsset) not in asset_table: + asset_table[name] = Asset( + name=name, + atype='crypto_currency', + tx_tick=digits_to_dec(pair.quoteAssetPrecision), + ) + + if (name := pair.baseAsset) not in asset_table: + asset_table[name] = Asset( + name=name, + atype='crypto_currency', + tx_tick=digits_to_dec(pair.baseAssetPrecision), + ) + + # NOTE: make merged view of all market-type pairs but + # use market specific `Pair.bs_fqme` for keys! + # this allows searching for market pairs with different + # suffixes easily, for ex. `BTCUSDT.USDTM.PERP` will show + # up when a user uses the search endpoint with pattern + # `btc` B) + self._pairs.maps.append(pairs_view_subtable) + + if venue == 'spot': + return + + assets: list[dict] = resp.get('assets', ()) + for entry in assets: + name: str = entry['asset'] + asset_table[name] = self._venue2assets['spot'].get(name) + async def exch_info( self, sym: str | None = None, - mkt_type: MarketType | None = None, + venue: MarketType | None = None, ) -> dict[str, Pair] | Pair: ''' @@ -354,46 +450,34 @@ class Client: https://binance-docs.github.io/apidocs/delivery/en/#exchange-information ''' - pair_table: dict[str, Pair] = self._mkt2pairs[ - mkt_type or self.mkt_mode + pair_table: dict[str, Pair] = self._venue2pairs[ + venue or self.mkt_mode ] if cached_pair := pair_table.get(sym): return cached_pair - # params = {} - # if sym is not None: - # params = {'symbol': sym} - - mkts: list[str] = ['spot', 'usdtm_futes'] - if mkt_type: - mkts: list[str] = [mkt_type] + venues: list[str] = ['spot', 'usdtm_futes'] + if venue: + venues: list[str] = [venue] + # batch per-venue download of all exchange infos async with trio.open_nursery() as rn: - for mkt_type in mkts: + for ven in venues: rn.start_soon( self._cache_pairs, - mkt_type, + ven, ) - # make merged view of all market-type pairs but - # use market specific `Pair.bs_fqme` for keys! - for venue, venue_pairs_table in self._mkt2pairs.items(): - self._pairs.maps.append( - {pair.bs_fqme: pair - for pair in venue_pairs_table.values()} - ) - return pair_table[sym] if sym else self._pairs + # TODO: unused except by `brokers.core.search_symbols()`? async def search_symbols( self, pattern: str, limit: int = None, + ) -> dict[str, Any]: - # if self._spot_pairs is not None: - # data = self._spot_pairs - # else: fq_pairs: dict = await self.exch_info() matches = fuzzy.extractBests( @@ -538,56 +622,60 @@ class Client: async def submit_limit( self, symbol: str, - side: str, # SELL / BUY + side: str, # sell / buy quantity: float, price: float, - # time_in_force: str = 'GTC', + oid: int | None = None, + tif: str = 'GTC', + recv_window: int = 60000 # iceberg_quantity: float | None = None, # order_resp_type: str | None = None, - recv_window: int = 60000 - ) -> int: - symbol = symbol.upper() + ) -> str: + ''' + Submit a live limit order to ze binance. - await self.cache_symbols() - - # asset_precision = self._spot_pairs[symbol]['baseAssetPrecision'] - # quote_precision = self._pairs[symbol]['quoteAssetPrecision'] - - params = OrderedDict([ - ('symbol', symbol), + ''' + params: dict = OrderedDict([ + ('symbol', symbol.upper()), ('side', side.upper()), ('type', 'LIMIT'), - ('timeInForce', 'GTC'), + ('timeInForce', tif), ('quantity', quantity), ('price', price), ('recvWindow', recv_window), ('newOrderRespType', 'ACK'), ('timestamp', binance_timestamp(now())) ]) - if oid: params['newClientOrderId'] = oid + log.info( + 'Submitting ReST order request:\n' + f'{pformat(params)}' + ) resp = await self._api( 'order', params=params, signed=True, action='post' ) - log.info(resp) - # return resp['orderId'] - return resp['orderId'] + reqid: str = resp['orderId'] + if oid: + assert oid == reqid + + return reqid async def submit_cancel( self, symbol: str, oid: str, + recv_window: int = 60000 + ) -> None: symbol = symbol.upper() - params = OrderedDict([ ('symbol', symbol), ('orderId', oid), @@ -595,6 +683,10 @@ class Client: ('timestamp', binance_timestamp(now())) ]) + log.cancel( + 'Submitting ReST order cancel: {oid}\n' + f'{pformat(params)}' + ) return await self._api( 'order', params=params, @@ -603,22 +695,31 @@ class Client: ) async def get_listen_key(self) -> str: - return (await self._api( - 'userDataStream', + + # resp = await self._api( + resp = await self.mkt_mode_req[self.mkt_mode]( + # 'userDataStream', # spot + 'listenKey', params={}, - action='post' - ))['listenKey'] + action='post', + signed=True, + ) + return resp['listenKey'] async def keep_alive_key(self, listen_key: str) -> None: - await self._fapi( - 'userDataStream', + # await self._fapi( + await self.mkt_mode_req[self.mkt_mode]( + # 'userDataStream', + 'listenKey', params={'listenKey': listen_key}, action='put' ) async def close_listen_key(self, listen_key: str) -> None: - await self._fapi( - 'userDataStream', + # await self._fapi( + await self.mkt_mode_req[self.mkt_mode]( + # 'userDataStream', + 'listenKey', params={'listenKey': listen_key}, action='delete' )