diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 5f81a158..00f8ca35 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -38,8 +38,6 @@ from typing import ( import asyncio from pprint import pformat import inspect -import logging -from random import randint import time from types import SimpleNamespace @@ -164,13 +162,23 @@ class NonShittyIB(ibis.IB): - Don't use named tuples """ def __init__(self): + + # override `ib_insync` internal loggers so we can see wtf + # it's doing.. + self._logger = get_logger( + 'ib_insync.ib', + ) self._createEvents() + # XXX: just to override this wrapper self.wrapper = NonShittyWrapper(self) self.client = ib_Client(self.wrapper) + self.client._logger = get_logger( + 'ib_insync.client', + ) + # self.errorEvent += self._onError self.client.apiEnd += self.disconnectedEvent - self._logger = logging.getLogger('ib_insync.ib') # map of symbols to contract ids @@ -883,9 +891,6 @@ _try_ports = [ _gw_port, _tws_port ] -# TODO: remove the randint stuff and use proper error checking in client -# factor below.. -_client_ids = itertools.count(randint(1, 100)) _client_cache: dict[tuple[str, int], Client] = {} _scan_ignore: set[tuple[str, int]] = set() @@ -911,8 +916,12 @@ async def load_aio_clients( host: str = '127.0.0.1', port: int = None, + client_id: int = 6116, - client_id: Optional[int] = None, + # the API TCP in `ib_insync` connection can be flaky af so instead + # retry a few times to get the client going.. + connect_retries: int = 3, + connect_timeout: float = 0.5, ) -> dict[str, Client]: ''' @@ -949,133 +958,111 @@ async def load_aio_clients( { 'gw': 4002, 'tws': 7497, - # 'order': ['gw', 'tws'] } ) order = ports.pop('order', None) if order: log.warning('`ports.order` section in `brokers.toml` is deprecated') + _err = None accounts_def = config.load_accounts(['ib']) try_ports = list(ports.values()) ports = try_ports if port is None else [port] - # we_connected = [] - connect_timeout = 2 combos = list(itertools.product(hosts, ports)) - - # allocate new and/or reload disconnected but cached clients - # try: - # TODO: support multiple clients allowing for execution on - # multiple accounts (including a paper instance running on the - # same machine) and switching between accounts in the ems. - - _err = None + accounts_found: dict[str, Client] = {} # (re)load any and all clients that can be found # from connection details in ``brokers.toml``. for host, port in combos: sockaddr = (host, port) - client = _client_cache.get(sockaddr) - accounts_found: dict[str, Client] = {} - if ( - client and client.ib.isConnected() + sockaddr in _client_cache or sockaddr in _scan_ignore ): continue - try: - ib = NonShittyIB() + ib = NonShittyIB() - # XXX: not sure if we ever really need to increment the - # client id if teardown is sucessful. - client_id = 6116 + for i in range(connect_retries): + try: + await ib.connectAsync( + host, + port, + clientId=client_id, - await ib.connectAsync( - host, - port, - clientId=client_id, + # this timeout is sensative on windows and will + # fail without a good "timeout error" so be + # careful. + timeout=connect_timeout, + ) + break - # this timeout is sensative on windows and will - # fail without a good "timeout error" so be - # careful. - timeout=connect_timeout, - ) + except ( + ConnectionRefusedError, - # create and cache client - client = Client(ib) + # TODO: if trying to scan for remote api clients + # pretty sure we need to catch this, though it + # definitely needs a shorter timeout since it hangs + # for like 5s.. + asyncio.exceptions.TimeoutError, + OSError, + ) as ce: + _err = ce - # Pre-collect all accounts available for this - # connection and map account names to this client - # instance. - pps = ib.positions() - if pps: - for pp in pps: - accounts_found[ - accounts_def.inverse[pp.account] - ] = client + if i > 8: + # cache logic to avoid rescanning if we already have all + # clients loaded. + _scan_ignore.add(sockaddr) + raise - # if there are accounts without positions we should still - # register them for this client - for value in ib.accountValues(): - acct_number = value.account + log.warning( + f'Failed to connect on {port} for {i} time, retrying...') - entry = accounts_def.inverse.get(acct_number) - if not entry: - raise ValueError( - 'No section in brokers.toml for account:' - f' {acct_number}\n' - f'Please add entry to continue using this API client' - ) + # create and cache client + client = Client(ib) - # surjection of account names to operating clients. - if acct_number not in accounts_found: - accounts_found[entry] = client + # Pre-collect all accounts available for this + # connection and map account names to this client + # instance. + pps = ib.positions() + if pps: + for pp in pps: + accounts_found[ + accounts_def.inverse[pp.account] + ] = client - log.info( - f'Loaded accounts for client @ {host}:{port}\n' - f'{pformat(accounts_found)}' - ) + # if there are accounts without positions we should still + # register them for this client + for value in ib.accountValues(): + acct_number = value.account - # update all actor-global caches - log.info(f"Caching client for {(host, port)}") - _client_cache[(host, port)] = client + entry = accounts_def.inverse.get(acct_number) + if not entry: + raise ValueError( + 'No section in brokers.toml for account:' + f' {acct_number}\n' + f'Please add entry to continue using this API client' + ) - # we_connected.append((host, port, client)) + # surjection of account names to operating clients. + if acct_number not in accounts_found: + accounts_found[entry] = client - # TODO: don't do it this way, get a gud to_asyncio - # context / .start() system goin.. - def pop_and_discon(): - log.info(f'Disconnecting client {client}') - client.ib.disconnect() - _client_cache.pop((host, port), None) + log.info( + f'Loaded accounts for client @ {host}:{port}\n' + f'{pformat(accounts_found)}' + ) - # NOTE: the above callback **CAN'T FAIL** or shm won't get - # torn down correctly ... - tractor._actor._lifetime_stack.callback(pop_and_discon) + # update all actor-global caches + log.info(f"Caching client for {sockaddr}") + _client_cache[sockaddr] = client - # XXX: why aren't we just updating this directy above - # instead of using the intermediary `accounts_found`? - _accounts2clients.update(accounts_found) - - except ( - ConnectionRefusedError, - - # TODO: if trying to scan for remote api clients - # pretty sure we need to catch this, though it - # definitely needs a shorter timeout since it hangs - # for like 5s.. - asyncio.exceptions.TimeoutError, - OSError, - ) as ce: - _err = ce - log.warning(f'Failed to connect on {port}') - - # cache logic to avoid rescanning if we already have all - # clients loaded. - _scan_ignore.add(sockaddr) + # XXX: why aren't we just updating this directy above + # instead of using the intermediary `accounts_found`? + _accounts2clients.update(accounts_found) + # if we have no clients after the scan loop then error out. if not _client_cache: raise ConnectionError( 'No ib APIs could be found scanning @:\n' @@ -1083,16 +1070,15 @@ async def load_aio_clients( 'Check your `brokers.toml` and/or network' ) from _err - yield _accounts2clients - - # TODO: this in a way that works xD - # finally: - # pass - # # async with trio.CancelScope(shield=True): - # for host, port, client in we_connected: - # client.ib.disconnect() - # _client_cache.pop((host, port)) - # raise + try: + yield _accounts2clients + finally: + # TODO: for re-scans we'll want to not teardown clients which + # are up and stable right? + for acct, client in _accounts2clients.items(): + log.info(f'Disconnecting {acct}@{client}') + client.ib.disconnect() + _client_cache.pop((host, port)) async def load_clients_for_trio( @@ -1103,36 +1089,56 @@ async def load_clients_for_trio( ''' Pure async mngr proxy to ``load_aio_clients()``. - ''' - async with load_aio_clients() as accts2clients: - to_trio.send_nowait(accts2clients) + This is a bootstrap entrypoing to call from + a ``tractor.to_asyncio.open_channel_from()``. - # TODO: maybe a sync event to wait on instead? + ''' + global _accounts2clients + + if _accounts2clients: + to_trio.send_nowait(_accounts2clients) await asyncio.sleep(float('inf')) + else: + async with load_aio_clients() as accts2clients: + to_trio.send_nowait(accts2clients) + + # TODO: maybe a sync event to wait on instead? + await asyncio.sleep(float('inf')) + + +_proxies: dict[str, MethodProxy] = {} + @acm async def open_client_proxies() -> tuple[ dict[str, MethodProxy], dict[str, Client], ]: - - proxies: dict[str, MethodProxy] = {} - async with ( - tractor.to_asyncio.open_channel_from( - load_clients_for_trio, - ) as (clients, from_aio), + tractor.trionics.maybe_open_context( + # acm_func=open_client_proxies, + acm_func=tractor.to_asyncio.open_channel_from, + kwargs={'target': load_clients_for_trio}, + + # lock around current actor task access + # TODO: maybe this should be the default in tractor? + key=tractor.current_actor().uid, + + ) as (cache_hit, (clients, from_aio)), AsyncExitStack() as stack ): + if cache_hit: + log.info(f'Re-using cached clients: {clients}') + for acct_name, client in clients.items(): proxy = await stack.enter_async_context( open_client_proxy(client), ) - proxies[acct_name] = proxy + _proxies[acct_name] = proxy - yield proxies, clients + yield _proxies, clients def get_preferred_data_client( @@ -1511,10 +1517,15 @@ async def get_bars( for _ in range(10): try: - bars, bars_array = await proxy.bars( + out = await proxy.bars( fqsn=fqsn, end_dt=end_dt, ) + if out: + bars, bars_array = out + + else: + await tractor.breakpoint() if bars_array is None: raise SymbolNotFound(fqsn)