From 7c00ca0254f293054b49955a17a8aab02a1a081b Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sat, 19 Feb 2022 18:03:45 -0300 Subject: [PATCH] binance: add deposits/withdrawals API support From @guilledk, - Drop Decimal quantize for now - Minor tweaks to trades_dialogue proto --- piker/brokers/binance.py | 187 ++++++++++++++++++++++++++------------- 1 file changed, 127 insertions(+), 60 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index cc85307d..b2150e86 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -39,13 +39,15 @@ from typing import ( ) import hmac import time -import decimal import hashlib from pathlib import Path import trio from trio_typing import TaskStatus -import pendulum +from pendulum import ( + now, + from_timestamp, +) import asks from fuzzywuzzy import process as fuzzy import numpy as np @@ -79,10 +81,10 @@ from piker.data._web_bs import ( from ..clearing._messages import ( BrokerdOrder, BrokerdOrderAck, - # BrokerdCancel, - #BrokerdStatus, - #BrokerdPosition, - #BrokerdFill, + BrokerdStatus, + BrokerdPosition, + BrokerdFill, + BrokerdCancel, # BrokerdError, ) @@ -108,6 +110,7 @@ log = get_logger(__name__) _url = 'https://api.binance.com' +_sapi_url = 'https://api.binance.com' _fapi_url = 'https://testnet.binancefuture.com' @@ -238,18 +241,25 @@ class Client: self._sesh = asks.Session(connections=4) self._sesh.base_location: str = _url - # testnet EP sesh + # futes testnet rest EPs self._fapi_sesh = asks.Session(connections=4) self._fapi_sesh.base_location = _fapi_url + # sync rest API + self._sapi_sesh = asks.Session(connections=4) + self._sapi_sesh.base_location = _sapi_url + conf: dict = get_config() self.api_key: str = conf.get('api_key', '') self.api_secret: str = conf.get('api_secret', '') + self.watchlist = conf.get('watchlist', []) + if self.api_key: api_key_header = {'X-MBX-APIKEY': self.api_key} self._sesh.headers.update(api_key_header) self._fapi_sesh.headers.update(api_key_header) + self._sapi_sesh.headers.update(api_key_header) def _get_signature(self, data: OrderedDict) -> str: @@ -310,6 +320,25 @@ class Client: return resproc(resp, log) + async def _sapi( + self, + method: str, + params: Union[dict, OrderedDict], + signed: bool = False, + action: str = 'get' + ) -> dict[str, Any]: + + if signed: + params['signature'] = self._get_signature(params) + + resp = await getattr(self._sapi_sesh, action)( + path=f'/sapi/v1/{method}', + params=params, + timeout=float('inf') + ) + + return resproc(resp, log) + async def exch_info( self, sym: str | None = None, @@ -392,7 +421,7 @@ class Client: ) -> dict: if end_dt is None: - end_dt = pendulum.now('UTC').add(minutes=1) + end_dt = now('UTC').add(minutes=1) if start_dt is None: start_dt = end_dt.start_of( @@ -444,6 +473,58 @@ class Client: ) if as_np else bars return array + async def get_positions( + self, + recv_window: int = 60000 + ) -> tuple: + positions = {} + volumes = {} + + for sym in self.watchlist: + log.info(f'doing {sym}...') + params = OrderedDict([ + ('symbol', sym), + ('recvWindow', recv_window), + ('timestamp', binance_timestamp(now())) + ]) + resp = await self._api( + 'allOrders', + params=params, + signed=True + ) + log.info(f'done. len {len(resp)}') + await trio.sleep(3) + + return positions, volumes + + async def get_deposits( + self, + recv_window: int = 60000 + ) -> list: + + params = OrderedDict([ + ('recvWindow', recv_window), + ('timestamp', binance_timestamp(now())) + ]) + return await self._sapi( + 'capital/deposit/hisrec', + params=params, + signed=True) + + async def get_withdrawls( + self, + recv_window: int = 60000 + ) -> list: + + params = OrderedDict([ + ('recvWindow', recv_window), + ('timestamp', binance_timestamp(now())) + ]) + return await self._sapi( + 'capital/withdraw/history', + params=params, + signed=True) + async def submit_limit( self, symbol: str, @@ -461,18 +542,8 @@ class Client: await self.cache_symbols() - asset_precision = self._pairs[symbol]['baseAssetPrecision'] - quote_precision = self._pairs[symbol]['quoteAssetPrecision'] - - quantity = Decimal(quantity).quantize( - Decimal(1 ** -asset_precision), - rounding=decimal.ROUND_HALF_EVEN - ) - - price = Decimal(price).quantize( - Decimal(1 ** -quote_precision), - rounding=decimal.ROUND_HALF_EVEN - ) + # asset_precision = self._pairs[symbol]['baseAssetPrecision'] + # quote_precision = self._pairs[symbol]['quoteAssetPrecision'] params = OrderedDict([ ('symbol', symbol), @@ -483,21 +554,21 @@ class Client: ('price', price), ('recvWindow', recv_window), ('newOrderRespType', 'ACK'), - ('timestamp', binance_timestamp(pendulum.now())) + ('timestamp', binance_timestamp(now())) ]) if oid: params['newClientOrderId'] = oid resp = await self._api( - 'order/test', # TODO: switch to real `order` endpoint + 'order', params=params, signed=True, action='post' ) - - assert resp['orderId'] == oid - return oid + log.info(resp) + # return resp['orderId'] + return resp['orderId'] async def submit_cancel( self, @@ -511,10 +582,10 @@ class Client: ('symbol', symbol), ('orderId', oid), ('recvWindow', recv_window), - ('timestamp', binance_timestamp(pendulum.now())) + ('timestamp', binance_timestamp(now())) ]) - await self._api( + return await self._api( 'order', params=params, signed=True, @@ -522,11 +593,11 @@ class Client: ) async def get_listen_key(self) -> str: - return await self._api( + return (await self._api( 'userDataStream', params={}, action='post' - )['listenKey'] + ))['listenKey'] async def keep_alive_key(self, listen_key: str) -> None: await self._fapi( @@ -557,7 +628,7 @@ class Client: key = await self.get_listen_key() async with trio.open_nursery() as n: - n.start_soon(periodic_keep_alive, key) + n.start_soon(periodic_keep_alive, self, key) yield key n.cancel_scope.cancel() @@ -733,8 +804,8 @@ async def open_history_client( if (inow - times[-1]) > 60: await tractor.breakpoint() - start_dt = pendulum.from_timestamp(times[0]) - end_dt = pendulum.from_timestamp(times[-1]) + start_dt = from_timestamp(times[0]) + end_dt = from_timestamp(times[-1]) return array, start_dt, end_dt @@ -873,15 +944,15 @@ async def stream_quotes( # hz = 1/period if period else float('inf') # if hz > 60: # log.info(f'Binance quotez : {hz}') - - topic = msg['symbol'].lower() - await send_chan.send({topic: msg}) + + if typ == 'l1': + topic = msg['symbol'].lower() + await send_chan.send({topic: msg}) # last = time.time() async def handle_order_requests( - ems_order_stream: tractor.MsgStream, - symbol: str + ems_order_stream: tractor.MsgStream ) -> None: async with open_cached_client('binance') as client: async for request_msg in ems_order_stream: @@ -938,43 +1009,39 @@ async def trades_dialogue( # ledger: TransactionLedger # TODO: load pps and accounts using accounting apis! - # positions: dict = {} - # accounts: set[str] = set() - # await ctx.started((positions, {})) + positions: list[BrokerdPosition] = [] + accounts: list[str] = ['binance.default'] + await ctx.started((positions, accounts)) async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, open_cached_client('binance') as client, - # client.manage_listen_key() as listen_key, + client.manage_listen_key() as listen_key, ): n.start_soon(handle_order_requests, ems_stream) - await trio.sleep_forever() - + # await trio.sleep_forever() + async with open_autorecon_ws( f'wss://stream.binance.com:9443/ws/{listen_key}', ) as ws: event = await ws.recv_msg() + # https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update if event.get('e') == 'executionReport': - """ - https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update - """ - oid = event.get('c') - side = event.get('S').lower() - status = event.get('X') - order_qty = float(event.get('q')) - filled_qty = float(event.get('z')) - cumm_transacted_qty = float(event.get('Z')) - price_avg = cum_transacted_qty / filled_qty + oid: str = event.get('c') + side: str = event.get('S').lower() + status: str = event.get('X') + order_qty: float = float(event.get('q')) + filled_qty: float = float(event.get('z')) + cum_transacted_qty: float = float(event.get('Z')) + price_avg: float = cum_transacted_qty / filled_qty + broker_time: float = float(event.get('T')) + commission_amount: float = float(event.get('n')) + commission_asset: float = event.get('N') - broker_time = float(event.get('T')) - - commission_amount = float(event.get('n')) - commission_asset = event.get('N') - - if status == 'TRADE': + if status == 'TRADE': if order_qty == filled_qty: msg = BrokerdFill( reqid=oid, @@ -993,7 +1060,7 @@ async def trades_dialogue( ) else: - if status == 'NEW': + if status == 'NEW': status = 'submitted' elif status == 'CANCELED':