Support USD-M futes live feeds and exchange info

Add the usd-futes "Pair" type and thus ability to load all exchange
(info for) contracts settled in USDT. Luckily we don't seem to have to
modify anything in the `Client` interface (yet) other then a new
`.mkt_mode: str` which determines which endpoint set to make requests.
Obviously data received from endpoints will likely need diff handling as
per below.

Deats:
- add a bunch more API and WSS top level domains to `.api` with comments
- start a `.binance.schemas` module to house the structs for loading
  different `Pair` subtypes depending on target market: `SpotPair`,
  `FutesPair`, .. etc. and implement required `MktPair` fields on the
  new futes type for compatibility with the clearing layer.
- add `Client.mkt_mode: str` and a method lookup for endpoint parent
  paths depending on market via `.mkt_req: dict`

Also related to live feeds,
- drop `Struct` typecasting instead opting for specific fields both for
  speed and simplicity atm.
- breakout `subscribe()` into module level acm from being embedded
  closure.
- for now swap over the ws feed to be strictly the futes ep (while
  testing) and set the `.mkt_mode = 'usd_futes'`.
- hack in `Client._pairs` to only load `FutesPair`s until we figure out
  whether we want separate `Client` instances per market or not..
basic_buy_bot
Tyler Goodlet 2023-06-10 18:25:22 -04:00
parent ae1c5a0db0
commit dac93dd8f8
4 changed files with 324 additions and 153 deletions

View File

@ -27,10 +27,10 @@ from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from datetime import datetime from datetime import datetime
from decimal import Decimal
from typing import ( from typing import (
Any, Any,
Union, Callable,
Literal,
) )
import hmac import hmac
import hashlib import hashlib
@ -52,6 +52,10 @@ from piker.brokers._util import (
SymbolNotFound, SymbolNotFound,
get_logger, get_logger,
) )
from .schemas import (
SpotPair,
FutesPair,
)
log = get_logger('piker.brokers.binance') log = get_logger('piker.brokers.binance')
@ -74,9 +78,26 @@ def get_config() -> dict:
log = get_logger(__name__) log = get_logger(__name__)
_url = 'https://api.binance.com' _domain: str = 'binance.com'
_sapi_url = 'https://api.binance.com' _spot_url = _url = f'https://api.{_domain}'
_fapi_url = 'https://testnet.binancefuture.com' _futes_url = f'https://fapi.{_domain}'
# test nets
_testnet_futes_url = 'https://testnet.binancefuture.com'
# WEBsocketz
# NOTE XXX: see api docs which show diff addr?
# https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information
_spot_ws: str = 'wss://stream.binance.com/ws'
# 'wss://ws-api.binance.com:443/ws-api/v3',
# NOTE: spot test network only allows certain ep sets:
# https://testnet.binance.vision/
_testnet_spot_ws: str = 'wss://testnet.binance.vision/ws-api/v3'
# https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams
_futes_ws: str = f'wss://fstream.{_domain}/ws/'
_auth_futes_ws: str = 'wss://fstream-auth.{_domain}/ws/'
# Broker specific ohlc schema (rest) # Broker specific ohlc schema (rest)
@ -92,61 +113,6 @@ _fapi_url = 'https://testnet.binancefuture.com'
# ('ignore', float), # ('ignore', float),
# ] # ]
# UI components allow this to be declared such that additional
# (historical) fields can be exposed.
# ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = False
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
# TODO: make this frozen again by pre-processing the
# filters list to a dict at init time?
class Pair(Struct, frozen=True):
symbol: str
status: str
baseAsset: str
baseAssetPrecision: int
cancelReplaceAllowed: bool
allowTrailingStop: bool
quoteAsset: str
quotePrecision: int
quoteAssetPrecision: int
baseCommissionPrecision: int
quoteCommissionPrecision: int
orderTypes: list[str]
icebergAllowed: bool
ocoAllowed: bool
quoteOrderQtyMarketAllowed: bool
isSpotTradingAllowed: bool
isMarginTradingAllowed: bool
defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str]
filters: dict[
str,
Union[str, int, float]
]
permissions: list[str]
@property
def price_tick(self) -> Decimal:
# XXX: lul, after manually inspecting the response format we
# just directly pick out the info we need
step_size: str = self.filters['PRICE_FILTER']['tickSize'].rstrip('0')
return Decimal(step_size)
@property
def size_tick(self) -> Decimal:
step_size: str = self.filters['LOT_SIZE']['stepSize'].rstrip('0')
return Decimal(step_size)
class OHLC(Struct): class OHLC(Struct):
''' '''
@ -184,24 +150,43 @@ def binance_timestamp(
return int((when.timestamp() * 1000) + (when.microsecond / 1000)) return int((when.timestamp() * 1000) + (when.microsecond / 1000))
class Client: MarketType: Literal[
'spot',
'margin',
'usd_futes',
'coin_futes',
]
def __init__(self) -> None:
class Client:
'''
Async ReST API client using ``trio`` + ``asks`` B)
Supports all of the spot, margin and futures endpoints depending
on method.
'''
def __init__(
self,
mkt_mode: MarketType = 'spot',
) -> None:
self._pairs: dict[str, Pair] = {} # mkt info table self._pairs: dict[str, Pair] = {} # mkt info table
# live EP sesh # spot EPs sesh
self._sesh = asks.Session(connections=4) self._sesh = asks.Session(connections=4)
self._sesh.base_location: str = _url self._sesh.base_location: str = _url
# futes testnet rest EPs # margin and extended spot endpoints session.
self._fapi_sesh = asks.Session(connections=4)
self._fapi_sesh.base_location = _fapi_url
# sync rest API
self._sapi_sesh = asks.Session(connections=4) self._sapi_sesh = asks.Session(connections=4)
self._sapi_sesh.base_location = _sapi_url self._sapi_sesh.base_location: str = _url
# futes EPs sesh
self._fapi_sesh = asks.Session(connections=4)
self._fapi_sesh.base_location: str = _futes_url
# for creating API keys see,
# https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
conf: dict = get_config() conf: dict = get_config()
self.api_key: str = conf.get('api_key', '') self.api_key: str = conf.get('api_key', '')
self.api_secret: str = conf.get('api_secret', '') self.api_secret: str = conf.get('api_secret', '')
@ -211,8 +196,16 @@ class Client:
if self.api_key: if self.api_key:
api_key_header = {'X-MBX-APIKEY': self.api_key} api_key_header = {'X-MBX-APIKEY': self.api_key}
self._sesh.headers.update(api_key_header) self._sesh.headers.update(api_key_header)
self._fapi_sesh.headers.update(api_key_header)
self._sapi_sesh.headers.update(api_key_header) self._sapi_sesh.headers.update(api_key_header)
self._fapi_sesh.headers.update(api_key_header)
self.mkt_mode: MarketType = mkt_mode
self.mkt_req: dict[str, Callable] = {
'spot': self._api,
'margin': self._sapi,
'usd_futes': self._fapi,
# 'futes_coin': self._dapi, # TODO
}
def _get_signature(self, data: OrderedDict) -> str: def _get_signature(self, data: OrderedDict) -> str:
@ -235,6 +228,9 @@ class Client:
) )
return msg_auth.hexdigest() return msg_auth.hexdigest()
# TODO: factor all these _api methods into a single impl
# which looks up the parent path for eps depending on a
# mkt_mode: MarketType input!
async def _api( async def _api(
self, self,
method: str, method: str,
@ -243,7 +239,15 @@ class Client:
action: str = 'get' action: str = 'get'
) -> dict[str, Any]: ) -> dict[str, Any]:
'''
Make a /api/v3/ SPOT account/market endpoint request.
For eg. rest market-data and spot-account-trade eps use
this endpoing parent path:
- https://binance-docs.github.io/apidocs/spot/en/#market-data-endpoints
- https://binance-docs.github.io/apidocs/spot/en/#spot-account-trade
'''
if signed: if signed:
params['signature'] = self._get_signature(params) params['signature'] = self._get_signature(params)
@ -258,11 +262,19 @@ class Client:
async def _fapi( async def _fapi(
self, self,
method: str, method: str,
params: Union[dict, OrderedDict], params: dict | OrderedDict,
signed: bool = False, signed: bool = False,
action: str = 'get' action: str = 'get'
) -> dict[str, Any]:
) -> dict[str, Any]:
'''
Make a /fapi/v3/ USD-M FUTURES account/market endpoint
request.
For all USD-M futures endpoints use this parent path:
https://binance-docs.github.io/apidocs/futures/en/#market-data-endpoints
'''
if signed: if signed:
params['signature'] = self._get_signature(params) params['signature'] = self._get_signature(params)
@ -277,11 +289,21 @@ class Client:
async def _sapi( async def _sapi(
self, self,
method: str, method: str,
params: Union[dict, OrderedDict], params: dict | OrderedDict,
signed: bool = False, signed: bool = False,
action: str = 'get' action: str = 'get'
) -> dict[str, Any]:
) -> dict[str, Any]:
'''
Make a /api/v3/ SPOT/MARGIN account/market endpoint request.
For eg. all margin and advancecd spot account eps use this
endpoing parent path:
- https://binance-docs.github.io/apidocs/spot/en/#margin-account-trade
- https://binance-docs.github.io/apidocs/spot/en/#listen-key-spot
- https://binance-docs.github.io/apidocs/spot/en/#spot-algo-endpoints
'''
if signed: if signed:
params['signature'] = self._get_signature(params) params['signature'] = self._get_signature(params)
@ -297,10 +319,19 @@ class Client:
self, self,
sym: str | None = None, sym: str | None = None,
mkt_type: MarketType = 'spot',
) -> dict[str, Pair] | Pair: ) -> dict[str, Pair] | Pair:
''' '''
Fresh exchange-pairs info query for symbol ``sym: str``: Fresh exchange-pairs info query for symbol ``sym: str``.
Depending on `mkt_type` different api eps are used:
- spot:
https://binance-docs.github.io/apidocs/spot/en/#exchange-information https://binance-docs.github.io/apidocs/spot/en/#exchange-information
- usd futes:
https://binance-docs.github.io/apidocs/futures/en/#check-server-time
- coin futes:
https://binance-docs.github.io/apidocs/delivery/en/#exchange-information
''' '''
cached_pair = self._pairs.get(sym) cached_pair = self._pairs.get(sym)
@ -313,25 +344,33 @@ class Client:
sym = sym.lower() sym = sym.lower()
params = {'symbol': sym} params = {'symbol': sym}
resp = await self._api('exchangeInfo', params=params) resp = await self.mkt_req[self.mkt_mode]('exchangeInfo', params=params)
entries = resp['symbols'] entries = resp['symbols']
if not entries: if not entries:
raise SymbolNotFound(f'{sym} not found:\n{resp}') raise SymbolNotFound(f'{sym} not found:\n{resp}')
# pre-process .filters field into a table # import tractor
# await tractor.breakpoint()
pairs = {} pairs = {}
for item in entries: for item in entries:
symbol = item['symbol'] symbol = item['symbol']
# for spot mkts, pre-process .filters field into
# a table..
filters_ls: list = item.pop('filters', False)
if filters_ls:
filters = {} filters = {}
filters_ls: list = item.pop('filters')
for entry in filters_ls: for entry in filters_ls:
ftype = entry['filterType'] ftype = entry['filterType']
filters[ftype] = entry filters[ftype] = entry
pairs[symbol] = Pair( # TODO: lookup pair schema by mkt type
filters=filters, # pair_type = mkt_type
**item,
) # pairs[symbol] = SpotPair(
# filters=filters,
# )
pairs[symbol] = FutesPair(**item)
# pairs = { # pairs = {
# item['symbol']: Pair(**item) for item in entries # item['symbol']: Pair(**item) for item in entries
@ -343,8 +382,6 @@ class Client:
else: else:
return self._pairs return self._pairs
symbol_info = exch_info
async def search_symbols( async def search_symbols(
self, self,
pattern: str, pattern: str,
@ -448,7 +485,8 @@ class Client:
signed=True signed=True
) )
log.info(f'done. len {len(resp)}') log.info(f'done. len {len(resp)}')
await trio.sleep(3)
# await trio.sleep(3)
return positions, volumes return positions, volumes
@ -457,6 +495,8 @@ class Client:
recv_window: int = 60000 recv_window: int = 60000
) -> list: ) -> list:
# TODO: can't we drop this since normal dicts are
# ordered implicitly in mordern python?
params = OrderedDict([ params = OrderedDict([
('recvWindow', recv_window), ('recvWindow', recv_window),
('timestamp', binance_timestamp(now())) ('timestamp', binance_timestamp(now()))
@ -594,7 +634,7 @@ class Client:
@acm @acm
async def get_client() -> Client: async def get_client() -> Client:
client = Client() client = Client(mkt_mode='usd_futes')
log.info('Caching exchange infos..') log.info('Caching exchange infos..')
await client.exch_info() await client.exch_info()
yield client yield client

View File

@ -184,5 +184,3 @@ async def trades_dialogue(
breakpoint() breakpoint()
await ems_stream.send(msg.dict()) await ems_stream.send(msg.dict())

View File

@ -24,11 +24,13 @@ from contextlib import (
aclosing, aclosing,
) )
from datetime import datetime from datetime import datetime
from functools import partial
import itertools import itertools
from typing import ( from typing import (
Any, Any,
AsyncGenerator, AsyncGenerator,
Callable, Callable,
Generator,
) )
import time import time
@ -59,11 +61,12 @@ from piker.data._web_bs import (
from piker.brokers._util import ( from piker.brokers._util import (
DataUnavailable, DataUnavailable,
get_logger, get_logger,
get_console_log,
) )
from .api import ( from .api import (
Client, Client,
)
from .schemas import (
Pair, Pair,
) )
@ -94,7 +97,7 @@ class AggTrade(Struct, frozen=True):
l: int # noqa Last trade ID l: int # noqa Last trade ID
T: int # Trade time T: int # Trade time
m: bool # Is the buyer the market maker? m: bool # Is the buyer the market maker?
M: bool # Ignore M: bool | None = None # Ignore
async def stream_messages( async def stream_messages(
@ -134,7 +137,9 @@ async def stream_messages(
ask=ask, ask=ask,
asize=asize, asize=asize,
) )
l1.typecast() # for speed probably better to only specifically
# cast fields we need in numerical form?
# l1.typecast()
# repack into piker's tick-quote format # repack into piker's tick-quote format
yield 'l1', { yield 'l1', {
@ -142,23 +147,23 @@ async def stream_messages(
'ticks': [ 'ticks': [
{ {
'type': 'bid', 'type': 'bid',
'price': l1.bid, 'price': float(l1.bid),
'size': l1.bsize, 'size': float(l1.bsize),
}, },
{ {
'type': 'bsize', 'type': 'bsize',
'price': l1.bid, 'price': float(l1.bid),
'size': l1.bsize, 'size': float(l1.bsize),
}, },
{ {
'type': 'ask', 'type': 'ask',
'price': l1.ask, 'price': float(l1.ask),
'size': l1.asize, 'size': float(l1.asize),
}, },
{ {
'type': 'asize', 'type': 'asize',
'price': l1.ask, 'price': float(l1.ask),
'size': l1.asize, 'size': float(l1.asize),
} }
] ]
} }
@ -281,36 +286,15 @@ async def get_mkt_info(
return both return both
async def stream_quotes( @acm
async def subscribe(
send_chan: trio.abc.SendChannel, ws: NoBsWs,
symbols: list[str], symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync # defined once at import time to keep a global state B)
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, iter_subids: Generator[int, None, None] = itertools.count(),
) -> None: ):
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
async with (
send_chan as send_chan,
):
init_msgs: list[FeedInit] = []
for sym in symbols:
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(mkt_info=mkt)
)
iter_subids = itertools.count()
@acm
async def subscribe(ws: NoBsWs):
# setup subs # setup subs
subid: int = next(iter_subids) subid: int = next(iter_subids)
@ -351,13 +335,46 @@ async def stream_quotes(
# XXX: do we need to ack the unsub? # XXX: do we need to ack the unsub?
# await ws.recv_msg() # await ws.recv_msg()
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
async with (
send_chan as send_chan,
):
init_msgs: list[FeedInit] = []
for sym in symbols:
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(mkt_info=mkt)
)
# TODO: detect whether futes or spot contact was requested
from .api import (
_futes_ws,
# _spot_ws,
)
wsep: str = _futes_ws
async with ( async with (
open_autorecon_ws( open_autorecon_ws(
# XXX: see api docs which show diff addr? wsep,
# https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information fixture=partial(
# 'wss://ws-api.binance.com:443/ws-api/v3', subscribe,
'wss://stream.binance.com/ws', symbols=symbols,
fixture=subscribe, ),
) as ws, ) as ws,
# avoid stream-gen closure from breaking trio.. # avoid stream-gen closure from breaking trio..
@ -387,6 +404,8 @@ async def stream_quotes(
topic = msg['symbol'].lower() topic = msg['symbol'].lower()
await send_chan.send({topic: msg}) await send_chan.send({topic: msg})
# last = time.time() # last = time.time()
@tractor.context @tractor.context
async def open_symbol_search( async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,

View File

@ -0,0 +1,114 @@
# piker: trading gear for hackers
# Copyright (C)
# Guillermo Rodriguez (aka ze jefe)
# Tyler Goodlet
# (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per market data-type definitions and schemas types.
"""
from decimal import Decimal
from piker.data.types import Struct
class Pair(Struct, frozen=True):
symbol: str
status: str
orderTypes: list[str]
# src
quoteAsset: str
quotePrecision: int
# dst
baseAsset: str
baseAssetPrecision: int
class SpotPair(Struct, frozen=True):
cancelReplaceAllowed: bool
allowTrailingStop: bool
quoteAssetPrecision: int
baseCommissionPrecision: int
quoteCommissionPrecision: int
icebergAllowed: bool
ocoAllowed: bool
quoteOrderQtyMarketAllowed: bool
isSpotTradingAllowed: bool
isMarginTradingAllowed: bool
defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str]
filters: dict[
str,
str | int | float,
]
permissions: list[str]
@property
def price_tick(self) -> Decimal:
# XXX: lul, after manually inspecting the response format we
# just directly pick out the info we need
step_size: str = self.filters['PRICE_FILTER']['tickSize'].rstrip('0')
return Decimal(step_size)
@property
def size_tick(self) -> Decimal:
step_size: str = self.filters['LOT_SIZE']['stepSize'].rstrip('0')
return Decimal(step_size)
class FutesPair(Pair):
symbol: str # 'BTCUSDT',
pair: str # 'BTCUSDT',
baseAssetPrecision: int # 8,
contractType: str # 'PERPETUAL',
deliveryDate: int # 4133404800000,
liquidationFee: float # '0.012500',
maintMarginPercent: float # '2.5000',
marginAsset: str # 'USDT',
marketTakeBound: float # '0.05',
maxMoveOrderLimit: int # 10000,
onboardDate: int # 1569398400000,
pricePrecision: int # 2,
quantityPrecision: int # 3,
quoteAsset: str # 'USDT',
quotePrecision: int # 8,
requiredMarginPercent: float # '5.0000',
settlePlan: int # 0,
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
triggerProtect: float # '0.0500',
underlyingSubType: list[str] # ['PoW'],
underlyingType: str # 'COIN'
# NOTE: for compat with spot pairs and `MktPair.src: Asset`
# processing..
@property
def quoteAssetPrecision(self) -> int:
return self.quotePrecision
@property
def price_tick(self) -> Decimal:
return Decimal(self.pricePrecision)
@property
def size_tick(self) -> Decimal:
return Decimal(self.quantityPrecision)