From ab1463d9429b0e8e9708a0dfdbc461cad1cf4b5e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Jun 2024 09:41:23 -0400 Subject: [PATCH] Port binance to `httpx` Like other backends use the `AsyncClient` for all venue specific client-sessions but change to allocating them inside `get_client()` using an `AsyncExitStack` and inserting directly in the `Client.venue_sesh: dict` table during init. Supporting impl tweaks: - remove most of the API client session building logic and instead make `Client.__init__()` take in a `venue_sessions: dict` (set it to `.venue_sesh`) and `conf: dict`, instead opting to do the http client configuration inside `get_client()` since all that code only needs to be run once. |_load config inside `get_client()` once. |_move session token creation into a new util func `init_api_keys()` and also call it from `get_client()` factory; toss in an ex. toml section config to the doc string. - define `_venue_urls: dict[str, str]` (content taken from old static `.venue_sesh` dict) at module level and feed them as `base_url: str` inputs to the client create loop. - adjust all call sigs in httpx-sesh-using methods, namely just `._api()`. - do a `.exch_info()` call in `get_client()` to cache the symbology set. Unrelated changes for various other outstanding buggers: - to get futures feeds correctly loading when selected from search (like 'XMRUSDT.USDTM.PERP'), expect a `MktPair` input to `Client.bars()` such that the exact venue-key can be looked up (via a new `.pair2venuekey()` meth) and then passed to `._api()`. - adjust `.broker.open_trade_dialog()` to failover to paper engine when there's no `api_key` key set for the `subconf` venue-key. --- piker/brokers/binance/api.py | 312 ++++++++++++++++++++------------ piker/brokers/binance/broker.py | 11 +- piker/brokers/binance/feed.py | 8 +- 3 files changed, 212 insertions(+), 119 deletions(-) diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index 616be6b3..2d1c4ee6 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -1,8 +1,8 @@ # piker: trading gear for hackers # Copyright (C) -# Guillermo Rodriguez (aka ze jefe) -# Tyler Goodlet -# (in stewardship for pikers) +# Guillermo Rodriguez (aka ze jefe) +# Tyler Goodlet +# (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -25,14 +25,13 @@ from __future__ import annotations from collections import ChainMap from contextlib import ( asynccontextmanager as acm, + AsyncExitStack, ) from datetime import datetime from pprint import pformat from typing import ( Any, Callable, - Hashable, - Sequence, Type, ) import hmac @@ -43,8 +42,7 @@ import trio from pendulum import ( now, ) -import asks -from rapidfuzz import process as fuzzy +import httpx import numpy as np from piker import config @@ -54,6 +52,7 @@ from piker.clearing._messages import ( from piker.accounting import ( Asset, digits_to_dec, + MktPair, ) from piker.types import Struct from piker.data import ( @@ -69,7 +68,6 @@ from .venues import ( PAIRTYPES, Pair, MarketType, - _spot_url, _futes_url, _testnet_futes_url, @@ -79,19 +77,18 @@ from .venues import ( log = get_logger('piker.brokers.binance') -def get_config() -> dict: - +def get_config() -> dict[str, Any]: conf: dict path: Path conf, path = config.load( conf_name='brokers', touch_if_dne=True, ) - - section = conf.get('binance') - + section: dict = conf.get('binance') if not section: - log.warning(f'No config section found for binance in {path}') + log.warning( + f'No config section found for binance in {path}' + ) return {} return section @@ -147,7 +144,7 @@ def binance_timestamp( class Client: ''' - Async ReST API client using ``trio`` + ``asks`` B) + Async ReST API client using `trio` + `httpx` B) Supports all of the spot, margin and futures endpoints depending on method. @@ -156,10 +153,17 @@ class Client: def __init__( self, + venue_sessions: dict[ + str, # venue key + tuple[httpx.AsyncClient, str] # session, eps path + ], + conf: dict[str, Any], # TODO: change this to `Client.[mkt_]venue: MarketType`? mkt_mode: MarketType = 'spot', ) -> None: + self.conf = conf + # build out pair info tables for each market type # and wrap in a chain-map view for search / query. self._spot_pairs: dict[str, Pair] = {} # spot info table @@ -186,44 +190,13 @@ class Client: # market symbols for use by search. See `.exch_info()`. self._pairs: ChainMap[str, Pair] = ChainMap() - # spot EPs sesh - self._sesh = asks.Session(connections=4) - self._sesh.base_location: str = _spot_url - # spot testnet - self._test_sesh: asks.Session = asks.Session(connections=4) - self._test_sesh.base_location: str = _testnet_spot_url - - # margin and extended spot endpoints session. - self._sapi_sesh = asks.Session(connections=4) - self._sapi_sesh.base_location: str = _spot_url - - # futes EPs sesh - self._fapi_sesh = asks.Session(connections=4) - self._fapi_sesh.base_location: str = _futes_url - # futes testnet - self._test_fapi_sesh: asks.Session = asks.Session(connections=4) - self._test_fapi_sesh.base_location: str = _testnet_futes_url - # global client "venue selection" mode. # set this when you want to switch venues and not have to # specify the venue for the next request. self.mkt_mode: MarketType = mkt_mode - # per 8 - self.venue_sesh: dict[ - str, # venue key - tuple[asks.Session, str] # session, eps path - ] = { - 'spot': (self._sesh, '/api/v3/'), - 'spot_testnet': (self._test_sesh, '/fapi/v1/'), - - 'margin': (self._sapi_sesh, '/sapi/v1/'), - - 'usdtm_futes': (self._fapi_sesh, '/fapi/v1/'), - 'usdtm_futes_testnet': (self._test_fapi_sesh, '/fapi/v1/'), - - # 'futes_coin': self._dapi, # TODO - } + # per-mkt-venue API client table + self.venue_sesh = venue_sessions # lookup for going from `.mkt_mode: str` to the config # subsection `key: str` @@ -238,40 +211,6 @@ class Client: 'futes': ['usdtm_futes'], } - # for creating API keys see, - # https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072 - self.conf: dict = get_config() - - for key, subconf in self.conf.items(): - if api_key := subconf.get('api_key', ''): - venue_keys: list[str] = self.confkey2venuekeys[key] - - venue_key: str - sesh: asks.Session - for venue_key in venue_keys: - sesh, _ = self.venue_sesh[venue_key] - - api_key_header: dict = { - # taken from official: - # https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47 - "Content-Type": "application/json;charset=utf-8", - - # TODO: prolly should just always query and copy - # in the real latest ver? - "User-Agent": "binance-connector/6.1.6smbz6", - "X-MBX-APIKEY": api_key, - } - sesh.headers.update(api_key_header) - - # if `.use_tesnet = true` in the config then - # also add headers for the testnet session which - # will be used for all order control - if subconf.get('use_testnet', False): - testnet_sesh, _ = self.venue_sesh[ - venue_key + '_testnet' - ] - testnet_sesh.headers.update(api_key_header) - def _mk_sig( self, data: dict, @@ -290,7 +229,6 @@ class Client: 'to define the creds for auth-ed endpoints!?' ) - # XXX: Info on security and authentification # https://binance-docs.github.io/apidocs/#endpoint-security-type if not (api_secret := subconf.get('api_secret')): @@ -319,7 +257,7 @@ class Client: params: dict, method: str = 'get', - venue: str | None = None, # if None use `.mkt_mode` state + venue: str|None = None, # if None use `.mkt_mode` state signed: bool = False, allow_testnet: bool = False, @@ -330,8 +268,9 @@ class Client: - /fapi/v3/ USD-M FUTURES, or - /api/v3/ SPOT/MARGIN - account/market endpoint request depending on either passed in `venue: str` - or the current setting `.mkt_mode: str` setting, default `'spot'`. + account/market endpoint request depending on either passed in + `venue: str` or the current setting `.mkt_mode: str` setting, + default `'spot'`. Docs per venue API: @@ -360,9 +299,6 @@ class Client: venue=venue_key, ) - sesh: asks.Session - path: str - # Check if we're configured to route order requests to the # venue equivalent's testnet. use_testnet: bool = False @@ -387,11 +323,12 @@ class Client: # ctl machinery B) venue_key += '_testnet' - sesh, path = self.venue_sesh[venue_key] - - meth: Callable = getattr(sesh, method) + client: httpx.AsyncClient + path: str + client, path = self.venue_sesh[venue_key] + meth: Callable = getattr(client, method) resp = await meth( - path=path + endpoint, + url=path + endpoint, params=params, timeout=float('inf'), ) @@ -436,7 +373,11 @@ class Client: try: pair: Pair = pair_type(**item) except Exception as e: - e.add_note(f'\nDon\'t panic, check out this https://binance-docs.github.io/apidocs/spot/en/#exchange-information') + e.add_note( + "\nDon't panic, prolly stupid binance changed their symbology schema again..\n" + 'Check out their API docs here:\n\n' + 'https://binance-docs.github.io/apidocs/spot/en/#exchange-information' + ) raise pair_table[pair.symbol.upper()] = pair @@ -532,7 +473,9 @@ class Client: ''' pair_table: dict[str, Pair] = self._venue2pairs[ - venue or self.mkt_mode + venue + or + self.mkt_mode ] if ( expiry @@ -551,9 +494,9 @@ class Client: venues: list[str] = [venue] # batch per-venue download of all exchange infos - async with trio.open_nursery() as rn: + async with trio.open_nursery() as tn: for ven in venues: - rn.start_soon( + tn.start_soon( self._cache_pairs, ven, ) @@ -606,11 +549,11 @@ class Client: ) -> dict[str, Any]: - fq_pairs: dict = await self.exch_info() + fq_pairs: dict[str, Pair] = await self.exch_info() # TODO: cache this list like we were in # `open_symbol_search()`? - keys: list[str] = list(fq_pairs) + # keys: list[str] = list(fq_pairs) return match_from_pairs( pairs=fq_pairs, @@ -618,9 +561,19 @@ class Client: score_cutoff=50, ) + def pair2venuekey( + self, + pair: Pair, + ) -> str: + return { + 'USDTM': 'usdtm_futes', + # 'COINM': 'coin_futes', + # ^-TODO-^ bc someone might want it..? + }[pair.venue] + async def bars( self, - symbol: str, + mkt: MktPair, start_dt: datetime | None = None, end_dt: datetime | None = None, @@ -650,16 +603,20 @@ class Client: start_time = binance_timestamp(start_dt) end_time = binance_timestamp(end_dt) + bs_pair: Pair = self._pairs[mkt.bs_fqme.upper()] + # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data bars = await self._api( 'klines', params={ - 'symbol': symbol.upper(), + # NOTE: always query using their native symbology! + 'symbol': mkt.bs_mktid.upper(), 'interval': '1m', 'startTime': start_time, 'endTime': end_time, 'limit': limit }, + venue=self.pair2venuekey(bs_pair), allow_testnet=False, ) new_bars: list[tuple] = [] @@ -976,17 +933,148 @@ class Client: await self.close_listen_key(key) +_venue_urls: dict[str, str] = { + 'spot': ( + _spot_url, + '/api/v3/', + ), + 'spot_testnet': ( + _testnet_spot_url, + '/fapi/v1/' + ), + # margin and extended spot endpoints session. + # TODO: did this ever get implemented fully? + # 'margin': ( + # _spot_url, + # '/sapi/v1/' + # ), + + 'usdtm_futes': ( + _futes_url, + '/fapi/v1/', + ), + + 'usdtm_futes_testnet': ( + _testnet_futes_url, + '/fapi/v1/', + ), + + # TODO: for anyone who actually needs it ;P + # 'coin_futes': () +} + + +def init_api_keys( + client: Client, + conf: dict[str, Any], +) -> None: + ''' + Set up per-venue API keys each http client according to the user's + `brokers.conf`. + + For ex, to use spot-testnet and live usdt futures APIs: + + ```toml + [binance] + # spot test net + spot.use_testnet = true + spot.api_key = '' + spot.api_secret = '' + + # futes live + futes.use_testnet = false + accounts.usdtm = 'futes' + futes.api_key = '' + futes.api_secret = ''' + + # if uncommented will use the built-in paper engine and not + # connect to `binance` API servers for order ctl. + # accounts.paper = 'paper' + ``` + + ''' + for key, subconf in conf.items(): + if api_key := subconf.get('api_key', ''): + venue_keys: list[str] = client.confkey2venuekeys[key] + + venue_key: str + client: httpx.AsyncClient + for venue_key in venue_keys: + client, _ = client.venue_sesh[venue_key] + + api_key_header: dict = { + # taken from official: + # https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47 + "Content-Type": "application/json;charset=utf-8", + + # TODO: prolly should just always query and copy + # in the real latest ver? + "User-Agent": "binance-connector/6.1.6smbz6", + "X-MBX-APIKEY": api_key, + } + client.headers.update(api_key_header) + + # if `.use_tesnet = true` in the config then + # also add headers for the testnet session which + # will be used for all order control + if subconf.get('use_testnet', False): + testnet_sesh, _ = client.venue_sesh[ + venue_key + '_testnet' + ] + testnet_sesh.headers.update(api_key_header) + + @acm -async def get_client() -> Client: +async def get_client( + mkt_mode: MarketType = 'spot', +) -> Client: + ''' + Construct an single `piker` client which composes multiple underlying venue + specific API clients both for live and test networks. - client = Client() - await client.exch_info() - log.info( - f'{client} in {client.mkt_mode} mode: caching exchange infos..\n' - 'Cached multi-market pairs:\n' - f'spot: {len(client._spot_pairs)}\n' - f'usdtm_futes: {len(client._ufutes_pairs)}\n' - f'Total: {len(client._pairs)}\n' - ) + ''' + venue_sessions: dict[ + str, # venue key + tuple[httpx.AsyncClient, str] # session, eps path + ] = {} + async with AsyncExitStack() as client_stack: + for name, (base_url, path) in _venue_urls.items(): + api: httpx.AsyncClient = await client_stack.enter_async_context( + httpx.AsyncClient( + base_url=base_url, + # headers={}, - yield client + # TODO: is there a way to numerate this? + # https://www.python-httpx.org/advanced/clients/#why-use-a-client + # connections=4 + ) + ) + venue_sessions[name] = ( + api, + path, + ) + + conf: dict[str, Any] = get_config() + # for creating API keys see, + # https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072 + client = Client( + venue_sessions=venue_sessions, + conf=conf, + mkt_mode=mkt_mode, + ) + init_api_keys( + client=client, + conf=conf, + ) + fq_pairs: dict[str, Pair] = await client.exch_info() + assert fq_pairs + log.info( + f'Loaded multi-venue `Client` in mkt_mode={client.mkt_mode!r}\n\n' + f'Symbology Summary:\n' + f'------ - ------\n' + f'spot: {len(client._spot_pairs)}\n' + f'usdtm_futes: {len(client._ufutes_pairs)}\n' + '------ - ------\n' + f'total: {len(client._pairs)}\n' + ) + yield client diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index ff6a2ff5..a13ce38f 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -264,15 +264,20 @@ async def open_trade_dialog( # do a open_symcache() call.. though maybe we can hide # this in a new async version of open_account()? async with open_cached_client('binance') as client: - subconf: dict = client.conf[venue_name] - use_testnet = subconf.get('use_testnet', False) + subconf: dict|None = client.conf.get(venue_name) # XXX: if no futes.api_key or spot.api_key has been set we # always fall back to the paper engine! - if not subconf.get('api_key'): + if ( + not subconf + or + not subconf.get('api_key') + ): await ctx.started('paper') return + use_testnet: bool = subconf.get('use_testnet', False) + async with ( open_cached_client('binance') as client, ): diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py index 9e36a626..3a242e02 100644 --- a/piker/brokers/binance/feed.py +++ b/piker/brokers/binance/feed.py @@ -253,15 +253,15 @@ async def open_history_client( else: client.mkt_mode = 'spot' - # NOTE: always query using their native symbology! - mktid: str = mkt.bs_mktid array: np.ndarray = await client.bars( - mktid, + mkt=mkt, start_dt=start_dt, end_dt=end_dt, ) if array.size == 0: - raise NoData('No frame for {start_dt} -> {end_dt}\n') + raise NoData( + f'No frame for {start_dt} -> {end_dt}\n' + ) times = array['time'] if not times.any():