diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index 1926fdba..d43f2a99 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -88,12 +88,16 @@ class OrderDialogs(Struct): ) -> None: self._dialogs[oid].maps.insert(0, msg) + # TODO: wrap all this in the `collections.abc.Mapping` interface? def get( self, oid: str, - field: str, - ) -> Any: - return self._dialogs[oid][field] + ) -> ChainMap[str, Any]: + ''' + Return the dialog `ChainMap` for provided id. + + ''' + return self._dialogs.get(oid, None) async def handle_order_requests( @@ -134,10 +138,23 @@ async def handle_order_requests( continue else: - await client.submit_cancel( - existing.symbol, - cancel.oid, - ) + symbol: str = existing['symbol'] + try: + await client.submit_cancel( + symbol, + cancel.oid, + ) + except BrokerError as be: + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=symbol, + reason=( + '`binance` CANCEL failed:\n' + f'{be}' + )) + ) + continue case { 'account': ('binance.usdtm' | 'binance.spot') as account, @@ -151,35 +168,35 @@ async def handle_order_requests( # NOTE: check and report edits if existing := dialogs.get(order.oid): log.info( - f'Existing order for {existing.oid} updated:\n' - f'{pformat(existing.to_dict())} -> {pformat(msg)}' + f'Existing order for {oid} updated:\n' + f'{pformat(existing.maps[-1])} -> {pformat(msg)}' ) # TODO: figure out what special params we have to send? # https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade # lookup the binance-native symbol - bs_mktid: str = client._pairs[order.symbol.upper()].symbol + # bs_mktid: str = client._pairs[order.symbol.upper()].symbol # call our client api to submit the order try: - reqid = await client.submit_limit( - symbol=bs_mktid, - side=order.action, - quantity=order.size, - price=order.price, - oid=oid, - ) - # XXX: ACK the request **immediately** before sending # the api side request to ensure the ems maps the oid -> # reqid correctly! resp = BrokerdOrderAck( oid=oid, # ems order request id - reqid=reqid, # our custom int mapping + reqid=oid, # our custom int mapping account='binance', # piker account ) await ems_order_stream.send(resp) + reqid = await client.submit_limit( + symbol=order.symbol, + side=order.action, + quantity=order.size, + price=order.price, + oid=oid, + ) + # SMH they do gen their own order id: ints.. # assert reqid == order.oid dids[order.oid] = reqid @@ -187,7 +204,7 @@ async def handle_order_requests( # track latest request state such that map # lookups start at the most recent msg and then # scan reverse-chronologically. - dialogs.add_msg(msg) + dialogs.add_msg(oid, msg) except BrokerError as be: await ems_order_stream.send( @@ -235,7 +252,7 @@ async def open_trade_dialog( client.mkt_mode: str = 'usdtm_futes' # if client. - account: str = client.mkt_mode + venue: str = client.mkt_mode wss: NoBsWs async with ( @@ -332,14 +349,14 @@ async def open_trade_dialog( pair: Pair | None if ( - pair := client._venue2pairs[account].get(bs_mktid) + pair := client._venue2pairs[venue].get(bs_mktid) and entry_size > 0 ): entry_price: float = float(entry['entryPrice']) ppmsg = BrokerdPosition( broker='binance', - account='binance.futes', + account='binance.usdtm', # TODO: maybe we should be passing back # a `MktPair` here? @@ -365,6 +382,9 @@ async def open_trade_dialog( # .accounting support B) # - live order loading via user stream subscription and # update to the order dialog table. + # - MAKE SURE we add live orders loaded during init + # into the dialogs table to ensure they can be + # cancelled, meaning we can do a symbol lookup. # - position loading using `piker.accounting` subsys # and comparison with binance's own position calcs. # - load pps and accounts using accounting apis, write @@ -388,6 +408,7 @@ async def open_trade_dialog( handle_order_updates, ems_stream, wss, + dialogs, ) @@ -397,6 +418,7 @@ async def open_trade_dialog( async def handle_order_updates( ems_stream: tractor.MsgStream, wss: NoBsWs, + dialogs: OrderDialogs, # apiflows: dict[int, ChainMap[dict[str, dict]]], # ids: bidict[str, int], @@ -418,20 +440,55 @@ async def handle_order_updates( ''' async for msg in wss: + log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}') match msg: - log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}') # TODO: # POSITION update - # futes: https://binance-docs.github.io/apidocs/futures/en/#event-balance-and-position-update # ORDER update # spot: https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update # futes: https://binance-docs.github.io/apidocs/futures/en/#event-order-update + # futes: https://binance-docs.github.io/apidocs/futures/en/#event-balance-and-position-update + # {'o': { + # 'L': '0', + # 'N': 'USDT', + # 'R': False, + # 'S': 'BUY', + # 'T': 1687028772484, + # 'X': 'NEW', + # 'a': '0', + # 'ap': '0', + # 'b': '7012.06520', + # 'c': '518d4122-8d3e-49b0-9a1e-1fabe6f62e4c', + # 'cp': False, + # 'f': 'GTC', + # 'i': 3376956924, + # 'l': '0', + # 'm': False, + # 'n': '0', + # 'o': 'LIMIT', + # 'ot': 'LIMIT', + # 'p': '21136.80', + # 'pP': False, + # 'ps': 'BOTH', + # 'q': '0.047', + # 'rp': '0', + # 's': 'BTCUSDT', + # 'si': 0, + # 'sp': '0', + # 'ss': 0, + # 't': 0, + # 'wt': 'CONTRACT_PRICE', + # 'x': 'NEW', + # 'z': '0'} + # } case { - 'e': 'executionReport', - 'T': float(epoch_ms), + # 'e': 'executionReport', + 'e': 'ORDER_TRADE_UPDATE', + 'T': int(epoch_ms), 'o': { + 'i': reqid, 's': bs_mktid, # XXX NOTE XXX see special ids for market @@ -444,22 +501,22 @@ async def handle_order_updates( 'c': oid, # prices - 'a': float(submit_price), - 'ap': float(avg_price), - 'L': float(fill_price), + 'a': submit_price, + 'ap': avg_price, + 'L': fill_price, # sizing - 'q': float(req_size), - 'l': float(clear_size_filled), # this event - 'z': float(accum_size_filled), # accum + 'q': req_size, + 'l': clear_size_filled, # this event + 'z': accum_size_filled, # accum # commissions - 'n': float(cost), - 'N': str(cost_asset), + 'n': cost, + 'N': cost_asset, # state - 'S': str(side), - 'X': str(status), + 'S': side, + 'X': status, }, } as order_msg: log.info( @@ -485,12 +542,18 @@ async def handle_order_updates( # - CANCELED # - EXPIRED # https://binance-docs.github.io/apidocs/futures/en/#event-order-update + + req_size: float = float(req_size) + accum_size_filled: float = float(accum_size_filled) + fill_price: float = float(fill_price) + match status: case 'PARTIALLY_FILLED' | 'FILLED': status = 'fill' fill_msg = BrokerdFill( time_ns=time_ns(), + # reqid=reqid, reqid=oid, # just use size value for now? @@ -507,21 +570,26 @@ async def handle_order_updates( if accum_size_filled == req_size: status = 'closed' + del dialogs._dialogs[oid] case 'NEW': status = 'open' case 'EXPIRED': status = 'canceled' + del dialogs._dialogs[oid] case _: status = status.lower() resp = BrokerdStatus( time_ns=time_ns(), + # reqid=reqid, reqid=oid, + # account='binance.usdtm', status=status, + filled=accum_size_filled, remaining=req_size - accum_size_filled, broker_details={ @@ -530,3 +598,9 @@ async def handle_order_updates( } ) await ems_stream.send(resp) + + case _: + log.warning( + 'Unhandled event:\n' + f'{pformat(msg)}' + )