From 3f555b2f5a398adba57d0e334b31d015207c5382 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 17 Jun 2023 16:06:42 -0400 Subject: [PATCH] Fix user event matching Was using the wrong key before from our old code (not sure how that slipped back in.. prolly doing too many git stashes XD), so fix that to properly match against order update events with 'ORDER_TRADE_UPDATE'. Also, don't match on the types we want to *cast to*, that's not how match syntax works (facepalm), so we have to typecast prior to EMS msg creation / downstream logic. Further, - try not bothering with binance's own internal `'orderId'` field tracking since they seem to support just using your own user version for all ctl endpoints? (thus we only need to track the EMS `.oid`s B) - log all event update msgs for now. - pop order dialogs on 'closed' statuses. - wrap cancel requests in an error handler block since it seems the EMS is double sending requests from the client? --- piker/brokers/binance/broker.py | 148 ++++++++++++++++++++++++-------- 1 file changed, 111 insertions(+), 37 deletions(-) diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index 1926fdba..d43f2a99 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -88,12 +88,16 @@ class OrderDialogs(Struct): ) -> None: self._dialogs[oid].maps.insert(0, msg) + # TODO: wrap all this in the `collections.abc.Mapping` interface? def get( self, oid: str, - field: str, - ) -> Any: - return self._dialogs[oid][field] + ) -> ChainMap[str, Any]: + ''' + Return the dialog `ChainMap` for provided id. + + ''' + return self._dialogs.get(oid, None) async def handle_order_requests( @@ -134,10 +138,23 @@ async def handle_order_requests( continue else: - await client.submit_cancel( - existing.symbol, - cancel.oid, - ) + symbol: str = existing['symbol'] + try: + await client.submit_cancel( + symbol, + cancel.oid, + ) + except BrokerError as be: + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=symbol, + reason=( + '`binance` CANCEL failed:\n' + f'{be}' + )) + ) + continue case { 'account': ('binance.usdtm' | 'binance.spot') as account, @@ -151,35 +168,35 @@ async def handle_order_requests( # NOTE: check and report edits if existing := dialogs.get(order.oid): log.info( - f'Existing order for {existing.oid} updated:\n' - f'{pformat(existing.to_dict())} -> {pformat(msg)}' + f'Existing order for {oid} updated:\n' + f'{pformat(existing.maps[-1])} -> {pformat(msg)}' ) # TODO: figure out what special params we have to send? # https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade # lookup the binance-native symbol - bs_mktid: str = client._pairs[order.symbol.upper()].symbol + # bs_mktid: str = client._pairs[order.symbol.upper()].symbol # call our client api to submit the order try: - reqid = await client.submit_limit( - symbol=bs_mktid, - side=order.action, - quantity=order.size, - price=order.price, - oid=oid, - ) - # XXX: ACK the request **immediately** before sending # the api side request to ensure the ems maps the oid -> # reqid correctly! resp = BrokerdOrderAck( oid=oid, # ems order request id - reqid=reqid, # our custom int mapping + reqid=oid, # our custom int mapping account='binance', # piker account ) await ems_order_stream.send(resp) + reqid = await client.submit_limit( + symbol=order.symbol, + side=order.action, + quantity=order.size, + price=order.price, + oid=oid, + ) + # SMH they do gen their own order id: ints.. # assert reqid == order.oid dids[order.oid] = reqid @@ -187,7 +204,7 @@ async def handle_order_requests( # track latest request state such that map # lookups start at the most recent msg and then # scan reverse-chronologically. - dialogs.add_msg(msg) + dialogs.add_msg(oid, msg) except BrokerError as be: await ems_order_stream.send( @@ -235,7 +252,7 @@ async def open_trade_dialog( client.mkt_mode: str = 'usdtm_futes' # if client. - account: str = client.mkt_mode + venue: str = client.mkt_mode wss: NoBsWs async with ( @@ -332,14 +349,14 @@ async def open_trade_dialog( pair: Pair | None if ( - pair := client._venue2pairs[account].get(bs_mktid) + pair := client._venue2pairs[venue].get(bs_mktid) and entry_size > 0 ): entry_price: float = float(entry['entryPrice']) ppmsg = BrokerdPosition( broker='binance', - account='binance.futes', + account='binance.usdtm', # TODO: maybe we should be passing back # a `MktPair` here? @@ -365,6 +382,9 @@ async def open_trade_dialog( # .accounting support B) # - live order loading via user stream subscription and # update to the order dialog table. + # - MAKE SURE we add live orders loaded during init + # into the dialogs table to ensure they can be + # cancelled, meaning we can do a symbol lookup. # - position loading using `piker.accounting` subsys # and comparison with binance's own position calcs. # - load pps and accounts using accounting apis, write @@ -388,6 +408,7 @@ async def open_trade_dialog( handle_order_updates, ems_stream, wss, + dialogs, ) @@ -397,6 +418,7 @@ async def open_trade_dialog( async def handle_order_updates( ems_stream: tractor.MsgStream, wss: NoBsWs, + dialogs: OrderDialogs, # apiflows: dict[int, ChainMap[dict[str, dict]]], # ids: bidict[str, int], @@ -418,20 +440,55 @@ async def handle_order_updates( ''' async for msg in wss: + log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}') match msg: - log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}') # TODO: # POSITION update - # futes: https://binance-docs.github.io/apidocs/futures/en/#event-balance-and-position-update # ORDER update # spot: https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update # futes: https://binance-docs.github.io/apidocs/futures/en/#event-order-update + # futes: https://binance-docs.github.io/apidocs/futures/en/#event-balance-and-position-update + # {'o': { + # 'L': '0', + # 'N': 'USDT', + # 'R': False, + # 'S': 'BUY', + # 'T': 1687028772484, + # 'X': 'NEW', + # 'a': '0', + # 'ap': '0', + # 'b': '7012.06520', + # 'c': '518d4122-8d3e-49b0-9a1e-1fabe6f62e4c', + # 'cp': False, + # 'f': 'GTC', + # 'i': 3376956924, + # 'l': '0', + # 'm': False, + # 'n': '0', + # 'o': 'LIMIT', + # 'ot': 'LIMIT', + # 'p': '21136.80', + # 'pP': False, + # 'ps': 'BOTH', + # 'q': '0.047', + # 'rp': '0', + # 's': 'BTCUSDT', + # 'si': 0, + # 'sp': '0', + # 'ss': 0, + # 't': 0, + # 'wt': 'CONTRACT_PRICE', + # 'x': 'NEW', + # 'z': '0'} + # } case { - 'e': 'executionReport', - 'T': float(epoch_ms), + # 'e': 'executionReport', + 'e': 'ORDER_TRADE_UPDATE', + 'T': int(epoch_ms), 'o': { + 'i': reqid, 's': bs_mktid, # XXX NOTE XXX see special ids for market @@ -444,22 +501,22 @@ async def handle_order_updates( 'c': oid, # prices - 'a': float(submit_price), - 'ap': float(avg_price), - 'L': float(fill_price), + 'a': submit_price, + 'ap': avg_price, + 'L': fill_price, # sizing - 'q': float(req_size), - 'l': float(clear_size_filled), # this event - 'z': float(accum_size_filled), # accum + 'q': req_size, + 'l': clear_size_filled, # this event + 'z': accum_size_filled, # accum # commissions - 'n': float(cost), - 'N': str(cost_asset), + 'n': cost, + 'N': cost_asset, # state - 'S': str(side), - 'X': str(status), + 'S': side, + 'X': status, }, } as order_msg: log.info( @@ -485,12 +542,18 @@ async def handle_order_updates( # - CANCELED # - EXPIRED # https://binance-docs.github.io/apidocs/futures/en/#event-order-update + + req_size: float = float(req_size) + accum_size_filled: float = float(accum_size_filled) + fill_price: float = float(fill_price) + match status: case 'PARTIALLY_FILLED' | 'FILLED': status = 'fill' fill_msg = BrokerdFill( time_ns=time_ns(), + # reqid=reqid, reqid=oid, # just use size value for now? @@ -507,21 +570,26 @@ async def handle_order_updates( if accum_size_filled == req_size: status = 'closed' + del dialogs._dialogs[oid] case 'NEW': status = 'open' case 'EXPIRED': status = 'canceled' + del dialogs._dialogs[oid] case _: status = status.lower() resp = BrokerdStatus( time_ns=time_ns(), + # reqid=reqid, reqid=oid, + # account='binance.usdtm', status=status, + filled=accum_size_filled, remaining=req_size - accum_size_filled, broker_details={ @@ -530,3 +598,9 @@ async def handle_order_updates( } ) await ems_stream.send(resp) + + case _: + log.warning( + 'Unhandled event:\n' + f'{pformat(msg)}' + )