Load `Asset`s during echange info queries

Since we need them for accounting and since we can get them directly
from the usdtm futes `exchangeInfo` ep, just preload all asset info that
we can during initial `Pair` caching. Cache the asset infos inside a new per venue
`Client._venues2assets: dict[str, dict[str, Asset | None]]` and mostly
be pedantic with the spot asset list for now since futes seems much
smaller and doesn't include transaction precision info.

Further:
- load a testnet http session if `binance.use_testnet.futes = true`.
- add testnet support for all non-data endpoints.
- hardcode user stream methods to work for usdtm futes for the moment.
- add logging around order request calls.
basic_buy_bot
Tyler Goodlet 2023-06-16 20:43:07 -04:00
parent 1bb7c9a2e4
commit c6d1007e66
1 changed files with 166 additions and 65 deletions

View File

@ -30,6 +30,7 @@ from contextlib import (
asynccontextmanager as acm,
)
from datetime import datetime
from pprint import pformat
from typing import (
Any,
Callable,
@ -48,6 +49,10 @@ from fuzzywuzzy import process as fuzzy
import numpy as np
from piker import config
from piker.accounting import (
Asset,
digits_to_dec,
)
from piker.data.types import Struct
from piker.data import def_iohlcv_fields
from piker.brokers._util import (
@ -59,8 +64,11 @@ from .venues import (
PAIRTYPES,
Pair,
MarketType,
_spot_url,
_futes_url,
_testnet_futes_url,
)
log = get_logger('piker.brokers.binance')
@ -144,15 +152,27 @@ class Client:
mkt_mode: MarketType = 'spot',
) -> None:
# build out pair info tables for each market type
# and wrap in a chain-map view for search / query.
self._spot_pairs: dict[str, Pair] = {} # spot info table
self._ufutes_pairs: dict[str, Pair] = {} # usd-futures table
self._mkt2pairs: dict[str, dict] = {
self._venue2pairs: dict[str, dict] = {
'spot': self._spot_pairs,
'usdtm_futes': self._ufutes_pairs,
}
self._venue2assets: dict[
str,
dict[str, dict] | None,
] = {
# NOTE: only the spot table contains a dict[str, Asset]
# since others (like futes, opts) can just do lookups
# from a list of names to the spot equivalent.
'spot': {},
'usdtm_futes': {},
# 'coinm_futes': {},
}
# NOTE: only stick in the spot table for now until exchange info
# is loaded, since at that point we'll suffix all the futes
# market symbols for use by search. See `.exch_info()`.
@ -177,15 +197,32 @@ class Client:
self.api_key: str = conf.get('api_key', '')
self.api_secret: str = conf.get('api_secret', '')
self.use_testnet: bool = conf.get('use_testnet', False)
if self.use_testnet:
self._test_fapi_sesh = asks.Session(connections=4)
self._test_fapi_sesh.base_location: str = _testnet_futes_url
self.watchlist = conf.get('watchlist', [])
if self.api_key:
api_key_header = {'X-MBX-APIKEY': self.api_key}
api_key_header: dict = {
# taken from official:
# https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47
"Content-Type": "application/json;charset=utf-8",
# TODO: prolly should just always query and copy
# in the real latest ver?
"User-Agent": "binance-connector/6.1.6smbz6",
"X-MBX-APIKEY": self.api_key,
}
self._sesh.headers.update(api_key_header)
self._sapi_sesh.headers.update(api_key_header)
self._fapi_sesh.headers.update(api_key_header)
if self.use_testnet:
self._test_fapi_sesh.headers.update(api_key_header)
self.mkt_mode: MarketType = mkt_mode
self.mkt_mode_req: dict[str, Callable] = {
'spot': self._api,
@ -204,11 +241,12 @@ class Client:
"Can't generate a signature without setting up credentials"
)
query_str = '&'.join([
f'{_key}={value}'
for _key, value in data.items()])
query_str: str = '&'.join([
f'{key}={value}'
for key, value in data.items()
])
log.info(query_str)
# log.info(query_str)
msg_auth = hmac.new(
self.api_secret.encode('utf-8'),
@ -253,7 +291,8 @@ class Client:
method: str,
params: dict | OrderedDict,
signed: bool = False,
action: str = 'get'
action: str = 'get',
testnet: bool = True,
) -> dict[str, Any]:
'''
@ -267,7 +306,23 @@ class Client:
if signed:
params['signature'] = self._mk_sig(params)
resp = await getattr(self._fapi_sesh, action)(
# NOTE: only use testnet if user set brokers.toml config
# var to true **and** it's not one of the market data
# endpoints since we basically never want to display the
# test net feeds, we only are using it for testing order
# ctl machinery B)
if (
self.use_testnet
and method not in {
'klines',
'exchangeInfo',
}
):
meth = getattr(self._test_fapi_sesh, action)
else:
meth = getattr(self._fapi_sesh, action)
resp = await meth(
path=f'/fapi/v1/{method}',
params=params,
timeout=float('inf')
@ -306,22 +361,28 @@ class Client:
async def _cache_pairs(
self,
mkt_type: str,
venue: str,
) -> None:
# lookup internal mkt-specific pair table to update
pair_table: dict[str, Pair] = self._mkt2pairs[mkt_type]
pair_table: dict[str, Pair] = self._venue2pairs[venue]
asset_table: dict[str, Asset] = self._venue2assets[venue]
# make API request(s)
resp = await self.mkt_mode_req[mkt_type](
resp = await self.mkt_mode_req[venue](
'exchangeInfo',
params={}, # NOTE: retrieve all symbols by default
)
entries = resp['symbols']
if not entries:
mkt_pairs = resp['symbols']
if not mkt_pairs:
raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
for item in entries:
pairs_view_subtable: dict[str, Pair] = {}
# if venue == 'spot':
# import tractor
# await tractor.breakpoint()
for item in mkt_pairs:
filters_ls: list = item.pop('filters', False)
if filters_ls:
filters = {}
@ -331,15 +392,50 @@ class Client:
item['filters'] = filters
pair_type: Type = PAIRTYPES[mkt_type]
pair_type: Type = PAIRTYPES[venue]
pair: Pair = pair_type(**item)
pair_table[pair.symbol.upper()] = pair
# update an additional top-level-cross-venue-table
# `._pairs: ChainMap` for search B0
pairs_view_subtable[pair.bs_fqme] = pair
if venue == 'spot':
if (name := pair.quoteAsset) not in asset_table:
asset_table[name] = Asset(
name=name,
atype='crypto_currency',
tx_tick=digits_to_dec(pair.quoteAssetPrecision),
)
if (name := pair.baseAsset) not in asset_table:
asset_table[name] = Asset(
name=name,
atype='crypto_currency',
tx_tick=digits_to_dec(pair.baseAssetPrecision),
)
# NOTE: make merged view of all market-type pairs but
# use market specific `Pair.bs_fqme` for keys!
# this allows searching for market pairs with different
# suffixes easily, for ex. `BTCUSDT.USDTM.PERP` will show
# up when a user uses the search endpoint with pattern
# `btc` B)
self._pairs.maps.append(pairs_view_subtable)
if venue == 'spot':
return
assets: list[dict] = resp.get('assets', ())
for entry in assets:
name: str = entry['asset']
asset_table[name] = self._venue2assets['spot'].get(name)
async def exch_info(
self,
sym: str | None = None,
mkt_type: MarketType | None = None,
venue: MarketType | None = None,
) -> dict[str, Pair] | Pair:
'''
@ -354,46 +450,34 @@ class Client:
https://binance-docs.github.io/apidocs/delivery/en/#exchange-information
'''
pair_table: dict[str, Pair] = self._mkt2pairs[
mkt_type or self.mkt_mode
pair_table: dict[str, Pair] = self._venue2pairs[
venue or self.mkt_mode
]
if cached_pair := pair_table.get(sym):
return cached_pair
# params = {}
# if sym is not None:
# params = {'symbol': sym}
mkts: list[str] = ['spot', 'usdtm_futes']
if mkt_type:
mkts: list[str] = [mkt_type]
venues: list[str] = ['spot', 'usdtm_futes']
if venue:
venues: list[str] = [venue]
# batch per-venue download of all exchange infos
async with trio.open_nursery() as rn:
for mkt_type in mkts:
for ven in venues:
rn.start_soon(
self._cache_pairs,
mkt_type,
)
# make merged view of all market-type pairs but
# use market specific `Pair.bs_fqme` for keys!
for venue, venue_pairs_table in self._mkt2pairs.items():
self._pairs.maps.append(
{pair.bs_fqme: pair
for pair in venue_pairs_table.values()}
ven,
)
return pair_table[sym] if sym else self._pairs
# TODO: unused except by `brokers.core.search_symbols()`?
async def search_symbols(
self,
pattern: str,
limit: int = None,
) -> dict[str, Any]:
# if self._spot_pairs is not None:
# data = self._spot_pairs
# else:
fq_pairs: dict = await self.exch_info()
matches = fuzzy.extractBests(
@ -538,56 +622,60 @@ class Client:
async def submit_limit(
self,
symbol: str,
side: str, # SELL / BUY
side: str, # sell / buy
quantity: float,
price: float,
# time_in_force: str = 'GTC',
oid: int | None = None,
tif: str = 'GTC',
recv_window: int = 60000
# iceberg_quantity: float | None = None,
# order_resp_type: str | None = None,
recv_window: int = 60000
) -> int:
symbol = symbol.upper()
) -> str:
'''
Submit a live limit order to ze binance.
await self.cache_symbols()
# asset_precision = self._spot_pairs[symbol]['baseAssetPrecision']
# quote_precision = self._pairs[symbol]['quoteAssetPrecision']
params = OrderedDict([
('symbol', symbol),
'''
params: dict = OrderedDict([
('symbol', symbol.upper()),
('side', side.upper()),
('type', 'LIMIT'),
('timeInForce', 'GTC'),
('timeInForce', tif),
('quantity', quantity),
('price', price),
('recvWindow', recv_window),
('newOrderRespType', 'ACK'),
('timestamp', binance_timestamp(now()))
])
if oid:
params['newClientOrderId'] = oid
log.info(
'Submitting ReST order request:\n'
f'{pformat(params)}'
)
resp = await self._api(
'order',
params=params,
signed=True,
action='post'
)
log.info(resp)
# return resp['orderId']
return resp['orderId']
reqid: str = resp['orderId']
if oid:
assert oid == reqid
return reqid
async def submit_cancel(
self,
symbol: str,
oid: str,
recv_window: int = 60000
) -> None:
symbol = symbol.upper()
params = OrderedDict([
('symbol', symbol),
('orderId', oid),
@ -595,6 +683,10 @@ class Client:
('timestamp', binance_timestamp(now()))
])
log.cancel(
'Submitting ReST order cancel: {oid}\n'
f'{pformat(params)}'
)
return await self._api(
'order',
params=params,
@ -603,22 +695,31 @@ class Client:
)
async def get_listen_key(self) -> str:
return (await self._api(
'userDataStream',
# resp = await self._api(
resp = await self.mkt_mode_req[self.mkt_mode](
# 'userDataStream', # spot
'listenKey',
params={},
action='post'
))['listenKey']
action='post',
signed=True,
)
return resp['listenKey']
async def keep_alive_key(self, listen_key: str) -> None:
await self._fapi(
'userDataStream',
# await self._fapi(
await self.mkt_mode_req[self.mkt_mode](
# 'userDataStream',
'listenKey',
params={'listenKey': listen_key},
action='put'
)
async def close_listen_key(self, listen_key: str) -> None:
await self._fapi(
'userDataStream',
# await self._fapi(
await self.mkt_mode_req[self.mkt_mode](
# 'userDataStream',
'listenKey',
params={'listenKey': listen_key},
action='delete'
)