diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 51cf4a39..f85b1120 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -25,6 +25,7 @@ from contextlib import asynccontextmanager from dataclasses import asdict from datetime import datetime from functools import partial +import itertools from typing import ( Any, Optional, AsyncIterator, Awaitable, @@ -32,7 +33,6 @@ from typing import ( import asyncio from pprint import pformat import inspect -import itertools import logging from random import randint import time @@ -727,7 +727,17 @@ async def load_aio_clients( # attempt to get connection info from config; if no .toml entry # exists, we try to load from a default localhost connection. - host = conf.get('host', '127.0.0.1') + localhost = '127.0.0.1' + host, hosts = conf.get('host'), conf.get('hosts') + if not (hosts or host): + host = localhost + + if not hosts: + hosts = [host] + elif host and hosts: + raise ValueError( + 'Specify only one of `host` or `hosts` in `brokers.toml` config') + ports = conf.get( 'ports', @@ -735,17 +745,18 @@ async def load_aio_clients( { 'gw': 4002, 'tws': 7497, - 'order': ['gw', 'tws'] + # 'order': ['gw', 'tws'] } ) - order = ports['order'] + order = ports.pop('order', None) + if order: + log.warning(f'`ports.order` section in `brokers.toml` is deprecated') accounts_def = config.load_accounts(['ib']) - - try_ports = [ports[key] for key in order] + try_ports = list(ports.values()) ports = try_ports if port is None else [port] - we_connected = [] + # allocate new and/or reload disconnected but cached clients try: # TODO: support multiple clients allowing for execution on @@ -756,7 +767,8 @@ async def load_aio_clients( # (re)load any and all clients that can be found # from connection details in ``brokers.toml``. - for port in ports: + combos = list(itertools.product(hosts, ports)) + for host, port in combos: client = _client_cache.get((host, port)) accounts_found: dict[str, Client] = {} if not client or not client.ib.isConnected(): @@ -768,7 +780,12 @@ async def load_aio_clients( client_id = next(_client_ids) log.info(f"Connecting to the EYEBEE on port {port}!") - await ib.connectAsync(host, port, clientId=client_id) + await ib.connectAsync( + host, + port, + clientId=client_id, + timeout=0.5, + ) # create and cache client client = Client(ib) @@ -804,12 +821,25 @@ async def load_aio_clients( we_connected.append(client) _accounts2clients.update(accounts_found) - except ConnectionRefusedError as ce: + except ( + ConnectionRefusedError, + + # TODO: if trying to scan for remote api clients + # pretty sure we need to catch this, though it + # definitely needs a shorter timeout since it hangs + # for like 5s.. + asyncio.exceptions.TimeoutError, + OSError, + ) as ce: _err = ce log.warning(f'Failed to connect on {port}') else: if not _client_cache: - raise ConnectionRefusedError(_err) + raise ConnectionError( + 'No ib APIs could be found scanning @:\n' + f'{pformat(combos)}\n' + 'Check your `brokers.toml` and/or network' + ) from _err # retreive first loaded client clients = list(_client_cache.values())