Add (list of) `hosts` support in config and better scan error msg

vlm_plotz_backup
Tyler Goodlet 2021-09-18 17:06:25 -04:00
parent 91c005b3c1
commit eca9b14cd6
1 changed files with 41 additions and 11 deletions

View File

@ -25,6 +25,7 @@ from contextlib import asynccontextmanager
from dataclasses import asdict from dataclasses import asdict
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
import itertools
from typing import ( from typing import (
Any, Optional, Any, Optional,
AsyncIterator, Awaitable, AsyncIterator, Awaitable,
@ -32,7 +33,6 @@ from typing import (
import asyncio import asyncio
from pprint import pformat from pprint import pformat
import inspect import inspect
import itertools
import logging import logging
from random import randint from random import randint
import time import time
@ -727,7 +727,17 @@ async def load_aio_clients(
# attempt to get connection info from config; if no .toml entry # attempt to get connection info from config; if no .toml entry
# exists, we try to load from a default localhost connection. # exists, we try to load from a default localhost connection.
host = conf.get('host', '127.0.0.1') localhost = '127.0.0.1'
host, hosts = conf.get('host'), conf.get('hosts')
if not (hosts or host):
host = localhost
if not hosts:
hosts = [host]
elif host and hosts:
raise ValueError(
'Specify only one of `host` or `hosts` in `brokers.toml` config')
ports = conf.get( ports = conf.get(
'ports', 'ports',
@ -735,17 +745,18 @@ async def load_aio_clients(
{ {
'gw': 4002, 'gw': 4002,
'tws': 7497, 'tws': 7497,
'order': ['gw', 'tws'] # 'order': ['gw', 'tws']
} }
) )
order = ports['order'] order = ports.pop('order', None)
if order:
log.warning(f'`ports.order` section in `brokers.toml` is deprecated')
accounts_def = config.load_accounts(['ib']) accounts_def = config.load_accounts(['ib'])
try_ports = list(ports.values())
try_ports = [ports[key] for key in order]
ports = try_ports if port is None else [port] ports = try_ports if port is None else [port]
we_connected = [] we_connected = []
# allocate new and/or reload disconnected but cached clients # allocate new and/or reload disconnected but cached clients
try: try:
# TODO: support multiple clients allowing for execution on # TODO: support multiple clients allowing for execution on
@ -756,7 +767,8 @@ async def load_aio_clients(
# (re)load any and all clients that can be found # (re)load any and all clients that can be found
# from connection details in ``brokers.toml``. # from connection details in ``brokers.toml``.
for port in ports: combos = list(itertools.product(hosts, ports))
for host, port in combos:
client = _client_cache.get((host, port)) client = _client_cache.get((host, port))
accounts_found: dict[str, Client] = {} accounts_found: dict[str, Client] = {}
if not client or not client.ib.isConnected(): if not client or not client.ib.isConnected():
@ -768,7 +780,12 @@ async def load_aio_clients(
client_id = next(_client_ids) client_id = next(_client_ids)
log.info(f"Connecting to the EYEBEE on port {port}!") log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id) await ib.connectAsync(
host,
port,
clientId=client_id,
timeout=0.5,
)
# create and cache client # create and cache client
client = Client(ib) client = Client(ib)
@ -804,12 +821,25 @@ async def load_aio_clients(
we_connected.append(client) we_connected.append(client)
_accounts2clients.update(accounts_found) _accounts2clients.update(accounts_found)
except ConnectionRefusedError as ce: except (
ConnectionRefusedError,
# TODO: if trying to scan for remote api clients
# pretty sure we need to catch this, though it
# definitely needs a shorter timeout since it hangs
# for like 5s..
asyncio.exceptions.TimeoutError,
OSError,
) as ce:
_err = ce _err = ce
log.warning(f'Failed to connect on {port}') log.warning(f'Failed to connect on {port}')
else: else:
if not _client_cache: if not _client_cache:
raise ConnectionRefusedError(_err) raise ConnectionError(
'No ib APIs could be found scanning @:\n'
f'{pformat(combos)}\n'
'Check your `brokers.toml` and/or network'
) from _err
# retreive first loaded client # retreive first loaded client
clients = list(_client_cache.values()) clients = list(_client_cache.values())