binance: add deposits/withdrawals API support

From @guilledk,
- Drop Decimal quantize for now
- Minor tweaks to trades_dialogue proto
basic_buy_bot
Guillermo Rodriguez 2022-02-19 18:03:45 -03:00 committed by Tyler Goodlet
parent eaaf6e4cc1
commit 7c00ca0254
1 changed files with 127 additions and 60 deletions

View File

@ -39,13 +39,15 @@ from typing import (
)
import hmac
import time
import decimal
import hashlib
from pathlib import Path
import trio
from trio_typing import TaskStatus
import pendulum
from pendulum import (
now,
from_timestamp,
)
import asks
from fuzzywuzzy import process as fuzzy
import numpy as np
@ -79,10 +81,10 @@ from piker.data._web_bs import (
from ..clearing._messages import (
BrokerdOrder,
BrokerdOrderAck,
# BrokerdCancel,
#BrokerdStatus,
#BrokerdPosition,
#BrokerdFill,
BrokerdStatus,
BrokerdPosition,
BrokerdFill,
BrokerdCancel,
# BrokerdError,
)
@ -108,6 +110,7 @@ log = get_logger(__name__)
_url = 'https://api.binance.com'
_sapi_url = 'https://api.binance.com'
_fapi_url = 'https://testnet.binancefuture.com'
@ -238,18 +241,25 @@ class Client:
self._sesh = asks.Session(connections=4)
self._sesh.base_location: str = _url
# testnet EP sesh
# futes testnet rest EPs
self._fapi_sesh = asks.Session(connections=4)
self._fapi_sesh.base_location = _fapi_url
# sync rest API
self._sapi_sesh = asks.Session(connections=4)
self._sapi_sesh.base_location = _sapi_url
conf: dict = get_config()
self.api_key: str = conf.get('api_key', '')
self.api_secret: str = conf.get('api_secret', '')
self.watchlist = conf.get('watchlist', [])
if self.api_key:
api_key_header = {'X-MBX-APIKEY': self.api_key}
self._sesh.headers.update(api_key_header)
self._fapi_sesh.headers.update(api_key_header)
self._sapi_sesh.headers.update(api_key_header)
def _get_signature(self, data: OrderedDict) -> str:
@ -310,6 +320,25 @@ class Client:
return resproc(resp, log)
async def _sapi(
self,
method: str,
params: Union[dict, OrderedDict],
signed: bool = False,
action: str = 'get'
) -> dict[str, Any]:
if signed:
params['signature'] = self._get_signature(params)
resp = await getattr(self._sapi_sesh, action)(
path=f'/sapi/v1/{method}',
params=params,
timeout=float('inf')
)
return resproc(resp, log)
async def exch_info(
self,
sym: str | None = None,
@ -392,7 +421,7 @@ class Client:
) -> dict:
if end_dt is None:
end_dt = pendulum.now('UTC').add(minutes=1)
end_dt = now('UTC').add(minutes=1)
if start_dt is None:
start_dt = end_dt.start_of(
@ -444,6 +473,58 @@ class Client:
) if as_np else bars
return array
async def get_positions(
self,
recv_window: int = 60000
) -> tuple:
positions = {}
volumes = {}
for sym in self.watchlist:
log.info(f'doing {sym}...')
params = OrderedDict([
('symbol', sym),
('recvWindow', recv_window),
('timestamp', binance_timestamp(now()))
])
resp = await self._api(
'allOrders',
params=params,
signed=True
)
log.info(f'done. len {len(resp)}')
await trio.sleep(3)
return positions, volumes
async def get_deposits(
self,
recv_window: int = 60000
) -> list:
params = OrderedDict([
('recvWindow', recv_window),
('timestamp', binance_timestamp(now()))
])
return await self._sapi(
'capital/deposit/hisrec',
params=params,
signed=True)
async def get_withdrawls(
self,
recv_window: int = 60000
) -> list:
params = OrderedDict([
('recvWindow', recv_window),
('timestamp', binance_timestamp(now()))
])
return await self._sapi(
'capital/withdraw/history',
params=params,
signed=True)
async def submit_limit(
self,
symbol: str,
@ -461,18 +542,8 @@ class Client:
await self.cache_symbols()
asset_precision = self._pairs[symbol]['baseAssetPrecision']
quote_precision = self._pairs[symbol]['quoteAssetPrecision']
quantity = Decimal(quantity).quantize(
Decimal(1 ** -asset_precision),
rounding=decimal.ROUND_HALF_EVEN
)
price = Decimal(price).quantize(
Decimal(1 ** -quote_precision),
rounding=decimal.ROUND_HALF_EVEN
)
# asset_precision = self._pairs[symbol]['baseAssetPrecision']
# quote_precision = self._pairs[symbol]['quoteAssetPrecision']
params = OrderedDict([
('symbol', symbol),
@ -483,21 +554,21 @@ class Client:
('price', price),
('recvWindow', recv_window),
('newOrderRespType', 'ACK'),
('timestamp', binance_timestamp(pendulum.now()))
('timestamp', binance_timestamp(now()))
])
if oid:
params['newClientOrderId'] = oid
resp = await self._api(
'order/test', # TODO: switch to real `order` endpoint
'order',
params=params,
signed=True,
action='post'
)
assert resp['orderId'] == oid
return oid
log.info(resp)
# return resp['orderId']
return resp['orderId']
async def submit_cancel(
self,
@ -511,10 +582,10 @@ class Client:
('symbol', symbol),
('orderId', oid),
('recvWindow', recv_window),
('timestamp', binance_timestamp(pendulum.now()))
('timestamp', binance_timestamp(now()))
])
await self._api(
return await self._api(
'order',
params=params,
signed=True,
@ -522,11 +593,11 @@ class Client:
)
async def get_listen_key(self) -> str:
return await self._api(
return (await self._api(
'userDataStream',
params={},
action='post'
)['listenKey']
))['listenKey']
async def keep_alive_key(self, listen_key: str) -> None:
await self._fapi(
@ -557,7 +628,7 @@ class Client:
key = await self.get_listen_key()
async with trio.open_nursery() as n:
n.start_soon(periodic_keep_alive, key)
n.start_soon(periodic_keep_alive, self, key)
yield key
n.cancel_scope.cancel()
@ -733,8 +804,8 @@ async def open_history_client(
if (inow - times[-1]) > 60:
await tractor.breakpoint()
start_dt = pendulum.from_timestamp(times[0])
end_dt = pendulum.from_timestamp(times[-1])
start_dt = from_timestamp(times[0])
end_dt = from_timestamp(times[-1])
return array, start_dt, end_dt
@ -873,15 +944,15 @@ async def stream_quotes(
# hz = 1/period if period else float('inf')
# if hz > 60:
# log.info(f'Binance quotez : {hz}')
topic = msg['symbol'].lower()
await send_chan.send({topic: msg})
if typ == 'l1':
topic = msg['symbol'].lower()
await send_chan.send({topic: msg})
# last = time.time()
async def handle_order_requests(
ems_order_stream: tractor.MsgStream,
symbol: str
ems_order_stream: tractor.MsgStream
) -> None:
async with open_cached_client('binance') as client:
async for request_msg in ems_order_stream:
@ -938,43 +1009,39 @@ async def trades_dialogue(
# ledger: TransactionLedger
# TODO: load pps and accounts using accounting apis!
# positions: dict = {}
# accounts: set[str] = set()
# await ctx.started((positions, {}))
positions: list[BrokerdPosition] = []
accounts: list[str] = ['binance.default']
await ctx.started((positions, accounts))
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
open_cached_client('binance') as client,
# client.manage_listen_key() as listen_key,
client.manage_listen_key() as listen_key,
):
n.start_soon(handle_order_requests, ems_stream)
await trio.sleep_forever()
# await trio.sleep_forever()
async with open_autorecon_ws(
f'wss://stream.binance.com:9443/ws/{listen_key}',
) as ws:
event = await ws.recv_msg()
# https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
if event.get('e') == 'executionReport':
"""
https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
"""
oid = event.get('c')
side = event.get('S').lower()
status = event.get('X')
order_qty = float(event.get('q'))
filled_qty = float(event.get('z'))
cumm_transacted_qty = float(event.get('Z'))
price_avg = cum_transacted_qty / filled_qty
oid: str = event.get('c')
side: str = event.get('S').lower()
status: str = event.get('X')
order_qty: float = float(event.get('q'))
filled_qty: float = float(event.get('z'))
cum_transacted_qty: float = float(event.get('Z'))
price_avg: float = cum_transacted_qty / filled_qty
broker_time: float = float(event.get('T'))
commission_amount: float = float(event.get('n'))
commission_asset: float = event.get('N')
broker_time = float(event.get('T'))
commission_amount = float(event.get('n'))
commission_asset = event.get('N')
if status == 'TRADE':
if status == 'TRADE':
if order_qty == filled_qty:
msg = BrokerdFill(
reqid=oid,
@ -993,7 +1060,7 @@ async def trades_dialogue(
)
else:
if status == 'NEW':
if status == 'NEW':
status = 'submitted'
elif status == 'CANCELED':