diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index f85b1120..b8e4ec96 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -34,6 +34,7 @@ import asyncio from pprint import pformat import inspect import logging +import platform from random import randint import time @@ -684,7 +685,8 @@ _try_ports = [ # TODO: remove the randint stuff and use proper error checking in client # factor below.. _client_ids = itertools.count(randint(1, 100)) -_client_cache = {} +_client_cache: dict[tuple[str, int], Client] = {} +_scan_ignore: set[tuple[str, int]] = set() def get_config() -> dict[str, Any]: @@ -718,8 +720,7 @@ async def load_aio_clients( TODO: consider doing this with a ctx mngr eventually? ''' - global _accounts2clients - global _client_cache + global _accounts2clients, _client_cache, _scan_ignore conf = get_config() ib = None @@ -750,12 +751,14 @@ async def load_aio_clients( ) order = ports.pop('order', None) if order: - log.warning(f'`ports.order` section in `brokers.toml` is deprecated') + log.warning('`ports.order` section in `brokers.toml` is deprecated') accounts_def = config.load_accounts(['ib']) try_ports = list(ports.values()) ports = try_ports if port is None else [port] we_connected = [] + connect_timeout = 0.5 if platform.system() != 'Windows' else 1 + combos = list(itertools.product(hosts, ports)) # allocate new and/or reload disconnected but cached clients try: @@ -767,72 +770,86 @@ async def load_aio_clients( # (re)load any and all clients that can be found # from connection details in ``brokers.toml``. - combos = list(itertools.product(hosts, ports)) for host, port in combos: - client = _client_cache.get((host, port)) + + sockaddr = (host, port) + client = _client_cache.get(sockaddr) accounts_found: dict[str, Client] = {} - if not client or not client.ib.isConnected(): - try: - ib = NonShittyIB() - # if this is a persistent brokerd, try to allocate - # a new id for each client - client_id = next(_client_ids) + if ( + client and client.ib.isConnected() or + sockaddr in _scan_ignore + ): + continue - log.info(f"Connecting to the EYEBEE on port {port}!") - await ib.connectAsync( - host, - port, - clientId=client_id, - timeout=0.5, - ) + try: + ib = NonShittyIB() - # create and cache client - client = Client(ib) + # if this is a persistent brokerd, try to allocate + # a new id for each client + client_id = next(_client_ids) - # Pre-collect all accounts available for this - # connection and map account names to this client - # instance. - pps = ib.positions() - if pps: - for pp in pps: - accounts_found[ - accounts_def.inverse[pp.account] - ] = client + await ib.connectAsync( + host, + port, + clientId=client_id, - # if there are no positions or accounts - # without positions we should still register - # them for this client - for value in ib.accountValues(): - acct = value.account - if acct not in accounts_found: - accounts_found[ - accounts_def.inverse[acct] - ] = client + # this timeout is sensative on windows and will + # fail without a good "timeout error" so be + # careful. + timeout=connect_timeout, + ) - log.info( - f'Loaded accounts for client @ {host}:{port}\n' - f'{pformat(accounts_found)}' - ) + # create and cache client + client = Client(ib) - # update all actor-global caches - log.info(f"Caching client for {(host, port)}") - _client_cache[(host, port)] = client - we_connected.append(client) - _accounts2clients.update(accounts_found) + # Pre-collect all accounts available for this + # connection and map account names to this client + # instance. + pps = ib.positions() + if pps: + for pp in pps: + accounts_found[ + accounts_def.inverse[pp.account] + ] = client - except ( - ConnectionRefusedError, + # if there are no positions or accounts + # without positions we should still register + # them for this client + for value in ib.accountValues(): + acct = value.account + if acct not in accounts_found: + accounts_found[ + accounts_def.inverse[acct] + ] = client - # TODO: if trying to scan for remote api clients - # pretty sure we need to catch this, though it - # definitely needs a shorter timeout since it hangs - # for like 5s.. - asyncio.exceptions.TimeoutError, - OSError, - ) as ce: - _err = ce - log.warning(f'Failed to connect on {port}') + log.info( + f'Loaded accounts for client @ {host}:{port}\n' + f'{pformat(accounts_found)}' + ) + + # update all actor-global caches + log.info(f"Caching client for {(host, port)}") + _client_cache[(host, port)] = client + we_connected.append(client) + _accounts2clients.update(accounts_found) + + except ( + ConnectionRefusedError, + + # TODO: if trying to scan for remote api clients + # pretty sure we need to catch this, though it + # definitely needs a shorter timeout since it hangs + # for like 5s.. + asyncio.exceptions.TimeoutError, + OSError, + ) as ce: + _err = ce + log.warning(f'Failed to connect on {port}') + + # cache logic to avoid rescanning if we already have all + # clients loaded. + _scan_ignore.add(sockaddr) else: if not _client_cache: raise ConnectionError( @@ -1082,10 +1099,16 @@ async def get_bars( async def backfill_bars( + sym: str, shm: ShmArray, # type: ignore # noqa - # count: int = 20, # NOTE: any more and we'll overrun underlying buffer - count: int = 6, # NOTE: any more and we'll overrun the underlying buffer + + # TODO: we want to avoid overrunning the underlying shm array buffer + # and we should probably calc the number of calls to make depending + # on that until we have the `marketstore` daemon in place in which + # case the shm size will be driven by user config and available sys + # memory. + count: int = 24, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -1121,6 +1144,17 @@ async def backfill_bars( continue bars, bars_array, next_dt = out + + # volume cleaning since there's -ve entries, + # wood luv to know what crookery that is.. + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 + # TODO we should probably dig into forums to see what peeps + # think this data "means" and then use it as an indicator of + # sorts? dinkus has mentioned that $vlms for the day dont' + # match other platforms nor the summary stat tws shows in + # the monitor - it's probably worth investigating. + shm.push(bars_array, prepend=True) i += 1