binance: add `submit_cancel()` & listen key mgmt

Patch again originally from @guilledk and adds a sesh for futures
testnet as well as a order canceller method B)
basic_buy_bot
Tyler Goodlet 2023-06-09 14:51:51 -04:00
parent 1d9c195506
commit f8af13d010
1 changed files with 118 additions and 14 deletions

View File

@ -108,6 +108,7 @@ log = get_logger(__name__)
_url = 'https://api.binance.com' _url = 'https://api.binance.com'
_fapi_url = 'https://testnet.binancefuture.com'
# Broker specific ohlc schema (rest) # Broker specific ohlc schema (rest)
@ -230,18 +231,31 @@ def binance_timestamp(
class Client: class Client:
def __init__(self) -> None: def __init__(self) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location: str = _url
self._pairs: dict[str, Pair] = {} # mkt info table self._pairs: dict[str, Pair] = {} # mkt info table
conf = get_config() # live EP sesh
self._sesh = asks.Session(connections=4)
self._sesh.base_location: str = _url
# testnet EP sesh
self._fapi_sesh = asks.Session(connections=4)
self._fapi_sesh.base_location = _fapi_url
conf: dict = get_config()
self.api_key: str = conf.get('api_key', '') self.api_key: str = conf.get('api_key', '')
self.api_secret: str = conf.get('api_secret', '') self.api_secret: str = conf.get('api_secret', '')
if self.api_key: if self.api_key:
self._sesh.headers.update({'X-MBX-APIKEY': self.api_key}) api_key_header = {'X-MBX-APIKEY': self.api_key}
self._sesh.headers.update(api_key_header)
self._fapi_sesh.headers.update(api_key_header)
def _get_signature(self, data: OrderedDict) -> str: def _get_signature(self, data: OrderedDict) -> str:
# XXX: Info on security and authentification
# https://binance-docs.github.io/apidocs/#endpoint-security-type
if not self.api_secret: if not self.api_secret:
raise config.NoSignature( raise config.NoSignature(
"Can't generate a signature without setting up credentials" "Can't generate a signature without setting up credentials"
@ -277,8 +291,26 @@ class Client:
return resproc(resp, log) return resproc(resp, log)
async def exch_info( async def _fapi(
self,
method: str,
params: Union[dict, OrderedDict],
signed: bool = False,
action: str = 'get'
) -> dict[str, Any]:
if signed:
params['signature'] = self._get_signature(params)
resp = await getattr(self._fapi_sesh, action)(
path=f'/fapi/v1/{method}',
params=params,
timeout=float('inf')
)
return resproc(resp, log)
async def exch_info(
self, self,
sym: str | None = None, sym: str | None = None,
@ -467,6 +499,70 @@ class Client:
assert resp['orderId'] == oid assert resp['orderId'] == oid
return oid return oid
async def submit_cancel(
self,
symbol: str,
oid: str,
recv_window: int = 60000
) -> None:
symbol = symbol.upper()
params = OrderedDict([
('symbol', symbol),
('orderId', oid),
('recvWindow', recv_window),
('timestamp', binance_timestamp(pendulum.now()))
])
await self._api(
'order',
params=params,
signed=True,
action='delete'
)
async def get_listen_key(self) -> str:
return await self._api(
'userDataStream',
params={},
action='post'
)['listenKey']
async def keep_alive_key(self, listen_key: str) -> None:
await self._fapi(
'userDataStream',
params={'listenKey': listen_key},
action='put'
)
async def close_listen_key(self, listen_key: str) -> None:
await self._fapi(
'userDataStream',
params={'listenKey': listen_key},
action='delete'
)
@acm
async def manage_listen_key(self):
async def periodic_keep_alive(
self,
listen_key: str,
timeout=60 * 29 # 29 minutes
):
while True:
await trio.sleep(timeout)
await self.keep_alive_key(listen_key)
key = await self.get_listen_key()
async with trio.open_nursery() as n:
n.start_soon(periodic_keep_alive, key)
yield key
await self.close_listen_key(key)
@acm @acm
async def get_client() -> Client: async def get_client() -> Client:
client = Client() client = Client()
@ -484,7 +580,7 @@ class AggTrade(Struct, frozen=True):
p: float # Price p: float # Price
q: float # Quantity q: float # Quantity
f: int # First trade ID f: int # First trade ID
l: int # Last trade ID l: int # noqa Last trade ID
T: int # Trade time T: int # Trade time
m: bool # Is the buyer the market maker? m: bool # Is the buyer the market maker?
M: bool # Ignore M: bool # Ignore
@ -783,7 +879,8 @@ async def stream_quotes(
async def handle_order_requests( async def handle_order_requests(
ems_order_stream: tractor.MsgStream ems_order_stream: tractor.MsgStream,
symbol: str
) -> None: ) -> None:
async with open_cached_client('binance') as client: async with open_cached_client('binance') as client:
async for request_msg in ems_order_stream: async for request_msg in ems_order_stream:
@ -817,7 +914,8 @@ async def handle_order_requests(
elif action == 'cancel': elif action == 'cancel':
# msg = BrokerdCancel(**request_msg) # msg = BrokerdCancel(**request_msg)
# await run_client_method #
# await client.submit_cancel(symbol, msg.reqid)
... ...
else: else:
@ -844,12 +942,18 @@ async def trades_dialogue(
# accounts: set[str] = set() # accounts: set[str] = set()
# await ctx.started((positions, {})) # await ctx.started((positions, {}))
# async with ( async with (
# ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
# trio.open_nursery() as n trio.open_nursery() as n,
# ): open_cached_client('binance') as client,
# n.start_soon(handle_order_requests, ems_stream) # client.manage_listen_key() as listen_key,
# await trio.sleep_forever() ):
n.start_soon(handle_order_requests, ems_stream)
await trio.sleep_forever()
# async with open_autorecon_ws(
# f'wss://stream.binance.com:9443/ws/{listen_key}',
# ) as ws:
# ...
@tractor.context @tractor.context