diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 24a781fd..8e6cf1e8 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -108,6 +108,7 @@ log = get_logger(__name__) _url = 'https://api.binance.com' +_fapi_url = 'https://testnet.binancefuture.com' # Broker specific ohlc schema (rest) @@ -230,18 +231,31 @@ def binance_timestamp( class Client: def __init__(self) -> None: - self._sesh = asks.Session(connections=4) - self._sesh.base_location: str = _url + self._pairs: dict[str, Pair] = {} # mkt info table - conf = get_config() + # live EP sesh + self._sesh = asks.Session(connections=4) + self._sesh.base_location: str = _url + + # testnet EP sesh + self._fapi_sesh = asks.Session(connections=4) + self._fapi_sesh.base_location = _fapi_url + + conf: dict = get_config() self.api_key: str = conf.get('api_key', '') self.api_secret: str = conf.get('api_secret', '') if self.api_key: - self._sesh.headers.update({'X-MBX-APIKEY': self.api_key}) + api_key_header = {'X-MBX-APIKEY': self.api_key} + self._sesh.headers.update(api_key_header) + self._fapi_sesh.headers.update(api_key_header) def _get_signature(self, data: OrderedDict) -> str: + + # XXX: Info on security and authentification + # https://binance-docs.github.io/apidocs/#endpoint-security-type + if not self.api_secret: raise config.NoSignature( "Can't generate a signature without setting up credentials" @@ -277,8 +291,26 @@ class Client: return resproc(resp, log) - async def exch_info( + async def _fapi( + self, + method: str, + params: Union[dict, OrderedDict], + signed: bool = False, + action: str = 'get' + ) -> dict[str, Any]: + if signed: + params['signature'] = self._get_signature(params) + + resp = await getattr(self._fapi_sesh, action)( + path=f'/fapi/v1/{method}', + params=params, + timeout=float('inf') + ) + + return resproc(resp, log) + + async def exch_info( self, sym: str | None = None, @@ -467,6 +499,70 @@ class Client: assert resp['orderId'] == oid return oid + async def submit_cancel( + self, + symbol: str, + oid: str, + recv_window: int = 60000 + ) -> None: + symbol = symbol.upper() + + params = OrderedDict([ + ('symbol', symbol), + ('orderId', oid), + ('recvWindow', recv_window), + ('timestamp', binance_timestamp(pendulum.now())) + ]) + + await self._api( + 'order', + params=params, + signed=True, + action='delete' + ) + + async def get_listen_key(self) -> str: + return await self._api( + 'userDataStream', + params={}, + action='post' + )['listenKey'] + + async def keep_alive_key(self, listen_key: str) -> None: + await self._fapi( + 'userDataStream', + params={'listenKey': listen_key}, + action='put' + ) + + async def close_listen_key(self, listen_key: str) -> None: + await self._fapi( + 'userDataStream', + params={'listenKey': listen_key}, + action='delete' + ) + + @acm + async def manage_listen_key(self): + + async def periodic_keep_alive( + self, + listen_key: str, + timeout=60 * 29 # 29 minutes + ): + while True: + await trio.sleep(timeout) + await self.keep_alive_key(listen_key) + + key = await self.get_listen_key() + + async with trio.open_nursery() as n: + n.start_soon(periodic_keep_alive, key) + yield key + + await self.close_listen_key(key) + + @acm async def get_client() -> Client: client = Client() @@ -484,7 +580,7 @@ class AggTrade(Struct, frozen=True): p: float # Price q: float # Quantity f: int # First trade ID - l: int # Last trade ID + l: int # noqa Last trade ID T: int # Trade time m: bool # Is the buyer the market maker? M: bool # Ignore @@ -783,7 +879,8 @@ async def stream_quotes( async def handle_order_requests( - ems_order_stream: tractor.MsgStream + ems_order_stream: tractor.MsgStream, + symbol: str ) -> None: async with open_cached_client('binance') as client: async for request_msg in ems_order_stream: @@ -817,7 +914,8 @@ async def handle_order_requests( elif action == 'cancel': # msg = BrokerdCancel(**request_msg) - # await run_client_method + # + # await client.submit_cancel(symbol, msg.reqid) ... else: @@ -844,12 +942,18 @@ async def trades_dialogue( # accounts: set[str] = set() # await ctx.started((positions, {})) - # async with ( - # ctx.open_stream() as ems_stream, - # trio.open_nursery() as n - # ): - # n.start_soon(handle_order_requests, ems_stream) - # await trio.sleep_forever() + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + open_cached_client('binance') as client, + # client.manage_listen_key() as listen_key, + ): + n.start_soon(handle_order_requests, ems_stream) + await trio.sleep_forever() + # async with open_autorecon_ws( + # f'wss://stream.binance.com:9443/ws/{listen_key}', + # ) as ws: + # ... @tractor.context