From f36061a1492f9dbb00d71b851d78f208b3a026b3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 16 Jun 2023 20:48:38 -0400 Subject: [PATCH] binance: first draft live order ctl support B) Untested fully but has ostensibly working position and balance loading (by delegating entirely to binance's internals for that) and an MVP ems order request handler; still need to fill out the order status update task implementation.. Notes: - uses user data stream for all per account balance and position tracking. - no support yet for `piker.accounting` position tracking. - no support yet for full order / position real-time update via user stream. --- piker/brokers/binance/broker.py | 451 +++++++++++++++++++++++++------- 1 file changed, 363 insertions(+), 88 deletions(-) diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index 6dcfd8d0..a9f0a0a9 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -22,15 +22,23 @@ Live order control B) ''' from __future__ import annotations +from collections import ChainMap +from pprint import pformat from typing import ( Any, AsyncIterator, ) import time +from time import time_ns +from bidict import bidict import tractor import trio +from piker.accounting import ( + Asset, + # MktPair, +) from piker.brokers._util import ( get_logger, ) @@ -48,57 +56,111 @@ from piker.clearing._messages import ( BrokerdPosition, BrokerdFill, BrokerdCancel, - # BrokerdError, + BrokerdError, ) +from .venues import Pair +from .api import Client log = get_logger('piker.brokers.binance') async def handle_order_requests( - ems_order_stream: tractor.MsgStream + ems_order_stream: tractor.MsgStream, + client: Client, + + # TODO: update this from open orders loaded at boot! + dialogs: ChainMap[str, BrokerdOrder] = ChainMap(), + ) -> None: - async with open_cached_client('binance') as client: + ''' + Receive order requests from `emsd`, translate tramsit API calls and transmit. - async for request_msg in ems_order_stream: - log.info(f'Received order request {request_msg}') + ''' + msg: dict | BrokerdOrder | BrokerdCancel + async for msg in ems_order_stream: + log.info(f'Rx order request:\n{pformat(msg)}') + match msg: + case { + 'action': 'cancel', + }: + cancel = BrokerdCancel(**msg) + existing: BrokerdOrder | None = dialogs.get(cancel.oid) + if not existing: + log.error( + f'NO Existing order-dialog for {cancel.oid}!?' + ) + await ems_order_stream.send(BrokerdError( + oid=cancel.oid, + symbol=cancel.symbol, + reason=( + 'Invalid `binance` order request dialog oid', + ) + )) + continue - action = request_msg['action'] + else: + await client.submit_cancel( + cancel.symbol, + cancel.oid, + ) + + case { + 'account': ('binance.futes' | 'binance.spot') as account, + 'action': action, + } if action in {'buy', 'sell'}: - if action in {'buy', 'sell'}: # validate - order = BrokerdOrder(**request_msg) + order = BrokerdOrder(**msg) + + # NOTE: check and report edits + if existing := dialogs.get(order.oid): + log.info( + f'Existing order for {existing.oid} updated:\n' + f'{pformat(existing.to_dict())} -> {pformat(msg)}' + ) + # TODO: figure out what special params we have to send? + # https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade + + # XXX: ACK the request **immediately** before sending + # the api side request to ensure the ems maps the oid -> + # reqid correctly! + resp = BrokerdOrderAck( + oid=order.oid, # ems order request id + reqid=order.oid, # our custom int mapping + account='binance', # piker account + ) + await ems_order_stream.send(resp) # call our client api to submit the order reqid = await client.submit_limit( - order.symbol, - order.action, - order.size, - order.price, + symbol=order.symbol, + side=order.action, + quantity=order.size, + price=order.price, oid=order.oid ) + # thank god at least someone lets us do this XD + assert reqid == order.oid - # deliver ack that order has been submitted to broker routing + # track latest request state + dialogs[reqid].maps.append(msg) + + case _: + account = msg.get('account') + if account not in {'binance.spot', 'binance.futes'}: + log.error( + 'This is a binance account, \ + only a `binance.spot/.futes` selection is valid' + ) await ems_order_stream.send( - BrokerdOrderAck( - - # ems order request id - oid=order.oid, - - # broker specific request id - reqid=reqid, - time_ns=time.time_ns(), - - ).dict() + BrokerdError( + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + 'Invalid `binance` broker request msg:\n{msg}' + )) ) - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) - - await client.submit_cancel(msg.symbol, msg.reqid) - - else: - log.error(f'Unknown order command: {request_msg}') - @tractor.context async def open_trade_dialog( @@ -115,74 +177,287 @@ async def open_trade_dialog( # ledger: TransactionLedger # TODO: load pps and accounts using accounting apis! - positions: list[BrokerdPosition] = [] - accounts: list[str] = ['binance.default'] - await ctx.started((positions, accounts)) - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, open_cached_client('binance') as client, - client.manage_listen_key() as listen_key, ): - n.start_soon(handle_order_requests, ems_stream) + client.mkt_mode: str = 'usdtm_futes' - ws: NoBsWs - async with open_autorecon_ws( - f'wss://stream.binance.com:9443/ws/{listen_key}', - ) as ws: - event = await ws.recv_msg() + # if client. + account: str = client.mkt_mode - # https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update - if event.get('e') == 'executionReport': + wss: NoBsWs + async with ( + client.manage_listen_key() as listen_key, + open_autorecon_ws( + f'wss://stream.binancefuture.com/ws/{listen_key}', + # f'wss://stream.binance.com:9443/ws/{listen_key}', + ) as wss, - oid: str = event.get('c') - side: str = event.get('S').lower() - status: str = event.get('X') - order_qty: float = float(event.get('q')) - filled_qty: float = float(event.get('z')) - cum_transacted_qty: float = float(event.get('Z')) - price_avg: float = cum_transacted_qty / filled_qty - broker_time: float = float(event.get('T')) - commission_amount: float = float(event.get('n')) - commission_asset: float = event.get('N') + ): + nsid: int = time_ns() + await wss.send_msg({ + # "method": "SUBSCRIBE", + "method": "REQUEST", + "params": + [ + f"{listen_key}@account", + f"{listen_key}@balance", + f"{listen_key}@position", + ], + "id": nsid + }) - if status == 'TRADE': - if order_qty == filled_qty: - msg = BrokerdFill( + with trio.fail_after(1): + msg = await wss.recv_msg() + assert msg['id'] == nsid + + # TODO: load other market wide data / statistics: + # - OI: https://binance-docs.github.io/apidocs/futures/en/#open-interest + # - OI stats: https://binance-docs.github.io/apidocs/futures/en/#open-interest-statistics + accounts: bidict[str, str] = bidict() + balances: dict[Asset, float] = {} + positions: list[BrokerdPosition] = [] + + for resp_dict in msg['result']: + resp = resp_dict['res'] + req: str = resp_dict['req'] + + # @account response should be something like: + # {'accountAlias': 'sRFzFzAuuXsR', + # 'canDeposit': True, + # 'canTrade': True, + # 'canWithdraw': True, + # 'feeTier': 0} + if 'account' in req: + alias: str = resp['accountAlias'] + accounts['binance.usdtm_futes'] = alias + + # @balance response: + # {'accountAlias': 'sRFzFzAuuXsR', + # 'balances': [{'asset': 'BTC', + # 'availableBalance': '0.00000000', + # 'balance': '0.00000000', + # 'crossUnPnl': '0.00000000', + # 'crossWalletBalance': '0.00000000', + # 'maxWithdrawAmount': '0.00000000', + # 'updateTime': 0}] + # ... + # } + elif 'balance' in req: + for entry in resp['balances']: + name: str = entry['asset'] + balance: float = float(entry['balance']) + last_update_t: int = entry['updateTime'] + + spot_asset: Asset = client._venue2assets['spot'][name] + + if balance > 0: + balances[spot_asset] = (balance, last_update_t) + # await tractor.breakpoint() + + # @position response: + # {'positions': [{'entryPrice': '0.0', + # 'isAutoAddMargin': False, + # 'isolatedMargin': '0', + # 'leverage': 20, + # 'liquidationPrice': '0', + # 'marginType': 'CROSSED', + # 'markPrice': '0.60289650', + # 'markPrice': '0.00000000', + # 'maxNotionalValue': '25000', + # 'notional': '0', + # 'positionAmt': '0', + # 'positionSide': 'BOTH', + # 'symbol': 'ETHUSDT_230630', + # 'unRealizedProfit': '0.00000000', + # 'updateTime': 1672741444894} + # ... + # } + elif 'position' in req: + for entry in resp['positions']: + bs_mktid: str = entry['symbol'] + entry_size: float = float(entry['positionAmt']) + + pair: Pair | None + if ( + pair := client._venue2pairs[account].get(bs_mktid) + and entry_size > 0 + ): + entry_price: float = float(entry['entryPrice']) + + ppmsg = BrokerdPosition( + broker='binance', + account='binance.futes', + + # TODO: maybe we should be passing back + # a `MktPair` here? + symbol=pair.bs_fqme.lower() + '.binance', + + size=entry_size, + avg_price=entry_price, + ) + positions.append(ppmsg) + + if pair is None: + log.warning( + f'`{bs_mktid}` Position entry but no market pair?\n' + f'{pformat(entry)}\n' + ) + + await ctx.started((positions, list(accounts))) + + async with ( + trio.open_nursery() as tn, + ctx.open_stream() as ems_stream, + ): + + tn.start_soon( + handle_order_requests, + ems_stream, + client, + ) + tn.start_soon( + handle_order_updates, + ems_stream, + wss, + + ) + + await trio.sleep_forever() + + +async def handle_order_updates( + ems_stream: tractor.MsgStream, + wss: NoBsWs, + + # apiflows: dict[int, ChainMap[dict[str, dict]]], + # ids: bidict[str, int], + # reqids2txids: bidict[int, str], + + # table: PpTable, + # ledger_trans: dict[str, Transaction], + + # acctid: str, + # acc_name: str, + # token: str, + +) -> None: + ''' + Main msg handling loop for all things order management. + + This code is broken out to make the context explicit and state + variables defined in the signature clear to the reader. + + ''' + async for msg in wss: + match msg: + + # TODO: + # POSITION update + # futes: https://binance-docs.github.io/apidocs/futures/en/#event-balance-and-position-update + + # ORDER update + # spot: https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update + # futes: https://binance-docs.github.io/apidocs/futures/en/#event-order-update + case { + 'e': 'executionReport', + 'T': float(epoch_ms), + 'o': { + 's': bs_mktid, + + # XXX NOTE XXX see special ids for market + # events or margin calls: + # // special client order id: + # // starts with "autoclose-": liquidation order + # // "adl_autoclose": ADL auto close order + # // "settlement_autoclose-": settlement order + # for delisting or delivery + 'c': oid, + + # prices + 'a': float(submit_price), + 'ap': float(avg_price), + 'L': float(fill_price), + + # sizing + 'q': float(req_size), + 'l': float(clear_size_filled), # this event + 'z': float(accum_size_filled), # accum + + # commissions + 'n': float(cost), + 'N': str(cost_asset), + + # state + 'S': str(side), + 'X': str(status), + }, + } as order_msg: + log.info( + f'{status} for {side} ORDER oid: {oid}\n' + f'bs_mktid: {bs_mktid}\n\n' + + f'order size: {req_size}\n' + f'cleared size: {clear_size_filled}\n' + f'accum filled size: {accum_size_filled}\n\n' + + f'submit price: {submit_price}\n' + f'fill_price: {fill_price}\n' + f'avg clearing price: {avg_price}\n\n' + + f'cost: {cost}@{cost_asset}\n' + ) + + # status remap from binance to piker's + # status set: + # - NEW + # - PARTIALLY_FILLED + # - FILLED + # - CANCELED + # - EXPIRED + # https://binance-docs.github.io/apidocs/futures/en/#event-order-update + match status: + case 'PARTIALLY_FILLED' | 'FILLED': + status = 'fill' + + fill_msg = BrokerdFill( + time_ns=time_ns(), reqid=oid, - time_ns=time.time_ns(), - action=side, - price=price_avg, - broker_details={ - 'name': 'binance', - 'commissions': { - 'amount': commission_amount, - 'asset': commission_asset - }, - 'broker_time': broker_time - }, - broker_time=broker_time + + # just use size value for now? + # action=action, + size=clear_size_filled, + price=fill_price, + + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'broker'} | order_msg, + broker_time=time.time(), ) + await ems_stream.send(fill_msg) - else: - if status == 'NEW': - status = 'submitted' + if accum_size_filled == req_size: + status = 'closed' - elif status == 'CANCELED': - status = 'cancelled' + case 'NEW': + status = 'open' - msg = BrokerdStatus( - reqid=oid, - time_ns=time.time_ns(), - status=status, - filled=filled_qty, - remaining=order_qty - filled_qty, - broker_details={'name': 'binance'} - ) + case 'EXPIRED': + status = 'canceled' - else: - # XXX: temporary, to catch unhandled msgs - breakpoint() + case _: + status = status.lower() - await ems_stream.send(msg.dict()) + resp = BrokerdStatus( + time_ns=time_ns(), + reqid=oid, + + status=status, + filled=accum_size_filled, + remaining=req_size - accum_size_filled, + broker_details={ + 'name': 'binance', + 'broker_time': epoch_ms / 1000. + } + ) + await ems_stream.send(resp)