Rejig scan loop for flaky TCP connects, better caching

`ib-gw` seems particularly fragile to connections from clients with the
same id (it can result in weird connect hangs and even crashes) and
`ib_insync` doesn't handle intermittent TCP disconnects that well
(especially on dockerized IBC setups). This adds a bunch of changes to
our client caching and scan loop as well as proper task-locking around
cached client proxies so that:

- `asyncio`-side clients aren't double-loaded/connected even when
  explicitly trying to reconnect repeatedly with a given client to work
  around the unreliability of the `asyncio.Transport` design in
  `ib_insync`.
- we can use `tractor.trionics.maybe_open_context()` to lock the `trio`
  side from loading more than one `Client` on the `asyncio` side and
  instead, on cache hits, only make a new `MethodProxy` around the
  reused `asyncio`-side client (since each `trio` task needs its own
  inter-task msg channel); see the sketch after this list.
- a `finally:` block now tears down all clients loaded in the scan
  loop, avoiding stale connections.
- the connect params are now exposed as named args to
  `load_aio_clients()` so they can be easily controlled from caller
  code.
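
Rough sketch of that task-locked caching pattern (an illustration, not
necessarily the exact final code; `load_clients_for_trio`,
`open_client_proxy` and `log` are the module-local helpers touched in
the diff below and are assumed to be in scope):

    from contextlib import AsyncExitStack, asynccontextmanager as acm

    import tractor

    @acm
    async def open_client_proxies():
        async with (
            # the first trio task in this actor to enter actually spawns
            # the asyncio-side loader; any later task blocks until it's
            # up and then gets a cache hit with the already-connected
            # clients instead of re-connecting.
            tractor.trionics.maybe_open_context(
                acm_func=tractor.to_asyncio.open_channel_from,
                kwargs={'target': load_clients_for_trio},
                key=tractor.current_actor().uid,
            ) as (cache_hit, (clients, from_aio)),
            AsyncExitStack() as stack,
        ):
            if cache_hit:
                log.info(f'Re-using cached clients: {clients}')

            # every trio task still wraps the shared asyncio-side client
            # in its own `MethodProxy` (one inter-task msg channel each).
            proxies = {}
            for acct_name, client in clients.items():
                proxies[acct_name] = await stack.enter_async_context(
                    open_client_proxy(client),
                )

            yield proxies, clients

The `key=tractor.current_actor().uid` bit is what serializes access
per-actor so only the first `trio` task actually spawns the
`asyncio`-side loader.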

Oh, and we properly hooked up the internal `ib_insync` logging to our
own internal schema - makes it a lot easier to debug wtf is going on XD
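
(Concretely that just means swapping `ib_insync`'s stdlib
`logging.getLogger()` instances for our `get_logger()` inside
`NonShittyIB.__init__()`; a rough standalone sketch, assuming the
project-local `get_logger` factory is importable/in scope:)

    import ib_insync as ibis

    # `get_logger` is assumed to be the project-local logging factory
    # used throughout the diff below.
    ib = ibis.IB()
    ib._logger = get_logger('ib_insync.ib')
    ib.client._logger = get_logger('ib_insync.client')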
ib_dedicated_data_client
Tyler Goodlet 2022-05-21 10:59:34 -04:00
parent 26f47227d2
commit a5389beccd
1 changed file with 132 additions and 121 deletions


@@ -38,8 +38,6 @@ from typing import (
 import asyncio
 from pprint import pformat
 import inspect
-import logging
-from random import randint
 import time
 from types import SimpleNamespace
@@ -164,13 +162,23 @@ class NonShittyIB(ibis.IB):
     - Don't use named tuples
     """
     def __init__(self):
+        # override `ib_insync` internal loggers so we can see wtf
+        # it's doing..
+        self._logger = get_logger(
+            'ib_insync.ib',
+        )
         self._createEvents()
+
         # XXX: just to override this wrapper
         self.wrapper = NonShittyWrapper(self)
+
         self.client = ib_Client(self.wrapper)
+        self.client._logger = get_logger(
+            'ib_insync.client',
+        )
+
         # self.errorEvent += self._onError
         self.client.apiEnd += self.disconnectedEvent
-        self._logger = logging.getLogger('ib_insync.ib')

     # map of symbols to contract ids
@@ -883,9 +891,6 @@ _try_ports = [
     _gw_port,
     _tws_port
 ]
-# TODO: remove the randint stuff and use proper error checking in client
-# factor below..
-_client_ids = itertools.count(randint(1, 100))
 _client_cache: dict[tuple[str, int], Client] = {}
 _scan_ignore: set[tuple[str, int]] = set()
@@ -911,8 +916,12 @@ async def load_aio_clients(

     host: str = '127.0.0.1',
     port: int = None,

-    client_id: Optional[int] = None,
+    client_id: int = 6116,
+
+    # the API TCP in `ib_insync` connection can be flaky af so instead
+    # retry a few times to get the client going..
+    connect_retries: int = 3,
+    connect_timeout: float = 0.5,

 ) -> dict[str, Client]:
     '''
@@ -949,133 +958,111 @@ async def load_aio_clients(
         {
             'gw': 4002,
             'tws': 7497,
-            # 'order': ['gw', 'tws']
         }
     )
     order = ports.pop('order', None)
     if order:
         log.warning('`ports.order` section in `brokers.toml` is deprecated')

-    _err = None
     accounts_def = config.load_accounts(['ib'])
     try_ports = list(ports.values())
     ports = try_ports if port is None else [port]
-    # we_connected = []
-    connect_timeout = 2
     combos = list(itertools.product(hosts, ports))
+    accounts_found: dict[str, Client] = {}

-    # allocate new and/or reload disconnected but cached clients
-    # try:
-    # TODO: support multiple clients allowing for execution on
-    # multiple accounts (including a paper instance running on the
-    # same machine) and switching between accounts in the ems.
+    _err = None

     # (re)load any and all clients that can be found
     # from connection details in ``brokers.toml``.
     for host, port in combos:
         sockaddr = (host, port)
-        client = _client_cache.get(sockaddr)
-        accounts_found: dict[str, Client] = {}
         if (
-            client and client.ib.isConnected()
+            sockaddr in _client_cache
             or sockaddr in _scan_ignore
         ):
             continue

-        try:
-            ib = NonShittyIB()
+        ib = NonShittyIB()

-            # XXX: not sure if we ever really need to increment the
-            # client id if teardown is sucessful.
-            client_id = 6116
-
-            await ib.connectAsync(
-                host,
-                port,
-                clientId=client_id,
-
-                # this timeout is sensative on windows and will
-                # fail without a good "timeout error" so be
-                # careful.
-                timeout=connect_timeout,
-            )
-
-            # create and cache client
-            client = Client(ib)
-
-            # Pre-collect all accounts available for this
-            # connection and map account names to this client
-            # instance.
-            pps = ib.positions()
-            if pps:
-                for pp in pps:
-                    accounts_found[
-                        accounts_def.inverse[pp.account]
-                    ] = client
-
-            # if there are accounts without positions we should still
-            # register them for this client
-            for value in ib.accountValues():
-                acct_number = value.account
-                entry = accounts_def.inverse.get(acct_number)
-                if not entry:
-                    raise ValueError(
-                        'No section in brokers.toml for account:'
-                        f' {acct_number}\n'
-                        f'Please add entry to continue using this API client'
-                    )
-
-                # surjection of account names to operating clients.
-                if acct_number not in accounts_found:
-                    accounts_found[entry] = client
-
-            log.info(
-                f'Loaded accounts for client @ {host}:{port}\n'
-                f'{pformat(accounts_found)}'
-            )
-
-            # update all actor-global caches
-            log.info(f"Caching client for {(host, port)}")
-            _client_cache[(host, port)] = client
-
-            # we_connected.append((host, port, client))
-
-            # TODO: don't do it this way, get a gud to_asyncio
-            # context / .start() system goin..
-            def pop_and_discon():
-                log.info(f'Disconnecting client {client}')
-                client.ib.disconnect()
-                _client_cache.pop((host, port), None)
-
-            # NOTE: the above callback **CAN'T FAIL** or shm won't get
-            # torn down correctly ...
-            tractor._actor._lifetime_stack.callback(pop_and_discon)
-
-            # XXX: why aren't we just updating this directy above
-            # instead of using the intermediary `accounts_found`?
-            _accounts2clients.update(accounts_found)
-
-        except (
-            ConnectionRefusedError,
-
-            # TODO: if trying to scan for remote api clients
-            # pretty sure we need to catch this, though it
-            # definitely needs a shorter timeout since it hangs
-            # for like 5s..
-            asyncio.exceptions.TimeoutError,
-            OSError,
-        ) as ce:
-            _err = ce
-            log.warning(f'Failed to connect on {port}')
-
-            # cache logic to avoid rescanning if we already have all
-            # clients loaded.
-            _scan_ignore.add(sockaddr)
+        for i in range(connect_retries):
+            try:
+                await ib.connectAsync(
+                    host,
+                    port,
+                    clientId=client_id,
+
+                    # this timeout is sensative on windows and will
+                    # fail without a good "timeout error" so be
+                    # careful.
+                    timeout=connect_timeout,
+                )
+                break
+
+            except (
+                ConnectionRefusedError,
+
+                # TODO: if trying to scan for remote api clients
+                # pretty sure we need to catch this, though it
+                # definitely needs a shorter timeout since it hangs
+                # for like 5s..
+                asyncio.exceptions.TimeoutError,
+                OSError,
+            ) as ce:
+                _err = ce
+
+                if i > 8:
+                    # cache logic to avoid rescanning if we already have all
+                    # clients loaded.
+                    _scan_ignore.add(sockaddr)
+                    raise
+
+                log.warning(
+                    f'Failed to connect on {port} for {i} time, retrying...')
+
+        # create and cache client
+        client = Client(ib)
+
+        # Pre-collect all accounts available for this
+        # connection and map account names to this client
+        # instance.
+        pps = ib.positions()
+        if pps:
+            for pp in pps:
+                accounts_found[
+                    accounts_def.inverse[pp.account]
+                ] = client
+
+        # if there are accounts without positions we should still
+        # register them for this client
+        for value in ib.accountValues():
+            acct_number = value.account
+            entry = accounts_def.inverse.get(acct_number)
+            if not entry:
+                raise ValueError(
+                    'No section in brokers.toml for account:'
+                    f' {acct_number}\n'
+                    f'Please add entry to continue using this API client'
+                )
+
+            # surjection of account names to operating clients.
+            if acct_number not in accounts_found:
+                accounts_found[entry] = client
+
+        log.info(
+            f'Loaded accounts for client @ {host}:{port}\n'
+            f'{pformat(accounts_found)}'
+        )
+
+        # update all actor-global caches
+        log.info(f"Caching client for {sockaddr}")
+        _client_cache[sockaddr] = client
+
+        # XXX: why aren't we just updating this directy above
+        # instead of using the intermediary `accounts_found`?
+        _accounts2clients.update(accounts_found)

+    # if we have no clients after the scan loop then error out.
     if not _client_cache:
         raise ConnectionError(
             'No ib APIs could be found scanning @:\n'
@@ -1083,16 +1070,15 @@ async def load_aio_clients(
             'Check your `brokers.toml` and/or network'
         ) from _err

-    yield _accounts2clients
-
-    # TODO: this in a way that works xD
-    # finally:
-    # pass
-    # # async with trio.CancelScope(shield=True):
-    # for host, port, client in we_connected:
-    # client.ib.disconnect()
-    # _client_cache.pop((host, port))
-    # raise
+    try:
+        yield _accounts2clients
+    finally:
+        # TODO: for re-scans we'll want to not teardown clients which
+        # are up and stable right?
+        for acct, client in _accounts2clients.items():
+            log.info(f'Disconnecting {acct}@{client}')
+            client.ib.disconnect()
+            _client_cache.pop((host, port))


 async def load_clients_for_trio(
@@ -1103,36 +1089,56 @@ async def load_clients_for_trio(
     '''
     Pure async mngr proxy to ``load_aio_clients()``.

-    '''
-    async with load_aio_clients() as accts2clients:
-        to_trio.send_nowait(accts2clients)
-
-        # TODO: maybe a sync event to wait on instead?
-        await asyncio.sleep(float('inf'))
+    This is a bootstrap entrypoing to call from
+    a ``tractor.to_asyncio.open_channel_from()``.
+
+    '''
+    global _accounts2clients
+    if _accounts2clients:
+        to_trio.send_nowait(_accounts2clients)
+        await asyncio.sleep(float('inf'))
+
+    else:
+        async with load_aio_clients() as accts2clients:
+            to_trio.send_nowait(accts2clients)
+
+            # TODO: maybe a sync event to wait on instead?
+            await asyncio.sleep(float('inf'))
+
+
+_proxies: dict[str, MethodProxy] = {}


 @acm
 async def open_client_proxies() -> tuple[
     dict[str, MethodProxy],
     dict[str, Client],
 ]:
-    proxies: dict[str, MethodProxy] = {}
     async with (
-        tractor.to_asyncio.open_channel_from(
-            load_clients_for_trio,
-        ) as (clients, from_aio),
+        tractor.trionics.maybe_open_context(
+            # acm_func=open_client_proxies,
+            acm_func=tractor.to_asyncio.open_channel_from,
+            kwargs={'target': load_clients_for_trio},
+
+            # lock around current actor task access
+            # TODO: maybe this should be the default in tractor?
+            key=tractor.current_actor().uid,
+
+        ) as (cache_hit, (clients, from_aio)),
         AsyncExitStack() as stack
     ):
+        if cache_hit:
+            log.info(f'Re-using cached clients: {clients}')
+
         for acct_name, client in clients.items():
             proxy = await stack.enter_async_context(
                 open_client_proxy(client),
             )
-            proxies[acct_name] = proxy
+            _proxies[acct_name] = proxy

-        yield proxies, clients
+        yield _proxies, clients


 def get_preferred_data_client(
@@ -1511,10 +1517,15 @@ async def get_bars(
     for _ in range(10):
         try:
-            bars, bars_array = await proxy.bars(
+            out = await proxy.bars(
                 fqsn=fqsn,
                 end_dt=end_dt,
             )
+            if out:
+                bars, bars_array = out
+
+            else:
+                await tractor.breakpoint()

             if bars_array is None:
                 raise SymbolNotFound(fqsn)