From dedfb27a3a4eaa922dcb6b58f822df9e711d0ee4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Sep 2021 15:55:45 -0400 Subject: [PATCH] Add per-account order entry for ib Make the `handle_order_requests()` tasks now lookup the appropriate API client for a given account (or error if it can't be found) and use it for submission. Account names are loaded from the `brokers.toml::accounts.ib` section both UI side and in the `brokerd`. Change `_aio_get_client()` to a `load_aio_client()` which now tries to scan and load api clients for all connections defined in the config as well as deliver the client cache and account lookup tables. --- piker/brokers/ib.py | 179 ++++++++++++++++++++++++++++++++------------ 1 file changed, 132 insertions(+), 47 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 2e80c0a0..c4cab60d 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -62,8 +62,7 @@ from ._util import SymbolNotFound, NoData from ..clearing._messages import ( BrokerdOrder, BrokerdOrderAck, BrokerdStatus, BrokerdPosition, BrokerdCancel, - BrokerdFill, - # BrokerdError, + BrokerdFill, BrokerdError, ) @@ -196,8 +195,8 @@ _adhoc_futes_set = { 'mgc.nymex', 'xagusd.cmdty', # silver spot - 'ni.nymex', # silver futes - 'qi.comex', # mini-silver futes + 'ni.nymex', # silver futes + 'qi.comex', # mini-silver futes } # exchanges we don't support at the moment due to not knowing @@ -222,7 +221,9 @@ class Client: """ def __init__( self, + ib: ibis.IB, + ) -> None: self.ib = ib self.ib.RaiseRequestErrors = True @@ -513,7 +514,7 @@ class Client: price: float, action: str, size: int, - account: str = '', # if blank the "default" tws account is used + account: str, # if blank the "default" tws account is used # XXX: by default 0 tells ``ib_insync`` methods that there is no # existing order so ask the client to create a new one (which it @@ -536,6 +537,7 @@ class Client: Order( orderId=reqid or 0, # stupid api devs.. action=action.upper(), # BUY/SELL + # lookup the literal account number by name here. account=account, orderType='LMT', lmtPrice=price, @@ -659,9 +661,10 @@ class Client: self.ib.errorEvent.connect(push_err) - async def positions( + def positions( self, account: str = '', + ) -> list[Position]: """ Retrieve position info for ``account``. @@ -695,8 +698,11 @@ def get_config() -> dict[str, Any]: return section +_accounts2clients: dict[str, Client] = {} + + @asynccontextmanager -async def _aio_get_client( +async def load_aio_clients( host: str = '127.0.0.1', port: int = None, @@ -710,23 +716,23 @@ async def _aio_get_client( TODO: consider doing this with a ctx mngr eventually? ''' + global _accounts2clients + global _client_cache + conf = get_config() + client = None # first check cache for existing client + if port: + log.info(f'Loading requested client on port: {port}') + client = _client_cache.get((host, port)) + + if client and client.ib.isConnected(): + yield client, _client_cache, _accounts2clients + return + + # allocate new and/or reload disconnected but cached clients try: - if port: - client = _client_cache[(host, port)] - else: - # grab first cached client - client = list(_client_cache.values())[0] - - if not client.ib.isConnected(): - # we have a stale client to re-allocate - raise KeyError - - yield client - - except (KeyError, IndexError): # TODO: in case the arbiter has no record # of existing brokerd we need to broadcast for one. @@ -753,6 +759,8 @@ async def _aio_get_client( ) order = ports['order'] + accounts_def = config.load_accounts('ib') + try_ports = [ports[key] for key in order] ports = try_ports if port is None else [port] @@ -762,29 +770,64 @@ async def _aio_get_client( _err = None + # (re)load any and all clients that can be found + # from connection details in ``brokers.toml``. for port in ports: - try: - log.info(f"Connecting to the EYEBEE on port {port}!") - await ib.connectAsync(host, port, clientId=client_id) - break - except ConnectionRefusedError as ce: - _err = ce - log.warning(f'Failed to connect on {port}') + client = _client_cache.get((host, port)) + + if not client or not client.ib.isConnected(): + try: + log.info(f"Connecting to the EYEBEE on port {port}!") + await ib.connectAsync(host, port, clientId=client_id) + + # create and cache client + client = Client(ib) + + # Pre-collect all accounts available for this + # connection and map account names to this client + # instance. + pps = ib.positions() + if pps: + for pp in pps: + _accounts2clients[ + accounts_def.inverse[pp.account] + ] = client + + # if there are no positions or accounts + # without positions we should still register + # them for this client + for value in ib.accountValues(): + acct = value.account + if acct not in _accounts2clients: + _accounts2clients[ + accounts_def.inverse[acct] + ] = client + + log.info( + f'Loaded accounts: {_accounts2clients} for {client} ' + f'@ {host}:{port}' + ) + + log.info(f"Caching client for {(host, port)}") + _client_cache[(host, port)] = client + + except ConnectionRefusedError as ce: + _err = ce + log.warning(f'Failed to connect on {port}') else: - raise ConnectionRefusedError(_err) + if not _client_cache: + raise ConnectionRefusedError(_err) - # create and cache - try: - client = Client(ib) + # retreive first loaded client + clients = list(_client_cache.values()) + if clients: + client = clients[0] - _client_cache[(host, port)] = client - log.debug(f"Caching client for {(host, port)}") + yield client, _client_cache, _accounts2clients - yield client - - except BaseException: - ib.disconnect() - raise + except BaseException: + ib.disconnect() + raise async def _aio_run_client_method( @@ -793,8 +836,12 @@ async def _aio_run_client_method( from_trio=None, **kwargs, ) -> None: - async with _aio_get_client() as client: + async with load_aio_clients() as ( + client, + clients, + accts2clients, + ): async_meth = getattr(client, meth) # handle streaming methods @@ -1081,8 +1128,11 @@ async def _setup_quote_stream( """ global _quote_streams - async with _aio_get_client() as client: - + async with load_aio_clients() as ( + client, + clients, + accts2clients, + ): contract = contract or (await client.find_contract(symbol)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) @@ -1324,11 +1374,41 @@ async def handle_order_requests( ) -> None: + global _accounts2clients + accounts_def = config.load_accounts('ib') + # request_msg: dict async for request_msg in ems_order_stream: log.info(f'Received order request {request_msg}') action = request_msg['action'] + account = request_msg['account'] + + acct_number = accounts_def.get(account) + if not acct_number: + log.error( + f'An IB account number for name {account} is not found?\n' + 'Make sure you have all TWS and GW instances running.' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'No account found: `{account}` ?', + ).dict()) + continue + + client = _accounts2clients.get(account) + if not client: + log.error( + f'An IB client for account name {account} is not found.\n' + 'Make sure you have all TWS and GW instances running.' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'No api client loaded for account: `{account}` ?', + ).dict()) + continue if action in {'buy', 'sell'}: # validate @@ -1343,6 +1423,7 @@ async def handle_order_requests( price=order.price, action=order.action, size=order.size, + account=order.account, # XXX: by default 0 tells ``ib_insync`` methods that # there is no existing order so ask the client to create @@ -1359,6 +1440,7 @@ async def handle_order_requests( # broker specific request id reqid=reqid, time_ns=time.time_ns(), + account=account, ).dict() ) @@ -1388,15 +1470,16 @@ async def trades_dialogue( ib_trade_events_stream = await _trio_run_client_method( method='recv_trade_updates', ) + global _accounts2clients + global _client_cache # deliver positions to subscriber before anything else - positions = await _trio_run_client_method(method='positions') - all_positions = {} - for pos in positions: - msg = pack_position(pos) - all_positions[msg.symbol] = msg.dict() + for client in _client_cache.values(): + for pos in client.positions(): + msg = pack_position(pos) + all_positions[msg.symbol] = msg.dict() await ctx.started(all_positions) @@ -1413,7 +1496,8 @@ async def trades_dialogue( # ib-gw goes down? Not sure exactly how that's happening looking # at the eventkit code above but we should probably handle it... async for event_name, item in ib_trade_events_stream: - print(f' ib sending {item}') + + log.info(f'ib sending {event_name}:\n{pformat(item)}') # TODO: templating the ib statuses in comparison with other # brokers is likely the way to go: @@ -1453,6 +1537,7 @@ async def trades_dialogue( reqid=trade.order.orderId, time_ns=time.time_ns(), # cuz why not + # account=client. # everyone doin camel case.. status=status.status.lower(), # force lower case