From bc4ded266207df06963405510cb562e5a2d2df71 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Mon, 21 Jun 2021 18:27:53 +0000 Subject: [PATCH] binance: start drafting live order ctl endpoints First draft originally by @guilledk but update by myself 2 years later xD. Will crash at runtime but at least has the machinery to setup signed requests for auth-ed endpoints B) Also adds a generic `NoSignature` error for when credentials are not present in `brokers.toml` but user is trying to access auth-ed eps with the client. --- piker/brokers/binance.py | 211 ++++++++++++++++++++++++++++++++++++--- piker/config.py | 4 + 2 files changed, 203 insertions(+), 12 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index a8791ae9..366054e1 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -1,6 +1,6 @@ # piker: trading gear for hackers # Copyright (C) -# Guillermo Rodriguez +# Guillermo Rodriguez (aka ze jefe) # Tyler Goodlet # (in stewardship for pikers) @@ -21,6 +21,7 @@ Binance backend """ +from collections import OrderedDict from contextlib import ( asynccontextmanager as acm, aclosing, @@ -29,11 +30,16 @@ from datetime import datetime from decimal import Decimal import itertools from typing import ( - Any, Union, Optional, - AsyncGenerator, Callable, + Any, + Union, + AsyncIterator, + AsyncGenerator, + Callable, ) +import hmac import time - +import decimal +import hashlib import trio from trio_typing import TaskStatus import pendulum @@ -42,6 +48,7 @@ from fuzzywuzzy import process as fuzzy import numpy as np import tractor +from .. import config from .._cacheables import async_lifo_cache from ..accounting._mktinfo import ( Asset, @@ -66,6 +73,30 @@ from piker.data._web_bs import ( NoBsWs, ) +from ..clearing._messages import ( + BrokerdOrder, + BrokerdOrderAck, + # BrokerdCancel, + #BrokerdStatus, + #BrokerdPosition, + #BrokerdFill, + # BrokerdError, +) + +log = get_logger('piker.brokers.binance') + + +def get_config() -> dict: + conf, path = config.load() + + section = conf.get('binance') + + if not section: + log.warning(f'No config section found for binance in {path}') + return dict() + + return section + log = get_logger(__name__) @@ -197,16 +228,55 @@ class Client: self._sesh.base_location = _url self._pairs: dict[str, Pair] = {} + conf = get_config() + self.api_key = conf.get('api', {}).get('key') + self.api_secret = conf.get('api', {}).get('secret') + + if self.api_key: + self._sesh.headers.update({'X-MBX-APIKEY': self.api_key}) + + def _get_signature(self, data: OrderedDict) -> str: + if not self.api_secret: + raise config.NoSignature( + "Can't generate a signature without setting up credentials" + ) + + query_str = '&'.join([ + f'{_key}={value}' + for _key, value in data.items()]) + log.info(query_str) + msg_auth = hmac.new( + self.api_secret.encode('utf-8'), + query_str.encode('utf-8'), + hashlib.sha256 + ) + return msg_auth.hexdigest() + async def _api( self, method: str, - params: dict, + params: Union[dict, OrderedDict], + signed: bool = False, + action: str = 'get' ) -> dict[str, Any]: - resp = await self._sesh.get( - path=f'/api/v3/{method}', - params=params, - timeout=float('inf') - ) + + if signed: + params['signature'] = self._get_signature(params) + + if action == 'get': + resp = await self._sesh.get( + path=f'/api/v3/{method}', + params=params, + timeout=float('inf') + ) + + elif action == 'post': + resp = await self._sesh.post( + path=f'/api/v3/{method}', + params=params, + timeout=float('inf') + ) + return resproc(resp, log) async def exch_info( @@ -284,8 +354,8 @@ class Client: async def bars( self, symbol: str, - start_dt: Optional[datetime] = None, - end_dt: Optional[datetime] = None, + start_dt: datetime | None = None, + end_dt: datetime | None = None, limit: int = 1000, # <- max allowed per query as_np: bool = True, @@ -344,6 +414,60 @@ class Client: ) if as_np else bars return array + async def submit_limit( + self, + symbol: str, + side: str, # SELL / BUY + quantity: float, + price: float, + # time_in_force: str = 'GTC', + oid: int | None = None, + # iceberg_quantity: float | None = None, + # order_resp_type: str | None = None, + recv_window: int = 60000 + + ) -> int: + symbol = symbol.upper() + + await self.cache_symbols() + + asset_precision = self._pairs[symbol]['baseAssetPrecision'] + quote_precision = self._pairs[symbol]['quoteAssetPrecision'] + + quantity = Decimal(quantity).quantize( + Decimal(1 ** -asset_precision), + rounding=decimal.ROUND_HALF_EVEN + ) + + price = Decimal(price).quantize( + Decimal(1 ** -quote_precision), + rounding=decimal.ROUND_HALF_EVEN + ) + + params = OrderedDict([ + ('symbol', symbol), + ('side', side.upper()), + ('type', 'LIMIT'), + ('timeInForce', 'GTC'), + ('quantity', quantity), + ('price', price), + ('recvWindow', recv_window), + ('newOrderRespType', 'ACK'), + ('timestamp', binance_timestamp(pendulum.now())) + ]) + + if oid: + params['newClientOrderId'] = oid + + resp = await self._api( + 'order/test', # TODO: switch to real `order` endpoint + params=params, + signed=True, + action='post' + ) + + assert resp['orderId'] == oid + return oid @acm async def get_client() -> Client: @@ -660,6 +784,69 @@ async def stream_quotes( # last = time.time() +async def handle_order_requests( + ems_order_stream: tractor.MsgStream +) -> None: + async with open_cached_client('binance') as client: + async for request_msg in ems_order_stream: + log.info(f'Received order request {request_msg}') + + action = request_msg['action'] + + if action in {'buy', 'sell'}: + # validate + order = BrokerdOrder(**request_msg) + + # call our client api to submit the order + reqid = await client.submit_limit( + order.symbol, + order.action, + order.size, + order.price, + oid=order.oid + ) + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + # ems order request id + oid=order.oid, + # broker specific request id + reqid=reqid, + time_ns=time.time_ns(), + ).dict() + ) + + elif action == 'cancel': + # msg = BrokerdCancel(**request_msg) + # await run_client_method + ... + + else: + log.error(f'Unknown order command: {request_msg}') + + +@tractor.context +async def trades_dialogue( + ctx: tractor.Context, + loglevel: str = None +) -> AsyncIterator[dict[str, Any]]: + + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + positions = {} # TODO: get already open pos + + await ctx.started(positions, {}) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n + ): + n.start_soon(handle_order_requests, ems_stream) + await trio.sleep_forever() + + @tractor.context async def open_symbol_search( ctx: tractor.Context, diff --git a/piker/config.py b/piker/config.py index 0220f3e6..12085261 100644 --- a/piker/config.py +++ b/piker/config.py @@ -173,6 +173,10 @@ _context_defaults = dict( ) +class NoSignature(Exception): + 'No credentials setup for broker backend!' + + def _override_config_dir( path: str ) -> None: