diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index 1fbe89ba..481c427c 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -49,6 +49,9 @@ from fuzzywuzzy import process as fuzzy import numpy as np from piker import config +from piker.clearing._messages import ( + Order, +) from piker.accounting import ( Asset, digits_to_dec, @@ -378,9 +381,6 @@ class Client: raise SymbolNotFound(f'No market pairs found!?:\n{resp}') pairs_view_subtable: dict[str, Pair] = {} - # if venue == 'spot': - # import tractor - # await tractor.breakpoint() for item in mkt_pairs: filters_ls: list = item.pop('filters', False) @@ -619,6 +619,68 @@ class Client: signed=True, ) + async def get_open_orders( + self, + symbol: str | None = None, + + ) -> list[Order]: + ''' + Get all open orders for venue-account. + + WARNING: apparently not specifying the symbol is given + a much heavier API "weight" meaning you shouldn't call it + often to avoid getting throttled as per: + + 'https://binance-docs.github.io/apidocs/futures/en/#current-all-open-orders-user_data + + + ''' + params: dict[str, Any] = { + 'timestamp': binance_timestamp(now()), + } + if symbol is not None: + params['symbol'] = symbol + + resp = await self.mkt_mode_req[self.mkt_mode]( + 'openOrders', + params=params, + signed=True, + action='get', + ) + orders: list[Order] = [] + for entry in resp: + oid: str = entry['clientOrderId'] + + # XXX TODO XXX: it appears as though entries have no + # indicator from the symbology system which market + # / venue the order is from.. which normally isn't + # a huge deal since you could assume based on the + # endpoint you made the request to, BUT the futes USD-M + # endpoints have multiple contracts for the same + # symbols (eg. BTCUSDT.PERP, BTCUSDT.230630.. etc.) + # NOTE: for now until we have a better system we're + # going to assume orders that don't have some kind of + # further info in the order resp dict are perps though + # likely this will need to change in the future.. + venue: str = self.mkt_mode.rstrip('_futes') + bs_mktid: str = entry['symbol'] + fqme: str = f'{bs_mktid.lower()}.{venue}.perp' + + orders.append( + Order( + oid=oid, + symbol=fqme, + + action=entry['side'].lower(), + price=float(entry['price']), + size=float(entry['origQty']), + + exec_mode='live', + account=f'binance.{venue}', + ) + ) + return orders + async def submit_limit( self, symbol: str, diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index 0145c7f5..dfc8373c 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -62,6 +62,8 @@ from piker.clearing._messages import ( BrokerdFill, BrokerdCancel, BrokerdError, + Status, + Order, ) from .venues import Pair from .api import Client @@ -69,6 +71,10 @@ from .api import Client log = get_logger('piker.brokers.binance') +# TODO: factor this into `.clearing._util` (or something) +# and use in other backends like kraken which currently has +# a less formalized version more or less: +# `apiflows[reqid].maps.append(status_msg.to_dict())` class OrderDialogs(Struct): ''' Order control dialog (and thus transaction) tracking via @@ -79,25 +85,49 @@ class OrderDialogs(Struct): state using the entire (reverse chronological) msg flow. ''' - _dialogs: defaultdict[str, ChainMap] = defaultdict(ChainMap) + _flows: dict[str, ChainMap] = {} def add_msg( self, oid: str, msg: dict, ) -> None: - self._dialogs[oid].maps.insert(0, msg) + + # NOTE: manually enter a new map on the first msg add to + # avoid creating one with an empty dict first entry in + # `ChainMap.maps` which is the default if none passed at + # init. + cm: ChainMap = self._flows.get(oid) + if cm: + cm.maps.insert(0, msg) + else: + cm = ChainMap(msg) + self._flows[oid] = cm # TODO: wrap all this in the `collections.abc.Mapping` interface? def get( self, oid: str, + ) -> ChainMap[str, Any]: ''' Return the dialog `ChainMap` for provided id. ''' - return self._dialogs.get(oid, None) + return self._flows.get(oid, None) + + def pop( + self, + oid: str, + + ) -> ChainMap[str, Any]: + ''' + Pop and thus remove the `ChainMap` containing the msg flow + for the given order id. + + ''' + return self._flows.pop(oid) + async def handle_order_requests( @@ -277,11 +307,15 @@ async def open_trade_dialog( f"{listen_key}@account", f"{listen_key}@balance", f"{listen_key}@position", + + # TODO: does this even work!? seems to cause + # a hang on the first msg..? lelelel. + # f"{listen_key}@order", ], "id": nsid }) - with trio.fail_after(1): + with trio.fail_after(6): msg = await wss.recv_msg() assert msg['id'] == nsid @@ -401,6 +435,24 @@ async def open_trade_dialog( trio.open_nursery() as tn, ctx.open_stream() as ems_stream, ): + # deliver all pre-exist open orders to EMS thus syncing + # state with the binance existing live limit set. + open_orders: list[Order] = await client.get_open_orders() + + # fill out `Status` with boxed `Order`s and sync the EMS. + for order in open_orders: + status_msg = Status( + time_ns=time.time_ns(), + resp='open', + oid=order.oid, + reqid=order.oid, + + # embedded order info + req=order, + src='binance', + ) + dialogs.add_msg(order.oid, order.to_dict()) + await ems_stream.send(status_msg) tn.start_soon( handle_order_requests, @@ -565,16 +617,14 @@ async def handle_order_updates( if accum_size_filled == req_size: status = 'closed' - del dialogs._dialogs[oid] + dialogs.pop(oid) case 'NEW': status = 'open' case 'EXPIRED': status = 'canceled' - del dialogs._dialogs[oid] - - # case 'TRADE': + dialogs.pop(oid) case _: status = status.lower()